-
Notifications
You must be signed in to change notification settings - Fork 184
/
safe_signature.py
366 lines (310 loc) · 12.5 KB
/
safe_signature.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
from abc import ABC, abstractmethod
from enum import Enum
from logging import getLogger
from typing import List, Optional, Sequence, Union
from eth_abi import decode as decode_abi
from eth_abi import encode as encode_abi
from eth_abi.exceptions import DecodingError
from eth_account.messages import defunct_hash_message
from eth_typing import BlockIdentifier, ChecksumAddress, HexAddress, HexStr
from hexbytes import HexBytes
from web3.exceptions import Web3Exception
from safe_eth.eth import EthereumClient
from safe_eth.eth.contracts import (
get_compatibility_fallback_handler_contract,
get_safe_contract,
)
from safe_eth.eth.utils import fast_to_checksum_address
from safe_eth.safe.signatures import (
get_signing_address,
signature_split,
signature_to_bytes,
)
# Module-level logger, named after this module
logger = getLogger(__name__)
# Signatures and hashes are accepted either as raw ``bytes`` or as ``str``;
# the classes in this file pass such values through ``HexBytes`` before use
EthereumBytes = Union[bytes, str]
class SafeSignatureException(Exception):
    """Base class for the signature-related exceptions declared in this module."""
class CannotCheckEIP1271ContractSignature(SafeSignatureException):
    """
    Exception type for an EIP1271 contract signature that cannot be checked.

    NOTE(review): meaning taken from the class name; nothing in the visible
    part of this module raises it.
    """
class SafeSignatureType(Enum):
    """Kinds of Safe signature, selected from the ``v`` value of a signature."""

    CONTRACT_SIGNATURE = 0
    APPROVED_HASH = 1
    EOA = 2
    ETH_SIGN = 3

    @staticmethod
    def from_v(v: int):
        """
        Map the ``v`` value of a signature to its signature type

        :param v: ``v`` value of the signature
        :return: ``CONTRACT_SIGNATURE`` for ``0``, ``APPROVED_HASH`` for ``1``,
            ``ETH_SIGN`` for anything above ``30`` and ``EOA`` for every other value
        """
        if v == 0:
            return SafeSignatureType.CONTRACT_SIGNATURE
        if v == 1:
            return SafeSignatureType.APPROVED_HASH
        # Everything that is not one of the two special markers is an ECDSA
        # signature; only the range of `v` tells the two flavours apart
        return SafeSignatureType.ETH_SIGN if v > 30 else SafeSignatureType.EOA
def uint_to_address(value: int) -> ChecksumAddress:
    """
    Turn a Solidity `uint` value into a checksummed `address`, discarding
    any invalid padding bytes it may carry

    :param value: Integer holding the address
    :return: Checksummed address
    """
    uint_word = encode_abi(["uint"], [value])
    # Solidity ignores whatever sits in the padding bytes, `eth_abi` does not:
    # rebuild the word from 12 zero bytes plus the last 20 bytes of the encoding
    address_word = bytes(12) + uint_word[-20:]
    decoded_address = decode_abi(["address"], address_word)[0]
    return fast_to_checksum_address(decoded_address)
