From 201ebfe8b58c7c702a6a5056e7aefdfe2b1eb493 Mon Sep 17 00:00:00 2001 From: Jordan Orelli Date: Sun, 25 Oct 2020 18:58:14 +0000 Subject: [PATCH] send and receive messages --- client.go | 33 ++++++++++++++----------- error.go | 13 ++++++++++ internal/errors/std.go | 38 ++++++++++++++++++++++++++++ response.go | 14 +++++++++++ server.go | 27 ++++++++++++++++---- session.go | 56 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 161 insertions(+), 20 deletions(-) create mode 100644 error.go create mode 100644 internal/errors/std.go create mode 100644 response.go create mode 100644 session.go diff --git a/client.go b/client.go index 38e84d8..9b252eb 100644 --- a/client.go +++ b/client.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io/ioutil" "net/url" "strings" "time" @@ -43,15 +44,13 @@ func (c *client) run(ctx context.Context) { return } c.conn = conn + go c.readLoop() c.Debug("dial response status: %d", res.StatusCode) for k, vals := range res.Header { c.Debug("dial response header: %s = %s", k, strings.Join(vals, ",")) } - tick := time.NewTicker(time.Second) - defer tick.Stop() - for { select { case req := <-c.outbox: @@ -67,25 +66,15 @@ func (c *client) run(ctx context.Context) { break } - n, err := w.Write(payload) - if err != nil { + if _, err := w.Write(payload); err != nil { c.Error("failed to write payload of length %d: %v", len(payload), err) break } - c.Info("wrote %d bytes for payload of length %d", n, len(payload)) + c.Child("sent-frame").Info(string(payload)) if err := w.Close(); err != nil { c.Error("failed to close websocket write frame: %v", err) } - case <-tick.C: - w, err := conn.NextWriter(websocket.TextMessage) - if err != nil { - c.Error("unable to get a websocket frame writer: %v", err) - break - } - - w.Write([]byte("hey")) - w.Close() case <-ctx.Done(): c.Info("parent context done, sending close message") msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") @@ -121,3 +110,17 @@ func (c *client) send(cmd string, args map[string]interface{}) { } c.outbox <- req } + +func (c *client) readLoop() { + for { + _, r, err := c.conn.NextReader() + if err != nil { + return + } + b, err := ioutil.ReadAll(r) + if err != nil { + return + } + c.Log.Child("received-frame").Info(string(b)) + } +} diff --git a/error.go b/error.go new file mode 100644 index 0000000..bbe70d0 --- /dev/null +++ b/error.go @@ -0,0 +1,13 @@ +package main + +func errorResponse(re int, err error) response { + var body struct { + Message string `json:"message"` + } + body.Message = err.Error() + return response{ + Re: re, + Type: "error", + Body: body, + } +} diff --git a/internal/errors/std.go b/internal/errors/std.go new file mode 100644 index 0000000..2de5b92 --- /dev/null +++ b/internal/errors/std.go @@ -0,0 +1,38 @@ +package errors + +import ( + std "errors" +) + +// New returns an error that formats as the given text. +// Each call to New returns a distinct error value even if the text is identical. +func New(text string) error { return std.New(text) } + +// Unwrap returns the result of calling the Unwrap method on err, if err's +// type contains an Unwrap method returning error. +// Otherwise, Unwrap returns nil. +func Unwrap(err error) error { return std.Unwrap(err) } + +// Is reports whether any error in err's chain matches target. +// +// The chain consists of err itself followed by the sequence of errors obtained by +// repeatedly calling Unwrap. +// +// An error is considered to match a target if it is equal to that target or if +// it implements a method Is(error) bool such that Is(target) returns true. +func Is(err, target error) bool { return std.Is(err, target) } + +// As finds the first error in err's chain that matches target, and if so, sets +// target to that error value and returns true. +// +// The chain consists of err itself followed by the sequence of errors obtained by +// repeatedly calling Unwrap. +// +// An error matches target if the error's concrete value is assignable to the value +// pointed to by target, or if the error has a method As(interface{}) bool such that +// As(target) returns true. In the latter case, the As method is responsible for +// setting target. +// +// As will panic if target is not a non-nil pointer to either a type that implements +// error, or to any interface type. As returns false if err is nil. +func As(err error, target interface{}) bool { return std.As(err, target) } diff --git a/response.go b/response.go new file mode 100644 index 0000000..d4a6bc5 --- /dev/null +++ b/response.go @@ -0,0 +1,14 @@ +package main + +type response struct { + Re int `json:"re"` + Type string `json:"type"` + Body interface{} `json:"body,omitempty"` +} + +func ok(re int) response { + return response{ + Re: re, + Type: "ok", + } +} diff --git a/server.go b/server.go index 7e99a2f..6965266 100644 --- a/server.go +++ b/server.go @@ -1,10 +1,12 @@ package main import ( + "context" "encoding/json" "fmt" "io/ioutil" "net/http" + "strconv" "github.com/gorilla/websocket" "github.com/jordanorelli/blammo" @@ -12,9 +14,10 @@ import ( type server struct { *blammo.Log - host string - port int - world *room + host string + port int + world *room + lastSessionID int } func (s *server) listen() error { @@ -30,6 +33,18 @@ func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.lastSessionID++ + sn := session{ + Log: s.Log.Child("sessions").Child(strconv.Itoa(s.lastSessionID)), + id: s.lastSessionID, + conn: conn, + outbox: make(chan response), + } + go sn.pump(ctx) + for { t, r, err := conn.NextReader() if err != nil { @@ -44,14 +59,16 @@ func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.Error("readall error: %v", err) break } - s.Info("received: %s", text) + sn.Log.Child("received-frame").Info(string(text)) var body requestBody if err := json.Unmarshal(text, &body); err != nil { s.Error("unable to parse request: %v", err) + sn.outbox <- errorResponse(0, fmt.Errorf("unable to parse request: %v", err)) break } + sn.outbox <- ok(body.Seq) case websocket.BinaryMessage: - + sn.outbox <- errorResponse(0, fmt.Errorf("unable to parse binary frames")) } } } diff --git a/session.go b/session.go new file mode 100644 index 0000000..c0c8760 --- /dev/null +++ b/session.go @@ -0,0 +1,56 @@ +package main + +import ( + "context" + "encoding/json" + "time" + + "github.com/gorilla/websocket" + "github.com/jordanorelli/blammo" +) + +// session represents the server side of a client's session. i.e., a single +// connection along with its associated state. +type session struct { + *blammo.Log + id int + conn *websocket.Conn + outbox chan response +} + +// pump is the session send loop. Pump should pump the session's outbox +// messages to the underlying connection until the context is closed. +func (sn *session) pump(ctx context.Context) { + for { + select { + case res := <-sn.outbox: + payload, err := json.Marshal(res) + if err != nil { + sn.Error("failed to marshal outgoing response: %v", err) + break + } + + if err := sn.conn.SetWriteDeadline(time.Now().Add(3 * time.Second)); err != nil { + sn.Error("failed to set write deadline: %v", err) + break + } + w, err := sn.conn.NextWriter(websocket.TextMessage) + if err != nil { + sn.Error("failed get a writer frame: %v", err) + break + } + if _, err := w.Write(payload); err != nil { + sn.Error("failed write payload: %v", err) + break + } + if err := w.Close(); err != nil { + sn.Error("failed to close write frame: %v", err) + break + } + sn.Child("sent-frame").Info(string(payload)) + case <-ctx.Done(): + sn.Info("parent context done, shutting down write pump") + return + } + } +}