diff --git a/chunk.go b/chunk.go new file mode 100644 index 0000000..4d30837 --- /dev/null +++ b/chunk.go @@ -0,0 +1,117 @@ +package main + +import ( + "bufio" + "fmt" + "os" + "time" +) + +// type chunk represents a grouping of redis statements to be sent to a redis +// server. +type chunk struct { + id int + vals []value + t time.Time +} + +// write out a chunk's statements onto a bufio.Writer. After sending all +// statements onto the bufio.Writer, reads from the provided channel of +// responses N times, where N is the number of statements originally sent. +func (c *chunk) send(w *bufio.Writer, responses chan maybe) *sendResult { + stats := new(sendResult) + stats.start() + defer stats.stop() + + for _, v := range c.vals { + n, err := v.Write(w) + if err != nil { + fmt.Fprintf(os.Stderr, "error writing a statement: %v\n", err) + } + stats.byteSize += n + } + if err := w.Flush(); err != nil { + fmt.Fprintf(os.Stderr, "error flushing statement buffer: %v\n", err) + } + + for _, request := range c.vals { + response, ok := <-responses + if !ok { + // we get here when the response channel closes too early + fmt.Fprintf(os.Stderr, "error reading redis response stream: response chan closed early") + return stats + } + stats.accumulate(request, response) + } + + return stats +} + +// ----------------------------------------------------------------------------- + +type sendResult struct { + read int // number of statement sread on the incoming statement stream + errors int // number of error responses seen on the redis response stream + replies int // number of reply responses seen on the redis response stream + byteSize int // number of bytes sent + elapsed time.Duration // time taken to send requests and receive responses from redis + paused time.Duration // sleep time inserted + + startTime time.Time +} + +func (s *sendResult) start() { + s.startTime = time.Now() +} + +func (s *sendResult) stop() { + s.elapsed = time.Since(s.startTime) +} + +func (s *sendResult) avg() time.Duration { + if s.read == 0 { + fmt.Fprintln(os.Stderr, "for some reason, we tried to divide by zero on sendResult.avg. we recovered, though.") + return 100 * time.Microsecond + } + return time.Duration(int64(s.elapsed) / int64(s.read)) +} + +func (s *sendResult) log() { + if !options.chunkInfo { + return + } + fmt.Printf("errors: %d replies: %d total: %d sent: %d elapsed: %v avg: %v size: %v\n", s.errors, s.replies, s.errors+s.replies, s.read, s.elapsed, s.avg(), s.byteSize) +} + +func (s *sendResult) nextSize() int { + target := int(int64(chunk_target) / int64(s.avg())) + return min(target, chunk_max, 2*s.read) +} + +func (s *sendResult) accumulate(request value, response maybe) { + s.read++ + if response.ok() { + switch r := response.val().(type) { + case ErrorVal: + // these errors are errors reported from redis. e.g., if you + // try to do something like delete a key that doesn't exist or + // send an invalid command to redis + if options.verbose { + fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val()) + } else { + fmt.Fprintf(os.Stderr, "%q\n", r) + } + s.errors++ + default: + if options.verbose { + fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val()) + } + s.replies++ + } + } else { + // we get here when we encounter an error in the response stream. + // That is, the response stream contains bytes that are not valid + // in the redis protocol. + fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err()) + } +} diff --git a/misc.go b/misc.go new file mode 100644 index 0000000..0e840f9 --- /dev/null +++ b/misc.go @@ -0,0 +1,19 @@ +package main + +func min(vals ...int) int { + var best int + switch len(vals) { + case 0: + return 0 + case 1: + return vals[0] + default: + best = vals[0] + } + for _, v := range vals[1:] { + if v < best { + best = v + } + } + return best +} diff --git a/rsload.go b/rsload.go index 0efed32..5807a38 100644 --- a/rsload.go +++ b/rsload.go @@ -15,13 +15,13 @@ var chunk_target = 250 * time.Millisecond var chunk_max = 10000 var options struct { - host string - port int - password string - pipe bool - profile string - verbose bool - chunkInfo bool + host string // hostname or ip address of the redis server to connect to + port int // redis port to connect to + password string // redis password used with redis auth + pipe bool // whether or not to take input from stdin + profile string // path to which a cpu profile will be written, used for debug purposes + verbose bool // whether or not to echo out request/response pairs + chunkInfo bool // whether or not to write out stats about each chunk sent } func usage(status int) { @@ -29,126 +29,6 @@ func usage(status int) { os.Exit(status) } -type chunk struct { - id int - vals []value - t time.Time -} - -type sendResult struct { - read int // number of statement sread on the incoming statement stream - errors int // number of error responses seen on the redis response stream - replies int // number of reply responses seen on the redis response stream - byteSize int // number of bytes sent - elapsed time.Duration // time taken to send requests and receive responses from redis - paused time.Duration // sleep time inserted - - startTime time.Time -} - -func (s *sendResult) start() { - s.startTime = time.Now() -} - -func (s *sendResult) stop() { - s.elapsed = time.Since(s.startTime) -} - -func (s *sendResult) avg() time.Duration { - if s.read == 0 { - fmt.Fprintln(os.Stderr, "for some reason, we tried to divide by zero on sendResult.avg. we recovered, though.") - return 100 * time.Microsecond - } - return time.Duration(int64(s.elapsed) / int64(s.read)) -} - -func (s *sendResult) log() { - if !options.chunkInfo { - return - } - fmt.Printf("errors: %d replies: %d total: %d sent: %d elapsed: %v avg: %v size: %v\n", s.errors, s.replies, s.errors+s.replies, s.read, s.elapsed, s.avg(), s.byteSize) -} - -func (s *sendResult) nextSize() int { - target := int(int64(chunk_target) / int64(s.avg())) - return min(target, chunk_max, 2*s.read) -} - -func min(vals ...int) int { - var best int - switch len(vals) { - case 0: - return 0 - case 1: - return vals[0] - default: - best = vals[0] - } - for _, v := range vals[1:] { - if v < best { - best = v - } - } - return best -} - -func (s *sendResult) accumulate(request value, response maybe) { - s.read++ - if response.ok() { - switch r := response.val().(type) { - case ErrorVal: - // these errors are errors reported from redis. e.g., if you - // try to do something like delete a key that doesn't exist or - // send an invalid command to redis - if options.verbose { - fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val()) - } else { - fmt.Fprintf(os.Stderr, "%q\n", r) - } - s.errors++ - default: - if options.verbose { - fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val()) - } - s.replies++ - } - } else { - // we get here when we encounter an error in the response stream. - // That is, the response stream contains bytes that are not valid - // in the redis protocol. - fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err()) - } -} - -func (c *chunk) send(w *bufio.Writer, responses chan maybe) *sendResult { - stats := new(sendResult) - stats.start() - defer stats.stop() - - for _, v := range c.vals { - n, err := v.Write(w) - if err != nil { - fmt.Fprintf(os.Stderr, "error writing a statement: %v\n", err) - } - stats.byteSize += n - } - if err := w.Flush(); err != nil { - fmt.Fprintf(os.Stderr, "error flushing statement buffer: %v\n", err) - } - - for _, request := range c.vals { - response, ok := <-responses - if !ok { - // we get here when the response channel closes too early - fmt.Fprintf(os.Stderr, "error reading redis response stream: response chan closed early") - return stats - } - stats.accumulate(request, response) - } - - return stats -} - // sets up our input file for reading. If the --pipe option was specified, the // input file is stdin. Otherwise, the input file is the first argument on the // command line. @@ -171,28 +51,38 @@ func infile() *os.File { return f } -func main() { - flag.Parse() - +func connect() net.Conn { conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", options.host, options.port)) if err != nil { - fmt.Printf("unable to connect to redis: %v\n", err) + fmt.Fprintf(os.Stderr, "unable to connect to redis: %v\n", err) os.Exit(1) } - defer conn.Close() + auth(conn) + return conn +} - if options.password != "" { - fmt.Fprintf(conn, "*2\r\n$4\r\nauth\r\n$%d\r\n%s\r\n", len(options.password), options.password) - v, err := readValue(conn) - if err != nil { - fmt.Printf("unable to auth: %v\n", err) - os.Exit(1) - } - if !isOK(v) { - fmt.Printf("auth not OK: %q\n", v) - os.Exit(1) - } +func auth(c net.Conn) { + if options.password == "" { + return + } + + fmt.Fprintf(c, "*2\r\n$4\r\nauth\r\n$%d\r\n%s\r\n", len(options.password), options.password) + v, err := readValue(c) + if err != nil { + fmt.Printf("unable to auth: %v\n", err) + os.Exit(1) } + if !isOK(v) { + fmt.Printf("auth not OK: %q\n", v) + os.Exit(1) + } +} + +func main() { + flag.Parse() + + conn := connect() + defer conn.Close() if options.profile != "" { f, err := os.Create(options.profile)