From e1df1cdd8370b3f67aaa045a4516da99b0ae4c91 Mon Sep 17 00:00:00 2001 From: Jordan Orelli Date: Wed, 6 Aug 2014 19:39:43 +0000 Subject: [PATCH] add --pipe, fix bulkstring read error --- rsload.go | 36 +++++++++++++++++++++++------------- values.go | 7 +++++-- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/rsload.go b/rsload.go index 5521094..3d81325 100644 --- a/rsload.go +++ b/rsload.go @@ -13,6 +13,7 @@ var options struct { port int password string buffer int + pipe bool } func usage(status int) { @@ -31,11 +32,6 @@ func randomString(n int) string { func main() { flag.Parse() - args := flag.Args() - if len(args) < 1 { - usage(1) - } - fname := args[0] conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", options.host, options.port)) if err != nil { @@ -57,12 +53,25 @@ func main() { } } - infile, err := os.Open(fname) - if err != nil { - fmt.Printf("unable to open file %s: %v\n", fname, err) - os.Exit(1) + var infile *os.File + + if options.pipe { + infile = os.Stdin + } else { + args := flag.Args() + if len(args) < 1 { + usage(1) + } + fname := args[0] + + var err error + infile, err = os.Open(fname) + if err != nil { + fmt.Printf("unable to open file %s: %v\n", fname, err) + os.Exit(1) + } + defer infile.Close() } - defer infile.Close() c := make(chan maybe) sent := make(chan value, options.buffer) @@ -77,8 +86,8 @@ func main() { r.val().Write(conn) sent <- r.val() } else { - // this bad - fmt.Println(r.err()) + fmt.Fprintf(os.Stderr, "InputError: %v\n", r.err()) + return } } }() @@ -97,7 +106,7 @@ func main() { replies++ } } else { - fmt.Fprintln(os.Stderr, response.err()) + fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err()) } } fmt.Println("Last reply received from server.") @@ -109,6 +118,7 @@ func init() { flag.IntVar(&options.port, "p", 6379, "port") flag.StringVar(&options.password, "a", "", "password") flag.IntVar(&options.buffer, "buffer", 0, "number of outstanding statements allowed before throttling") + flag.BoolVar(&options.pipe, "pipe", false, "transfers input from stdin to server") } /* diff --git a/values.go b/values.go index 2fa42c9..08c212e 100644 --- a/values.go +++ b/values.go @@ -175,9 +175,12 @@ func readBulkString(prefix []byte, r io.Reader) (value, error) { n += 2 b := make([]byte, n) - n_read, err := r.Read(b) + n_read, err := io.ReadFull(r, b) switch err { - case io.EOF, nil: + case io.EOF: + fmt.Printf("saw eof after %d bytes looking for %d bytes in bulkstring\n", n_read, n) + fmt.Println(string(b)) + case nil: break default: return nil, fmt.Errorf("unable to read bulkstring in redis protocol: error on read: %v", err)