Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spgt 6 implement a save of project file #11

Merged
merged 4 commits into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions project/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from project.controller.maincontroller import MainController
from .view import MainUI
from .controller import SerialController
from .model import MainModel

class Project:
def main(self):
serialPort = SerialController()
mainWindow = MainUI('Porty', serialPort)
mainModel = MainModel()
mainController = MainController(mainModel)
mainWindow = MainUI('Porty', mainController)
mainWindow.launch()


3 changes: 2 additions & 1 deletion project/controller/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .serialcontroller import SerialController
from .serialcontroller import SerialController
from .maincontroller import MainController
43 changes: 43 additions & 0 deletions project/controller/maincontroller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@

from project.controller.serialcontroller import SerialController
from project.model.main import MainModel


class MainController():
def __init__(self, model:MainModel) -> None:
self._mainModel = model
self._serialController = SerialController(self._mainModel)
pass

def set_device_port(self, comPort:str):
self._serialController.set_comport(comPortName = comPort, conf=self._mainModel.get_all_port_settings())
self._mainModel.update_port_name(comPort)
def connect_to_device(self):
self._serialController.connect()
def disconnect_from_device(self):
self._serialController.disconnect()
def send_packet(self, msg):
self._serialController.send_packet(msg)
def update_port_baudrate(self, baudrate):
self._mainModel.update_setting_baudrate(baudrate)
self._serialController.set_comport(comPortName = self._mainModel.get_port_name(), conf=self._mainModel.get_all_port_settings())
def update_serial_cb(self, handlePacketCb, connectionCb, disconnectionCb):
self._serialController.handle_packet = handlePacketCb
self._serialController.connection_callback = connectionCb
self._serialController.disconnection_callback = disconnectionCb


def save_project_settings(self, filename:str):
self._mainModel.save_project_file(filename)
def open_project_settings(self, filename:str):
self._mainModel.load_project_file(filename)

def update_send_sequence(self, sequence:list):
self._mainModel.update_send_sequence(sequence)
def get_send_sequences(self)->list:
return self._mainModel.get_sequences()
def get_saved_port_name(self)->str:
return self._mainModel.get_port_name()

def list_serial_ports(self) -> list:
return self._serialController.list_serial_ports()
57 changes: 38 additions & 19 deletions project/controller/serialcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,54 @@
from serial import Serial, SerialException
from serial.threaded import Protocol, ReaderThread
from serial.tools import list_ports

from project.model.main import MainModel

class FixedLengthPacketHandler(Protocol):
"""\
Modified Protocol as used by the ReaderThread.
"""
PACKET_LENGTH = 256
isConnected:bool
def __init__(self, controller):
super().__init__()
self.buffer = bytearray()
self.controller = controller
self.transport = None
self.isConnected = False

def connection_made(self, transport):
"""Called when reader thread is started"""
super(FixedLengthPacketHandler, self).connection_made(transport)
print(f'Serial connection made for {transport.serial.name}')
self.transport = transport
self.buffer = bytearray()
self.isConnected = True
if self.controller and self.controller.connection_callback:
self.controller.connection_callback()

def chunk_and_handle_packets(self, big_buff:bytearray)->bytearray:
small_buff = big_buff[:self.PACKET_LENGTH]
big_buff = big_buff[self.PACKET_LENGTH:]
if self.controller and self.controller.handle_packet:
convhex = " ".join(["{:02x}".format(bytes) for bytes in small_buff])
self.controller.handle_packet('[RX]: ' + convhex.upper())
while len(big_buff)>=len(small_buff):
small_buff = big_buff[:self.PACKET_LENGTH]
big_buff = big_buff[self.PACKET_LENGTH:]
if self.controller and self.controller.handle_packet:
convhex = " ".join(["{:02x}".format(bytes) for bytes in small_buff])
self.controller.handle_packet('[RX]: ' + convhex.upper())
return big_buff

def data_received(self, data):
"""\
Called when data is received from the serial port
Append bytes till buffer is full and handle the packet.
"""
self.buffer.extend(data)
out = " ".join(["{:02x}".format(bytes) for bytes in data])
if len(self.buffer) >= self.PACKET_LENGTH:
temp_buff = self.buffer[:self.PACKET_LENGTH]
self.buffer = self.buffer[self.PACKET_LENGTH:]
# TODO: Handle this
if self.controller and self.controller.handle_packet:
convhex = " ".join(["{:02x}".format(bytes) for bytes in temp_buff])
self.controller.handle_packet('[RX]: ' + convhex.upper())
self.buffer=self.chunk_and_handle_packets(self.buffer)


