forked from iizukanao/node-rtsp-rtmp-server
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream_server.coffee
387 lines (322 loc) · 12.7 KB
/
stream_server.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
# RTSP and RTMP/RTMPE/RTMPT/RTMPTE server implementation.
# Also serves HTTP contents as this server is meant to
# be run on port 80.
net = require 'net'
fs = require 'fs'
crypto = require 'crypto'
config = require './config'
rtmp = require './rtmp'
http = require './http'
rtsp = require './rtsp'
h264 = require './h264'
aac = require './aac'
mp4 = require './mp4'
Bits = require './bits'
avstreams = require './avstreams'
CustomReceiver = require './custom_receiver'
logger = require './logger'
packageJson = require './package.json'
Sequent = require 'sequent'
# Debug switch: when true, a summary line is logged for every batch of
# incoming video/audio data
DEBUG_INCOMING_PACKET_DATA = false
# Debug switch: when true, a truncated MD5 hash of every incoming video NAL unit
# and audio access unit is logged
DEBUG_INCOMING_PACKET_HASH = false
# Server name used for RTSP and HTTP responses when config.serverName is not set
DEFAULT_SERVER_NAME = "node-rtsp-rtmp-server/#{packageJson.version}"
serverName = if config.serverName? then config.serverName else DEFAULT_SERVER_NAME
# Glue between the protocol front ends (RTMP/RTMPT server, RTSP/HTTP server,
# custom-protocol receiver, MP4 file reader) and the shared avstreams registry.
# Incoming audio/video from any source is normalized into NAL units / AAC
# access units and forwarded to the RTSP and RTMP servers that are enabled
# in config.
class StreamServer
  # opts (optional):
  #   serverName: overrides the module-level default server name
  #   documentRoot: handed to http.HTTPHandler when config.enableHTTP is set
  constructor: (opts) ->
    @serverName = opts?.serverName ? serverName

    if config.enableRTMP or config.enableRTMPT
      # Create RTMP server
      @rtmpServer = new rtmp.RTMPServer
      @rtmpServer.on 'video_start', (streamId) =>
        stream = avstreams.getOrCreate streamId
        @onReceiveVideoControlBuffer stream
      @rtmpServer.on 'video_data', (streamId, pts, dts, nalUnits) =>
        stream = avstreams.get streamId
        if stream?
          @onReceiveVideoPacket stream, nalUnits, pts, dts
        else
          logger.warn "warn: Received invalid streamId from rtmp: #{streamId}"
      @rtmpServer.on 'audio_start', (streamId) =>
        stream = avstreams.getOrCreate streamId
        @onReceiveAudioControlBuffer stream
      @rtmpServer.on 'audio_data', (streamId, pts, dts, adtsFrame) =>
        stream = avstreams.get streamId
        if stream?
          @onReceiveAudioPacket stream, adtsFrame, pts, dts
        else
          logger.warn "warn: Received invalid streamId from rtmp: #{streamId}"

    if config.enableCustomReceiver
      # Setup data receivers for custom protocol.
      # Explicit splats are used instead of the implicit `arguments` object:
      # inside a fat-arrow function `arguments` is only the callback's own
      # argument list under CoffeeScript 1; under CoffeeScript 2 it would be
      # the constructor's.
      @customReceiver = new CustomReceiver config.receiverType,
        videoControl: (args...) =>
          @onReceiveVideoControlBuffer args...
        audioControl: (args...) =>
          @onReceiveAudioControlBuffer args...
        videoData: (args...) =>
          @onReceiveVideoDataBuffer args...
        audioData: (args...) =>
          @onReceiveAudioDataBuffer args...
      # Delete old sockets
      @customReceiver.deleteReceiverSocketsSync()

    if config.enableHTTP
      @httpHandler = new http.HTTPHandler
        serverName: @serverName
        documentRoot: opts?.documentRoot

    # A single RTSPServer instance is created whenever RTSP, HTTP or RTMPT is
    # enabled; it is given the HTTP handler and the RTMPT callback.
    if config.enableRTSP or config.enableHTTP or config.enableRTMPT
      if config.enableRTMPT
        rtmptCallback = (args...) =>
          @rtmpServer.handleRTMPTRequest args...
      else
        rtmptCallback = null
      if config.enableHTTP
        httpHandler = @httpHandler
      else
        httpHandler = null
      @rtspServer = new rtsp.RTSPServer
        serverName : @serverName
        httpHandler: httpHandler
        rtmptCallback: rtmptCallback
      @rtspServer.on 'video_start', (stream) =>
        @onReceiveVideoControlBuffer stream
      @rtspServer.on 'audio_start', (stream) =>
        @onReceiveAudioControlBuffer stream
      @rtspServer.on 'video', (stream, nalUnits, pts, dts) =>
        @onReceiveVideoNALUnits stream, nalUnits, pts, dts
      @rtspServer.on 'audio', (stream, accessUnits, pts, dts) =>
        @onReceiveAudioAccessUnits stream, accessUnits, pts, dts

    # lastSentVideoTimestamp is only used by the hash-debugging output
    avstreams.on 'new', (stream) ->
      if DEBUG_INCOMING_PACKET_HASH
        stream.lastSentVideoTimestamp = 0

    avstreams.on 'reset', (stream) ->
      if DEBUG_INCOMING_PACKET_HASH
        stream.lastSentVideoTimestamp = 0

    avstreams.on 'end', (stream) =>
      if config.enableRTSP
        @rtspServer.sendEOS stream
      if config.enableRTMP or config.enableRTMPT
        @rtmpServer.sendEOS stream

    # for mp4
    avstreams.on 'audio_data', (stream, data, pts) =>
      @onReceiveAudioAccessUnits stream, [ data ], pts, pts

    avstreams.on 'video_data', (stream, nalUnits, pts, dts) =>
      if not dts?
        dts = pts
      @onReceiveVideoNALUnits stream, nalUnits, pts, dts

    avstreams.on 'audio_start', (stream) =>
      @onReceiveAudioControlBuffer stream

    avstreams.on 'video_start', (stream) =>
      @onReceiveVideoControlBuffer stream

    ## TODO: Do we need to do something for remove_stream event?
    #avstreams.on 'remove_stream', (stream) ->
    #  logger.raw "received remove_stream event from stream #{stream.id}"

  # Attaches a directory of recorded files to the application named by
  # config.recordedApplicationName. Does nothing when that setting is absent.
  attachRecordedDir: (dir) ->
    if config.recordedApplicationName?
      logger.info "attachRecordedDir: dir=#{dir} app=#{config.recordedApplicationName}"
      avstreams.attachRecordedDirToApp dir, config.recordedApplicationName

  # Registers a stream generator under streamName that creates a new
  # MP4-backed stream from `filename` every time it is requested.
  attachMP4: (filename, streamName) ->
    logger.info "attachMP4: file=#{filename} stream=#{streamName}"

    context = this
    generator = new avstreams.AVStreamGenerator
      # Generate an AVStream upon request.
      # Returns the new stream, or null if the MP4 file cannot be opened.
      generate: ->
        try
          mp4File = new mp4.MP4File filename
        catch err
          logger.error "error opening MP4 file #{filename}: #{err}"
          return null
        streamId = avstreams.createNewStreamId()
        mp4Stream = new avstreams.MP4Stream streamId
        logger.info "created stream #{streamId} from #{filename}"
        avstreams.emit 'new', mp4Stream
        avstreams.add mp4Stream

        mp4Stream.type = avstreams.STREAM_TYPE_RECORDED
        mp4File.on 'audio_data', (data, pts) ->
          context.onReceiveAudioAccessUnits mp4Stream, [ data ], pts, pts
        mp4File.on 'video_data', (nalUnits, pts, dts) ->
          if not dts?
            dts = pts
          context.onReceiveVideoNALUnits mp4Stream, nalUnits, pts, dts
        mp4File.on 'eof', ->
          mp4Stream.emit 'end'
        # NOTE(review): parse() and the getters below are not guarded; a
        # malformed file or a file lacking an audio/video track presumably
        # throws out of generate() -- confirm against the mp4 module.
        mp4File.parse()
        mp4Stream.updateSPS mp4File.getSPS()
        mp4Stream.updatePPS mp4File.getPPS()
        ascBuf = mp4File.getAudioSpecificConfig()
        bits = new Bits ascBuf
        ascInfo = aac.readAudioSpecificConfig bits
        mp4Stream.updateConfig
          audioSpecificConfig: ascBuf
          audioASCInfo: ascInfo
          audioSampleRate: ascInfo.samplingFrequency
          audioClockRate: 90000
          audioChannels: ascInfo.channelConfiguration
          audioObjectType: ascInfo.audioObjectType
        mp4Stream.durationSeconds = mp4File.getDurationSeconds()
        mp4Stream.lastTagTimestamp = mp4File.getLastTimestamp()
        mp4Stream.mp4File = mp4File
        mp4File.fillBuffer ->
          context.onReceiveAudioControlBuffer mp4Stream
          context.onReceiveVideoControlBuffer mp4Stream
        return mp4Stream

      # The handlers below read @mp4File, which generate() stores on the
      # stream it returns.
      play: ->
        @mp4File.play()

      pause: ->
        @mp4File.pause()

      resume: ->
        return @mp4File.resume()

      seek: (seekSeconds, callback) ->
        actualStartTime = @mp4File.seek seekSeconds
        callback null, actualStartTime

      sendVideoPacketsSinceLastKeyFrame: (endSeconds, callback) ->
        @mp4File.sendVideoPacketsSinceLastKeyFrame endSeconds, callback

      teardown: ->
        @mp4File.close()
        @destroy()

      getCurrentPlayTime: ->
        return @mp4File.currentPlayTime

      isPaused: ->
        return @mp4File.isPaused()

    avstreams.addGenerator streamName, generator

  # Removes the custom receiver's sockets (when enabled) and then invokes
  # the optional callback.
  stop: (callback) ->
    if config.enableCustomReceiver
      @customReceiver.deleteReceiverSocketsSync()
    callback?()

  # Starts every enabled server and invokes the optional callback once all
  # servers that report readiness have done so.
  start: (callback) ->
    seq = new Sequent
    waitCount = 0

    if config.enableRTMP
      waitCount++
      @rtmpServer.start { port: config.rtmpServerPort }, ->
        # RTMP server is ready
        seq.done()

    if config.enableCustomReceiver
      # Start data receivers for custom protocol
      @customReceiver.start()

    if config.enableRTSP or config.enableHTTP or config.enableRTMPT
      waitCount++
      @rtspServer.start { port: config.serverPort }, ->
        seq.done()

    seq.wait waitCount, ->
      callback?()

  # Forwards func to the RTSP server; ignored when RTSP is disabled.
  setLivePathConsumer: (func) ->
    if config.enableRTSP
      @rtspServer.setLivePathConsumer func

  # Forwards func to the RTSP server; ignored when RTSP is disabled.
  setAuthenticator: (func) ->
    if config.enableRTSP
      @rtspServer.setAuthenticator func

  # Marks the start of video on the stream and resets both start times.
  # buf argument can be null (not used)
  onReceiveVideoControlBuffer: (stream, buf) ->
    stream.resetFrameRate stream
    stream.isVideoStarted = true
    stream.timeAtVideoStart = Date.now()
    stream.timeAtAudioStart = stream.timeAtVideoStart
  #  stream.spropParameterSets = ''

  # Marks the start of audio on the stream and resets both start times.
  # buf argument can be null (not used)
  onReceiveAudioControlBuffer: (stream, buf) ->
    stream.isAudioStarted = true
    stream.timeAtAudioStart = Date.now()
    stream.timeAtVideoStart = stream.timeAtAudioStart

  # Reads the 48-bit big-endian PTS stored in bytes 1..6 of a custom
  # protocol data packet (byte 0 is not interpreted here; the payload
  # starts at byte 7).
  # Returns null when buf is missing or shorter than the 7-byte header,
  # so that callers never work with a NaN timestamp.
  _readPacketPTS: (buf) ->
    if (not buf?) or (buf.length < 7)
      return null
    # Multiplication is used instead of bit shifts because the value
    # exceeds 32 bits.
    return buf[1] * 0x010000000000 + \
           buf[2] * 0x0100000000   + \
           buf[3] * 0x01000000     + \
           buf[4] * 0x010000       + \
           buf[5] * 0x0100         + \
           buf[6]

  # Returns the length in bytes of the ADTS header of adtsFrame:
  # 7 bytes when the protection_absent bit (lowest bit of byte 1) is set,
  # 9 bytes when a 16-bit CRC follows the fixed/variable header.
  _getADTSHeaderLength: (adtsFrame) ->
    protectionAbsent = adtsFrame[1] & 0x01
    if protectionAbsent is 1 then 7 else 9

  # Handles a video data packet of the custom protocol.
  # Packets shorter than the header are dropped with a warning.
  onReceiveVideoDataBuffer: (stream, buf) ->
    pts = @_readPacketPTS buf
    if not pts?
      logger.warn "warn: Dropped video data packet shorter than its header"
      return
    # TODO: Support dts
    dts = pts
    nalUnit = buf[7..]
    @onReceiveVideoPacket stream, nalUnit, pts, dts

  # Handles an audio data packet of the custom protocol.
  # Packets shorter than the header are dropped with a warning.
  onReceiveAudioDataBuffer: (stream, buf) ->
    pts = @_readPacketPTS buf
    if not pts?
      logger.warn "warn: Dropped audio data packet shorter than its header"
      return
    # TODO: Support dts
    dts = pts
    adtsFrame = buf[7..]
    @onReceiveAudioPacket stream, adtsFrame, pts, dts

  # Forwards NAL units to the enabled servers and keeps the stream's
  # SPS/PPS and frame rate up to date.
  #
  # NAL units of nal_unit_type 5 must not be separated from the units of
  # type 7 and 8 that share the same timestamp.
  onReceiveVideoNALUnits: (stream, nalUnits, pts, dts) ->
    if DEBUG_INCOMING_PACKET_DATA
      logger.info "receive video: num_nal_units=#{nalUnits.length} pts=#{pts}"

    if config.enableRTSP
      # rtspServer will parse nalUnits and updates SPS/PPS for the stream,
      # so we don't need to parse them here.
      # TODO: Should SPS/PPS be parsed here?
      @rtspServer.sendVideoData stream, nalUnits, pts, dts

    if config.enableRTMP or config.enableRTMPT
      @rtmpServer.sendVideoPacket stream, nalUnits, pts, dts

    hasVideoFrame = false
    for nalUnit in nalUnits
      nalUnitType = h264.getNALUnitType nalUnit
      if nalUnitType is h264.NAL_UNIT_TYPE_SPS  # 7
        stream.updateSPS nalUnit
      else if nalUnitType is h264.NAL_UNIT_TYPE_PPS  # 8
        stream.updatePPS nalUnit
      else if (nalUnitType is h264.NAL_UNIT_TYPE_IDR_PICTURE) or (nalUnitType is h264.NAL_UNIT_TYPE_NON_IDR_PICTURE)
        # 5 (key frame) or 1 (inter frame)
        hasVideoFrame = true
      if DEBUG_INCOMING_PACKET_HASH
        md5 = crypto.createHash 'md5'
        md5.update nalUnit
        tsDiff = pts - stream.lastSentVideoTimestamp
        logger.info "video: pts=#{pts} pts_diff=#{tsDiff} md5=#{md5.digest('hex')[0..6]} nal_unit_type=#{nalUnitType} bytes=#{nalUnit.length}"
        stream.lastSentVideoTimestamp = pts

    # The frame rate is only updated by packets that carry picture data
    if hasVideoFrame
      stream.calcFrameRate pts

    return

  # Takes H.264 NAL units separated by start code (0x000001)
  #
  # arguments:
  #   nalUnitGlob: Buffer
  #   pts: timestamp in 90 kHz clock rate (PTS)
  #   dts: timestamp in 90 kHz clock rate (DTS)
  onReceiveVideoPacket: (stream, nalUnitGlob, pts, dts) ->
    nalUnits = h264.splitIntoNALUnits nalUnitGlob
    if nalUnits.length is 0
      return
    @onReceiveVideoNALUnits stream, nalUnits, pts, dts
    return

  # Forwards AAC access units to the enabled servers. The RTSP server gets
  # the whole batch; the RTMP server gets one packet per access unit with
  # timestamps spaced by the duration of one access unit.
  #
  # pts, dts: in 90KHz clock rate
  onReceiveAudioAccessUnits: (stream, accessUnits, pts, dts) ->
    if config.enableRTSP
      @rtspServer.sendAudioData stream, accessUnits, pts, dts

    if DEBUG_INCOMING_PACKET_DATA
      logger.info "receive audio: num_access_units=#{accessUnits.length} pts=#{pts}"

    # Duration of one access unit in 90 kHz ticks (the formula assumes 1024
    # samples per access unit). If the sample rate is not known yet, fall
    # back to zero spacing: otherwise the division yields NaN and every
    # timestamp sent to RTMP (even the first one) would be NaN.
    if stream.audioSampleRate > 0
      ptsPerFrame = 90000 / (stream.audioSampleRate / 1024)
    else
      ptsPerFrame = 0

    for accessUnit, i in accessUnits
      if DEBUG_INCOMING_PACKET_HASH
        md5 = crypto.createHash 'md5'
        md5.update accessUnit
        logger.info "audio: pts=#{pts} md5=#{md5.digest('hex')[0..6]} bytes=#{accessUnit.length}"
      if config.enableRTMP or config.enableRTMPT
        @rtmpServer.sendAudioPacket stream, accessUnit,
          Math.round(pts + ptsPerFrame * i),
          Math.round(dts + ptsPerFrame * i)
    return

  # Takes one or more concatenated ADTS frames, updates the stream's audio
  # configuration from the first frame, strips the ADTS headers and passes
  # the raw AAC data blocks on.
  #
  # pts, dts: in 90KHz clock rate
  onReceiveAudioPacket: (stream, adtsFrameGlob, pts, dts) ->
    adtsFrames = aac.splitIntoADTSFrames adtsFrameGlob
    if adtsFrames.length is 0
      return
    adtsInfo = aac.parseADTSFrame adtsFrames[0]

    stream.updateConfig
      audioSampleRate: adtsInfo.sampleRate
      audioClockRate: adtsInfo.sampleRate
      audioChannels: adtsInfo.channels
      audioObjectType: adtsInfo.audioObjectType

    rawDataBlocks = []
    for adtsFrame in adtsFrames
      # The header is 9 bytes (not 7) when the frame carries a CRC
      headerLength = @_getADTSHeaderLength adtsFrame
      rawDataBlocks.push adtsFrame[headerLength..]
    @onReceiveAudioAccessUnits stream, rawDataBlocks, pts, dts
module.exports = StreamServer