Skip to content

Commit

Permalink
Big refactor of python files
Browse files Browse the repository at this point in the history
  • Loading branch information
thomwiggers committed Mar 21, 2019
1 parent 65ec9d8 commit f530577
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 234 deletions.
58 changes: 8 additions & 50 deletions benchmarks.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,12 @@
#!/usr/bin/env python3
import sys
import os
import datetime
import time
import utils
from mupq import mupq
from interface import M4Settings, M4


def benchmarkBinary(benchmark, binary):
binpath = os.path.join("bin", binary)
if __name__ == "__main__":
m4 = M4()
test = mupq.StackBenchmark(M4Settings(), m4)
test.test_all()

info = binary.split('_')
primitive = '_'.join(info[:2])
scheme = '_'.join(info[2:-2])
implementation = info[-2]

if utils.m4ignore(primitive, scheme, implementation):
return

if len(sys.argv) > 1 and scheme not in sys.argv[1:]:
return

result = utils.m4run(binpath)
if 'ERROR KEYS' in result:
print("")
print("!!! KEY MISMATCH DURING BENCHMARKING !!!")
print(" This could indicate illegal stack usage,")
print(" leading to errors when measurement interrupts occur.")
print("")
print(" .. exiting with errors!")
sys.exit(1)

timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d%H%M%S')
filename = os.path.join('benchmarks/', benchmark, primitive, scheme, implementation, timestamp)
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, 'w') as f:
f.write(result.strip())
print(" .. wrote benchmarks!")

def doBenchmarks(benchmark):
try:
binaries = [x for x in os.listdir('bin') if (benchmark+".bin") in x]
except FileNotFoundError:
print("There is no bin/ folder. Please first make binaries.")
sys.exit(1)

print("This script flashes the benchmarking binaries onto the board, ")
print(" and then writes the resulting output to the benchmarks directory.")

for binary in binaries:
benchmarkBinary(benchmark, binary)

doBenchmarks("stack")
doBenchmarks("speed")
test = mupq.SpeedBenchmark(M4Settings(), m4)
test.test_all()
84 changes: 25 additions & 59 deletions interface.py
Original file line number Diff line number Diff line change
@@ -1,69 +1,35 @@
import subprocess
import logging

import serial

dev = serial.Serial("/dev/ttyUSB0", 115200, timeout=10)
from mupq import mupq

class Platform(object):
"""Implements the M4 platform"""

def __init__(self, binary_path):
self.log = logging.getLogger("m4 interface")
self.binary_path = binary_path
class M4Settings(mupq.PlatformSettings):
#: Specify folders to include
scheme_folders = [ # mupq.PlatformSettings.scheme_folders + [
('pqm4', 'crypto_kem'),
('pqm4', 'crypto_sign'),
]

def _flash(self):
self.log.info("Flashing %s to device", self.binary_path)
self.state = 'waiting'
self.equals_seen = 0
subprocess.check_call(
["st-flash", "--reset", "write", self.binary_path, "0x8000000"],
)
#: List of dicts, in each dict specify (Scheme class) attributes of the
#: scheme with values, if all attributes match the scheme is skipped.
skip_list = (
{'scheme': 'frodo640-aes', 'implementation': 'ref'},
{'scheme': 'frodo640-cshake', 'implementation': 'ref'},
)

def run(self):
"""Runs the flashed target and collects the result"""
self._flash()
self._wait_for_start()
self.log.info("Output started")
return self._read_output()

def _wait_for_start(self):
"""Waits until we read five equals signs"""
while self.state == 'waiting':
x = dev.read()
if x == b'':
self.log.warning(
"timed out while waiting for the markers, reflashing")
self._flash()
elif x == b'=':
self.equals_seen += 1
continue
elif self.equals_seen > 5:
self.state = 'beginning'
self.log.debug("Found output marker")
elif self.equals_seen > 1:
logging.warning(
"Got garbage after finding first equals sign, restarting"
)
self._flash()
# Read remaining = signs
while self.state == 'beginning':
x = dev.read()
# Consume remaining =
if x != b'=':
self.output = [x]
self.state = 'reading'
break
dev = serial.Serial("/dev/ttyUSB0", 115200, timeout=10)


def _read_output(self):
while self.state == 'reading':
x = dev.read()
if x == b'#':
self.state = 'done'
break
elif x != b'':
self.output.append(x)
output = b''.join(self.output).decode('utf-8', 'ignore')
# sometimes there's a line full of markers; strip out to avoid errors
lines = (x for x in output.split('\n') if not all(c == '=' for c in x))
return "{}\n".format('\n'.join(lines))
class M4(mupq.Platform):

