-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' into feature/add-KVS-config-for-esp32
- Loading branch information
Showing
217 changed files
with
22,184 additions
and
16,008 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
# | ||
# Copyright (c) 2022 Project CHIP Authors | ||
# All rights reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
from matter_testing_support import MatterBaseTest, default_matter_test_main, async_test_body | ||
from matter_testing_support import hex_from_bytes, bytes_from_hex | ||
from chip.interaction_model import Status | ||
import chip.clusters as Clusters | ||
import logging | ||
from mobly import asserts | ||
from pathlib import Path | ||
from glob import glob | ||
from cryptography.x509 import load_der_x509_certificate, SubjectKeyIdentifier, AuthorityKeyIdentifier, Certificate | ||
from cryptography.exceptions import InvalidSignature | ||
from cryptography.hazmat.primitives import hashes | ||
from cryptography.hazmat.primitives.asymmetric import ec | ||
from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding | ||
from typing import Optional | ||
|
||
FORBIDDEN_AKID = [ | ||
bytes_from_hex("78:5C:E7:05:B8:6B:8F:4E:6F:C7:93:AA:60:CB:43:EA:69:68:82:D5"), | ||
bytes_from_hex("6A:FD:22:77:1F:51:1F:EC:BF:16:41:97:67:10:DC:DC:31:A1:71:7E") | ||
] | ||
|
||
|
||
def load_all_paa(paa_path: Path) -> dict: | ||
logging.info("Loading all PAAs in %s" % paa_path) | ||
|
||
paa_by_skid = {} | ||
for filename in glob(str(paa_path.joinpath("*.der"))): | ||
with open(filename, "rb") as derfile: | ||
# Load cert | ||
paa_der = derfile.read() | ||
paa_cert = load_der_x509_certificate(paa_der) | ||
|
||
# Find the subject key identifier (if present), and record it | ||
for extension in paa_cert.extensions: | ||
if extension.oid == SubjectKeyIdentifier.oid: | ||
skid = extension.value.key_identifier | ||
paa_by_skid[skid] = (Path(filename).name, paa_cert) | ||
|
||
return paa_by_skid | ||
|
||
|
||
def extract_akid(cert: Certificate) -> Optional[bytes]: | ||
# Find the authority key identifier (if present) | ||
for extension in cert.extensions: | ||
if extension.oid == AuthorityKeyIdentifier.oid: | ||
return extension.value.key_identifier | ||
else: | ||
return None | ||
|
||
|
||
class TC_DA_1_7(MatterBaseTest): | ||
@async_test_body | ||
async def test_TC_DA_1_7(self): | ||
# Option to allow SDK roots (skip step 4 check 2) | ||
allow_sdk_dac = self.user_params.get("allow_sdk_dac", False) | ||
|
||
logging.info("Pre-condition: load all PAAs SKIDs") | ||
conf = self.matter_test_config | ||
paa_by_skid = load_all_paa(conf.paa_trust_store_path) | ||
logging.info("Found %d PAAs" % len(paa_by_skid)) | ||
|
||
logging.info("Step 1: Commissioning, already done") | ||
dev_ctrl = self.default_controller | ||
|
||
logging.info("Step 2: Get PAI of DUT1 with certificate chain request") | ||
result = await dev_ctrl.SendCommand(self.dut_node_id, 0, Clusters.OperationalCredentials.Commands.CertificateChainRequest(2)) | ||
pai_1 = result.certificate | ||
asserts.assert_less_equal(len(pai_1), 600, "PAI cert must be at most 600 bytes") | ||
self.record_data({"pai_1": hex_from_bytes(pai_1)}) | ||
|
||
logging.info("Step 3: Get DAC of DUT1 with certificate chain request") | ||
result = await dev_ctrl.SendCommand(self.dut_node_id, 0, Clusters.OperationalCredentials.Commands.CertificateChainRequest(1)) | ||
dac_1 = result.certificate | ||
asserts.assert_less_equal(len(dac_1), 600, "DAC cert must be at most 600 bytes") | ||
self.record_data({"dac_1": hex_from_bytes(dac_1)}) | ||
|
||
logging.info("Step 4 check 1: Ensure PAI's AKID matches a PAA and signature is valid") | ||
pai1_cert = load_der_x509_certificate(pai_1) | ||
pai1_akid = extract_akid(pai1_cert) | ||
if pai1_akid not in paa_by_skid: | ||
asserts.fail("DUT1's PAI (%s) not matched in PAA trust store" % hex_from_bytes(pai1_akid)) | ||
|
||
filename, paa_cert = paa_by_skid[pai1_akid] | ||
logging.info("Matched PAA file %s, subject: %s" % (filename, paa_cert.subject)) | ||
public_key = paa_cert.public_key() | ||
|
||
try: | ||
public_key.verify(signature=pai1_cert.signature, data=pai1_cert.tbs_certificate_bytes, | ||
signature_algorithm=ec.ECDSA(hashes.SHA256())) | ||
except InvalidSignature as e: | ||
asserts.fail("Failed to verify PAI signature against PAA public key: %s" % str(e)) | ||
logging.info("Validated PAI signature against PAA") | ||
|
||
logging.info("Step 4 check 2: Verify PAI AKID not in denylist of SDK PAIs") | ||
if allow_sdk_dac: | ||
logging.warn("===> TEST STEP SKIPPED: Allowing SDK DACs!") | ||
else: | ||
for candidate in FORBIDDEN_AKID: | ||
asserts.assert_not_equal(hex_from_bytes(pai1_akid), hex_from_bytes(candidate), "PAI AKID must not be in denylist") | ||
|
||
logging.info("Step 5: Extract subject public key of DAC and save") | ||
dac1_cert = load_der_x509_certificate(dac_1) | ||
pk_1 = dac1_cert.public_key().public_bytes(encoding=Encoding.X962, format=PublicFormat.UncompressedPoint) | ||
logging.info("Subject public key pk_1: %s" % hex_from_bytes(pk_1)) | ||
self.record_data({"pk_1": hex_from_bytes(pk_1)}) | ||
|
||
|
||
if __name__ == "__main__": | ||
default_matter_test_main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.