Skip to content

Commit

Permalink
Merge pull request #6483 from ethereum-optimism/felipe/proxyd-timeouts
Browse files Browse the repository at this point in the history
feat(proxyd): betterer timeoutz
  • Loading branch information
OptimismBot authored Jul 28, 2023
2 parents 63d5a75 + 452cf2d commit e2054df
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 16 deletions.
38 changes: 29 additions & 9 deletions proxyd/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,9 +854,12 @@ func calcBackoff(i int) time.Duration {
type WSProxier struct {
backend *Backend
clientConn *websocket.Conn
clientConnMu sync.Mutex
backendConn *websocket.Conn
backendConnMu sync.Mutex
methodWhitelist *StringSet
clientConnMu sync.Mutex
readTimeout time.Duration
writeTimeout time.Duration
}

func NewWSProxier(backend *Backend, clientConn, backendConn *websocket.Conn, methodWhitelist *StringSet) *WSProxier {
Expand All @@ -865,6 +868,8 @@ func NewWSProxier(backend *Backend, clientConn, backendConn *websocket.Conn, met
clientConn: clientConn,
backendConn: backendConn,
methodWhitelist: methodWhitelist,
readTimeout: defaultWSReadTimeout,
writeTimeout: defaultWSWriteTimeout,
}
}

Expand All @@ -882,19 +887,19 @@ func (w *WSProxier) clientPump(ctx context.Context, errC chan error) {
// Block until we get a message.
msgType, msg, err := w.clientConn.ReadMessage()
if err != nil {
errC <- err
if err := w.backendConn.WriteMessage(websocket.CloseMessage, formatWSError(err)); err != nil {
if err := w.writeBackendConn(websocket.CloseMessage, formatWSError(err)); err != nil {
log.Error("error writing backendConn message", "err", err)
errC <- err
return
}
return
}

RecordWSMessage(ctx, w.backend.Name, SourceClient)

// Route control messages to the backend. These don't
// count towards the total RPC requests count.
if msgType != websocket.TextMessage && msgType != websocket.BinaryMessage {
err := w.backendConn.WriteMessage(msgType, msg)
err := w.writeBackendConn(msgType, msg)
if err != nil {
errC <- err
return
Expand Down Expand Up @@ -952,7 +957,7 @@ func (w *WSProxier) clientPump(ctx context.Context, errC chan error) {
"req_id", GetReqID(ctx),
)

err = w.backendConn.WriteMessage(msgType, msg)
err = w.writeBackendConn(msgType, msg)
if err != nil {
errC <- err
return
Expand All @@ -965,11 +970,11 @@ func (w *WSProxier) backendPump(ctx context.Context, errC chan error) {
// Block until we get a message.
msgType, msg, err := w.backendConn.ReadMessage()
if err != nil {
errC <- err
if err := w.writeClientConn(websocket.CloseMessage, formatWSError(err)); err != nil {
log.Error("error writing clientConn message", "err", err)
errC <- err
return
}
return
}

RecordWSMessage(ctx, w.backend.Name, SourceBackend)
Expand Down Expand Up @@ -1050,8 +1055,23 @@ func (w *WSProxier) parseBackendMsg(msg []byte) (*RPCRes, error) {

func (w *WSProxier) writeClientConn(msgType int, msg []byte) error {
w.clientConnMu.Lock()
defer w.clientConnMu.Unlock()
if err := w.clientConn.SetWriteDeadline(time.Now().Add(w.writeTimeout)); err != nil {
log.Error("ws client write timeout", "err", err)
return err
}
err := w.clientConn.WriteMessage(msgType, msg)
w.clientConnMu.Unlock()
return err
}

func (w *WSProxier) writeBackendConn(msgType int, msg []byte) error {
w.backendConnMu.Lock()
defer w.backendConnMu.Unlock()
if err := w.backendConn.SetWriteDeadline(time.Now().Add(w.writeTimeout)); err != nil {
log.Error("ws backend write timeout", "err", err)
return err
}
err := w.backendConn.WriteMessage(msgType, msg)
return err
}

Expand Down
51 changes: 48 additions & 3 deletions proxyd/integration_tests/ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package integration_tests

import (
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/proxyd"
"github.com/ethereum/go-ethereum/log"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestWS(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
timeout := time.NewTicker(30 * time.Second)
timeout := time.NewTicker(10 * time.Second)
doneCh := make(chan struct{}, 1)
backendHdlr.SetMsgCB(func(conn *websocket.Conn, msgType int, data []byte) {
require.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(tt.backendRes)))
Expand Down Expand Up @@ -270,3 +270,48 @@ func TestWSClientClosure(t *testing.T) {
})
}
}

func TestWSClientExceedReadLimit(t *testing.T) {
backendHdlr := new(backendHandler)
clientHdlr := new(clientHandler)

backend := NewMockWSBackend(nil, func(conn *websocket.Conn, msgType int, data []byte) {
backendHdlr.MsgCB(conn, msgType, data)
}, func(conn *websocket.Conn, err error) {
backendHdlr.CloseCB(conn, err)
})
defer backend.Close()

require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))

config := ReadConfig("ws")
_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()

client, err := NewProxydWSClient("ws://127.0.0.1:8546", func(msgType int, data []byte) {
clientHdlr.MsgCB(msgType, data)
}, nil)
require.NoError(t, err)

closed := false
originalHandler := client.conn.CloseHandler()
client.conn.SetCloseHandler(func(code int, text string) error {
closed = true
return originalHandler(code, text)
})

backendHdlr.SetMsgCB(func(conn *websocket.Conn, msgType int, data []byte) {
t.Fatalf("backend should not get the large message")
})

payload := strings.Repeat("barf", 1024*1024)
clientReq := "{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"" + payload + "\"]}"
err = client.WriteMessage(
websocket.TextMessage,
[]byte(clientReq),
)
require.Error(t, err)
require.True(t, closed)

}
14 changes: 10 additions & 4 deletions proxyd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/cors"
"github.com/syndtr/goleveldb/leveldb/opt"
)

const (
Expand All @@ -35,7 +36,11 @@ const (
ContextKeyXForwardedFor = "x_forwarded_for"
MaxBatchRPCCallsHardLimit = 100
cacheStatusHdr = "X-Proxyd-Cache-Status"
defaultServerTimeout = time.Second * 10
defaultRPCTimeout = 10 * time.Second
defaultBodySizeLimit = 256 * opt.KiB
defaultWSHandshakeTimeout = 10 * time.Second
defaultWSReadTimeout = 2 * time.Minute
defaultWSWriteTimeout = 10 * time.Second
maxRequestBodyLogLen = 2000
defaultMaxUpstreamBatchSize = 10
)
Expand Down Expand Up @@ -92,11 +97,11 @@ func NewServer(
}

if maxBodySize == 0 {
maxBodySize = math.MaxInt64
maxBodySize = defaultBodySizeLimit
}

if timeout == 0 {
timeout = defaultServerTimeout
timeout = defaultRPCTimeout
}

if maxUpstreamBatchSize == 0 {
Expand Down Expand Up @@ -170,7 +175,7 @@ func NewServer(
maxRequestBodyLogLen: maxRequestBodyLogLen,
maxBatchSize: maxBatchSize,
upgrader: &websocket.Upgrader{
HandshakeTimeout: 5 * time.Second,
HandshakeTimeout: defaultWSHandshakeTimeout,
},
mainLim: mainLim,
overrideLims: overrideLims,
Expand Down Expand Up @@ -547,6 +552,7 @@ func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) {
log.Error("error upgrading client conn", "auth", GetAuthCtx(ctx), "req_id", GetReqID(ctx), "err", err)
return
}
clientConn.SetReadLimit(s.maxBodySize)

proxier, err := s.wsBackendGroup.ProxyWS(ctx, clientConn, s.wsMethodWhitelist)
if err != nil {
Expand Down

0 comments on commit e2054df

Please sign in to comment.