Skip to content

Commit

Permalink
re-worked writing
Browse files Browse the repository at this point in the history
  • Loading branch information
RCMast3r committed Mar 18, 2024
1 parent 0863d87 commit cffd91f
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 87 deletions.
4 changes: 2 additions & 2 deletions py_data_acq/broadcast-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from hytech_np_proto_py import hytech_pb2

# Define the IP and port for the UDP socket
# bus1 = can.interface.Bus('can0', bustype='virtual')
bus1 = can.Bus(channel="can0", interface='socketcan')
bus1 = can.interface.Bus(channel=UdpMulticastBus.DEFAULT_GROUP_IPv6, interface="udp_multicast")
# bus1 = can.Bus(channel="can0", interface='socketcan')
def main():
path_to_dbc = os.environ.get('DBC_PATH')
full_path = os.path.join(path_to_dbc, "hytech.dbc")
Expand Down
11 changes: 10 additions & 1 deletion py_data_acq/py_data_acq/common/common_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,13 @@ class QueueData():
def __init__(self, schema_name: str, msg):
self.name = schema_name
self.data = msg.SerializeToString()
self.pb_msg = msg
self.pb_msg = msg

class MCAPServerStatusQueueData():
def __init__(self, writing_status: bool, writing_file: str):
self.is_writing = writing_status
self.writing_file = writing_file

class MCAPFileWriterCommand():
def __init__(self, write: bool):
self.writing = write
76 changes: 55 additions & 21 deletions py_data_acq/py_data_acq/mcap_writer/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,82 @@
import time
from mcap_protobuf.writer import Writer
from datetime import datetime
from typing import (
Any,
Optional,
Set
)
from typing import Any, Optional, Set
import os

class HTPBMcapWriter(Writer):
def __init__(self, mcap_base_path, msg_names: list[str], msg_classes):

class HTPBMcapWriter:
def __init__(self, mcap_base_path, init_writing: bool):
self.base_path = mcap_base_path
messages = msg_names
self.message_classes = msg_classes
now = datetime.now()
date_time_filename = now.strftime("%m_%d_%Y_%H_%M_%S"+".mcap")
self.actual_path = os.path.join(mcap_base_path, date_time_filename)
self.writing_file = open(self.actual_path, "wb")
super().__init__(self.writing_file)
if init_writing:
now = datetime.now()
date_time_filename = now.strftime("%m_%d_%Y_%H_%M_%S" + ".mcap")
self.actual_path = os.path.join(mcap_base_path, date_time_filename)
self.writing_file = open(self.actual_path, "wb")
self.mcap_writer_class = Writer(self.writing_file)
self.is_writing = True
else:
self.is_writing = False
self.actual_path = None
self.writing_file = None
self.mcap_writer_class = None

def __await__(self):
async def closure():
print("await")
return self

return closure().__await__()

def __enter__(self):
return self

def __exit__(self, exc_, exc_type_, tb_):
super().finish()
self.mcap_writer_class.finish()
self.writing_file.close()

def __aenter__(self):
return self

async def __aexit__(self, exc_type: Any, exc_val: Any, traceback: Any):
super().finish()
self.mcap_writer_class.finish()
self.writing_file.close()

async def close_writer(self):
if self.is_writing:
self.is_writing = False
self.mcap_writer_class.finish()
self.writing_file.close()

return True

async def open_new_writer(self):
if self.is_writing:
self.is_writing = False
self.mcap_writer_class.finish()
self.writing_file.close()

now = datetime.now()
date_time_filename = now.strftime("%m_%d_%Y_%H_%M_%S" + ".mcap")
self.actual_path = os.path.join(self.base_path, date_time_filename)
self.writing_file = open(self.actual_path, "wb")
self.mcap_writer_class = Writer(self.writing_file)
self.is_writing = True

return True

