-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
events.py
504 lines (403 loc) · 23.9 KB
/
events.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
import json
from typing import Any, Dict, Optional
from samtranslator.metrics.method_decorator import cw_timer
from samtranslator.model import Property, PropertyType, ResourceMacro, Resource
from samtranslator.model.events import EventsRule
from samtranslator.model.iam import IAMRole, IAMRolePolicies
from samtranslator.model.types import is_str, is_type
from samtranslator.model.intrinsics import fnSub
from samtranslator.translator import logical_id_generator
from samtranslator.model.exceptions import InvalidEventException
from samtranslator.model.eventbridge_utils import EventBridgeRuleUtils
from samtranslator.model.eventsources.push import Api as PushApi
from samtranslator.swagger.swagger import SwaggerEditor
# Key looked up in a resource's `resource_attributes` to find its CloudFormation condition.
CONDITION = "Condition"
# Prefix passed to the `cw_timer` decorator on every event source's `to_cloudformation`.
# NOTE(review): the identifier is spelled "EVETSOURCE" (sic); it is kept as-is because the
# decorators in this module reference it by this exact name.
SFN_EVETSOURCE_METRIC_PREFIX = "SFNEventSource"
class EventSource(ResourceMacro):
    """Common base for the event sources that can start a SAM State Machine.

    :cvar str principal: The AWS service principal of the source service.
    """

    # Note(xinhol): `EventSource` should have been an abstract class. Disabling the type check for the next
    # line to avoid any potential behavior change.
    # TODO: Make `EventSource` an abstract class and not giving `principal` initial value.
    principal: str = None  # type: ignore

    Target: Optional[Dict[str, str]]

    def _generate_logical_id(self, prefix, suffix, resource_type):  # type: ignore[no-untyped-def]
        """Build the logical ID for a resource generated by this event source.

        :param string prefix: Prefix of the logical ID; when None, this event's own logical ID is used
        :param string suffix: Suffix of the logical ID
        :param string resource_type: Type of the resource, placed between prefix and suffix
        :returns: the logical ID for the new resource
        :rtype: string
        """
        base = self.logical_id if prefix is None else prefix

        # A purely alphanumeric suffix is appended verbatim.
        if suffix.isalnum():
            return base + resource_type + suffix

        # Anything else (including the empty string) is delegated to the logical ID generator.
        return logical_id_generator.LogicalIdGenerator(base + resource_type, suffix).gen()

    def _construct_role(self, resource, permissions_boundary=None, prefix=None, suffix=""):  # type: ignore[no-untyped-def]
        """Create the IAM Role that lets the event service call StartExecution on the
        state machine this event is attached to.

        :param model.stepfunctions.StepFunctionsStateMachine resource: The state machine resource associated with the event
        :param string permissions_boundary: The ARN of the policy used to set the permissions boundary for the role
        :param string prefix: Prefix to use for the logical ID of the IAM role
        :param string suffix: Suffix to add for the logical ID of the IAM role
        :returns: the IAM Role resource
        :rtype: model.iam.IAMRole
        """
        role_id = self._generate_logical_id(prefix=prefix, suffix=suffix, resource_type="Role")  # type: ignore[no-untyped-call]
        role = IAMRole(role_id, attributes=resource.get_passthrough_resource_attributes())

        # Trust policy: the source service (self.principal) may assume this role.
        assume_role_policy = IAMRolePolicies.construct_assume_role_policy_for_service_principal(  # type: ignore[no-untyped-call]
            self.principal
        )
        role.AssumeRolePolicyDocument = assume_role_policy

        # Permission policy, built from the state machine's ARN and the role's logical ID.
        start_execution_policy = IAMRolePolicies.step_functions_start_execution_role_policy(  # type: ignore[no-untyped-call]
            resource.get_runtime_attr("arn"), role_id
        )
        role.Policies = [start_execution_policy]

        if permissions_boundary:
            role.PermissionsBoundary = permissions_boundary

        return role
