Skip to content

Commit

Permalink
better logging
Browse files Browse the repository at this point in the history
Signed-off-by: Clemens Vasters <clemens@vasters.com>
  • Loading branch information
clemensv committed Jan 24, 2025
1 parent 97f3439 commit 3fc00e2
Showing 1 changed file with 86 additions and 92 deletions.
178 changes: 86 additions & 92 deletions mode-s/mode_s_kafka_bridge/mode_s.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
from collections import deque
from confluent_kafka import Producer

if sys.gettrace() is not None:
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
else:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)
logger.handlers = []
logger.setLevel(logging.DEBUG if sys.gettrace() else logging.INFO)

stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(stream_handler)
logger.propagate = False


class ADSBClient(TcpClient):
Expand Down Expand Up @@ -62,75 +64,78 @@ def handle_messages(self, messages):
return
msgs = []
for msg, ts in messages:
raw_msg = bytes.fromhex(msg)
if len(raw_msg) < 7:
continue
ts_ms = int(ts * 1000)
df = pms.df(msg)
icao = pms.icao(msg)
dbfs_rssi = None
raw_rssi = raw_msg[6]
if raw_rssi > 0:
rssi_ratio = raw_rssi / 255
signal_level = rssi_ratio ** 2
dbfs_rssi = round(10 * math.log10(signal_level), 2)

record = ModeS_ADSB_Record(
ts=ts_ms, icao=icao, df=df, rssi=dbfs_rssi, tc=None, bcode=None, alt=None, cs=None, sq=None, lat=None, lon=None,
spd=None, ang=None, vr=None, spd_type=None, dir_src=None, vr_src=None, ws=None, wd=None, at=None, ap=None, hm=None,
roll=None, trak=None, gs=None, tas=None, hd=None, ias=None, m=None, vrb=None, vri=None, emst=None, tgt=None, opst=None
)
if df in (17, 18):
if pms.crc(msg) != 0:
try:
raw_msg = bytes.fromhex(msg)
if len(raw_msg) < 7:
logger.warning("Invalid message (too short): %s", msg)
continue
tc = pms.typecode(msg)
record.tc = tc
if 1 <= tc <= 4:
record.cs = pms.adsb.callsign(msg)
elif 5 <= tc <= 8:
lat, lon = pms.adsb.surface_position_with_ref(msg, self.ref_lat, self.ref_lon)
record.lat, record.lon = lat, lon
elif 9 <= tc <= 18:
record.alt = pms.adsb.altitude(msg)
lat, lon = pms.adsb.airborne_position_with_ref(msg, self.ref_lat, self.ref_lon)
record.lat, record.lon = lat, lon
elif tc == 19:
speed, angle, vr, spd_type, *extras = pms.adsb.velocity(msg)
record.spd, record.ang, record.vr = speed, angle, vr
record.spd_type = spd_type
if len(extras) > 0:
record.dir_src = extras[0]
if len(extras) > 1:
record.vr_src = extras[1]
elif 20 <= tc <= 22:
record.alt = pms.adsb.altitude(msg)
lat, lon = pms.adsb.airborne_position_with_ref(msg, self.ref_lat, self.ref_lon)
record.lat, record.lon = lat, lon
elif df in (20, 21):
bds = pms.bds.infer(msg, mrar=True)
record.bcode = bds if bds else None
if df == 20:
record.alt = pms.common.altcode(msg)
if df == 21:
record.sq = str(pms.common.idcode(msg))
if bds == "BDS44":
ws, wd = pms.commb.wind44(msg)
record.ws, record.wd = ws, wd
record.at = pms.commb.temp44(msg)
record.ap = pms.commb.p44(msg)
record.hm = pms.commb.hum44(msg)
elif bds == "BDS50":
record.roll = pms.commb.roll50(msg)
record.trak = pms.commb.trk50(msg)
record.gs = pms.commb.gs50(msg)
record.tas = pms.commb.tas50(msg)
elif bds == "BDS60":
record.hd = pms.commb.hdg60(msg)
record.ias = pms.commb.ias60(msg)
record.m = pms.commb.mach60(msg)
record.vrb = pms.commb.vr60baro(msg)
record.vri = pms.commb.vr60ins(msg)
msgs.append(record)

ts_ms = int(ts * 1000)
df = pms.df(msg)
icao = pms.icao(msg)
dbfs_rssi = None
raw_rssi = raw_msg[6]
if raw_rssi > 0:
rssi_ratio = raw_rssi / 255
signal_level = rssi_ratio ** 2
dbfs_rssi = round(10 * math.log10(signal_level), 2)

