Skip to content

Commit

Permalink
start sending frame when rtmp is publishable
Browse files Browse the repository at this point in the history
  • Loading branch information
huiping192 committed Jul 15, 2023
1 parent 9f7499e commit 7226cf0
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 34 deletions.
2 changes: 1 addition & 1 deletion Example/HPLiveKit/ViewController.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ViewController: UIViewController {
@objc private func buttonTapped() {
switch liveState {
case .ready, .stop, .error:
let info = LiveStreamInfo(streamId: "sample1", url: "rtmp://192.168.11.23:1936/live/haha")
let info = LiveStreamInfo(streamId: "sample1", url: "rtmp://192.168.11.23/live/haha")
liveSession?.startLive(streamInfo: info)
liveState = .start
button.setTitle("Stop", for: .normal)
Expand Down
65 changes: 32 additions & 33 deletions Sources/HPLiveKit/Publish/RtmpPublisher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ actor RtmpPublisher: Publisher {
debugInfo.streamId = stream.streamId
debugInfo.uploadUrl = stream.url

guard !isConnected else { return }
guard !isConnecting else { return }

isConnected = true
isConnecting = true
delegate?.publisher(publisher: self, publishStatus: .pending)

await connect()
Expand All @@ -110,11 +110,6 @@ actor RtmpPublisher: Publisher {
await rtmp.publish(url: stream.url, configure: configure)

delegate?.publisher(publisher: self, publishStatus: .start)

isConnected = true
isConnecting = false
isReconnecting = false
isSending = false
}

private func reconnect() {
Expand Down Expand Up @@ -172,39 +167,38 @@ actor RtmpPublisher: Publisher {
retryTimes4netWorkBreaken = 0
}

func send(frame: Frame) async {
func send(frame: Frame) {
buffer.append(frame: frame)
if !isSending {
self.sendFrame()
Task {
if !isSending {
await self.sendFrame()
}
}
}

}

private extension RtmpPublisher {
func sendFrame() {
Task {
guard !self.isSending && !self.buffer.list.isEmpty else { return }

self.isSending = true

if !self.isConnected || self.isReconnecting || self.isConnecting {
self.isSending = false
return
}

guard let frame = self.buffer.popFirstFrame() else { return }

await pushFrame(frame: frame)

updateDebugInfo(frame: frame)

func sendFrame() async {
guard !self.isSending && !self.buffer.list.isEmpty else { return }

self.isSending = true

if !self.isConnected || self.isReconnecting || self.isConnecting {
self.isSending = false
return
}

guard let frame = self.buffer.popFirstFrame() else { return }

await pushFrame(frame: frame)

updateDebugInfo(frame: frame)

self.isSending = false
}

func pushFrame(frame: Frame) async {
guard await rtmp.publishStatus == .publishStart else { return }
if let frame = frame as? VideoFrame {
await pushVideo(frame: frame)
return
Expand All @@ -225,6 +219,7 @@ private extension RtmpPublisher {
}

await sendVideoHeader(frame: frame)
await sendVideoFrame(frame: frame)
} else {
await sendVideoFrame(frame: frame)
}
Expand All @@ -238,6 +233,7 @@ private extension RtmpPublisher {
return
}
await self.sendAudioHeader(frame: frame)
await self.sendAudioFrame(frame: frame)
} else {
await self.sendAudioFrame(frame: frame)
}
Expand Down Expand Up @@ -303,7 +299,6 @@ private extension RtmpPublisher {
body.append(Data([(UInt8(ppsSize) >> 8) & 0xff, UInt8(ppsSize) & 0xff]))
body.append(Data(pps))

self.lastVideoTimestamp = frame.timestamp
await rtmp.publishVideoHeader(data: body)
}

Expand All @@ -326,7 +321,7 @@ private extension RtmpPublisher {
descData.append(Data([frameAndCode]))
descData.append(Data([VideoData.AVCPacketType.nalu.rawValue]))

let delta = frame.timestamp - lastVideoTimestamp
let delta = lastVideoTimestamp != 0 ? frame.timestamp - lastVideoTimestamp : 0
// 24bit
descData.write24(frame.compositionTime, bigEndian: true)
descData.append(data)
Expand All @@ -339,7 +334,6 @@ private extension RtmpPublisher {
return
}
// Publish the audio header to the RTMP server
lastAudioTimestamp = frame.timestamp
await rtmp.publishAudioHeader(data: header)
}

Expand All @@ -351,7 +345,7 @@ private extension RtmpPublisher {
audioPacketData.append(aacHeader)
audioPacketData.write(AudioData.AACPacketType.raw.rawValue)
audioPacketData.append(data)
let delta = UInt32(frame.timestamp - lastAudioTimestamp)
let delta = lastAudioTimestamp != 0 ? UInt32(frame.timestamp - lastAudioTimestamp) : 0
await rtmp.publishAudio(data: audioPacketData, delta: delta)
lastAudioTimestamp = frame.timestamp
}
Expand All @@ -371,7 +365,12 @@ extension RtmpPublisher: RTMPPublishSessionDelegate {
}

func sessionStatusChange(_ session: HPRTMP.RTMPPublishSession, status: HPRTMP.RTMPPublishSession.Status) {

if status == .publishStart {
isConnected = true
isConnecting = false
isReconnecting = false
isSending = false
}
}
}

Expand Down

0 comments on commit 7226cf0

Please sign in to comment.