From 82694d1118be0ffc6ba252e8d62a6bbfda3057bd Mon Sep 17 00:00:00 2001 From: Jordan Orelli Date: Fri, 15 Aug 2014 16:44:44 +0000 Subject: [PATCH] channels should be buffered up to one chunk's size otherwise we can have some i/o thrashing on simply decoding the input while reading the responses. This lets us decode our input up to one chunk in advance of our redis interactions. --- chunk.go | 15 +++++++++++++++ rsload.go | 11 ++++------- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/chunk.go b/chunk.go index 4d30837..e858c6d 100644 --- a/chunk.go +++ b/chunk.go @@ -4,9 +4,15 @@ import ( "bufio" "fmt" "os" + "sync" "time" ) +var chunk_id struct { + sync.Mutex + val int +} + // type chunk represents a grouping of redis statements to be sent to a redis // server. type chunk struct { @@ -15,6 +21,15 @@ type chunk struct { t time.Time } +// creates a new chunk of a given size, automatically assigning it a +// monotomically increasing id. +func newChunk(size int) *chunk { + chunk_id.Lock() + defer chunk_id.Unlock() + chunk_id.val++ + return &chunk{id: chunk_id.val, vals: make([]value, 0, size)} +} + // write out a chunk's statements onto a bufio.Writer. After sending all // statements onto the bufio.Writer, reads from the provided channel of // responses N times, where N is the number of statements originally sent. diff --git a/rsload.go b/rsload.go index 5807a38..e4300d7 100644 --- a/rsload.go +++ b/rsload.go @@ -10,7 +10,6 @@ import ( "time" ) -var chunk_size = 1 var chunk_target = 250 * time.Millisecond var chunk_max = 10000 @@ -96,16 +95,15 @@ func main() { f := infile() - responses := make(chan maybe) + responses := make(chan maybe, chunk_max) go streamValues(conn, responses) - c := make(chan maybe) + c := make(chan maybe, chunk_max) go streamValues(f, c) w := bufio.NewWriterSize(conn, 16384) - id := 1 - requests := &chunk{id: id, vals: make([]value, 0, chunk_size)} + requests := newChunk(1) errors, replies := 0, 0 for m := range c { if !m.ok() { @@ -119,8 +117,7 @@ func main() { errors += stats.errors replies += stats.replies time.Sleep(time.Duration(float64(stats.elapsed) * 0.1)) - id++ - requests = &chunk{id: id, vals: make([]value, 0, stats.nextSize())} + requests = newChunk(stats.nextSize()) } } if len(requests.vals) > 0 {