-
-
Notifications
You must be signed in to change notification settings - Fork 669
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor(core/ethereum): improve API of the rlp module
- Loading branch information
Showing
4 changed files
with
149 additions
and
82 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Refactor RLP codec for better clarity and some small memory savings. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,53 +1,97 @@ | ||
def int_to_bytes(x: int) -> bytes: | ||
if x == 0: | ||
return b"" | ||
r = bytearray() | ||
while x: | ||
r.append(x % 256) | ||
x //= 256 | ||
return bytes(reversed(r)) | ||
|
||
|
||
def encode_length(l: int, is_list: bool) -> bytes: | ||
offset = 0xC0 if is_list else 0x80 | ||
if l < 56: | ||
return bytes([l + offset]) | ||
elif l < 256 ** 8: | ||
bl = int_to_bytes(l) | ||
return bytes([len(bl) + offset + 55]) + bl | ||
from micropython import const | ||
|
||
if False: | ||
from typing import Union | ||
from trezor.utils import Writer | ||
|
||
# what we want: | ||
# RLPItem = Union[list["RLPItem"], bytes, int] | ||
# what mypy can process: | ||
RLPItem = Union[list, bytes, int] | ||
|
||
|
||
STRING_HEADER_BYTE = const(0x80) | ||
LIST_HEADER_BYTE = const(0xC0) | ||
|
||
|
||
def _byte_size(x: int) -> int: | ||
if x < 0: | ||
raise ValueError # only unsigned ints are supported | ||
for exp in range(64): | ||
if x < 0x100 ** exp: | ||
return exp | ||
else: | ||
raise ValueError("Input too long") | ||
|
||
|
||
def encode(data, include_length=True) -> bytes: | ||
if isinstance(data, int): | ||
data = int_to_bytes(data) | ||
if isinstance(data, bytearray): | ||
data = bytes(data) | ||
if isinstance(data, bytes): | ||
if (len(data) == 1 and ord(data) < 128) or not include_length: | ||
return data | ||
else: | ||
return encode_length(len(data), is_list=False) + data | ||
elif isinstance(data, list): | ||
output = b"" | ||
for item in data: | ||
output += encode(item) | ||
if include_length: | ||
return encode_length(len(output), is_list=True) + output | ||
else: | ||
return output | ||
raise ValueError # int is too large | ||
|
||
|
||
def int_to_bytes(x: int) -> bytes: | ||
return x.to_bytes(_byte_size(x), "big") | ||
|
||
|
||
def write_header( | ||
w: Writer, | ||
length: int, | ||
header_byte: int, | ||
data_start: bytes | None = None, | ||
) -> None: | ||
if length == 1 and data_start is not None and data_start[0] <= 0x7F: | ||
# no header when encoding one byte below 0x80 | ||
pass | ||
|
||
elif length <= 55: | ||
w.append(header_byte + length) | ||
|
||
else: | ||
raise TypeError("Invalid input of type " + str(type(data))) | ||
encoded_length = int_to_bytes(length) | ||
w.append(header_byte + 55 + len(encoded_length)) | ||
w.extend(encoded_length) | ||
|
||
|
||
def field_length(length: int, first_byte: bytearray) -> int: | ||
if length == 1 and first_byte[0] <= 0x7F: | ||
def header_length(length: int, data_start: bytes | None = None) -> int: | ||
if length == 1 and data_start is not None and data_start[0] <= 0x7F: | ||
# no header when encoding one byte below 0x80 | ||
return 0 | ||
|
||
if length <= 55: | ||
return 1 | ||
elif length <= 55: | ||
return 1 + length | ||
elif length <= 0xFF: | ||
return 2 + length | ||
elif length <= 0xFFFF: | ||
return 3 + length | ||
return 4 + length | ||
|
||
return 1 + _byte_size(length) | ||
|
||
|
||
def length(item: RLPItem) -> int: | ||
data: bytes | None = None | ||
if isinstance(item, int): | ||
data = int_to_bytes(item) | ||
item_length = len(data) | ||
elif isinstance(item, (bytes, bytearray)): | ||
data = item | ||
item_length = len(item) | ||
elif isinstance(item, list): | ||
item_length = sum(length(i) for i in item) | ||
else: | ||
raise TypeError | ||
|
||
return header_length(item_length, data) + item_length | ||
|
||
|
||
def write_string(w: Writer, string: bytes) -> None: | ||
write_header(w, len(string), STRING_HEADER_BYTE, string) | ||
w.extend(string) | ||
|
||
|
||
def write_list(w: Writer, lst: list[RLPItem]) -> None: | ||
payload_length = sum(length(item) for item in lst) | ||
write_header(w, payload_length, LIST_HEADER_BYTE) | ||
for item in lst: | ||
write(w, item) | ||
|
||
|
||
def write(w: Writer, item: RLPItem) -> None: | ||
if isinstance(item, int): | ||
write_string(w, int_to_bytes(item)) | ||
elif isinstance(item, (bytes, bytearray)): | ||
write_string(w, item) | ||
elif isinstance(item, list): | ||
write_list(w, item) | ||
else: | ||
raise TypeError |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters