Skip to content

Commit

Permalink
streamingBuffer to actor
Browse files Browse the repository at this point in the history
  • Loading branch information
huiping192 committed Sep 14, 2023
1 parent f721ee7 commit 3231634
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 195 deletions.
32 changes: 15 additions & 17 deletions Sources/HPLiveKit/Publish/RtmpPublisher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@ actor RtmpPublisher: Publisher {

private let stream: LiveStreamInfo

private lazy var buffer: StreamingBuffer = {
let buffer = StreamingBuffer()
buffer.delegate = self
return buffer
}()
private let buffer: StreamingBuffer = StreamingBuffer()
private var debugInfo: LiveDebug = .init()

//错误信息
Expand Down Expand Up @@ -74,6 +70,7 @@ actor RtmpPublisher: Publisher {
configure = conf

Task {
await buffer.setDelegate(delegate: self)
await self.rtmp.setDelegate(self)
}
}
Expand Down Expand Up @@ -139,23 +136,23 @@ actor RtmpPublisher: Publisher {

await rtmp.invalidate()

clean()
await clean()
}

private func clean() {
private func clean() async {
isConnected = false
isReconnecting = false
isSending = false
sendAudioHead = false
sendVideoHead = false
debugInfo = LiveDebug()
buffer.removeAll()
await buffer.removeAll()
retryTimes4netWorkBreaken = 0
}

func send(frame: any Frame) {
buffer.append(frame: frame)
Task {
await buffer.append(frame: frame)
if !isSending {
await self.sendFrame()
}
Expand All @@ -166,20 +163,21 @@ actor RtmpPublisher: Publisher {

private extension RtmpPublisher {
func sendFrame() async {
guard !self.isSending && !self.buffer.list.isEmpty else { return }

guard !self.isSending else { return }
guard await !buffer.isEmpty else { return }

self.isSending = true

if !self.isConnected || self.isReconnecting || self.isConnecting {
self.isSending = false
return
}

guard let frame = self.buffer.popFirstFrame() else { return }
guard let frame = await self.buffer.popFirstFrame() else { return }

await pushFrame(frame: frame)

updateDebugInfo(frame: frame)
await updateDebugInfo(frame: frame)

self.isSending = false
}
Expand Down Expand Up @@ -221,11 +219,11 @@ private extension RtmpPublisher {
}
}

func updateDebugInfo(frame: any Frame) {
func updateDebugInfo(frame: any Frame) async {
//debug更新
self.debugInfo.totalFrameCount += 1
self.debugInfo.dropFrameCount += self.buffer.lastDropFrames
self.buffer.lastDropFrames = 0
self.debugInfo.dropFrameCount += await self.buffer.lastDropFrames
await self.buffer.clearDropFramesCount()

self.debugInfo.allDataSize += CGFloat(frame.data?.count ?? 0)
self.debugInfo.elapsedMilli = CGFloat(UInt64(CACurrentMediaTime() * 1000)) - self.debugInfo.currentTimeStamp
Expand All @@ -237,7 +235,7 @@ private extension RtmpPublisher {
} else {
debugInfo.capturedVideoCountPerSec += 1
}
debugInfo.unsendCount = buffer.list.count
debugInfo.unsendCount = await buffer.list.count
} else {
debugInfo.currentBandwidth = debugInfo.bandwidthPerSec
debugInfo.currentCapturedAudioCount = debugInfo.currentCapturedAudioCount
Expand Down
Loading

0 comments on commit 3231634

Please sign in to comment.