From ad6dc3d6b55c8efe3e4789ae8a6873e6c4538766 Mon Sep 17 00:00:00 2001 From: Jordan Orelli Date: Sun, 25 Oct 2020 22:09:49 +0000 Subject: [PATCH] braking things up into packages --- internal/server/server.go | 112 +++++++++++++++++++++++++++++++++++++ internal/server/session.go | 59 +++++++++++++++++++ internal/sim/entity.go | 7 +++ internal/sim/room.go | 4 ++ internal/sim/world.go | 4 ++ internal/wire/error.go | 17 ++++++ internal/wire/ok.go | 4 ++ internal/wire/request.go | 7 +++ internal/wire/response.go | 19 +++++++ internal/wire/tag.go | 45 +++++++++++++++ internal/wire/value.go | 3 + main.go | 21 +++---- server.go | 73 ------------------------ 13 files changed, 289 insertions(+), 86 deletions(-) create mode 100644 internal/server/server.go create mode 100644 internal/server/session.go create mode 100644 internal/sim/entity.go create mode 100644 internal/sim/room.go create mode 100644 internal/sim/world.go create mode 100644 internal/wire/error.go create mode 100644 internal/wire/ok.go create mode 100644 internal/wire/request.go create mode 100644 internal/wire/response.go create mode 100644 internal/wire/tag.go create mode 100644 internal/wire/value.go delete mode 100644 server.go diff --git a/internal/server/server.go b/internal/server/server.go new file mode 100644 index 0000000..51b30cb --- /dev/null +++ b/internal/server/server.go @@ -0,0 +1,112 @@ +package server + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "os" + "strconv" + + "github.com/gorilla/websocket" + "github.com/jordanorelli/astro-domu/internal/errors" + "github.com/jordanorelli/astro-domu/internal/wire" + "github.com/jordanorelli/blammo" +) + +type Server struct { + *blammo.Log + Host string + Port int + + lastSessionID int +} + +func (s *Server) Start() error { + if s.Host == "" { + s.Host = "127.0.0.1" + } + if s.Port == 0 { + s.Port = 12805 + } + if s.Log == nil { + stdout := blammo.NewLineWriter(os.Stdout) + stderr := blammo.NewLineWriter(os.Stderr) + + options := []blammo.Option{ + blammo.DebugWriter(stdout), + blammo.InfoWriter(stdout), + blammo.ErrorWriter(stderr), + } + + s.Log = blammo.NewLog("astro", options...).Child("server") + } + + addr := fmt.Sprintf("%s:%d", s.Host, s.Port) + lis, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("server failed to start a listener: %w", err) + } + s.Log.Info("listening for TCP traffic on %q", addr) + go s.runHTTPServer(lis) + + return nil +} + +func (s *Server) runHTTPServer(lis net.Listener) { + err := http.Serve(lis, s) + if err != nil && !errors.Is(err, http.ErrServerClosed) { + s.Error("error in http.Serve: %v", err) + } +} + +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{} + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + s.Error("upgrade error: %v", err) + return + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.lastSessionID++ + sn := session{ + Log: s.Log.Child("sessions").Child(strconv.Itoa(s.lastSessionID)), + id: s.lastSessionID, + conn: conn, + outbox: make(chan wire.Response), + } + go sn.pump(ctx) + + for { + t, r, err := conn.NextReader() + if err != nil { + s.Error("read error: %v", err) + return + } + + switch t { + case websocket.TextMessage: + text, err := ioutil.ReadAll(r) + if err != nil { + s.Error("readall error: %v", err) + break + } + sn.Log.Child("received-frame").Info(string(text)) + var req wire.Request + if err := json.Unmarshal(text, &req); err != nil { + s.Error("unable to parse request: %v", err) + sn.outbox <- wire.ErrorResponse(0, "unable to parse request: %v", err) + break + } + sn.outbox <- wire.NewResponse(req.Seq, wire.OK{}) + case websocket.BinaryMessage: + sn.outbox <- wire.ErrorResponse(0, "unable to parse binary frames") + } + } +} diff --git a/internal/server/session.go b/internal/server/session.go new file mode 100644 index 0000000..0500f37 --- /dev/null +++ b/internal/server/session.go @@ -0,0 +1,59 @@ +package server + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/gorilla/websocket" + "github.com/jordanorelli/astro-domu/internal/wire" + "github.com/jordanorelli/blammo" +) + +type session struct { + *blammo.Log + id int + conn *websocket.Conn + outbox chan wire.Response +} + +// pump is the session send loop. Pump should pump the session's outbox +// messages to the underlying connection until the context is closed. +func (sn *session) pump(ctx context.Context) { + for { + select { + case res := <-sn.outbox: + if err := sn.sendResponse(res); err != nil { + sn.Error(err.Error()) + } + case <-ctx.Done(): + sn.Info("parent context done, shutting down write pump") + return + } + } +} + +func (sn *session) sendResponse(res wire.Response) error { + payload, err := json.Marshal(res) + if err != nil { + return fmt.Errorf("failed to marshal outgoing response: %w", err) + } + + if err := sn.conn.SetWriteDeadline(time.Now().Add(3 * time.Second)); err != nil { + return fmt.Errorf("failed to set write deadline: %w", err) + } + + w, err := sn.conn.NextWriter(websocket.TextMessage) + if err != nil { + return fmt.Errorf("failed get a writer frame: %w", err) + } + if _, err := w.Write(payload); err != nil { + return fmt.Errorf("failed write payload: %w", err) + } + if err := w.Close(); err != nil { + return fmt.Errorf("failed to close write frame: %w", err) + } + sn.Child("sent-frame").Info(string(payload)) + return nil +} diff --git a/internal/sim/entity.go b/internal/sim/entity.go new file mode 100644 index 0000000..15a45ad --- /dev/null +++ b/internal/sim/entity.go @@ -0,0 +1,7 @@ +package sim + +// entity is any entity that can be simulated. +type entity interface { + // update is the standard tick function + update(time.Duration) +} diff --git a/internal/sim/room.go b/internal/sim/room.go new file mode 100644 index 0000000..a31f9a4 --- /dev/null +++ b/internal/sim/room.go @@ -0,0 +1,4 @@ +package sim + +type Room struct { +} diff --git a/internal/sim/world.go b/internal/sim/world.go new file mode 100644 index 0000000..3f19aa9 --- /dev/null +++ b/internal/sim/world.go @@ -0,0 +1,4 @@ +package sim + +type World struct { +} diff --git a/internal/wire/error.go b/internal/wire/error.go new file mode 100644 index 0000000..03dd6f5 --- /dev/null +++ b/internal/wire/error.go @@ -0,0 +1,17 @@ +package wire + +import ( + "fmt" +) + +type Error struct { + val error +} + +func (e Error) Error() string { return e.val.Error() } +func (e Error) NetTag() Tag { return T_Error } +func (e Error) Unwrap() error { return e.val } + +func Errorf(t string, args ...interface{}) Error { + return Error{val: fmt.Errorf(t, args...)} +} diff --git a/internal/wire/ok.go b/internal/wire/ok.go new file mode 100644 index 0000000..8cb6adc --- /dev/null +++ b/internal/wire/ok.go @@ -0,0 +1,4 @@ +package wire + +type OK struct {} +func (OK) NetTag() Tag { return T_OK } diff --git a/internal/wire/request.go b/internal/wire/request.go new file mode 100644 index 0000000..9064657 --- /dev/null +++ b/internal/wire/request.go @@ -0,0 +1,7 @@ +package wire + +type Request struct { + Seq int `json:"seq"` + Type string `json:"type"` + Body interface{} `json:"body"` +} diff --git a/internal/wire/response.go b/internal/wire/response.go new file mode 100644 index 0000000..29c19e6 --- /dev/null +++ b/internal/wire/response.go @@ -0,0 +1,19 @@ +package wire + +type Response struct { + Re int `json:"re,omitempty"` + Type Tag `json:"type"` + Body interface{} `json:"body"` +} + +func NewResponse(re int, v Value) Response { + return Response{ + Re: re, + Type: v.NetTag(), + Body: v, + } +} + +func ErrorResponse(re int, t string, args ...interface{}) Response { + return NewResponse(re, Errorf(t, args...)) +} diff --git a/internal/wire/tag.go b/internal/wire/tag.go new file mode 100644 index 0000000..2b41c65 --- /dev/null +++ b/internal/wire/tag.go @@ -0,0 +1,45 @@ +package wire + +import ( + "encoding/json" + "fmt" +) + +type Tag uint + +const ( + T_None Tag = iota + T_Error + T_OK +) + +func (t Tag) String() string { + switch t { + case T_Error: + return "error" + case T_OK: + return "ok" + default: + panic("unknown type tag") + } +} + +func (t *Tag) UnmarshalJSON(b []byte) error { + var name string + if err := json.Unmarshal(b, &name); err != nil { + return err + } + + switch name { + case "error": + *t = T_Error + return nil + case "ok": + *t = T_OK + return nil + default: + return fmt.Errorf("unknown type tag: %q", name) + } +} + +func (t Tag) MarshalJSON() ([]byte, error) { return json.Marshal(t.String()) } diff --git a/internal/wire/value.go b/internal/wire/value.go new file mode 100644 index 0000000..8ea5050 --- /dev/null +++ b/internal/wire/value.go @@ -0,0 +1,3 @@ +package wire + +type Value interface{ NetTag() Tag } diff --git a/main.go b/main.go index 5248a1c..186b464 100644 --- a/main.go +++ b/main.go @@ -2,9 +2,11 @@ package main import ( "os" + "os/signal" "time" "github.com/jordanorelli/astro-domu/internal/exit" + "github.com/jordanorelli/astro-domu/internal/server" "github.com/jordanorelli/blammo" ) @@ -34,23 +36,16 @@ func main() { case "client": runClient() case "server": - stdout := blammo.NewLineWriter(os.Stdout) - stderr := blammo.NewLineWriter(os.Stderr) - - options := []blammo.Option{ - blammo.DebugWriter(stdout), - blammo.InfoWriter(stdout), - blammo.ErrorWriter(stderr), + s := server.Server{} + if err := s.Start(); err != nil { + exit.WithMessage(1, "unable to start server: %v", err) } - - log := blammo.NewLog("astro", options...).Child("server") - s := server{Log: log, host: "127.0.0.1", port: 12805} - err := s.listen() - log.Error("listen error: %v", err) + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt) + <-sig default: exit.WithMessage(1, "supported options are [client|server]") } - } func runClient() { diff --git a/server.go b/server.go deleted file mode 100644 index 8a6f03e..0000000 --- a/server.go +++ /dev/null @@ -1,73 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "strconv" - - "github.com/gorilla/websocket" - "github.com/jordanorelli/blammo" -) - -type server struct { - *blammo.Log - host string - port int - lastSessionID int -} - -func (s *server) listen() error { - return http.ListenAndServe(fmt.Sprintf("%s:%d", s.host, s.port), s) -} - -func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - upgrader := websocket.Upgrader{} - - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - s.Error("upgrade error: %v", err) - return - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - s.lastSessionID++ - sn := session{ - Log: s.Log.Child("sessions").Child(strconv.Itoa(s.lastSessionID)), - id: s.lastSessionID, - conn: conn, - outbox: make(chan response), - } - go sn.pump(ctx) - - for { - t, r, err := conn.NextReader() - if err != nil { - s.Error("read error: %v", err) - return - } - - switch t { - case websocket.TextMessage: - text, err := ioutil.ReadAll(r) - if err != nil { - s.Error("readall error: %v", err) - break - } - sn.Log.Child("received-frame").Info(string(text)) - var body requestBody - if err := json.Unmarshal(text, &body); err != nil { - s.Error("unable to parse request: %v", err) - sn.outbox <- errorResponse(0, fmt.Errorf("unable to parse request: %v", err)) - break - } - sn.outbox <- ok(body.Seq) - case websocket.BinaryMessage: - sn.outbox <- errorResponse(0, fmt.Errorf("unable to parse binary frames")) - } - } -}