From 38fb2f855fe9637cb5d3701f6304da1d58b41d42 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Mon, 27 Feb 2023 14:23:21 +0100 Subject: [PATCH 01/28] Add base tx indexer logic --- pkgs/bft/node/node.go | 42 +++++---------- pkgs/bft/state/txindex/indexer.go | 21 ++++---- pkgs/bft/state/txindex/indexer_service.go | 65 +++++++++++++++++++++-- pkgs/bft/state/txindex/null/null.go | 18 ++----- 4 files changed, 88 insertions(+), 58 deletions(-) diff --git a/pkgs/bft/node/node.go b/pkgs/bft/node/node.go index 514f11ecfff..fbc611ae8d8 100644 --- a/pkgs/bft/node/node.go +++ b/pkgs/bft/node/node.go @@ -28,7 +28,7 @@ import ( "github.com/gnolang/gno/pkgs/bft/state/txindex" "github.com/gnolang/gno/pkgs/events" - //"github.com/gnolang/gno/pkgs/bft/state/txindex/kv" + // "github.com/gnolang/gno/pkgs/bft/state/txindex/kv" "github.com/gnolang/gno/pkgs/bft/state/txindex/null" "github.com/gnolang/gno/pkgs/bft/store" "github.com/gnolang/gno/pkgs/bft/types" @@ -43,7 +43,7 @@ import ( verset "github.com/gnolang/gno/pkgs/versionset" ) -//------------------------------------------------------------------------------ +// ------------------------------------------------------------------------------ // DBContext specifies config information for loading a new DB. type DBContext struct { @@ -136,7 +136,7 @@ func CustomReactors(reactors map[string]p2p.Reactor) Option { } } -//------------------------------------------------------------------------------ +// ------------------------------------------------------------------------------ // Node is the highest level interface to a full Tendermint node. // It includes all configuration information and running services. @@ -195,35 +195,21 @@ func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.L return proxyApp, nil } -func createAndStartIndexerService(config *cfg.Config, dbProvider DBProvider, - evsw events.EventSwitch, logger log.Logger, +func createAndStartIndexerService( + _ *cfg.Config, + _ DBProvider, + evSwitch events.EventSwitch, + logger log.Logger, ) (*txindex.IndexerService, txindex.TxIndexer, error) { - var txIndexer txindex.TxIndexer = &null.TxIndex{} - /* - switch config.TxIndex.Indexer { - case "kv": - store, err := dbProvider(&DBContext{"tx_index", config}) - if err != nil { - return nil, nil, err - } - switch { - case config.TxIndex.IndexTags != "": - txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " "))) - case config.TxIndex.IndexAllTags: - txIndexer = kv.NewTxIndex(store, kv.IndexAllTags()) - default: - txIndexer = kv.NewTxIndex(store) - } - default: - txIndexer = &null.TxIndex{} - } - */ + // TODO start indexer based on the configuration + txIndexer := &null.TxIndex{} - indexerService := txindex.NewIndexerService(txIndexer, evsw) + indexerService := txindex.NewIndexerService(txIndexer, evSwitch) indexerService.SetLogger(logger.With("module", "txindex")) if err := indexerService.Start(); err != nil { return nil, nil, err } + return indexerService, txIndexer, nil } @@ -821,7 +807,7 @@ func (n *Node) Config() *cfg.Config { return n.config } -//------------------------------------------------------------------------------ +// ------------------------------------------------------------------------------ func (n *Node) Listeners() []string { return []string{ @@ -889,7 +875,7 @@ func makeNodeInfo( return nodeInfo, err } -//------------------------------------------------------------------------------ +// ------------------------------------------------------------------------------ var genesisDocKey = []byte("genesisDoc") diff --git a/pkgs/bft/state/txindex/indexer.go b/pkgs/bft/state/txindex/indexer.go index 2b5b4aae220..877153120c9 100644 --- a/pkgs/bft/state/txindex/indexer.go +++ b/pkgs/bft/state/txindex/indexer.go @@ -1,18 +1,15 @@ package txindex -// TxIndexer interface defines methods to index and search transactions. -type TxIndexer interface { /* - // AddBatch analyzes, indexes and stores a batch of transactions. - AddBatch(b *Batch) error +import "github.com/gnolang/gno/pkgs/bft/types" - // Index analyzes, indexes and stores a single transaction. - Index(result *types.TxResult) error +// TxIndexer indexes transactions for later processing +type TxIndexer interface { + // Start starts the transaction indexer + Start() error - // Get returns the transaction specified by hash or nil if the transaction is not indexed - // or stored. - Get(hash []byte) (*types.TxResult, error) + // Close stops the transaction indexer + Close() error - // Search allows you to query for transactions. - Search(q *query.Query) ([]*types.TxResult, error) - */ + // Index analyzes, indexes and stores a single transaction + Index(result *types.TxResult) error } diff --git a/pkgs/bft/state/txindex/indexer_service.go b/pkgs/bft/state/txindex/indexer_service.go index 7e8b9fb62be..c96cbfb6f27 100644 --- a/pkgs/bft/state/txindex/indexer_service.go +++ b/pkgs/bft/state/txindex/indexer_service.go @@ -1,6 +1,10 @@ package txindex import ( + "context" + "fmt" + + "github.com/gnolang/gno/pkgs/bft/types" "github.com/gnolang/gno/pkgs/events" "github.com/gnolang/gno/pkgs/service" ) @@ -10,22 +14,73 @@ import ( type IndexerService struct { service.BaseService - idr TxIndexer - evsw events.EventSwitch + cancelFn context.CancelFunc + + indexer TxIndexer + evSwitch events.EventSwitch } // NewIndexerService returns a new service instance. func NewIndexerService(idr TxIndexer, evsw events.EventSwitch) *IndexerService { - is := &IndexerService{idr: idr, evsw: evsw} + is := &IndexerService{indexer: idr, evSwitch: evsw} is.BaseService = *service.NewBaseService(nil, "IndexerService", is) + return is } func (is *IndexerService) OnStart() error { - // TODO + // Create a context for the intermediary monitor service + ctx, cancelFn := context.WithCancel(context.Background()) + is.cancelFn = cancelFn + + // Start the indexer + if err := is.indexer.Start(); err != nil { + return fmt.Errorf("unable to start transaction indexer, %w", err) + } + + // Start the intermediary monitor service + go is.monitorTxEvents(ctx) + return nil } func (is *IndexerService) OnStop() { - // TODO + // Close off any routines + is.cancelFn() + + // Attempt to gracefully stop the transaction indexer + if err := is.indexer.Close(); err != nil { + is.Logger.Error( + fmt.Sprintf("unable to gracefully stop transaction indexer, %v", err), + ) + } +} + +// monitorTxEvents acts as an intermediary feed service for the supplied +// transaction indexer. It relays transaction events that come from the event stream +func (is *IndexerService) monitorTxEvents(ctx context.Context) { + // Create a subscription for transaction events + subCh := events.SubscribeToEvent(is.evSwitch, "tx-indexer", types.TxResult{}) + + for { + select { + case <-ctx.Done(): + return + case evRaw := <-subCh: + // Cast the event + ev, ok := evRaw.(*types.TxResult) + if !ok { + is.Logger.Error("invalid transaction result type cast") + + continue + } + + // Alert the actual indexer + if err := is.indexer.Index(ev); err != nil { + is.Logger.Error( + fmt.Sprintf("unable to index transaction, %v", err), + ) + } + } + } } diff --git a/pkgs/bft/state/txindex/null/null.go b/pkgs/bft/state/txindex/null/null.go index fe5bc654c0a..caafa6cb250 100644 --- a/pkgs/bft/state/txindex/null/null.go +++ b/pkgs/bft/state/txindex/null/null.go @@ -2,6 +2,7 @@ package null import ( "github.com/gnolang/gno/pkgs/bft/state/txindex" + "github.com/gnolang/gno/pkgs/bft/types" ) var _ txindex.TxIndexer = (*TxIndex)(nil) @@ -9,23 +10,14 @@ var _ txindex.TxIndexer = (*TxIndex)(nil) // TxIndex acts as a /dev/null. type TxIndex struct{} -/* -// Get on a TxIndex is disabled and panics when invoked. -func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { - return nil, errors.New(`Indexing is disabled (set 'tx_index = "kv"' in config)`) -} - -// AddBatch is a noop and always returns nil. -func (txi *TxIndex) AddBatch(batch *txindex.Batch) error { +func (t TxIndex) Start() error { return nil } -// Index is a noop and always returns nil. -func (txi *TxIndex) Index(result *types.TxResult) error { +func (t TxIndex) Close() error { return nil } -func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { - return []*types.TxResult{}, nil +func (t TxIndex) Index(_ *types.TxResult) error { + return nil } -*/ From ed13499f355af466c3e274ef37b72745bee50b26 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Mon, 27 Feb 2023 15:58:15 +0100 Subject: [PATCH 02/28] Add file indexer --- cmd/gnoland/main.go | 42 +++++++++++++ pkgs/autofile/group.go | 5 +- pkgs/bft/config/config.go | 5 +- pkgs/bft/node/node.go | 31 +++++++--- pkgs/bft/state/txindex/config/config.go | 30 +++++++++ pkgs/bft/state/txindex/file/file.go | 74 +++++++++++++++++++++++ pkgs/bft/state/txindex/indexer.go | 7 ++- pkgs/bft/state/txindex/indexer_service.go | 8 +-- pkgs/bft/state/txindex/null/null.go | 24 ++++++-- 9 files changed, 201 insertions(+), 25 deletions(-) create mode 100644 pkgs/bft/state/txindex/config/config.go create mode 100644 pkgs/bft/state/txindex/file/file.go diff --git a/cmd/gnoland/main.go b/cmd/gnoland/main.go index 7dab9bcde12..a954ca4d57e 100644 --- a/cmd/gnoland/main.go +++ b/cmd/gnoland/main.go @@ -14,8 +14,11 @@ import ( "github.com/gnolang/gno/pkgs/bft/config" "github.com/gnolang/gno/pkgs/bft/node" "github.com/gnolang/gno/pkgs/bft/privval" + indexercfg "github.com/gnolang/gno/pkgs/bft/state/txindex/config" + "github.com/gnolang/gno/pkgs/bft/state/txindex/file" bft "github.com/gnolang/gno/pkgs/bft/types" "github.com/gnolang/gno/pkgs/crypto" + "github.com/gnolang/gno/pkgs/errors" gno "github.com/gnolang/gno/pkgs/gnolang" "github.com/gnolang/gno/pkgs/log" osm "github.com/gnolang/gno/pkgs/os" @@ -40,6 +43,9 @@ var flags struct { chainID string genesisRemote string rootDir string + + txIndexerType string + txIndexerPath string } func runMain(args []string) error { @@ -51,6 +57,10 @@ func runMain(args []string) error { fs.StringVar(&flags.chainID, "chainid", "dev", "chainid") fs.StringVar(&flags.rootDir, "root-dir", "testdir", "directory for config and data") fs.StringVar(&flags.genesisRemote, "genesis-remote", "localhost:26657", "replacement for '%%REMOTE%%' in genesis") + + fs.StringVar(&flags.txIndexerType, "tx-indexer-type", "none", "type of transaction indexer [none, file]") + fs.StringVar(&flags.txIndexerPath, "tx-indexer-path", "", "path for the file tx-indexer (required if indexer if 'file')") + fs.Parse(args) logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) @@ -73,6 +83,14 @@ func runMain(args []string) error { writeGenesisFile(genDoc, genesisFilePath) } + // Initialize the indexer config + indexerCfg, err := getIndexerConfig() + if err != nil { + return fmt.Errorf("unable to parse indexer config, %w", err) + } + + cfg.Indexer = indexerCfg + // create application and node. gnoApp, err := gnoland.NewApp(rootDir, flags.skipFailingGenesisTxs, logger) if err != nil { @@ -103,6 +121,30 @@ func runMain(args []string) error { select {} // run forever } +// getIndexerConfig constructs an indexer config from provided user options +func getIndexerConfig() (*indexercfg.Config, error) { + var cfg *indexercfg.Config + + switch flags.txIndexerType { + case file.IndexerType: + if flags.txIndexerPath == "" { + return nil, errors.New("unspecified file transaction indexer path") + } + + // Fill out the configuration + cfg = &indexercfg.Config{ + IndexerType: file.IndexerType, + Params: map[string]any{ + file.Path: flags.txIndexerPath, + }, + } + default: + cfg = indexercfg.DefaultIndexerConfig() + } + + return cfg, nil +} + // Makes a local test genesis doc with local privValidator. func makeGenesisDoc(pvPub crypto.PubKey) *bft.GenesisDoc { gen := &bft.GenesisDoc{} diff --git a/pkgs/autofile/group.go b/pkgs/autofile/group.go index d55392291ad..3049d2d93b2 100644 --- a/pkgs/autofile/group.go +++ b/pkgs/autofile/group.go @@ -136,11 +136,12 @@ func (g *Group) Wait() { // Close closes the head file. The group must be stopped by this moment. func (g *Group) Close() { - g.FlushAndSync() + _ = g.FlushAndSync() g.mtx.Lock() + defer g.mtx.Unlock() + _ = g.Head.Close() - g.mtx.Unlock() } // HeadSizeLimit returns the current head size limit. diff --git a/pkgs/bft/config/config.go b/pkgs/bft/config/config.go index 317221ca47a..8222ab4235d 100644 --- a/pkgs/bft/config/config.go +++ b/pkgs/bft/config/config.go @@ -9,6 +9,7 @@ import ( cns "github.com/gnolang/gno/pkgs/bft/consensus/config" mem "github.com/gnolang/gno/pkgs/bft/mempool/config" rpc "github.com/gnolang/gno/pkgs/bft/rpc/config" + "github.com/gnolang/gno/pkgs/bft/state/txindex/config" "github.com/gnolang/gno/pkgs/errors" osm "github.com/gnolang/gno/pkgs/os" p2p "github.com/gnolang/gno/pkgs/p2p/config" @@ -24,6 +25,7 @@ type Config struct { P2P *p2p.P2PConfig `toml:"p2p"` Mempool *mem.MempoolConfig `toml:"mempool"` Consensus *cns.ConsensusConfig `toml:"consensus"` + Indexer *config.Config `toml:"indexer"` } // DefaultConfig returns a default configuration for a Tendermint node @@ -34,6 +36,7 @@ func DefaultConfig() *Config { P2P: p2p.DefaultP2PConfig(), Mempool: mem.DefaultMempoolConfig(), Consensus: cns.DefaultConsensusConfig(), + Indexer: config.DefaultIndexerConfig(), } } @@ -121,7 +124,7 @@ func (cfg *Config) ValidateBasic() error { return nil } -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- // BaseConfig const ( diff --git a/pkgs/bft/node/node.go b/pkgs/bft/node/node.go index fbc611ae8d8..9cc25b6d648 100644 --- a/pkgs/bft/node/node.go +++ b/pkgs/bft/node/node.go @@ -12,6 +12,7 @@ import ( "time" "github.com/gnolang/cors" + "github.com/gnolang/gno/pkgs/bft/state/txindex/file" "github.com/gnolang/gno/pkgs/amino" abci "github.com/gnolang/gno/pkgs/bft/abci/types" @@ -196,13 +197,28 @@ func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.L } func createAndStartIndexerService( - _ *cfg.Config, - _ DBProvider, + cfg *cfg.Config, evSwitch events.EventSwitch, logger log.Logger, ) (*txindex.IndexerService, txindex.TxIndexer, error) { // TODO start indexer based on the configuration - txIndexer := &null.TxIndex{} + var ( + err error + txIndexer txindex.TxIndexer + ) + + // Instantiate the indexer based on the configuration + switch cfg.Indexer.IndexerType { + case file.IndexerType: + // Transaction indexes should be logged to files + txIndexer, err = file.NewTxIndexer(cfg.Indexer) + if err != nil { + return nil, nil, fmt.Errorf("unable to create file tx indexer, %w", err) + } + default: + // Transaction indexing should be omitted + txIndexer = null.NewNullIndexer() + } indexerService := txindex.NewIndexerService(txIndexer, evSwitch) indexerService.SetLogger(logger.With("module", "txindex")) @@ -426,7 +442,7 @@ func NewNode(config *cfg.Config, evsw := events.NewEventSwitch() // Transaction indexing - indexerService, txIndexer, err := createAndStartIndexerService(config, dbProvider, evsw, logger) + indexerService, txIndexer, err := createAndStartIndexerService(config, evsw, logger) if err != nil { return nil, err } @@ -831,12 +847,7 @@ func makeNodeInfo( genDoc *types.GenesisDoc, state sm.State, ) (p2p.NodeInfo, error) { - txIndexerStatus := "on" - if _, ok := txIndexer.(*null.TxIndex); ok { - txIndexerStatus = "off" - } else if txIndexer == nil { - txIndexerStatus = "none" - } + txIndexerStatus := txIndexer.GetType() bcChannel := bc.BlockchainChannel vset := version.VersionSet diff --git a/pkgs/bft/state/txindex/config/config.go b/pkgs/bft/state/txindex/config/config.go new file mode 100644 index 00000000000..e1380fc786c --- /dev/null +++ b/pkgs/bft/state/txindex/config/config.go @@ -0,0 +1,30 @@ +package config + +import "github.com/gnolang/gno/pkgs/bft/state/txindex/null" + +// IndexerParams defines the arbitrary indexer config params +type IndexerParams map[string]any + +// Config defines the specific transaction +// indexer configuration +type Config struct { + IndexerType string + Params IndexerParams +} + +// GetParam fetches the specific config param, if any. +// Returns nil if the param is not present +func (c *Config) GetParam(name string) any { + if c.Params != nil { + return c.Params[name] + } + + return nil +} + +// DefaultIndexerConfig returns the default indexer config +func DefaultIndexerConfig() *Config { + return &Config{ + IndexerType: null.IndexerType, + } +} diff --git a/pkgs/bft/state/txindex/file/file.go b/pkgs/bft/state/txindex/file/file.go new file mode 100644 index 00000000000..3695c71eec2 --- /dev/null +++ b/pkgs/bft/state/txindex/file/file.go @@ -0,0 +1,74 @@ +package file + +import ( + "fmt" + + "github.com/gnolang/gno/pkgs/amino" + "github.com/gnolang/gno/pkgs/autofile" + "github.com/gnolang/gno/pkgs/bft/state/txindex/config" + "github.com/gnolang/gno/pkgs/bft/types" + "github.com/gnolang/gno/pkgs/errors" +) + +const ( + IndexerType = "file-indexer" + Path = "path" +) + +// TxIndexer is the implementation of a transaction indexer +// that outputs to the local filesystem +type TxIndexer struct { + headPath string + group *autofile.Group +} + +// NewTxIndexer creates a new file-based tx indexer +func NewTxIndexer(cfg *config.Config) (*TxIndexer, error) { + // Parse config params + headPath, ok := cfg.GetParam(Path).(string) + if !ok { + return nil, errors.New("missing path param") + } + + return &TxIndexer{ + headPath: headPath, + }, nil +} + +func (t *TxIndexer) Start() error { + // Open the group + group, err := autofile.OpenGroup(t.headPath) + if err != nil { + return fmt.Errorf("unable to open file group for writing, %w", err) + } + + t.group = group + + return nil +} + +func (t *TxIndexer) Stop() error { + // Close off the group + t.group.Close() + + return nil +} + +func (t *TxIndexer) GetType() string { + return IndexerType +} + +func (t *TxIndexer) Index(tx *types.TxResult) error { + // Serialize the transaction using amino:binary + txRaw, err := amino.Marshal(tx) + if err != nil { + return fmt.Errorf("unable to marshal transaction, %w", err) + } + + // Write the serialized transaction info to the file group + if err = t.group.WriteLine(string(txRaw)); err != nil { + return fmt.Errorf("unable to save transaction index, %w", err) + } + + return nil +} diff --git a/pkgs/bft/state/txindex/indexer.go b/pkgs/bft/state/txindex/indexer.go index 877153120c9..cae616a97d7 100644 --- a/pkgs/bft/state/txindex/indexer.go +++ b/pkgs/bft/state/txindex/indexer.go @@ -7,8 +7,11 @@ type TxIndexer interface { // Start starts the transaction indexer Start() error - // Close stops the transaction indexer - Close() error + // Stop stops the transaction indexer + Stop() error + + // GetType returns the indexer type + GetType() string // Index analyzes, indexes and stores a single transaction Index(result *types.TxResult) error diff --git a/pkgs/bft/state/txindex/indexer_service.go b/pkgs/bft/state/txindex/indexer_service.go index c96cbfb6f27..33e4d90a45b 100644 --- a/pkgs/bft/state/txindex/indexer_service.go +++ b/pkgs/bft/state/txindex/indexer_service.go @@ -21,8 +21,8 @@ type IndexerService struct { } // NewIndexerService returns a new service instance. -func NewIndexerService(idr TxIndexer, evsw events.EventSwitch) *IndexerService { - is := &IndexerService{indexer: idr, evSwitch: evsw} +func NewIndexerService(idr TxIndexer, evSwitch events.EventSwitch) *IndexerService { + is := &IndexerService{indexer: idr, evSwitch: evSwitch} is.BaseService = *service.NewBaseService(nil, "IndexerService", is) return is @@ -49,7 +49,7 @@ func (is *IndexerService) OnStop() { is.cancelFn() // Attempt to gracefully stop the transaction indexer - if err := is.indexer.Close(); err != nil { + if err := is.indexer.Stop(); err != nil { is.Logger.Error( fmt.Sprintf("unable to gracefully stop transaction indexer, %v", err), ) @@ -60,7 +60,7 @@ func (is *IndexerService) OnStop() { // transaction indexer. It relays transaction events that come from the event stream func (is *IndexerService) monitorTxEvents(ctx context.Context) { // Create a subscription for transaction events - subCh := events.SubscribeToEvent(is.evSwitch, "tx-indexer", types.TxResult{}) + subCh := events.SubscribeToEvent(is.evSwitch, "tx-indexer", types.EventTx{}) for { select { diff --git a/pkgs/bft/state/txindex/null/null.go b/pkgs/bft/state/txindex/null/null.go index caafa6cb250..5005027f47d 100644 --- a/pkgs/bft/state/txindex/null/null.go +++ b/pkgs/bft/state/txindex/null/null.go @@ -5,19 +5,31 @@ import ( "github.com/gnolang/gno/pkgs/bft/types" ) -var _ txindex.TxIndexer = (*TxIndex)(nil) +var _ txindex.TxIndexer = (*TxIndexer)(nil) -// TxIndex acts as a /dev/null. -type TxIndex struct{} +const ( + IndexerType = "none" +) + +// TxIndexer acts as a /dev/null +type TxIndexer struct{} + +func NewNullIndexer() *TxIndexer { + return &TxIndexer{} +} -func (t TxIndex) Start() error { +func (t TxIndexer) Start() error { return nil } -func (t TxIndex) Close() error { +func (t TxIndexer) Stop() error { return nil } -func (t TxIndex) Index(_ *types.TxResult) error { +func (t TxIndexer) Index(_ *types.TxResult) error { return nil } + +func (t TxIndexer) GetType() string { + return "none" +} From 08360c261dbb9906a57d7a6a2a8f6bd3fdaf1cc7 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Mon, 27 Feb 2023 16:01:11 +0100 Subject: [PATCH 03/28] Simplify types --- pkgs/bft/state/txindex/file/file.go | 2 +- pkgs/bft/state/txindex/indexer.go | 2 +- pkgs/bft/state/txindex/indexer_service.go | 4 ++-- pkgs/bft/state/txindex/null/null.go | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkgs/bft/state/txindex/file/file.go b/pkgs/bft/state/txindex/file/file.go index 3695c71eec2..158ae16a566 100644 --- a/pkgs/bft/state/txindex/file/file.go +++ b/pkgs/bft/state/txindex/file/file.go @@ -58,7 +58,7 @@ func (t *TxIndexer) GetType() string { return IndexerType } -func (t *TxIndexer) Index(tx *types.TxResult) error { +func (t *TxIndexer) Index(tx types.TxResult) error { // Serialize the transaction using amino:binary txRaw, err := amino.Marshal(tx) if err != nil { diff --git a/pkgs/bft/state/txindex/indexer.go b/pkgs/bft/state/txindex/indexer.go index cae616a97d7..1f197233361 100644 --- a/pkgs/bft/state/txindex/indexer.go +++ b/pkgs/bft/state/txindex/indexer.go @@ -14,5 +14,5 @@ type TxIndexer interface { GetType() string // Index analyzes, indexes and stores a single transaction - Index(result *types.TxResult) error + Index(result types.TxResult) error } diff --git a/pkgs/bft/state/txindex/indexer_service.go b/pkgs/bft/state/txindex/indexer_service.go index 33e4d90a45b..cc9dedc3ab4 100644 --- a/pkgs/bft/state/txindex/indexer_service.go +++ b/pkgs/bft/state/txindex/indexer_service.go @@ -68,7 +68,7 @@ func (is *IndexerService) monitorTxEvents(ctx context.Context) { return case evRaw := <-subCh: // Cast the event - ev, ok := evRaw.(*types.TxResult) + ev, ok := evRaw.(types.EventTx) if !ok { is.Logger.Error("invalid transaction result type cast") @@ -76,7 +76,7 @@ func (is *IndexerService) monitorTxEvents(ctx context.Context) { } // Alert the actual indexer - if err := is.indexer.Index(ev); err != nil { + if err := is.indexer.Index(ev.Result); err != nil { is.Logger.Error( fmt.Sprintf("unable to index transaction, %v", err), ) diff --git a/pkgs/bft/state/txindex/null/null.go b/pkgs/bft/state/txindex/null/null.go index 5005027f47d..262ca222ee4 100644 --- a/pkgs/bft/state/txindex/null/null.go +++ b/pkgs/bft/state/txindex/null/null.go @@ -26,10 +26,10 @@ func (t TxIndexer) Stop() error { return nil } -func (t TxIndexer) Index(_ *types.TxResult) error { +func (t TxIndexer) Index(_ types.TxResult) error { return nil } func (t TxIndexer) GetType() string { - return "none" + return IndexerType } From 367b20d25b3191f89fa286f37596b84ad8a9250d Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Mon, 27 Feb 2023 17:05:06 +0100 Subject: [PATCH 04/28] Add unit test for file tx indexer --- pkgs/bft/node/node.go | 3 +- pkgs/bft/state/txindex/file/file.go | 13 +- pkgs/bft/state/txindex/file/file_test.go | 146 +++++++++++++++++++++++ 3 files changed, 158 insertions(+), 4 deletions(-) create mode 100644 pkgs/bft/state/txindex/file/file_test.go diff --git a/pkgs/bft/node/node.go b/pkgs/bft/node/node.go index 9cc25b6d648..084b2db16fe 100644 --- a/pkgs/bft/node/node.go +++ b/pkgs/bft/node/node.go @@ -201,7 +201,6 @@ func createAndStartIndexerService( evSwitch events.EventSwitch, logger log.Logger, ) (*txindex.IndexerService, txindex.TxIndexer, error) { - // TODO start indexer based on the configuration var ( err error txIndexer txindex.TxIndexer @@ -222,7 +221,7 @@ func createAndStartIndexerService( indexerService := txindex.NewIndexerService(txIndexer, evSwitch) indexerService.SetLogger(logger.With("module", "txindex")) - if err := indexerService.Start(); err != nil { + if err = indexerService.Start(); err != nil { return nil, nil, err } diff --git a/pkgs/bft/state/txindex/file/file.go b/pkgs/bft/state/txindex/file/file.go index 158ae16a566..33a5b5a540a 100644 --- a/pkgs/bft/state/txindex/file/file.go +++ b/pkgs/bft/state/txindex/file/file.go @@ -15,6 +15,11 @@ const ( Path = "path" ) +var ( + errMissingPath = errors.New("missing path param") + errInvalidType = errors.New("invalid config for file indexer specified") +) + // TxIndexer is the implementation of a transaction indexer // that outputs to the local filesystem type TxIndexer struct { @@ -25,9 +30,13 @@ type TxIndexer struct { // NewTxIndexer creates a new file-based tx indexer func NewTxIndexer(cfg *config.Config) (*TxIndexer, error) { // Parse config params + if IndexerType != cfg.IndexerType { + return nil, errInvalidType + } + headPath, ok := cfg.GetParam(Path).(string) if !ok { - return nil, errors.New("missing path param") + return nil, errMissingPath } return &TxIndexer{ @@ -60,7 +69,7 @@ func (t *TxIndexer) GetType() string { func (t *TxIndexer) Index(tx types.TxResult) error { // Serialize the transaction using amino:binary - txRaw, err := amino.Marshal(tx) + txRaw, err := amino.MarshalJSON(tx) if err != nil { return fmt.Errorf("unable to marshal transaction, %w", err) } diff --git a/pkgs/bft/state/txindex/file/file_test.go b/pkgs/bft/state/txindex/file/file_test.go new file mode 100644 index 00000000000..d3e9379fac9 --- /dev/null +++ b/pkgs/bft/state/txindex/file/file_test.go @@ -0,0 +1,146 @@ +package file + +import ( + "bufio" + "testing" + + "github.com/gnolang/gno/pkgs/amino" + "github.com/gnolang/gno/pkgs/bft/state/txindex/config" + "github.com/gnolang/gno/pkgs/bft/types" + "github.com/gnolang/gno/pkgs/testutils" + "github.com/stretchr/testify/assert" +) + +// generateTestTransactions generates random transaction results +func generateTestTransactions(count int) []types.TxResult { + txs := make([]types.TxResult, count) + + for i := 0; i < count; i++ { + txs[i] = types.TxResult{} + } + + return txs +} + +func TestTxIndexer_New(t *testing.T) { + t.Parallel() + + t.Run("invalid file path specified", func(t *testing.T) { + t.Parallel() + + cfg := &config.Config{ + IndexerType: "invalid", + } + + i, err := NewTxIndexer(cfg) + + assert.Nil(t, i) + assert.ErrorIs(t, err, errInvalidType) + }) + + t.Run("invalid file path specified", func(t *testing.T) { + t.Parallel() + + cfg := &config.Config{ + IndexerType: IndexerType, + Params: nil, + } + + i, err := NewTxIndexer(cfg) + + assert.Nil(t, i) + assert.ErrorIs(t, err, errMissingPath) + }) + + t.Run("valid file path specified", func(t *testing.T) { + t.Parallel() + + headPath := "." + + cfg := &config.Config{ + IndexerType: IndexerType, + Params: map[string]any{ + Path: headPath, + }, + } + + i, err := NewTxIndexer(cfg) + if i == nil { + t.Fatalf("unable to create indexer") + } + + assert.NoError(t, err) + assert.Equal(t, headPath, i.headPath) + assert.Equal(t, IndexerType, i.GetType()) + }) +} + +func TestTxIndexer_Index(t *testing.T) { + t.Parallel() + + headFile, cleanup := testutils.NewTestFile(t) + t.Cleanup(func() { + cleanup() + }) + + indexer, err := NewTxIndexer(&config.Config{ + IndexerType: IndexerType, + Params: map[string]any{ + Path: headFile.Name(), + }, + }) + + if err != nil { + t.Fatalf("unable to create tx indexer, %v", err) + } + + // Start the indexer + if err = indexer.Start(); err != nil { + t.Fatalf("unable to start indexer, %v", err) + } + + t.Cleanup(func() { + // Stop the indexer + if err = indexer.Stop(); err != nil { + t.Fatalf("unable to stop indexer gracefully, %v", err) + } + }) + + numTxs := 10 + txs := generateTestTransactions(numTxs) + + for _, tx := range txs { + if err = indexer.Index(tx); err != nil { + t.Fatalf("unable to index transaction, %v", err) + } + + // Force the group to flush + if err = indexer.group.FlushAndSync(); err != nil { + t.Fatalf("unable to flush buffer contents to file, %v", err) + } + } + + // Make sure the file group's size is valid + if indexer.group.ReadGroupInfo().TotalSize == 0 { + t.Fatalf("invalid group size") + } + + // Open file for reading + scanner := bufio.NewScanner(headFile) + + linesRead := 0 + for scanner.Scan() { + line := scanner.Bytes() + + var txRes types.TxResult + if err = amino.UnmarshalJSON(line, &txRes); err != nil { + t.Fatalf("unable to read indexer line") + } + + assert.Equal(t, txs[linesRead], txRes) + + linesRead++ + } + + assert.Equal(t, numTxs, linesRead) +} From 0ce73e525d7068631d4761dbef96391b930a1bc4 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Mon, 27 Feb 2023 17:09:28 +0100 Subject: [PATCH 05/28] Add unit test for indexer config --- pkgs/bft/state/txindex/config/config_test.go | 45 ++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 pkgs/bft/state/txindex/config/config_test.go diff --git a/pkgs/bft/state/txindex/config/config_test.go b/pkgs/bft/state/txindex/config/config_test.go new file mode 100644 index 00000000000..19ce61d824b --- /dev/null +++ b/pkgs/bft/state/txindex/config/config_test.go @@ -0,0 +1,45 @@ +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConfig_GetParam(t *testing.T) { + t.Parallel() + + const paramName = "param" + + testTable := []struct { + name string + cfg *Config + + expectedParam any + }{ + { + "param not set", + &Config{}, + nil, + }, + { + "valid param set", + &Config{ + Params: map[string]any{ + paramName: 10, + }, + }, + 10, + }, + } + + for _, testCase := range testTable { + testCase := testCase + + t.Run(testCase.name, func(t *testing.T) { + t.Parallel() + + assert.Equal(t, testCase.expectedParam, testCase.cfg.GetParam(paramName)) + }) + } +} From 994de46be497b0fbc1faba260f6af812c190283b Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Tue, 28 Feb 2023 11:27:49 +0100 Subject: [PATCH 06/28] Add unit test for the indexer service --- .../bft/state/txindex/indexer_service_test.go | 160 ++++++++++++++++++ pkgs/bft/state/txindex/mock_test.go | 85 ++++++++++ 2 files changed, 245 insertions(+) create mode 100644 pkgs/bft/state/txindex/indexer_service_test.go create mode 100644 pkgs/bft/state/txindex/mock_test.go diff --git a/pkgs/bft/state/txindex/indexer_service_test.go b/pkgs/bft/state/txindex/indexer_service_test.go new file mode 100644 index 00000000000..6849b13dcb2 --- /dev/null +++ b/pkgs/bft/state/txindex/indexer_service_test.go @@ -0,0 +1,160 @@ +package txindex + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/gnolang/gno/pkgs/bft/types" + "github.com/gnolang/gno/pkgs/events" + "github.com/stretchr/testify/assert" +) + +// generateTxEvents generates random transaction events +func generateTxEvents(count int) []types.EventTx { + txEvents := make([]types.EventTx, count) + + for i := 0; i < count; i++ { + txEvents[i] = types.EventTx{ + Result: types.TxResult{}, + } + } + + return txEvents +} + +func TestIndexerService_Monitor(t *testing.T) { + t.Parallel() + + const defaultTimeout = 5 * time.Second + + var ( + startCalled = false + stopCalled = false + receivedResults = make([]types.TxResult, 0) + receivedSize atomic.Int64 + + cb events.EventCallback + cbSet atomic.Bool + + mockIndexer = &mockIndexer{ + startFn: func() error { + startCalled = true + + return nil + }, + stopFn: func() error { + stopCalled = true + + return nil + }, + indexFn: func(result types.TxResult) error { + receivedResults = append(receivedResults, result) + + // Atomic because we are accessing this size from a routine + receivedSize.Store(int64(len(receivedResults))) + + return nil + }, + } + mockEventSwitch = &mockEventSwitch{ + fireEventFn: func(event events.Event) { + // Exec the callback on event fire + cb(event) + }, + addListenerFn: func(_ string, callback events.EventCallback) { + // Attach callback + cb = callback + + // Atomic because we are accessing this info from a routine + cbSet.Store(true) + }, + } + ) + + // Create a new indexer instance + i := NewIndexerService(mockIndexer, mockEventSwitch) + if i == nil { + t.Fatal("unable to create indexer service") + } + + // Start the indexer + if err := i.OnStart(); err != nil { + t.Fatalf("unable to start indexer, %v", err) + } + + assert.True(t, startCalled) + + t.Cleanup(func() { + // Stop the indexer + i.OnStop() + + assert.True(t, stopCalled) + }) + + // Fire off the events so the indexer can catch them + + numEvents := 1000 + txEvents := generateTxEvents(numEvents) + + var wg sync.WaitGroup + + // Start a routine that asynchronously pushes events + wg.Add(1) + go func() { + defer wg.Done() + + timeout := time.After(defaultTimeout) + + for { + select { + case <-timeout: + return + default: + // If the callback is set, fire the events + if !cbSet.Load() { + // Listener not set yet + continue + } + + for _, event := range txEvents { + mockEventSwitch.FireEvent(event) + } + + return + } + } + }() + + // Start a routine that monitors received results + wg.Add(1) + go func() { + defer wg.Done() + + timeout := time.After(defaultTimeout) + + for { + select { + case <-timeout: + return + default: + if int(receivedSize.Load()) == numEvents { + return + } + } + } + }() + + wg.Wait() + + // Make sure all results were received + if len(receivedResults) != numEvents { + t.Fatalf("invalid number of results received, %d", len(receivedResults)) + } + + // Make sure all results match + for index, event := range txEvents { + assert.Equal(t, event.Result, receivedResults[index]) + } +} diff --git a/pkgs/bft/state/txindex/mock_test.go b/pkgs/bft/state/txindex/mock_test.go new file mode 100644 index 00000000000..5814a365154 --- /dev/null +++ b/pkgs/bft/state/txindex/mock_test.go @@ -0,0 +1,85 @@ +package txindex + +import ( + "github.com/gnolang/gno/pkgs/bft/types" + "github.com/gnolang/gno/pkgs/events" + "github.com/gnolang/gno/pkgs/service" +) + +// TxIndexer // + +type startDelegate func() error +type stopDelegate func() error +type getTypeDelegate func() string +type indexDelegate func(types.TxResult) error + +type mockIndexer struct { + startFn startDelegate + stopFn stopDelegate + getTypeFn getTypeDelegate + indexFn indexDelegate +} + +func (m mockIndexer) Start() error { + if m.startFn != nil { + return m.startFn() + } + + return nil +} + +func (m mockIndexer) Stop() error { + if m.stopFn != nil { + return m.stopFn() + } + + return nil +} + +func (m mockIndexer) GetType() string { + if m.getTypeFn != nil { + return m.getTypeFn() + } + + return "" +} + +func (m mockIndexer) Index(result types.TxResult) error { + if m.indexFn != nil { + return m.indexFn(result) + } + + return nil +} + +// EventSwitch // + +type fireEventDelegate func(events.Event) +type addListenerDelegate func(string, events.EventCallback) +type removeListenerDelegate func(string) + +type mockEventSwitch struct { + service.BaseService + + fireEventFn fireEventDelegate + addListenerFn addListenerDelegate + removeListenerFn removeListenerDelegate +} + +func (m *mockEventSwitch) FireEvent(ev events.Event) { + if m.fireEventFn != nil { + m.fireEventFn(ev) + } +} + +func (m *mockEventSwitch) AddListener(listenerID string, cb events.EventCallback) { + if m.addListenerFn != nil { + m.addListenerFn(listenerID, cb) + } +} + +func (m *mockEventSwitch) RemoveListener(listenerID string) { + if m.removeListenerFn != nil { + m.removeListenerFn(listenerID) + } +} From 6d21b5454c598f4499cdfb73168d50af74218283 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Tue, 28 Feb 2023 11:32:18 +0100 Subject: [PATCH 07/28] Add additional docs --- pkgs/bft/state/txindex/file/file.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkgs/bft/state/txindex/file/file.go b/pkgs/bft/state/txindex/file/file.go index 33a5b5a540a..23dda88256f 100644 --- a/pkgs/bft/state/txindex/file/file.go +++ b/pkgs/bft/state/txindex/file/file.go @@ -44,6 +44,7 @@ func NewTxIndexer(cfg *config.Config) (*TxIndexer, error) { }, nil } +// Start starts the file transaction indexer, by opening the autofile group func (t *TxIndexer) Start() error { // Open the group group, err := autofile.OpenGroup(t.headPath) @@ -56,6 +57,7 @@ func (t *TxIndexer) Start() error { return nil } +// Stop stops the file transaction indexer, by closing the autofile group func (t *TxIndexer) Stop() error { // Close off the group t.group.Close() @@ -63,12 +65,14 @@ func (t *TxIndexer) Stop() error { return nil } +// GetType returns the file transaction indexer type func (t *TxIndexer) GetType() string { return IndexerType } +// Index marshals the transaction using amino, and writes it to the disk func (t *TxIndexer) Index(tx types.TxResult) error { - // Serialize the transaction using amino:binary + // Serialize the transaction using amino txRaw, err := amino.MarshalJSON(tx) if err != nil { return fmt.Errorf("unable to marshal transaction, %w", err) From ccdd6fd9b6f8fd5e267d64da6fadbe3282d07c5e Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Tue, 28 Feb 2023 12:15:42 +0100 Subject: [PATCH 08/28] Fix help output --- cmd/gnoland/main.go | 5 +++-- pkgs/bft/state/txindex/file/file.go | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cmd/gnoland/main.go b/cmd/gnoland/main.go index a954ca4d57e..2ef6eb699da 100644 --- a/cmd/gnoland/main.go +++ b/cmd/gnoland/main.go @@ -16,6 +16,7 @@ import ( "github.com/gnolang/gno/pkgs/bft/privval" indexercfg "github.com/gnolang/gno/pkgs/bft/state/txindex/config" "github.com/gnolang/gno/pkgs/bft/state/txindex/file" + "github.com/gnolang/gno/pkgs/bft/state/txindex/null" bft "github.com/gnolang/gno/pkgs/bft/types" "github.com/gnolang/gno/pkgs/crypto" "github.com/gnolang/gno/pkgs/errors" @@ -58,8 +59,8 @@ func runMain(args []string) error { fs.StringVar(&flags.rootDir, "root-dir", "testdir", "directory for config and data") fs.StringVar(&flags.genesisRemote, "genesis-remote", "localhost:26657", "replacement for '%%REMOTE%%' in genesis") - fs.StringVar(&flags.txIndexerType, "tx-indexer-type", "none", "type of transaction indexer [none, file]") - fs.StringVar(&flags.txIndexerPath, "tx-indexer-path", "", "path for the file tx-indexer (required if indexer if 'file')") + fs.StringVar(&flags.txIndexerType, "tx-indexer-type", "none", fmt.Sprintf("type of transaction indexer [%s, %s]", null.IndexerType, file.IndexerType)) + fs.StringVar(&flags.txIndexerPath, "tx-indexer-path", "", fmt.Sprintf("path for the file tx-indexer (required if indexer if '%s')", file.IndexerType)) fs.Parse(args) diff --git a/pkgs/bft/state/txindex/file/file.go b/pkgs/bft/state/txindex/file/file.go index 23dda88256f..4994450e76d 100644 --- a/pkgs/bft/state/txindex/file/file.go +++ b/pkgs/bft/state/txindex/file/file.go @@ -11,7 +11,7 @@ import ( ) const ( - IndexerType = "file-indexer" + IndexerType = "file" Path = "path" ) From 5c82d82e055889927af60c18c097b8f35eff6492 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Tue, 28 Feb 2023 12:22:49 +0100 Subject: [PATCH 09/28] Update node info for the indexer --- pkgs/bft/node/node.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkgs/bft/node/node.go b/pkgs/bft/node/node.go index 084b2db16fe..e98eb5c41c9 100644 --- a/pkgs/bft/node/node.go +++ b/pkgs/bft/node/node.go @@ -846,7 +846,10 @@ func makeNodeInfo( genDoc *types.GenesisDoc, state sm.State, ) (p2p.NodeInfo, error) { - txIndexerStatus := txIndexer.GetType() + txIndexerStatus := "off" + if txIndexer.GetType() != null.IndexerType { + txIndexerStatus = "on" + } bcChannel := bc.BlockchainChannel vset := version.VersionSet From e612c06426b27114b1ced522be8d8e17aacc6da1 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Tue, 28 Feb 2023 12:38:30 +0100 Subject: [PATCH 10/28] Add flush operation to indexer --- pkgs/bft/state/txindex/file/file.go | 5 +++++ pkgs/bft/state/txindex/file/file_test.go | 5 ----- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkgs/bft/state/txindex/file/file.go b/pkgs/bft/state/txindex/file/file.go index 4994450e76d..79254ef8f60 100644 --- a/pkgs/bft/state/txindex/file/file.go +++ b/pkgs/bft/state/txindex/file/file.go @@ -83,5 +83,10 @@ func (t *TxIndexer) Index(tx types.TxResult) error { return fmt.Errorf("unable to save transaction index, %w", err) } + // Flush output to storage + if err := t.group.FlushAndSync(); err != nil { + return fmt.Errorf("unable to flush and sync transaction index, %w", err) + } + return nil } diff --git a/pkgs/bft/state/txindex/file/file_test.go b/pkgs/bft/state/txindex/file/file_test.go index d3e9379fac9..48093b19be8 100644 --- a/pkgs/bft/state/txindex/file/file_test.go +++ b/pkgs/bft/state/txindex/file/file_test.go @@ -113,11 +113,6 @@ func TestTxIndexer_Index(t *testing.T) { if err = indexer.Index(tx); err != nil { t.Fatalf("unable to index transaction, %v", err) } - - // Force the group to flush - if err = indexer.group.FlushAndSync(); err != nil { - t.Fatalf("unable to flush buffer contents to file, %v", err) - } } // Make sure the file group's size is valid From b824a60ddca964e39cc97d49e7a77ba6b0f7c931 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Tue, 28 Feb 2023 13:20:02 +0100 Subject: [PATCH 11/28] Add indexer config to test files --- pkgs/bft/config/config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkgs/bft/config/config.go b/pkgs/bft/config/config.go index 8222ab4235d..24c54126d4f 100644 --- a/pkgs/bft/config/config.go +++ b/pkgs/bft/config/config.go @@ -76,6 +76,7 @@ func TestConfig() *Config { P2P: p2p.TestP2PConfig(), Mempool: mem.TestMempoolConfig(), Consensus: cns.TestConsensusConfig(), + Indexer: config.DefaultIndexerConfig(), } } From 2b9d8f3ae19479c70e278ee207478c48db6e7329 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Tue, 28 Feb 2023 13:27:54 +0100 Subject: [PATCH 12/28] Add constant default value for indexer type --- cmd/gnoland/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/gnoland/main.go b/cmd/gnoland/main.go index 2ef6eb699da..d1ca733816b 100644 --- a/cmd/gnoland/main.go +++ b/cmd/gnoland/main.go @@ -59,7 +59,7 @@ func runMain(args []string) error { fs.StringVar(&flags.rootDir, "root-dir", "testdir", "directory for config and data") fs.StringVar(&flags.genesisRemote, "genesis-remote", "localhost:26657", "replacement for '%%REMOTE%%' in genesis") - fs.StringVar(&flags.txIndexerType, "tx-indexer-type", "none", fmt.Sprintf("type of transaction indexer [%s, %s]", null.IndexerType, file.IndexerType)) + fs.StringVar(&flags.txIndexerType, "tx-indexer-type", null.IndexerType, fmt.Sprintf("type of transaction indexer [%s, %s]", null.IndexerType, file.IndexerType)) fs.StringVar(&flags.txIndexerPath, "tx-indexer-path", "", fmt.Sprintf("path for the file tx-indexer (required if indexer if '%s')", file.IndexerType)) fs.Parse(args) From 7387d4af932f350043da20a4129b760dee95f50a Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Thu, 2 Mar 2023 11:20:19 +0100 Subject: [PATCH 13/28] Change binding to work on Macs for some reason --- pkgs/bft/rpc/test/helpers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkgs/bft/rpc/test/helpers.go b/pkgs/bft/rpc/test/helpers.go index 66fa5adf66e..7820ef31e0c 100644 --- a/pkgs/bft/rpc/test/helpers.go +++ b/pkgs/bft/rpc/test/helpers.go @@ -65,8 +65,8 @@ func createConfig() *cfg.Config { c := cfg.ResetTestRoot(pathname) // and we use random ports to run in parallel - c.P2P.ListenAddress = "tcp://127.0.0.2:0" - c.RPC.ListenAddress = "tcp://127.0.0.2:0" + c.P2P.ListenAddress = "tcp://127.0.0.1:0" + c.RPC.ListenAddress = "tcp://127.0.0.1:0" c.RPC.CORSAllowedOrigins = []string{"https://tendermint.com/"} // c.TxIndex.IndexTags = "app.creator,tx.height" // see kvstore application return c From 90e69c264fbfa9ef8427ff7aa22f11c1dc503390 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Mon, 15 May 2023 14:54:23 +0200 Subject: [PATCH 14/28] Remove leftover .gno files --- examples/gno.land/r/demo/milos20/milos20.gno | 13 ------------- 1 file changed, 13 deletions(-) delete mode 100644 examples/gno.land/r/demo/milos20/milos20.gno diff --git a/examples/gno.land/r/demo/milos20/milos20.gno b/examples/gno.land/r/demo/milos20/milos20.gno deleted file mode 100644 index 0aedcd3f867..00000000000 --- a/examples/gno.land/r/demo/milos20/milos20.gno +++ /dev/null @@ -1,13 +0,0 @@ -package milos20 - -var ( - name string -) - -func init() { - name = "Milos" -} - -func GetName() string { - return name -} From 9df0ce32d15142d007ec66c7bf22bc4e033953b9 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Mon, 15 May 2023 15:14:26 +0200 Subject: [PATCH 15/28] Run gofumpt --- tm2/pkg/bft/state/txindex/file/file_test.go | 1 - tm2/pkg/bft/state/txindex/mock_test.go | 18 +++++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/tm2/pkg/bft/state/txindex/file/file_test.go b/tm2/pkg/bft/state/txindex/file/file_test.go index 3d02c96d6d5..bb86dcc8867 100644 --- a/tm2/pkg/bft/state/txindex/file/file_test.go +++ b/tm2/pkg/bft/state/txindex/file/file_test.go @@ -89,7 +89,6 @@ func TestTxIndexer_Index(t *testing.T) { Path: headFile.Name(), }, }) - if err != nil { t.Fatalf("unable to create tx indexer, %v", err) } diff --git a/tm2/pkg/bft/state/txindex/mock_test.go b/tm2/pkg/bft/state/txindex/mock_test.go index 68ff3c95666..2e60a321e47 100644 --- a/tm2/pkg/bft/state/txindex/mock_test.go +++ b/tm2/pkg/bft/state/txindex/mock_test.go @@ -8,10 +8,12 @@ import ( // TxIndexer // -type startDelegate func() error -type stopDelegate func() error -type getTypeDelegate func() string -type indexDelegate func(types.TxResult) error +type ( + startDelegate func() error + stopDelegate func() error + getTypeDelegate func() string + indexDelegate func(types.TxResult) error +) type mockIndexer struct { startFn startDelegate @@ -54,9 +56,11 @@ func (m mockIndexer) Index(result types.TxResult) error { // EventSwitch // -type fireEventDelegate func(events.Event) -type addListenerDelegate func(string, events.EventCallback) -type removeListenerDelegate func(string) +type ( + fireEventDelegate func(events.Event) + addListenerDelegate func(string, events.EventCallback) + removeListenerDelegate func(string) +) type mockEventSwitch struct { service.BaseService From c09a8317a7e28604298fff2163e987b53f3d8955 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Tue, 16 May 2023 12:51:40 +0200 Subject: [PATCH 16/28] Simplify indexer type listing --- gno.land/cmd/gnoland/main.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/gno.land/cmd/gnoland/main.go b/gno.land/cmd/gnoland/main.go index 0002b07566c..30f629c29ac 100644 --- a/gno.land/cmd/gnoland/main.go +++ b/gno.land/cmd/gnoland/main.go @@ -117,7 +117,16 @@ func (c *gnolandCfg) RegisterFlags(fs *flag.FlagSet) { &c.txIndexerType, "tx-indexer-type", null.IndexerType, - fmt.Sprintf("type of transaction indexer [%s, %s]", null.IndexerType, file.IndexerType), + fmt.Sprintf( + "type of transaction indexer [%s]", + strings.Join( + []string{ + null.IndexerType, + file.IndexerType, + }, + ", ", + ), + ), ) fs.StringVar( From dee780653e66bb54364e55559313f89764d1b45b Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Tue, 16 May 2023 12:54:58 +0200 Subject: [PATCH 17/28] Add error logging for group methods --- tm2/pkg/autofile/group.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/tm2/pkg/autofile/group.go b/tm2/pkg/autofile/group.go index 2f917dc6d5a..189d7818bd8 100644 --- a/tm2/pkg/autofile/group.go +++ b/tm2/pkg/autofile/group.go @@ -125,7 +125,11 @@ func (g *Group) OnStart() error { // OnStop implements service.Service by stopping the goroutine described above. // NOTE: g.Head must be closed separately using Close. func (g *Group) OnStop() { - g.FlushAndSync() + if err := g.FlushAndSync(); err != nil { + g.Logger.Error( + fmt.Sprintf("unable to gracefully flush data, %s", err.Error()), + ) + } } // Wait blocks until all internal goroutines are finished. Supposed to be @@ -136,12 +140,20 @@ func (g *Group) Wait() { // Close closes the head file. The group must be stopped by this moment. func (g *Group) Close() { - _ = g.FlushAndSync() + if err := g.FlushAndSync(); err != nil { + g.Logger.Error( + fmt.Sprintf("unable to gracefully flush data, %s", err.Error()), + ) + } g.mtx.Lock() defer g.mtx.Unlock() - _ = g.Head.Close() + if err := g.Head.Close(); err != nil { + g.Logger.Error( + fmt.Sprintf("unable to gracefully close group head, %s", err.Error()), + ) + } } // HeadSizeLimit returns the current head size limit. From 8f13f4e39e4882ea9a7e4490349edb0c58d29a4b Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Tue, 16 May 2023 12:57:10 +0200 Subject: [PATCH 18/28] Add constants for indexer status --- tm2/pkg/bft/node/node.go | 4 ++-- tm2/pkg/p2p/node_info.go | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/tm2/pkg/bft/node/node.go b/tm2/pkg/bft/node/node.go index 1391540c9e2..711ea2f5503 100644 --- a/tm2/pkg/bft/node/node.go +++ b/tm2/pkg/bft/node/node.go @@ -844,9 +844,9 @@ func makeNodeInfo( genDoc *types.GenesisDoc, state sm.State, ) (p2p.NodeInfo, error) { - txIndexerStatus := "off" + txIndexerStatus := p2p.IndexerStatusOff if txIndexer.GetType() != null.IndexerType { - txIndexerStatus = "on" + txIndexerStatus = p2p.IndexerStatusOn } bcChannel := bc.BlockchainChannel diff --git a/tm2/pkg/p2p/node_info.go b/tm2/pkg/p2p/node_info.go index 9653a83c38a..4d1498414f0 100644 --- a/tm2/pkg/p2p/node_info.go +++ b/tm2/pkg/p2p/node_info.go @@ -12,6 +12,11 @@ const ( maxNumChannels = 16 // plenty of room for upgrades, for now ) +const ( + IndexerStatusOn = "on" + IndexerStatusOff = "off" +) + // Max size of the NodeInfo struct func MaxNodeInfoSize() int { return maxNodeInfoSize @@ -100,7 +105,7 @@ func (info NodeInfo) Validate() error { other := info.Other txIndex := other.TxIndex switch txIndex { - case "", "on", "off": + case "", IndexerStatusOn, IndexerStatusOff: default: return fmt.Errorf("info.Other.TxIndex should be either 'on', 'off', or empty string, got '%v'", txIndex) } From 2c5aaa64c80e5edd8259cb34d9360529f49528d2 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Tue, 16 May 2023 13:00:34 +0200 Subject: [PATCH 19/28] Change import alias --- tm2/pkg/bft/config/config.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tm2/pkg/bft/config/config.go b/tm2/pkg/bft/config/config.go index 9159b35bea1..f16627134a7 100644 --- a/tm2/pkg/bft/config/config.go +++ b/tm2/pkg/bft/config/config.go @@ -9,7 +9,7 @@ import ( cns "github.com/gnolang/gno/tm2/pkg/bft/consensus/config" mem "github.com/gnolang/gno/tm2/pkg/bft/mempool/config" rpc "github.com/gnolang/gno/tm2/pkg/bft/rpc/config" - "github.com/gnolang/gno/tm2/pkg/bft/state/txindex/config" + txindex "github.com/gnolang/gno/tm2/pkg/bft/state/txindex/config" "github.com/gnolang/gno/tm2/pkg/errors" osm "github.com/gnolang/gno/tm2/pkg/os" p2p "github.com/gnolang/gno/tm2/pkg/p2p/config" @@ -25,7 +25,7 @@ type Config struct { P2P *p2p.P2PConfig `toml:"p2p"` Mempool *mem.MempoolConfig `toml:"mempool"` Consensus *cns.ConsensusConfig `toml:"consensus"` - Indexer *config.Config `toml:"indexer"` + Indexer *txindex.Config `toml:"indexer"` } // DefaultConfig returns a default configuration for a Tendermint node @@ -36,7 +36,7 @@ func DefaultConfig() *Config { P2P: p2p.DefaultP2PConfig(), Mempool: mem.DefaultMempoolConfig(), Consensus: cns.DefaultConsensusConfig(), - Indexer: config.DefaultIndexerConfig(), + Indexer: txindex.DefaultIndexerConfig(), } } @@ -76,7 +76,7 @@ func TestConfig() *Config { P2P: p2p.TestP2PConfig(), Mempool: mem.TestMempoolConfig(), Consensus: cns.TestConsensusConfig(), - Indexer: config.DefaultIndexerConfig(), + Indexer: txindex.DefaultIndexerConfig(), } } From 86c11c9f855f8fde945374e19a357f48dd06a041 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Tue, 16 May 2023 13:02:31 +0200 Subject: [PATCH 20/28] Standardize error logging in the indexer service --- tm2/pkg/bft/state/txindex/indexer_service.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tm2/pkg/bft/state/txindex/indexer_service.go b/tm2/pkg/bft/state/txindex/indexer_service.go index 755d999615b..3cadf0328db 100644 --- a/tm2/pkg/bft/state/txindex/indexer_service.go +++ b/tm2/pkg/bft/state/txindex/indexer_service.go @@ -77,9 +77,7 @@ func (is *IndexerService) monitorTxEvents(ctx context.Context) { // Alert the actual indexer if err := is.indexer.Index(ev.Result); err != nil { - is.Logger.Error( - fmt.Sprintf("unable to index transaction, %v", err), - ) + is.Logger.Error("unable to index transaction", "err", err) } } } From 443953d0e55302e5aa22ccfe50423b3ede5fb9c6 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Fri, 7 Jul 2023 11:47:52 +0200 Subject: [PATCH 21/28] Rename indexer -> event store --- gno.land/cmd/gnoland/main.go | 52 ++++----- tm2/pkg/bft/config/config.go | 36 +++---- tm2/pkg/bft/node/node.go | 100 +++++++++--------- tm2/pkg/bft/rpc/core/pipe.go | 12 +-- .../{txindex => eventstore}/file/file.go | 40 +++---- .../{txindex => eventstore}/file/file_test.go | 56 +++++----- tm2/pkg/bft/state/eventstore/indexer.go | 18 ++++ .../bft/state/eventstore/indexer_service.go | 84 +++++++++++++++ .../indexer_service_test.go | 21 ++-- .../{txindex => eventstore}/mock_test.go | 14 +-- tm2/pkg/bft/state/eventstore/null/null.go | 35 ++++++ tm2/pkg/bft/state/eventstore/types/config.go | 29 +++++ .../types}/config_test.go | 2 +- tm2/pkg/bft/state/txindex/config/config.go | 30 ------ tm2/pkg/bft/state/txindex/indexer.go | 18 ---- tm2/pkg/bft/state/txindex/indexer_service.go | 84 --------------- tm2/pkg/bft/state/txindex/null/null.go | 35 ------ 17 files changed, 332 insertions(+), 334 deletions(-) rename tm2/pkg/bft/state/{txindex => eventstore}/file/file.go (54%) rename tm2/pkg/bft/state/{txindex => eventstore}/file/file_test.go (59%) create mode 100644 tm2/pkg/bft/state/eventstore/indexer.go create mode 100644 tm2/pkg/bft/state/eventstore/indexer_service.go rename tm2/pkg/bft/state/{txindex => eventstore}/indexer_service_test.go (86%) rename tm2/pkg/bft/state/{txindex => eventstore}/mock_test.go (84%) create mode 100644 tm2/pkg/bft/state/eventstore/null/null.go create mode 100644 tm2/pkg/bft/state/eventstore/types/config.go rename tm2/pkg/bft/state/{txindex/config => eventstore/types}/config_test.go (97%) delete mode 100644 tm2/pkg/bft/state/txindex/config/config.go delete mode 100644 tm2/pkg/bft/state/txindex/indexer.go delete mode 100644 tm2/pkg/bft/state/txindex/indexer_service.go delete mode 100644 tm2/pkg/bft/state/txindex/null/null.go diff --git a/gno.land/cmd/gnoland/main.go b/gno.land/cmd/gnoland/main.go index 072a22651ee..2897bb44e97 100644 --- a/gno.land/cmd/gnoland/main.go +++ b/gno.land/cmd/gnoland/main.go @@ -21,9 +21,9 @@ import ( "github.com/gnolang/gno/tm2/pkg/bft/config" "github.com/gnolang/gno/tm2/pkg/bft/node" "github.com/gnolang/gno/tm2/pkg/bft/privval" - indexercfg "github.com/gnolang/gno/tm2/pkg/bft/state/txindex/config" - "github.com/gnolang/gno/tm2/pkg/bft/state/txindex/file" - "github.com/gnolang/gno/tm2/pkg/bft/state/txindex/null" + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/file" + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/null" + eventstorecfg "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/types" bft "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/gno/tm2/pkg/commands" "github.com/gnolang/gno/tm2/pkg/crypto" @@ -44,8 +44,8 @@ type gnolandCfg struct { genesisMaxVMCycles int64 config string - txIndexerType string - txIndexerPath string + txEventStoreType string + txEventStorePath string } func main() { @@ -138,15 +138,15 @@ func (c *gnolandCfg) RegisterFlags(fs *flag.FlagSet) { ) fs.StringVar( - &c.txIndexerType, - "tx-indexer-type", - null.IndexerType, + &c.txEventStoreType, + "tx-event-store-type", + null.EventStoreType, fmt.Sprintf( - "type of transaction indexer [%s]", + "type of transaction event store [%s]", strings.Join( []string{ - null.IndexerType, - file.IndexerType, + null.EventStoreType, + file.EventStoreType, }, ", ", ), @@ -154,10 +154,10 @@ func (c *gnolandCfg) RegisterFlags(fs *flag.FlagSet) { ) fs.StringVar( - &c.txIndexerPath, - "tx-indexer-path", + &c.txEventStorePath, + "tx-event-store-path", "", - fmt.Sprintf("path for the file tx-indexer (required if indexer if '%s')", file.IndexerType), + fmt.Sprintf("path for the file tx event store (required if event store if '%s')", file.EventStoreType), ) } @@ -189,12 +189,12 @@ func exec(c *gnolandCfg) error { } // Initialize the indexer config - indexerCfg, err := getIndexerConfig(c) + txEventStoreCfg, err := getTxEventStoreConfig(c) if err != nil { return fmt.Errorf("unable to parse indexer config, %w", err) } - cfg.Indexer = indexerCfg + cfg.TxEventStore = txEventStoreCfg // create application and node. gnoApp, err := gnoland.NewApp(rootDir, c.skipFailingGenesisTxs, logger, c.genesisMaxVMCycles) @@ -231,25 +231,25 @@ func exec(c *gnolandCfg) error { select {} // run forever } -// getIndexerConfig constructs an indexer config from provided user options -func getIndexerConfig(c *gnolandCfg) (*indexercfg.Config, error) { - var cfg *indexercfg.Config +// getTxEventStoreConfig constructs an event store config from provided user options +func getTxEventStoreConfig(c *gnolandCfg) (*eventstorecfg.Config, error) { + var cfg *eventstorecfg.Config - switch c.txIndexerType { - case file.IndexerType: - if c.txIndexerPath == "" { + switch c.txEventStoreType { + case file.EventStoreType: + if c.txEventStorePath == "" { return nil, errors.New("unspecified file transaction indexer path") } // Fill out the configuration - cfg = &indexercfg.Config{ - IndexerType: file.IndexerType, + cfg = &eventstorecfg.Config{ + EventStoreType: file.EventStoreType, Params: map[string]any{ - file.Path: c.txIndexerPath, + file.Path: c.txEventStorePath, }, } default: - cfg = indexercfg.DefaultIndexerConfig() + cfg = eventstorecfg.DefaultEventStoreConfig() } return cfg, nil diff --git a/tm2/pkg/bft/config/config.go b/tm2/pkg/bft/config/config.go index f16627134a7..fbd09a0d8f5 100644 --- a/tm2/pkg/bft/config/config.go +++ b/tm2/pkg/bft/config/config.go @@ -9,7 +9,7 @@ import ( cns "github.com/gnolang/gno/tm2/pkg/bft/consensus/config" mem "github.com/gnolang/gno/tm2/pkg/bft/mempool/config" rpc "github.com/gnolang/gno/tm2/pkg/bft/rpc/config" - txindex "github.com/gnolang/gno/tm2/pkg/bft/state/txindex/config" + eventstore "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/types" "github.com/gnolang/gno/tm2/pkg/errors" osm "github.com/gnolang/gno/tm2/pkg/os" p2p "github.com/gnolang/gno/tm2/pkg/p2p/config" @@ -21,22 +21,22 @@ type Config struct { BaseConfig `toml:",squash"` // Options for services - RPC *rpc.RPCConfig `toml:"rpc"` - P2P *p2p.P2PConfig `toml:"p2p"` - Mempool *mem.MempoolConfig `toml:"mempool"` - Consensus *cns.ConsensusConfig `toml:"consensus"` - Indexer *txindex.Config `toml:"indexer"` + RPC *rpc.RPCConfig `toml:"rpc"` + P2P *p2p.P2PConfig `toml:"p2p"` + Mempool *mem.MempoolConfig `toml:"mempool"` + Consensus *cns.ConsensusConfig `toml:"consensus"` + TxEventStore *eventstore.Config `toml:"tx_event_store"` } // DefaultConfig returns a default configuration for a Tendermint node func DefaultConfig() *Config { return &Config{ - BaseConfig: DefaultBaseConfig(), - RPC: rpc.DefaultRPCConfig(), - P2P: p2p.DefaultP2PConfig(), - Mempool: mem.DefaultMempoolConfig(), - Consensus: cns.DefaultConsensusConfig(), - Indexer: txindex.DefaultIndexerConfig(), + BaseConfig: DefaultBaseConfig(), + RPC: rpc.DefaultRPCConfig(), + P2P: p2p.DefaultP2PConfig(), + Mempool: mem.DefaultMempoolConfig(), + Consensus: cns.DefaultConsensusConfig(), + TxEventStore: eventstore.DefaultEventStoreConfig(), } } @@ -71,12 +71,12 @@ func LoadOrMakeConfigWithOptions(root string, options ConfigOptions) (cfg *Confi // TestConfig returns a configuration that can be used for testing func TestConfig() *Config { return &Config{ - BaseConfig: TestBaseConfig(), - RPC: rpc.TestRPCConfig(), - P2P: p2p.TestP2PConfig(), - Mempool: mem.TestMempoolConfig(), - Consensus: cns.TestConsensusConfig(), - Indexer: txindex.DefaultIndexerConfig(), + BaseConfig: TestBaseConfig(), + RPC: rpc.TestRPCConfig(), + P2P: p2p.TestP2PConfig(), + Mempool: mem.TestMempoolConfig(), + Consensus: cns.TestConsensusConfig(), + TxEventStore: eventstore.DefaultEventStoreConfig(), } } diff --git a/tm2/pkg/bft/node/node.go b/tm2/pkg/bft/node/node.go index 711ea2f5503..11a04261999 100644 --- a/tm2/pkg/bft/node/node.go +++ b/tm2/pkg/bft/node/node.go @@ -12,7 +12,7 @@ import ( "time" "github.com/gnolang/cors" - "github.com/gnolang/gno/tm2/pkg/bft/state/txindex/file" + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/file" "github.com/gnolang/gno/tm2/pkg/amino" abci "github.com/gnolang/gno/tm2/pkg/bft/abci/types" @@ -26,8 +26,8 @@ import ( _ "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types" rpcserver "github.com/gnolang/gno/tm2/pkg/bft/rpc/lib/server" sm "github.com/gnolang/gno/tm2/pkg/bft/state" - "github.com/gnolang/gno/tm2/pkg/bft/state/txindex" - "github.com/gnolang/gno/tm2/pkg/bft/state/txindex/null" + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore" + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/null" "github.com/gnolang/gno/tm2/pkg/bft/store" "github.com/gnolang/gno/tm2/pkg/bft/types" tmtime "github.com/gnolang/gno/tm2/pkg/bft/types/time" @@ -155,18 +155,18 @@ type Node struct { isListening bool // services - evsw events.EventSwitch - stateDB dbm.DB - blockStore *store.BlockStore // store the blockchain to disk - bcReactor p2p.Reactor // for fast-syncing - mempoolReactor *mempl.Reactor // for gossipping transactions - mempool mempl.Mempool - consensusState *cs.ConsensusState // latest consensus state - consensusReactor *cs.ConsensusReactor // for participating in the consensus - proxyApp proxy.AppConns // connection to the application - rpcListeners []net.Listener // rpc servers - txIndexer txindex.TxIndexer - indexerService *txindex.IndexerService + evsw events.EventSwitch + stateDB dbm.DB + blockStore *store.BlockStore // store the blockchain to disk + bcReactor p2p.Reactor // for fast-syncing + mempoolReactor *mempl.Reactor // for gossipping transactions + mempool mempl.Mempool + consensusState *cs.ConsensusState // latest consensus state + consensusReactor *cs.ConsensusReactor // for participating in the consensus + proxyApp proxy.AppConns // connection to the application + rpcListeners []net.Listener // rpc servers + txEventStore eventstore.TxEventStore + eventStoreService *eventstore.Service } func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { @@ -194,36 +194,36 @@ func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.L return proxyApp, nil } -func createAndStartIndexerService( +func createAndStartEventStoreService( cfg *cfg.Config, evSwitch events.EventSwitch, logger log.Logger, -) (*txindex.IndexerService, txindex.TxIndexer, error) { +) (*eventstore.Service, eventstore.TxEventStore, error) { var ( - err error - txIndexer txindex.TxIndexer + err error + txEventStore eventstore.TxEventStore ) - // Instantiate the indexer based on the configuration - switch cfg.Indexer.IndexerType { - case file.IndexerType: - // Transaction indexes should be logged to files - txIndexer, err = file.NewTxIndexer(cfg.Indexer) + // Instantiate the event store based on the configuration + switch cfg.TxEventStore.EventStoreType { + case file.EventStoreType: + // Transaction events should be logged to files + txEventStore, err = file.NewTxEventStore(cfg.TxEventStore) if err != nil { - return nil, nil, fmt.Errorf("unable to create file tx indexer, %w", err) + return nil, nil, fmt.Errorf("unable to create file tx event store, %w", err) } default: - // Transaction indexing should be omitted - txIndexer = null.NewNullIndexer() + // Transaction event storing should be omitted + txEventStore = null.NewNullEventStore() } - indexerService := txindex.NewIndexerService(txIndexer, evSwitch) - indexerService.SetLogger(logger.With("module", "txindex")) + indexerService := eventstore.NewEventStoreService(txEventStore, evSwitch) + indexerService.SetLogger(logger.With("module", "eventstore")) if err = indexerService.Start(); err != nil { return nil, nil, err } - return indexerService, txIndexer, nil + return indexerService, txEventStore, nil } func doHandshake(stateDB dbm.DB, state sm.State, blockStore sm.BlockStore, @@ -432,14 +432,14 @@ func NewNode(config *cfg.Config, return nil, err } - // EventSwitch and IndexerService must be started before the handshake because - // we might need to index the txs of the replayed block as this might not have happened + // EventSwitch and EventStoreService must be started before the handshake because + // we might need to store the txs of the replayed block as this might not have happened // when the node stopped last time (i.e. the node stopped after it saved the block // but before it indexed the txs, or, endblocker panicked) evsw := events.NewEventSwitch() - // Transaction indexing - indexerService, txIndexer, err := createAndStartIndexerService(config, evsw, logger) + // Transaction event storing + eventStoreService, txEventStore, err := createAndStartEventStoreService(config, evsw, logger) if err != nil { return nil, err } @@ -501,7 +501,7 @@ func NewNode(config *cfg.Config, privValidator, fastSync, evsw, consensusLogger, ) - nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state) + nodeInfo, err := makeNodeInfo(config, nodeKey, txEventStore, genDoc, state) if err != nil { return nil, errors.Wrap(err, "error making NodeInfo") } @@ -542,17 +542,17 @@ func NewNode(config *cfg.Config, nodeInfo: nodeInfo, nodeKey: nodeKey, - evsw: evsw, - stateDB: stateDB, - blockStore: blockStore, - bcReactor: bcReactor, - mempoolReactor: mempoolReactor, - mempool: mempool, - consensusState: consensusState, - consensusReactor: consensusReactor, - proxyApp: proxyApp, - txIndexer: txIndexer, - indexerService: indexerService, + evsw: evsw, + stateDB: stateDB, + blockStore: blockStore, + bcReactor: bcReactor, + mempoolReactor: mempoolReactor, + mempool: mempool, + consensusState: consensusState, + consensusReactor: consensusReactor, + proxyApp: proxyApp, + txEventStore: txEventStore, + eventStoreService: eventStoreService, } node.BaseService = *service.NewBaseService(logger, "Node", node) @@ -627,7 +627,7 @@ func (n *Node) OnStop() { // first stop the non-reactor services n.evsw.Stop() - n.indexerService.Stop() + n.eventStoreService.Stop() // now stop the reactors n.sw.Stop() @@ -665,7 +665,7 @@ func (n *Node) ConfigureRPC() { rpccore.SetPubKey(pubKey) rpccore.SetGenesisDoc(n.genesisDoc) rpccore.SetProxyAppQuery(n.proxyApp.Query()) - rpccore.SetTxIndexer(n.txIndexer) + rpccore.SetTxEventStore(n.txEventStore) rpccore.SetConsensusReactor(n.consensusReactor) rpccore.SetLogger(n.Logger.With("module", "rpc")) rpccore.SetEventSwitch(n.evsw) @@ -840,12 +840,12 @@ func (n *Node) NodeInfo() p2p.NodeInfo { func makeNodeInfo( config *cfg.Config, nodeKey *p2p.NodeKey, - txIndexer txindex.TxIndexer, + txEventStore eventstore.TxEventStore, genDoc *types.GenesisDoc, state sm.State, ) (p2p.NodeInfo, error) { txIndexerStatus := p2p.IndexerStatusOff - if txIndexer.GetType() != null.IndexerType { + if txEventStore.GetType() != null.EventStoreType { txIndexerStatus = p2p.IndexerStatusOn } diff --git a/tm2/pkg/bft/rpc/core/pipe.go b/tm2/pkg/bft/rpc/core/pipe.go index fd6c8fc0692..a8b102d9ab7 100644 --- a/tm2/pkg/bft/rpc/core/pipe.go +++ b/tm2/pkg/bft/rpc/core/pipe.go @@ -10,7 +10,7 @@ import ( "github.com/gnolang/gno/tm2/pkg/bft/proxy" cfg "github.com/gnolang/gno/tm2/pkg/bft/rpc/config" sm "github.com/gnolang/gno/tm2/pkg/bft/state" - "github.com/gnolang/gno/tm2/pkg/bft/state/txindex" + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore" "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/gno/tm2/pkg/crypto" dbm "github.com/gnolang/gno/tm2/pkg/db" @@ -25,7 +25,7 @@ const ( maxPerPage = 100 ) -//---------------------------------------------- +// ---------------------------------------------- // These interfaces are used by RPC and must be thread safe type Consensus interface { @@ -50,7 +50,7 @@ type peers interface { Peers() p2p.IPeerSet } -//---------------------------------------------- +// ---------------------------------------------- // These package level globals come with setters // that are expected to be called only once, on startup @@ -68,7 +68,7 @@ var ( // objects pubKey crypto.PubKey genDoc *types.GenesisDoc // cache the genesis structure - txIndexer txindex.TxIndexer + txEventStore eventstore.TxEventStore consensusReactor *consensus.ConsensusReactor evsw events.EventSwitch gTxDispatcher *txDispatcher @@ -115,8 +115,8 @@ func SetProxyAppQuery(appConn proxy.AppConnQuery) { proxyAppQuery = appConn } -func SetTxIndexer(indexer txindex.TxIndexer) { - txIndexer = indexer +func SetTxEventStore(indexer eventstore.TxEventStore) { + txEventStore = indexer } func SetConsensusReactor(conR *consensus.ConsensusReactor) { diff --git a/tm2/pkg/bft/state/txindex/file/file.go b/tm2/pkg/bft/state/eventstore/file/file.go similarity index 54% rename from tm2/pkg/bft/state/txindex/file/file.go rename to tm2/pkg/bft/state/eventstore/file/file.go index b703b0f2090..46826096f2a 100644 --- a/tm2/pkg/bft/state/txindex/file/file.go +++ b/tm2/pkg/bft/state/eventstore/file/file.go @@ -5,32 +5,32 @@ import ( "github.com/gnolang/gno/tm2/pkg/amino" "github.com/gnolang/gno/tm2/pkg/autofile" - "github.com/gnolang/gno/tm2/pkg/bft/state/txindex/config" + storetypes "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/types" "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/gno/tm2/pkg/errors" ) const ( - IndexerType = "file" - Path = "path" + EventStoreType = "file" + Path = "path" ) var ( errMissingPath = errors.New("missing path param") - errInvalidType = errors.New("invalid config for file indexer specified") + errInvalidType = errors.New("invalid config for file event store specified") ) -// TxIndexer is the implementation of a transaction indexer +// TxEventStore is the implementation of a transaction event store // that outputs to the local filesystem -type TxIndexer struct { +type TxEventStore struct { headPath string group *autofile.Group } -// NewTxIndexer creates a new file-based tx indexer -func NewTxIndexer(cfg *config.Config) (*TxIndexer, error) { +// NewTxEventStore creates a new file-based tx event store +func NewTxEventStore(cfg *storetypes.Config) (*TxEventStore, error) { // Parse config params - if IndexerType != cfg.IndexerType { + if EventStoreType != cfg.EventStoreType { return nil, errInvalidType } @@ -39,13 +39,13 @@ func NewTxIndexer(cfg *config.Config) (*TxIndexer, error) { return nil, errMissingPath } - return &TxIndexer{ + return &TxEventStore{ headPath: headPath, }, nil } -// Start starts the file transaction indexer, by opening the autofile group -func (t *TxIndexer) Start() error { +// Start starts the file transaction event store, by opening the autofile group +func (t *TxEventStore) Start() error { // Open the group group, err := autofile.OpenGroup(t.headPath) if err != nil { @@ -57,21 +57,21 @@ func (t *TxIndexer) Start() error { return nil } -// Stop stops the file transaction indexer, by closing the autofile group -func (t *TxIndexer) Stop() error { +// Stop stops the file transaction event store, by closing the autofile group +func (t *TxEventStore) Stop() error { // Close off the group t.group.Close() return nil } -// GetType returns the file transaction indexer type -func (t *TxIndexer) GetType() string { - return IndexerType +// GetType returns the file transaction event store type +func (t *TxEventStore) GetType() string { + return EventStoreType } // Index marshals the transaction using amino, and writes it to the disk -func (t *TxIndexer) Index(tx types.TxResult) error { +func (t *TxEventStore) Index(tx types.TxResult) error { // Serialize the transaction using amino txRaw, err := amino.MarshalJSON(tx) if err != nil { @@ -80,12 +80,12 @@ func (t *TxIndexer) Index(tx types.TxResult) error { // Write the serialized transaction info to the file group if err = t.group.WriteLine(string(txRaw)); err != nil { - return fmt.Errorf("unable to save transaction index, %w", err) + return fmt.Errorf("unable to save transaction event, %w", err) } // Flush output to storage if err := t.group.FlushAndSync(); err != nil { - return fmt.Errorf("unable to flush and sync transaction index, %w", err) + return fmt.Errorf("unable to flush and sync transaction event, %w", err) } return nil diff --git a/tm2/pkg/bft/state/txindex/file/file_test.go b/tm2/pkg/bft/state/eventstore/file/file_test.go similarity index 59% rename from tm2/pkg/bft/state/txindex/file/file_test.go rename to tm2/pkg/bft/state/eventstore/file/file_test.go index bb86dcc8867..bab520e2e34 100644 --- a/tm2/pkg/bft/state/txindex/file/file_test.go +++ b/tm2/pkg/bft/state/eventstore/file/file_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/gnolang/gno/tm2/pkg/amino" - "github.com/gnolang/gno/tm2/pkg/bft/state/txindex/config" + storetypes "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/types" "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/gno/tm2/pkg/testutils" "github.com/stretchr/testify/assert" @@ -22,17 +22,17 @@ func generateTestTransactions(count int) []types.TxResult { return txs } -func TestTxIndexer_New(t *testing.T) { +func TestTxEventStore_New(t *testing.T) { t.Parallel() t.Run("invalid file path specified", func(t *testing.T) { t.Parallel() - cfg := &config.Config{ - IndexerType: "invalid", + cfg := &storetypes.Config{ + EventStoreType: "invalid", } - i, err := NewTxIndexer(cfg) + i, err := NewTxEventStore(cfg) assert.Nil(t, i) assert.ErrorIs(t, err, errInvalidType) @@ -41,12 +41,12 @@ func TestTxIndexer_New(t *testing.T) { t.Run("invalid file path specified", func(t *testing.T) { t.Parallel() - cfg := &config.Config{ - IndexerType: IndexerType, - Params: nil, + cfg := &storetypes.Config{ + EventStoreType: EventStoreType, + Params: nil, } - i, err := NewTxIndexer(cfg) + i, err := NewTxEventStore(cfg) assert.Nil(t, i) assert.ErrorIs(t, err, errMissingPath) @@ -57,25 +57,25 @@ func TestTxIndexer_New(t *testing.T) { headPath := "." - cfg := &config.Config{ - IndexerType: IndexerType, + cfg := &storetypes.Config{ + EventStoreType: EventStoreType, Params: map[string]any{ Path: headPath, }, } - i, err := NewTxIndexer(cfg) + i, err := NewTxEventStore(cfg) if i == nil { - t.Fatalf("unable to create indexer") + t.Fatalf("unable to create event store") } assert.NoError(t, err) assert.Equal(t, headPath, i.headPath) - assert.Equal(t, IndexerType, i.GetType()) + assert.Equal(t, EventStoreType, i.GetType()) }) } -func TestTxIndexer_Index(t *testing.T) { +func TestTxEventStore_Index(t *testing.T) { t.Parallel() headFile, cleanup := testutils.NewTestFile(t) @@ -83,25 +83,25 @@ func TestTxIndexer_Index(t *testing.T) { cleanup() }) - indexer, err := NewTxIndexer(&config.Config{ - IndexerType: IndexerType, + eventStore, err := NewTxEventStore(&storetypes.Config{ + EventStoreType: EventStoreType, Params: map[string]any{ Path: headFile.Name(), }, }) if err != nil { - t.Fatalf("unable to create tx indexer, %v", err) + t.Fatalf("unable to create tx event store, %v", err) } - // Start the indexer - if err = indexer.Start(); err != nil { - t.Fatalf("unable to start indexer, %v", err) + // Start the event store + if err = eventStore.Start(); err != nil { + t.Fatalf("unable to start event store, %v", err) } t.Cleanup(func() { - // Stop the indexer - if err = indexer.Stop(); err != nil { - t.Fatalf("unable to stop indexer gracefully, %v", err) + // Stop the event store + if err = eventStore.Stop(); err != nil { + t.Fatalf("unable to stop event store gracefully, %v", err) } }) @@ -109,13 +109,13 @@ func TestTxIndexer_Index(t *testing.T) { txs := generateTestTransactions(numTxs) for _, tx := range txs { - if err = indexer.Index(tx); err != nil { - t.Fatalf("unable to index transaction, %v", err) + if err = eventStore.Index(tx); err != nil { + t.Fatalf("unable to store transaction, %v", err) } } // Make sure the file group's size is valid - if indexer.group.ReadGroupInfo().TotalSize == 0 { + if eventStore.group.ReadGroupInfo().TotalSize == 0 { t.Fatalf("invalid group size") } @@ -128,7 +128,7 @@ func TestTxIndexer_Index(t *testing.T) { var txRes types.TxResult if err = amino.UnmarshalJSON(line, &txRes); err != nil { - t.Fatalf("unable to read indexer line") + t.Fatalf("unable to read store line") } assert.Equal(t, txs[linesRead], txRes) diff --git a/tm2/pkg/bft/state/eventstore/indexer.go b/tm2/pkg/bft/state/eventstore/indexer.go new file mode 100644 index 00000000000..a8f5d6e0090 --- /dev/null +++ b/tm2/pkg/bft/state/eventstore/indexer.go @@ -0,0 +1,18 @@ +package eventstore + +import "github.com/gnolang/gno/tm2/pkg/bft/types" + +// TxEventStore stores transaction events for later processing +type TxEventStore interface { + // Start starts the transaction event store + Start() error + + // Stop stops the transaction event store + Stop() error + + // GetType returns the event store type + GetType() string + + // Index analyzes, indexes and stores a single transaction + Index(result types.TxResult) error +} diff --git a/tm2/pkg/bft/state/eventstore/indexer_service.go b/tm2/pkg/bft/state/eventstore/indexer_service.go new file mode 100644 index 00000000000..309329bdeca --- /dev/null +++ b/tm2/pkg/bft/state/eventstore/indexer_service.go @@ -0,0 +1,84 @@ +package eventstore + +import ( + "context" + "fmt" + + "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/gnolang/gno/tm2/pkg/events" + "github.com/gnolang/gno/tm2/pkg/service" +) + +// Service connects the event bus and event store together in order +// to store events coming from event bus +type Service struct { + service.BaseService + + cancelFn context.CancelFunc + + txEventStore TxEventStore + evSwitch events.EventSwitch +} + +// NewEventStoreService returns a new service instance +func NewEventStoreService(idr TxEventStore, evSwitch events.EventSwitch) *Service { + is := &Service{txEventStore: idr, evSwitch: evSwitch} + is.BaseService = *service.NewBaseService(nil, "EventStoreService", is) + + return is +} + +func (is *Service) OnStart() error { + // Create a context for the intermediary monitor service + ctx, cancelFn := context.WithCancel(context.Background()) + is.cancelFn = cancelFn + + // Start the event store + if err := is.txEventStore.Start(); err != nil { + return fmt.Errorf("unable to start transaction event store, %w", err) + } + + // Start the intermediary monitor service + go is.monitorTxEvents(ctx) + + return nil +} + +func (is *Service) OnStop() { + // Close off any routines + is.cancelFn() + + // Attempt to gracefully stop the event store + if err := is.txEventStore.Stop(); err != nil { + is.Logger.Error( + fmt.Sprintf("unable to gracefully stop event store, %v", err), + ) + } +} + +// monitorTxEvents acts as an intermediary feed service for the supplied +// event store. It relays transaction events that come from the event stream +func (is *Service) monitorTxEvents(ctx context.Context) { + // Create a subscription for transaction events + subCh := events.SubscribeToEvent(is.evSwitch, "tx-event-store", types.EventTx{}) + + for { + select { + case <-ctx.Done(): + return + case evRaw := <-subCh: + // Cast the event + ev, ok := evRaw.(types.EventTx) + if !ok { + is.Logger.Error("invalid transaction result type cast") + + continue + } + + // Alert the actual tx event store + if err := is.txEventStore.Index(ev.Result); err != nil { + is.Logger.Error("unable to store transaction", "err", err) + } + } + } +} diff --git a/tm2/pkg/bft/state/txindex/indexer_service_test.go b/tm2/pkg/bft/state/eventstore/indexer_service_test.go similarity index 86% rename from tm2/pkg/bft/state/txindex/indexer_service_test.go rename to tm2/pkg/bft/state/eventstore/indexer_service_test.go index 24df1f05ade..8da445477d4 100644 --- a/tm2/pkg/bft/state/txindex/indexer_service_test.go +++ b/tm2/pkg/bft/state/eventstore/indexer_service_test.go @@ -1,4 +1,4 @@ -package txindex +package eventstore import ( "sync" @@ -24,7 +24,7 @@ func generateTxEvents(count int) []types.EventTx { return txEvents } -func TestIndexerService_Monitor(t *testing.T) { +func TestEventStoreService_Monitor(t *testing.T) { t.Parallel() const defaultTimeout = 5 * time.Second @@ -38,7 +38,7 @@ func TestIndexerService_Monitor(t *testing.T) { cb events.EventCallback cbSet atomic.Bool - mockIndexer = &mockIndexer{ + mockEventStore = &mockEventStore{ startFn: func() error { startCalled = true @@ -73,28 +73,27 @@ func TestIndexerService_Monitor(t *testing.T) { } ) - // Create a new indexer instance - i := NewIndexerService(mockIndexer, mockEventSwitch) + // Create a new event store instance + i := NewEventStoreService(mockEventStore, mockEventSwitch) if i == nil { - t.Fatal("unable to create indexer service") + t.Fatal("unable to create event store service") } - // Start the indexer + // Start the event store if err := i.OnStart(); err != nil { - t.Fatalf("unable to start indexer, %v", err) + t.Fatalf("unable to start event store, %v", err) } assert.True(t, startCalled) t.Cleanup(func() { - // Stop the indexer + // Stop the event store i.OnStop() assert.True(t, stopCalled) }) - // Fire off the events so the indexer can catch them - + // Fire off the events so the event store can catch them numEvents := 1000 txEvents := generateTxEvents(numEvents) diff --git a/tm2/pkg/bft/state/txindex/mock_test.go b/tm2/pkg/bft/state/eventstore/mock_test.go similarity index 84% rename from tm2/pkg/bft/state/txindex/mock_test.go rename to tm2/pkg/bft/state/eventstore/mock_test.go index 2e60a321e47..f460423f1fa 100644 --- a/tm2/pkg/bft/state/txindex/mock_test.go +++ b/tm2/pkg/bft/state/eventstore/mock_test.go @@ -1,4 +1,4 @@ -package txindex +package eventstore import ( "github.com/gnolang/gno/tm2/pkg/bft/types" @@ -6,7 +6,7 @@ import ( "github.com/gnolang/gno/tm2/pkg/service" ) -// TxIndexer // +// TxEventStore // type ( startDelegate func() error @@ -15,14 +15,14 @@ type ( indexDelegate func(types.TxResult) error ) -type mockIndexer struct { +type mockEventStore struct { startFn startDelegate stopFn stopDelegate getTypeFn getTypeDelegate indexFn indexDelegate } -func (m mockIndexer) Start() error { +func (m mockEventStore) Start() error { if m.startFn != nil { return m.startFn() } @@ -30,7 +30,7 @@ func (m mockIndexer) Start() error { return nil } -func (m mockIndexer) Stop() error { +func (m mockEventStore) Stop() error { if m.stopFn != nil { return m.stopFn() } @@ -38,7 +38,7 @@ func (m mockIndexer) Stop() error { return nil } -func (m mockIndexer) GetType() string { +func (m mockEventStore) GetType() string { if m.getTypeFn != nil { return m.getTypeFn() } @@ -46,7 +46,7 @@ func (m mockIndexer) GetType() string { return "" } -func (m mockIndexer) Index(result types.TxResult) error { +func (m mockEventStore) Index(result types.TxResult) error { if m.indexFn != nil { return m.indexFn(result) } diff --git a/tm2/pkg/bft/state/eventstore/null/null.go b/tm2/pkg/bft/state/eventstore/null/null.go new file mode 100644 index 00000000000..bb6574be4ed --- /dev/null +++ b/tm2/pkg/bft/state/eventstore/null/null.go @@ -0,0 +1,35 @@ +package null + +import ( + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore" + "github.com/gnolang/gno/tm2/pkg/bft/types" +) + +var _ eventstore.TxEventStore = (*TxEventStore)(nil) + +const ( + EventStoreType = "none" +) + +// TxEventStore acts as a /dev/null +type TxEventStore struct{} + +func NewNullEventStore() *TxEventStore { + return &TxEventStore{} +} + +func (t TxEventStore) Start() error { + return nil +} + +func (t TxEventStore) Stop() error { + return nil +} + +func (t TxEventStore) Index(_ types.TxResult) error { + return nil +} + +func (t TxEventStore) GetType() string { + return EventStoreType +} diff --git a/tm2/pkg/bft/state/eventstore/types/config.go b/tm2/pkg/bft/state/eventstore/types/config.go new file mode 100644 index 00000000000..08e25870b4d --- /dev/null +++ b/tm2/pkg/bft/state/eventstore/types/config.go @@ -0,0 +1,29 @@ +package types + +import "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/null" + +// EventStoreParams defines the arbitrary event store config params +type EventStoreParams map[string]any + +// Config defines the specific event store configuration +type Config struct { + EventStoreType string + Params EventStoreParams +} + +// GetParam fetches the specific config param, if any. +// Returns nil if the param is not present +func (c *Config) GetParam(name string) any { + if c.Params != nil { + return c.Params[name] + } + + return nil +} + +// DefaultEventStoreConfig returns the default event store config +func DefaultEventStoreConfig() *Config { + return &Config{ + EventStoreType: null.EventStoreType, + } +} diff --git a/tm2/pkg/bft/state/txindex/config/config_test.go b/tm2/pkg/bft/state/eventstore/types/config_test.go similarity index 97% rename from tm2/pkg/bft/state/txindex/config/config_test.go rename to tm2/pkg/bft/state/eventstore/types/config_test.go index 19ce61d824b..0f5683b7c61 100644 --- a/tm2/pkg/bft/state/txindex/config/config_test.go +++ b/tm2/pkg/bft/state/eventstore/types/config_test.go @@ -1,4 +1,4 @@ -package config +package types import ( "testing" diff --git a/tm2/pkg/bft/state/txindex/config/config.go b/tm2/pkg/bft/state/txindex/config/config.go deleted file mode 100644 index 1aa9c017e70..00000000000 --- a/tm2/pkg/bft/state/txindex/config/config.go +++ /dev/null @@ -1,30 +0,0 @@ -package config - -import "github.com/gnolang/gno/tm2/pkg/bft/state/txindex/null" - -// IndexerParams defines the arbitrary indexer config params -type IndexerParams map[string]any - -// Config defines the specific transaction -// indexer configuration -type Config struct { - IndexerType string - Params IndexerParams -} - -// GetParam fetches the specific config param, if any. -// Returns nil if the param is not present -func (c *Config) GetParam(name string) any { - if c.Params != nil { - return c.Params[name] - } - - return nil -} - -// DefaultIndexerConfig returns the default indexer config -func DefaultIndexerConfig() *Config { - return &Config{ - IndexerType: null.IndexerType, - } -} diff --git a/tm2/pkg/bft/state/txindex/indexer.go b/tm2/pkg/bft/state/txindex/indexer.go deleted file mode 100644 index ecaa247160f..00000000000 --- a/tm2/pkg/bft/state/txindex/indexer.go +++ /dev/null @@ -1,18 +0,0 @@ -package txindex - -import "github.com/gnolang/gno/tm2/pkg/bft/types" - -// TxIndexer indexes transactions for later processing -type TxIndexer interface { - // Start starts the transaction indexer - Start() error - - // Stop stops the transaction indexer - Stop() error - - // GetType returns the indexer type - GetType() string - - // Index analyzes, indexes and stores a single transaction - Index(result types.TxResult) error -} diff --git a/tm2/pkg/bft/state/txindex/indexer_service.go b/tm2/pkg/bft/state/txindex/indexer_service.go deleted file mode 100644 index 3cadf0328db..00000000000 --- a/tm2/pkg/bft/state/txindex/indexer_service.go +++ /dev/null @@ -1,84 +0,0 @@ -package txindex - -import ( - "context" - "fmt" - - "github.com/gnolang/gno/tm2/pkg/bft/types" - "github.com/gnolang/gno/tm2/pkg/events" - "github.com/gnolang/gno/tm2/pkg/service" -) - -// IndexerService connects event bus and transaction indexer together in order -// to index transactions coming from event bus. -type IndexerService struct { - service.BaseService - - cancelFn context.CancelFunc - - indexer TxIndexer - evSwitch events.EventSwitch -} - -// NewIndexerService returns a new service instance. -func NewIndexerService(idr TxIndexer, evSwitch events.EventSwitch) *IndexerService { - is := &IndexerService{indexer: idr, evSwitch: evSwitch} - is.BaseService = *service.NewBaseService(nil, "IndexerService", is) - - return is -} - -func (is *IndexerService) OnStart() error { - // Create a context for the intermediary monitor service - ctx, cancelFn := context.WithCancel(context.Background()) - is.cancelFn = cancelFn - - // Start the indexer - if err := is.indexer.Start(); err != nil { - return fmt.Errorf("unable to start transaction indexer, %w", err) - } - - // Start the intermediary monitor service - go is.monitorTxEvents(ctx) - - return nil -} - -func (is *IndexerService) OnStop() { - // Close off any routines - is.cancelFn() - - // Attempt to gracefully stop the transaction indexer - if err := is.indexer.Stop(); err != nil { - is.Logger.Error( - fmt.Sprintf("unable to gracefully stop transaction indexer, %v", err), - ) - } -} - -// monitorTxEvents acts as an intermediary feed service for the supplied -// transaction indexer. It relays transaction events that come from the event stream -func (is *IndexerService) monitorTxEvents(ctx context.Context) { - // Create a subscription for transaction events - subCh := events.SubscribeToEvent(is.evSwitch, "tx-indexer", types.EventTx{}) - - for { - select { - case <-ctx.Done(): - return - case evRaw := <-subCh: - // Cast the event - ev, ok := evRaw.(types.EventTx) - if !ok { - is.Logger.Error("invalid transaction result type cast") - - continue - } - - // Alert the actual indexer - if err := is.indexer.Index(ev.Result); err != nil { - is.Logger.Error("unable to index transaction", "err", err) - } - } - } -} diff --git a/tm2/pkg/bft/state/txindex/null/null.go b/tm2/pkg/bft/state/txindex/null/null.go deleted file mode 100644 index c11e1c5068b..00000000000 --- a/tm2/pkg/bft/state/txindex/null/null.go +++ /dev/null @@ -1,35 +0,0 @@ -package null - -import ( - "github.com/gnolang/gno/tm2/pkg/bft/state/txindex" - "github.com/gnolang/gno/tm2/pkg/bft/types" -) - -var _ txindex.TxIndexer = (*TxIndexer)(nil) - -const ( - IndexerType = "none" -) - -// TxIndexer acts as a /dev/null -type TxIndexer struct{} - -func NewNullIndexer() *TxIndexer { - return &TxIndexer{} -} - -func (t TxIndexer) Start() error { - return nil -} - -func (t TxIndexer) Stop() error { - return nil -} - -func (t TxIndexer) Index(_ types.TxResult) error { - return nil -} - -func (t TxIndexer) GetType() string { - return IndexerType -} From 16f07c81b15480b090f16a90e5e38b718372b63a Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Fri, 7 Jul 2023 11:50:23 +0200 Subject: [PATCH 22/28] Rename event switch reference --- tm2/pkg/bft/node/node.go | 4 ++-- tm2/pkg/bft/state/eventstore/indexer_service.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tm2/pkg/bft/node/node.go b/tm2/pkg/bft/node/node.go index 11a04261999..e60e827aa53 100644 --- a/tm2/pkg/bft/node/node.go +++ b/tm2/pkg/bft/node/node.go @@ -196,7 +196,7 @@ func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.L func createAndStartEventStoreService( cfg *cfg.Config, - evSwitch events.EventSwitch, + evsw events.EventSwitch, logger log.Logger, ) (*eventstore.Service, eventstore.TxEventStore, error) { var ( @@ -217,7 +217,7 @@ func createAndStartEventStoreService( txEventStore = null.NewNullEventStore() } - indexerService := eventstore.NewEventStoreService(txEventStore, evSwitch) + indexerService := eventstore.NewEventStoreService(txEventStore, evsw) indexerService.SetLogger(logger.With("module", "eventstore")) if err = indexerService.Start(); err != nil { return nil, nil, err diff --git a/tm2/pkg/bft/state/eventstore/indexer_service.go b/tm2/pkg/bft/state/eventstore/indexer_service.go index 309329bdeca..cf3d9333069 100644 --- a/tm2/pkg/bft/state/eventstore/indexer_service.go +++ b/tm2/pkg/bft/state/eventstore/indexer_service.go @@ -17,12 +17,12 @@ type Service struct { cancelFn context.CancelFunc txEventStore TxEventStore - evSwitch events.EventSwitch + evsw events.EventSwitch } // NewEventStoreService returns a new service instance -func NewEventStoreService(idr TxEventStore, evSwitch events.EventSwitch) *Service { - is := &Service{txEventStore: idr, evSwitch: evSwitch} +func NewEventStoreService(idr TxEventStore, evsw events.EventSwitch) *Service { + is := &Service{txEventStore: idr, evsw: evsw} is.BaseService = *service.NewBaseService(nil, "EventStoreService", is) return is @@ -60,7 +60,7 @@ func (is *Service) OnStop() { // event store. It relays transaction events that come from the event stream func (is *Service) monitorTxEvents(ctx context.Context) { // Create a subscription for transaction events - subCh := events.SubscribeToEvent(is.evSwitch, "tx-event-store", types.EventTx{}) + subCh := events.SubscribeToEvent(is.evsw, "tx-event-store", types.EventTx{}) for { select { From b7c1dca367b7df33a93b46208c2c6ac5553dfac6 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Fri, 7 Jul 2023 11:51:16 +0200 Subject: [PATCH 23/28] Revert error assignment --- tm2/pkg/bft/node/node.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tm2/pkg/bft/node/node.go b/tm2/pkg/bft/node/node.go index e60e827aa53..7a37d81c6c8 100644 --- a/tm2/pkg/bft/node/node.go +++ b/tm2/pkg/bft/node/node.go @@ -219,7 +219,7 @@ func createAndStartEventStoreService( indexerService := eventstore.NewEventStoreService(txEventStore, evsw) indexerService.SetLogger(logger.With("module", "eventstore")) - if err = indexerService.Start(); err != nil { + if err := indexerService.Start(); err != nil { return nil, nil, err } From 20e9a5c2b98ff97f800b494af6c2492f11a2c6ea Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Fri, 7 Jul 2023 11:59:13 +0200 Subject: [PATCH 24/28] Move event store status to the event store package --- tm2/pkg/bft/node/node.go | 4 ++-- tm2/pkg/bft/state/eventstore/indexer.go | 5 +++++ tm2/pkg/p2p/node_info.go | 8 ++------ 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/tm2/pkg/bft/node/node.go b/tm2/pkg/bft/node/node.go index 7a37d81c6c8..dcca3bb8701 100644 --- a/tm2/pkg/bft/node/node.go +++ b/tm2/pkg/bft/node/node.go @@ -844,9 +844,9 @@ func makeNodeInfo( genDoc *types.GenesisDoc, state sm.State, ) (p2p.NodeInfo, error) { - txIndexerStatus := p2p.IndexerStatusOff + txIndexerStatus := eventstore.StatusOff if txEventStore.GetType() != null.EventStoreType { - txIndexerStatus = p2p.IndexerStatusOn + txIndexerStatus = eventstore.StatusOn } bcChannel := bc.BlockchainChannel diff --git a/tm2/pkg/bft/state/eventstore/indexer.go b/tm2/pkg/bft/state/eventstore/indexer.go index a8f5d6e0090..115cd700b60 100644 --- a/tm2/pkg/bft/state/eventstore/indexer.go +++ b/tm2/pkg/bft/state/eventstore/indexer.go @@ -2,6 +2,11 @@ package eventstore import "github.com/gnolang/gno/tm2/pkg/bft/types" +const ( + StatusOn = "on" + StatusOff = "off" +) + // TxEventStore stores transaction events for later processing type TxEventStore interface { // Start starts the transaction event store diff --git a/tm2/pkg/p2p/node_info.go b/tm2/pkg/p2p/node_info.go index 4d1498414f0..48ba8f7776b 100644 --- a/tm2/pkg/p2p/node_info.go +++ b/tm2/pkg/p2p/node_info.go @@ -3,6 +3,7 @@ package p2p import ( "fmt" + "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore" "github.com/gnolang/gno/tm2/pkg/strings" "github.com/gnolang/gno/tm2/pkg/versionset" ) @@ -12,11 +13,6 @@ const ( maxNumChannels = 16 // plenty of room for upgrades, for now ) -const ( - IndexerStatusOn = "on" - IndexerStatusOff = "off" -) - // Max size of the NodeInfo struct func MaxNodeInfoSize() int { return maxNodeInfoSize @@ -105,7 +101,7 @@ func (info NodeInfo) Validate() error { other := info.Other txIndex := other.TxIndex switch txIndex { - case "", IndexerStatusOn, IndexerStatusOff: + case "", eventstore.StatusOn, eventstore.StatusOff: default: return fmt.Errorf("info.Other.TxIndex should be either 'on', 'off', or empty string, got '%v'", txIndex) } From c865287fb62c5cb92cf0d551afbc6fa9cbe23d70 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Fri, 7 Jul 2023 12:05:56 +0200 Subject: [PATCH 25/28] Rename event store interface method --- tm2/pkg/bft/state/eventstore/file/file.go | 4 ++-- tm2/pkg/bft/state/eventstore/file/file_test.go | 4 ++-- tm2/pkg/bft/state/eventstore/indexer.go | 5 +++-- tm2/pkg/bft/state/eventstore/indexer_service.go | 2 +- tm2/pkg/bft/state/eventstore/indexer_service_test.go | 2 +- tm2/pkg/bft/state/eventstore/mock_test.go | 10 +++++----- tm2/pkg/bft/state/eventstore/null/null.go | 2 +- 7 files changed, 15 insertions(+), 14 deletions(-) diff --git a/tm2/pkg/bft/state/eventstore/file/file.go b/tm2/pkg/bft/state/eventstore/file/file.go index 46826096f2a..f4cd74721f5 100644 --- a/tm2/pkg/bft/state/eventstore/file/file.go +++ b/tm2/pkg/bft/state/eventstore/file/file.go @@ -70,8 +70,8 @@ func (t *TxEventStore) GetType() string { return EventStoreType } -// Index marshals the transaction using amino, and writes it to the disk -func (t *TxEventStore) Index(tx types.TxResult) error { +// Append marshals the transaction using amino, and writes it to the disk +func (t *TxEventStore) Append(tx types.TxResult) error { // Serialize the transaction using amino txRaw, err := amino.MarshalJSON(tx) if err != nil { diff --git a/tm2/pkg/bft/state/eventstore/file/file_test.go b/tm2/pkg/bft/state/eventstore/file/file_test.go index bab520e2e34..46d87582ce4 100644 --- a/tm2/pkg/bft/state/eventstore/file/file_test.go +++ b/tm2/pkg/bft/state/eventstore/file/file_test.go @@ -75,7 +75,7 @@ func TestTxEventStore_New(t *testing.T) { }) } -func TestTxEventStore_Index(t *testing.T) { +func TestTxEventStore_Append(t *testing.T) { t.Parallel() headFile, cleanup := testutils.NewTestFile(t) @@ -109,7 +109,7 @@ func TestTxEventStore_Index(t *testing.T) { txs := generateTestTransactions(numTxs) for _, tx := range txs { - if err = eventStore.Index(tx); err != nil { + if err = eventStore.Append(tx); err != nil { t.Fatalf("unable to store transaction, %v", err) } } diff --git a/tm2/pkg/bft/state/eventstore/indexer.go b/tm2/pkg/bft/state/eventstore/indexer.go index 115cd700b60..10ef9eefc9b 100644 --- a/tm2/pkg/bft/state/eventstore/indexer.go +++ b/tm2/pkg/bft/state/eventstore/indexer.go @@ -18,6 +18,7 @@ type TxEventStore interface { // GetType returns the event store type GetType() string - // Index analyzes, indexes and stores a single transaction - Index(result types.TxResult) error + // Append analyzes and appends a single transaction + // to the event store + Append(result types.TxResult) error } diff --git a/tm2/pkg/bft/state/eventstore/indexer_service.go b/tm2/pkg/bft/state/eventstore/indexer_service.go index cf3d9333069..d6ed40c4151 100644 --- a/tm2/pkg/bft/state/eventstore/indexer_service.go +++ b/tm2/pkg/bft/state/eventstore/indexer_service.go @@ -76,7 +76,7 @@ func (is *Service) monitorTxEvents(ctx context.Context) { } // Alert the actual tx event store - if err := is.txEventStore.Index(ev.Result); err != nil { + if err := is.txEventStore.Append(ev.Result); err != nil { is.Logger.Error("unable to store transaction", "err", err) } } diff --git a/tm2/pkg/bft/state/eventstore/indexer_service_test.go b/tm2/pkg/bft/state/eventstore/indexer_service_test.go index 8da445477d4..3fa5e8a7941 100644 --- a/tm2/pkg/bft/state/eventstore/indexer_service_test.go +++ b/tm2/pkg/bft/state/eventstore/indexer_service_test.go @@ -49,7 +49,7 @@ func TestEventStoreService_Monitor(t *testing.T) { return nil }, - indexFn: func(result types.TxResult) error { + appendFn: func(result types.TxResult) error { receivedResults = append(receivedResults, result) // Atomic because we are accessing this size from a routine diff --git a/tm2/pkg/bft/state/eventstore/mock_test.go b/tm2/pkg/bft/state/eventstore/mock_test.go index f460423f1fa..087d8f6e3e9 100644 --- a/tm2/pkg/bft/state/eventstore/mock_test.go +++ b/tm2/pkg/bft/state/eventstore/mock_test.go @@ -12,14 +12,14 @@ type ( startDelegate func() error stopDelegate func() error getTypeDelegate func() string - indexDelegate func(types.TxResult) error + appendDelegate func(types.TxResult) error ) type mockEventStore struct { startFn startDelegate stopFn stopDelegate getTypeFn getTypeDelegate - indexFn indexDelegate + appendFn appendDelegate } func (m mockEventStore) Start() error { @@ -46,9 +46,9 @@ func (m mockEventStore) GetType() string { return "" } -func (m mockEventStore) Index(result types.TxResult) error { - if m.indexFn != nil { - return m.indexFn(result) +func (m mockEventStore) Append(result types.TxResult) error { + if m.appendFn != nil { + return m.appendFn(result) } return nil diff --git a/tm2/pkg/bft/state/eventstore/null/null.go b/tm2/pkg/bft/state/eventstore/null/null.go index bb6574be4ed..40e3566d89e 100644 --- a/tm2/pkg/bft/state/eventstore/null/null.go +++ b/tm2/pkg/bft/state/eventstore/null/null.go @@ -26,7 +26,7 @@ func (t TxEventStore) Stop() error { return nil } -func (t TxEventStore) Index(_ types.TxResult) error { +func (t TxEventStore) Append(_ types.TxResult) error { return nil } From 10c248305a9f9a45e41446284728642f467d83cb Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Fri, 7 Jul 2023 12:08:11 +0200 Subject: [PATCH 26/28] Rename event store files --- tm2/pkg/bft/state/eventstore/{indexer.go => store.go} | 0 .../bft/state/eventstore/{indexer_service.go => store_service.go} | 0 .../eventstore/{indexer_service_test.go => store_service_test.go} | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename tm2/pkg/bft/state/eventstore/{indexer.go => store.go} (100%) rename tm2/pkg/bft/state/eventstore/{indexer_service.go => store_service.go} (100%) rename tm2/pkg/bft/state/eventstore/{indexer_service_test.go => store_service_test.go} (100%) diff --git a/tm2/pkg/bft/state/eventstore/indexer.go b/tm2/pkg/bft/state/eventstore/store.go similarity index 100% rename from tm2/pkg/bft/state/eventstore/indexer.go rename to tm2/pkg/bft/state/eventstore/store.go diff --git a/tm2/pkg/bft/state/eventstore/indexer_service.go b/tm2/pkg/bft/state/eventstore/store_service.go similarity index 100% rename from tm2/pkg/bft/state/eventstore/indexer_service.go rename to tm2/pkg/bft/state/eventstore/store_service.go diff --git a/tm2/pkg/bft/state/eventstore/indexer_service_test.go b/tm2/pkg/bft/state/eventstore/store_service_test.go similarity index 100% rename from tm2/pkg/bft/state/eventstore/indexer_service_test.go rename to tm2/pkg/bft/state/eventstore/store_service_test.go From 2f9cf606822b7c2ebe3163f58825ec704b58b5a7 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Thu, 13 Jul 2023 14:23:29 +0200 Subject: [PATCH 27/28] Resolve renamed cfg reference --- gno.land/cmd/gnoland/start.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gno.land/cmd/gnoland/start.go b/gno.land/cmd/gnoland/start.go index 332f3c9b232..b3b9937cf46 100644 --- a/gno.land/cmd/gnoland/start.go +++ b/gno.land/cmd/gnoland/start.go @@ -219,7 +219,7 @@ func execStart(c *startCfg, args []string, io *commands.IO) error { } // getTxEventStoreConfig constructs an event store config from provided user options -func getTxEventStoreConfig(c *gnolandCfg) (*eventstorecfg.Config, error) { +func getTxEventStoreConfig(c *startCfg) (*eventstorecfg.Config, error) { var cfg *eventstorecfg.Config switch c.txEventStoreType { From 3204a38851fcca13f9557ac972c4360a3713ca63 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Thu, 17 Aug 2023 14:32:48 +0200 Subject: [PATCH 28/28] Resolve typo in help output --- gno.land/cmd/gnoland/start.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gno.land/cmd/gnoland/start.go b/gno.land/cmd/gnoland/start.go index 6d7f336f6e7..5ed7d75efe6 100644 --- a/gno.land/cmd/gnoland/start.go +++ b/gno.land/cmd/gnoland/start.go @@ -144,7 +144,7 @@ func (c *startCfg) RegisterFlags(fs *flag.FlagSet) { &c.txEventStorePath, "tx-event-store-path", "", - fmt.Sprintf("path for the file tx event store (required if event store if '%s')", file.EventStoreType), + fmt.Sprintf("path for the file tx event store (required if event store is '%s')", file.EventStoreType), ) }