refactor!: Pre-release Refactor (#5)
* feat: init split

* feat: add JSON support to base64

* fix: remove split

* feat: add JSON object array parsing

* feat: add new processing patterns to Copy

* feat: add new processing patterns to Copy

* feat: add new processing patterns to Capture

* feat: add new processing patterns to Case

* fix: Case description

* feat: add new processing patterns to Concat

* feat: add benchmark to Concat

* feat: add new processing patterns to Convert

* docs: update Copy comments

* docs: update Count comments

* test: add benchmarks to Count

* feat: add new processing patterns to Delete

* docs: update comments

* feat: add new processing patterns to Domain

* feat: add new processing patterns to Drop

* feat: add new processing patterns to DynamoDB

* feat: add new processing patterns to Expand

* feat: add new processing patterns to Flatten

* feat: add new processing patterns to Gzip

* feat: add new processing patterns to Hash

* feat: add new processing patterns to Insert

* feat: add new processing patterns to Lambda

* feat: add new processing patterns to Math

* feat: add new processing patterns to Replace

* feat: add new processing patterns to Time

* feat: add new processing patterns to Zip

* docs: update comments

* refactor!: rename Zip to Group

* test: update api

* docs: updated process README

* feat: add new processing patterns to Conditions

* feat!: add new processing patterns to Sinks

* feat: add new processing patterns to Transforms

* docs: updated documentation

* refactor!: channeler to slicer, mapstruct to json

* chore: fix lines

* fix: correct libsonnets

* docs: update comments

* refactor: sinks

* refactor: make xray optional

* fix: broken capture output

* tests: add ddb and lambda tests

* refactor!: update sink configs

* feat: add division to math processor

* feat: improved shard redistribution

* refactor: update example deployment

* refactor!: remove nano, add epoch output

* refactor!: simplified lambda input

* chore: doc cleanup

* test: fix broken lambda test

* refactor: improve err messages

* refactor: replace random with uuid

* refactor!: simplified input / output

* refactor!: updated libsonnets

* fix: terraform image versions

* docs: updated README

* refactor!: update keys to match condition

* docs: Jsonnet config docs
jshlbrd committed Jun 17, 2022
1 parent cc76318 commit c89ced4
Showing 134 changed files with 5,660 additions and 4,675 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -153,12 +153,12 @@ From the project root, run the commands below to create a sample events file and

```bash
$ echo '{"hello":"world"}' >> quickstart.json
-$ ./cmd/file/substation/substation -input /tmp/quickstart.json -config config/quickstart/config.json
+$ ./cmd/file/substation/substation -input quickstart.json -config config/quickstart/config.json
```

### Step 4: Test Substation in AWS

-Navigate to the [build](build/) directory and review the `terraform`, `container`, and `config` documentation. [build/terraform/aws/example_pipeline.tf](build/terraform/aws/example_pipeline.tf.bak) is a fully-featured data pipeline that can be used as an example of how to deploy pipelines in AWS.
+Navigate to the [build](build/) directory and review the `terraform`, `container`, and `config` documentation. [build/terraform/aws/](build/terraform/aws/) contains a fully-featured data pipeline that can be used as an example of how to deploy pipelines in AWS.

## Additional Documentation

61 changes: 32 additions & 29 deletions build/terraform/aws/README.md
@@ -33,27 +33,22 @@ Substation Lambdas use AWS X-Ray for performance monitoring and tuning. If the A

Read more about AWS X-Ray [here](https://aws.amazon.com/xray/).

-### autoscaling.tf.bak
+### autoscaling.tf

-(Note: this file must be applied after running `bootstrap.tf` and `xray.tf`.)
+This file includes everything required to deploy the AWS Lambda autoscaling application (`cmd/aws/lambda/autoscaling`). This is required for any data pipelines that use Kinesis Data Streams.

-This file includes everything required to deploy the AWS Lambda autoscaling application (`cmd/aws/lambda/kinesis_autoscaling`). This is required for any data pipelines that use Kinesis Data Streams.
+### example_*.tf

-### example_pipeline.tf.bak
+These files include a fully-featured data pipeline that makes use of every Substation component as an example of a "best practice" deployment. This includes:

-(Note: this file must be applied after running `bootstrap.tf` and `xray.tf`.)
-
-This file includes a fully-featured data pipeline that makes use of every Substation component as an example of a "best practice" deployment. This includes:

-- ability to ingest data pulled from an S3 bucket
-- ability to ingest data pushed from an API Gateway
-- ability to ingest data pushed from a Kinesis client
+- ingest data from an S3 bucket
+- ingest data from multiple API Gateways
 - raw Kinesis stream for storage and access to unmodified, pre-processed data
 - processed Kinesis stream for storage and access to transformed, processed data
-- data processing Lambda that reads from the raw Kinesis stream, processes data, and writes to the processed Kinesis stream
-- write once, read many (WORM) S3 bucket for long-term storage of raw data (useful for compliance purposes or data lakes)
-- S3 bucket for short-term storage of processed data (useful for data analytics)
-- DynamoDB table for short-term or long-term storage of metadata (useful for correlation inside a data pipeline or across data pipelines)
+- data processor Lambda that reads from the raw Kinesis stream, processes data, and writes to the processed Kinesis stream
+- load raw data to an S3 bucket
+- load processed data to an S3 bucket
+- load metadata to a DynamoDB table

This data pipeline can be visualized like this:

@@ -63,34 +58,34 @@
graph TD
    dynamodb_table(Metadata DynamoDB Table)
    gateway_kinesis(HTTPS Endpoint)
    gateway(HTTPS Endpoint)
-   kinesis_raw(Raw Data Kinesis Store)
-   kinesis_processed(Processed Data Kinesis Store)
+   kinesis_raw(Raw Data Kinesis Stream)
+   kinesis_processed(Processed Data Kinesis Stream)
    s3_source_bucket(S3 Data Storage)
-   s3_worm_bucket(WORM / Data Lake S3 Storage)
-   s3_sink_bucket(Data Warehouse S3 Storage)
+   s3_lake_bucket(Data Lake S3 Storage)
+   s3_warehouse_bucket(Data Warehouse S3 Storage)
    %% Lambda data processing
-   dynamodb_lambda[Metadata Processing Lambda]
-   gateway_lambda[HTTPS Ingest Lambda]
-   kinesis_lambda[Data Processing Lambda]
+   dynamodb_lambda[DynamoDB Sink Lambda]
+   gateway_lambda[Gateway Source Lambda]
+   kinesis_lambda[Processor Lambda]
    enrichment_lambda[Data Enrichment Lambda]
-   s3_sink_lambda[Data Warehouse Processing Lambda]
-   s3_source_lambda[Data Storage Ingest Lambda]
-   s3_worm_lambda[WORM Processing Lambda]
+   s3_warehouse_sink_lambda[S3 Sink Lambda]
+   s3_source_lambda[S3 Source Lambda]
+   s3_lake_sink_lambda[S3 Sink Lambda]
    %% ingest
    gateway ---|Push| gateway_lambda ---|Push| kinesis_raw
    gateway_kinesis ---|Push| kinesis_raw
    s3_source_bucket ---|Pull| s3_source_lambda ---|Push| kinesis_raw
-   kinesis_raw ---|Pull| s3_worm_lambda ---|Push| s3_worm_bucket
+   kinesis_raw ---|Pull| s3_lake_sink_lambda ---|Push| s3_lake_bucket
    %% transform
    kinesis_raw ---|Pull| kinesis_lambda ---|Push| kinesis_processed
    kinesis_lambda ---|Pull| dynamodb_table
-   kinesis_lambda ---|Pull| enrichment_lambda
+   kinesis_lambda ---|Invoke| enrichment_lambda
    %% load
-   kinesis_processed ---|Pull| s3_sink_lambda ---|Push| s3_sink_bucket
+   kinesis_processed ---|Pull| s3_warehouse_sink_lambda ---|Push| s3_warehouse_bucket
    kinesis_processed ---|Pull| dynamodb_lambda ---|Push| dynamodb_table
```

@@ -127,6 +122,14 @@ There are two things to be aware of when deploying new image repositories:

Read more about ECR [here](https://aws.amazon.com/ecr/).

+### Event Bridge
+
+#### Lambda
+
+This module is used to create Event Bridge rules that trigger a Lambda.
+
+Read more about Event Bridge [here](https://aws.amazon.com/eventbridge/).
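
For a sense of how this module might be consumed, here is a hedged sketch; the module path and every input name below are assumptions, since the module's interface is not part of this diff:

```hcl
# Hypothetical usage of the Event Bridge Lambda module. The source path,
# input names, and schedule are illustrative assumptions, not taken from
# this commit.
module "eventbridge_lambda_example" {
  source = "./modules/event_bridge/lambda"

  name                = "substation_example_schedule"
  schedule_expression = "rate(5 minutes)"
  function_arn        = module.lambda_example_processor.arn
}
```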

### IAM

This module is used to provide default Identity and Access Management (IAM) policies for the most commonly used permissions. We use this naming convention: [AWS service]\_[read|write|modify]\_policy. For example, the `kinesis_read_policy` grants all the permissions required to read from a provided Kinesis stream.
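
For illustration, a minimal sketch of that convention using the `iam` and `iam_attachment` modules as they appear elsewhere in this commit (the processor role reference is borrowed from the example pipeline):

```hcl
# The iam module renders the policy; the iam_attachment module binds it
# to one or more Lambda roles. This mirrors the autoscaling.tf pattern
# shown later in this diff.
module "example_kinesis_read" {
  source    = "./modules/iam"
  resources = ["*"] # defaults to all streams; scope to specific ARNs if desired
}

module "example_kinesis_read_attachment" {
  source = "./modules/iam_attachment"
  id     = "substation_example_kinesis_read"
  policy = module.example_kinesis_read.kinesis_read_policy
  roles = [
    module.lambda_example_processor.role,
  ]
}
```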
@@ -155,7 +158,7 @@ Read more about the Key Management Service [here](https://aws.amazon.com/kms/).

This module is used to create and manage Lambda, which is the recommended service for data processing. At release, the Lambda Substation app (`cmd/aws/lambda/substation`) supports these Lambda triggers:

-- API Gateway (REST)
+- API Gateway
- Kinesis Data Streams
- SNS via S3
- S3
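
As a rough sketch of how a Kinesis trigger could be wired with plain Terraform (the event source mapping itself is not shown in this diff, and the stream resource name below is hypothetical):

```hcl
# Connects a Kinesis stream to the Substation Lambda so new records
# invoke the function. Resource names are illustrative assumptions.
resource "aws_lambda_event_source_mapping" "example_processor" {
  event_source_arn  = aws_kinesis_stream.substation_example_raw.arn
  function_name     = module.lambda_example_processor.arn
  starting_position = "LATEST"
  batch_size        = 100
}
```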
build/terraform/aws/autoscaling.tf
@@ -2,108 +2,117 @@

resource "aws_sns_topic" "autoscaling_topic" {
name = "substation_autoscaling"
kms_master_key_id = module.substation_kms.key_id
kms_master_key_id = module.kms_substation.key_id
tags = {
"Owner" : "substation_example",
"Owner" : "example",
}
}

# required for reading autoscaling configurations from AppConfig
module "autoscaling_appconfig_read" {
source = "./modules/iam"
resources = ["${aws_appconfig_application.substation.arn}/*"]
}
# first runs of this Terraform will fail due to an empty ECR image
module "lambda_autoscaling" {
source = "./modules/lambda"
function_name = "substation_autoscaling"
description = "Autoscales Kinesis streams based on data volume and size"
appconfig_id = aws_appconfig_application.substation.id
kms_arn = module.kms_substation.arn
image_uri = "${module.ecr_autoscaling.repository_url}:latest"
architectures = ["arm64"]

# required for updating shard counts on Kinesis streams
# resources can be isolated, but defaults to all streams
module "autoscaling_kinesis_modify" {
source = "./modules/iam"
resources = ["*"]
}
env = {
AWS_APPCONFIG_EXTENSION_PREFETCH_LIST : "/applications/substation/environments/prod/configurations/substation_autoscaling"
}
tags = {
"Owner" : "example",
}

# required for reading active shard counts for Kinesis streams
# resources can be isolated, but defaults to all streams
module "autoscaling_kinesis_read" {
source = "./modules/iam"
resources = ["*"]
depends_on = [
aws_appconfig_application.substation,
module.ecr_autoscaling.repository_url,
]
}

# required for resetting CloudWatch alarm states
# resources can be isolated, but defaults to all streams
module "autoscaling_cloudwatch_modify" {
source = "./modules/iam"
resources = ["*"]
}
resource "aws_sns_topic_subscription" "autoscaling_subscription" {
topic_arn = aws_sns_topic.autoscaling_topic.arn
protocol = "lambda"
endpoint = module.lambda_autoscaling.arn

# required for updating CloudWatch alarm variables
# resources can be isolated, but defaults to all streams
module "autoscaling_cloudwatch_write" {
source = "./modules/iam"
resources = ["*"]
depends_on = [
module.lambda_autoscaling.name
]
}

# first runs of this Terraform will fail due to an empty ECR image
module "autoscaling_lambda" {
source = "./modules/lambda"
function_name = "substation_autoscaling"
description = "Autoscales Kinesis streams based on data volume and size"
appconfig_id = aws_appconfig_application.substation.id
kms_arn = module.substation_kms.arn
resource "aws_lambda_permission" "autoscaling_invoke" {
statement_id = "AllowExecutionFromSNS"
action = "lambda:InvokeFunction"
function_name = module.lambda_autoscaling.name
principal = "sns.amazonaws.com"
source_arn = aws_sns_topic.autoscaling_topic.arn

env = {
AWS_APPCONFIG_EXTENSION_PREFETCH_LIST : "/applications/substation/environments/prod/configurations/autoscaling"
}
tags = {
"Owner" : "substation_example",
}
image_uri = "${module.autoscaling_ecr.repository_url}:latest"
depends_on = [
module.lambda_autoscaling.name
]
}

module "autoscaling_appconfig_read_attachment" {
source = "./modules/iam_attachment"
id = "substation_autoscaling_appconfig_read_attachment"
policy = module.autoscaling_appconfig_read.appconfig_read_policy
roles = [module.autoscaling_lambda.role]
# required for updating shard counts on Kinesis streams
# resources can be isolated, but defaults to all streams
module "autoscaling_kinesis_modify" {
source = "./modules/iam"
resources = ["*"]
}

module "autoscaling_kinesis_modify_attachment" {
source = "./modules/iam_attachment"
id = "substation_autoscaling_kinesis_modify_attachment"
policy = module.autoscaling_kinesis_modify.kinesis_modify_policy
roles = [module.autoscaling_lambda.role]
roles = [
module.lambda_autoscaling.role,
]
}

# required for reading active shard counts for Kinesis streams
# resources can be isolated, but defaults to all streams
module "autoscaling_kinesis_read" {
source = "./modules/iam"
resources = ["*"]
}

module "autoscaling_kinesis_read_attachment" {
source = "./modules/iam_attachment"
id = "substation_autoscaling_kinesis_read_attachment"
policy = module.autoscaling_kinesis_read.kinesis_read_policy
roles = [module.autoscaling_lambda.role]
roles = [
module.lambda_autoscaling.role,
]
}

# required for resetting CloudWatch alarm states
# resources can be isolated, but defaults to all streams
module "autoscaling_cloudwatch_modify" {
source = "./modules/iam"
resources = ["*"]
}

module "autoscaling_cloudwatch_modify_attachment" {
source = "./modules/iam_attachment"
id = "substation_autoscaling_cloudwatch_modify_attachment"
policy = module.autoscaling_cloudwatch_modify.cloudwatch_modify_policy
roles = [module.autoscaling_lambda.role]
roles = [
module.lambda_autoscaling.role,
]
}

# required for updating CloudWatch alarm variables
# resources can be isolated, but defaults to all streams
module "autoscaling_cloudwatch_write" {
source = "./modules/iam"
resources = ["*"]
}

module "autoscaling_cloudwatch_write_attachment" {
source = "./modules/iam_attachment"
id = "substation_autoscaling_cloudwatch_write_attachment"
policy = module.autoscaling_cloudwatch_write.cloudwatch_write_policy
roles = [module.autoscaling_lambda.role]
}

resource "aws_lambda_permission" "autoscaling_invoke" {
statement_id = "AllowExecutionFromSNS"
action = "lambda:InvokeFunction"
function_name = module.autoscaling_lambda.name
principal = "sns.amazonaws.com"
source_arn = aws_sns_topic.autoscaling_topic.arn
}

resource "aws_sns_topic_subscription" "autoscaling_subscription" {
topic_arn = aws_sns_topic.autoscaling_topic.arn
protocol = "lambda"
endpoint = module.autoscaling_lambda.arn
roles = [
module.lambda_autoscaling.role,
]
}
18 changes: 7 additions & 11 deletions build/terraform/aws/bootstrap.tf
@@ -1,11 +1,7 @@
# Initializes deployment-wide Substation infrastructure.

data "aws_region" "region" {}

data "aws_caller_identity" "caller" {}

# KMS encryption key that is shared by all Substation infrastructure
module "substation_kms" {
module "kms_substation" {
source = "./modules/kms"
name = "alias/substation"
policy = <<POLICY
@@ -42,14 +38,14 @@ resource "aws_appconfig_application" "substation" {
description = "Stores compiled configuration files for Substation"
}

-# use the prod environment for production pipelines
+# use the prod environment for production resources
resource "aws_appconfig_environment" "prod" {
name = "prod"
description = "Stores production Substation configuration files"
application_id = aws_appconfig_application.substation.id
}

-# use the dev environment for development pipelines
+# use the dev environment for development resources
resource "aws_appconfig_environment" "dev" {
name = "dev"
description = "Stores development Substation configuration files"
@@ -69,15 +65,15 @@ resource "aws_appconfig_deployment_strategy" "instant" {
}

# repository for the core Substation app
module "substation_ecr" {
module "ecr_substation" {
source = "./modules/ecr"
name = "substation"
-  kms_arn = module.substation_kms.arn
+  kms_arn = module.kms_substation.arn
}

# repository for the autoscaling app
module "autoscaling_ecr" {
module "ecr_autoscaling" {
source = "./modules/ecr"
name = "substation_autoscaling"
-  kms_arn = module.substation_kms.arn
+  kms_arn = module.kms_substation.arn
}
24 changes: 24 additions & 0 deletions build/terraform/aws/example_appconfig.tf
@@ -0,0 +1,24 @@
+################################################
+# appconfig permissions
+# all Lambda must have this policy
+################################################
+
+module "iam_example_appconfig_read" {
+  source    = "./modules/iam"
+  resources = ["${aws_appconfig_application.substation.arn}/*"]
+}
+
+module "iam_example_appconfig_read_attachment" {
+  source = "./modules/iam_attachment"
+  id     = "substation_example_appconfig_read"
+  policy = module.iam_example_appconfig_read.appconfig_read_policy
+  roles = [
+    module.lambda_autoscaling.role,
+    module.lambda_example_processor.role,
+    module.lambda_example_dynamodb_sink.role,
+    module.lambda_example_raw_s3_sink.role,
+    module.lambda_example_processed_s3_sink.role,
+    module.lambda_example_gateway_source.role,
+    module.lambda_example_s3_source.role,
+  ]
+}
10 changes: 10 additions & 0 deletions build/terraform/aws/example_gateway_source.tf
@@ -0,0 +1,10 @@
+################################################
+# API Gateway
+# sends data to raw Kinesis stream
+################################################
+
+module "example_gateway_kinesis" {
+  source = "./modules/api_gateway/kinesis"
+  name   = "substation_kinesis_example"
+  stream = "substation_example_raw"
+}