Skip to content

Commit

Permalink
exception handling
Browse files Browse the repository at this point in the history
Signed-off-by: Clemens Vasters <clemens@vasters.com>
  • Loading branch information
clemensv committed Jan 23, 2025
1 parent ac4721f commit 7c9f96a
Showing 1 changed file with 83 additions and 78 deletions.
161 changes: 83 additions & 78 deletions mode-s/mode_s_kafka_bridge/mode_s.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import aiohttp
import re
import logging
from datetime import datetime, timedelta, timezone
from datetime import datetime, time, timedelta, timezone
from typing import Dict, List, AsyncIterator, Any
from zoneinfo import ZoneInfo
import argparse
Expand Down Expand Up @@ -49,85 +49,90 @@ def stop(self):
return super().stop()

def handle_messages(self, messages):
if self.stop_flag:
return
msgs = []
for msg, ts in messages:
raw_msg = bytes.fromhex(msg)
if len(raw_msg) < 7:
continue
ts_ms = int(ts * 1000)
df = pms.df(msg)
icao = pms.icao(msg)
dbfs_rssi = None
raw_rssi = raw_msg[6]
if raw_rssi > 0:
rssi_ratio = raw_rssi / 255
signal_level = rssi_ratio ** 2
dbfs_rssi = round(10 * math.log10(signal_level), 2)

record = ModeS_ADSB_Record(
ts=ts_ms, icao=icao, df=df, rssi=dbfs_rssi, tc=None, bcode=None, alt=None, cs=None, sq=None, lat=None, lon=None,
spd=None, ang=None, vr=None, spd_type=None, dir_src=None, vr_src=None, ws=None, wd=None, at=None, ap=None, hm=None,
roll=None, trak=None, gs=None, tas=None, hd=None, ias=None, m=None, vrb=None, vri=None, emst=None, tgt=None, opst=None
)
if df in (17, 18):
if pms.crc(msg) != 0:
try:
if self.stop_flag:
return
msgs = []
for msg, ts in messages:
raw_msg = bytes.fromhex(msg)
if len(raw_msg) < 7:
continue
tc = pms.typecode(msg)
record.tc = tc
if 1 <= tc <= 4:
record.cs = pms.adsb.callsign(msg)
elif 5 <= tc <= 8:
lat, lon = pms.adsb.surface_position_with_ref(msg, self.ref_lat, self.ref_lon)
record.lat, record.lon = lat, lon
elif 9 <= tc <= 18:
record.alt = pms.adsb.altitude(msg)
lat, lon = pms.adsb.airborne_position_with_ref(msg, self.ref_lat, self.ref_lon)
record.lat, record.lon = lat, lon
elif tc == 19:
speed, angle, vr, spd_type, *extras = pms.adsb.velocity(msg)
record.spd, record.ang, record.vr = speed, angle, vr
record.spd_type = spd_type
if len(extras) > 0:
record.dir_src = extras[0]
if len(extras) > 1:
record.vr_src = extras[1]
elif 20 <= tc <= 22:
record.alt = pms.adsb.altitude(msg)
lat, lon = pms.adsb.airborne_position_with_ref(msg, self.ref_lat, self.ref_lon)
record.lat, record.lon = lat, lon
elif df in (20, 21):
bds = pms.bds.infer(msg, mrar=True)
record.bcode = bds if bds else None
if df == 20:
record.alt = pms.common.altcode(msg)
if df == 21:
record.sq = str(pms.common.idcode(msg))
if bds == "BDS44":
ws, wd = pms.commb.wind44(msg)
record.ws, record.wd = ws, wd
record.at = pms.commb.temp44(msg)
record.ap = pms.commb.p44(msg)
record.hm = pms.commb.hum44(msg)
elif bds == "BDS50":
record.roll = pms.commb.roll50(msg)
record.trak = pms.commb.trk50(msg)
record.gs = pms.commb.gs50(msg)
record.tas = pms.commb.tas50(msg)
elif bds == "BDS60":
record.hd = pms.commb.hdg60(msg)
record.ias = pms.commb.ias60(msg)
record.m = pms.commb.mach60(msg)
record.vrb = pms.commb.vr60baro(msg)
record.vri = pms.commb.vr60ins(msg)
msgs.append(record)
ts_ms = int(ts * 1000)
df = pms.df(msg)
icao = pms.icao(msg)
dbfs_rssi = None
raw_rssi = raw_msg[6]
if raw_rssi > 0:
rssi_ratio = raw_rssi / 255
signal_level = rssi_ratio ** 2
dbfs_rssi = round(10 * math.log10(signal_level), 2)

