Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proxy: Support proxy server for SRS. v7.0.16 #4158

Merged
merged 46 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
fdcd86c
Proxy: Support proxy server for SRS.
winlinvip Aug 26, 2024
19d6d54
Add logger with context ID.
winlinvip Aug 26, 2024
7aba30a
Add version and signature.
winlinvip Aug 26, 2024
df04a8e
Add errors for proxy server.
winlinvip Aug 26, 2024
2eae020
Add HTTP server versions.
winlinvip Aug 26, 2024
650befd
Refine code to files.
winlinvip Aug 26, 2024
a3aaa0b
Add RTMP and AMF0 protocol stack.
winlinvip Aug 28, 2024
9b2e734
Support RTMP proxy server.
winlinvip Aug 28, 2024
9e43187
Support FFmpeg RTMP publisher.
winlinvip Aug 28, 2024
e00fcae
Support HTTP API server proxy.
winlinvip Aug 28, 2024
73a16e3
Add system api server for proxy.
winlinvip Aug 28, 2024
b62b1e3
Add SRS load balancer.
winlinvip Aug 28, 2024
fc4b657
Support proxy RTMP to backend.
winlinvip Aug 28, 2024
fa39d74
Refine default ports.
winlinvip Aug 28, 2024
b8ded5a
Refine the amf0 any API and RTMP.
winlinvip Aug 28, 2024
3269228
Support redis load balancer.
winlinvip Aug 29, 2024
82707eb
Use memory LB for simple use scenarios.
winlinvip Aug 29, 2024
688bc56
Support redis LB for large scale use scenarios.
winlinvip Aug 29, 2024
e3cfc3b
Support RTMP play or view client.
winlinvip Aug 30, 2024
960d1e6
Always set stream id.
winlinvip Aug 30, 2024
ef70204
Support multiple errors.
winlinvip Aug 30, 2024
c13e757
Refine proxy error handler.
winlinvip Aug 30, 2024
1f68947
Support proxy HTTP-FLV to backend.
winlinvip Aug 31, 2024
2242bf0
Refine error for HTTP stream.
winlinvip Aug 31, 2024
e311928
Refine errors for HTTP streaming.
winlinvip Aug 31, 2024
cc4fdb6
Fix bugs.
winlinvip Sep 1, 2024
3c5f6a4
Support HLS streaming with memory LB.
winlinvip Sep 2, 2024
f05d3fe
Refine HLS streaming.
winlinvip Sep 3, 2024
0dc47d9
Support hls proxy via redis LB.
winlinvip Sep 3, 2024
3d5db62
Refine HLS error.
winlinvip Sep 3, 2024
d053b81
Refine HTTP streaming error.
winlinvip Sep 3, 2024
68595a5
Refine RTMP streaming error.
winlinvip Sep 3, 2024
17f836a
Support proxy to webrtc api.
winlinvip Sep 3, 2024
5b6c9df
Support proxy to webrtc media.
winlinvip Sep 4, 2024
bf4b973
Refine the logs.
winlinvip Sep 4, 2024
ebdb078
Support redis LB for WebRTC.
winlinvip Sep 5, 2024
1c10756
Support proxy SRT media server.
winlinvip Sep 5, 2024
e982852
WaitGroup: Do not wait automatically.
winlinvip Sep 5, 2024
8b64ebe
Support static files.
winlinvip Sep 5, 2024
79f090c
Refine comments.
winlinvip Sep 5, 2024
d5032f6
Support parse listen endpoint.
winlinvip Sep 5, 2024
62c280c
Support SRT listen ep in UDP.
winlinvip Sep 5, 2024
79161b9
Refine names.
winlinvip Sep 9, 2024
2e7f2c2
Refine comments.
winlinvip Sep 9, 2024
7b20e58
Refine makefile.
winlinvip Sep 9, 2024
7f5c1c9
Update release to v7.0.16
winlinvip Sep 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions proxy/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.idea
srs-proxy
.env
.go-formarted
23 changes: 23 additions & 0 deletions proxy/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
.PHONY: all build test fmt clean run

all: build

build: fmt ./srs-proxy

./srs-proxy: *.go
go build -o srs-proxy .

test:
go test ./...

fmt: ./.go-formarted

./.go-formarted: *.go
touch .go-formarted
go fmt ./...

