Skip to content

Commit

Permalink
switch to orjson
Browse files Browse the repository at this point in the history
Signed-off-by: Clemens Vasters <clemens@vasters.com>
  • Loading branch information
clemensv committed Jan 28, 2025
1 parent 2470369 commit 3da2898
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 66 deletions.
2 changes: 1 addition & 1 deletion mode-s/mode_s_producer/mode_s_producer_data/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ packages = [ { include = "mode_s_producer_data", from="src" } ]

[tool.poetry.dependencies]
python = "<4.0,>=3.10"
dataclasses-json = "^0.6.7"
orjson = "^3.10"

[tool.poetry.dev-dependencies]
pylint = "^3.2.3"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,20 @@
import enum
import typing
import dataclasses
import dataclasses_json
import orjson
from dataclasses import dataclass
from dataclasses_json import Undefined, dataclass_json
import json
from mode_s_producer_data.mode_s.modes_adsb_record import ModeS_ADSB_Record


@dataclass_json(undefined=Undefined.EXCLUDE)
@dataclass
class Messages:
"""
A container for multiple Mode-S and ADS-B decoded messages.
Attributes:
messages (typing.List[ModeS_ADSB_Record]): An array of Mode-S and ADS-B decoded message records."""

messages: typing.List[ModeS_ADSB_Record]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="messages"))
messages: typing.List[ModeS_ADSB_Record]=dataclasses.field(kw_only=True)


def __post_init__(self):
Expand Down Expand Up @@ -49,19 +47,24 @@ def to_serializer_dict(self) -> dict:
The dictionary representation of the dataclass.
"""
asdict_result = dataclasses.asdict(self, dict_factory=self._dict_resolver)
asdict_result = {k: v for k, v in asdict_result.items() if v is not None}
return asdict_result

def _dict_resolver(self, data):
"""
Helps resolving the Enum values to their actual values and fixes the key names.
"""
def _resolve_enum(v):
if isinstance(v,enum.Enum):
def _resolve_value(v):
if isinstance(v, list):
return [e.to_serializer_dict() if hasattr(e, 'to_serializer_dict') and callable(e.to_serializer_dict) else e for e in v]
if isinstance(v, enum.Enum):
return v.value
if hasattr(v, 'to_serializer_dict') and callable(v.to_serializer_dict):
return v.to_serializer_dict()
return v
def _fix_key(k):
return k[:-1] if k.endswith('_') else k
return {_fix_key(k): _resolve_enum(v) for k, v in iter(data)}
return {_fix_key(k): _resolve_value(v) for k, v in iter(data) if v is not None}

def to_byte_array(self, content_type_string: str) -> bytes:
"""
Expand All @@ -80,9 +83,7 @@ def to_byte_array(self, content_type_string: str) -> bytes:
content_type = content_type_string.split(';')[0].strip()
result = None
if content_type == 'application/json':
#pylint: disable=no-member
result = self.to_json()
#pylint: enable=no-member
result = orjson.dumps(self.to_serializer_dict())