record = ModeS_ADSB_Record(
ts=ts_ms, icao=icao, df=df, rssi=dbfs_rssi, tc=None, bcode=None, alt=None, cs=None, sq=None, lat=None, lon=None,
spd=None, ang=None, vr=None, spd_type=None, dir_src=None, vr_src=None, ws=None, wd=None, at=None, ap=None, hm=None,
roll=None, trak=None, gs=None, tas=None, hd=None, ias=None, m=None, vrb=None, vri=None, emst=None, tgt=None, opst=None
)
if df in (17, 18):
if pms.crc(msg) != 0:
continue
tc = pms.typecode(msg)
record.tc = tc
if 1 <= tc <= 4:
record.cs = pms.adsb.callsign(msg)
elif 5 <= tc <= 8:
lat, lon = pms.adsb.surface_position_with_ref(msg, self.ref_lat, self.ref_lon)
record.lat, record.lon = lat, lon
elif 9 <= tc <= 18:
record.alt = pms.adsb.altitude(msg)
lat, lon = pms.adsb.airborne_position_with_ref(msg, self.ref_lat, self.ref_lon)
record.lat, record.lon = lat, lon
elif tc == 19:
speed, angle, vr, spd_type, *extras = pms.adsb.velocity(msg)
record.spd, record.ang, record.vr = speed, angle, vr
record.spd_type = spd_type
if len(extras) > 0:
record.dir_src = extras[0]
if len(extras) > 1:
record.vr_src = extras[1]
elif 20 <= tc <= 22:
record.alt = pms.adsb.altitude(msg)
lat, lon = pms.adsb.airborne_position_with_ref(msg, self.ref_lat, self.ref_lon)
record.lat, record.lon = lat, lon
elif df in (20, 21):
bds = pms.bds.infer(msg, mrar=True)
record.bcode = bds if bds else None
if df == 20:
record.alt = pms.common.altcode(msg)
if df == 21:
record.sq = str(pms.common.idcode(msg))
if bds == "BDS44":
ws, wd = pms.commb.wind44(msg)
record.ws, record.wd = ws, wd
record.at = pms.commb.temp44(msg)
record.ap = pms.commb.p44(msg)
record.hm = pms.commb.hum44(msg)
elif bds == "BDS50":
record.roll = pms.commb.roll50(msg)
record.trak = pms.commb.trk50(msg)
record.gs = pms.commb.gs50(msg)
record.tas = pms.commb.tas50(msg)
elif bds == "BDS60":
record.hd = pms.commb.hdg60(msg)
record.ias = pms.commb.ias60(msg)
record.m = pms.commb.mach60(msg)
record.vrb = pms.commb.vr60baro(msg)
record.vri = pms.commb.vr60ins(msg)
msgs.append(record)

if len(msgs) > 0:
bundle = Messages(messages=msgs)
self.task_queue.append(bundle)
if len(self.task_queue) > 20:
logger.warning("Queue length is now %d", len(self.task_queue))
if len(msgs) > 0:
bundle = Messages(messages=msgs)
self.task_queue.append(bundle)
if len(self.task_queue) > 20:
logger.warning("Queue length is now %d", len(self.task_queue))
except Exception as e:
logger.error("Error handling messages: %s", e)
# we're going to observe a moment of silence and hope the problem goes away
time.sleep(0.1)


async def queue_consumer(self, stop_event: threading.Event):
Expand Down

0 comments on commit 7c9f96a

Please sign in to comment.