Skip to content

Commit

Permalink
updating stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
RCMast3r committed Feb 17, 2024
1 parent 6f56aa7 commit 02c47ac
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 17 deletions.
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
proto_gen_pkg
ht_can_pkg
cmake
can-utils
];
# Setting up the environment variables you need during
# development.
Expand Down
6 changes: 5 additions & 1 deletion py_data_acq/py_data_acq/common/protobuf_helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from hytech_np_proto_py import hytech_pb2
import google.protobuf.message_factory
from cantools.database import *


def get_msg_names_and_classes():
Expand All @@ -23,5 +24,8 @@ def pack_protobuf_msg(cantools_dict: dict, msg_name: str, message_classes):
if msg_name in message_classes:
pb_msg = message_classes[msg_name]()
for key in cantools_dict.keys():
setattr(pb_msg, key, cantools_dict[key])
if(type(cantools_dict[key]) is namedsignalvalue.NamedSignalValue):
setattr(pb_msg, key, cantools_dict[key].value)
else:
setattr(pb_msg, key, cantools_dict[key])
return pb_msg
56 changes: 42 additions & 14 deletions py_data_acq/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,51 @@
can_methods = {
"debug": [UdpMulticastBus.DEFAULT_GROUP_IPv6, 'udp_multicast'],
"local_can_usb_KV": [0, 'kvaser'],
"local_debug": ["vcan0", 'socketcan']
}

async def continuous_can_receiver(can_msg_decoder: cantools.db.Database, message_classes, queue, q2, config):
with can.Bus(
channel=config[0], interface=config[1]
) as bus:
reader = can.AsyncBufferedReader()
listeners: List[MessageRecipient] = [
reader # AsyncBufferedReader() listener
]
loop = asyncio.get_running_loop()
notifier = can.Notifier(bus, listeners, loop=loop)
while True:
msg = await reader.get_message()
async def continuous_can_receiver(can_msg_decoder: cantools.db.Database, message_classes, queue, q2, can_bus):
loop = asyncio.get_event_loop()
reader = can.AsyncBufferedReader()
notifier = can.Notifier(can_bus, [reader], loop=loop)

while True:
# Wait for the next message from the buffer
msg = await reader.get_message()
try:

decoded_msg = can_msg_decoder.decode_message(msg.arbitration_id, msg.data, decode_containers=True)
msg = can_msg_decoder.get_message_by_frame_id(msg.arbitration_id)
msg = pb_helpers.pack_protobuf_msg(decoded_msg, msg.name.lower(), message_classes)

data = QueueData(msg.DESCRIPTOR.name, msg)
# await asyncio.sleep(1)
await queue.put(data)
await q2.put(data)
except:
pass

# Don't forget to stop the notifier to clean up resources.
notifier.stop()


# with can.Bus(
# interface='socketcan', channel='vcan0', receive_own_messages=True
# ) as bus:
# # bus = can.Bus(interface='socketcan', channel='vcan0', receive_own_messages=True)
# reader = can.AsyncBufferedReader()
# loop = asyncio.get_running_loop()
# notifier = can.Notifier(bus, [reader], loop=loop)
# while True:
# msg = await reader.get_message()
# decoded_msg = can_msg_decoder.decode_message(msg.arbitration_id, msg.data, decode_containers=True)
# msg = can_msg_decoder.get_message_by_frame_id(msg.arbitration_id)
# msg = pb_helpers.pack_protobuf_msg(decoded_msg, msg.name.lower(), message_classes)
# print("received new messgae")
# data = QueueData(msg.DESCRIPTOR.name, msg)
# await queue.put(data)
# await q2.put(data)

# notifier.stop()

async def write_data_to_mcap(queue, mcap_writer):
async with mcap_writer as mcw:
Expand All @@ -62,6 +86,7 @@ async def run(logger):

# for example, we will have CAN as our only input as of right now but we may need to add in
# a sensor that inputs over UART or ethernet
bus = can.Bus(interface='socketcan', channel='vcan0', receive_own_messages=True)
queue = asyncio.Queue()
queue2 = asyncio.Queue()
path_to_bin = ""
Expand All @@ -84,7 +109,7 @@ async def run(logger):

mcap_writer = HTPBMcapWriter(".", list_of_msg_names, True)
mcap_server = MCAPServer(mcap_writer=mcap_writer)
receiver_task = asyncio.create_task(continuous_can_receiver(db, msg_pb_classes, queue, queue2, can_methods["debug"]))
receiver_task = asyncio.create_task(continuous_can_receiver(db, msg_pb_classes, queue, queue2, bus))
fx_task = asyncio.create_task(fxglv_websocket_consume_data(queue, fx_s))
mcap_task = asyncio.create_task(write_data_to_mcap(queue2, mcap_writer))
srv_task = asyncio.create_task(mcap_server.start_server())
Expand All @@ -94,7 +119,10 @@ async def run(logger):
# and schema in the foxglove websocket server.

await asyncio.gather(receiver_task, fx_task, mcap_task, srv_task)
# await asyncio.gather(receiver_task, fx_task, mcap_task)

# await asyncio.gather(receiver_task, mcap_task, srv_task)
# await asyncio.gather(receiver_task)

if __name__ == "__main__":
logging.basicConfig()
Expand Down
5 changes: 3 additions & 2 deletions py_dbc_proto_gen/dbc_to_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def append_proto_message_from_CAN_message(file, can_msg: can.message.Message):
line_index = 0
for sig in can_msg.signals:
line_index += 1
if sig.is_float or (

if sig.is_float or ((sig.scale is not None) and (sig.scale != 1.0)) or (
type(sig.conversion)
is not type(conversion.IdentityConversion(is_float=False))
and not type(
Expand All @@ -44,7 +45,7 @@ def append_proto_message_from_CAN_message(file, can_msg: can.message.Message):
+ str(line_index)
+ ";"
)
elif sig.choices is not None:
elif sig.choices is not None and sig.length is not 1:
line = (
" string "
+ create_field_name(sig.name)
Expand Down

0 comments on commit 02c47ac

Please sign in to comment.