master
Jordan Orelli 11 years ago
parent aa088a3fd2
commit 5f6f811f46

@ -36,57 +36,114 @@ type chunk struct {
t time.Time t time.Time
} }
func (c *chunk) send(w *bufio.Writer, responses chan maybe) (int, int) { type sendResult struct {
start := time.Now() read int // number of statement sread on the incoming statement stream
for _, v := range c.vals { errors int // number of error responses seen on the redis response stream
v.Write(w) replies int // number of reply responses seen on the redis response stream
byteSize int // number of bytes sent
elapsed time.Duration // time taken to send requests and receive responses from redis
paused time.Duration // sleep time inserted
startTime time.Time
}
func (s *sendResult) start() {
s.startTime = time.Now()
}
func (s *sendResult) stop() {
s.elapsed = time.Since(s.startTime)
}
func (s *sendResult) avg() time.Duration {
return time.Duration(int64(s.elapsed) / int64(s.read))
}
func (s *sendResult) log() {
if !options.chunkInfo {
return
} }
size := w.Buffered() fmt.Printf("errors: %d replies: %d total: %d sent: %d elapsed: %v avg: %v size: %v\n", s.errors, s.replies, s.errors+s.replies, s.read, s.elapsed, s.avg(), s.byteSize)
w.Flush() }
errors, replies := 0, 0
for _, request := range c.vals { func (s *sendResult) nextSize() int {
response, ok := <-responses target := int(int64(chunk_target) / int64(s.avg()))
if !ok { return min(target, chunk_max, 2*s.read)
fmt.Fprintf(os.Stderr, "ohhhhhhhhhhhhh fuck\n") }
return -1, -1
func min(vals ...int) int {
var best int
switch len(vals) {
case 0:
return 0
case 1:
return vals[0]
default:
best = vals[0]
}
for _, v := range vals[1:] {
if v < best {
best = v
} }
if response.ok() { }
switch r := response.val().(type) { return best
case ErrorVal: }
if options.verbose {
fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val()) func (s *sendResult) accumulate(request value, response maybe) {
} else { s.read++
fmt.Fprintln(os.Stderr, r) if response.ok() {
} switch r := response.val().(type) {
errors++ case ErrorVal:
default: // these errors are errors reported from redis. e.g., if you
if options.verbose { // try to do something like delete a key that doesn't exist or
fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val()) // send an invalid command to redis
} if options.verbose {
replies++ fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val())
} else {
fmt.Fprintln(os.Stderr, r)
} }
} else { s.errors++
fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err()) default:
if options.verbose {
fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val())
}
s.replies++
} }
} else {
// we get here when we encounter an error in the response stream.
// That is, the response stream contains bytes that are not valid
// in the redis protocol.
fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err())
} }
elapsed := time.Since(start) }
sleep := elapsed / 4
time.Sleep(sleep) func (c *chunk) send(w *bufio.Writer, responses chan maybe) *sendResult {
avg := time.Duration(int64(elapsed) / int64(len(c.vals))) stats := new(sendResult)
next_size := int(int64(chunk_target) / int64(avg)) stats.start()
if options.chunkInfo { defer stats.stop()
fmt.Printf("id: %d errors: %d replies: %d total: %d sent: %d elapsed: %v avg: %v size: %v sleep: %v next_size: %v\n",
c.id, errors, replies, errors+replies, len(c.vals), elapsed, avg, size, sleep, next_size) for _, v := range c.vals {
n, err := v.Write(w)
if err != nil {
fmt.Fprintf(os.Stderr, "error writing a statement: %v\n", err)
}
stats.byteSize += n
} }
if next_size < chunk_size*2 { if err := w.Flush(); err != nil {
chunk_size = next_size fmt.Fprintf(os.Stderr, "error flushing statement buffer: %v\n", err)
} else {
chunk_size *= 2
} }
if chunk_size > chunk_max {
chunk_size = chunk_max for _, request := range c.vals {
response, ok := <-responses
if !ok {
// we get here when the response channel closes too early
fmt.Fprintf(os.Stderr, "error reading redis response stream: response chan closed early")
return stats
}
stats.accumulate(request, response)
} }
return errors, replies
return stats
} }
func main() { func main() {
@ -160,17 +217,20 @@ func main() {
} }
requests.vals = append(requests.vals, m.val()) requests.vals = append(requests.vals, m.val())
if len(requests.vals) == cap(requests.vals) { if len(requests.vals) == cap(requests.vals) {
nErrors, nReplies := requests.send(w, responses) stats := requests.send(w, responses)
errors += nErrors stats.log()
replies += nReplies errors += stats.errors
replies += stats.replies
time.Sleep(stats.elapsed / 4)
id++ id++
requests = &chunk{id: id, vals: make([]value, 0, chunk_size)} requests = &chunk{id: id, vals: make([]value, 0, stats.nextSize())}
} }
} }
if len(requests.vals) > 0 { if len(requests.vals) > 0 {
nErrors, nReplies := requests.send(w, responses) stats := requests.send(w, responses)
errors += nErrors stats.log()
replies += nReplies errors += stats.errors
replies += stats.replies
} }
fmt.Println("Last reply received from server.") fmt.Println("Last reply received from server.")

Loading…
Cancel
Save