From e21b5959694e5c02ed5b8054d7d89cbd4ceeba08 Mon Sep 17 00:00:00 2001 From: Silas Davis <silas@monax.io> Date: Mon, 29 Jan 2018 21:45:01 +0000 Subject: [PATCH] Overhaul events to use Tendermint's new pubsub package. Makes our events non-blocking and adds Query logic. Signed-off-by: Silas Davis <silas@monax.io> --- client/node_client.go | 2 +- client/websocket_client.go | 14 +- cmd/burrow/main.go | 6 +- config/config.go | 6 +- consensus/tendermint/events.go | 102 +++++++++ core/kernel.go | 98 +++++---- core/kernel_test.go | 2 +- event/cache.go | 51 +++-- event/cache_test.go | 59 +++-- event/convention.go | 74 +++++++ event/convention_test.go | 36 +++ event/data.go | 136 ------------ event/data_test.go | 64 ------ event/emitter.go | 192 ++++------------ event/emitter_test.go | 35 +++ event/query.go | 231 ++++++++++++++++++++ event/query_test.go | 43 ++++ execution/block_cache.go | 4 +- execution/events/events.go | 90 +++++++- execution/evm/events/events.go | 50 ++++- execution/evm/log_event_test.go | 42 ++-- execution/evm/vm.go | 18 +- execution/evm/vm_test.go | 48 ++-- execution/execution.go | 40 ++-- execution/execution_test.go | 62 +++--- execution/state_test.go | 4 +- execution/transactor.go | 154 ++++++------- rpc/result.go | 175 ++++++++++----- rpc/result_test.go | 56 ++++- rpc/service.go | 81 +++++-- rpc/tm/client/client.go | 4 +- rpc/tm/integration/client_test.go | 40 +--- rpc/tm/integration/shared.go | 11 +- rpc/tm/integration/websocket_client_test.go | 54 +++-- rpc/tm/integration/websocket_helpers.go | 68 +++--- rpc/tm/methods.go | 33 ++- rpc/tm/server.go | 6 +- rpc/v0/json_service.go | 32 ++- {event => rpc/v0}/subscriptions.go | 50 +++-- {event => rpc/v0}/subscriptions_test.go | 85 +------ rpc/v0/websocket_service.go | 12 +- {core => server}/server.go | 12 +- txs/tx.go | 63 +++--- 43 files changed, 1451 insertions(+), 994 deletions(-) create mode 100644 consensus/tendermint/events.go create mode 100644 event/convention.go create mode 100644 event/convention_test.go delete mode 100644 event/data.go delete mode 100644 event/data_test.go create mode 100644 event/emitter_test.go create mode 100644 event/query.go create mode 100644 event/query_test.go rename {event => rpc/v0}/subscriptions.go (79%) rename {event => rpc/v0}/subscriptions_test.go (70%) rename {core => server}/server.go (77%) diff --git a/client/node_client.go b/client/node_client.go index b4ddbb00..38ced41d 100644 --- a/client/node_client.go +++ b/client/node_client.go @@ -106,7 +106,7 @@ func (burrowNodeClient *burrowNodeClient) DeriveWebsocketClient() (nodeWsClient "endpoint", "/websocket", ) wsClient := rpcclient.NewWSClient(wsAddr, "/websocket") - if _, err = wsClient.Start(); err != nil { + if err = wsClient.Start(); err != nil { return nil, err } derivedBurrowNodeWebsocketClient := &burrowNodeWebsocketClient{ diff --git a/client/websocket_client.go b/client/websocket_client.go index f354bdb2..93abc8c5 100644 --- a/client/websocket_client.go +++ b/client/websocket_client.go @@ -76,11 +76,11 @@ func (burrowNodeWebsocketClient *burrowNodeWebsocketClient) WaitForConfirmation( confirmationChannel := make(chan Confirmation, 1) var latestBlockHash []byte - eventID := exe_events.EventStringAccInput(inputAddr) + eventID := exe_events.EventStringAccountInput(inputAddr) if err := burrowNodeWebsocketClient.Subscribe(eventID); err != nil { return nil, fmt.Errorf("Error subscribing to AccInput event (%s): %v", eventID, err) } - if err := burrowNodeWebsocketClient.Subscribe(tm_types.EventStringNewBlock()); err != nil { + if err := burrowNodeWebsocketClient.Subscribe(tm_types.EventNewBlock); err != nil { return nil, fmt.Errorf("Error subscribing to NewBlock event: %v", err) } // Read the incoming events @@ -115,7 +115,7 @@ func (burrowNodeWebsocketClient *burrowNodeWebsocketClient) WaitForConfirmation( switch response.ID { case tm_client.SubscribeRequestID: resultSubscribe := new(rpc.ResultSubscribe) - err = json.Unmarshal(*response.Result, resultSubscribe) + err = json.Unmarshal(response.Result, resultSubscribe) if err != nil { logging.InfoMsg(burrowNodeWebsocketClient.logger, "Unable to unmarshal ResultSubscribe", structure.ErrorKey, err) @@ -126,9 +126,9 @@ func (burrowNodeWebsocketClient *burrowNodeWebsocketClient) WaitForConfirmation( "event", resultSubscribe.EventID, "subscription_id", resultSubscribe.SubscriptionID) - case tm_client.EventResponseID(tm_types.EventStringNewBlock()): + case tm_client.EventResponseID(tm_types.EventNewBlock): resultEvent := new(rpc.ResultEvent) - err = json.Unmarshal(*response.Result, resultEvent) + err = json.Unmarshal(response.Result, resultEvent) if err != nil { logging.InfoMsg(burrowNodeWebsocketClient.logger, "Unable to unmarshal ResultEvent", structure.ErrorKey, err) @@ -145,14 +145,14 @@ func (burrowNodeWebsocketClient *burrowNodeWebsocketClient) WaitForConfirmation( case tm_client.EventResponseID(eventID): resultEvent := new(rpc.ResultEvent) - err = json.Unmarshal(*response.Result, resultEvent) + err = json.Unmarshal(response.Result, resultEvent) if err != nil { logging.InfoMsg(burrowNodeWebsocketClient.logger, "Unable to unmarshal ResultEvent", structure.ErrorKey, err) continue } - eventDataTx := resultEvent.EventDataTx() + eventDataTx := resultEvent.EventDataTx if eventDataTx == nil { // We are on the lookout for EventDataTx confirmationChannel <- Confirmation{ diff --git a/cmd/burrow/main.go b/cmd/burrow/main.go index 8a82c198..00cc8b0c 100644 --- a/cmd/burrow/main.go +++ b/cmd/burrow/main.go @@ -1,9 +1,9 @@ package main import ( + "context" "fmt" "os" - "strings" "github.com/hyperledger/burrow/config" @@ -42,7 +42,9 @@ func main() { fatalf("could not obtain config: %v", err) } - kern, err := conf.Kernel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + kern, err := conf.Kernel(ctx) if err != nil { fatalf("could not create Burrow kernel: %v", err) } diff --git a/config/config.go b/config/config.go index a219bf9f..91c5331b 100644 --- a/config/config.go +++ b/config/config.go @@ -3,6 +3,8 @@ package config import ( "fmt" + "context" + acm "github.com/hyperledger/burrow/account" "github.com/hyperledger/burrow/config/source" "github.com/hyperledger/burrow/consensus/tendermint" @@ -37,7 +39,7 @@ func DefaultBurrowConfig() *BurrowConfig { } } -func (conf *BurrowConfig) Kernel() (*core.Kernel, error) { +func (conf *BurrowConfig) Kernel(ctx context.Context) (*core.Kernel, error) { if conf.GenesisDoc == nil { return nil, fmt.Errorf("no GenesisDoc defined in config, cannot make Kernel") } @@ -55,7 +57,7 @@ func (conf *BurrowConfig) Kernel() (*core.Kernel, error) { } privValidator := validator.NewPrivValidatorMemory(val, keys.Signer(keyClient, val.Address())) - return core.NewKernel(privValidator, conf.GenesisDoc, conf.Tendermint.TendermintConfig(), conf.RPC, logger) + return core.NewKernel(ctx, privValidator, conf.GenesisDoc, conf.Tendermint.TendermintConfig(), conf.RPC, logger) } func (conf *BurrowConfig) JSONString() string { diff --git a/consensus/tendermint/events.go b/consensus/tendermint/events.go new file mode 100644 index 00000000..c9e6b509 --- /dev/null +++ b/consensus/tendermint/events.go @@ -0,0 +1,102 @@ +package tendermint + +import ( + "context" + + "github.com/hyperledger/burrow/event" + "github.com/hyperledger/burrow/logging/structure" + tm_types "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmlibs/pubsub" +) + +// Publishes all tendermint events available on subscribable to publisher +func PublishAllEvents(ctx context.Context, fromSubscribable event.Subscribable, subscriber string, + toPublisher event.Publisher) error { + + var err error + + // This is a work-around for the fact we cannot access a message's tags and need a separate query for each event type + tendermintEventTypes := []string{ + tm_types.EventBond, + tm_types.EventCompleteProposal, + tm_types.EventDupeout, + tm_types.EventFork, + tm_types.EventLock, + tm_types.EventNewBlock, + tm_types.EventNewBlockHeader, + tm_types.EventNewRound, + tm_types.EventNewRoundStep, + tm_types.EventPolka, + tm_types.EventRebond, + tm_types.EventRelock, + tm_types.EventTimeoutPropose, + tm_types.EventTimeoutWait, + tm_types.EventTx, + tm_types.EventUnbond, + tm_types.EventUnlock, + tm_types.EventVote, + tm_types.EventProposalHeartbeat, + } + + for _, eventType := range tendermintEventTypes { + publishErr := PublishEvent(ctx, fromSubscribable, subscriber, eventType, toPublisher) + if publishErr != nil && err == nil { + err = publishErr + } + } + + return err +} + +func PublishEvent(ctx context.Context, fromSubscribable event.Subscribable, subscriber string, eventType string, + toPublisher event.Publisher) error { + tags := map[string]interface{}{ + structure.ComponentKey: "Tendermint", + tm_types.EventTypeKey: eventType, + event.EventIDKey: eventType, + } + return event.PublishAll(ctx, fromSubscribable, subscriber, event.WrapQuery(tm_types.QueryForEvent(eventType)), + toPublisher, tags) +} + +type eventBusSubscriber struct { + tm_types.EventBusSubscriber +} + +func EventBusAsSubscribable(eventBus tm_types.EventBusSubscriber) event.Subscribable { + return eventBusSubscriber{eventBus} +} + +func (ebs eventBusSubscriber) Subscribe(ctx context.Context, subscriber string, query event.Queryable, + out chan<- interface{}) error { + qry, err := query.Query() + if err != nil { + return err + } + return ebs.EventBusSubscriber.Subscribe(ctx, subscriber, qry, out) +} + +func (ebs eventBusSubscriber) Unsubscribe(ctx context.Context, subscriber string, query event.Queryable) error { + qry, err := query.Query() + if err != nil { + return err + } + return ebs.EventBusSubscriber.Unsubscribe(ctx, subscriber, qry) +} + +type subscribableEventBus struct { + event.Subscribable +} + +func SubscribableAsEventBus(subscribable event.Subscribable) tm_types.EventBusSubscriber { + return subscribableEventBus{subscribable} +} + +func (seb subscribableEventBus) Subscribe(ctx context.Context, subscriber string, query pubsub.Query, + out chan<- interface{}) error { + return seb.Subscribable.Subscribe(ctx, subscriber, event.WrapQuery(query), out) +} + +func (seb subscribableEventBus) Unsubscribe(ctx context.Context, subscriber string, query pubsub.Query) error { + return seb.Subscribable.Unsubscribe(ctx, subscriber, event.WrapQuery(query)) +} diff --git a/core/kernel.go b/core/kernel.go index 89123de5..5348c70f 100644 --- a/core/kernel.go +++ b/core/kernel.go @@ -29,17 +29,17 @@ import ( "github.com/hyperledger/burrow/execution" "github.com/hyperledger/burrow/genesis" "github.com/hyperledger/burrow/logging" + "github.com/hyperledger/burrow/logging/structure" logging_types "github.com/hyperledger/burrow/logging/types" "github.com/hyperledger/burrow/rpc" "github.com/hyperledger/burrow/rpc/tm" "github.com/hyperledger/burrow/rpc/v0" v0_server "github.com/hyperledger/burrow/rpc/v0/server" + "github.com/hyperledger/burrow/server" "github.com/hyperledger/burrow/txs" tm_config "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/node" tm_types "github.com/tendermint/tendermint/types" dbm "github.com/tendermint/tmlibs/db" - "github.com/tendermint/tmlibs/events" ) const CooldownMilliseconds = 1000 @@ -47,20 +47,18 @@ const ServerShutdownTimeoutMilliseconds = 1000 // Kernel is the root structure of Burrow type Kernel struct { - eventSwitch events.EventSwitch - tmNode *node.Node + emitter event.Emitter service rpc.Service - serverLaunchers []ServerLauncher - servers []Server + serverLaunchers []server.Launcher + servers map[string]server.Server logger logging_types.InfoTraceLogger shutdownNotify chan struct{} shutdownOnce sync.Once } -func NewKernel(privValidator tm_types.PrivValidator, genesisDoc *genesis.GenesisDoc, tmConf *tm_config.Config, +func NewKernel(ctx context.Context, privValidator tm_types.PrivValidator, genesisDoc *genesis.GenesisDoc, tmConf *tm_config.Config, rpcConfig *rpc.RPCConfig, logger logging_types.InfoTraceLogger) (*Kernel, error) { - events.NewEventSwitch().Start() logger = logging.WithScope(logger, "NewKernel") stateDB := dbm.NewDB("burrow_state", dbm.GoLevelDBBackendStr, tmConf.DBDir()) @@ -71,21 +69,20 @@ func NewKernel(privValidator tm_types.PrivValidator, genesisDoc *genesis.Genesis state.Save() blockchain := bcm.NewBlockchain(genesisDoc) - evmEvents := event.NewEmitter(logger) tmGenesisDoc := tendermint.DeriveGenesisDoc(genesisDoc) checker := execution.NewBatchChecker(state, tmGenesisDoc.ChainID, blockchain, logger) - committer := execution.NewBatchCommitter(state, tmGenesisDoc.ChainID, blockchain, evmEvents, logger) + + emitter := event.NewEmitter(logger) + committer := execution.NewBatchCommitter(state, tmGenesisDoc.ChainID, blockchain, emitter, logger) tmNode, err := tendermint.NewNode(tmConf, privValidator, tmGenesisDoc, blockchain, checker, committer, logger) + if err != nil { return nil, err } - // Multiplex Tendermint and EVM events - eventEmitter := event.Multiplex(evmEvents, event.WrapEventSwitch(tmNode.EventSwitch(), logger)) - txCodec := txs.NewGoWireCodec() - transactor := execution.NewTransactor(blockchain, state, eventEmitter, - tendermint.BroadcastTxAsyncFunc(tmNode, txCodec), logger) + transactor := execution.NewTransactor(blockchain, state, emitter, tendermint.BroadcastTxAsyncFunc(tmNode, txCodec), + logger) // TODO: consider whether we need to be more explicit about pre-commit (check cache) versus committed (state) values // Note we pass the checker as the StateIterable to NewService which means the RPC layers will query the check @@ -93,23 +90,42 @@ func NewKernel(privValidator tm_types.PrivValidator, genesisDoc *genesis.Genesis // view of sequence values on the node that a client is communicating with. // Since we don't currently execute EVM code in the checker possible conflicts are limited to account creation // which increments the creator's account Sequence and SendTxs - service := rpc.NewService(state, state, eventEmitter, blockchain, transactor, query.NewNodeView(tmNode, txCodec), - logger) + service := rpc.NewService(ctx, state, state, emitter, blockchain, transactor, query.NewNodeView(tmNode, txCodec), logger) + + launchers := []server.Launcher{ + { + Name: "Tendermint", + Launch: func() (server.Server, error) { + err := tmNode.Start() + if err != nil { + return nil, fmt.Errorf("error starting Tendermint node: %v", err) + } + subscriber := fmt.Sprintf("TendermintFireHose-%s-%s", genesisDoc.ChainName, genesisDoc.ChainID()) + // Multiplex Tendermint and EVM events - servers := []ServerLauncher{ + err = tendermint.PublishAllEvents(ctx, tendermint.EventBusAsSubscribable(tmNode.EventBus()), subscriber, + emitter) + if err != nil { + return nil, fmt.Errorf("could not subscribe to Tendermint events: %v", err) + } + return server.ShutdownFunc(func(ctx context.Context) error { + return tmNode.Stop() + }), nil + }, + }, { - Name: "TM", - Launch: func() (Server, error) { - listener, err := tm.StartServer(service, "/websocket", rpcConfig.TM.ListenAddress, eventEmitter, logger) + Name: "RPC/tm", + Launch: func() (server.Server, error) { + listener, err := tm.StartServer(service, "/websocket", rpcConfig.TM.ListenAddress, emitter, logger) if err != nil { return nil, err } - return ListenersServer(listener), nil + return server.FromListeners(listener), nil }, }, { - Name: "V0", - Launch: func() (Server, error) { + Name: "RPC/V0", + Launch: func() (server.Server, error) { codec := v0.NewTCodec() jsonServer := v0.NewJSONServer(v0.NewJSONService(codec, service)) websocketServer := v0_server.NewWebSocketServer(rpcConfig.V0.Server.WebSocket.MaxWebSocketSessions, @@ -129,10 +145,10 @@ func NewKernel(privValidator tm_types.PrivValidator, genesisDoc *genesis.Genesis } return &Kernel{ - eventSwitch: eventEmitter, - tmNode: tmNode, + emitter: emitter, service: service, - serverLaunchers: servers, + serverLaunchers: launchers, + servers: make(map[string]server.Server), logger: logger, shutdownNotify: make(chan struct{}), }, nil @@ -140,17 +156,13 @@ func NewKernel(privValidator tm_types.PrivValidator, genesisDoc *genesis.Genesis // Boot the kernel starting Tendermint and RPC layers func (kern *Kernel) Boot() error { - _, err := kern.tmNode.Start() - if err != nil { - return fmt.Errorf("error starting Tendermint node: %v", err) - } for _, launcher := range kern.serverLaunchers { - server, err := launcher.Launch() + srvr, err := launcher.Launch() if err != nil { return fmt.Errorf("error launching %s server: %v", launcher.Name, err) } - kern.servers = append(kern.servers, server) + kern.servers[launcher.Name] = srvr } go kern.supervise() return nil @@ -180,11 +192,23 @@ func (kern *Kernel) Shutdown(ctx context.Context) (err error) { logging.InfoMsg(logger, "Shutting down servers") ctx, cancel := context.WithTimeout(ctx, ServerShutdownTimeoutMilliseconds*time.Millisecond) defer cancel() - for _, server := range kern.servers { - err = server.Shutdown(ctx) + // Shutdown servers in reverse order to boot + for i := len(kern.serverLaunchers) - 1; i >= 0; i-- { + name := kern.serverLaunchers[i].Name + srvr, ok := kern.servers[name] + if ok { + logging.InfoMsg(logger, "Shutting down server", "server_name", name) + sErr := srvr.Shutdown(ctx) + if sErr != nil { + logging.InfoMsg(logger, "Failed to shutdown server", + "server_name", name, + structure.ErrorKey, sErr) + if err == nil { + err = sErr + } + } + } } - logging.InfoMsg(logger, "Shutting down Tendermint node") - kern.tmNode.Stop() logging.InfoMsg(logger, "Shutdown complete") logging.Sync(kern.logger) // We don't want to wait for them, but yielding for a cooldown Let other goroutines flush diff --git a/core/kernel_test.go b/core/kernel_test.go index 0f6fd7b0..0ff68e0f 100644 --- a/core/kernel_test.go +++ b/core/kernel_test.go @@ -24,7 +24,7 @@ func TestBootThenShutdown(t *testing.T) { logger := loggers.NewNoopInfoTraceLogger() genesisDoc, privateAccounts := genesis.NewDeterministicGenesis(123).GenesisDoc(1, true, 1000, 1, true, 1000) privValidator := validator.NewPrivValidatorMemory(privateAccounts[0], privateAccounts[0]) - kern, err := NewKernel(privValidator, genesisDoc, tmConf, rpc.DefaultRPCConfig(), logger) + kern, err := NewKernel(context.Background(), privValidator, genesisDoc, tmConf, rpc.DefaultRPCConfig(), logger) require.NoError(t, err) err = kern.Boot() require.NoError(t, err) diff --git a/event/cache.go b/event/cache.go index 7279c097..5be39c53 100644 --- a/event/cache.go +++ b/event/cache.go @@ -1,43 +1,57 @@ package event +import ( + "context" +) + // When exceeded we will trim the buffer's backing array capacity to avoid excessive // allocation - const maximumBufferCapacityToLengthRatio = 2 -// An Cache buffers events for a Fireable -// All events are cached. Filtering happens on Flush +// A Cache buffers events for a Publisher. type Cache struct { - evsw Fireable - events []eventInfo + publisher Publisher + events []messageInfo } -var _ Fireable = &Cache{} +var _ Publisher = &Cache{} // Create a new Cache with an EventSwitch as backend -func NewEventCache(evsw Fireable) *Cache { +func NewEventCache(publisher Publisher) *Cache { return &Cache{ - evsw: evsw, + publisher: publisher, } } // a cached event -type eventInfo struct { - event string - data AnyEventData +type messageInfo struct { + // Hmm... might be unintended interactions with pushing a deadline into a cache - though usually we publish with an + // empty context + ctx context.Context + message interface{} + tags map[string]interface{} } // Cache an event to be fired upon finality. -func (evc *Cache) Fire(event string, eventData interface{}) { +func (evc *Cache) Publish(ctx context.Context, message interface{}, tags map[string]interface{}) error { // append to list (go will grow our backing array exponentially) - evc.events = append(evc.events, eventInfo{event: event, data: MapToAnyEventData(eventData)}) + evc.events = append(evc.events, messageInfo{ + ctx: ctx, + message: message, + tags: tags, + }) + return nil } -// Fire events by running evsw.Fire on all cached events. Blocks. -// Clears cached events -func (evc *Cache) Flush() { - for _, ei := range evc.events { - evc.evsw.Fire(ei.event, ei.data) +// Clears cached events by flushing them to Publisher +func (evc *Cache) Flush() error { + var err error + for _, mi := range evc.events { + publishErr := evc.publisher.Publish(mi.ctx, mi.message, mi.tags) + // Capture first by try to flush the rest + if publishErr != nil && err == nil { + err = publishErr + } } // Clear the buffer by re-slicing its length to zero if cap(evc.events) > len(evc.events)*maximumBufferCapacityToLengthRatio { @@ -49,4 +63,5 @@ func (evc *Cache) Flush() { // in previous cache round evc.events = evc.events[:0] } + return err } diff --git a/event/cache_test.go b/event/cache_test.go index 713b15d6..b4957a71 100644 --- a/event/cache_test.go +++ b/event/cache_test.go @@ -1,37 +1,60 @@ package event import ( + "context" + "fmt" "testing" + "time" "github.com/hyperledger/burrow/logging/loggers" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestEventCache_Flush(t *testing.T) { - evts := NewEmitter(loggers.NewNoopInfoTraceLogger()) - evts.Subscribe("nothingness", "", func(data AnyEventData) { - // Check we are not initialising an empty buffer full of zeroed eventInfos in the Cache - require.FailNow(t, "We should never receive a message on this switch since none are fired") + //ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second) + //defer cancel() + ctx := context.Background() + errCh := make(chan error) + flushed := false + + em := NewEmitter(loggers.NewNoopInfoTraceLogger()) + SubscribeCallback(ctx, em, "nothingness", NewQueryBuilder(), func(message interface{}) { + // Check against sending a buffer of zeroed messages + if message == nil { + errCh <- fmt.Errorf("recevied empty message but none sent") + } }) - evc := NewEventCache(evts) + evc := NewEventCache(em) evc.Flush() // Check after reset evc.Flush() - fail := true - pass := false - evts.Subscribe("somethingness", "something", func(data AnyEventData) { - if fail { - require.FailNow(t, "Shouldn't see a message until flushed") + SubscribeCallback(ctx, em, "somethingness", NewQueryBuilder().AndEquals("foo", "bar"), func(interface{}) { + if flushed { + errCh <- nil + } else { + errCh <- fmt.Errorf("callback was run before messages were flushed") } - pass = true }) - evc.Fire("something", AnyEventData{}) - evc.Fire("something", AnyEventData{}) - evc.Fire("something", AnyEventData{}) - fail = false + + numMessages := 3 + tags := map[string]interface{}{"foo": "bar"} + for i := 0; i < numMessages; i++ { + evc.Publish(ctx, "something", tags) + evc.Publish(ctx, "something", tags) + evc.Publish(ctx, "something", tags) + } + flushed = true evc.Flush() - assert.True(t, pass) + for i := 0; i < numMessages; i++ { + select { + case <-time.After(2 * time.Second): + t.Fatalf("callback did not run before timeout after messages were sent") + case err := <-errCh: + if err != nil { + t.Error(err) + } + } + } } func TestEventCacheGrowth(t *testing.T) { @@ -66,6 +89,6 @@ func TestEventCacheGrowth(t *testing.T) { func fireNEvents(evc *Cache, n int) { for i := 0; i < n; i++ { - evc.Fire("something", AnyEventData{}) + evc.Publish(context.Background(), "something", nil) } } diff --git a/event/convention.go b/event/convention.go new file mode 100644 index 00000000..1fc69abf --- /dev/null +++ b/event/convention.go @@ -0,0 +1,74 @@ +package event + +import ( + "context" + "fmt" + "reflect" +) + +const ( + EventIDKey = "EventID" + MessageTypeKey = "MessageType" + TxTypeKey = "TxType" + TxHashKey = "TxHash" +) + +// Get a query that matches events with a specific eventID +func QueryForEventID(eventID string) *QueryBuilder { + // Since we're accepting external output here there is a chance it won't parse... + return NewQueryBuilder().AndEquals(EventIDKey, eventID) +} + +func PublishWithEventID(publisher Publisher, eventID string, eventData interface{}, + extraTags map[string]interface{}) error { + + if extraTags[EventIDKey] != nil { + return fmt.Errorf("PublishWithEventID was passed the extraTags with %s already set: %s = '%s'", + EventIDKey, EventIDKey, eventID) + } + tags := map[string]interface{}{ + EventIDKey: eventID, + MessageTypeKey: reflect.TypeOf(eventData).String(), + } + for k, v := range extraTags { + tags[k] = v + } + return publisher.Publish(context.Background(), eventData, tags) +} + +// Subscribe to messages matching query and launch a goroutine to run a callback for each one. The goroutine will exit +// when the context is done or the subscription is removed. +func SubscribeCallback(ctx context.Context, subscribable Subscribable, subscriber string, query Queryable, + callback func(message interface{})) error { + + out := make(chan interface{}) + go func() { + for { + msg, ok := <-out + if !ok { + return + } + callback(msg) + } + }() + err := subscribable.Subscribe(ctx, subscriber, query, out) + if err != nil { + // To clean up goroutine - otherwise subscribable should close channel for us + close(out) + } + return err +} + +func PublishAll(ctx context.Context, subscribable Subscribable, subscriber string, query Queryable, + publisher Publisher, extraTags map[string]interface{}) error { + + return SubscribeCallback(ctx, subscribable, subscriber, query, func(message interface{}) { + tags := make(map[string]interface{}) + for k, v := range extraTags { + tags[k] = v + } + + // Help! I can't tell which tags the original publisher used - so I can't forward them on + publisher.Publish(ctx, message, tags) + }) +} diff --git a/event/convention_test.go b/event/convention_test.go new file mode 100644 index 00000000..71e4a718 --- /dev/null +++ b/event/convention_test.go @@ -0,0 +1,36 @@ +package event + +import ( + "context" + "testing" + "time" + + "github.com/hyperledger/burrow/logging/loggers" + "github.com/stretchr/testify/assert" +) + +func TestSubscribeCallback(t *testing.T) { + ctx := context.Background() + em := NewEmitter(loggers.NewNoopInfoTraceLogger()) + ch := make(chan interface{}) + SubscribeCallback(ctx, em, "TestSubscribeCallback", MatchAllQueryable(), func(msg interface{}) { + ch <- msg + }) + + sent := "FROTHY" + + n := 10 + for i := 0; i < n; i++ { + + em.Publish(ctx, sent, nil) + } + + for i := 0; i < n; i++ { + select { + case <-time.After(2 * time.Second): + t.Fatalf("Timed out waiting for event") + case msg := <-ch: + assert.Equal(t, sent, msg) + } + } +} diff --git a/event/data.go b/event/data.go deleted file mode 100644 index 25c22826..00000000 --- a/event/data.go +++ /dev/null @@ -1,136 +0,0 @@ -package event - -import ( - "fmt" - - exe_events "github.com/hyperledger/burrow/execution/events" - evm_events "github.com/hyperledger/burrow/execution/evm/events" - "github.com/tendermint/go-wire/data" - tm_types "github.com/tendermint/tendermint/types" -) - -// Oh for a real sum type - -// AnyEventData provides a single type for our multiplexed event categories of EVM events and Tendermint events -type AnyEventData struct { - TMEventData *tm_types.TMEventData `json:",omitempty"` - BurrowEventData *EventData `json:",omitempty"` - Err *string `json:",omitempty"` -} - -type EventData struct { - EventDataInner `json:"unwrap"` -} - -type EventDataInner interface { -} - -func (ed EventData) Unwrap() EventDataInner { - return ed.EventDataInner -} - -func (ed EventData) MarshalJSON() ([]byte, error) { - return mapper.ToJSON(ed.EventDataInner) -} - -func (ed *EventData) UnmarshalJSON(data []byte) (err error) { - parsed, err := mapper.FromJSON(data) - if err == nil && parsed != nil { - ed.EventDataInner = parsed.(EventDataInner) - } - return err -} - -var mapper = data.NewMapper(EventData{}). - RegisterImplementation(exe_events.EventDataTx{}, "event_data_tx", biota()). - RegisterImplementation(evm_events.EventDataCall{}, "event_data_call", biota()). - RegisterImplementation(evm_events.EventDataLog{}, "event_data_log", biota()) - -// Get whichever element of the AnyEventData sum type that is not nil -func (aed AnyEventData) Get() interface{} { - if aed.TMEventData != nil { - return aed.TMEventData.Unwrap() - } - if aed.BurrowEventData != nil { - return aed.BurrowEventData.Unwrap() - } - if aed.Err != nil { - return *aed.Err - } - return nil -} - -// If this AnyEventData wraps an EventDataNewBlock then return a pointer to that value, else return nil -func (aed AnyEventData) EventDataNewBlock() *tm_types.EventDataNewBlock { - if aed.TMEventData != nil { - eventData, _ := aed.TMEventData.Unwrap().(tm_types.EventDataNewBlock) - return &eventData - } - return nil -} - -// If this AnyEventData wraps an EventDataLog then return a pointer to that value, else return nil -func (aed AnyEventData) EventDataLog() *evm_events.EventDataLog { - if aed.BurrowEventData != nil { - eventData, _ := aed.BurrowEventData.Unwrap().(evm_events.EventDataLog) - return &eventData - } - return nil -} - -// If this AnyEventData wraps an EventDataCall then return a pointer to that value, else return nil -func (aed AnyEventData) EventDataCall() *evm_events.EventDataCall { - if aed.BurrowEventData != nil { - eventData, _ := aed.BurrowEventData.Unwrap().(evm_events.EventDataCall) - return &eventData - } - return nil -} - -// If this AnyEventData wraps an EventDataTx then return a pointer to that value, else return nil -func (aed AnyEventData) EventDataTx() *exe_events.EventDataTx { - if aed.BurrowEventData != nil { - eventData, _ := aed.BurrowEventData.Unwrap().(exe_events.EventDataTx) - return &eventData - } - return nil -} - -func (aed AnyEventData) Error() string { - if aed.Err == nil { - return "" - } - return *aed.Err -} - -// Map any supported event data element to our AnyEventData sum type -func MapToAnyEventData(eventData interface{}) AnyEventData { - switch ed := eventData.(type) { - case AnyEventData: - return ed - - case tm_types.TMEventData: - return AnyEventData{TMEventData: &ed} - - case EventData: - return AnyEventData{BurrowEventData: &ed} - - case EventDataInner: - return AnyEventData{BurrowEventData: &EventData{ - EventDataInner: ed, - }} - - default: - errStr := fmt.Sprintf("could not map event data of type %T to AnyEventData", eventData) - return AnyEventData{Err: &errStr} - } -} - -// Type byte helper -var nextByte byte = 1 - -func biota() (b byte) { - b = nextByte - nextByte++ - return -} diff --git a/event/data_test.go b/event/data_test.go deleted file mode 100644 index 26187967..00000000 --- a/event/data_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package event - -import ( - "encoding/json" - "testing" - - acm "github.com/hyperledger/burrow/account" - exe_events "github.com/hyperledger/burrow/execution/events" - "github.com/hyperledger/burrow/txs" - "github.com/stretchr/testify/assert" - tm_types "github.com/tendermint/tendermint/types" -) - -func TestSerialiseTMEventData(t *testing.T) { - roundTripAnyEventData(t, AnyEventData{ - TMEventData: &tm_types.TMEventData{ - TMEventDataInner: tm_types.EventDataNewBlock{ - Block: &tm_types.Block{ - LastCommit: &tm_types.Commit{}, - Header: &tm_types.Header{ - ChainID: "ChainID-ChainEgo", - }, - Data: &tm_types.Data{}, - }, - }, - }, - }) - -} - -func TestSerialiseEVMEventData(t *testing.T) { - roundTripAnyEventData(t, AnyEventData{ - BurrowEventData: &EventData{ - EventDataInner: exe_events.EventDataTx{ - Tx: &txs.CallTx{ - Address: &acm.Address{1, 2, 2, 3}, - }, - Return: []byte{1, 2, 3}, - Exception: "Exception", - }, - }, - }) -} - -func TestSerialiseError(t *testing.T) { - s := "random error" - roundTripAnyEventData(t, AnyEventData{ - Err: &s, - }) -} - -func roundTripAnyEventData(t *testing.T, aed AnyEventData) { - bs, err := json.Marshal(aed) - assert.NoError(t, err) - - aedOut := new(AnyEventData) - err = json.Unmarshal(bs, aedOut) - assert.NoError(t, err) - - bsOut, err := json.Marshal(aedOut) - assert.NoError(t, err) - assert.Equal(t, string(bs), string(bsOut)) - -} diff --git a/event/emitter.go b/event/emitter.go index 411a79f7..73996b7c 100644 --- a/event/emitter.go +++ b/event/emitter.go @@ -15,193 +15,97 @@ package event import ( + "context" "crypto/rand" "encoding/hex" "fmt" "strings" - "github.com/hyperledger/burrow/logging" "github.com/hyperledger/burrow/logging/structure" logging_types "github.com/hyperledger/burrow/logging/types" + "github.com/hyperledger/burrow/server" "github.com/tendermint/tmlibs/common" - go_events "github.com/tendermint/tmlibs/events" + "github.com/tendermint/tmlibs/pubsub" ) +const DefaultEventBufferCapacity = 2 << 10 + type Subscribable interface { - Subscribe(subId, event string, callback func(AnyEventData)) error - Unsubscribe(subId string) error + // Subscribe to all events matching query, which is a valid tmlibs Query + Subscribe(ctx context.Context, subscriber string, query Queryable, out chan<- interface{}) error + // Unsubscribe subscriber from a specific query string + Unsubscribe(ctx context.Context, subscriber string, query Queryable) error + UnsubscribeAll(ctx context.Context, subscriber string) error } -type Fireable interface { - Fire(event string, data interface{}) +type Publisher interface { + Publish(ctx context.Context, message interface{}, tags map[string]interface{}) error } type Emitter interface { - Fireable - go_events.EventSwitch Subscribable + Publisher + server.Server } // The events struct has methods for working with events. type emitter struct { - // Bah, Service infects everything - *common.BaseService - eventSwitch go_events.EventSwitch - logger logging_types.InfoTraceLogger -} - -var _ Emitter = &emitter{} - -func NewEmitter(logger logging_types.InfoTraceLogger) *emitter { - return WrapEventSwitch(go_events.NewEventSwitch(), logger) + common.BaseService + pubsubServer *pubsub.Server + logger logging_types.InfoTraceLogger } -func WrapEventSwitch(eventSwitch go_events.EventSwitch, logger logging_types.InfoTraceLogger) *emitter { - eventSwitch.Start() +func NewEmitter(logger logging_types.InfoTraceLogger) Emitter { + pubsubServer := pubsub.NewServer(pubsub.BufferCapacity(DefaultEventBufferCapacity)) + pubsubServer.BaseService = *common.NewBaseService(nil, "Emitter", pubsubServer) + pubsubServer.Start() return &emitter{ - BaseService: common.NewBaseService(nil, "BurrowEventEmitter", eventSwitch), - eventSwitch: eventSwitch, - logger: logger.With(structure.ComponentKey, "Events"), + pubsubServer: pubsubServer, + logger: logger.With(structure.ComponentKey, "Events"), } } -// Fireable -func (evts *emitter) Fire(event string, eventData interface{}) { - evts.eventSwitch.FireEvent(event, eventData) -} - -func (evts *emitter) FireEvent(event string, data go_events.EventData) { - evts.Fire(event, data) -} - -// EventSwitch -func (evts *emitter) AddListenerForEvent(listenerID, event string, cb go_events.EventCallback) { - evts.eventSwitch.AddListenerForEvent(listenerID, event, cb) -} - -func (evts *emitter) RemoveListenerForEvent(event string, listenerID string) { - evts.eventSwitch.RemoveListenerForEvent(event, listenerID) -} - -func (evts *emitter) RemoveListener(listenerID string) { - evts.eventSwitch.RemoveListener(listenerID) +// core.Server +func (em *emitter) Shutdown(ctx context.Context) error { + return em.pubsubServer.Stop() } -// Subscribe to an event. -func (evts *emitter) Subscribe(subId, event string, callback func(AnyEventData)) error { - logging.TraceMsg(evts.logger, "Subscribing to event", - structure.ScopeKey, "events.Subscribe", "subId", subId, "event", event) - evts.eventSwitch.AddListenerForEvent(subId, event, func(eventData go_events.EventData) { - if eventData == nil { - logging.TraceMsg(evts.logger, "Sent nil go-events EventData") - return - } - callback(MapToAnyEventData(eventData)) - }) - return nil -} - -// Un-subscribe from an event. -func (evts *emitter) Unsubscribe(subId string) error { - logging.TraceMsg(evts.logger, "Unsubscribing from event", - structure.ScopeKey, "events.Unsubscribe", "subId", subId) - evts.eventSwitch.RemoveListener(subId) - return nil -} - -// Provides an Emitter that wraps many underlying EventEmitters as a -// convenience for Subscribing and Unsubscribing on multiple EventEmitters at -// once -func Multiplex(events ...Emitter) *multiplexedEvents { - return &multiplexedEvents{ - BaseService: common.NewBaseService(nil, "BurrowMultiplexedEventEmitter", nil), - eventEmitters: events, - } -} - -type multiplexedEvents struct { - *common.BaseService - eventEmitters []Emitter +// Publisher +func (em *emitter) Publish(ctx context.Context, message interface{}, tags map[string]interface{}) error { + return em.pubsubServer.PublishWithTags(ctx, message, tags) } -var _ Emitter = &multiplexedEvents{} - -// Subscribe to an event. -func (multiEvents *multiplexedEvents) Subscribe(subId, event string, cb func(AnyEventData)) error { - for _, evts := range multiEvents.eventEmitters { - err := evts.Subscribe(subId, event, cb) - if err != nil { - return err - } - } - return nil -} - -func (multiEvents *multiplexedEvents) Unsubscribe(subId string) error { - for _, evts := range multiEvents.eventEmitters { - err := evts.Unsubscribe(subId) - if err != nil { - return err - } - } - return nil -} - -func (multiEvents *multiplexedEvents) Fire(event string, eventData interface{}) { - for _, evts := range multiEvents.eventEmitters { - evts.Fire(event, eventData) - } -} - -func (multiEvents *multiplexedEvents) FireEvent(event string, eventData go_events.EventData) { - multiEvents.Fire(event, eventData) -} - -// EventSwitch -func (multiEvents *multiplexedEvents) AddListenerForEvent(listenerID, event string, cb go_events.EventCallback) { - for _, evts := range multiEvents.eventEmitters { - evts.AddListenerForEvent(listenerID, event, cb) - } -} - -func (multiEvents *multiplexedEvents) RemoveListenerForEvent(event string, listenerID string) { - for _, evts := range multiEvents.eventEmitters { - evts.RemoveListenerForEvent(event, listenerID) +// Subscribable +func (em *emitter) Subscribe(ctx context.Context, subscriber string, query Queryable, out chan<- interface{}) error { + pubsubQuery, err := query.Query() + if err != nil { + return nil } + return em.pubsubServer.Subscribe(ctx, subscriber, pubsubQuery, out) } -func (multiEvents *multiplexedEvents) RemoveListener(listenerID string) { - for _, evts := range multiEvents.eventEmitters { - evts.RemoveListener(listenerID) +func (em *emitter) Unsubscribe(ctx context.Context, subscriber string, query Queryable) error { + pubsubQuery, err := query.Query() + if err != nil { + return nil } + return em.pubsubServer.Unsubscribe(ctx, subscriber, pubsubQuery) } -type noOpFireable struct { +func (em *emitter) UnsubscribeAll(ctx context.Context, subscriber string) error { + return em.pubsubServer.UnsubscribeAll(ctx, subscriber) } -func (*noOpFireable) Fire(string, interface{}) { - +// NoOpPublisher +func NewNoOpPublisher() Publisher { + return &noOpPublisher{} } -func NewNoOpFireable() Fireable { - return &noOpFireable{} +type noOpPublisher struct { } -// *********************************** Events *********************************** - -// EventSubscribe -type EventSub struct { - SubId string `json:"sub_id"` -} - -// EventUnsubscribe -type EventUnsub struct { - Result bool `json:"result"` -} - -// EventPoll -type PollResponse struct { - Events []interface{} `json:"events"` +func (nop *noOpPublisher) Publish(ctx context.Context, message interface{}, tags map[string]interface{}) error { + return nil } // ************************************************************************************** diff --git a/event/emitter_test.go b/event/emitter_test.go new file mode 100644 index 00000000..473bf4c5 --- /dev/null +++ b/event/emitter_test.go @@ -0,0 +1,35 @@ +package event + +import ( + "context" + "testing" + "time" + + "github.com/hyperledger/burrow/logging/loggers" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEmitter(t *testing.T) { + em := NewEmitter(loggers.NewNoopInfoTraceLogger()) + ctx := context.Background() + out := make(chan interface{}) + + err := em.Subscribe(ctx, "TestEmitter", NewQueryBuilder().AndStrictlyGreaterThan("foo", 10), out) + require.NoError(t, err) + + msgMiss := struct{ flob string }{"flib"} + err = em.Publish(ctx, msgMiss, map[string]interface{}{"foo": 10}) + assert.NoError(t, err) + + msgHit := struct{ blib string }{"blab"} + err = em.Publish(ctx, msgHit, map[string]interface{}{"foo": 11}) + assert.NoError(t, err) + + select { + case msg := <-out: + assert.Equal(t, msgHit, msg) + case <-time.After(time.Second): + t.Errorf("timed out before receiving message matching subscription query") + } +} diff --git a/event/query.go b/event/query.go new file mode 100644 index 00000000..cd1aa6fc --- /dev/null +++ b/event/query.go @@ -0,0 +1,231 @@ +package event + +import ( + "bytes" + "fmt" + "strconv" + "text/template" + "time" + + "github.com/tendermint/tmlibs/pubsub" + "github.com/tendermint/tmlibs/pubsub/query" +) + +const ( + // Operators + equalString = "=" + greaterThanString = ">" + lessThanString = "<" + greaterOrEqualString = ">=" + lessOrEqualString = "<=" + containsString = "CONTAINS" + andString = "AND" + + // Values + trueString = "true" + falseString = "false" + emptyString = "empty" + timeString = "TIME" + dateString = "DATE" +) + +type Queryable interface { + Query() (pubsub.Query, error) +} + +// A yet-to-parsed query +type QueryString string + +func (qs QueryString) Query() (pubsub.Query, error) { + if isEmpty(string(qs)) { + return query.Empty{}, nil + } + return query.New(string(qs)) +} + +func MatchAllQueryable() Queryable { + return WrapQuery(query.Empty{}) +} + +// A pre-parsed query +type Query struct { + query pubsub.Query +} + +func WrapQuery(qry pubsub.Query) Query { + return Query{qry} +} + +func (q Query) Query() (pubsub.Query, error) { + return q.query, nil +} + +// A fluent query builder +type QueryBuilder struct { + queryString string + condition + // reusable buffer for building queryString + bytes.Buffer + error +} + +// Templates +type condition struct { + Tag string + Op string + Operand string +} + +var conditionTemplate = template.Must(template.New("condition").Parse("{{.Tag}} {{.Op}} {{.Operand}}")) + +// Creates a new query builder with a base query that is the conjunction of all queries passed +func NewQueryBuilder(queries ...string) *QueryBuilder { + qb := new(QueryBuilder) + qb.queryString = qb.and(stringIterator(queries...)) + return qb +} + +func (qb *QueryBuilder) String() string { + return qb.queryString +} + +func (qb *QueryBuilder) Query() (pubsub.Query, error) { + if qb.error != nil { + return nil, qb.error + } + if isEmpty(qb.queryString) { + return query.Empty{}, nil + } + return query.New(qb.String()) +} + +// Creates the conjunction of QueryBuilder and rightQuery +func (qb *QueryBuilder) And(queryBuilders ...*QueryBuilder) *QueryBuilder { + return NewQueryBuilder(qb.and(queryBuilderIterator(queryBuilders...))) +} + +// Creates the conjunction of QueryBuilder and tag = operand +func (qb *QueryBuilder) AndEquals(tag string, operand interface{}) *QueryBuilder { + qb.condition.Tag = tag + qb.condition.Op = equalString + qb.condition.Operand = qb.operand(operand) + return NewQueryBuilder(qb.and(stringIterator(qb.conditionString()))) +} + +func (qb *QueryBuilder) AndGreaterThanOrEqual(tag string, operand interface{}) *QueryBuilder { + qb.condition.Tag = tag + qb.condition.Op = greaterOrEqualString + qb.condition.Operand = qb.operand(operand) + return NewQueryBuilder(qb.and(stringIterator(qb.conditionString()))) +} + +func (qb *QueryBuilder) AndLessThanOrEqual(tag string, operand interface{}) *QueryBuilder { + qb.condition.Tag = tag + qb.condition.Op = lessOrEqualString + qb.condition.Operand = qb.operand(operand) + return NewQueryBuilder(qb.and(stringIterator(qb.conditionString()))) +} + +func (qb *QueryBuilder) AndStrictlyGreaterThan(tag string, operand interface{}) *QueryBuilder { + qb.condition.Tag = tag + qb.condition.Op = greaterThanString + qb.condition.Operand = qb.operand(operand) + return NewQueryBuilder(qb.and(stringIterator(qb.conditionString()))) +} + +func (qb *QueryBuilder) AndStrictlyLessThan(tag string, operand interface{}) *QueryBuilder { + qb.condition.Tag = tag + qb.condition.Op = lessThanString + qb.condition.Operand = qb.operand(operand) + return NewQueryBuilder(qb.and(stringIterator(qb.conditionString()))) +} + +func (qb *QueryBuilder) AndContains(tag string, operand interface{}) *QueryBuilder { + qb.condition.Tag = tag + qb.condition.Op = containsString + qb.condition.Operand = qb.operand(operand) + return NewQueryBuilder(qb.and(stringIterator(qb.conditionString()))) +} + +func (qb *QueryBuilder) and(queryIterator func(func(string))) string { + defer qb.Buffer.Reset() + qb.Buffer.WriteString(qb.queryString) + queryIterator(func(q string) { + if !isEmpty(q) { + if qb.Buffer.Len() > 0 { + qb.Buffer.WriteByte(' ') + qb.Buffer.WriteString(andString) + qb.Buffer.WriteByte(' ') + } + qb.Buffer.WriteString(q) + } + }) + return qb.Buffer.String() +} + +func (qb *QueryBuilder) operand(operand interface{}) string { + defer qb.Buffer.Reset() + switch oper := operand.(type) { + case string: + qb.Buffer.WriteByte('\'') + qb.Buffer.WriteString(oper) + qb.Buffer.WriteByte('\'') + return qb.Buffer.String() + case fmt.Stringer: + return qb.operand(oper.String()) + case bool: + if oper { + return trueString + } + return falseString + case int: + return strconv.FormatInt(int64(oper), 10) + case int64: + return strconv.FormatInt(oper, 10) + case uint: + return strconv.FormatUint(uint64(oper), 10) + case uint64: + return strconv.FormatUint(oper, 10) + case float32: + return strconv.FormatFloat(float64(oper), 'f', -1, 32) + case float64: + return strconv.FormatFloat(float64(oper), 'f', -1, 64) + case time.Time: + qb.Buffer.WriteString(timeString) + qb.Buffer.WriteByte(' ') + qb.Buffer.WriteString(oper.Format(time.RFC3339)) + return qb.Buffer.String() + default: + return fmt.Sprintf("%v", oper) + } +} + +func (qb *QueryBuilder) conditionString() string { + defer qb.Buffer.Reset() + err := conditionTemplate.Execute(&qb.Buffer, qb.condition) + if err != nil && qb.error == nil { + qb.error = err + } + return qb.Buffer.String() +} + +func isEmpty(queryString string) bool { + return queryString == "" || queryString == emptyString +} + +// Iterators over some strings +func stringIterator(strs ...string) func(func(string)) { + return func(callback func(string)) { + for _, s := range strs { + callback(s) + } + } +} + +func queryBuilderIterator(qbs ...*QueryBuilder) func(func(string)) { + return func(callback func(string)) { + for _, qb := range qbs { + callback(qb.String()) + } + } +} diff --git a/event/query_test.go b/event/query_test.go new file mode 100644 index 00000000..d6b2ecf1 --- /dev/null +++ b/event/query_test.go @@ -0,0 +1,43 @@ +package event + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestQueryBuilder(t *testing.T) { + qb := NewQueryBuilder() + qry, err := qb.Query() + require.NoError(t, err) + assert.Equal(t, emptyString, qry.String()) + + qb = qb.AndGreaterThanOrEqual("foo.size", 45) + qry, err = qb.Query() + require.NoError(t, err) + assert.Equal(t, "foo.size >= 45", qry.String()) + + qb = qb.AndEquals("bar.name", "marmot") + qry, err = qb.Query() + require.NoError(t, err) + assert.Equal(t, "foo.size >= 45 AND bar.name = 'marmot'", qry.String()) + + assert.True(t, qry.Matches(map[string]interface{}{"foo.size": 80, "bar.name": "marmot"})) + assert.False(t, qry.Matches(map[string]interface{}{"foo.size": 8, "bar.name": "marmot"})) + assert.False(t, qry.Matches(map[string]interface{}{"foo.size": 80, "bar.name": "marot"})) + + qb = qb.AndContains("bar.desc", "burrow") + qry, err = qb.Query() + require.NoError(t, err) + assert.Equal(t, "foo.size >= 45 AND bar.name = 'marmot' AND bar.desc CONTAINS 'burrow'", qry.String()) + + assert.True(t, qry.Matches(map[string]interface{}{"foo.size": 80, "bar.name": "marmot", "bar.desc": "lives in a burrow"})) + assert.False(t, qry.Matches(map[string]interface{}{"foo.size": 80, "bar.name": "marmot", "bar.desc": "lives in a shoe"})) + + qb = NewQueryBuilder().AndEquals("foo", "bar") + qb = qb.And(NewQueryBuilder().AndGreaterThanOrEqual("frogs", 4)) + qry, err = qb.Query() + require.NoError(t, err) + assert.Equal(t, "foo = 'bar' AND frogs >= 4", qry.String()) +} diff --git a/execution/block_cache.go b/execution/block_cache.go index 3c83e29e..7f1a2403 100644 --- a/execution/block_cache.go +++ b/execution/block_cache.go @@ -18,12 +18,10 @@ import ( "bytes" "fmt" "sort" + "sync" acm "github.com/hyperledger/burrow/account" . "github.com/hyperledger/burrow/binary" - - "sync" - "github.com/tendermint/merkleeyes/iavl" dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/merkle" diff --git a/execution/events/events.go b/execution/events/events.go index 266a1860..0ee74ea4 100644 --- a/execution/events/events.go +++ b/execution/events/events.go @@ -1,20 +1,24 @@ package events import ( + "context" "encoding/json" "fmt" + "reflect" acm "github.com/hyperledger/burrow/account" + "github.com/hyperledger/burrow/event" "github.com/hyperledger/burrow/txs" + "github.com/tmthrgd/go-hex" ) -func EventStringAccInput(addr acm.Address) string { return fmt.Sprintf("Acc/%s/Input", addr) } -func EventStringAccOutput(addr acm.Address) string { return fmt.Sprintf("Acc/%s/Output", addr) } -func EventStringNameReg(name string) string { return fmt.Sprintf("NameReg/%s", name) } -func EventStringPermissions(name string) string { return fmt.Sprintf("Permissions/%s", name) } -func EventStringBond() string { return "Bond" } -func EventStringUnbond() string { return "Unbond" } -func EventStringRebond() string { return "Rebond" } +func EventStringAccountInput(addr acm.Address) string { return fmt.Sprintf("Acc/%s/Input", addr) } +func EventStringAccountOutput(addr acm.Address) string { return fmt.Sprintf("Acc/%s/Output", addr) } +func EventStringNameReg(name string) string { return fmt.Sprintf("NameReg/%s", name) } +func EventStringPermissions(name string) string { return fmt.Sprintf("Permissions/%s", name) } +func EventStringBond() string { return "Bond" } +func EventStringUnbond() string { return "Unbond" } +func EventStringRebond() string { return "Rebond" } // All txs fire EventDataTx, but only CallTx might have Return or Exception type EventDataTx struct { @@ -23,6 +27,11 @@ type EventDataTx struct { Exception string `json:"exception"` } +// For re-use +var sendTxQuery = event.NewQueryBuilder(). + AndEquals(event.MessageTypeKey, reflect.TypeOf(EventDataTx{}).String()). + AndEquals(event.TxTypeKey, reflect.TypeOf(&txs.SendTx{}).String()) + type eventDataTx struct { Tx txs.Wrapper `json:"tx"` Return []byte `json:"return"` @@ -49,3 +58,70 @@ func (edTx *EventDataTx) UnmarshalJSON(data []byte) error { edTx.Exception = model.Exception return nil } + +// Publish/Subscribe + +func SubscribeAccountOutputSendTx(ctx context.Context, subscribable event.Subscribable, subscriber string, + address acm.Address, txHash []byte, ch chan<- *txs.SendTx) error { + + query := sendTxQuery.And(event.QueryForEventID(EventStringAccountOutput(address))). + AndEquals(event.TxHashKey, hex.EncodeUpperToString(txHash)) + + return event.SubscribeCallback(ctx, subscribable, subscriber, query, func(message interface{}) { + if eventDataCall, ok := message.(*EventDataTx); ok { + if sendTx, ok := eventDataCall.Tx.(*txs.SendTx); ok { + ch <- sendTx + } + } + }) +} + +func PublishAccountOutput(publisher event.Publisher, address acm.Address, txHash []byte, + tx txs.Tx, ret []byte, exception string) error { + + return event.PublishWithEventID(publisher, EventStringAccountOutput(address), + &EventDataTx{ + Tx: tx, + Return: ret, + Exception: exception, + }, + map[string]interface{}{ + "address": address, + event.TxTypeKey: reflect.TypeOf(tx).String(), + event.TxHashKey: hex.EncodeUpperToString(txHash), + }) +} + +func PublishAccountInput(publisher event.Publisher, address acm.Address, txHash []byte, + tx txs.Tx, ret []byte, exception string) error { + + return event.PublishWithEventID(publisher, EventStringAccountInput(address), + &EventDataTx{ + Tx: tx, + Return: ret, + Exception: exception, + }, + map[string]interface{}{ + "address": address, + event.TxTypeKey: reflect.TypeOf(tx).String(), + event.TxHashKey: hex.EncodeUpperToString(txHash), + }) +} + +func PublishNameReg(publisher event.Publisher, txHash []byte, tx *txs.NameTx) error { + return event.PublishWithEventID(publisher, EventStringNameReg(tx.Name), &EventDataTx{Tx: tx}, + map[string]interface{}{ + "name": tx.Name, + event.TxTypeKey: reflect.TypeOf(tx).String(), + event.TxHashKey: hex.EncodeUpperToString(txHash), + }) +} + +func PublishPermissions(publisher event.Publisher, name string, txHash []byte, tx *txs.PermissionsTx) error { + return event.PublishWithEventID(publisher, EventStringPermissions(name), &EventDataTx{Tx: tx}, + map[string]interface{}{ + "name": name, + event.TxTypeKey: reflect.TypeOf(tx).String(), + event.TxHashKey: hex.EncodeUpperToString(txHash), + }) +} diff --git a/execution/evm/events/events.go b/execution/evm/events/events.go index 4bc7a323..f018528e 100644 --- a/execution/evm/events/events.go +++ b/execution/evm/events/events.go @@ -15,16 +15,19 @@ package events import ( + "context" "fmt" acm "github.com/hyperledger/burrow/account" . "github.com/hyperledger/burrow/binary" + "github.com/hyperledger/burrow/event" + "github.com/tmthrgd/go-hex" ) // Functions to generate eventId strings -func EventStringAccCall(addr acm.Address) string { return fmt.Sprintf("Acc/%s/Call", addr) } -func EventStringLogEvent(addr acm.Address) string { return fmt.Sprintf("Log/%s", addr) } +func EventStringAccountCall(addr acm.Address) string { return fmt.Sprintf("Acc/%s/Call", addr) } +func EventStringLogEvent(addr acm.Address) string { return fmt.Sprintf("Log/%s", addr) } //---------------------------------------- @@ -52,3 +55,46 @@ type EventDataLog struct { Data []byte `json:"data"` Height uint64 `json:"height"` } + +// Publish/Subscribe + +// Subscribe to account call event - if TxHash is provided listens for a specifc Tx otherwise captures all +func SubscribeAccountCall(ctx context.Context, subscribable event.Subscribable, subscriber string, address acm.Address, + txHash []byte, ch chan<- *EventDataCall) error { + + query := event.QueryForEventID(EventStringAccountCall(address)) + + if len(txHash) > 0 { + query = query.AndEquals(event.TxHashKey, hex.EncodeUpperToString(txHash)) + } + + return event.SubscribeCallback(ctx, subscribable, subscriber, query, func(message interface{}) { + eventDataCall, ok := message.(*EventDataCall) + if ok { + ch <- eventDataCall + } + }) +} + +func SubscribeLogEvent(ctx context.Context, subscribable event.Subscribable, subscriber string, address acm.Address, + ch chan<- *EventDataLog) error { + + query := event.QueryForEventID(EventStringLogEvent(address)) + + return event.SubscribeCallback(ctx, subscribable, subscriber, query, func(message interface{}) { + eventDataLog, ok := message.(*EventDataLog) + if ok { + ch <- eventDataLog + } + }) +} + +func PublishAccountCall(publisher event.Publisher, address acm.Address, eventDataCall *EventDataCall) error { + return event.PublishWithEventID(publisher, EventStringAccountCall(address), eventDataCall, + map[string]interface{}{"address": address, event.TxHashKey: hex.EncodeUpperToString(eventDataCall.TxID)}) +} + +func PublishLogEvent(publisher event.Publisher, address acm.Address, eventDataLog *EventDataLog) error { + return event.PublishWithEventID(publisher, EventStringLogEvent(address), eventDataLog, + map[string]interface{}{"address": address}) +} diff --git a/execution/evm/log_event_test.go b/execution/evm/log_event_test.go index 76b7dc75..496e4e61 100644 --- a/execution/evm/log_event_test.go +++ b/execution/evm/log_event_test.go @@ -16,8 +16,10 @@ package evm import ( "bytes" + "context" "reflect" "testing" + "time" acm "github.com/hyperledger/burrow/account" . "github.com/hyperledger/burrow/binary" @@ -25,6 +27,7 @@ import ( . "github.com/hyperledger/burrow/execution/evm/asm" "github.com/hyperledger/burrow/execution/evm/events" "github.com/hyperledger/burrow/logging/loggers" + "github.com/stretchr/testify/require" ) var expectedData = []byte{0x10} @@ -51,27 +54,13 @@ func TestLog4(t *testing.T) { ourVm := NewVM(st, DefaultDynamicMemoryProvider, newParams(), acm.ZeroAddress, nil, logger) - eventSwitch := event.NewEmitter(loggers.NewNoopInfoTraceLogger()) - eventID := events.EventStringLogEvent(account2.Address()) + emitter := event.NewEmitter(loggers.NewNoopInfoTraceLogger()) - doneChan := make(chan struct{}, 1) + ch := make(chan *events.EventDataLog) - eventSwitch.Subscribe("test", eventID, func(eventData event.AnyEventData) { - logEvent := eventData.EventDataLog() - // No need to test address as this event would not happen if it wasn't correct - if !reflect.DeepEqual(logEvent.Topics, expectedTopics) { - t.Errorf("Event topics are wrong. Got: %v. Expected: %v", logEvent.Topics, expectedTopics) - } - if !bytes.Equal(logEvent.Data, expectedData) { - t.Errorf("Event data is wrong. Got: %s. Expected: %s", logEvent.Data, expectedData) - } - if logEvent.Height != expectedHeight { - t.Errorf("Event block height is wrong. Got: %d. Expected: %d", logEvent.Height, expectedHeight) - } - doneChan <- struct{}{} - }) + require.NoError(t, events.SubscribeLogEvent(context.Background(), emitter, "test", account2.Address(), ch)) - ourVm.SetFireable(eventSwitch) + ourVm.SetPublisher(emitter) var gas uint64 = 100000 @@ -95,8 +84,19 @@ func TestLog4(t *testing.T) { } _, err := ourVm.Call(account1, account2, code, []byte{}, 0, &gas) - <-doneChan - if err != nil { - t.Fatal(err) + require.NoError(t, err) + select { + case <-time.After(5 * time.Second): + t.Fatalf("timedout waiting for EventDataLog") + case eventDataLog := <-ch: + if !reflect.DeepEqual(eventDataLog.Topics, expectedTopics) { + t.Errorf("Event topics are wrong. Got: %v. Expected: %v", eventDataLog.Topics, expectedTopics) + } + if !bytes.Equal(eventDataLog.Data, expectedData) { + t.Errorf("Event data is wrong. Got: %s. Expected: %s", eventDataLog.Data, expectedData) + } + if eventDataLog.Height != expectedHeight { + t.Errorf("Event block height is wrong. Got: %d. Expected: %d", eventDataLog.Height, expectedHeight) + } } } diff --git a/execution/evm/vm.go b/execution/evm/vm.go index 780d3032..89a8304b 100644 --- a/execution/evm/vm.go +++ b/execution/evm/vm.go @@ -76,7 +76,7 @@ type VM struct { origin acm.Address txid []byte callDepth int - evc event.Fireable + publisher event.Publisher logger logging_types.InfoTraceLogger } @@ -98,8 +98,8 @@ func (vm *VM) Debugf(format string, a ...interface{}) { } // satisfies go_events.Eventable -func (vm *VM) SetFireable(evc event.Fireable) { - vm.evc = evc +func (vm *VM) SetPublisher(publisher event.Publisher) { + vm.publisher = publisher } // CONTRACT: it is the duty of the contract writer to call known permissions @@ -122,9 +122,8 @@ func HasPermission(state acm.StateWriter, acc acm.Account, perm ptypes.PermFlag) func (vm *VM) fireCallEvent(exception *string, output *[]byte, callerAddress, calleeAddress acm.Address, input []byte, value uint64, gas *uint64) { // fire the post call event (including exception if applicable) - if vm.evc != nil { - stringAccCall := events.EventStringAccCall(calleeAddress) - vm.evc.Fire(stringAccCall, events.EventDataCall{ + if vm.publisher != nil { + events.PublishAccountCall(vm.publisher, calleeAddress, &events.EventDataCall{ &events.CallData{Caller: callerAddress, Callee: calleeAddress, Data: input, Value: value, Gas: *gas}, vm.origin, vm.txid, @@ -778,16 +777,15 @@ func (vm *VM) call(caller, callee acm.MutableAccount, code, input []byte, value vm.Debugf(" => Memory err: %s", memErr) return nil, firstErr(err, ErrMemoryOutOfBounds) } - if vm.evc != nil { + if vm.publisher != nil { eventID := events.EventStringLogEvent(callee.Address()) fmt.Printf("eventID: %s\n", eventID) - log := events.EventDataLog{ + events.PublishLogEvent(vm.publisher, callee.Address(), &events.EventDataLog{ Address: callee.Address(), Topics: topics, Data: data, Height: vm.params.BlockHeight, - } - vm.evc.Fire(eventID, log) + }) } vm.Debugf(" => T:%X D:%X\n", topics, data) diff --git a/execution/evm/vm_test.go b/execution/evm/vm_test.go index ecff1891..44b7dbc6 100644 --- a/execution/evm/vm_test.go +++ b/execution/evm/vm_test.go @@ -15,6 +15,7 @@ package evm import ( + "context" "encoding/hex" "fmt" "testing" @@ -26,7 +27,6 @@ import ( . "github.com/hyperledger/burrow/binary" "github.com/hyperledger/burrow/event" - exe_events "github.com/hyperledger/burrow/execution/events" . "github.com/hyperledger/burrow/execution/evm/asm" . "github.com/hyperledger/burrow/execution/evm/asm/bc" evm_events "github.com/hyperledger/burrow/execution/evm/events" @@ -391,47 +391,41 @@ func makeAccountWithCode(state acm.Updater, name string, // event (in the case of no direct error from call we will block waiting for // at least 1 AccCall event) func runVMWaitError(ourVm *VM, caller, callee acm.MutableAccount, subscribeAddr acm.Address, - contractCode []byte, gas uint64) (output []byte, err error) { - eventCh := make(chan event.EventData) - output, err = runVM(eventCh, ourVm, caller, callee, subscribeAddr, - contractCode, gas) + contractCode []byte, gas uint64) ([]byte, error) { + eventCh := make(chan *evm_events.EventDataCall) + output, err := runVM(eventCh, ourVm, caller, callee, subscribeAddr, contractCode, gas) if err != nil { - return + return output, err } - msg := <-eventCh - var errString string - switch ev := msg.Unwrap().(type) { - case exe_events.EventDataTx: - errString = ev.Exception - case evm_events.EventDataCall: - errString = ev.Exception - } - - if errString != "" { - err = errors.New(errString) + select { + case eventDataCall := <-eventCh: + if eventDataCall.Exception != "" { + return output, errors.New(eventDataCall.Exception) + } + return output, nil } - return } // Subscribes to an AccCall, runs the vm, returns the output and any direct // exception -func runVM(eventCh chan event.EventData, ourVm *VM, caller, callee acm.MutableAccount, +func runVM(eventCh chan<- *evm_events.EventDataCall, ourVm *VM, caller, callee acm.MutableAccount, subscribeAddr acm.Address, contractCode []byte, gas uint64) ([]byte, error) { // we need to catch the event from the CALL to check for exceptions - evsw := event.NewEmitter(loggers.NewNoopInfoTraceLogger()) + emitter := event.NewEmitter(loggers.NewNoopInfoTraceLogger()) fmt.Printf("subscribe to %s\n", subscribeAddr) - evsw.Subscribe("test", evm_events.EventStringAccCall(subscribeAddr), - func(msg event.AnyEventData) { - eventCh <- *msg.BurrowEventData - }) - evc := event.NewEventCache(evsw) - ourVm.SetFireable(evc) + + err := evm_events.SubscribeAccountCall(context.Background(), emitter, "test", subscribeAddr, nil, eventCh) + if err != nil { + return nil, err + } + evc := event.NewEventCache(emitter) + ourVm.SetPublisher(evc) start := time.Now() output, err := ourVm.Call(caller, callee, contractCode, []byte{}, 0, &gas) fmt.Printf("Output: %v Error: %v\n", output, err) fmt.Println("Call took:", time.Since(start)) - go func() { evc.Flush() }() + evc.Flush() return output, err } diff --git a/execution/execution.go b/execution/execution.go index 1f755b4c..265e6c62 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -57,7 +57,7 @@ type executor struct { runCall bool state *State blockCache *BlockCache - fireable event.Fireable + publisher event.Publisher eventCache *event.Cache logger logging_types.InfoTraceLogger } @@ -69,16 +69,16 @@ func NewBatchChecker(state *State, chainID string, tip bcm.Tip, logger logging_types.InfoTraceLogger) BatchExecutor { - return newExecutor(false, state, chainID, tip, event.NewNoOpFireable(), + return newExecutor(false, state, chainID, tip, event.NewNoOpPublisher(), logging.WithScope(logger, "NewBatchExecutor")) } func NewBatchCommitter(state *State, chainID string, tip bcm.Tip, - fireable event.Fireable, + publisher event.Publisher, logger logging_types.InfoTraceLogger) BatchCommitter { - return newExecutor(true, state, chainID, tip, fireable, + return newExecutor(true, state, chainID, tip, publisher, logging.WithScope(logger, "NewBatchCommitter")) } @@ -86,7 +86,7 @@ func newExecutor(runCall bool, state *State, chainID string, tip bcm.Tip, - eventFireable event.Fireable, + eventFireable event.Publisher, logger logging_types.InfoTraceLogger) *executor { return &executor{ chainID: chainID, @@ -94,7 +94,7 @@ func newExecutor(runCall bool, runCall: runCall, state: state, blockCache: NewBlockCache(state), - fireable: eventFireable, + publisher: eventFireable, eventCache: event.NewEventCache(eventFireable), logger: logger.With(structure.ComponentKey, "Execution"), } @@ -144,7 +144,7 @@ func (exe *executor) Commit() ([]byte, error) { func (exe *executor) Reset() error { exe.blockCache = NewBlockCache(exe.state) - exe.eventCache = event.NewEventCache(exe.fireable) + exe.eventCache = event.NewEventCache(exe.publisher) return nil } @@ -207,12 +207,13 @@ func (exe *executor) Execute(tx txs.Tx) error { // if the exe.eventCache is nil, nothing will happen if exe.eventCache != nil { + txHash := txs.TxHash(exe.chainID, tx) for _, i := range tx.Inputs { - exe.eventCache.Fire(events.EventStringAccInput(i.Address), events.EventDataTx{tx, nil, ""}) + events.PublishAccountInput(exe.eventCache, i.Address, txHash, tx, nil, "") } for _, o := range tx.Outputs { - exe.eventCache.Fire(events.EventStringAccOutput(o.Address), events.EventDataTx{tx, nil, ""}) + events.PublishAccountOutput(exe.eventCache, o.Address, txHash, tx, nil, "") } } return nil @@ -363,7 +364,7 @@ func (exe *executor) Execute(tx txs.Tx) error { txCache.UpdateAccount(callee) vmach := evm.NewVM(txCache, evm.DefaultDynamicMemoryProvider, params, caller.Address(), txs.TxHash(exe.chainID, tx), logger) - vmach.SetFireable(exe.eventCache) + vmach.SetPublisher(exe.eventCache) // NOTE: Call() transfers the value from caller to callee iff call succeeds. ret, err = vmach.Call(caller, callee, code, tx.Data, value, &gas) if err != nil { @@ -396,11 +397,10 @@ func (exe *executor) Execute(tx txs.Tx) error { if err != nil { exception = err.Error() } - exe.eventCache.Fire(events.EventStringAccInput(tx.Input.Address), - events.EventDataTx{tx, ret, exception}) + txHash := txs.TxHash(exe.chainID, tx) + events.PublishAccountInput(exe.eventCache, tx.Input.Address, txHash, tx, ret, exception) if tx.Address != nil { - exe.eventCache.Fire(events.EventStringAccOutput(*tx.Address), - events.EventDataTx{tx, ret, exception}) + events.PublishAccountOutput(exe.eventCache, *tx.Address, txHash, tx, ret, exception) } } } else { @@ -565,8 +565,9 @@ func (exe *executor) Execute(tx txs.Tx) error { // TODO: maybe we want to take funds on error and allow txs in that don't do anythingi? if exe.eventCache != nil { - exe.eventCache.Fire(events.EventStringAccInput(tx.Input.Address), events.EventDataTx{tx, nil, ""}) - exe.eventCache.Fire(events.EventStringNameReg(tx.Name), events.EventDataTx{tx, nil, ""}) + txHash := txs.TxHash(exe.chainID, tx) + events.PublishAccountInput(exe.eventCache, tx.Input.Address, txHash, tx, nil, "") + events.PublishNameReg(exe.eventCache, txHash, tx) } return nil @@ -809,10 +810,9 @@ func (exe *executor) Execute(tx txs.Tx) error { } if exe.eventCache != nil { - exe.eventCache.Fire(events.EventStringAccInput(tx.Input.Address), - events.EventDataTx{tx, nil, ""}) - exe.eventCache.Fire(events.EventStringPermissions(permission.PermFlagToString(permFlag)), - events.EventDataTx{tx, nil, ""}) + txHash := txs.TxHash(exe.chainID, tx) + events.PublishAccountInput(exe.eventCache, tx.Input.Address, txHash, tx, nil, "") + events.PublishPermissions(exe.eventCache, permission.PermFlagToString(permFlag), txHash, tx) } return nil diff --git a/execution/execution_test.go b/execution/execution_test.go index a6774d05..dfc0086e 100644 --- a/execution/execution_test.go +++ b/execution/execution_test.go @@ -16,6 +16,7 @@ package execution import ( "bytes" + "context" "fmt" "strconv" "testing" @@ -429,7 +430,7 @@ func TestCallPermission(t *testing.T) { tx.Sign(testChainID, users[0]) // we need to subscribe to the Call event to detect the exception - _, exception := execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccCall(caller1ContractAddr)) // + _, exception := execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccountCall(caller1ContractAddr)) // if exception == "" { t.Fatal("Expected exception") } @@ -445,7 +446,7 @@ func TestCallPermission(t *testing.T) { tx.Sign(testChainID, users[0]) // we need to subscribe to the Call event to detect the exception - _, exception = execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccCall(caller1ContractAddr)) // + _, exception = execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccountCall(caller1ContractAddr)) // if exception != "" { t.Fatal("Unexpected exception:", exception) } @@ -475,7 +476,7 @@ func TestCallPermission(t *testing.T) { tx.Sign(testChainID, users[0]) // we need to subscribe to the Call event to detect the exception - _, exception = execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccCall(caller1ContractAddr)) // + _, exception = execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccountCall(caller1ContractAddr)) // if exception == "" { t.Fatal("Expected exception") } @@ -493,7 +494,7 @@ func TestCallPermission(t *testing.T) { tx.Sign(testChainID, users[0]) // we need to subscribe to the Call event to detect the exception - _, exception = execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccCall(caller1ContractAddr)) // + _, exception = execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccountCall(caller1ContractAddr)) // if exception != "" { t.Fatal("Unexpected exception", exception) } @@ -564,7 +565,7 @@ func TestCreatePermission(t *testing.T) { tx, _ = txs.NewCallTx(batchCommitter.blockCache, users[0].PublicKey(), &contractAddr, createCode, 100, 100, 100) tx.Sign(testChainID, users[0]) // we need to subscribe to the Call event to detect the exception - _, exception := execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccCall(contractAddr)) // + _, exception := execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccountCall(contractAddr)) // if exception == "" { t.Fatal("expected exception") } @@ -580,7 +581,7 @@ func TestCreatePermission(t *testing.T) { tx, _ = txs.NewCallTx(batchCommitter.blockCache, users[0].PublicKey(), &contractAddr, createCode, 100, 100, 100) tx.Sign(testChainID, users[0]) // we need to subscribe to the Call event to detect the exception - _, exception = execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccCall(contractAddr)) // + _, exception = execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccountCall(contractAddr)) // if exception != "" { t.Fatal("unexpected exception", exception) } @@ -606,7 +607,7 @@ func TestCreatePermission(t *testing.T) { tx, _ = txs.NewCallTx(batchCommitter.blockCache, users[0].PublicKey(), &contractAddr, createCode, 100, 10000, 100) tx.Sign(testChainID, users[0]) // we need to subscribe to the Call event to detect the exception - _, exception = execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccCall(acm.Address{})) // + _, exception = execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccountCall(acm.Address{})) // if exception != "" { t.Fatal("unexpected exception", exception) } @@ -860,7 +861,7 @@ func TestCreateAccountPermission(t *testing.T) { txCall.Sign(testChainID, users[0]) // we need to subscribe to the Call event to detect the exception - _, exception := execTxWaitEvent(t, batchCommitter, txCall, evm_events.EventStringAccCall(caller1ContractAddr)) // + _, exception := execTxWaitEvent(t, batchCommitter, txCall, evm_events.EventStringAccountCall(caller1ContractAddr)) // if exception == "" { t.Fatal("Expected exception") } @@ -875,7 +876,7 @@ func TestCreateAccountPermission(t *testing.T) { txCall.Sign(testChainID, users[0]) // we need to subscribe to the Call event to detect the exception - _, exception = execTxWaitEvent(t, batchCommitter, txCall, evm_events.EventStringAccCall(caller1ContractAddr)) // + _, exception = execTxWaitEvent(t, batchCommitter, txCall, evm_events.EventStringAccountCall(caller1ContractAddr)) // if exception != "" { t.Fatal("Unexpected exception", exception) } @@ -1104,38 +1105,35 @@ var ExceptionTimeOut = "timed out waiting for event" // run ExecTx and wait for the Call event on given addr // returns the msg data and an error/exception func execTxWaitEvent(t *testing.T, batchCommitter *executor, tx txs.Tx, eventid string) (interface{}, string) { - evsw := event.NewEmitter(logger) - ch := make(chan event.AnyEventData) - evsw.Subscribe("test", eventid, func(msg event.AnyEventData) { - ch <- msg - }) - evc := event.NewEventCache(evsw) + emitter := event.NewEmitter(logger) + ch := make(chan interface{}) + emitter.Subscribe(context.Background(), "test", event.QueryForEventID(eventid), ch) + evc := event.NewEventCache(emitter) batchCommitter.eventCache = evc go func() { if err := batchCommitter.Execute(tx); err != nil { - errStr := err.Error() - ch <- event.AnyEventData{Err: &errStr} + ch <- err.Error() } evc.Flush() }() ticker := time.NewTicker(5 * time.Second) - var msg event.AnyEventData + select { - case msg = <-ch: + case msg := <-ch: + switch ev := msg.(type) { + case *exe_events.EventDataTx: + return ev, ev.Exception + case *evm_events.EventDataCall: + return ev, ev.Exception + case string: + return nil, ev + default: + return ev, "" + } case <-ticker.C: return nil, ExceptionTimeOut } - switch ev := msg.Get().(type) { - case exe_events.EventDataTx: - return ev, ev.Exception - case evm_events.EventDataCall: - return ev, ev.Exception - case string: - return nil, ev - default: - return ev, "" - } } // give a contract perms for an snative, call it, it calls the snative, but shouldn't have permission @@ -1162,8 +1160,8 @@ func testSNativeCALL(t *testing.T, expectPass bool, batchCommitter *executor, do batchCommitter.blockCache.UpdateAccount(doug) tx, _ := txs.NewCallTx(batchCommitter.blockCache, users[0].PublicKey(), &dougAddress, data, 100, 10000, 100) tx.Sign(testChainID, users[0]) - fmt.Println("subscribing to", evm_events.EventStringAccCall(snativeAddress)) - ev, exception := execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccCall(snativeAddress)) + fmt.Println("subscribing to", evm_events.EventStringAccountCall(snativeAddress)) + ev, exception := execTxWaitEvent(t, batchCommitter, tx, evm_events.EventStringAccountCall(snativeAddress)) if exception == ExceptionTimeOut { t.Fatal("Timed out waiting for event") } @@ -1171,7 +1169,7 @@ func testSNativeCALL(t *testing.T, expectPass bool, batchCommitter *executor, do if exception != "" { t.Fatal("Unexpected exception", exception) } - evv := ev.(evm_events.EventDataCall) + evv := ev.(*evm_events.EventDataCall) ret := evv.Return if err := f(ret); err != nil { t.Fatal(err) diff --git a/execution/state_test.go b/execution/state_test.go index 3297a0d9..866defa0 100644 --- a/execution/state_test.go +++ b/execution/state_test.go @@ -42,7 +42,7 @@ var testGenesisDoc, testPrivAccounts = deterministicGenesis. var testChainID = testGenesisDoc.ChainID() func execTxWithStateAndBlockchain(state *State, tip bcm.Tip, tx txs.Tx) error { - exe := newExecutor(true, state, testChainID, tip, event.NewNoOpFireable(), logger) + exe := newExecutor(true, state, testChainID, tip, event.NewNoOpPublisher(), logger) if err := exe.Execute(tx); err != nil { return err } else { @@ -890,7 +890,7 @@ func TestSelfDestruct(t *testing.T) { tx.Input.Signature = acm.ChainSign(privAccounts[0], testChainID, tx) // we use cache instead of execTxWithState so we can run the tx twice - exe := NewBatchCommitter(state, testChainID, bcm.NewBlockchain(testGenesisDoc), event.NewNoOpFireable(), logger) + exe := NewBatchCommitter(state, testChainID, bcm.NewBlockchain(testGenesisDoc), event.NewNoOpPublisher(), logger) if err := exe.Execute(tx); err != nil { t.Errorf("Got error in executing call transaction, %v", err) } diff --git a/execution/transactor.go b/execution/transactor.go index 90df6728..2fc71f0a 100644 --- a/execution/transactor.go +++ b/execution/transactor.go @@ -15,16 +15,15 @@ package execution import ( - "bytes" + "context" "fmt" "sync" "time" - "reflect" - acm "github.com/hyperledger/burrow/account" "github.com/hyperledger/burrow/binary" "github.com/hyperledger/burrow/blockchain" + "github.com/hyperledger/burrow/consensus/tendermint/codes" "github.com/hyperledger/burrow/event" exe_events "github.com/hyperledger/burrow/execution/events" "github.com/hyperledger/burrow/execution/evm" @@ -37,9 +36,11 @@ import ( "github.com/tendermint/go-wire" ) +const BlockingTimeoutSeconds = 30 + type Call struct { - Return []byte `json:"return"` - GasUsed uint64 `json:"gas_used"` + Return []byte + GasUsed uint64 } type Transactor interface { @@ -102,7 +103,7 @@ func (trans *transactor) Call(fromAddress, toAddress acm.Address, data []byte) ( vmach := evm.NewVM(txCache, evm.DefaultDynamicMemoryProvider, params, caller.Address(), nil, logging.WithScope(trans.logger, "Call")) - vmach.SetFireable(trans.eventEmitter) + vmach.SetPublisher(trans.eventEmitter) gas := params.GasLimit ret, err := vmach.Call(caller, callee, callee.Code(), data, 0, &gas) @@ -154,7 +155,7 @@ func (trans *transactor) BroadcastTx(tx txs.Tx) (*txs.Receipt, error) { } switch checkTxResponse.Code { - case abci_types.CodeType_OK: + case codes.TxExecutionSuccessCode: receipt := new(txs.Receipt) err := wire.ReadBinaryBytes(checkTxResponse.Data, receipt) if err != nil { @@ -170,6 +171,7 @@ func (trans *transactor) BroadcastTx(tx txs.Tx) (*txs.Receipt, error) { // Orders calls to BroadcastTx using lock (waits for response from core before releasing) func (trans *transactor) Transact(privKey []byte, address acm.Address, data []byte, gasLimit, fee uint64) (*txs.Receipt, error) { + if len(privKey) != 64 { return nil, fmt.Errorf("Private key is not of the right length: %d\n", len(privKey)) } @@ -223,63 +225,46 @@ func (trans *transactor) Transact(privKey []byte, address acm.Address, data []by func (trans *transactor) TransactAndHold(privKey []byte, address acm.Address, data []byte, gasLimit, fee uint64) (*evm_events.EventDataCall, error) { - rec, tErr := trans.Transact(privKey, address, data, gasLimit, fee) - if tErr != nil { - return nil, tErr + + receipt, err := trans.Transact(privKey, address, data, gasLimit, fee) + if err != nil { + return nil, err } var addr acm.Address - if rec.CreatesContract { - addr = rec.ContractAddr + if receipt.CreatesContract { + addr = receipt.ContractAddr } else { addr = address } // We want non-blocking on the first event received (but buffer the value), // after which we want to block (and then discard the value - see below) wc := make(chan *evm_events.EventDataCall, 1) - subId := fmt.Sprintf("%X", rec.TxHash) - trans.eventEmitter.Subscribe(subId, evm_events.EventStringAccCall(addr), - func(eventData event.AnyEventData) { - eventDataCall := eventData.EventDataCall() - if eventDataCall == nil { - trans.logger.Info("error", "cold not be convert event data to EventDataCall", - structure.ScopeKey, "TransactAndHold", - "sub_id", subId, - "event_data_type", reflect.TypeOf(eventData.Get()).String()) - return - } - if bytes.Equal(eventDataCall.TxID, rec.TxHash) { - // Beware the contract of go-events subscribe is that we must not be - // blocking in an event callback when we try to unsubscribe! - // We work around this by using a non-blocking send. - select { - // This is a non-blocking send, but since we are using a buffered - // channel of size 1 we will always grab our first event even if we - // haven't read from the channel at the time we receive the first event. - case wc <- eventDataCall: - default: - } - } - }) - - timer := time.NewTimer(300 * time.Second) - toChan := timer.C - - var ret *evm_events.EventDataCall - var rErr error + + subID, err := event.GenerateSubscriptionID() + if err != nil { + return nil, err + } + + err = evm_events.SubscribeAccountCall(context.Background(), trans.eventEmitter, subID, addr, receipt.TxHash, wc) + if err != nil { + return nil, err + } + // Will clean up callback goroutine and subscription in pubsub + defer trans.eventEmitter.UnsubscribeAll(context.Background(), subID) + + timer := time.NewTimer(BlockingTimeoutSeconds * time.Second) + defer timer.Stop() select { - case <-toChan: - rErr = fmt.Errorf("Transaction timed out. Hash: " + subId) - case e := <-wc: - timer.Stop() - if e.Exception != "" { - rErr = fmt.Errorf("error when transacting: " + e.Exception) + case <-timer.C: + return nil, fmt.Errorf("transaction timed out TxHash: %X", receipt.TxHash) + case eventDataCall := <-wc: + if eventDataCall.Exception != "" { + return nil, fmt.Errorf("error when transacting: " + eventDataCall.Exception) } else { - ret = e + return eventDataCall, nil } } - trans.eventEmitter.Unsubscribe(subId) - return ret, rErr } func (trans *transactor) Send(privKey []byte, toAddress acm.Address, amount uint64) (*txs.Receipt, error) { @@ -330,57 +315,44 @@ func (trans *transactor) Send(privKey []byte, toAddress acm.Address, amount uint } func (trans *transactor) SendAndHold(privKey []byte, toAddress acm.Address, amount uint64) (*txs.Receipt, error) { - rec, tErr := trans.Send(privKey, toAddress, amount) - if tErr != nil { - return nil, tErr + receipt, err := trans.Send(privKey, toAddress, amount) + if err != nil { + return nil, err } wc := make(chan *txs.SendTx) - subId := fmt.Sprintf("%X", rec.TxHash) - - trans.eventEmitter.Subscribe(subId, exe_events.EventStringAccOutput(toAddress), - func(eventData event.AnyEventData) { - eventDataTx, ok := eventData.Get().(exe_events.EventDataTx) - if !ok { - trans.logger.Info("error", "cold not be convert event data to EventDataCall", - structure.ScopeKey, "SendAndHold", - "tx_hash", subId, - "event_data_type", reflect.TypeOf(eventData.Get()).String()) - return - } - tx, ok := eventDataTx.Tx.(*txs.SendTx) - if !ok { - trans.logger.Info("error", "EventDataTx was expected to contain SendTx", - structure.ScopeKey, "SendAndHold", - "sub_id", subId, - "tx_type", reflect.TypeOf(eventDataTx.Tx).String()) - return - } - - wc <- tx - }) - - timer := time.NewTimer(300 * time.Second) - toChan := timer.C - - var rErr error + + subID, err := event.GenerateSubscriptionID() + if err != nil { + return nil, err + } + + err = exe_events.SubscribeAccountOutputSendTx(context.Background(), trans.eventEmitter, subID, toAddress, + receipt.TxHash, wc) + if err != nil { + return nil, err + } + defer trans.eventEmitter.UnsubscribeAll(context.Background(), subID) + + timer := time.NewTimer(BlockingTimeoutSeconds * time.Second) + defer timer.Stop() pa, err := acm.GeneratePrivateAccountFromPrivateKeyBytes(privKey) - if tErr != nil { + if err != nil { return nil, err } select { - case <-toChan: - rErr = fmt.Errorf("Transaction timed out. Hash: " + subId) - case e := <-wc: - if e.Inputs[0].Address == pa.Address() && e.Inputs[0].Amount == amount { - timer.Stop() - trans.eventEmitter.Unsubscribe(subId) - return rec, rErr + case <-timer.C: + return nil, fmt.Errorf("transaction timed out TxHash: %X", receipt.TxHash) + case sendTx := <-wc: + // This is a double check - we subscribed to this tx's hash so something has gone wrong if the amounts don't match + if sendTx.Inputs[0].Address == pa.Address() && sendTx.Inputs[0].Amount == amount { + return receipt, nil } + return nil, fmt.Errorf("received SendTx but hash doesn't seem to match what we subscribed to, "+ + "received SendTx: %v which does not match receipt on sending: %v", sendTx, receipt) } - return nil, rErr } func (trans *transactor) TransactNameReg(privKey []byte, name, data string, amount, fee uint64) (*txs.Receipt, error) { diff --git a/rpc/result.go b/rpc/result.go index 01da8495..a26f214a 100644 --- a/rpc/result.go +++ b/rpc/result.go @@ -15,9 +15,13 @@ package rpc import ( + "encoding/json" + "fmt" + acm "github.com/hyperledger/burrow/account" - "github.com/hyperledger/burrow/event" "github.com/hyperledger/burrow/execution" + exe_events "github.com/hyperledger/burrow/execution/events" + evm_events "github.com/hyperledger/burrow/execution/evm/events" "github.com/hyperledger/burrow/genesis" "github.com/hyperledger/burrow/txs" ctypes "github.com/tendermint/tendermint/consensus/types" @@ -26,134 +30,187 @@ import ( ) type ResultGetStorage struct { - Key []byte `json:"key"` - Value []byte `json:"value"` + Key []byte + Value []byte } type ResultCall struct { - *execution.Call `json:"unwrap"` + execution.Call +} + +func (rc ResultCall) MarshalJSON() ([]byte, error) { + return json.Marshal(rc.Call) +} + +func (rc *ResultCall) UnmarshalJSON(data []byte) (err error) { + return json.Unmarshal(data, &rc.Call) } type ResultListAccounts struct { - BlockHeight uint64 `json:"block_height"` - Accounts []*acm.ConcreteAccount `json:"accounts"` + BlockHeight uint64 + Accounts []*acm.ConcreteAccount } type ResultDumpStorage struct { - StorageRoot []byte `json:"storage_root"` - StorageItems []StorageItem `json:"storage_items"` + StorageRoot []byte + StorageItems []StorageItem } type StorageItem struct { - Key []byte `json:"key"` - Value []byte `json:"value"` + Key []byte + Value []byte } type ResultListBlocks struct { - LastHeight uint64 `json:"last_height"` - BlockMetas []*tm_types.BlockMeta `json:"block_metas"` + LastHeight uint64 + BlockMetas []*tm_types.BlockMeta } type ResultGetBlock struct { - BlockMeta *tm_types.BlockMeta `json:"block_meta"` - Block *tm_types.Block `json:"block"` + BlockMeta *tm_types.BlockMeta + Block *tm_types.Block } type ResultStatus struct { - NodeInfo *p2p.NodeInfo `json:"node_info"` - GenesisHash []byte `json:"genesis_hash"` - PubKey acm.PublicKey `json:"pub_key"` - LatestBlockHash []byte `json:"latest_block_hash"` - LatestBlockHeight uint64 `json:"latest_block_height"` - LatestBlockTime int64 `json:"latest_block_time"` // nano - NodeVersion string `json:"node_version"` // nano + NodeInfo *p2p.NodeInfo + GenesisHash []byte + PubKey acm.PublicKey + LatestBlockHash []byte + LatestBlockHeight uint64 + LatestBlockTime int64 + NodeVersion string } type ResultChainId struct { - ChainName string `json:"chain_name"` - ChainId string `json:"chain_id"` - GenesisHash []byte `json:"genesis_hash"` + ChainName string + ChainId string + GenesisHash []byte } type ResultSubscribe struct { - EventID string `json:"event"` - SubscriptionID string `json:"subscription_id"` + EventID string + SubscriptionID string } type ResultUnsubscribe struct { - SubscriptionID string `json:"subscription_id"` + SubscriptionID string } type Peer struct { - NodeInfo *p2p.NodeInfo `json:"node_info"` - IsOutbound bool `json:"is_outbound"` + NodeInfo *p2p.NodeInfo + IsOutbound bool } type ResultNetInfo struct { - Listening bool `json:"listening"` - Listeners []string `json:"listeners"` - Peers []*Peer `json:"peers"` + Listening bool + Listeners []string + Peers []*Peer } type ResultListValidators struct { - BlockHeight uint64 `json:"block_height"` - BondedValidators []*acm.ConcreteValidator `json:"bonded_validators"` - UnbondingValidators []*acm.ConcreteValidator `json:"unbonding_validators"` + BlockHeight uint64 + BondedValidators []*acm.ConcreteValidator + UnbondingValidators []*acm.ConcreteValidator } type ResultDumpConsensusState struct { - RoundState *ctypes.RoundState `json:"consensus_state"` - PeerRoundStates []*ctypes.PeerRoundState `json:"peer_round_states"` + RoundState *ctypes.RoundState + PeerRoundStates []*ctypes.PeerRoundState } type ResultPeers struct { - Peers []*Peer `json:"peers"` + Peers []*Peer } type ResultListNames struct { - BlockHeight uint64 `json:"block_height"` - Names []*execution.NameRegEntry `json:"names"` + BlockHeight uint64 + Names []*execution.NameRegEntry } type ResultGeneratePrivateAccount struct { - PrivAccount *acm.ConcretePrivateAccount `json:"priv_account"` + PrivateAccount *acm.ConcretePrivateAccount } type ResultGetAccount struct { - Account *acm.ConcreteAccount `json:"account"` + Account *acm.ConcreteAccount } type ResultBroadcastTx struct { - *txs.Receipt `json:"unwrap"` + txs.Receipt +} + +// Unwrap + +func (rbt ResultBroadcastTx) MarshalJSON() ([]byte, error) { + return json.Marshal(rbt.Receipt) +} + +func (rbt ResultBroadcastTx) UnmarshalJSON(data []byte) (err error) { + return json.Unmarshal(data, &rbt.Receipt) } type ResultListUnconfirmedTxs struct { - N int `json:"n_txs"` - Txs []txs.Wrapper `json:"txs"` + NumTxs int + Txs []txs.Wrapper } type ResultGetName struct { - Entry *execution.NameRegEntry `json:"entry"` + Entry *execution.NameRegEntry } type ResultGenesis struct { - Genesis genesis.GenesisDoc `json:"genesis"` + Genesis genesis.GenesisDoc } type ResultSignTx struct { - Tx txs.Wrapper `json:"tx"` + Tx txs.Wrapper } type ResultEvent struct { - Event string `json:"event"` - event.AnyEventData `json:"data"` -} - -// Type byte helper -var nextByte byte = 1 - -func biota() (b byte) { - b = nextByte - nextByte++ - return + Event string + // TODO: move ResultEvent sum type here + TMEventData *tm_types.TMEventData `json:",omitempty"` + EventDataTx *exe_events.EventDataTx `json:",omitempty"` + EventDataCall *evm_events.EventDataCall `json:",omitempty"` + EventDataLog *evm_events.EventDataLog `json:",omitempty"` +} + +func (resultEvent ResultEvent) EventDataNewBlock() *tm_types.EventDataNewBlock { + if resultEvent.TMEventData != nil { + eventData, _ := resultEvent.TMEventData.Unwrap().(tm_types.EventDataNewBlock) + return &eventData + } + return nil +} + +// Map any supported event data element to our ResultEvent sum type +func NewResultEvent(event string, eventData interface{}) (*ResultEvent, error) { + switch ed := eventData.(type) { + case tm_types.TMEventData: + return &ResultEvent{ + Event: event, + TMEventData: &ed, + }, nil + + case exe_events.EventDataTx: + return &ResultEvent{ + Event: event, + EventDataTx: &ed, + }, nil + + case evm_events.EventDataCall: + return &ResultEvent{ + Event: event, + EventDataCall: &ed, + }, nil + + case evm_events.EventDataLog: + return &ResultEvent{ + Event: event, + EventDataLog: &ed, + }, nil + + default: + return nil, fmt.Errorf("could not map event data of type %T to ResultEvent", eventData) + } } diff --git a/rpc/result_test.go b/rpc/result_test.go index 5ddc6049..11dc2b8f 100644 --- a/rpc/result_test.go +++ b/rpc/result_test.go @@ -15,29 +15,30 @@ package rpc import ( - "testing" - "encoding/json" + "testing" acm "github.com/hyperledger/burrow/account" + "github.com/hyperledger/burrow/execution" "github.com/hyperledger/burrow/txs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/go-wire" + tm_types "github.com/tendermint/tendermint/types" ) func TestResultBroadcastTx(t *testing.T) { // Make sure these are unpacked as expected res := ResultBroadcastTx{ - Receipt: &txs.Receipt{ + Receipt: txs.Receipt{ ContractAddr: acm.Address{0, 2, 3}, CreatesContract: true, TxHash: []byte("foo"), }, } - assert.Equal(t, `{"tx_hash":"666F6F","creates_contract":true,"contract_addr":"0002030000000000000000000000000000000000"}`, - string(wire.JSONBytes(res))) + js := string(wire.JSONBytes(res)) + assert.Equal(t, `{"Receipt":{"TxHash":"666F6F","CreatesContract":true,"ContractAddr":"0002030000000000000000000000000000000000"}}`, js) res2 := new(ResultBroadcastTx) wire.ReadBinaryBytes(wire.BinaryBytes(res), res2) @@ -46,7 +47,7 @@ func TestResultBroadcastTx(t *testing.T) { func TestListUnconfirmedTxs(t *testing.T) { res := &ResultListUnconfirmedTxs{ - N: 3, + NumTxs: 3, Txs: []txs.Wrapper{ txs.Wrap(&txs.CallTx{ Address: &acm.Address{1}, @@ -55,7 +56,7 @@ func TestListUnconfirmedTxs(t *testing.T) { } bs, err := json.Marshal(res) require.NoError(t, err) - assert.Equal(t, `{"n_txs":3,"txs":[{"type":"call_tx","data":{"input":null,"address":"0100000000000000000000000000000000000000","gas_limit":0,"fee":0,"data":null}}]}`, + assert.Equal(t, `{"NumTxs":3,"Txs":[{"type":"call_tx","data":{"Input":null,"Address":"0100000000000000000000000000000000000000","GasLimit":0,"Fee":0,"Data":null}}]}`, string(bs)) } @@ -75,3 +76,44 @@ func TestResultListAccounts(t *testing.T) { require.NoError(t, err) assert.Equal(t, string(bs), string(bsOut)) } + +func TestResultCall_MarshalJSON(t *testing.T) { + res := ResultCall{ + Call: execution.Call{ + Return: []byte("hi"), + GasUsed: 1, + }, + } + bs, err := json.Marshal(res) + require.NoError(t, err) + + resOut := new(ResultCall) + json.Unmarshal(bs, resOut) + bsOut, err := json.Marshal(resOut) + require.NoError(t, err) + assert.Equal(t, string(bs), string(bsOut)) +} + +func TestResultEvent(t *testing.T) { + eventDataNewBlock := tm_types.EventDataNewBlock{ + Block: &tm_types.Block{ + Header: &tm_types.Header{ + ChainID: "chainy", + NumTxs: 30, + }, + }, + } + res := ResultEvent{ + TMEventData: &tm_types.TMEventData{ + TMEventDataInner: eventDataNewBlock, + }, + } + bs, err := json.Marshal(res) + require.NoError(t, err) + + resOut := new(ResultEvent) + json.Unmarshal(bs, resOut) + bsOut, err := json.Marshal(resOut) + require.NoError(t, err) + assert.Equal(t, string(bs), string(bsOut)) +} diff --git a/rpc/service.go b/rpc/service.go index bab21407..cdf139c7 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -15,6 +15,7 @@ package rpc import ( + "context" "fmt" acm "github.com/hyperledger/burrow/account" @@ -35,15 +36,19 @@ import ( // end up DoSing ourselves. const MaxBlockLookback = 100 +type SubscribableService interface { + // Events + Subscribe(ctx context.Context, subscriptionID string, eventID string, callback func(*ResultEvent)) error + Unsubscribe(ctx context.Context, subscriptionID string) error +} + // Base service that provides implementation for all underlying RPC methods type Service interface { + SubscribableService // Transact Transactor() execution.Transactor // List mempool transactions pass -1 for all unconfirmed transactions ListUnconfirmedTxs(maxTxs int) (*ResultListUnconfirmedTxs, error) - // Events - Subscribe(subscriptionId, eventId string, callback func(eventData event.AnyEventData)) error - Unsubscribe(subscriptionId string) error // Status Status() (*ResultStatus, error) NetInfo() (*ResultNetInfo, error) @@ -69,8 +74,9 @@ type Service interface { } type service struct { + ctx context.Context state acm.StateIterable - eventEmitter event.Emitter + subscribable event.Subscribable nameReg execution.NameRegIterable blockchain bcm.Blockchain transactor execution.Transactor @@ -79,16 +85,16 @@ type service struct { } var _ Service = &service{} -var _ event.Subscribable = Service(nil) -func NewService(state acm.StateIterable, nameReg execution.NameRegIterable, eventEmitter event.Emitter, - blockchain bcm.Blockchain, transactor execution.Transactor, nodeView query.NodeView, - logger logging_types.InfoTraceLogger) *service { +func NewService(ctx context.Context, state acm.StateIterable, nameReg execution.NameRegIterable, + subscribable event.Subscribable, blockchain bcm.Blockchain, transactor execution.Transactor, + nodeView query.NodeView, logger logging_types.InfoTraceLogger) *service { return &service{ + ctx: ctx, state: state, nameReg: nameReg, - eventEmitter: eventEmitter, + subscribable: subscribable, blockchain: blockchain, transactor: transactor, nodeView: nodeView, @@ -96,6 +102,15 @@ func NewService(state acm.StateIterable, nameReg execution.NameRegIterable, even } } +// Provides a sub-service with only the subscriptions methods +func NewSubscribableService(subscribable event.Subscribable, logger logging_types.InfoTraceLogger) *service { + return &service{ + ctx: context.Background(), + subscribable: subscribable, + logger: logger.With(structure.ComponentKey, "Service"), + } +} + // Transacting... func (s *service) Transactor() execution.Transactor { @@ -113,21 +128,39 @@ func (s *service) ListUnconfirmedTxs(maxTxs int) (*ResultListUnconfirmedTxs, err wrappedTxs[i] = txs.Wrap(tx) } return &ResultListUnconfirmedTxs{ - N: len(transactions), - Txs: wrappedTxs, + NumTxs: len(transactions), + Txs: wrappedTxs, }, nil } -// All methods in this file return (Result*, error) which is the return -// signature assumed by go-rpc -func (s *service) Subscribe(subscriptionId, eventId string, callback func(event.AnyEventData)) error { - logging.InfoMsg(s.logger, "Subscribing to event", - "eventId", eventId, "subscriptionId", subscriptionId) - return s.eventEmitter.Subscribe(subscriptionId, eventId, callback) +func (s *service) Subscribe(ctx context.Context, subscriptionID string, eventID string, + callback func(resultEvent *ResultEvent)) error { + + queryBuilder := event.QueryForEventID(eventID) + logging.InfoMsg(s.logger, "Subscribing to events", + "query", queryBuilder.String(), + "subscription_id", subscriptionID) + return event.SubscribeCallback(ctx, s.subscribable, subscriptionID, queryBuilder, + func(message interface{}) { + resultEvent, err := NewResultEvent(eventID, message) + if err != nil { + logging.InfoMsg(s.logger, "Received event that could not be mapped to ResultEvent", + structure.ErrorKey, err, + "event_id", eventID) + return + } + callback(resultEvent) + }) } -func (s *service) Unsubscribe(subscriptionId string) error { - return s.eventEmitter.Unsubscribe(subscriptionId) +func (s *service) Unsubscribe(ctx context.Context, subscriptionID string) error { + logging.InfoMsg(s.logger, "Unsubscribing from events", + "subscription_id", subscriptionID) + err := s.subscribable.UnsubscribeAll(ctx, subscriptionID) + if err != nil { + return fmt.Errorf("error unsubscribing from event with subscriptionID '%s': %v", subscriptionID, err) + } + return nil } func (s *service) Status() (*ResultStatus, error) { @@ -139,7 +172,7 @@ func (s *service) Status() (*ResultStatus, error) { latestBlockTime int64 ) if latestHeight != 0 { - latestBlockMeta = s.nodeView.BlockStore().LoadBlockMeta(int(latestHeight)) + latestBlockMeta = s.nodeView.BlockStore().LoadBlockMeta(int64(latestHeight)) latestBlockHash = latestBlockMeta.Header.Hash() latestBlockTime = latestBlockMeta.Header.Time.UnixNano() } @@ -289,8 +322,8 @@ func (s *service) ListNames(predicate func(*execution.NameRegEntry) bool) (*Resu func (s *service) GetBlock(height uint64) (*ResultGetBlock, error) { return &ResultGetBlock{ - Block: s.nodeView.BlockStore().LoadBlock(int(height)), - BlockMeta: s.nodeView.BlockStore().LoadBlockMeta(int(height)), + Block: s.nodeView.BlockStore().LoadBlock(int64(height)), + BlockMeta: s.nodeView.BlockStore().LoadBlockMeta(int64(height)), }, nil } @@ -314,7 +347,7 @@ func (s *service) ListBlocks(minHeight, maxHeight uint64) (*ResultListBlocks, er var blockMetas []*tm_types.BlockMeta for height := maxHeight; height >= minHeight; height-- { - blockMeta := s.nodeView.BlockStore().LoadBlockMeta(int(height)) + blockMeta := s.nodeView.BlockStore().LoadBlockMeta(int64(height)) blockMetas = append(blockMetas, blockMeta) } @@ -356,6 +389,6 @@ func (s *service) GeneratePrivateAccount() (*ResultGeneratePrivateAccount, error return nil, err } return &ResultGeneratePrivateAccount{ - PrivAccount: acm.AsConcretePrivateAccount(privateAccount), + PrivateAccount: acm.AsConcretePrivateAccount(privateAccount), }, nil } diff --git a/rpc/tm/client/client.go b/rpc/tm/client/client.go index baadabee..fffdeb72 100644 --- a/rpc/tm/client/client.go +++ b/rpc/tm/client/client.go @@ -30,12 +30,12 @@ type RPCClient interface { } func BroadcastTx(client RPCClient, tx txs.Tx) (*txs.Receipt, error) { - res := new(rpc.ResultBroadcastTx) + res := new(txs.Receipt) _, err := client.Call(tm.BroadcastTx, pmap("tx", txs.Wrap(tx)), res) if err != nil { return nil, err } - return res.Receipt, nil + return res, nil } func Status(client RPCClient) (*rpc.ResultStatus, error) { diff --git a/rpc/tm/integration/client_test.go b/rpc/tm/integration/client_test.go index 6eb3ab31..561eca76 100644 --- a/rpc/tm/integration/client_test.go +++ b/rpc/tm/integration/client_test.go @@ -23,14 +23,13 @@ import ( "time" "github.com/hyperledger/burrow/binary" - "github.com/hyperledger/burrow/event" exe_events "github.com/hyperledger/burrow/execution/events" + "github.com/hyperledger/burrow/rpc" tm_client "github.com/hyperledger/burrow/rpc/tm/client" "github.com/hyperledger/burrow/txs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/consensus/types" - tm_types "github.com/tendermint/tendermint/types" "golang.org/x/crypto/ripemd160" ) @@ -57,15 +56,9 @@ func TestBroadcastTx(t *testing.T) { toAddr := privateAccounts[1].Address() tx := makeDefaultSendTxSigned(t, client, toAddr, amt) receipt, err := broadcastTxAndWaitForBlock(t, client, wsc, tx) - if err != nil { - t.Fatal(err) - } - if receipt.CreatesContract { - t.Fatal("This tx should not create a contract") - } - if len(receipt.TxHash) == 0 { - t.Fatal("Failed to compute tx hash") - } + require.NoError(t, err) + assert.False(t, receipt.CreatesContract, "This tx should not create a contract") + assert.NotEmpty(t, receipt.TxHash, "Failed to compute tx hash") n, errp := new(int), new(error) buf := new(bytes.Buffer) hasher := ripemd160.New() @@ -105,12 +98,6 @@ func TestGetStorage(t *testing.T) { wsc := newWSClient() defer stopWSClient(wsc) testWithAllClients(t, func(t *testing.T, clientName string, client tm_client.RPCClient) { - eid := tm_types.EventStringNewBlock() - subscribe(t, wsc, eid) - defer func() { - unsubscribe(t, wsc, eid) - }() - amt, gasLim, fee := uint64(1100), uint64(1000), uint64(1000) code := []byte{0x60, 0x5, 0x60, 0x1, 0x55} // Call with nil address will create a contract @@ -165,12 +152,6 @@ func TestCallContract(t *testing.T) { wsc := newWSClient() defer stopWSClient(wsc) testWithAllClients(t, func(t *testing.T, clientName string, client tm_client.RPCClient) { - eid := tm_types.EventStringNewBlock() - subscribe(t, wsc, eid) - defer func() { - unsubscribe(t, wsc, eid) - }() - // create the contract amt, gasLim, fee := uint64(6969), uint64(1000), uint64(1000) code, _, _ := simpleContract() @@ -218,9 +199,10 @@ func TestNameReg(t *testing.T) { func() { broadcastTxAndWaitForBlock(t, client, wsc, tx) }, - func(eid string, eventData event.AnyEventData) (bool, error) { - eventDataTx := eventData.EventDataTx() - assert.NotNil(t, eventDataTx, "could not convert %s to EventDataTx", eventData) + func(eventID string, resultEvent *rpc.ResultEvent) (bool, error) { + + eventDataTx := resultEvent.EventDataTx + assert.NotNil(t, eventDataTx, "could not convert %s to EventDataTx", resultEvent) tx, ok := eventDataTx.Tx.(*txs.NameTx) if !ok { t.Fatalf("Could not convert %v to *NameTx", eventDataTx) @@ -350,7 +332,7 @@ func TestListUnconfirmedTxs(t *testing.T) { } require.NoError(t, err) - if resp.N > 0 { + if resp.NumTxs > 0 { txChan <- resp.Txs } } @@ -376,8 +358,8 @@ func TestGetBlock(t *testing.T) { waitNBlocks(t, wsc, 3) resp, err := tm_client.GetBlock(client, 2) assert.NoError(t, err) - assert.Equal(t, 2, resp.Block.Height) - assert.Equal(t, 2, resp.BlockMeta.Header.Height) + assert.Equal(t, int64(2), resp.Block.Height) + assert.Equal(t, int64(2), resp.BlockMeta.Header.Height) }) } diff --git a/rpc/tm/integration/shared.go b/rpc/tm/integration/shared.go index f97d42e1..045debe8 100644 --- a/rpc/tm/integration/shared.go +++ b/rpc/tm/integration/shared.go @@ -35,6 +35,7 @@ import ( "github.com/hyperledger/burrow/core" "github.com/hyperledger/burrow/execution" "github.com/hyperledger/burrow/genesis" + "github.com/hyperledger/burrow/logging/lifecycle" "github.com/hyperledger/burrow/logging/loggers" "github.com/hyperledger/burrow/permission" "github.com/hyperledger/burrow/rpc" @@ -53,6 +54,9 @@ const ( testDir = "./test_scratch/tm_test" ) +// Enable logger output during tests +var debugLogging = false + // global variables for use across all tests var ( privateAccounts = makePrivateAccounts(5) // make keys @@ -75,13 +79,14 @@ func TestWrapper(runner func() int) int { os.Chdir(testDir) tmConf := tm_config.DefaultConfig() - // Uncomment for logs - //logger, _ := lifecycle.NewStdErrLogger() logger := loggers.NewNoopInfoTraceLogger() + if debugLogging { + logger, _ = lifecycle.NewStdErrLogger() + } privValidator := validator.NewPrivValidatorMemory(privateAccounts[0], privateAccounts[0]) genesisDoc = testGenesisDoc() - kernel, err := core.NewKernel(privValidator, genesisDoc, tmConf, rpc.DefaultRPCConfig(), logger) + kernel, err := core.NewKernel(context.Background(), privValidator, genesisDoc, tmConf, rpc.DefaultRPCConfig(), logger) if err != nil { panic(err) } diff --git a/rpc/tm/integration/websocket_client_test.go b/rpc/tm/integration/websocket_client_test.go index ba86dfc1..deb310ab 100644 --- a/rpc/tm/integration/websocket_client_test.go +++ b/rpc/tm/integration/websocket_client_test.go @@ -24,7 +24,6 @@ import ( "time" acm "github.com/hyperledger/burrow/account" - "github.com/hyperledger/burrow/event" exe_events "github.com/hyperledger/burrow/execution/events" evm_events "github.com/hyperledger/burrow/execution/evm/events" "github.com/hyperledger/burrow/rpc" @@ -47,15 +46,15 @@ func TestWSConnect(t *testing.T) { // receive a new block message func TestWSNewBlock(t *testing.T) { wsc := newWSClient() - eid := tm_types.EventStringNewBlock() + eid := tm_types.EventNewBlock subId := subscribeAndGetSubscriptionId(t, wsc, eid) defer func() { unsubscribe(t, wsc, subId) stopWSClient(wsc) }() waitForEvent(t, wsc, eid, func() {}, - func(eid string, eventData event.AnyEventData) (bool, error) { - fmt.Println("Check: ", eventData.EventDataNewBlock().Block) + func(eventID string, resultEvent *rpc.ResultEvent) (bool, error) { + fmt.Println("Check: ", resultEvent.EventDataNewBlock().Block) return true, nil }) } @@ -66,20 +65,20 @@ func TestWSBlockchainGrowth(t *testing.T) { t.Skip("skipping test in short mode.") } wsc := newWSClient() - eid := tm_types.EventStringNewBlock() + eid := tm_types.EventNewBlock subId := subscribeAndGetSubscriptionId(t, wsc, eid) defer func() { unsubscribe(t, wsc, subId) stopWSClient(wsc) }() // listen for NewBlock, ensure height increases by 1 - var initBlockN int - for i := 0; i < 2; i++ { + var initBlockN int64 + for i := int64(0); i < 2; i++ { waitForEvent(t, wsc, eid, func() {}, - func(eid string, eventData event.AnyEventData) (bool, error) { - eventDataNewBlock := eventData.EventDataNewBlock() + func(eventID string, resultEvent *rpc.ResultEvent) (bool, error) { + eventDataNewBlock := resultEvent.EventDataNewBlock() if eventDataNewBlock == nil { - t.Fatalf("Was expecting EventDataNewBlock but got %v", eventData) + t.Fatalf("Was expecting EventDataNewBlock but got %v", resultEvent) } block := eventDataNewBlock.Block if i == 0 { @@ -90,7 +89,6 @@ func TestWSBlockchainGrowth(t *testing.T) { block.Header.Height) } } - return true, nil }) } @@ -101,8 +99,8 @@ func TestWSSend(t *testing.T) { wsc := newWSClient() toAddr := privateAccounts[1].Address() amt := uint64(100) - eidInput := exe_events.EventStringAccInput(privateAccounts[0].Address()) - eidOutput := exe_events.EventStringAccOutput(toAddr) + eidInput := exe_events.EventStringAccountInput(privateAccounts[0].Address()) + eidOutput := exe_events.EventStringAccountOutput(toAddr) subIdInput := subscribeAndGetSubscriptionId(t, wsc, eidInput) subIdOutput := subscribeAndGetSubscriptionId(t, wsc, eidOutput) defer func() { @@ -125,7 +123,7 @@ func TestWSDoubleFire(t *testing.T) { t.Skip("skipping test in short mode.") } wsc := newWSClient() - eid := exe_events.EventStringAccInput(privateAccounts[0].Address()) + eid := exe_events.EventStringAccountInput(privateAccounts[0].Address()) subId := subscribeAndGetSubscriptionId(t, wsc, eid) defer func() { unsubscribe(t, wsc, subId) @@ -137,13 +135,13 @@ func TestWSDoubleFire(t *testing.T) { waitForEvent(t, wsc, eid, func() { tx := makeDefaultSendTxSigned(t, jsonRpcClient, toAddr, amt) broadcastTx(t, jsonRpcClient, tx) - }, func(eid string, b event.AnyEventData) (bool, error) { + }, func(eventID string, resultEvent *rpc.ResultEvent) (bool, error) { return true, nil }) // but make sure we don't hear about it twice err := waitForEvent(t, wsc, eid, func() {}, - func(eid string, b event.AnyEventData) (bool, error) { + func(eventID string, resultEvent *rpc.ResultEvent) (bool, error) { return false, nil }) assert.True(t, err.Timeout(), "We should have timed out waiting for second"+ @@ -156,7 +154,7 @@ func TestWSCallWait(t *testing.T) { t.Skip("skipping test in short mode.") } wsc := newWSClient() - eid1 := exe_events.EventStringAccInput(privateAccounts[0].Address()) + eid1 := exe_events.EventStringAccountInput(privateAccounts[0].Address()) subId1 := subscribeAndGetSubscriptionId(t, wsc, eid1) defer func() { unsubscribe(t, wsc, subId1) @@ -174,7 +172,7 @@ func TestWSCallWait(t *testing.T) { // susbscribe to the new contract amt = uint64(10001) - eid2 := exe_events.EventStringAccOutput(contractAddr) + eid2 := exe_events.EventStringAccountOutput(contractAddr) subId2 := subscribeAndGetSubscriptionId(t, wsc, eid2) defer func() { unsubscribe(t, wsc, subId2) @@ -206,7 +204,7 @@ func TestWSCallNoWait(t *testing.T) { // susbscribe to the new contract amt = uint64(10001) - eid := exe_events.EventStringAccOutput(contractAddr) + eid := exe_events.EventStringAccountOutput(contractAddr) subId := subscribeAndGetSubscriptionId(t, wsc, eid) defer unsubscribe(t, wsc, subId) // get the return value from a call @@ -235,7 +233,7 @@ func TestWSCallCall(t *testing.T) { contractAddr1 := receipt.ContractAddr // subscribe to the new contracts - eid := evm_events.EventStringAccCall(contractAddr1) + eid := evm_events.EventStringAccountCall(contractAddr1) subId := subscribeAndGetSubscriptionId(t, wsc, eid) defer unsubscribe(t, wsc, subId) // call contract2, which should call contract1, and wait for ev1 @@ -250,7 +248,7 @@ func TestWSCallCall(t *testing.T) { func() { }, // Event Checker - func(eid string, b event.AnyEventData) (bool, error) { + func(eventID string, resultEvent *rpc.ResultEvent) (bool, error) { return true, nil }) // call it @@ -268,21 +266,19 @@ func TestWSCallCall(t *testing.T) { func TestSubscribe(t *testing.T) { wsc := newWSClient() var subId string - subscribe(t, wsc, tm_types.EventStringNewBlock()) + subscribe(t, wsc, tm_types.EventNewBlock) - // timeout to check subscription process is live - timeout := time.After(timeoutSeconds * time.Second) Subscribe: for { select { - case <-timeout: + case <-time.After(timeoutSeconds * time.Second): t.Fatal("Timed out waiting for subscription result") case response := <-wsc.ResponsesCh: require.Nil(t, response.Error) res := new(rpc.ResultSubscribe) - require.NoError(t, json.Unmarshal(*response.Result, res)) - assert.Equal(t, tm_types.EventStringNewBlock(), res.EventID) + require.NoError(t, json.Unmarshal(response.Result, res)) + assert.Equal(t, tm_types.EventNewBlock, res.EventID) subId = res.SubscriptionID break Subscribe } @@ -302,9 +298,9 @@ Subscribe: case response := <-wsc.ResponsesCh: require.Nil(t, response.Error) - if response.ID == tm_client.EventResponseID(tm_types.EventStringNewBlock()) { + if response.ID == tm_client.EventResponseID(tm_types.EventNewBlock) { res := new(rpc.ResultEvent) - json.Unmarshal(*response.Result, res) + json.Unmarshal(response.Result, res) enb := res.EventDataNewBlock() if enb != nil { assert.Equal(t, genesisDoc.ChainID(), enb.Block.ChainID) diff --git a/rpc/tm/integration/websocket_helpers.go b/rpc/tm/integration/websocket_helpers.go index 6140a844..c2c2b1bf 100644 --- a/rpc/tm/integration/websocket_helpers.go +++ b/rpc/tm/integration/websocket_helpers.go @@ -25,12 +25,11 @@ import ( "time" acm "github.com/hyperledger/burrow/account" - "github.com/hyperledger/burrow/event" "github.com/hyperledger/burrow/rpc" tm_client "github.com/hyperledger/burrow/rpc/tm/client" "github.com/hyperledger/burrow/txs" "github.com/stretchr/testify/require" - rpcclient "github.com/tendermint/tendermint/rpc/lib/client" + "github.com/tendermint/tendermint/rpc/lib/client" tm_types "github.com/tendermint/tendermint/types" ) @@ -42,11 +41,12 @@ const ( //-------------------------------------------------------------------------------- // Utilities for testing the websocket service type blockPredicate func(block *tm_types.Block) bool +type resultEventChecker func(eventID string, resultEvent *rpc.ResultEvent) (bool, error) // create a new connection func newWSClient() *rpcclient.WSClient { wsc := rpcclient.NewWSClient(websocketAddr, websocketEndpoint) - if _, err := wsc.Start(); err != nil { + if err := wsc.Start(); err != nil { panic(err) } return wsc @@ -74,10 +74,10 @@ func subscribeAndGetSubscriptionId(t *testing.T, wsc *rpcclient.WSClient, eventI case <-timeout.C: t.Fatal("Timeout waiting for subscription result") case response := <-wsc.ResponsesCh: - require.Nil(t, response.Error, "got error response from websocket channel") + require.Nil(t, response.Error, "Got error response from websocket channel: %v", response.Error) if response.ID == tm_client.SubscribeRequestID { res := new(rpc.ResultSubscribe) - err := json.Unmarshal(*response.Result, res) + err := json.Unmarshal(response.Result, res) if err == nil { return res.SubscriptionID } @@ -107,7 +107,7 @@ func broadcastTxAndWaitForBlock(t *testing.T, client tm_client.RPCClient, wsc *r } func nextBlockPredicateFn() blockPredicate { - initialHeight := -1 + initialHeight := int64(-1) return func(block *tm_types.Block) bool { if initialHeight <= 0 { initialHeight = block.Height @@ -134,22 +134,20 @@ func waitNBlocks(t *testing.T, wsc *rpcclient.WSClient, n int) { } func runThenWaitForBlock(t *testing.T, wsc *rpcclient.WSClient, predicate blockPredicate, runner func()) { - eventDataChecker := func(event string, eventData event.AnyEventData) (bool, error) { + eventDataChecker := func(event string, eventData *rpc.ResultEvent) (bool, error) { eventDataNewBlock := eventData.EventDataNewBlock() if eventDataNewBlock == nil { return false, fmt.Errorf("could not convert %#v to EventDataNewBlock", eventData) } return predicate(eventDataNewBlock.Block), nil } - subscribeAndWaitForNext(t, wsc, tm_types.EventStringNewBlock(), runner, eventDataChecker) + subscribeAndWaitForNext(t, wsc, tm_types.EventNewBlock, runner, eventDataChecker) } -func subscribeAndWaitForNext(t *testing.T, wsc *rpcclient.WSClient, event string, runner func(), - eventDataChecker func(string, event.AnyEventData) (bool, error)) { - +func subscribeAndWaitForNext(t *testing.T, wsc *rpcclient.WSClient, event string, runner func(), checker resultEventChecker) { subId := subscribeAndGetSubscriptionId(t, wsc, event) defer unsubscribe(t, wsc, subId) - waitForEvent(t, wsc, event, runner, eventDataChecker) + waitForEvent(t, wsc, event, runner, checker) } // waitForEvent executes runner that is expected to trigger events. It then @@ -160,10 +158,10 @@ func subscribeAndWaitForNext(t *testing.T, wsc *rpcclient.WSClient, event string // waitForEvent will keep listening for new events. If an error is returned // waitForEvent will fail the test. func waitForEvent(t *testing.T, wsc *rpcclient.WSClient, eventID string, runner func(), - eventDataChecker func(string, event.AnyEventData) (bool, error)) waitForEventResult { + checker resultEventChecker) waitForEventResult { // go routine to wait for websocket msg - eventsCh := make(chan event.AnyEventData) + eventsCh := make(chan *rpc.ResultEvent) shutdownEventsCh := make(chan bool, 1) errCh := make(chan error) @@ -172,26 +170,26 @@ func waitForEvent(t *testing.T, wsc *rpcclient.WSClient, eventID string, runner // Read message go func() { - LOOP: for { select { case <-shutdownEventsCh: - break LOOP + return case r := <-wsc.ResponsesCh: if r.Error != nil { errCh <- r.Error - break LOOP + return } if r.ID == tm_client.EventResponseID(eventID) { - fmt.Println(string(*r.Result)) resultEvent := new(rpc.ResultEvent) - err := json.Unmarshal(*r.Result, resultEvent) - if err == nil { - eventsCh <- resultEvent.AnyEventData + err := json.Unmarshal(r.Result, resultEvent) + if err != nil { + errCh <- err + } else { + eventsCh <- resultEvent } } case <-wsc.Quit: - break LOOP + return } } }() @@ -206,7 +204,7 @@ func waitForEvent(t *testing.T, wsc *rpcclient.WSClient, eventID string, runner return waitForEventResult{timeout: true} case eventData := <-eventsCh: // run the check - stopWaiting, err := eventDataChecker(eventID, eventData) + stopWaiting, err := checker(eventID, eventData) require.NoError(t, err) if stopWaiting { return waitForEventResult{} @@ -228,11 +226,11 @@ func (err waitForEventResult) Timeout() bool { //-------------------------------------------------------------------------------- -func unmarshalValidateSend(amt uint64, toAddr acm.Address) func(string, event.AnyEventData) (bool, error) { - return func(eid string, eventData event.AnyEventData) (bool, error) { - data := eventData.EventDataTx() +func unmarshalValidateSend(amt uint64, toAddr acm.Address) resultEventChecker { + return func(eventID string, resultEvent *rpc.ResultEvent) (bool, error) { + data := resultEvent.EventDataTx if data == nil { - return true, fmt.Errorf("event data %s is not EventDataTx", eventData) + return true, fmt.Errorf("event data %s is not EventDataTx", resultEvent) } if data.Exception != "" { return true, fmt.Errorf(data.Exception) @@ -253,11 +251,11 @@ func unmarshalValidateSend(amt uint64, toAddr acm.Address) func(string, event.An } } -func unmarshalValidateTx(amt uint64, returnCode []byte) func(string, event.AnyEventData) (bool, error) { - return func(eid string, eventData event.AnyEventData) (bool, error) { - data := eventData.EventDataTx() +func unmarshalValidateTx(amt uint64, returnCode []byte) resultEventChecker { + return func(eventID string, resultEvent *rpc.ResultEvent) (bool, error) { + data := resultEvent.EventDataTx if data == nil { - return true, fmt.Errorf("event data %s is not EventDataTx", eventData) + return true, fmt.Errorf("event data %s is not EventDataTx", resultEvent) } if data.Exception != "" { return true, fmt.Errorf(data.Exception) @@ -279,11 +277,11 @@ func unmarshalValidateTx(amt uint64, returnCode []byte) func(string, event.AnyEv } } -func unmarshalValidateCall(origin acm.Address, returnCode []byte, txid *[]byte) func(string, event.AnyEventData) (bool, error) { - return func(eid string, eventData event.AnyEventData) (bool, error) { - data := eventData.EventDataCall() +func unmarshalValidateCall(origin acm.Address, returnCode []byte, txid *[]byte) resultEventChecker { + return func(eventID string, resultEvent *rpc.ResultEvent) (bool, error) { + data := resultEvent.EventDataCall if data == nil { - return true, fmt.Errorf("event data %s is not EventDataTx", eventData) + return true, fmt.Errorf("event data %s is not EventDataTx", resultEvent) } if data.Exception != "" { return true, fmt.Errorf(data.Exception) diff --git a/rpc/tm/methods.go b/rpc/tm/methods.go index c2d2da7b..2686f35a 100644 --- a/rpc/tm/methods.go +++ b/rpc/tm/methods.go @@ -3,6 +3,9 @@ package tm import ( "fmt" + "context" + "time" + acm "github.com/hyperledger/burrow/account" "github.com/hyperledger/burrow/event" "github.com/hyperledger/burrow/execution" @@ -12,6 +15,7 @@ import ( "github.com/tendermint/tendermint/rpc/lib/types" ) +// Method names const ( Subscribe = "subscribe" Unsubscribe = "unsubscribe" @@ -51,6 +55,8 @@ const ( SignTx = "unsafe/sign_tx" ) +const SubscriptionTimeoutSeconds = 5 * time.Second + func GetRoutes(service rpc.Service) map[string]*gorpc.RPCFunc { return map[string]*gorpc.RPCFunc{ // Transact @@ -60,7 +66,7 @@ func GetRoutes(service rpc.Service) map[string]*gorpc.RPCFunc { return nil, err } return &rpc.ResultBroadcastTx{ - Receipt: receipt, + Receipt: *receipt, }, nil }, "tx"), @@ -76,7 +82,7 @@ func GetRoutes(service rpc.Service) map[string]*gorpc.RPCFunc { if err != nil { return nil, err } - return &rpc.ResultCall{Call: call}, nil + return &rpc.ResultCall{Call: *call}, nil }, "fromAddress,toAddress,data"), CallCode: gorpc.NewRPCFunc(func(fromAddress acm.Address, code, data []byte) (*rpc.ResultCall, error) { @@ -84,32 +90,35 @@ func GetRoutes(service rpc.Service) map[string]*gorpc.RPCFunc { if err != nil { return nil, err } - return &rpc.ResultCall{Call: call}, nil + return &rpc.ResultCall{Call: *call}, nil }, "fromAddress,code,data"), // Events Subscribe: gorpc.NewWSRPCFunc(func(wsCtx rpctypes.WSRPCContext, eventID string) (*rpc.ResultSubscribe, error) { - subID, err := event.GenerateSubscriptionID() + subscriptionID, err := event.GenerateSubscriptionID() if err != nil { return nil, err } - err = service.Subscribe(subID, eventID, - func(eventData event.AnyEventData) { - // NOTE: EventSwitch callbacks must be nonblocking - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(EventResponseID(wsCtx.Request.ID, eventID), - rpc.ResultEvent{Event: eventID, AnyEventData: eventData})) - }) + ctx, cancel := context.WithTimeout(context.Background(), SubscriptionTimeoutSeconds*time.Second) + defer cancel() + err = service.Subscribe(ctx, subscriptionID, eventID, func(resultEvent *rpc.ResultEvent) { + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(EventResponseID(wsCtx.Request.ID, eventID), + resultEvent)) + }) if err != nil { return nil, err } return &rpc.ResultSubscribe{ EventID: eventID, - SubscriptionID: subID, + SubscriptionID: subscriptionID, }, nil }, "eventID"), Unsubscribe: gorpc.NewWSRPCFunc(func(wsCtx rpctypes.WSRPCContext, subscriptionID string) (*rpc.ResultUnsubscribe, error) { - err := service.Unsubscribe(subscriptionID) + ctx, cancel := context.WithTimeout(context.Background(), SubscriptionTimeoutSeconds*time.Second) + defer cancel() + // Since our model uses a random subscription ID per request we just drop all matching requests + err := service.Unsubscribe(ctx, subscriptionID) if err != nil { return nil, err } diff --git a/rpc/tm/server.go b/rpc/tm/server.go index 13726d58..13193f34 100644 --- a/rpc/tm/server.go +++ b/rpc/tm/server.go @@ -19,20 +19,20 @@ import ( "net/http" "github.com/hyperledger/burrow/consensus/tendermint" + "github.com/hyperledger/burrow/event" "github.com/hyperledger/burrow/logging/structure" logging_types "github.com/hyperledger/burrow/logging/types" "github.com/hyperledger/burrow/rpc" "github.com/tendermint/tendermint/rpc/lib/server" - "github.com/tendermint/tmlibs/events" ) -func StartServer(service rpc.Service, pattern, listenAddress string, evsw events.EventSwitch, +func StartServer(service rpc.Service, pattern, listenAddress string, emitter event.Emitter, logger logging_types.InfoTraceLogger) (net.Listener, error) { logger = logger.With(structure.ComponentKey, "RPC_TM") routes := GetRoutes(service) mux := http.NewServeMux() - wm := rpcserver.NewWebsocketManager(routes, evsw) + wm := rpcserver.NewWebsocketManager(routes, rpcserver.EventSubscriber(tendermint.SubscribableAsEventBus(emitter))) mux.HandleFunc(pattern, wm.WebsocketHandler) tmLogger := tendermint.NewLogger(logger) rpcserver.RegisterRPCFuncs(mux, routes, tmLogger) diff --git a/rpc/v0/json_service.go b/rpc/v0/json_service.go index ba2fbfad..78d87dd4 100644 --- a/rpc/v0/json_service.go +++ b/rpc/v0/json_service.go @@ -20,11 +20,25 @@ import ( "net/http" "github.com/gin-gonic/gin" - "github.com/hyperledger/burrow/event" "github.com/hyperledger/burrow/rpc" "github.com/hyperledger/burrow/rpc/v0/server" ) +// EventSubscribe +type EventSub struct { + SubId string `json:"sub_id"` +} + +// EventUnsubscribe +type EventUnsub struct { + Result bool `json:"result"` +} + +// EventPoll +type PollResponse struct { + Events []interface{} `json:"events"` +} + // Server used to handle JSON-RPC 2.0 requests. Implements server.Server type JsonRpcServer struct { service server.HttpService @@ -67,7 +81,7 @@ func (jrs *JsonRpcServer) handleFunc(c *gin.Context) { type JSONService struct { codec rpc.Codec service rpc.Service - eventSubs *event.Subscriptions + eventSubs *Subscriptions defaultHandlers map[string]RequestHandlerFunc } @@ -77,7 +91,7 @@ func NewJSONService(codec rpc.Codec, service rpc.Service) server.HttpService { tmhttps := &JSONService{ codec: codec, service: service, - eventSubs: event.NewSubscriptions(service), + eventSubs: NewSubscriptions(service), } dhMap := GetMethods(codec, service) @@ -150,7 +164,7 @@ func (js *JSONService) writeResponse(id string, result interface{}, w http.Respo // *************************************** Events ************************************ -// Subscribe to an event. +// Subscribe to an func (js *JSONService) EventSubscribe(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { param := &EventIdParam{} @@ -163,10 +177,10 @@ func (js *JSONService) EventSubscribe(request *rpc.RPCRequest, if errC != nil { return nil, rpc.INTERNAL_ERROR, errC } - return &event.EventSub{subId}, 0, nil + return &EventSub{subId}, 0, nil } -// Un-subscribe from an event. +// Un-subscribe from an func (js *JSONService) EventUnsubscribe(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { param := &SubIdParam{} err := json.Unmarshal(request.Params, param) @@ -175,11 +189,11 @@ func (js *JSONService) EventUnsubscribe(request *rpc.RPCRequest, requester inter } subId := param.SubId - err = js.service.Unsubscribe(subId) + err = js.service.Unsubscribe(context.Background(), subId) if err != nil { return nil, rpc.INTERNAL_ERROR, err } - return &event.EventUnsub{true}, 0, nil + return &EventUnsub{true}, 0, nil } // Check subscription event cache for new data. @@ -196,5 +210,5 @@ func (js *JSONService) EventPoll(request *rpc.RPCRequest, if errC != nil { return nil, rpc.INTERNAL_ERROR, errC } - return &event.PollResponse{result}, 0, nil + return &PollResponse{result}, 0, nil } diff --git a/event/subscriptions.go b/rpc/v0/subscriptions.go similarity index 79% rename from event/subscriptions.go rename to rpc/v0/subscriptions.go index 98356a54..80a3d372 100644 --- a/event/subscriptions.go +++ b/rpc/v0/subscriptions.go @@ -12,12 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -package event +package v0 import ( + "context" "fmt" "sync" "time" + + "github.com/hyperledger/burrow/event" + "github.com/hyperledger/burrow/rpc" ) var ( @@ -57,18 +61,18 @@ func (subsCache *SubscriptionsCache) poll() []interface{} { // Catches events that callers subscribe to and adds them to an array ready to be polled. type Subscriptions struct { - mtx *sync.RWMutex - subscribable Subscribable - subs map[string]*SubscriptionsCache - reap bool + mtx *sync.RWMutex + service rpc.Service + subs map[string]*SubscriptionsCache + reap bool } -func NewSubscriptions(subscribable Subscribable) *Subscriptions { +func NewSubscriptions(service rpc.Service) *Subscriptions { es := &Subscriptions{ - mtx: &sync.RWMutex{}, - subscribable: subscribable, - subs: make(map[string]*SubscriptionsCache), - reap: true, + mtx: &sync.RWMutex{}, + service: service, + subs: make(map[string]*SubscriptionsCache), + reap: true, } go reap(es) return es @@ -85,7 +89,7 @@ func reap(es *Subscriptions) { if time.Since(sub.ts) > reaperThreshold { // Seems like Go is ok with this.. delete(es.subs, id) - es.subscribable.Unsubscribe(id) + es.service.Unsubscribe(context.Background(), id) } } go reap(es) @@ -97,24 +101,24 @@ func reap(es *Subscriptions) { // happen it's for an insignificant amount of time (the time it takes to // carry out SubscriptionsCache.poll() ). func (subs *Subscriptions) Add(eventId string) (string, error) { - subId, errSID := GenerateSubscriptionID() - if errSID != nil { - return "", errSID + subId, err := event.GenerateSubscriptionID() + if err != nil { + return "", err } cache := newSubscriptionsCache() - errC := subs.subscribable.Subscribe(subId, eventId, - func(evt AnyEventData) { - cache.mtx.Lock() - defer cache.mtx.Unlock() - cache.events = append(cache.events, evt) - }) + err = subs.service.Subscribe(context.Background(), subId, eventId, func(resultEvent *rpc.ResultEvent) { + cache.mtx.Lock() + defer cache.mtx.Unlock() + cache.events = append(cache.events, resultEvent) + }) + if err != nil { + return "", err + } cache.subId = subId subs.mtx.Lock() defer subs.mtx.Unlock() subs.subs[subId] = cache - if errC != nil { - return "", errC - } + return subId, nil } diff --git a/event/subscriptions_test.go b/rpc/v0/subscriptions_test.go similarity index 70% rename from event/subscriptions_test.go rename to rpc/v0/subscriptions_test.go index c02bbb70..45d2130a 100644 --- a/event/subscriptions_test.go +++ b/rpc/v0/subscriptions_test.go @@ -12,87 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -package event +package v0 import ( "encoding/hex" "fmt" "runtime" - "sync" "testing" "time" + "github.com/hyperledger/burrow/event" + "github.com/hyperledger/burrow/logging/loggers" + "github.com/hyperledger/burrow/rpc" "github.com/stretchr/testify/assert" ) var mockInterval = 20 * time.Millisecond -type mockSub struct { - subId string - eventId string - f func(AnyEventData) - sdChan chan struct{} -} - -type mockEventData struct { - subId string - eventId string -} - -func (eventData mockEventData) AssertIsEVMEventData() {} - -// A mock event -func newMockSub(subId, eventId string, f func(AnyEventData)) mockSub { - return mockSub{subId, eventId, f, make(chan struct{})} -} - -type mockEventEmitter struct { - subs map[string]mockSub - mutex *sync.Mutex -} - -func newMockEventEmitter() *mockEventEmitter { - return &mockEventEmitter{make(map[string]mockSub), &sync.Mutex{}} -} - -func (mee *mockEventEmitter) Subscribe(subId, eventId string, callback func(AnyEventData)) error { - if _, ok := mee.subs[subId]; ok { - return nil - } - me := newMockSub(subId, eventId, callback) - mee.mutex.Lock() - mee.subs[subId] = me - mee.mutex.Unlock() - - go func() { - for { - select { - case <-me.sdChan: - mee.mutex.Lock() - delete(mee.subs, subId) - mee.mutex.Unlock() - return - case <-time.After(mockInterval): - me.f(AnyEventData{BurrowEventData: &EventData{ - EventDataInner: mockEventData{subId: subId, eventId: eventId}, - }}) - } - } - }() - return nil -} - -func (mee *mockEventEmitter) Unsubscribe(subId string) error { - mee.mutex.Lock() - sub, ok := mee.subs[subId] - mee.mutex.Unlock() - if !ok { - return nil - } - sub.sdChan <- struct{}{} - return nil -} - // Test that event subscriptions can be added manually and then automatically reaped. func TestSubReaping(t *testing.T) { runtime.GOMAXPROCS(runtime.NumCPU()) @@ -100,8 +36,8 @@ func TestSubReaping(t *testing.T) { reaperThreshold = 200 * time.Millisecond reaperTimeout = 100 * time.Millisecond - mee := newMockEventEmitter() - eSubs := NewSubscriptions(mee) + mee := event.NewEmitter(loggers.NewNoopInfoTraceLogger()) + eSubs := NewSubscriptions(rpc.NewSubscribableService(mee, loggers.NewNoopInfoTraceLogger())) doneChan := make(chan error) go func() { for i := 0; i < NUM_SUBS; i++ { @@ -133,7 +69,6 @@ func TestSubReaping(t *testing.T) { } time.Sleep(1100 * time.Millisecond) - assert.Len(t, mee.subs, 0) assert.Len(t, eSubs.subs, 0) t.Logf("Added %d subs that were all automatically reaped.", NUM_SUBS) } @@ -145,8 +80,8 @@ func TestSubManualClose(t *testing.T) { reaperThreshold = 10000 * time.Millisecond reaperTimeout = 10000 * time.Millisecond - mee := newMockEventEmitter() - eSubs := NewSubscriptions(mee) + mee := event.NewEmitter(loggers.NewNoopInfoTraceLogger()) + eSubs := NewSubscriptions(rpc.NewSubscribableService(mee, loggers.NewNoopInfoTraceLogger())) doneChan := make(chan error) go func() { for i := 0; i < NUM_SUBS; i++ { @@ -193,8 +128,8 @@ func TestSubFlooding(t *testing.T) { reaperTimeout = 10000 * time.Millisecond // Crank it up. Now pressure is 10 times higher on each sub. mockInterval = 1 * time.Millisecond - mee := newMockEventEmitter() - eSubs := NewSubscriptions(mee) + mee := event.NewEmitter(loggers.NewNoopInfoTraceLogger()) + eSubs := NewSubscriptions(rpc.NewSubscribableService(mee, loggers.NewNoopInfoTraceLogger())) doneChan := make(chan error) go func() { for i := 0; i < NUM_SUBS; i++ { diff --git a/rpc/v0/websocket_service.go b/rpc/v0/websocket_service.go index ce6611da..0e0e036f 100644 --- a/rpc/v0/websocket_service.go +++ b/rpc/v0/websocket_service.go @@ -18,6 +18,8 @@ import ( "encoding/json" "fmt" + "context" + "github.com/hyperledger/burrow/event" "github.com/hyperledger/burrow/rpc" "github.com/hyperledger/burrow/rpc/v0/server" @@ -123,13 +125,13 @@ func (ws *WebsocketService) EventSubscribe(request *rpc.RPCRequest, return nil, rpc.INTERNAL_ERROR, err } - err = ws.service.Subscribe(subId, eventId, func(eventData event.AnyEventData) { - ws.writeResponse(subId, eventData.Get(), session) + err = ws.service.Subscribe(context.Background(), subId, eventId, func(resultEvent *rpc.ResultEvent) { + ws.writeResponse(subId, resultEvent, session) }) if err != nil { return nil, rpc.INTERNAL_ERROR, err } - return &event.EventSub{SubId: subId}, 0, nil + return &EventSub{SubId: subId}, 0, nil } func (ws *WebsocketService) EventUnsubscribe(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { @@ -139,11 +141,11 @@ func (ws *WebsocketService) EventUnsubscribe(request *rpc.RPCRequest, requester return nil, rpc.INVALID_PARAMS, err } - err = ws.service.Unsubscribe(param.SubId) + err = ws.service.Unsubscribe(context.Background(), param.SubId) if err != nil { return nil, rpc.INTERNAL_ERROR, err } - return &event.EventUnsub{Result: true}, 0, nil + return &EventUnsub{Result: true}, 0, nil } func (ws *WebsocketService) EventPoll(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { diff --git a/core/server.go b/server/server.go similarity index 77% rename from core/server.go rename to server/server.go index 110f5477..08a70efb 100644 --- a/core/server.go +++ b/server/server.go @@ -1,4 +1,4 @@ -package core +package server import ( "context" @@ -11,7 +11,13 @@ type Server interface { Shutdown(context.Context) error } -type ServerLauncher struct { +type ShutdownFunc func(context.Context) error + +func (sf ShutdownFunc) Shutdown(ctx context.Context) error { + return sf(ctx) +} + +type Launcher struct { Name string Launch func() (Server, error) } @@ -22,7 +28,7 @@ type listenersServer struct { } // Providers a Server implementation from Listeners that are closed on shutdown -func ListenersServer(listeners ...net.Listener) Server { +func FromListeners(listeners ...net.Listener) Server { lns := make(map[net.Listener]struct{}, len(listeners)) for _, l := range listeners { lns[l] = struct{}{} diff --git a/txs/tx.go b/txs/tx.go index 407e9fe1..fe46f93a 100644 --- a/txs/tx.go +++ b/txs/tx.go @@ -98,6 +98,7 @@ var mapper = data.NewMapper(Wrapper{}). //----------------------------------------------------------------------------- type ( + // TODO: replace with sum-type struct like ResultEvent Tx interface { WriteSignBytes(chainID string, w io.Writer, n *int, err *error) } @@ -115,44 +116,44 @@ type ( } SendTx struct { - Inputs []*TxInput `json:"inputs"` - Outputs []*TxOutput `json:"outputs"` + Inputs []*TxInput + Outputs []*TxOutput } // BroadcastTx or Transact Receipt struct { - TxHash []byte `json:"tx_hash"` - CreatesContract bool `json:"creates_contract"` - ContractAddr acm.Address `json:"contract_addr"` + TxHash []byte + CreatesContract bool + ContractAddr acm.Address } NameTx struct { - Input *TxInput `json:"input"` - Name string `json:"name"` - Data string `json:"data"` - Fee uint64 `json:"fee"` + Input *TxInput + Name string + Data string + Fee uint64 } CallTx struct { - Input *TxInput `json:"input"` + Input *TxInput // Pointer since CallTx defines unset 'to' address as inducing account creation - Address *acm.Address `json:"address"` - GasLimit uint64 `json:"gas_limit"` - Fee uint64 `json:"fee"` - Data []byte `json:"data"` + Address *acm.Address + GasLimit uint64 + Fee uint64 + Data []byte } TxInput struct { - Address acm.Address `json:"address"` // Hash of the PublicKey - Amount uint64 `json:"amount"` // Must not exceed account balance - Sequence uint64 `json:"sequence"` // Must be 1 greater than the last committed TxInput - Signature acm.Signature `json:"signature"` // Depends on the PublicKey type and the whole Tx - PubKey acm.PublicKey `json:"pub_key"` // Must not be nil, may be nil + Address acm.Address + Amount uint64 + Sequence uint64 + Signature acm.Signature + PubKey acm.PublicKey } TxOutput struct { - Address acm.Address `json:"address"` // Hash of the PublicKey - Amount uint64 `json:"amount"` // The sum of all outputs must not exceed the inputs. + Address acm.Address + Amount uint64 } ) @@ -305,10 +306,10 @@ func (tx *NameTx) String() string { //----------------------------------------------------------------------------- type BondTx struct { - PubKey acm.PublicKey `json:"pub_key"` // NOTE: these don't have type byte - Signature acm.Signature `json:"signature"` - Inputs []*TxInput `json:"inputs"` - UnbondTo []*TxOutput `json:"unbond_to"` + PubKey acm.PublicKey + Signature acm.Signature + Inputs []*TxInput + UnbondTo []*TxOutput } func (tx *BondTx) WriteSignBytes(chainID string, w io.Writer, n *int, err *error) { @@ -339,9 +340,9 @@ func (tx *BondTx) String() string { //----------------------------------------------------------------------------- type UnbondTx struct { - Address acm.Address `json:"address"` - Height int `json:"height"` - Signature acm.Signature `json:"signature"` + Address acm.Address + Height int + Signature acm.Signature } func (tx *UnbondTx) WriteSignBytes(chainID string, w io.Writer, n *int, err *error) { @@ -356,9 +357,9 @@ func (tx *UnbondTx) String() string { //----------------------------------------------------------------------------- type RebondTx struct { - Address acm.Address `json:"address"` - Height int `json:"height"` - Signature acm.Signature `json:"signature"` + Address acm.Address + Height int + Signature acm.Signature } func (tx *RebondTx) WriteSignBytes(chainID string, w io.Writer, n *int, err *error) { -- GitLab