From f7bd98b13c739303a46a6d15d8e0d482123384df Mon Sep 17 00:00:00 2001 From: RPRX <63339210+RPRX@users.noreply.github.com> Date: Wed, 27 Nov 2024 20:19:18 +0000 Subject: [PATCH] XHTTP: Add "stream-one" mode for client & server (#4071) ""Breaking"": Client uses "stream-one" mode by default when using **REALITY** ("stream-up" if "downloadSettings" exists) --- infra/conf/transport_internet.go | 7 +- .../internet/splithttp/browser_client.go | 4 ++ transport/internet/splithttp/client.go | 65 +++++++++++++++++++ transport/internet/splithttp/dialer.go | 42 ++++++++---- transport/internet/splithttp/hub.go | 48 ++++++++++---- 5 files changed, 141 insertions(+), 25 deletions(-) diff --git a/infra/conf/transport_internet.go b/infra/conf/transport_internet.go index 83deadbf45dc..a87a6ecb7371 100644 --- a/infra/conf/transport_internet.go +++ b/infra/conf/transport_internet.go @@ -307,7 +307,7 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) { switch c.Mode { case "": c.Mode = "auto" - case "auto", "packet-up", "stream-up": + case "auto", "packet-up", "stream-up", "stream-one": default: return nil, errors.New("unsupported mode: " + c.Mode) } @@ -327,6 +327,9 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) { } var err error if c.DownloadSettings != nil { + if c.Mode == "stream-one" { + return nil, errors.New(`Can not use "downloadSettings" in "stream-one" mode.`) + } if c.Extra != nil { c.DownloadSettings.SocketSettings = nil } @@ -707,8 +710,10 @@ func (p TransportProtocol) Build() (string, error) { case "ws", "websocket": return "websocket", nil case "h2", "h3", "http": + errors.PrintDeprecatedFeatureWarning("HTTP transport", "XHTTP transport") return "http", nil case "grpc": + errors.PrintMigrateFeatureInfo("gRPC transport", "XHTTP transport") return "grpc", nil case "httpupgrade": return "httpupgrade", nil diff --git a/transport/internet/splithttp/browser_client.go b/transport/internet/splithttp/browser_client.go index 14ecaaf15871..d56ba139e55b 100644 --- a/transport/internet/splithttp/browser_client.go +++ b/transport/internet/splithttp/browser_client.go @@ -14,6 +14,10 @@ import ( // has no fields because everything is global state :O) type BrowserDialerClient struct{} +func (c *BrowserDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) { + panic("not implemented yet") +} + func (c *BrowserDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser { panic("not implemented yet") } diff --git a/transport/internet/splithttp/client.go b/transport/internet/splithttp/client.go index 77b88fa49dff..baeac3caed20 100644 --- a/transport/internet/splithttp/client.go +++ b/transport/internet/splithttp/client.go @@ -29,6 +29,10 @@ type DialerClient interface { // (ctx, baseURL) -> uploadWriter // baseURL already contains sessionId OpenUpload(context.Context, string) io.WriteCloser + + // (ctx, pureURL) -> (uploadWriter, downloadReader) + // pureURL can not contain sessionId + Open(context.Context, string) (io.WriteCloser, io.ReadCloser) } // implements splithttp.DialerClient in terms of direct network connections @@ -42,6 +46,30 @@ type DefaultDialerClient struct { dialUploadConn func(ctxInner context.Context) (net.Conn, error) } +func (c *DefaultDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) { + reader, writer := io.Pipe() + req, _ := http.NewRequestWithContext(ctx, "POST", pureURL, reader) + req.Header = c.transportConfig.GetRequestHeader() + if !c.transportConfig.NoGRPCHeader { + req.Header.Set("Content-Type", "application/grpc") + } + wrc := &WaitReadCloser{Wait: make(chan struct{})} + go func() { + response, err := c.client.Do(req) + if err != nil || response.StatusCode != 200 { + if err != nil { + errors.LogInfoInner(ctx, err, "failed to open ", pureURL) + } else { + errors.LogInfo(ctx, "unexpected status ", response.StatusCode) + } + wrc.Close() + return + } + wrc.Set(response.Body) + }() + return writer, wrc +} + func (c *DefaultDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser { reader, writer := io.Pipe() req, _ := http.NewRequestWithContext(ctx, "POST", baseURL, reader) @@ -226,3 +254,40 @@ func (c downloadBody) Close() error { c.cancel() return nil } + +type WaitReadCloser struct { + Wait chan struct{} + io.ReadCloser +} + +func (w *WaitReadCloser) Set(rc io.ReadCloser) { + w.ReadCloser = rc + defer func() { + if recover() != nil { + rc.Close() + } + }() + close(w.Wait) +} + +func (w *WaitReadCloser) Read(b []byte) (int, error) { + if w.ReadCloser == nil { + if <-w.Wait; w.ReadCloser == nil { + return 0, io.ErrClosedPipe + } + } + return w.ReadCloser.Read(b) +} + +func (w *WaitReadCloser) Close() error { + if w.ReadCloser != nil { + return w.ReadCloser.Close() + } + defer func() { + if recover() != nil && w.ReadCloser != nil { + w.ReadCloser.Close() + } + }() + close(w.Wait) + return nil +} diff --git a/transport/internet/splithttp/dialer.go b/transport/internet/splithttp/dialer.go index 6a4484de8073..df83bd92c912 100644 --- a/transport/internet/splithttp/dialer.go +++ b/transport/internet/splithttp/dialer.go @@ -3,6 +3,7 @@ package splithttp import ( "context" gotls "crypto/tls" + "io" "net/http" "net/url" "strconv" @@ -279,9 +280,33 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me requestURL2.RawQuery = config2.GetNormalizedQuery() } - reader, remoteAddr, localAddr, err := httpClient2.OpenDownload(context.WithoutCancel(ctx), requestURL2.String()) - if err != nil { - return nil, err + mode := transportConfiguration.Mode + if mode == "" || mode == "auto" { + mode = "packet-up" + if (tlsConfig != nil && (len(tlsConfig.NextProtocol) != 1 || tlsConfig.NextProtocol[0] == "h2")) || realityConfig != nil { + mode = "stream-up" + } + if realityConfig != nil && transportConfiguration.DownloadSettings == nil { + mode = "stream-one" + } + } + errors.LogInfo(ctx, "XHTTP is using mode: "+mode) + + var writer io.WriteCloser + var reader io.ReadCloser + var remoteAddr, localAddr net.Addr + var err error + + if mode == "stream-one" { + requestURL.Path = transportConfiguration.GetNormalizedPath() + writer, reader = httpClient.Open(context.WithoutCancel(ctx), requestURL.String()) + remoteAddr = &net.TCPAddr{} + localAddr = &net.TCPAddr{} + } else { + reader, remoteAddr, localAddr, err = httpClient2.OpenDownload(context.WithoutCancel(ctx), requestURL2.String()) + if err != nil { + return nil, err + } } if muxRes != nil { @@ -293,7 +318,7 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me closed := false conn := splitConn{ - writer: nil, + writer: writer, reader: reader, remoteAddr: remoteAddr, localAddr: localAddr, @@ -311,14 +336,9 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me }, } - mode := transportConfiguration.Mode - if mode == "auto" { - mode = "packet-up" - if (tlsConfig != nil && len(tlsConfig.NextProtocol) != 1) || realityConfig != nil { - mode = "stream-up" - } + if mode == "stream-one" { + return stat.Connection(&conn), nil } - errors.LogInfo(ctx, "XHTTP is using mode: "+mode) if mode == "stream-up" { conn.writer = httpClient.OpenUpload(ctx, requestURL.String()) return stat.Connection(&conn), nil diff --git a/transport/internet/splithttp/hub.go b/transport/internet/splithttp/hub.go index 58113f0228c3..f1002018d2b3 100644 --- a/transport/internet/splithttp/hub.go +++ b/transport/internet/splithttp/hub.go @@ -102,14 +102,22 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req h.config.WriteResponseHeader(writer) + validRange := h.config.GetNormalizedXPaddingBytes() + x_padding := int32(len(request.URL.Query().Get("x_padding"))) + if validRange.To > 0 && (x_padding < validRange.From || x_padding > validRange.To) { + errors.LogInfo(context.Background(), "invalid x_padding length:", x_padding) + writer.WriteHeader(http.StatusBadRequest) + return + } + sessionId := "" subpath := strings.Split(request.URL.Path[len(h.path):], "/") if len(subpath) > 0 { sessionId = subpath[0] } - if sessionId == "" { - errors.LogInfo(context.Background(), "no sessionid on request:", request.URL.Path) + if sessionId == "" && h.config.Mode != "" && h.config.Mode != "auto" && h.config.Mode != "stream-one" && h.config.Mode != "stream-up" { + errors.LogInfo(context.Background(), "stream-one mode is not allowed") writer.WriteHeader(http.StatusBadRequest) return } @@ -126,17 +134,20 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req } } - currentSession := h.upsertSession(sessionId) + var currentSession *httpSession + if sessionId != "" { + currentSession = h.upsertSession(sessionId) + } scMaxEachPostBytes := int(h.ln.config.GetNormalizedScMaxEachPostBytes().To) - if request.Method == "POST" { + if request.Method == "POST" && sessionId != "" { seq := "" if len(subpath) > 1 { seq = subpath[1] } if seq == "" { - if h.config.Mode == "packet-up" { + if h.config.Mode != "" && h.config.Mode != "auto" && h.config.Mode != "stream-up" { errors.LogInfo(context.Background(), "stream-up mode is not allowed") writer.WriteHeader(http.StatusBadRequest) return @@ -148,13 +159,16 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req errors.LogInfoInner(context.Background(), err, "failed to upload (PushReader)") writer.WriteHeader(http.StatusConflict) } else { + if request.Header.Get("Content-Type") == "application/grpc" { + writer.Header().Set("Content-Type", "application/grpc") + } writer.WriteHeader(http.StatusOK) <-request.Context().Done() } return } - if h.config.Mode == "stream-up" { + if h.config.Mode != "" && h.config.Mode != "auto" && h.config.Mode != "packet-up" { errors.LogInfo(context.Background(), "packet-up mode is not allowed") writer.WriteHeader(http.StatusBadRequest) return @@ -193,16 +207,18 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req } writer.WriteHeader(http.StatusOK) - } else if request.Method == "GET" { + } else if request.Method == "GET" || sessionId == "" { responseFlusher, ok := writer.(http.Flusher) if !ok { panic("expected http.ResponseWriter to be an http.Flusher") } - // after GET is done, the connection is finished. disable automatic - // session reaping, and handle it in defer - currentSession.isFullyConnected.Close() - defer h.sessions.Delete(sessionId) + if sessionId != "" { + // after GET is done, the connection is finished. disable automatic + // session reaping, and handle it in defer + currentSession.isFullyConnected.Close() + defer h.sessions.Delete(sessionId) + } // magic header instructs nginx + apache to not buffer response body writer.Header().Set("X-Accel-Buffering", "no") @@ -210,7 +226,10 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req // Should be able to prevent overloading the cache, or stop CDNs from // teeing the response stream into their cache, causing slowdowns. writer.Header().Set("Cache-Control", "no-store") - if !h.config.NoSSEHeader { + + if request.Header.Get("Content-Type") == "application/grpc" { + writer.Header().Set("Content-Type", "application/grpc") + } else if !h.config.NoSSEHeader { // magic header to make the HTTP middle box consider this as SSE to disable buffer writer.Header().Set("Content-Type", "text/event-stream") } @@ -227,9 +246,12 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req downloadDone: downloadDone, responseFlusher: responseFlusher, }, - reader: currentSession.uploadQueue, + reader: request.Body, remoteAddr: remoteAddr, } + if sessionId != "" { + conn.reader = currentSession.uploadQueue + } h.ln.addConn(stat.Connection(&conn))