Skip to content

Commit

Permalink
Implement RPC feature (#544)
Browse files Browse the repository at this point in the history
Pretty much identical to
livekit/client-sdk-android#578

I included tests which mock the underlying data channel sending and
receiving to test both sides in the same SDK app instance.

---------

Co-authored-by: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com>
  • Loading branch information
bcherry and hiroshihorie authored Jan 17, 2025
1 parent f9bb4f6 commit 7b4ec4d
Show file tree
Hide file tree
Showing 12 changed files with 848 additions and 7 deletions.
14 changes: 8 additions & 6 deletions Sources/LiveKit/Core/DataChannelPair.swift
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,21 @@ class DataChannelPair: NSObject, Loggable {
}

public func send(userPacket: Livekit_UserPacket, kind: Livekit_DataPacket.Kind) throws {
guard isOpen else {
throw LiveKitError(.invalidState, message: "Data channel is not open")
}

let packet = Livekit_DataPacket.with {
try send(dataPacket: .with {
$0.kind = kind
$0.user = userPacket
})
}

public func send(dataPacket packet: Livekit_DataPacket) throws {
guard isOpen else {
throw LiveKitError(.invalidState, message: "Data channel is not open")
}

let serializedData = try packet.serializedData()
let rtcData = RTC.createDataBuffer(data: serializedData)

let channel = _state.read { kind == .reliable ? $0.reliable : $0.lossy }
let channel = _state.read { packet.kind == .reliable ? $0.reliable : $0.lossy }
guard let sendDataResult = channel?.sendData(rtcData), sendDataResult else {
throw LiveKitError(.invalidState, message: "sendData failed")
}
Expand Down
187 changes: 187 additions & 0 deletions Sources/LiveKit/Core/RPC.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Foundation

/// Specialized error handling for RPC methods.
///
/// Instances of this type, when thrown in a RPC method handler, will have their `message`
/// serialized and sent across the wire. The sender will receive an equivalent error on the other side.
///
/// Built-in types are included but developers may use any message string, with a max length of 256 bytes.
struct RpcError: Error {
/// The error code of the RPC call. Error codes 1001-1999 are reserved for built-in errors.
///
/// See `RpcError.BuiltInError` for built-in error information.
let code: Int

/// A message to include. Strings over 256 bytes will be truncated.
let message: String

/// An optional data payload. Must be smaller than 15KB in size, or else will be truncated.
let data: String

enum BuiltInError {
case applicationError
case connectionTimeout
case responseTimeout
case recipientDisconnected
case responsePayloadTooLarge
case sendFailed
case unsupportedMethod
case recipientNotFound
case requestPayloadTooLarge
case unsupportedServer
case unsupportedVersion

var code: Int {
switch self {
case .applicationError: return 1500
case .connectionTimeout: return 1501
case .responseTimeout: return 1502
case .recipientDisconnected: return 1503
case .responsePayloadTooLarge: return 1504
case .sendFailed: return 1505
case .unsupportedMethod: return 1400
case .recipientNotFound: return 1401
case .requestPayloadTooLarge: return 1402
case .unsupportedServer: return 1403
case .unsupportedVersion: return 1404
}
}

var message: String {
switch self {
case .applicationError: return "Application error in method handler"
case .connectionTimeout: return "Connection timeout"
case .responseTimeout: return "Response timeout"
case .recipientDisconnected: return "Recipient disconnected"
case .responsePayloadTooLarge: return "Response payload too large"
case .sendFailed: return "Failed to send"
case .unsupportedMethod: return "Method not supported at destination"
case .recipientNotFound: return "Recipient not found"
case .requestPayloadTooLarge: return "Request payload too large"
case .unsupportedServer: return "RPC not supported by server"
case .unsupportedVersion: return "Unsupported RPC version"
}
}

func create(data: String = "") -> RpcError {
RpcError(code: code, message: message, data: data)
}
}

static func builtIn(_ key: BuiltInError, data: String = "") -> RpcError {
RpcError(code: key.code, message: key.message, data: data)
}

static let MAX_MESSAGE_BYTES = 256
static let MAX_DATA_BYTES = 15360 // 15 KB

static func fromProto(_ proto: Livekit_RpcError) -> RpcError {
RpcError(
code: Int(proto.code),
message: (proto.message).truncate(maxBytes: MAX_MESSAGE_BYTES),
data: proto.data.truncate(maxBytes: MAX_DATA_BYTES)
)
}

func toProto() -> Livekit_RpcError {
Livekit_RpcError.with {
$0.code = UInt32(code)
$0.message = message
$0.data = data
}
}
}

/*
* Maximum payload size for RPC requests and responses. If a payload exceeds this size,
* the RPC call will fail with a REQUEST_PAYLOAD_TOO_LARGE(1402) or RESPONSE_PAYLOAD_TOO_LARGE(1504) error.
*/
let MAX_RPC_PAYLOAD_BYTES = 15360 // 15 KB

/// A handler that processes an RPC request and returns a string
/// that will be sent back to the requester.
///
/// Throwing an `RpcError` will send the error back to the requester.
///
/// - SeeAlso: `LocalParticipant.registerRpcMethod`
public typealias RpcHandler = (RpcInvocationData) async throws -> String

public struct RpcInvocationData {
/// A unique identifier for this RPC request
let requestId: String

/// The identity of the RemoteParticipant who initiated the RPC call
let callerIdentity: Participant.Identity

/// The data sent by the caller (as a string)
let payload: String

/// The maximum time available to return a response
let responseTimeout: TimeInterval
}

struct PendingRpcResponse {
let participantIdentity: Participant.Identity
let onResolve: (_ payload: String?, _ error: RpcError?) -> Void
}

actor RpcStateManager {
private var handlers: [String: RpcHandler] = [:] // methodName to handler
private var pendingAcks: Set<String> = Set()
private var pendingResponses: [String: PendingRpcResponse] = [:] // requestId to pending response

func registerHandler(_ method: String, handler: @escaping RpcHandler) {
handlers[method] = handler
}

func unregisterHandler(_ method: String) {
handlers.removeValue(forKey: method)
}

func getHandler(for method: String) -> RpcHandler? {
handlers[method]
}

func addPendingAck(_ requestId: String) {
pendingAcks.insert(requestId)
}

@discardableResult
func removePendingAck(_ requestId: String) -> Bool {
pendingAcks.remove(requestId) != nil
}

func hasPendingAck(_ requestId: String) -> Bool {
pendingAcks.contains(requestId)
}

func setPendingResponse(_ requestId: String, response: PendingRpcResponse) {
pendingResponses[requestId] = response
}

@discardableResult
func removePendingResponse(_ requestId: String) -> PendingRpcResponse? {
pendingResponses.removeValue(forKey: requestId)
}

func removeAllPending(_ requestId: String) async {
pendingAcks.remove(requestId)
pendingResponses.removeValue(forKey: requestId)
}
}
9 changes: 8 additions & 1 deletion Sources/LiveKit/Core/Room+Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ extension Room {
}

func send(userPacket: Livekit_UserPacket, kind: Livekit_DataPacket.Kind) async throws {
try await send(dataPacket: .with {
$0.user = userPacket
$0.kind = kind
})
}

func send(dataPacket packet: Livekit_DataPacket) async throws {
func ensurePublisherConnected() async throws {
guard _state.isSubscriberPrimary else { return }

Expand Down Expand Up @@ -96,7 +103,7 @@ extension Room {
}

// Should return true if successful
try publisherDataChannel.send(userPacket: userPacket, kind: kind)
try publisherDataChannel.send(dataPacket: packet)
}
}

Expand Down
35 changes: 35 additions & 0 deletions Sources/LiveKit/Core/Room+EngineDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,39 @@ extension Room {
$0.participant?(participant, trackPublication: publication, didReceiveTranscriptionSegments: segments)
}
}

func room(didReceiveRpcResponse response: Livekit_RpcResponse) {
let (payload, error): (String?, RpcError?) = switch response.value {
case let .payload(v): (v, nil)
case let .error(e): (nil, RpcError.fromProto(e))
default: (nil, nil)
}

localParticipant.handleIncomingRpcResponse(requestId: response.requestID,
payload: payload,
error: error)
}

func room(didReceiveRpcAck ack: Livekit_RpcAck) {
let requestId = ack.requestID
localParticipant.handleIncomingRpcAck(requestId: requestId)
}

func room(didReceiveRpcRequest request: Livekit_RpcRequest, from participantIdentity: String) {
let callerIdentity = Participant.Identity(from: participantIdentity)
let requestId = request.id
let method = request.method
let payload = request.payload
let responseTimeout = TimeInterval(UInt64(request.responseTimeoutMs) / MSEC_PER_SEC)
let version = Int(request.version)

Task {
await localParticipant.handleIncomingRpcRequest(callerIdentity: callerIdentity,
requestId: requestId,
method: method,
payload: payload,
responseTimeout: responseTimeout,
version: version)
}
}
}
3 changes: 3 additions & 0 deletions Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,9 @@ extension Room: DataChannelDelegate {
case let .speaker(update): engine(self, didUpdateSpeakers: update.speakers)
case let .user(userPacket): engine(self, didReceiveUserPacket: userPacket)
case let .transcription(packet): room(didReceiveTranscriptionPacket: packet)
case let .rpcResponse(response): room(didReceiveRpcResponse: response)
case let .rpcAck(ack): room(didReceiveRpcAck: ack)
case let .rpcRequest(request): room(didReceiveRpcRequest: request, from: dataPacket.participantIdentity)
default: return
}
}
Expand Down
3 changes: 3 additions & 0 deletions Sources/LiveKit/Errors.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public enum LiveKitErrorType: Int, Sendable {
case failedToParseUrl = 102
case failedToConvertData = 103
case invalidState = 104
case invalidParameter = 105

case webRTC = 201

Expand Down Expand Up @@ -66,6 +67,8 @@ extension LiveKitErrorType: CustomStringConvertible {
return "Failed to convert data"
case .invalidState:
return "Invalid state"
case .invalidParameter:
return "Invalid parameter"
case .webRTC:
return "WebRTC error"
case .network:
Expand Down
25 changes: 25 additions & 0 deletions Sources/LiveKit/Extensions/String.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,29 @@ extension String {
var nilIfEmpty: String? {
isEmpty ? nil : self
}

var byteLength: Int {
data(using: .utf8)?.count ?? 0
}

func truncate(maxBytes: Int) -> String {
if byteLength <= maxBytes {
return self
}

var low = 0
var high = count

while low < high {
let mid = (low + high + 1) / 2
let substring = String(prefix(mid))
if substring.byteLength <= maxBytes {
low = mid
} else {
high = mid - 1
}
}

return String(prefix(low))
}
}
Loading

0 comments on commit 7b4ec4d

Please sign in to comment.