Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow multiple log subscriptions at the same time #5358

Merged
merged 1 commit into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions ethdb/privateapi/logsfilter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
package privateapi

import (
"context"
"testing"

"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"google.golang.org/grpc"

"github.com/ledgerwatch/erigon/common"
)

var (
address1 = common.HexToHash("0xdac17f958d2ee523a2206206994597c13d831ec7")
topic1 = common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")
address160 *types2.H160
topic1H256 *types2.H256
)

func init() {
var a common.Address
a.SetBytes(address1.Bytes())
address160 = gointerfaces.ConvertAddressToH160(a)
topic1H256 = gointerfaces.ConvertHashToH256(topic1)
}

type testServer struct {
received chan *remote.LogsFilterRequest
receiveCompleted chan struct{}
sent []*remote.SubscribeLogsReply
ctx context.Context
grpc.ServerStream
}

func (ts *testServer) Send(m *remote.SubscribeLogsReply) error {
ts.sent = append(ts.sent, m)
return nil
}

func (ts *testServer) Recv() (*remote.LogsFilterRequest, error) {
// notify complete when the last request has been processed
defer func() {
if len(ts.received) == 0 {
ts.receiveCompleted <- struct{}{}
}
}()

return <-ts.received, nil
}

func createLog() *remote.SubscribeLogsReply {
return &remote.SubscribeLogsReply{
Address: gointerfaces.ConvertAddressToH160([20]byte{}),
BlockHash: gointerfaces.ConvertHashToH256([32]byte{}),
BlockNumber: 0,
Data: []byte{},
LogIndex: 0,
Topics: []*types2.H256{gointerfaces.ConvertHashToH256([32]byte{99, 99})},
TransactionHash: gointerfaces.ConvertHashToH256([32]byte{}),
TransactionIndex: 0,
Removed: false,
}
}

func TestLogsFilter_EmptyFilter_DoesNotDistributeAnything(t *testing.T) {
events := NewEvents()
agg := NewLogsFilterAggregator(events)

srv := &testServer{
received: make(chan *remote.LogsFilterRequest, 256),
receiveCompleted: make(chan struct{}, 1),
sent: make([]*remote.SubscribeLogsReply, 0),
ctx: context.Background(),
ServerStream: nil,
}

req1 := &remote.LogsFilterRequest{
AllAddresses: false,
Addresses: nil,
AllTopics: false,
Topics: nil,
}
srv.received <- req1

go func() {
err := agg.subscribeLogs(srv)
if err != nil {
t.Error(err)
}
}()

<-srv.receiveCompleted

// now see if a log would be sent or not
log := createLog()
agg.distributeLogs([]*remote.SubscribeLogsReply{log})

if len(srv.sent) != 0 {
t.Error("expected the sent slice to be empty")
}
}

func TestLogsFilter_AllAddressesAndTopicsFilter_DistributesLogRegardless(t *testing.T) {
events := NewEvents()
agg := NewLogsFilterAggregator(events)

srv := &testServer{
received: make(chan *remote.LogsFilterRequest, 256),
receiveCompleted: make(chan struct{}, 1),
sent: make([]*remote.SubscribeLogsReply, 0),
ctx: context.Background(),
ServerStream: nil,
}

req1 := &remote.LogsFilterRequest{
AllAddresses: true,
Addresses: nil,
AllTopics: true,
Topics: nil,
}
srv.received <- req1

go func() {
err := agg.subscribeLogs(srv)
if err != nil {
t.Error(err)
}
}()

<-srv.receiveCompleted

// now see if a log would be sent or not
log := createLog()
agg.distributeLogs([]*remote.SubscribeLogsReply{log})

if len(srv.sent) != 1 {
t.Error("expected the sent slice to have the log present")
}

log = createLog()
log.Topics = []*types2.H256{topic1H256}
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 2 {
t.Error("expected any topic to be allowed through the filter")
}

log = createLog()
log.Address = address160
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 3 {
t.Error("expected any address to be allowed through the filter")
}
}

