diff --git a/samtranslator/feature_toggle/dialup.py b/samtranslator/feature_toggle/dialup.py index c9276e782..9cf5443f2 100644 --- a/samtranslator/feature_toggle/dialup.py +++ b/samtranslator/feature_toggle/dialup.py @@ -1,19 +1,20 @@ import hashlib +from abc import ABCMeta, abstractmethod -class BaseDialup(object): +class BaseDialup(object, metaclass=ABCMeta): """BaseDialup class to provide an interface for all dialup classes""" def __init__(self, region_config, **kwargs): # type: ignore[no-untyped-def] self.region_config = region_config - def is_enabled(self): # type: ignore[no-untyped-def] + @abstractmethod + def is_enabled(self) -> bool: """ Returns a bool on whether this dialup is enabled or not """ - raise NotImplementedError - def __str__(self): # type: ignore[no-untyped-def] + def __str__(self) -> str: return self.__class__.__name__ @@ -54,7 +55,7 @@ def __init__(self, region_config, account_id, feature_name, **kwargs): # type: self.account_id = account_id self.feature_name = feature_name - def _get_account_percentile(self): # type: ignore[no-untyped-def] + def _get_account_percentile(self) -> int: """ Get account percentile based on sha256 hash of account ID and feature_name @@ -65,10 +66,10 @@ def _get_account_percentile(self): # type: ignore[no-untyped-def] m.update(self.feature_name.encode()) return int(m.hexdigest(), 16) % 100 - def is_enabled(self): # type: ignore[no-untyped-def] + def is_enabled(self) -> bool: """ Enable when account_percentile falls within target_percentile Meaning only (target_percentile)% of accounts will be enabled """ - target_percentile = self.region_config.get("enabled-%", 0) - return self._get_account_percentile() < target_percentile # type: ignore[no-untyped-call] + target_percentile: int = self.region_config.get("enabled-%", 0) + return self._get_account_percentile() < target_percentile diff --git a/samtranslator/feature_toggle/feature_toggle.py b/samtranslator/feature_toggle/feature_toggle.py index bfba01ea7..2a63ff57b 100644 --- a/samtranslator/feature_toggle/feature_toggle.py +++ b/samtranslator/feature_toggle/feature_toggle.py @@ -1,4 +1,7 @@ import json +from abc import ABC, abstractmethod +from typing import Any, Dict, cast + import boto3 import logging @@ -87,15 +90,16 @@ def is_enabled(self, feature_name: str) -> bool: return is_enabled -class FeatureToggleConfigProvider: +class FeatureToggleConfigProvider(ABC): """Interface for all FeatureToggle config providers""" def __init__(self) -> None: pass @property - def config(self): # type: ignore[no-untyped-def] - raise NotImplementedError + @abstractmethod + def config(self) -> Dict[str, Any]: + pass class FeatureToggleDefaultConfigProvider(FeatureToggleConfigProvider): @@ -105,7 +109,7 @@ def __init__(self) -> None: FeatureToggleConfigProvider.__init__(self) @property - def config(self): # type: ignore[no-untyped-def] + def config(self) -> Dict[str, Any]: return {} @@ -116,10 +120,10 @@ def __init__(self, local_config_path): # type: ignore[no-untyped-def] FeatureToggleConfigProvider.__init__(self) with open(local_config_path, "r", encoding="utf-8") as f: config_json = f.read() - self.feature_toggle_config = json.loads(config_json) + self.feature_toggle_config = cast(Dict[str, Any], json.loads(config_json)) @property - def config(self): # type: ignore[no-untyped-def] + def config(self) -> Dict[str, Any]: return self.feature_toggle_config @@ -147,13 +151,13 @@ def __init__(self, application_id, environment_id, configuration_profile_id, app ClientId="FeatureToggleAppConfigConfigProvider", ) binary_config_string = response["Content"].read() - self.feature_toggle_config = json.loads(binary_config_string.decode("utf-8")) + self.feature_toggle_config = cast(Dict[str, Any], json.loads(binary_config_string.decode("utf-8"))) LOG.info("Finished loading feature toggle config from AppConfig.") except Exception as ex: LOG.error("Failed to load config from AppConfig: {}. Using empty config.".format(ex)) # There is chance that AppConfig is not available in a particular region. - self.feature_toggle_config = json.loads("{}") + self.feature_toggle_config = {} @property - def config(self): # type: ignore[no-untyped-def] + def config(self) -> Dict[str, Any]: return self.feature_toggle_config diff --git a/samtranslator/intrinsics/resource_refs.py b/samtranslator/intrinsics/resource_refs.py index d11cad05b..76f0be32f 100644 --- a/samtranslator/intrinsics/resource_refs.py +++ b/samtranslator/intrinsics/resource_refs.py @@ -67,12 +67,12 @@ def get_all(self, logical_id): # type: ignore[no-untyped-def] """ return self._refs.get(logical_id, None) - def __len__(self): # type: ignore[no-untyped-def] + def __len__(self) -> int: """ To make len(this_object) work :return: Number of resource references available """ return len(self._refs) - def __str__(self): # type: ignore[no-untyped-def] + def __str__(self) -> str: return str(self._refs) diff --git a/samtranslator/metrics/method_decorator.py b/samtranslator/metrics/method_decorator.py index fbe09328c..c92dd442b 100644 --- a/samtranslator/metrics/method_decorator.py +++ b/samtranslator/metrics/method_decorator.py @@ -21,11 +21,11 @@ class MetricsMethodWrapperSingleton: _METRICS_INSTANCE = _DUMMY_INSTANCE @staticmethod - def set_instance(metrics): # type: ignore[no-untyped-def] + def set_instance(metrics: Metrics) -> None: MetricsMethodWrapperSingleton._METRICS_INSTANCE = metrics @staticmethod - def get_instance(): # type: ignore[no-untyped-def] + def get_instance() -> Metrics: """ Return the instance, if nothing is set return a dummy one """ diff --git a/samtranslator/metrics/metrics.py b/samtranslator/metrics/metrics.py index 2285a743e..77fa4d5f0 100644 --- a/samtranslator/metrics/metrics.py +++ b/samtranslator/metrics/metrics.py @@ -3,6 +3,7 @@ """ import logging from datetime import datetime +from typing import Any, Dict LOG = logging.getLogger(__name__) @@ -101,7 +102,7 @@ def __init__(self, name, value, unit, dimensions=None, timestamp=None): # type: self.dimensions = dimensions if dimensions else [] self.timestamp = timestamp if timestamp else datetime.utcnow() - def get_metric_data(self): # type: ignore[no-untyped-def] + def get_metric_data(self) -> Dict[str, Any]: return { "MetricName": self.name, "Value": self.value, @@ -123,13 +124,13 @@ def __init__(self, namespace="ServerlessTransform", metrics_publisher=None): # self.metrics_cache = {} self.namespace = namespace - def __del__(self): # type: ignore[no-untyped-def] + def __del__(self) -> None: if len(self.metrics_cache) > 0: # attempting to publish if user forgot to call publish in code LOG.warning( "There are unpublished metrics. Please make sure you call publish after you record all metrics." ) - self.publish() # type: ignore[no-untyped-call] + self.publish() def _record_metric(self, name, value, unit, dimensions=None, timestamp=None): # type: ignore[no-untyped-def] """ @@ -167,7 +168,7 @@ def record_latency(self, name, value, dimensions=None, timestamp=None): # type: """ self._record_metric(name, value, Unit.Milliseconds, dimensions, timestamp) # type: ignore[no-untyped-call] - def publish(self): # type: ignore[no-untyped-def] + def publish(self) -> None: """Calls publish method from the configured metrics publisher to publish metrics""" # flatten the key->list dict into a flat list; we don't care about the key as it's # the metric name which is also in the MetricDatum object diff --git a/samtranslator/model/__init__.py b/samtranslator/model/__init__.py index e53f8f716..b07e0c90b 100644 --- a/samtranslator/model/__init__.py +++ b/samtranslator/model/__init__.py @@ -1,7 +1,7 @@ """ CloudFormation Resource serialization, deserialization, and validation """ import re import inspect -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union from samtranslator.intrinsics.resolver import IntrinsicsResolver from samtranslator.model.exceptions import ExpectedType, InvalidResourceException, InvalidResourcePropertyTypeException @@ -138,7 +138,7 @@ def get_supported_resource_attributes(cls): # type: ignore[no-untyped-def] return tuple(cls._supported_resource_attributes) @classmethod - def get_pass_through_attributes(cls): # type: ignore[no-untyped-def] + def get_pass_through_attributes(cls) -> Tuple[str, ...]: """ A getter method for the resource attributes to be passed to auto-generated resources returns: a tuple that contains the name of all pass through attributes @@ -254,11 +254,11 @@ def to_dict(self) -> Dict[str, Dict[str, Any]]: """ self.validate_properties() - resource_dict = self._generate_resource_dict() # type: ignore[no-untyped-call] + resource_dict = self._generate_resource_dict() return {self.logical_id: resource_dict} - def _generate_resource_dict(self): # type: ignore[no-untyped-def] + def _generate_resource_dict(self) -> Dict[str, Any]: """Generates the resource dict for this Resource, the value associated with the logical id in a CloudFormation template's Resources section. @@ -383,7 +383,7 @@ def get_passthrough_resource_attributes(self) -> Dict[str, Any]: :return: Dictionary of resource attributes. """ attributes = {} - for resource_attribute in self.get_pass_through_attributes(): # type: ignore[no-untyped-call] + for resource_attribute in self.get_pass_through_attributes(): if resource_attribute in self.resource_attributes: attributes[resource_attribute] = self.resource_attributes.get(resource_attribute) return attributes diff --git a/samtranslator/model/api/api_generator.py b/samtranslator/model/api/api_generator.py index d5e067c91..a7530802a 100644 --- a/samtranslator/model/api/api_generator.py +++ b/samtranslator/model/api/api_generator.py @@ -1117,7 +1117,7 @@ def _get_authorizers(self, authorizers_config, default_authorizer=None): # type # The dict below will eventually become part of swagger/openapi definition, thus requires using Py27Dict() authorizers = Py27Dict() if default_authorizer == "AWS_IAM": - authorizers[default_authorizer] = ApiGatewayAuthorizer( # type: ignore[no-untyped-call] + authorizers[default_authorizer] = ApiGatewayAuthorizer( api_logical_id=self.logical_id, name=default_authorizer, is_aws_iam_authorizer=True ) @@ -1131,7 +1131,7 @@ def _get_authorizers(self, authorizers_config, default_authorizer=None): # type for authorizer_name, authorizer in authorizers_config.items(): sam_expect(authorizer, self.logical_id, f"Auth.Authorizers.{authorizer_name}").to_be_a_map() - authorizers[authorizer_name] = ApiGatewayAuthorizer( # type: ignore[no-untyped-call] + authorizers[authorizer_name] = ApiGatewayAuthorizer( api_logical_id=self.logical_id, name=authorizer_name, user_pool_arn=authorizer.get("UserPoolArn"), diff --git a/samtranslator/model/apigateway.py b/samtranslator/model/apigateway.py index 20a51cdc9..4a4b96a80 100644 --- a/samtranslator/model/apigateway.py +++ b/samtranslator/model/apigateway.py @@ -125,7 +125,7 @@ def make_auto_deployable( # type: ignore[no-untyped-def] data = self._X_HASH_DELIMITER.join(hash_input) generator = logical_id_generator.LogicalIdGenerator(self.logical_id, data) self.logical_id = generator.gen() - digest = generator.get_hash(length=40) # type: ignore[no-untyped-call] # Get the full hash + digest = generator.get_hash(length=40) self.Description = "RestApi deployment id: {}".format(digest) stage.update_deployment_ref(self.logical_id) @@ -159,7 +159,7 @@ def __init__( self.response_templates = response_templates or Py27Dict() self.status_code = status_code_str - def generate_swagger(self): # type: ignore[no-untyped-def] + def generate_swagger(self) -> Py27Dict: # Applying Py27Dict here as this goes into swagger swagger = Py27Dict() swagger["responseParameters"] = self._add_prefixes(self.response_parameters) # type: ignore[no-untyped-call] @@ -265,7 +265,7 @@ def __init__( # type: ignore[no-untyped-def] user_pool_arn=None, function_arn=None, identity=None, - function_payload_type=None, + function_payload_type: Optional[str] = None, function_invoke_role=None, is_aws_iam_authorizer=False, authorization_scopes=None, @@ -325,23 +325,23 @@ def _is_missing_identity_source(self, identity: Dict[str, Any]) -> bool: # If we can resolve ttl, attempt to see if things are valid return ttl_int > 0 and required_properties_missing - def generate_swagger(self): # type: ignore[no-untyped-def] - authorizer_type = self._get_type() # type: ignore[no-untyped-call] + def generate_swagger(self) -> Py27Dict: + authorizer_type = self._get_type() APIGATEWAY_AUTHORIZER_KEY = "x-amazon-apigateway-authorizer" swagger = Py27Dict() swagger["type"] = "apiKey" - swagger["name"] = self._get_swagger_header_name() # type: ignore[no-untyped-call] + swagger["name"] = self._get_swagger_header_name() swagger["in"] = "header" - swagger["x-amazon-apigateway-authtype"] = self._get_swagger_authtype() # type: ignore[no-untyped-call] + swagger["x-amazon-apigateway-authtype"] = self._get_swagger_authtype() if authorizer_type == "COGNITO_USER_POOLS": authorizer_dict = Py27Dict() - authorizer_dict["type"] = self._get_swagger_authorizer_type() # type: ignore[no-untyped-call] - authorizer_dict["providerARNs"] = self._get_user_pool_arn_array() # type: ignore[no-untyped-call] + authorizer_dict["type"] = self._get_swagger_authorizer_type() + authorizer_dict["providerARNs"] = self._get_user_pool_arn_array() swagger[APIGATEWAY_AUTHORIZER_KEY] = authorizer_dict elif authorizer_type == "LAMBDA": - swagger[APIGATEWAY_AUTHORIZER_KEY] = Py27Dict({"type": self._get_swagger_authorizer_type()}) # type: ignore[no-untyped-call, no-untyped-call] + swagger[APIGATEWAY_AUTHORIZER_KEY] = Py27Dict({"type": self._get_swagger_authorizer_type()}) partition = ArnGenerator.get_partition_name() resource = "lambda:path/2015-03-31/functions/${__FunctionArn__}/invocations" authorizer_uri = fnSub( @@ -352,8 +352,8 @@ def generate_swagger(self): # type: ignore[no-untyped-def] ) swagger[APIGATEWAY_AUTHORIZER_KEY]["authorizerUri"] = authorizer_uri - reauthorize_every = self._get_reauthorize_every() # type: ignore[no-untyped-call] - function_invoke_role = self._get_function_invoke_role() # type: ignore[no-untyped-call] + reauthorize_every = self._get_reauthorize_every() + function_invoke_role = self._get_function_invoke_role() if reauthorize_every is not None: swagger[APIGATEWAY_AUTHORIZER_KEY]["authorizerResultTtlInSeconds"] = reauthorize_every @@ -361,23 +361,23 @@ def generate_swagger(self): # type: ignore[no-untyped-def] if function_invoke_role: swagger[APIGATEWAY_AUTHORIZER_KEY]["authorizerCredentials"] = function_invoke_role - if self._get_function_payload_type() == "REQUEST": # type: ignore[no-untyped-call] + if self._get_function_payload_type() == "REQUEST": identity_source = self._get_identity_source() if identity_source: swagger[APIGATEWAY_AUTHORIZER_KEY]["identitySource"] = self._get_identity_source() # Authorizer Validation Expression is only allowed on COGNITO_USER_POOLS and LAMBDA_TOKEN - is_lambda_token_authorizer = authorizer_type == "LAMBDA" and self._get_function_payload_type() == "TOKEN" # type: ignore[no-untyped-call] + is_lambda_token_authorizer = authorizer_type == "LAMBDA" and self._get_function_payload_type() == "TOKEN" if authorizer_type == "COGNITO_USER_POOLS" or is_lambda_token_authorizer: - identity_validation_expression = self._get_identity_validation_expression() # type: ignore[no-untyped-call] + identity_validation_expression = self._get_identity_validation_expression() if identity_validation_expression: swagger[APIGATEWAY_AUTHORIZER_KEY]["identityValidationExpression"] = identity_validation_expression return swagger - def _get_identity_validation_expression(self): # type: ignore[no-untyped-def] + def _get_identity_validation_expression(self) -> Optional[PassThrough]: return self.identity and self.identity.get("ValidationExpression") @staticmethod @@ -417,19 +417,19 @@ def _get_identity_source(self) -> str: return identity_source - def _get_user_pool_arn_array(self): # type: ignore[no-untyped-def] + def _get_user_pool_arn_array(self) -> List[PassThrough]: return self.user_pool_arn if isinstance(self.user_pool_arn, list) else [self.user_pool_arn] - def _get_swagger_header_name(self): # type: ignore[no-untyped-def] - authorizer_type = self._get_type() # type: ignore[no-untyped-call] - payload_type = self._get_function_payload_type() # type: ignore[no-untyped-call] + def _get_swagger_header_name(self) -> Optional[str]: + authorizer_type = self._get_type() + payload_type = self._get_function_payload_type() if authorizer_type == "LAMBDA" and payload_type == "REQUEST": return "Unused" - return self._get_identity_header() # type: ignore[no-untyped-call] + return self._get_identity_header() - def _get_type(self): # type: ignore[no-untyped-def] + def _get_type(self) -> str: if self.is_aws_iam_authorizer: return "AWS_IAM" @@ -438,7 +438,7 @@ def _get_type(self): # type: ignore[no-untyped-def] return "LAMBDA" - def _get_identity_header(self): # type: ignore[no-untyped-def] + def _get_identity_header(self) -> Optional[str]: if self.identity and not isinstance(self.identity, dict): raise InvalidResourceException( self.api_logical_id, @@ -451,20 +451,20 @@ def _get_identity_header(self): # type: ignore[no-untyped-def] return self.identity.get("Header") - def _get_reauthorize_every(self): # type: ignore[no-untyped-def] + def _get_reauthorize_every(self) -> Optional[PassThrough]: if not self.identity: return None return self.identity.get("ReauthorizeEvery") - def _get_function_invoke_role(self): # type: ignore[no-untyped-def] + def _get_function_invoke_role(self) -> Optional[PassThrough]: if not self.function_invoke_role or self.function_invoke_role == "NONE": return None return self.function_invoke_role - def _get_swagger_authtype(self): # type: ignore[no-untyped-def] - authorizer_type = self._get_type() # type: ignore[no-untyped-call] + def _get_swagger_authtype(self) -> str: + authorizer_type = self._get_type() if authorizer_type == "AWS_IAM": return "awsSigv4" @@ -473,19 +473,21 @@ def _get_swagger_authtype(self): # type: ignore[no-untyped-def] return "custom" - def _get_function_payload_type(self): # type: ignore[no-untyped-def] + def _get_function_payload_type(self) -> str: return "TOKEN" if not self.function_payload_type else self.function_payload_type - def _get_swagger_authorizer_type(self): # type: ignore[no-untyped-def] - authorizer_type = self._get_type() # type: ignore[no-untyped-call] + def _get_swagger_authorizer_type(self) -> Optional[str]: + authorizer_type = self._get_type() if authorizer_type == "COGNITO_USER_POOLS": return "cognito_user_pools" - payload_type = self._get_function_payload_type() # type: ignore[no-untyped-call] + payload_type = self._get_function_payload_type() if payload_type == "REQUEST": return "request" if payload_type == "TOKEN": return "token" + + return None # should we raise validation error here? diff --git a/samtranslator/model/apigatewayv2.py b/samtranslator/model/apigatewayv2.py index 41d1611b6..9ace9497e 100644 --- a/samtranslator/model/apigatewayv2.py +++ b/samtranslator/model/apigatewayv2.py @@ -4,6 +4,7 @@ from samtranslator.model.types import IS_DICT, is_type, one_of, IS_STR, list_of from samtranslator.model.intrinsics import ref, fnSub from samtranslator.model.exceptions import ExpectedType, InvalidResourceException +from samtranslator.schema.common import PassThrough from samtranslator.translator.arn_generator import ArnGenerator from samtranslator.utils.types import Intrinsicable from samtranslator.validator.value_validator import sam_expect @@ -104,26 +105,26 @@ def __init__( # type: ignore[no-untyped-def] self.enable_simple_responses = enable_simple_responses self.is_aws_iam_authorizer = is_aws_iam_authorizer - self._validate_input_parameters() # type: ignore[no-untyped-call] + self._validate_input_parameters() - authorizer_type = self._get_auth_type() # type: ignore[no-untyped-call] + authorizer_type = self._get_auth_type() # Validate necessary parameters exist if authorizer_type == "JWT": self._validate_jwt_authorizer() if authorizer_type == "REQUEST": - self._validate_lambda_authorizer() # type: ignore[no-untyped-call] + self._validate_lambda_authorizer() - def _get_auth_type(self): # type: ignore[no-untyped-def] + def _get_auth_type(self) -> str: if self.is_aws_iam_authorizer: return "AWS_IAM" if self.jwt_configuration: return "JWT" return "REQUEST" - def _validate_input_parameters(self): # type: ignore[no-untyped-def] - authorizer_type = self._get_auth_type() # type: ignore[no-untyped-call] + def _validate_input_parameters(self) -> None: + authorizer_type = self._get_auth_type() if self.authorization_scopes is not None and not isinstance(self.authorization_scopes, list): raise InvalidResourceException(self.api_logical_id, "AuthorizationScopes must be a list.") @@ -176,7 +177,7 @@ def _validate_jwt_authorizer(self) -> None: self.api_logical_id, f"{self.name} OAuth2 Authorizer must define 'IdentitySource'." ) - def _validate_lambda_authorizer(self): # type: ignore[no-untyped-def] + def _validate_lambda_authorizer(self) -> None: if not self.function_arn: raise InvalidResourceException( self.api_logical_id, f"{self.name} Lambda Authorizer must define 'FunctionArn'." @@ -190,7 +191,7 @@ def generate_openapi(self) -> Dict[str, Any]: """ Generates OAS for the securitySchemes section """ - authorizer_type = self._get_auth_type() # type: ignore[no-untyped-call] + authorizer_type = self._get_auth_type() openapi: Dict[str, Any] if authorizer_type == "AWS_IAM": @@ -231,7 +232,7 @@ def generate_openapi(self) -> Dict[str, Any]: openapi[APIGATEWAY_AUTHORIZER_KEY]["authorizerUri"] = authorizer_uri # Set authorizerCredentials if present - function_invoke_role = self._get_function_invoke_role() # type: ignore[no-untyped-call] + function_invoke_role = self._get_function_invoke_role() if function_invoke_role: openapi[APIGATEWAY_AUTHORIZER_KEY]["authorizerCredentials"] = function_invoke_role @@ -259,7 +260,7 @@ def generate_openapi(self) -> Dict[str, Any]: raise ValueError(f"Unexpected authorizer_type: {authorizer_type}") return openapi - def _get_function_invoke_role(self): # type: ignore[no-untyped-def] + def _get_function_invoke_role(self) -> Optional[PassThrough]: if not self.function_invoke_role or self.function_invoke_role == "NONE": return None diff --git a/samtranslator/model/eventsources/cloudwatchlogs.py b/samtranslator/model/eventsources/cloudwatchlogs.py index f9c7682b9..743dc1c45 100644 --- a/samtranslator/model/eventsources/cloudwatchlogs.py +++ b/samtranslator/model/eventsources/cloudwatchlogs.py @@ -1,3 +1,5 @@ +from typing import Any, Dict + from samtranslator.metrics.method_decorator import cw_timer from samtranslator.model import PropertyType from samtranslator.model.intrinsics import fnSub @@ -15,6 +17,9 @@ class CloudWatchLogs(PushEventSource): principal = "logs.amazonaws.com" property_types = {"LogGroupName": PropertyType(True, IS_STR), "FilterPattern": PropertyType(True, IS_STR)} + LogGroupName: str + FilterPattern: str + @cw_timer(prefix=FUNCTION_EVETSOURCE_METRIC_PREFIX) def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] """Returns the CloudWatch Logs Subscription Filter and Lambda Permission to which this CloudWatch Logs event source @@ -29,20 +34,20 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] if not function: raise TypeError("Missing required keyword argument: function") - source_arn = self.get_source_arn() # type: ignore[no-untyped-call] + source_arn = self.get_source_arn() permission = self._construct_permission(function, source_arn=source_arn) # type: ignore[no-untyped-call] subscription_filter = self.get_subscription_filter(function, permission) # type: ignore[no-untyped-call] resources = [permission, subscription_filter] return resources - def get_source_arn(self): # type: ignore[no-untyped-def] + def get_source_arn(self) -> Dict[str, Any]: resource = "log-group:${__LogGroupName__}:*" partition = ArnGenerator.get_partition_name() return fnSub( ArnGenerator.generate_arn(partition=partition, service="logs", resource=resource), # type: ignore[no-untyped-call] - {"__LogGroupName__": self.LogGroupName}, # type: ignore[attr-defined] + {"__LogGroupName__": self.LogGroupName}, ) def get_subscription_filter(self, function, permission): # type: ignore[no-untyped-def] @@ -51,8 +56,8 @@ def get_subscription_filter(self, function, permission): # type: ignore[no-unty depends_on=[permission.logical_id], attributes=function.get_passthrough_resource_attributes(), ) - subscription_filter.LogGroupName = self.LogGroupName # type: ignore[attr-defined] - subscription_filter.FilterPattern = self.FilterPattern # type: ignore[attr-defined] + subscription_filter.LogGroupName = self.LogGroupName + subscription_filter.FilterPattern = self.FilterPattern subscription_filter.DestinationArn = function.get_runtime_attr("arn") return subscription_filter diff --git a/samtranslator/model/eventsources/pull.py b/samtranslator/model/eventsources/pull.py index 21bfdb8e9..78ccafdbd 100644 --- a/samtranslator/model/eventsources/pull.py +++ b/samtranslator/model/eventsources/pull.py @@ -132,7 +132,7 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] lambda_eventsourcemapping.TumblingWindowInSeconds = self.TumblingWindowInSeconds lambda_eventsourcemapping.FunctionResponseTypes = self.FunctionResponseTypes lambda_eventsourcemapping.FilterCriteria = self.FilterCriteria - self._validate_filter_criteria() # type: ignore[no-untyped-call] + self._validate_filter_criteria() if self.KafkaBootstrapServers: lambda_eventsourcemapping.SelfManagedEventSource = { @@ -215,7 +215,7 @@ def _link_policy(self, role, destination_config_policy=None): # type: ignore[no if not destination_config_policy.get("PolicyDocument") in [d["PolicyDocument"] for d in role.Policies]: role.Policies.append(destination_config_policy) - def _validate_filter_criteria(self): # type: ignore[no-untyped-def] + def _validate_filter_criteria(self) -> None: if not self.FilterCriteria or is_intrinsic(self.FilterCriteria): return if self.resource_type not in self.RESOURCE_TYPES_WITH_EVENT_FILTERING: @@ -492,7 +492,7 @@ def generate_policy_document(self, source_access_configurations: List[Any]): # statements.append(secret_manager) if has_vpc_config: - vpc_permissions = self.get_vpc_permission() # type: ignore[no-untyped-call] + vpc_permissions = self.get_vpc_permission() statements.append(vpc_permissions) if self.SecretsManagerKmsKeyId: @@ -574,7 +574,7 @@ def get_secret_manager_secret(self, authentication_uri): # type: ignore[no-unty "Resource": authentication_uri, } - def get_vpc_permission(self): # type: ignore[no-untyped-def] + def get_vpc_permission(self) -> Dict[str, Any]: return { "Action": [ "ec2:CreateNetworkInterface", diff --git a/samtranslator/model/eventsources/push.py b/samtranslator/model/eventsources/push.py index f1e238f96..923f81649 100644 --- a/samtranslator/model/eventsources/push.py +++ b/samtranslator/model/eventsources/push.py @@ -19,6 +19,7 @@ from samtranslator.model.eventbridge_utils import EventBridgeRuleUtils from samtranslator.model.iot import IotTopicRule from samtranslator.model.cognito import CognitoUserPool +from samtranslator.schema.common import PassThrough from samtranslator.translator import logical_id_generator from samtranslator.translator.arn_generator import ArnGenerator from samtranslator.model.exceptions import InvalidEventException, InvalidResourceException, InvalidDocumentException @@ -111,6 +112,16 @@ class Schedule(PushEventSource): "RetryPolicy": PropertyType(False, IS_DICT), } + Schedule: PassThrough + RuleName: Optional[PassThrough] + Input: Optional[PassThrough] + Enabled: Optional[bool] + State: Optional[PassThrough] + Name: Optional[PassThrough] + Description: Optional[PassThrough] + DeadLetterConfig: Optional[Dict[str, Any]] + RetryPolicy: Optional[PassThrough] + @cw_timer(prefix=FUNCTION_EVETSOURCE_METRIC_PREFIX) def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] """Returns the EventBridge Rule and Lambda Permission to which this Schedule event source corresponds. @@ -130,24 +141,24 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] events_rule = EventsRule(self.logical_id, attributes=passthrough_resource_attributes) resources.append(events_rule) - events_rule.ScheduleExpression = self.Schedule # type: ignore[attr-defined] + events_rule.ScheduleExpression = self.Schedule - if self.State and self.Enabled is not None: # type: ignore[attr-defined, attr-defined] + if self.State and self.Enabled is not None: raise InvalidEventException(self.relative_id, "State and Enabled Properties cannot both be specified.") - if self.State: # type: ignore[attr-defined] - events_rule.State = self.State # type: ignore[attr-defined] + if self.State: + events_rule.State = self.State - if self.Enabled is not None: # type: ignore[attr-defined] - events_rule.State = "ENABLED" if self.Enabled else "DISABLED" # type: ignore[attr-defined] + if self.Enabled is not None: + events_rule.State = "ENABLED" if self.Enabled else "DISABLED" - events_rule.Name = self.Name # type: ignore[attr-defined] - events_rule.Description = self.Description # type: ignore[attr-defined] + events_rule.Name = self.Name + events_rule.Description = self.Description source_arn = events_rule.get_runtime_attr("arn") dlq_queue_arn = None - if self.DeadLetterConfig is not None: # type: ignore[attr-defined] - EventBridgeRuleUtils.validate_dlq_config(self.logical_id, self.DeadLetterConfig) # type: ignore[attr-defined, no-untyped-call] + if self.DeadLetterConfig is not None: + EventBridgeRuleUtils.validate_dlq_config(self.logical_id, self.DeadLetterConfig) # type: ignore[no-untyped-call] dlq_queue_arn, dlq_resources = EventBridgeRuleUtils.get_dlq_queue_arn_and_resources( # type: ignore[no-untyped-call] self, source_arn, passthrough_resource_attributes ) @@ -166,14 +177,14 @@ def _construct_target(self, function, dead_letter_queue_arn=None): # type: igno :rtype: dict """ target = {"Arn": function.get_runtime_attr("arn"), "Id": self.logical_id + "LambdaTarget"} - if self.Input is not None: # type: ignore[attr-defined] - target["Input"] = self.Input # type: ignore[attr-defined] + if self.Input is not None: + target["Input"] = self.Input - if self.DeadLetterConfig is not None: # type: ignore[attr-defined] + if self.DeadLetterConfig is not None: target["DeadLetterConfig"] = {"Arn": dead_letter_queue_arn} - if self.RetryPolicy is not None: # type: ignore[attr-defined] - target["RetryPolicy"] = self.RetryPolicy # type: ignore[attr-defined] + if self.RetryPolicy is not None: + target["RetryPolicy"] = self.RetryPolicy return target diff --git a/samtranslator/model/exceptions.py b/samtranslator/model/exceptions.py index 123a44934..686ccb475 100644 --- a/samtranslator/model/exceptions.py +++ b/samtranslator/model/exceptions.py @@ -54,7 +54,7 @@ def __init__(self, logical_id: str, duplicate_id: str, type: str) -> None: self._type = type @property - def message(self): # type: ignore[no-untyped-def] + def message(self) -> str: return ( "Transforming resource with id [{logical_id}] attempts to create a new" ' resource with id [{duplicate_id}] and type "{type}". A resource with that id already' diff --git a/samtranslator/model/iam.py b/samtranslator/model/iam.py index 2c801ae4f..5a7231d06 100644 --- a/samtranslator/model/iam.py +++ b/samtranslator/model/iam.py @@ -58,7 +58,7 @@ def step_functions_start_execution_role_policy(cls, state_machine_arn, logical_i return document @classmethod - def stepfunctions_assume_role_policy(cls): # type: ignore[no-untyped-def] + def stepfunctions_assume_role_policy(cls) -> Dict[str, Any]: document = { "Version": "2012-10-17", "Statement": [ @@ -72,7 +72,7 @@ def stepfunctions_assume_role_policy(cls): # type: ignore[no-untyped-def] return document @classmethod - def cloud_watch_log_assume_role_policy(cls): # type: ignore[no-untyped-def] + def cloud_watch_log_assume_role_policy(cls) -> Dict[str, Any]: document = { "Version": "2012-10-17", "Statement": [ diff --git a/samtranslator/model/naming.py b/samtranslator/model/naming.py index d51ca8b6e..750410059 100644 --- a/samtranslator/model/naming.py +++ b/samtranslator/model/naming.py @@ -6,9 +6,9 @@ class GeneratedLogicalId(object): """ @staticmethod - def implicit_api(): # type: ignore[no-untyped-def] + def implicit_api() -> str: return "ServerlessRestApi" @staticmethod - def implicit_http_api(): # type: ignore[no-untyped-def] + def implicit_http_api() -> str: return "ServerlessHttpApi" diff --git a/samtranslator/model/preferences/deployment_preference_collection.py b/samtranslator/model/preferences/deployment_preference_collection.py index 5e8314d6c..d59550486 100644 --- a/samtranslator/model/preferences/deployment_preference_collection.py +++ b/samtranslator/model/preferences/deployment_preference_collection.py @@ -80,13 +80,13 @@ def get(self, logical_id: str) -> DeploymentPreference: # TODO: find a way to deal with this implicit assumption return cast(DeploymentPreference, self._resource_preferences.get(logical_id)) - def any_enabled(self): # type: ignore[no-untyped-def] + def any_enabled(self) -> bool: """ :return: boolean whether any deployment preferences in the collection are enabled """ return any(preference.enabled for preference in self._resource_preferences.values()) - def can_skip_service_role(self): # type: ignore[no-untyped-def] + def can_skip_service_role(self) -> bool: """ If every one of the deployment preferences have a custom IAM role provided, we can skip creating the service role altogether. @@ -128,7 +128,7 @@ def enabled_logical_ids(self) -> List[str]: """ return [logical_id for logical_id, preference in self._resource_preferences.items() if preference.enabled] - def get_codedeploy_application(self): # type: ignore[no-untyped-def] + def get_codedeploy_application(self) -> CodeDeployApplication: codedeploy_application_resource = CodeDeployApplication(CODEDEPLOY_APPLICATION_LOGICAL_ID) codedeploy_application_resource.ComputePlatform = "Lambda" if self.needs_resource_condition(): @@ -139,7 +139,7 @@ def get_codedeploy_application(self): # type: ignore[no-untyped-def] codedeploy_application_resource.set_resource_attribute("Condition", condition_name) return codedeploy_application_resource - def get_codedeploy_iam_role(self): # type: ignore[no-untyped-def] + def get_codedeploy_iam_role(self) -> IAMRole: iam_role = IAMRole(CODE_DEPLOY_SERVICE_ROLE_LOGICAL_ID) iam_role.AssumeRolePolicyDocument = { "Version": "2012-10-17", @@ -310,5 +310,5 @@ def __ne__(self, other): # type: ignore[no-untyped-def] return not self.__eq__(other) # type: ignore[no-untyped-call] return NotImplemented - def __hash__(self): # type: ignore[no-untyped-def] + def __hash__(self) -> int: return hash(tuple(sorted(self.__dict__.items()))) diff --git a/samtranslator/model/stepfunctions/generators.py b/samtranslator/model/stepfunctions/generators.py index aa5d4725b..f76f5eb15 100644 --- a/samtranslator/model/stepfunctions/generators.py +++ b/samtranslator/model/stepfunctions/generators.py @@ -1,9 +1,10 @@ import json from copy import deepcopy +from typing import Any, Dict, List, Tuple from samtranslator.metrics.method_decorator import cw_timer from samtranslator.model.exceptions import InvalidEventException, InvalidResourceException -from samtranslator.model.iam import IAMRolePolicies +from samtranslator.model.iam import IAMRole, IAMRolePolicies from samtranslator.model.resource_policies import ResourcePolicies from samtranslator.model.role_utils import construct_role_for_resource from samtranslator.model.s3_utils.uri_parser import parse_s3_uri @@ -108,7 +109,7 @@ def to_cloudformation(self): # type: ignore[no-untyped-def] :returns: a list of resources including the State Machine resource. :rtype: list """ - resources = [self.state_machine] + resources: List[Any] = [self.state_machine] # Defaulting to {} will add the DefinitionSubstitutions field on the transform output even when it is not relevant if self.definition_substitutions: @@ -128,7 +129,7 @@ def to_cloudformation(self): # type: ignore[no-untyped-def] self.state_machine.DefinitionSubstitutions = substitutions self.state_machine.DefinitionString = self._build_definition_string(processed_definition) # type: ignore[no-untyped-call] elif self.definition_uri: - self.state_machine.DefinitionS3Location = self._construct_definition_uri() # type: ignore[no-untyped-call] + self.state_machine.DefinitionS3Location = self._construct_definition_uri() else: raise InvalidResourceException( self.logical_id, "Either 'Definition' or 'DefinitionUri' property must be specified." @@ -143,7 +144,7 @@ def to_cloudformation(self): # type: ignore[no-untyped-def] raise Exception("Managed policy map is empty, but should not be.") if not self.policies: self.policies = [] - execution_role = self._construct_role() # type: ignore[no-untyped-call] + execution_role = self._construct_role() self.state_machine.RoleArn = execution_role.get_runtime_attr("arn") resources.append(execution_role) @@ -151,14 +152,14 @@ def to_cloudformation(self): # type: ignore[no-untyped-def] self.state_machine.StateMachineType = self.type self.state_machine.LoggingConfiguration = self.logging self.state_machine.TracingConfiguration = self.tracing - self.state_machine.Tags = self._construct_tag_list() # type: ignore[no-untyped-call] + self.state_machine.Tags = self._construct_tag_list() - event_resources = self._generate_event_resources() # type: ignore[no-untyped-call] + event_resources = self._generate_event_resources() resources.extend(event_resources) return resources - def _construct_definition_uri(self): # type: ignore[no-untyped-def] + def _construct_definition_uri(self) -> Dict[str, Any]: """ Constructs the State Machine's `DefinitionS3 property`_, from the SAM State Machines's DefinitionUri property. @@ -204,7 +205,7 @@ def _build_definition_string(self, definition_dict): # type: ignore[no-untyped- definition_string = fnJoin("\n", definition_lines) return definition_string - def _construct_role(self): # type: ignore[no-untyped-def] + def _construct_role(self) -> IAMRole: """ Constructs a State Machine execution role based on this SAM State Machine's Policies property. @@ -226,14 +227,14 @@ def _construct_role(self): # type: ignore[no-untyped-def] role_path=self.role_path, attributes=self.passthrough_resource_attributes, managed_policy_map=self.managed_policy_map, - assume_role_policy_document=IAMRolePolicies.stepfunctions_assume_role_policy(), # type: ignore[no-untyped-call] + assume_role_policy_document=IAMRolePolicies.stepfunctions_assume_role_policy(), resource_policies=state_machine_policies, - tags=self._construct_tag_list(), # type: ignore[no-untyped-call] + tags=self._construct_tag_list(), permissions_boundary=self.permissions_boundary, ) return execution_role - def _construct_tag_list(self): # type: ignore[no-untyped-def] + def _construct_tag_list(self) -> List[Dict[str, Any]]: """ Transforms the SAM defined Tags into the form CloudFormation is expecting. @@ -243,7 +244,7 @@ def _construct_tag_list(self): # type: ignore[no-untyped-def] sam_tag = {self._SAM_KEY: self._SAM_VALUE} return get_tag_list(sam_tag) + get_tag_list(self.tags) - def _generate_event_resources(self): # type: ignore[no-untyped-def] + def _generate_event_resources(self) -> List[Dict[str, Any]]: """Generates and returns the resources associated with this state machine's event sources. :returns: a list containing the state machine's event resources @@ -282,7 +283,7 @@ def _replace_dynamic_values_with_substitutions(self, input): # type: ignore[no- location = input for step in path[:-1]: location = location[step] - sub_name, sub_key = self._generate_substitution() # type: ignore[no-untyped-call] + sub_name, sub_key = self._generate_substitution() substitution_map[sub_name] = location[path[-1]] location[path[-1]] = sub_key return substitution_map @@ -313,7 +314,7 @@ def _get_paths_to_intrinsics(self, input, path=None): # type: ignore[no-untyped return dynamic_value_paths - def _generate_substitution(self): # type: ignore[no-untyped-def] + def _generate_substitution(self) -> Tuple[str, str]: """ Generates a name and key for a new substitution. diff --git a/samtranslator/plugins/api/implicit_api_plugin.py b/samtranslator/plugins/api/implicit_api_plugin.py index 27245136b..b045e1564 100644 --- a/samtranslator/plugins/api/implicit_api_plugin.py +++ b/samtranslator/plugins/api/implicit_api_plugin.py @@ -1,5 +1,6 @@ import copy +from abc import ABCMeta from typing import Any, Dict, Optional, Type, Union from samtranslator.metrics.method_decorator import cw_timer @@ -14,7 +15,7 @@ from samtranslator.utils.py27hash_fix import Py27Dict -class ImplicitApiPlugin(BasePlugin): +class ImplicitApiPlugin(BasePlugin, metaclass=ABCMeta): """ This plugin provides Implicit API shorthand syntax in the SAM Spec. https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#api @@ -83,7 +84,7 @@ def on_before_transform_template(self, template_dict): # type: ignore[no-untype # removing the "ServerlessRestApi" resource, we just restore what the author defined. self.existing_implicit_api_resource = copy.deepcopy(template.get(self.implicit_api_logical_id)) - template.set(self.implicit_api_logical_id, self._generate_implicit_api_resource()) # type: ignore[no-untyped-call] + template.set(self.implicit_api_logical_id, self._generate_implicit_api_resource()) errors = [] for logicalId, resource in template.iterate( @@ -200,7 +201,7 @@ def _add_api_to_swagger(self, event_id, event_properties, template): # type: ig if isinstance(api_id, dict) or is_referencing_http_from_api_event: raise InvalidEventException( event_id, - f"{self.api_id_property} must be a valid reference to an '{self._get_api_resource_type_name()}'" # type: ignore[no-untyped-call] + f"{self.api_id_property} must be a valid reference to an '{self._get_api_resource_type_name()}'" " resource in same template.", ) @@ -441,18 +442,12 @@ def _maybe_remove_implicit_api(self, template): # type: ignore[no-untyped-def] else: template.delete(self.implicit_api_logical_id) - def _generate_implicit_api_resource(self): # type: ignore[no-untyped-def] + def _generate_implicit_api_resource(self) -> Dict[str, Any]: """ Helper function implemented by child classes that create a new implicit API resource """ - raise NotImplementedError( - "Method _setup_api_properties() must be implemented in a subclass of ImplicitApiPlugin" - ) - def _get_api_resource_type_name(self): # type: ignore[no-untyped-def] + def _get_api_resource_type_name(self) -> str: """ Returns the type of API resource """ - raise NotImplementedError( - "Method _setup_api_properties() must be implemented in a subclass of ImplicitApiPlugin" - ) diff --git a/samtranslator/plugins/api/implicit_http_api_plugin.py b/samtranslator/plugins/api/implicit_http_api_plugin.py index 7a95fa576..cb3e7c880 100644 --- a/samtranslator/plugins/api/implicit_http_api_plugin.py +++ b/samtranslator/plugins/api/implicit_http_api_plugin.py @@ -34,11 +34,11 @@ def __init__(self) -> None: """ super(ImplicitHttpApiPlugin, self).__init__(ImplicitHttpApiPlugin.__name__) - def _setup_api_properties(self): # type: ignore[no-untyped-def] + def _setup_api_properties(self) -> None: """ Sets up properties that are distinct to this plugin """ - self.implicit_api_logical_id = GeneratedLogicalId.implicit_http_api() # type: ignore[no-untyped-call] + self.implicit_api_logical_id = GeneratedLogicalId.implicit_http_api() self.implicit_api_condition = "ServerlessHttpApiCondition" self.api_event_type = "HttpApi" self.api_type = SamResourceType.HttpApi.value diff --git a/samtranslator/plugins/api/implicit_rest_api_plugin.py b/samtranslator/plugins/api/implicit_rest_api_plugin.py index 3f9f77c61..cba19d068 100644 --- a/samtranslator/plugins/api/implicit_rest_api_plugin.py +++ b/samtranslator/plugins/api/implicit_rest_api_plugin.py @@ -1,3 +1,5 @@ +from typing import Any, Dict + from samtranslator.model.naming import GeneratedLogicalId from samtranslator.plugins.api.implicit_api_plugin import ImplicitApiPlugin from samtranslator.public.swagger import SwaggerEditor @@ -33,11 +35,11 @@ def __init__(self) -> None: """ super(ImplicitRestApiPlugin, self).__init__(ImplicitRestApiPlugin.__name__) - def _setup_api_properties(self): # type: ignore[no-untyped-def] + def _setup_api_properties(self) -> None: """ Sets up properties that are distinct to this plugin """ - self.implicit_api_logical_id = GeneratedLogicalId.implicit_api() # type: ignore[no-untyped-call] + self.implicit_api_logical_id = GeneratedLogicalId.implicit_api() self.implicit_api_condition = "ServerlessRestApiCondition" self.api_event_type = "Api" self.api_type = SamResourceType.Api.value @@ -117,7 +119,7 @@ def _add_implicit_api_id_if_necessary(self, event_properties): # type: ignore[n if "RestApiId" not in event_properties: event_properties["RestApiId"] = {"Ref": self.implicit_api_logical_id} - def _generate_implicit_api_resource(self): # type: ignore[no-untyped-def] + def _generate_implicit_api_resource(self) -> Dict[str, Any]: """ Uses the implicit API in this file to generate an Implicit API resource """ @@ -129,7 +131,7 @@ def _get_api_definition_from_editor(self, editor): # type: ignore[no-untyped-de """ return editor.swagger - def _get_api_resource_type_name(self): # type: ignore[no-untyped-def] + def _get_api_resource_type_name(self) -> str: """ Returns the type of API resource """ diff --git a/samtranslator/plugins/application/serverless_app_plugin.py b/samtranslator/plugins/application/serverless_app_plugin.py index 17cfe2754..fabedb4db 100644 --- a/samtranslator/plugins/application/serverless_app_plugin.py +++ b/samtranslator/plugins/application/serverless_app_plugin.py @@ -1,4 +1,4 @@ -from typing import Any, Tuple +from typing import Any, Dict, Tuple import boto3 import json @@ -73,7 +73,7 @@ def __init__(self, sar_client=None, wait_for_template_active_status=False, valid # make sure the flag combination makes sense if self._validate_only is True and self._wait_for_template_active_status is True: message = "Cannot set both validate_only and wait_for_template_active_status flags to True." - raise InvalidPluginException(ServerlessAppPlugin.__name__, message) # type: ignore[no-untyped-call] + raise InvalidPluginException(ServerlessAppPlugin.__name__, message) @staticmethod def _make_app_key(app_id: Any, semver: Any) -> Tuple[str, str]: @@ -148,7 +148,7 @@ def _make_service_call_with_retry(self, service_call, app_id, semver, key, logic error_code = e.response["Error"]["Code"] if error_code == "TooManyRequestsException": LOG.debug("SAR call timed out for application id {}".format(app_id)) - sleep_time = self._get_sleep_time_sec() # type: ignore[no-untyped-call] + sleep_time = self._get_sleep_time_sec() sleep(sleep_time) self._total_wait_time += sleep_time continue @@ -350,7 +350,7 @@ def on_after_transform_template(self, template): # type: ignore[no-untyped-def] break # We were throttled by SAR, break out to a sleep raise e - if self._is_template_active(response, application_id, template_id): # type: ignore[no-untyped-call] + if self._is_template_active(response, application_id, template_id): self._in_progress_templates.remove((application_id, template_id)) else: idx += 1 # check next template @@ -362,7 +362,7 @@ def on_after_transform_template(self, template): # type: ignore[no-untyped-def] break # Sleep a little so we don't spam service calls - sleep_time = self._get_sleep_time_sec() # type: ignore[no-untyped-call] + sleep_time = self._get_sleep_time_sec() sleep(sleep_time) self._total_wait_time += sleep_time @@ -373,10 +373,10 @@ def on_after_transform_template(self, template): # type: ignore[no-untyped-def] application_ids, "Timed out waiting for nested stack templates to reach ACTIVE status." ) - def _get_sleep_time_sec(self): # type: ignore[no-untyped-def] + def _get_sleep_time_sec(self) -> int: return self.SLEEP_TIME_SECONDS - def _is_template_active(self, response, application_id, template_id): # type: ignore[no-untyped-def] + def _is_template_active(self, response: Dict[str, Any], application_id: str, template_id: str) -> bool: """ Checks the response from a SAR service call; returns True if the template is active, throws an exception if the request expired and returns False in all other cases. @@ -385,10 +385,13 @@ def _is_template_active(self, response, application_id, template_id): # type: i :param string application_id: the ApplicationId :param string template_id: the unique TemplateId for this application """ - status = response["Status"] # options: PREPARING, EXPIRED or ACTIVE + status: str = response["Status"] # options: PREPARING, EXPIRED or ACTIVE if status == "EXPIRED": - message = f"Template for {application_id} with id {template_id} returned status: {status}. Cannot access an expired template." + message = ( + f"Template for {application_id} with id {template_id} returned status: {status}. " + "Cannot access an expired template." + ) raise InvalidResourceException(application_id, message) return status == "ACTIVE" diff --git a/samtranslator/plugins/exceptions.py b/samtranslator/plugins/exceptions.py index 9f68b1108..c5553efd6 100644 --- a/samtranslator/plugins/exceptions.py +++ b/samtranslator/plugins/exceptions.py @@ -6,10 +6,10 @@ class InvalidPluginException(Exception): message -- explanation of the error """ - def __init__(self, plugin_name, message): # type: ignore[no-untyped-def] + def __init__(self, plugin_name: str, message: str) -> None: self._plugin_name = plugin_name self._message = message @property - def message(self): # type: ignore[no-untyped-def] + def message(self) -> str: return "The {} plugin is invalid. {}".format(self._plugin_name, self._message) diff --git a/samtranslator/plugins/globals/globals.py b/samtranslator/plugins/globals/globals.py index 74c65aa48..262b7d503 100644 --- a/samtranslator/plugins/globals/globals.py +++ b/samtranslator/plugins/globals/globals.py @@ -475,5 +475,5 @@ def __init__(self, logical_id, message): # type: ignore[no-untyped-def] self._message = message @property - def message(self): # type: ignore[no-untyped-def] + def message(self) -> str: return "'{}' section is invalid. {}".format(self._logical_id, self._message) diff --git a/samtranslator/sdk/resource.py b/samtranslator/sdk/resource.py index a5e212ab3..8e5a1cafd 100644 --- a/samtranslator/sdk/resource.py +++ b/samtranslator/sdk/resource.py @@ -31,7 +31,7 @@ def __init__(self, resource_dict: Dict[str, Any]) -> None: # Properties is *not* required. Ex: SimpleTable resource has no required properties self.properties = resource_dict.get("Properties", {}) - def valid(self): # type: ignore[no-untyped-def] + def valid(self) -> bool: """ Checks if the resource data is valid @@ -59,11 +59,12 @@ def valid(self): # type: ignore[no-untyped-def] [InvalidTemplateException("Every UpdateReplacePolicy member must be a string.")] ) - return SamResourceType.has_value(self.type) # type: ignore[no-untyped-call] + # TODO: should we raise exception if `self.type` is not a string? + return isinstance(self.type, str) and SamResourceType.has_value(self.type) def to_dict(self) -> Dict[str, Any]: - if self.valid(): # type: ignore[no-untyped-call] + if self.valid(): # Touch a resource dictionary ONLY if it is valid # Modify only Type & Properties section to preserve CloudFormation properties like DependsOn, Conditions etc self.resource_dict["Type"] = self.type @@ -86,7 +87,7 @@ class SamResourceType(Enum): StateMachine = "AWS::Serverless::StateMachine" @classmethod - def has_value(cls, value): # type: ignore[no-untyped-def] + def has_value(cls, value: str) -> bool: """ Checks if the given value belongs to the Enum diff --git a/samtranslator/sdk/template.py b/samtranslator/sdk/template.py index 0db0d8ded..3cf4cfb17 100644 --- a/samtranslator/sdk/template.py +++ b/samtranslator/sdk/template.py @@ -33,7 +33,7 @@ def iterate(self, resource_types: Optional[Set[str]] = None) -> Iterator[Tuple[s for logicalId, resource_dict in self.resources.items(): resource = SamResource(resource_dict) - needs_filter = resource.valid() # type: ignore[no-untyped-call] + needs_filter = resource.valid() if resource_types: needs_filter = needs_filter and resource.type in resource_types @@ -76,7 +76,7 @@ def delete(self, logicalId): # type: ignore[no-untyped-def] if logicalId in self.resources: del self.resources[logicalId] - def to_dict(self): # type: ignore[no-untyped-def] + def to_dict(self) -> Dict[str, Any]: """ Returns the template as a dictionary diff --git a/samtranslator/translator/logical_id_generator.py b/samtranslator/translator/logical_id_generator.py index ca44ad2e4..32b4d5b07 100644 --- a/samtranslator/translator/logical_id_generator.py +++ b/samtranslator/translator/logical_id_generator.py @@ -1,6 +1,5 @@ import hashlib import json -import sys from typing import Any, Optional @@ -21,7 +20,7 @@ def __init__(self, prefix: str, data_obj: Optional[Any] = None, data_hash: Optio data_str = "" if data_obj: - data_str = self._stringify(data_obj) # type: ignore[no-untyped-call] + data_str = self._stringify(data_obj) self._prefix = prefix self.data_str = data_str @@ -45,10 +44,10 @@ def gen(self) -> str: :rtype string """ - data_hash = self.get_hash() # type: ignore[no-untyped-call] + data_hash = self.get_hash() return "{prefix}{hash}".format(prefix=self._prefix, hash=data_hash) - def get_hash(self, length=HASH_LENGTH): # type: ignore[no-untyped-def] + def get_hash(self, length: int = HASH_LENGTH) -> str: """ Generate and return a hash of data that can be used as suffix of logicalId @@ -63,20 +62,12 @@ def get_hash(self, length=HASH_LENGTH): # type: ignore[no-untyped-def] if not self.data_str: return data_hash - encoded_data_str = self.data_str - if sys.version_info.major == 2: - # In Py2, only unicode needs to be encoded. - if isinstance(self.data_str, unicode): # type: ignore[name-defined] # pylint: disable=E0602 - encoded_data_str = self.data_str.encode("utf-8") - else: - # data_str should always be unicode on python 3 - encoded_data_str = self.data_str.encode("utf-8") # type: ignore[assignment] - - data_hash = hashlib.sha1(encoded_data_str).hexdigest() # type: ignore[arg-type] + encoded_data_str = self.data_str.encode("utf-8") + data_hash = hashlib.sha1(encoded_data_str).hexdigest() return data_hash[:length] - def _stringify(self, data): # type: ignore[no-untyped-def] + def _stringify(self, data: Any) -> str: """ Stable, platform & language-independent stringification of a data with basic Python type. diff --git a/samtranslator/translator/translator.py b/samtranslator/translator/translator.py index bddd6cbe3..d19be4059 100644 --- a/samtranslator/translator/translator.py +++ b/samtranslator/translator/translator.py @@ -1,4 +1,5 @@ import copy +from typing import TYPE_CHECKING from samtranslator.metrics.method_decorator import MetricsMethodWrapperSingleton from samtranslator.metrics.metrics import DummyMetricsPublisher, Metrics @@ -49,7 +50,7 @@ def __init__(self, managed_policy_map, sam_parser, plugins=None, boto_session=No self.feature_toggle = None self.boto_session = boto_session self.metrics = metrics if metrics else Metrics("ServerlessTransform", DummyMetricsPublisher()) # type: ignore[no-untyped-call, no-untyped-call] - MetricsMethodWrapperSingleton.set_instance(self.metrics) # type: ignore[no-untyped-call] + MetricsMethodWrapperSingleton.set_instance(self.metrics) self._translated_resouce_mapping = {} if self.boto_session: @@ -185,15 +186,15 @@ def translate( except (InvalidResourceException, InvalidEventException, InvalidTemplateException) as e: document_errors.append(e) # type: ignore[arg-type] - if deployment_preference_collection.any_enabled(): # type: ignore[no-untyped-call] - template["Resources"].update(deployment_preference_collection.get_codedeploy_application().to_dict()) # type: ignore[no-untyped-call] + if deployment_preference_collection.any_enabled(): + template["Resources"].update(deployment_preference_collection.get_codedeploy_application().to_dict()) if deployment_preference_collection.needs_resource_condition(): new_conditions = deployment_preference_collection.create_aggregate_deployment_condition() if new_conditions: template.get("Conditions", {}).update(new_conditions) - if not deployment_preference_collection.can_skip_service_role(): # type: ignore[no-untyped-call] - template["Resources"].update(deployment_preference_collection.get_codedeploy_iam_role().to_dict()) # type: ignore[no-untyped-call] + if not deployment_preference_collection.can_skip_service_role(): + template["Resources"].update(deployment_preference_collection.get_codedeploy_iam_role().to_dict()) for logical_id in deployment_preference_collection.enabled_logical_ids(): try: @@ -282,8 +283,8 @@ def prepare_plugins(plugins: List[Any], parameters: Optional[Dict[str, Any]] = N parameters = {} required_plugins = [ DefaultDefinitionBodyPlugin(), - make_implicit_rest_api_plugin(), # type: ignore[no-untyped-call] - make_implicit_http_api_plugin(), # type: ignore[no-untyped-call] + make_implicit_rest_api_plugin(), + make_implicit_http_api_plugin(), GlobalsPlugin(), make_policy_template_for_function_plugin(), ] @@ -299,14 +300,19 @@ def prepare_plugins(plugins: List[Any], parameters: Optional[Dict[str, Any]] = N return SamPlugins(plugins + required_plugins) -def make_implicit_rest_api_plugin(): # type: ignore[no-untyped-def] +if TYPE_CHECKING: + from samtranslator.plugins.api.implicit_rest_api_plugin import ImplicitRestApiPlugin + from samtranslator.plugins.api.implicit_http_api_plugin import ImplicitHttpApiPlugin + + +def make_implicit_rest_api_plugin() -> "ImplicitRestApiPlugin": # This is necessary to prevent a circular dependency on imports when loading package from samtranslator.plugins.api.implicit_rest_api_plugin import ImplicitRestApiPlugin return ImplicitRestApiPlugin() -def make_implicit_http_api_plugin(): # type: ignore[no-untyped-def] +def make_implicit_http_api_plugin() -> "ImplicitHttpApiPlugin": # This is necessary to prevent a circular dependency on imports when loading package from samtranslator.plugins.api.implicit_http_api_plugin import ImplicitHttpApiPlugin diff --git a/samtranslator/utils/py27hash_fix.py b/samtranslator/utils/py27hash_fix.py index f0f1f44e2..b22178250 100644 --- a/samtranslator/utils/py27hash_fix.py +++ b/samtranslator/utils/py27hash_fix.py @@ -4,10 +4,9 @@ import ctypes import copy import json -import sys import logging -from typing import Any, Dict, List +from typing import Any, Dict, Iterator, List, cast from samtranslator.parser.parser import Parser from samtranslator.third_party.py27hash.hash import Hash @@ -114,17 +113,13 @@ class Py27UniStr(unicode_string_type): def __add__(self, other): # type: ignore[no-untyped-def] return Py27UniStr(super(Py27UniStr, self).__add__(other)) - def __repr__(self): # type: ignore[no-untyped-def] - if sys.version_info.major >= 3: - return "u" + super(Py27UniStr, self).encode("unicode_escape").decode("ascii").__repr__().replace( - "\\\\", "\\" - ) - return super(Py27UniStr, self).__repr__() + def __repr__(self) -> str: + return "u" + super(Py27UniStr, self).encode("unicode_escape").decode("ascii").__repr__().replace("\\\\", "\\") - def upper(self): # type: ignore[no-untyped-def] + def upper(self) -> "Py27UniStr": return Py27UniStr(super(Py27UniStr, self).upper()) - def lower(self): # type: ignore[no-untyped-def] + def lower(self) -> "Py27UniStr": return Py27UniStr(super(Py27UniStr, self).lower()) def replace(self, __old, __new, __count=None): # type: ignore[no-untyped-def] @@ -153,8 +148,8 @@ class Py27LongInt(long_int_type): PY2_MAX_INT = 9223372036854775807 # sys.maxint from Python2.7 Lambda runtime - def __repr__(self): # type: ignore[no-untyped-def] - if sys.version_info.major >= 3 and self > Py27LongInt.PY2_MAX_INT: + def __repr__(self) -> str: + if self > Py27LongInt.PY2_MAX_INT: return super(Py27LongInt, self).__repr__() + "L" return super(Py27LongInt, self).__repr__() @@ -171,12 +166,14 @@ class Py27Keys(object): in determining the iteration order. """ - DUMMY = ["dummy"] # marker for deleted keys + # marker for deleted keys + # we use DUMMY for a dummy key, force it to be treated as a str to avoid mypy unhappy + DUMMY: str = cast(str, ["dummy"]) def __init__(self) -> None: super(Py27Keys, self).__init__() self.debug = False - self.keyorder: Dict[int, List[str]] = {} + self.keyorder: Dict[int, str] = {} self.size = 0 # current size of the keys, equivalent to ma_used in dictobject.c self.fill = 0 # increment count when a key is added, equivalent to ma_fill in dictobject.c self.mask = MINSIZE - 1 # Python2 default dict size @@ -185,7 +182,7 @@ def __deepcopy__(self, memo): # type: ignore[no-untyped-def] # add keys in the py2 order -- we can't do a straigh-up deep copy of keyorder because # in py2 copy.deepcopy of a dict may result in reordering of the keys ret = Py27Keys() - for k in self.keys(): # type: ignore[no-untyped-call] + for k in self.keys(): if k is self.DUMMY: continue ret.add(copy.deepcopy(k, memo)) # type: ignore[no-untyped-call] @@ -273,7 +270,7 @@ def add(self, key): # type: ignore[no-untyped-def] # Python2 dict increases size by a factor of 4 for small dict, and 2 for large dict self._resize(self.size * (2 if self.size > 50000 else 4)) # type: ignore[no-untyped-call] - def keys(self): # type: ignore[no-untyped-def] + def keys(self) -> List[str]: """Return keys in Python2 order""" return [self.keyorder[key] for key in sorted(self.keyorder.keys()) if self.keyorder[key] is not self.DUMMY] @@ -284,7 +281,7 @@ def __setstate__(self, state): # type: ignore[no-untyped-def] :param state: input state """ self.__dict__ = state - keys = self.keys() # type: ignore[no-untyped-call] + keys = self.keys() # Clear keys and re-add to match deserialization logic self.__init__() # type: ignore[misc] @@ -294,21 +291,21 @@ def __setstate__(self, state): # type: ignore[no-untyped-def] continue self.add(k) # type: ignore[no-untyped-call] - def __iter__(self): # type: ignore[no-untyped-def] + def __iter__(self) -> Iterator[str]: """ Default iterator """ - return iter(self.keys()) # type: ignore[no-untyped-call] + return iter(self.keys()) def __eq__(self, other): # type: ignore[no-untyped-def] if isinstance(other, Py27Keys): - return self.keys() == other.keys() # type: ignore[no-untyped-call] + return self.keys() == other.keys() if isinstance(other, list): - return self.keys() == other # type: ignore[no-untyped-call] + return self.keys() == other return False - def __len__(self): # type: ignore[no-untyped-def] - return len(self.keys()) # type: ignore[no-untyped-call] + def __len__(self) -> int: + return len(self.keys()) def merge(self, other): # type: ignore[no-untyped-def] """ @@ -329,7 +326,7 @@ def merge(self, other): # type: ignore[no-untyped-def] for k in other: self.add(k) # type: ignore[no-untyped-call] - def copy(self): # type: ignore[no-untyped-def] + def copy(self) -> "Py27Keys": """ Makes a copy of self """ @@ -343,7 +340,7 @@ def pop(self): # type: ignore[no-untyped-def] Pops the top element from the sorted keys if it exists. Returns None otherwise. """ if self.keyorder: - value = self.keys()[0] # type: ignore[no-untyped-call] + value = self.keys()[0] self.remove(value) # type: ignore[no-untyped-call] return value return None @@ -429,14 +426,14 @@ def update(self, *args, **kwargs): # type: ignore[no-untyped-def] for k, v in dict(**kwargs).items(): self[k] = v - def clear(self): # type: ignore[no-untyped-def] + def clear(self) -> None: """ Clears the dict along with its backing Python2.7 keylist. """ super(Py27Dict, self).clear() self.keylist = Py27Keys() - def copy(self): # type: ignore[no-untyped-def] + def copy(self) -> "Py27Dict": """ Copies the dict along with its backing Python2.7 keylist. @@ -448,7 +445,7 @@ def copy(self): # type: ignore[no-untyped-def] new = Py27Dict() # First copy the keylist to the new object - new.keylist = self.keylist.copy() # type: ignore[no-untyped-call] + new.keylist = self.keylist.copy() # Copy keys into backing dict for k, v in self.items(): # type: ignore[no-untyped-call] @@ -494,7 +491,7 @@ def popitem(self): # type: ignore[no-untyped-def] return None - def __iter__(self): # type: ignore[no-untyped-def] + def __iter__(self) -> Iterator[str]: """ Default iterator @@ -502,9 +499,9 @@ def __iter__(self): # type: ignore[no-untyped-def] ------- iterator """ - return self.keylist.__iter__() # type: ignore[no-untyped-call] + return self.keylist.__iter__() - def __str__(self): # type: ignore[no-untyped-def] + def __str__(self) -> str: """ Override to minic exact Python2.7 str(dict_obj) @@ -529,7 +526,7 @@ def __str__(self): # type: ignore[no-untyped-def] string += "}" return string - def __repr__(self): # type: ignore[no-untyped-def] + def __repr__(self) -> str: """ Create a string version of this Dict @@ -537,7 +534,7 @@ def __repr__(self): # type: ignore[no-untyped-def] ------- str """ - return self.__str__() # type: ignore[no-untyped-call] + return self.__str__() def keys(self): # type: ignore[no-untyped-def] """ @@ -548,7 +545,7 @@ def keys(self): # type: ignore[no-untyped-def] list list of keys """ - return self.keylist.keys() # type: ignore[no-untyped-call] + return self.keylist.keys() def values(self): # type: ignore[no-untyped-def] """ diff --git a/tests/feature_toggle/test_dialup.py b/tests/feature_toggle/test_dialup.py index 2a613d862..a3b0bd9f0 100644 --- a/tests/feature_toggle/test_dialup.py +++ b/tests/feature_toggle/test_dialup.py @@ -4,19 +4,17 @@ from samtranslator.feature_toggle.dialup import * -class TestBaseDialup(TestCase): - def test___str__(self): - region_config = {} - dialup = BaseDialup(region_config) - self.assertEqual(str(dialup), "BaseDialup") - - class TestDisabledDialup(TestCase): def test_is_enabled(self): region_config = {} dialup = DisabledDialup(region_config) self.assertFalse(dialup.is_enabled()) + def test___str__(self): + region_config = {} + dialup = DisabledDialup(region_config) + self.assertEqual(str(dialup), "DisabledDialup") + class TestToggleDialUp(TestCase): @parameterized.expand( @@ -30,6 +28,10 @@ def test_is_enabled(self, region_config, expected): dialup = ToggleDialup(region_config) self.assertEqual(dialup.is_enabled(), expected) + def test___str__(self): + dialup = ToggleDialup({}) + self.assertEqual(str(dialup), "ToggleDialup") + class TestSimpleAccountPercentileDialup(TestCase): @parameterized.expand( @@ -63,3 +65,11 @@ def test__get_account_percentile(self, account_id, feature_name): feature_name=feature_name, ) self.assertTrue(0 <= dialup._get_account_percentile() < 100) + + def test___str__(self): + dialup = SimpleAccountPercentileDialup( + region_config={}, + account_id="123456789012", + feature_name="feat", + ) + self.assertEqual(str(dialup), "SimpleAccountPercentileDialup") diff --git a/tests/translator/test_translator.py b/tests/translator/test_translator.py index 8e2d87194..6940f3e57 100644 --- a/tests/translator/test_translator.py +++ b/tests/translator/test_translator.py @@ -62,12 +62,8 @@ def deep_sort_lists(value): if isinstance(value, dict): return {k: deep_sort_lists(v) for k, v in value.items()} if isinstance(value, list): - if sys.version_info.major < 3: - # Py2 can sort lists with complex types like dictionaries - return sorted((deep_sort_lists(x) for x in value)) - else: - # Py3 cannot sort lists with complex types. Hence a custom comparator function - return sorted((deep_sort_lists(x) for x in value), key=cmp_to_key(custom_list_data_comparator)) + # Py3 cannot sort lists with complex types. Hence a custom comparator function + return sorted((deep_sort_lists(x) for x in value), key=cmp_to_key(custom_list_data_comparator)) else: return value