block on chunks

master
Jordan Orelli 10 years ago
parent 8d22dc5310
commit 0c5e4ec18c

@ -27,6 +27,55 @@ func usage(status int) {
os.Exit(status) os.Exit(status)
} }
type chunk struct {
id int
vals []value
t time.Time
}
func (c *chunk) send(w *bufio.Writer, responses chan maybe) {
for _, v := range c.vals {
v.Write(w)
}
start := time.Now()
size := w.Buffered()
w.Flush()
errors, replies := 0, 0
for _, request := range c.vals {
response, ok := <-responses
if !ok {
fmt.Fprintf(os.Stderr, "ohhhhhhhhhhhhh fuck\n")
return
}
if response.ok() {
switch r := response.val().(type) {
case ErrorVal:
if options.verbose {
fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val())
} else {
fmt.Fprintln(os.Stderr, r)
}
errors++
default:
if options.verbose {
fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val())
}
replies++
}
} else {
fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err())
}
}
elapsed := time.Since(start)
sleep := elapsed / 4
time.Sleep(sleep)
if true {
avg := time.Duration(int64(elapsed) / int64(len(c.vals)))
fmt.Printf("id: %d errors: %d replies: %d total: %d sent: %d elapsed: %v avg: %v size: %v sleep: %v\n",
c.id, errors, replies, errors+replies, len(c.vals), elapsed, avg, size, sleep)
}
}
func main() { func main() {
flag.Parse() flag.Parse()
@ -80,80 +129,96 @@ func main() {
defer infile.Close() defer infile.Close()
} }
type chunk struct { responses := make(chan maybe)
vals []value go streamValues(conn, responses)
t time.Time
}
c := make(chan maybe) c := make(chan maybe)
sent := make(chan *chunk, 200)
go streamValues(infile, c) go streamValues(infile, c)
go func() {
w := bufio.NewWriterSize(conn, 16384) w := bufio.NewWriterSize(conn, 16384)
defer func() {
close(sent) id := 1
fmt.Println("All data transferred. Waiting for the last reply...") requests := &chunk{id: id, vals: make([]value, 0, chunk_size)}
}()
requests := &chunk{vals: make([]value, 0, chunk_size)}
for m := range c { for m := range c {
if !m.ok() { if !m.ok() {
fmt.Fprintf(os.Stderr, "InputError: %v\n", m.err()) fmt.Fprintf(os.Stderr, "InputError: %v\n", m.err())
continue continue
} }
if _, err := m.val().Write(w); err != nil {
fmt.Fprintf(os.Stderr, "WriteError: %v\n", err)
}
requests.vals = append(requests.vals, m.val()) requests.vals = append(requests.vals, m.val())
if len(requests.vals) == cap(requests.vals) { if len(requests.vals) == cap(requests.vals) {
requests.t = time.Now() requests.send(w, responses)
if err := w.Flush(); err != nil { id++
fmt.Fprintf(os.Stderr, "FlushError: %v\n", err) requests = &chunk{id: id, vals: make([]value, 0, chunk_size)}
}
sent <- requests
requests = &chunk{vals: make([]value, 0, chunk_size)}
} }
} }
if len(requests.vals) > 0 { if len(requests.vals) > 0 {
if err := w.Flush(); err != nil { requests.send(w, responses)
fmt.Fprintf(os.Stderr, "FlushError: %v\n", err)
}
sent <- requests
} }
}()
replies, errors := 0, 0 // go func() {
responses := make(chan maybe) // w := bufio.NewWriterSize(conn, 16384)
go streamValues(conn, responses) // defer func() {
id := 1 // close(sent)
for requests := range sent { // fmt.Println("All data transferred. Waiting for the last reply...")
for _, request := range requests.vals { // }()
response := <-responses // requests := &chunk{vals: make([]value, 0, chunk_size)}
if response.ok() { // for m := range c {
switch r := response.val().(type) { // if !m.ok() {
case ErrorVal: // fmt.Fprintf(os.Stderr, "InputError: %v\n", m.err())
if options.verbose { // continue
fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val()) // }
} else { // if _, err := m.val().Write(w); err != nil {
fmt.Fprintln(os.Stderr, r) // fmt.Fprintf(os.Stderr, "WriteError: %v\n", err)
} // }
errors++
default: // requests.vals = append(requests.vals, m.val())
if options.verbose { // if len(requests.vals) == cap(requests.vals) {
fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val()) // requests.t = time.Now()
} // if err := w.Flush(); err != nil {
replies++ // fmt.Fprintf(os.Stderr, "FlushError: %v\n", err)
} // }
} else { // sent <- requests
fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err()) // requests = &chunk{vals: make([]value, 0, chunk_size)}
} // }
} // }
elapsed := time.Since(requests.t) // if len(requests.vals) > 0 {
fmt.Fprintf(os.Stdout, "%d %d %v %v\n", id, len(requests.vals), elapsed, // if err := w.Flush(); err != nil {
time.Duration(int64(elapsed)/int64(len(requests.vals)))) // fmt.Fprintf(os.Stderr, "FlushError: %v\n", err)
id++ // }
} // sent <- requests
fmt.Println("Last reply received from server.") // }
fmt.Printf("errors: %d, replies: %d\n", errors, replies) // }()
// id := 1
// for requests := range sent {
// for _, request := range requests.vals {
// response := <-responses
// if response.ok() {
// switch r := response.val().(type) {
// case ErrorVal:
// if options.verbose {
// fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val())
// } else {
// fmt.Fprintln(os.Stderr, r)
// }
// errors++
// default:
// if options.verbose {
// fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val())
// }
// replies++
// }
// } else {
// fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err())
// }
// }
// elapsed := time.Since(requests.t)
// fmt.Fprintf(os.Stdout, "%d %d %v %v\n", id, len(requests.vals), elapsed,
// time.Duration(int64(elapsed)/int64(len(requests.vals))))
// id++
// }
// fmt.Println("Last reply received from server.")
// fmt.Printf("errors: %d, replies: %d\n", errors, replies)
} }
func init() { func init() {

Loading…
Cancel
Save