Terraform template updates for custom transformation (#1746)
* terraform changes for end-to-end template

* terraform updates

* formatting fix
shreyakhajanchi authored and dhercher committed Jul 30, 2024
1 parent bf81829 commit cec8533
Showing 11 changed files with 92 additions and 61 deletions.
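
The change threads four new custom-transformation parameters (transformationJarPath, transformationClassName, transformationCustomParameters, filteredEventsDirectory) through each template's main.tf, variables.tf, and sample tfvars. A minimal sketch of how they might be set in a terraform.tfvars, with all values hypothetical:

    dataflow_params = {
      template_params = {
        # GCS path of the JAR containing the custom transformation logic (example path assumed)
        transformation_jar_path          = "gs://my-bucket/jars/custom-transformation.jar"
        # Fully qualified name of the transformation class inside that JAR (example name assumed)
        transformation_class_name        = "com.example.migrations.MyRowTransformer"
        # Opaque parameter string handed to the JAR; the format is whatever the class expects
        transformation_custom_parameters = "keyA=valueA,keyB=valueB"
        # GCS directory where events the transformation filters out are written (example path assumed)
        filtered_events_directory        = "gs://my-bucket/filtered-events/"
      }
    }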
@@ -248,6 +248,10 @@ resource "google_dataflow_flex_template_job" "live_migration_job" {
directoryWatchDurationInMinutes = tostring(var.dataflow_params.template_params.directory_watch_duration_in_minutes)
spannerPriority = var.dataflow_params.template_params.spanner_priority
dlqGcsPubSubSubscription = var.dataflow_params.template_params.dlq_gcs_pub_sub_subscription
transformationJarPath = var.dataflow_params.template_params.transformation_jar_path
transformationClassName = var.dataflow_params.template_params.transformation_class_name
transformationCustomParameters = var.dataflow_params.template_params.transformation_custom_parameters
filteredEventsDirectory = var.dataflow_params.template_params.filtered_events_directory
}

# Additional Job Configurations
@@ -265,7 +269,7 @@ resource "google_dataflow_flex_template_job" "live_migration_job" {
service_account_email = var.dataflow_params.runner_params.service_account_email
skip_wait_on_job_termination = var.dataflow_params.runner_params.skip_wait_on_job_termination
staging_location = var.dataflow_params.runner_params.staging_location
subnetwork = var.common_params.host_project != null ? "https://www.googleapis.com/compute/v1/projects/${var.common_params.host_project}/regions/${var.common_params.region}/subnetworks/${var.dataflow_params.runner_params.subnetwork}" : "https://www.googleapis.com/compute/v1/projects/${var.common_params.project}/regions/${var.common_params.region}/subnetworks/${var.dataflow_params.runner_params.subnetwork}"
subnetwork = var.dataflow_params.runner_params.subnetwork != null ? var.common_params.host_project != null ? "https://www.googleapis.com/compute/v1/projects/${var.common_params.host_project}/regions/${var.common_params.region}/subnetworks/${var.dataflow_params.runner_params.subnetwork}" : "https://www.googleapis.com/compute/v1/projects/${var.common_params.project}/regions/${var.common_params.region}/subnetworks/${var.dataflow_params.runner_params.subnetwork}" : null
temp_location = var.dataflow_params.runner_params.temp_location
on_delete = var.dataflow_params.runner_params.on_delete
region = var.common_params.region
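
The reworked subnetwork expression nests two conditionals so that a missing subnetwork no longer forces the URL interpolation to run against a null name. Unpacked into locals with explicit parentheses, it reads as follows (a readability sketch only; the local names are assumed, not part of the commit):

    locals {
      # Shared VPC: prefer the host project when one is configured, else the service project.
      subnet_project = var.common_params.host_project != null ? var.common_params.host_project : var.common_params.project

      # Build the full subnetwork URL only when a subnetwork name was actually supplied.
      subnetwork_url = (
        var.dataflow_params.runner_params.subnetwork != null
        ? "https://www.googleapis.com/compute/v1/projects/${local.subnet_project}/regions/${var.common_params.region}/subnetworks/${var.dataflow_params.runner_params.subnetwork}"
        : null
      )
    }

Previously the URL was built unconditionally, which breaks when no subnetwork is set; the new guard passes null through instead.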
@@ -16,13 +16,16 @@ datastream_params = {
stream_id = "mysql-stream" # Or provide a custom stream ID
max_concurrent_cdc_tasks = 50 # Adjust as needed
max_concurrent_backfill_tasks = 50 # Adjust as needed
mysql_databases = [
{
database = "<YOUR_DATABASE_NAME>"
tables = [] # List specific tables to replicate (optional)
}
# Add more database objects if needed
]
mysql_host = "<YOUR_MYSQL_HOST_IP_ADDRESS>"
# Use the Public IP if using IP allowlisting and Private IP if using
# private connectivity.
mysql_username = "<YOUR_MYSQL_USERNAME>"
mysql_password = "<YOUR_MYSQL_PASSWORD>"
mysql_port = 3306
mysql_database = {
database = "<YOUR_DATABASE_NAME>"
tables = [] # List specific tables to replicate (optional)
}
private_connectivity_id = "<YOUR_PRIVATE_CONNECTIVITY_ID>"
# Only one of `private_connectivity_id` or `private_connectivity` block
# may exist. Use `private_connectivity_id` to specify an existing
@@ -52,18 +55,22 @@ dataflow_params = {
dlq_retry_minutes = 10 # Adjust as needed
dlq_max_retry_count = 3 # Adjust as needed
datastream_root_url = "<YOUR_DATASTREAM_ROOT_URL>" # Base URL of your Datastream API (optional)
datastream_source_type = "MYSQL"
datastream_source_type = "mysql"
round_json_decimals = false
run_mode = "STREAMING"
run_mode = "regular"
transformation_context_file_path = "<YOUR_TRANSFORMATION_FILE_PATH>" # Path to your transformation file (optional)
directory_watch_duration_in_minutes = "5" # Adjust as needed
spanner_priority = "high"
dlq_gcs_pub_sub_subscription = "<YOUR_DLQ_PUBSUB_SUBSCRIPTION>" # Optional
spanner_priority = "HIGH"
dlq_gcs_pub_sub_subscription = "<YOUR_DLQ_PUBSUB_SUBSCRIPTION>" # Optional
transformation_jar_path = "<YOUR_CUSTOM_TRANSFORMATION_JAR_PATH>" # Optional
transformation_custom_parameters = "<YOUR_CUSTOM_PARAMETERS_FOR_JAR>" # Optional
transformation_class_name = "<YOUR_TRANSFORMATION_CLASS_NAME>" # Fully qualified class name (optional)
filtered_events_directory = "<YOUR_GCS_PATH_FOR_FILTERED_EVENTS>" # Optional
}

runner_params = {
additional_experiments = [] # Add any additional experiments or leave empty
autoscaling_algorithm = "THROUGHPUT_BASED" # Or NONE
autoscaling_algorithm = "BASIC" # Or NONE
enable_streaming_engine = true # true or false
kms_key_name = "<YOUR_KMS_KEY_NAME>" # If you're using customer-managed encryption key
labels = {} # Add any labels you want
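
Beyond the new parameters, this hunk also corrects several sample values to the casing these options actually accept; consolidated:

    datastream_source_type = "mysql"    # lowercase, not "MYSQL"
    run_mode               = "regular"  # or "retryDLQ"; replaces the old "STREAMING" example
    spanner_priority       = "HIGH"     # "HIGH", "MEDIUM", or "LOW"
    autoscaling_algorithm  = "BASIC"    # or "NONE"; replaces "THROUGHPUT_BASED"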
@@ -12,13 +12,11 @@ datastream_params = {
mysql_username = "<YOUR_MYSQL_USERNAME>"
mysql_password = "<YOUR_MYSQL_PASSWORD>"
mysql_port = 3306
mysql_databases = [
{
database = "<YOUR_DATABASE_NAME>"
tables = []
# Optionally list specific tables, or remove "tables" altogether to replicate all tables
}
]
mysql_database = {
database = "<YOUR_DATABASE_NAME>"
tables = []
# Optionally list specific tables, or remove "tables" altogether to replicate all tables
}
private_connectivity_id = "<YOUR_PRIVATE_CONNECTIVITY_ID>"
# Only one of `private_connectivity_id` or `private_connectivity` block
# may exist. Use `private_connectivity_id` to specify an existing
@@ -69,6 +69,10 @@ variable "dataflow_params" {
directory_watch_duration_in_minutes = optional(string)
spanner_priority = optional(string)
dlq_gcs_pub_sub_subscription = optional(string)
transformation_jar_path = optional(string)
transformation_custom_parameters = optional(string)
transformation_class_name = optional(string)
filtered_events_directory = optional(string)
})
runner_params = object({
additional_experiments = optional(set(string), [
@@ -276,6 +276,10 @@ resource "google_dataflow_flex_template_job" "live_migration_job" {
directoryWatchDurationInMinutes = tostring(var.common_params.dataflow_params.template_params.directory_watch_duration_in_minutes)
spannerPriority = var.common_params.dataflow_params.template_params.spanner_priority
dlqGcsPubSubSubscription = var.shard_list[count.index].dataflow_params.template_params.dlq_gcs_pub_sub_subscription
transformationJarPath = var.common_params.dataflow_params.template_params.transformation_jar_path
transformationClassName = var.common_params.dataflow_params.template_params.transformation_class_name
transformationCustomParameters = var.common_params.dataflow_params.template_params.transformation_custom_parameters
filteredEventsDirectory = var.common_params.dataflow_params.template_params.filtered_events_directory
}

# Additional Job Configurations
@@ -293,7 +297,7 @@ resource "google_dataflow_flex_template_job" "live_migration_job" {
service_account_email = var.common_params.dataflow_params.runner_params.service_account_email
skip_wait_on_job_termination = var.common_params.dataflow_params.runner_params.skip_wait_on_job_termination
staging_location = var.common_params.dataflow_params.runner_params.staging_location
subnetwork = var.common_params.host_project != null ? "https://www.googleapis.com/compute/v1/projects/${var.common_params.host_project}/regions/${var.common_params.region}/subnetworks/${var.common_params.dataflow_params.runner_params.subnetwork}" : "https://www.googleapis.com/compute/v1/projects/${var.common_params.project}/regions/${var.common_params.region}/subnetworks/${var.common_params.dataflow_params.runner_params.subnetwork}"
subnetwork = var.common_params.dataflow_params.runner_params.subnetwork != null ? var.common_params.host_project != null ? "https://www.googleapis.com/compute/v1/projects/${var.common_params.host_project}/regions/${var.common_params.region}/subnetworks/${var.common_params.dataflow_params.runner_params.subnetwork}" : "https://www.googleapis.com/compute/v1/projects/${var.common_params.project}/regions/${var.common_params.region}/subnetworks/${var.common_params.dataflow_params.runner_params.subnetwork}" : null
temp_location = var.common_params.dataflow_params.runner_params.temp_location
on_delete = var.common_params.dataflow_params.runner_params.on_delete
region = var.common_params.region
@@ -30,28 +30,32 @@ common_params = {

dataflow_params = {
template_params = {
shadow_table_prefix = "<YOUR_SHADOW_TABLE_PREFIX>" # Prefix for shadow tables (e.g., "shadow_")
create_shadow_tables = "<TRUE/FALSE>" # Whether to create shadow tables in Spanner
rfc_start_date_time = "<YOUR_RFC_START_DATETIME>" # RFC 3339 timestamp for the start of replication (optional)
file_read_concurrency = "<YOUR_CONCURRENCY>" # File read concurrency for Dataflow
spanner_project_id = "<YOUR_PROJECT_ID>" # GCP project ID for Spanner
spanner_instance_id = "<YOUR_SPANNER_INSTANCE_ID>" # Spanner instance ID
spanner_database_id = "<YOUR_SPANNER_DATABASE_ID>" # Spanner database ID
spanner_host = "<YOUR_SPANNER_HOST>" # Spanner host (typically "spanner.googleapis.com")
dlq_retry_minutes = "<YOUR_DLQ_RETRY_MINUTES>" # Retry interval for dead-letter queue messages (in minutes)
dlq_max_retry_count = "<YOUR_DLQ_MAX_RETRIES>" # Maximum retry count for dead-letter queue messages
datastream_root_url = "<YOUR_DATASTREAM_ROOT_URL>" # Datastream API root URL (typically "https://datastream.googleapis.com/v1")
datastream_source_type = "<YOUR_DATASTREAM_SOURCE_TYPE>" # Datastream source type (e.g., "MYSQL")
round_json_decimals = "<TRUE/FALSE>" # Whether to round JSON decimal values in Dataflow
directory_watch_duration_in_minutes = "<YOUR_WATCH_DURATION>" # Directory watch duration (in minutes) for Dataflow
spanner_priority = "<YOUR_SPANNER_PRIORITY>" # Spanner priority ("high", "medium", or "low")
local_session_file_path = "<YOUR_SESSION_FILE_PATH>" # Path to local session file (optional)
shadow_table_prefix = "<YOUR_SHADOW_TABLE_PREFIX>" # Prefix for shadow tables (e.g., "shadow_")
create_shadow_tables = "<TRUE/FALSE>" # Whether to create shadow tables in Spanner
rfc_start_date_time = "<YOUR_RFC_START_DATETIME>" # RFC 3339 timestamp for the start of replication (optional)
file_read_concurrency = "<YOUR_CONCURRENCY>" # File read concurrency for Dataflow
spanner_project_id = "<YOUR_PROJECT_ID>" # GCP project ID for Spanner
spanner_instance_id = "<YOUR_SPANNER_INSTANCE_ID>" # Spanner instance ID
spanner_database_id = "<YOUR_SPANNER_DATABASE_ID>" # Spanner database ID
spanner_host = "<YOUR_SPANNER_HOST>" # Spanner host (typically "spanner.googleapis.com")
dlq_retry_minutes = "<YOUR_DLQ_RETRY_MINUTES>" # Retry interval for dead-letter queue messages (in minutes)
dlq_max_retry_count = "<YOUR_DLQ_MAX_RETRIES>" # Maximum retry count for dead-letter queue messages
datastream_root_url = "<YOUR_DATASTREAM_ROOT_URL>" # Datastream API root URL (typically "https://datastream.googleapis.com/v1")
datastream_source_type = "<YOUR_DATASTREAM_SOURCE_TYPE>" # Datastream source type (e.g., "mysql")
round_json_decimals = "<TRUE/FALSE>" # Whether to round JSON decimal values in Dataflow
directory_watch_duration_in_minutes = "<YOUR_WATCH_DURATION>" # Directory watch duration (in minutes) for Dataflow
spanner_priority = "<YOUR_SPANNER_PRIORITY>" # Spanner priority ("HIGH", "MEDIUM", or "LOW")
local_session_file_path = "<YOUR_SESSION_FILE_PATH>" # Path to local session file (optional)
transformation_jar_path = "<YOUR_CUSTOM_TRANSFORMATION_JAR_PATH>" # GCS path to the custom transformation JAR (optional)
transformation_custom_parameters = "<YOUR_CUSTOM_PARAMETERS_FOR_JAR>" # Custom parameters used by the transformation JAR (optional)
transformation_class_name = "<YOUR_TRANSFORMATION_CLASS_NAME>" # Fully qualified class name (optional)
filtered_events_directory = "<YOUR_GCS_PATH_FOR_FILTERED_EVENTS>" # GCS path for storing filtered events (optional)
}

runner_params = {
additional_experiments = ["enable_google_cloud_profiler", "enable_stackdriver_agent_metrics",
"disable_runner_v2", "enable_google_cloud_heap_sampling"]
autoscaling_algorithm = "<YOUR_AUTOSCALING_ALGORITHM>" # e.g., "THROUGHPUT_BASED", "NONE"
autoscaling_algorithm = "<YOUR_AUTOSCALING_ALGORITHM>" # e.g., "BASIC", "NONE"
enable_streaming_engine = "<TRUE/FALSE>" # Whether to use Dataflow Streaming Engine
kms_key_name = "<YOUR_KMS_KEY_NAME>" # KMS key name for encryption (optional)
labels = { env = "<YOUR_ENVIRONMENT>" } # Labels for the Dataflow job
@@ -65,7 +69,7 @@ common_params = {
service_account_email = "<YOUR_SERVICE_ACCOUNT_EMAIL>" # Service account email for Dataflow
skip_wait_on_job_termination = "<TRUE/FALSE>" # Whether to skip waiting for job termination on deletion
staging_location = "gs://<YOUR_GCS_BUCKET>/staging" # GCS staging location for Dataflow
subnetwork = "<YOUR-FULL-PATH-SUBNETWORK" # Give the full path to the subnetwork
subnetwork = "<YOUR-FULL-PATH-SUBNETWORK>" # Give the full path to the subnetwork
temp_location = "gs://<YOUR_GCS_BUCKET>/temp" # GCS temp location for Dataflow
on_delete = "<YOUR_ON_DELETE_ACTION>" # Action on Dataflow job deletion ("cancel" or "drain")
ip_configuration = "<YOUR_IP_CONFIGURATION>" # IP configuration for Dataflow workers ("WORKER_IP_PRIVATE" or "WORKER_IP_PUBLIC")
@@ -94,7 +98,7 @@ shard_list = [

dataflow_params = {
template_params = {
run_mode = "<YOUR_RUN_MODE>" # Dataflow run mode ("streaming" or "daemon")
run_mode = "<YOUR_RUN_MODE>" # Dataflow run mode ("regular" or "retryDLQ")
local_transformation_context_path = "<YOUR_CONTEXT_PATH>" # Path to local transformation context (optional)
dlq_gcs_pub_sub_subscription = "<YOUR_DLQ_SUBSCRIPTION>" # Pub/Sub subscription for the dead-letter queue (optional)
}
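
Putting the shard-level knobs together, a single shard_list entry might look like this (values hypothetical, other shard attributes elided):

    shard_list = [
      {
        dataflow_params = {
          template_params = {
            run_mode                          = "regular"  # "retryDLQ" reprocesses dead-letter events (assumed semantics)
            local_transformation_context_path = null       # optional per-shard transformation context
            dlq_gcs_pub_sub_subscription      = "projects/my-project/subscriptions/dlq-sub"  # assumed name
          }
        }
        # ...remaining shard attributes omitted for brevity
      }
    ]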
@@ -40,6 +40,10 @@ variable "common_params" {
directory_watch_duration_in_minutes = optional(string)
spanner_priority = optional(string)
local_session_file_path = optional(string)
transformation_jar_path = optional(string)
transformation_custom_parameters = optional(string)
transformation_class_name = optional(string)
filtered_events_directory = optional(string)
})
runner_params = object({
additional_experiments = optional(set(string), [
@@ -161,6 +161,10 @@ resource "google_dataflow_flex_template_job" "live_migration_job" {
directoryWatchDurationInMinutes = tostring(var.dataflow_params.template_params.directory_watch_duration_in_minutes)
spannerPriority = var.dataflow_params.template_params.spanner_priority
dlqGcsPubSubSubscription = var.dataflow_params.template_params.dlq_gcs_pub_sub_subscription
transformationJarPath = var.dataflow_params.template_params.transformation_jar_path
transformationClassName = var.dataflow_params.template_params.transformation_class_name
transformationCustomParameters = var.dataflow_params.template_params.transformation_custom_parameters
filteredEventsDirectory = var.dataflow_params.template_params.filtered_events_directory
}

# Additional Job Configurations
@@ -178,7 +182,7 @@ resource "google_dataflow_flex_template_job" "live_migration_job" {
service_account_email = var.dataflow_params.runner_params.service_account_email
skip_wait_on_job_termination = var.dataflow_params.runner_params.skip_wait_on_job_termination
staging_location = var.dataflow_params.runner_params.staging_location
subnetwork = var.common_params.host_project != null ? "https://www.googleapis.com/compute/v1/projects/${var.common_params.host_project}/regions/${var.common_params.region}/subnetworks/${var.dataflow_params.runner_params.subnetwork}" : "https://www.googleapis.com/compute/v1/projects/${var.common_params.project}/regions/${var.common_params.region}/subnetworks/${var.dataflow_params.runner_params.subnetwork}"
subnetwork = var.dataflow_params.runner_params.subnetwork != null ? var.common_params.host_project != null ? "https://www.googleapis.com/compute/v1/projects/${var.common_params.host_project}/regions/${var.common_params.region}/subnetworks/${var.dataflow_params.runner_params.subnetwork}" : "https://www.googleapis.com/compute/v1/projects/${var.common_params.project}/regions/${var.common_params.region}/subnetworks/${var.dataflow_params.runner_params.subnetwork}" : null
temp_location = var.dataflow_params.runner_params.temp_location
on_delete = var.dataflow_params.runner_params.on_delete
region = var.common_params.region