diff --git a/proxyd/backend.go b/proxyd/backend.go
index 77d3326b..ce7d1f5c 100644
--- a/proxyd/backend.go
+++ b/proxyd/backend.go
@@ -763,12 +763,12 @@ func (bg *BackendGroup) Primaries() []*Backend {
 }
 
 // NOTE: BackendGroup Forward contains the log for balancing with consensus aware
-func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
+func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool, targetBackend string) ([]*RPCRes, string, error) {
 	if len(rpcReqs) == 0 {
 		return nil, "", nil
 	}
 
-	backends := bg.orderedBackendsForRequest()
+	backends := bg.orderedBackendsForRequest(targetBackend)
 
 	overriddenResponses := make([]*indexedReqRes, 0)
 	rewrittenReqs := make([]*RPCReq, 0, len(rpcReqs))
@@ -999,13 +999,19 @@ func weightedShuffle(backends []*Backend) {
 	weightedshuffle.ShuffleInplace(backends, weight, nil)
 }
 
-func (bg *BackendGroup) orderedBackendsForRequest() []*Backend {
-	if bg.Consensus != nil {
+func (bg *BackendGroup) orderedBackendsForRequest(targetBackend string) []*Backend {
+	// ignore consensus load balancing if targetBackend is set - no backend other than targetBackend can
+	// serve the request
+	if bg.Consensus != nil && targetBackend == "" {
 		return bg.loadBalancedConsensusGroup()
 	} else {
 		healthy := make([]*Backend, 0, len(bg.Backends))
 		unhealthy := make([]*Backend, 0, len(bg.Backends))
 		for _, be := range bg.Backends {
+			if targetBackend != "" && be.Name != targetBackend {
+				continue
+			}
+
 			if be.IsHealthy() {
 				healthy = append(healthy, be)
 			} else {
diff --git a/proxyd/config.go b/proxyd/config.go
index cf3eeb5c..7b25b669 100644
--- a/proxyd/config.go
+++ b/proxyd/config.go
@@ -29,6 +29,9 @@ type ServerConfig struct {
 	EnablePprof           bool `toml:"enable_pprof"`
 	EnableXServedByHeader bool `toml:"enable_served_by_header"`
 	AllowAllOrigins       bool `toml:"allow_all_origins"`
+
+	// FilterTimeoutSeconds specifies the maximum time to keep a filter that has not been polled for changes.
+	FilterTimeoutSeconds int `toml:"filter_timeout_seconds"`
 }
 
 type CacheConfig struct {
diff --git a/proxyd/integration_tests/filter_rpc_routing_test.go b/proxyd/integration_tests/filter_rpc_routing_test.go
new file mode 100644
index 00000000..e83435c8
--- /dev/null
+++ b/proxyd/integration_tests/filter_rpc_routing_test.go
@@ -0,0 +1,72 @@
+package integration_tests
+
+import (
+	"fmt"
+	"net/http"
+	"os"
+	"testing"
+
+	"github.com/ethereum-optimism/infra/proxyd"
+	"github.com/stretchr/testify/require"
+)
+
+func TestFilterRpcRouting(t *testing.T) {
+	backend1 := NewMockBackend(nil)
+	backend2 := NewMockBackend(nil)
+	defer backend1.Close()
+	defer backend2.Close()
+
+	require.NoError(t, os.Setenv("NODE1_URL", backend1.URL()))
+	require.NoError(t, os.Setenv("NODE2_URL", backend2.URL()))
+
+	config := ReadConfig("filter_rpc_routing")
+	client := NewProxydClient("http://127.0.0.1:8545")
+	_, shutdown, err := proxyd.Start(config)
+	require.NoError(t, err)
+	defer shutdown()
+
+	filterId := "0x11414222354634635214124"
+	newFilterResponse := fmt.Sprintf(`{"jsonrpc":"2.0","result":"%s","id":1}`, filterId)
+	getFilterChangesResponse1 := `{"jsonrpc":"2.0","result":["0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"],"id":1}`
+	getFilterChangesResponse2 := `{"jsonrpc":"2.0","result":["0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"],"id":1}`
+
+	responseQueue := make(chan string, 3)
+	responseQueue <- newFilterResponse
+	responseQueue <- getFilterChangesResponse1
+	responseQueue <- getFilterChangesResponse2
+
+	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		SingleResponseHandler(200, <-responseQueue)(w, r)
+	})
+
+	backend1.SetHandler(handler)
+	backend2.SetHandler(handler)
+
+	res, statusCode, err := client.SendRPC("eth_newBlockFilter", nil)
+	require.NoError(t, err)
+	require.Equal(t, 200, statusCode)
+
+	var selectedBackend *MockBackend
+	if len(backend1.Requests()) > 0 {
+		selectedBackend = backend1
+	} else {
+		selectedBackend = backend2
+	}
+
+	require.Equal(t, 1, len(selectedBackend.Requests()))
+	RequireEqualJSON(t, []byte(newFilterResponse), res)
+
+	res, statusCode, err = client.SendRPC("eth_getFilterChanges", []interface{}{filterId})
+
+	require.Equal(t, 2, len(selectedBackend.Requests()))
+	require.NoError(t, err)
+	require.Equal(t, 200, statusCode)
+	RequireEqualJSON(t, []byte(getFilterChangesResponse1), res)
+
+	res, statusCode, err = client.SendRPC("eth_getFilterChanges", []interface{}{filterId})
+
+	require.Equal(t, 3, len(selectedBackend.Requests()))
+	require.NoError(t, err)
+	require.Equal(t, 200, statusCode)
+	RequireEqualJSON(t, []byte(getFilterChangesResponse2), res)
+}
diff --git a/proxyd/integration_tests/testdata/filter_rpc_routing.toml b/proxyd/integration_tests/testdata/filter_rpc_routing.toml
new file mode 100644
index 00000000..dd939d7b
--- /dev/null
+++ b/proxyd/integration_tests/testdata/filter_rpc_routing.toml
@@ -0,0 +1,20 @@
+[server]
+rpc_port = 8545
+
+[backends]
+[backends.first]
+rpc_url = "$NODE1_URL"
+
+[backends.second]
+rpc_url = "$NODE2_URL"
+
+[backend_groups]
+[backend_groups.main]
+backends = ["first", "second"]
+
+[rpc_method_mappings]
+eth_newFilter = "main"
+eth_newBlockFilter = "main"
+eth_uninstallFilter = "main"
+eth_getFilterChanges = "main"
+eth_getFilterLogs = "main"
diff --git a/proxyd/proxyd.go b/proxyd/proxyd.go
index f417a16a..d213181c 100644
--- a/proxyd/proxyd.go
+++ b/proxyd/proxyd.go
@@ -353,6 +353,7 @@ func Start(config *Config) (*Server, func(), error) {
 		config.Server.MaxRequestBodyLogLen,
 		config.BatchConfig.MaxSize,
 		limiterFactory,
+		secondsToDuration(config.Server.FilterTimeoutSeconds),
 	)
 	if err != nil {
 		return nil, nil, fmt.Errorf("error creating server: %w", err)
diff --git a/proxyd/server.go b/proxyd/server.go
index 1b529b6d..6b0bf788 100644
--- a/proxyd/server.go
+++ b/proxyd/server.go
@@ -43,6 +43,7 @@ const (
 	defaultWSReadTimeout  = 2 * time.Minute
 	defaultWSWriteTimeout = 10 * time.Second
 	defaultCacheTtl       = 1 * time.Hour
+	defaultFilterTimeout  = 10 * time.Minute
 	maxRequestBodyLogLen        = 2000
 	defaultMaxUpstreamBatchSize = 10
 	defaultRateLimitHeader      = "X-Forwarded-For"
@@ -76,6 +77,9 @@ type Server struct {
 	cache           RPCCache
 	srvMu           sync.Mutex
 	rateLimitHeader string
+	filtersMu       sync.Mutex
+	filters         map[string]*filter // filterID -> filter
+	filterTimeout   time.Duration
 }
 
 type limiterFunc func(method string) bool
@@ -99,6 +103,7 @@ func NewServer(
 	maxRequestBodyLogLen int,
 	maxBatchSize int,
 	limiterFactory limiterFactoryFunc,
+	filterTimeout time.Duration,
 ) (*Server, error) {
 	if cache == nil {
 		cache = &NoopRPCCache{}
@@ -124,6 +129,10 @@ func NewServer(
 		maxBatchSize = MaxBatchRPCCallsHardLimit
 	}
 
+	if filterTimeout == 0 {
+		filterTimeout = defaultFilterTimeout
+	}
+
 	var mainLim FrontendRateLimiter
 	limExemptOrigins := make([]*regexp.Regexp, 0)
 	limExemptUserAgents := make([]*regexp.Regexp, 0)
@@ -166,7 +175,7 @@ func NewServer(
 		rateLimitHeader = rateLimitConfig.IPHeaderOverride
 	}
 
-	return &Server{
+	ret := &Server{
 		BackendGroups:     backendGroups,
 		wsBackendGroup:    wsBackendGroup,
 		wsMethodWhitelist: wsMethodWhitelist,
@@ -191,7 +200,13 @@ func NewServer(
 		limExemptOrigins:    limExemptOrigins,
 		limExemptUserAgents: limExemptUserAgents,
 		rateLimitHeader:     rateLimitHeader,
-	}, nil
+		filters:             make(map[string]*filter),
+		filterTimeout:       filterTimeout,
+	}
+
+	go ret.filterTimeoutLoop()
+
+	return ret, nil
 }
 
 func (s *Server) RPCListenAndServe(host string, port int) error {
@@ -398,6 +413,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 	type batchGroup struct {
 		groupID      int
 		backendGroup string
+		backendName  string
 	}
 
 	responses := make([]*RPCRes, len(reqs))
@@ -486,11 +502,45 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 			}
 		}
 
+		// route filter calls to the backend where the filter was installed
+		var backendName string
+		if parsedReq.Method == "eth_getFilterChanges" || parsedReq.Method == "eth_getFilterLogs" || parsedReq.Method == "eth_uninstallFilter" {
+			var params []string
+			if err := json.Unmarshal(parsedReq.Params, &params); err != nil {
+				log.Debug("error unmarshalling filter params", "err", err, "req_id", GetReqID(ctx))
+				RecordRPCError(ctx, BackendProxyd, parsedReq.Method, err)
+				responses[i] = NewRPCErrorRes(parsedReq.ID, err)
+				continue
+			}
+
+			removed := parsedReq.Method == "eth_uninstallFilter"
+			filterID := params[0]
+			if f, ok := s.filters[filterID]; ok {
+				if removed {
+					s.filtersMu.Lock()
+
+					f.deadline.Stop()
+					delete(s.filters, filterID)
+
+					s.filtersMu.Unlock()
+				} else {
+					f.deadline.Reset(s.filterTimeout)
+				}
+
+				group = f.backendGroup
+				backendName = f.backendName
+			} else {
+				RecordRPCError(ctx, BackendProxyd, parsedReq.Method, ErrInvalidParams("filter not found"))
+				responses[i] = NewRPCErrorRes(parsedReq.ID, ErrInvalidParams("filter not found"))
+				continue
+			}
+		}
+
 		id := string(parsedReq.ID)
 		// If this is a duplicate Request ID, move the Request to a new batchGroup
 		ids[id]++
 		batchGroupID := ids[id]
-		batchGroup := batchGroup{groupID: batchGroupID, backendGroup: group}
+		batchGroup := batchGroup{groupID: batchGroupID, backendGroup: group, backendName: backendName}
 		batches[batchGroup] = append(batches[batchGroup], batchElem{parsedReq, i})
 	}
 
@@ -512,7 +562,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 		// Create minibatches - each minibatch must be no larger than the maxUpstreamBatchSize
 		numBatches := int(math.Ceil(float64(len(cacheMisses)) / float64(s.maxUpstreamBatchSize)))
 		for i := 0; i < numBatches; i++ {
-			if ctx.Err() == context.DeadlineExceeded {
+			if errors.Is(ctx.Err(), context.DeadlineExceeded) {
 				log.Info("short-circuiting batch RPC",
 					"req_id", GetReqID(ctx),
 					"auth", GetAuthCtx(ctx),
@@ -525,7 +575,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 			start := i * s.maxUpstreamBatchSize
 			end := int(math.Min(float64(start+s.maxUpstreamBatchSize), float64(len(cacheMisses))))
 			elems := cacheMisses[start:end]
-			res, sb, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch)
+			res, sb, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch, group.backendName)
 			servedBy[sb] = true
 			if err != nil {
 				if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
@@ -548,6 +598,20 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 
 			for i := range elems {
 				responses[elems[i].Index] = res[i]
+				req := elems[i].Req
+				if res[i].Result != nil && strings.Contains(req.Method, "eth_new") && strings.Contains(req.Method, "Filter") {
+					f := &filter{
+						backendGroup: group.backendGroup,
+						backendName:  strings.SplitN(sb, "/", 2)[1],
+						deadline:     time.NewTimer(s.filterTimeout),
+					}
+
+					filterId := res[i].Result.(string)
+					s.filtersMu.Lock()
+					s.filters[filterId] = f
+					s.filtersMu.Unlock()
+				}
+
 				// TODO(inphi): batch put these
 				if res[i].Error == nil && res[i].Result != nil {
 					if err := s.cache.PutRPC(ctx, elems[i].Req, res[i]); err != nil {
@@ -654,6 +718,27 @@ func randStr(l int) string {
 	return hex.EncodeToString(b)
 }
 
+// filterTimeoutLoop runs at the interval set by filterTimeout and deletes filters
+// that have not been recently used. It is started when the Server is created.
+func (s *Server) filterTimeoutLoop() {
+	ticker := time.NewTicker(s.filterTimeout)
+	defer ticker.Stop()
+	for {
+		<-ticker.C
+		s.filtersMu.Lock()
+		for id, f := range s.filters {
+			select {
+			case <-f.deadline.C:
+				// just delete; the node will automatically expire it on its end
+				delete(s.filters, id)
+			default:
+				continue
+			}
+		}
+		s.filtersMu.Unlock()
+	}
+}
+
 func (s *Server) isUnlimitedOrigin(origin string) bool {
 	for _, pat := range s.limExemptOrigins {
 		if pat.MatchString(origin) {
@@ -878,3 +963,9 @@ func createBatchRequest(elems []batchElem) []*RPCReq {
 	}
 	return batch
 }
+
+type filter struct {
+	backendGroup string
+	backendName  string
+	deadline     *time.Timer // filter is inactive when deadline triggers
+}