Skip to content
Snippets Groups Projects
pipe.go 19.4 KiB
Newer Older
// Copyright 2015, 2016 Eris Industries (UK) Ltd.
// This file is part of Eris-RT

// Eris-RT is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Eris-RT is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Eris-RT.  If not, see <http://www.gnu.org/licenses/>.

package erismint

Silas Davis's avatar
Silas Davis committed
	"bytes"
	"fmt"
	tm_common "github.com/tendermint/go-common"
Silas Davis's avatar
Silas Davis committed
	crypto "github.com/tendermint/go-crypto"
	db "github.com/tendermint/go-db"
	go_events "github.com/tendermint/go-events"
Silas Davis's avatar
Silas Davis committed
	wire "github.com/tendermint/go-wire"
	tm_types "github.com/tendermint/tendermint/types"
Silas Davis's avatar
Silas Davis committed
	tmsp_types "github.com/tendermint/tmsp/types"

	log "github.com/eris-ltd/eris-logger"

	"github.com/eris-ltd/eris-db/account"
	imath "github.com/eris-ltd/eris-db/common/math/integral"
	"github.com/eris-ltd/eris-db/config"
Silas Davis's avatar
Silas Davis committed
	core_types "github.com/eris-ltd/eris-db/core/types"
	"github.com/eris-ltd/eris-db/definitions"
	edb_event "github.com/eris-ltd/eris-db/event"
Silas Davis's avatar
Silas Davis committed
	vm "github.com/eris-ltd/eris-db/manager/eris-mint/evm"
	"github.com/eris-ltd/eris-db/manager/eris-mint/state"
Silas Davis's avatar
Silas Davis committed
	state_types "github.com/eris-ltd/eris-db/manager/eris-mint/state/types"
	manager_types "github.com/eris-ltd/eris-db/manager/types"
	rpc_tm_types "github.com/eris-ltd/eris-db/rpc/tendermint/core/types"
Silas Davis's avatar
Silas Davis committed
	"github.com/eris-ltd/eris-db/txs"
Silas Davis's avatar
Silas Davis committed
	erisMintState *state.State
	erisMint      *ErisMint
	// Pipe implementations
	accounts   *accounts
	blockchain *blockchain
	consensus  *consensus
Silas Davis's avatar
Silas Davis committed
	namereg    *namereg
	network    *network
	transactor *transactor
	// Consensus interface
	consensusEngine definitions.ConsensusEngine
Silas Davis's avatar
Silas Davis committed
	genesisDoc   *state_types.GenesisDoc
	genesisState *state.State
// NOTE [ben] Compiler check to ensure erisMintPipe successfully implements
// eris-db/definitions.Pipe
var _ definitions.Pipe = (*erisMintPipe)(nil)
// NOTE [ben] Compiler check to ensure erisMintPipe successfully implements
// eris-db/definitions.TendermintPipe
var _ definitions.TendermintPipe = (*erisMintPipe)(nil)
// NewErisMintPipe builds the ErisMint pipe: it starts (or loads) the
// application state via startState using the module's data directory,
// "db_backend" setting, genesis file and chain id, creates the ErisMint
// application on top of that state, points it at the configured
// "tendermint_host", and wires up the events, accounts, name registry and
// transactor components. An error is returned when the state cannot be
// started.
//
// NOTE(review): this copy of the function is damaged — the opener of the
// returned struct literal, possibly some of its fields, and the closing brace
// of the function are not present in the text below; restore them from
// version control before relying on this block.
func NewErisMintPipe(moduleConfig *config.ModuleConfig,
	eventSwitch *go_events.EventSwitch) (*erisMintPipe, error) {
Silas Davis's avatar
Silas Davis committed

	startedState, genesisDoc, err := startState(moduleConfig.DataDir,
		moduleConfig.Config.GetString("db_backend"), moduleConfig.GenesisFile,
		moduleConfig.ChainId)
	if err != nil {
		return nil, fmt.Errorf("Failed to start state: %v", err)
	}
	// assert ChainId matches genesis ChainId
	// NOTE(review): no assertion is performed here; the state is only logged.
	log.WithFields(log.Fields{
		"chainId":         startedState.ChainID,
		"lastBlockHeight": startedState.LastBlockHeight,
		"lastBlockHash":   startedState.LastBlockHash,
	}).Debug("Loaded state")
	// start the application
	erisMint := NewErisMint(startedState, eventSwitch)

	// NOTE: [ben] Set Host opens an RPC pipe to Tendermint;  this is a remnant
	// of the old Eris-DB / Tendermint and should be considered as an in-process
	// call when possible
	tendermintHost := moduleConfig.Config.GetString("tendermint_host")
	erisMint.SetHostAddress(tendermintHost)

	// initialise the components of the pipe
	events := edb_event.NewEvents(eventSwitch)
Silas Davis's avatar
Silas Davis committed
	accounts := newAccounts(erisMint)
	namereg := newNameReg(erisMint)
	transactor := newTransactor(moduleConfig.ChainId, eventSwitch, erisMint,
		events)
	// TODO: make interface to tendermint core's rpc for these
	// blockchain := newBlockchain(chainID, genDocFile, blockStore)
	// consensus := newConsensus(erisdbApp)
	// net := newNetwork(erisdbApp)

Silas Davis's avatar
Silas Davis committed
		erisMintState: startedState,
		erisMint:      erisMint,
		accounts:      accounts,
		events:        events,
		namereg:       namereg,
		transactor:    transactor,
Silas Davis's avatar
Silas Davis committed
		consensus:     nil,
Silas Davis's avatar
Silas Davis committed
		genesisDoc:   genesisDoc,
		genesisState: nil,
	}, nil
