This AWS Event Fork Pipelines app indexes events from the given Amazon SNS topic into an Amazon Elasticsearch Service domain for search and analytics, using an Amazon Kinesis Data Firehose stream.
- An Amazon SQS queue is subscribed to the given SNS Topic ARN with an optional subscription filter policy.
- An AWS Lambda function reads events from the SQS queue and publishes them to an Amazon Kinesis Data Firehose Delivery Stream, which saves them to the given Amazon Elasticsearch Service Domain.
- An optional data transformation Lambda function can be specified to transform the data prior to saving it to the Elasticsearch Service cluster.
- If Kinesis Data Firehose is unable to write events to the Amazon Elasticsearch Service domain, the failed events are written to a backup Amazon S3 bucket.
This app is meant to be used as part of a larger application, so the recommended way to use it is to embed it as a nested app in your serverless application. To do this, visit the app's page on the AWS Lambda Console. Click the "Copy as SAM Resource" button and paste the copied YAML into your SAM template, filling in any required parameters. Alternatively, you can deploy the application into your account directly via the AWS Lambda Console.
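For orientation, here is a minimal sketch of what the embedded resource might look like once pasted into a SAM template. The logical ID EventSearchPipeline, the ApplicationId ARN, and the SemanticVersion below are placeholders; use the exact values copied from the console, and point TopicArn at your own topic.

```yaml
Resources:
  EventSearchPipeline:
    Type: AWS::Serverless::Application
    Properties:
      Location:
        # Placeholders - use the ApplicationId and SemanticVersion copied
        # from the app's page on the AWS Lambda Console.
        ApplicationId: arn:aws:serverlessrepo:us-east-1:123456789012:applications/example-event-search-pipeline
        SemanticVersion: 1.0.0
      Parameters:
        # Required parameters: the SNS topic to subscribe to, and the
        # Elasticsearch index and type names used for the indexed events.
        TopicArn: !Ref MyEventsTopic
        SearchIndexName: my-events-index
        SearchTypeName: my-events-type
```

Deploying the parent template then creates the SQS queue, Lambda function, Kinesis Data Firehose delivery stream, and backup S3 bucket described above as a nested stack.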
- TopicArn (required) - The ARN of the SNS topic to which this instance of the pipeline should be subscribed.
- SearchDomainArn (optional) - The ARN of the Elasticsearch Service domain to be used. A domain is an Elasticsearch cluster in the AWS cloud, defining the compute and storage configuration needed. If you don't enter any value, then a new domain with default configuration is created in your account.
- SearchIndexName (required) - The name of the Elasticsearch index used for indexing the events, making them available for search and analytics. Max string length of 80, all lowercase, no special characters.
- SearchTypeName (required) - The name of the Elasticsearch type used for organizing the events in an index. Max string length of 100, all lowercase, not starting with an underscore.
- SubscriptionFilterPolicy (optional) - The SNS subscription filter policy, in JSON format, used for filtering the incoming events. The filter policy decides which events are processed by this pipeline. If you don't enter any value, then no filtering is used, meaning all events are processed. See the example after this list.
- SearchIndexRotationPeriod (optional) - The rotation period of the Elasticsearch index. Index rotation appends a timestamp to the index name, facilitating the expiration of old data. Five options are available, namely NoRotation, OneHour, OneDay, OneWeek, and OneMonth. If you don't enter any value, then option NoRotation is used.
- StreamRetryDurationInSeconds (optional) - The retry duration, in seconds, for cases when the stream is unable to index events in the Elasticsearch index. If you don't enter any value, then 300 seconds is used.
- StreamPrefix (optional) - The string prefix used for naming files stored in the backup S3 bucket. If you don't enter any value, then no prefix is used.
- StreamCompressionFormat (optional) - The format used for compressing the incoming events. Three options are available, namely GZIP, ZIP, and SNAPPY. If you don't enter any value, then data compression is disabled.
- StreamBufferingIntervalInSeconds (optional) - The number of seconds for which the stream should buffer incoming events before delivering them to the destination. Any integer value from 60 to 900. If you don't enter any value, then 300 is used.
- StreamBufferingSizeInMBs (optional) - The amount of data, in MB, that the stream should buffer before delivering it to the destination. Any integer value from 1 to 100. If you don't enter any value, then 5 is used.
- DataTransformationFunctionArn (optional) - The ARN of the Lambda function used for transforming the incoming events. If you don't enter any value, then data transformation is disabled.
- LogLevel (optional) - The level used for logging the execution of the Lambda function that polls events from the SQS queue. Four options are available, namely DEBUG, INFO, WARNING, and ERROR. If you don't enter any value, then INFO is used.
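To illustrate the optional parameters, the sketch below replaces the Parameters section of the nested-app resource shown earlier. The filter policy is hypothetical (the event_type attribute and its value are invented for the example) and the rotation and buffering values are just one possible choice.

```yaml
      Parameters:
        TopicArn: !Ref MyEventsTopic
        SearchIndexName: my-events-index
        SearchTypeName: my-events-type
        # Hypothetical filter policy: only process events whose "event_type"
        # message attribute equals "order_placed".
        SubscriptionFilterPolicy: '{"event_type": ["order_placed"]}'
        # Rotate the index daily; deliver after 60 seconds or 10 MB of
        # buffered data, whichever comes first.
        SearchIndexRotationPeriod: OneDay
        StreamBufferingIntervalInSeconds: 60
        StreamBufferingSizeInMBs: 10
```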
- AnalyticsDeadLetterBucketName - Dead-letter S3 bucket name
- AnalyticsDeadLetterBucketArn - Dead-letter S3 bucket ARN
- AnalyticsDomainName - Analytics ES domain name (only output if this app created an ES domain)
- AnalyticsDomainArn - Analytics ES domain ARN (only output if this app created an ES domain)
- AnalyticsDomainEndpoint - Analytics ES domain endpoint (only output if this app created an ES domain)
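When the app is embedded as a nested application, these outputs can be read from the parent template with GetAtt on the nested app's logical ID. The sketch below assumes the logical ID EventSearchPipeline from the earlier example and surfaces the dead-letter bucket name as an output of the parent stack.

```yaml
Outputs:
  EventSearchDeadLetterBucket:
    Description: Name of the backup S3 bucket used by the event search pipeline
    Value: !GetAtt EventSearchPipeline.Outputs.AnalyticsDeadLetterBucketName
```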
This code is made available under a modified MIT license. See the LICENSE file.