Skip to content

Commit

Permalink
Refine the logs.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Sep 4, 2024
1 parent 2fc5b44 commit c227165
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 113 deletions.
4 changes: 2 additions & 2 deletions proxy/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ func (v *httpAPI) Run(ctx context.Context) error {
// The WebRTC WHIP API handler.
logger.Df(ctx, "Handle /rtc/v1/whip/ by %v", addr)
mux.HandleFunc("/rtc/v1/whip/", func(w http.ResponseWriter, r *http.Request) {
if err := v.rtc.HandleWHIP(ctx, w, r); err != nil {
if err := v.rtc.HandleApiForWHIP(ctx, w, r); err != nil {
apiError(ctx, w, r, err)
}
})

// The WebRTC WHEP API handler.
logger.Df(ctx, "Handle /rtc/v1/whep/ by %v", addr)
mux.HandleFunc("/rtc/v1/whep/", func(w http.ResponseWriter, r *http.Request) {
if err := v.rtc.HandleWHEP(ctx, w, r); err != nil {
if err := v.rtc.HandleApiForWHEP(ctx, w, r); err != nil {
apiError(ctx, w, r, err)
}
})
Expand Down
59 changes: 39 additions & 20 deletions proxy/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ func (v *httpServer) Run(ctx context.Context) error {
return
}

stream, _ := srsLoadBalancer.LoadOrStoreHLS(ctx, streamURL, NewHLSStreaming(func(v *HLSStreaming) {
v.SRSProxyBackendHLSID = logger.GenerateContextID()
v.StreamURL, v.FullURL = streamURL, fullURL
v.BuildContext(ctx)
stream, _ := srsLoadBalancer.LoadOrStoreHLS(ctx, streamURL, NewHLSPlayStream(func(s *HLSPlayStream) {
s.SRSProxyBackendHLSID = logger.GenerateContextID()
s.StreamURL, s.FullURL = streamURL, fullURL
s.Initialize(ctx)
}))

stream.ServeHTTP(w, r)
Expand All @@ -121,14 +121,14 @@ func (v *httpServer) Run(ctx context.Context) error {
if stream, err := srsLoadBalancer.LoadHLSBySPBHID(ctx, srsProxyBackendID); err != nil {
http.Error(w, fmt.Sprintf("load stream by spbhid %v", srsProxyBackendID), http.StatusBadRequest)
} else {
stream.ServeHTTP(w, r)
stream.Initialize(ctx).ServeHTTP(w, r)
}
return
}

// Use HTTP pseudo streaming to proxy the request.
NewHTTPStreaming(func(streaming *HTTPStreaming) {
streaming.ctx = ctx
NewHTTPFlvTsConnection(func(c *HTTPFlvTsConnection) {
c.ctx = ctx
}).ServeHTTP(w, r)
return
}
Expand All @@ -155,20 +155,26 @@ func (v *httpServer) Run(ctx context.Context) error {
return nil
}

type HTTPStreaming struct {
// HTTPFlvTsConnection is an HTTP pseudo streaming connection, such as an HTTP-FLV or HTTP-TS
// connection. There is no state need to be sync between proxy servers.
//
// When we got an HTTP FLV or TS request, we will parse the stream URL from the HTTP request,
// then proxy to the corresponding backend server. All state is in the HTTP request, so this
// connection is stateless.
type HTTPFlvTsConnection struct {
// The context for HTTP streaming.
ctx context.Context
}

func NewHTTPStreaming(opts ...func(streaming *HTTPStreaming)) *HTTPStreaming {
v := &HTTPStreaming{}
func NewHTTPFlvTsConnection(opts ...func(*HTTPFlvTsConnection)) *HTTPFlvTsConnection {
v := &HTTPFlvTsConnection{}
for _, opt := range opts {
opt(v)
}
return v
}

func (v *HTTPStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (v *HTTPFlvTsConnection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
ctx := logger.WithContext(v.ctx)

Expand All @@ -179,7 +185,7 @@ func (v *HTTPStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

func (v *HTTPStreaming) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
func (v *HTTPFlvTsConnection) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
// Always allow CORS for all requests.
if ok := apiCORS(ctx, w, r); ok {
return nil
Expand Down Expand Up @@ -207,7 +213,7 @@ func (v *HTTPStreaming) serve(ctx context.Context, w http.ResponseWriter, r *htt
return nil
}

func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer) error {
func (v *HTTPFlvTsConnection) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer) error {
// Parse HTTP port from backend.
if len(backend.HTTP) == 0 {
return errors.Errorf("no http stream server")
Expand Down Expand Up @@ -255,7 +261,14 @@ func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWrite
return nil
}

type HLSStreaming struct {
// HLSPlayStream is an HLS stream proxy, which represents the stream level object. This means multiple HLS
// clients will share this object, and they use the same ctx among proxy servers.
//
// Unlike the HTTP FLV or TS connection, HLS client may request the m3u8 or ts via different HTTP connections.
// Especially for requesting ts, we need to identify the stream URl or backend server for it. So we create
// the spbhid which can be seen as the hash of stream URL or backend server. The spbhid enable us to convert
// to the stream URL and then query the backend server to serve it.
type HLSPlayStream struct {
// The context for HLS streaming.
ctx context.Context
// The context ID for recovering the context.
Expand All @@ -269,22 +282,28 @@ type HLSStreaming struct {
FullURL string `json:"full_url"`
}

func NewHLSStreaming(opts ...func(streaming *HLSStreaming)) *HLSStreaming {
v := &HLSStreaming{}
func NewHLSPlayStream(opts ...func(*HLSPlayStream)) *HLSPlayStream {
v := &HLSPlayStream{}
for _, opt := range opts {
opt(v)
}
return v
}

func (v *HLSStreaming) BuildContext(ctx context.Context) {
func (v *HLSPlayStream) Initialize(ctx context.Context) *HLSPlayStream {
if v.ctx != nil && v.ContextID != "" {
return v
}

if v.ContextID == "" {
v.ContextID = logger.GenerateContextID()
}
v.ctx = logger.WithContextID(ctx, v.ContextID)

return v
}

func (v *HLSStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (v *HLSPlayStream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

if err := v.serve(v.ctx, w, r); err != nil {
Expand All @@ -295,7 +314,7 @@ func (v *HLSStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

func (v *HLSStreaming) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
func (v *HLSPlayStream) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
ctx, streamURL, fullURL := v.ctx, v.StreamURL, v.FullURL

// Always allow CORS for all requests.
Expand All @@ -316,7 +335,7 @@ func (v *HLSStreaming) serve(ctx context.Context, w http.ResponseWriter, r *http
return nil
}

func (v *HLSStreaming) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer) error {
func (v *HLSPlayStream) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer) error {
// Parse HTTP port from backend.
if len(backend.HTTP) == 0 {
return errors.Errorf("no rtmp server")
Expand Down
Loading

0 comments on commit c227165

Please sign in to comment.