cleaning up

master
Jordan Orelli 11 years ago
parent b78c2b4646
commit 772b929823

@ -0,0 +1,117 @@
package main
import (
"bufio"
"fmt"
"os"
"time"
)
// type chunk represents a grouping of redis statements to be sent to a redis
// server.
type chunk struct {
id int
vals []value
t time.Time
}
// write out a chunk's statements onto a bufio.Writer. After sending all
// statements onto the bufio.Writer, reads from the provided channel of
// responses N times, where N is the number of statements originally sent.
func (c *chunk) send(w *bufio.Writer, responses chan maybe) *sendResult {
stats := new(sendResult)
stats.start()
defer stats.stop()
for _, v := range c.vals {
n, err := v.Write(w)
if err != nil {
fmt.Fprintf(os.Stderr, "error writing a statement: %v\n", err)
}
stats.byteSize += n
}
if err := w.Flush(); err != nil {
fmt.Fprintf(os.Stderr, "error flushing statement buffer: %v\n", err)
}
for _, request := range c.vals {
response, ok := <-responses
if !ok {
// we get here when the response channel closes too early
fmt.Fprintf(os.Stderr, "error reading redis response stream: response chan closed early")
return stats
}
stats.accumulate(request, response)
}
return stats
}
// -----------------------------------------------------------------------------
type sendResult struct {
read int // number of statement sread on the incoming statement stream
errors int // number of error responses seen on the redis response stream
replies int // number of reply responses seen on the redis response stream
byteSize int // number of bytes sent
elapsed time.Duration // time taken to send requests and receive responses from redis
paused time.Duration // sleep time inserted
startTime time.Time
}
func (s *sendResult) start() {
s.startTime = time.Now()
}
func (s *sendResult) stop() {
s.elapsed = time.Since(s.startTime)
}
func (s *sendResult) avg() time.Duration {
if s.read == 0 {
fmt.Fprintln(os.Stderr, "for some reason, we tried to divide by zero on sendResult.avg. we recovered, though.")
return 100 * time.Microsecond
}
return time.Duration(int64(s.elapsed) / int64(s.read))
}
func (s *sendResult) log() {
if !options.chunkInfo {
return
}
fmt.Printf("errors: %d replies: %d total: %d sent: %d elapsed: %v avg: %v size: %v\n", s.errors, s.replies, s.errors+s.replies, s.read, s.elapsed, s.avg(), s.byteSize)
}
func (s *sendResult) nextSize() int {
target := int(int64(chunk_target) / int64(s.avg()))
return min(target, chunk_max, 2*s.read)
}
func (s *sendResult) accumulate(request value, response maybe) {
s.read++
if response.ok() {
switch r := response.val().(type) {
case ErrorVal:
// these errors are errors reported from redis. e.g., if you
// try to do something like delete a key that doesn't exist or
// send an invalid command to redis
if options.verbose {
fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val())
} else {
fmt.Fprintf(os.Stderr, "%q\n", r)
}
s.errors++
default:
if options.verbose {
fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val())
}
s.replies++
}
} else {
// we get here when we encounter an error in the response stream.
// That is, the response stream contains bytes that are not valid
// in the redis protocol.
fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err())
}
}

@ -0,0 +1,19 @@
package main
func min(vals ...int) int {
var best int
switch len(vals) {
case 0:
return 0
case 1:
return vals[0]
default:
best = vals[0]
}
for _, v := range vals[1:] {
if v < best {
best = v
}
}
return best
}

