Skip to content

Commit

Permalink
allow multiple log subscriptions at the same time (#5358)
Browse files Browse the repository at this point in the history
  • Loading branch information
hexoscott authored Sep 16, 2022
1 parent a8a104c commit cd8cad6
Show file tree
Hide file tree
Showing 3 changed files with 591 additions and 19 deletions.
245 changes: 245 additions & 0 deletions ethdb/privateapi/logsfilter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
package privateapi

import (
"context"
"testing"

"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"google.golang.org/grpc"

"github.com/ledgerwatch/erigon/common"
)

var (
address1 = common.HexToHash("0xdac17f958d2ee523a2206206994597c13d831ec7")
topic1 = common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")
address160 *types2.H160
topic1H256 *types2.H256
)

func init() {
var a common.Address
a.SetBytes(address1.Bytes())
address160 = gointerfaces.ConvertAddressToH160(a)
topic1H256 = gointerfaces.ConvertHashToH256(topic1)
}

type testServer struct {
received chan *remote.LogsFilterRequest
receiveCompleted chan struct{}
sent []*remote.SubscribeLogsReply
ctx context.Context
grpc.ServerStream
}

func (ts *testServer) Send(m *remote.SubscribeLogsReply) error {
ts.sent = append(ts.sent, m)
return nil
}

func (ts *testServer) Recv() (*remote.LogsFilterRequest, error) {
// notify complete when the last request has been processed
defer func() {
if len(ts.received) == 0 {
ts.receiveCompleted <- struct{}{}
}
}()

return <-ts.received, nil
}

func createLog() *remote.SubscribeLogsReply {
return &remote.SubscribeLogsReply{
Address: gointerfaces.ConvertAddressToH160([20]byte{}),
BlockHash: gointerfaces.ConvertHashToH256([32]byte{}),
BlockNumber: 0,
Data: []byte{},
LogIndex: 0,
Topics: []*types2.H256{gointerfaces.ConvertHashToH256([32]byte{99, 99})},
TransactionHash: gointerfaces.ConvertHashToH256([32]byte{}),
TransactionIndex: 0,
Removed: false,
}
}

func TestLogsFilter_EmptyFilter_DoesNotDistributeAnything(t *testing.T) {
events := NewEvents()
agg := NewLogsFilterAggregator(events)

srv := &testServer{
received: make(chan *remote.LogsFilterRequest, 256),
receiveCompleted: make(chan struct{}, 1),
sent: make([]*remote.SubscribeLogsReply, 0),
ctx: context.Background(),
ServerStream: nil,
}

req1 := &remote.LogsFilterRequest{
AllAddresses: false,
Addresses: nil,
AllTopics: false,
Topics: nil,
}
srv.received <- req1

go func() {
err := agg.subscribeLogs(srv)
if err != nil {
t.Error(err)
}
}()

<-srv.receiveCompleted

// now see if a log would be sent or not
log := createLog()
agg.distributeLogs([]*remote.SubscribeLogsReply{log})

if len(srv.sent) != 0 {
t.Error("expected the sent slice to be empty")
}
}

func TestLogsFilter_AllAddressesAndTopicsFilter_DistributesLogRegardless(t *testing.T) {
events := NewEvents()
agg := NewLogsFilterAggregator(events)

srv := &testServer{
received: make(chan *remote.LogsFilterRequest, 256),
receiveCompleted: make(chan struct{}, 1),
sent: make([]*remote.SubscribeLogsReply, 0),
ctx: context.Background(),
ServerStream: nil,
}

req1 := &remote.LogsFilterRequest{
AllAddresses: true,
Addresses: nil,
AllTopics: true,
Topics: nil,
}
srv.received <- req1

go func() {
err := agg.subscribeLogs(srv)
if err != nil {
t.Error(err)
}
}()

<-srv.receiveCompleted

// now see if a log would be sent or not
log := createLog()
agg.distributeLogs([]*remote.SubscribeLogsReply{log})

if len(srv.sent) != 1 {
t.Error("expected the sent slice to have the log present")
}

log = createLog()
log.Topics = []*types2.H256{topic1H256}
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 2 {
t.Error("expected any topic to be allowed through the filter")
}

log = createLog()
log.Address = address160
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 3 {
t.Error("expected any address to be allowed through the filter")
}
}

func TestLogsFilter_TopicFilter_OnlyAllowsThatTopicThrough(t *testing.T) {
events := NewEvents()
agg := NewLogsFilterAggregator(events)

srv := &testServer{
received: make(chan *remote.LogsFilterRequest, 256),
receiveCompleted: make(chan struct{}, 1),
sent: make([]*remote.SubscribeLogsReply, 0),
ctx: context.Background(),
ServerStream: nil,
}

req1 := &remote.LogsFilterRequest{
AllAddresses: true, // need to allow all addresses on the request else it will filter on them
Addresses: nil,
AllTopics: false,
Topics: []*types2.H256{topic1H256},
}
srv.received <- req1

go func() {
err := agg.subscribeLogs(srv)
if err != nil {
t.Error(err)
}
}()

<-srv.receiveCompleted

// now see if a log would be sent or not
log := createLog()
agg.distributeLogs([]*remote.SubscribeLogsReply{log})

if len(srv.sent) != 0 {
t.Error("the sent slice should be empty as the topic didn't match")
}

log = createLog()
log.Topics = []*types2.H256{topic1H256}
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 1 {
t.Error("expected the log to be distributed as the topic matched")
}
}

func TestLogsFilter_AddressFilter_OnlyAllowsThatAddressThrough(t *testing.T) {
events := NewEvents()
agg := NewLogsFilterAggregator(events)

srv := &testServer{
received: make(chan *remote.LogsFilterRequest, 256),
receiveCompleted: make(chan struct{}, 1),
sent: make([]*remote.SubscribeLogsReply, 0),
ctx: context.Background(),
ServerStream: nil,
}

req1 := &remote.LogsFilterRequest{
AllAddresses: false,
Addresses: []*types2.H160{address160},
AllTopics: true,
Topics: []*types2.H256{},
}
srv.received <- req1

go func() {
err := agg.subscribeLogs(srv)
if err != nil {
t.Error(err)
}
}()

<-srv.receiveCompleted

// now see if a log would be sent or not
log := createLog()
agg.distributeLogs([]*remote.SubscribeLogsReply{log})

if len(srv.sent) != 0 {
t.Error("the sent slice should be empty as the address didn't match")
}

log = createLog()
log.Address = address160
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 1 {
t.Error("expected the log to be distributed as the address matched")
}
}
27 changes: 8 additions & 19 deletions turbo/rpchelper/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,11 @@ func (ff *Filters) SubscribeLogs(out chan *types.Log, crit filters.FilterCriteri
}
f.topicsOriginal = crit.Topics
ff.logsSubs.addLogsFilters(f)
// if any filter in the aggregate needs all addresses or all topics then the global log subscription needs to
// allow all addresses or topics through
lfr := &remote.LogsFilterRequest{
AllAddresses: ff.logsSubs.aggLogsFilter.allAddrs == 1,
AllTopics: ff.logsSubs.aggLogsFilter.allTopics == 1,
AllAddresses: ff.logsSubs.aggLogsFilter.allAddrs >= 1,
AllTopics: ff.logsSubs.aggLogsFilter.allTopics >= 1,
}

addresses, topics := ff.logsSubs.getAggMaps()
Expand Down Expand Up @@ -430,9 +432,11 @@ func (ff *Filters) loadLogsRequester() any {

func (ff *Filters) UnsubscribeLogs(id LogsSubID) bool {
isDeleted := ff.logsSubs.removeLogsFilter(id)
// if any filters in the aggregate need all addresses or all topics then the request to the central
// log subscription needs to honour this
lfr := &remote.LogsFilterRequest{
AllAddresses: ff.logsSubs.aggLogsFilter.allAddrs == 1,
AllTopics: ff.logsSubs.aggLogsFilter.allTopics == 1,
AllAddresses: ff.logsSubs.aggLogsFilter.allAddrs >= 1,
AllTopics: ff.logsSubs.aggLogsFilter.allTopics >= 1,
}

addresses, topics := ff.logsSubs.getAggMaps()
Expand Down Expand Up @@ -539,21 +543,6 @@ func (ff *Filters) OnNewTx(reply *txpool.OnAddReply) {
}

func (ff *Filters) OnNewLogs(reply *remote.SubscribeLogsReply) {
lg := &types.Log{
Address: gointerfaces.ConvertH160toAddress(reply.Address),
Data: reply.Data,
BlockNumber: reply.BlockNumber,
TxHash: gointerfaces.ConvertH256ToHash(reply.TransactionHash),
TxIndex: uint(reply.TransactionIndex),
BlockHash: gointerfaces.ConvertH256ToHash(reply.BlockHash),
Index: uint(reply.LogIndex),
Removed: reply.Removed,
}
t := make([]common.Hash, 0)
for _, v := range reply.Topics {
t = append(t, gointerfaces.ConvertH256ToHash(v))
}
lg.Topics = t
ff.logsSubs.distributeLog(reply)
}

Expand Down
Loading

0 comments on commit cd8cad6

Please sign in to comment.