Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: add path to RTMP connections, RTSP sessions, WebRTC sessions (#1962) #2022

Merged
merged 3 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apidocs/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ components:
state:
type: string
enum: [idle, read, publish]
path:
type: string
bytesReceived:
type: integer
format: int64
Expand All @@ -408,6 +410,8 @@ components:
state:
type: string
enum: [idle, read, publish]
path:
type: string
bytesReceived:
type: integer
format: int64
Expand Down Expand Up @@ -496,6 +500,8 @@ components:
state:
type: string
enum: [read, publish]
path:
type: string
bytesReceived:
type: integer
format: int64
Expand Down
3 changes: 3 additions & 0 deletions internal/core/api_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type apiRTMPConn struct {
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
Path string `json:"path"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
Expand All @@ -72,6 +73,7 @@ type apiRTSPSession struct {
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
Path string `json:"path"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
Expand All @@ -90,6 +92,7 @@ type apiWebRTCSession struct {
LocalCandidate string `json:"localCandidate"`
RemoteCandidate string `json:"remoteCandidate"`
State string `json:"state"`
Path string `json:"path"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
Expand Down
44 changes: 26 additions & 18 deletions internal/core/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,25 +685,33 @@ func TestAPIProtocolList(t *testing.T) {
pa = "rtmpsconns"
}

type item struct {
State string `json:"state"`
Path string `json:"path"`
}

var out struct {
ItemCount int `json:"itemCount"`
Items []struct {
State string `json:"state"`
} `json:"items"`
ItemCount int `json:"itemCount"`
Items []item `json:"items"`
}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v2/"+pa+"/list", nil, &out)

if ca != "rtsp conns" && ca != "rtsps conns" {
require.Equal(t, "publish", out.Items[0].State)
require.Equal(t, item{
State: "publish",
Path: "mypath",
}, out.Items[0])
}

case "hls":
type item struct {
Created string `json:"created"`
LastRequest string `json:"lastRequest"`
}

var out struct {
ItemCount int `json:"itemCount"`
Items []struct {
Created string `json:"created"`
LastRequest string `json:"lastRequest"`
} `json:"items"`
ItemCount int `json:"itemCount"`
Items []item `json:"items"`
}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v2/hlsmuxers/list", nil, &out)

Expand All @@ -713,13 +721,9 @@ func TestAPIProtocolList(t *testing.T) {

case "webrtc":
type item struct {
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
PeerConnectionEstablished bool `json:"peerConnectionEstablished"`
LocalCandidate string `json:"localCandidate"`
RemoteCandidate string `json:"remoteCandidate"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
PeerConnectionEstablished bool `json:"peerConnectionEstablished"`
State string `json:"state"`
Path string `json:"path"`
}

var out struct {
Expand All @@ -728,7 +732,11 @@ func TestAPIProtocolList(t *testing.T) {
}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v2/webrtcsessions/list", nil, &out)

require.Equal(t, true, out.Items[0].PeerConnectionEstablished)
require.Equal(t, item{
PeerConnectionEstablished: true,
State: "read",
Path: "mypath",
}, out.Items[0])
}
})
}
Expand Down
35 changes: 18 additions & 17 deletions internal/core/rtmp_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,13 @@ type rtmpConn struct {
pathManager rtmpConnPathManager
parent rtmpConnParent

ctx context.Context
ctxCancel func()
uuid uuid.UUID
created time.Time
state rtmpConnState
stateMutex sync.Mutex
ctx context.Context
ctxCancel func()
uuid uuid.UUID
created time.Time
mutex sync.Mutex
state rtmpConnState
pathName string
}