@ -15,13 +15,13 @@ var chunk_target = 250 * time.Millisecond
var chunk_max = 10000
var options struct {
host string
port int
password string
pipe bool
profile string
verbose bool
chunkInfo bool
host string // hostname or ip address of the redis server to connect to
port int // redis port to connect to
password string // redis password used with redis auth
pipe bool // whether or not to take input from stdin
profile string // path to which a cpu profile will be written, used for debug purposes
verbose bool // whether or not to echo out request/response pairs
chunkInfo bool // whether or not to write out stats about each chunk sent
}
func usage(status int) {
@ -29,126 +29,6 @@ func usage(status int) {
os.Exit(status)
}
type chunk struct {
id int
vals []value
t time.Time
}
type sendResult struct {
read int // number of statement sread on the incoming statement stream
errors int // number of error responses seen on the redis response stream
replies int // number of reply responses seen on the redis response stream
byteSize int // number of bytes sent
elapsed time.Duration // time taken to send requests and receive responses from redis
paused time.Duration // sleep time inserted
startTime time.Time
}
func (s *sendResult) start() {
s.startTime = time.Now()
}
func (s *sendResult) stop() {
s.elapsed = time.Since(s.startTime)
}
func (s *sendResult) avg() time.Duration {
if s.read == 0 {
fmt.Fprintln(os.Stderr, "for some reason, we tried to divide by zero on sendResult.avg. we recovered, though.")
return 100 * time.Microsecond
}
return time.Duration(int64(s.elapsed) / int64(s.read))
}
func (s *sendResult) log() {
if !options.chunkInfo {
return
}
fmt.Printf("errors: %d replies: %d total: %d sent: %d elapsed: %v avg: %v size: %v\n", s.errors, s.replies, s.errors+s.replies, s.read, s.elapsed, s.avg(), s.byteSize)
}
func (s *sendResult) nextSize() int {
target := int(int64(chunk_target) / int64(s.avg()))
return min(target, chunk_max, 2*s.read)
}
func min(vals ...int) int {
var best int
switch len(vals) {
case 0:
return 0
case 1:
return vals[0]
default:
best = vals[0]
}
for _, v := range vals[1:] {
if v < best {
best = v
}
}
return best
}
func (s *sendResult) accumulate(request value, response maybe) {
s.read++
if response.ok() {
switch r := response.val().(type) {
case ErrorVal:
// these errors are errors reported from redis. e.g., if you
// try to do something like delete a key that doesn't exist or
// send an invalid command to redis
if options.verbose {
fmt.Fprintf(os.Stderr, "%q -> %q\n", request, response.val())
} else {
fmt.Fprintf(os.Stderr, "%q\n", r)
}
s.errors++
default:
if options.verbose {
fmt.Fprintf(os.Stdout, "%q -> %q\n", request, response.val())
}
s.replies++
}
} else {
// we get here when we encounter an error in the response stream.
// That is, the response stream contains bytes that are not valid
// in the redis protocol.
fmt.Fprintf(os.Stderr, "ResponseError: %v\n", response.err())
}
}
func (c *chunk) send(w *bufio.Writer, responses chan maybe) *sendResult {
stats := new(sendResult)
stats.start()
defer stats.stop()
for _, v := range c.vals {
n, err := v.Write(w)
if err != nil {
fmt.Fprintf(os.Stderr, "error writing a statement: %v\n", err)
}
stats.byteSize += n
}
if err := w.Flush(); err != nil {
fmt.Fprintf(os.Stderr, "error flushing statement buffer: %v\n", err)
}
for _, request := range c.vals {
response, ok := <-responses
if !ok {
// we get here when the response channel closes too early
fmt.Fprintf(os.Stderr, "error reading redis response stream: response chan closed early")
return stats
}
stats.accumulate(request, response)
}
return stats
}
// sets up our input file for reading. If the --pipe option was specified, the
// input file is stdin. Otherwise, the input file is the first argument on the
// command line.
@ -171,28 +51,38 @@ func infile() *os.File {
return f
}
func main() {
flag.Parse()
func connect() net.Conn {
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", options.host, options.port))
if err != nil {
fmt.Printf("unable to connect to redis: %v\n", err)
fmt.Fprintf(os.Stderr, "unable to connect to redis: %v\n", err)
os.Exit(1)
}
defer conn.Close()
auth(conn)
return conn
}
if options.password != "" {
fmt.Fprintf(conn, "*2\r\n$4\r\nauth\r\n$%d\r\n%s\r\n", len(options.password), options.password)
v, err := readValue(conn)
if err != nil {
fmt.Printf("unable to auth: %v\n", err)
os.Exit(1)
}
if !isOK(v) {
fmt.Printf("auth not OK: %q\n", v)
os.Exit(1)
}
func auth(c net.Conn) {
if options.password == "" {
return
}
fmt.Fprintf(c, "*2\r\n$4\r\nauth\r\n$%d\r\n%s\r\n", len(options.password), options.password)
v, err := readValue(c)
if err != nil {
fmt.Printf("unable to auth: %v\n", err)
os.Exit(1)
}
if !isOK(v) {
fmt.Printf("auth not OK: %q\n", v)
os.Exit(1)
}
}
func main() {
flag.Parse()
conn := connect()
defer conn.Close()
if options.profile != "" {
f, err := os.Create(options.profile)

Loading…
Cancel
Save