From c7d859da7a8db1499fc2ac8ae7d09788cda89775 Mon Sep 17 00:00:00 2001 From: AleksaOpacic Date: Mon, 30 May 2022 16:40:39 +0200 Subject: [PATCH 01/10] Implement removing from filter by ws --- jsonrpc/dispatcher.go | 4 ++++ jsonrpc/filter_manager.go | 19 +++++++++++++++++++ jsonrpc/filter_manager_test.go | 18 ++++++++++++++++++ jsonrpc/jsonrpc.go | 6 ++++++ 4 files changed, 47 insertions(+) diff --git a/jsonrpc/dispatcher.go b/jsonrpc/dispatcher.go index db32f2d033..02209adcf2 100644 --- a/jsonrpc/dispatcher.go +++ b/jsonrpc/dispatcher.go @@ -167,6 +167,10 @@ func (d *Dispatcher) handleUnsubscribe(req Request) (bool, Error) { return d.filterManager.Uninstall(filterID), nil } +func (d *Dispatcher) RemoveFilterByWs(conn wsConn) bool { + return d.filterManager.RemoveFilterByWs(conn) +} + func (d *Dispatcher) HandleWs(reqBody []byte, conn wsConn) ([]byte, error) { var req Request if err := json.Unmarshal(reqBody, &req); err != nil { diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index a0c2b278ad..c151027217 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -390,6 +390,25 @@ func (f *FilterManager) removeFilterByID(id string) bool { return true } +// removeFilterByWs removes the filter with given WS, unsafe against race condition +func (f *FilterManager) RemoveFilterByWs(ws wsConn) bool { + for _, filter := range f.filters { + if filter.getFilterBase().ws != ws { + continue + } + + delete(f.filters, filter.getFilterBase().id) + + if removed := f.timeouts.removeFilter(filter.getFilterBase()); removed { + f.emitSignalToUpdateCh() + } + + return true + } + + return false +} + // addFilter is an internal method to add given filter to list and heap func (f *FilterManager) addFilter(filter filter) string { f.lock.Lock() diff --git a/jsonrpc/filter_manager_test.go b/jsonrpc/filter_manager_test.go index 43e9cf440a..66de5bbcd9 100644 --- a/jsonrpc/filter_manager_test.go +++ b/jsonrpc/filter_manager_test.go @@ -148,6 +148,24 @@ func TestFilterTimeout(t *testing.T) { assert.False(t, m.Exists(id)) } +func TestRemoveFilterByWebsocket(t *testing.T) { + store := newMockStore() + + mock := &mockWsConn{ + msgCh: make(chan []byte, 1), + } + + m := NewFilterManager(hclog.NewNullLogger(), store) + go m.Run() + + id := m.NewBlockFilter(mock) + + m.RemoveFilterByWs(mock) + + // false because filter was removed + assert.False(t, m.Exists(id)) +} + func TestFilterWebsocket(t *testing.T) { store := newMockStore() diff --git a/jsonrpc/jsonrpc.go b/jsonrpc/jsonrpc.go index 3f6c221757..05fe61985f 100644 --- a/jsonrpc/jsonrpc.go +++ b/jsonrpc/jsonrpc.go @@ -40,6 +40,7 @@ type JSONRPC struct { } type dispatcher interface { + RemoveFilterByWs(conn wsConn) bool HandleWs(reqBody []byte, conn wsConn) ([]byte, error) Handle(reqBody []byte) ([]byte, error) } @@ -209,6 +210,11 @@ func (j *JSONRPC) handleWs(w http.ResponseWriter, req *http.Request) { j.logger.Info("Closing WS connection with error") } + ok := j.dispatcher.RemoveFilterByWs(wrapConn) + if !ok { + j.logger.Error("Unable to remove filter with specific WS") + } + break } From dfa9cf9b1ee51761e111616bca079e4d33220df0 Mon Sep 17 00:00:00 2001 From: AleksaOpacic Date: Thu, 7 Jul 2022 08:11:52 +0200 Subject: [PATCH 02/10] Remove feedback from RemoveFilterByWs method --- jsonrpc/dispatcher.go | 4 ++-- jsonrpc/filter_manager.go | 6 +----- jsonrpc/jsonrpc.go | 7 ++----- 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/jsonrpc/dispatcher.go b/jsonrpc/dispatcher.go index 02209adcf2..a907fa20d0 100644 --- a/jsonrpc/dispatcher.go +++ b/jsonrpc/dispatcher.go @@ -167,8 +167,8 @@ func (d *Dispatcher) handleUnsubscribe(req Request) (bool, Error) { return d.filterManager.Uninstall(filterID), nil } -func (d *Dispatcher) RemoveFilterByWs(conn wsConn) bool { - return d.filterManager.RemoveFilterByWs(conn) +func (d *Dispatcher) RemoveFilterByWs(conn wsConn) { + d.filterManager.RemoveFilterByWs(conn) } func (d *Dispatcher) HandleWs(reqBody []byte, conn wsConn) ([]byte, error) { diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index c151027217..3b5f4ffc8b 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -391,7 +391,7 @@ func (f *FilterManager) removeFilterByID(id string) bool { } // removeFilterByWs removes the filter with given WS, unsafe against race condition -func (f *FilterManager) RemoveFilterByWs(ws wsConn) bool { +func (f *FilterManager) RemoveFilterByWs(ws wsConn) { for _, filter := range f.filters { if filter.getFilterBase().ws != ws { continue @@ -402,11 +402,7 @@ func (f *FilterManager) RemoveFilterByWs(ws wsConn) bool { if removed := f.timeouts.removeFilter(filter.getFilterBase()); removed { f.emitSignalToUpdateCh() } - - return true } - - return false } // addFilter is an internal method to add given filter to list and heap diff --git a/jsonrpc/jsonrpc.go b/jsonrpc/jsonrpc.go index 05fe61985f..49d4843981 100644 --- a/jsonrpc/jsonrpc.go +++ b/jsonrpc/jsonrpc.go @@ -40,7 +40,7 @@ type JSONRPC struct { } type dispatcher interface { - RemoveFilterByWs(conn wsConn) bool + RemoveFilterByWs(conn wsConn) HandleWs(reqBody []byte, conn wsConn) ([]byte, error) Handle(reqBody []byte) ([]byte, error) } @@ -210,10 +210,7 @@ func (j *JSONRPC) handleWs(w http.ResponseWriter, req *http.Request) { j.logger.Info("Closing WS connection with error") } - ok := j.dispatcher.RemoveFilterByWs(wrapConn) - if !ok { - j.logger.Error("Unable to remove filter with specific WS") - } + j.dispatcher.RemoveFilterByWs(wrapConn) break } From dd60f2d672f94444aa9a46fc732c33a46ae61a30 Mon Sep 17 00:00:00 2001 From: AleksaOpacic Date: Thu, 7 Jul 2022 08:27:24 +0200 Subject: [PATCH 03/10] Add filterByWs map --- jsonrpc/filter_manager.go | 41 +++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index 3b5f4ffc8b..a7a715e851 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -233,19 +233,22 @@ type FilterManager struct { updateCh chan struct{} closeCh chan struct{} + + filterByWsConn map[wsConn]filter } func NewFilterManager(logger hclog.Logger, store filterManagerStore) *FilterManager { m := &FilterManager{ - logger: logger.Named("filter"), - timeout: defaultTimeout, - store: store, - blockStream: &blockStream{}, - lock: sync.RWMutex{}, - filters: make(map[string]filter), - timeouts: timeHeapImpl{}, - updateCh: make(chan struct{}), - closeCh: make(chan struct{}), + logger: logger.Named("filter"), + timeout: defaultTimeout, + store: store, + blockStream: &blockStream{}, + lock: sync.RWMutex{}, + filters: make(map[string]filter), + timeouts: timeHeapImpl{}, + updateCh: make(chan struct{}), + closeCh: make(chan struct{}), + filterByWsConn: make(map[wsConn]filter), } // start blockstream with the current header @@ -319,6 +322,8 @@ func (f *FilterManager) NewBlockFilter(ws wsConn) string { block: f.blockStream.Head(), } + f.filterByWsConn[ws] = filter + return f.addFilter(filter) } @@ -329,6 +334,8 @@ func (f *FilterManager) NewLogFilter(logQuery *LogQuery, ws wsConn) string { query: logQuery, } + f.filterByWsConn[ws] = filter + return f.addFilter(filter) } @@ -392,16 +399,16 @@ func (f *FilterManager) removeFilterByID(id string) bool { // removeFilterByWs removes the filter with given WS, unsafe against race condition func (f *FilterManager) RemoveFilterByWs(ws wsConn) { - for _, filter := range f.filters { - if filter.getFilterBase().ws != ws { - continue - } + filter, ok := f.filterByWsConn[ws] + if !ok { + return + } - delete(f.filters, filter.getFilterBase().id) + delete(f.filters, filter.getFilterBase().id) + delete(f.filterByWsConn, ws) - if removed := f.timeouts.removeFilter(filter.getFilterBase()); removed { - f.emitSignalToUpdateCh() - } + if removed := f.timeouts.removeFilter(filter.getFilterBase()); removed { + f.emitSignalToUpdateCh() } } From 865b2c3aed0c45e48c16f21980f4b44c10b319a3 Mon Sep 17 00:00:00 2001 From: AleksaOpacic Date: Thu, 7 Jul 2022 08:36:52 +0200 Subject: [PATCH 04/10] delete from filterByWsConn by filterId --- jsonrpc/filter_manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index a7a715e851..cc61ee7e3c 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -389,6 +389,7 @@ func (f *FilterManager) removeFilterByID(id string) bool { } delete(f.filters, id) + delete(f.filterByWsConn, filter.getFilterBase().ws) if removed := f.timeouts.removeFilter(filter.getFilterBase()); removed { f.emitSignalToUpdateCh() From af9f7ff6d4b01815b6e7ea3f1aba80c4026dd1bb Mon Sep 17 00:00:00 2001 From: AleksaOpacic Date: Tue, 19 Jul 2022 22:56:43 +0200 Subject: [PATCH 05/10] change logic 3rd time :) --- jsonrpc/dispatcher.go | 2 ++ jsonrpc/filter_manager.go | 34 +++++++++++++++------------------- jsonrpc/filter_manager_test.go | 17 ++++++++++++++++- jsonrpc/jsonrpc.go | 9 +++++++++ 4 files changed, 42 insertions(+), 20 deletions(-) diff --git a/jsonrpc/dispatcher.go b/jsonrpc/dispatcher.go index 5c785d09e3..1225b2713c 100644 --- a/jsonrpc/dispatcher.go +++ b/jsonrpc/dispatcher.go @@ -100,6 +100,8 @@ func (d *Dispatcher) getFnHandler(req Request) (*serviceData, *funcData, Error) type wsConn interface { WriteMessage(messageType int, data []byte) error + GetFilterID() string + SetFilterID(string) } // as per https://www.jsonrpc.org/specification, the `id` in JSON-RPC 2.0 diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index f973ff932e..890aa2fccd 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -240,22 +240,19 @@ type FilterManager struct { updateCh chan struct{} closeCh chan struct{} - - filterByWsConn map[wsConn]filter } func NewFilterManager(logger hclog.Logger, store filterManagerStore) *FilterManager { m := &FilterManager{ - logger: logger.Named("filter"), - timeout: defaultTimeout, - store: store, - blockStream: &blockStream{}, - lock: sync.RWMutex{}, - filters: make(map[string]filter), - timeouts: timeHeapImpl{}, - updateCh: make(chan struct{}), - closeCh: make(chan struct{}), - filterByWsConn: make(map[wsConn]filter), + logger: logger.Named("filter"), + timeout: defaultTimeout, + store: store, + blockStream: &blockStream{}, + lock: sync.RWMutex{}, + filters: make(map[string]filter), + timeouts: timeHeapImpl{}, + updateCh: make(chan struct{}), + closeCh: make(chan struct{}), } // start blockstream with the current header @@ -330,7 +327,7 @@ func (f *FilterManager) NewBlockFilter(ws wsConn) string { block: f.blockStream.Head(), } - f.filterByWsConn[ws] = filter + ws.SetFilterID(filter.id) return f.addFilter(filter) } @@ -342,8 +339,6 @@ func (f *FilterManager) NewLogFilter(logQuery *LogQuery, ws wsConn) string { query: logQuery, } - f.filterByWsConn[ws] = filter - return f.addFilter(filter) } @@ -525,7 +520,6 @@ func (f *FilterManager) removeFilterByID(id string) bool { } delete(f.filters, id) - delete(f.filterByWsConn, filter.getFilterBase().ws) if removed := f.timeouts.removeFilter(filter.getFilterBase()); removed { f.emitSignalToUpdateCh() @@ -536,13 +530,15 @@ func (f *FilterManager) removeFilterByID(id string) bool { // removeFilterByWs removes the filter with given WS, unsafe against race condition func (f *FilterManager) RemoveFilterByWs(ws wsConn) { - filter, ok := f.filterByWsConn[ws] + filterID := ws.GetFilterID() + + // Make sure filter exists + filter, ok := f.filters[filterID] if !ok { return } - delete(f.filters, filter.getFilterBase().id) - delete(f.filterByWsConn, ws) + delete(f.filters, filterID) if removed := f.timeouts.removeFilter(filter.getFilterBase()); removed { f.emitSignalToUpdateCh() diff --git a/jsonrpc/filter_manager_test.go b/jsonrpc/filter_manager_test.go index 6c354db9b7..c6347f0fb9 100644 --- a/jsonrpc/filter_manager_test.go +++ b/jsonrpc/filter_manager_test.go @@ -345,7 +345,16 @@ func TestFilterWebsocket(t *testing.T) { } type mockWsConn struct { - msgCh chan []byte + msgCh chan []byte + filterID string +} + +func (m *mockWsConn) SetFilterID(filterID string) { + m.filterID = filterID +} + +func (m *mockWsConn) GetFilterID() string { + return m.filterID } func (m *mockWsConn) WriteMessage(messageType int, b []byte) error { @@ -378,6 +387,12 @@ func TestHeadStream(t *testing.T) { type MockClosedWSConnection struct{} +func (m *MockClosedWSConnection) SetFilterID(_filterID string) {} + +func (m *MockClosedWSConnection) GetFilterID() string { + return "" +} + func (m *MockClosedWSConnection) WriteMessage(_messageType int, _data []byte) error { return websocket.ErrCloseSent } diff --git a/jsonrpc/jsonrpc.go b/jsonrpc/jsonrpc.go index 6b42d1657e..027288e37d 100644 --- a/jsonrpc/jsonrpc.go +++ b/jsonrpc/jsonrpc.go @@ -146,6 +146,15 @@ type wsWrapper struct { ws *websocket.Conn // the actual WS connection logger hclog.Logger // module logger writeLock sync.Mutex // writer lock + filterID string // filter ID +} + +func (w *wsWrapper) SetFilterID(filterID string) { + w.filterID = filterID +} + +func (w *wsWrapper) GetFilterID() string { + return w.filterID } // WriteMessage writes out the message to the WS peer From b13e4cfdc4194734fce2ef60c94c7c38cd1886b6 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Mon, 25 Jul 2022 13:27:34 +0200 Subject: [PATCH 06/10] Minor cleanup --- jsonrpc/eth_endpoint.go | 8 +--- jsonrpc/filter_manager.go | 78 ++++++++++++++++++++------------------- jsonrpc/jsonrpc.go | 13 ++++--- 3 files changed, 50 insertions(+), 49 deletions(-) diff --git a/jsonrpc/eth_endpoint.go b/jsonrpc/eth_endpoint.go index ad0cc38068..a1aa9242a7 100644 --- a/jsonrpc/eth_endpoint.go +++ b/jsonrpc/eth_endpoint.go @@ -803,16 +803,12 @@ func (e *Eth) GetFilterChanges(id string) (interface{}, error) { // UninstallFilter uninstalls a filter with given ID func (e *Eth) UninstallFilter(id string) (bool, error) { - ok := e.filterManager.Uninstall(id) - - return ok, nil + return e.filterManager.Uninstall(id), nil } // Unsubscribe uninstalls a filter in a websocket func (e *Eth) Unsubscribe(id string) (bool, error) { - ok := e.filterManager.Uninstall(id) - - return ok, nil + return e.filterManager.Uninstall(id), nil } func (e *Eth) getBlockHeader(number BlockNumber) (*types.Header, error) { diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index 890aa2fccd..2ccb54664c 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -23,6 +23,7 @@ var ( ErrBlockNotFound = errors.New("block not found") ErrIncorrectBlockRange = errors.New("incorrect range") ErrPendingBlockNumber = errors.New("pending block number is not supported") + ErrNoWSConnection = errors.New("no websocket connection") ) // defaultTimeout is the timeout to remove the filters that don't have a web socket stream @@ -35,8 +36,8 @@ const ( // filter is an interface that BlockFilter and LogFilter implement type filter interface { - // isWS returns the flag indicating the filter has web socket stream - isWS() bool + // hasWSConn returns the flag indicating the filter has web socket stream + hasWSConn() bool // getFilterBase returns filterBase that has common fields getFilterBase() *filterBase @@ -57,7 +58,7 @@ type filterBase struct { heapIndex int // timestamp to be expired - expiredAt time.Time + expiresAt time.Time // websocket connection ws wsConn @@ -77,8 +78,8 @@ func (f *filterBase) getFilterBase() *filterBase { return f } -// isWS returns the flag indicating this filter has websocket connection -func (f *filterBase) isWS() bool { +// hasWSConn returns the flag indicating this filter has websocket connection +func (f *filterBase) hasWSConn() bool { return f.ws != nil } @@ -93,18 +94,21 @@ const ethSubscriptionTemplate = `{ // writeMessageToWs sends given message to websocket stream func (f *filterBase) writeMessageToWs(msg string) error { - res := fmt.Sprintf(ethSubscriptionTemplate, f.id, msg) - if err := f.ws.WriteMessage(websocket.TextMessage, []byte(res)); err != nil { - return err + if !f.hasWSConn() { + return ErrNoWSConnection } - return nil + return f.ws.WriteMessage( + websocket.TextMessage, + []byte(fmt.Sprintf(ethSubscriptionTemplate, f.id, msg)), + ) } // blockFilter is a filter to store the updates of block type blockFilter struct { filterBase sync.Mutex + block *headElem } @@ -226,6 +230,8 @@ type filterManagerStore interface { // FilterManager manages all running filters type FilterManager struct { + sync.RWMutex + logger hclog.Logger timeout time.Duration @@ -234,7 +240,6 @@ type FilterManager struct { subscription blockchain.Subscription blockStream *blockStream - lock sync.RWMutex filters map[string]filter timeouts timeHeapImpl @@ -248,7 +253,6 @@ func NewFilterManager(logger hclog.Logger, store filterManagerStore) *FilterMana timeout: defaultTimeout, store: store, blockStream: &blockStream{}, - lock: sync.RWMutex{}, filters: make(map[string]filter), timeouts: timeHeapImpl{}, updateCh: make(chan struct{}), @@ -288,7 +292,7 @@ func (f *FilterManager) Run() { // set timer to remove filter if filterBase != nil { - timeoutCh = time.After(time.Until(filterBase.expiredAt)) + timeoutCh = time.After(time.Until(filterBase.expiresAt)) } select { @@ -344,8 +348,8 @@ func (f *FilterManager) NewLogFilter(logQuery *LogQuery, ws wsConn) string { // Exists checks the filter with given ID exists func (f *FilterManager) Exists(id string) bool { - f.lock.RLock() - defer f.lock.RUnlock() + f.RLock() + defer f.RUnlock() _, ok := f.filters[id] @@ -463,10 +467,10 @@ func (f *FilterManager) GetLogsForQuery(query *LogQuery) ([]*Log, error) { //GetLogFilterFromID return log filter for given filterID func (f *FilterManager) GetLogFilterFromID(filterID string) (*logFilter, error) { - f.lock.RLock() + f.RLock() filter, ok := f.filters[filterID] - f.lock.RUnlock() + f.RUnlock() if !ok { return nil, ErrFilterDoesNotExists @@ -482,8 +486,8 @@ func (f *FilterManager) GetLogFilterFromID(filterID string) (*logFilter, error) // GetFilterChanges returns the updates of the filter with given ID in string func (f *FilterManager) GetFilterChanges(id string) (string, error) { - f.lock.RLock() - defer f.lock.RUnlock() + f.RLock() + defer f.RUnlock() filter, ok := f.filters[id] @@ -492,7 +496,7 @@ func (f *FilterManager) GetFilterChanges(id string) (string, error) { } // we cannot get updates from a ws filter with getFilterChanges - if filter.isWS() { + if filter.hasWSConn() { return "", ErrWSFilterDoesNotSupportGetChanges } @@ -506,8 +510,8 @@ func (f *FilterManager) GetFilterChanges(id string) (string, error) { // Uninstall removes the filter with given ID from list func (f *FilterManager) Uninstall(id string) bool { - f.lock.Lock() - defer f.lock.Unlock() + f.Lock() + defer f.Unlock() return f.removeFilterByID(id) } @@ -547,16 +551,16 @@ func (f *FilterManager) RemoveFilterByWs(ws wsConn) { // addFilter is an internal method to add given filter to list and heap func (f *FilterManager) addFilter(filter filter) string { - f.lock.Lock() - defer f.lock.Unlock() + f.Lock() + defer f.Unlock() base := filter.getFilterBase() f.filters[base.id] = filter // Set timeout and add to heap if filter doesn't have web socket connection - if !filter.isWS() { - base.expiredAt = time.Now().Add(f.timeout) + if !filter.hasWSConn() { + base.expiresAt = time.Now().Add(f.timeout) f.timeouts.addFilter(base) f.emitSignalToUpdateCh() } @@ -575,8 +579,8 @@ func (f *FilterManager) emitSignalToUpdateCh() { // nextTimeoutFilter returns the filter that will be expired next // nextTimeoutFilter returns the only filter with timeout func (f *FilterManager) nextTimeoutFilter() *filterBase { - f.lock.RLock() - defer f.lock.RUnlock() + f.RLock() + defer f.RUnlock() if len(f.timeouts) == 0 { return nil @@ -605,8 +609,8 @@ func (f *FilterManager) dispatchEvent(evnt *blockchain.Event) error { // processEvent makes each filter append the new data that interests them func (f *FilterManager) processEvent(evnt *blockchain.Event) error { - f.lock.RLock() - defer f.lock.RUnlock() + f.RLock() + defer f.RUnlock() // first include all the new headers in the blockstream for BlockFilter for _, header := range evnt.NewChain { @@ -684,10 +688,10 @@ func (f *FilterManager) appendLogsToFilters(header *types.Header, removed bool) func (f *FilterManager) flushWsFilters() error { closedFilterIDs := make([]string, 0) - f.lock.RLock() + f.RLock() for id, filter := range f.filters { - if !filter.isWS() { + if !filter.hasWSConn() { continue } @@ -705,17 +709,17 @@ func (f *FilterManager) flushWsFilters() error { } } - f.lock.RUnlock() + f.RUnlock() // remove filters with closed web socket connections from FilterManager if len(closedFilterIDs) > 0 { - f.lock.Lock() + f.Lock() for _, id := range closedFilterIDs { f.removeFilterByID(id) } - f.lock.Unlock() + f.Unlock() f.emitSignalToUpdateCh() f.logger.Info(fmt.Sprintf("Removed %d filters due to closed connections", len(closedFilterIDs))) } @@ -725,8 +729,8 @@ func (f *FilterManager) flushWsFilters() error { // getLogFilters returns logFilters func (f *FilterManager) getLogFilters() []*logFilter { - f.lock.RLock() - defer f.lock.RUnlock() + f.RLock() + defer f.RUnlock() logFilters := []*logFilter{} @@ -758,7 +762,7 @@ func (t *timeHeapImpl) removeFilter(filter *filterBase) bool { func (t timeHeapImpl) Len() int { return len(t) } func (t timeHeapImpl) Less(i, j int) bool { - return t[i].expiredAt.Before(t[j].expiredAt) + return t[i].expiresAt.Before(t[j].expiresAt) } func (t timeHeapImpl) Swap(i, j int) { diff --git a/jsonrpc/jsonrpc.go b/jsonrpc/jsonrpc.go index 027288e37d..b425322538 100644 --- a/jsonrpc/jsonrpc.go +++ b/jsonrpc/jsonrpc.go @@ -143,10 +143,11 @@ var wsUpgrader = websocket.Upgrader{ // wsWrapper is a wrapping object for the web socket connection and logger type wsWrapper struct { - ws *websocket.Conn // the actual WS connection - logger hclog.Logger // module logger - writeLock sync.Mutex // writer lock - filterID string // filter ID + sync.Mutex + + ws *websocket.Conn // the actual WS connection + logger hclog.Logger // module logger + filterID string // filter ID } func (w *wsWrapper) SetFilterID(filterID string) { @@ -159,8 +160,8 @@ func (w *wsWrapper) GetFilterID() string { // WriteMessage writes out the message to the WS peer func (w *wsWrapper) WriteMessage(messageType int, data []byte) error { - w.writeLock.Lock() - defer w.writeLock.Unlock() + w.Lock() + defer w.Unlock() writeErr := w.ws.WriteMessage(messageType, data) if writeErr != nil { From e55be3add33703b5638383a8c567ac223513157b Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Mon, 25 Jul 2022 16:55:36 +0200 Subject: [PATCH 07/10] Continue with minor cleanup, and thread protection --- jsonrpc/filter_manager.go | 148 +++++++++++++++++++------------------- 1 file changed, 72 insertions(+), 76 deletions(-) diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index 2ccb54664c..6e140220db 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -115,32 +115,39 @@ type blockFilter struct { // takeBlockUpdates advances blocks from head to latest and returns header array func (f *blockFilter) takeBlockUpdates() []*types.Header { updates, newHead := f.block.getUpdates() + f.setHeadElem(newHead) + return updates +} + +// setHeadElem sets the block filter head +func (f *blockFilter) setHeadElem(head *headElem) { f.Lock() - f.block = newHead - f.Unlock() + defer f.Unlock() - return updates + f.block = head } // getUpdates returns updates of blocks in string func (f *blockFilter) getUpdates() (string, error) { headers := f.takeBlockUpdates() - updates := []string{} - for _, header := range headers { - updates = append(updates, header.Hash.String()) + updates := make([]string, len(headers)) + for index, header := range headers { + updates[index] = header.Hash.String() } - return fmt.Sprintf("[\"%s\"]", strings.Join(updates, "\",\"")), nil + return fmt.Sprintf( + "[\"%s\"]", strings.Join(updates, "\",\""), + ), nil } // sendUpdates writes the updates of blocks to web socket stream func (f *blockFilter) sendUpdates() error { updates := f.takeBlockUpdates() - for _, block := range updates { - raw, err := json.Marshal(block) + for _, header := range updates { + raw, err := json.Marshal(header) if err != nil { return err } @@ -157,6 +164,7 @@ func (f *blockFilter) sendUpdates() error { type logFilter struct { filterBase sync.Mutex + query *LogQuery logs []*Log } @@ -175,7 +183,7 @@ func (f *logFilter) takeLogUpdates() []*Log { defer f.Unlock() logs := f.logs - f.logs = []*Log{} // create brand new slice so that prevent new logs from being added to current logs + f.logs = []*Log{} // create brand-new slice so that prevent new logs from being added to current logs return logs } @@ -305,8 +313,8 @@ func (f *FilterManager) Run() { case <-timeoutCh: // timeout for filter // if filter still exists - if filterBase != nil && !f.Uninstall(filterBase.id) { - f.logger.Error("failed to uninstall filter", "id", filterBase.id) + if !f.Uninstall(filterBase.id) { + f.logger.Warn("failed to uninstall filter", "id", filterBase.id) } case <-f.updateCh: @@ -331,7 +339,9 @@ func (f *FilterManager) NewBlockFilter(ws wsConn) string { block: f.blockStream.Head(), } - ws.SetFilterID(filter.id) + if filter.hasWSConn() { + ws.SetFilterID(filter.id) + } return f.addFilter(filter) } @@ -343,6 +353,10 @@ func (f *FilterManager) NewLogFilter(logQuery *LogQuery, ws wsConn) string { query: logQuery, } + if filter.hasWSConn() { + ws.SetFilterID(filter.id) + } + return f.addFilter(filter) } @@ -370,7 +384,7 @@ func (f *FilterManager) getLogsFromBlock(query *LogQuery, block *types.Block) ([ logs = append(logs, &Log{ Address: log.Address, Topics: log.Topics, - Data: argBytes(log.Data), + Data: log.Data, BlockNumber: argUint64(block.Header.Number), BlockHash: block.Header.Hash, TxHash: block.Transactions[idx].Hash, @@ -465,18 +479,29 @@ func (f *FilterManager) GetLogsForQuery(query *LogQuery) ([]*Log, error) { return f.getLogsFromBlocks(query) } -//GetLogFilterFromID return log filter for given filterID -func (f *FilterManager) GetLogFilterFromID(filterID string) (*logFilter, error) { +// getFilterByID fetches the filter by the ID +func (f *FilterManager) getFilterByID(filterID string) filter { f.RLock() + defer f.RUnlock() - filter, ok := f.filters[filterID] - f.RUnlock() + filter, exists := f.filters[filterID] - if !ok { + if !exists { + return nil + } + + return filter +} + +//GetLogFilterFromID return log filter for given filterID +func (f *FilterManager) GetLogFilterFromID(filterID string) (*logFilter, error) { + filterRaw := f.getFilterByID(filterID) + + if filterRaw == nil { return nil, ErrFilterDoesNotExists } - logFilter, ok := filter.(*logFilter) + logFilter, ok := filterRaw.(*logFilter) if !ok { return nil, ErrCastingFilterToLogFilter } @@ -516,8 +541,9 @@ func (f *FilterManager) Uninstall(id string) bool { return f.removeFilterByID(id) } -// removeFilterByID removes the filter with given ID, unsafe against race condition +// removeFilterByID removes the filter with given ID [NOT Thread Safe] func (f *FilterManager) removeFilterByID(id string) bool { + // Make sure filter exists filter, ok := f.filters[id] if !ok { return false @@ -532,21 +558,12 @@ func (f *FilterManager) removeFilterByID(id string) bool { return true } -// removeFilterByWs removes the filter with given WS, unsafe against race condition +// RemoveFilterByWs removes the filter with given WS [Thread safe] func (f *FilterManager) RemoveFilterByWs(ws wsConn) { - filterID := ws.GetFilterID() - - // Make sure filter exists - filter, ok := f.filters[filterID] - if !ok { - return - } - - delete(f.filters, filterID) + f.Lock() + defer f.Unlock() - if removed := f.timeouts.removeFilter(filter.getFilterBase()); removed { - f.emitSignalToUpdateCh() - } + f.removeFilterByID(ws.GetFilterID()) } // addFilter is an internal method to add given filter to list and heap @@ -592,12 +609,10 @@ func (f *FilterManager) nextTimeoutFilter() *filterBase { return base } -// dispatchEvent is a event handler for new block event +// dispatchEvent is an event handler for new block event func (f *FilterManager) dispatchEvent(evnt *blockchain.Event) error { // store new event in each filters - if err := f.processEvent(evnt); err != nil { - return err - } + f.processEvent(evnt) // send data to web socket stream if err := f.flushWsFilters(); err != nil { @@ -608,34 +623,23 @@ func (f *FilterManager) dispatchEvent(evnt *blockchain.Event) error { } // processEvent makes each filter append the new data that interests them -func (f *FilterManager) processEvent(evnt *blockchain.Event) error { +func (f *FilterManager) processEvent(evnt *blockchain.Event) { f.RLock() defer f.RUnlock() - // first include all the new headers in the blockstream for BlockFilter for _, header := range evnt.NewChain { + // first include all the new headers in the blockstream for BlockFilter f.blockStream.push(header) - } - // process old chain to include old logs marked removed for LogFilter - for _, header := range evnt.OldChain { - if processErr := f.appendLogsToFilters(header, true); processErr != nil { + // process new chain to include new logs for LogFilter + if processErr := f.appendLogsToFilters(header); processErr != nil { f.logger.Error(fmt.Sprintf("Unable to process block, %v", processErr)) } } - - // process new chain to include new logs for LogFilter - for _, header := range evnt.NewChain { - if processErr := f.appendLogsToFilters(header, false); processErr != nil { - f.logger.Error(fmt.Sprintf("Unable to process block, %v", processErr)) - } - } - - return nil } // appendLogsToFilters makes each LogFilters append logs in the header -func (f *FilterManager) appendLogsToFilters(header *types.Header, removed bool) error { +func (f *FilterManager) appendLogsToFilters(header *types.Header) error { receipts, err := f.store.GetReceiptsByHash(header.Hash) if err != nil { return err @@ -661,20 +665,18 @@ func (f *FilterManager) appendLogsToFilters(header *types.Header, removed bool) } // check the logs with the filters for _, log := range receipt.Logs { - nn := &Log{ - Address: log.Address, - Topics: log.Topics, - Data: argBytes(log.Data), - BlockNumber: argUint64(header.Number), - BlockHash: header.Hash, - TxHash: receipt.TxHash, - TxIndex: argUint64(indx), - Removed: removed, - } - for _, f := range logFilters { if f.query.Match(log) { - f.appendLog(nn) + f.appendLog(&Log{ + Address: log.Address, + Topics: log.Topics, + Data: argBytes(log.Data), + BlockNumber: argUint64(header.Number), + BlockHash: header.Hash, + TxHash: receipt.TxHash, + TxIndex: argUint64(indx), + Removed: false, + }) } } } @@ -714,13 +716,11 @@ func (f *FilterManager) flushWsFilters() error { // remove filters with closed web socket connections from FilterManager if len(closedFilterIDs) > 0 { f.Lock() - for _, id := range closedFilterIDs { f.removeFilterByID(id) } - f.Unlock() - f.emitSignalToUpdateCh() + f.logger.Info(fmt.Sprintf("Removed %d filters due to closed connections", len(closedFilterIDs))) } @@ -732,7 +732,7 @@ func (f *FilterManager) getLogFilters() []*logFilter { f.RLock() defer f.RUnlock() - logFilters := []*logFilter{} + logFilters := make([]*logFilter, 0) for _, f := range f.filters { if logFilter, ok := f.(*logFilter); ok { @@ -824,15 +824,11 @@ type headElem struct { } func (h *headElem) getUpdates() ([]*types.Header, *headElem) { - res := []*types.Header{} + res := make([]*types.Header, 0) cur := h - for { - if cur.next == nil { - break - } - + for cur.next != nil { cur = cur.next res = append(res, cur.header) } From 12f591a234b3db3ba72e4894e1c6b077b3b916d7 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Tue, 26 Jul 2022 10:57:45 +0200 Subject: [PATCH 08/10] Simplify filter return --- jsonrpc/filter_manager.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index 6e140220db..1d89966e93 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -484,13 +484,7 @@ func (f *FilterManager) getFilterByID(filterID string) filter { f.RLock() defer f.RUnlock() - filter, exists := f.filters[filterID] - - if !exists { - return nil - } - - return filter + return f.filters[filterID] } //GetLogFilterFromID return log filter for given filterID From 246f04eeb3f24e18b3807fe59dd54343f7fc7441 Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Tue, 26 Jul 2022 11:00:43 +0200 Subject: [PATCH 09/10] Add close to the FM in tests --- jsonrpc/filter_manager_test.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/jsonrpc/filter_manager_test.go b/jsonrpc/filter_manager_test.go index c6347f0fb9..87b02a1085 100644 --- a/jsonrpc/filter_manager_test.go +++ b/jsonrpc/filter_manager_test.go @@ -113,6 +113,7 @@ func Test_GetLogsForQuery(t *testing.T) { store.appendBlocksToStore(blocks) f := NewFilterManager(hclog.NewNullLogger(), store) + defer f.Close() for _, testCase := range testTable { testCase := testCase @@ -139,6 +140,7 @@ func Test_GetLogFilterFromID(t *testing.T) { store := newMockStore() m := NewFilterManager(hclog.NewNullLogger(), store) + defer m.Close() go m.Run() @@ -159,6 +161,8 @@ func TestFilterLog(t *testing.T) { store := newMockStore() m := NewFilterManager(hclog.NewNullLogger(), store) + defer m.Close() + go m.Run() id := m.NewLogFilter(&LogQuery{ @@ -219,6 +223,8 @@ func TestFilterBlock(t *testing.T) { store := newMockStore() m := NewFilterManager(hclog.NewNullLogger(), store) + defer m.Close() + go m.Run() // add block filter @@ -280,6 +286,8 @@ func TestFilterTimeout(t *testing.T) { store := newMockStore() m := NewFilterManager(hclog.NewNullLogger(), store) + defer m.Close() + m.timeout = 2 * time.Second go m.Run() @@ -300,6 +308,8 @@ func TestRemoveFilterByWebsocket(t *testing.T) { } m := NewFilterManager(hclog.NewNullLogger(), store) + defer m.Close() + go m.Run() id := m.NewBlockFilter(mock) @@ -318,6 +328,8 @@ func TestFilterWebsocket(t *testing.T) { } m := NewFilterManager(hclog.NewNullLogger(), store) + defer m.Close() + go m.Run() id := m.NewBlockFilter(mock) @@ -401,6 +413,7 @@ func TestClosedFilterDeletion(t *testing.T) { store := newMockStore() m := NewFilterManager(hclog.NewNullLogger(), store) + defer m.Close() go m.Run() From 397ee227049ec6acbe72e8400a70d8a22ca7f3cf Mon Sep 17 00:00:00 2001 From: Milos Zivkovic Date: Wed, 27 Jul 2022 10:09:40 +0200 Subject: [PATCH 10/10] Resolve linting error, add parallel execution for tests --- jsonrpc/filter_manager_test.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/jsonrpc/filter_manager_test.go b/jsonrpc/filter_manager_test.go index 87b02a1085..11cd5b14e0 100644 --- a/jsonrpc/filter_manager_test.go +++ b/jsonrpc/filter_manager_test.go @@ -113,7 +113,10 @@ func Test_GetLogsForQuery(t *testing.T) { store.appendBlocksToStore(blocks) f := NewFilterManager(hclog.NewNullLogger(), store) - defer f.Close() + + t.Cleanup(func() { + defer f.Close() + }) for _, testCase := range testTable { testCase := testCase @@ -137,6 +140,8 @@ func Test_GetLogsForQuery(t *testing.T) { } func Test_GetLogFilterFromID(t *testing.T) { + t.Parallel() + store := newMockStore() m := NewFilterManager(hclog.NewNullLogger(), store) @@ -158,6 +163,8 @@ func Test_GetLogFilterFromID(t *testing.T) { } func TestFilterLog(t *testing.T) { + t.Parallel() + store := newMockStore() m := NewFilterManager(hclog.NewNullLogger(), store) @@ -220,6 +227,8 @@ func TestFilterLog(t *testing.T) { } func TestFilterBlock(t *testing.T) { + t.Parallel() + store := newMockStore() m := NewFilterManager(hclog.NewNullLogger(), store) @@ -283,6 +292,8 @@ func TestFilterBlock(t *testing.T) { } func TestFilterTimeout(t *testing.T) { + t.Parallel() + store := newMockStore() m := NewFilterManager(hclog.NewNullLogger(), store) @@ -301,6 +312,8 @@ func TestFilterTimeout(t *testing.T) { } func TestRemoveFilterByWebsocket(t *testing.T) { + t.Parallel() + store := newMockStore() mock := &mockWsConn{ @@ -321,6 +334,8 @@ func TestRemoveFilterByWebsocket(t *testing.T) { } func TestFilterWebsocket(t *testing.T) { + t.Parallel() + store := newMockStore() mock := &mockWsConn{ @@ -376,6 +391,8 @@ func (m *mockWsConn) WriteMessage(messageType int, b []byte) error { } func TestHeadStream(t *testing.T) { + t.Parallel() + b := &blockStream{} b.push(&types.Header{Hash: types.StringToHash("1")}) @@ -410,6 +427,8 @@ func (m *MockClosedWSConnection) WriteMessage(_messageType int, _data []byte) er } func TestClosedFilterDeletion(t *testing.T) { + t.Parallel() + store := newMockStore() m := NewFilterManager(hclog.NewNullLogger(), store)