send and receive messages

master
Jordan Orelli 4 years ago
parent dd8762667d
commit 201ebfe8b5

@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil"
"net/url" "net/url"
"strings" "strings"
"time" "time"
@ -43,15 +44,13 @@ func (c *client) run(ctx context.Context) {
return return
} }
c.conn = conn c.conn = conn
go c.readLoop()
c.Debug("dial response status: %d", res.StatusCode) c.Debug("dial response status: %d", res.StatusCode)
for k, vals := range res.Header { for k, vals := range res.Header {
c.Debug("dial response header: %s = %s", k, strings.Join(vals, ",")) c.Debug("dial response header: %s = %s", k, strings.Join(vals, ","))
} }
tick := time.NewTicker(time.Second)
defer tick.Stop()
for { for {
select { select {
case req := <-c.outbox: case req := <-c.outbox:
@ -67,25 +66,15 @@ func (c *client) run(ctx context.Context) {
break break
} }
n, err := w.Write(payload) if _, err := w.Write(payload); err != nil {
if err != nil {
c.Error("failed to write payload of length %d: %v", len(payload), err) c.Error("failed to write payload of length %d: %v", len(payload), err)
break break
} }
c.Info("wrote %d bytes for payload of length %d", n, len(payload)) c.Child("sent-frame").Info(string(payload))
if err := w.Close(); err != nil { if err := w.Close(); err != nil {
c.Error("failed to close websocket write frame: %v", err) c.Error("failed to close websocket write frame: %v", err)
} }
case <-tick.C:
w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
c.Error("unable to get a websocket frame writer: %v", err)
break
}
w.Write([]byte("hey"))
w.Close()
case <-ctx.Done(): case <-ctx.Done():
c.Info("parent context done, sending close message") c.Info("parent context done, sending close message")
msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
@ -121,3 +110,17 @@ func (c *client) send(cmd string, args map[string]interface{}) {
} }
c.outbox <- req c.outbox <- req
} }
func (c *client) readLoop() {
for {
_, r, err := c.conn.NextReader()
if err != nil {
return
}
b, err := ioutil.ReadAll(r)
if err != nil {
return
}
c.Log.Child("received-frame").Info(string(b))
}
}

@ -0,0 +1,13 @@
package main
func errorResponse(re int, err error) response {
var body struct {
Message string `json:"message"`
}
body.Message = err.Error()
return response{
Re: re,
Type: "error",
Body: body,
}
}

@ -0,0 +1,38 @@
package errors
import (
std "errors"
)
// New returns an error that formats as the given text.
// Each call to New returns a distinct error value even if the text is identical.
func New(text string) error { return std.New(text) }
// Unwrap returns the result of calling the Unwrap method on err, if err's
// type contains an Unwrap method returning error.
// Otherwise, Unwrap returns nil.
func Unwrap(err error) error { return std.Unwrap(err) }
// Is reports whether any error in err's chain matches target.
//
// The chain consists of err itself followed by the sequence of errors obtained by
// repeatedly calling Unwrap.
//
// An error is considered to match a target if it is equal to that target or if
// it implements a method Is(error) bool such that Is(target) returns true.
func Is(err, target error) bool { return std.Is(err, target) }
// As finds the first error in err's chain that matches target, and if so, sets
// target to that error value and returns true.
//
// The chain consists of err itself followed by the sequence of errors obtained by
// repeatedly calling Unwrap.
//
// An error matches target if the error's concrete value is assignable to the value
// pointed to by target, or if the error has a method As(interface{}) bool such that
// As(target) returns true. In the latter case, the As method is responsible for
// setting target.
//
// As will panic if target is not a non-nil pointer to either a type that implements
// error, or to any interface type. As returns false if err is nil.
func As(err error, target interface{}) bool { return std.As(err, target) }

@ -0,0 +1,14 @@
package main
type response struct {
Re int `json:"re"`
Type string `json:"type"`
Body interface{} `json:"body,omitempty"`
}
func ok(re int) response {
return response{
Re: re,
Type: "ok",
}
}

@ -1,10 +1,12 @@
package main package main
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strconv"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/jordanorelli/blammo" "github.com/jordanorelli/blammo"
@ -12,9 +14,10 @@ import (
type server struct { type server struct {
*blammo.Log *blammo.Log
host string host string
port int port int
world *room world *room
lastSessionID int
} }
func (s *server) listen() error { func (s *server) listen() error {
@ -30,6 +33,18 @@ func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.lastSessionID++
sn := session{
Log: s.Log.Child("sessions").Child(strconv.Itoa(s.lastSessionID)),
id: s.lastSessionID,
conn: conn,
outbox: make(chan response),
}
go sn.pump(ctx)
for { for {
t, r, err := conn.NextReader() t, r, err := conn.NextReader()
if err != nil { if err != nil {
@ -44,14 +59,16 @@ func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.Error("readall error: %v", err) s.Error("readall error: %v", err)
break break
} }
s.Info("received: %s", text) sn.Log.Child("received-frame").Info(string(text))
var body requestBody var body requestBody
if err := json.Unmarshal(text, &body); err != nil { if err := json.Unmarshal(text, &body); err != nil {
s.Error("unable to parse request: %v", err) s.Error("unable to parse request: %v", err)
sn.outbox <- errorResponse(0, fmt.Errorf("unable to parse request: %v", err))
break break
} }
sn.outbox <- ok(body.Seq)
case websocket.BinaryMessage: case websocket.BinaryMessage:
sn.outbox <- errorResponse(0, fmt.Errorf("unable to parse binary frames"))
} }
} }
} }

@ -0,0 +1,56 @@
package main
import (
"context"
"encoding/json"
"time"
"github.com/gorilla/websocket"
"github.com/jordanorelli/blammo"
)
// session represents the server side of a client's session. i.e., a single
// connection along with its associated state.
type session struct {
*blammo.Log
id int
conn *websocket.Conn
outbox chan response
}
// pump is the session send loop. Pump should pump the session's outbox
// messages to the underlying connection until the context is closed.
func (sn *session) pump(ctx context.Context) {
for {
select {
case res := <-sn.outbox:
payload, err := json.Marshal(res)
if err != nil {
sn.Error("failed to marshal outgoing response: %v", err)
break
}
if err := sn.conn.SetWriteDeadline(time.Now().Add(3 * time.Second)); err != nil {
sn.Error("failed to set write deadline: %v", err)
break
}
w, err := sn.conn.NextWriter(websocket.TextMessage)
if err != nil {
sn.Error("failed get a writer frame: %v", err)
break
}
if _, err := w.Write(payload); err != nil {
sn.Error("failed write payload: %v", err)
break
}
if err := w.Close(); err != nil {
sn.Error("failed to close write frame: %v", err)
break
}
sn.Child("sent-frame").Info(string(payload))
case <-ctx.Done():
sn.Info("parent context done, shutting down write pump")
return
}
}
}
Loading…
Cancel
Save