Skip to content

Commit

Permalink
Sync streams (Protocol 10) (livekit#291)
Browse files Browse the repository at this point in the history
Reference: livekit/client-sdk-js#881
Protocol is already set to v10.
  • Loading branch information
hiroshihorie authored Jan 8, 2024
1 parent 67401d6 commit 1461398
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 28 deletions.
8 changes: 6 additions & 2 deletions Sources/LiveKit/Core/Engine+TransportDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,19 @@ extension Engine: TransportDelegate {
}

func transport(_ transport: Transport, didAddTrack track: LKRTCMediaStreamTrack, rtpReceiver: LKRTCRtpReceiver, streams: [LKRTCMediaStream]) {
log("did add track")
guard !streams.isEmpty else {
log("Received onTrack with no streams!", .warning)
return
}

if transport.target == .subscriber {
// execute block when connected
execute(when: { state, _ in state.connectionState == .connected },
// always remove this block when disconnected
removeWhen: { state, _ in state.connectionState == .disconnected })
{ [weak self] in
guard let self else { return }
self.notify { $0.engine(self, didAddTrack: track, rtpReceiver: rtpReceiver, streams: streams) }
self.notify { $0.engine(self, didAddTrack: track, rtpReceiver: rtpReceiver, stream: streams.first!) }
}
}
}
Expand Down
20 changes: 5 additions & 15 deletions Sources/LiveKit/Core/Room+EngineDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -129,31 +129,21 @@ extension Room: EngineDelegate {
}
}

