-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(proxyd): add filter polling support via RPC #105
base: main
Are you sure you want to change the base?
feat(proxyd): add filter polling support via RPC #105
Conversation
if res[i].Result != nil && strings.Contains(req.Method, "eth_new") && strings.Contains(req.Method, "Filter") { | ||
f := &filter{ | ||
backendGroup: group.backendGroup, | ||
backendName: strings.SplitN(sb, "/", 2)[1], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sb
has the following format: fmt.Sprintf("%s/%s", bg.Name, back.Name)
(see proxyd/backend.go:1410
)
7f00fc0
to
02be283
Compare
02be283
to
e87715f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the great PR and contribution. I've left some initial comments, and I'd also like another person from out team to review this.
Currently we really dont use proxyd with filters so I'm hesitant to merge and this functionality as our team will need to maintain this.
if len(rpcReqs) == 0 { | ||
return nil, "", nil | ||
} | ||
|
||
backends := bg.orderedBackendsForRequest() | ||
backends := bg.orderedBackendsForRequest(targetBackend) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It it even worth it to pass the name into bg.orderedBackendsForRequest? Could it just look up the target backend name and skip orderedBackendForRequest
// FilterTimeoutSeconds specifies the maximum time to keep a filter for that has not been polled for changes. | ||
FilterTimeoutSeconds int `toml:"filter_timeout_seconds"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make this feature of filter tracking configurable on and off with a boolean
var backendName string | ||
if parsedReq.Method == "eth_getFilterChanges" || parsedReq.Method == "eth_getFilterLogs" || parsedReq.Method == "eth_uninstallFilter" { | ||
var params []string | ||
if err := json.Unmarshal(parsedReq.Params, ¶ms); err != nil { | ||
log.Debug("error unmarshalling raw transaction params", "err", err, "req_Id", GetReqID(ctx)) | ||
RecordRPCError(ctx, BackendProxyd, parsedReq.Method, err) | ||
responses[i] = NewRPCErrorRes(parsedReq.ID, err) | ||
continue | ||
} | ||
|
||
removed := parsedReq.Method == "eth_uninstallFilter" | ||
filterID := params[0] | ||
if f, ok := s.filters[filterID]; ok { | ||
if removed { | ||
s.filtersMu.Lock() | ||
|
||
f.deadline.Stop() | ||
delete(s.filters, filterID) | ||
|
||
s.filtersMu.Unlock() | ||
} else { | ||
f.deadline.Reset(s.filterTimeout) | ||
} | ||
|
||
group = f.backendGroup | ||
backendName = f.backendName | ||
} else { | ||
RecordRPCError(ctx, BackendProxyd, parsedReq.Method, err) | ||
responses[i] = NewRPCErrorRes(parsedReq.ID, ErrInvalidParams("filter not found")) | ||
continue | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please encapsulate this log into a function
// just delete, the node will automatically expire it on their end | ||
delete(s.filters, id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the default filter timeout on the node itself?
Is the user of this RPC able to override this?
Adding too many filters to a node may slow down performance and not actually deleting in the filter timeout by proxyd could be a concern
@@ -548,6 +598,20 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL | |||
for i := range elems { | |||
responses[elems[i].Index] = res[i] | |||
|
|||
req := elems[i].Req |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please put 601/614 in a function
@@ -763,12 +763,12 @@ func (bg *BackendGroup) Primaries() []*Backend { | |||
} | |||
|
|||
// NOTE: BackendGroup Forward contains the log for balancing with consensus aware | |||
func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) { | |||
func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool, targetBackend string) ([]*RPCRes, string, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It its possible to add targeted backend to the context of the request instead of passing in another argument?
@jelias2 thank you for the context around supporting filters. I'll wait for additional feedback, if this is something you want to support in proxyd, before addressing the points from your review. |
Description
Currently, the backend on which the filter was created at can be different than the backend being polled for changes (due to load balancing), resulting in
filter not found
errors.This PR adds support for creating filters and polling its changes via RPC calls when there are multiple backends defined. For each new filter, the server now stores which backend served the request. When filter changes are requested, the server correctly routes the request to the same backend where the filter was created at.
Tests
Added an integration test that verifies that the filter requests are being routed to the same backend.
Additional context
WS works fine because it keeps open the connection to the correct backend.