braking things up into packages

master
Jordan Orelli 4 years ago
parent 06b965d2aa
commit ad6dc3d6b5

@ -0,0 +1,112 @@
package server
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"strconv"
"github.com/gorilla/websocket"
"github.com/jordanorelli/astro-domu/internal/errors"
"github.com/jordanorelli/astro-domu/internal/wire"
"github.com/jordanorelli/blammo"
)
type Server struct {
*blammo.Log
Host string
Port int
lastSessionID int
}
func (s *Server) Start() error {
if s.Host == "" {
s.Host = "127.0.0.1"
}
if s.Port == 0 {
s.Port = 12805
}
if s.Log == nil {
stdout := blammo.NewLineWriter(os.Stdout)
stderr := blammo.NewLineWriter(os.Stderr)
options := []blammo.Option{
blammo.DebugWriter(stdout),
blammo.InfoWriter(stdout),
blammo.ErrorWriter(stderr),
}
s.Log = blammo.NewLog("astro", options...).Child("server")
}
addr := fmt.Sprintf("%s:%d", s.Host, s.Port)
lis, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("server failed to start a listener: %w", err)
}
s.Log.Info("listening for TCP traffic on %q", addr)
go s.runHTTPServer(lis)
return nil
}
func (s *Server) runHTTPServer(lis net.Listener) {
err := http.Serve(lis, s)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
s.Error("error in http.Serve: %v", err)
}
}
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
s.Error("upgrade error: %v", err)
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.lastSessionID++
sn := session{
Log: s.Log.Child("sessions").Child(strconv.Itoa(s.lastSessionID)),
id: s.lastSessionID,
conn: conn,
outbox: make(chan wire.Response),
}
go sn.pump(ctx)
for {
t, r, err := conn.NextReader()
if err != nil {
s.Error("read error: %v", err)
return
}
switch t {
case websocket.TextMessage:
text, err := ioutil.ReadAll(r)
if err != nil {
s.Error("readall error: %v", err)
break
}
sn.Log.Child("received-frame").Info(string(text))
var req wire.Request
if err := json.Unmarshal(text, &req); err != nil {
s.Error("unable to parse request: %v", err)
sn.outbox <- wire.ErrorResponse(0, "unable to parse request: %v", err)
break
}
sn.outbox <- wire.NewResponse(req.Seq, wire.OK{})
case websocket.BinaryMessage:
sn.outbox <- wire.ErrorResponse(0, "unable to parse binary frames")
}
}
}

@ -0,0 +1,59 @@
package server
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/gorilla/websocket"
"github.com/jordanorelli/astro-domu/internal/wire"
"github.com/jordanorelli/blammo"
)
type session struct {
*blammo.Log
id int
conn *websocket.Conn
outbox chan wire.Response
}
// pump is the session send loop. Pump should pump the session's outbox
// messages to the underlying connection until the context is closed.
func (sn *session) pump(ctx context.Context) {
for {
select {
case res := <-sn.outbox:
if err := sn.sendResponse(res); err != nil {
sn.Error(err.Error())
}
case <-ctx.Done():
sn.Info("parent context done, shutting down write pump")
return
}
}
}
func (sn *session) sendResponse(res wire.Response) error {
payload, err := json.Marshal(res)
if err != nil {
return fmt.Errorf("failed to marshal outgoing response: %w", err)
}
if err := sn.conn.SetWriteDeadline(time.Now().Add(3 * time.Second)); err != nil {
return fmt.Errorf("failed to set write deadline: %w", err)
}
w, err := sn.conn.NextWriter(websocket.TextMessage)
if err != nil {
return fmt.Errorf("failed get a writer frame: %w", err)
}
if _, err := w.Write(payload); err != nil {
return fmt.Errorf("failed write payload: %w", err)
}
if err := w.Close(); err != nil {
return fmt.Errorf("failed to close write frame: %w", err)
}
sn.Child("sent-frame").Info(string(payload))
return nil
}

@ -0,0 +1,7 @@
package sim
// entity is any entity that can be simulated.
type entity interface {
// update is the standard tick function
update(time.Duration)
}

@ -0,0 +1,4 @@
package sim
type Room struct {
}

@ -0,0 +1,4 @@
package sim
type World struct {
}

@ -0,0 +1,17 @@
package wire
import (
"fmt"
)
type Error struct {
val error
}
func (e Error) Error() string { return e.val.Error() }
func (e Error) NetTag() Tag { return T_Error }
func (e Error) Unwrap() error { return e.val }
func Errorf(t string, args ...interface{}) Error {
return Error{val: fmt.Errorf(t, args...)}
}

