From 5f6f811f46a1738e5b636494ad4e15b5e6b9d759 Mon Sep 17 00:00:00 2001 From: Jordan Orelli Date: Tue, 12 Aug 2014 18:42:06 +0000 Subject: [PATCH] cleanup --- rsload.go | 160 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 110 insertions(+), 50 deletions(-) diff --git a/rsload.go b/rsload.go index 8cf3924..028d5d3 100644 --- a/rsload.go +++ b/rsload.go @@ -36,57 +36,114 @@ type chunk struct { t time.Time } -func (c *chunk) send(w *bufio.Writer, responses chan maybe) (int, int) { - start := time.Now() - for _, v := range c.vals { - v.Write(w) +type sendResult struct { + read int // number of statement sread on the incoming statement stream + errors int // number of error responses seen on the redis response stream + replies int // number of reply responses seen on the redis response stream + byteSize int // number of bytes sent + elapsed time.Duration // time taken to send requests and receive responses from redis + paused time.Duration // sleep time inserted + + startTime time.Time +} + +func (s *sendResult) start() { + s.startTime = time.Now() +} + +func (s *sendResult) stop() { + s.elapsed = time.Since(s.startTime) +} + +func (s *sendResult) avg() time.Duration { + return time.Duration(int64(s.elapsed) / int64(s.read)) +} + +func (s *sendResult) log() { + if !options.chunkInfo { + return } - size := w.Buffered() - w.Flush() - errors, replies := 0, 0 - for _, request := range c.vals { - response, ok := <-responses - if !ok { - fmt.Fprintf(os.Stderr, "ohhhhhhhhhhhhh fuck\n") - return -1, -1 + fmt.Printf("errors: %d replies: %d total: %d sent: %d elapsed: %v avg: %v size: %v\n", s.errors, s.replies, s.errors+s.replies, s.read, s.elapsed, s.avg(), s.byteSize) +} + +func (s *sendResult) nextSize() int { + target := int(int64(chunk_target) / int64(s.avg())) + return min(target, chunk_max, 2*s.read) +} + +func min(vals ...int) int { + var best int + switch len(vals) { + case 0: + return 0 + case 1: + return vals[0] + default: + best = vals[0] + } + for _, v := range vals[1:] { + if v < best { + best = v } - if response.ok() { - switch r := response.val().(type) { - case ErrorVal: - if options.verbose { - fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val()) - } else { - fmt.Fprintln(os.Stderr, r) - } - errors++ - default: - if options.verbose { - fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val()) - } - replies++ + } + return best +} + +func (s *sendResult) accumulate(request value, response maybe) { + s.read++ + if response.ok() { + switch r := response.val().(type) { + case ErrorVal: + // these errors are errors reported from redis. e.g., if you + // try to do something like delete a key that doesn't exist or + // send an invalid command to redis + if options.verbose { + fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val()) + } else { + fmt.Fprintln(os.Stderr, r) } - } else { - fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err()) + s.errors++ + default: + if options.verbose { + fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val()) + } + s.replies++ } + } else { + // we get here when we encounter an error in the response stream. + // That is, the response stream contains bytes that are not valid + // in the redis protocol. + fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err()) } - elapsed := time.Since(start) - sleep := elapsed / 4 - time.Sleep(sleep) - avg := time.Duration(int64(elapsed) / int64(len(c.vals))) - next_size := int(int64(chunk_target) / int64(avg)) - if options.chunkInfo { - fmt.Printf("id: %d errors: %d replies: %d total: %d sent: %d elapsed: %v avg: %v size: %v sleep: %v next_size: %v\n", - c.id, errors, replies, errors+replies, len(c.vals), elapsed, avg, size, sleep, next_size) +} + +func (c *chunk) send(w *bufio.Writer, responses chan maybe) *sendResult { + stats := new(sendResult) + stats.start() + defer stats.stop() + + for _, v := range c.vals { + n, err := v.Write(w) + if err != nil { + fmt.Fprintf(os.Stderr, "error writing a statement: %v\n", err) + } + stats.byteSize += n } - if next_size < chunk_size*2 { - chunk_size = next_size - } else { - chunk_size *= 2 + if err := w.Flush(); err != nil { + fmt.Fprintf(os.Stderr, "error flushing statement buffer: %v\n", err) } - if chunk_size > chunk_max { - chunk_size = chunk_max + + for _, request := range c.vals { + response, ok := <-responses + if !ok { + // we get here when the response channel closes too early + fmt.Fprintf(os.Stderr, "error reading redis response stream: response chan closed early") + return stats + } + stats.accumulate(request, response) } - return errors, replies + + return stats } func main() { @@ -160,17 +217,20 @@ func main() { } requests.vals = append(requests.vals, m.val()) if len(requests.vals) == cap(requests.vals) { - nErrors, nReplies := requests.send(w, responses) - errors += nErrors - replies += nReplies + stats := requests.send(w, responses) + stats.log() + errors += stats.errors + replies += stats.replies + time.Sleep(stats.elapsed / 4) id++ - requests = &chunk{id: id, vals: make([]value, 0, chunk_size)} + requests = &chunk{id: id, vals: make([]value, 0, stats.nextSize())} } } if len(requests.vals) > 0 { - nErrors, nReplies := requests.send(w, responses) - errors += nErrors - replies += nReplies + stats := requests.send(w, responses) + stats.log() + errors += stats.errors + replies += stats.replies } fmt.Println("Last reply received from server.")