Skip to content

Commit

Permalink
Compatibility fixes/improvements for JSON/RPC filter polling (#641)
Browse files Browse the repository at this point in the history
* Return JSON arrays in filter change results, rather than a string

* Correct serialization of empty array, by avoiding extra marshal step

Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>

* Correct typo in error message, and make consistent with other clients

Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>

* Refresh filter timeout when polled

Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>

* Pull in PR#209 fix from ethgo

Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
  • Loading branch information
peterbroadhurst authored Jul 29, 2022
1 parent 8502125 commit 31f8abc
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 43 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
github.com/klauspost/compress v1.15.5 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/umbracle/ethgo v0.1.3
github.com/umbracle/ethgo v0.1.4-0.20220722090909-c8ac32939570
github.com/valyala/fastjson v1.6.3 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,8 @@ github.com/tyler-smith/go-bip39 v1.1.0/go.mod h1:gUYDtqQw1JS3ZJ8UWVcGTGqqr6YIN3C
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/umbracle/ethgo v0.1.3 h1:s8D7Rmphnt71zuqrgsGTMS5gTNbueGO1zKLh7qsFzTM=
github.com/umbracle/ethgo v0.1.3/go.mod h1:g9zclCLixH8liBI27Py82klDkW7Oo33AxUOr+M9lzrU=
github.com/umbracle/ethgo v0.1.4-0.20220722090909-c8ac32939570 h1:/KyTftQQhxq0iRIVRocn0F2D4zoHmstIfB4FTDjsZbw=
github.com/umbracle/ethgo v0.1.4-0.20220722090909-c8ac32939570/go.mod h1:g9zclCLixH8liBI27Py82klDkW7Oo33AxUOr+M9lzrU=
github.com/umbracle/fastrlp v0.0.0-20220527094140-59d5dd30e722 h1:10Nbw6cACsnQm7r34zlpJky+IzxVLRk6MKTS2d3Vp0E=
github.com/umbracle/fastrlp v0.0.0-20220527094140-59d5dd30e722/go.mod h1:c8J0h9aULj2i3umrfyestM6jCq0LK0U6ly6bWy96nd4=
github.com/umbracle/go-eth-bn256 v0.0.0-20190607160430-b36caf4e0f6b h1:t3nz9xXkLZJz+ZlTGFT3ixsCGO5AHx1Yift2EAfjnnc=
Expand Down
67 changes: 42 additions & 25 deletions jsonrpc/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -17,7 +16,7 @@ import (
)

var (
ErrFilterDoesNotExists = errors.New("filter does not exists")
ErrFilterNotFound = errors.New("filter not found")
ErrWSFilterDoesNotSupportGetChanges = errors.New("web socket Filter doesn't support to return a batch of the changes")
ErrCastingFilterToLogFilter = errors.New("casting filter object to logFilter error")
ErrBlockNotFound = errors.New("block not found")
Expand All @@ -43,8 +42,8 @@ type filter interface {
// getFilterBase returns filterBase that has common fields
getFilterBase() *filterBase

// getUpdates returns stored data in string
getUpdates() (string, error)
// getUpdates returns stored data in a JSON serializable form
getUpdates() (interface{}, error)

// sendUpdates write stored data to web socket stream
sendUpdates() error
Expand Down Expand Up @@ -130,17 +129,15 @@ func (f *blockFilter) setHeadElem(head *headElem) {
}

// getUpdates returns updates of blocks in string
func (f *blockFilter) getUpdates() (string, error) {
func (f *blockFilter) getUpdates() (interface{}, error) {
headers := f.takeBlockUpdates()

updates := make([]string, len(headers))
for index, header := range headers {
updates[index] = header.Hash.String()
}

return fmt.Sprintf(
"[\"%s\"]", strings.Join(updates, "\",\""),
), nil
return updates, nil
}

// sendUpdates writes the updates of blocks to web socket stream
Expand Down Expand Up @@ -190,15 +187,10 @@ func (f *logFilter) takeLogUpdates() []*Log {
}

// getUpdates returns stored logs in string
func (f *logFilter) getUpdates() (string, error) {
func (f *logFilter) getUpdates() (interface{}, error) {
logs := f.takeLogUpdates()

res, err := json.Marshal(logs)
if err != nil {
return "", err
}

return string(res), nil
return logs, nil
}

// sendUpdates writes stored logs to web socket stream
Expand Down Expand Up @@ -500,7 +492,7 @@ func (f *FilterManager) GetLogFilterFromID(filterID string) (*logFilter, error)
filterRaw := f.getFilterByID(filterID)

if filterRaw == nil {
return nil, ErrFilterDoesNotExists
return nil, ErrFilterNotFound
}

logFilter, ok := filterRaw.(*logFilter)
Expand All @@ -511,28 +503,42 @@ func (f *FilterManager) GetLogFilterFromID(filterID string) (*logFilter, error)
return logFilter, nil
}

// GetFilterChanges returns the updates of the filter with given ID in string
func (f *FilterManager) GetFilterChanges(id string) (string, error) {
// GetFilterChanges returns the updates of the filter with given ID in string, and refreshes the timeout on the filter
func (f *FilterManager) GetFilterChanges(id string) (interface{}, error) {
filter, res, err := f.getFilterAndChanges(id)

if err == nil && !filter.hasWSConn() {
// Refresh the timeout on this filter
f.Lock()
f.refreshFilterTimeout(filter.getFilterBase())
f.Unlock()
}

return res, err
}

// getFilterAndChanges returns the updates of the filter with given ID in string (read lock only)
func (f *FilterManager) getFilterAndChanges(id string) (filter, interface{}, error) {
f.RLock()
defer f.RUnlock()

filter, ok := f.filters[id]

if !ok {
return "", ErrFilterDoesNotExists
return nil, nil, ErrFilterNotFound
}

// we cannot get updates from a ws filter with getFilterChanges
if filter.hasWSConn() {
return "", ErrWSFilterDoesNotSupportGetChanges
return nil, nil, ErrWSFilterDoesNotSupportGetChanges
}

res, err := filter.getUpdates()
if err != nil {
return "", err
return nil, nil, err
}

return res, nil
return filter, res, nil
}

// Uninstall removes the filter with given ID from list
Expand Down Expand Up @@ -568,6 +574,19 @@ func (f *FilterManager) RemoveFilterByWs(ws wsConn) {
f.removeFilterByID(ws.GetFilterID())
}

// refreshFilterTimeout updates the timeout for a filter to the current time
func (f *FilterManager) refreshFilterTimeout(filter *filterBase) {
f.timeouts.removeFilter(filter)
f.addFilterTimeout(filter)
}

// addFilterTimeout set timeout and add to heap
func (f *FilterManager) addFilterTimeout(filter *filterBase) {
filter.expiresAt = time.Now().Add(f.timeout)
f.timeouts.addFilter(filter)
f.emitSignalToUpdateCh()
}

// addFilter is an internal method to add given filter to list and heap
func (f *FilterManager) addFilter(filter filter) string {
f.Lock()
Expand All @@ -579,9 +598,7 @@ func (f *FilterManager) addFilter(filter filter) string {

// Set timeout and add to heap if filter doesn't have web socket connection
if !filter.hasWSConn() {
base.expiresAt = time.Now().Add(f.timeout)
f.timeouts.addFilter(base)
f.emitSignalToUpdateCh()
f.addFilterTimeout(base)
}

return base.id
Expand Down
22 changes: 6 additions & 16 deletions vendor/github.com/umbracle/ethgo/jsonrpc/eth.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ github.com/syndtr/goleveldb/leveldb/opt
github.com/syndtr/goleveldb/leveldb/storage
github.com/syndtr/goleveldb/leveldb/table
github.com/syndtr/goleveldb/leveldb/util
# github.com/umbracle/ethgo v0.1.3
# github.com/umbracle/ethgo v0.1.4-0.20220722090909-c8ac32939570
## explicit; go 1.18
github.com/umbracle/ethgo
github.com/umbracle/ethgo/abi
Expand Down

0 comments on commit 31f8abc

Please sign in to comment.