Skip to content

Commit

Permalink
Refactoring of SecOC Layer and implementation for SecOC over CANFD (#…
Browse files Browse the repository at this point in the history
…4459)

* Refactoring of SecOC Layer and implementation for SecOC over CANFD

* fix unit test
  • Loading branch information
polybassa authored Jul 13, 2024
1 parent 836e4d5 commit ed7225d
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 98 deletions.
1 change: 0 additions & 1 deletion scapy/contrib/automotive/autosar/pdu.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def extract_padding(self, s):
class PDUTransport(Packet):
"""
Packet representing PDUTransport containing multiple PDUs
FIXME: Support CAN messages as well.
"""
name = 'PDUTransport'
fields_desc = [
Expand Down
118 changes: 22 additions & 96 deletions scapy/contrib/automotive/autosar/secoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
# Copyright (C) Nils Weiss <nils@we155.de>

# scapy.contrib.description = AUTOSAR Secure On-Board Communication
# scapy.contrib.status = loads
# scapy.contrib.status = library

"""
SecOC
"""
import struct

from scapy.config import conf
from scapy.error import log_loading

Expand All @@ -21,74 +19,35 @@
log_loading.info("Can't import python-cryptography v1.7+. "
"Disabled SecOC calculate_cmac.")

from scapy.base_classes import Packet_metaclass
from scapy.config import conf
from scapy.contrib.automotive.autosar.pdu import PDU
from scapy.fields import (XByteField, XIntField, PacketListField,
FieldLenField, PacketLenField, XStrFixedLenField)
from scapy.fields import PacketLenField
from scapy.packet import Packet, Raw

# Typing imports
from typing import (
Any,
Callable,
Dict,
Optional,
Set,
Tuple,
Type,
)


class PduPayloadField(PacketLenField):

__slots__ = ["guess_pkt_cls"]

def __init__(self,
name, # type: str
default, # type: Packet
guess_pkt_cls, # type: Callable[[Packet, bytes], Packet] # noqa: E501
length_from=None # type: Optional[Callable[[Packet], int]] # noqa: E501
):
# type: (...) -> None
super(PacketLenField, self).__init__(name, default, Raw)
self.length_from = length_from or (lambda x: 0)
self.guess_pkt_cls = guess_pkt_cls

def m2i(self, pkt, m): # type: ignore
# type: (Optional[Packet], bytes) -> Packet
return self.guess_pkt_cls(pkt, m)


class SecOC_PDU(Packet):
name = 'SecOC_PDU'
fields_desc = [
XIntField('pdu_id', 0),
FieldLenField('pdu_payload_len', None,
fmt="I",
length_of="pdu_payload",
adjust=lambda pkt, x: x + 4),
PduPayloadField('pdu_payload',
Raw(),
guess_pkt_cls=lambda pkt, data: SecOC_PDU.get_pdu_payload_cls(pkt, data), # noqa: E501
length_from=lambda pkt: pkt.pdu_payload_len - 4),
XByteField("tfv", 0), # truncated freshness value
XStrFixedLenField("tmac", None, length=3)] # truncated message authentication code # noqa: E501
class SecOCMixin:

pdu_payload_cls_by_identifier: Dict[int, Type[Packet]] = dict()
secoc_protected_pdus_by_identifier: Set[int] = set()

def secoc_authenticate(self) -> None:
self.tfv = struct.unpack(">B", self.get_secoc_freshness_value()[-1:])[0]
self.tmac = self.get_message_authentication_code()[0:3]
raise NotImplementedError

def secoc_verify(self) -> bool:
return self.get_message_authentication_code()[0:3] == self.tmac
raise NotImplementedError

def get_secoc_payload(self) -> bytes:
"""Override this method for customization
"""
return self.pdu_payload
raise NotImplementedError

def get_secoc_key(self) -> bytes:
"""Override this method for customization
Expand Down Expand Up @@ -123,56 +82,23 @@ def register_secoc_protected_pdu(cls,
@classmethod
def unregister_secoc_protected_pdu(cls, pdu_id: int) -> None:
cls.secoc_protected_pdus_by_identifier.remove(pdu_id)
del cls.secret_keys_by_identifier[pdu_id]
del cls.pdu_payload_cls_by_identifier[pdu_id]

@classmethod
def dispatch_hook(cls, s=None, *_args, **_kwds):
# type: (Optional[bytes], Any, Any) -> Packet_metaclass
"""dispatch_hook determines if PDU is protected by SecOC.
If PDU is protected, SecOC_PDU will be returned, otherwise AutoSAR PDU
will be returned.
"""
if s is None:
return SecOC_PDU
if len(s) < 4:
return Raw
identifier = struct.unpack('>I', s[0:4])[0]
if identifier in cls.secoc_protected_pdus_by_identifier:
return SecOC_PDU
else:
return PDU

@staticmethod
def get_pdu_payload_cls(pkt: Packet,
data: bytes
) -> Packet:
try:
cls = SecOC_PDU.pdu_payload_cls_by_identifier[pkt.pdu_id]
except KeyError:
cls = conf.raw_layer
return cls(data, _parent=pkt)

def extract_padding(self, s):
# type: (bytes) -> Tuple[bytes, Optional[bytes]]
return b"", s


class SecOC_PDUTransport(Packet):
"""
Packet representing SecOC_PDUTransport containing multiple PDUs
"""

name = 'SecOC_PDUTransport'
fields_desc = [
PacketListField("pdus", [SecOC_PDU()], pkt_cls=SecOC_PDU)
]
class PduPayloadField(PacketLenField):
__slots__ = ["guess_pkt_cls"]

@staticmethod
def register_secoc_protected_pdu(pdu_id: int,
pdu_payload_cls: Type[Packet] = Raw
) -> None:
SecOC_PDU.register_secoc_protected_pdu(pdu_id, pdu_payload_cls)
def __init__(self,
name, # type: str
default, # type: Packet
guess_pkt_cls, # type: Callable[[Packet, bytes], Packet] # noqa: E501
length_from=None # type: Optional[Callable[[Packet], int]] # noqa: E501
):
# type: (...) -> None
super(PacketLenField, self).__init__(name, default, Raw)
self.length_from = length_from or (lambda x: 0)
self.guess_pkt_cls = guess_pkt_cls

@staticmethod
def unregister_secoc_protected_pdu(pdu_id: int) -> None:
SecOC_PDU.unregister_secoc_protected_pdu(pdu_id)
def m2i(self, pkt, m): # type: ignore
# type: (Optional[Packet], bytes) -> Packet
return self.guess_pkt_cls(pkt, m)
91 changes: 91 additions & 0 deletions scapy/contrib/automotive/autosar/secoc_canfd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Nils Weiss <nils@we155.de>

# scapy.contrib.description = AUTOSAR Secure On-Board Communication PDUs
# scapy.contrib.status = loads

"""
SecOC PDU
"""
import struct

from scapy.config import conf
from scapy.contrib.automotive.autosar.secoc import SecOCMixin, PduPayloadField
from scapy.base_classes import Packet_metaclass
from scapy.fields import (XByteField, FieldLenField, XStrFixedLenField,
FlagsField, XBitField, ShortField)
from scapy.layers.can import CANFD
from scapy.packet import Raw, Packet

# Typing imports
from typing import (
Any,
Optional,
Tuple,
)


class SecOC_CANFD(CANFD, SecOCMixin):
name = 'SecOC_CANFD'
fields_desc = [
FlagsField('flags', 0, 3, ['error',
'remote_transmission_request',
'extended']),
XBitField('identifier', 0, 29),
FieldLenField('length', None, length_of='pdu_payload',
fmt='B', adjust=lambda pkt, x: x + 4),
FlagsField('fd_flags', 4, 8, [
'bit_rate_switch', 'error_state_indicator', 'fd_frame']),
ShortField('reserved', 0),
PduPayloadField('pdu_payload',
Raw(),
guess_pkt_cls=lambda pkt, data: SecOC_CANFD.get_pdu_payload_cls(pkt, data), # noqa: E501
length_from=lambda pkt: pkt.length - 4),
XByteField("tfv", 0), # truncated freshness value
XStrFixedLenField("tmac", None, length=3)] # truncated message authentication code # noqa: E501

def secoc_authenticate(self) -> None:
self.tfv = struct.unpack(">B", self.get_secoc_freshness_value()[-1:])[0]
self.tmac = self.get_message_authentication_code()[0:3]

def secoc_verify(self) -> bool:
return self.get_message_authentication_code()[0:3] == self.tmac

def get_secoc_payload(self) -> bytes:
"""Override this method for customization
"""
return bytes(self.pdu_payload)

@classmethod
def dispatch_hook(cls, s=None, *_args, **_kwds):
# type: (Optional[bytes], Any, Any) -> Packet_metaclass
"""dispatch_hook determines if PDU is protected by SecOC.
If PDU is protected, SecOC_PDU will be returned, otherwise AutoSAR PDU
will be returned.
"""
if s is None:
return SecOC_CANFD
if len(s) < 4:
return Raw
identifier = struct.unpack('>I', s[0:4])[0] & 0x1FFFFFFF
if identifier in cls.secoc_protected_pdus_by_identifier:
return SecOC_CANFD
else:
return CANFD

@classmethod
def get_pdu_payload_cls(cls,
pkt: Packet,
data: bytes
) -> Packet:
try:
klass = cls.pdu_payload_cls_by_identifier[pkt.identifier]
except KeyError:
klass = conf.raw_layer
return klass(data, _parent=pkt)

def extract_padding(self, s):
# type: (bytes) -> Tuple[bytes, Optional[bytes]]
return b"", s
109 changes: 109 additions & 0 deletions scapy/contrib/automotive/autosar/secoc_pdu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Nils Weiss <nils@we155.de>

# scapy.contrib.description = AUTOSAR Secure On-Board Communication PDUs
# scapy.contrib.status = loads

"""
SecOC PDU
"""
import struct

from scapy.config import conf
from scapy.contrib.automotive.autosar.secoc import SecOCMixin, PduPayloadField
from scapy.base_classes import Packet_metaclass
from scapy.contrib.automotive.autosar.pdu import PDU
from scapy.fields import (XByteField, XIntField, PacketListField,
FieldLenField, XStrFixedLenField)
from scapy.packet import Packet, Raw

# Typing imports
from typing import (
Any,
Optional,
Tuple,
Type,
)


class SecOC_PDU(Packet, SecOCMixin):
name = 'SecOC_PDU'
fields_desc = [
XIntField('pdu_id', 0),
FieldLenField('pdu_payload_len', None,
fmt="I",
length_of="pdu_payload",
adjust=lambda pkt, x: x + 4),
PduPayloadField('pdu_payload',
Raw(),
guess_pkt_cls=lambda pkt, data: SecOC_PDU.get_pdu_payload_cls(pkt, data), # noqa: E501
length_from=lambda pkt: pkt.pdu_payload_len - 4),
XByteField("tfv", 0), # truncated freshness value
XStrFixedLenField("tmac", None, length=3)] # truncated message authentication code # noqa: E501

def secoc_authenticate(self) -> None:
self.tfv = struct.unpack(">B", self.get_secoc_freshness_value()[-1:])[0]
self.tmac = self.get_message_authentication_code()[0:3]

def secoc_verify(self) -> bool:
return self.get_message_authentication_code()[0:3] == self.tmac

def get_secoc_payload(self) -> bytes:
"""Override this method for customization
"""
return self.pdu_payload

@classmethod
def dispatch_hook(cls, s=None, *_args, **_kwds):
# type: (Optional[bytes], Any, Any) -> Packet_metaclass
"""dispatch_hook determines if PDU is protected by SecOC.
If PDU is protected, SecOC_PDU will be returned, otherwise AutoSAR PDU
will be returned.
"""
if s is None:
return SecOC_PDU
if len(s) < 4:
return Raw
identifier = struct.unpack('>I', s[0:4])[0]
if identifier in cls.secoc_protected_pdus_by_identifier:
return SecOC_PDU
else:
return PDU

@classmethod
def get_pdu_payload_cls(cls,
pkt: Packet,
data: bytes
) -> Packet:
try:
klass = cls.pdu_payload_cls_by_identifier[pkt.pdu_id]
except KeyError:
klass = conf.raw_layer
return klass(data, _parent=pkt)

def extract_padding(self, s):
# type: (bytes) -> Tuple[bytes, Optional[bytes]]
return b"", s


class SecOC_PDUTransport(Packet):
"""
Packet representing SecOC_PDUTransport containing multiple PDUs
"""

name = 'SecOC_PDUTransport'
fields_desc = [
PacketListField("pdus", [SecOC_PDU()], pkt_cls=SecOC_PDU)
]

@staticmethod
def register_secoc_protected_pdu(pdu_id: int,
pdu_payload_cls: Type[Packet] = Raw
) -> None:
SecOC_PDU.register_secoc_protected_pdu(pdu_id, pdu_payload_cls)

@staticmethod
def unregister_secoc_protected_pdu(pdu_id: int) -> None:
SecOC_PDU.unregister_secoc_protected_pdu(pdu_id)
2 changes: 1 addition & 1 deletion test/contrib/automotive/autosar/secoc.uts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

= Load Contrib Layer

load_contrib("automotive.autosar.secoc")
load_contrib("automotive.autosar.secoc_pdu")

= Prepare SecOC keys

Expand Down
Loading

0 comments on commit ed7225d

Please sign in to comment.