diff --git a/rsload.go b/rsload.go index 4bd9f45..c723175 100644 --- a/rsload.go +++ b/rsload.go @@ -1,14 +1,14 @@ package main import ( - "bufio" + // "bufio" "flag" "fmt" - "io" + // "io" "math/rand" "net" "os" - "strings" + // "strings" ) var options struct { @@ -50,39 +50,56 @@ func main() { fmt.Fprintf(conn, "*2\r\n$4\r\nauth\r\n$%d\r\n%s\r\n", len(options.password), options.password) } - f, err := os.Open(fname) + infile, err := os.Open(fname) if err != nil { fmt.Printf("unable to open file %s: %v\n", fname, err) os.Exit(1) } - defer f.Close() + defer infile.Close() - c := make(chan statement) - go split(f, c) - - s := randomString(32) + c, e := make(chan value), make(chan error) + sent := make(chan value, 1) + go streamValues(infile, c, e) go func() { - for s := range c { - if err := s.write(conn); err != nil { + for { + select { + case v, ok := <-c: + if !ok { + return + } + v.Write(conn) + sent <- v + case err, ok := <-e: + if !ok { + return + } fmt.Println(err) - break } } - fmt.Fprintf(conn, "*2\r\n$4\r\necho\r\n$32\r\n%s\r\n", s) }() - r := bufio.NewReader(conn) + type pair struct { + request value + response value + } + + cc, ee := make(chan value), make(chan error) + go streamValues(conn, cc, ee) +ReadResponses: for { - line, err := r.ReadString('\n') - switch err { - case nil: - if strings.TrimSpace(line) == s { - return + select { + case response, ok := <-cc: + if !ok { + break ReadResponses + } + request := <-sent + fmt.Println(pair{request, response}) + case err, ok := <-ee: + if !ok { + break ReadResponses } - case io.EOF: - return - default: - fmt.Println(err) + request := <-sent + fmt.Printf("fuck %v %v\n", request, err) } } } diff --git a/values.go b/values.go index 2ea7bf9..4694880 100644 --- a/values.go +++ b/values.go @@ -16,6 +16,7 @@ var ( ) type value interface { + Write(io.Writer) (int, error) } func streamValues(r io.Reader, c chan value, e chan error) { @@ -36,6 +37,12 @@ func streamValues(r io.Reader, c chan value, e chan error) { } } +func writeValues(w io.Writer, c chan value) { + for v := range c { + v.Write(w) + } +} + func readValue(r io.Reader) (value, error) { var br *bufio.Reader switch t := r.(type) { @@ -89,6 +96,10 @@ func readString(b []byte) (value, error) { return String(b), nil } +func (s String) Write(w io.Writer) (int, error) { + return fmt.Fprintf(w, "+%s\r\n", s) +} + // ------------------------------------------------------------------------------ type Error string @@ -97,6 +108,10 @@ func readError(b []byte) (value, error) { return Error(b), nil } +func (e Error) Write(w io.Writer) (int, error) { + return fmt.Fprintf(w, "-%s\r\n") +} + // ------------------------------------------------------------------------------ type Integer int64 @@ -109,6 +124,10 @@ func readInteger(b []byte) (value, error) { return Integer(i), nil } +func (i Integer) Write(w io.Writer) (int, error) { + return fmt.Fprintf(w, ":%d\r\n", i) +} + // ------------------------------------------------------------------------------ type BulkString string @@ -147,6 +166,10 @@ func readBulkString(prefix []byte, r io.Reader) (value, error) { return BulkString(b[:len(b)-2]), nil } +func (s BulkString) Write(w io.Writer) (int, error) { + return fmt.Fprintf(w, "$%d\r\n%s\r\n", len(s), s) +} + // ----------------------------------------------------------------------------------------- type Array []value @@ -174,3 +197,23 @@ func readArray(prefix []byte, r *bufio.Reader) (value, error) { } return a, nil } + +func (a Array) Write(w io.Writer) (int, error) { + n, err := fmt.Fprintf(w, "*%d\r\n", len(a)) + if err != nil { + return n, err + } + + var ( + nn int + e error + ) + for i := 0; i < len(a); i++ { + nn, e = a[i].Write(w) + n += nn + if e != nil { + return n, e + } + } + return n, nil +}