Skip to content

Commit

Permalink
Merge pull request #14 from huiping192/feature/publisher
Browse files Browse the repository at this point in the history
refactoring publisher
  • Loading branch information
huiping192 committed Oct 13, 2023
2 parents 8a2510d + ea0fb36 commit 0552c37
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 322 deletions.
11 changes: 0 additions & 11 deletions Sources/HPLiveKit/LiveSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ public class LiveSession: NSObject {
// 推流 publisher
private var publisher: Publisher?

// 视频保存 archive to local document
private var filePublisher: FilePublisher = FilePublisher()

// 调试信息 debug info
private var debugInfo: LiveDebug?
// 流信息 stream info
Expand Down Expand Up @@ -120,9 +117,6 @@ public class LiveSession: NSObject {
}
}

// 是否保存在本地文件
// should save to local file, default is no
public var saveLocalVideo: Bool = false

public init(audioConfiguration: LiveAudioConfiguration, videoConfiguration: LiveVideoConfiguration) {
self.audioConfiguration = audioConfiguration
Expand Down Expand Up @@ -194,11 +188,6 @@ private extension LiveSession {
guard let publisher = publisher else { return }

await publisher.send(frame: frame)

// save to file
if saveLocalVideo {
filePublisher.save(frame: frame)
}
}
}
}
Expand Down
73 changes: 0 additions & 73 deletions Sources/HPLiveKit/Publish/FilePublisher.swift

This file was deleted.

18 changes: 9 additions & 9 deletions Sources/HPLiveKit/Publish/Publisher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@

import Foundation

