Skip to content

Commit

Permalink
EVM-42 Add error handling to remove closed ws connection in JSON-RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
Kourin1996 committed Sep 26, 2022
1 parent 7975703 commit 27d27d8
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 21 deletions.
10 changes: 3 additions & 7 deletions jsonrpc/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) {
store := newMockStore()
dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0, 0, 20, 1000)

mockConnection := &mockWsConn{
msgCh: make(chan []byte, 1),
}
mockConnection, msgCh := newMockWsConnWithMsgCh()

req := []byte(`{
"method": "eth_subscribe",
Expand All @@ -87,7 +85,7 @@ func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) {
})

select {
case <-mockConnection.msgCh:
case <-msgCh:
case <-time.After(2 * time.Second):
t.Fatal("\"newHeads\" event not received in 2 seconds")
}
Expand All @@ -98,9 +96,7 @@ func TestDispatcher_WebsocketConnection_RequestFormats(t *testing.T) {
store := newMockStore()
dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0, 0, 20, 1000)

mockConnection := &mockWsConn{
msgCh: make(chan []byte, 1),
}
mockConnection, _ := newMockWsConnWithMsgCh()

cases := []struct {
msg []byte
Expand Down
3 changes: 2 additions & 1 deletion jsonrpc/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net"
"sync"
"time"

Expand Down Expand Up @@ -718,7 +719,7 @@ func (f *FilterManager) flushWsFilters() error {

if flushErr := filter.sendUpdates(); flushErr != nil {
// mark as closed if the connection is closed
if errors.Is(flushErr, websocket.ErrCloseSent) {
if errors.Is(flushErr, websocket.ErrCloseSent) || errors.Is(flushErr, net.ErrClosed) {
closedFilterIDs = append(closedFilterIDs, id)

f.logger.Warn(fmt.Sprintf("Subscription %s has been closed", id))
Expand Down
134 changes: 121 additions & 13 deletions jsonrpc/filter_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package jsonrpc

import (
"context"
"errors"
"math/big"
"net"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -326,9 +329,7 @@ func TestRemoveFilterByWebsocket(t *testing.T) {

store := newMockStore()

mock := &mockWsConn{
msgCh: make(chan []byte, 1),
}
mock, _ := newMockWsConnWithMsgCh()

m := NewFilterManager(hclog.NewNullLogger(), store, 1000)
defer m.Close()
Expand All @@ -343,15 +344,100 @@ func TestRemoveFilterByWebsocket(t *testing.T) {
assert.False(t, m.Exists(id))
}

func TestFilterWebsocket(t *testing.T) {
func Test_flushWsFilters(t *testing.T) {
t.Parallel()

store := newMockStore()

mock := &mockWsConn{
msgCh: make(chan []byte, 1),
m := NewFilterManager(hclog.NewNullLogger(), store, 1000)

t.Cleanup(func() {
m.Close()
})

go m.Run()

runTest := func(t *testing.T, flushErr error, shouldExist bool) {
t.Helper()

var (
filterID string
)

mock := &mockWsConn{
SetFilterIDFn: func(s string) {
filterID = s
},
GetFilterIDFn: func() string {
return filterID
},
WriteMessageFn: func(i int, b []byte) error {
return flushErr
},
}

id := m.NewBlockFilter(mock)

// emit event
store.emitEvent(&mockEvent{
NewChain: []*mockHeader{
{
header: &types.Header{
Hash: types.StringToHash("1"),
},
},
},
})

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

for {
select {
case <-ctx.Done():
t.Errorf("timeout for filter existence check, expected=%t, actual=%t", shouldExist, m.Exists(id))

return
default:
if shouldExist == m.Exists(id) {
return
}
}
}
}

t.Run("should remove if sendUpdates returns websocket.ErrCloseSent", func(t *testing.T) {
t.Parallel()

runTest(t, websocket.ErrCloseSent, false)
})

t.Run("should remove if sendUpdates returns net.ErrClosed", func(t *testing.T) {
t.Parallel()

runTest(t, net.ErrClosed, false)
})

t.Run("should keep if sendUpdates returns unknown error", func(t *testing.T) {
t.Parallel()

runTest(t, errors.New("hoge"), true)
})

t.Run("should keep if sendUpdates doesn't return error", func(t *testing.T) {
t.Parallel()

runTest(t, nil, true)
})
}

func TestFilterWebsocket(t *testing.T) {
t.Parallel()

store := newMockStore()

mock, msgCh := newMockWsConnWithMsgCh()

m := NewFilterManager(hclog.NewNullLogger(), store, 1000)
defer m.Close()

Expand All @@ -375,29 +461,51 @@ func TestFilterWebsocket(t *testing.T) {
})

select {
case <-mock.msgCh:
case <-msgCh:
case <-time.After(2 * time.Second):
t.Fatal("bad")
}
}

type mockWsConn struct {
msgCh chan []byte
filterID string
SetFilterIDFn func(string)
GetFilterIDFn func() string
WriteMessageFn func(int, []byte) error
}

func (m *mockWsConn) SetFilterID(filterID string) {
m.filterID = filterID
m.SetFilterIDFn(filterID)
}

func (m *mockWsConn) GetFilterID() string {
return m.filterID
return m.GetFilterIDFn()
}

func (m *mockWsConn) WriteMessage(messageType int, b []byte) error {
m.msgCh <- b
return m.WriteMessageFn(messageType, b)
}

func newMockWsConnWithMsgCh() (*mockWsConn, <-chan []byte) {
var (
filterID string
msgCh = make(chan []byte, 1)
)

mock := &mockWsConn{
SetFilterIDFn: func(s string) {
filterID = s
},
GetFilterIDFn: func() string {
return filterID
},
WriteMessageFn: func(i int, b []byte) error {
msgCh <- b

return nil
},
}

return nil
return mock, msgCh
}

func TestHeadStream(t *testing.T) {
Expand Down

0 comments on commit 27d27d8

Please sign in to comment.