Newer
Older
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
// TODO too much fluff. Should probably phase gorilla out and move closer
// to net in connections/session management. At some point...
const (
// Time allowed to write a message to the peer.
writeWait = 0 * time.Second
pongWait = 0 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = 0 * time.Second
)
// Services requests. Message bytes are passed along with the session
// object. The service is expected to write any response back using
// the Write function on WSSession, which passes the message over
// a channel to the write pump.
type WebSocketService interface {
Process([]byte, *WSSession)
}
// The websocket server handles incoming websocket connection requests,
// upgrading, reading, writing, and session management. Handling the
// actual requests is delegated to a websocket service.
type WebSocketServer struct {
upgrader websocket.Upgrader
running bool
Benjamin Bollen
committed
maxSessions uint16
config *ServerConfig
allOrigins bool
}
// Create a new server.
// maxSessions is the maximum number of active websocket connections that is allowed.
// NOTE: This is not the total number of connections allowed - only those that are
// upgraded to websockets. Requesting a websocket connection will fail with a 503 if
// the server is at capacity.
Benjamin Bollen
committed
func NewWebSocketServer(maxSessions uint16, service WebSocketService) *WebSocketServer {
return &WebSocketServer{
maxSessions: maxSessions,
sessionManager: NewSessionManager(maxSessions, service),
}
}
// Start the server. Adds the handler to the router and sets everything up.
func (this *WebSocketServer) Start(config *ServerConfig, router *gin.Engine) {
ReadBufferSize: int(config.WebSocket.ReadBufferSize),
// TODO Will this be enough for massive "get blockchain" requests?
WriteBufferSize: int(config.WebSocket.WriteBufferSize),
router.GET(config.WebSocket.WebSocketEndpoint, this.handleFunc)
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
this.running = true
}
// Is the server currently running.
func (this *WebSocketServer) Running() bool {
return this.running
}
// Shut the server down.
func (this *WebSocketServer) ShutDown() {
this.sessionManager.Shutdown()
this.running = false
}
// Get the session-manager.
func (this *WebSocketServer) SessionManager() *SessionManager {
return this.sessionManager
}
// Handler for websocket requests.
func (this *WebSocketServer) handleFunc(c *gin.Context) {
r := c.Request
w := c.Writer
// Upgrade to websocket.
wsConn, uErr := this.upgrader.Upgrade(w, r, nil)
if uErr != nil {
uErrStr := "Failed to upgrade to websocket connection: " + uErr.Error()
http.Error(w, uErrStr, 400)
log.Info(uErrStr)
return
}
session, cErr := this.sessionManager.createSession(wsConn)
if cErr != nil {
cErrStr := "Failed to establish websocket connection: " + cErr.Error()
http.Error(w, cErrStr, 503)
log.Info(cErrStr)
return
}
// Start the connection.
log.Info("New websocket connection.", "sessionId", session.id)
session.Open()
}
// Used to track sessions. Will notify when a session are opened
// and closed.
type SessionObserver interface {
NotifyOpened(*WSSession)
NotifyClosed(*WSSession)
}
// WSSession wraps a gorilla websocket.Conn, which in turn wraps a
// net.Conn object. Writing is done using the 'Write([]byte)' method,
// which passes the bytes on to the write pump over a channel.
type WSSession struct {
sessionManager *SessionManager
id uint
wsConn *websocket.Conn
writeChan chan []byte
writeCloseChan chan struct{}
service WebSocketService
opened bool
closed bool
}
// Write a text message to the client.
func (this *WSSession) Write(msg []byte) error {
if this.closed {
log.Warn("Attempting to write to closed session.", "sessionId", this.id)
return fmt.Errorf("Session is closed")
}
this.writeChan <- msg
return nil
}
// Private. Helper for writing control messages.
func (this *WSSession) write(mt int, payload []byte) error {
this.wsConn.SetWriteDeadline(time.Now().Add(writeWait))
return this.wsConn.WriteMessage(mt, payload)
}
// Get the session id number.
func (this *WSSession) Id() uint {
return this.id
}
// Starts the read and write pumps. Blocks on the former.
// Notifies all the observers.
func (this *WSSession) Open() {
this.opened = true
this.sessionManager.notifyOpened(this)
go this.writePump()
this.readPump()
}
// Closes the net connection and cleans up. Notifies all the observers.
func (this *WSSession) Close() {
if !this.closed {
this.closed = true
this.wsConn.Close()
this.sessionManager.removeSession(this.id)
log.Info("Closing websocket connection.", "sessionId", this.id, "remaining", len(this.sessionManager.activeSessions))
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
this.sessionManager.notifyClosed(this)
}
}
// Has the session been opened?
func (this *WSSession) Opened() bool {
return this.opened
}
// Has the session been closed?
func (this *WSSession) Closed() bool {
return this.closed
}
// Pump debugging
/*
var rp int = 0
var wp int = 0
var rpm *sync.Mutex = &sync.Mutex{}
var wpm *sync.Mutex = &sync.Mutex{}
*/
// Read loop. Will terminate on a failed read.
func (this *WSSession) readPump() {
/*
rpm.Lock()
defer func(){
rpm.Lock()
rp--
log.Debug("readpump removed", "total", rp)
rpm.Unlock()
}()
// this.wsConn.SetReadDeadline(time.Now().Add(pongWait))
// this.wsConn.SetPongHandler(func(string) error { this.wsConn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
// Read
msgType, msg, err := this.wsConn.ReadMessage()
// Read error.
if err != nil {
// Socket could have been gracefully closed, so not really an error.
log.Info("Socket closed. Removing.", "error", err.Error())
log.Info("Receiving non text-message from client, closing.")
// Process the request.
this.service.Process(msg, this)
}()
}
}
// Writes messages coming in on the write channel. Will terminate on failed writes,
// if pings are not responded to, or if a message comes in on the write close channel.
func (this *WSSession) writePump() {
/*
wpm.Lock()
defer func() {
wpm.Lock()
wp--
log.Debug("writepump removed", "total", wp)
wpm.Unlock()
}()
// ticker := time.NewTicker(pingPeriod)
// Write loop. Blocks while waiting for data to come in over a channel.
for {
select {
// Write request.
case msg := <-this.writeChan:
// Write the bytes to the socket.
err := this.wsConn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
// Could be due to the socket being closed so not really an error.
log.Info("Writing to socket failed. Closing.")
return
}
case <-this.writeCloseChan:
return
// Ticker run out. Time for another ping message.
/*
case <-ticker.C:
if err := this.write(websocket.PingMessage, []byte{}); err != nil {
log.Debug("Failed to write ping message to socket. Closing.")
return
}
}
}
// Session manager handles the adding, tracking and removing of session objects.
type SessionManager struct {
Benjamin Bollen
committed
maxSessions uint16
activeSessions map[uint]*WSSession
idPool *IdPool
mtx *sync.Mutex
service WebSocketService
openEventChans []chan *WSSession
closeEventChans []chan *WSSession
}
// Create a new WebsocketManager.
Benjamin Bollen
committed
func NewSessionManager(maxSessions uint16, wss WebSocketService) *SessionManager {
maxSessions: maxSessions,
activeSessions: make(map[uint]*WSSession),
Benjamin Bollen
committed
idPool: NewIdPool(uint(maxSessions)),
mtx: &sync.Mutex{},
service: wss,
openEventChans: []chan *WSSession{},
func (this *SessionManager) Shutdown() {
this.activeSessions = nil
}
// Add a listener to session open events.
func (this *SessionManager) SessionOpenEventChannel() <-chan *WSSession {
lChan := make(chan *WSSession, 1)
this.openEventChans = append(this.openEventChans, lChan)
return lChan
}
// Remove a listener from session open events.
func (this *SessionManager) RemoveSessionOpenEventChannel(lChan chan *WSSession) bool {
ec := this.openEventChans
if len(ec) == 0 {
return false
}
for i, c := range ec {
ec[i], ec = ec[len(ec)-1], ec[:len(ec)-1]
return true
}
}
return false
}
// Add a listener to session close events
func (this *SessionManager) SessionCloseEventChannel() <-chan *WSSession {
lChan := make(chan *WSSession, 1)
this.closeEventChans = append(this.closeEventChans, lChan)
return lChan
}
// Remove a listener from session close events.
func (this *SessionManager) RemoveSessionCloseEventChannel(lChan chan *WSSession) bool {
ec := this.closeEventChans
if len(ec) == 0 {
return false
}
for i, c := range ec {
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
ec[i], ec = ec[len(ec)-1], ec[:len(ec)-1]
return true
}
}
return false
}
// Used to notify all observers that a new session was opened.
func (this *SessionManager) notifyOpened(session *WSSession) {
for _, lChan := range this.openEventChans {
lChan <- session
}
}
// Used to notify all observers that a new session was closed.
func (this *SessionManager) notifyClosed(session *WSSession) {
for _, lChan := range this.closeEventChans {
lChan <- session
}
}
// Creates a new session and adds it to the manager.
func (this *SessionManager) createSession(wsConn *websocket.Conn) (*WSSession, error) {
// Check that the capacity hasn't been exceeded.
this.mtx.Lock()
defer this.mtx.Unlock()
if this.atCapacity() {
return nil, fmt.Errorf("Already at capacity")
}
// Create and start
conn := &WSSession{
sessionManager: this,
id: newId,
wsConn: wsConn,
writeChan: make(chan []byte, maxMessageSize),
writeCloseChan: make(chan struct{}),
service: this.service,
}
this.activeSessions[conn.id] = conn
return conn, nil
}
// Remove a session from the list.
func (this *SessionManager) removeSession(id uint) {
this.mtx.Lock()
defer this.mtx.Unlock()
// Check that it exists.
_, ok := this.activeSessions[id]
if ok {
delete(this.activeSessions, id)
this.idPool.ReleaseId(id)
}
}
// True if the number of active connections is at the maximum.
func (this *SessionManager) atCapacity() bool {
return len(this.activeSessions) >= int(this.maxSessions)
}