protocol PublisherDelegate: class {
/** callback buffer current status (回调当前缓冲区情况,可实现相关切换帧率 码率等策略)*/
func publisher(publisher: Publisher, bufferStatus: BufferState)
/** callback publish current status (回调当前网络情况) */
func publisher(publisher: Publisher, publishStatus: LiveState)
/** callback publish error */
func publisher(publisher: Publisher, errorCode: LiveSocketErrorCode)
/** callback debugInfo */
func publisher(publisher: Publisher, debugInfo: LiveDebug)
protocol PublisherDelegate: AnyObject {
/** callback buffer current status */
func publisher(publisher: Publisher, bufferStatus: BufferState)
/** callback publish current status */
func publisher(publisher: Publisher, publishStatus: LiveState)
/** callback publish error */
func publisher(publisher: Publisher, errorCode: LiveSocketErrorCode)
/** callback debugInfo */
func publisher(publisher: Publisher, debugInfo: LiveDebug)
}

protocol Publisher {
Expand Down
97 changes: 46 additions & 51 deletions Sources/HPLiveKit/Publish/RtmpPublisher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import HPRTMP
import QuartzCore

actor RtmpPublisher: Publisher {

///< 重连1分钟 3秒一次 一共20次
private let retryTimesBreaken = 5
private let retryTimesMargin = 3
Expand All @@ -28,13 +28,9 @@ actor RtmpPublisher: Publisher {

private let stream: LiveStreamInfo

private lazy var buffer: StreamingBuffer = {
let buffer = StreamingBuffer()
buffer.delegate = self
return buffer
}()
private let buffer: StreamingBuffer = StreamingBuffer()
private var debugInfo: LiveDebug = .init()

//错误信息
private var retryTimes4netWorkBreaken: Int = 0
private let reconnectInterval: Int
Expand Down Expand Up @@ -74,26 +70,20 @@ actor RtmpPublisher: Publisher {
configure = conf

Task {
await buffer.setDelegate(delegate: self)
await self.rtmp.setDelegate(self)
}
}

nonisolated func start() {
Task {
await self._start()
}
}

private func _start() async {
guard !isConnected else { return }

debugInfo.uploadUrl = stream.url

func start() async {
guard !isConnecting else { return }

isConnecting = true

debugInfo.uploadUrl = stream.url

delegate?.publisher(publisher: self, publishStatus: .pending)

await connect()
}

Expand Down Expand Up @@ -128,7 +118,6 @@ actor RtmpPublisher: Publisher {
private func _reconnect() async {
self.isReconnecting = false
if isConnected { return }
if isConnected { return }

sendAudioHead = false
sendVideoHead = false
Expand All @@ -141,32 +130,27 @@ actor RtmpPublisher: Publisher {
}

func stop() async {
await self._stop()
}

private func _stop() async {
delegate?.publisher(publisher: self, publishStatus: .stop)

await rtmp.invalidate()

clean()
await clean()
}

private func clean() {
private func clean() async {
isConnected = false
isReconnecting = false
isSending = false
isConnected = false
sendAudioHead = false
sendVideoHead = false
debugInfo = LiveDebug()
buffer.removeAll()
await buffer.removeAll()
retryTimes4netWorkBreaken = 0
}

func send(frame: any Frame) {
buffer.append(frame: frame)
Task {
await buffer.append(frame: frame)
if !isSending {
await self.sendFrame()
}
Expand All @@ -177,20 +161,21 @@ actor RtmpPublisher: Publisher {

private extension RtmpPublisher {
func sendFrame() async {
guard !self.isSending && !self.buffer.list.isEmpty else { return }

guard !self.isSending else { return }
guard await !buffer.isEmpty else { return }

self.isSending = true

if !self.isConnected || self.isReconnecting || self.isConnecting {
self.isSending = false
return
}

guard let frame = self.buffer.popFirstFrame() else { return }
guard let frame = await self.buffer.popFirstFrame() else { return }

await pushFrame(frame: frame)

updateDebugInfo(frame: frame)
await updateDebugInfo(frame: frame)

self.isSending = false
}
Expand Down Expand Up @@ -225,22 +210,18 @@ private extension RtmpPublisher {
func pushAudio(frame: AudioFrame) async {
if !self.sendAudioHead {
self.sendAudioHead = true
if frame.header == nil {
self.isSending = false
return
}
await self.sendAudioHeader(frame: frame)
await self.sendAudioFrame(frame: frame)
} else {
await self.sendAudioFrame(frame: frame)
}
}

func updateDebugInfo(frame: any Frame) {
func updateDebugInfo(frame: any Frame) async {
//debug更新
self.debugInfo.totalFrameCount += 1
self.debugInfo.dropFrameCount += self.buffer.lastDropFrames
self.buffer.lastDropFrames = 0
self.debugInfo.dropFrameCount += await self.buffer.lastDropFrames
await self.buffer.clearDropFramesCount()

self.debugInfo.allDataSize += CGFloat(frame.data?.count ?? 0)
self.debugInfo.elapsedMilli = CGFloat(UInt64(CACurrentMediaTime() * 1000)) - self.debugInfo.currentTimeStamp
Expand All @@ -252,7 +233,7 @@ private extension RtmpPublisher {
} else {
debugInfo.capturedVideoCountPerSec += 1
}
debugInfo.unsendCount = buffer.list.count
debugInfo.unsendCount = await buffer.list.count
} else {
debugInfo.currentBandwidth = debugInfo.bandwidthPerSec
debugInfo.currentCapturedAudioCount = debugInfo.currentCapturedAudioCount
Expand All @@ -273,27 +254,41 @@ private extension RtmpPublisher {
func sendVideoHeader(frame: VideoFrame) async {
guard let sps = frame.sps, let pps = frame.pps else { return }
var body = Data()
body.append(Data([0x17]))
body.append(Data([0x00]))
// Video Tag Header, key frame and avc encode
let frameAndCode:UInt8 = UInt8(VideoData.FrameType.keyframe.rawValue << 4 | VideoData.CodecId.avc.rawValue)
body.append(Data([frameAndCode]))

// AVC sequence header
body.append(Data([VideoData.AVCPacketType.header.rawValue]))

// CompositionTime 0
body.append(Data([0x00, 0x00, 0x00]))

body.append(Data([0x01]))

let spsSize = sps.count
// AVCDecoderConfigurationRecord

// configurationVersion
body.append(Data([0x01]))

// AVCProfileIndication,profile_compatibility,AVCLevelIndication, lengthSizeMinusOne
body.append(Data([sps[1], sps[2], sps[3], 0xff]))

/*sps*/

// numOfSequenceParameterSets
body.append(Data([0xe1]))
body.append(Data([(UInt8(spsSize) >> 8) & 0xff, UInt8(spsSize) & 0xff]))
// sequenceParameterSetLength
body.append(UInt16(sps.count).bigEndian.data)
// sequenceParameterSetNALUnit
body.append(Data(sps))

let ppsSize = pps.count


/*pps*/

// numOfPictureParameterSets
body.append(Data([0x01]))
body.append(Data([(UInt8(ppsSize) >> 8) & 0xff, UInt8(ppsSize) & 0xff]))
// pictureParameterSetLength
body.append(UInt16(pps.count).bigEndian.data)
// pictureParameterSetNALUnit
body.append(Data(pps))

await rtmp.publishVideoHeader(data: body)
Expand Down
Loading

0 comments on commit 0552c37

Please sign in to comment.