//------------------------------------------------------------------------------
// Start state

// Start state tries to load the existing state in the data directory;
// if an existing database can be loaded, it will validate that the
// chainId in the genesis of that loaded state matches the asserted chainId.
// If no state can be loaded, the JSON genesis file will be loaded into the
// state database as the zero state.
func startState(dataDir, backend, genesisFile, chainId string) (*state.State,
Silas Davis's avatar
Silas Davis committed
	*state_types.GenesisDoc, error) {
	// avoid Tendermints PanicSanity and return a clean error
	if backend != db.DBBackendMemDB &&
		backend != db.DBBackendLevelDB {
		return nil, nil, fmt.Errorf("Database backend %s is not supported by %s",
			backend, GetErisMintVersion)
	}

	stateDB := db.NewDB("erismint", backend, dataDir)
	newState := state.LoadState(stateDB)
	var genesisDoc *state_types.GenesisDoc
	if newState == nil {
		genesisDoc, newState = state.MakeGenesisStateFromFile(stateDB, genesisFile)
		newState.Save()
		buf, n, err := new(bytes.Buffer), new(int), new(error)
		wire.WriteJSON(genesisDoc, buf, n, err)
		stateDB.Set(state_types.GenDocKey, buf.Bytes())
		if *err != nil {
			return nil, nil, fmt.Errorf("Unable to write genesisDoc to db: %v", err)
		loadedGenesisDocBytes := stateDB.Get(state_types.GenDocKey)
		err := new(error)
		wire.ReadJSONPtr(&genesisDoc, loadedGenesisDocBytes, err)
		if *err != nil {
			return nil, nil, fmt.Errorf("Unable to read genesisDoc from db on startState: %v", err)
Silas Davis's avatar
Silas Davis committed
		// assert loaded genesis doc has the same chainId as the provided chainId
		if genesisDoc.ChainID != chainId {
			log.WithFields(log.Fields{
				"chainId from loaded genesis": genesisDoc.ChainID,
				"chainId from configuration":  chainId,
			}).Warn("Conflicting chainIds")
			// return nil, nil, fmt.Errorf("ChainId (%s) loaded from genesis document in existing database does not match configuration chainId (%s).",
			// genesisDoc.ChainID, chainId)
Silas Davis's avatar
Silas Davis committed
		}
Silas Davis's avatar
Silas Davis committed
	return newState, genesisDoc, nil

//------------------------------------------------------------------------------
// Implement definitions.Pipe for erisMintPipe
func (pipe *erisMintPipe) Accounts() definitions.Accounts {
Silas Davis's avatar
Silas Davis committed
	return pipe.accounts
func (pipe *erisMintPipe) Blockchain() definitions.Blockchain {
Silas Davis's avatar
Silas Davis committed
	return pipe.blockchain
func (pipe *erisMintPipe) Consensus() definitions.Consensus {
Silas Davis's avatar
Silas Davis committed
	return pipe.consensus
func (pipe *erisMintPipe) Events() edb_event.EventEmitter {
Silas Davis's avatar
Silas Davis committed
	return pipe.events
func (pipe *erisMintPipe) NameReg() definitions.NameReg {
Silas Davis's avatar
Silas Davis committed
	return pipe.namereg
func (pipe *erisMintPipe) Net() definitions.Net {
Silas Davis's avatar
Silas Davis committed
	return pipe.network
func (pipe *erisMintPipe) Transactor() definitions.Transactor {
Silas Davis's avatar
Silas Davis committed
	return pipe.transactor
func (pipe *erisMintPipe) GetApplication() manager_types.Application {
Silas Davis's avatar
Silas Davis committed
	return pipe.erisMint
func (pipe *erisMintPipe) SetConsensusEngine(
Silas Davis's avatar
Silas Davis committed
	consensus definitions.ConsensusEngine) error {
	if pipe.consensusEngine == nil {
		pipe.consensusEngine = consensus
	} else {
		return fmt.Errorf("Failed to set consensus engine for pipe; already set")
	}
	return nil
func (pipe *erisMintPipe) GetConsensusEngine() definitions.ConsensusEngine {
Silas Davis's avatar
Silas Davis committed
	return pipe.consensusEngine
}

// GetTendermintPipe exposes this pipe through the definitions.TendermintPipe
// interface. The returned error is always nil.
func (pipe *erisMintPipe) GetTendermintPipe() (definitions.TendermintPipe,
	error) {
	var tendermintPipe definitions.TendermintPipe = pipe
	return tendermintPipe, nil
}

// consensusAndManagerEvents returns a single emitter that multiplexes the
// pipe's own events with those of the consensus engine. A fresh multiplexer is
// built on every call.
func (pipe *erisMintPipe) consensusAndManagerEvents() edb_event.EventEmitter {
	// NOTE: [Silas] Caching a lazily-initialised instance would be possible,
	// but for now that feels like a premature optimisation.
	managerEvents := pipe.events
	consensusEvents := pipe.consensusEngine.Events()
	return edb_event.Multiplex(managerEvents, consensusEvents)
}

//------------------------------------------------------------------------------
// Implement definitions.TendermintPipe for erisMintPipe
// Subscribe generates a fresh subscription id, subscribes to the named event
// on the combined consensus/manager emitter, and forwards each event to
// rpcResponseWriter wrapped as an rpc_tm_types.ErisDBResult. On success the
// subscription id and event name are returned; an error is returned when no
// subscription id can be generated.
//
// NOTE(review): this copy of the function is damaged — the tail of the log
// statement, the callback's function header, the second field of the
// ResultEvent literal and the function's closing brace are not present in the
// text below; restore them from version control.
func (pipe *erisMintPipe) Subscribe(event string,
	rpcResponseWriter func(result rpc_tm_types.ErisDBResult)) (*rpc_tm_types.ResultSubscribe, error) {
	subscriptionId, err := edb_event.GenerateSubId()
	if err != nil {
		return nil, err
	}

	log.WithFields(log.Fields{"event": event, "subscriptionId": subscriptionId}).
	pipe.consensusAndManagerEvents().Subscribe(subscriptionId, event,
			result := rpc_tm_types.ErisDBResult(&rpc_tm_types.ResultEvent{event,
			// NOTE: EventSwitch callbacks must be nonblocking
			rpcResponseWriter(result)
		})
	return &rpc_tm_types.ResultSubscribe{
		SubscriptionId: subscriptionId,
		Event: event,
	}, nil
func (pipe *erisMintPipe) Unsubscribe(subscriptionId string) (*rpc_tm_types.ResultUnsubscribe, error) {
	log.WithFields(log.Fields{"subscriptionId": subscriptionId}).
		Info("Unsubscribing from event")
	pipe.consensusAndManagerEvents().Unsubscribe(subscriptionId)
	return &rpc_tm_types.ResultUnsubscribe{SubscriptionId: subscriptionId}, nil
func (pipe *erisMintPipe) Status() (*rpc_tm_types.ResultStatus, error) {
	memoryDatabase := db.NewMemDB()
	if pipe.genesisState == nil {
		pipe.genesisState = state.MakeGenesisState(memoryDatabase, pipe.genesisDoc)
	}
	genesisHash := pipe.genesisState.Hash()
	if pipe.consensusEngine == nil {
		return nil, fmt.Errorf("Consensus Engine is not set in pipe.")
	}
	latestHeight := pipe.consensusEngine.Height()
	var (
		latestBlockHash []byte
		latestBlockTime int64
	)
	if latestHeight != 0 {
		latestBlockMeta = pipe.consensusEngine.LoadBlockMeta(latestHeight)
		latestBlockHash = latestBlockMeta.Hash
		latestBlockTime = latestBlockMeta.Header.Time.UnixNano()
	}
		NodeInfo:          pipe.consensusEngine.NodeInfo(),
		GenesisHash:       genesisHash,
		PubKey:            pipe.consensusEngine.PublicValidatorKey(),
		LatestBlockHash:   latestBlockHash,
		LatestBlockHeight: latestHeight,
		LatestBlockTime:   latestBlockTime}, nil
}

func (pipe *erisMintPipe) NetInfo() (*rpc_tm_types.ResultNetInfo, error) {
	listening := pipe.consensusEngine.IsListening()
	listeners := []string{}
	for _, listener := range pipe.consensusEngine.Listeners() {
		listeners = append(listeners, listener.String())
	}
Silas Davis's avatar
Silas Davis committed
	peers := pipe.consensusEngine.Peers()
		Listening: listening,
		Listeners: listeners,
		Peers:     peers,
	}, nil
func (pipe *erisMintPipe) Genesis() (*rpc_tm_types.ResultGenesis, error) {
		// TODO: [ben] sharing pointer to unmutated GenesisDoc, but is not immutable
		Genesis: pipe.genesisDoc,
	}, nil
func (pipe *erisMintPipe) GetAccount(address []byte) (*rpc_tm_types.ResultGetAccount,
	cache := pipe.erisMint.GetCheckCache()
	// cache := mempoolReactor.Mempool.GetCache()
	account := cache.GetAccount(address)
	if account == nil {
		log.Warn("Nil Account")
		return &rpc_tm_types.ResultGetAccount{nil}, nil
	return &rpc_tm_types.ResultGetAccount{account}, nil
func (pipe *erisMintPipe) ListAccounts() (*rpc_tm_types.ResultListAccounts, error) {
	var blockHeight int
	var accounts []*account.Account
	state := pipe.erisMint.GetState()
	blockHeight = state.LastBlockHeight
	state.GetAccounts().Iterate(func(key []byte, value []byte) bool {
		accounts = append(accounts, account.DecodeAccount(value))
		return false
	})
	return &rpc_tm_types.ResultListAccounts{blockHeight, accounts}, nil
func (pipe *erisMintPipe) GetStorage(address, key []byte) (*rpc_tm_types.ResultGetStorage,
	state := pipe.erisMint.GetState()
	// state := consensusState.GetState()
	account := state.GetAccount(address)
	if account == nil {
		return nil, fmt.Errorf("UnknownAddress: %X", address)
	}
	storageRoot := account.StorageRoot
	storageTree := state.LoadStorage(storageRoot)

	_, value, exists := storageTree.Get(
		tm_common.LeftPadWord256(key).Bytes())
	if !exists {
		// value == nil {
		return &rpc_tm_types.ResultGetStorage{key, nil}, nil
	return &rpc_tm_types.ResultGetStorage{key, value}, nil
func (pipe *erisMintPipe) DumpStorage(address []byte) (*rpc_tm_types.ResultDumpStorage,
	state := pipe.erisMint.GetState()
	account := state.GetAccount(address)
	if account == nil {
		return nil, fmt.Errorf("UnknownAddress: %X", address)
	}
	storageRoot := account.StorageRoot
	storageTree := state.LoadStorage(storageRoot)
	storageItems := []rpc_tm_types.StorageItem{}
	storageTree.Iterate(func(key []byte, value []byte) bool {
		storageItems = append(storageItems, rpc_tm_types.StorageItem{key,
	return &rpc_tm_types.ResultDumpStorage{storageRoot, storageItems}, nil
func (pipe *erisMintPipe) Call(fromAddress, toAddress, data []byte) (*rpc_tm_types.ResultCall,
	st := pipe.erisMint.GetState()
	cache := state.NewBlockCache(st)
	outAcc := cache.GetAccount(toAddress)
	if outAcc == nil {
		return nil, fmt.Errorf("Account %x does not exist", toAddress)
	}
	callee := toVMAccount(outAcc)
	caller := &vm.Account{Address: tm_common.LeftPadWord256(fromAddress)}
	txCache := state.NewTxCache(cache)
	params := vm.Params{
		BlockHeight: int64(st.LastBlockHeight),
		BlockHash:   tm_common.LeftPadWord256(st.LastBlockHash),
		BlockTime:   st.LastBlockTime.Unix(),
		GasLimit:    st.GetGasLimit(),
	}

	vmach := vm.NewVM(txCache, params, caller.Address, nil)
	gas := st.GetGasLimit()
	ret, err := vmach.Call(caller, callee, callee.Code, data, 0, &gas)
	if err != nil {
		return nil, err
	}
	return &rpc_tm_types.ResultCall{Return: ret}, nil
func (pipe *erisMintPipe) CallCode(fromAddress, code, data []byte) (*rpc_tm_types.ResultCall,
	st := pipe.erisMint.GetState()
	cache := pipe.erisMint.GetCheckCache()
	callee := &vm.Account{Address: tm_common.LeftPadWord256(fromAddress)}
	caller := &vm.Account{Address: tm_common.LeftPadWord256(fromAddress)}
	txCache := state.NewTxCache(cache)
	params := vm.Params{
		BlockHeight: int64(st.LastBlockHeight),
		BlockHash:   tm_common.LeftPadWord256(st.LastBlockHash),
		BlockTime:   st.LastBlockTime.Unix(),
		GasLimit:    st.GetGasLimit(),
	}

	vmach := vm.NewVM(txCache, params, caller.Address, nil)
	gas := st.GetGasLimit()
	ret, err := vmach.Call(caller, callee, code, data, 0, &gas)
	if err != nil {
		return nil, err
	}
	return &rpc_tm_types.ResultCall{Return: ret}, nil
}

// TODO: [ben] deprecate as we should not allow unsafe behaviour
// where a user is allowed to send a private key over the wire,
// especially unencrypted.
func (pipe *erisMintPipe) SignTransaction(tx txs.Tx,
	privAccounts []*account.PrivAccount) (*rpc_tm_types.ResultSignTx,

	for i, privAccount := range privAccounts {
		if privAccount == nil || privAccount.PrivKey == nil {
			return nil, fmt.Errorf("Invalid (empty) privAccount @%v", i)
		}
	}
	switch tx.(type) {
Silas Davis's avatar
Silas Davis committed
	case *txs.SendTx:
		sendTx := tx.(*txs.SendTx)
		for i, input := range sendTx.Inputs {
			input.PubKey = privAccounts[i].PubKey
			input.Signature = privAccounts[i].Sign(pipe.transactor.chainID, sendTx)
		}
Silas Davis's avatar
Silas Davis committed
	case *txs.CallTx:
		callTx := tx.(*txs.CallTx)
		callTx.Input.PubKey = privAccounts[0].PubKey
		callTx.Input.Signature = privAccounts[0].Sign(pipe.transactor.chainID, callTx)
Silas Davis's avatar
Silas Davis committed
	case *txs.BondTx:
		bondTx := tx.(*txs.BondTx)
		// the first privaccount corresponds to the BondTx pub key.
		// the rest to the inputs
		bondTx.Signature = privAccounts[0].Sign(pipe.transactor.chainID, bondTx).(crypto.SignatureEd25519)
		for i, input := range bondTx.Inputs {
			input.PubKey = privAccounts[i+1].PubKey
			input.Signature = privAccounts[i+1].Sign(pipe.transactor.chainID, bondTx)
		}
Silas Davis's avatar
Silas Davis committed
	case *txs.UnbondTx:
		unbondTx := tx.(*txs.UnbondTx)
		unbondTx.Signature = privAccounts[0].Sign(pipe.transactor.chainID, unbondTx).(crypto.SignatureEd25519)
Silas Davis's avatar
Silas Davis committed
	case *txs.RebondTx:
		rebondTx := tx.(*txs.RebondTx)
		rebondTx.Signature = privAccounts[0].Sign(pipe.transactor.chainID, rebondTx).(crypto.SignatureEd25519)
	}
	return &rpc_tm_types.ResultSignTx{tx}, nil
func (pipe *erisMintPipe) GetName(name string) (*rpc_tm_types.ResultGetName, error) {
	currentState := pipe.erisMint.GetState()
	entry := currentState.GetNameRegEntry(name)
	if entry == nil {
		return nil, fmt.Errorf("Name %s not found", name)
	}
	return &rpc_tm_types.ResultGetName{entry}, nil
func (pipe *erisMintPipe) ListNames() (*rpc_tm_types.ResultListNames, error) {
Silas Davis's avatar
Silas Davis committed
	var names []*core_types.NameRegEntry
	currentState := pipe.erisMint.GetState()
	blockHeight = currentState.LastBlockHeight
	currentState.GetNames().Iterate(func(key []byte, value []byte) bool {
		names = append(names, state.DecodeNameRegEntry(value))
		return false
	})
	return &rpc_tm_types.ResultListNames{blockHeight, names}, nil
Silas Davis's avatar
Silas Davis committed
// NOTE: txs must be signed
func (pipe *erisMintPipe) BroadcastTxAsync(tx txs.Tx) (
	*rpc_tm_types.ResultBroadcastTx, error) {
Silas Davis's avatar
Silas Davis committed
	err := pipe.consensusEngine.BroadcastTransaction(txs.EncodeTx(tx), nil)
Silas Davis's avatar
Silas Davis committed
		return nil, fmt.Errorf("Error broadcasting txs: %v", err)
	return &rpc_tm_types.ResultBroadcastTx{}, nil
func (pipe *erisMintPipe) BroadcastTxSync(tx txs.Tx) (*rpc_tm_types.ResultBroadcastTx,
	responseChannel := make(chan *tmsp_types.Response, 1)
Silas Davis's avatar
Silas Davis committed
	err := pipe.consensusEngine.BroadcastTransaction(txs.EncodeTx(tx),
		func(res *tmsp_types.Response) {
			responseChannel <- res
		})
Silas Davis's avatar
Silas Davis committed
		return nil, fmt.Errorf("Error broadcasting txs: %v", err)
Silas Davis's avatar
Silas Davis committed
	// NOTE: [ben] This Response is set in /consensus/tendermint/local_client.go
	// a call to Application, here implemented by ErisMint, over local callback,
	// or TMSP RPC call.  Hence the result is determined by ErisMint/erismint.go
	// CheckTx() Result (Result converted to ReqRes into Response returned here)
Silas Davis's avatar
Silas Davis committed
	// NOTE: [ben] BroadcastTx just calls CheckTx in Tendermint (oddly... [Silas])
	response := <-responseChannel
	responseCheckTx := response.GetCheckTx()
	if responseCheckTx == nil {
		return nil, fmt.Errorf("Error, application did not return CheckTx response.")
	}
	resultBroadCastTx := &rpc_tm_types.ResultBroadcastTx{
		Code: responseCheckTx.Code,
		Data: responseCheckTx.Data,
		Log:  responseCheckTx.Log,
	}
	switch responseCheckTx.Code {
	case tmsp_types.CodeType_OK:
		return resultBroadCastTx, nil
	case tmsp_types.CodeType_EncodingError:
		return resultBroadCastTx, fmt.Errorf(resultBroadCastTx.Log)
	case tmsp_types.CodeType_InternalError:
		return resultBroadCastTx, fmt.Errorf(resultBroadCastTx.Log)
	default:
		log.WithFields(log.Fields{
Silas Davis's avatar
Silas Davis committed
			"application":    GetErisMintVersion().GetVersionString(),
			"TMSP_code_type": responseCheckTx.Code,
		}).Warn("Unknown error returned from Tendermint CheckTx on BroadcastTxSync")
		return resultBroadCastTx, fmt.Errorf("Unknown error returned: " + responseCheckTx.Log)
	}

// Returns the current blockchain height and metadata for a range of blocks
// between minHeight and maxHeight. Only returns maxBlockLookback block metadata
// from the top of the range of blocks.
// Passing 0 for maxHeight sets the upper height of the range to the current
// blockchain height.
func (pipe *erisMintPipe) BlockchainInfo(minHeight, maxHeight,
	maxBlockLookback int) (*rpc_tm_types.ResultBlockchainInfo, error) {

	latestHeight := pipe.consensusEngine.Height()
		maxHeight = latestHeight
		maxHeight = imath.MinInt(latestHeight, maxHeight)
	}
	if minHeight < 1 {
		minHeight = imath.MaxInt(1, maxHeight-maxBlockLookback)
	}

	blockMetas := []*tm_types.BlockMeta{}
	for height := maxHeight; height >= minHeight; height-- {
		blockMeta := pipe.consensusEngine.LoadBlockMeta(height)
	return &rpc_tm_types.ResultBlockchainInfo{latestHeight, blockMetas}, nil