pool packet body buffers

master
Jordan Orelli 8 years ago
parent b997309674
commit 1f8808b098

@ -9,7 +9,6 @@ import (
"fmt"
"io"
"os"
// "reflect"
"runtime/pprof"
"strings"
)
@ -130,11 +129,9 @@ func main() {
for m := range c {
if m.error != nil {
fmt.Fprintln(os.Stderr, m.error)
} else {
// fmt.Println(reflect.TypeOf(m.Message))
}
}
if p.err != nil {
fmt.Println(err)
fmt.Printf("parser error: %v\n", p.err)
}
}

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
)
// packet represents the top-level envelope in the dota replay format. All
@ -11,14 +12,16 @@ import (
type packet struct {
cmd packetType
tick int64
compressed bool
size int
body []byte
}
func (p packet) String() string {
if len(p.body) > 30 {
return fmt.Sprintf("{packet cmd: %v tick: %v size: %d body: %x...}", p.cmd, p.tick, len(p.body), p.body[:27])
return fmt.Sprintf("{packet cmd: %v tick: %v size: %d body: %x...}", p.cmd, p.tick, p.size, p.body[:27])
}
return fmt.Sprintf("{packet cmd: %v tick: %v size: %d body: %x}", p.cmd, p.tick, len(p.body), p.body)
return fmt.Sprintf("{packet cmd: %v tick: %v size: %d body: %x}", p.cmd, p.tick, p.size, p.body)
}
func (p *packet) Open(m *messageFactory, pbuf *proto.Buffer) (proto.Message, error) {
@ -26,7 +29,17 @@ func (p *packet) Open(m *messageFactory, pbuf *proto.Buffer) (proto.Message, err
if err != nil {
return nil, err
}
pbuf.SetBuf(p.body)
if p.compressed {
buf, err := snappy.Decode(nil, p.body[:p.size])
if err != nil {
return nil, wrap(err, "packet open failed snappy decode")
}
pbuf.SetBuf(buf)
} else {
pbuf.SetBuf(p.body[:p.size])
}
if err := pbuf.Unmarshal(msg); err != nil {
return nil, err
}

@ -4,9 +4,9 @@ import (
"bufio"
"fmt"
"io"
"sync"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"github.com/jordanorelli/hyperstone/bit"
"github.com/jordanorelli/hyperstone/dota"
)
@ -21,6 +21,8 @@ type parser struct {
ewl entityWhitelist
pwl packetWhitelist
packets *sync.Pool
}
func newParser(r io.Reader) *parser {
@ -30,6 +32,11 @@ func newParser(r io.Reader) *parser {
pbuf: new(proto.Buffer),
ewl: allEntities,
pwl: allPackets,
packets: &sync.Pool{
New: func() interface{} {
return &packet{body: make([]byte, 1<<16)}
},
},
}
}
@ -58,6 +65,8 @@ func (p *parser) run(out chan maybe) {
return
}
p.packets.Put(pkt)
switch m := msg.(type) {
case *dota.CDemoPacket:
p.emitChildren(m, out)
@ -202,23 +211,17 @@ func (p *parser) readPacket() (*packet, error) {
return p.readPacket()
}
pkt := p.packets.Get().(*packet)
pkt.cmd = cmd
pkt.tick = int64(tick)
pkt.size = int(size)
pkt.compressed = compressed
if size > 0 {
buf := make([]byte, int(size))
if _, err := io.ReadFull(p.source, buf); err != nil {
if _, err := io.ReadFull(p.source, pkt.body[:pkt.size]); err != nil {
return nil, wrap(err, "readPacket couldn't read packet body")
}
if compressed {
var err error
buf, err = snappy.Decode(nil, buf)
if err != nil {
return nil, wrap(err, "readPacket couldn't snappy decode body")
}
}
// TODO: pool these!
return &packet{cmd, int64(tick), buf}, nil
}
// TODO: pool these!
return &packet{cmd, int64(tick), nil}, nil
return pkt, nil
}

Loading…
Cancel
Save