async def write_msg(self, msg):
super().write_message(topic=msg.DESCRIPTOR.name+"_data", message=msg, log_time=int(time.time_ns()), publish_time=int(time.time_ns()))
self.writing_file.flush()
if self.is_writing:
self.mcap_writer_class.write_message(
topic=msg.DESCRIPTOR.name + "_data",
message=msg,
log_time=int(time.time_ns()),
publish_time=int(time.time_ns()),
)
self.writing_file.flush()
return True

async def write_data(self, queue):
msg = await queue.get()
if msg is not None:
return await self.write_msg(msg.pb_msg)



51 changes: 28 additions & 23 deletions py_data_acq/py_data_acq/web_server/mcap_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,25 @@
import json
from py_data_acq.mcap_writer.writer import HTPBMcapWriter
import py_data_acq.common.protobuf_helpers as pb_helpers
from py_data_acq.common.common_types import MCAPServerStatusQueueData, MCAPFileWriterCommand
from typing import Any

class MCAPServer:
def __init__(self, host='0.0.0.0', port=6969, mcap_writer=None,path='.'):
def __init__(self, writer_command_queue: asyncio.Queue, writer_status_queue: asyncio.Queue, init_writing= True, init_filename = '.',host='0.0.0.0', port=6969):
self.host = host
self.port = port
self.mcap_writer = mcap_writer
self.path = path
if mcap_writer is not None:
self.mcap_status_message = f"An MCAP file is being written: {self.mcap_writer.writing_file.name}"

self.is_writing = init_writing
self.cmd_queue = writer_command_queue
self.status_queue = writer_status_queue

if(init_writing):
self.is_writing = True
self.mcap_status_message = f"An MCAP file is being written: {init_filename}"
else:
self.is_writing = False
self.mcap_status_message = "No MCAP file is being written."
self.html_content = b"""<!DOCTYPE html>
self.html_content = b"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
Expand Down Expand Up @@ -75,31 +81,30 @@ async def serve_file(self):
header = b"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n"
return header + current_html_content

async def start_mcap_generation(self):
if self.mcap_writer is None:
list_of_msg_names, msg_pb_classes = pb_helpers.get_msg_names_and_classes()
self.mcap_writer = HTPBMcapWriter(self.path, list_of_msg_names, msg_pb_classes)
self.mcap_status_message = f"An MCAP file is being written: {self.mcap_writer.writing_file.name}"

async def stop_mcap_generation(self):
if self.mcap_writer is not None:
# await self.mcap_writer.__aexit__(None, None, None)
self.mcap_writer.finish()
self.mcap_writer.writing_file.close()
self.mcap_status_message = "No MCAP file is being written."
self.mcap_writer = None
async def start_stop_mcap_generation(self, input_cmd: bool):
await self.cmd_queue.put(MCAPFileWriterCommand(input_cmd))
while True:
# Wait for the next message from the queue
message = await self.status_queue.get()
if message.is_writing:
self.is_writing = True
self.mcap_status_message = f"An MCAP file is being written: {message.writing_file}"
else:
self.is_writing = False
self.mcap_status_message = f"No MCAP file is being written."
# Important: Mark the current task as done to allow the queue to proceed
self.status_queue.task_done()

def handle_command(self, command):
if command == '/start':
asyncio.create_task(self.start_mcap_generation())
asyncio.create_task(self.start_stop_mcap_generation(True))
return "MCAP generation started."
elif command == '/stop':
asyncio.create_task(self.stop_mcap_generation())
asyncio.create_task(self.start_stop_mcap_generation(False))
return "MCAP generation stopped."
else:
return "Command not recognized."


# Checks if client connected and updates them on different actions
async def handle_client(self, reader, writer):
addr = writer.get_extra_info('peername')
Expand All @@ -117,7 +122,7 @@ async def handle_client(self, reader, writer):
elif url == '/status':
status_response = {
"statusMessage": self.mcap_status_message,
"isRecording": self.mcap_writer is not None
"isRecording": self.is_writing
}
response_bytes = json.dumps(status_response).encode('utf-8')
response = (f"HTTP/1.1 200 OK\r\n"
Expand Down
Loading

0 comments on commit cffd91f

Please sign in to comment.