@ -0,0 +1,4 @@
package wire
type OK struct {}
func (OK) NetTag() Tag { return T_OK }

@ -0,0 +1,7 @@
package wire
type Request struct {
Seq int `json:"seq"`
Type string `json:"type"`
Body interface{} `json:"body"`
}

@ -0,0 +1,19 @@
package wire
type Response struct {
Re int `json:"re,omitempty"`
Type Tag `json:"type"`
Body interface{} `json:"body"`
}
func NewResponse(re int, v Value) Response {
return Response{
Re: re,
Type: v.NetTag(),
Body: v,
}
}
func ErrorResponse(re int, t string, args ...interface{}) Response {
return NewResponse(re, Errorf(t, args...))
}

@ -0,0 +1,45 @@
package wire
import (
"encoding/json"
"fmt"
)
type Tag uint
const (
T_None Tag = iota
T_Error
T_OK
)
func (t Tag) String() string {
switch t {
case T_Error:
return "error"
case T_OK:
return "ok"
default:
panic("unknown type tag")
}
}
func (t *Tag) UnmarshalJSON(b []byte) error {
var name string
if err := json.Unmarshal(b, &name); err != nil {
return err
}
switch name {
case "error":
*t = T_Error
return nil
case "ok":
*t = T_OK
return nil
default:
return fmt.Errorf("unknown type tag: %q", name)
}
}
func (t Tag) MarshalJSON() ([]byte, error) { return json.Marshal(t.String()) }

@ -0,0 +1,3 @@
package wire
type Value interface{ NetTag() Tag }

@ -2,9 +2,11 @@ package main
import ( import (
"os" "os"
"os/signal"
"time" "time"
"github.com/jordanorelli/astro-domu/internal/exit" "github.com/jordanorelli/astro-domu/internal/exit"
"github.com/jordanorelli/astro-domu/internal/server"
"github.com/jordanorelli/blammo" "github.com/jordanorelli/blammo"
) )
@ -34,23 +36,16 @@ func main() {
case "client": case "client":
runClient() runClient()
case "server": case "server":
stdout := blammo.NewLineWriter(os.Stdout) s := server.Server{}
stderr := blammo.NewLineWriter(os.Stderr) if err := s.Start(); err != nil {
exit.WithMessage(1, "unable to start server: %v", err)
options := []blammo.Option{
blammo.DebugWriter(stdout),
blammo.InfoWriter(stdout),
blammo.ErrorWriter(stderr),
} }
sig := make(chan os.Signal, 1)
log := blammo.NewLog("astro", options...).Child("server") signal.Notify(sig, os.Interrupt)
s := server{Log: log, host: "127.0.0.1", port: 12805} <-sig
err := s.listen()
log.Error("listen error: %v", err)
default: default:
exit.WithMessage(1, "supported options are [client|server]") exit.WithMessage(1, "supported options are [client|server]")
} }
} }
func runClient() { func runClient() {

@ -1,73 +0,0 @@
package main
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"github.com/gorilla/websocket"
"github.com/jordanorelli/blammo"
)
type server struct {
*blammo.Log
host string
port int
lastSessionID int
}
func (s *server) listen() error {
return http.ListenAndServe(fmt.Sprintf("%s:%d", s.host, s.port), s)
}
func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
s.Error("upgrade error: %v", err)
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.lastSessionID++
sn := session{
Log: s.Log.Child("sessions").Child(strconv.Itoa(s.lastSessionID)),
id: s.lastSessionID,
conn: conn,
outbox: make(chan response),
}
go sn.pump(ctx)
for {
t, r, err := conn.NextReader()
if err != nil {
s.Error("read error: %v", err)
return
}
switch t {
case websocket.TextMessage:
text, err := ioutil.ReadAll(r)
if err != nil {
s.Error("readall error: %v", err)
break
}
sn.Log.Child("received-frame").Info(string(text))
var body requestBody
if err := json.Unmarshal(text, &body); err != nil {
s.Error("unable to parse request: %v", err)
sn.outbox <- errorResponse(0, fmt.Errorf("unable to parse request: %v", err))
break
}
sn.outbox <- ok(body.Seq)
case websocket.BinaryMessage:
sn.outbox <- errorResponse(0, fmt.Errorf("unable to parse binary frames"))
}
}
}
Loading…
Cancel
Save