From 0c5e4ec18c7fa40a6ae95f48cc103087a05fae4e Mon Sep 17 00:00:00 2001 From: Jordan Orelli Date: Fri, 8 Aug 2014 19:56:53 +0000 Subject: [PATCH] block on chunks --- rsload.go | 199 ++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 132 insertions(+), 67 deletions(-) diff --git a/rsload.go b/rsload.go index 9a04fdb..ce8d3c8 100644 --- a/rsload.go +++ b/rsload.go @@ -27,6 +27,55 @@ func usage(status int) { os.Exit(status) } +type chunk struct { + id int + vals []value + t time.Time +} + +func (c *chunk) send(w *bufio.Writer, responses chan maybe) { + for _, v := range c.vals { + v.Write(w) + } + start := time.Now() + size := w.Buffered() + w.Flush() + errors, replies := 0, 0 + for _, request := range c.vals { + response, ok := <-responses + if !ok { + fmt.Fprintf(os.Stderr, "ohhhhhhhhhhhhh fuck\n") + return + } + if response.ok() { + switch r := response.val().(type) { + case ErrorVal: + if options.verbose { + fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val()) + } else { + fmt.Fprintln(os.Stderr, r) + } + errors++ + default: + if options.verbose { + fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val()) + } + replies++ + } + } else { + fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err()) + } + } + elapsed := time.Since(start) + sleep := elapsed / 4 + time.Sleep(sleep) + if true { + avg := time.Duration(int64(elapsed) / int64(len(c.vals))) + fmt.Printf("id: %d errors: %d replies: %d total: %d sent: %d elapsed: %v avg: %v size: %v sleep: %v\n", + c.id, errors, replies, errors+replies, len(c.vals), elapsed, avg, size, sleep) + } +} + func main() { flag.Parse() @@ -80,80 +129,96 @@ func main() { defer infile.Close() } - type chunk struct { - vals []value - t time.Time - } + responses := make(chan maybe) + go streamValues(conn, responses) + c := make(chan maybe) - sent := make(chan *chunk, 200) go streamValues(infile, c) - go func() { - w := bufio.NewWriterSize(conn, 16384) - defer func() { - close(sent) - fmt.Println("All data transferred. Waiting for the last reply...") - }() - requests := &chunk{vals: make([]value, 0, chunk_size)} - for m := range c { - if !m.ok() { - fmt.Fprintf(os.Stderr, "InputError: %v\n", m.err()) - continue - } - if _, err := m.val().Write(w); err != nil { - fmt.Fprintf(os.Stderr, "WriteError: %v\n", err) - } - requests.vals = append(requests.vals, m.val()) - if len(requests.vals) == cap(requests.vals) { - requests.t = time.Now() - if err := w.Flush(); err != nil { - fmt.Fprintf(os.Stderr, "FlushError: %v\n", err) - } - sent <- requests - requests = &chunk{vals: make([]value, 0, chunk_size)} - } - } - if len(requests.vals) > 0 { - if err := w.Flush(); err != nil { - fmt.Fprintf(os.Stderr, "FlushError: %v\n", err) - } - sent <- requests - } - }() + w := bufio.NewWriterSize(conn, 16384) - replies, errors := 0, 0 - responses := make(chan maybe) - go streamValues(conn, responses) id := 1 - for requests := range sent { - for _, request := range requests.vals { - response := <-responses - if response.ok() { - switch r := response.val().(type) { - case ErrorVal: - if options.verbose { - fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val()) - } else { - fmt.Fprintln(os.Stderr, r) - } - errors++ - default: - if options.verbose { - fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val()) - } - replies++ - } - } else { - fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err()) - } + requests := &chunk{id: id, vals: make([]value, 0, chunk_size)} + for m := range c { + if !m.ok() { + fmt.Fprintf(os.Stderr, "InputError: %v\n", m.err()) + continue + } + requests.vals = append(requests.vals, m.val()) + if len(requests.vals) == cap(requests.vals) { + requests.send(w, responses) + id++ + requests = &chunk{id: id, vals: make([]value, 0, chunk_size)} } - elapsed := time.Since(requests.t) - fmt.Fprintf(os.Stdout, "%d %d %v %v\n", id, len(requests.vals), elapsed, - time.Duration(int64(elapsed)/int64(len(requests.vals)))) - id++ } - fmt.Println("Last reply received from server.") - fmt.Printf("errors: %d, replies: %d\n", errors, replies) + if len(requests.vals) > 0 { + requests.send(w, responses) + } + + // go func() { + // w := bufio.NewWriterSize(conn, 16384) + // defer func() { + // close(sent) + // fmt.Println("All data transferred. Waiting for the last reply...") + // }() + // requests := &chunk{vals: make([]value, 0, chunk_size)} + // for m := range c { + // if !m.ok() { + // fmt.Fprintf(os.Stderr, "InputError: %v\n", m.err()) + // continue + // } + // if _, err := m.val().Write(w); err != nil { + // fmt.Fprintf(os.Stderr, "WriteError: %v\n", err) + // } + + // requests.vals = append(requests.vals, m.val()) + // if len(requests.vals) == cap(requests.vals) { + // requests.t = time.Now() + // if err := w.Flush(); err != nil { + // fmt.Fprintf(os.Stderr, "FlushError: %v\n", err) + // } + // sent <- requests + // requests = &chunk{vals: make([]value, 0, chunk_size)} + // } + // } + // if len(requests.vals) > 0 { + // if err := w.Flush(); err != nil { + // fmt.Fprintf(os.Stderr, "FlushError: %v\n", err) + // } + // sent <- requests + // } + // }() + + // id := 1 + // for requests := range sent { + // for _, request := range requests.vals { + // response := <-responses + // if response.ok() { + // switch r := response.val().(type) { + // case ErrorVal: + // if options.verbose { + // fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val()) + // } else { + // fmt.Fprintln(os.Stderr, r) + // } + // errors++ + // default: + // if options.verbose { + // fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val()) + // } + // replies++ + // } + // } else { + // fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err()) + // } + // } + // elapsed := time.Since(requests.t) + // fmt.Fprintf(os.Stdout, "%d %d %v %v\n", id, len(requests.vals), elapsed, + // time.Duration(int64(elapsed)/int64(len(requests.vals)))) + // id++ + // } + // fmt.Println("Last reply received from server.") + // fmt.Printf("errors: %d, replies: %d\n", errors, replies) } func init() {