Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

graphql, node, rpc: improve HTTP write timeout handling #25457

Merged
merged 46 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
8ea20d7
node: fix rpc write timeout
s1na Aug 1, 2022
8dcbf87
add timeout to graphql tests
s1na Aug 1, 2022
97f9d97
Revert "node: fix rpc write timeout"
s1na Aug 8, 2022
d970cf7
handle timeout through context directly in handler
s1na Aug 8, 2022
3f0f2bb
eth/filters: respect ctx timeout in unindexedLogs
s1na Aug 9, 2022
245aff9
eth/tracers: respect ctx timeout in IntermediateRoots
s1na Aug 9, 2022
07c763f
use writeTimeout, not read
s1na Aug 10, 2022
477d724
explicitly add timeout to tests
s1na Aug 10, 2022
0d0da18
improve
s1na Aug 31, 2022
1a3ae0c
alternative approach
s1na Nov 16, 2022
8f3f415
fix double write issue
s1na Nov 17, 2022
986c090
refactor handleBatch timeout path
s1na Nov 17, 2022
8ac33fe
improve timeout computation and cancellation
s1na Nov 17, 2022
d56c820
only write once
s1na Nov 17, 2022
6675df4
fix error code
s1na Nov 18, 2022
6b24bc7
add comment
s1na Nov 18, 2022
954b043
node: implement Flush in gzipResponseWriter
fjl Nov 23, 2022
5ec44c7
node: enable request deadline only when configured
fjl Nov 23, 2022
23d0c70
node: close gzip writer as soon as output is complete
fjl Nov 23, 2022
8182a9b
rpc: add content-length/flush only when sending error response
fjl Nov 23, 2022
4e56b08
node: remove context deadline in rpcstack
fjl Nov 23, 2022
8f25a47
node: improve gzipResponseWriter
fjl Nov 23, 2022
d78546e
rpc: fix batch timeouts
fjl Nov 23, 2022
521982a
rpc: disable chunked transfer-encoding for error responses
fjl Nov 24, 2022
6ee484d
rpc: update comment
fjl Nov 24, 2022
a3df90f
add basic test for http write timeout
s1na Nov 24, 2022
3ec82be
minor fix
s1na Nov 28, 2022
52b1f6c
add test case for batch timeout
s1na Nov 28, 2022
85c6fa6
update graphql timeout
s1na Nov 29, 2022
8cc7802
return error for graphql on timeout
s1na Nov 29, 2022
1a47ffd
fix http status
s1na Nov 29, 2022
e1f6498
dont compress error responses
s1na Dec 1, 2022
6ed9113
return graphql error
s1na Dec 4, 2022
3feba50
remove length tracking from gzipWriter
s1na Dec 4, 2022
2680314
Revert "remove length tracking from gzipWriter"
fjl Dec 6, 2022
177927a
node: fix some issues in gzip handler and add test
fjl Dec 6, 2022
be88406
node: update comment
fjl Dec 6, 2022
9bfe5af
rpc: rename to ContextRequestTimeout
fjl Dec 6, 2022
f834267
graphql: add comments
fjl Dec 6, 2022
e30a7cb
node: unset gz when rw closed
fjl Dec 6, 2022
507ba3d
eth/filters, eth/tracers: remove cancel checks
fjl Dec 6, 2022
6d0746c
graphql: cancel the request context on timeout
fjl Dec 6, 2022
272bc6e
node: fix typo
fjl Dec 7, 2022
3dd9134
node: reword comment
fjl Dec 7, 2022
0123227
node: less space
fjl Dec 7, 2022
41befc6
node: even more comment updates
fjl Dec 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
var logs []*types.Log