def device(self):
return dev

def flash(self, binary_path):
super().flash(binary_path)
subprocess.check_call(
["st-flash", "--reset", "write", binary_path, "0x8000000"],
)
2 changes: 1 addition & 1 deletion mupq
Submodule mupq updated 2 files
+123 −0 .gitignore
+281 −0 mupq.py
43 changes: 5 additions & 38 deletions test.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,8 @@
#!/usr/bin/env python3
import serial
import sys
import os
import subprocess
import utils
from mupq import mupq
from interface import M4Settings, M4


try:
binaries = [x for x in os.listdir('bin') if 'test.bin' in x]
except FileNotFoundError:
print("There is no bin/ folder. Please first make binaries.")
sys.exit(1)


def doTest(binary):
binpath = os.path.join("bin", binary)
info = binary.split('_')
primitive = '_'.join(info[:2])
scheme = '_'.join(info[2:-2])
implementation = info[-2]

if utils.m4ignore(primitive, scheme, implementation):
return

if len(sys.argv) > 1 and scheme not in sys.argv[1:]:
return

result = utils.m4run(binpath)

print("Testing if tests were successful..")
contents = result.strip()
# can we find a nicer way of checking if tests ran correctly?
if contents.count("ERROR") != 0 or contents.count("OK") != 30:
print("FAILED!")
else:
print("passed.")


for binary in binaries:
doTest(binary)
if __name__ == "__main__":
test = mupq.SimpleTest(M4Settings(), M4())
test.test_all()
76 changes: 5 additions & 71 deletions testvectors.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,7 @@
#!/usr/bin/env python3
import sys
import os
import subprocess
import hashlib
import utils
from mupq import mupq
from interface import M4Settings, M4


try:
binaries = [x for x in os.listdir('bin') if 'testvectors.bin' in x]
except FileNotFoundError:
print("There is no bin/ folder. Please first make binaries.")
sys.exit(1)

try:
binaries_host = [x for x in os.listdir('bin-host') if 'testvectors' in x]
except FileNotFoundError:
print("There is no bin-host/ folder. Please first make binaries.")
sys.exit(1)

print("This script flashes the test vector binaries onto the board, and then")
print(" writes the resulting output to the testvectors directory. It then")
print(" checks if these are internally consistent and match the test vectors")
print(" when running the reference code locally.")

for binary in binaries + binaries_host:
info = binary.split('_')
primitive = '_'.join(info[:2])
scheme = '_'.join(info[2:-2])
impl = info[-2]

if len(sys.argv) > 1 and scheme not in sys.argv[1:]:
continue

# if this is a binary that needs to be ran on the board:
if binary[-4:] == '.bin':
if utils.m4ignore(primitive, scheme, impl):
continue
binpath = os.path.join("bin", binary)

results = utils.m4run(binpath)
filename = os.path.join('testvectors/', primitive, scheme, impl)
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, 'w') as f:
f.write(results.lstrip())
else:
binpath = os.path.join("bin-host", binary)
print("Running {}..".format(binpath))
filename = os.path.join('testvectors/', primitive, scheme, "host")
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, 'w') as f:
subprocess.run([binpath], stdout=f, stderr=subprocess.DEVNULL)
print(" .. wrote test vectors!")

if not os.path.isdir('testvectors'):
sys.exit(0)

print("Testing if test vectors are consistent..")
for primitive in os.listdir('testvectors'):
for scheme in os.listdir(os.path.join('testvectors', primitive)):
print(" .. {}: ".format(os.path.join(primitive, scheme)), end='')
hashes = dict()
for impl in os.listdir(os.path.join('testvectors', primitive, scheme)):
path = os.path.join('testvectors', primitive, scheme, impl)
with open(path, 'rb') as file:
hashes[file.name] = hashlib.sha3_256(file.read()).hexdigest()
if len(set(hashes.values())) <= 1:
print("passed.")
else:
print("FAILED!")
for file, checksum in hashes.items():
print((" {: <{width}} sha3:{}").format(
file, checksum, width=max(len(file) for file in hashes)))
if __name__ == "__main__":
test = mupq.TestVectors(M4Settings(), M4())
test.test_all()
15 changes: 0 additions & 15 deletions utils.py

This file was deleted.

0 comments on commit f530577

Please sign in to comment.