diff --git a/gno.land/cmd/gnoland/start.go b/gno.land/cmd/gnoland/start.go index b2134d86ea9..3914cc7775c 100644 --- a/gno.land/cmd/gnoland/start.go +++ b/gno.land/cmd/gnoland/start.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "flag" "fmt" "path/filepath" @@ -17,6 +18,9 @@ import ( "github.com/gnolang/gno/tm2/pkg/bft/config" "github.com/gnolang/gno/tm2/pkg/bft/node" "github.com/gnolang/gno/tm2/pkg/bft/privval" + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/file" + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/null" + eventstorecfg "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/types" bft "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/gno/tm2/pkg/commands" "github.com/gnolang/gno/tm2/pkg/crypto" @@ -35,6 +39,9 @@ type startCfg struct { rootDir string genesisMaxVMCycles int64 config string + + txEventStoreType string + txEventStorePath string } func newStartCmd(io *commands.IO) *commands.Command { @@ -116,6 +123,29 @@ func (c *startCfg) RegisterFlags(fs *flag.FlagSet) { "", "config file (optional)", ) + + fs.StringVar( + &c.txEventStoreType, + "tx-event-store-type", + null.EventStoreType, + fmt.Sprintf( + "type of transaction event store [%s]", + strings.Join( + []string{ + null.EventStoreType, + file.EventStoreType, + }, + ", ", + ), + ), + ) + + fs.StringVar( + &c.txEventStorePath, + "tx-event-store-path", + "", + fmt.Sprintf("path for the file tx event store (required if event store is '%s')", file.EventStoreType), + ) } func execStart(c *startCfg, args []string, io *commands.IO) error { @@ -145,6 +175,14 @@ func execStart(c *startCfg, args []string, io *commands.IO) error { writeGenesisFile(genDoc, genesisFilePath) } + // Initialize the indexer config + txEventStoreCfg, err := getTxEventStoreConfig(c) + if err != nil { + return fmt.Errorf("unable to parse indexer config, %w", err) + } + + cfg.TxEventStore = txEventStoreCfg + // create application and node. gnoApp, err := gnoland.NewApp(rootDir, c.skipFailingGenesisTxs, logger, c.genesisMaxVMCycles) if err != nil { @@ -180,6 +218,30 @@ func execStart(c *startCfg, args []string, io *commands.IO) error { select {} // run forever } +// getTxEventStoreConfig constructs an event store config from provided user options +func getTxEventStoreConfig(c *startCfg) (*eventstorecfg.Config, error) { + var cfg *eventstorecfg.Config + + switch c.txEventStoreType { + case file.EventStoreType: + if c.txEventStorePath == "" { + return nil, errors.New("unspecified file transaction indexer path") + } + + // Fill out the configuration + cfg = &eventstorecfg.Config{ + EventStoreType: file.EventStoreType, + Params: map[string]any{ + file.Path: c.txEventStorePath, + }, + } + default: + cfg = eventstorecfg.DefaultEventStoreConfig() + } + + return cfg, nil +} + // Makes a local test genesis doc with local privValidator. func makeGenesisDoc( pvPub crypto.PubKey, diff --git a/tm2/pkg/autofile/group.go b/tm2/pkg/autofile/group.go index 3350e1e62c5..189d7818bd8 100644 --- a/tm2/pkg/autofile/group.go +++ b/tm2/pkg/autofile/group.go @@ -125,7 +125,11 @@ func (g *Group) OnStart() error { // OnStop implements service.Service by stopping the goroutine described above. // NOTE: g.Head must be closed separately using Close. func (g *Group) OnStop() { - g.FlushAndSync() + if err := g.FlushAndSync(); err != nil { + g.Logger.Error( + fmt.Sprintf("unable to gracefully flush data, %s", err.Error()), + ) + } } // Wait blocks until all internal goroutines are finished. Supposed to be @@ -136,11 +140,20 @@ func (g *Group) Wait() { // Close closes the head file. The group must be stopped by this moment. func (g *Group) Close() { - g.FlushAndSync() + if err := g.FlushAndSync(); err != nil { + g.Logger.Error( + fmt.Sprintf("unable to gracefully flush data, %s", err.Error()), + ) + } g.mtx.Lock() - _ = g.Head.Close() - g.mtx.Unlock() + defer g.mtx.Unlock() + + if err := g.Head.Close(); err != nil { + g.Logger.Error( + fmt.Sprintf("unable to gracefully close group head, %s", err.Error()), + ) + } } // HeadSizeLimit returns the current head size limit. diff --git a/tm2/pkg/bft/config/config.go b/tm2/pkg/bft/config/config.go index 6f148c3b5c1..e05f514a284 100644 --- a/tm2/pkg/bft/config/config.go +++ b/tm2/pkg/bft/config/config.go @@ -9,6 +9,7 @@ import ( cns "github.com/gnolang/gno/tm2/pkg/bft/consensus/config" mem "github.com/gnolang/gno/tm2/pkg/bft/mempool/config" rpc "github.com/gnolang/gno/tm2/pkg/bft/rpc/config" + eventstore "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/types" "github.com/gnolang/gno/tm2/pkg/errors" osm "github.com/gnolang/gno/tm2/pkg/os" p2p "github.com/gnolang/gno/tm2/pkg/p2p/config" @@ -20,20 +21,22 @@ type Config struct { BaseConfig `toml:",squash"` // Options for services - RPC *rpc.RPCConfig `toml:"rpc"` - P2P *p2p.P2PConfig `toml:"p2p"` - Mempool *mem.MempoolConfig `toml:"mempool"` - Consensus *cns.ConsensusConfig `toml:"consensus"` + RPC *rpc.RPCConfig `toml:"rpc"` + P2P *p2p.P2PConfig `toml:"p2p"` + Mempool *mem.MempoolConfig `toml:"mempool"` + Consensus *cns.ConsensusConfig `toml:"consensus"` + TxEventStore *eventstore.Config `toml:"tx_event_store"` } // DefaultConfig returns a default configuration for a Tendermint node func DefaultConfig() *Config { return &Config{ - BaseConfig: DefaultBaseConfig(), - RPC: rpc.DefaultRPCConfig(), - P2P: p2p.DefaultP2PConfig(), - Mempool: mem.DefaultMempoolConfig(), - Consensus: cns.DefaultConsensusConfig(), + BaseConfig: DefaultBaseConfig(), + RPC: rpc.DefaultRPCConfig(), + P2P: p2p.DefaultP2PConfig(), + Mempool: mem.DefaultMempoolConfig(), + Consensus: cns.DefaultConsensusConfig(), + TxEventStore: eventstore.DefaultEventStoreConfig(), } } @@ -68,11 +71,12 @@ func LoadOrMakeConfigWithOptions(root string, options ConfigOptions) (cfg *Confi // TestConfig returns a configuration that can be used for testing func TestConfig() *Config { return &Config{ - BaseConfig: TestBaseConfig(), - RPC: rpc.TestRPCConfig(), - P2P: p2p.TestP2PConfig(), - Mempool: mem.TestMempoolConfig(), - Consensus: cns.TestConsensusConfig(), + BaseConfig: TestBaseConfig(), + RPC: rpc.TestRPCConfig(), + P2P: p2p.TestP2PConfig(), + Mempool: mem.TestMempoolConfig(), + Consensus: cns.TestConsensusConfig(), + TxEventStore: eventstore.DefaultEventStoreConfig(), } } @@ -121,7 +125,7 @@ func (cfg *Config) ValidateBasic() error { return nil } -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- // BaseConfig const ( diff --git a/tm2/pkg/bft/node/node.go b/tm2/pkg/bft/node/node.go index 23b42cec6b9..bdeb5061540 100644 --- a/tm2/pkg/bft/node/node.go +++ b/tm2/pkg/bft/node/node.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/file" "github.com/rs/cors" "github.com/gnolang/gno/tm2/pkg/amino" @@ -25,8 +26,8 @@ import ( _ "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types" rpcserver "github.com/gnolang/gno/tm2/pkg/bft/rpc/lib/server" sm "github.com/gnolang/gno/tm2/pkg/bft/state" - "github.com/gnolang/gno/tm2/pkg/bft/state/txindex" - "github.com/gnolang/gno/tm2/pkg/bft/state/txindex/null" + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore" + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/null" "github.com/gnolang/gno/tm2/pkg/bft/store" "github.com/gnolang/gno/tm2/pkg/bft/types" tmtime "github.com/gnolang/gno/tm2/pkg/bft/types/time" @@ -154,18 +155,18 @@ type Node struct { isListening bool // services - evsw events.EventSwitch - stateDB dbm.DB - blockStore *store.BlockStore // store the blockchain to disk - bcReactor p2p.Reactor // for fast-syncing - mempoolReactor *mempl.Reactor // for gossipping transactions - mempool mempl.Mempool - consensusState *cs.ConsensusState // latest consensus state - consensusReactor *cs.ConsensusReactor // for participating in the consensus - proxyApp proxy.AppConns // connection to the application - rpcListeners []net.Listener // rpc servers - txIndexer txindex.TxIndexer - indexerService *txindex.IndexerService + evsw events.EventSwitch + stateDB dbm.DB + blockStore *store.BlockStore // store the blockchain to disk + bcReactor p2p.Reactor // for fast-syncing + mempoolReactor *mempl.Reactor // for gossipping transactions + mempool mempl.Mempool + consensusState *cs.ConsensusState // latest consensus state + consensusReactor *cs.ConsensusReactor // for participating in the consensus + proxyApp proxy.AppConns // connection to the application + rpcListeners []net.Listener // rpc servers + txEventStore eventstore.TxEventStore + eventStoreService *eventstore.Service } func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { @@ -193,36 +194,36 @@ func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.L return proxyApp, nil } -func createAndStartIndexerService(config *cfg.Config, dbProvider DBProvider, - evsw events.EventSwitch, logger log.Logger, -) (*txindex.IndexerService, txindex.TxIndexer, error) { - var txIndexer txindex.TxIndexer = &null.TxIndex{} - /* - switch config.TxIndex.Indexer { - case "kv": - store, err := dbProvider(&DBContext{"tx_index", config}) - if err != nil { - return nil, nil, err - } - switch { - case config.TxIndex.IndexTags != "": - txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " "))) - case config.TxIndex.IndexAllTags: - txIndexer = kv.NewTxIndex(store, kv.IndexAllTags()) - default: - txIndexer = kv.NewTxIndex(store) - } - default: - txIndexer = &null.TxIndex{} +func createAndStartEventStoreService( + cfg *cfg.Config, + evsw events.EventSwitch, + logger log.Logger, +) (*eventstore.Service, eventstore.TxEventStore, error) { + var ( + err error + txEventStore eventstore.TxEventStore + ) + + // Instantiate the event store based on the configuration + switch cfg.TxEventStore.EventStoreType { + case file.EventStoreType: + // Transaction events should be logged to files + txEventStore, err = file.NewTxEventStore(cfg.TxEventStore) + if err != nil { + return nil, nil, fmt.Errorf("unable to create file tx event store, %w", err) } - */ + default: + // Transaction event storing should be omitted + txEventStore = null.NewNullEventStore() + } - indexerService := txindex.NewIndexerService(txIndexer, evsw) - indexerService.SetLogger(logger.With("module", "txindex")) + indexerService := eventstore.NewEventStoreService(txEventStore, evsw) + indexerService.SetLogger(logger.With("module", "eventstore")) if err := indexerService.Start(); err != nil { return nil, nil, err } - return indexerService, txIndexer, nil + + return indexerService, txEventStore, nil } func doHandshake(stateDB dbm.DB, state sm.State, blockStore sm.BlockStore, @@ -431,14 +432,14 @@ func NewNode(config *cfg.Config, return nil, err } - // EventSwitch and IndexerService must be started before the handshake because - // we might need to index the txs of the replayed block as this might not have happened + // EventSwitch and EventStoreService must be started before the handshake because + // we might need to store the txs of the replayed block as this might not have happened // when the node stopped last time (i.e. the node stopped after it saved the block // but before it indexed the txs, or, endblocker panicked) evsw := events.NewEventSwitch() - // Transaction indexing - indexerService, txIndexer, err := createAndStartIndexerService(config, dbProvider, evsw, logger) + // Transaction event storing + eventStoreService, txEventStore, err := createAndStartEventStoreService(config, evsw, logger) if err != nil { return nil, err } @@ -500,7 +501,7 @@ func NewNode(config *cfg.Config, privValidator, fastSync, evsw, consensusLogger, ) - nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state) + nodeInfo, err := makeNodeInfo(config, nodeKey, txEventStore, genDoc, state) if err != nil { return nil, errors.Wrap(err, "error making NodeInfo") } @@ -541,17 +542,17 @@ func NewNode(config *cfg.Config, nodeInfo: nodeInfo, nodeKey: nodeKey, - evsw: evsw, - stateDB: stateDB, - blockStore: blockStore, - bcReactor: bcReactor, - mempoolReactor: mempoolReactor, - mempool: mempool, - consensusState: consensusState, - consensusReactor: consensusReactor, - proxyApp: proxyApp, - txIndexer: txIndexer, - indexerService: indexerService, + evsw: evsw, + stateDB: stateDB, + blockStore: blockStore, + bcReactor: bcReactor, + mempoolReactor: mempoolReactor, + mempool: mempool, + consensusState: consensusState, + consensusReactor: consensusReactor, + proxyApp: proxyApp, + txEventStore: txEventStore, + eventStoreService: eventStoreService, } node.BaseService = *service.NewBaseService(logger, "Node", node) @@ -626,7 +627,7 @@ func (n *Node) OnStop() { // first stop the non-reactor services n.evsw.Stop() - n.indexerService.Stop() + n.eventStoreService.Stop() // now stop the reactors n.sw.Stop() @@ -664,7 +665,7 @@ func (n *Node) ConfigureRPC() { rpccore.SetPubKey(pubKey) rpccore.SetGenesisDoc(n.genesisDoc) rpccore.SetProxyAppQuery(n.proxyApp.Query()) - rpccore.SetTxIndexer(n.txIndexer) + rpccore.SetTxEventStore(n.txEventStore) rpccore.SetConsensusReactor(n.consensusReactor) rpccore.SetLogger(n.Logger.With("module", "rpc")) rpccore.SetEventSwitch(n.evsw) @@ -839,15 +840,13 @@ func (n *Node) NodeInfo() p2p.NodeInfo { func makeNodeInfo( config *cfg.Config, nodeKey *p2p.NodeKey, - txIndexer txindex.TxIndexer, + txEventStore eventstore.TxEventStore, genDoc *types.GenesisDoc, state sm.State, ) (p2p.NodeInfo, error) { - txIndexerStatus := "on" - if _, ok := txIndexer.(*null.TxIndex); ok { - txIndexerStatus = "off" - } else if txIndexer == nil { - txIndexerStatus = "none" + txIndexerStatus := eventstore.StatusOff + if txEventStore.GetType() != null.EventStoreType { + txIndexerStatus = eventstore.StatusOn } bcChannel := bc.BlockchainChannel diff --git a/tm2/pkg/bft/rpc/core/pipe.go b/tm2/pkg/bft/rpc/core/pipe.go index fd6c8fc0692..a8b102d9ab7 100644 --- a/tm2/pkg/bft/rpc/core/pipe.go +++ b/tm2/pkg/bft/rpc/core/pipe.go @@ -10,7 +10,7 @@ import ( "github.com/gnolang/gno/tm2/pkg/bft/proxy" cfg "github.com/gnolang/gno/tm2/pkg/bft/rpc/config" sm "github.com/gnolang/gno/tm2/pkg/bft/state" - "github.com/gnolang/gno/tm2/pkg/bft/state/txindex" + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore" "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/gno/tm2/pkg/crypto" dbm "github.com/gnolang/gno/tm2/pkg/db" @@ -25,7 +25,7 @@ const ( maxPerPage = 100 ) -//---------------------------------------------- +// ---------------------------------------------- // These interfaces are used by RPC and must be thread safe type Consensus interface { @@ -50,7 +50,7 @@ type peers interface { Peers() p2p.IPeerSet } -//---------------------------------------------- +// ---------------------------------------------- // These package level globals come with setters // that are expected to be called only once, on startup @@ -68,7 +68,7 @@ var ( // objects pubKey crypto.PubKey genDoc *types.GenesisDoc // cache the genesis structure - txIndexer txindex.TxIndexer + txEventStore eventstore.TxEventStore consensusReactor *consensus.ConsensusReactor evsw events.EventSwitch gTxDispatcher *txDispatcher @@ -115,8 +115,8 @@ func SetProxyAppQuery(appConn proxy.AppConnQuery) { proxyAppQuery = appConn } -func SetTxIndexer(indexer txindex.TxIndexer) { - txIndexer = indexer +func SetTxEventStore(indexer eventstore.TxEventStore) { + txEventStore = indexer } func SetConsensusReactor(conR *consensus.ConsensusReactor) { diff --git a/tm2/pkg/bft/state/eventstore/file/file.go b/tm2/pkg/bft/state/eventstore/file/file.go new file mode 100644 index 00000000000..f4cd74721f5 --- /dev/null +++ b/tm2/pkg/bft/state/eventstore/file/file.go @@ -0,0 +1,92 @@ +package file + +import ( + "fmt" + + "github.com/gnolang/gno/tm2/pkg/amino" + "github.com/gnolang/gno/tm2/pkg/autofile" + storetypes "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/types" + "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/gnolang/gno/tm2/pkg/errors" +) + +const ( + EventStoreType = "file" + Path = "path" +) + +var ( + errMissingPath = errors.New("missing path param") + errInvalidType = errors.New("invalid config for file event store specified") +) + +// TxEventStore is the implementation of a transaction event store +// that outputs to the local filesystem +type TxEventStore struct { + headPath string + group *autofile.Group +} + +// NewTxEventStore creates a new file-based tx event store +func NewTxEventStore(cfg *storetypes.Config) (*TxEventStore, error) { + // Parse config params + if EventStoreType != cfg.EventStoreType { + return nil, errInvalidType + } + + headPath, ok := cfg.GetParam(Path).(string) + if !ok { + return nil, errMissingPath + } + + return &TxEventStore{ + headPath: headPath, + }, nil +} + +// Start starts the file transaction event store, by opening the autofile group +func (t *TxEventStore) Start() error { + // Open the group + group, err := autofile.OpenGroup(t.headPath) + if err != nil { + return fmt.Errorf("unable to open file group for writing, %w", err) + } + + t.group = group + + return nil +} + +// Stop stops the file transaction event store, by closing the autofile group +func (t *TxEventStore) Stop() error { + // Close off the group + t.group.Close() + + return nil +} + +// GetType returns the file transaction event store type +func (t *TxEventStore) GetType() string { + return EventStoreType +} + +// Append marshals the transaction using amino, and writes it to the disk +func (t *TxEventStore) Append(tx types.TxResult) error { + // Serialize the transaction using amino + txRaw, err := amino.MarshalJSON(tx) + if err != nil { + return fmt.Errorf("unable to marshal transaction, %w", err) + } + + // Write the serialized transaction info to the file group + if err = t.group.WriteLine(string(txRaw)); err != nil { + return fmt.Errorf("unable to save transaction event, %w", err) + } + + // Flush output to storage + if err := t.group.FlushAndSync(); err != nil { + return fmt.Errorf("unable to flush and sync transaction event, %w", err) + } + + return nil +} diff --git a/tm2/pkg/bft/state/eventstore/file/file_test.go b/tm2/pkg/bft/state/eventstore/file/file_test.go new file mode 100644 index 00000000000..46d87582ce4 --- /dev/null +++ b/tm2/pkg/bft/state/eventstore/file/file_test.go @@ -0,0 +1,140 @@ +package file + +import ( + "bufio" + "testing" + + "github.com/gnolang/gno/tm2/pkg/amino" + storetypes "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/types" + "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/gnolang/gno/tm2/pkg/testutils" + "github.com/stretchr/testify/assert" +) + +// generateTestTransactions generates random transaction results +func generateTestTransactions(count int) []types.TxResult { + txs := make([]types.TxResult, count) + + for i := 0; i < count; i++ { + txs[i] = types.TxResult{} + } + + return txs +} + +func TestTxEventStore_New(t *testing.T) { + t.Parallel() + + t.Run("invalid file path specified", func(t *testing.T) { + t.Parallel() + + cfg := &storetypes.Config{ + EventStoreType: "invalid", + } + + i, err := NewTxEventStore(cfg) + + assert.Nil(t, i) + assert.ErrorIs(t, err, errInvalidType) + }) + + t.Run("invalid file path specified", func(t *testing.T) { + t.Parallel() + + cfg := &storetypes.Config{ + EventStoreType: EventStoreType, + Params: nil, + } + + i, err := NewTxEventStore(cfg) + + assert.Nil(t, i) + assert.ErrorIs(t, err, errMissingPath) + }) + + t.Run("valid file path specified", func(t *testing.T) { + t.Parallel() + + headPath := "." + + cfg := &storetypes.Config{ + EventStoreType: EventStoreType, + Params: map[string]any{ + Path: headPath, + }, + } + + i, err := NewTxEventStore(cfg) + if i == nil { + t.Fatalf("unable to create event store") + } + + assert.NoError(t, err) + assert.Equal(t, headPath, i.headPath) + assert.Equal(t, EventStoreType, i.GetType()) + }) +} + +func TestTxEventStore_Append(t *testing.T) { + t.Parallel() + + headFile, cleanup := testutils.NewTestFile(t) + t.Cleanup(func() { + cleanup() + }) + + eventStore, err := NewTxEventStore(&storetypes.Config{ + EventStoreType: EventStoreType, + Params: map[string]any{ + Path: headFile.Name(), + }, + }) + if err != nil { + t.Fatalf("unable to create tx event store, %v", err) + } + + // Start the event store + if err = eventStore.Start(); err != nil { + t.Fatalf("unable to start event store, %v", err) + } + + t.Cleanup(func() { + // Stop the event store + if err = eventStore.Stop(); err != nil { + t.Fatalf("unable to stop event store gracefully, %v", err) + } + }) + + numTxs := 10 + txs := generateTestTransactions(numTxs) + + for _, tx := range txs { + if err = eventStore.Append(tx); err != nil { + t.Fatalf("unable to store transaction, %v", err) + } + } + + // Make sure the file group's size is valid + if eventStore.group.ReadGroupInfo().TotalSize == 0 { + t.Fatalf("invalid group size") + } + + // Open file for reading + scanner := bufio.NewScanner(headFile) + + linesRead := 0 + for scanner.Scan() { + line := scanner.Bytes() + + var txRes types.TxResult + if err = amino.UnmarshalJSON(line, &txRes); err != nil { + t.Fatalf("unable to read store line") + } + + assert.Equal(t, txs[linesRead], txRes) + + linesRead++ + } + + assert.Equal(t, numTxs, linesRead) +} diff --git a/tm2/pkg/bft/state/eventstore/mock_test.go b/tm2/pkg/bft/state/eventstore/mock_test.go new file mode 100644 index 00000000000..087d8f6e3e9 --- /dev/null +++ b/tm2/pkg/bft/state/eventstore/mock_test.go @@ -0,0 +1,89 @@ +package eventstore + +import ( + "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/gnolang/gno/tm2/pkg/events" + "github.com/gnolang/gno/tm2/pkg/service" +) + +// TxEventStore // + +type ( + startDelegate func() error + stopDelegate func() error + getTypeDelegate func() string + appendDelegate func(types.TxResult) error +) + +type mockEventStore struct { + startFn startDelegate + stopFn stopDelegate + getTypeFn getTypeDelegate + appendFn appendDelegate +} + +func (m mockEventStore) Start() error { + if m.startFn != nil { + return m.startFn() + } + + return nil +} + +func (m mockEventStore) Stop() error { + if m.stopFn != nil { + return m.stopFn() + } + + return nil +} + +func (m mockEventStore) GetType() string { + if m.getTypeFn != nil { + return m.getTypeFn() + } + + return "" +} + +func (m mockEventStore) Append(result types.TxResult) error { + if m.appendFn != nil { + return m.appendFn(result) + } + + return nil +} + +// EventSwitch // + +type ( + fireEventDelegate func(events.Event) + addListenerDelegate func(string, events.EventCallback) + removeListenerDelegate func(string) +) + +type mockEventSwitch struct { + service.BaseService + + fireEventFn fireEventDelegate + addListenerFn addListenerDelegate + removeListenerFn removeListenerDelegate +} + +func (m *mockEventSwitch) FireEvent(ev events.Event) { + if m.fireEventFn != nil { + m.fireEventFn(ev) + } +} + +func (m *mockEventSwitch) AddListener(listenerID string, cb events.EventCallback) { + if m.addListenerFn != nil { + m.addListenerFn(listenerID, cb) + } +} + +func (m *mockEventSwitch) RemoveListener(listenerID string) { + if m.removeListenerFn != nil { + m.removeListenerFn(listenerID) + } +} diff --git a/tm2/pkg/bft/state/eventstore/null/null.go b/tm2/pkg/bft/state/eventstore/null/null.go new file mode 100644 index 00000000000..40e3566d89e --- /dev/null +++ b/tm2/pkg/bft/state/eventstore/null/null.go @@ -0,0 +1,35 @@ +package null + +import ( + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore" + "github.com/gnolang/gno/tm2/pkg/bft/types" +) + +var _ eventstore.TxEventStore = (*TxEventStore)(nil) + +const ( + EventStoreType = "none" +) + +// TxEventStore acts as a /dev/null +type TxEventStore struct{} + +func NewNullEventStore() *TxEventStore { + return &TxEventStore{} +} + +func (t TxEventStore) Start() error { + return nil +} + +func (t TxEventStore) Stop() error { + return nil +} + +func (t TxEventStore) Append(_ types.TxResult) error { + return nil +} + +func (t TxEventStore) GetType() string { + return EventStoreType +} diff --git a/tm2/pkg/bft/state/eventstore/store.go b/tm2/pkg/bft/state/eventstore/store.go new file mode 100644 index 00000000000..10ef9eefc9b --- /dev/null +++ b/tm2/pkg/bft/state/eventstore/store.go @@ -0,0 +1,24 @@ +package eventstore + +import "github.com/gnolang/gno/tm2/pkg/bft/types" + +const ( + StatusOn = "on" + StatusOff = "off" +) + +// TxEventStore stores transaction events for later processing +type TxEventStore interface { + // Start starts the transaction event store + Start() error + + // Stop stops the transaction event store + Stop() error + + // GetType returns the event store type + GetType() string + + // Append analyzes and appends a single transaction + // to the event store + Append(result types.TxResult) error +} diff --git a/tm2/pkg/bft/state/eventstore/store_service.go b/tm2/pkg/bft/state/eventstore/store_service.go new file mode 100644 index 00000000000..d6ed40c4151 --- /dev/null +++ b/tm2/pkg/bft/state/eventstore/store_service.go @@ -0,0 +1,84 @@ +package eventstore + +import ( + "context" + "fmt" + + "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/gnolang/gno/tm2/pkg/events" + "github.com/gnolang/gno/tm2/pkg/service" +) + +// Service connects the event bus and event store together in order +// to store events coming from event bus +type Service struct { + service.BaseService + + cancelFn context.CancelFunc + + txEventStore TxEventStore + evsw events.EventSwitch +} + +// NewEventStoreService returns a new service instance +func NewEventStoreService(idr TxEventStore, evsw events.EventSwitch) *Service { + is := &Service{txEventStore: idr, evsw: evsw} + is.BaseService = *service.NewBaseService(nil, "EventStoreService", is) + + return is +} + +func (is *Service) OnStart() error { + // Create a context for the intermediary monitor service + ctx, cancelFn := context.WithCancel(context.Background()) + is.cancelFn = cancelFn + + // Start the event store + if err := is.txEventStore.Start(); err != nil { + return fmt.Errorf("unable to start transaction event store, %w", err) + } + + // Start the intermediary monitor service + go is.monitorTxEvents(ctx) + + return nil +} + +func (is *Service) OnStop() { + // Close off any routines + is.cancelFn() + + // Attempt to gracefully stop the event store + if err := is.txEventStore.Stop(); err != nil { + is.Logger.Error( + fmt.Sprintf("unable to gracefully stop event store, %v", err), + ) + } +} + +// monitorTxEvents acts as an intermediary feed service for the supplied +// event store. It relays transaction events that come from the event stream +func (is *Service) monitorTxEvents(ctx context.Context) { + // Create a subscription for transaction events + subCh := events.SubscribeToEvent(is.evsw, "tx-event-store", types.EventTx{}) + + for { + select { + case <-ctx.Done(): + return + case evRaw := <-subCh: + // Cast the event + ev, ok := evRaw.(types.EventTx) + if !ok { + is.Logger.Error("invalid transaction result type cast") + + continue + } + + // Alert the actual tx event store + if err := is.txEventStore.Append(ev.Result); err != nil { + is.Logger.Error("unable to store transaction", "err", err) + } + } + } +} diff --git a/tm2/pkg/bft/state/eventstore/store_service_test.go b/tm2/pkg/bft/state/eventstore/store_service_test.go new file mode 100644 index 00000000000..3fa5e8a7941 --- /dev/null +++ b/tm2/pkg/bft/state/eventstore/store_service_test.go @@ -0,0 +1,159 @@ +package eventstore + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/gnolang/gno/tm2/pkg/events" + "github.com/stretchr/testify/assert" +) + +// generateTxEvents generates random transaction events +func generateTxEvents(count int) []types.EventTx { + txEvents := make([]types.EventTx, count) + + for i := 0; i < count; i++ { + txEvents[i] = types.EventTx{ + Result: types.TxResult{}, + } + } + + return txEvents +} + +func TestEventStoreService_Monitor(t *testing.T) { + t.Parallel() + + const defaultTimeout = 5 * time.Second + + var ( + startCalled = false + stopCalled = false + receivedResults = make([]types.TxResult, 0) + receivedSize atomic.Int64 + + cb events.EventCallback + cbSet atomic.Bool + + mockEventStore = &mockEventStore{ + startFn: func() error { + startCalled = true + + return nil + }, + stopFn: func() error { + stopCalled = true + + return nil + }, + appendFn: func(result types.TxResult) error { + receivedResults = append(receivedResults, result) + + // Atomic because we are accessing this size from a routine + receivedSize.Store(int64(len(receivedResults))) + + return nil + }, + } + mockEventSwitch = &mockEventSwitch{ + fireEventFn: func(event events.Event) { + // Exec the callback on event fire + cb(event) + }, + addListenerFn: func(_ string, callback events.EventCallback) { + // Attach callback + cb = callback + + // Atomic because we are accessing this info from a routine + cbSet.Store(true) + }, + } + ) + + // Create a new event store instance + i := NewEventStoreService(mockEventStore, mockEventSwitch) + if i == nil { + t.Fatal("unable to create event store service") + } + + // Start the event store + if err := i.OnStart(); err != nil { + t.Fatalf("unable to start event store, %v", err) + } + + assert.True(t, startCalled) + + t.Cleanup(func() { + // Stop the event store + i.OnStop() + + assert.True(t, stopCalled) + }) + + // Fire off the events so the event store can catch them + numEvents := 1000 + txEvents := generateTxEvents(numEvents) + + var wg sync.WaitGroup + + // Start a routine that asynchronously pushes events + wg.Add(1) + go func() { + defer wg.Done() + + timeout := time.After(defaultTimeout) + + for { + select { + case <-timeout: + return + default: + // If the callback is set, fire the events + if !cbSet.Load() { + // Listener not set yet + continue + } + + for _, event := range txEvents { + mockEventSwitch.FireEvent(event) + } + + return + } + } + }() + + // Start a routine that monitors received results + wg.Add(1) + go func() { + defer wg.Done() + + timeout := time.After(defaultTimeout) + + for { + select { + case <-timeout: + return + default: + if int(receivedSize.Load()) == numEvents { + return + } + } + } + }() + + wg.Wait() + + // Make sure all results were received + if len(receivedResults) != numEvents { + t.Fatalf("invalid number of results received, %d", len(receivedResults)) + } + + // Make sure all results match + for index, event := range txEvents { + assert.Equal(t, event.Result, receivedResults[index]) + } +} diff --git a/tm2/pkg/bft/state/eventstore/types/config.go b/tm2/pkg/bft/state/eventstore/types/config.go new file mode 100644 index 00000000000..08e25870b4d --- /dev/null +++ b/tm2/pkg/bft/state/eventstore/types/config.go @@ -0,0 +1,29 @@ +package types + +import "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/null" + +// EventStoreParams defines the arbitrary event store config params +type EventStoreParams map[string]any + +// Config defines the specific event store configuration +type Config struct { + EventStoreType string + Params EventStoreParams +} + +// GetParam fetches the specific config param, if any. +// Returns nil if the param is not present +func (c *Config) GetParam(name string) any { + if c.Params != nil { + return c.Params[name] + } + + return nil +} + +// DefaultEventStoreConfig returns the default event store config +func DefaultEventStoreConfig() *Config { + return &Config{ + EventStoreType: null.EventStoreType, + } +} diff --git a/tm2/pkg/bft/state/eventstore/types/config_test.go b/tm2/pkg/bft/state/eventstore/types/config_test.go new file mode 100644 index 00000000000..0f5683b7c61 --- /dev/null +++ b/tm2/pkg/bft/state/eventstore/types/config_test.go @@ -0,0 +1,45 @@ +package types + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConfig_GetParam(t *testing.T) { + t.Parallel() + + const paramName = "param" + + testTable := []struct { + name string + cfg *Config + + expectedParam any + }{ + { + "param not set", + &Config{}, + nil, + }, + { + "valid param set", + &Config{ + Params: map[string]any{ + paramName: 10, + }, + }, + 10, + }, + } + + for _, testCase := range testTable { + testCase := testCase + + t.Run(testCase.name, func(t *testing.T) { + t.Parallel() + + assert.Equal(t, testCase.expectedParam, testCase.cfg.GetParam(paramName)) + }) + } +} diff --git a/tm2/pkg/bft/state/txindex/indexer.go b/tm2/pkg/bft/state/txindex/indexer.go deleted file mode 100644 index 2b5b4aae220..00000000000 --- a/tm2/pkg/bft/state/txindex/indexer.go +++ /dev/null @@ -1,18 +0,0 @@ -package txindex - -// TxIndexer interface defines methods to index and search transactions. -type TxIndexer interface { /* - // AddBatch analyzes, indexes and stores a batch of transactions. - AddBatch(b *Batch) error - - // Index analyzes, indexes and stores a single transaction. - Index(result *types.TxResult) error - - // Get returns the transaction specified by hash or nil if the transaction is not indexed - // or stored. - Get(hash []byte) (*types.TxResult, error) - - // Search allows you to query for transactions. - Search(q *query.Query) ([]*types.TxResult, error) - */ -} diff --git a/tm2/pkg/bft/state/txindex/indexer_service.go b/tm2/pkg/bft/state/txindex/indexer_service.go deleted file mode 100644 index fb5c3068ae4..00000000000 --- a/tm2/pkg/bft/state/txindex/indexer_service.go +++ /dev/null @@ -1,31 +0,0 @@ -package txindex - -import ( - "github.com/gnolang/gno/tm2/pkg/events" - "github.com/gnolang/gno/tm2/pkg/service" -) - -// IndexerService connects event bus and transaction indexer together in order -// to index transactions coming from event bus. -type IndexerService struct { - service.BaseService - - idr TxIndexer - evsw events.EventSwitch -} - -// NewIndexerService returns a new service instance. -func NewIndexerService(idr TxIndexer, evsw events.EventSwitch) *IndexerService { - is := &IndexerService{idr: idr, evsw: evsw} - is.BaseService = *service.NewBaseService(nil, "IndexerService", is) - return is -} - -func (is *IndexerService) OnStart() error { - // TODO - return nil -} - -func (is *IndexerService) OnStop() { - // TODO -} diff --git a/tm2/pkg/bft/state/txindex/null/null.go b/tm2/pkg/bft/state/txindex/null/null.go deleted file mode 100644 index ed90013b9a9..00000000000 --- a/tm2/pkg/bft/state/txindex/null/null.go +++ /dev/null @@ -1,31 +0,0 @@ -package null - -import ( - "github.com/gnolang/gno/tm2/pkg/bft/state/txindex" -) - -var _ txindex.TxIndexer = (*TxIndex)(nil) - -// TxIndex acts as a /dev/null. -type TxIndex struct{} - -/* -// Get on a TxIndex is disabled and panics when invoked. -func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { - return nil, errors.New(`Indexing is disabled (set 'tx_index = "kv"' in config)`) -} - -// AddBatch is a noop and always returns nil. -func (txi *TxIndex) AddBatch(batch *txindex.Batch) error { - return nil -} - -// Index is a noop and always returns nil. -func (txi *TxIndex) Index(result *types.TxResult) error { - return nil -} - -func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { - return []*types.TxResult{}, nil -} -*/ diff --git a/tm2/pkg/p2p/node_info.go b/tm2/pkg/p2p/node_info.go index 9653a83c38a..48ba8f7776b 100644 --- a/tm2/pkg/p2p/node_info.go +++ b/tm2/pkg/p2p/node_info.go @@ -3,6 +3,7 @@ package p2p import ( "fmt" + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore" "github.com/gnolang/gno/tm2/pkg/strings" "github.com/gnolang/gno/tm2/pkg/versionset" ) @@ -100,7 +101,7 @@ func (info NodeInfo) Validate() error { other := info.Other txIndex := other.TxIndex switch txIndex { - case "", "on", "off": + case "", eventstore.StatusOn, eventstore.StatusOff: default: return fmt.Errorf("info.Other.TxIndex should be either 'on', 'off', or empty string, got '%v'", txIndex) }