class SafeSignature(ABC):
    """
    Abstract base for one owner signature over a Safe hash.

    Keeps the raw signature, the signed hash and the ``v``, ``r``, ``s`` values
    returned by ``signature_split``. Subclasses supply ``owner``,
    ``signature_type`` and ``is_valid``.
    """

    def __init__(self, signature: EthereumBytes, safe_hash: EthereumBytes):
        """
        :param signature: Owner signature
        :param safe_hash: Signed hash for the Safe (message or transaction)
        """
        raw_signature = HexBytes(signature)
        self.signature = raw_signature
        self.safe_hash = HexBytes(safe_hash)
        v, r, s = signature_split(raw_signature)
        self.v = v
        self.r = r
        self.s = s

    def __str__(self):
        type_name = self.signature_type.name
        owner = self.owner
        return f"SafeSignature type={type_name} owner={owner}"

    @classmethod
    def parse_signature(
        cls,
        signatures: EthereumBytes,
        safe_hash: EthereumBytes,
        safe_hash_preimage: Optional[EthereumBytes] = None,
        ignore_trailing: bool = True,
    ) -> List["SafeSignature"]:
        """
        Decode a blob of concatenated signatures into ``SafeSignature`` objects

        :param signatures: One or more signatures appended. EIP1271 data at the end is supported.
        :param safe_hash: Signed hash for the Safe (message or transaction)
        :param safe_hash_preimage: ``safe_hash`` preimage for EIP1271 validation. When not provided,
            ``safe_hash`` is used instead
        :param ignore_trailing: Ignore trailing data on the signature. Some libraries pad it and add
            some zeroes at the end
        :return: List of SafeSignatures decoded
        """
        if not signatures:
            return []
        if isinstance(signatures, str):
            signatures = HexBytes(signatures)

        # Every signature takes 65 bytes; contract signatures keep extra data at the end
        signature_size = 65
        total_length = len(signatures)
        # Where the contract signature data starts; parsing stops once this is reached
        data_position = total_length
        # Signature types built from just the 65 byte chunk and the hash
        plain_signature_classes = {
            SafeSignatureType.APPROVED_HASH: SafeSignatureApprovedHash,
            SafeSignatureType.EOA: SafeSignatureEOA,
            SafeSignatureType.ETH_SIGN: SafeSignatureEthSign,
        }

        parsed_signatures = []
        position = 0
        while position < total_length and position < data_position:
            chunk = signatures[position : position + signature_size]
            if ignore_trailing and len(chunk) < 65:
                # Incomplete chunk at the end, treated as trailing stuff
                break

            v, _, s = signature_split(chunk)
            kind = SafeSignatureType.from_v(v)
            parsed: "SafeSignature"
            if kind == SafeSignatureType.CONTRACT_SIGNATURE:
                # `s` is the offset of this signature's data inside `signatures`;
                # the lowest offset seen marks the end of the 65 byte chunks
                data_position = min(data_position, s)
                payload_start = s + 32  # Skip the 32 bytes holding the length
                payload_length = int.from_bytes(signatures[s:payload_start], "big")
                payload = signatures[payload_start : payload_start + payload_length]
                parsed = SafeSignatureContract(
                    chunk,
                    safe_hash,
                    safe_hash_preimage or safe_hash,
                    payload,
                )
            else:
                parsed = plain_signature_classes[kind](chunk, safe_hash)

            parsed_signatures.append(parsed)
            position += signature_size
        return parsed_signatures

    @classmethod
    def export_signatures(cls, safe_signatures: Sequence["SafeSignature"]) -> HexBytes:
        """
        Takes a list of SafeSignature objects and exports them as a valid signature for the contract

        :param safe_signatures: Signatures to export. They are written sorted by lowercased owner
        :return: Valid signature for the Safe contract
        """
        static_chunks = []
        dynamic_chunks = []
        # Contract signature data goes right after all the 65 byte signatures
        dynamic_offset = len(safe_signatures) * 65

        # Signatures must be sorted by owner
        ordered_signatures = list(safe_signatures)
        ordered_signatures.sort(key=lambda s: s.owner.lower())
        for current in ordered_signatures:
            if not isinstance(current, SafeSignatureContract):
                static_chunks.append(current.export_signature())
                continue

            # For contract signatures `s` is rewritten to point at the exported data
            static_chunks.append(
                signature_to_bytes(current.v, current.r, dynamic_offset)
            )
            # encode_abi adds {32 bytes offset}{32 bytes size}. We don't need offset
            size_and_data = encode_abi(["bytes"], [current.contract_signature])[32:]
            # Keep the 32 byte size plus the signature bytes, nothing after them
            exported_data = size_and_data[: 32 + len(current.contract_signature)]
            dynamic_chunks.append(exported_data)
            dynamic_offset += len(exported_data)

        return HexBytes(b"".join(static_chunks) + b"".join(dynamic_chunks))

    def export_signature(self) -> HexBytes:
        """
        Exports signature in a format that's valid individually. That's important for contract
        signatures, as it will fix the offset

        :return: The stored signature, unchanged
        """
        return self.signature

    @property
    @abstractmethod
    def owner(self):
        """
        :return: Decode owner from signature, without any further validation (signature can be not valid)
        """
        raise NotImplementedError

    @abstractmethod
    def is_valid(self, ethereum_client: EthereumClient, safe_address: str) -> bool:
        """
        :param ethereum_client: Required for Contract Signature and Approved Hash check
        :param safe_address: Required for Approved Hash check
        :return: `True` if signature is valid, `False` otherwise
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def signature_type(self) -> SafeSignatureType:
        """
        :return: ``SafeSignatureType`` implemented by the subclass
        """
        raise NotImplementedError
class SafeSignatureContract(SafeSignature):
    """
    Signature of type ``CONTRACT_SIGNATURE``: the owner is a contract whose
    address is stored in ``r``, and the signature it has to check is kept in
    ``contract_signature``.
    """

    # Values returned by `isValidSignature` that `is_valid` accepts as a valid signature
    EIP1271_MAGIC_VALUE = HexBytes(0x20C13B0B)
    EIP1271_MAGIC_VALUE_UPDATED = HexBytes(0x1626BA7E)

    def __init__(
        self,
        signature: EthereumBytes,
        safe_hash: EthereumBytes,
        safe_hash_preimage: EthereumBytes,
        contract_signature: EthereumBytes,
    ):
        """
        :param signature: Owner signature (the 65 bytes holding ``r``, ``s`` and ``v``)
        :param safe_hash: Signed hash for the Safe (message or transaction)
        :param safe_hash_preimage: ``safe_hash`` preimage for EIP1271 validation
        :param contract_signature: Signature data handed to the owner contract for validation
        """
        super().__init__(signature, safe_hash)
        self.safe_hash_preimage = HexBytes(safe_hash_preimage)
        self.contract_signature = HexBytes(contract_signature)

    @classmethod
    def from_values(
        cls,
        safe_owner: ChecksumAddress,
        safe_hash: EthereumBytes,
        safe_hash_preimage: EthereumBytes,
        contract_signature: EthereumBytes,
    ) -> "SafeSignatureContract":
        """
        Build a contract signature from its parts

        :param safe_owner: Address of the owner contract, stored as ``r``
        :param safe_hash: Signed hash for the Safe (message or transaction)
        :param safe_hash_preimage: ``safe_hash`` preimage for EIP1271 validation
        :param contract_signature: Signature data for the owner contract
        :return: Signature with ``v = 0`` and ``s = 65``
        """
        owner_as_uint = int.from_bytes(HexBytes(safe_owner), byteorder="big")
        static_signature = signature_to_bytes(0, owner_as_uint, 65)
        return cls(static_signature, safe_hash, safe_hash_preimage, contract_signature)

    @property
    def owner(self) -> ChecksumAddress:
        """
        :return: Address of contract signing. No further checks to get the owner are needed,
            but it could be a non-existing contract
        """
        return uint_to_address(self.r)

    @property
    def signature_type(self) -> SafeSignatureType:
        return SafeSignatureType.CONTRACT_SIGNATURE

    def export_signature(self) -> HexBytes:
        """
        Fix offset (s) and append `contract_signature` at the end of the signature

        :return: 65 byte signature with ``s = 65``, followed by the 32 byte size and the
            contract signature bytes
        """
        # encode_abi adds {32 bytes offset}{32 bytes size}. We don't need offset
        size_and_data = encode_abi(["bytes"], [self.contract_signature])[32:]
        # Keep the 32 byte size plus the signature bytes, nothing after them
        exported_data = size_and_data[: 32 + len(self.contract_signature)]
        # Exported alone, the data starts right after this single 65 byte signature
        static_signature = signature_to_bytes(self.v, self.r, 65)
        return HexBytes(static_signature + exported_data)

    def is_valid(self, ethereum_client: EthereumClient, *args) -> bool:
        """
        Ask the owner contract whether the signature is valid, calling
        ``isValidSignature(bytes,bytes)`` with the hash preimage and the contract signature

        :param ethereum_client: Client whose ``w3`` is used for the call
        :param args: Ignored
        :return: `True` if the call returns one of the EIP1271 magic values. `False` otherwise,
            also when the call raises ``Web3Exception``, ``DecodingError`` or ``ValueError``
        """
        owner_contract = get_compatibility_fallback_handler_contract(
            ethereum_client.w3, self.owner
        )
        is_valid_signature_fn = owner_contract.get_function_by_signature(
            "isValidSignature(bytes,bytes)"
        )
        accepted_values = (
            self.EIP1271_MAGIC_VALUE,
            self.EIP1271_MAGIC_VALUE_UPDATED,
        )
        try:
            returned_value = is_valid_signature_fn(
                self.safe_hash_preimage, self.contract_signature
            ).call()
            return returned_value in accepted_values
        except (Web3Exception, DecodingError, ValueError):
            # Error using `pending` block identifier or contract does not exist
            logger.warning(
                "Cannot check EIP1271 signature from contract %s", self.owner
            )
            return False
class SafeSignatureApprovedHash(SafeSignature):
    """
    Signature of type ``APPROVED_HASH``: ``r`` holds the owner address and validity
    is looked up in the Safe contract's ``approvedHashes``.
    """

    @property
    def owner(self):
        """
        :return: Checksummed owner address decoded from ``r``
        """
        return uint_to_address(self.r)

    @property
    def signature_type(self):
        return SafeSignatureType.APPROVED_HASH

    @classmethod
    def build_for_owner(cls, owner: str, safe_hash: str) -> "SafeSignatureApprovedHash":
        """
        Build the approved hash signature for an owner

        :param owner: Owner address as a hex string
        :param safe_hash: Hash the signature refers to
        :return: Signature with the owner as ``r``, zeroes as ``s`` and ``v = 1``
        """
        owner_hex = owner.lower().replace("0x", "")
        r_hex = owner_hex.rjust(64, "0")  # Left-pad the address to 32 bytes
        s_hex = "0" * 64
        v_hex = "01"
        return cls(HexBytes(f"{r_hex}{s_hex}{v_hex}"), safe_hash)

    def is_valid(self, ethereum_client: EthereumClient, safe_address: str) -> bool:
        """
        Check ``approvedHashes(owner, safe_hash)`` on the Safe contract, trying the
        ``pending`` block first and then ``latest``

        :param ethereum_client: Client whose ``w3`` is used for the call
        :param safe_address: Address of the Safe holding the approved hashes
        :return: `True` if the stored value equals ``1``, `False` otherwise
        :raises Exception: The last ``Web3Exception``, ``DecodingError`` or ``ValueError``
            caught, if the call fails for every block identifier
        """
        w3 = ethereum_client.w3
        # Wrap the plain string in the `eth_typing` address types
        typed_safe_address = ChecksumAddress(HexAddress(HexStr(safe_address)))
        safe_contract = get_safe_contract(w3, typed_safe_address)

        last_error: Exception
        block_identifiers: List[BlockIdentifier] = ["pending", "latest"]
        for block_identifier in block_identifiers:
            try:
                stored_value = safe_contract.functions.approvedHashes(
                    self.owner, self.safe_hash
                ).call(block_identifier=block_identifier)
                return stored_value == 1
            except (Web3Exception, DecodingError, ValueError) as error:
                # Error using `pending` block identifier
                last_error = error
        raise last_error  # This should never happen
class SafeSignatureEthSign(SafeSignature):
    """
    Signature of type ``ETH_SIGN``: the owner is recovered from the signed-message
    hash of ``safe_hash``, not from ``safe_hash`` itself.
    """

    @property
    def owner(self):
        """
        :return: Address recovered from the signature over the prefixed ``safe_hash``
        """
        # defunct_hash_message prepends `\x19Ethereum Signed Message:\n32`
        prefixed_hash = defunct_hash_message(primitive=self.safe_hash)
        # The stored `v` is lowered by 4 before recovering the signer
        recovery_v = self.v - 4
        return get_signing_address(prefixed_hash, recovery_v, self.r, self.s)

    @property
    def signature_type(self):
        return SafeSignatureType.ETH_SIGN

    def is_valid(self, *args) -> bool:
        """
        :param args: Ignored
        :return: Always `True`, no check is performed
        """
        return True
class SafeSignatureEOA(SafeSignature):
    """
    Signature of type ``EOA``: the owner is recovered directly from ``safe_hash``
    and the signature's ``v``, ``r``, ``s``.
    """

    @property
    def owner(self):
        """
        :return: Address recovered from the signature over ``safe_hash``
        """
        signed_hash = self.safe_hash
        return get_signing_address(signed_hash, self.v, self.r, self.s)

    @property
    def signature_type(self):
        return SafeSignatureType.EOA

    def is_valid(self, *args) -> bool:
        """
        :param args: Ignored
        :return: Always `True`, no check is performed
        """
        return True