if result is not None and content_type.endswith('+gzip'):
with io.BytesIO() as stream:
Expand Down Expand Up @@ -129,7 +130,7 @@ def from_data(cls, data: typing.Any, content_type_string: typing.Optional[str] =
if content_type == 'application/json':
if isinstance(data, (bytes, str)):
data_str = data.decode('utf-8') if isinstance(data, bytes) else data
_record = json.loads(data_str)
_record = orjson.loads(data_str)
return Messages.from_serializer_dict(_record)
else:
raise NotImplementedError('Data is not of a supported type for JSON deserialization')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@
import enum
import typing
import dataclasses
import dataclasses_json
import orjson
from dataclasses import dataclass
from dataclasses_json import Undefined, dataclass_json
import json
import datetime


@dataclass_json(undefined=Undefined.EXCLUDE)
@dataclass
class ModeS_ADSB_Record:
"""
Expand Down Expand Up @@ -54,40 +52,40 @@ class ModeS_ADSB_Record:
tgt (typing.Optional[str]): Target state info. Present for certain BDS6,2 or ADS-B TC29; null otherwise.
opst (typing.Optional[str]): Operational status info. Present for certain BDS6,5 or ADS-B TC31; null otherwise."""

ts: datetime.datetime=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="ts"))
icao: str=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="icao"))
df: int=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="df"))
tc: typing.Optional[int]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="tc"))
bcode: typing.Optional[str]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="bcode"))
alt: typing.Optional[int]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="alt"))
cs: typing.Optional[str]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="cs"))
sq: typing.Optional[str]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="sq"))
lat: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="lat"))
lon: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="lon"))
spd: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="spd"))
ang: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="ang"))
vr: typing.Optional[int]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="vr"))
spd_type: typing.Optional[str]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="spd_type"))
dir_src: typing.Optional[str]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="dir_src"))
vr_src: typing.Optional[str]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="vr_src"))
ws: typing.Optional[int]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="ws"))
wd: typing.Optional[int]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="wd"))
at: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="at"))
ap: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="ap"))
hm: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="hm"))
roll: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="roll"))
trak: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="trak"))
gs: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="gs"))
tas: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="tas"))
hd: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="hd"))
ias: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="ias"))
m: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="m"))
vrb: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="vrb"))
vri: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="vri"))
rssi: typing.Optional[float]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="rssi"))
emst: typing.Optional[str]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="emst"))
tgt: typing.Optional[str]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="tgt"))
opst: typing.Optional[str]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="opst"))
ts: datetime.datetime=dataclasses.field(kw_only=True)
icao: str=dataclasses.field(kw_only=True)
df: int=dataclasses.field(kw_only=True)
tc: typing.Optional[int]=dataclasses.field(kw_only=True)
bcode: typing.Optional[str]=dataclasses.field(kw_only=True)
alt: typing.Optional[int]=dataclasses.field(kw_only=True)
cs: typing.Optional[str]=dataclasses.field(kw_only=True)
sq: typing.Optional[str]=dataclasses.field(kw_only=True)
lat: typing.Optional[float]=dataclasses.field(kw_only=True)
lon: typing.Optional[float]=dataclasses.field(kw_only=True)
spd: typing.Optional[float]=dataclasses.field(kw_only=True)
ang: typing.Optional[float]=dataclasses.field(kw_only=True)
vr: typing.Optional[int]=dataclasses.field(kw_only=True)
spd_type: typing.Optional[str]=dataclasses.field(kw_only=True)
dir_src: typing.Optional[str]=dataclasses.field(kw_only=True)
vr_src: typing.Optional[str]=dataclasses.field(kw_only=True)
ws: typing.Optional[int]=dataclasses.field(kw_only=True)
wd: typing.Optional[int]=dataclasses.field(kw_only=True)
at: typing.Optional[float]=dataclasses.field(kw_only=True)
ap: typing.Optional[float]=dataclasses.field(kw_only=True)
hm: typing.Optional[float]=dataclasses.field(kw_only=True)
roll: typing.Optional[float]=dataclasses.field(kw_only=True)
trak: typing.Optional[float]=dataclasses.field(kw_only=True)
gs: typing.Optional[float]=dataclasses.field(kw_only=True)
tas: typing.Optional[float]=dataclasses.field(kw_only=True)
hd: typing.Optional[float]=dataclasses.field(kw_only=True)
ias: typing.Optional[float]=dataclasses.field(kw_only=True)
m: typing.Optional[float]=dataclasses.field(kw_only=True)
vrb: typing.Optional[float]=dataclasses.field(kw_only=True)
vri: typing.Optional[float]=dataclasses.field(kw_only=True)
rssi: typing.Optional[float]=dataclasses.field(kw_only=True)
emst: typing.Optional[str]=dataclasses.field(kw_only=True)
tgt: typing.Optional[str]=dataclasses.field(kw_only=True)
opst: typing.Optional[str]=dataclasses.field(kw_only=True)


def __post_init__(self):
Expand Down Expand Up @@ -149,6 +147,7 @@ def to_serializer_dict(self) -> dict:
The dictionary representation of the dataclass.
"""
asdict_result = dataclasses.asdict(self, dict_factory=self._dict_resolver)
asdict_result = {k: v for k, v in asdict_result.items() if v is not None}
return asdict_result

def _dict_resolver(self, data):
Expand All @@ -161,7 +160,7 @@ def _resolve_enum(v):
return v
def _fix_key(k):
return k[:-1] if k.endswith('_') else k
return {_fix_key(k): _resolve_enum(v) for k, v in iter(data)}
return {_fix_key(k): _resolve_enum(v) for k, v in iter(data) if v is not None}

def to_byte_array(self, content_type_string: str) -> bytes:
"""
Expand All @@ -180,9 +179,7 @@ def to_byte_array(self, content_type_string: str) -> bytes:
content_type = content_type_string.split(';')[0].strip()
result = None
if content_type == 'application/json':
#pylint: disable=no-member
result = self.to_json()
#pylint: enable=no-member
result = orjson.dumps(self.to_serializer_dict())