class Schedule(EventSource):
    """Schedule event source: starts a SAM State Machine from an EventBridge schedule rule."""

    resource_type = "Schedule"
    principal = "events.amazonaws.com"
    property_types = {
        "Schedule": PropertyType(True, is_str()),
        "Input": PropertyType(False, is_str()),
        "Enabled": PropertyType(False, is_type(bool)),
        "State": PropertyType(False, is_str()),
        "Name": PropertyType(False, is_str()),
        "Description": PropertyType(False, is_str()),
        "DeadLetterConfig": PropertyType(False, is_type(dict)),
        "RetryPolicy": PropertyType(False, is_type(dict)),
        "Target": Property(False, is_type(dict)),
    }

    @cw_timer(prefix=SFN_EVETSOURCE_METRIC_PREFIX)
    def to_cloudformation(self, resource, **kwargs):  # type: ignore[no-untyped-def]
        """Expand this Schedule event into an EventBridge Rule, an IAM Role and, when a
        DeadLetterConfig is given, the resources returned for the dead-letter queue.

        :param resource: the state machine resource this event is attached to
        :param dict kwargs: only ``permissions_boundary`` is read; no existing resources are modified
        :returns: a list of vanilla CloudFormation Resources, to which this Schedule event expands
        :rtype: list
        :raises InvalidEventException: when both ``State`` and ``Enabled`` are specified
        """
        passthrough = resource.get_passthrough_resource_attributes()

        rule = EventsRule(self.logical_id, attributes=passthrough)
        rule.ScheduleExpression = self.Schedule  # type: ignore[attr-defined]

        # `State` and `Enabled` both drive the rule state, so only one of them may be supplied.
        if self.State and self.Enabled is not None:  # type: ignore[attr-defined, attr-defined]
            raise InvalidEventException(self.relative_id, "State and Enabled Properties cannot both be specified.")
        if self.State:  # type: ignore[attr-defined]
            rule.State = self.State  # type: ignore[attr-defined]
        elif self.Enabled is not None:  # type: ignore[attr-defined]
            rule.State = "ENABLED" if self.Enabled else "DISABLED"  # type: ignore[attr-defined]

        rule.Name = self.Name  # type: ignore[attr-defined]
        rule.Description = self.Description  # type: ignore[attr-defined]

        role = self._construct_role(resource, kwargs.get("permissions_boundary"))  # type: ignore[no-untyped-call]
        generated = [rule, role]

        rule_arn = rule.get_runtime_attr("arn")
        dlq_arn = None
        if self.DeadLetterConfig is not None:  # type: ignore[attr-defined]
            EventBridgeRuleUtils.validate_dlq_config(self.logical_id, self.DeadLetterConfig)  # type: ignore[attr-defined, no-untyped-call]
            dlq_arn, dlq_resources = EventBridgeRuleUtils.get_dlq_queue_arn_and_resources(  # type: ignore[no-untyped-call]
                self, rule_arn, passthrough
            )
            generated.extend(dlq_resources)

        rule.Targets = [self._construct_target(resource, role, dlq_arn)]  # type: ignore[no-untyped-call]
        return generated

    def _construct_target(self, resource, role, dead_letter_queue_arn=None):  # type: ignore[no-untyped-def]
        """Build the single Target entry for the EventBridge Rule.

        :param resource: the state machine resource; its ``arn`` runtime attribute becomes the target Arn
        :param role: the IAM role resource; its ``arn`` runtime attribute becomes the target RoleArn
        :param dead_letter_queue_arn: ARN placed in ``DeadLetterConfig`` when that property is set
        :returns: the Target property
        :rtype: dict
        """
        # A user-supplied Target "Id" wins; otherwise derive one from the event's logical ID.
        custom_target = self.Target
        if custom_target and "Id" in custom_target:
            target_id = custom_target["Id"]
        else:
            target_id = self.logical_id + "StepFunctionsTarget"

        target = {
            "Arn": resource.get_runtime_attr("arn"),
            "Id": target_id,
            "RoleArn": role.get_runtime_attr("arn"),
        }

        # Optional settings are only emitted when the corresponding property was provided.
        if self.Input is not None:  # type: ignore[attr-defined]
            target["Input"] = self.Input  # type: ignore[attr-defined]
        if self.DeadLetterConfig is not None:  # type: ignore[attr-defined]
            target["DeadLetterConfig"] = {"Arn": dead_letter_queue_arn}
        if self.RetryPolicy is not None:  # type: ignore[attr-defined]
            target["RetryPolicy"] = self.RetryPolicy  # type: ignore[attr-defined]

        return target
