From 8d22dc531078d942eb1df6b72c3255edac239bd3 Mon Sep 17 00:00:00 2001 From: Jordan Orelli Date: Fri, 8 Aug 2014 19:13:36 +0000 Subject: [PATCH] getting stats about individual chunks --- rsload.go | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/rsload.go b/rsload.go index 305866f..9a04fdb 100644 --- a/rsload.go +++ b/rsload.go @@ -7,6 +7,7 @@ import ( "net" "os" "runtime/pprof" + "time" ) var chunk_size = 100 @@ -79,9 +80,12 @@ func main() { defer infile.Close() } - type chunk []value + type chunk struct { + vals []value + t time.Time + } c := make(chan maybe) - sent := make(chan chunk, 200) + sent := make(chan *chunk, 200) go streamValues(infile, c) go func() { w := bufio.NewWriterSize(conn, 16384) @@ -89,7 +93,7 @@ func main() { close(sent) fmt.Println("All data transferred. Waiting for the last reply...") }() - requests := make(chunk, 0, chunk_size) + requests := &chunk{vals: make([]value, 0, chunk_size)} for m := range c { if !m.ok() { fmt.Fprintf(os.Stderr, "InputError: %v\n", m.err()) @@ -99,16 +103,17 @@ func main() { fmt.Fprintf(os.Stderr, "WriteError: %v\n", err) } - requests = append(requests, m.val()) - if len(requests) == cap(requests) { + requests.vals = append(requests.vals, m.val()) + if len(requests.vals) == cap(requests.vals) { + requests.t = time.Now() if err := w.Flush(); err != nil { fmt.Fprintf(os.Stderr, "FlushError: %v\n", err) } sent <- requests - requests = make(chunk, 0, chunk_size) + requests = &chunk{vals: make([]value, 0, chunk_size)} } } - if len(requests) > 0 { + if len(requests.vals) > 0 { if err := w.Flush(); err != nil { fmt.Fprintf(os.Stderr, "FlushError: %v\n", err) } @@ -119,8 +124,9 @@ func main() { replies, errors := 0, 0 responses := make(chan maybe) go streamValues(conn, responses) + id := 1 for requests := range sent { - for _, request := range requests { + for _, request := range requests.vals { response := <-responses if response.ok() { switch r := response.val().(type) { @@ -141,6 +147,10 @@ func main() { fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err()) } } + elapsed := time.Since(requests.t) + fmt.Fprintf(os.Stdout, "%d %d %v %v\n", id, len(requests.vals), elapsed, + time.Duration(int64(elapsed)/int64(len(requests.vals)))) + id++ } fmt.Println("Last reply received from server.") fmt.Printf("errors: %d, replies: %d\n", errors, replies)