fixing up pending logic

master
Jordan Orelli 4 years ago
parent d1208a21f0
commit 6859ca132d

@ -19,17 +19,17 @@ type Client struct {
lastSeq int lastSeq int
conn *websocket.Conn conn *websocket.Conn
// outbox is the set of requests that we'd like to send. The send loop will outbox chan *pending
// read off of this channel and write these values to the underlying resolved chan Response
// websocket connection.
outbox chan Request
} }
// Dial dials the server specified by the client. The returned read-only // Dial dials the server specified by the client. The returned read-only
// channel is a channel of responses from the server that are not replies to a // channel is a channel of responses from the server that are not replies to a
// request sent by the client. // request sent by the client.
func (c *Client) Dial() (<-chan Response, error) { func (c *Client) Dial() (<-chan Response, error) {
c.outbox = make(chan Request) c.outbox = make(chan *pending)
c.resolved = make(chan Response)
dialer := websocket.Dialer{ dialer := websocket.Dialer{
HandshakeTimeout: 3 * time.Second, HandshakeTimeout: 3 * time.Second,
ReadBufferSize: 32 * 1024, ReadBufferSize: 32 * 1024,
@ -58,15 +58,25 @@ func (c *Client) Dial() (<-chan Response, error) {
return notifications, nil return notifications, nil
} }
func (c *Client) Send(v Value) { func (c *Client) Send(v Value) (Response, error) {
c.lastSeq++
d := 3 * time.Second d := 3 * time.Second
timeout := time.NewTimer(d) timeout := time.NewTimer(d)
done := make(chan struct{})
p := pending{v: v, done: done}
select { select {
case c.outbox <- NewRequest(c.lastSeq, v): case c.outbox <- &p:
timeout.Stop() timeout.Stop()
case <-timeout.C: case <-timeout.C:
c.Error("send timed out after %v", d) return Response{}, fmt.Errorf("send timed out after %v", d)
}
select {
case <-done:
return p.res, p.err
case <-timeout.C:
return Response{}, fmt.Errorf("send timed out (2) after %v", d)
} }
} }
@ -92,33 +102,55 @@ func (c *Client) readLoop(notifications chan<- Response) {
c.Child("read-frame").Info(string(b)) c.Child("read-frame").Info(string(b))
if res.Re <= 0 { if res.Re <= 0 {
notifications <- res notifications <- res
} else {
c.resolved <- res
} }
} }
} }
func (c *Client) writeLoop(done chan bool) { func (c *Client) writeLoop(done chan bool) {
sent := make(map[int]*pending)
for { for {
select { select {
case req := <-c.outbox: case p := <-c.outbox:
c.lastSeq++
req := NewRequest(c.lastSeq, p.v)
w, err := c.conn.NextWriter(websocket.TextMessage) w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil { if err != nil {
c.Error("unable to get a writer frame: %v", err) p.err = fmt.Errorf("unable to get a writer frame: %w", err)
close(p.done)
return return
} }
b, err := json.Marshal(req) b, err := json.Marshal(req)
if err != nil { if err != nil {
c.Error("unable to marshal outgoing response: %v", err) p.err = fmt.Errorf("unable to marshal outgoing response: %w", err)
close(p.done)
break break
} }
if _, err := w.Write(b); err != nil { if _, err := w.Write(b); err != nil {
c.Error("failed to write payload: %v", err) p.err = fmt.Errorf("failed to write payload: %w", err)
close(p.done)
break break
} }
if err := w.Close(); err != nil { if err := w.Close(); err != nil {
c.Error("failed to close write frame: %v", err) p.err = fmt.Errorf("failed to close write frame: %w", err)
close(p.done)
break break
} }
c.Child("write-frame").Info(string(b)) c.Child("write-frame").Info(string(b))
sent[c.lastSeq] = p
case res := <-c.resolved:
p, ok := sent[res.Re]
if !ok {
c.Error("saw response for unknown seq %d")
break
}
p.res = res
close(p.done)
case shouldClose := <-done: case shouldClose := <-done:
if shouldClose { if shouldClose {
msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
@ -136,5 +168,11 @@ func (c *Client) writeLoop(done chan bool) {
} }
} }
} }
}
type pending struct {
v Value
res Response
err error
done chan struct{}
} }

Loading…
Cancel
Save