diff --git a/rsload.go b/rsload.go index 23eb5ba..9fc270e 100644 --- a/rsload.go +++ b/rsload.go @@ -47,8 +47,7 @@ func main() { defer conn.Close() if options.password != "" { - auth(options.password).Write(conn) - // fmt.Fprintf(conn, "*2\r\n$4\r\nauth\r\n$%d\r\n%s\r\n", len(options.password), options.password) + auth(options.password).Write(conn) v, err := readValue(conn) if err != nil { fmt.Printf("unable to auth: %v\n", err) @@ -67,49 +66,30 @@ func main() { } defer infile.Close() - c, e := make(chan value), make(chan error) + c := make(chan maybe) sent := make(chan value) - go streamValues(infile, c, e) + go streamValues(infile, c) go func() { - for { - select { - case v, ok := <-c: - if !ok { - return - } - v.Write(conn) - sent <- v - case err, ok := <-e: - if !ok { - return - } - fmt.Println(err) + defer close(sent) + for r := range c { + if r.ok() { + r.val().Write(conn) + sent <- r.val() + } else { + // this bad + fmt.Println(r.err()) } } }() - type pair struct { - request value - response value - } - - cc, ee := make(chan value), make(chan error) - go streamValues(conn, cc, ee) -ReadResponses: - for { - select { - case response, ok := <-cc: - if !ok { - break ReadResponses - } - request := <-sent - fmt.Println(pair{request, response}) - case err, ok := <-ee: - if !ok { - break ReadResponses - } - request := <-sent - fmt.Printf("fuck %v %v\n", request, err) + responses := make(chan maybe) + go streamValues(conn, responses) + for request := range sent { + response := <-responses + if response.ok() { + fmt.Fprintf(os.Stdout, "%v +> %v\n", request, response.val()) + } else { + fmt.Fprintf(os.Stderr, "%v -> %v\n", request, response.val()) } } } diff --git a/values.go b/values.go index c9c4c5d..2fa42c9 100644 --- a/values.go +++ b/values.go @@ -19,8 +19,25 @@ type value interface { Write(io.Writer) (int, error) } +type maybe struct { + value + error +} + +func (m maybe) err() error { + return m.error +} + +func (m maybe) ok() bool { + return m.error == nil +} + +func (m maybe) val() value { + return m.value +} + func auth(password string) value { - return Array{BulkString("auth"), BulkString(password)} + return Array{BulkString("auth"), BulkString(password)} } func isOK(v value) bool { @@ -31,9 +48,8 @@ func isOK(v value) bool { return vv == "OK" } -func streamValues(r io.Reader, c chan value, e chan error) { +func streamValues(r io.Reader, c chan maybe) { defer close(c) - defer close(e) r = bufio.NewReader(r) for { @@ -42,9 +58,9 @@ func streamValues(r io.Reader, c chan value, e chan error) { case io.EOF: return case nil: - c <- v + c <- maybe{value: v} default: - e <- err + c <- maybe{error: err} } } } diff --git a/values_test.go b/values_test.go index ae0f347..6c75697 100644 --- a/values_test.go +++ b/values_test.go @@ -132,24 +132,16 @@ func (s streamTest) run(t *testing.T) { out[i-1] = s[i].(value) } - c, e := make(chan value), make(chan error) - go streamValues(strings.NewReader(in), c, e) + c := make(chan maybe) + go streamValues(strings.NewReader(in), c) var count int - for { - select { - case v, ok := <-c: - if !ok { - return + for v := range c { + if v.ok() { + if !eq(out[count], v.val()) { + t.Errorf("expected %q, got %q", out[count], v.val()) } - if !eq(out[count], v) { - t.Errorf("expected %q, got %q", out[count], v) - } - case err, ok := <-e: - if !ok { - return - } - t.Error(err) - return + } else { + t.Error(v.err()) } count++ }