feat(proxyd): add filter polling support via RPC #105

Open · wants to merge 2 commits into base: main

Changes from 1 commit
14 changes: 10 additions & 4 deletions proxyd/backend.go
@@ -763,12 +763,12 @@ func (bg *BackendGroup) Primaries() []*Backend {
}

// NOTE: BackendGroup Forward contains the logic for balancing with consensus awareness
func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool, targetBackend string) ([]*RPCRes, string, error) {
Contributor:

Is it possible to add the targeted backend to the context of the request instead of passing in another argument?
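
A minimal sketch of that alternative, assuming a private context key (the helper names here are illustrative, not part of this PR):

type targetBackendKey struct{}

// WithTargetBackend pins a request to a named backend via its context.
func WithTargetBackend(ctx context.Context, name string) context.Context {
	return context.WithValue(ctx, targetBackendKey{}, name)
}

// TargetBackendFromContext returns the pinned backend name, or "" if unset.
func TargetBackendFromContext(ctx context.Context) string {
	name, _ := ctx.Value(targetBackendKey{}).(string)
	return name
}

Forward could then keep its current signature and call TargetBackendFromContext(ctx) internally.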

if len(rpcReqs) == 0 {
return nil, "", nil
}

backends := bg.orderedBackendsForRequest()
backends := bg.orderedBackendsForRequest(targetBackend)
Contributor:

Is it even worth it to pass the name into bg.orderedBackendsForRequest? Could it just look up the target backend by name and skip orderedBackendsForRequest entirely?
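
A rough sketch of that variant, assuming a linear scan over bg.Backends is acceptable (the helper name is hypothetical):

// backendByName returns the backend with the given name, or nil if absent.
func (bg *BackendGroup) backendByName(name string) *Backend {
	for _, be := range bg.Backends {
		if be.Name == name {
			return be
		}
	}
	return nil
}

Forward would then branch before any ordering: if targetBackend != "" and backendByName finds a match, use a one-element slice and skip orderedBackendsForRequest entirely.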


overriddenResponses := make([]*indexedReqRes, 0)
rewrittenReqs := make([]*RPCReq, 0, len(rpcReqs))
@@ -999,13 +999,19 @@ func weightedShuffle(backends []*Backend) {
weightedshuffle.ShuffleInplace(backends, weight, nil)
}

func (bg *BackendGroup) orderedBackendsForRequest() []*Backend {
if bg.Consensus != nil {
func (bg *BackendGroup) orderedBackendsForRequest(targetBackend string) []*Backend {
// ignore consensus load balancing if targetBackend is set - no backend other than targetBackend can
// serve the requests
if bg.Consensus != nil && targetBackend == "" {
return bg.loadBalancedConsensusGroup()
} else {
healthy := make([]*Backend, 0, len(bg.Backends))
unhealthy := make([]*Backend, 0, len(bg.Backends))
for _, be := range bg.Backends {
if targetBackend != "" && be.Name != targetBackend {
continue
}

if be.IsHealthy() {
healthy = append(healthy, be)
} else {
3 changes: 3 additions & 0 deletions proxyd/config.go
@@ -29,6 +29,9 @@ type ServerConfig struct {
EnablePprof bool `toml:"enable_pprof"`
EnableXServedByHeader bool `toml:"enable_served_by_header"`
AllowAllOrigins bool `toml:"allow_all_origins"`

// FilterTimeoutSeconds specifies the maximum time to keep a filter that has not been polled for changes.
FilterTimeoutSeconds int `toml:"filter_timeout_seconds"`
Comment on lines +33 to +34
Contributor:

Please make the filter tracking feature configurable on and off with a boolean.
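
One possible shape for that switch, sketched against the ServerConfig fields above (the EnableFilterTracking name is hypothetical):

// EnableFilterTracking turns proxy-side filter routing on or off.
EnableFilterTracking bool `toml:"enable_filter_tracking"`

// FilterTimeoutSeconds specifies the maximum time to keep a filter that has
// not been polled for changes. Only meaningful when filter tracking is on.
FilterTimeoutSeconds int `toml:"filter_timeout_seconds"`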

}

type CacheConfig struct {
67 changes: 67 additions & 0 deletions proxyd/integration_tests/filter_rpc_routing_test.go
@@ -0,0 +1,67 @@
package integration_tests

import (
"fmt"
"net/http"
"os"
"testing"

"github.com/ethereum-optimism/infra/proxyd"
"github.com/stretchr/testify/require"
)

func TestFilterRpcRouting(t *testing.T) {
backend1 := NewMockBackend(nil)
backend2 := NewMockBackend(nil)
defer backend1.Close()
defer backend2.Close()

require.NoError(t, os.Setenv("NODE1_URL", backend1.URL()))
require.NoError(t, os.Setenv("NODE2_URL", backend2.URL()))

config := ReadConfig("filter_rpc_routing")
client := NewProxydClient("http://127.0.0.1:8545")
_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()

filterId := "0x11414222354634635214124"
newFilterResponse := fmt.Sprintf(`{"jsonrpc":"2.0","result":"%s","id":1}`, filterId)
getFilterChangesResponse1 := `{"jsonrpc":"2.0","result":["0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"],"id":1}`
getFilterChangesResponse2 := `{"jsonrpc":"2.0","result":["0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"],"id":1}`

responseQueue := make(chan string, 3)
responseQueue <- newFilterResponse
responseQueue <- getFilterChangesResponse1
responseQueue <- getFilterChangesResponse2

handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
SingleResponseHandler(200, <-responseQueue)(w, r)
})

backend1.SetHandler(handler)
backend2.SetHandler(handler)

res, statusCode, err := client.SendRPC("eth_newBlockFilter", nil)
require.NoError(t, err)
require.Equal(t, 200, statusCode)

var selectedBackend *MockBackend
if len(backend1.Requests()) > 0 {
selectedBackend = backend1
} else {
selectedBackend = backend2
}

require.Equal(t, 1, len(selectedBackend.Requests()))
RequireEqualJSON(t, []byte(newFilterResponse), res)

res, statusCode, err = client.SendRPC("eth_getFilterChanges", []interface{}{filterId})
require.NoError(t, err)
require.Equal(t, 200, statusCode)

require.Equal(t, 2, len(selectedBackend.Requests()))
RequireEqualJSON(t, []byte(getFilterChangesResponse1), res)

res, statusCode, err = client.SendRPC("eth_getFilterChanges", []interface{}{filterId})
require.NoError(t, err)
require.Equal(t, 200, statusCode)

require.Equal(t, 3, len(selectedBackend.Requests()))
RequireEqualJSON(t, []byte(getFilterChangesResponse2), res)
}
20 changes: 20 additions & 0 deletions proxyd/integration_tests/testdata/filter_rpc_routing.toml
@@ -0,0 +1,20 @@
[server]
rpc_port = 8545

[backends]
[backends.first]
rpc_url = "$NODE1_URL"

[backends.second]
rpc_url = "$NODE2_URL"

[backend_groups]
[backend_groups.main]
backends = ["first", "second"]

[rpc_method_mappings]
eth_newFilter = "main"
eth_newBlockFilter = "main"
eth_uninstallFilter = "main"
eth_getFilterChanges = "main"
eth_getFilterLogs = "main"
1 change: 1 addition & 0 deletions proxyd/proxyd.go
@@ -353,6 +353,7 @@ func Start(config *Config) (*Server, func(), error) {
config.Server.MaxRequestBodyLogLen,
config.BatchConfig.MaxSize,
limiterFactory,
secondsToDuration(config.Server.FilterTimeoutSeconds),
)
if err != nil {
return nil, nil, fmt.Errorf("error creating server: %w", err)
101 changes: 96 additions & 5 deletions proxyd/server.go
@@ -43,6 +43,7 @@ const (
defaultWSReadTimeout = 2 * time.Minute
defaultWSWriteTimeout = 10 * time.Second
defaultCacheTtl = 1 * time.Hour
defaultFilterTimeout = 10 * time.Minute
maxRequestBodyLogLen = 2000
defaultMaxUpstreamBatchSize = 10
defaultRateLimitHeader = "X-Forwarded-For"
@@ -76,6 +77,9 @@ type Server struct {
cache RPCCache
srvMu sync.Mutex
rateLimitHeader string
filtersMu sync.Mutex
filters map[string]*filter // filterID -> filter
filterTimeout time.Duration
}

type limiterFunc func(method string) bool
@@ -99,6 +103,7 @@ func NewServer(
maxRequestBodyLogLen int,
maxBatchSize int,
limiterFactory limiterFactoryFunc,
filterTimeout time.Duration,
) (*Server, error) {
if cache == nil {
cache = &NoopRPCCache{}
@@ -124,6 +129,10 @@
maxBatchSize = MaxBatchRPCCallsHardLimit
}

if filterTimeout == 0 {
filterTimeout = defaultFilterTimeout
}

var mainLim FrontendRateLimiter
limExemptOrigins := make([]*regexp.Regexp, 0)
limExemptUserAgents := make([]*regexp.Regexp, 0)
@@ -166,7 +175,7 @@ func NewServer(
rateLimitHeader = rateLimitConfig.IPHeaderOverride
}

return &Server{
ret := &Server{
BackendGroups: backendGroups,
wsBackendGroup: wsBackendGroup,
wsMethodWhitelist: wsMethodWhitelist,
@@ -191,7 +200,13 @@
limExemptOrigins: limExemptOrigins,
limExemptUserAgents: limExemptUserAgents,
rateLimitHeader: rateLimitHeader,
}, nil
filters: make(map[string]*filter),
filterTimeout: filterTimeout,
}

go ret.filterTimeoutLoop()

return ret, nil
}

func (s *Server) RPCListenAndServe(host string, port int) error {
@@ -398,6 +413,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
type batchGroup struct {
groupID int
backendGroup string
backendName string
}

responses := make([]*RPCRes, len(reqs))
@@ -486,11 +502,45 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
}
}

// route filter calls to the backend where the filter was installed
var backendName string
if parsedReq.Method == "eth_getFilterChanges" || parsedReq.Method == "eth_getFilterLogs" || parsedReq.Method == "eth_uninstallFilter" {
var params []string
if err := json.Unmarshal(parsedReq.Params, &params); err != nil {
log.Debug("error unmarshalling filter params", "err", err, "req_id", GetReqID(ctx))
RecordRPCError(ctx, BackendProxyd, parsedReq.Method, err)
responses[i] = NewRPCErrorRes(parsedReq.ID, err)
continue
}

if len(params) == 0 {
errInvalid := ErrInvalidParams("missing filter ID")
RecordRPCError(ctx, BackendProxyd, parsedReq.Method, errInvalid)
responses[i] = NewRPCErrorRes(parsedReq.ID, errInvalid)
continue
}

removed := parsedReq.Method == "eth_uninstallFilter"
filterID := params[0]

// hold filtersMu around the lookup too: the map is shared with filterTimeoutLoop
s.filtersMu.Lock()
f, ok := s.filters[filterID]
if ok {
if removed {
f.deadline.Stop()
delete(s.filters, filterID)
} else {
f.deadline.Reset(s.filterTimeout)
}

group = f.backendGroup
backendName = f.backendName
}
s.filtersMu.Unlock()

if !ok {
errNotFound := ErrInvalidParams("filter not found")
RecordRPCError(ctx, BackendProxyd, parsedReq.Method, errNotFound)
responses[i] = NewRPCErrorRes(parsedReq.ID, errNotFound)
continue
}
}

Comment on lines +506 to +538
Contributor:

Please encapsulate this logic in a function.
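
A hedged sketch of such a helper, reusing the identifiers from this diff (the routeFilterRequest name is made up here):

// routeFilterRequest resolves which backend group/backend owns filterID,
// refreshing its deadline on poll calls and dropping it on uninstall.
// It reports ok=false when the filter is unknown.
func (s *Server) routeFilterRequest(method, filterID string) (group, backend string, ok bool) {
	s.filtersMu.Lock()
	defer s.filtersMu.Unlock()

	f, found := s.filters[filterID]
	if !found {
		return "", "", false
	}
	if method == "eth_uninstallFilter" {
		f.deadline.Stop()
		delete(s.filters, filterID)
	} else {
		f.deadline.Reset(s.filterTimeout)
	}
	return f.backendGroup, f.backendName, true
}

The call site then collapses to a lookup plus error handling.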

id := string(parsedReq.ID)
// If this is a duplicate Request ID, move the Request to a new batchGroup
ids[id]++
batchGroupID := ids[id]
batchGroup := batchGroup{groupID: batchGroupID, backendGroup: group}
batchGroup := batchGroup{groupID: batchGroupID, backendGroup: group, backendName: backendName}
batches[batchGroup] = append(batches[batchGroup], batchElem{parsedReq, i})
}

Expand All @@ -512,7 +562,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
// Create minibatches - each minibatch must be no larger than the maxUpstreamBatchSize
numBatches := int(math.Ceil(float64(len(cacheMisses)) / float64(s.maxUpstreamBatchSize)))
for i := 0; i < numBatches; i++ {
if ctx.Err() == context.DeadlineExceeded {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
log.Info("short-circuiting batch RPC",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
Expand All @@ -525,7 +575,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
start := i * s.maxUpstreamBatchSize
end := int(math.Min(float64(start+s.maxUpstreamBatchSize), float64(len(cacheMisses))))
elems := cacheMisses[start:end]
res, sb, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch)
res, sb, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch, group.backendName)
servedBy[sb] = true
if err != nil {
if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
Expand All @@ -548,6 +598,20 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
for i := range elems {
responses[elems[i].Index] = res[i]

req := elems[i].Req
Contributor:

Please put lines 601–614 in a function.
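
For those lines, the extraction could look like this sketch (trackNewFilter is a hypothetical name; it assumes the servedBy string keeps the group/backend format noted in the author comment below):

// trackNewFilter records which backend answered an eth_new*Filter call so
// later polls can be routed back to it.
func (s *Server) trackNewFilter(res *RPCRes, backendGroup, servedBy string) {
	filterId, ok := res.Result.(string)
	if !ok {
		return
	}
	s.filtersMu.Lock()
	defer s.filtersMu.Unlock()
	s.filters[filterId] = &filter{
		backendGroup: backendGroup,
		backendName:  strings.SplitN(servedBy, "/", 2)[1],
		deadline:     time.NewTimer(s.filterTimeout),
	}
}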

// comma-ok guards the type assertion; a non-string result would otherwise panic
if filterId, ok := res[i].Result.(string); ok && strings.Contains(req.Method, "eth_new") && strings.Contains(req.Method, "Filter") {
f := &filter{
backendGroup: group.backendGroup,
backendName: strings.SplitN(sb, "/", 2)[1],
Author:

sb has the following format: fmt.Sprintf("%s/%s", bg.Name, back.Name) (see proxyd/backend.go:1410)

deadline: time.NewTimer(s.filterTimeout),
}

s.filtersMu.Lock()
s.filters[filterId] = f
s.filtersMu.Unlock()
}

// TODO(inphi): batch put these
if res[i].Error == nil && res[i].Result != nil {
if err := s.cache.PutRPC(ctx, elems[i].Req, res[i]); err != nil {
@@ -654,6 +718,27 @@ func randStr(l int) string {
return hex.EncodeToString(b)
}

// filterTimeoutLoop runs at the interval set by filterTimeout and deletes filters
// that have not been recently used. It is started when the Server is created.
func (s *Server) filterTimeoutLoop() {
ticker := time.NewTicker(s.filterTimeout)
defer ticker.Stop()
for {
<-ticker.C
s.filtersMu.Lock()
for id, f := range s.filters {
select {
case <-f.deadline.C:
// just delete; the node will automatically expire it on its end
delete(s.filters, id)
Comment on lines +732 to +733
Contributor:

What is the default filter timeout on the node itself?
Is the user of this RPC able to override it?
Adding too many filters to a node may slow down performance, and proxyd expiring a filter locally without actually deleting it on the node could be a concern.
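
If relying on the node's own expiry is not acceptable, one hedged option is a best-effort upstream uninstall when the proxy-side timer lapses (expireFilter and its wiring are hypothetical, not in this PR; it leans on the Forward signature added here):

// expireFilter forwards a best-effort eth_uninstallFilter to the backend
// that owns the filter before proxyd forgets about it.
func (s *Server) expireFilter(ctx context.Context, id string, f *filter) {
	req := &RPCReq{
		JSONRPC: "2.0",
		Method:  "eth_uninstallFilter",
		Params:  json.RawMessage(fmt.Sprintf(`["%s"]`, id)),
		ID:      json.RawMessage(`"proxyd-filter-gc"`),
	}
	// ignore the result: the node would eventually expire the filter anyway
	_, _, _ = s.BackendGroups[f.backendGroup].Forward(ctx, []*RPCReq{req}, false, f.backendName)
}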

default:
continue
}
}
s.filtersMu.Unlock()
}
}

func (s *Server) isUnlimitedOrigin(origin string) bool {
for _, pat := range s.limExemptOrigins {
if pat.MatchString(origin) {
@@ -878,3 +963,9 @@ func createBatchRequest(elems []batchElem) []*RPCReq {
}
return batch
}

type filter struct {
backendGroup string
backendName string
deadline *time.Timer // filter is inactive when deadline triggers
}