From 1f8808b098a93c8323fd3cff60f6f428384a1474 Mon Sep 17 00:00:00 2001 From: Jordan Orelli Date: Sun, 7 Aug 2016 12:18:13 -0400 Subject: [PATCH] pool packet body buffers --- main.go | 5 +---- packet.go | 25 +++++++++++++++++++------ parser.go | 33 ++++++++++++++++++--------------- 3 files changed, 38 insertions(+), 25 deletions(-) diff --git a/main.go b/main.go index a928b2b..c584f95 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,6 @@ import ( "fmt" "io" "os" - // "reflect" "runtime/pprof" "strings" ) @@ -130,11 +129,9 @@ func main() { for m := range c { if m.error != nil { fmt.Fprintln(os.Stderr, m.error) - } else { - // fmt.Println(reflect.TypeOf(m.Message)) } } if p.err != nil { - fmt.Println(err) + fmt.Printf("parser error: %v\n", p.err) } } diff --git a/packet.go b/packet.go index d60d7f4..79c1d99 100644 --- a/packet.go +++ b/packet.go @@ -4,21 +4,24 @@ import ( "fmt" "github.com/golang/protobuf/proto" + "github.com/golang/snappy" ) // packet represents the top-level envelope in the dota replay format. All // data in the replay file is packed into packets of at most 65kb. type packet struct { - cmd packetType - tick int64 - body []byte + cmd packetType + tick int64 + compressed bool + size int + body []byte } func (p packet) String() string { if len(p.body) > 30 { - return fmt.Sprintf("{packet cmd: %v tick: %v size: %d body: %x...}", p.cmd, p.tick, len(p.body), p.body[:27]) + return fmt.Sprintf("{packet cmd: %v tick: %v size: %d body: %x...}", p.cmd, p.tick, p.size, p.body[:27]) } - return fmt.Sprintf("{packet cmd: %v tick: %v size: %d body: %x}", p.cmd, p.tick, len(p.body), p.body) + return fmt.Sprintf("{packet cmd: %v tick: %v size: %d body: %x}", p.cmd, p.tick, p.size, p.body) } func (p *packet) Open(m *messageFactory, pbuf *proto.Buffer) (proto.Message, error) { @@ -26,7 +29,17 @@ func (p *packet) Open(m *messageFactory, pbuf *proto.Buffer) (proto.Message, err if err != nil { return nil, err } - pbuf.SetBuf(p.body) + + if p.compressed { + buf, err := snappy.Decode(nil, p.body[:p.size]) + if err != nil { + return nil, wrap(err, "packet open failed snappy decode") + } + pbuf.SetBuf(buf) + } else { + pbuf.SetBuf(p.body[:p.size]) + } + if err := pbuf.Unmarshal(msg); err != nil { return nil, err } diff --git a/parser.go b/parser.go index a1443bc..ca59c13 100644 --- a/parser.go +++ b/parser.go @@ -4,9 +4,9 @@ import ( "bufio" "fmt" "io" + "sync" "github.com/golang/protobuf/proto" - "github.com/golang/snappy" "github.com/jordanorelli/hyperstone/bit" "github.com/jordanorelli/hyperstone/dota" ) @@ -21,6 +21,8 @@ type parser struct { ewl entityWhitelist pwl packetWhitelist + + packets *sync.Pool } func newParser(r io.Reader) *parser { @@ -30,6 +32,11 @@ func newParser(r io.Reader) *parser { pbuf: new(proto.Buffer), ewl: allEntities, pwl: allPackets, + packets: &sync.Pool{ + New: func() interface{} { + return &packet{body: make([]byte, 1<<16)} + }, + }, } } @@ -58,6 +65,8 @@ func (p *parser) run(out chan maybe) { return } + p.packets.Put(pkt) + switch m := msg.(type) { case *dota.CDemoPacket: p.emitChildren(m, out) @@ -202,23 +211,17 @@ func (p *parser) readPacket() (*packet, error) { return p.readPacket() } + pkt := p.packets.Get().(*packet) + pkt.cmd = cmd + pkt.tick = int64(tick) + pkt.size = int(size) + pkt.compressed = compressed + if size > 0 { - buf := make([]byte, int(size)) - if _, err := io.ReadFull(p.source, buf); err != nil { + if _, err := io.ReadFull(p.source, pkt.body[:pkt.size]); err != nil { return nil, wrap(err, "readPacket couldn't read packet body") } - - if compressed { - var err error - buf, err = snappy.Decode(nil, buf) - if err != nil { - return nil, wrap(err, "readPacket couldn't snappy decode body") - } - } - // TODO: pool these! - return &packet{cmd, int64(tick), buf}, nil } - // TODO: pool these! - return &packet{cmd, int64(tick), nil}, nil + return pkt, nil }