func TestLogsFilter_TopicFilter_OnlyAllowsThatTopicThrough(t *testing.T) {
events := NewEvents()
agg := NewLogsFilterAggregator(events)

srv := &testServer{
received: make(chan *remote.LogsFilterRequest, 256),
receiveCompleted: make(chan struct{}, 1),
sent: make([]*remote.SubscribeLogsReply, 0),
ctx: context.Background(),
ServerStream: nil,
}

req1 := &remote.LogsFilterRequest{
AllAddresses: true, // need to allow all addresses on the request else it will filter on them
Addresses: nil,
AllTopics: false,
Topics: []*types2.H256{topic1H256},
}
srv.received <- req1

go func() {
err := agg.subscribeLogs(srv)
if err != nil {
t.Error(err)
}
}()

<-srv.receiveCompleted

// now see if a log would be sent or not
log := createLog()
agg.distributeLogs([]*remote.SubscribeLogsReply{log})

if len(srv.sent) != 0 {
t.Error("the sent slice should be empty as the topic didn't match")
}

log = createLog()
log.Topics = []*types2.H256{topic1H256}
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 1 {
t.Error("expected the log to be distributed as the topic matched")
}
}

func TestLogsFilter_AddressFilter_OnlyAllowsThatAddressThrough(t *testing.T) {
events := NewEvents()
agg := NewLogsFilterAggregator(events)

srv := &testServer{
received: make(chan *remote.LogsFilterRequest, 256),
receiveCompleted: make(chan struct{}, 1),
sent: make([]*remote.SubscribeLogsReply, 0),
ctx: context.Background(),
ServerStream: nil,
}

req1 := &remote.LogsFilterRequest{
AllAddresses: false,
Addresses: []*types2.H160{address160},
AllTopics: true,
Topics: []*types2.H256{},
}
srv.received <- req1

go func() {
err := agg.subscribeLogs(srv)
if err != nil {
t.Error(err)
}
}()

<-srv.receiveCompleted

// now see if a log would be sent or not
log := createLog()
agg.distributeLogs([]*remote.SubscribeLogsReply{log})

if len(srv.sent) != 0 {
t.Error("the sent slice should be empty as the address didn't match")
}

log = createLog()
log.Address = address160
agg.distributeLogs([]*remote.SubscribeLogsReply{log})
if len(srv.sent) != 1 {
t.Error("expected the log to be distributed as the address matched")
}
}
27 changes: 8 additions & 19 deletions turbo/rpchelper/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,11 @@ func (ff *Filters) SubscribeLogs(out chan *types.Log, crit filters.FilterCriteri
}
f.topicsOriginal = crit.Topics
ff.logsSubs.addLogsFilters(f)
// if any filter in the aggregate needs all addresses or all topics then the global log subscription needs to
// allow all addresses or topics through
lfr := &remote.LogsFilterRequest{
AllAddresses: ff.logsSubs.aggLogsFilter.allAddrs == 1,
AllTopics: ff.logsSubs.aggLogsFilter.allTopics == 1,
AllAddresses: ff.logsSubs.aggLogsFilter.allAddrs >= 1,
AllTopics: ff.logsSubs.aggLogsFilter.allTopics >= 1,
}

addresses, topics := ff.logsSubs.getAggMaps()
Expand Down Expand Up @@ -430,9 +432,11 @@ func (ff *Filters) loadLogsRequester() any {

func (ff *Filters) UnsubscribeLogs(id LogsSubID) bool {
isDeleted := ff.logsSubs.removeLogsFilter(id)
// if any filters in the aggregate need all addresses or all topics then the request to the central
// log subscription needs to honour this
lfr := &remote.LogsFilterRequest{
AllAddresses: ff.logsSubs.aggLogsFilter.allAddrs == 1,
AllTopics: ff.logsSubs.aggLogsFilter.allTopics == 1,
AllAddresses: ff.logsSubs.aggLogsFilter.allAddrs >= 1,
AllTopics: ff.logsSubs.aggLogsFilter.allTopics >= 1,
}

addresses, topics := ff.logsSubs.getAggMaps()
Expand Down Expand Up @@ -539,21 +543,6 @@ func (ff *Filters) OnNewTx(reply *txpool.OnAddReply) {
}

func (ff *Filters) OnNewLogs(reply *remote.SubscribeLogsReply) {
lg := &types.Log{
Address: gointerfaces.ConvertH160toAddress(reply.Address),
Data: reply.Data,
BlockNumber: reply.BlockNumber,
TxHash: gointerfaces.ConvertH256ToHash(reply.TransactionHash),
TxIndex: uint(reply.TransactionIndex),
BlockHash: gointerfaces.ConvertH256ToHash(reply.BlockHash),
Index: uint(reply.LogIndex),
Removed: reply.Removed,
}
t := make([]common.Hash, 0)
for _, v := range reply.Topics {
t = append(t, gointerfaces.ConvertH256ToHash(v))
}
lg.Topics = t
ff.logsSubs.distributeLog(reply)
}

Expand Down
Loading