Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
Introduce CAN reader hierarchy
Browse files Browse the repository at this point in the history
Increase cohesion and reduce coupling between components by introducing
an abstract CANReader class which defines the responsibilities and
behavior of the DBCReader and J1939Reader.

Modified readers to consistently use HW based CAN frame filtering based
on DBC message and DBC<->VSS mapping definitions.

Also added typing information to functions and member variables to
improve readability and maintainability.
  • Loading branch information
sophokles73 authored and erikbosch committed Sep 19, 2023
1 parent 9d718b9 commit b53717c
Show file tree
Hide file tree
Showing 12 changed files with 439 additions and 378 deletions.
126 changes: 55 additions & 71 deletions dbc2val/dbcfeeder.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
import asyncio

from signal import SIGINT, SIGTERM, signal
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

from dbcfeederlib import canclient
from dbcfeederlib import canplayer
from dbcfeederlib.canclient import CANClient
from dbcfeederlib.canreader import CanReader
from dbcfeederlib import dbc2vssmapper
from dbcfeederlib import dbcreader
from dbcfeederlib import j1939reader
Expand Down Expand Up @@ -125,31 +125,30 @@ class Feeder:
Start listening to the queue and transform CAN messages to VSS data and if conditions
are fulfilled send them to the client wrapper which in turn send it to the bckend supported by the wrapper.
"""
def __init__(self, client_wrapper: clientwrapper.ClientWrapper,
def __init__(self, kuksa_client: clientwrapper.ClientWrapper,
elmcan_config: Dict[str, Any], dbc2val: bool = True, val2dbc: bool = False):
self._running = False
self._reader = None
self._player: Optional[canplayer.CANplayer] = None
self._mapper = None
self._registered = False
self._can_queue: queue.Queue[dbc2vssmapper.VSSObservation] = queue.Queue()
self._client_wrapper = client_wrapper
self._running: bool = False
self._reader: Optional[CanReader] = None
self._mapper: Optional[dbc2vssmapper.Mapper] = None
self._registered: bool = False
self._dbc2vss_queue: queue.Queue[dbc2vssmapper.VSSObservation] = queue.Queue()
self._kuksa_client = kuksa_client
self._elmcan_config = elmcan_config
self._disconnect_time = 0.0
self._dbc2val = dbc2val
self._val2dbc = val2dbc
self._canclient = None
self._transmit = False
self._dbc2val_enabled = dbc2val
self._val2dbc_enabled = val2dbc
self._canclient: Optional[CANClient] = None
self._transmit: bool = False

def start(
self,
canport,
dbc_file_names,
mappingfile,
dbc_default_file,
candumpfile,
use_j1939=False,
use_strict_parsing=False
canport: str,
dbc_file_names: List[str],
mappingfile: str,
dbc_default_file: Optional[str],
candumpfile: Optional[str],
use_j1939: bool = False,
use_strict_parsing: bool = False
):

self._running = True
Expand All @@ -160,59 +159,44 @@ def start(
expect_extended_frame_ids=use_j1939,
can_signal_default_values_file=dbc_default_file)

self._client_wrapper.start()
self._kuksa_client.start()
threads = []

if self._dbc2val and self._mapper.has_dbc2vss_mapping():
if self._dbc2val_enabled and self._mapper.has_dbc2vss_mapping():

log.info("Setting up reception of CAN signals")
if use_j1939:
log.info("Using J1939 reader")
self._reader = j1939reader.J1939Reader(self._can_queue, self._mapper)
self._reader = j1939reader.J1939Reader(self._dbc2vss_queue, self._mapper, canport, candumpfile)
else:
log.info("Using DBC reader")
self._reader = dbcreader.DBCReader(self._can_queue, self._mapper)

if candumpfile:
# use dumpfile
log.info(
"Using virtual bus to replay CAN messages (channel: %s) (dumpfile: %s)",
canport,
candumpfile
)
self._reader.start_listening(
bustype="virtual",
channel=canport,
bitrate=500000
)
self._player = canplayer.CANplayer(dumpfile=candumpfile)
self._player.start_replaying(canport=canport)
else:

if canport == 'elmcan':
self._reader = dbcreader.DBCReader(self._dbc2vss_queue, self._mapper, canport, candumpfile)

log.info("Using elmcan. Trying to set up elm2can bridge")
elm2canbridge.elm2canbridge(canport, self._elmcan_config, self._reader._canidwl)
if canport == 'elmcan':
log.info("Using elmcan. Trying to set up elm2can bridge")
whitelisted_frame_ids: List[int] = []
for filter in self._mapper.can_frame_id_whitelist():
whitelisted_frame_ids.append(filter.can_id) # type: ignore
elm2canbridge.elm2canbridge(canport, self._elmcan_config, whitelisted_frame_ids)

# use socketCAN
log.info("Using socket CAN device '%s'", canport)
self._reader.start_listening(bustype="socketcan", channel=canport)
self._reader.start()

receiver = threading.Thread(target=self._run_receiver)
receiver.start()
threads.append(receiver)
else:
log.info("No dbc2val mappings found or dbc2val disabled!")

if self._val2dbc and self._mapper.has_vss2dbc_mapping():
if not self._client_wrapper.supports_subscription():
if self._val2dbc_enabled and self._mapper.has_vss2dbc_mapping():
if not self._kuksa_client.supports_subscription():
log.error("Subscribing to VSS signals not supported by chosen client!")
self.stop()
else:
log.info("Starting transmit thread, using %s", canport)
# For now creating another bus
# Maybe support different buses for downstream/upstream in the future

self._canclient = canclient.CANClient(bustype="socketcan", channel=canport)
self._canclient = CANClient(interface="socketcan", channel=canport)

transmitter = threading.Thread(target=self._run_transmitter)
transmitter.start()
Expand All @@ -229,16 +213,12 @@ def stop(self):
# Tell others to stop
if self._reader is not None:
self._reader.stop()
if self._player is not None:
self._player.stop()
self._client_wrapper.stop()
self._kuksa_client.stop()
if self._canclient:
self._canclient.stop()
self._canclient = None
self._mapper = None
self._transmit = False

def is_running(self):
def is_running(self) -> bool:
return self._running

def _register_datapoints(self) -> bool:
Expand All @@ -254,7 +234,7 @@ def _register_datapoints(self) -> bool:
all_registered = True
for vss_name in self._mapper.get_vss_names():
log.debug("Checking if signal %s is registered", vss_name)
resp = self._client_wrapper.is_signal_defined(vss_name)
resp = self._kuksa_client.is_signal_defined(vss_name)
if not resp:
all_registered = False
return all_registered
Expand All @@ -265,7 +245,7 @@ def _run_receiver(self):
last_sent_log_entry = 0
queue_max_size = 0
while self._running is True:
if self._client_wrapper.is_connected():
if self._kuksa_client.is_connected():
self._disconnect_time = 0.0
else:
# As we actually cannot register
Expand All @@ -287,36 +267,40 @@ def _run_receiver(self):
if not processing_started:
processing_started = True
log.info("Starting to process CAN signals")
queue_size = self._can_queue.qsize()
queue_size = self._dbc2vss_queue.qsize()
if queue_size > queue_max_size:
queue_max_size = queue_size
vss_observation = self._can_queue.get(timeout=1)
vss_observation = self._dbc2vss_queue.get(timeout=1)
vss_mapping = self._mapper.get_dbc2vss_mapping(vss_observation.dbc_name, vss_observation.vss_name)
value = vss_mapping.transform_value(vss_observation.raw_value)
if value is None:
log.warning(f"Value ignored for dbc {vss_observation.dbc_name} to VSS {vss_observation.vss_name},"
f" from raw value {value} of type {type(value)}")
log.warning(
"Value ignored for dbc %s to VSS %s, from raw value %s of type %s",
vss_observation.dbc_name, vss_observation.vss_name, value, type(value)
)
elif not vss_mapping.change_condition_fulfilled(value):
log.debug(f"Value condition not fulfilled for VSS {vss_observation.vss_name}, value {value}")
log.debug("Value condition not fulfilled for VSS %s, value %s", vss_observation.vss_name, value)
else:
# get values out of the canreplay and map to desired signals
# update current value in KUKSA.val
target = vss_observation.vss_name

success = self._client_wrapper.update_datapoint(target, value)
success = self._kuksa_client.update_datapoint(target, value)
if success:
log.debug("Succeeded sending DataPoint(%s, %s, %f)", target, value, vss_observation.time)
# Give status message after 1, 2, 4, 8, 16, 32, 64, .... messages have been sent
messages_sent += 1
if messages_sent >= (2 * last_sent_log_entry):
log.info(f"Number of VSS messages sent so far: {messages_sent}, "
f"queue max size: {queue_max_size}")
log.info(
"Number of VSS messages sent so far: %d, queue max size: %d",
messages_sent, queue_max_size
)
last_sent_log_entry = messages_sent
except queue.Empty:
pass
except Exception:
log.error("Exception caught in main loop", exc_info=True)

async def vss_update(self, updates):
async def _vss_update(self, updates):
log.debug("vss-Update callback!")
dbc_ids = set()
for update in updates:
Expand Down Expand Up @@ -353,7 +337,7 @@ async def _run_subscribe(self):
Requests the client wrapper to start subscription.
Checks every second if we have requested to stop reception and if so exits
"""
asyncio.create_task(self._client_wrapper.subscribe(self._mapper.get_vss2dbc_entries(), self.vss_update))
asyncio.create_task(self._kuksa_client.subscribe(self._mapper.get_vss2dbc_entries(), self._vss_update))
while self._transmit:
await asyncio.sleep(1)