def send_data(self, data):
Expand All @@ -50,9 +64,9 @@ def connection_lost(self, exc):
Called when the serial port is closed or the reader loop terminated
otherwise.
"""
print(f'Connection Lost')
self.transport = None
self.buffer = bytearray()
self.isConnected = False
if self.controller and self.controller.disconnection_callback:
self.controller.disconnection_callback()
if isinstance(exc, Exception):
Expand All @@ -73,34 +87,37 @@ class SerialController:
_rt:ReaderThread
_proto:FixedLengthPacketHandler

def __init__(self, comPortName:str = None, conf:dict = None) -> None:
def __init__(self, model:MainModel) -> None:

self._sp = None
self._proto = None
self._model= model
conf = self._model.get_all_port_settings()
print(conf)
if conf:
for setting in conf:
if setting in self._conf:
self._conf[setting]=conf[setting]
else:
raise ValueError(f'Unsupported or invalid setting {setting}')

if comPortName:
self.set_comport(comPortName)

raise ValueError(f'Unsupported or invalid setting {setting}')
self.connection_callback = lambda : print('connection_callback')
self.disconnection_callback = lambda : print('disconnection_callback')
self.handle_packet = lambda : print('handle_packet')

def set_comport(self, comPortName):
def set_comport(self, comPortName:str, conf:dict = None):
'''\
Set comport
'''
availablePorts = self.list_serial_ports()
if comPortName in availablePorts:
if self._sp:
self.disconnect()
if self._sp.is_open:
self.disconnect()
self._sp = None
self._selectedPortName = comPortName
try:
if conf:
self._conf = conf
self._sp = Serial(self._selectedPortName)
self._sp.baudrate = self._conf['baudrate']
self._sp.bytesize=self._conf['bytesize']
Expand All @@ -113,7 +130,7 @@ def set_comport(self, comPortName):
except SerialException as e:
print(f'Cannot open COM Port:{self._selectedPortName}, {e}')
else:
raise ValueError('Port {comPortName} doesn\'t exist')
raise ValueError(f'Port {comPortName} doesn\'t exist')

def connect(self):
'''\
Expand All @@ -128,7 +145,7 @@ def disconnect(self):
'''\
Attempts to disconnect to the specified serial port
'''
if self._rt is not None:
if self._proto is not None:
try:
self._rt.close()
except Exception as e:
Expand All @@ -151,6 +168,7 @@ def send_packet(self, msg):
def serial_packet_handler(self):
self._proto = FixedLengthPacketHandler(self)
return self._proto


@staticmethod
def list_serial_ports() -> list:
Expand All @@ -166,7 +184,8 @@ def list_serial_ports() -> list:
TEST_PORT = 'COM9'
ports = SerialController.list_serial_ports()
portSetting = {'baudrate':115200}
sPort = SerialController(TEST_PORT, portSetting)
sPort = SerialController()
sPort.set_comport(TEST_PORT)
with sPort as device:
sleep(1)
print('End')
Expand Down
1 change: 1 addition & 0 deletions project/model/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .main import MainModel
63 changes: 63 additions & 0 deletions project/model/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import json
from project.util import valuemodifier as UVM
class MainModel():
def __init__(self) -> None:
self._portName=''
self._portSettings = {
'baudrate':115200,
'bytesize':8,
'parity':'N',
'stopbits':1,
'timeout':1,
'xonxoff':False,
'rtscts':False
}
self._sendSequence=[]

def get_all_port_settings(self)->dict:
return self._portSettings

def update_port_name(self, portName:str):
self._portName = portName
def get_port_name(self)->str:
return self._portName
def update_setting_baudrate(self, baudrate:int):
self._portSettings['baudrate']=UVM.return_as_int(baudrate)
def update_setting_bytesize(self, bytesize:int):
self._portSettings['bytesize'] = UVM.return_as_int(bytesize)
def update_setting_parity(self, parity):
self._portSettings['parity'] = parity
def update_setting_stopbits(self, stopbits:int):
self._portSettings['stopbits'] = UVM.return_as_int(stopbits)
def update_setting_timeout(self, timeout:int):
self._portSettings['timeout'] = UVM.return_as_int(timeout)
def update_setting_xonxoff(self, xonxoff:bool):
self._portSettings['xonxoff'] = xonxoff
def update_setting_rtscts(self, rtscts:bool):
self._portSettings['rtscts'] = rtscts

def update_send_sequence(self, sequence:list)->None:
self._sendSequence = sequence
def get_sequences(self)->list:
return self._sendSequence

def save_project_file(self, filepath:str):
outData = {
'port':self._portName,
'portSetting':self._portSettings,
'sequences':self._sendSequence
}
with open(filepath, "w+") as outfile:
json.dump(outData, outfile)

def load_project_file(self, filepath:str)->bool:
projectSetting={}
with open(filepath, 'r') as projectFile:
projectSetting = json.load(projectFile)
if 'sequences' in projectSetting:
self._sendSequence = projectSetting['sequences']
if 'port' in projectSetting:
self._portName = projectSetting['port']
if 'portSetting' in projectSetting:
self._portSettings = projectSetting['portSetting']
return True
8 changes: 8 additions & 0 deletions project/util/valuemodifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

def return_as_int(value)->int:
if type(value)==str:
return int(value)
elif type(value)==int:
return value
else:
raise TypeError(f'Incorrect Baudrate type {type(value)}')
Loading