getting stats about individual chunks

master
Jordan Orelli 11 years ago
parent 1cdb5667e5
commit 8d22dc5310

@ -7,6 +7,7 @@ import (
"net" "net"
"os" "os"
"runtime/pprof" "runtime/pprof"
"time"
) )
var chunk_size = 100 var chunk_size = 100
@ -79,9 +80,12 @@ func main() {
defer infile.Close() defer infile.Close()
} }
type chunk []value type chunk struct {
vals []value
t time.Time
}
c := make(chan maybe) c := make(chan maybe)
sent := make(chan chunk, 200) sent := make(chan *chunk, 200)
go streamValues(infile, c) go streamValues(infile, c)
go func() { go func() {
w := bufio.NewWriterSize(conn, 16384) w := bufio.NewWriterSize(conn, 16384)
@ -89,7 +93,7 @@ func main() {
close(sent) close(sent)
fmt.Println("All data transferred. Waiting for the last reply...") fmt.Println("All data transferred. Waiting for the last reply...")
}() }()
requests := make(chunk, 0, chunk_size) requests := &chunk{vals: make([]value, 0, chunk_size)}
for m := range c { for m := range c {
if !m.ok() { if !m.ok() {
fmt.Fprintf(os.Stderr, "InputError: %v\n", m.err()) fmt.Fprintf(os.Stderr, "InputError: %v\n", m.err())
@ -99,16 +103,17 @@ func main() {
fmt.Fprintf(os.Stderr, "WriteError: %v\n", err) fmt.Fprintf(os.Stderr, "WriteError: %v\n", err)
} }
requests = append(requests, m.val()) requests.vals = append(requests.vals, m.val())
if len(requests) == cap(requests) { if len(requests.vals) == cap(requests.vals) {
requests.t = time.Now()
if err := w.Flush(); err != nil { if err := w.Flush(); err != nil {
fmt.Fprintf(os.Stderr, "FlushError: %v\n", err) fmt.Fprintf(os.Stderr, "FlushError: %v\n", err)
} }
sent <- requests sent <- requests
requests = make(chunk, 0, chunk_size) requests = &chunk{vals: make([]value, 0, chunk_size)}
} }
} }
if len(requests) > 0 { if len(requests.vals) > 0 {
if err := w.Flush(); err != nil { if err := w.Flush(); err != nil {
fmt.Fprintf(os.Stderr, "FlushError: %v\n", err) fmt.Fprintf(os.Stderr, "FlushError: %v\n", err)
} }
@ -119,8 +124,9 @@ func main() {
replies, errors := 0, 0 replies, errors := 0, 0
responses := make(chan maybe) responses := make(chan maybe)
go streamValues(conn, responses) go streamValues(conn, responses)
id := 1
for requests := range sent { for requests := range sent {
for _, request := range requests { for _, request := range requests.vals {
response := <-responses response := <-responses
if response.ok() { if response.ok() {
switch r := response.val().(type) { switch r := response.val().(type) {
@ -141,6 +147,10 @@ func main() {
fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err()) fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err())
} }
} }
elapsed := time.Since(requests.t)
fmt.Fprintf(os.Stdout, "%d %d %v %v\n", id, len(requests.vals), elapsed,
time.Duration(int64(elapsed)/int64(len(requests.vals))))
id++
} }
fmt.Println("Last reply received from server.") fmt.Println("Last reply received from server.")
fmt.Printf("errors: %d, replies: %d\n", errors, replies) fmt.Printf("errors: %d, replies: %d\n", errors, replies)

Loading…
Cancel
Save