graphql, node, rpc: improve HTTP write timeout handling #25457

Status: Merged (46 commits, Dec 7, 2022)

Commits:
8ea20d7 node: fix rpc write timeout (s1na, Aug 1, 2022)
8dcbf87 add timeout to graphql tests (s1na, Aug 1, 2022)
97f9d97 Revert "node: fix rpc write timeout" (s1na, Aug 8, 2022)
d970cf7 handle timeout through context directly in handler (s1na, Aug 8, 2022)
3f0f2bb eth/filters: respect ctx timeout in unindexedLogs (s1na, Aug 9, 2022)
245aff9 eth/tracers: respect ctx timeout in IntermediateRoots (s1na, Aug 9, 2022)
07c763f use writeTimeout, not read (s1na, Aug 10, 2022)
477d724 explicitly add timeout to tests (s1na, Aug 10, 2022)
0d0da18 improve (s1na, Aug 31, 2022)
1a3ae0c alternative approach (s1na, Nov 16, 2022)
8f3f415 fix double write issue (s1na, Nov 17, 2022)
986c090 refactor handleBatch timeout path (s1na, Nov 17, 2022)
8ac33fe improve timeout computation and cancellation (s1na, Nov 17, 2022)
d56c820 only write once (s1na, Nov 17, 2022)
6675df4 fix error code (s1na, Nov 18, 2022)
6b24bc7 add comment (s1na, Nov 18, 2022)
954b043 node: implement Flush in gzipResponseWriter (fjl, Nov 23, 2022)
5ec44c7 node: enable request deadline only when configured (fjl, Nov 23, 2022)
23d0c70 node: close gzip writer as soon as output is complete (fjl, Nov 23, 2022)
8182a9b rpc: add content-length/flush only when sending error response (fjl, Nov 23, 2022)
4e56b08 node: remove context deadline in rpcstack (fjl, Nov 23, 2022)
8f25a47 node: improve gzipResponseWriter (fjl, Nov 23, 2022)
d78546e rpc: fix batch timeouts (fjl, Nov 23, 2022)
521982a rpc: disable chunked transfer-encoding for error responses (fjl, Nov 24, 2022)
6ee484d rpc: update comment (fjl, Nov 24, 2022)
a3df90f add basic test for http write timeout (s1na, Nov 24, 2022)
3ec82be minor fix (s1na, Nov 28, 2022)
52b1f6c add test case for batch timeout (s1na, Nov 28, 2022)
85c6fa6 update graphql timeout (s1na, Nov 29, 2022)
8cc7802 return error for graphql on timeout (s1na, Nov 29, 2022)
1a47ffd fix http status (s1na, Nov 29, 2022)
e1f6498 dont compress error responses (s1na, Dec 1, 2022)
6ed9113 return graphql error (s1na, Dec 4, 2022)
3feba50 remove length tracking from gzipWriter (s1na, Dec 4, 2022)
2680314 Revert "remove length tracking from gzipWriter" (fjl, Dec 6, 2022)
177927a node: fix some issues in gzip handler and add test (fjl, Dec 6, 2022)
be88406 node: update comment (fjl, Dec 6, 2022)
9bfe5af rpc: rename to ContextRequestTimeout (fjl, Dec 6, 2022)
f834267 graphql: add comments (fjl, Dec 6, 2022)
e30a7cb node: unset gz when rw closed (fjl, Dec 6, 2022)
507ba3d eth/filters, eth/tracers: remove cancel checks (fjl, Dec 6, 2022)
6d0746c graphql: cancel the request context on timeout (fjl, Dec 6, 2022)
272bc6e node: fix typo (fjl, Dec 7, 2022)
3dd9134 node: reword comment (fjl, Dec 7, 2022)
0123227 node: less space (fjl, Dec 7, 2022)
41befc6 node: even more comment updates (fjl, Dec 7, 2022)
9 changes: 5 additions & 4 deletions graphql/graphql_test.go
@@ -321,10 +321,11 @@ func TestGraphQLTransactionLogs(t *testing.T) {

func createNode(t *testing.T) *node.Node {
stack, err := node.New(&node.Config{
HTTPHost: "127.0.0.1",
HTTPPort: 0,
WSHost: "127.0.0.1",
WSPort: 0,
HTTPHost: "127.0.0.1",
HTTPPort: 0,
WSHost: "127.0.0.1",
WSPort: 0,
HTTPTimeouts: node.DefaultConfig.HTTPTimeouts,
})
if err != nil {
t.Fatalf("could not create node: %v", err)
65 changes: 54 additions & 11 deletions graphql/service.go
@@ -20,12 +20,16 @@ import (
"context"
"encoding/json"
"net/http"
"strconv"
"sync"
"time"

"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"github.com/graph-gophers/graphql-go"
gqlErrors "github.com/graph-gophers/graphql-go/errors"
)

type handler struct {
@@ -43,21 +47,60 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second)
var (
ctx = r.Context()
responded sync.Once
timer *time.Timer
cancel context.CancelFunc
)
ctx, cancel = context.WithCancel(ctx)
defer cancel()

response := h.Schema.Exec(ctx, params.Query, params.OperationName, params.Variables)
responseJSON, err := json.Marshal(response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if len(response.Errors) > 0 {
w.WriteHeader(http.StatusBadRequest)
if timeout, ok := rpc.ContextRequestTimeout(ctx); ok {
timer = time.AfterFunc(timeout, func() {
responded.Do(func() {
// Cancel request handling.
cancel()

// Create the timeout response.
response := &graphql.Response{
Errors: []*gqlErrors.QueryError{{Message: "request timed out"}},
}
responseJSON, err := json.Marshal(response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// Setting this disables gzip compression in package node.
w.Header().Set("transfer-encoding", "identity")

// Flush the response. Since we are writing close to the response timeout,
// chunked transfer encoding must be disabled by setting content-length.
w.Header().Set("content-type", "application/json")
w.Header().Set("content-length", strconv.Itoa(len(responseJSON)))
w.Write(responseJSON)
if flush, ok := w.(http.Flusher); ok {
flush.Flush()
}
})
})
}

w.Header().Set("Content-Type", "application/json")
w.Write(responseJSON)
response := h.Schema.Exec(ctx, params.Query, params.OperationName, params.Variables)
if timer != nil {
timer.Stop()
}
responded.Do(func() {
responseJSON, err := json.Marshal(response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if len(response.Errors) > 0 {
w.WriteHeader(http.StatusBadRequest)
}
w.Header().Set("Content-Type", "application/json")
w.Write(responseJSON)
})
}

// New constructs a new GraphQL service instance.
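Note on the handler change above: the new code races the GraphQL execution against a write-deadline timer, and sync.Once guarantees that exactly one of the two paths writes the HTTP response. A minimal, self-contained sketch of the same pattern (outside go-ethereum, with a hypothetical slowQuery standing in for Schema.Exec and a fixed timeout standing in for the value returned by rpc.ContextRequestTimeout) could look like this:

package main

import (
	"context"
	"fmt"
	"net/http"
	"strconv"
	"sync"
	"time"
)

// slowQuery stands in for h.Schema.Exec: it does some work but aborts
// early when the request context is cancelled.
func slowQuery(ctx context.Context) string {
	select {
	case <-time.After(5 * time.Second):
		return `{"data":{"ok":true}}`
	case <-ctx.Done():
		return `{"errors":[{"message":"cancelled"}]}`
	}
}

func handle(w http.ResponseWriter, r *http.Request) {
	var responded sync.Once
	timeout := 2 * time.Second // stands in for the configured request timeout

	ctx, cancel := context.WithCancel(r.Context())
	defer cancel()

	// Timeout path: if the timer fires first, cancel the work and write the
	// error response. responded.Do ensures the normal path cannot write again.
	timer := time.AfterFunc(timeout, func() {
		responded.Do(func() {
			cancel()
			body := `{"errors":[{"message":"request timed out"}]}`
			w.Header().Set("Content-Type", "application/json")
			w.Header().Set("Content-Length", strconv.Itoa(len(body)))
			fmt.Fprint(w, body)
			if f, ok := w.(http.Flusher); ok {
				f.Flush()
			}
		})
	})

	result := slowQuery(ctx)
	timer.Stop()

	// Normal path: only writes if the timeout path has not already responded.
	responded.Do(func() {
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, result)
	})
}

func main() {
	http.HandleFunc("/graphql", handle)
	http.ListenAndServe("127.0.0.1:8545", nil)
}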
3 changes: 3 additions & 0 deletions node/api_test.go
@@ -252,6 +252,9 @@ func TestStartRPC(t *testing.T) {
config := test.cfg
// config.Logger = testlog.Logger(t, log.LvlDebug)
config.P2P.NoDiscovery = true
if config.HTTPTimeouts == (rpc.HTTPTimeouts{}) {
config.HTTPTimeouts = rpc.DefaultHTTPTimeouts
}

// Create Node.
stack, err := New(&config)
13 changes: 7 additions & 6 deletions node/node_test.go
@@ -559,13 +559,13 @@ func (test rpcPrefixTest) check(t *testing.T, node *Node) {
}

for _, path := range test.wantHTTP {
resp := rpcRequest(t, httpBase+path)
resp := rpcRequest(t, httpBase+path, testMethod)
if resp.StatusCode != 200 {
t.Errorf("Error: %s: bad status code %d, want 200", path, resp.StatusCode)
}
}
for _, path := range test.wantNoHTTP {
resp := rpcRequest(t, httpBase+path)
resp := rpcRequest(t, httpBase+path, testMethod)
if resp.StatusCode != 404 {
t.Errorf("Error: %s: bad status code %d, want 404", path, resp.StatusCode)
}
@@ -586,10 +586,11 @@ func (test rpcPrefixTest) check(t *testing.T, node *Node) {

func createNode(t *testing.T, httpPort, wsPort int) *Node {
conf := &Config{
HTTPHost: "127.0.0.1",
HTTPPort: httpPort,
WSHost: "127.0.0.1",
WSPort: wsPort,
HTTPHost: "127.0.0.1",
HTTPPort: httpPort,
WSHost: "127.0.0.1",
WSPort: wsPort,
HTTPTimeouts: rpc.DefaultHTTPTimeouts,
}
node, err := New(conf)
if err != nil {
100 changes: 87 additions & 13 deletions node/rpcstack.go
@@ -24,6 +24,7 @@ import (
"net"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -196,6 +197,7 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
return
}

// if http-rpc is enabled, try to serve request
rpc := h.httpHandler.Load().(*rpcHandler)
if rpc != nil {
Expand Down Expand Up @@ -462,17 +464,94 @@ var gzPool = sync.Pool{
}

type gzipResponseWriter struct {
io.Writer
http.ResponseWriter
resp http.ResponseWriter

gz *gzip.Writer
contentLength uint64 // total length of the uncompressed response
written uint64 // amount of written bytes from the uncompressed response
hasLength bool // true if uncompressed response had Content-Length
inited bool // true after init was called for the first time
}

// init runs just before response headers are written. Among other things, this function
// also decides whether compression will be applied at all.
func (w *gzipResponseWriter) init() {
if w.inited {
return
}
w.inited = true

hdr := w.resp.Header()
length := hdr.Get("content-length")
if len(length) > 0 {
if n, err := strconv.ParseUint(length, 10, 64); err == nil {
w.hasLength = true
w.contentLength = n
}
}

// Setting Transfer-Encoding to "identity" explicitly disables compression. net/http
// also recognizes this header value and uses it to disable "chunked" transfer
// encoding, trimming the header from the response. This means downstream handlers can
// set this without harm, even if they aren't wrapped by newGzipHandler.
//
// In go-ethereum, we use this signal to disable compression for certain error
// responses which are flushed out close to the write deadline of the response. For
// these cases, we want to avoid chunked transfer encoding and compression because
// they require additional output that may not get written in time.
passthrough := hdr.Get("transfer-encoding") == "identity"
if !passthrough {
w.gz = gzPool.Get().(*gzip.Writer)
w.gz.Reset(w.resp)
hdr.Del("content-length")
hdr.Set("content-encoding", "gzip")
}
}

func (w *gzipResponseWriter) Header() http.Header {
return w.resp.Header()
}

func (w *gzipResponseWriter) WriteHeader(status int) {
w.Header().Del("Content-Length")
w.ResponseWriter.WriteHeader(status)
w.init()
w.resp.WriteHeader(status)
}

func (w *gzipResponseWriter) Write(b []byte) (int, error) {
return w.Writer.Write(b)
w.init()

if w.gz == nil {
// Compression is disabled.
return w.resp.Write(b)
}

n, err := w.gz.Write(b)
w.written += uint64(n)
if w.hasLength && w.written >= w.contentLength {
// The HTTP handler has finished writing the entire uncompressed response. Close
// the gzip stream to ensure the footer will be seen by the client in case the
// response is flushed after this call to write.
err = w.gz.Close()
}
return n, err
}

func (w *gzipResponseWriter) Flush() {
if w.gz != nil {
w.gz.Flush()
}
if f, ok := w.resp.(http.Flusher); ok {
f.Flush()
}
}

func (w *gzipResponseWriter) close() {
if w.gz == nil {
return
}
w.gz.Close()
gzPool.Put(w.gz)
w.gz = nil
}

func newGzipHandler(next http.Handler) http.Handler {
@@ -482,15 +561,10 @@ func newGzipHandler(next http.Handler) http.Handler {
return
}

w.Header().Set("Content-Encoding", "gzip")

gz := gzPool.Get().(*gzip.Writer)
defer gzPool.Put(gz)

gz.Reset(w)
defer gz.Close()
wrapper := &gzipResponseWriter{resp: w}
defer wrapper.close()

next.ServeHTTP(&gzipResponseWriter{ResponseWriter: w, Writer: gz}, r)
next.ServeHTTP(wrapper, r)
})
}

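For reference, the compression opt-out described in init above can be used by any handler running behind newGzipHandler: setting Transfer-Encoding to "identity" before the first write keeps the body uncompressed, and an explicit Content-Length keeps net/http from switching to chunked transfer encoding. A small sketch under those assumptions (the errorNearDeadline helper and package name are hypothetical, not part of this change):

package example

import (
	"net/http"
	"strconv"
)

// errorNearDeadline writes a small JSON error shortly before the server's
// write deadline. "Transfer-Encoding: identity" tells the gzip wrapper to
// pass the body through uncompressed, and the explicit Content-Length keeps
// net/http from using chunked transfer encoding, so the whole response can
// be flushed out in a single write.
func errorNearDeadline(w http.ResponseWriter, body []byte) {
	w.Header().Set("Transfer-Encoding", "identity")
	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Content-Length", strconv.Itoa(len(body)))
	w.Write(body)
	if f, ok := w.(http.Flusher); ok {
		f.Flush() // push the bytes out before the connection is cut
	}
}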