From d1208a21f0e89e0264c3a891771a21a778f49a3f Mon Sep 17 00:00:00 2001 From: Jordan Orelli Date: Sun, 25 Oct 2020 23:10:12 +0000 Subject: [PATCH] put the client in the wire package --- client.go | 113 -------------------------------- internal/wire/client.go | 140 ++++++++++++++++++++++++++++++++++++++++ ui.go | 18 ++++-- ui_mode.go | 8 +-- 4 files changed, 156 insertions(+), 123 deletions(-) delete mode 100644 client.go create mode 100644 internal/wire/client.go diff --git a/client.go b/client.go deleted file mode 100644 index a1bcfe7..0000000 --- a/client.go +++ /dev/null @@ -1,113 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "io/ioutil" - "net/url" - "strings" - "time" - - "github.com/gorilla/websocket" - "github.com/jordanorelli/astro-domu/internal/wire" - "github.com/jordanorelli/blammo" -) - -type client struct { - *blammo.Log - host string - port int - lastSeq int - conn *websocket.Conn - outbox chan wire.Request -} - -func (c *client) run(ctx context.Context) { - c.outbox = make(chan wire.Request) - - dialer := websocket.Dialer{ - HandshakeTimeout: 3 * time.Second, - ReadBufferSize: 32 * 1024, - WriteBufferSize: 32 * 1024, - Subprotocols: []string{"astrodomu@v0"}, - } - - path := url.URL{ - Host: fmt.Sprintf("%s:%d", c.host, c.port), - Scheme: "ws", - Path: "/", - } - - conn, res, err := dialer.Dial(path.String(), nil) - if err != nil { - c.Error("dial error: %v", err) - return - } - c.conn = conn - go c.readLoop() - - c.Debug("dial response status: %d", res.StatusCode) - for k, vals := range res.Header { - c.Debug("dial response header: %s = %s", k, strings.Join(vals, ",")) - } - - for { - select { - case req := <-c.outbox: - payload, err := json.Marshal(req) - if err != nil { - c.Error("unable to marshal a request: %v", err) - break - } - - w, err := conn.NextWriter(websocket.TextMessage) - if err != nil { - c.Error("unable to get a websocket frame writer: %v", err) - break - } - - if _, err := w.Write(payload); err != nil { - c.Error("failed to write payload of length %d: %v", len(payload), err) - break - } - c.Child("sent-frame").Info(string(payload)) - - if err := w.Close(); err != nil { - c.Error("failed to close websocket write frame: %v", err) - } - case <-ctx.Done(): - c.Info("parent context done, sending close message") - msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") - if err := conn.WriteMessage(websocket.CloseMessage, msg); err != nil { - c.Error("failed to write close message: %v", err) - } - c.Info("closing connection") - if err := conn.Close(); err != nil { - c.Error("failed to close connection: %v", err) - } - c.Info("connection closed") - c.conn = nil - return - } - } -} - -func (c *client) send(v wire.Value) { - c.lastSeq++ - c.outbox <- wire.NewRequest(c.lastSeq, v) -} - -func (c *client) readLoop() { - for { - _, r, err := c.conn.NextReader() - if err != nil { - return - } - b, err := ioutil.ReadAll(r) - if err != nil { - return - } - c.Log.Child("received-frame").Info(string(b)) - } -} diff --git a/internal/wire/client.go b/internal/wire/client.go new file mode 100644 index 0000000..2c2642d --- /dev/null +++ b/internal/wire/client.go @@ -0,0 +1,140 @@ +package wire + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/url" + "time" + + "github.com/gorilla/websocket" + "github.com/jordanorelli/blammo" +) + +type Client struct { + *blammo.Log + Host string + Port int + + lastSeq int + conn *websocket.Conn + + // outbox is the set of requests that we'd like to send. The send loop will + // read off of this channel and write these values to the underlying + // websocket connection. + outbox chan Request +} + +// Dial dials the server specified by the client. The returned read-only +// channel is a channel of responses from the server that are not replies to a +// request sent by the client. +func (c *Client) Dial() (<-chan Response, error) { + c.outbox = make(chan Request) + dialer := websocket.Dialer{ + HandshakeTimeout: 3 * time.Second, + ReadBufferSize: 32 * 1024, + WriteBufferSize: 32 * 1024, + Subprotocols: []string{"astrodomu@v0"}, + } + + path := url.URL{ + Host: fmt.Sprintf("%s:%d", c.Host, c.Port), + Scheme: "ws", + Path: "/", + } + + c.Info("dialing: %s", path.String()) + conn, _, err := dialer.Dial(path.String(), nil) + if err != nil { + return nil, fmt.Errorf("dial error: %w", err) + } + c.Info("connected to server") + + c.conn = conn + done := make(chan bool, 2) + notifications := make(chan Response) + go c.readLoop(notifications) + go c.writeLoop(done) + return notifications, nil +} + +func (c *Client) Send(v Value) { + c.lastSeq++ + d := 3 * time.Second + timeout := time.NewTimer(d) + select { + case c.outbox <- NewRequest(c.lastSeq, v): + timeout.Stop() + case <-timeout.C: + c.Error("send timed out after %v", d) + } +} + +func (c *Client) readLoop(notifications chan<- Response) { + defer close(notifications) + + for { + _, r, err := c.conn.NextReader() + if err != nil { + c.Error("unable to get a reader frame: %v", err) + return + } + b, err := ioutil.ReadAll(r) + if err != nil { + c.Error("unable to read frame: %v", err) + return + } + var res Response + if err := json.Unmarshal(b, &res); err != nil { + c.Error("unable to parse frame data: %v", err) + continue + } + c.Child("read-frame").Info(string(b)) + if res.Re <= 0 { + notifications <- res + } + } +} + +func (c *Client) writeLoop(done chan bool) { + for { + select { + case req := <-c.outbox: + w, err := c.conn.NextWriter(websocket.TextMessage) + if err != nil { + c.Error("unable to get a writer frame: %v", err) + return + } + b, err := json.Marshal(req) + if err != nil { + c.Error("unable to marshal outgoing response: %v", err) + break + } + if _, err := w.Write(b); err != nil { + c.Error("failed to write payload: %v", err) + break + } + if err := w.Close(); err != nil { + c.Error("failed to close write frame: %v", err) + break + } + c.Child("write-frame").Info(string(b)) + case shouldClose := <-done: + if shouldClose { + msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") + if err := c.conn.WriteMessage(websocket.CloseMessage, msg); err != nil { + c.Error("failed to write close message: %v", err) + } + c.Info("closing connection") + if err := c.conn.Close(); err != nil { + c.Error("failed to close connection: %v", err) + } + c.Info("connection closed") + c.conn = nil + return + + } + } + } + +} diff --git a/ui.go b/ui.go index da04024..6a27e52 100644 --- a/ui.go +++ b/ui.go @@ -6,6 +6,7 @@ import ( "github.com/gdamore/tcell/v2" "github.com/jordanorelli/astro-domu/internal/exit" + "github.com/jordanorelli/astro-domu/internal/wire" "github.com/jordanorelli/blammo" ) @@ -14,15 +15,22 @@ type ui struct { *blammo.Log screen tcell.Screen mode uiMode - client *client + client *wire.Client } func (ui *ui) run() { - ui.client = &client{ + ui.client = &wire.Client{ Log: ui.Child("client"), - host: "127.0.0.1", - port: 12805, + Host: "127.0.0.1", + Port: 12805, } + + _, err := ui.client.Dial() + if err != nil { + ui.Error("unable to dial server: %v", err) + return + } + ctx := context.Background() ctx, cancel := context.WithCancel(ctx) @@ -32,8 +40,6 @@ func (ui *ui) run() { time.Sleep(time.Second) }() - go ui.client.run(ctx) - screen, err := tcell.NewScreen() if err != nil { exit.WithMessage(1, "unable to create a screen: %v", err) diff --git a/ui_mode.go b/ui_mode.go index 7675c1f..0b2a2a2 100644 --- a/ui_mode.go +++ b/ui_mode.go @@ -23,16 +23,16 @@ func (m *boxWalker) handleEvent(ui *ui, e tcell.Event) bool { if key == tcell.KeyRune { switch v.Rune() { case 'w': - ui.client.send(wire.Self_Move{Delta: true, X: 0, Y: -1}) + ui.client.Send(wire.Self_Move{Delta: true, X: 0, Y: -1}) m.move(0, -1) case 'a': - ui.client.send(wire.Self_Move{Delta: true, X: -1, Y: 0}) + ui.client.Send(wire.Self_Move{Delta: true, X: -1, Y: 0}) m.move(-1, 0) case 's': - ui.client.send(wire.Self_Move{Delta: true, X: 0, Y: 1}) + ui.client.Send(wire.Self_Move{Delta: true, X: 0, Y: 1}) m.move(0, 1) case 'd': - ui.client.send(wire.Self_Move{Delta: true, X: 1, Y: 0}) + ui.client.Send(wire.Self_Move{Delta: true, X: 1, Y: 0}) m.move(1, 0) } }