diff --git a/rsload.go b/rsload.go index b436300..305866f 100644 --- a/rsload.go +++ b/rsload.go @@ -9,6 +9,8 @@ import ( "runtime/pprof" ) +var chunk_size = 100 + var options struct { host string port int @@ -77,8 +79,9 @@ func main() { defer infile.Close() } + type chunk []value c := make(chan maybe) - sent := make(chan value, options.buffer) + sent := make(chan chunk, 200) go streamValues(infile, c) go func() { w := bufio.NewWriterSize(conn, 16384) @@ -86,45 +89,57 @@ func main() { close(sent) fmt.Println("All data transferred. Waiting for the last reply...") }() - count := 0 - for r := range c { - count++ - if r.ok() { - r.val().Write(w) - if count == 100 { - w.Flush() - count = 0 + requests := make(chunk, 0, chunk_size) + for m := range c { + if !m.ok() { + fmt.Fprintf(os.Stderr, "InputError: %v\n", m.err()) + continue + } + if _, err := m.val().Write(w); err != nil { + fmt.Fprintf(os.Stderr, "WriteError: %v\n", err) + } + + requests = append(requests, m.val()) + if len(requests) == cap(requests) { + if err := w.Flush(); err != nil { + fmt.Fprintf(os.Stderr, "FlushError: %v\n", err) } - sent <- r.val() - } else { - fmt.Fprintf(os.Stderr, "InputError: %v\n", r.err()) - return + sent <- requests + requests = make(chunk, 0, chunk_size) } } + if len(requests) > 0 { + if err := w.Flush(); err != nil { + fmt.Fprintf(os.Stderr, "FlushError: %v\n", err) + } + sent <- requests + } }() replies, errors := 0, 0 responses := make(chan maybe) go streamValues(conn, responses) - for request := range sent { - response := <-responses - if response.ok() { - switch r := response.val().(type) { - case ErrorVal: - if options.verbose { - fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val()) - } else { - fmt.Fprintln(os.Stderr, r) + for requests := range sent { + for _, request := range requests { + response := <-responses + if response.ok() { + switch r := response.val().(type) { + case ErrorVal: + if options.verbose { + fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val()) + } else { + fmt.Fprintln(os.Stderr, r) + } + errors++ + default: + if options.verbose { + fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val()) + } + replies++ } - errors++ - default: - if options.verbose { - fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val()) - } - replies++ + } else { + fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err()) } - } else { - fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err()) } } fmt.Println("Last reply received from server.")