forked from deepch/RTSPtoWeb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstreamCore.go
177 lines (170 loc) · 4.81 KB
/
streamCore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package main
import (
"math"
"time"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/format/rtspv2"
"github.com/sirupsen/logrus"
)
// StreamServerRunStreamDo supervises one stream channel: it repeatedly starts
// StreamServerRunStream and restarts it after a two second pause until the
// session reports that it must not be restarted.
//
// The loop ends when:
//   - Storage.StreamChannelControl returns an error,
//   - the channel is on-demand and Storage.ClientHas reports no clients,
//   - StreamServerRunStream returns a status greater than zero.
//
// On exit the channel is unlocked through Storage.StreamChannelUnlock unless
// the last session status was 2.
func StreamServerRunStreamDo(streamID string, channelID string) {
	var status int
	defer func() {
		//TODO fix it no need unlock run if delete stream
		// Status 2 is what StreamServerRunStream returns for SignalStreamStop;
		// in that case the unlock is skipped.
		if status != 2 {
			Storage.StreamChannelUnlock(streamID, channelID)
		}
	}()

	// The identifying fields never change between restarts, so the base entry
	// is built once and shared by every log line below.
	baseLogger := log.WithFields(logrus.Fields{
		"module":  "core",
		"stream":  streamID,
		"channel": channelID,
		"func":    "StreamServerRunStreamDo",
	})

	for {
		baseLogger.WithFields(logrus.Fields{"call": "Run"}).Infoln("Run stream")

		opt, err := Storage.StreamChannelControl(streamID, channelID)
		if err != nil {
			baseLogger.WithFields(logrus.Fields{
				"call": "StreamChannelControl",
			}).Infoln("Exit", err)
			return
		}

		if opt.OnDemand && !Storage.ClientHas(streamID, channelID) {
			baseLogger.WithFields(logrus.Fields{
				"call": "ClientHas",
			}).Infoln("Stop stream no client")
			return
		}

		status, err = StreamServerRunStream(streamID, channelID, opt)
		if status > 0 {
			baseLogger.WithFields(logrus.Fields{
				"call": "StreamServerRunStream",
			}).Infoln("Stream exit by signal or not client")
			return
		}
		if err != nil {
			// Log through baseLogger so the restart error carries the same
			// module/stream/channel/func fields as every other line here.
			baseLogger.WithFields(logrus.Fields{
				"call": "Restart",
			}).Errorln("Stream error restart stream", err)
		}

		// Pause before the next connection attempt.
		time.Sleep(2 * time.Second)
	}
}
// StreamServerRunStream runs a single RTSP session for the given channel. It
// dials opt.URL, marks the channel ONLINE, and then loops over the client's
// queues and the control signals until the session ends. On return the RTSP
// client is closed, the channel is marked OFFLINE, the HLS buffer is flushed
// and the HLS muxer is closed.
//
// The returned status tells the caller how the session ended:
//   - 0: dial error, no key frame within 20 seconds, SignalStreamRestart or
//     rtspv2.SignalStreamRTPStop (StreamServerRunStreamDo restarts on 0),
//   - 1: no clients (on-demand check or SignalStreamClient),
//   - 2: SignalStreamStop.
//
// The returned error describes the reason; it is non-nil on every return path.
func StreamServerRunStream(streamID string, channelID string, opt *ChannelST) (int, error) {
	const (
		keyFrameTimeout     = 20 * time.Second
		clientCheckInterval = 20 * time.Second
	)

	// Both timers are stopped on every return path, including a failed dial,
	// so repeated restarts do not leave pending timers behind.
	keyTest := time.NewTimer(keyFrameTimeout)
	defer keyTest.Stop()
	checkClients := time.NewTimer(clientCheckInterval)
	defer checkClients.Stop()

	var (
		start    bool            // set once the first key frame has been seen
		fps      int             // measured frame rate; 0 until the probe completes
		preKeyTS time.Duration   // timestamp of the previous key frame
		seq      []*av.Packet    // packets collected since the previous key frame
	)

	rtspClient, err := rtspv2.Dial(rtspv2.RTSPClientOptions{
		URL:                opt.URL,
		InsecureSkipVerify: opt.InsecureSkipVerify,
		DisableAudio:       !opt.Audio,
		DialTimeout:        3 * time.Second,
		ReadWriteTimeout:   5 * time.Second,
		Debug:              opt.Debug,
		OutgoingProxy:      true,
	})
	if err != nil {
		return 0, err
	}

	Storage.StreamChannelStatus(streamID, channelID, ONLINE)
	defer func() {
		rtspClient.Close()
		Storage.StreamChannelStatus(streamID, channelID, OFFLINE)
		Storage.StreamHLSFlush(streamID, channelID)
	}()

	// When the client reports WaitCodec, packets are skipped until a
	// SignalCodecUpdate arrives; otherwise the codecs known at dial time are
	// published immediately.
	waitCodec := rtspClient.WaitCodec
	if !waitCodec && len(rtspClient.CodecData) > 0 {
		Storage.StreamChannelCodecsUpdate(streamID, channelID, rtspClient.CodecData, rtspClient.SDPRaw)
	}

	log.WithFields(logrus.Fields{
		"module":  "core",
		"stream":  streamID,
		"channel": channelID,
		"func":    "StreamServerRunStream",
		"call":    "Start",
	}).Infoln("Success connection RTSP")

	// FPS probe state: frames and accumulated packet duration in the current
	// probe window, and the number of completed windows.
	var (
		probeCount int
		probeFrame int
		probePTS   time.Duration
	)

	Storage.NewHLSMuxer(streamID, channelID)
	defer Storage.HLSMuxerClose(streamID, channelID)

	for {
		select {
		// Periodic check that an on-demand stream still has clients.
		case <-checkClients.C:
			if opt.OnDemand && !Storage.ClientHas(streamID, channelID) {
				return 1, ErrorStreamNoClients
			}
			checkClients.Reset(clientCheckInterval)

		// No key frame arrived within keyFrameTimeout.
		case <-keyTest.C:
			return 0, ErrorStreamNoVideo

		// Control signals from the core.
		case signals := <-opt.signals:
			switch signals {
			case SignalStreamStop:
				return 2, ErrorStreamStopCoreSignal
			case SignalStreamRestart:
				return 0, ErrorStreamRestart
			case SignalStreamClient:
				return 1, ErrorStreamNoClients
			}

		// Signals from the RTSP client.
		case signals := <-rtspClient.Signals:
			switch signals {
			case rtspv2.SignalCodecUpdate:
				Storage.StreamChannelCodecsUpdate(streamID, channelID, rtspClient.CodecData, rtspClient.SDPRaw)
				waitCodec = false
			case rtspv2.SignalStreamRTPStop:
				return 0, ErrorStreamStopRTSPSignal
			}

		// Packets from the proxy queue are forwarded as-is.
		case packetRTP := <-rtspClient.OutgoingProxyQueue:
			Storage.StreamChannelCastProxy(streamID, channelID, packetRTP)

		case packetAV := <-rtspClient.OutgoingPacketQueue:
			if waitCodec {
				continue
			}

			if packetAV.IsKeyFrame {
				// Stop and drain before Reset: if the timer fired while this
				// packet was being selected, a stale tick would otherwise be
				// read on the next iteration and reported as "no video" even
				// though a key frame has just arrived.
				if !keyTest.Stop() {
					select {
					case <-keyTest.C:
					default:
					}
				}
				keyTest.Reset(keyFrameTimeout)

				if preKeyTS > 0 {
					// Hand over everything collected since the previous key
					// frame together with the time between the two key frames,
					// then start a fresh slice (the old one is not reused
					// because it was passed to StreamHLSAdd).
					Storage.StreamHLSAdd(streamID, channelID, seq, packetAV.Time-preKeyTS)
					seq = make([]*av.Packet, 0, len(seq))
				}
				preKeyTS = packetAV.Time
				start = true
			}

			seq = append(seq, packetAV)
			Storage.StreamChannelCast(streamID, channelID, packetAV)

			if !start {
				// Nothing below runs before the first key frame.
				continue
			}

			// FPS probe: accumulate frames and durations; each key frame that
			// arrives after at least one second of accumulated duration closes
			// a window. The rate of the second window becomes fps.
			probePTS += packetAV.Duration
			probeFrame++
			if packetAV.IsKeyFrame && probePTS.Seconds() >= 1 {
				probeCount++
				if probeCount == 2 {
					fps = int(math.Round(float64(probeFrame) / probePTS.Seconds()))
				}
				probeFrame = 0
				probePTS = 0
			}

			if fps != 0 {
				//TODO fix it
				// Overwrite the packet duration with the nominal frame
				// interval for the measured fps (integer nanoseconds instead
				// of float32 arithmetic) before writing to the HLS muxer.
				// NOTE(review): packetAV was already passed to
				// StreamChannelCast above; confirm no consumer reads Duration
				// concurrently.
				packetAV.Duration = time.Second / time.Duration(fps)
				Storage.HlsMuxerSetFPS(streamID, channelID, fps)
				Storage.HlsMuxerWritePacket(streamID, channelID, packetAV)
			}
		}
	}
}