Expand Down
24 changes: 12 additions & 12 deletions dbc2val/dbcfeederlib/canclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,24 @@ class CANClient:
"""
Wrapper class to hide dependency to CAN package.
Reason is to make it simple to replace the CAN package dependency with something else if your KUKSA.val
integration cannot interact directly with CAN, but rather interacts with some custom CAN solution/middleware
integration cannot interact directly with CAN, but rather interacts with some custom CAN solution/middleware.
"""

def __init__(self, *args, **kwargs):
self.bus = can.interface.Bus(*args, **kwargs) # pylint: disable=abstract-class-instantiated
# pylint: disable=abstract-class-instantiated
self._bus = can.interface.Bus(*args, **kwargs)

def stop(self):
if self.bus:
self.bus.shutdown()
self.bus = None
"""Shut down CAN bus."""
self._bus.shutdown()

def recv(self, timeout: int = 1) -> Optional[canmessage.CANMessage]:
"""Wait for message from CAN"""
"""Receive message from CAN bus."""
try:
msg = self.bus.recv(timeout)
msg = self._bus.recv(timeout)
except can.CanError:
msg = None
if self.bus:
if self._bus:
log.error("Error while waiting for recv from CAN", exc_info=True)
else:
# This is expected if we are shutting down
Expand All @@ -52,11 +52,11 @@ def recv(self, timeout: int = 1) -> Optional[canmessage.CANMessage]:
return None