for ; f.begin <= int64(end); f.begin++ {
if f.begin%10 == 0 && ctx.Err() != nil {
return logs, ctx.Err()
}
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
if header == nil || err != nil {
return logs, err
Expand Down
3 changes: 3 additions & 0 deletions eth/tracers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,9 @@ func (api *API) IntermediateRoots(ctx context.Context, hash common.Hash, config
deleteEmptyObjects = chainConfig.IsEIP158(block.Number())
)
for i, tx := range block.Transactions() {
if err := ctx.Err(); err != nil {
return nil, err
}
var (
msg, _ = tx.AsMessage(signer, block.BaseFee())
txContext = core.NewEVMTxContext(msg)
Expand Down
9 changes: 5 additions & 4 deletions graphql/graphql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,11 @@ func TestGraphQLTransactionLogs(t *testing.T) {

func createNode(t *testing.T) *node.Node {
stack, err := node.New(&node.Config{
HTTPHost: "127.0.0.1",
HTTPPort: 0,
WSHost: "127.0.0.1",
WSPort: 0,
HTTPHost: "127.0.0.1",
HTTPPort: 0,
WSHost: "127.0.0.1",
WSPort: 0,
HTTPTimeouts: node.DefaultConfig.HTTPTimeouts,
})
if err != nil {
t.Fatalf("could not create node: %v", err)
Expand Down
3 changes: 3 additions & 0 deletions node/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ func TestStartRPC(t *testing.T) {
config := test.cfg
// config.Logger = testlog.Logger(t, log.LvlDebug)
config.P2P.NoDiscovery = true
if config.HTTPTimeouts == (rpc.HTTPTimeouts{}) {
config.HTTPTimeouts = rpc.DefaultHTTPTimeouts
}

// Create Node.
stack, err := New(&config)
Expand Down
13 changes: 7 additions & 6 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,13 +559,13 @@ func (test rpcPrefixTest) check(t *testing.T, node *Node) {
}

for _, path := range test.wantHTTP {
resp := rpcRequest(t, httpBase+path)
resp := rpcRequest(t, httpBase+path, testMethod)
if resp.StatusCode != 200 {
t.Errorf("Error: %s: bad status code %d, want 200", path, resp.StatusCode)
}
}
for _, path := range test.wantNoHTTP {
resp := rpcRequest(t, httpBase+path)
resp := rpcRequest(t, httpBase+path, testMethod)
if resp.StatusCode != 404 {
t.Errorf("Error: %s: bad status code %d, want 404", path, resp.StatusCode)
}
Expand All @@ -586,10 +586,11 @@ func (test rpcPrefixTest) check(t *testing.T, node *Node) {

func createNode(t *testing.T, httpPort, wsPort int) *Node {
conf := &Config{
HTTPHost: "127.0.0.1",
HTTPPort: httpPort,
WSHost: "127.0.0.1",
WSPort: wsPort,
HTTPHost: "127.0.0.1",
HTTPPort: httpPort,
WSHost: "127.0.0.1",
WSPort: wsPort,
HTTPTimeouts: rpc.DefaultHTTPTimeouts,
}
node, err := New(conf)
if err != nil {
Expand Down
62 changes: 54 additions & 8 deletions node/rpcstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -196,6 +197,7 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
return
}

// if http-rpc is enabled, try to serve request
rpc := h.httpHandler.Load().(*rpcHandler)
if rpc != nil {
Expand Down Expand Up @@ -462,17 +464,61 @@ var gzPool = sync.Pool{
}

type gzipResponseWriter struct {
io.Writer
http.ResponseWriter
gz *gzip.Writer
resp http.ResponseWriter

contentLength uint64 // total length of the uncompressed response
written uint64 // amount of written bytes from the uncompressed response
hasLength bool // true if uncompressed response had Content-Length
inited bool // true after initBuffering called first time
}

// initContentLength checks if the uncompressed response has a content-length header.
func (w *gzipResponseWriter) initContentLength() {
if w.inited {
return
}
w.inited = true

hdr := w.resp.Header()
length := hdr.Get("content-length")
hdr.Del("content-length")
if len(length) > 0 {
if n, err := strconv.ParseUint(length, 10, 64); err != nil {
w.hasLength = true
w.contentLength = n
}
}
}

func (w *gzipResponseWriter) Header() http.Header {
return w.resp.Header()
}

func (w *gzipResponseWriter) WriteHeader(status int) {
w.Header().Del("Content-Length")
w.ResponseWriter.WriteHeader(status)
w.initContentLength()
w.resp.WriteHeader(status)
}

func (w *gzipResponseWriter) Write(b []byte) (int, error) {
return w.Writer.Write(b)
w.initContentLength()

n, err := w.gz.Write(b)
w.written += uint64(n)
if w.hasLength && w.written >= w.contentLength {
// The HTTP handler has finished writing the entire uncompressed response. Close
// the gzip stream to ensure the footer will be seen by the client if the response
// is flushed after this call to write.
err = w.gz.Close()
}
return n, err
}

func (w *gzipResponseWriter) Flush() {
w.gz.Flush()
if f, ok := w.resp.(http.Flusher); ok {
f.Flush()
}
}

func newGzipHandler(next http.Handler) http.Handler {
Expand All @@ -482,15 +528,15 @@ func newGzipHandler(next http.Handler) http.Handler {
return
}

w.Header().Set("Content-Encoding", "gzip")
w.Header().Set("content-encoding", "gzip")

gz := gzPool.Get().(*gzip.Writer)
defer gzPool.Put(gz)

gz.Reset(w)
defer gz.Close()

next.ServeHTTP(&gzipResponseWriter{ResponseWriter: w, Writer: gz}, r)
wrapper := &gzipResponseWriter{resp: w, gz: gz}
next.ServeHTTP(wrapper, r)
})
}

Expand Down
112 changes: 97 additions & 15 deletions node/rpcstack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package node
import (
"bytes"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
Expand All @@ -34,29 +35,31 @@ import (
"github.com/stretchr/testify/assert"
)

const testMethod = "rpc_modules"

// TestCorsHandler makes sure CORS are properly handled on the http server.
func TestCorsHandler(t *testing.T) {
srv := createAndStartServer(t, &httpConfig{CorsAllowedOrigins: []string{"test", "test.com"}}, false, &wsConfig{})
srv := createAndStartServer(t, &httpConfig{CorsAllowedOrigins: []string{"test", "test.com"}}, false, &wsConfig{}, nil)
defer srv.stop()
url := "http://" + srv.listenAddr()

resp := rpcRequest(t, url, "origin", "test.com")
resp := rpcRequest(t, url, testMethod, "origin", "test.com")
assert.Equal(t, "test.com", resp.Header.Get("Access-Control-Allow-Origin"))

resp2 := rpcRequest(t, url, "origin", "bad")
resp2 := rpcRequest(t, url, testMethod, "origin", "bad")
assert.Equal(t, "", resp2.Header.Get("Access-Control-Allow-Origin"))
}

// TestVhosts makes sure vhosts are properly handled on the http server.
func TestVhosts(t *testing.T) {
srv := createAndStartServer(t, &httpConfig{Vhosts: []string{"test"}}, false, &wsConfig{})
srv := createAndStartServer(t, &httpConfig{Vhosts: []string{"test"}}, false, &wsConfig{}, nil)
defer srv.stop()
url := "http://" + srv.listenAddr()

resp := rpcRequest(t, url, "host", "test")
resp := rpcRequest(t, url, testMethod, "host", "test")
assert.Equal(t, resp.StatusCode, http.StatusOK)

resp2 := rpcRequest(t, url, "host", "bad")
resp2 := rpcRequest(t, url, testMethod, "host", "bad")
assert.Equal(t, resp2.StatusCode, http.StatusForbidden)
}

Expand Down Expand Up @@ -145,7 +148,7 @@ func TestWebsocketOrigins(t *testing.T) {
},
}
for _, tc := range tests {
srv := createAndStartServer(t, &httpConfig{}, true, &wsConfig{Origins: splitAndTrim(tc.spec)})
srv := createAndStartServer(t, &httpConfig{}, true, &wsConfig{Origins: splitAndTrim(tc.spec)}, nil)
url := fmt.Sprintf("ws://%v", srv.listenAddr())
for _, origin := range tc.expOk {
if err := wsRequest(t, url, "Origin", origin); err != nil {
Expand Down Expand Up @@ -231,11 +234,14 @@ func Test_checkPath(t *testing.T) {
}
}

func createAndStartServer(t *testing.T, conf *httpConfig, ws bool, wsConf *wsConfig) *httpServer {
func createAndStartServer(t *testing.T, conf *httpConfig, ws bool, wsConf *wsConfig, timeouts *rpc.HTTPTimeouts) *httpServer {
t.Helper()

srv := newHTTPServer(testlog.Logger(t, log.LvlDebug), rpc.DefaultHTTPTimeouts)
assert.NoError(t, srv.enableRPC(nil, *conf))
if timeouts == nil {
timeouts = &rpc.DefaultHTTPTimeouts
}
srv := newHTTPServer(testlog.Logger(t, log.LvlDebug), *timeouts)
assert.NoError(t, srv.enableRPC(apis(), *conf))
if ws {
assert.NoError(t, srv.enableWS(nil, *wsConf))
}
Expand Down Expand Up @@ -266,16 +272,33 @@ func wsRequest(t *testing.T, url string, extraHeaders ...string) error {
}

// rpcRequest performs a JSON-RPC request to the given URL.
func rpcRequest(t *testing.T, url string, extraHeaders ...string) *http.Response {
func rpcRequest(t *testing.T, url, method string, extraHeaders ...string) *http.Response {
t.Helper()

body := fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"method":"%s","params":[]}`, method)
return baseRpcRequest(t, url, body, extraHeaders...)
}

func batchRpcRequest(t *testing.T, url string, methods []string, extraHeaders ...string) *http.Response {
reqs := make([]string, len(methods))
for i, m := range methods {
reqs[i] = fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"method":"%s","params":[]}`, m)
}
body := fmt.Sprintf(`[%s]`, strings.Join(reqs, ","))
return baseRpcRequest(t, url, body, extraHeaders...)
}

func baseRpcRequest(t *testing.T, url, bodyStr string, extraHeaders ...string) *http.Response {
t.Helper()

// Create the request.
body := bytes.NewReader([]byte(`{"jsonrpc":"2.0","id":1,"method":"rpc_modules","params":[]}`))
body := bytes.NewReader([]byte(bodyStr))
req, err := http.NewRequest("POST", url, body)
if err != nil {
t.Fatal("could not create http request:", err)
}
req.Header.Set("content-type", "application/json")
req.Header.Set("accept-encoding", "identity")

// Apply extra headers.
if len(extraHeaders)%2 != 0 {
Expand Down Expand Up @@ -315,7 +338,7 @@ func TestJWT(t *testing.T) {
return ss
}
srv := createAndStartServer(t, &httpConfig{jwtSecret: []byte("secret")},
true, &wsConfig{Origins: []string{"*"}, jwtSecret: []byte("secret")})
true, &wsConfig{Origins: []string{"*"}, jwtSecret: []byte("secret")}, nil)
wsUrl := fmt.Sprintf("ws://%v", srv.listenAddr())
htUrl := fmt.Sprintf("http://%v", srv.listenAddr())

Expand Down Expand Up @@ -348,7 +371,7 @@ func TestJWT(t *testing.T) {
t.Errorf("test %d-ws, token '%v': expected ok, got %v", i, token, err)
}
token = tokenFn()
if resp := rpcRequest(t, htUrl, "Authorization", token); resp.StatusCode != 200 {
if resp := rpcRequest(t, htUrl, testMethod, "Authorization", token); resp.StatusCode != 200 {
t.Errorf("test %d-http, token '%v': expected ok, got %v", i, token, resp.StatusCode)
}
}
Expand Down Expand Up @@ -414,10 +437,69 @@ func TestJWT(t *testing.T) {
}

token = tokenFn()
resp := rpcRequest(t, htUrl, "Authorization", token)
resp := rpcRequest(t, htUrl, testMethod, "Authorization", token)
if resp.StatusCode != http.StatusUnauthorized {
t.Errorf("tc %d-http, token '%v': expected not to allow, got %v", i, token, resp.StatusCode)
}
}
srv.stop()
}

func TestHTTPWriteTimeout(t *testing.T) {
const (
timeoutRes = `{"jsonrpc":"2.0","id":1,"error":{"code":-32002,"message":"request timed out"}}`
greetRes = `{"jsonrpc":"2.0","id":1,"result":"Hello"}`
)
// Set-up server
timeouts := rpc.DefaultHTTPTimeouts
timeouts.WriteTimeout = time.Second
srv := createAndStartServer(t, &httpConfig{Modules: []string{"test"}}, false, &wsConfig{}, &timeouts)
url := fmt.Sprintf("http://%v", srv.listenAddr())

// Send normal request
t.Run("message", func(t *testing.T) {
resp := rpcRequest(t, url, "test_sleep")
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
if string(body) != timeoutRes {
t.Errorf("wrong response. have %s, want %s", string(body), timeoutRes)
}
})

// Batch request
t.Run("batch", func(t *testing.T) {
want := fmt.Sprintf("[%s,%s,%s]", greetRes, timeoutRes, timeoutRes)
resp := batchRpcRequest(t, url, []string{"test_greet", "test_sleep", "test_greet"})
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
if string(body) != want {
t.Errorf("wrong response. have %s, want %s", string(body), want)
}
})
}

func apis() []rpc.API {
return []rpc.API{
{
Namespace: "test",
Service: &testService{},
},
}
}

type testService struct {
}

func (s *testService) Greet() string {
return "Hello"
}

func (s *testService) Sleep() {
time.Sleep(1500 * time.Millisecond)
}
2 changes: 1 addition & 1 deletion p2p/simulations/adapters/inproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (sn *SimNode) ServeRPC(conn *websocket.Conn) error {
if err != nil {
return err
}
codec := rpc.NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON)
codec := rpc.NewFuncCodec(conn, func(v any, _ bool) error { return conn.WriteJSON(v) }, conn.ReadJSON)
handler.ServeCodec(codec, 0)
return nil
}
Expand Down
Loading