record = ModeS_ADSB_Record(
ts=ts_ms, icao=icao, df=df, rssi=dbfs_rssi, tc=None, bcode=None, alt=None, cs=None, sq=None, lat=None, lon=None,
spd=None, ang=None, vr=None, spd_type=None, dir_src=None, vr_src=None, ws=None, wd=None, at=None, ap=None, hm=None,
roll=None, trak=None, gs=None, tas=None, hd=None, ias=None, m=None, vrb=None, vri=None, emst=None, tgt=None, opst=None
)
if df in (17, 18):
tc = pms.typecode(msg)
record.tc = tc
if 1 <= tc <= 4:
record.cs = pms.adsb.callsign(msg)
elif 5 <= tc <= 8:
lat, lon = pms.adsb.surface_position_with_ref(msg, self.ref_lat, self.ref_lon)
record.lat, record.lon = lat, lon
elif 9 <= tc <= 18:
record.alt = pms.adsb.altitude(msg)
lat, lon = pms.adsb.airborne_position_with_ref(msg, self.ref_lat, self.ref_lon)
record.lat, record.lon = lat, lon
elif tc == 19:
speed, angle, vr, spd_type, *extras = pms.adsb.velocity(msg)
record.spd, record.ang, record.vr = speed, angle, vr
record.spd_type = spd_type
if len(extras) > 0:
record.dir_src = extras[0]
if len(extras) > 1:
record.vr_src = extras[1]
elif 20 <= tc <= 22:
record.alt = pms.adsb.altitude(msg)
lat, lon = pms.adsb.airborne_position_with_ref(msg, self.ref_lat, self.ref_lon)
record.lat, record.lon = lat, lon
elif df in (20, 21):
bds = pms.bds.infer(msg, mrar=True)
record.bcode = bds if bds else None
if df == 20:
record.alt = pms.common.altcode(msg)
if df == 21:
record.sq = str(pms.common.idcode(msg))
if bds == "BDS44":
ws, wd = pms.commb.wind44(msg)
record.ws, record.wd = ws, wd
record.at = pms.commb.temp44(msg)
record.ap = pms.commb.p44(msg)
record.hm = pms.commb.hum44(msg)
elif bds == "BDS50":
record.roll = pms.commb.roll50(msg)
record.trak = pms.commb.trk50(msg)
record.gs = pms.commb.gs50(msg)
record.tas = pms.commb.tas50(msg)
elif bds == "BDS60":
record.hd = pms.commb.hdg60(msg)
record.ias = pms.commb.ias60(msg)
record.m = pms.commb.mach60(msg)
record.vrb = pms.commb.vr60baro(msg)
record.vri = pms.commb.vr60ins(msg)
msgs.append(record)
except Exception as e:
logger.error("Invalid message: %s, error: %s", msg, e)

if len(msgs) > 0:
bundle = Messages(messages=msgs)
Expand Down Expand Up @@ -164,30 +169,30 @@ async def queue_consumer(self, stop_event: threading.Event):
flush_producer=False
)
if (datetime.now() - last_flush) > timedelta(seconds=1) or self.records_since_last_flush >= 1000:
if last_info_log < datetime.now() - timedelta(minutes=5):
logging.info("Messages %d, records %d, queue length is %d", messages_since_last_log, records_since_last_log, len(self.task_queue))
if last_info_log < datetime.now() - timedelta(seconds=5*60):
logger.info("Messages %d, records %d, queue length is %d", messages_since_last_log, records_since_last_log, len(self.task_queue))
last_info_log = datetime.now()
messages_since_last_log = 0
records_since_last_log = 0
else:
logging.debug("Flushing producer, messages %d, records %d, queue length is %d", self.messages_since_last_flush, self.records_since_last_flush, len(self.task_queue))
logger.debug("Flushing producer, messages %d, records %d, queue length is %d", self.messages_since_last_flush, self.records_since_last_flush, len(self.task_queue))
self.producer.producer.flush()
self.messages_since_last_flush = 0
self.records_since_last_flush = 0
last_flush = datetime.now()
except asyncio.CancelledError:
logging.info("Queue consumer task cancelled")
logger.info("Queue consumer task cancelled")
return
except Exception as e:
logging.error("Error sending messages: %s", e)
logger.error("Error sending messages: %s", e)
else:
await asyncio.sleep(0.05)
except asyncio.CancelledError:
logging.info("Queue consumer task cancelled")
logger.info("Queue consumer task cancelled")
return
except Exception as e:
logging.error("Queue consumer task error: %s", e)
raise e
logger.error("Queue consumer task error: %s", e)
raise e


def parse_connection_string(connection_string: str) -> Dict[str, str]:
Expand Down Expand Up @@ -252,19 +257,15 @@ async def run():
if args.subcommand == 'feed':
# Host/port/pos checks
if not args.host:
logging.error("Missing required parameter: host")
print("Error: Dump1090 host is required (env: DUMP1090_HOST or --host)")
return
if not args.port:
logging.error("Missing required parameter: port")
print("Error: Dump1090 port is required (env: DUMP1090_PORT or --port)")
return
if args.ref_lat is None:
logging.error("Missing required parameter: ref_lat")
print("Error: Antenna latitude is required (env: REF_LAT or --ref-lat)")
return
if args.ref_lon is None:
logging.error("Missing required parameter: ref_lon")
print("Error: Antenna longitude is required (env: REF_LON or --ref-lon)")
return

Expand All @@ -282,7 +283,6 @@ async def run():
sasl_username = config_params.get('sasl.username')
sasl_password = config_params.get('sasl.password')
except ValueError as e:
logging.error("Invalid connection string: %s", e)
print("Error: Invalid connection string format.")
return
else:
Expand All @@ -292,15 +292,12 @@ async def run():
sasl_password = args.sasl_password

if not kafka_bootstrap_servers:
logging.error("Missing required parameter: kafka_bootstrap_servers")
print("Error: No Kafka bootstrap servers found.")
return
if not kafka_topic:
logging.error("Missing required parameter: kafka_topic")
print("Error: No Kafka topic found.")
return
if not sasl_username or not sasl_password:
logging.error("Missing required SASL credentials")
print("Error: SASL username and password are required.")
return

Expand All @@ -315,7 +312,6 @@ async def run():
}
kafka_producer = Producer(kafka_config)
except Exception as producer_err:
logging.error("Failed to create Kafka producer: %s", producer_err)
print("Error: Could not create Kafka producer.")
return

Expand Down Expand Up @@ -347,9 +343,7 @@ def signal_handler(signum, frame):

except KeyboardInterrupt:
print("Interrupted")
logging.info("Application interrupted by user.")
except Exception as e:
logging.error("Unhandled startup error: %s", e)
print(f"Error: {e}")

def main():
Expand Down

0 comments on commit 3fc00e2

Please sign in to comment.