Skip to content

Commit

Permalink
[nrfconnect] Extracted private DAC key from DAC key .der file (#19453)
Browse files Browse the repository at this point in the history
The python script to create factory data should extract
the private DAC key from the DAC key der file and store
it directly into factory data.
There is no need to keep both keys (private and public)
in factory data because the public key is included in the
DAC certificate.

The final version of the factory data script has been tested
with an early version of Factory Data Accessor on the device's side,
prepared for the nrfconnect platform.
  • Loading branch information
ArekBalysNordic authored and pull[bot] committed Jun 16, 2022
1 parent 0005433 commit cfb95c5
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 25 deletions.
88 changes: 72 additions & 16 deletions scripts/tools/nrfconnect/generate_nrfconnect_chip_factory_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,52 @@
import subprocess
import logging as log
import base64
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_der_private_key

HEX_PREFIX = "hex:"
PUB_KEY_PREFIX = b'\x04'
INVALID_PASSCODES = [00000000, 11111111, 22222222, 33333333, 44444444,
55555555, 66666666, 77777777, 88888888, 99999999, 12345678, 87654321]


def gen_spake2p_params(spake2p_path: str, passcode: int, it: int, salt: str):
""" Generate spake2 params using external spake2p script"""
def get_raw_private_key_der(der_file: str, password: str):
""" Split given der file to get separated key pair consisting of public and private keys.
Args:
der_file (str): Path to .der file containing public and private keys
password (str): Password to decrypt Keys. It can be None, and then KEY is not encrypted.
Returns:
hex string: return a hex string containing extracted and decrypted private KEY from given .der file.
"""
try:
with open(der_file, 'rb') as file:
key_data = file.read()
if password is None:
log.warning("KEY password has not been provided. It means that DAC key is not encrypted.")
keys = load_der_private_key(key_data, password, backend=default_backend())
private_key = keys.private_numbers().private_value.to_bytes(32, byteorder='big')

return private_key

except IOError or ValueError:
return None


def gen_spake2p_params(spake2p_path: str, passcode: int, it: int, salt: str) -> dict:
""" Generate spake2 params using external spake2p script
Args:
spake2p_path (str): path to spake2 executable
passcode (int): Pairing passcode using in SPAKE 2
it (int): Iteration counter for SPAKE2 Verifier generation
salt (str): Salt used to generate SPAKE2 password
Returns:
dict: dictionary containing passcode, it, salt, and generated Verifier
"""

cmd = [
spake2p_path, 'gen-verifier',
'--iteration-count', str(it),
Expand All @@ -52,8 +90,8 @@ class FactoryDataGenerator:

def __init__(self, arguments) -> None:
"""
:param arguments: All input arguments parsed using ArgParse
Args:
arguments (any):All input arguments parsed using ArgParse
"""
self._args = arguments
self._factory_data = list()
Expand Down Expand Up @@ -104,9 +142,18 @@ def generate_json(self):
else:
rd_uid = self._args.rd_uid
if not self._args.spake2_verifier:
spake_2_verifier = base64.b64decode(self._generate_spake2_verifier()).hex()
spake_2_verifier = base64.b64decode(self._generate_spake2_verifier())
else:
spake_2_verifier = base64.b64decode(self._args.spake2_verifier).hex()
spake_2_verifier = base64.b64decode(self._args.spake2_verifier)

# convert salt to bytestring to be coherent with spake2 verifier type
spake_2_salt = base64.b64decode(self._args.spake2_salt)

# try to read DAC public and private keys
dac_priv_key = get_raw_private_key_der(self._args.dac_key, self._args.dac_key_password)
if dac_priv_key is None:
log.error("Can not read DAC keys from : {}".format(self._args.dac_key))
sys.exit(-1)

try:
json_file = open(self._args.output, "w+")
Expand All @@ -124,16 +171,16 @@ def generate_json(self):
self._add_entry("hw_ver", self._args.hw_ver)
self._add_entry("hw_ver_str", self._args.hw_ver_str)
self._add_entry("dac_cert", self._process_der(self._args.dac_cert))
self._add_entry("dac_key", self._process_der(self._args.dac_key))
self._add_entry("dac_key", dac_priv_key)
self._add_entry("pai_cert", self._process_der(self._args.pai_cert))
if self._args.include_passcode:
self._add_entry("passcode", self._args.passcode)
self._add_entry("spake2_it", self._args.spake2_it)
self._add_entry("spake2_salt", self._args.spake2_salt)
self._add_entry("spake2_verifier", HEX_PREFIX + spake_2_verifier)
self._add_entry("spake2_salt", spake_2_salt)
self._add_entry("spake2_verifier", spake_2_verifier)
self._add_entry("discriminator", self._args.discriminator)
if rd_uid:
self._add_entry("rd_uid", HEX_PREFIX + rd_uid)
self._add_entry("rd_uid", rd_uid)
# add user-specific data
self._add_entry("user", self._args.user)

Expand All @@ -154,6 +201,8 @@ def generate_json(self):

def _add_entry(self, name: str, value: any):
""" Add single entry to list of tuples ("key", "value") """
if(isinstance(value, bytes) or isinstance(value, bytearray)):
value = HEX_PREFIX + value.hex()
if value or (isinstance(value, int) and value == 0):
log.debug("Adding entry '{}' with size {} and type {}".format(name, sys.getsizeof(value), type(value)))
self._factory_data.append((name, value))
Expand All @@ -168,8 +217,8 @@ def _generate_rotating_device_uid(self):
""" If rotating device unique ID has not been provided it should be generated """
log.warning("Can not find rotating device UID in provided arguments list. A new one will be generated.")
rdu = secrets.token_bytes(16)
log.info("\n\nThe new rotate device UID: {}\n".format(rdu.hex()))
return rdu.hex()
log.info("\n\nThe new rotate device UID: {}\n".format(rdu).hex())
return rdu

def _validate_output_json(self, output_json: str):
"""
Expand All @@ -194,7 +243,7 @@ def _process_der(self, path: str):
log.debug("Processing der file...")
try:
with open(path, 'rb') as f:
data = HEX_PREFIX + f.read().hex()
data = f.read()
return data
except IOError as e:
log.error(e)
Expand Down Expand Up @@ -250,6 +299,7 @@ def allow_any_int(i): return int(i, 0)
help="[int] Provide BLE pairing discriminator. \
A 12-bit value matching the field of the same name in \
the setup code. Discriminator is used during a discovery process.")

# optional keys
optional_arguments.add_argument("--chip_cert_path", type=str,
help="Generate DAC and PAI certificates instead giving a path to .der files. This option requires a path to chip-cert executable."
Expand All @@ -258,10 +308,12 @@ def allow_any_int(i): return int(i, 0)
help="[.der] Provide the path to .der file containing DAC certificate.")
optional_arguments.add_argument("--dac_key", type=str,
help="[.der] Provide the path to .der file containing DAC keys.")
optional_arguments.add_argument("--pai_cert", type=str,
help="[.der] Provide the path to .der file containing PAI certificate.")
optional_arguments.add_argument("--generate_rd_uid", action="store_true",
help="Generate a new rotating device unique ID, print it out to console output and store it in factory data.")
optional_arguments.add_argument("--dac_key_password", type=str,
help="Provide a password to decode dac key. If dac key is not encrypted do not provide this argument.")
optional_arguments.add_argument("--pai_cert", type=str,
help="[.der] Provide the path to .der file containing PAI certificate.")
optional_arguments.add_argument("--rd_uid", type=str,
help="[hex string] Provide the rotating device unique ID. If this argument is not provided a new rotating device id unique id will be generated.")
optional_arguments.add_argument("--passcode", type=allow_any_int,
Expand All @@ -280,7 +332,11 @@ def allow_any_int(i): return int(i, 0)
if args.verbose:
log.basicConfig(format='[%(asctime)s][%(levelname)s] %(message)s', level=log.DEBUG)
else:
log.basicConfig(format='[%(asctime)s] %(message)s', level=log.INFO)
log.basicConfig(format='[%(levelname)s] %(message)s', level=log.INFO)

if(args.chip_cert_path):
log.error("Generating DAC and PAI certificates is not supported yet")
return

# check if json file already exist
if(exists(args.output) and not args.overwrite):
Expand Down
18 changes: 9 additions & 9 deletions scripts/tools/nrfconnect/nrfconnect_factory_data.schema
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,19 @@
},
"dac_cert": {
"description": "DAC certificate in hex-string format",
"type": "string"
"type": "string",
"maxLength": 1204
},
"dac_key": {
"description": "DAC Private Key in hex-string format",
"type": "string"
"type": "string",
"minLength": 68,
"maxLength": 68
},
"pai_cert": {
"description": "PAI certificate in hex-string format",
"type": "string"
"type": "string",
"maxLength": 1204
},
"passcode": {
"description": "A default PASE session passcode",
Expand All @@ -98,8 +102,8 @@
"spake2_salt": {
"description": "A key-derivation function for the Symmetric Password-Authenticated Key Exchange.",
"type": "string",
"minLength": 16,
"maxLength": 32
"minLength": 36,
"maxLength": 68
},
"spake2_verifier": {
"description": "A verifier for the Symmetric Password-Authenticated Key Exchange",
Expand All @@ -112,10 +116,6 @@
"minimum": 0,
"maximum": 4095
},
"fw_info": {
"description": "Information about programmed firmware in TLV format",
"type": "string"
},
"user": {
"description": "A user-specific additional data which should be added to factory data. This should be a Json format.",
"type": "object"
Expand Down

0 comments on commit cfb95c5

Please sign in to comment.