Skip to content

Commit

Permalink
Implementation for newPendingTransactions (eth_subscribe) (#1787)
Browse files Browse the repository at this point in the history
* Implementation for  newPendingTransactions(eth_subscribe)
  • Loading branch information
stana-miric authored Aug 9, 2023
1 parent 032e5b9 commit 761bc22
Show file tree
Hide file tree
Showing 7 changed files with 318 additions and 35 deletions.
2 changes: 2 additions & 0 deletions jsonrpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ func (d *Dispatcher) handleSubscribe(req Request, conn wsConn) (string, Error) {
return "", NewInternalError(err.Error())
}
filterID = d.filterManager.NewLogFilter(logQuery, conn)
} else if subscribeMethod == "newPendingTransactions" {
filterID = d.filterManager.NewPendingTxFilter(conn)
} else {
return "", NewSubscriptionNotFoundError(subscribeMethod)
}
Expand Down
54 changes: 38 additions & 16 deletions jsonrpc/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/0xPolygon/polygon-edge/txpool/proto"
"github.com/0xPolygon/polygon-edge/types"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -60,29 +61,29 @@ func expectBatchJSONResult(data []byte, v interface{}) error {
func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) {
t.Parallel()

t.Run("clients should be able to receive \"newHeads\" event thru eth_subscribe", func(t *testing.T) {
store := newMockStore()
dispatcher := newTestDispatcher(t,
hclog.NewNullLogger(),
store,
&dispatcherParams{
chainID: 0,
priceLimit: 0,
jsonRPCBatchLengthLimit: 20,
blockRangeLimit: 1000,
},
)

t.Run("clients should be able to receive \"newHeads\" event through eth_subscribe", func(t *testing.T) {
t.Parallel()

store := newMockStore()
dispatcher := newTestDispatcher(t,
hclog.NewNullLogger(),
store,
&dispatcherParams{
chainID: 0,
priceLimit: 0,
jsonRPCBatchLengthLimit: 20,
blockRangeLimit: 1000,
},
)
mockConnection, msgCh := newMockWsConnWithMsgCh()

req := []byte(`{
"method": "eth_subscribe",
"params": ["newHeads"]
}`)
if _, err := dispatcher.HandleWs(req, mockConnection); err != nil {
t.Fatal(err)
}
_, err := dispatcher.HandleWs(req, mockConnection)
require.NoError(t, err)

store.emitEvent(&mockEvent{
NewChain: []*mockHeader{
Expand All @@ -100,6 +101,27 @@ func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) {
t.Fatal("\"newHeads\" event not received in 2 seconds")
}
})

t.Run("clients should be able to receive \"newPendingTransactions\" event through eth_subscribe", func(t *testing.T) {
t.Parallel()

mockConnection, msgCh := newMockWsConnWithMsgCh()

req := []byte(`{
"method": "eth_subscribe",
"params": ["newPendingTransactions"]
}`)
_, err := dispatcher.HandleWs(req, mockConnection)
require.NoError(t, err)

store.emitTxPoolEvent(proto.EventType_ADDED, "evt1")

select {
case <-msgCh:
case <-time.After(2 * time.Second):
t.Fatal("\"newPendingTransactions\" event not received in 2 seconds")
}
})
}

func TestDispatcher_WebsocketConnection_RequestFormats(t *testing.T) {
Expand Down Expand Up @@ -289,7 +311,7 @@ func TestDispatcherFuncDecode(t *testing.T) {
for _, c := range cases {
res := handleReq(c.typ, c.msg)
if !reflect.DeepEqual(res, c.res) {
t.Fatal("bad")
t.Fatal("no tx pool events received in the predefined time slot")
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions jsonrpc/eth_blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/0xPolygon/polygon-edge/helper/hex"
"github.com/0xPolygon/polygon-edge/helper/progress"
"github.com/0xPolygon/polygon-edge/state/runtime"
"github.com/0xPolygon/polygon-edge/txpool/proto"
"github.com/0xPolygon/polygon-edge/types"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -589,6 +590,10 @@ func (m *mockBlockStore) FilterExtra(extra []byte) ([]byte, error) {
return extra, nil
}

func (m *mockBlockStore) TxPoolSubscribe(request *proto.SubscribeRequest) (<-chan *proto.TxPoolEvent, func(), error) {
return nil, nil, nil
}

func newTestBlock(number uint64, hash types.Hash) *types.Block {
return &types.Block{
Header: &types.Header{
Expand Down
160 changes: 150 additions & 10 deletions jsonrpc/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/0xPolygon/polygon-edge/blockchain"
"github.com/0xPolygon/polygon-edge/txpool/proto"
"github.com/0xPolygon/polygon-edge/types"
"github.com/google/uuid"
"github.com/gorilla/websocket"
Expand All @@ -25,6 +26,7 @@ var (
ErrIncorrectBlockRange = errors.New("incorrect range")
ErrBlockRangeTooHigh = errors.New("block range too high")
ErrNoWSConnection = errors.New("no websocket connection")
ErrUnknownSubscriptionType = errors.New("unknown subscription type")
)

// defaultTimeout is the timeout to remove the filters that don't have a web socket stream
Expand All @@ -35,6 +37,16 @@ const (
NoIndexInHeap = -1
)

// subscriptionType determines which event type the filter is subscribed to
type subscriptionType byte

const (
// Blocks represents subscription type for blockchain events
Blocks subscriptionType = iota
// PendingTransactions represents subscription type for tx pool events
PendingTransactions
)

// filter is an interface that BlockFilter and LogFilter implement
type filter interface {
// hasWSConn returns the flag indicating the filter has web socket stream
Expand All @@ -43,6 +55,9 @@ type filter interface {
// getFilterBase returns filterBase that has common fields
getFilterBase() *filterBase

// getSubscriptionType returns the type of the event the filter is subscribed to
getSubscriptionType() subscriptionType

// getUpdates returns stored data in a JSON serializable form
getUpdates() (interface{}, error)

Expand Down Expand Up @@ -159,6 +174,11 @@ func (f *blockFilter) sendUpdates() error {
return nil
}

// getSubscriptionType returns the type of the event the filter is subscribed to
func (f *blockFilter) getSubscriptionType() subscriptionType {
return Blocks
}

// logFilter is a filter to store logs that meet the conditions in query
type logFilter struct {
filterBase
Expand Down Expand Up @@ -212,6 +232,63 @@ func (f *logFilter) sendUpdates() error {
return nil
}

// getSubscriptionType returns the type of the event the filter is subscribed to
func (f *logFilter) getSubscriptionType() subscriptionType {
return Blocks
}

// pendingTxFilter is a filter to store pending tx
type pendingTxFilter struct {
filterBase
sync.Mutex

txHashes []string
}

// appendPendingTxHashes appends new pending tx hash to tx hashes
func (f *pendingTxFilter) appendPendingTxHashes(txHash string) {
f.Lock()
defer f.Unlock()

f.txHashes = append(f.txHashes, txHash)
}

// takePendingTxsUpdates returns all saved pending tx hashes in filter and sets a new slice
func (f *pendingTxFilter) takePendingTxsUpdates() []string {
f.Lock()
defer f.Unlock()

txHashes := f.txHashes
f.txHashes = []string{}

return txHashes
}

// getSubscriptionType returns the type of the event the filter is subscribed to
func (f *pendingTxFilter) getSubscriptionType() subscriptionType {
return PendingTransactions
}

// getUpdates returns stored pending tx hashes
func (f *pendingTxFilter) getUpdates() (interface{}, error) {
pendingTxHashes := f.takePendingTxsUpdates()

return pendingTxHashes, nil
}

// sendUpdates write the hashes for all pending transactions to web socket stream
func (f *pendingTxFilter) sendUpdates() error {
pendingTxHashes := f.takePendingTxsUpdates()

for _, txHash := range pendingTxHashes {
if err := f.writeMessageToWs(txHash); err != nil {
return err
}
}

return nil
}

// filterManagerStore provides methods required by FilterManager
type filterManagerStore interface {
// Header returns the current header of the chain (genesis if empty)
Expand All @@ -228,6 +305,9 @@ type filterManagerStore interface {

// GetBlockByNumber returns a block using the provided number
GetBlockByNumber(num uint64, full bool) (*types.Block, bool)

// TxPoolSubscribe subscribes for tx pool events
TxPoolSubscribe(request *proto.SubscribeRequest) (<-chan *proto.TxPoolEvent, func(), error)
}

// FilterManager manages all running filters
Expand Down Expand Up @@ -279,18 +359,32 @@ func NewFilterManager(logger hclog.Logger, store filterManagerStore, blockRangeL
// Run starts worker process to handle events
func (f *FilterManager) Run() {
// watch for new events in the blockchain
watchCh := make(chan *blockchain.Event)
blockWatchCh := make(chan *blockchain.Event)

go func() {
for {
evnt := f.subscription.GetEvent()
if evnt == nil {
return
}
watchCh <- evnt
blockWatchCh <- evnt
}
}()

// watch for new events in the tx pool
txRequest := &proto.SubscribeRequest{
Types: []proto.EventType{proto.EventType_ADDED},
}

txWatchCh, txPoolUnsubscribe, err := f.store.TxPoolSubscribe(txRequest)
if err != nil {
f.logger.Error("Unable to subscribe to tx pool")

return
}

defer txPoolUnsubscribe()

var timeoutCh <-chan time.Time

for {
Expand All @@ -303,10 +397,16 @@ func (f *FilterManager) Run() {
}

select {
case evnt := <-watchCh:
case evnt := <-blockWatchCh:
// new blockchain event
if err := f.dispatchEvent(evnt); err != nil {
f.logger.Error("failed to dispatch event", "err", err)
f.logger.Error("failed to dispatch block event", "err", err)
}

case evnt := <-txWatchCh:
// new tx pool event
if err := f.dispatchEvent(evnt); err != nil {
f.logger.Error("failed to dispatch tx pool event", "err", err)
}

case <-timeoutCh:
Expand Down Expand Up @@ -359,6 +459,20 @@ func (f *FilterManager) NewLogFilter(logQuery *LogQuery, ws wsConn) string {
return f.addFilter(filter)
}

// NewPendingTxFilter adds new PendingTxFilter
func (f *FilterManager) NewPendingTxFilter(ws wsConn) string {
filter := &pendingTxFilter{
filterBase: newFilterBase(ws),
txHashes: []string{},
}

if filter.hasWSConn() {
ws.SetFilterID(filter.id)
}

return f.addFilter(filter)
}

// Exists checks the filter with given ID exists
func (f *FilterManager) Exists(id string) bool {
f.RLock()
Expand Down Expand Up @@ -621,20 +735,34 @@ func (f *FilterManager) nextTimeoutFilter() (string, time.Time) {
}

// dispatchEvent is an event handler for new block event
func (f *FilterManager) dispatchEvent(evnt *blockchain.Event) error {
func (f *FilterManager) dispatchEvent(evnt interface{}) error {
var subType subscriptionType

// store new event in each filters
f.processEvent(evnt)
switch evt := evnt.(type) {
case *blockchain.Event:
f.processBlockEvent(evt)

subType = Blocks
case *proto.TxPoolEvent:
f.processTxEvent(evt)

subType = PendingTransactions

default:
return ErrUnknownSubscriptionType
}

// send data to web socket stream
if err := f.flushWsFilters(); err != nil {
if err := f.flushWsFilters(subType); err != nil {
return err
}

return nil
}

// processEvent makes each filter append the new data that interests them
func (f *FilterManager) processEvent(evnt *blockchain.Event) {
func (f *FilterManager) processBlockEvent(evnt *blockchain.Event) {
f.RLock()
defer f.RUnlock()

Expand Down Expand Up @@ -710,15 +838,27 @@ func (f *FilterManager) appendLogsToFilters(header *block) error {
return nil
}

// processTxEvent makes each filter refresh the pending tx hashes
func (f *FilterManager) processTxEvent(evnt *proto.TxPoolEvent) {
f.RLock()
defer f.RUnlock()

for _, f := range f.filters {
if txFilter, ok := f.(*pendingTxFilter); ok {
txFilter.appendPendingTxHashes(evnt.TxHash)
}
}
}

// flushWsFilters make each filters with web socket connection write the updates to web socket stream
// flushWsFilters also removes the filters if flushWsFilters notices the connection is closed
func (f *FilterManager) flushWsFilters() error {
func (f *FilterManager) flushWsFilters(subType subscriptionType) error {
closedFilterIDs := make([]string, 0)

f.RLock()

for id, filter := range f.filters {
if !filter.hasWSConn() {
if !filter.hasWSConn() || filter.getSubscriptionType() != subType {
continue
}

Expand Down
Loading

0 comments on commit 761bc22

Please sign in to comment.