From 761bc2231156ffa1445eb85cbdefec1a87544f65 Mon Sep 17 00:00:00 2001 From: Stana Miric <94966829+stana-miric@users.noreply.github.com> Date: Wed, 9 Aug 2023 13:15:15 +0200 Subject: [PATCH] Implementation for newPendingTransactions (eth_subscribe) (#1787) * Implementation for newPendingTransactions(eth_subscribe) --- jsonrpc/dispatcher.go | 2 + jsonrpc/dispatcher_test.go | 54 +++++++---- jsonrpc/eth_blockchain_test.go | 5 ++ jsonrpc/filter_manager.go | 160 ++++++++++++++++++++++++++++++--- jsonrpc/filter_manager_test.go | 81 ++++++++++++++++- jsonrpc/mocks_test.go | 36 ++++++-- txpool/operator.go | 15 ++++ 7 files changed, 318 insertions(+), 35 deletions(-) diff --git a/jsonrpc/dispatcher.go b/jsonrpc/dispatcher.go index 0cc86aa23d..3695cd3d5c 100644 --- a/jsonrpc/dispatcher.go +++ b/jsonrpc/dispatcher.go @@ -209,6 +209,8 @@ func (d *Dispatcher) handleSubscribe(req Request, conn wsConn) (string, Error) { return "", NewInternalError(err.Error()) } filterID = d.filterManager.NewLogFilter(logQuery, conn) + } else if subscribeMethod == "newPendingTransactions" { + filterID = d.filterManager.NewPendingTxFilter(conn) } else { return "", NewSubscriptionNotFoundError(subscribeMethod) } diff --git a/jsonrpc/dispatcher_test.go b/jsonrpc/dispatcher_test.go index 171edf2ac1..22a350baaa 100644 --- a/jsonrpc/dispatcher_test.go +++ b/jsonrpc/dispatcher_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/0xPolygon/polygon-edge/txpool/proto" "github.com/0xPolygon/polygon-edge/types" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/assert" @@ -60,29 +61,29 @@ func expectBatchJSONResult(data []byte, v interface{}) error { func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) { t.Parallel() - t.Run("clients should be able to receive \"newHeads\" event thru eth_subscribe", func(t *testing.T) { + store := newMockStore() + dispatcher := newTestDispatcher(t, + hclog.NewNullLogger(), + store, + &dispatcherParams{ + chainID: 0, + priceLimit: 0, + jsonRPCBatchLengthLimit: 20, + blockRangeLimit: 1000, + }, + ) + + t.Run("clients should be able to receive \"newHeads\" event through eth_subscribe", func(t *testing.T) { t.Parallel() - store := newMockStore() - dispatcher := newTestDispatcher(t, - hclog.NewNullLogger(), - store, - &dispatcherParams{ - chainID: 0, - priceLimit: 0, - jsonRPCBatchLengthLimit: 20, - blockRangeLimit: 1000, - }, - ) mockConnection, msgCh := newMockWsConnWithMsgCh() req := []byte(`{ "method": "eth_subscribe", "params": ["newHeads"] }`) - if _, err := dispatcher.HandleWs(req, mockConnection); err != nil { - t.Fatal(err) - } + _, err := dispatcher.HandleWs(req, mockConnection) + require.NoError(t, err) store.emitEvent(&mockEvent{ NewChain: []*mockHeader{ @@ -100,6 +101,27 @@ func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) { t.Fatal("\"newHeads\" event not received in 2 seconds") } }) + + t.Run("clients should be able to receive \"newPendingTransactions\" event through eth_subscribe", func(t *testing.T) { + t.Parallel() + + mockConnection, msgCh := newMockWsConnWithMsgCh() + + req := []byte(`{ + "method": "eth_subscribe", + "params": ["newPendingTransactions"] + }`) + _, err := dispatcher.HandleWs(req, mockConnection) + require.NoError(t, err) + + store.emitTxPoolEvent(proto.EventType_ADDED, "evt1") + + select { + case <-msgCh: + case <-time.After(2 * time.Second): + t.Fatal("\"newPendingTransactions\" event not received in 2 seconds") + } + }) } func TestDispatcher_WebsocketConnection_RequestFormats(t *testing.T) { @@ -289,7 +311,7 @@ func TestDispatcherFuncDecode(t *testing.T) { for _, c := range cases { res := handleReq(c.typ, c.msg) if !reflect.DeepEqual(res, c.res) { - t.Fatal("bad") + t.Fatal("no tx pool events received in the predefined time slot") } } } diff --git a/jsonrpc/eth_blockchain_test.go b/jsonrpc/eth_blockchain_test.go index d59019809d..46032af364 100644 --- a/jsonrpc/eth_blockchain_test.go +++ b/jsonrpc/eth_blockchain_test.go @@ -9,6 +9,7 @@ import ( "github.com/0xPolygon/polygon-edge/helper/hex" "github.com/0xPolygon/polygon-edge/helper/progress" "github.com/0xPolygon/polygon-edge/state/runtime" + "github.com/0xPolygon/polygon-edge/txpool/proto" "github.com/0xPolygon/polygon-edge/types" "github.com/stretchr/testify/assert" ) @@ -589,6 +590,10 @@ func (m *mockBlockStore) FilterExtra(extra []byte) ([]byte, error) { return extra, nil } +func (m *mockBlockStore) TxPoolSubscribe(request *proto.SubscribeRequest) (<-chan *proto.TxPoolEvent, func(), error) { + return nil, nil, nil +} + func newTestBlock(number uint64, hash types.Hash) *types.Block { return &types.Block{ Header: &types.Header{ diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index c7e703bbb8..da44dfec5c 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -11,6 +11,7 @@ import ( "time" "github.com/0xPolygon/polygon-edge/blockchain" + "github.com/0xPolygon/polygon-edge/txpool/proto" "github.com/0xPolygon/polygon-edge/types" "github.com/google/uuid" "github.com/gorilla/websocket" @@ -25,6 +26,7 @@ var ( ErrIncorrectBlockRange = errors.New("incorrect range") ErrBlockRangeTooHigh = errors.New("block range too high") ErrNoWSConnection = errors.New("no websocket connection") + ErrUnknownSubscriptionType = errors.New("unknown subscription type") ) // defaultTimeout is the timeout to remove the filters that don't have a web socket stream @@ -35,6 +37,16 @@ const ( NoIndexInHeap = -1 ) +// subscriptionType determines which event type the filter is subscribed to +type subscriptionType byte + +const ( + // Blocks represents subscription type for blockchain events + Blocks subscriptionType = iota + // PendingTransactions represents subscription type for tx pool events + PendingTransactions +) + // filter is an interface that BlockFilter and LogFilter implement type filter interface { // hasWSConn returns the flag indicating the filter has web socket stream @@ -43,6 +55,9 @@ type filter interface { // getFilterBase returns filterBase that has common fields getFilterBase() *filterBase + // getSubscriptionType returns the type of the event the filter is subscribed to + getSubscriptionType() subscriptionType + // getUpdates returns stored data in a JSON serializable form getUpdates() (interface{}, error) @@ -159,6 +174,11 @@ func (f *blockFilter) sendUpdates() error { return nil } +// getSubscriptionType returns the type of the event the filter is subscribed to +func (f *blockFilter) getSubscriptionType() subscriptionType { + return Blocks +} + // logFilter is a filter to store logs that meet the conditions in query type logFilter struct { filterBase @@ -212,6 +232,63 @@ func (f *logFilter) sendUpdates() error { return nil } +// getSubscriptionType returns the type of the event the filter is subscribed to +func (f *logFilter) getSubscriptionType() subscriptionType { + return Blocks +} + +// pendingTxFilter is a filter to store pending tx +type pendingTxFilter struct { + filterBase + sync.Mutex + + txHashes []string +} + +// appendPendingTxHashes appends new pending tx hash to tx hashes +func (f *pendingTxFilter) appendPendingTxHashes(txHash string) { + f.Lock() + defer f.Unlock() + + f.txHashes = append(f.txHashes, txHash) +} + +// takePendingTxsUpdates returns all saved pending tx hashes in filter and sets a new slice +func (f *pendingTxFilter) takePendingTxsUpdates() []string { + f.Lock() + defer f.Unlock() + + txHashes := f.txHashes + f.txHashes = []string{} + + return txHashes +} + +// getSubscriptionType returns the type of the event the filter is subscribed to +func (f *pendingTxFilter) getSubscriptionType() subscriptionType { + return PendingTransactions +} + +// getUpdates returns stored pending tx hashes +func (f *pendingTxFilter) getUpdates() (interface{}, error) { + pendingTxHashes := f.takePendingTxsUpdates() + + return pendingTxHashes, nil +} + +// sendUpdates write the hashes for all pending transactions to web socket stream +func (f *pendingTxFilter) sendUpdates() error { + pendingTxHashes := f.takePendingTxsUpdates() + + for _, txHash := range pendingTxHashes { + if err := f.writeMessageToWs(txHash); err != nil { + return err + } + } + + return nil +} + // filterManagerStore provides methods required by FilterManager type filterManagerStore interface { // Header returns the current header of the chain (genesis if empty) @@ -228,6 +305,9 @@ type filterManagerStore interface { // GetBlockByNumber returns a block using the provided number GetBlockByNumber(num uint64, full bool) (*types.Block, bool) + + // TxPoolSubscribe subscribes for tx pool events + TxPoolSubscribe(request *proto.SubscribeRequest) (<-chan *proto.TxPoolEvent, func(), error) } // FilterManager manages all running filters @@ -279,7 +359,7 @@ func NewFilterManager(logger hclog.Logger, store filterManagerStore, blockRangeL // Run starts worker process to handle events func (f *FilterManager) Run() { // watch for new events in the blockchain - watchCh := make(chan *blockchain.Event) + blockWatchCh := make(chan *blockchain.Event) go func() { for { @@ -287,10 +367,24 @@ func (f *FilterManager) Run() { if evnt == nil { return } - watchCh <- evnt + blockWatchCh <- evnt } }() + // watch for new events in the tx pool + txRequest := &proto.SubscribeRequest{ + Types: []proto.EventType{proto.EventType_ADDED}, + } + + txWatchCh, txPoolUnsubscribe, err := f.store.TxPoolSubscribe(txRequest) + if err != nil { + f.logger.Error("Unable to subscribe to tx pool") + + return + } + + defer txPoolUnsubscribe() + var timeoutCh <-chan time.Time for { @@ -303,10 +397,16 @@ func (f *FilterManager) Run() { } select { - case evnt := <-watchCh: + case evnt := <-blockWatchCh: // new blockchain event if err := f.dispatchEvent(evnt); err != nil { - f.logger.Error("failed to dispatch event", "err", err) + f.logger.Error("failed to dispatch block event", "err", err) + } + + case evnt := <-txWatchCh: + // new tx pool event + if err := f.dispatchEvent(evnt); err != nil { + f.logger.Error("failed to dispatch tx pool event", "err", err) } case <-timeoutCh: @@ -359,6 +459,20 @@ func (f *FilterManager) NewLogFilter(logQuery *LogQuery, ws wsConn) string { return f.addFilter(filter) } +// NewPendingTxFilter adds new PendingTxFilter +func (f *FilterManager) NewPendingTxFilter(ws wsConn) string { + filter := &pendingTxFilter{ + filterBase: newFilterBase(ws), + txHashes: []string{}, + } + + if filter.hasWSConn() { + ws.SetFilterID(filter.id) + } + + return f.addFilter(filter) +} + // Exists checks the filter with given ID exists func (f *FilterManager) Exists(id string) bool { f.RLock() @@ -621,12 +735,26 @@ func (f *FilterManager) nextTimeoutFilter() (string, time.Time) { } // dispatchEvent is an event handler for new block event -func (f *FilterManager) dispatchEvent(evnt *blockchain.Event) error { +func (f *FilterManager) dispatchEvent(evnt interface{}) error { + var subType subscriptionType + // store new event in each filters - f.processEvent(evnt) + switch evt := evnt.(type) { + case *blockchain.Event: + f.processBlockEvent(evt) + + subType = Blocks + case *proto.TxPoolEvent: + f.processTxEvent(evt) + + subType = PendingTransactions + + default: + return ErrUnknownSubscriptionType + } // send data to web socket stream - if err := f.flushWsFilters(); err != nil { + if err := f.flushWsFilters(subType); err != nil { return err } @@ -634,7 +762,7 @@ func (f *FilterManager) dispatchEvent(evnt *blockchain.Event) error { } // processEvent makes each filter append the new data that interests them -func (f *FilterManager) processEvent(evnt *blockchain.Event) { +func (f *FilterManager) processBlockEvent(evnt *blockchain.Event) { f.RLock() defer f.RUnlock() @@ -710,15 +838,27 @@ func (f *FilterManager) appendLogsToFilters(header *block) error { return nil } +// processTxEvent makes each filter refresh the pending tx hashes +func (f *FilterManager) processTxEvent(evnt *proto.TxPoolEvent) { + f.RLock() + defer f.RUnlock() + + for _, f := range f.filters { + if txFilter, ok := f.(*pendingTxFilter); ok { + txFilter.appendPendingTxHashes(evnt.TxHash) + } + } +} + // flushWsFilters make each filters with web socket connection write the updates to web socket stream // flushWsFilters also removes the filters if flushWsFilters notices the connection is closed -func (f *FilterManager) flushWsFilters() error { +func (f *FilterManager) flushWsFilters(subType subscriptionType) error { closedFilterIDs := make([]string, 0) f.RLock() for id, filter := range f.filters { - if !filter.hasWSConn() { + if !filter.hasWSConn() || filter.getSubscriptionType() != subType { continue } diff --git a/jsonrpc/filter_manager_test.go b/jsonrpc/filter_manager_test.go index d0835fb363..5edc4ccc20 100644 --- a/jsonrpc/filter_manager_test.go +++ b/jsonrpc/filter_manager_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/0xPolygon/polygon-edge/blockchain" + "github.com/0xPolygon/polygon-edge/txpool/proto" "github.com/0xPolygon/polygon-edge/types" "github.com/gorilla/websocket" "github.com/hashicorp/go-hclog" @@ -383,6 +384,56 @@ func TestFilterBlock(t *testing.T) { } } +func TestFilterPendingTx(t *testing.T) { + t.Parallel() + + store := newMockStore() + + m := NewFilterManager(hclog.NewNullLogger(), store, 1000) + defer m.Close() + + go m.Run() + + // add pending tx filter + id := m.NewPendingTxFilter(nil) + + // emit two events + store.emitTxPoolEvent(proto.EventType_ADDED, "evt1") + store.emitTxPoolEvent(proto.EventType_ADDED, "evt2") + + // we need to wait for the manager to process the data + time.Sleep(500 * time.Millisecond) + + var res interface{} + + var fetchErr error + + if res, fetchErr = m.GetFilterChanges(id); fetchErr != nil { + t.Fatalf("Unable to get filter changes, %v", fetchErr) + } + + txHashes, ok := res.([]string) + require.True(t, ok) + require.Equal(t, 2, len(txHashes)) + require.Equal(t, "evt1", txHashes[0]) + require.Equal(t, "evt2", txHashes[1]) + + // emit one more event, it should not return the + // first two hashes + store.emitTxPoolEvent(proto.EventType_ADDED, "evt3") + time.Sleep(500 * time.Millisecond) + + if res, fetchErr = m.GetFilterChanges(id); fetchErr != nil { + t.Fatalf("Unable to get filter changes, %v", fetchErr) + } + + txHashes, ok = res.([]string) + require.True(t, ok) + require.NotNil(t, txHashes) + require.Equal(t, 1, len(txHashes)) + require.Equal(t, "evt3", txHashes[0]) +} + func TestFilterTimeout(t *testing.T) { t.Parallel() @@ -528,7 +579,7 @@ func TestFilterWebsocket(t *testing.T) { _, err := m.GetFilterChanges(id) assert.Equal(t, err, ErrWSFilterDoesNotSupportGetChanges) - // emit two events + // emit event store.emitEvent(&mockEvent{ NewChain: []*mockHeader{ { @@ -546,6 +597,34 @@ func TestFilterWebsocket(t *testing.T) { } } +func TestFilterPendingTxWebsocket(t *testing.T) { + t.Parallel() + + store := newMockStore() + + mock, msgCh := newMockWsConnWithMsgCh() + + m := NewFilterManager(hclog.NewNullLogger(), store, 1000) + defer m.Close() + + go m.Run() + + id := m.NewPendingTxFilter(mock) + + // we cannot call get filter changes for a websocket filter + _, err := m.GetFilterChanges(id) + assert.Equal(t, err, ErrWSFilterDoesNotSupportGetChanges) + + // emit event + store.emitTxPoolEvent(proto.EventType_ADDED, "evt1") + + select { + case <-msgCh: + case <-time.After(2 * time.Second): + t.Fatal("bad") + } +} + type mockWsConn struct { SetFilterIDFn func(string) GetFilterIDFn func() string diff --git a/jsonrpc/mocks_test.go b/jsonrpc/mocks_test.go index 1078090d39..4922c7c81c 100644 --- a/jsonrpc/mocks_test.go +++ b/jsonrpc/mocks_test.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/0xPolygon/polygon-edge/blockchain" + "github.com/0xPolygon/polygon-edge/txpool/proto" "github.com/0xPolygon/polygon-edge/types" ) @@ -44,11 +45,12 @@ type mockEvent struct { type mockStore struct { JSONRPCStore - header *types.Header - subscription *blockchain.MockSubscription - receiptsLock sync.Mutex - receipts map[types.Hash][]*types.Receipt - accounts map[types.Address]*Account + header *types.Header + subscription *blockchain.MockSubscription + txPoolChannel chan *proto.TxPoolEvent + receiptsLock sync.Mutex + receipts map[types.Hash][]*types.Receipt + accounts map[types.Address]*Account // headers is the list of historical headers historicalHeaders []*types.Header @@ -56,9 +58,10 @@ type mockStore struct { func newMockStore() *mockStore { m := &mockStore{ - header: &types.Header{Number: 0}, - subscription: blockchain.NewMockSubscription(), - accounts: map[types.Address]*Account{}, + header: &types.Header{Number: 0}, + subscription: blockchain.NewMockSubscription(), + accounts: map[types.Address]*Account{}, + txPoolChannel: make(chan *proto.TxPoolEvent), } m.addHeader(m.header) @@ -108,6 +111,15 @@ func (m *mockStore) emitEvent(evnt *mockEvent) { m.subscription.Push(bEvnt) } +func (m *mockStore) emitTxPoolEvent(eventType proto.EventType, txHash string) { + evt := &proto.TxPoolEvent{ + Type: eventType, + TxHash: txHash, + } + + m.txPoolChannel <- evt +} + func (m *mockStore) GetAccount(root types.Hash, addr types.Address) (*Account, error) { if acc, ok := m.accounts[addr]; ok { return acc, nil @@ -137,6 +149,14 @@ func (m *mockStore) SubscribeEvents() blockchain.Subscription { return m.subscription } +func (m *mockStore) TxPoolSubscribe(request *proto.SubscribeRequest) (<-chan *proto.TxPoolEvent, func(), error) { + txPoolUnsubscribe := func() { + close(m.txPoolChannel) + } + + return m.txPoolChannel, txPoolUnsubscribe, nil +} + func (m *mockStore) GetHeaderByNumber(num uint64) (*types.Header, bool) { header := m.headerLoop(func(header *types.Header) bool { return header.Number == num diff --git a/txpool/operator.go b/txpool/operator.go index 3bd3136daa..4d7bdb5186 100644 --- a/txpool/operator.go +++ b/txpool/operator.go @@ -82,3 +82,18 @@ func (p *TxPool) Subscribe( } } } + +// TxPoolSubscribe subscribes to new events in the tx pool and returns subscription channel and unsubscribe fn +func (p *TxPool) TxPoolSubscribe(request *proto.SubscribeRequest) (<-chan *proto.TxPoolEvent, func(), error) { + if err := request.ValidateAll(); err != nil { + return nil, nil, err + } + + subscription := p.eventManager.subscribe(request.Types) + + cancelSubscription := func() { + p.eventManager.cancelSubscription(subscription.subscriptionID) + } + + return subscription.subscriptionChannel, cancelSubscription, nil +}