diff --git a/internal/sim/effect.go b/internal/sim/effect.go index 8f7fce8..5bdbe74 100644 --- a/internal/sim/effect.go +++ b/internal/sim/effect.go @@ -3,7 +3,7 @@ package sim import "github.com/jordanorelli/astro-domu/internal/wire" type Effect interface { - wire.Value + //wire.Value exec(*room, *player, int) result } diff --git a/internal/sim/player.go b/internal/sim/player.go index ac056a0..27d90b6 100644 --- a/internal/sim/player.go +++ b/internal/sim/player.go @@ -1,6 +1,11 @@ package sim import ( + "encoding/json" + "fmt" + "time" + + "github.com/gorilla/websocket" "github.com/jordanorelli/astro-domu/internal/math" "github.com/jordanorelli/astro-domu/internal/wire" "github.com/jordanorelli/blammo" @@ -8,75 +13,14 @@ import ( type player struct { *blammo.Log - *room + //*room name string outbox chan wire.Response pending *Request avatar *entity } -type Move math.Vec - -func (Move) NetTag() string { return "move" } - -func (m *Move) exec(r *room, p *player, seq int) result { - pos := p.avatar.Position - target := pos.Add(math.Vec(*m)) - p.Info("running move for player %s from %v to %v", p.name, *m, target) - if !p.room.Contains(target) { - return result{reply: wire.Errorf("target cell (%d, %d) is out of bounds", target.X, target.Y)} - } - - currentTile := r.getTile(pos) - nextTile := r.getTile(target) - if nextTile.here != nil { - return result{reply: wire.Errorf("target cell (%d, %d) is occupied", target.X, target.Y)} - } - - currentTile.here, nextTile.here = nil, p.avatar - p.avatar.Position = target - return result{reply: wire.OK{}} -} - -// SpawnPlayer is a request to spawn a player -type SpawnPlayer struct { - Outbox chan wire.Response - Name string - queued bool -} - -var lastEntityID = 0 - -func (s *SpawnPlayer) exec(r *room, p *player, seq int) result { - if !s.queued { - r.Info("spawn player requested for: %s", s.Name) - - if _, ok := r.players[s.Name]; ok { - s.Outbox <- wire.ErrorResponse(seq, "a player is already logged in as %q", s.Name) - return result{} - } - - lastEntityID++ - avatar := &entity{ - ID: lastEntityID, - Position: math.Vec{0, 0}, - Glyph: '@', - behavior: doNothing{}, - } - p := &player{ - Log: r.Log.Child("players").Child(s.Name), - room: r, - name: s.Name, - outbox: s.Outbox, - avatar: avatar, - } - p.pending = &Request{Seq: seq, From: s.Name, Wants: s} - r.players[s.Name] = p - r.tiles[0].here = p.avatar - s.queued = true - return result{} - } - +func (p *player) start(c chan Request, conn *websocket.Conn, r *room) { welcome := wire.Welcome{ Rooms: make(map[string]wire.Room), Players: make(map[string]wire.Player), @@ -95,26 +39,140 @@ func (s *SpawnPlayer) exec(r *room, p *player, seq int) result { Entities: ents, } for _, p := range r.players { - welcome.Players[p.name] = wire.Player{ - Name: p.name, - Avatar: p.avatar.ID, - Room: r.name, + wp := wire.Player{ + Name: p.name, + Room: r.name, } + if p.avatar != nil { + wp.Avatar = p.avatar.ID + } + welcome.Players[p.name] = wp } - return result{ - reply: welcome, + p.outbox <- wire.Response{Re: 1, Body: welcome} + go p.readLoop(c, conn) + go p.runLoop(conn) +} + +func (p *player) readLoop(c chan Request, conn *websocket.Conn) { + for { + _, b, err := conn.ReadMessage() + if err != nil { + p.Error("read error: %v", err) + conn.Close() + return + } + p.Log.Child("received-frame").Info(string(b)) + + var req wire.Request + if err := json.Unmarshal(b, &req); err != nil { + p.Error("unable to parse request: %v", err) + continue + } + // sn.Info("received message of type %T", req.Body) + + effect, ok := req.Body.(Effect) + if !ok { + continue + } + c <- Request{ + From: p.name, + Seq: req.Seq, + Wants: effect, + } + } + +} + +func (p *player) runLoop(conn *websocket.Conn) { + for { + select { + case res := <-p.outbox: + if err := sendResponse(conn, res); err != nil { + p.Error(err.Error()) + } + } + // case sendCloseFrame := <-sn.done: + // sn.Info("saw done signal") + // if sendCloseFrame { + // sn.Info("sending close frame") + // msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") + // if err := sn.conn.WriteMessage(websocket.CloseMessage, msg); err != nil { + // sn.Error("failed to write close message: %v", err) + // } else { + // sn.Info("sent close frame") + // } + // } + // return + // } + } +} + +func sendResponse(conn *websocket.Conn, res wire.Response) error { + payload, err := json.Marshal(res) + if err != nil { + return fmt.Errorf("failed to marshal outgoing response: %w", err) + } + + if err := conn.SetWriteDeadline(time.Now().Add(3 * time.Second)); err != nil { + return fmt.Errorf("failed to set write deadline: %w", err) + } + + w, err := conn.NextWriter(websocket.TextMessage) + if err != nil { + return fmt.Errorf("failed get a writer frame: %w", err) + } + if _, err := w.Write(payload); err != nil { + return fmt.Errorf("failed write payload: %w", err) + } + if err := w.Close(); err != nil { + return fmt.Errorf("failed to close write frame: %w", err) + } + return nil +} + +type spawnPlayer struct{} + +func (s spawnPlayer) exec(r *room, p *player, seq int) result { + for n, t := range r.tiles { + if t.here == nil { + x, y := n%r.Width, n/r.Width + e := entity{ + Position: math.Vec{x, y}, + Glyph: '@', + behavior: doNothing{}, + } + p.avatar = &e + t.here = &e + break + } } + return result{} } -func (SpawnPlayer) NetTag() string { return "player/spawn" } +type Move math.Vec + +func (Move) NetTag() string { return "move" } + +func (m *Move) exec(r *room, p *player, seq int) result { + pos := p.avatar.Position + target := pos.Add(math.Vec(*m)) + p.Info("running move for player %s from %v to %v", p.name, *m, target) + if !r.Contains(target) { + return result{reply: wire.Errorf("target cell (%d, %d) is out of bounds", target.X, target.Y)} + } -// PlayerSpawned is an announcement that a player has spawned -type PlayerSpawned struct { - Name string `json:"name"` - Position [2]int `json:"position"` + currentTile := r.getTile(pos) + nextTile := r.getTile(target) + if nextTile.here != nil { + return result{reply: wire.Errorf("target cell (%d, %d) is occupied", target.X, target.Y)} + } + + currentTile.here, nextTile.here = nil, p.avatar + p.avatar.Position = target + return result{reply: wire.OK{}} } -func (PlayerSpawned) NetTag() string { return "player/spawned" } +var lastEntityID = 0 func init() { wire.Register(func() wire.Value { return new(Move) }) diff --git a/internal/sim/server.go b/internal/sim/server.go index af1f66c..5b992c2 100644 --- a/internal/sim/server.go +++ b/internal/sim/server.go @@ -2,10 +2,10 @@ package sim import ( "context" + "encoding/json" "fmt" "net" "net/http" - "strconv" "sync" "time" @@ -19,11 +19,6 @@ type Server struct { *blammo.Log http *http.Server world *world - - sync.Mutex - lastSessionID int - sessions map[int]*session - waitOnSessions sync.WaitGroup } func (s *Server) Start(host string, port int) error { @@ -32,7 +27,7 @@ func (s *Server) Start(host string, port int) error { } s.world = newWorld(s.Log.Child("world")) - go s.world.run(3) + go s.world.run(30) addr := fmt.Sprintf("%s:%d", host, port) lis, err := net.Listen("tcp", addr) @@ -57,59 +52,65 @@ func (s *Server) runHTTPServer(lis net.Listener) { } } -func (s *Server) createSession(conn *websocket.Conn) *session { - s.Lock() - defer s.Unlock() - - s.lastSessionID++ - sn := &session{ - Log: s.Log.Child("sessions").Child(strconv.Itoa(s.lastSessionID)), - id: s.lastSessionID, - start: time.Now(), - conn: conn, - outbox: make(chan wire.Response), - done: make(chan bool, 1), - } - if s.sessions == nil { - s.sessions = make(map[int]*session) +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + log := s.Log.Child("login") + upgrader := websocket.Upgrader{ + HandshakeTimeout: 3 * time.Second, + ReadBufferSize: 2 << 12, + WriteBufferSize: 2 << 12, + Subprotocols: []string{"astrodomu@v0"}, } - s.waitOnSessions.Add(1) - s.sessions[sn.id] = sn - // sn.entityID = s.world.SpawnPlayer(sn.id) - // s.Info("created session %d, %d sessions active", sn.id, len(s.sessions)) - return sn -} -// dropSession removes a session from the server. This should only be called as -// a result of the connection's read loop terminating -func (s *Server) dropSession(sn *session) { - s.Lock() - defer s.Unlock() + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Error("upgrade error: %v", err) + return + } - close(sn.done) - delete(s.sessions, sn.id) - s.waitOnSessions.Add(-1) + t, rd, err := conn.NextReader() + if err != nil { + log.Error("unable to get a reader: %v", err) + conn.Close() + return + } - s.Info("dropped session %d after %v time connected, %d sessions active", sn.id, time.Since(sn.start), len(s.sessions)) -} + if t != websocket.TextMessage { + log.Error("first message is not text") + // TODO: send websocket close frame here + conn.Close() + return + } -func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - upgrader := websocket.Upgrader{} + var req wire.Request + if err := json.NewDecoder(rd).Decode(&req); err != nil { + log.Error("unable to parse initial request: %v", err) + // TODO: send websocket close frame here + conn.Close() + return + } - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - s.Error("upgrade error: %v", err) + login, ok := req.Body.(*wire.Login) + if !ok { + log.Error("first request is not wire.Login, is %T", req.Body) + // TODO: send websocket close frame here + conn.Close() return } - sn := s.createSession(conn) - go sn.run() - sn.read(s.world.Inbox) - s.dropSession(sn) + log.Info("login requested: %v", *login) - sn.Info("closing connection") - if err := conn.Close(); err != nil { - s.Error("error closing connection: %v", err) + failed := make(chan error, 1) + s.world.connect <- connect{ + conn: conn, + login: *login, + failed: failed, + } + e := <-failed + if e != nil { + log.Error("connect failed: %v", err) + // TODO: send websocket close frame here + conn.Close() + return } } @@ -117,7 +118,7 @@ func (s *Server) Shutdown() { s.Info("starting shutdown procedure") var wg sync.WaitGroup - wg.Add(3) + wg.Add(2) go func() { defer wg.Done() @@ -138,29 +139,6 @@ func (s *Server) Shutdown() { } }() - go func() { - defer wg.Done() - - log := s.Child("sessions") - s.Lock() - numSessions := len(s.sessions) - if numSessions > 0 { - log.Info("broadcasting shutdown to %d active sessions", numSessions) - for id, sn := range s.sessions { - log.Info("sending done signal to session: %d", id) - sn.done <- true - } - } else { - log.Info("no active sessions") - } - s.Unlock() - - if numSessions > 0 { - log.Info("waiting on %d connected sessions to shut down", numSessions) - } - s.waitOnSessions.Wait() - log.Info("all sessions have shut down") - }() wg.Wait() s.Info("shutdown procedure complete") } diff --git a/internal/sim/session.go b/internal/sim/session.go deleted file mode 100644 index 0d8f941..0000000 --- a/internal/sim/session.go +++ /dev/null @@ -1,154 +0,0 @@ -package sim - -import ( - "encoding/json" - "fmt" - "time" - - "github.com/gorilla/websocket" - "github.com/jordanorelli/astro-domu/internal/wire" - "github.com/jordanorelli/blammo" -) - -type session struct { - *blammo.Log - Name string - id int - entityID int - start time.Time - conn *websocket.Conn - outbox chan wire.Response - done chan bool -} - -// run is the session run loop. -func (sn *session) run() { - for { - select { - case res := <-sn.outbox: - if err := sn.sendResponse(res); err != nil { - sn.Error(err.Error()) - } - case sendCloseFrame := <-sn.done: - sn.Info("saw done signal") - if sendCloseFrame { - sn.Info("sending close frame") - msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") - if err := sn.conn.WriteMessage(websocket.CloseMessage, msg); err != nil { - sn.Error("failed to write close message: %v", err) - } else { - sn.Info("sent close frame") - } - } - return - } - } -} - -// read reads for messages on the underlying websocket. -func (sn *session) read(c chan Request) { - for { - t, b, err := sn.conn.ReadMessage() - if err != nil { - if v, ok := err.(*websocket.CloseError); ok { - switch v.Code { - case websocket.CloseNormalClosure, - websocket.CloseGoingAway: - sn.Info("received close frame with code %d (%v)", v.Code, err) - case websocket.CloseNoStatusReceived, - websocket.CloseProtocolError, - websocket.CloseUnsupportedData, - websocket.CloseAbnormalClosure, - websocket.CloseInvalidFramePayloadData, - websocket.ClosePolicyViolation, - websocket.CloseMessageTooBig, - websocket.CloseMandatoryExtension, - websocket.CloseInternalServerErr, - websocket.CloseServiceRestart, - websocket.CloseTryAgainLater, - websocket.CloseTLSHandshake: - sn.Error("received close frame with code %d (%v)", v.Code, err) - default: - sn.Error("received close frame with code %d (%v)", v.Code, err) - } - return - } - sn.Error("unexpected read error: %v", err) - return - } - - switch t { - case websocket.TextMessage: - sn.Log.Child("received-frame").Info(string(b)) - - var req wire.Request - if err := json.Unmarshal(b, &req); err != nil { - sn.Error("unable to parse request: %v", err) - sn.enqueue(wire.ErrorResponse(req.Seq, "unable to parse request: %v", err)) - break - } - sn.Info("received message of type %T", req.Body) - - switch v := req.Body.(type) { - case *wire.Login: - sn.Name = v.Name - c <- Request{ - From: sn.Name, - Seq: req.Seq, - Wants: &SpawnPlayer{ - Name: sn.Name, - Outbox: sn.outbox, - }, - } - case Effect: - c <- Request{ - From: sn.Name, - Seq: req.Seq, - Wants: v, - } - default: - sn.enqueue(wire.ErrorResponse(req.Seq, "not sure how to handle that")) - } - case websocket.BinaryMessage: - sn.enqueue(wire.ErrorResponse(0, "unable to parse binary frames")) - } - } -} - -// sendResponse sends an individual response on the underlying websocket connection -func (sn *session) sendResponse(res wire.Response) error { - payload, err := json.Marshal(res) - if err != nil { - return fmt.Errorf("failed to marshal outgoing response: %w", err) - } - - if err := sn.conn.SetWriteDeadline(time.Now().Add(3 * time.Second)); err != nil { - return fmt.Errorf("failed to set write deadline: %w", err) - } - - w, err := sn.conn.NextWriter(websocket.TextMessage) - if err != nil { - return fmt.Errorf("failed get a writer frame: %w", err) - } - if _, err := w.Write(payload); err != nil { - return fmt.Errorf("failed write payload: %w", err) - } - if err := w.Close(); err != nil { - return fmt.Errorf("failed to close write frame: %w", err) - } - sn.Child("sent-frame").Info(string(payload)) - return nil -} - -func (sn *session) enqueue(res wire.Response) { - select { - case sn.outbox <- res: - return - default: - select { - case <-sn.outbox: - sn.enqueue(res) - default: - } - } -} diff --git a/internal/sim/world.go b/internal/sim/world.go index 2324c5d..f4fd752 100644 --- a/internal/sim/world.go +++ b/internal/sim/world.go @@ -1,8 +1,10 @@ package sim import ( + "fmt" "time" + "github.com/gorilla/websocket" "github.com/jordanorelli/astro-domu/internal/math" "github.com/jordanorelli/astro-domu/internal/wire" "github.com/jordanorelli/blammo" @@ -11,14 +13,21 @@ import ( // world is the entire simulated world. A world consists of many rooms. type world struct { *blammo.Log - Inbox chan Request + inbox chan Request + connect chan connect - rooms []room done chan bool lastEntityID int + rooms map[string]*room players map[string]*player } +type connect struct { + conn *websocket.Conn + login wire.Login + failed chan error +} + func newWorld(log *blammo.Log) *world { bounds := math.CreateRect(10, 10) foyer := room{ @@ -37,9 +46,10 @@ func newWorld(log *blammo.Log) *world { log.Info("created foyer with bounds: %#v having width: %d height: %d area: %d", foyer.Rect, foyer.Width, foyer.Height, foyer.Area()) return &world{ Log: log, - rooms: []room{foyer}, + rooms: map[string]*room{"foyer": &foyer}, done: make(chan bool), - Inbox: make(chan Request), + inbox: make(chan Request), + connect: make(chan connect), players: make(map[string]*player), } } @@ -54,7 +64,10 @@ func (w *world) run(hz int) { for { select { - case req := <-w.Inbox: + case c := <-w.connect: + w.register(c) + + case req := <-w.inbox: w.Info("read from inbox: %v", req) if req.From == "" { @@ -62,17 +75,6 @@ func (w *world) run(hz int) { break } - if spawn, ok := req.Wants.(*SpawnPlayer); ok { - if _, ok := w.players[req.From]; ok { - spawn.Outbox <- wire.ErrorResponse(req.Seq, "a player is already logged in as %q", req.From) - break - } - spawn.exec(&w.rooms[0], nil, req.Seq) - p := w.rooms[0].players[req.From] - w.players[req.From] = p - break - } - p, ok := w.players[req.From] if !ok { w.Error("received non login request of type %T from unknown player %q", req.Wants, req.From) @@ -94,6 +96,31 @@ func (w *world) run(hz int) { } } +func (w *world) register(c connect) { + w.Info("register: %#v", c.login) + foyer := w.rooms["foyer"] + if len(foyer.players) >= 100 { + c.failed <- fmt.Errorf("room is full") + close(c.failed) + return + } + + p := player{ + Log: w.Log.Child("players").Child(c.login.Name), + name: c.login.Name, + outbox: make(chan wire.Response, 8), + pending: &Request{ + From: c.login.Name, + Seq: 1, + Wants: &spawnPlayer{}, + }, + } + foyer.players[c.login.Name] = &p + w.players[c.login.Name] = &p + + p.start(w.inbox, c.conn, foyer) +} + func (w *world) stop() error { w.Info("stopping simulation") w.done <- true diff --git a/internal/wire/client.go b/internal/wire/client.go index 40ab357..a419282 100644 --- a/internal/wire/client.go +++ b/internal/wire/client.go @@ -169,6 +169,7 @@ func (c *Client) writeLoop() { c.Error("saw response for unknown seq %d") break } + delete(sent, res.Re) p.res = res close(p.done)