func newRTMPConn(
Expand Down Expand Up @@ -279,12 +280,6 @@ func (c *rtmpConn) ip() net.IP {
return c.nconn.RemoteAddr().(*net.TCPAddr).IP
}

func (c *rtmpConn) safeState() rtmpConnState {
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
return c.state
}

func (c *rtmpConn) run() {
defer c.wg.Done()

Expand Down Expand Up @@ -380,9 +375,10 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {

defer res.path.readerRemove(pathReaderRemoveReq{author: c})

c.stateMutex.Lock()
c.mutex.Lock()
c.state = rtmpConnStateRead
c.stateMutex.Unlock()
c.pathName = pathName
c.mutex.Unlock()

ringBuffer, _ := ringbuffer.New(uint64(c.readBufferCount))
go func() {
Expand Down Expand Up @@ -794,9 +790,10 @@ func (c *rtmpConn) runPublish(u *url.URL) error {

defer res.path.publisherRemove(pathPublisherRemoveReq{author: c})

c.stateMutex.Lock()
c.mutex.Lock()
c.state = rtmpConnStatePublish
c.stateMutex.Unlock()
c.pathName = pathName
c.mutex.Unlock()

videoFormat, audioFormat, err := c.conn.ReadTracks()
if err != nil {
Expand Down Expand Up @@ -892,12 +889,15 @@ func (c *rtmpConn) apiSourceDescribe() pathAPISourceOrReader {
}

func (c *rtmpConn) apiItem() *apiRTMPConn {
c.mutex.Lock()
defer c.mutex.Unlock()

return &apiRTMPConn{
ID: c.uuid,
Created: c.created,
RemoteAddr: c.remoteAddr().String(),
State: func() string {
switch c.safeState() {
switch c.state {
case rtmpConnStateRead:
return "read"

Expand All @@ -906,6 +906,7 @@ func (c *rtmpConn) apiItem() *apiRTMPConn {
}
return "idle"
}(),
Path: c.pathName,
BytesReceived: c.conn.BytesReceived(),
BytesSent: c.conn.BytesSent(),
}
Expand Down
53 changes: 27 additions & 26 deletions internal/core/rtsp_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ type rtspSession struct {
pathManager rtspSessionPathManager
parent rtspSessionParent

uuid uuid.UUID
created time.Time
path *path
stream *stream
state gortsplib.ServerSessionState
stateMutex sync.Mutex
onReadCmd *externalcmd.Cmd // read
uuid uuid.UUID
created time.Time
path *path
stream *stream
onReadCmd *externalcmd.Cmd // read
mutex sync.Mutex
state gortsplib.ServerSessionState
pathName string
}

func newRTSPSession(
Expand Down Expand Up @@ -77,12 +78,6 @@ func (s *rtspSession) close() {
s.session.Close()
}

func (s *rtspSession) safeState() gortsplib.ServerSessionState {
s.stateMutex.Lock()
defer s.stateMutex.Unlock()
return s.state
}

func (s *rtspSession) remoteAddr() net.Addr {
return s.author.NetConn().RemoteAddr()
}
Expand Down Expand Up @@ -157,9 +152,10 @@ func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno

s.path = res.path

s.stateMutex.Lock()
s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePreRecord
s.stateMutex.Unlock()
s.pathName = ctx.Path
s.mutex.Unlock()

return &base.Response{
StatusCode: base.StatusOK,
Expand Down Expand Up @@ -242,9 +238,10 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
s.path = res.path
s.stream = res.stream

s.stateMutex.Lock()
s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePrePlay
s.stateMutex.Unlock()
s.pathName = ctx.Path
s.mutex.Unlock()

return &base.Response{
StatusCode: base.StatusOK,
Expand Down Expand Up @@ -281,9 +278,9 @@ func (s *rtspSession) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Respons
})
}

s.stateMutex.Lock()
s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePlay
s.stateMutex.Unlock()
s.mutex.Unlock()
}

return &base.Response{
Expand Down Expand Up @@ -323,9 +320,9 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
}
}

s.stateMutex.Lock()
s.mutex.Lock()
s.state = gortsplib.ServerSessionStateRecord
s.stateMutex.Unlock()
s.mutex.Unlock()

return &base.Response{
StatusCode: base.StatusOK,
Expand All @@ -341,16 +338,16 @@ func (s *rtspSession) onPause(_ *gortsplib.ServerHandlerOnPauseCtx) (*base.Respo
s.onReadCmd.Close()
}

s.stateMutex.Lock()
s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePrePlay
s.stateMutex.Unlock()
s.mutex.Unlock()

case gortsplib.ServerSessionStateRecord:
s.path.publisherStop(pathPublisherStopReq{author: s})

s.stateMutex.Lock()
s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePreRecord
s.stateMutex.Unlock()
s.mutex.Unlock()
}

return &base.Response{
Expand Down Expand Up @@ -387,12 +384,15 @@ func (s *rtspSession) onDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx
}

func (s *rtspSession) apiItem() *apiRTSPSession {
s.mutex.Lock()
defer s.mutex.Unlock()

return &apiRTSPSession{
ID: s.uuid,
Created: s.created,
RemoteAddr: s.remoteAddr().String(),
State: func() string {
switch s.safeState() {
switch s.state {
case gortsplib.ServerSessionStatePrePlay,
gortsplib.ServerSessionStatePlay:
return "read"
Expand All @@ -403,6 +403,7 @@ func (s *rtspSession) apiItem() *apiRTSPSession {
}
return "idle"
}(),
Path: s.pathName,
BytesReceived: s.session.BytesReceived(),
BytesSent: s.session.BytesSent(),
}
Expand Down
Loading