added chunks

master
Jordan Orelli 11 years ago
parent 37895517bd
commit 1cdb5667e5

@ -9,6 +9,8 @@ import (
"runtime/pprof" "runtime/pprof"
) )
var chunk_size = 100
var options struct { var options struct {
host string host string
port int port int
@ -77,8 +79,9 @@ func main() {
defer infile.Close() defer infile.Close()
} }
type chunk []value
c := make(chan maybe) c := make(chan maybe)
sent := make(chan value, options.buffer) sent := make(chan chunk, 200)
go streamValues(infile, c) go streamValues(infile, c)
go func() { go func() {
w := bufio.NewWriterSize(conn, 16384) w := bufio.NewWriterSize(conn, 16384)
@ -86,45 +89,57 @@ func main() {
close(sent) close(sent)
fmt.Println("All data transferred. Waiting for the last reply...") fmt.Println("All data transferred. Waiting for the last reply...")
}() }()
count := 0 requests := make(chunk, 0, chunk_size)
for r := range c { for m := range c {
count++ if !m.ok() {
if r.ok() { fmt.Fprintf(os.Stderr, "InputError: %v\n", m.err())
r.val().Write(w) continue
if count == 100 { }
w.Flush() if _, err := m.val().Write(w); err != nil {
count = 0 fmt.Fprintf(os.Stderr, "WriteError: %v\n", err)
}
requests = append(requests, m.val())
if len(requests) == cap(requests) {
if err := w.Flush(); err != nil {
fmt.Fprintf(os.Stderr, "FlushError: %v\n", err)
} }
sent <- r.val() sent <- requests
} else { requests = make(chunk, 0, chunk_size)
fmt.Fprintf(os.Stderr, "InputError: %v\n", r.err())
return
} }
} }
if len(requests) > 0 {
if err := w.Flush(); err != nil {
fmt.Fprintf(os.Stderr, "FlushError: %v\n", err)
}
sent <- requests
}
}() }()
replies, errors := 0, 0 replies, errors := 0, 0
responses := make(chan maybe) responses := make(chan maybe)
go streamValues(conn, responses) go streamValues(conn, responses)
for request := range sent { for requests := range sent {
response := <-responses for _, request := range requests {
if response.ok() { response := <-responses
switch r := response.val().(type) { if response.ok() {
case ErrorVal: switch r := response.val().(type) {
if options.verbose { case ErrorVal:
fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val()) if options.verbose {
} else { fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val())
fmt.Fprintln(os.Stderr, r) } else {
fmt.Fprintln(os.Stderr, r)
}
errors++
default:
if options.verbose {
fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val())
}
replies++
} }
errors++ } else {
default: fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err())
if options.verbose {
fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val())
}
replies++
} }
} else {
fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err())
} }
} }
fmt.Println("Last reply received from server.") fmt.Println("Last reply received from server.")

Loading…
Cancel
Save