Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ocpp): Make websocket connection persistent #1932

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 53 additions & 73 deletions packages/control/ocpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,43 +10,38 @@
from control.optional_data import OptionalProtocol
from modules.common.fault_state import FaultState


log = logging.getLogger(__name__)


class OcppMixin:
_ws: Optional[websockets.WebSocketClientProtocol] = None

def _get_formatted_time(self: OptionalProtocol) -> str:
return datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")

def _process_call(self: OptionalProtocol,
chargebox_id: str,
fault_state: FaultState,
func: Callable) -> Optional[websockets.WebSocketClientProtocol]:
async def make_call() -> websockets.WebSocketClientProtocol:
async with websockets.connect(self.data.ocpp.url+chargebox_id,
subprotocols=[self.data.ocpp.version]) as ws:
try:
cp = OcppChargepoint(chargebox_id, ws, 2)
await cp.call(func)
except asyncio.exceptions.TimeoutError:
# log.exception("Erwarteter TimeOut StartTransaction")
pass
return ws
try:
if self.data.ocpp.active and chargebox_id:
return asyncio.run(make_call())
except websockets.exceptions.InvalidStatusCode:
fault_state.warning(f"Chargebox ID {chargebox_id} konnte nicht im OCPP-Backend gefunden werden oder "
"URL des Backends ist falsch.")
return None
async def _get_connection(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState) -> Optional[websockets.WebSocketClientProtocol]:
if self._ws is None or not self._ws.open:
try:
self._ws = await websockets.connect(
self.data.ocpp.url + chargebox_id,
subprotocols=[self.data.ocpp.version]
)
except websockets.exceptions.InvalidStatusCode:
fault_state.warning(f"Chargebox ID {chargebox_id} konnte nicht im OCPP-Backend gefunden werden oder URL des Backends ist falsch.")
self._ws = None
return self._ws

async def _process_call(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState, func: Callable):
ws = await self._get_connection(chargebox_id, fault_state)
if ws:
try:
cp = OcppChargepoint(chargebox_id, ws, 2)
await cp.call(func)
except asyncio.exceptions.TimeoutError:
pass

def boot_notification(self: OptionalProtocol,
chargebox_id: str,
fault_state: FaultState,
model: str,
serial_number: str) -> Optional[int]:
async def boot_notification(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState, model: str, serial_number: str) -> Optional[int]:
try:
self._process_call(chargebox_id, fault_state, call.BootNotification(
await self._process_call(chargebox_id, fault_state, call.BootNotification(
charge_point_model=model,
charge_point_vendor="openWB",
firmware_version=data.data.system_data["system"].data["version"],
Expand All @@ -55,72 +50,57 @@ def boot_notification(self: OptionalProtocol,
except Exception as e:
fault_state.from_exception(e)

def start_transaction(self: OptionalProtocol,
chargebox_id: str,
fault_state: FaultState,
connector_id: int,
id_tag: str,
imported: int) -> Optional[int]:
async def start_transaction(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState, connector_id: int, id_tag: str, imported: int) -> Optional[int]:
try:
ws = self._process_call(chargebox_id, fault_state, call.StartTransaction(
await self._process_call(chargebox_id, fault_state, call.StartTransaction(
connector_id=connector_id,
id_tag=id_tag if id_tag else "",
meter_start=int(imported),
timestamp=self._get_formatted_time()
))
if ws:
tansaction_id = json.loads(ws.messages[0])[2]["transactionId"]
log.debug(f"Transaction ID: {tansaction_id} für Chargebox ID: {chargebox_id} mit Tag: {id_tag} und "
f"Zählerstand: {imported} erhalten.")
return tansaction_id
if self._ws and self._ws.messages:
transaction_id = json.loads(self._ws.messages[0])[2]["transactionId"]
log.debug(f"Transaction ID: {transaction_id} für Chargebox ID: {chargebox_id} mit Tag: {id_tag} und Zählerstand: {imported} erhalten.")
return transaction_id
except Exception as e:
fault_state.from_exception(e)
return None

def transfer_values(self: OptionalProtocol,
chargebox_id: str,
fault_state: FaultState,
connector_id: int,
imported: int) -> None:
async def transfer_values(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState, connector_id: int, imported: int) -> None:
try:
self._process_call(chargebox_id, fault_state, call.MeterValues(
await self._process_call(chargebox_id, fault_state, call.MeterValues(
connector_id=connector_id,
meter_value=[{"timestamp": self._get_formatted_time(),
"sampledValue": [
{
"value": f'{int(imported)}',
"context": "Sample.Periodic",
"format": "Raw",
"measurand": "Energy.Active.Import.Register",
"unit": "Wh"
},
]}],
meter_value=[{
"timestamp": self._get_formatted_time(),
"sampledValue": [{
"value": f'{int(imported)}',
"context": "Sample.Periodic",
"format": "Raw",
"measurand": "Energy.Active.Import.Register",
"unit": "Wh"
}]
}]
))
log.debug(f"Zählerstand {imported} an Chargebox ID: {chargebox_id} übermittelt.")
except Exception as e:
fault_state.from_exception(e)

def send_heart_beat(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState) -> None:
async def send_heart_beat(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState) -> None:
try:
self._process_call(chargebox_id, fault_state, call.Heartbeat())
await self._process_call(chargebox_id, fault_state, call.Heartbeat())
log.debug(f"Heartbeat an Chargebox ID: {chargebox_id} gesendet.")
except Exception as e:
fault_state.from_exception(e)

def stop_transaction(self: OptionalProtocol,
chargebox_id: str,
fault_state: FaultState,
imported: int,
transaction_id: int,
id_tag: str) -> None:
async def stop_transaction(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState, imported: int, transaction_id: int, id_tag: str) -> None:
try:
self._process_call(chargebox_id, fault_state, call.StopTransaction(meter_stop=int(imported),
timestamp=self._get_formatted_time(),
transaction_id=transaction_id,
reason="EVDisconnected",
id_tag=id_tag if id_tag else ""
))
log.debug(f"Transaction mit ID: {transaction_id} für Chargebox ID: {chargebox_id} mit Tag: {id_tag} und "
f"Zählerstand: {imported} beendet.")
await self._process_call(chargebox_id, fault_state, call.StopTransaction(
meter_stop=int(imported),
timestamp=self._get_formatted_time(),
transaction_id=transaction_id,
reason="EVDisconnected",
id_tag=id_tag if id_tag else ""
))
log.debug(f"Transaction mit ID: {transaction_id} für Chargebox ID: {chargebox_id} mit Tag: {id_tag} und Zählerstand: {imported} beendet.")
except Exception as e:
fault_state.from_exception(e)