this is broken but still, cleaning up cruft is good

master
Jordan Orelli 4 years ago
parent b5af018ae7
commit 2a29ee6c88

@ -3,7 +3,7 @@ package sim
import "github.com/jordanorelli/astro-domu/internal/wire"
type Effect interface {
wire.Value
//wire.Value
exec(*room, *player, int) result
}

@ -1,6 +1,11 @@
package sim
import (
"encoding/json"
"fmt"
"time"
"github.com/gorilla/websocket"
"github.com/jordanorelli/astro-domu/internal/math"
"github.com/jordanorelli/astro-domu/internal/wire"
"github.com/jordanorelli/blammo"
@ -8,75 +13,14 @@ import (
type player struct {
*blammo.Log
*room
//*room
name string
outbox chan wire.Response
pending *Request
avatar *entity
}
type Move math.Vec
func (Move) NetTag() string { return "move" }
func (m *Move) exec(r *room, p *player, seq int) result {
pos := p.avatar.Position
target := pos.Add(math.Vec(*m))
p.Info("running move for player %s from %v to %v", p.name, *m, target)
if !p.room.Contains(target) {
return result{reply: wire.Errorf("target cell (%d, %d) is out of bounds", target.X, target.Y)}
}
currentTile := r.getTile(pos)
nextTile := r.getTile(target)
if nextTile.here != nil {
return result{reply: wire.Errorf("target cell (%d, %d) is occupied", target.X, target.Y)}
}
currentTile.here, nextTile.here = nil, p.avatar
p.avatar.Position = target
return result{reply: wire.OK{}}
}
// SpawnPlayer is a request to spawn a player
type SpawnPlayer struct {
Outbox chan wire.Response
Name string
queued bool
}
var lastEntityID = 0
func (s *SpawnPlayer) exec(r *room, p *player, seq int) result {
if !s.queued {
r.Info("spawn player requested for: %s", s.Name)
if _, ok := r.players[s.Name]; ok {
s.Outbox <- wire.ErrorResponse(seq, "a player is already logged in as %q", s.Name)
return result{}
}
lastEntityID++
avatar := &entity{
ID: lastEntityID,
Position: math.Vec{0, 0},
Glyph: '@',
behavior: doNothing{},
}
p := &player{
Log: r.Log.Child("players").Child(s.Name),
room: r,
name: s.Name,
outbox: s.Outbox,
avatar: avatar,
}
p.pending = &Request{Seq: seq, From: s.Name, Wants: s}
r.players[s.Name] = p
r.tiles[0].here = p.avatar
s.queued = true
return result{}
}
func (p *player) start(c chan Request, conn *websocket.Conn, r *room) {
welcome := wire.Welcome{
Rooms: make(map[string]wire.Room),
Players: make(map[string]wire.Player),
@ -95,26 +39,140 @@ func (s *SpawnPlayer) exec(r *room, p *player, seq int) result {
Entities: ents,
}
for _, p := range r.players {
welcome.Players[p.name] = wire.Player{
Name: p.name,
Avatar: p.avatar.ID,
Room: r.name,
wp := wire.Player{
Name: p.name,
Room: r.name,
}
if p.avatar != nil {
wp.Avatar = p.avatar.ID
}
welcome.Players[p.name] = wp
}
return result{
reply: welcome,
p.outbox <- wire.Response{Re: 1, Body: welcome}
go p.readLoop(c, conn)
go p.runLoop(conn)
}
func (p *player) readLoop(c chan Request, conn *websocket.Conn) {
for {
_, b, err := conn.ReadMessage()
if err != nil {
p.Error("read error: %v", err)
conn.Close()
return
}
p.Log.Child("received-frame").Info(string(b))
var req wire.Request
if err := json.Unmarshal(b, &req); err != nil {
p.Error("unable to parse request: %v", err)
continue
}
// sn.Info("received message of type %T", req.Body)
effect, ok := req.Body.(Effect)
if !ok {
continue
}
c <- Request{
From: p.name,
Seq: req.Seq,
Wants: effect,
}
}
}
func (p *player) runLoop(conn *websocket.Conn) {
for {
select {
case res := <-p.outbox:
if err := sendResponse(conn, res); err != nil {
p.Error(err.Error())
}
}
// case sendCloseFrame := <-sn.done:
// sn.Info("saw done signal")
// if sendCloseFrame {
// sn.Info("sending close frame")
// msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
// if err := sn.conn.WriteMessage(websocket.CloseMessage, msg); err != nil {
// sn.Error("failed to write close message: %v", err)
// } else {
// sn.Info("sent close frame")
// }
// }
// return
// }
}
}
func sendResponse(conn *websocket.Conn, res wire.Response) error {
payload, err := json.Marshal(res)
if err != nil {
return fmt.Errorf("failed to marshal outgoing response: %w", err)
}
if err := conn.SetWriteDeadline(time.Now().Add(3 * time.Second)); err != nil {
return fmt.Errorf("failed to set write deadline: %w", err)
}
w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
return fmt.Errorf("failed get a writer frame: %w", err)
}
if _, err := w.Write(payload); err != nil {
return fmt.Errorf("failed write payload: %w", err)
}
if err := w.Close(); err != nil {
return fmt.Errorf("failed to close write frame: %w", err)
}
return nil
}
type spawnPlayer struct{}
func (s spawnPlayer) exec(r *room, p *player, seq int) result {
for n, t := range r.tiles {
if t.here == nil {
x, y := n%r.Width, n/r.Width
e := entity{
Position: math.Vec{x, y},
Glyph: '@',
behavior: doNothing{},
}
p.avatar = &e
t.here = &e
break
}
}
return result{}
}
func (SpawnPlayer) NetTag() string { return "player/spawn" }
type Move math.Vec
func (Move) NetTag() string { return "move" }
func (m *Move) exec(r *room, p *player, seq int) result {
pos := p.avatar.Position
target := pos.Add(math.Vec(*m))
p.Info("running move for player %s from %v to %v", p.name, *m, target)
if !r.Contains(target) {
return result{reply: wire.Errorf("target cell (%d, %d) is out of bounds", target.X, target.Y)}
}
// PlayerSpawned is an announcement that a player has spawned
type PlayerSpawned struct {
Name string `json:"name"`
Position [2]int `json:"position"`
currentTile := r.getTile(pos)
nextTile := r.getTile(target)
if nextTile.here != nil {
return result{reply: wire.Errorf("target cell (%d, %d) is occupied", target.X, target.Y)}
}
currentTile.here, nextTile.here = nil, p.avatar
p.avatar.Position = target
return result{reply: wire.OK{}}
}
func (PlayerSpawned) NetTag() string { return "player/spawned" }
var lastEntityID = 0
func init() {
wire.Register(func() wire.Value { return new(Move) })

@ -2,10 +2,10 @@ package sim
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"strconv"
"sync"
"time"
@ -19,11 +19,6 @@ type Server struct {
*blammo.Log
http *http.Server
world *world
sync.Mutex
lastSessionID int
sessions map[int]*session
waitOnSessions sync.WaitGroup
}
func (s *Server) Start(host string, port int) error {
@ -32,7 +27,7 @@ func (s *Server) Start(host string, port int) error {
}
s.world = newWorld(s.Log.Child("world"))
go s.world.run(3)
go s.world.run(30)
addr := fmt.Sprintf("%s:%d", host, port)
lis, err := net.Listen("tcp", addr)
@ -57,59 +52,65 @@ func (s *Server) runHTTPServer(lis net.Listener) {
}
}
func (s *Server) createSession(conn *websocket.Conn) *session {
s.Lock()
defer s.Unlock()
s.lastSessionID++
sn := &session{
Log: s.Log.Child("sessions").Child(strconv.Itoa(s.lastSessionID)),
id: s.lastSessionID,
start: time.Now(),
conn: conn,
outbox: make(chan wire.Response),
done: make(chan bool, 1),
}
if s.sessions == nil {
s.sessions = make(map[int]*session)
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log := s.Log.Child("login")
upgrader := websocket.Upgrader{
HandshakeTimeout: 3 * time.Second,
ReadBufferSize: 2 << 12,
WriteBufferSize: 2 << 12,
Subprotocols: []string{"astrodomu@v0"},
}
s.waitOnSessions.Add(1)
s.sessions[sn.id] = sn
// sn.entityID = s.world.SpawnPlayer(sn.id)
// s.Info("created session %d, %d sessions active", sn.id, len(s.sessions))
return sn
}
// dropSession removes a session from the server. This should only be called as
// a result of the connection's read loop terminating
func (s *Server) dropSession(sn *session) {
s.Lock()
defer s.Unlock()
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Error("upgrade error: %v", err)
return
}
close(sn.done)
delete(s.sessions, sn.id)
s.waitOnSessions.Add(-1)
t, rd, err := conn.NextReader()
if err != nil {
log.Error("unable to get a reader: %v", err)
conn.Close()
return
}
s.Info("dropped session %d after %v time connected, %d sessions active", sn.id, time.Since(sn.start), len(s.sessions))
}
if t != websocket.TextMessage {
log.Error("first message is not text")
// TODO: send websocket close frame here
conn.Close()
return
}
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{}
var req wire.Request
if err := json.NewDecoder(rd).Decode(&req); err != nil {
log.Error("unable to parse initial request: %v", err)
// TODO: send websocket close frame here
conn.Close()
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
s.Error("upgrade error: %v", err)
login, ok := req.Body.(*wire.Login)
if !ok {
log.Error("first request is not wire.Login, is %T", req.Body)
// TODO: send websocket close frame here
conn.Close()
return
}
sn := s.createSession(conn)
go sn.run()
sn.read(s.world.Inbox)
s.dropSession(sn)
log.Info("login requested: %v", *login)
sn.Info("closing connection")
if err := conn.Close(); err != nil {
s.Error("error closing connection: %v", err)
failed := make(chan error, 1)
s.world.connect <- connect{
conn: conn,
login: *login,
failed: failed,
}
e := <-failed
if e != nil {
log.Error("connect failed: %v", err)
// TODO: send websocket close frame here
conn.Close()
return
}
}
@ -117,7 +118,7 @@ func (s *Server) Shutdown() {
s.Info("starting shutdown procedure")
var wg sync.WaitGroup
wg.Add(3)
wg.Add(2)
go func() {
defer wg.Done()
@ -138,29 +139,6 @@ func (s *Server) Shutdown() {
}
}()
go func() {
defer wg.Done()
log := s.Child("sessions")
s.Lock()
numSessions := len(s.sessions)
if numSessions > 0 {
log.Info("broadcasting shutdown to %d active sessions", numSessions)
for id, sn := range s.sessions {
log.Info("sending done signal to session: %d", id)
sn.done <- true
}
} else {
log.Info("no active sessions")
}
s.Unlock()
if numSessions > 0 {
log.Info("waiting on %d connected sessions to shut down", numSessions)
}
s.waitOnSessions.Wait()
log.Info("all sessions have shut down")
}()
wg.Wait()
s.Info("shutdown procedure complete")
}

@ -1,154 +0,0 @@
package sim
import (
"encoding/json"
"fmt"
"time"
"github.com/gorilla/websocket"
"github.com/jordanorelli/astro-domu/internal/wire"
"github.com/jordanorelli/blammo"
)
type session struct {
*blammo.Log
Name string
id int
entityID int
start time.Time
conn *websocket.Conn
outbox chan wire.Response
done chan bool
}
// run is the session run loop.
func (sn *session) run() {
for {
select {
case res := <-sn.outbox:
if err := sn.sendResponse(res); err != nil {
sn.Error(err.Error())
}
case sendCloseFrame := <-sn.done:
sn.Info("saw done signal")
if sendCloseFrame {
sn.Info("sending close frame")
msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
if err := sn.conn.WriteMessage(websocket.CloseMessage, msg); err != nil {
sn.Error("failed to write close message: %v", err)
} else {
sn.Info("sent close frame")
}
}
return
}
}
}
// read reads for messages on the underlying websocket.
func (sn *session) read(c chan Request) {
for {
t, b, err := sn.conn.ReadMessage()
if err != nil {
if v, ok := err.(*websocket.CloseError); ok {
switch v.Code {
case websocket.CloseNormalClosure,
websocket.CloseGoingAway:
sn.Info("received close frame with code %d (%v)", v.Code, err)
case websocket.CloseNoStatusReceived,
websocket.CloseProtocolError,
websocket.CloseUnsupportedData,
websocket.CloseAbnormalClosure,
websocket.CloseInvalidFramePayloadData,
websocket.ClosePolicyViolation,
websocket.CloseMessageTooBig,
websocket.CloseMandatoryExtension,
websocket.CloseInternalServerErr,
websocket.CloseServiceRestart,
websocket.CloseTryAgainLater,
websocket.CloseTLSHandshake:
sn.Error("received close frame with code %d (%v)", v.Code, err)
default:
sn.Error("received close frame with code %d (%v)", v.Code, err)
}
return
}
sn.Error("unexpected read error: %v", err)
return
}
switch t {
case websocket.TextMessage:
sn.Log.Child("received-frame").Info(string(b))
var req wire.Request
if err := json.Unmarshal(b, &req); err != nil {
sn.Error("unable to parse request: %v", err)
sn.enqueue(wire.ErrorResponse(req.Seq, "unable to parse request: %v", err))
break
}
sn.Info("received message of type %T", req.Body)
switch v := req.Body.(type) {
case *wire.Login:
sn.Name = v.Name
c <- Request{
From: sn.Name,
Seq: req.Seq,
Wants: &SpawnPlayer{
Name: sn.Name,
Outbox: sn.outbox,
},
}
case Effect:
c <- Request{
From: sn.Name,
Seq: req.Seq,
Wants: v,
}
default:
sn.enqueue(wire.ErrorResponse(req.Seq, "not sure how to handle that"))
}
case websocket.BinaryMessage:
sn.enqueue(wire.ErrorResponse(0, "unable to parse binary frames"))
}
}
}
// sendResponse sends an individual response on the underlying websocket connection
func (sn *session) sendResponse(res wire.Response) error {
payload, err := json.Marshal(res)
if err != nil {
return fmt.Errorf("failed to marshal outgoing response: %w", err)
}
if err := sn.conn.SetWriteDeadline(time.Now().Add(3 * time.Second)); err != nil {
return fmt.Errorf("failed to set write deadline: %w", err)
}
w, err := sn.conn.NextWriter(websocket.TextMessage)
if err != nil {
return fmt.Errorf("failed get a writer frame: %w", err)
}
if _, err := w.Write(payload); err != nil {
return fmt.Errorf("failed write payload: %w", err)
}
if err := w.Close(); err != nil {
return fmt.Errorf("failed to close write frame: %w", err)
}
sn.Child("sent-frame").Info(string(payload))
return nil
}
func (sn *session) enqueue(res wire.Response) {
select {
case sn.outbox <- res:
return
default:
select {
case <-sn.outbox:
sn.enqueue(res)
default:
}
}
}

@ -1,8 +1,10 @@
package sim
import (
"fmt"
"time"
"github.com/gorilla/websocket"
"github.com/jordanorelli/astro-domu/internal/math"
"github.com/jordanorelli/astro-domu/internal/wire"
"github.com/jordanorelli/blammo"
@ -11,14 +13,21 @@ import (
// world is the entire simulated world. A world consists of many rooms.
type world struct {
*blammo.Log
Inbox chan Request
inbox chan Request
connect chan connect
rooms []room
done chan bool
lastEntityID int
rooms map[string]*room
players map[string]*player
}
type connect struct {
conn *websocket.Conn
login wire.Login
failed chan error
}
func newWorld(log *blammo.Log) *world {
bounds := math.CreateRect(10, 10)
foyer := room{
@ -37,9 +46,10 @@ func newWorld(log *blammo.Log) *world {
log.Info("created foyer with bounds: %#v having width: %d height: %d area: %d", foyer.Rect, foyer.Width, foyer.Height, foyer.Area())
return &world{
Log: log,
rooms: []room{foyer},
rooms: map[string]*room{"foyer": &foyer},
done: make(chan bool),
Inbox: make(chan Request),
inbox: make(chan Request),
connect: make(chan connect),
players: make(map[string]*player),
}
}
@ -54,7 +64,10 @@ func (w *world) run(hz int) {
for {
select {
case req := <-w.Inbox:
case c := <-w.connect:
w.register(c)
case req := <-w.inbox:
w.Info("read from inbox: %v", req)
if req.From == "" {
@ -62,17 +75,6 @@ func (w *world) run(hz int) {
break
}
if spawn, ok := req.Wants.(*SpawnPlayer); ok {
if _, ok := w.players[req.From]; ok {
spawn.Outbox <- wire.ErrorResponse(req.Seq, "a player is already logged in as %q", req.From)
break
}
spawn.exec(&w.rooms[0], nil, req.Seq)
p := w.rooms[0].players[req.From]
w.players[req.From] = p
break
}
p, ok := w.players[req.From]
if !ok {
w.Error("received non login request of type %T from unknown player %q", req.Wants, req.From)
@ -94,6 +96,31 @@ func (w *world) run(hz int) {
}
}
func (w *world) register(c connect) {
w.Info("register: %#v", c.login)
foyer := w.rooms["foyer"]
if len(foyer.players) >= 100 {
c.failed <- fmt.Errorf("room is full")
close(c.failed)
return
}
p := player{
Log: w.Log.Child("players").Child(c.login.Name),
name: c.login.Name,
outbox: make(chan wire.Response, 8),
pending: &Request{
From: c.login.Name,
Seq: 1,
Wants: &spawnPlayer{},
},
}
foyer.players[c.login.Name] = &p
w.players[c.login.Name] = &p
p.start(w.inbox, c.conn, foyer)
}
func (w *world) stop() error {
w.Info("stopping simulation")
w.done <- true

@ -169,6 +169,7 @@ func (c *Client) writeLoop() {
c.Error("saw response for unknown seq %d")
break
}
delete(sent, res.Re)
p.res = res
close(p.done)

Loading…
Cancel
Save