class CloudWatchEvent(EventSource):
    """CloudWatch Events/EventBridge pattern-rule event source for SAM State Machine."""

    resource_type = "CloudWatchEvent"
    principal = "events.amazonaws.com"
    property_types = {
        "EventBusName": PropertyType(False, is_str()),
        "RuleName": PropertyType(False, is_str()),
        "Pattern": PropertyType(False, is_type(dict)),
        "Input": PropertyType(False, is_str()),
        "InputPath": PropertyType(False, is_str()),
        "DeadLetterConfig": PropertyType(False, is_type(dict)),
        "RetryPolicy": PropertyType(False, is_type(dict)),
        "State": PropertyType(False, is_str()),
        "Target": Property(False, is_type(dict)),
    }

    @cw_timer(prefix=SFN_EVETSOURCE_METRIC_PREFIX)
    def to_cloudformation(self, resource, **kwargs):  # type: ignore[no-untyped-def]
        """Expand this event into an EventBridge Rule, an IAM Role and, when a
        DeadLetterConfig is given, the resources returned for the dead-letter queue.

        :param resource: the state machine resource this event is attached to
        :param dict kwargs: only ``permissions_boundary`` is read; no existing resources are modified
        :returns: a list of vanilla CloudFormation Resources, to which this CloudWatch Events/EventBridge event expands
        :rtype: list
        """
        passthrough = resource.get_passthrough_resource_attributes()

        rule = EventsRule(self.logical_id, attributes=passthrough)
        rule.EventBusName = self.EventBusName  # type: ignore[attr-defined]
        rule.EventPattern = self.Pattern  # type: ignore[attr-defined]
        rule.Name = self.RuleName  # type: ignore[attr-defined]
        # State is only copied over when the user supplied a non-empty value.
        if self.State:  # type: ignore[attr-defined]
            rule.State = self.State  # type: ignore[attr-defined]

        role = self._construct_role(resource, kwargs.get("permissions_boundary"))  # type: ignore[no-untyped-call]
        generated = [rule, role]

        rule_arn = rule.get_runtime_attr("arn")
        dlq_arn = None
        if self.DeadLetterConfig is not None:  # type: ignore[attr-defined]
            EventBridgeRuleUtils.validate_dlq_config(self.logical_id, self.DeadLetterConfig)  # type: ignore[attr-defined, no-untyped-call]
            dlq_arn, dlq_resources = EventBridgeRuleUtils.get_dlq_queue_arn_and_resources(  # type: ignore[no-untyped-call]
                self, rule_arn, passthrough
            )
            generated.extend(dlq_resources)

        rule.Targets = [self._construct_target(resource, role, dlq_arn)]  # type: ignore[no-untyped-call]
        return generated

    def _construct_target(self, resource, role, dead_letter_queue_arn=None):  # type: ignore[no-untyped-def]
        """Build the single Target entry for the CloudWatch Events/EventBridge Rule.

        :param resource: the state machine resource; its ``arn`` runtime attribute becomes the target Arn
        :param role: the IAM role resource; its ``arn`` runtime attribute becomes the target RoleArn
        :param dead_letter_queue_arn: ARN placed in ``DeadLetterConfig`` when that property is set
        :returns: the Target property
        :rtype: dict
        """
        # A user-supplied Target "Id" wins; otherwise derive one from the event's logical ID.
        custom_target = self.Target
        if custom_target and "Id" in custom_target:
            target_id = custom_target["Id"]
        else:
            target_id = self.logical_id + "StepFunctionsTarget"

        target = {
            "Arn": resource.get_runtime_attr("arn"),
            "Id": target_id,
            "RoleArn": role.get_runtime_attr("arn"),
        }

        # Optional settings are only emitted when the corresponding property was provided.
        if self.Input is not None:  # type: ignore[attr-defined]
            target["Input"] = self.Input  # type: ignore[attr-defined]
        if self.InputPath is not None:  # type: ignore[attr-defined]
            target["InputPath"] = self.InputPath  # type: ignore[attr-defined]
        if self.DeadLetterConfig is not None:  # type: ignore[attr-defined]
            target["DeadLetterConfig"] = {"Arn": dead_letter_queue_arn}
        if self.RetryPolicy is not None:  # type: ignore[attr-defined]
            target["RetryPolicy"] = self.RetryPolicy  # type: ignore[attr-defined]

        return target