def send(self, arbitration_id, data):
"""Send message to CAN"""
"""Write message to CAN bus."""
msg = can.Message(arbitration_id=arbitration_id, data=data)
try:
self.bus.send(msg)
log.debug(f"Message sent on {self.bus.channel_info}")
log.debug(f"Message: {msg}")
self._bus.send(msg)
if log.isEnabledFor(logging.DEBUG):
log.debug("Sent message [channel: %s]: %s", self._bus.channel_info, msg)
except can.CanError:
log.error("Failed to send message via CAN bus")
58 changes: 29 additions & 29 deletions dbc2val/dbcfeederlib/canplayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import threading

import can # type: ignore
from can.interfaces.virtual import VirtualBus # type: ignore
from can.interfaces.virtual import VirtualBus

log = logging.getLogger(__name__)

Expand All @@ -36,46 +36,46 @@ class CANplayer:
files suffix is one of the above (e.g. filename.asc.gz).
"""

def __init__(self, dumpfile):
self.run = False
self.messages = [can.message]
self.dumpfile = dumpfile

def process_log(self):
def __init__(self, dumpfile: str, can_port: str):
self._running = False
# open the file for reading can messages
log.info("Replaying CAN message log {}".format(self.dumpfile))
log.info("Replaying CAN messages from log file %s", dumpfile)
self._messages = can.LogReader(dumpfile)
self._can_port = can_port
log.debug("Using virtual bus to replay CAN messages (channel: %s)", self._can_port)
self._bus = VirtualBus(channel=can_port, bitrate=500000)

def _process_log(self):
# using MessageSync in order to consider timestamps of CAN messages
# and the delays between them
log_reader = can.MessageSync(messages=can.LogReader(self.dumpfile), timestamps=True)
log_reader = can.MessageSync(messages=self._messages, timestamps=True)
for msg in log_reader:
if not self.run:
break
if not self._running:
return
try:
self.bus.send(msg)
log.debug(f"Message sent on {self.bus.channel_info}")
log.debug(f"Message: {msg}")
self._bus.send(msg)
if log.isEnabledFor(logging.DEBUG):
log.debug("Sent message [channel: %s]: %s", self._bus.channel_info, msg)
except can.CanError:
log.error("Failed to send message via CAN bus")

log.info("Replayed all messages from CAN log file")

def txWorker(self):
log.info("Starting Tx thread")
def _tx_worker(self):
log.info("Starting to write CAN messages to bus")

while self.run:
self.process_log()
while self._running:
self._process_log()

log.info("Stopped Tx thread")
log.info("Stopped writing CAN messages to bus")

def start_replaying(self, canport):
log.debug("Using virtual bus to replay CAN messages (channel: %s)", canport)
self.bus = VirtualBus(channel=canport, bitrate=500000)
self.run = True
txThread = threading.Thread(target=self.txWorker)
txThread.start()
def start(self):
self._running = True
tx_thread = threading.Thread(target=self._tx_worker)
tx_thread.start()

def stop(self):
self.run = False
if self.bus:
self.bus.shutdown()
self.bus = None
self._running = False
if self._bus:
self._bus.shutdown()
self._bus = None
Loading

0 comments on commit b53717c

Please sign in to comment.