From eaa77618a693c7ef1de6b98ea2e5eafdc92fde44 Mon Sep 17 00:00:00 2001
From: Androlo <andreas@erisindustries.com>
Date: Wed, 10 Jun 2015 01:25:13 +0200
Subject: [PATCH] Event polling stress tests

---
 erisdb/erisdbss/http.go           | 201 ---------------------
 erisdb/erisdbss/server_manager.go | 279 ++++++++++++++++++++++++++++++
 erisdb/event_cache.go             |  44 ++---
 erisdb/event_cache_test.go        | 208 ++++++++++++++++++++++
 erisdb/serve.go                   |   2 +-
 server/idpool.go                  |   8 +-
 server/server_test.go             |  16 +-
 server/websocket.go               |   2 +-
 8 files changed, 523 insertions(+), 237 deletions(-)
 create mode 100644 erisdb/erisdbss/server_manager.go
 create mode 100644 erisdb/event_cache_test.go

diff --git a/erisdb/erisdbss/http.go b/erisdb/erisdbss/http.go
index cfd1d803..e318e919 100644
--- a/erisdb/erisdbss/http.go
+++ b/erisdb/erisdbss/http.go
@@ -2,7 +2,6 @@
 package erisdbss
 
 import (
-	"bufio"
 	"bytes"
 	"encoding/json"
 	"fmt"
@@ -13,19 +12,6 @@ import (
 	"github.com/tendermint/tendermint/state"
 	"net/http"
 	"os"
-	"os/exec"
-	"path"
-	"strings"
-	"sync"
-	"time"
-)
-
-const (
-	REAPER_TIMEOUT   = 5 * time.Second
-	REAPER_THRESHOLD = 10 * time.Second
-	CLOSE_TIMEOUT    = 1 * time.Second
-	PORT_BASE        = 29000
-	EXECUTABLE_NAME  = "erisdb"
 )
 
 const TendermintConfigDefault = `# This is a TOML config file.
@@ -125,191 +111,4 @@ func (this *ServerServer) handleFunc(c *gin.Context) {
 	enc.Encode(resp)
 	w.WriteHeader(200)
 
-}
-
-// A serve task. This wraps a running 'erisdb' process.
-type ServeTask struct {
-	sp          *exec.Cmd
-	workDir     string
-	start       time.Time
-	maxDuration time.Duration
-	port        uint16
-}
-
-// Create a new serve task.
-func newServeTask(port uint16, workDir string, maxDuration time.Duration, process *exec.Cmd) *ServeTask {
-	return &ServeTask{
-		process,
-		workDir,
-		time.Now(),
-		maxDuration,
-		port,
-	}
-}
-
-// Catches events that callers subscribe to and adds them to an array ready to be polled.
-type ServerManager struct {
-	mtx      *sync.Mutex
-	idPool   *server.IdPool
-	maxProcs uint
-	running  []*ServeTask
-	reap     bool
-	baseDir  string
-}
-
-//
-func NewServerManager(maxProcs uint, baseDir string) *ServerManager {
-	sm := &ServerManager{
-		mtx:      &sync.Mutex{},
-		idPool:   server.NewIdPool(maxProcs),
-		maxProcs: maxProcs,
-		running:  make([]*ServeTask, 0),
-		reap:     true,
-		baseDir:  baseDir,
-	}
-	go reap(sm)
-	return sm
-}
-
-func reap(sm *ServerManager) {
-	if !sm.reap {
-		return
-	}
-	time.Sleep(REAPER_TIMEOUT)
-	sm.mtx.Lock()
-	defer sm.mtx.Unlock()
-	// The processes are added in order so just read from bottom of array until
-	// a time is below reaper threshold, then break.
-	for len(sm.running) > 0 {
-		task := sm.running[0]
-		if time.Since(task.start) > task.maxDuration {
-			fmt.Printf("[SERVER REAPER] Closing down server on port: %d\n", task.port)
-			task.sp.Process.Kill()
-			sm.running = sm.running[1:]
-		} else {
-			break
-		}
-	}
-	go reap(sm)
-}
-
-// Add a new erisdb process to the list.
-func (this *ServerManager) add(data *RequestData) (*ResponseData, error) {
-	this.mtx.Lock()
-	defer this.mtx.Unlock()
-	config := server.DefaultServerConfig()
-	// Port is PORT_BASE + a value between 1 and the max number of servers.
-	port := uint16(PORT_BASE + this.idPool.GetId())
-	config.Bind.Port = port
-
-	folderName := fmt.Sprintf("testnode%d", port)
-	workDir, errCWD := this.createWorkDir(data, config, folderName)
-	if errCWD != nil {
-		return nil, errCWD
-	}
-	
-	// TODO ...
-	
-	// Create a new erisdb process.
-	proc := exec.Command(EXECUTABLE_NAME, workDir)
-	
-	reader, errSP := proc.StdoutPipe()
-	if errSP != nil {
-		return nil, errSP
-	}
-	
-	scanner := bufio.NewScanner(reader)
-	scanner.Split(bufio.ScanLines)
-	
-	if errStart := proc.Start(); errStart != nil {
-		return nil, errStart
-	}
-	
-	for scanner.Scan() {
-		text := scanner.Text()
-		if strings.Index(text, "DONTMINDME55891") != -1 {
-			break
-		}
-	}
-	if err := scanner.Err(); err != nil {
-		return nil, fmt.Errorf("Error reading from process stdout:", err)
-	}
-	
-	
-	time.Sleep(2000)
-	maxDur := time.Duration(data.MaxDuration) * time.Second
-	if maxDur == 0 {
-		maxDur = REAPER_THRESHOLD
-	}
-	st := newServeTask(port, workDir, maxDur, proc)
-	this.running = append(this.running, st)
-
-	URL := "http://" + config.Bind.Address + ":" + fmt.Sprintf("%d", port) + config.HTTP.JsonRpcPath
-
-	// TODO add validation data. The node should ideally return some post-deploy state data
-	// and send it back with the server URL, so that the validity of the chain can be
-	// established client-side before starting the tests.
-	return &ResponseData{URL: URL}, nil
-}
-
-// Creates a temp folder for the tendermint/erisdb node to run in.
-// Folder name is port based, so port=1337 meens folder="testnode1337"
-// Old folders are cleared out. before creating them, and the server will
-// clean out all of this servers workdir (defaults to ~/.edbservers) when
-// starting and when stopping.
-func (this *ServerManager) createWorkDir(data *RequestData, config *server.ServerConfig, folderName string) (string, error) {
-
-	workDir := path.Join(this.baseDir, folderName)
-	os.RemoveAll(workDir)
-	errED := EnsureDir(workDir)
-	if errED != nil {
-		return "", errED
-	}
-
-	cfgName := path.Join(workDir, "config.toml")
-	scName := path.Join(workDir, "server_conf.toml")
-	pvName := path.Join(workDir, "priv_validator.json")
-	genesisName := path.Join(workDir, "genesis.json")
-
-	// Write config.
-	WriteFile(cfgName, []byte(TendermintConfigDefault))
-
-	// Write validator.
-	errPV := writeJSON(data.PrivValidator, pvName)
-	if errPV != nil {
-		return "", errPV
-	}
-
-	// Write genesis
-	errG := writeJSON(data.Genesis, genesisName)
-	if errG != nil {
-		return "", errG
-	}
-
-	// Write server config.
-	errWC := server.WriteServerConfig(scName, config)
-	if errWC != nil {
-		return "", errWC
-	}
-
-	return workDir, nil
-}
-
-func writeJSON(v interface{}, file string) error {
-	var n int64
-	var errW error
-	fo, errC := os.Create(file)
-	if errC != nil {
-		return errC
-	}
-	binary.WriteJSON(v, fo, &n, &errW)
-	if errW != nil {
-		return errW
-	}
-	errL := fo.Close()
-	if errL != nil {
-		return errL
-	}
-	fmt.Printf("File written to %s.\n", file)
-	return nil
 }
\ No newline at end of file
diff --git a/erisdb/erisdbss/server_manager.go b/erisdb/erisdbss/server_manager.go
new file mode 100644
index 00000000..34a70878
--- /dev/null
+++ b/erisdb/erisdbss/server_manager.go
@@ -0,0 +1,279 @@
+package erisdbss
+
+import (
+	"bufio"
+	"fmt"
+	"github.com/eris-ltd/erisdb/server"
+	"github.com/tendermint/tendermint/binary"
+	. "github.com/tendermint/tendermint/common"
+	"os"
+	"os/exec"
+	"path"
+	"strings"
+	"sync"
+	"time"
+)
+
+const (
+	REAPER_TIMEOUT   = 5 * time.Second
+	REAPER_THRESHOLD = 10 * time.Second 
+	// Ports to new server processes are PORT_BASE + i, where i is an integer from the pool.
+	PORT_BASE        = 29000
+	// How long are we willing to wait for a process to become ready.
+	PROC_START_TIMEOUT = 3*time.Second
+	// Name of the process executable.
+	EXECUTABLE_NAME  = "erisdb"
+)
+
+// Executable processes. These are used to wrap processes that are . 
+type ExecProcess interface {
+	Start(chan <- error)
+	Kill() error
+}
+
+// Wrapper for exec.Cmd. Will wait for a token from stdout using line scanning.
+type CmdProcess struct {
+	cmd *exec.Cmd
+	token string
+}
+
+func newCmdProcess(cmd *exec.Cmd, token string) *CmdProcess {
+	return &CmdProcess{cmd, token}
+}
+
+func (this *CmdProcess) Start(doneChan chan <- error) {
+	
+	reader, errSP := this.cmd.StdoutPipe()
+	
+	if errSP != nil {
+		doneChan <- errSP
+	}
+	
+	scanner := bufio.NewScanner(reader)
+	scanner.Split(bufio.ScanLines)
+	
+	if errStart := this.cmd.Start(); errStart != nil {
+		doneChan <- errStart
+		return
+	}
+	
+	for scanner.Scan() {
+		text := scanner.Text()
+		if strings.Index(text, this.token) != -1 {
+			break
+		}
+	}
+	
+	if err := scanner.Err(); err != nil {
+		 doneChan <- fmt.Errorf("Error reading from process stdout:", err)
+		 return
+	}
+	doneChan <- nil
+}
+
+func (this *CmdProcess) Kill() error {
+	return this.cmd.Process.Kill()
+}
+
+// A serve task. This wraps a running process. It was designed to run 'erisdb' processes.
+type ServeTask struct {
+	sp          ExecProcess
+	workDir     string
+	start       time.Time
+	maxDuration time.Duration
+	port        uint16
+}
+
+// Create a new serve task.
+func newServeTask(port uint16, workDir string, maxDuration time.Duration, process ExecProcess) *ServeTask {
+	return &ServeTask{
+		process,
+		workDir,
+		time.Now(),
+		maxDuration,
+		port,
+	}
+}
+
+// Catches events that callers subscribe to and adds them to an array ready to be polled.
+type ServerManager struct {
+	mtx      *sync.Mutex
+	idPool   *server.IdPool
+	maxProcs uint
+	running  []*ServeTask
+	reap     bool
+	baseDir  string
+}
+
+//
+func NewServerManager(maxProcs uint, baseDir string) *ServerManager {
+	sm := &ServerManager{
+		mtx:      &sync.Mutex{},
+		idPool:   server.NewIdPool(maxProcs),
+		maxProcs: maxProcs,
+		running:  make([]*ServeTask, 0),
+		reap:     true,
+		baseDir:  baseDir,
+	}
+	go reap(sm)
+	return sm
+}
+
+func reap(sm *ServerManager) {
+	if !sm.reap {
+		return
+	}
+	time.Sleep(REAPER_TIMEOUT)
+	sm.mtx.Lock()
+	defer sm.mtx.Unlock()
+	// The processes are added in order so just read from bottom of array until
+	// a time is below reaper threshold, then break.
+	for len(sm.running) > 0 {
+		task := sm.running[0]
+		if time.Since(task.start) > task.maxDuration {
+			fmt.Printf("[SERVER REAPER] Closing down server on port: %d\n", task.port)
+			task.sp.Kill()
+			sm.running = sm.running[1:]
+			sm.idPool.ReleaseId(uint(task.port - PORT_BASE))
+		} else {
+			break
+		}
+	}
+	go reap(sm)
+}
+
+// Add a new erisdb process to the list.
+func (this *ServerManager) add(data *RequestData) (*ResponseData, error) {
+	this.mtx.Lock()
+	defer this.mtx.Unlock()
+	config := server.DefaultServerConfig()
+	// Port is PORT_BASE + a value between 1 and the max number of servers.
+	id, errId := this.idPool.GetId()
+	if errId != nil {
+		return nil, errId
+	}
+	port := uint16(PORT_BASE + id)
+	config.Bind.Port = port
+
+	folderName := fmt.Sprintf("testnode%d", port)
+	workDir, errCWD := this.createWorkDir(data, config, folderName)
+	if errCWD != nil {
+		return nil, errCWD
+	}
+	
+	// TODO ...
+	
+	// Create a new erisdb process.
+	cmd := exec.Command(EXECUTABLE_NAME, workDir)
+	proc := &CmdProcess{cmd, "DONTMINDME55891"}
+	
+	
+	errSt := waitForProcStarted(proc)
+	
+	if errSt != nil {
+		return nil, errSt
+	}
+	
+	maxDur := time.Duration(data.MaxDuration) * time.Second
+	if maxDur == 0 {
+		maxDur = REAPER_THRESHOLD
+	}
+	
+	st := newServeTask(port, workDir, maxDur, proc)
+	this.running = append(this.running, st)
+
+	URL := "http://" + config.Bind.Address + ":" + fmt.Sprintf("%d", port) + config.HTTP.JsonRpcPath
+
+	// TODO add validation data. The node should ideally return some post-deploy state data
+	// and send it back with the server URL, so that the validity of the chain can be
+	// established client-side before starting the tests.
+	return &ResponseData{URL: URL}, nil
+}
+
+// Creates a temp folder for the tendermint/erisdb node to run in.
+// Folder name is port based, so port=1337 meens folder="testnode1337"
+// Old folders are cleared out. before creating them, and the server will
+// clean out all of this servers workdir (defaults to ~/.edbservers) when
+// starting and when stopping.
+func (this *ServerManager) createWorkDir(data *RequestData, config *server.ServerConfig, folderName string) (string, error) {
+
+	workDir := path.Join(this.baseDir, folderName)
+	os.RemoveAll(workDir)
+	errED := EnsureDir(workDir)
+	if errED != nil {
+		return "", errED
+	}
+
+	cfgName := path.Join(workDir, "config.toml")
+	scName := path.Join(workDir, "server_conf.toml")
+	pvName := path.Join(workDir, "priv_validator.json")
+	genesisName := path.Join(workDir, "genesis.json")
+
+	// Write config.
+	WriteFile(cfgName, []byte(TendermintConfigDefault))
+
+	// Write validator.
+	errPV := writeJSON(data.PrivValidator, pvName)
+	if errPV != nil {
+		return "", errPV
+	}
+
+	// Write genesis
+	errG := writeJSON(data.Genesis, genesisName)
+	if errG != nil {
+		return "", errG
+	}
+
+	// Write server config.
+	errWC := server.WriteServerConfig(scName, config)
+	if errWC != nil {
+		return "", errWC
+	}
+
+	return workDir, nil
+}
+
+// Used to write json files using tendermints binary package.
+func writeJSON(v interface{}, file string) error {
+	var n int64
+	var errW error
+	fo, errC := os.Create(file)
+	if errC != nil {
+		return errC
+	}
+	binary.WriteJSON(v, fo, &n, &errW)
+	if errW != nil {
+		return errW
+	}
+	errL := fo.Close()
+	if errL != nil {
+		return errL
+	}
+	fmt.Printf("File written to %s.\n", file)
+	return nil
+}
+
+func waitForProcStarted(proc ExecProcess) error {
+	timeoutChan := make(chan struct{})
+	doneChan := make(chan error)
+	done := new(bool)
+	go func(b *bool){
+		time.Sleep(PROC_START_TIMEOUT)
+		if !*b {
+			timeoutChan <- struct{}{}
+		}
+	}(done)
+	go proc.Start(doneChan)
+	var errSt error
+	select {
+		case errD := <- doneChan:
+			errSt = errD
+			*done = true
+			break
+		case <- timeoutChan:
+			_ = proc.Kill()
+			errSt = fmt.Errorf("Process start timed out") 
+			break
+	}
+	return errSt
+}
\ No newline at end of file
diff --git a/erisdb/event_cache.go b/erisdb/event_cache.go
index 84aacf2c..38cec928 100644
--- a/erisdb/event_cache.go
+++ b/erisdb/event_cache.go
@@ -2,21 +2,25 @@ package erisdb
 
 import (
 	"fmt"
-	ep "github.com/eris-ltd/erisdb/erisdb/pipe"
 	"sync"
 	"time"
 )
 
-const (
-	REAPER_TIMEOUT   = 5 * time.Second
-	REAPER_THRESHOLD = 10 * time.Second
+var (
+	reaperTimeout   = 5 * time.Second
+	reaperThreshold = 10 * time.Second
 )
 
+type EventEmitter interface {
+	Subscribe(subId, eventId string, callback func(interface{})) (bool, error)
+	Unsubscribe(subId string) (bool, error)
+}
+
 type EventCache struct {
 	mtx    *sync.Mutex
 	events []interface{}
 	ts     time.Time
-	subId string
+	subId  string
 }
 
 func newEventCache() *EventCache {
@@ -36,7 +40,7 @@ func (this *EventCache) poll() []interface{} {
 		evts = this.events
 		this.events = []interface{}{}
 	} else {
-		evts =  []interface{}{}
+		evts = []interface{}{}
 	}
 	this.ts = time.Now()
 	return evts
@@ -44,18 +48,18 @@ func (this *EventCache) poll() []interface{} {
 
 // Catches events that callers subscribe to and adds them to an array ready to be polled.
 type EventSubscriptions struct {
-	mtx  *sync.Mutex
-	pipe ep.Pipe
-	subs map[string]*EventCache
-	reap bool
+	mtx          *sync.Mutex
+	eventEmitter EventEmitter
+	subs         map[string]*EventCache
+	reap         bool
 }
 
-func NewEventSubscriptions(pipe ep.Pipe) *EventSubscriptions {
+func NewEventSubscriptions(eventEmitter EventEmitter) *EventSubscriptions {
 	es := &EventSubscriptions{
-		mtx:  &sync.Mutex{},
-		pipe: pipe,
-		subs: make(map[string]*EventCache),
-		reap: true,
+		mtx:          &sync.Mutex{},
+		eventEmitter: eventEmitter,
+		subs:         make(map[string]*EventCache),
+		reap:         true,
 	}
 	go reap(es)
 	return es
@@ -65,15 +69,14 @@ func reap(es *EventSubscriptions) {
 	if !es.reap {
 		return
 	}
-	time.Sleep(REAPER_TIMEOUT)
+	time.Sleep(reaperTimeout)
 	es.mtx.Lock()
 	defer es.mtx.Unlock()
 	for id, sub := range es.subs {
-		if time.Since(sub.ts) > REAPER_THRESHOLD {
-			fmt.Println("[SUBSCRIPTION REAPER] Reaping sub: " + sub.subId)
+		if time.Since(sub.ts) > reaperThreshold {
 			// Seems like Go is ok with this..
 			delete(es.subs, id)
-			es.pipe.Events().Unsubscribe(id)
+			es.eventEmitter.Unsubscribe(id)
 		}
 	}
 	go reap(es)
@@ -90,12 +93,11 @@ func (this *EventSubscriptions) add(eventId string) (string, error) {
 		return "", errSID
 	}
 	cache := newEventCache()
-	_, errC := this.pipe.Events().Subscribe(subId, eventId,
+	_, errC := this.eventEmitter.Subscribe(subId, eventId,
 		func(evt interface{}) {
 			cache.mtx.Lock()
 			defer cache.mtx.Unlock()
 			cache.events = append(cache.events, evt)
-			cache.ts = time.Now()
 		})
 	cache.subId = subId
 	this.subs[subId] = cache
diff --git a/erisdb/event_cache_test.go b/erisdb/event_cache_test.go
new file mode 100644
index 00000000..7585627c
--- /dev/null
+++ b/erisdb/event_cache_test.go
@@ -0,0 +1,208 @@
+package erisdb
+
+import (
+	"encoding/hex"
+	"fmt"
+	"github.com/stretchr/testify/assert"
+	"runtime"
+	"testing"
+	"time"
+)
+
+var mockInterval = 10 * time.Millisecond
+
+type mockSub struct {
+	subId    string
+	eventId  string
+	f        func(interface{})
+	shutdown bool
+	sdChan   chan struct{}
+}
+
+// A mock event
+func newMockSub(subId, eventId string, f func(interface{})) mockSub {
+	return mockSub{subId, eventId, f, false, make(chan struct{})}
+}
+
+type mockEventEmitter struct {
+	subs map[string]mockSub
+}
+
+func newMockEventEmitter() *mockEventEmitter {
+	return &mockEventEmitter{make(map[string]mockSub)}
+}
+
+func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(interface{})) (bool, error) {
+	if _, ok := this.subs[subId]; ok {
+		return false, nil
+	}
+	me := newMockSub(subId, eventId, callback)
+
+	go func() {
+		<-me.sdChan
+		me.shutdown = true
+	}()
+	go func() {
+		for {
+			if !me.shutdown {
+				me.f(struct{}{})
+			} else {
+				return
+			}
+			time.Sleep(mockInterval)
+		}
+	}()
+	return true, nil
+}
+
+func (this *mockEventEmitter) Unsubscribe(subId string) (bool, error) {
+	sub, ok := this.subs[subId]
+	if !ok {
+		return false, nil
+	}
+	sub.shutdown = true
+	delete(this.subs, subId)
+	return true, nil
+}
+
+// Test that event subscriptions can be added manually and then automatically reaped.
+func TestSubReaping(t *testing.T) {
+	runtime.GOMAXPROCS(runtime.NumCPU())
+	NUM_SUBS := 1000
+	reaperThreshold = 500 * time.Millisecond
+	reaperTimeout = 100 * time.Millisecond
+
+	mee := newMockEventEmitter()
+	eSubs := NewEventSubscriptions(mee)
+	doneChan := make(chan error)
+	go func() {
+		for i := 0; i < NUM_SUBS; i++ {
+			time.Sleep(2 * time.Millisecond)
+			go func() {
+				id, err := eSubs.add("WeirdEvent")
+				if err != nil {
+					doneChan <- err
+					return
+				}
+				if len(id) != 64 {
+					doneChan <- fmt.Errorf("Id not of length 64")
+					return
+				}
+				_ , err2 := hex.DecodeString(id)
+				if err2 != nil {
+					doneChan <- err2
+				}
+				
+				doneChan <- nil
+			}()
+		}
+	}()
+	k := 0
+	for k < NUM_SUBS {
+		err := <- doneChan
+		assert.NoError(t, err)
+		k++
+	}
+	time.Sleep(1100*time.Millisecond)
+	
+	assert.Len(t, mee.subs, 0)
+	assert.Len(t, eSubs.subs, 0)
+	t.Logf("Added %d subs that were all automatically reaped.", NUM_SUBS)
+}
+
+// Test that event subscriptions can be added and removed manually.
+func TestSubManualClose(t *testing.T) {
+	NUM_SUBS := 1000
+	// Keep the reaper out of this.
+	reaperThreshold = 10000 * time.Millisecond
+	reaperTimeout = 10000 * time.Millisecond
+	
+	mee := newMockEventEmitter()
+	eSubs := NewEventSubscriptions(mee)
+	doneChan := make(chan error)
+	go func() {
+		for i := 0; i < NUM_SUBS; i++ {
+			time.Sleep(2 * time.Millisecond)
+			go func() {
+				id, err := eSubs.add("WeirdEvent")
+				if err != nil {
+					doneChan <- err
+					return
+				}
+				if len(id) != 64 {
+					doneChan <- fmt.Errorf("Id not of length 64")
+					return
+				}
+				_ , err2 := hex.DecodeString(id)
+				if err2 != nil {
+					doneChan <- err2
+				}
+				time.Sleep(100*time.Millisecond)
+				err3 := eSubs.remove(id)
+				if err3 != nil {
+					doneChan <- err3
+				}
+				doneChan <- nil
+			}()
+		}
+	}()
+	k := 0
+	for k < NUM_SUBS {
+		err := <- doneChan
+		assert.NoError(t, err)
+		k++
+	}
+	
+	assert.Len(t, mee.subs, 0)
+	assert.Len(t, eSubs.subs, 0)
+	t.Logf("Added %d subs that were all closed down by unsubscribing.", NUM_SUBS)
+}
+
+// Test that the system doesn't fail under high pressure.
+func TestSubFlooding(t *testing.T) {
+	NUM_SUBS := 1000
+	// Keep the reaper out of this.
+	reaperThreshold = 10000 * time.Millisecond
+	reaperTimeout = 10000 * time.Millisecond
+	// Crank it up. Now pressure is 10 times higher on each sub.
+	mockInterval = 1*time.Millisecond
+	mee := newMockEventEmitter()
+	eSubs := NewEventSubscriptions(mee)
+	doneChan := make(chan error)
+	go func() {
+		for i := 0; i < NUM_SUBS; i++ {
+			time.Sleep(1 * time.Millisecond)
+			go func() {
+				id, err := eSubs.add("WeirdEvent")
+				if err != nil {
+					doneChan <- err
+					return
+				}
+				if len(id) != 64 {
+					doneChan <- fmt.Errorf("Id not of length 64")
+					return
+				}
+				_ , err2 := hex.DecodeString(id)
+				if err2 != nil {
+					doneChan <- err2
+				}
+				time.Sleep(1000*time.Millisecond)
+				err3 := eSubs.remove(id)
+				if err3 != nil {
+					doneChan <- err3
+				}
+				doneChan <- nil
+			}()
+		}
+	}()
+	k := 0
+	for k < NUM_SUBS {
+		err := <- doneChan
+		assert.NoError(t, err)
+		k++
+	}
+	
+	assert.Len(t, mee.subs, 0)
+	assert.Len(t, eSubs.subs, 0)
+	t.Logf("Added %d subs that all received 1000 events each. They were all closed down by unsubscribing.", NUM_SUBS)
+}
diff --git a/erisdb/serve.go b/erisdb/serve.go
index 77df2dfd..939b82ca 100644
--- a/erisdb/serve.go
+++ b/erisdb/serve.go
@@ -59,7 +59,7 @@ func ServeErisDB(workDir string) (*server.ServeProcess, error) {
 	// Load the supporting objects.
 	pipe := ep.NewPipe(nd)
 	codec := &TCodec{}
-	evtSubs := NewEventSubscriptions(pipe)
+	evtSubs := NewEventSubscriptions(pipe.Events())
 	// The services.
 	tmwss := NewErisDbWsService(codec, pipe)
 	tmjs := NewErisDbJsonService(codec, pipe, evtSubs)
diff --git a/server/idpool.go b/server/idpool.go
index 96a4feaf..e8964e8c 100644
--- a/server/idpool.go
+++ b/server/idpool.go
@@ -1,6 +1,7 @@
 package server
 
 import (
+	"fmt"
 	"container/list"
 )
 
@@ -25,11 +26,14 @@ func (idp *IdPool) init(totNum uint) {
 }
 
 // Get an id from the pool.
-func (idp *IdPool) GetId() uint {
+func (idp *IdPool) GetId() (uint, error) {
+	if idp.ids.Len() == 0 {
+		return 0, fmt.Errorf("Out of IDs")
+	}
 	val := idp.ids.Front()
 	idp.ids.Remove(val)
 	num, _ := val.Value.(uint)
-	return num
+	return num, nil
 }
 
 // Release an id back into the pool.
diff --git a/server/server_test.go b/server/server_test.go
index 4fc4dcb7..62dbc59b 100644
--- a/server/server_test.go
+++ b/server/server_test.go
@@ -14,7 +14,7 @@ func TestIdGet(t *testing.T) {
 	arr := make([]uint, 100)
 	for i := 0; i < 100; i++ {
 		idparr[i] = uint(i + 1)
-		arr[i] = idPool.GetId()
+		arr[i], _ = idPool.GetId()
 	}
 	assert.Equal(t, idparr, arr, "Array of gotten id's is not [1, 2, ..., 101] as expected")
 }
@@ -25,7 +25,8 @@ func TestIdPut(t *testing.T) {
 		idPool.GetId()
 	}
 	idPool.ReleaseId(5)
-	assert.Equal(t, idPool.GetId(), uint(5), "Id gotten is not 5.")
+	id, _ := idPool.GetId()
+	assert.Equal(t, id, uint(5), "Id gotten is not 5.")
 }
 
 func TestIdFull(t *testing.T) {
@@ -33,13 +34,6 @@ func TestIdFull(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		idPool.GetId()
 	}
-	assert.Panics(t, func(){idPool.GetId()})
-}
-
-func Test(t *testing.T) {
-	idPool := NewIdPool(10)
-	for i := 0; i < 10; i++ {
-		idPool.GetId()
-	}
-	assert.Panics(t, func(){idPool.GetId()})
+	_, err := idPool.GetId()
+	assert.Error(t, err)
 }
\ No newline at end of file
diff --git a/server/websocket.go b/server/websocket.go
index fc673ab4..91972bac 100644
--- a/server/websocket.go
+++ b/server/websocket.go
@@ -403,7 +403,7 @@ func (this *SessionManager) createSession(wsConn *websocket.Conn) (*WSSession, e
 	}
 
 	// Create and start
-	newId := this.idPool.GetId()
+	newId, _ := this.idPool.GetId()
 	conn := &WSSession{
 		sessionManager: this,
 		id:             newId,
-- 
GitLab