func engine(_: Engine, didAddTrack track: LKRTCMediaStreamTrack, rtpReceiver: LKRTCRtpReceiver, streams: [LKRTCMediaStream]) {
guard !streams.isEmpty else {
log("Received onTrack with no streams!", .warning)
return
}

let unpacked = streams[0].streamId.unpack()
let participantSid = unpacked.sid
var trackSid = unpacked.trackId
if trackSid == "" {
trackSid = track.trackId
}
func engine(_: Engine, didAddTrack track: LKRTCMediaStreamTrack, rtpReceiver: LKRTCRtpReceiver, stream: LKRTCMediaStream) {
let parts = stream.streamId.unpack()

let participant = _state.read {
$0.remoteParticipants.values.first { $0.sid == participantSid }
$0.remoteParticipants.values.first { $0.sid == parts.participantSid }
}

guard let participant else {
log("RemoteParticipant not found for sid: \(participantSid), remoteParticipants: \(remoteParticipants)", .warning)
log("RemoteParticipant not found for sid: \(parts.participantSid), remoteParticipants: \(remoteParticipants)", .warning)
return
}

let task = Task.retrying(retryDelay: 0.2) { _, _ in
// TODO: Only retry for TrackError.state = error
try await participant.addSubscribedMediaTrack(rtcTrack: track, rtpReceiver: rtpReceiver, sid: trackSid)
try await participant.addSubscribedMediaTrack(rtcTrack: track, rtpReceiver: rtpReceiver, sid: track.trackId)
}

Task {
Expand Down
6 changes: 3 additions & 3 deletions Sources/LiveKit/Core/Transport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -218,15 +218,15 @@ extension Transport: LKRTCPeerConnectionDelegate {

func peerConnection(_: LKRTCPeerConnection,
didAdd rtpReceiver: LKRTCRtpReceiver,
streams mediaStreams: [LKRTCMediaStream])
streams: [LKRTCMediaStream])
{
guard let track = rtpReceiver.track else {
log("Track is empty for \(target)", .warning)
return
}

log("didAddTrack type: \(type(of: track)), id: \(track.trackId)")
notify { $0.transport(self, didAddTrack: track, rtpReceiver: rtpReceiver, streams: mediaStreams) }
log("type: \(type(of: track)), track.id: \(track.trackId), streams: \(streams.map { "Stream(hash: \($0.hash), id: \($0.streamId), videoTracks: \($0.videoTracks.count), audioTracks: \($0.audioTracks.count))" })")
notify { $0.transport(self, didAddTrack: track, rtpReceiver: rtpReceiver, streams: streams) }
}

func peerConnection(_: LKRTCPeerConnection,
Expand Down
2 changes: 1 addition & 1 deletion Sources/LiveKit/Extensions/Primitives.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import Foundation

extension String {
func unpack() -> (sid: Sid, trackId: String) {
func unpack() -> (participantSid: Sid, trackId: String) {
let parts = split(separator: "|")
if parts.count == 2 {
return (String(parts[0]), String(parts[1]))
Expand Down
7 changes: 7 additions & 0 deletions Sources/LiveKit/Participant/LocalParticipant.swift
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ public class LocalParticipant: Participant {
]
}

if let mediaPublishOptions = publishOptions as? MediaPublishOptions,
let streamName = mediaPublishOptions.streamName
{
// Set stream name if specified in options
populator.stream = streamName
}

return transInit
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/LiveKit/Protocols/EngineDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import Foundation
protocol EngineDelegate: AnyObject {
func engine(_ engine: Engine, didMutateState state: Engine.State, oldState: Engine.State)
func engine(_ engine: Engine, didUpdateSpeakers speakers: [Livekit_SpeakerInfo])
func engine(_ engine: Engine, didAddTrack track: LKRTCMediaStreamTrack, rtpReceiver: LKRTCRtpReceiver, streams: [LKRTCMediaStream])
func engine(_ engine: Engine, didAddTrack track: LKRTCMediaStreamTrack, rtpReceiver: LKRTCRtpReceiver, stream: LKRTCMediaStream)
func engine(_ engine: Engine, didRemoveTrack track: LKRTCMediaStreamTrack)
func engine(_ engine: Engine, didReceiveUserPacket packet: Livekit_UserPacket)
}
13 changes: 10 additions & 3 deletions Sources/LiveKit/Types/Options/AudioPublishOptions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import Foundation

@objc
public class AudioPublishOptions: NSObject, PublishOptions {
public class AudioPublishOptions: NSObject, MediaPublishOptions {
@objc
public let name: String?

Expand All @@ -28,13 +28,18 @@ public class AudioPublishOptions: NSObject, PublishOptions {
@objc
public let dtx: Bool

@objc
public let streamName: String?

public init(name: String? = nil,
encoding: AudioEncoding? = nil,
dtx: Bool = true)
dtx: Bool = true,
streamName: String? = nil)
{
self.name = name
self.encoding = encoding
self.dtx = dtx
self.streamName = streamName
}

// MARK: - Equal
Expand All @@ -43,14 +48,16 @@ public class AudioPublishOptions: NSObject, PublishOptions {
guard let other = object as? Self else { return false }
return name == other.name &&
encoding == other.encoding &&
dtx == other.dtx
dtx == other.dtx &&
streamName == other.streamName
}

override public var hash: Int {
var hasher = Hasher()
hasher.combine(name)
hasher.combine(encoding)
hasher.combine(dtx)
hasher.combine(streamName)
return hasher.finalize()
}
}
8 changes: 8 additions & 0 deletions Sources/LiveKit/Types/Options/PublishOptions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,11 @@ import Foundation
public protocol PublishOptions {
var name: String? { get }
}

@objc
public protocol MediaPublishOptions: PublishOptions {
/// Set stream name for the track. Audio and video tracks with the same stream name
/// will be placed in the same `MediaStream` and offer better synchronization.
/// By default, camera and microphone will be placed in a stream; as would screen_share and screen_share_audio
var streamName: String? { get }
}
13 changes: 10 additions & 3 deletions Sources/LiveKit/Types/Options/VideoPublishOptions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import Foundation

@objc
public class VideoPublishOptions: NSObject, PublishOptions {
public class VideoPublishOptions: NSObject, MediaPublishOptions {
@objc
public let name: String?

Expand Down Expand Up @@ -45,14 +45,18 @@ public class VideoPublishOptions: NSObject, PublishOptions {
@objc
public let preferredBackupCodec: VideoCodec?

@objc
public let streamName: String?

public init(name: String? = nil,
encoding: VideoEncoding? = nil,
screenShareEncoding: VideoEncoding? = nil,
simulcast: Bool = true,
simulcastLayers: [VideoParameters] = [],
screenShareSimulcastLayers: [VideoParameters] = [],
preferredCodec: VideoCodec? = nil,
preferredBackupCodec: VideoCodec? = nil)
preferredBackupCodec: VideoCodec? = nil,
streamName: String? = nil)
{
self.name = name
self.encoding = encoding
Expand All @@ -62,6 +66,7 @@ public class VideoPublishOptions: NSObject, PublishOptions {
self.screenShareSimulcastLayers = screenShareSimulcastLayers
self.preferredCodec = preferredCodec
self.preferredBackupCodec = preferredBackupCodec
self.streamName = streamName
}

// MARK: - Equal
Expand All @@ -75,7 +80,8 @@ public class VideoPublishOptions: NSObject, PublishOptions {
simulcastLayers == other.simulcastLayers &&
screenShareSimulcastLayers == other.screenShareSimulcastLayers &&
preferredCodec == other.preferredCodec &&
preferredBackupCodec == other.preferredBackupCodec
preferredBackupCodec == other.preferredBackupCodec &&
streamName == other.streamName
}

override public var hash: Int {
Expand All @@ -88,6 +94,7 @@ public class VideoPublishOptions: NSObject, PublishOptions {
hasher.combine(screenShareSimulcastLayers)
hasher.combine(preferredCodec)
hasher.combine(preferredBackupCodec)
hasher.combine(streamName)
return hasher.finalize()
}
}

0 comments on commit 1461398

Please sign in to comment.