if result is not None and content_type.endswith('+gzip'):
with io.BytesIO() as stream:
Expand Down Expand Up @@ -229,7 +226,7 @@ def from_data(cls, data: typing.Any, content_type_string: typing.Optional[str] =
if content_type == 'application/json':
if isinstance(data, (bytes, str)):
data_str = data.decode('utf-8') if isinstance(data, bytes) else data
_record = json.loads(data_str)
_record = orjson.loads(data_str)
return ModeS_ADSB_Record.from_serializer_dict(_record)
else:
raise NotImplementedError('Data is not of a supported type for JSON deserialization')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,20 @@
import enum
import typing
import dataclasses
import dataclasses_json
import orjson
from dataclasses import dataclass
from dataclasses_json import Undefined, dataclass_json
import json
from mode_s_producer_data.mode_s.modes_adsb_record import ModeS_ADSB_Record


@dataclass_json(undefined=Undefined.EXCLUDE)
@dataclass
class ModeSMessages:
"""
A container for multiple Mode-S and ADS-B decoded messages.
Attributes:
messages (typing.List[ModeS_ADSB_Record]): An array of Mode-S and ADS-B decoded message records."""

messages: typing.List[ModeS_ADSB_Record]=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="messages"))
messages: typing.List[ModeS_ADSB_Record]=dataclasses.field(kw_only=True)


def __post_init__(self):
Expand Down Expand Up @@ -49,19 +47,22 @@ def to_serializer_dict(self) -> dict:
The dictionary representation of the dataclass.
"""
asdict_result = dataclasses.asdict(self, dict_factory=self._dict_resolver)
asdict_result = {k: v for k, v in asdict_result.items() if v is not None}
return asdict_result

def _dict_resolver(self, data):
"""
Helps resolving the Enum values to their actual values and fixes the key names.
"""
def _resolve_enum(v):
if isinstance(v,enum.Enum):
def _resolve_value(v):
if isinstance(v, enum.Enum):
return v.value
if hasattr(v, 'to_serializer_dict') and callable(v.to_serializer_dict):
return v.to_serializer_dict()
return v
def _fix_key(k):
return k[:-1] if k.endswith('_') else k
return {_fix_key(k): _resolve_enum(v) for k, v in iter(data)}
return {_fix_key(k): _resolve_value(v) for k, v in iter(data)}

def to_byte_array(self, content_type_string: str) -> bytes:
"""
Expand All @@ -80,9 +81,7 @@ def to_byte_array(self, content_type_string: str) -> bytes:
content_type = content_type_string.split(';')[0].strip()
result = None
if content_type == 'application/json':
#pylint: disable=no-member
result = self.to_json()
#pylint: enable=no-member
result = orjson.dumps(self.to_serializer_dict())

if result is not None and content_type.endswith('+gzip'):
with io.BytesIO() as stream:
Expand Down Expand Up @@ -129,7 +128,7 @@ def from_data(cls, data: typing.Any, content_type_string: typing.Optional[str] =
if content_type == 'application/json':
if isinstance(data, (bytes, str)):
data_str = data.decode('utf-8') if isinstance(data, bytes) else data
_record = json.loads(data_str)
_record = orjson.loads(data_str)
return ModeSMessages.from_serializer_dict(_record)
else:
raise NotImplementedError('Data is not of a supported type for JSON deserialization')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from confluent_kafka import Producer, KafkaException, Message
from cloudevents.kafka import to_binary, to_structured, KafkaMessage
from cloudevents.http import CloudEvent
import orjson
from mode_s_producer_data.mode_s.messages import Messages

class ModeSEventProducer:
Expand Down Expand Up @@ -57,7 +58,7 @@ async def send_mode_s_messages(self,_stationid : str, data: Messages, content_ty
attributes["datacontenttype"] = content_type
event = CloudEvent.create(attributes, data)
if self.content_mode == "structured":
message = to_structured(event, data_marshaller=lambda x: json.loads(x.to_json()), key_mapper=lambda x: self.__key_mapper(x, data, key_mapper))
message = to_structured(event, data_marshaller=lambda x: orjson.loads(orjson.dumps(x.to_serializer_dict())), key_mapper=lambda x: self.__key_mapper(x, data, key_mapper))
message.headers[b"content-type"] = b"application/cloudevents+json"
else:
content_type = "application/json"
Expand Down

0 comments on commit 3da2898

Please sign in to comment.