diff --git a/proxy/api.go b/proxy/api.go index 8e977c0f7c4..26799c907b3 100644 --- a/proxy/api.go +++ b/proxy/api.go @@ -77,7 +77,7 @@ func (v *httpAPI) Run(ctx context.Context) error { // The WebRTC WHIP API handler. logger.Df(ctx, "Handle /rtc/v1/whip/ by %v", addr) mux.HandleFunc("/rtc/v1/whip/", func(w http.ResponseWriter, r *http.Request) { - if err := v.rtc.HandleWHIP(ctx, w, r); err != nil { + if err := v.rtc.HandleApiForWHIP(ctx, w, r); err != nil { apiError(ctx, w, r, err) } }) @@ -85,7 +85,7 @@ func (v *httpAPI) Run(ctx context.Context) error { // The WebRTC WHEP API handler. logger.Df(ctx, "Handle /rtc/v1/whep/ by %v", addr) mux.HandleFunc("/rtc/v1/whep/", func(w http.ResponseWriter, r *http.Request) { - if err := v.rtc.HandleWHEP(ctx, w, r); err != nil { + if err := v.rtc.HandleApiForWHEP(ctx, w, r); err != nil { apiError(ctx, w, r, err) } }) diff --git a/proxy/http.go b/proxy/http.go index ed89c4acb55..cc58fe80902 100644 --- a/proxy/http.go +++ b/proxy/http.go @@ -103,10 +103,10 @@ func (v *httpServer) Run(ctx context.Context) error { return } - stream, _ := srsLoadBalancer.LoadOrStoreHLS(ctx, streamURL, NewHLSStreaming(func(v *HLSStreaming) { - v.SRSProxyBackendHLSID = logger.GenerateContextID() - v.StreamURL, v.FullURL = streamURL, fullURL - v.BuildContext(ctx) + stream, _ := srsLoadBalancer.LoadOrStoreHLS(ctx, streamURL, NewHLSPlayStream(func(s *HLSPlayStream) { + s.SRSProxyBackendHLSID = logger.GenerateContextID() + s.StreamURL, s.FullURL = streamURL, fullURL + s.Initialize(ctx) })) stream.ServeHTTP(w, r) @@ -121,14 +121,14 @@ func (v *httpServer) Run(ctx context.Context) error { if stream, err := srsLoadBalancer.LoadHLSBySPBHID(ctx, srsProxyBackendID); err != nil { http.Error(w, fmt.Sprintf("load stream by spbhid %v", srsProxyBackendID), http.StatusBadRequest) } else { - stream.ServeHTTP(w, r) + stream.Initialize(ctx).ServeHTTP(w, r) } return } // Use HTTP pseudo streaming to proxy the request. - NewHTTPStreaming(func(streaming *HTTPStreaming) { - streaming.ctx = ctx + NewHTTPFlvTsConnection(func(c *HTTPFlvTsConnection) { + c.ctx = ctx }).ServeHTTP(w, r) return } @@ -155,20 +155,26 @@ func (v *httpServer) Run(ctx context.Context) error { return nil } -type HTTPStreaming struct { +// HTTPFlvTsConnection is an HTTP pseudo streaming connection, such as an HTTP-FLV or HTTP-TS +// connection. There is no state need to be sync between proxy servers. +// +// When we got an HTTP FLV or TS request, we will parse the stream URL from the HTTP request, +// then proxy to the corresponding backend server. All state is in the HTTP request, so this +// connection is stateless. +type HTTPFlvTsConnection struct { // The context for HTTP streaming. ctx context.Context } -func NewHTTPStreaming(opts ...func(streaming *HTTPStreaming)) *HTTPStreaming { - v := &HTTPStreaming{} +func NewHTTPFlvTsConnection(opts ...func(*HTTPFlvTsConnection)) *HTTPFlvTsConnection { + v := &HTTPFlvTsConnection{} for _, opt := range opts { opt(v) } return v } -func (v *HTTPStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (v *HTTPFlvTsConnection) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() ctx := logger.WithContext(v.ctx) @@ -179,7 +185,7 @@ func (v *HTTPStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func (v *HTTPStreaming) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error { +func (v *HTTPFlvTsConnection) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error { // Always allow CORS for all requests. if ok := apiCORS(ctx, w, r); ok { return nil @@ -207,7 +213,7 @@ func (v *HTTPStreaming) serve(ctx context.Context, w http.ResponseWriter, r *htt return nil } -func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer) error { +func (v *HTTPFlvTsConnection) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer) error { // Parse HTTP port from backend. if len(backend.HTTP) == 0 { return errors.Errorf("no http stream server") @@ -255,7 +261,14 @@ func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWrite return nil } -type HLSStreaming struct { +// HLSPlayStream is an HLS stream proxy, which represents the stream level object. This means multiple HLS +// clients will share this object, and they use the same ctx among proxy servers. +// +// Unlike the HTTP FLV or TS connection, HLS client may request the m3u8 or ts via different HTTP connections. +// Especially for requesting ts, we need to identify the stream URl or backend server for it. So we create +// the spbhid which can be seen as the hash of stream URL or backend server. The spbhid enable us to convert +// to the stream URL and then query the backend server to serve it. +type HLSPlayStream struct { // The context for HLS streaming. ctx context.Context // The context ID for recovering the context. @@ -269,22 +282,28 @@ type HLSStreaming struct { FullURL string `json:"full_url"` } -func NewHLSStreaming(opts ...func(streaming *HLSStreaming)) *HLSStreaming { - v := &HLSStreaming{} +func NewHLSPlayStream(opts ...func(*HLSPlayStream)) *HLSPlayStream { + v := &HLSPlayStream{} for _, opt := range opts { opt(v) } return v } -func (v *HLSStreaming) BuildContext(ctx context.Context) { +func (v *HLSPlayStream) Initialize(ctx context.Context) *HLSPlayStream { + if v.ctx != nil && v.ContextID != "" { + return v + } + if v.ContextID == "" { v.ContextID = logger.GenerateContextID() } v.ctx = logger.WithContextID(ctx, v.ContextID) + + return v } -func (v *HLSStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (v *HLSPlayStream) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() if err := v.serve(v.ctx, w, r); err != nil { @@ -295,7 +314,7 @@ func (v *HLSStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func (v *HLSStreaming) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error { +func (v *HLSPlayStream) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error { ctx, streamURL, fullURL := v.ctx, v.StreamURL, v.FullURL // Always allow CORS for all requests. @@ -316,7 +335,7 @@ func (v *HLSStreaming) serve(ctx context.Context, w http.ResponseWriter, r *http return nil } -func (v *HLSStreaming) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer) error { +func (v *HLSPlayStream) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer) error { // Parse HTTP port from backend. if len(backend.HTTP) == 0 { return errors.Errorf("no rtmp server") diff --git a/proxy/rtc.go b/proxy/rtc.go index 3799b7dbfd2..b8cb82df5dc 100644 --- a/proxy/rtc.go +++ b/proxy/rtc.go @@ -52,7 +52,7 @@ func (v *rtcServer) Close() error { return nil } -func (v *rtcServer) HandleWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { +func (v *rtcServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { defer r.Body.Close() ctx = logger.WithContext(ctx) @@ -82,14 +82,14 @@ func (v *rtcServer) HandleWHIP(ctx context.Context, w http.ResponseWriter, r *ht return errors.Wrapf(err, "pick backend for %v", streamURL) } - if err = v.serveByBackend(ctx, w, r, backend, string(remoteSDPOffer), streamURL); err != nil { + if err = v.proxyApiToBackend(ctx, w, r, backend, string(remoteSDPOffer), streamURL); err != nil { return errors.Wrapf(err, "serve %v with %v by backend %+v", fullURL, streamURL, backend) } return nil } -func (v *rtcServer) HandleWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { +func (v *rtcServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { defer r.Body.Close() ctx = logger.WithContext(ctx) @@ -119,14 +119,17 @@ func (v *rtcServer) HandleWHEP(ctx context.Context, w http.ResponseWriter, r *ht return errors.Wrapf(err, "pick backend for %v", streamURL) } - if err = v.serveByBackend(ctx, w, r, backend, string(remoteSDPOffer), streamURL); err != nil { + if err = v.proxyApiToBackend(ctx, w, r, backend, string(remoteSDPOffer), streamURL); err != nil { return errors.Wrapf(err, "serve %v with %v by backend %+v", fullURL, streamURL, backend) } return nil } -func (v *rtcServer) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer, remoteSDPOffer string, streamURL string) error { +func (v *rtcServer) proxyApiToBackend( + ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer, + remoteSDPOffer string, streamURL string, +) error { // Parse HTTP port from backend. if len(backend.API) == 0 { return errors.Errorf("no http api server") @@ -198,9 +201,12 @@ func (v *rtcServer) serveByBackend(ctx context.Context, w http.ResponseWriter, r RemoteICEUfrag: remoteICEUfrag, RemoteICEPwd: remoteICEPwd, LocalICEUfrag: localICEUfrag, LocalICEPwd: localICEPwd, } - if _, err := srsLoadBalancer.LoadOrStoreWebRTC(ctx, streamURL, icePair.Ufrag(), NewRTCStreaming(func(s *RTCConnection) { - s.StreamURL, s.listenerUDP = streamURL, v.listener - s.BuildContext(ctx) + if err := srsLoadBalancer.StoreWebRTC(ctx, streamURL, NewRTCConnection(func(c *RTCConnection) { + c.StreamURL, c.Ufrag = streamURL, icePair.Ufrag() + c.Initialize(ctx, v.listener) + + // Cache the connection for fast search by username. + v.usernames.Store(c.Ufrag, c) })); err != nil { return errors.Wrapf(err, "load or store webrtc %v", streamURL) } @@ -210,7 +216,7 @@ func (v *rtcServer) serveByBackend(ctx context.Context, w http.ResponseWriter, r return errors.Wrapf(err, "write local sdp answer %v", localSDPAnswer) } - logger.Df(ctx, "Response local answer %vB with ice-ufrag=%v, ice-pwd=%vB", + logger.Df(ctx, "Create WebRTC connection with local answer %vB with ice-ufrag=%v, ice-pwd=%vB", len(localSDPAnswer), localICEUfrag, len(localICEPwd)) return nil } @@ -244,12 +250,12 @@ func (v *rtcServer) Run(ctx context.Context) error { n, addr, err := listener.ReadFromUDP(buf) if err != nil { // TODO: If WebRTC server closed unexpectedly, we should notice the main loop to quit. - logger.Wf(ctx, "read from udp failed, err=%v", err) + logger.Wf(ctx, "read from udp failed, err=%+v", err) continue } if err := v.handleClientUDP(ctx, addr, buf[:n]); err != nil { - logger.Wf(ctx, "handle udp %vB failed, addr=%v, err=%v", n, addr, err) + logger.Wf(ctx, "handle udp %vB failed, addr=%v, err=%+v", n, addr, err) } } }() @@ -258,7 +264,7 @@ func (v *rtcServer) Run(ctx context.Context) error { } func (v *rtcServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error { - var stream *RTCConnection + var connection *RTCConnection // If STUN binding request, parse the ufrag and identify the connection. if err := func() error { @@ -271,58 +277,69 @@ func (v *rtcServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data return errors.Wrapf(err, "unmarshal stun packet") } - // Search the stream in fast cache. + // Search the connection in fast cache. if s, ok := v.usernames.Load(pkt.Username); ok { - stream = s + connection = s return nil } - // Load stream by username. + // Load connection by username. if s, err := srsLoadBalancer.LoadWebRTCByUfrag(ctx, pkt.Username); err != nil { return errors.Wrapf(err, "load webrtc by ufrag %v", pkt.Username) } else { - stream = s + connection = s.Initialize(ctx, v.listener) + logger.Df(ctx, "Create WebRTC connection by ufrag=%v, stream=%v", pkt.Username, connection.StreamURL) } - // Cache stream for fast search. - if stream != nil { - v.usernames.Store(pkt.Username, stream) + // Cache connection for fast search. + if connection != nil { + v.usernames.Store(pkt.Username, connection) } return nil }(); err != nil { return err } - // Search the stream by addr. + // Search the connection by addr. if s, ok := v.addresses.Load(addr.String()); ok { - stream = s - } else if stream != nil { + connection = s + } else if connection != nil { // Cache the address for fast search. - v.addresses.Store(addr.String(), stream) + v.addresses.Store(addr.String(), connection) } - // If stream is not found, ignore the packet. - if stream == nil { + // If connection is not found, ignore the packet. + if connection == nil { // TODO: Should logging the dropped packet, only logging the first one for each address. return nil } // Proxy the packet to backend. - if err := stream.Proxy(addr, data); err != nil { - return errors.Wrapf(err, "proxy %vB for %v", len(data), stream.StreamURL) + if err := connection.HandlePacket(addr, data); err != nil { + return errors.Wrapf(err, "proxy %vB for %v", len(data), connection.StreamURL) } return nil } +// RTCConnection is a WebRTC connection proxy, for both WHIP and WHEP. It represents a WebRTC +// connection, identify by the ufrag in sdp offer/answer and ICE binding request. +// +// It's not like RTMP or HTTP FLV/TS proxy connection, which are stateless and all state is +// in the client request. The RTCConnection is stateful, and need to sync the ufrag between +// proxy servers. +// +// The media transport is UDP, which is also a special thing for WebRTC. So if the client switch +// to another UDP address, it may connect to another WebRTC proxy, then we should discover the +// RTCConnection by the ufrag from the ICE binding request. type RTCConnection struct { // The stream context for WebRTC streaming. ctx context.Context - // The context ID for recovering the context. - ContextID string `json:"cid"` // The stream URL in vhost/app/stream schema. StreamURL string `json:"stream_url"` + // The ufrag for this WebRTC connection. + Ufrag string `json:"ufrag"` // The UDP connection proxy to backend. backendUDP *net.UDPConn @@ -332,7 +349,7 @@ type RTCConnection struct { listenerUDP *net.UDPConn } -func NewRTCStreaming(opts ...func(*RTCConnection)) *RTCConnection { +func NewRTCConnection(opts ...func(*RTCConnection)) *RTCConnection { v := &RTCConnection{} for _, opt := range opts { opt(v) @@ -340,7 +357,15 @@ func NewRTCStreaming(opts ...func(*RTCConnection)) *RTCConnection { return v } -func (v *RTCConnection) Proxy(addr *net.UDPAddr, data []byte) error { +func (v *RTCConnection) Initialize(ctx context.Context, listener *net.UDPConn) *RTCConnection { + v.ctx = logger.WithContext(ctx) + if listener != nil { + v.listenerUDP = listener + } + return v +} + +func (v *RTCConnection) HandlePacket(addr *net.UDPAddr, data []byte) error { ctx := v.ctx // Update the current UDP address. @@ -352,10 +377,31 @@ func (v *RTCConnection) Proxy(addr *net.UDPAddr, data []byte) error { } // Proxy client message to backend. - if v.backendUDP != nil { - if _, err := v.backendUDP.Write(data); err != nil { - return errors.Wrapf(err, "write to backend %v", v.StreamURL) + if v.backendUDP == nil { + return nil + } + + // Proxy all messages from backend to client. + go func() { + for ctx.Err() == nil { + buf := make([]byte, 4096) + n, _, err := v.backendUDP.ReadFromUDP(buf) + if err != nil { + // TODO: If backend server closed unexpectedly, we should notice the stream to quit. + logger.Wf(ctx, "read from backend failed, err=%v", err) + break + } + + if _, err = v.listenerUDP.WriteToUDP(buf[:n], v.clientUDP); err != nil { + // TODO: If backend server closed unexpectedly, we should notice the stream to quit. + logger.Wf(ctx, "write to client failed, err=%v", err) + break + } } + }() + + if _, err := v.backendUDP.Write(data); err != nil { + return errors.Wrapf(err, "write to backend %v", v.StreamURL) } return nil @@ -385,6 +431,7 @@ func (v *RTCConnection) connectBackend(ctx context.Context) error { } // Connect to backend SRS server via UDP client. + // TODO: Support close the connection when timeout or DTLS alert. backendAddr := net.UDPAddr{IP: net.ParseIP(backend.IP), Port: udpPort} if backendUDP, err := net.DialUDP("udp", nil, &backendAddr); err != nil { return errors.Wrapf(err, "dial udp to %v", backendAddr) @@ -392,35 +439,9 @@ func (v *RTCConnection) connectBackend(ctx context.Context) error { v.backendUDP = backendUDP } - // Proxy all messages from backend to client. - go func() { - for ctx.Err() == nil { - buf := make([]byte, 4096) - n, _, err := v.backendUDP.ReadFromUDP(buf) - if err != nil { - // TODO: If backend server closed unexpectedly, we should notice the stream to quit. - logger.Wf(ctx, "read from backend failed, err=%v", err) - break - } - - if _, err = v.listenerUDP.WriteToUDP(buf[:n], v.clientUDP); err != nil { - // TODO: If backend server closed unexpectedly, we should notice the stream to quit. - logger.Wf(ctx, "write to client failed, err=%v", err) - break - } - } - }() - return nil } -func (v *RTCConnection) BuildContext(ctx context.Context) { - if v.ContextID == "" { - v.ContextID = logger.GenerateContextID() - } - v.ctx = logger.WithContextID(ctx, v.ContextID) -} - type RTCICEPair struct { // The remote ufrag, used for ICE username and session id. RemoteICEUfrag string `json:"remote_ufrag"` diff --git a/proxy/rtmp.go b/proxy/rtmp.go index 764ab7be59b..bf1c4ebea5b 100644 --- a/proxy/rtmp.go +++ b/proxy/rtmp.go @@ -108,6 +108,12 @@ func (v *rtmpServer) Run(ctx context.Context) error { return nil } +// RTMPConnection is an RTMP streaming connection. There is no state need to be sync between +// proxy servers. +// +// When we got an RTMP request, we will parse the stream URL from the RTMP publish or play request, +// then proxy to the corresponding backend server. All state is in the RTMP request, so this +// connection is stateless. type RTMPConnection struct { // The random number generator. rd *rand.Rand diff --git a/proxy/srs.go b/proxy/srs.go index 15e9418deca..3bc69f1b2b0 100644 --- a/proxy/srs.go +++ b/proxy/srs.go @@ -148,11 +148,11 @@ type SRSLoadBalancer interface { // Pick a backend server for the specified stream URL. Pick(ctx context.Context, streamURL string) (*SRSServer, error) // Load or store the HLS streaming for the specified stream URL. - LoadOrStoreHLS(ctx context.Context, streamURL string, value *HLSStreaming) (*HLSStreaming, error) + LoadOrStoreHLS(ctx context.Context, streamURL string, value *HLSPlayStream) (*HLSPlayStream, error) // Load the HLS streaming by SPBHID, the SRS Proxy Backend HLS ID. - LoadHLSBySPBHID(ctx context.Context, spbhid string) (*HLSStreaming, error) - // Load or store the WebRTC streaming for the specified stream URL. - LoadOrStoreWebRTC(ctx context.Context, streamURL, ufrag string, value *RTCConnection) (*RTCConnection, error) + LoadHLSBySPBHID(ctx context.Context, spbhid string) (*HLSPlayStream, error) + // Store the WebRTC streaming for the specified stream URL. + StoreWebRTC(ctx context.Context, streamURL string, value *RTCConnection) error // Load the WebRTC streaming by ufrag, the ICE username. LoadWebRTCByUfrag(ctx context.Context, ufrag string) (*RTCConnection, error) } @@ -167,9 +167,9 @@ type srsMemoryLoadBalancer struct { // The picked server to servce client by specified stream URL, key is stream url. picked sync.Map[string, *SRSServer] // The HLS streaming, key is stream URL. - hlsStreamURL sync.Map[string, *HLSStreaming] + hlsStreamURL sync.Map[string, *HLSPlayStream] // The HLS streaming, key is SPBHID. - hlsSPBHID sync.Map[string, *HLSStreaming] + hlsSPBHID sync.Map[string, *HLSPlayStream] // The WebRTC streaming, key is stream URL. rtcStreamURL sync.Map[string, *RTCConnection] // The WebRTC streaming, key is ufrag. @@ -245,7 +245,7 @@ func (v *srsMemoryLoadBalancer) Pick(ctx context.Context, streamURL string) (*SR return server, nil } -func (v *srsMemoryLoadBalancer) LoadHLSBySPBHID(ctx context.Context, spbhid string) (*HLSStreaming, error) { +func (v *srsMemoryLoadBalancer) LoadHLSBySPBHID(ctx context.Context, spbhid string) (*HLSPlayStream, error) { // Load the HLS streaming for the SPBHID, for TS files. if actual, ok := v.hlsSPBHID.Load(spbhid); !ok { return nil, errors.Errorf("no HLS streaming for SPBHID %v", spbhid) @@ -254,7 +254,7 @@ func (v *srsMemoryLoadBalancer) LoadHLSBySPBHID(ctx context.Context, spbhid stri } } -func (v *srsMemoryLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL string, value *HLSStreaming) (*HLSStreaming, error) { +func (v *srsMemoryLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL string, value *HLSPlayStream) (*HLSPlayStream, error) { // Update the HLS streaming for the stream URL, for M3u8. actual, _ := v.hlsStreamURL.LoadOrStore(streamURL, value) if actual == nil { @@ -263,19 +263,17 @@ func (v *srsMemoryLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL st // Update the HLS streaming for the SPBHID, for TS files. v.hlsSPBHID.Store(value.SRSProxyBackendHLSID, actual) + return actual, nil } -func (v *srsMemoryLoadBalancer) LoadOrStoreWebRTC(ctx context.Context, streamURL, ufrag string, value *RTCConnection) (*RTCConnection, error) { +func (v *srsMemoryLoadBalancer) StoreWebRTC(ctx context.Context, streamURL string, value *RTCConnection) error { // Update the WebRTC streaming for the stream URL. - actual, _ := v.rtcStreamURL.LoadOrStore(streamURL, value) - if actual == nil { - return nil, errors.Errorf("load or store WebRTC streaming for %v failed", streamURL) - } + v.rtcStreamURL.Store(streamURL, value) // Update the WebRTC streaming for the ufrag. - v.rtcUfrag.Store(ufrag, value) - return nil, nil + v.rtcUfrag.Store(value.Ufrag, value) + return nil } func (v *srsMemoryLoadBalancer) LoadWebRTCByUfrag(ctx context.Context, ufrag string) (*RTCConnection, error) { @@ -446,24 +444,23 @@ func (v *srsRedisLoadBalancer) Pick(ctx context.Context, streamURL string) (*SRS return &server, nil } -func (v *srsRedisLoadBalancer) LoadHLSBySPBHID(ctx context.Context, spbhid string) (*HLSStreaming, error) { +func (v *srsRedisLoadBalancer) LoadHLSBySPBHID(ctx context.Context, spbhid string) (*HLSPlayStream, error) { key := v.redisKeySPBHID(spbhid) - actual, err := v.rdb.Get(ctx, key).Bytes() + b, err := v.rdb.Get(ctx, key).Bytes() if err != nil { return nil, errors.Wrapf(err, "get key=%v HLS", key) } - var actualHLS HLSStreaming - if err := json.Unmarshal(actual, &actualHLS); err != nil { - return nil, errors.Wrapf(err, "unmarshal key=%v HLS %v", key, string(actual)) + var actual HLSPlayStream + if err := json.Unmarshal(b, &actual); err != nil { + return nil, errors.Wrapf(err, "unmarshal key=%v HLS %v", key, string(b)) } - actualHLS.BuildContext(ctx) - return &actualHLS, nil + return &actual, nil } -func (v *srsRedisLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL string, value *HLSStreaming) (*HLSStreaming, error) { +func (v *srsRedisLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL string, value *HLSPlayStream) (*HLSPlayStream, error) { b, err := json.Marshal(value) if err != nil { return nil, errors.Wrapf(err, "marshal HLS %v", value) @@ -479,22 +476,21 @@ func (v *srsRedisLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL str } // Query the HLS streaming from redis. - actual, err := v.rdb.Get(ctx, key).Bytes() + b2, err := v.rdb.Get(ctx, key).Bytes() if err != nil { return nil, errors.Wrapf(err, "get key=%v HLS", key) } - var actualHLS HLSStreaming - if err := json.Unmarshal(actual, &actualHLS); err != nil { - return nil, errors.Wrapf(err, "unmarshal key=%v HLS %v", key, string(actual)) + var actual HLSPlayStream + if err := json.Unmarshal(b2, &actual); err != nil { + return nil, errors.Wrapf(err, "unmarshal key=%v HLS %v", key, string(b2)) } - actualHLS.BuildContext(ctx) - return &actualHLS, nil + return &actual, nil } -func (v *srsRedisLoadBalancer) LoadOrStoreWebRTC(ctx context.Context, streamURL, ufrag string, value *RTCConnection) (*RTCConnection, error) { - return nil, nil +func (v *srsRedisLoadBalancer) StoreWebRTC(ctx context.Context, streamURL string, value *RTCConnection) error { + return nil } func (v *srsRedisLoadBalancer) LoadWebRTCByUfrag(ctx context.Context, ufrag string) (*RTCConnection, error) {