diff --git a/README.md b/README.md index 63c4855..2855a19 100644 --- a/README.md +++ b/README.md @@ -148,6 +148,20 @@ Deploying tx: 0x17a8009565731f45a1621905a7e85e84a6330b485ac3e7e450d90f126b6c3006 ``` Observe that we are setting `--constructorInputs`. It is optional for contracts that have no constructor inputs programmed. +If you do not pass the `--constructorInputs` argument, you shall be prompted for the same. + +``` +ev-cli deploy contracts/ERC20Mintable.sol --contractName='ERC20Mintable' + +Enter constructor inputs... +name(string): TestToken +symbol(string): TTK +decimals(uint8): 18 +Contract ERC20Mintable deployed successfully +Contract Address: 0x9290b03870b0c4c99cc3c1e1dfcfa1ff789af6c0 +Deploying tx: 0x699af417f4349f9e29d63dbc894874b5ae865fefe8e7a6bb2365339fab774211 +``` + ### SignerControlBase.sol This contract forms the base of [EthVigil's Proxy+Signer Control contract](https://medium.com/blockvigil/signer-control-cum-proxy-smart-contract-a-look-at-ethvigils-latest-offering-9ad6c098c095). Without going into the logic of the contract, let us take a look at the constructor as written in the contract. diff --git a/click_cli.py b/click_cli.py index 2408788..658db9b 100644 --- a/click_cli.py +++ b/click_cli.py @@ -9,6 +9,7 @@ import pwd from eth_utils import to_normalized_address from solidity_parser import parser +from utils.EVContractUtils import extract_abi, ABIParser CONTEXT_SETTINGS = dict( help_option_names=['-h', '--help'] @@ -222,7 +223,7 @@ def importsettings(importfile, verbose): @click.option('--contractName', 'contract_name', required=True, help='name of the contract to be deployed. For eg. FixedSupplyToken') @click.option('--constructorInputs', 'inputs', - help='constructor input values as a JSON list. ' + help='constructor input values as a JSON list. OPTIONAL. If you do not specify, you shall be prompted for the same. ' 'Eg: \'["abced", "0x008604d4997a15a77f00CA37aA9f6A376E129DC5"]\' ' 'for constructor inputs of type (string, address). ' 'Can be left empty if there are no inputs accepted by the constructor') @@ -235,15 +236,16 @@ def deploy(ctx_obj, contract_name, inputs, verbose, contract): CONTRACT: path to the solidity file - Usage example: ev-cli deploy ../token.sol --contractName=FixedSupplyToken --constructorInputs='JSON representation of the constructor arguments' + Usage example: ev-cli deploy contracts/Microblog.sol --contractName=Microblog --constructorInputs='JSON representation of the constructor arguments in an array' """ - contract_src = "" + constructor_input_prompt = False if verbose: click.echo('Got constructor inputs: ') click.echo(inputs) if inputs: c_inputs = json.loads(inputs) else: + constructor_input_prompt = True c_inputs = list() # an empty list sources = dict() if contract[0] == '~': @@ -281,6 +283,28 @@ def deploy(ctx_obj, contract_name, inputs, verbose, contract): break contract_src += chunk sources[f'ev-cli/{import_location[2:]}'] = {'content': contract_src} + + if len(c_inputs) == 0 and constructor_input_prompt: + abi_json = extract_abi(ctx_obj['settings'], {'sources': sources, 'sourceFile': f'ev-cli/{contract_file_name}'}) + abp = ABIParser(abi_json=abi_json) + abp.load_abi() + if len(abp.constructor_params()) > 0: + click.echo('Enter constructor inputs...') + for idx, each_param in enumerate(abp.constructor_params()): + param_type = abp._constructor_mapping["constructor"]["input_types"][idx] + param_type_cat = abp.type_category(param_type) + arg = click.prompt(f'{each_param}({param_type})') + if param_type_cat == 'integer': + arg = int(arg) + elif param_type_cat == 'array': + # check if it can be deserialized into a python dict + try: + arg_dict = json.loads(arg) + except json.JSONDecodeError: + click.echo(f'Parameter {each_param} of type {param_type} ' + f'should be correctly passed as a JSON array', err=True) + sys.exit(1) + c_inputs.append(arg) msg = "Trying to deploy" message_hash = encode_defunct(text=msg) # deploy from alpha account diff --git a/requirements.txt b/requirements.txt index 7b90a91..f01984f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,5 @@ PyInstaller==3.6 pytest==4.4.1 pytest-click==0.3 solidity_parser==0.0.7 +tenacity==6.2.0 +antlr4-python3-runtime>=4.7,<4.8 \ No newline at end of file diff --git a/setup.py b/setup.py index 84d7410..6a629cd 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,9 @@ 'requests == 2.22.0', 'eth-account == 0.4.0', 'solidity_parser == 0.0.7', - 'click == 7.0' + 'click == 7.0', + "tenacity==6.2.0", + "antlr4-python3-runtime>=4.7,<4.8" ], classifiers=[ "Programming Language :: Python :: 3", diff --git a/utils/EVContractUtils.py b/utils/EVContractUtils.py new file mode 100644 index 0000000..42dda2f --- /dev/null +++ b/utils/EVContractUtils.py @@ -0,0 +1,526 @@ +# -*- coding: utf-8 -*- +from eth_account.messages import defunct_hash_message +from eth_account.account import Account +import json +from eth_utils import encode_hex, keccak +import eth_utils +from eth_abi import is_encodable +from .http_helper import make_http_call +import os +import sys + + +def read_file_by_chunks(fp, chunk_size=1024): + chunk = '' + for p in fp: + chunk += p + if len(p) == chunk_size: + yield chunk + chunk = '' + if chunk: + yield chunk + + +# def extract_abi(contract_filepath, contract_name): +# # uses solcx +# file_contents = ''.join([_ for _ in read_file_by_chunks(open(os.path.join(sys.path[0], contract_filepath), 'r'))]) +# compile_args = dict() +# compile_args['language'] = 'Solidity' +# compile_args['sources'] = { +# contract_filepath: { +# 'content': file_contents +# } +# } +# compile_args['settings'] = { +# 'outputSelection': { +# contract_filepath: { +# contract_name: +# ['abi'] +# } +# } +# } +# compiled_output = solcx.compile_standard(compile_args) +# return compiled_output['contracts'][contract_filepath][contract_name]['abi'] + +def extract_abi(ev_settings, parsed_sources): + msg = "Trying to signup" + message_hash = defunct_hash_message(text=msg) + signed_msg = Account.signHash(message_hash, ev_settings['PRIVATEKEY']) + compile_params = {'msg': msg, 'sig': signed_msg.signature.hex()} + compile_params.update(parsed_sources) + _resp = make_http_call( + request_type='post', + url=ev_settings['INTERNAL_API_ENDPOINT'] + '/compile', + params=compile_params + ) + return _resp['data']['contract']['abi'] + + +class ABIParser: + def __init__(self, abi_json): + self._fill_allowed_ints() + self._fill_allowed_bytes() + self._events_mapping = {} + self._functions_mapping = {} + self._functions_name_to_hash = {} + self._constructor_mapping = {} + self._functions_selector_to_hash = {} + self._abi_def = abi_json + self._abi_mapped = False + + def constructor_params(self): + if self._abi_mapped: + return self._constructor_mapping['constructor']['input_params'] + else: + return False + + def ordered_map_to_ev_constructor_args(self, arg_map): + """ + + :param arg_map: unordered constructor arguments. For eg. {'constr_arg2': 'val2', 'constr_arg1': 'val1'} + :return: ordered list of constructor arguments to be accepted by EV APIs + """ + if not self._abi_mapped: + return None + param_list = [] + for idx, ordered_param in enumerate(self._constructor_mapping['constructor']['input_params']): + if self.type_category(self._constructor_mapping['constructor']['input_types'][idx]) == 'array': + # array types are passed as JSON serialized strings to EV APIs + param_list.append(json.dumps(arg_map[ordered_param])) + else: + param_list.append(arg_map[ordered_param]) + return param_list + + def _fill_allowed_ints(self): + self._allowed_int_types = allowed_int_types() + + def _fill_allowed_bytes(self): + self._allowed_byte_types = allowed_byte_types() + + def load_abi(self): + self._map_abi() + self._abi_mapped = True + + def _map_events(self, events): + for event in events: + canonical_types = "" + for types in event["inputs"]: + canonical_types += types["type"] + "," + # remove extra , in the end + canonical_types = canonical_types.rstrip(',') + canonical_full_name = "{0}({1})".format(event["name"], canonical_types) + event_hash = encode_hex(keccak(text=canonical_full_name)) + # initialize event signature based hash storage + # also add a redundant one based on the name to refer back to the hash + self._events_mapping[event_hash] = {} + self._events_mapping[event["name"]] = event_hash + # begin filling + self._events_mapping[event_hash]["nickname"] = event["name"] + self._events_mapping[event_hash]["canonical"] = canonical_full_name + # strip out the '0x' and take the first (32 bits/4 bytes/8 hex digits) as the method selector + # defined according to the function encoding standards defined by the Solidity compiler + self._events_mapping[event_hash]["selector"] = event_hash[2:10] + # types_full_list = list(map(lambda x: x["type"], event["inputs"])) + # params_full_list = list(map(lambda x: x["name"], event["inputs"])) + unindexed_params_list = list( + map(lambda x: x["name"], filter(lambda x: x["indexed"] is False, event["inputs"]))) + unindexed_types_list = list( + map(lambda x: x["type"], filter(lambda x: x["indexed"] is False, event["inputs"]))) + indexed_params_list = list( + map(lambda x: x["name"], filter(lambda x: x["indexed"] is True, event["inputs"]))) + indexed_types_list = list(map(lambda x: x["type"], filter(lambda x: x["indexed"] == True, event["inputs"]))) + self._events_mapping[event_hash]["unindexed_types"] = unindexed_types_list + self._events_mapping[event_hash]["unindexed_params"] = unindexed_params_list + self._events_mapping[event_hash]["indexed_params"] = indexed_params_list + self._events_mapping[event_hash]["indexed_types"] = indexed_types_list + + def _map_functions(self, functions): + for function in functions: + canonical_types = "" + if function['name'] == 'decimals': + self._is_erc20 = True + # --- expand tuple types to elementary types where necessary --- + fn_types_list = list() + tuple_encodings = dict() + for fn_input in function['inputs']: + if 'tuple' not in fn_input['type']: + fn_types_list.append(fn_input['type']) + canonical_types += fn_input['type'] + ',' + else: + # prepare string to be passed to eth_abi.encode_single('(component1, component2)', [val1, val2]) + is_array = False + if fn_input['type'][-2:] == '[]': # could be an array of tuples + is_array = True + enc_string = self._expand_components(fn_input['components'], is_array) + tuple_encodings[fn_input['name']] = enc_string + # the type to be included in the canonical sig is the same + # for eg 'f_name((uint256,string,address))' instead of f_name(tuple) + fn_types_list.append(enc_string) + canonical_types += enc_string + ',' + # END --- expand tuple types to elementary types where necessary --- + canonical_types = canonical_types.rstrip(',') + canonical_full_name = "{0}({1})".format(function["name"], canonical_types) + function_hash = encode_hex(keccak(text=canonical_full_name)) + # initialize function signature based hash storage + # also add a redundant one based on the name to refer back to the hash + self._functions_mapping[function_hash] = {} + self._functions_name_to_hash[function["name"]] = function_hash + # begin filling + self._functions_mapping[function_hash]["nickname"] = function["name"] + self._functions_mapping[function_hash]["canonical"] = canonical_full_name + # strip out the '0x' and take the first (32 bits/4 bytes/8 hex digits) as the method selector + # defined according to the function encoding standards defined by the Solidity compiler + self._functions_mapping[function_hash]["selector"] = function_hash[2:10] + fn_params_list = list(map(lambda x: x["name"], function["inputs"])) + # fn_types_list = list(map(lambda x: x["type"], function["inputs"])) + + self._functions_mapping[function_hash]["params"] = fn_params_list + self._functions_mapping[function_hash]["types"] = fn_types_list + if tuple_encodings: + self._functions_mapping[function_hash]['input_tuple_encodings'] = tuple_encodings + # add output types now + return_params_list = list(map(lambda x: x["name"], function["outputs"])) + self._functions_mapping[function_hash]["output_params"] = return_params_list + # self._functions_mapping[function_hash]["output_types"] = return_types_list + # --- OUTPUT params: expand tuple types to elementary types where necessary --- + fn_op_types_list = list() + op_tuple_encodings = dict() + for fn_output in function['outputs']: + if 'tuple' not in fn_output['type']: + fn_op_types_list.append(fn_output['type']) + else: + # prepare string to be passed to eth_abi.encode_single('(component1, component2)', [val1, val2]) + is_array = False + if fn_output['type'][-2:] == '[]': # could be an array of tuples + is_array = True + enc_string = self._expand_components(fn_output['components'], is_array) + op_tuple_encodings[fn_output['name']] = enc_string + # the type to be included in the canonical sig is the same + # for eg 'f_name((uint256,string,address))' instead of f_name(tuple) + fn_op_types_list.append(enc_string) + # END --- OUTPUT params: expand tuple types to elementary types where necessary --- + self._functions_mapping[function_hash]["output_types"] = fn_op_types_list + if op_tuple_encodings: + self._functions_mapping[function_hash]['output_tuple_encodings'] = op_tuple_encodings + self._functions_mapping[function_hash]["stateMutability"] = function["stateMutability"] + # fill up function selector to function canonical signature hash mapping + self._functions_selector_to_hash[function_hash[2:10]] = function_hash + + def _expand_components(self, components_list, is_tuple_array): + encoding_type_str = '(' + for component in components_list: + if 'tuple' not in component['type']: + encoding_type_str += component['type']+',' + else: + is_nested_tuple_an_array = component['type'][-2:] == '[]' + encoding_type_str += self._expand_components(component['components'], is_nested_tuple_an_array) + if encoding_type_str[-1:] == ',': + encoding_type_str = encoding_type_str[:-1] # remove the final comma + encoding_type_str += ')' # final enclosing parantheses + if is_tuple_array: + encoding_type_str += '[]' + + return encoding_type_str + + def _map_constructor(self, constr_): + for constr in constr_: + self._constructor_mapping = {"constructor": {}} + inputs = constr["inputs"] + input_params_list = list(map(lambda x: x["name"], inputs)) + input_types_list = list(map(lambda x: x["type"], inputs)) + self._constructor_mapping["constructor"]["input_params"] = input_params_list + self._constructor_mapping["constructor"]["input_types"] = input_types_list + + def _map_erc20_values(self): + pass + # if 'decimals' in + + def _map_abi(self): + events = list(filter(lambda x: x["type"] == "event", self._abi_def)) + functions = list(filter(lambda x: x["type"] == "function", self._abi_def)) + constr_ = list(filter(lambda x: x["type"] == "constructor", self._abi_def)) + # print("Events: ", events) + self._map_events(events) + self._map_functions(functions) + self._map_constructor(constr_) + # print("\n Function mapping.... \n {0}".format(self._functions_mapping)) + # print("\n Event mapping.... \n {0}".format(self._events_mapping)) + + def _only_getters(self): + hash_keys = list( + filter(lambda x: self._functions_mapping[x]["stateMutability"] == "view", self._functions_mapping)) + hash_entries = list(map(lambda x: self._functions_mapping[x], hash_keys)) + return dict(zip(hash_keys, hash_entries)) + + def _only_getters_by_name(self): + hash_keys = list( + filter(lambda x: self._functions_mapping[x]["stateMutability"] == "view", self._functions_mapping)) + hash_names = list(map(lambda x: self._functions_mapping[x]["nickname"], hash_keys)) + hash_entries = list(map(lambda x: self._functions_mapping[x], hash_keys)) + return dict(zip(hash_names, hash_entries)) + + def _only_writers_by_name(self): + hash_keys = list( + filter(lambda x: self._functions_mapping[x]["stateMutability"] != "view", self._functions_mapping)) + hash_names = list(map(lambda x: self._functions_mapping[x]["nickname"], hash_keys)) + hash_entries = list(map(lambda x: self._functions_mapping[x], hash_keys)) + return dict(zip(hash_names, hash_entries)) + + def is_valid(self, method, params): + if method in self._functions_name_to_hash: + cn_hash = self._functions_mapping[self._functions_name_to_hash[method]] + if len(cn_hash["params"]) is not len(params): + return (False, -2, "Argument list mismatch") + else: + return (True, 1, "Success") + else: + return (False, -1, "Invalid method: {0}".format(method)) + + def is_valid_param_dict(self, method, params): # params is a dictionary here. + if method in self._functions_name_to_hash: + cn_hash = self._functions_mapping[self._functions_name_to_hash[method]] + if len(cn_hash["params"]) is not len(params): + return (False, -2, "Argument list mismatch") + else: + return (True, 1, "Success") + else: + return (False, -1, "Invalid method") + + def type_category(self, sol_type): # type category that is used in Swagger API specs, not Solidity specific at all + if sol_type in self._allowed_int_types: + return "integer" + elif sol_type in self._allowed_byte_types or sol_type == "address" or sol_type == "string": + return "string" + elif sol_type == "bool": + return "boolean" + else: + if sol_type[-2:] == "[]": + return "array" + else: + return " " + + +class ABIHelper: + ''' + Array check methods have lists passed and modified "in place". + ''' + + @classmethod + def first_pass_check_int(cls, single_param, param_name, param_type, conversion_errors): + error_flag = False + try: + ret = int(single_param) + except (ValueError, AttributeError): + conversion_errors[param_name] = "Expected type: {0}. Supplied argument not a valid integer".format( + param_type) + error_flag = True + ret = 0 + return (ret, error_flag) + + @classmethod + def first_pass_check_byte(cls, single_param, param_name, param_type, conversion_errors): + error_flag = False + if eth_utils.is_0x_prefixed(single_param): + pc = eth_utils.remove_0x_prefix(single_param) + pc_to_bytes = bytes.fromhex(pc) + ret = pc_to_bytes + else: + try: + ret = single_param.encode('utf-8') + except: + conversion_errors[param_name] = "Expected type: {0}. Supplied argument not a valid byte object.".format( + param_type) + error_flag = True + ret = "".encode('utf-8') + return (ret, error_flag) + + @classmethod + def first_pass_check_address(cls, single_param, param_name, param_type, conversion_errors): + error_flag = False + ret = "0x" + single_param = str(single_param) + if not eth_utils.is_0x_prefixed(single_param): + conversion_errors[ + param_name] = "Expected type: address. Supplied argument {0} not a hexadecimal value".format( + single_param) + error_flag = True + else: + if not eth_utils.is_address(single_param): + conversion_errors[ + param_name] = "Expected type: address. Supplied argument {0} is not a valid Ethereum address".format( + single_param) + error_flag = True + else: + ret = single_param + return (ret, error_flag) + + @classmethod + def first_pass_check_string(cls, single_param, param_name, param_type, conversion_errors): + error_flag = False + try: + ret = str(single_param) + except: + conversion_errors[param_name] = "Expected type: {0}. Supplied argument not a valid string".format( + param_type) + error_flag = True + ret = "" + return (ret, error_flag) + + @classmethod + def first_pass_check_bool(cls, single_param, param_name, param_type, conversion_errors): + error_flag = False + try: + if single_param.lower() == "true" or single_param == "1": + ret = True + elif single_param.lower() == "false" or single_param == "0": + ret = False + else: + ret = False + conversion_errors[param_name] = "Expected type: bool. Supplied argument not a boolean." + error_flag = True + except: + conversion_errors[param_name] = "Expected type: {0}. Supplied argument not a boolean".format(param_type) + error_flag = True + ret = False + return (ret, error_flag) + + @classmethod + def first_pass_check_int_arr(cls, int_param_lst, param_name, base_param_type, conversion_errors): + error_flag = False + ret = int_param_lst + for idx, each_int in enumerate(int_param_lst): + try: + ret[idx] = int(each_int) + except (ValueError, AttributeError): + if param_name not in conversion_errors: + conversion_errors[param_name] = {"message": [], "failed_indexes": []} + error_msg = "Expected type: {0}. One or more supplied argument is not a valid integer".format( + base_param_type) + conversion_errors[param_name]["message"].append(error_msg) + conversion_errors[param_name]["failed_indexes"].append(idx) + error_flag = True + ret[idx] = 0 + return (ret, error_flag) + + @classmethod + def first_pass_check_bytes_arr(cls, bytes_param_lst, param_name, base_param_type, conversion_errors): + error_flag = False + ret = bytes_param_lst + for idx, bytes_param in enumerate(bytes_param_lst): + if eth_utils.is_0x_prefixed(bytes_param): + pc = eth_utils.remove_0x_prefix(bytes_param) + pc_to_bytes = bytes.fromhex(pc) + ret[idx] = pc_to_bytes + else: + try: + ret[idx] = bytes_param.encode('utf-8') + except: + error_msg = "Expected type: {0}. Supplied argument not a valid byte object.".format(base_param_type) + conversion_errors[param_name]["message"].append(error_msg) + conversion_errors[param_name]["failed_indexes"].append(idx) + error_flag = True + ret[idx] = "".encode('utf-8') + return (ret, error_flag) + + @classmethod + def first_pass_check_address_arr(cls, address_param_lst, param_name, base_param_type, conversion_errors): + error_flag = False + ret = address_param_lst + for idx, each_addr in enumerate(address_param_lst): + each_addr = str(each_addr) + ret[idx] = "0x" + if not eth_utils.is_0x_prefixed(each_addr): + if param_name not in conversion_errors: + conversion_errors[param_name] = {"message": [], "failed_indexes": []} + e_m = "Expected type: address. Supplied argument {0} not a hexadecimal value".format(each_addr) + conversion_errors[param_name]["message"].append(e_m) + conversion_errors[param_name]["failed_indexes"].append(idx) + error_flag = True + else: + if not eth_utils.is_address(each_addr): + if param_name not in conversion_errors: + conversion_errors[param_name] = {"message": [], "failed_indexes": []} + e_m = "Expected type: address. Supplied argument {0} is not a valid Ethereum address".format( + each_addr) + conversion_errors[param_name]["message"].append(e_m) + conversion_errors[param_name]["failed_indexes"].append(idx) + error_flag = True + else: + ret[idx] = each_addr + return (ret, error_flag) + + @classmethod + def first_pass_check_string_arr(cls, str_param_lst, param_name, base_param_type, conversion_errors): + error_flag = False + ret = str_param_lst + for idx, each_str in enumerate(str_param_lst): + try: + ret[idx] = str(each_str) + except: + if param_name not in conversion_errors: + conversion_errors[param_name] = {"message": [], "failed_indexes": []} + e_m = "Expected type: {0}. Supplied argument not a valid string".format(base_param_type) + conversion_errors[param_name]["message"].append(e_m) + conversion_errors[param_name]["failed_indexes"].append(idx) + error_flag = True + ret[idx] = "" + return (ret, error_flag) + + @classmethod + def first_pass_check_bool_arr(cls, bool_param_lst, param_name, base_param_type, conversion_errors): + error_flag = False + ret = bool_param_lst + for idx, each_bool in enumerate(bool_param_lst): + try: + if each_bool.lower() == "true" or each_bool == "1": + ret[idx] = True + elif each_bool.lower() == "false" or each_bool == "0": + ret[idx] = False + else: + ret[idx] = False + if param_name not in conversion_errors: + conversion_errors[param_name] = {"message": [], "failed_indexes": []} + conversion_errors[param_name]["message"].append( + "Expected type: bool. Supplied argument not a boolean.") + conversion_errors[param_name]["failed_indexes"].append(idx) + error_flag = True + except: + if param_name not in conversion_errors: + conversion_errors[param_name] = {"message": [], "failed_indexes": []} + e_m = "Expected type: {0}. Supplied argument not a boolean".format(base_param_type) + conversion_errors[param_name]["message"].append(e_m) + conversion_errors[param_name]["failed_indexes"].append(idx) + error_flag = True + ret[idx] = False + return (ret, error_flag) + + @classmethod + def first_pass_check_tuple_arr(cls, param_list, param_name, param_type, conversion_errors): + # we expect the entire type string in case of tuples to be passed here + # because we gon run a check using the eth_abi.is_encodable feature + error_flag = not is_encodable(param_type, param_list) + return param_list, error_flag + + +def allowed_int_types(): + int_type = "int" + uint_type = "uint" + allowed_ints = [int_type + str((i + 1) * 8) for i in range(32)] + allowed_uints = [uint_type + str((i + 1) * 8) for i in range(32)] + allowed_ints.append(int_type) + allowed_uints.append(uint_type) + + return allowed_ints + allowed_uints + + +def allowed_byte_types(): + byte_type = "bytes" + allowed_byte_types = [byte_type + str(i) for i in range(1, 33)] + return allowed_byte_types + ["byte", "bytes"] + + +if __name__ == "__main__": + ab = ABIParser(abi_json=extract_abi('SignerControlBase.sol', 'SignerControlBase')) + ab.load_abi() + # print(json.dumps(ab._only_getters_by_name())) + print(ab._constructor_mapping) diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..0d7163b --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1,14 @@ +from .exceptions import EVBaseException, EVAPIError, EVHTTPError, EVConnectionError +from .http_helper import make_http_call +from .EVContractUtils import extract_abi, ABIHelper, ABIParser + +__all__ = [ + 'ABIParser', + 'ABIHelper', + 'EVConnectionError', + 'EVHTTPError', + 'EVAPIError', + 'EVBaseException', + 'make_http_call', + 'extract_abi' +] \ No newline at end of file diff --git a/utils/exceptions.py b/utils/exceptions.py new file mode 100644 index 0000000..055df98 --- /dev/null +++ b/utils/exceptions.py @@ -0,0 +1,36 @@ +class EVBaseException(Exception): + def __init__(self, msg): + super(EVBaseException, self).__init__(msg) + self._msg = msg + # Exception.__init__(self, msg) + def __str__(self): + return self._msg + + +class EVConnectionError(EVBaseException): + def __init__(self, msg, exc_obj): + super(EVConnectionError, self).__init__(msg) + self.original_exc_obj = exc_obj + self._msg = "\n%s\nOriginal exception:\n%s" %(msg, exc_obj) + + def __str__(self): + return self._msg + + +class EVHTTPError(EVBaseException): + def __init__(self, request_url, request_body, status_code, response_body): + _msg = "\nRequest URL: %s\nRequest body: %s\nResponse HTTP status: %s\nResponse body: %s" % (request_url, request_body, status_code, response_body) + super(EVHTTPError, self).__init__(_msg) + self._request_url = request_url + self._request_body = request_body + self._status_code = status_code + self._msg = _msg + + def __str__(self): + return self._msg + + +class EVAPIError(EVHTTPError): + def __init__(self, request_url, request_body, status_code, response_body): + super(EVAPIError, self).__init__(request_url, request_body, status_code, response_body) + diff --git a/utils/http_helper.py b/utils/http_helper.py new file mode 100644 index 0000000..9b7b6c7 --- /dev/null +++ b/utils/http_helper.py @@ -0,0 +1,91 @@ +import tenacity +import requests +from .exceptions import * +import logging + +ev_logger = logging.getLogger('EVCore') + + +@tenacity.retry( + stop=tenacity.stop_after_delay(60), + wait=tenacity.wait_random_exponential(multiplier=1, max=60), + reraise=True +) +def get(url): + r = requests.get(url) + return r + + +@tenacity.retry( + stop=tenacity.stop_after_delay(60), + wait=tenacity.wait_random_exponential(multiplier=1, max=60), + reraise=True +) +def post(url, json_params, headers): + r = requests.post(url=url, json=json_params, headers=headers) + return r + + +def make_http_call(request_type, url, params={}, headers={}): + response = None + request_details = {'requestType': request_type, 'url': url, 'params': params, 'headers': headers} + ev_logger.debug('HTTPRequest') + ev_logger.debug(request_details) + if request_type == 'get': + try: + response = get(url) + except ( + requests.exceptions.ConnectionError, + requests.exceptions.Timeout, + requests.exceptions.ConnectTimeout + ) as e: + raise EVConnectionError("Error connecting to EthVigil API %s" % url, e) + except Exception as e: + raise EVBaseException(e.__str__()) + try: + response.raise_for_status() + except requests.exceptions.HTTPError: + request_details.update({'response': {'code': response.status_code, 'text': response.text}}) + ev_logger.debug(request_details) + raise EVHTTPError( + request_url=url, + request_body='', + status_code=response.status_code, + response_body=response.text + ) + elif request_type == 'post': + try: + response = post(url=url, json_params=params, headers=headers) + except ( + requests.exceptions.ConnectionError, + requests.exceptions.Timeout, + requests.exceptions.ConnectTimeout + ) as e: + raise EVConnectionError("Error connecting to EthVigil API %s" % url, e) + except Exception as e: + raise EVBaseException(e.__str__()) + try: + response.raise_for_status() + except requests.exceptions.HTTPError as e: + request_details.update({'response': {'code': response.status_code, 'text': response.text}}) + ev_logger.debug(request_details) + raise EVHTTPError( + request_url=url, + request_body=params, + status_code=response.status_code, + response_body=response.text + ) + if not(request_type == 'get' and 'swagger' in url): + return_status = response.status_code + return_content = response.text + request_details.update({'response': {'text': return_content, 'status': return_status}}) + ev_logger.debug('HTTPResponse') + ev_logger.debug(request_details) + response = response.json() + + api_success = response.get('success', False) + # ignoring GET returns for OpenAPI spec. Does not carry a 'success' field + if not api_success and request_type == 'get' and 'openapi' not in response: + raise EVAPIError(request_url=url, request_body=params, status_code=return_status, + response_body=return_content) + return response