diff --git a/samtranslator/model/eventsources/pull.py b/samtranslator/model/eventsources/pull.py index 9daa327fc..21bfdb8e9 100644 --- a/samtranslator/model/eventsources/pull.py +++ b/samtranslator/model/eventsources/pull.py @@ -1,3 +1,4 @@ +from abc import ABCMeta, abstractmethod from typing import Any, Dict, List, Optional from samtranslator.metrics.method_decorator import cw_timer @@ -15,7 +16,7 @@ from samtranslator.validator.value_validator import sam_expect -class PullEventSource(ResourceMacro): +class PullEventSource(ResourceMacro, metaclass=ABCMeta): """Base class for pull event sources for SAM Functions. The pull events are Kinesis Streams, DynamoDB Streams, Kafka Topics, Amazon MQ Queues and SQS Queues. All of these correspond to an @@ -32,11 +33,8 @@ class PullEventSource(ResourceMacro): # line to avoid any potential behavior change. # TODO: Make `PullEventSource` an abstract class and not giving `resource_type` initial value. resource_type: str = None # type: ignore - requires_stream_queue_broker = True relative_id: str # overriding the Optional[str]: for event, relative id is not None - property_types = { - "Stream": PropertyType(False, IS_STR), - "Queue": PropertyType(False, IS_STR), + property_types: Dict[str, PropertyType] = { "BatchSize": PropertyType(False, is_type(int)), "StartingPosition": PassThroughProperty(False), "StartingPositionTimestamp": PassThroughProperty(False), @@ -48,7 +46,6 @@ class PullEventSource(ResourceMacro): "DestinationConfig": PropertyType(False, IS_DICT), "ParallelizationFactor": PropertyType(False, is_type(int)), "Topics": PropertyType(False, is_type(list)), - "Broker": PropertyType(False, IS_STR), "Queues": PropertyType(False, is_type(list)), "SourceAccessConfigurations": PropertyType(False, is_type(list)), "SecretsManagerKmsKeyId": PropertyType(False, IS_STR), @@ -59,8 +56,6 @@ class PullEventSource(ResourceMacro): "ConsumerGroupId": PropertyType(False, IS_STR), } - Stream: Optional[Intrinsicable[str]] - Queue: Optional[Intrinsicable[str]] BatchSize: Optional[Intrinsicable[int]] StartingPosition: Optional[PassThrough] StartingPositionTimestamp: Optional[PassThrough] @@ -72,7 +67,6 @@ class PullEventSource(ResourceMacro): DestinationConfig: Optional[Dict[str, Any]] ParallelizationFactor: Optional[Intrinsicable[int]] Topics: Optional[List[Any]] - Broker: Optional[Intrinsicable[str]] Queues: Optional[List[Any]] SourceAccessConfigurations: Optional[List[Any]] SecretsManagerKmsKeyId: Optional[str] @@ -82,11 +76,17 @@ class PullEventSource(ResourceMacro): FilterCriteria: Optional[Dict[str, Any]] ConsumerGroupId: Optional[Intrinsicable[str]] - def get_policy_arn(self): # type: ignore[no-untyped-def] - raise NotImplementedError("Subclass must implement this method") + @abstractmethod + def get_policy_arn(self) -> Optional[str]: + """Policy to be added to the role (if a role applies).""" - def get_policy_statements(self): # type: ignore[no-untyped-def] - raise NotImplementedError("Subclass must implement this method") + @abstractmethod + def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]: + """Inline policy statements to be added to the role (if a role applies).""" + + @abstractmethod + def get_event_source_arn(self) -> Optional[PassThrough]: + """Return the value to assign to lambda event source mapping's EventSourceArn.""" @cw_timer(prefix=FUNCTION_EVETSOURCE_METRIC_PREFIX) def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] @@ -115,17 +115,8 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] except NotImplementedError: function_name_or_arn = function.get_runtime_attr("arn") - if self.requires_stream_queue_broker and not self.Stream and not self.Queue and not self.Broker: - raise InvalidEventException( - self.relative_id, - "No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided.", - ) - - if self.Stream and not self.StartingPosition: - raise InvalidEventException(self.relative_id, "StartingPosition is required for Kinesis, DynamoDB and MSK.") - lambda_eventsourcemapping.FunctionName = function_name_or_arn - lambda_eventsourcemapping.EventSourceArn = self.Stream or self.Queue or self.Broker + lambda_eventsourcemapping.EventSourceArn = self.get_event_source_arn() lambda_eventsourcemapping.StartingPosition = self.StartingPosition lambda_eventsourcemapping.StartingPositionTimestamp = self.StartingPositionTimestamp lambda_eventsourcemapping.BatchSize = self.BatchSize @@ -202,8 +193,8 @@ def _link_policy(self, role, destination_config_policy=None): # type: ignore[no :param model.iam.IAMRole role: the execution role generated for the function """ - policy_arn = self.get_policy_arn() # type: ignore[no-untyped-call] - policy_statements = self.get_policy_statements() # type: ignore[no-untyped-call] + policy_arn = self.get_policy_arn() + policy_statements = self.get_policy_statements() if role is not None: if policy_arn is not None and policy_arn not in role.ManagedPolicyArns: role.ManagedPolicyArns.append(policy_arn) @@ -250,11 +241,21 @@ class Kinesis(PullEventSource): """Kinesis event source.""" resource_type = "Kinesis" + property_types: Dict[str, PropertyType] = { + **PullEventSource.property_types, + "Stream": PassThroughProperty(True), + "StartingPosition": PassThroughProperty(True), + } + + Stream: PassThrough + + def get_event_source_arn(self) -> Optional[PassThrough]: + return self.Stream - def get_policy_arn(self): # type: ignore[no-untyped-def] + def get_policy_arn(self) -> Optional[str]: return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaKinesisExecutionRole") - def get_policy_statements(self): # type: ignore[no-untyped-def] + def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]: return None @@ -262,11 +263,21 @@ class DynamoDB(PullEventSource): """DynamoDB Streams event source.""" resource_type = "DynamoDB" + property_types: Dict[str, PropertyType] = { + **PullEventSource.property_types, + "Stream": PassThroughProperty(True), + "StartingPosition": PassThroughProperty(True), + } + + Stream: PassThrough + + def get_event_source_arn(self) -> Optional[PassThrough]: + return self.Stream - def get_policy_arn(self): # type: ignore[no-untyped-def] + def get_policy_arn(self) -> Optional[str]: return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaDynamoDBExecutionRole") - def get_policy_statements(self): # type: ignore[no-untyped-def] + def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]: return None @@ -274,11 +285,20 @@ class SQS(PullEventSource): """SQS Queue event source.""" resource_type = "SQS" + property_types: Dict[str, PropertyType] = { + **PullEventSource.property_types, + "Queue": PassThroughProperty(True), + } + + Queue: PassThrough + + def get_event_source_arn(self) -> Optional[PassThrough]: + return self.Queue - def get_policy_arn(self): # type: ignore[no-untyped-def] + def get_policy_arn(self) -> Optional[str]: return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaSQSQueueExecutionRole") - def get_policy_statements(self): # type: ignore[no-untyped-def] + def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]: return None @@ -286,11 +306,21 @@ class MSK(PullEventSource): """MSK event source.""" resource_type = "MSK" + property_types: Dict[str, PropertyType] = { + **PullEventSource.property_types, + "Stream": PassThroughProperty(True), + "StartingPosition": PassThroughProperty(True), + } + + Stream: PassThrough + + def get_event_source_arn(self) -> Optional[PassThrough]: + return self.Stream - def get_policy_arn(self): # type: ignore[no-untyped-def] + def get_policy_arn(self) -> Optional[str]: return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaMSKExecutionRole") - def get_policy_statements(self): # type: ignore[no-untyped-def] + def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]: if self.SourceAccessConfigurations: for conf in self.SourceAccessConfigurations: # Lambda does not support multiple CLIENT_CERTIFICATE_TLS_AUTH configurations @@ -312,18 +342,27 @@ def get_policy_statements(self): # type: ignore[no-untyped-def] } ] - return None + return None class MQ(PullEventSource): """MQ event source.""" resource_type = "MQ" + property_types: Dict[str, PropertyType] = { + **PullEventSource.property_types, + "Broker": PassThroughProperty(True), + } + + Broker: PassThrough - def get_policy_arn(self): # type: ignore[no-untyped-def] + def get_event_source_arn(self) -> Optional[PassThrough]: + return self.Broker + + def get_policy_arn(self) -> Optional[str]: return None - def get_policy_statements(self): # type: ignore[no-untyped-def] + def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]: if not self.SourceAccessConfigurations: raise InvalidEventException( self.relative_id, @@ -404,7 +443,6 @@ class SelfManagedKafka(PullEventSource): """ resource_type = "SelfManagedKafka" - requires_stream_queue_broker = False AUTH_MECHANISM = [ "SASL_SCRAM_256_AUTH", "SASL_SCRAM_512_AUTH", @@ -413,10 +451,13 @@ class SelfManagedKafka(PullEventSource): "SERVER_ROOT_CA_CERTIFICATE", ] - def get_policy_arn(self): # type: ignore[no-untyped-def] + def get_event_source_arn(self) -> Optional[PassThrough]: + return None + + def get_policy_arn(self) -> Optional[str]: return None - def get_policy_statements(self): # type: ignore[no-untyped-def] + def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]: if not self.KafkaBootstrapServers: raise InvalidEventException( self.relative_id, diff --git a/tests/translator/output/error_missing_broker.json b/tests/translator/output/error_missing_broker.json index 4e71d7e1a..74320f2b1 100644 --- a/tests/translator/output/error_missing_broker.json +++ b/tests/translator/output/error_missing_broker.json @@ -1,8 +1,3 @@ { - "errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MQFunction] is invalid. Event with id [MyMQQueue] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided.", - "errors": [ - { - "errorMessage": "Resource with id [MQFunction] is invalid. Event with id [MyMQQueue] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided." - } - ] + "errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MQFunctionMyMQQueue] is invalid. Missing required property 'Broker'." } diff --git a/tests/translator/output/error_missing_queue.json b/tests/translator/output/error_missing_queue.json index 5ce5535bf..e1b366ab0 100644 --- a/tests/translator/output/error_missing_queue.json +++ b/tests/translator/output/error_missing_queue.json @@ -1,8 +1,3 @@ { - "errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [SQSFunction] is invalid. Event with id [MySqsQueue] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided.", - "errors": [ - { - "errorMessage": "Resource with id [SQSFunction] is invalid. Event with id [MySqsQueue] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided." - } - ] + "errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [SQSFunctionMySqsQueue] is invalid. Missing required property 'Queue'." } diff --git a/tests/translator/output/error_missing_startingposition.json b/tests/translator/output/error_missing_startingposition.json index 1aaa06467..397272ef8 100644 --- a/tests/translator/output/error_missing_startingposition.json +++ b/tests/translator/output/error_missing_startingposition.json @@ -1,8 +1,3 @@ { - "errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [KinesisFunction] is invalid. Event with id [MyKinesisStream] is invalid. StartingPosition is required for Kinesis, DynamoDB and MSK.", - "errors": [ - { - "errorMessage": "Resource with id [KinesisFunction] is invalid. Event with id [MyKinesisStream] is invalid. StartingPosition is required for Kinesis, DynamoDB and MSK." - } - ] + "errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [KinesisFunctionMyKinesisStream] is invalid. Missing required property 'StartingPosition'." } diff --git a/tests/translator/output/error_missing_stream.json b/tests/translator/output/error_missing_stream.json index d8e00ba19..43386d129 100644 --- a/tests/translator/output/error_missing_stream.json +++ b/tests/translator/output/error_missing_stream.json @@ -1,8 +1,3 @@ { - "errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [DynamoDBFunction] is invalid. Event with id [MyDDBStream] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided.", - "errors": [ - { - "errorMessage": "Resource with id [DynamoDBFunction] is invalid. Event with id [MyDDBStream] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided." - } - ] + "errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [DynamoDBFunctionMyDDBStream] is invalid. Missing required property 'Stream'." }