diff --git a/values.go b/values.go index 6c155cb..3b41e21 100644 --- a/values.go +++ b/values.go @@ -18,6 +18,24 @@ var ( type value interface { } +func streamValues(r io.Reader, c chan value, e chan error) { + defer close(c) + defer close(e) + + r = bufio.NewReader(r) + for { + v, err := readValue(r) + switch err { + case io.EOF: + return + case nil: + c <- v + default: + e <- err + } + } +} + func readValue(r io.Reader) (value, error) { var br *bufio.Reader switch t := r.(type) { @@ -44,7 +62,7 @@ func readValue(r io.Reader) (value, error) { return nil, fmt.Errorf("unable to read redis protocol value: input %q is too small", line) } if line[len(line)-2] != '\r' { - return nil, fmt.Errorf("unable to read redis protocol value: bad line terminator") + return nil, fmt.Errorf("unable to read redis protocol value: bad line terminator: %q", line) } line = line[:len(line)-2] switch line[0] { diff --git a/values_test.go b/values_test.go index 0c6c3f1..0c6c2d9 100644 --- a/values_test.go +++ b/values_test.go @@ -10,29 +10,34 @@ type valueTest struct { out value } -func (test valueTest) run(t *testing.T) { - v, err := readValue(strings.NewReader(test.in + "\r\n")) - if err != nil { - t.Errorf("valueTest error: %v", err) - } - switch expected := test.out.(type) { +func eq(v1, v2 value) bool { + switch t1 := v1.(type) { case Array: - got, ok := v.(Array) + t2, ok := v2.(Array) if !ok { - t.Errorf("expected Array value, got %v", v) + return false } - if len(got) != len(expected) { - t.Errorf("expected Array of length %d, saw Array of length %d", len(expected), len(got)) + if len(t1) != len(t2) { + return false } - for i := 0; i < len(got); i++ { - if got[i] != expected[i] { - t.Errorf("Array values do not match: got %v, expected %v", got, expected) + for i := 0; i < len(t1); i++ { + if !eq(t1[i], t2[i]) { + return false } } + return true default: - if v != test.out { - t.Errorf("expected %v, got %v", test.out, v) - } + return v1 == v2 + } +} + +func (test valueTest) run(t *testing.T) { + v, err := readValue(strings.NewReader(test.in + "\r\n")) + if err != nil { + t.Errorf("valueTest error: %v", err) + } + if !eq(v, test.out) { + t.Errorf("expected %v, got %v", test.out, v) } } @@ -84,6 +89,7 @@ var valueTests = []valueTest{ {"*1\r\n+hello\r\n", Array{String("hello")}}, {"*2\r\n+one\r\n+two", Array{String("one"), String("two")}}, {"*2\r\n$4\r\necho\r\n$5\r\nhello", Array{BulkString("echo"), BulkString("hello")}}, + {"*2\r\n$4\r\necho\r\n$5\r\nhello\r\n+extra\r\n", Array{BulkString("echo"), BulkString("hello")}}, } func TestValues(t *testing.T) { @@ -91,3 +97,53 @@ func TestValues(t *testing.T) { test.run(t) } } + +type streamTest []interface{} + +var streamTests = []streamTest{ + {"+hello\r\n", String("hello")}, + {":1\r\n:2\r\n:3\r\n", Integer(1), Integer(2), Integer(3)}, + {"*0\r\n", Array{}}, + {"*1\r\n+one\r\n", Array{String("one")}}, + {"*2\r\n+one\r\n+two\r\n", Array{String("one"), String("two")}}, + { + "+preamble\r\n*2\r\n+one\r\n+two\r\n", + String("preamble"), + Array{String("one"), String("two")}, + }, + { + "+preamble\r\n*2\r\n+one\r\n+two\r\n+outro\r\n", + String("preamble"), + Array{String("one"), String("two")}, + String("outro"), + }, + { + "+preamble\r\n*2\r\n$3\r\none\r\n$3\r\ntwo\r\n+outro\r\n", + String("preamble"), + Array{BulkString("one"), BulkString("two")}, + String("outro"), + }, + {"-bad\r\n", Error("bad")}, +} + +func (s streamTest) run(t *testing.T) { + c, e := make(chan value), make(chan error) + r := strings.NewReader(s[0].(string)) + go streamValues(r, c, e) + for i := 1; i < len(s); i++ { + select { + case v := <-c: + if !eq(s[i], v) { + t.Errorf("expected %v, got %v", s[i], v) + } + case err := <-e: + t.Error(err) + } + } +} + +func TestStreams(t *testing.T) { + for _, test := range streamTests { + test.run(t) + } +}