diff --git a/event/cache_test.go b/event/cache_test.go
index b4957a715df3df78a88100feae2c90c5ca100392..877d04387a2caef15ab0ba135181babf0a7cf44a 100644
--- a/event/cache_test.go
+++ b/event/cache_test.go
@@ -18,30 +18,32 @@ func TestEventCache_Flush(t *testing.T) {
 	flushed := false
 
 	em := NewEmitter(loggers.NewNoopInfoTraceLogger())
-	SubscribeCallback(ctx, em, "nothingness", NewQueryBuilder(), func(message interface{}) {
+	SubscribeCallback(ctx, em, "nothingness", NewQueryBuilder(), func(message interface{}) bool {
 		// Check against sending a buffer of zeroed messages
 		if message == nil {
 			errCh <- fmt.Errorf("recevied empty message but none sent")
 		}
+		return false
 	})
 	evc := NewEventCache(em)
 	evc.Flush()
 	// Check after reset
 	evc.Flush()
-	SubscribeCallback(ctx, em, "somethingness", NewQueryBuilder().AndEquals("foo", "bar"), func(interface{}) {
-		if flushed {
-			errCh <- nil
-		} else {
-			errCh <- fmt.Errorf("callback was run before messages were flushed")
-		}
-	})
+	SubscribeCallback(ctx, em, "somethingness", NewQueryBuilder().AndEquals("foo", "bar"),
+		func(interface{}) bool {
+			if flushed {
+				errCh <- nil
+				return true
+			} else {
+				errCh <- fmt.Errorf("callback was run before messages were flushed")
+				return false
+			}
+		})
 
 	numMessages := 3
 	tags := map[string]interface{}{"foo": "bar"}
 	for i := 0; i < numMessages; i++ {
-		evc.Publish(ctx, "something", tags)
-		evc.Publish(ctx, "something", tags)
-		evc.Publish(ctx, "something", tags)
+		evc.Publish(ctx, fmt.Sprintf("something_%v", i), tags)
 	}
 	flushed = true
 	evc.Flush()
diff --git a/event/convention.go b/event/convention.go
index 1fc69abffc5c58094371c03fbe704c8133c52140..16832d5f3c08f5674fd6473b0b05494b2c8f856f 100644
--- a/event/convention.go
+++ b/event/convention.go
@@ -39,16 +39,24 @@ func PublishWithEventID(publisher Publisher, eventID string, eventData interface
 // Subscribe to messages matching query and launch a goroutine to run a callback for each one. The goroutine will exit
 // when the context is done or the subscription is removed.
 func SubscribeCallback(ctx context.Context, subscribable Subscribable, subscriber string, query Queryable,
-	callback func(message interface{})) error {
+	callback func(message interface{}) bool) error {
 
 	out := make(chan interface{})
 	go func() {
 		for {
 			msg, ok := <-out
 			if !ok {
+				// Channel closed, no need to unsubscribe or drain
+				return
+			}
+			if !callback(msg) {
+				// Callback is requesting stop so unsubscribe and drain channel
+				subscribable.Unsubscribe(context.Background(), subscriber, query)
+				// Not draining channel can starve other subscribers
+				for range out {
+				}
 				return
 			}
-			callback(msg)
 		}
 	}()
 	err := subscribable.Subscribe(ctx, subscriber, query, out)
@@ -62,13 +70,13 @@ func SubscribeCallback(ctx context.Context, subscribable Subscribable, subscribe
 func PublishAll(ctx context.Context, subscribable Subscribable, subscriber string, query Queryable,
 	publisher Publisher, extraTags map[string]interface{}) error {
 
-	return SubscribeCallback(ctx, subscribable, subscriber, query, func(message interface{}) {
+	return SubscribeCallback(ctx, subscribable, subscriber, query, func(message interface{}) bool {
 		tags := make(map[string]interface{})
 		for k, v := range extraTags {
 			tags[k] = v
 		}
-
 		// Help! I can't tell which tags the original publisher used - so I can't forward them on
 		publisher.Publish(ctx, message, tags)
+		return true
 	})
 }
diff --git a/event/convention_test.go b/event/convention_test.go
index 71e4a718f2fc13b43551a01f17a3c151f54a0c0d..31e9571b148affcac0639cf02f8533c283c3d496 100644
--- a/event/convention_test.go
+++ b/event/convention_test.go
@@ -13,8 +13,9 @@ func TestSubscribeCallback(t *testing.T) {
 	ctx := context.Background()
 	em := NewEmitter(loggers.NewNoopInfoTraceLogger())
 	ch := make(chan interface{})
-	SubscribeCallback(ctx, em, "TestSubscribeCallback", MatchAllQueryable(), func(msg interface{}) {
+	SubscribeCallback(ctx, em, "TestSubscribeCallback", MatchAllQueryable(), func(msg interface{}) bool {
 		ch <- msg
+		return true
 	})
 
 	sent := "FROTHY"
diff --git a/execution/events/events.go b/execution/events/events.go
index 0ee74ea4509c3e331c6c07b39a00357c428e5f01..63577bd824e95147748ed45e57def26b11648f65 100644
--- a/execution/events/events.go
+++ b/execution/events/events.go
@@ -67,12 +67,13 @@ func SubscribeAccountOutputSendTx(ctx context.Context, subscribable event.Subscr
 	query := sendTxQuery.And(event.QueryForEventID(EventStringAccountOutput(address))).
 		AndEquals(event.TxHashKey, hex.EncodeUpperToString(txHash))
 
-	return event.SubscribeCallback(ctx, subscribable, subscriber, query, func(message interface{}) {
+	return event.SubscribeCallback(ctx, subscribable, subscriber, query, func(message interface{}) bool {
 		if eventDataCall, ok := message.(*EventDataTx); ok {
 			if sendTx, ok := eventDataCall.Tx.(*txs.SendTx); ok {
 				ch <- sendTx
 			}
 		}
+		return true
 	})
 }
 
diff --git a/execution/evm/events/events.go b/execution/evm/events/events.go
index f018528e2308c969e60769620d62c9945734faa9..64c8ec444919648cf2c64e0a825c768dd3d52cd5 100644
--- a/execution/evm/events/events.go
+++ b/execution/evm/events/events.go
@@ -68,11 +68,12 @@ func SubscribeAccountCall(ctx context.Context, subscribable event.Subscribable,
 		query = query.AndEquals(event.TxHashKey, hex.EncodeUpperToString(txHash))
 	}
 
-	return event.SubscribeCallback(ctx, subscribable, subscriber, query, func(message interface{}) {
+	return event.SubscribeCallback(ctx, subscribable, subscriber, query, func(message interface{}) bool {
 		eventDataCall, ok := message.(*EventDataCall)
 		if ok {
 			ch <- eventDataCall
 		}
+		return true
 	})
 }
 
@@ -81,11 +82,12 @@ func SubscribeLogEvent(ctx context.Context, subscribable event.Subscribable, sub
 
 	query := event.QueryForEventID(EventStringLogEvent(address))
 
-	return event.SubscribeCallback(ctx, subscribable, subscriber, query, func(message interface{}) {
+	return event.SubscribeCallback(ctx, subscribable, subscriber, query, func(message interface{}) bool {
 		eventDataLog, ok := message.(*EventDataLog)
 		if ok {
 			ch <- eventDataLog
 		}
+		return true
 	})
 }
 
diff --git a/rpc/service.go b/rpc/service.go
index cdf139c7675d0bf59db9a1fce1a35b6a87382fac..a9001d512cd3ce0ef300f1ed42579ba311177314 100644
--- a/rpc/service.go
+++ b/rpc/service.go
@@ -38,7 +38,7 @@ const MaxBlockLookback = 100
 
 type SubscribableService interface {
 	// Events
-	Subscribe(ctx context.Context, subscriptionID string, eventID string, callback func(*ResultEvent)) error
+	Subscribe(ctx context.Context, subscriptionID string, eventID string, callback func(*ResultEvent) bool) error
 	Unsubscribe(ctx context.Context, subscriptionID string) error
 }
 
@@ -134,22 +134,24 @@ func (s *service) ListUnconfirmedTxs(maxTxs int) (*ResultListUnconfirmedTxs, err
 }
 
 func (s *service) Subscribe(ctx context.Context, subscriptionID string, eventID string,
-	callback func(resultEvent *ResultEvent)) error {
+	callback func(resultEvent *ResultEvent) bool) error {
 
 	queryBuilder := event.QueryForEventID(eventID)
 	logging.InfoMsg(s.logger, "Subscribing to events",
 		"query", queryBuilder.String(),
-		"subscription_id", subscriptionID)
+		"subscription_id", subscriptionID,
+		"event_id", eventID)
 	return event.SubscribeCallback(ctx, s.subscribable, subscriptionID, queryBuilder,
-		func(message interface{}) {
+		func(message interface{}) bool {
 			resultEvent, err := NewResultEvent(eventID, message)
 			if err != nil {
 				logging.InfoMsg(s.logger, "Received event that could not be mapped to ResultEvent",
 					structure.ErrorKey, err,
+					"subscription_id", subscriptionID,
 					"event_id", eventID)
-				return
+				return true
 			}
-			callback(resultEvent)
+			return callback(resultEvent)
 		})
 }
 
diff --git a/rpc/tm/methods.go b/rpc/tm/methods.go
index 2686f35a83287b65bae45ec2c7952bf8446e06ce..7da5d22b553dba8d0a91746a1b853da5522ac267 100644
--- a/rpc/tm/methods.go
+++ b/rpc/tm/methods.go
@@ -1,14 +1,15 @@
 package tm
 
 import (
-	"fmt"
-
 	"context"
+	"fmt"
 	"time"
 
 	acm "github.com/hyperledger/burrow/account"
 	"github.com/hyperledger/burrow/event"
 	"github.com/hyperledger/burrow/execution"
+	"github.com/hyperledger/burrow/logging"
+	logging_types "github.com/hyperledger/burrow/logging/types"
 	"github.com/hyperledger/burrow/rpc"
 	"github.com/hyperledger/burrow/txs"
 	gorpc "github.com/tendermint/tendermint/rpc/lib/server"
@@ -57,7 +58,8 @@ const (
 
 const SubscriptionTimeoutSeconds = 5 * time.Second
 
-func GetRoutes(service rpc.Service) map[string]*gorpc.RPCFunc {
+func GetRoutes(service rpc.Service, logger logging_types.InfoTraceLogger) map[string]*gorpc.RPCFunc {
+	logger = logging.WithScope(logger, "GetRoutes")
 	return map[string]*gorpc.RPCFunc{
 		// Transact
 		BroadcastTx: gorpc.NewRPCFunc(func(tx txs.Wrapper) (*rpc.ResultBroadcastTx, error) {
@@ -101,9 +103,15 @@ func GetRoutes(service rpc.Service) map[string]*gorpc.RPCFunc {
 			}
 			ctx, cancel := context.WithTimeout(context.Background(), SubscriptionTimeoutSeconds*time.Second)
 			defer cancel()
-			err = service.Subscribe(ctx, subscriptionID, eventID, func(resultEvent *rpc.ResultEvent) {
-				wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(EventResponseID(wsCtx.Request.ID, eventID),
-					resultEvent))
+			err = service.Subscribe(ctx, subscriptionID, eventID, func(resultEvent *rpc.ResultEvent) bool {
+				keepAlive := wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(
+					EventResponseID(wsCtx.Request.ID, eventID), resultEvent))
+				if !keepAlive {
+					logging.InfoMsg(logger, "dropping subscription because could not write to websocket",
+						"subscription_id", subscriptionID,
+						"event_id", eventID)
+				}
+				return keepAlive
 			})
 			if err != nil {
 				return nil, err
diff --git a/rpc/tm/server.go b/rpc/tm/server.go
index 13193f34d9a855175f93a2f77251b4a6e8146c3b..25f5f5cd8baae698fa74d7ab53af6290e6d2604c 100644
--- a/rpc/tm/server.go
+++ b/rpc/tm/server.go
@@ -30,7 +30,7 @@ func StartServer(service rpc.Service, pattern, listenAddress string, emitter eve
 	logger logging_types.InfoTraceLogger) (net.Listener, error) {
 
 	logger = logger.With(structure.ComponentKey, "RPC_TM")
-	routes := GetRoutes(service)
+	routes := GetRoutes(service, logger)
 	mux := http.NewServeMux()
 	wm := rpcserver.NewWebsocketManager(routes, rpcserver.EventSubscriber(tendermint.SubscribableAsEventBus(emitter)))
 	mux.HandleFunc(pattern, wm.WebsocketHandler)
diff --git a/rpc/v0/subscriptions.go b/rpc/v0/subscriptions.go
index 80a3d37269973524f5a82942802f6bfe5b8f4bb0..dbfab6f76fd6ef83e48dce613e0154f0c62da146 100644
--- a/rpc/v0/subscriptions.go
+++ b/rpc/v0/subscriptions.go
@@ -106,10 +106,11 @@ func (subs *Subscriptions) Add(eventId string) (string, error) {
 		return "", err
 	}
 	cache := newSubscriptionsCache()
-	err = subs.service.Subscribe(context.Background(), subId, eventId, func(resultEvent *rpc.ResultEvent) {
+	err = subs.service.Subscribe(context.Background(), subId, eventId, func(resultEvent *rpc.ResultEvent) bool {
 		cache.mtx.Lock()
 		defer cache.mtx.Unlock()
 		cache.events = append(cache.events, resultEvent)
+		return true
 	})
 	if err != nil {
 		return "", err
diff --git a/rpc/v0/websocket_service.go b/rpc/v0/websocket_service.go
index 0e0e036f60c9575918d13724f1424ddd9d7e9286..7917d84c12489580f19e8dbd50750d29015a3444 100644
--- a/rpc/v0/websocket_service.go
+++ b/rpc/v0/websocket_service.go
@@ -125,8 +125,9 @@ func (ws *WebsocketService) EventSubscribe(request *rpc.RPCRequest,
 		return nil, rpc.INTERNAL_ERROR, err
 	}
 
-	err = ws.service.Subscribe(context.Background(), subId, eventId, func(resultEvent *rpc.ResultEvent) {
+	err = ws.service.Subscribe(context.Background(), subId, eventId, func(resultEvent *rpc.ResultEvent) bool {
 		ws.writeResponse(subId, resultEvent, session)
+		return true
 	})
 	if err != nil {
 		return nil, rpc.INTERNAL_ERROR, err