Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSDK-6515 Add dial timeout and stats #188

Merged
merged 8 commits into from
Apr 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 115 additions & 4 deletions lib/src/rpc/dial.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import '../utils.dart';
import 'grpc/grpc_or_grpcweb_channel.dart';
import 'web_rtc/web_rtc_client.dart';

final _logger = Logger();
final _logger = Logger(printer: PrettyPrinter(printTime: true));

/// Describes the behavior for connecting to a robot
class DialOptions {
Expand Down Expand Up @@ -53,6 +53,9 @@ class DialOptions {

/// Whether the connection was made using mDNS
bool _usingMdns = false;

/// Timeout is the timeout for dial.
Duration timeout = Duration(seconds: 10);
}

/// The credentials used for connecting to the robot
Expand Down Expand Up @@ -110,10 +113,12 @@ class DialWebRtcOptions {

/// Connect to a robot at the provided address with the given options
Future<ClientChannelBase> dial(String address, DialOptions? options, String Function() sessionCallback) async {
final dialSW = Stopwatch()..start();
_logger.i('Connecting to address $address');
final opts = options ?? DialOptions();

if (opts.attemptMdns) {
final mdnsSW = Stopwatch()..start();
try {
final mdnsUri = await _searchMdns(address);
// Let downstream calls know when mdns was used. This is helpful to inform
Expand All @@ -129,16 +134,23 @@ Future<ClientChannelBase> dial(String address, DialOptions? options, String Func
} catch (e) {
_logger.d('Error dialing with mDNS; falling back to other methods', error: e);
}
mdnsSW.stop();
_logger.d('STATS: mDNS discovery took ${mdnsSW.elapsed}');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would love to capture this data somehow so we could historically track.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah NetCode will be working on that eventually (gathering this data and tracking it to get a sense of how our connection establishment performance looks over time). For now, I think we're ok just debug-logging a number of measurements in this dial code and in other connection establishment paths.

}

bool disableWebRtc = opts.webRtcOptions?.disable ?? false;
if (address.contains('.local.') || address.contains('localhost')) {
disableWebRtc = true;
}
final Future<ClientChannelBase> chan;
if (disableWebRtc) {
return _dialDirectGrpc(address, opts, sessionCallback);
chan = _dialDirectGrpc(address, opts, sessionCallback);
} else {
chan = _dialWebRtc(address, opts, sessionCallback);
}
return _dialWebRtc(address, opts, sessionCallback);
dialSW.stop();
_logger.d('STATS: dial function took ${dialSW.elapsed}');
return chan.timeout(opts.timeout);
}

Future<String> _searchMdns(String address) async {
Expand Down Expand Up @@ -188,7 +200,54 @@ Future<ClientChannelBase> _dialDirectGrpc(String address, DialOptions options, S
return _authenticatedChannel(address, options, sessionCallback);
}

Future _logConnectionStats(Stopwatch webrtcDialSW, RTCPeerConnection peerConnection, int updateCalls, Duration totalCallUpdateDuration,
maxCallUpdateDuration) async {
webrtcDialSW.stop();
_logger.d('STATS: all ICE candidates gathered in ${webrtcDialSW.elapsed}');
_logger.d('STATS: $updateCalls call updates to the signaling server were made');
// Floor average call update duration to closest millisecond.
final averageCallUpdateDuration = Duration(milliseconds: (totalCallUpdateDuration.inMilliseconds ~/ updateCalls));
_logger.d('STATS: average call update took $averageCallUpdateDuration');
_logger.d('STATS: max call update took $maxCallUpdateDuration');

// Attempt to find chosen local and remote ICE candidate's addresses,
// ports, and candidate types: 'host', 'srflx' or 'relay'. 'host' is a
// candidate within the network; 'srflx' is a candidate returned by a
// STUN server; 'relay' is a candidate returned by a TURN server. Note
// that multiple candidate pairs can be nominated if there was an
// "upgrade" in the connection.
final stats = await peerConnection.getStats();
for (var stat in stats) {
// NOTE(benjirewis): some magic-string-usage here; there are not great
// constants in the WebRTC library for these fields.
if (stat.type == 'candidate-pair' && stat.values['nominated']) {
// Use 'lastPacketSentTimestamp' on candidate pair to estimate when the
// pair was nominated.
final double lpst = stat.values['lastPacketSentTimestamp'];
final DateTime nominatedTime = DateTime.fromMillisecondsSinceEpoch(lpst.toInt());

final String lcid = stat.values['localCandidateId'];
final String rcid = stat.values['remoteCandidateId'];
for (var innerStat in stats) {
if (innerStat.id == lcid) {
final type = innerStat.values['candidateType'];
final addr = innerStat.values['address'];
final port = innerStat.values['port'];
_logger.d('STATS: chose $type local candidate with IP $addr:$port @ $nominatedTime');
}
if (innerStat.id == rcid) {
final type = innerStat.values['candidateType'];
final addr = innerStat.values['address'];
final port = innerStat.values['port'];
_logger.d('STATS: chose $type remote candidate with IP $addr:$port @ $nominatedTime');
}
}
}
}
}

Future<ClientChannelBase> _dialWebRtc(String address, DialOptions options, String Function() sessionCallback) async {
final Stopwatch webrtcDialSW = Stopwatch()..start();
_logger.d('Dialing WebRTC to $address');
if (options.authEntity.isNullOrEmpty) {
if (options.externalAuthAddress.isNullOrEmpty) {
Expand All @@ -201,9 +260,11 @@ Future<ClientChannelBase> _dialWebRtc(String address, DialOptions options, Strin
}

final signalingServer = options.webRtcOptions?.signalingServerAddress ?? ((options._usingMdns) ? address : 'app.viam.com');
final sigServerSW = Stopwatch()..start();
_logger.d('Connecting to signaling server: $signalingServer');
final signalingChannel = await _dialDirectGrpc(signalingServer, options, sessionCallback);
_logger.d('Connected to signaling server: $signalingServer');
sigServerSW.stop();
_logger.d('STATS: connected to signaling in ${sigServerSW.elapsed}');
final signalingClient = SignalingServiceClient(signalingChannel, options: CallOptions(metadata: {'rpc-host': address}));
WebRTCConfig config;
try {
Expand All @@ -224,6 +285,7 @@ Future<ClientChannelBase> _dialWebRtc(String address, DialOptions options, Strin
'sdpSemantics': 'unified-plan',
});

final createPeerConnSW = Stopwatch()..start();
final peerConnection = await createPeerConnection({'iceServers': iceServers});
final dataChannel = await peerConnection.createDataChannel(
'data',
Expand All @@ -242,11 +304,38 @@ Future<ClientChannelBase> _dialWebRtc(String address, DialOptions options, Strin
..negotiated = true
..ordered = true,
);
createPeerConnSW.stop();
_logger.d('STATS: created peer connection and channels in ${createPeerConnSW.elapsed}');

String? uuid;
final didConnect = Completer();
final didSetRemoteDesc = Completer();

int updateCalls = 0; // how many times we've sent an update to the sig. server
int updateCallsFinished = 0; // how many update calls have finished
Duration totalCallUpdateDuration = Duration();
Duration maxCallUpdateDuration = Duration();
bool iceConnectionCompleted = false;

peerConnection.onIceConnectionState = (RTCIceConnectionState state) async {
// 'connected' ICE connection state represents when a candidate pair has
// been nominated, and a connection has been established. ICE candidate
// gathering may still be happening, but the connection should be usable.
if (state == RTCIceConnectionState.RTCIceConnectionStateConnected) {
_logger.d('STATS: WebRTC connection made in ${webrtcDialSW.elapsed}');
}
// 'completed' ICE connection state represents when the ICE agent has
// finished gathering all candidates.
if (state == RTCIceConnectionState.RTCIceConnectionStateCompleted) {
iceConnectionCompleted = true;
// If all update calls have finished, report stats now. Otherwise, rely
// on `onIceCandidate` callback below to report them.
if (updateCalls == updateCallsFinished) {
await _logConnectionStats(webrtcDialSW, peerConnection, updateCalls, totalCallUpdateDuration, maxCallUpdateDuration);
}
}
};

// If trickleICE is enabled, set onIceCandidate handler
if (!(options.webRtcOptions?.disableTrickleIce ?? config.disableTrickle)) {
final offer = await peerConnection.createOffer({});
Expand All @@ -272,7 +361,24 @@ Future<ClientChannelBase> _dialWebRtc(String address, DialOptions options, Strin
if (uuid != null) {
callUpdateRequest.uuid = uuid!;
}
final Stopwatch stopwatch = Stopwatch()..start();
final currUpdateCall = updateCalls++;
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
_logger.d('STATS: making call update $currUpdateCall to the signaling server');
await signalingClient.callUpdate(callUpdateRequest);
stopwatch.stop();
_logger.d('STATS: call update $currUpdateCall took ${stopwatch.elapsed}');
final callUpdateDuration = stopwatch.elapsed;
if (callUpdateDuration.compareTo(maxCallUpdateDuration) > 0) {
maxCallUpdateDuration = callUpdateDuration;
}
totalCallUpdateDuration += callUpdateDuration;
updateCallsFinished++;

// If ICE connection state has reached 'completed' and we have finished
// all tracked updateCalls, report stats.
if (iceConnectionCompleted && updateCalls == updateCallsFinished) {
await _logConnectionStats(webrtcDialSW, peerConnection, updateCalls, totalCallUpdateDuration, maxCallUpdateDuration);
}
} catch (error, st) {
_logger.e('Update ICECandidate error', error: error, stackTrace: st);
}
Expand Down Expand Up @@ -387,10 +493,13 @@ String _encodeSDPJsonStringToBase64String(String sdp) {
}

Future<AuthenticatedChannel> _authenticatedChannel(String address, DialOptions options, String Function() sessionsCallback) async {
final authSW = Stopwatch()..start();
String accessToken = options.accessToken ?? '';
if (accessToken.isNotEmpty && options.externalAuthAddress.isNullOrEmpty && options.externalAuthToEntity.isNullOrEmpty) {
_logger.d('Received pre-authenticated access token');
final addr = _hostAndPort(address, options.insecure);
authSW.stop();
_logger.d('STATS: authentication (pre-authenticated) took ${authSW.elapsed}');
return AuthenticatedChannel(addr.host, addr.port, accessToken, options.insecure, sessionsCallback);
}

Expand Down Expand Up @@ -439,6 +548,8 @@ Future<AuthenticatedChannel> _authenticatedChannel(String address, DialOptions o
}

final actual = _hostAndPort(address, options.insecure);
authSW.stop();
_logger.d('STATS: authentication took ${authSW.elapsed}');
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
return AuthenticatedChannel(actual.host, actual.port, accessToken, options.insecure, sessionsCallback);
}

Expand Down
Loading