add value stream reading

master
Jordan Orelli 10 years ago
parent a73b957f1a
commit 738881cf97

@ -18,6 +18,24 @@ var (
type value interface { type value interface {
} }
func streamValues(r io.Reader, c chan value, e chan error) {
defer close(c)
defer close(e)
r = bufio.NewReader(r)
for {
v, err := readValue(r)
switch err {
case io.EOF:
return
case nil:
c <- v
default:
e <- err
}
}
}
func readValue(r io.Reader) (value, error) { func readValue(r io.Reader) (value, error) {
var br *bufio.Reader var br *bufio.Reader
switch t := r.(type) { switch t := r.(type) {
@ -44,7 +62,7 @@ func readValue(r io.Reader) (value, error) {
return nil, fmt.Errorf("unable to read redis protocol value: input %q is too small", line) return nil, fmt.Errorf("unable to read redis protocol value: input %q is too small", line)
} }
if line[len(line)-2] != '\r' { if line[len(line)-2] != '\r' {
return nil, fmt.Errorf("unable to read redis protocol value: bad line terminator") return nil, fmt.Errorf("unable to read redis protocol value: bad line terminator: %q", line)
} }
line = line[:len(line)-2] line = line[:len(line)-2]
switch line[0] { switch line[0] {

@ -10,29 +10,34 @@ type valueTest struct {
out value out value
} }
func (test valueTest) run(t *testing.T) { func eq(v1, v2 value) bool {
v, err := readValue(strings.NewReader(test.in + "\r\n")) switch t1 := v1.(type) {
if err != nil {
t.Errorf("valueTest error: %v", err)
}
switch expected := test.out.(type) {
case Array: case Array:
got, ok := v.(Array) t2, ok := v2.(Array)
if !ok { if !ok {
t.Errorf("expected Array value, got %v", v) return false
} }
if len(got) != len(expected) { if len(t1) != len(t2) {
t.Errorf("expected Array of length %d, saw Array of length %d", len(expected), len(got)) return false
} }
for i := 0; i < len(got); i++ { for i := 0; i < len(t1); i++ {
if got[i] != expected[i] { if !eq(t1[i], t2[i]) {
t.Errorf("Array values do not match: got %v, expected %v", got, expected) return false
} }
} }
return true
default: default:
if v != test.out { return v1 == v2
t.Errorf("expected %v, got %v", test.out, v) }
}
func (test valueTest) run(t *testing.T) {
v, err := readValue(strings.NewReader(test.in + "\r\n"))
if err != nil {
t.Errorf("valueTest error: %v", err)
} }
if !eq(v, test.out) {
t.Errorf("expected %v, got %v", test.out, v)
} }
} }
@ -84,6 +89,7 @@ var valueTests = []valueTest{
{"*1\r\n+hello\r\n", Array{String("hello")}}, {"*1\r\n+hello\r\n", Array{String("hello")}},
{"*2\r\n+one\r\n+two", Array{String("one"), String("two")}}, {"*2\r\n+one\r\n+two", Array{String("one"), String("two")}},
{"*2\r\n$4\r\necho\r\n$5\r\nhello", Array{BulkString("echo"), BulkString("hello")}}, {"*2\r\n$4\r\necho\r\n$5\r\nhello", Array{BulkString("echo"), BulkString("hello")}},
{"*2\r\n$4\r\necho\r\n$5\r\nhello\r\n+extra\r\n", Array{BulkString("echo"), BulkString("hello")}},
} }
func TestValues(t *testing.T) { func TestValues(t *testing.T) {
@ -91,3 +97,53 @@ func TestValues(t *testing.T) {
test.run(t) test.run(t)
} }
} }
type streamTest []interface{}
var streamTests = []streamTest{
{"+hello\r\n", String("hello")},
{":1\r\n:2\r\n:3\r\n", Integer(1), Integer(2), Integer(3)},
{"*0\r\n", Array{}},
{"*1\r\n+one\r\n", Array{String("one")}},
{"*2\r\n+one\r\n+two\r\n", Array{String("one"), String("two")}},
{
"+preamble\r\n*2\r\n+one\r\n+two\r\n",
String("preamble"),
Array{String("one"), String("two")},
},
{
"+preamble\r\n*2\r\n+one\r\n+two\r\n+outro\r\n",
String("preamble"),
Array{String("one"), String("two")},
String("outro"),
},
{
"+preamble\r\n*2\r\n$3\r\none\r\n$3\r\ntwo\r\n+outro\r\n",
String("preamble"),
Array{BulkString("one"), BulkString("two")},
String("outro"),
},
{"-bad\r\n", Error("bad")},
}
func (s streamTest) run(t *testing.T) {
c, e := make(chan value), make(chan error)
r := strings.NewReader(s[0].(string))
go streamValues(r, c, e)
for i := 1; i < len(s); i++ {
select {
case v := <-c:
if !eq(s[i], v) {
t.Errorf("expected %v, got %v", s[i], v)
}
case err := <-e:
t.Error(err)
}
}
}
func TestStreams(t *testing.T) {
for _, test := range streamTests {
test.run(t)
}
}

Loading…
Cancel
Save