You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
197 lines
4.1 KiB
Go
197 lines
4.1 KiB
Go
package wire
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/url"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/jordanorelli/blammo"
|
|
)
|
|
|
|
type Client struct {
|
|
*blammo.Log
|
|
Host string
|
|
Port int
|
|
|
|
conn *websocket.Conn
|
|
|
|
outbox chan *pending
|
|
resolved chan Response
|
|
done chan bool
|
|
}
|
|
|
|
// Dial dials the server specified by the client. The returned read-only
|
|
// channel is a channel of responses from the server that are not replies to a
|
|
// request sent by the client.
|
|
func (c *Client) Dial() (<-chan Response, error) {
|
|
c.outbox = make(chan *pending)
|
|
c.resolved = make(chan Response)
|
|
c.done = make(chan bool, 1)
|
|
|
|
dialer := websocket.Dialer{
|
|
HandshakeTimeout: 3 * time.Second,
|
|
ReadBufferSize: 32 * 1024,
|
|
WriteBufferSize: 32 * 1024,
|
|
Subprotocols: []string{"astrodomu@v0"},
|
|
}
|
|
|
|
path := url.URL{
|
|
Host: fmt.Sprintf("%s:%d", c.Host, c.Port),
|
|
Scheme: "ws",
|
|
Path: "/",
|
|
}
|
|
|
|
c.Info("dialing: %s", path.String())
|
|
conn, _, err := dialer.Dial(path.String(), nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dial error: %w", err)
|
|
}
|
|
c.Info("connected to server")
|
|
|
|
c.conn = conn
|
|
notifications := make(chan Response)
|
|
go c.readLoop(notifications)
|
|
go c.writeLoop()
|
|
return notifications, nil
|
|
}
|
|
|
|
func (c *Client) Send(v Value) (Response, error) {
|
|
d := 3 * time.Second
|
|
timeout := time.NewTimer(d)
|
|
|
|
done := make(chan struct{})
|
|
p := pending{v: v, done: done}
|
|
|
|
select {
|
|
case c.outbox <- &p:
|
|
timeout.Stop()
|
|
case <-timeout.C:
|
|
return Response{}, fmt.Errorf("send timed out after %v", d)
|
|
}
|
|
|
|
select {
|
|
case <-done:
|
|
if p.err == nil {
|
|
if err, ok := p.res.Body.(error); ok {
|
|
return p.res, err
|
|
}
|
|
}
|
|
return p.res, p.err
|
|
case <-timeout.C:
|
|
return Response{}, fmt.Errorf("send timed out (2) after %v", d)
|
|
}
|
|
}
|
|
|
|
func (c *Client) Close() { c.done <- true }
|
|
|
|
func (c *Client) readLoop(notifications chan<- Response) {
|
|
defer close(notifications)
|
|
|
|
for {
|
|
c.Info("waiting for a reader frame")
|
|
_, r, err := c.conn.NextReader()
|
|
if err != nil {
|
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
|
c.Info("received close frame from server")
|
|
break
|
|
}
|
|
c.Error("unable to get a reader frame: %v", err)
|
|
break
|
|
}
|
|
b, err := ioutil.ReadAll(r)
|
|
if err != nil {
|
|
c.Error("unable to read frame: %v", err)
|
|
continue
|
|
}
|
|
var res Response
|
|
if err := json.Unmarshal(b, &res); err != nil {
|
|
c.Error("unable to parse frame data: %v", err)
|
|
continue
|
|
}
|
|
c.Child("read-frame").Info(string(b))
|
|
if res.Re <= 0 {
|
|
notifications <- res
|
|
} else {
|
|
c.resolved <- res
|
|
}
|
|
}
|
|
c.done <- false
|
|
|
|
c.Info("closing connection")
|
|
if err := c.conn.Close(); err != nil {
|
|
c.Error("error on connection close: %v", err)
|
|
} else {
|
|
c.Info("closed connection cleanly")
|
|
}
|
|
}
|
|
|
|
func (c *Client) writeLoop() {
|
|
sent := make(map[int]*pending)
|
|
|
|
for seq := 1; true; seq++ {
|
|
select {
|
|
case p := <-c.outbox:
|
|
req := Request{seq, p.v}
|
|
|
|
w, err := c.conn.NextWriter(websocket.TextMessage)
|
|
if err != nil {
|
|
p.err = fmt.Errorf("unable to get a writer frame: %w", err)
|
|
close(p.done)
|
|
return
|
|
}
|
|
b, err := json.Marshal(req)
|
|
if err != nil {
|
|
p.err = fmt.Errorf("unable to marshal outgoing response: %w", err)
|
|
close(p.done)
|
|
break
|
|
}
|
|
if _, err := w.Write(b); err != nil {
|
|
p.err = fmt.Errorf("failed to write payload: %w", err)
|
|
close(p.done)
|
|
break
|
|
}
|
|
if err := w.Close(); err != nil {
|
|
p.err = fmt.Errorf("failed to close write frame: %w", err)
|
|
close(p.done)
|
|
break
|
|
}
|
|
c.Child("write-frame").Info(string(b))
|
|
sent[seq] = p
|
|
|
|
case res := <-c.resolved:
|
|
p, ok := sent[res.Re]
|
|
if !ok {
|
|
c.Error("saw response for unknown seq %d", res.Re)
|
|
break
|
|
}
|
|
delete(sent, res.Re)
|
|
p.res = res
|
|
close(p.done)
|
|
|
|
case shouldClose := <-c.done:
|
|
c.Info("write loop sees done signal: %t", shouldClose)
|
|
|
|
if shouldClose {
|
|
c.Info("sending close frame")
|
|
msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
|
|
if err := c.conn.WriteMessage(websocket.CloseMessage, msg); err != nil {
|
|
c.Error("failed to write close message: %v", err)
|
|
} else {
|
|
c.Info("sent close frame")
|
|
}
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
type pending struct {
|
|
v Value
|
|
res Response
|
|
err error
|
|
done chan struct{}
|
|
}
|