class EventBridgeRule(CloudWatchEvent):
    """EventBridge Rule event source for SAM State Machine.

    Overrides only ``resource_type``; everything else is inherited from ``CloudWatchEvent``.
    """

    resource_type = "EventBridgeRule"
class Api(EventSource):
    """Api method event source for SAM State Machines."""

    resource_type = "Api"
    principal = "apigateway.amazonaws.com"
    property_types = {
        "Path": PropertyType(True, is_str()),
        "Method": PropertyType(True, is_str()),
        # Api Event sources must "always" be paired with a Serverless::Api
        "RestApiId": PropertyType(True, is_str()),
        "Stage": PropertyType(False, is_str()),
        "Auth": PropertyType(False, is_type(dict)),
        "UnescapeMappingTemplate": Property(False, is_type(bool)),
    }

    UnescapeMappingTemplate: Optional[bool]

    def resources_to_link(self, resources):  # type: ignore[no-untyped-def]
        """
        If this API Event Source refers to an explicit API resource, resolve the reference and grab
        necessary data from the explicit API

        :param dict resources: the template's resources, keyed by logical ID
        :returns: a dict with ``explicit_api`` (the referenced API's Properties, or None when RestApiId
            could not be resolved to a string) and ``explicit_api_stage`` (``{"suffix": <stage suffix>}``)
        :rtype: dict
        :raises InvalidEventException: when RestApiId resolves to a string that does not name a resource
            in ``resources`` carrying a ``StageName`` property
        """
        # If RestApiId is a resource in the same template, then we try find the StageName by following the reference.
        # Otherwise the suffix stays at its "AllStages" default.
        # This hack is necessary because customers could use !ImportValue, !Ref or other intrinsic functions which
        # can be sometimes impossible to resolve (ie. when it has cross-stack references)
        stage_suffix = "AllStages"
        explicit_api = None
        rest_api_id = PushApi.get_rest_api_id_string(self.RestApiId)  # type: ignore[attr-defined, no-untyped-call]
        if isinstance(rest_api_id, str):
            if (
                rest_api_id in resources
                and "Properties" in resources[rest_api_id]
                and "StageName" in resources[rest_api_id]["Properties"]
            ):
                explicit_api = resources[rest_api_id]["Properties"]
                permitted_stage = explicit_api["StageName"]

                # Stage could be an intrinsic, in which case fall back to the generic "Stage" suffix
                if isinstance(permitted_stage, str):
                    stage_suffix = permitted_stage
                else:
                    stage_suffix = "Stage"  # type: ignore[unreachable]
            else:
                # RestApiId is a string, not an intrinsic, but we did not find a valid API resource for this ID
                raise InvalidEventException(
                    self.relative_id,
                    "RestApiId property of Api event must reference a valid resource in the same template.",
                )

        return {"explicit_api": explicit_api, "explicit_api_stage": {"suffix": stage_suffix}}

    @cw_timer(prefix=SFN_EVETSOURCE_METRIC_PREFIX)
    def to_cloudformation(self, resource, **kwargs):  # type: ignore[no-untyped-def]
        """Return the IAM role allowing API Gateway to start the state machine execution and,
        when the linked API is flagged with ``__MANAGE_SWAGGER``, inject the path, method and
        x-amazon-apigateway-integration into that API's Swagger body.

        :param model.stepfunctions.resources.StepFunctionsStateMachine resource: the state machine
            resource to which the Api event source must be associated
        :param dict kwargs: must contain ``explicit_api``; ``intrinsics_resolver`` and
            ``permissions_boundary`` are read when present
        :returns: a list of vanilla CloudFormation Resources, to which this Api event expands
        :rtype: list
        """
        resources = []

        intrinsics_resolver = kwargs.get("intrinsics_resolver")
        permissions_boundary = kwargs.get("permissions_boundary")

        if self.Method is not None:  # type: ignore[has-type]
            # Convert to lower case so that user can specify either GET or get
            self.Method = self.Method.lower()  # type: ignore[has-type]

        role = self._construct_role(resource, permissions_boundary)  # type: ignore[no-untyped-call]
        resources.append(role)

        explicit_api = kwargs["explicit_api"]
        # `resources_to_link` reports `explicit_api` as None when RestApiId is an intrinsic that cannot be
        # resolved to a string (e.g. a cross-stack reference). There is no Swagger body to edit in that
        # case, so skip the integration rather than failing with AttributeError on `None.get`.
        # NOTE(review): assumes the caller forwards the `resources_to_link` result as kwargs - confirm.
        if explicit_api is not None and explicit_api.get("__MANAGE_SWAGGER"):
            self._add_swagger_integration(explicit_api, resource, role, intrinsics_resolver)  # type: ignore[no-untyped-call]

        return resources

    def _add_swagger_integration(self, api, resource, role, intrinsics_resolver):  # type: ignore[no-untyped-def]
        """Adds the path and method for this Api event source to the Swagger body of the provided API.

        Does nothing when the API has no ``DefinitionBody``. Otherwise the edited Swagger is written
        back to ``api["DefinitionBody"]``.

        :param dict api: properties of the API to which the path and method should be added
        :param resource: the state machine resource the integration starts
        :param role: IAM role resource whose ``arn`` runtime attribute is used by the integration
        :param intrinsics_resolver: resolver used to resolve parameter references in the API's ``Auth``
        :raises InvalidEventException: when the method is already integrated for the path, or when the
            event's ``Auth`` settings are inconsistent with the API's ``Auth`` configuration
        """
        swagger_body = api.get("DefinitionBody")
        if swagger_body is None:
            return

        integration_uri = fnSub("arn:${AWS::Partition}:apigateway:${AWS::Region}:states:action/StartExecution")

        editor = SwaggerEditor(swagger_body)

        if editor.has_integration(self.Path, self.Method):  # type: ignore[attr-defined]
            # Cannot add the integration, if it is already present
            raise InvalidEventException(
                self.relative_id,
                'API method "{method}" defined multiple times for path "{path}".'.format(
                    method=self.Method, path=self.Path  # type: ignore[attr-defined]
                ),
            )

        condition = None
        if CONDITION in resource.resource_attributes:
            condition = resource.resource_attributes[CONDITION]

        request_template = (
            self._generate_request_template_unescaped(resource)
            if self.UnescapeMappingTemplate
            else self._generate_request_template(resource)
        )

        editor.add_state_machine_integration(  # type: ignore[no-untyped-call]
            self.Path,  # type: ignore[attr-defined]
            self.Method,
            integration_uri,
            role.get_runtime_attr("arn"),
            request_template,
            condition=condition,
        )

        # Note: Refactor and combine the section below with the Api eventsource for functions
        if self.Auth:  # type: ignore[attr-defined]
            method_authorizer = self.Auth.get("Authorizer")  # type: ignore[attr-defined]
            api_auth = api.get("Auth")
            api_auth = intrinsics_resolver.resolve_parameter_refs(api_auth)

            if method_authorizer:
                api_authorizers = api_auth and api_auth.get("Authorizers")

                if method_authorizer == "NONE":
                    # "NONE" only makes sense as an override of an API-level default authorizer.
                    if not api_auth or not api_auth.get("DefaultAuthorizer"):
                        raise InvalidEventException(
                            self.relative_id,
                            "Unable to set Authorizer on API method [{method}] for path [{path}] because 'NONE' "
                            "is only a valid value when a DefaultAuthorizer on the API is specified.".format(
                                method=self.Method, path=self.Path  # type: ignore[attr-defined]
                            ),
                        )
                elif method_authorizer != "AWS_IAM":
                    # Any other named authorizer must be declared in the API's Authorizers ("AWS_IAM" is exempt).
                    if not api_authorizers:
                        raise InvalidEventException(
                            self.relative_id,
                            "Unable to set Authorizer [{authorizer}] on API method [{method}] for path [{path}] "
                            "because the related API does not define any Authorizers.".format(
                                authorizer=method_authorizer, method=self.Method, path=self.Path  # type: ignore[attr-defined]
                            ),
                        )

                    if not api_authorizers.get(method_authorizer):
                        raise InvalidEventException(
                            self.relative_id,
                            "Unable to set Authorizer [{authorizer}] on API method [{method}] for path [{path}] "
                            "because it wasn't defined in the API's Authorizers.".format(
                                authorizer=method_authorizer, method=self.Method, path=self.Path  # type: ignore[attr-defined]
                            ),
                        )

            if self.Auth.get("AuthorizationScopes") and not isinstance(self.Auth.get("AuthorizationScopes"), list):  # type: ignore[attr-defined]
                raise InvalidEventException(
                    self.relative_id,
                    "Unable to set Authorizer on API method [{method}] for path [{path}] because "
                    "'AuthorizationScopes' must be a list of strings.".format(method=self.Method, path=self.Path),  # type: ignore[attr-defined]
                )

            apikey_required_setting = self.Auth.get("ApiKeyRequired")  # type: ignore[attr-defined]
            # Distinguish an explicit falsy value from "not specified" (None).
            apikey_required_setting_is_false = apikey_required_setting is not None and not apikey_required_setting
            if apikey_required_setting_is_false and (not api_auth or not api_auth.get("ApiKeyRequired")):
                raise InvalidEventException(
                    self.relative_id,
                    "Unable to set ApiKeyRequired [False] on API method [{method}] for path [{path}] "
                    "because the related API does not specify any ApiKeyRequired.".format(
                        method=self.Method, path=self.Path  # type: ignore[attr-defined]
                    ),
                )

            if method_authorizer or apikey_required_setting is not None:
                editor.add_auth_to_method(api=api, path=self.Path, method_name=self.Method, auth=self.Auth)  # type: ignore[attr-defined, attr-defined, no-untyped-call]

            if self.Auth.get("ResourcePolicy"):  # type: ignore[attr-defined]
                resource_policy = self.Auth.get("ResourcePolicy")  # type: ignore[attr-defined]
                editor.add_resource_policy(resource_policy=resource_policy, path=self.Path, stage=self.Stage)  # type: ignore[attr-defined, attr-defined, no-untyped-call]
                if resource_policy.get("CustomStatements"):
                    editor.add_custom_statements(resource_policy.get("CustomStatements"))  # type: ignore[no-untyped-call]

        api["DefinitionBody"] = editor.swagger

    def _generate_request_template(self, resource: Resource) -> Dict[str, Any]:
        """Generates the Body mapping request template for the Api. This allows for the input
        request to the Api to be passed as the execution input to the associated state machine resource.

        :param model.stepfunctions.resources.StepFunctionsStateMachine resource: the state machine
            resource to which the Api event source must be associated
        :returns: a body mapping request which passes the Api input to the state machine execution
        :rtype: dict
        """
        request_templates = {
            "application/json": fnSub(
                json.dumps(
                    {
                        "input": "$util.escapeJavaScript($input.json('$'))",
                        "stateMachineArn": "${" + resource.logical_id + "}",
                    }
                )
            )
        }
        return request_templates

    def _generate_request_template_unescaped(self, resource: Resource) -> Dict[str, Any]:
        """Generates the Body mapping request template for the Api. This allows for the input
        request to the Api to be passed as the execution input to the associated state machine resource.

        Unescapes single quotes such that it's valid JSON.

        :param model.stepfunctions.resources.StepFunctionsStateMachine resource: the state machine
            resource to which the Api event source must be associated
        :returns: a body mapping request which passes the Api input to the state machine execution
        :rtype: dict
        """
        request_templates = {
            "application/json": fnSub(
                # Need to unescape single quotes escaped by escapeJavaScript.
                # Also the mapping template isn't valid JSON, so can't use json.dumps().
                # See https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-mapping-template-reference.html#util-template-reference
                """{"input": "$util.escapeJavaScript($input.json('$')).replaceAll("\\\\'","'")", "stateMachineArn": "${"""
                + resource.logical_id
                + """}"}"""
            )
        }
        return request_templates