clean:
rm -f srs-proxy .go-formarted

run: fmt
go run .
272 changes: 272 additions & 0 deletions proxy/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
// Copyright (c) 2024 Winlin
//
// SPDX-License-Identifier: MIT
package main

import (
"context"
"fmt"
"net/http"
"os"
"strings"
"sync"
"time"

"srs-proxy/errors"
"srs-proxy/logger"
)

// srsHTTPAPIServer is the proxy for SRS HTTP API, to proxy the WebRTC HTTP API like WHIP and WHEP,
// to proxy other HTTP API of SRS like the streams and clients, etc.
type srsHTTPAPIServer struct {
// The underlayer HTTP server.
server *http.Server
// The WebRTC server.
rtc *srsWebRTCServer
// The gracefully quit timeout, wait server to quit.
gracefulQuitTimeout time.Duration
// The wait group for all goroutines.
wg sync.WaitGroup
}

func NewSRSHTTPAPIServer(opts ...func(*srsHTTPAPIServer)) *srsHTTPAPIServer {
v := &srsHTTPAPIServer{}
for _, opt := range opts {
opt(v)
}
return v
}

func (v *srsHTTPAPIServer) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
defer cancel()
v.server.Shutdown(ctx)

v.wg.Wait()
return nil
}

func (v *srsHTTPAPIServer) Run(ctx context.Context) error {
// Parse address to listen.
addr := envHttpAPI()
if !strings.Contains(addr, ":") {
addr = ":" + addr
}

// Create server and handler.
mux := http.NewServeMux()
v.server = &http.Server{Addr: addr, Handler: mux}
logger.Df(ctx, "HTTP API server listen at %v", addr)

// Shutdown the server gracefully when quiting.
go func() {
ctxParent := ctx
<-ctxParent.Done()

ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
defer cancel()

v.server.Shutdown(ctx)
}()

// The basic version handler, also can be used as health check API.
logger.Df(ctx, "Handle /api/v1/versions by %v", addr)
mux.HandleFunc("/api/v1/versions", func(w http.ResponseWriter, r *http.Request) {
apiResponse(ctx, w, r, map[string]string{
"signature": Signature(),
"version": Version(),
})
})

// The WebRTC WHIP API handler.
logger.Df(ctx, "Handle /rtc/v1/whip/ by %v", addr)
mux.HandleFunc("/rtc/v1/whip/", func(w http.ResponseWriter, r *http.Request) {
if err := v.rtc.HandleApiForWHIP(ctx, w, r); err != nil {
apiError(ctx, w, r, err)
}
})

// The WebRTC WHEP API handler.
logger.Df(ctx, "Handle /rtc/v1/whep/ by %v", addr)
mux.HandleFunc("/rtc/v1/whep/", func(w http.ResponseWriter, r *http.Request) {
if err := v.rtc.HandleApiForWHEP(ctx, w, r); err != nil {
apiError(ctx, w, r, err)
}
})

// Run HTTP API server.
v.wg.Add(1)
go func() {
defer v.wg.Done()

err := v.server.ListenAndServe()
if err != nil {
if ctx.Err() != context.Canceled {
// TODO: If HTTP API server closed unexpectedly, we should notice the main loop to quit.
logger.Wf(ctx, "HTTP API accept err %+v", err)
} else {
logger.Df(ctx, "HTTP API server done")
}
}
}()

return nil
}

// systemAPI is the system HTTP API of the proxy server, for SRS media server to register the service
// to proxy server. It also provides some other system APIs like the status of proxy server, like exporter
// for Prometheus metrics.
type systemAPI struct {
// The underlayer HTTP server.
server *http.Server
// The gracefully quit timeout, wait server to quit.
gracefulQuitTimeout time.Duration
// The wait group for all goroutines.
wg sync.WaitGroup
}

func NewSystemAPI(opts ...func(*systemAPI)) *systemAPI {
v := &systemAPI{}
for _, opt := range opts {
opt(v)
}
return v
}

func (v *systemAPI) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
defer cancel()
v.server.Shutdown(ctx)

v.wg.Wait()
return nil
}

