Skip to content

Commit

Permalink
fix more telemetry bugs, separate classes
Browse files Browse the repository at this point in the history
  • Loading branch information
paulo authored and paulo committed May 4, 2024
1 parent c1054d4 commit 6c080c7
Show file tree
Hide file tree
Showing 8 changed files with 316 additions and 294 deletions.
65 changes: 65 additions & 0 deletions lib/providers/telemetry/telemetry_latest_streamer.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import 'package:riverpod_annotation/riverpod_annotation.dart';

import '../../models/telemetry_state.dart';
import '../../models/timed_telemetry.dart';
import '../../repository/telemetry_repository.dart';
import '../radio_config/radio_config_service.dart';
import '../repository/telemetry_repository.dart';
import 'telemetry_receiver.dart';

part 'telemetry_latest_streamer.g.dart';

@Riverpod(keepAlive: true)
class TelemetryLatestStreamer extends _$TelemetryLatestStreamer {
late TelemetryRepository _telemetryRepository;
late int _myNodeNum;
late int _nodeNum;

@override
TelemetryState build({required int nodeNum}) {
final sub = ref
.watch(telemetryReceiverProvider)
.addTelemetryListener(nodeNum: nodeNum, listener: _processTelemetry);
_myNodeNum = ref
.watch(radioConfigServiceProvider.select((value) => value.myNodeNum));
_telemetryRepository = ref.watch(telemetryRepositoryProvider);
_nodeNum = nodeNum;
ref.onDispose(sub.cancel);
_loadFirstTelemetry();
return const TelemetryState();
}

void _processTelemetry(TimedTelemetry timedTelemetry) {
final telemetry = timedTelemetry.telemetry;
final environmentMetrics = telemetry.environmentMetrics;
if (environmentMetrics.hasTemperature()) {
state = state.copyWith(temp: telemetry.environmentMetrics.temperature);
}
if (environmentMetrics.hasRelativeHumidity()) {
state = state.copyWith(
relativeHumidity: telemetry.environmentMetrics.relativeHumidity,
);
}
if (environmentMetrics.hasBarometricPressure()) {
state = state.copyWith(
barometricPressure: telemetry.environmentMetrics.barometricPressure,
);
}
if (environmentMetrics.hasGasResistance()) {
state = state.copyWith(
gasResistance: telemetry.environmentMetrics.gasResistance,
);
}
}

Future<void> _loadFirstTelemetry() async {
final telemetry = await _telemetryRepository.getOne(
index: 0,
fromNode: _nodeNum,
owner: _myNodeNum,
);
if (telemetry != null) {
_processTelemetry(telemetry);
}
}
}
175 changes: 175 additions & 0 deletions lib/providers/telemetry/telemetry_latest_streamer.g.dart

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

128 changes: 1 addition & 127 deletions lib/providers/telemetry/telemetry_receiver.dart
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
import 'dart:async';

import 'package:logger/logger.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';

import '../../models/telemetry_state.dart';
import '../../models/timed_telemetry.dart';
import '../../protobufs/generated/meshtastic/mesh.pb.dart';
import '../../protobufs/generated/meshtastic/portnums.pb.dart';
import '../../protobufs/generated/meshtastic/telemetry.pb.dart';
import '../../repository/telemetry_repository.dart';
import '../../services/interfaces/radio_reader.dart';
import '../../services/telemetry/telemetry_receiver.dart';
import '../radio_config/radio_config_service.dart';
import '../radio_reader.dart';
import '../repository/telemetry_repository.dart';
Expand All @@ -25,120 +16,3 @@ TelemetryReceiver telemetryReceiver(TelemetryReceiverRef ref) {
onDispose: ref.onDispose,
);
}

class TelemetryReceiver {
TelemetryReceiver({
required int myNodeNum,
required TelemetryRepository telemetryRepository,
required RadioReader radioReader,
required void Function(void Function() cb) onDispose,
}) : _myNodeNum = myNodeNum,
_telemetryRepository = telemetryRepository,
_radioReader = radioReader,
_onDispose = onDispose {
final sub = _radioReader.onPacketReceived().listen(_processPacket);
_streamController = StreamController<MeshPacket>.broadcast();
_onDispose(sub.cancel);
_onDispose(_streamController.close);
}

final int _myNodeNum;
final TelemetryRepository _telemetryRepository;
final void Function(void Function() cb) _onDispose;
final RadioReader _radioReader;
late StreamController<MeshPacket> _streamController;
final _logger = Logger();

void _processPacket(FromRadio event) {
final packet = event.packet;
final decoded = packet.decoded;
if (decoded.portnum != PortNum.TELEMETRY_APP) {
return;
}
final telemetry = Telemetry.fromBuffer(decoded.payload);
if (!telemetry.hasEnvironmentMetrics()) {
// TODO: handle deviceMetrics
return;
}

_logger.i(telemetry.toString());
_streamController.add(packet);
_telemetryRepository.add(
timedTelemetry:
TimedTelemetry(timeReceived: DateTime.now(), telemetry: telemetry),
fromNode: packet.from,
owner: _myNodeNum,
);
}

StreamSubscription<MeshPacket> addTelemetryListener({
required int nodeNum,
required void Function(TimedTelemetry) listener,
}) {
return _streamController.stream
.where((event) => event.from == nodeNum)
.listen((event) {
listener(
TimedTelemetry(
timeReceived: DateTime.now(),
telemetry: Telemetry.fromBuffer(event.decoded.payload),
),
);
});
}
}

@Riverpod(keepAlive: true)
class TelemetryLatestStreamer extends _$TelemetryLatestStreamer {
late TelemetryRepository _telemetryRepository;
late int _myNodeNum;
late int _nodeNum;

@override
TelemetryState build({required int nodeNum}) {
final sub = ref
.watch(telemetryReceiverProvider)
.addTelemetryListener(nodeNum: nodeNum, listener: _processTelemetry);
_myNodeNum = ref
.watch(radioConfigServiceProvider.select((value) => value.myNodeNum));
_telemetryRepository = ref.watch(telemetryRepositoryProvider);
_nodeNum = nodeNum;
ref.onDispose(sub.cancel);
_loadFirstTelemetry();
return const TelemetryState();
}

void _processTelemetry(TimedTelemetry timedTelemetry) {
final telemetry = timedTelemetry.telemetry;
final environmentMetrics = telemetry.environmentMetrics;
if (environmentMetrics.hasTemperature()) {
state = state.copyWith(temp: telemetry.environmentMetrics.temperature);
}
if (environmentMetrics.hasRelativeHumidity()) {
state = state.copyWith(
relativeHumidity: telemetry.environmentMetrics.relativeHumidity,
);
}
if (environmentMetrics.hasBarometricPressure()) {
state = state.copyWith(
barometricPressure: telemetry.environmentMetrics.barometricPressure,
);
}
if (environmentMetrics.hasGasResistance()) {
state = state.copyWith(
gasResistance: telemetry.environmentMetrics.gasResistance,
);
}
}

Future<void> _loadFirstTelemetry() async {
final telemetry = await _telemetryRepository.getOne(
index: 0,
fromNode: _nodeNum,
owner: _myNodeNum,
);
if (telemetry != null) {
_processTelemetry(telemetry);
}
}
}
Loading

0 comments on commit 6c080c7

Please sign in to comment.