func (v *systemAPI) Run(ctx context.Context) error {
// Parse address to listen.
addr := envSystemAPI()
if !strings.Contains(addr, ":") {
addr = ":" + addr
}

// Create server and handler.
mux := http.NewServeMux()
v.server = &http.Server{Addr: addr, Handler: mux}
logger.Df(ctx, "System API server listen at %v", addr)

// Shutdown the server gracefully when quiting.
go func() {
ctxParent := ctx
<-ctxParent.Done()

ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
defer cancel()

v.server.Shutdown(ctx)
}()

// The basic version handler, also can be used as health check API.
logger.Df(ctx, "Handle /api/v1/versions by %v", addr)
mux.HandleFunc("/api/v1/versions", func(w http.ResponseWriter, r *http.Request) {
apiResponse(ctx, w, r, map[string]string{
"signature": Signature(),
"version": Version(),
})
})

// The register service for SRS media servers.
logger.Df(ctx, "Handle /api/v1/srs/register by %v", addr)
mux.HandleFunc("/api/v1/srs/register", func(w http.ResponseWriter, r *http.Request) {
if err := func() error {
var deviceID, ip, serverID, serviceID, pid string
var rtmp, stream, api, srt, rtc []string
if err := ParseBody(r.Body, &struct {
// The IP of SRS, mandatory.
IP *string `json:"ip"`
// The server id of SRS, store in file, may not change, mandatory.
ServerID *string `json:"server"`
// The service id of SRS, always change when restarted, mandatory.
ServiceID *string `json:"service"`
// The process id of SRS, always change when restarted, mandatory.
PID *string `json:"pid"`
// The RTMP listen endpoints, mandatory.
RTMP *[]string `json:"rtmp"`
// The HTTP Stream listen endpoints, optional.
HTTP *[]string `json:"http"`
// The API listen endpoints, optional.
API *[]string `json:"api"`
// The SRT listen endpoints, optional.
SRT *[]string `json:"srt"`
// The RTC listen endpoints, optional.
RTC *[]string `json:"rtc"`
// The device id of SRS, optional.
DeviceID *string `json:"device_id"`
}{
IP: &ip, DeviceID: &deviceID,
ServerID: &serverID, ServiceID: &serviceID, PID: &pid,
RTMP: &rtmp, HTTP: &stream, API: &api, SRT: &srt, RTC: &rtc,
}); err != nil {
return errors.Wrapf(err, "parse body")
}

if ip == "" {
return errors.Errorf("empty ip")
}
if serverID == "" {
return errors.Errorf("empty server")
}
if serviceID == "" {
return errors.Errorf("empty service")
}
if pid == "" {
return errors.Errorf("empty pid")
}
if len(rtmp) == 0 {
return errors.Errorf("empty rtmp")
}

server := NewSRSServer(func(srs *SRSServer) {
srs.IP, srs.DeviceID = ip, deviceID
srs.ServerID, srs.ServiceID, srs.PID = serverID, serviceID, pid
srs.RTMP, srs.HTTP, srs.API = rtmp, stream, api
srs.SRT, srs.RTC = srt, rtc
srs.UpdatedAt = time.Now()
})
if err := srsLoadBalancer.Update(ctx, server); err != nil {
return errors.Wrapf(err, "update SRS server %+v", server)
}

logger.Df(ctx, "Register SRS media server, %+v", server)
return nil
}(); err != nil {
apiError(ctx, w, r, err)
}

type Response struct {
Code int `json:"code"`
PID string `json:"pid"`
}

apiResponse(ctx, w, r, &Response{
Code: 0, PID: fmt.Sprintf("%v", os.Getpid()),
})
})

// Run System API server.
v.wg.Add(1)
go func() {
defer v.wg.Done()

err := v.server.ListenAndServe()
if err != nil {
if ctx.Err() != context.Canceled {
// TODO: If System API server closed unexpectedly, we should notice the main loop to quit.
logger.Wf(ctx, "System API accept err %+v", err)
} else {
logger.Df(ctx, "System API server done")
}
}
}()

return nil
}
20 changes: 20 additions & 0 deletions proxy/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2024 Winlin
//
// SPDX-License-Identifier: MIT
package main

import (
"context"
"net/http"

"srs-proxy/logger"
)

func handleGoPprof(ctx context.Context) {
if addr := envGoPprof(); addr != "" {
go func() {
logger.Df(ctx, "Start Go pprof at %v", addr)
http.ListenAndServe(addr, nil)
}()
}
}
Loading
Loading