Skip to content

Commit

Permalink
Merge pull request #105 from aws-samples/94-update-cqlreplicatorscala…
Browse files Browse the repository at this point in the history
…-for-memorydb-parquet-and-opensearch

94 update cqlreplicatorscala for memorydb parquet and opensearch
  • Loading branch information
nwheeler81 authored Feb 5, 2024
2 parents d3c02d9 + 5988502 commit 71b5b8d
Show file tree
Hide file tree
Showing 6 changed files with 466 additions and 243 deletions.
23 changes: 18 additions & 5 deletions glue/README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ This migration approach ensures zero downtime, no code compilation, predictable
## Architecture
This solution offers customers the flexibility to scale the migration workload up and out by deploying multiple Glue
jobs of CQLReplicator.
Each glue job (tile) is tasked with handling a specific subset of primary keys (250K per tile) to distribute migration
workload evenly across Glue data processing units.
Each glue job (tile) is tasked with handling a specific subset of primary keys (250K per data processing unit,
G.025X) to distribute the migration
workload evenly across data processing units.
A single replication glue job (AWS Glue DPU - 0.25X) can push up to 1500 WCUs per second against the target table in
Amazon Keyspaces. This allows for easy estimation of the final traffic against Amazon Keyspaces.

Expand Down Expand Up @@ -188,10 +189,22 @@ cqlreplicator --state stats --landing-zone s3://cql-replicator-1234567890-us-wes
+------------------------------------------------------------------------+
| 1 | 2 | 2 | 1 | "2024-02-02T21:50:25.454" |
+------------------------------------------------------------------------+
[2024-02-02T16:50:39-05:00] Discovered rows in casereview.casedetails is 70270
[2024-02-02T16:50:39-05:00] Replicated rows in casereview.casedetails_2 is 70270
[2024-02-02T16:50:39-05:00] Discovered rows in ks_test_cql_replicator.test_cql_replicator is 70270
[2024-02-02T16:50:39-05:00] Replicated rows in ks_test_cql_replicator.test_cql_replicator is 70270
```
## AWS Glue Monitoring
You can profile and monitor CQLReplicator operations using AWS Glue job profiler. It collects and processes raw data
from AWS Glue jobs into readable, near real-time metrics stored in Amazon CloudWatch.
These statistics are retained and aggregated in CloudWatch so that you can access historical information for a better
perspective on how your application is performing.
In order to enable the CQLReplicator enhanced monitoring, use the `--enhanced-monitoring-enabled` flag with your init
command.
***You may incur additional charges when you enable job metrics, since CloudWatch custom metrics are created.***
## Cost optimization
In order to reduce AWS Glue costs after the historical workload has been moved to the target storage:
Expand Down Expand Up @@ -262,7 +275,7 @@ After the init phase is completed you can run the following command to start the
## Clean up
The following command will delete the Glue job, connector, the S3 bucket, and Keyspaces'table ledger:
The following command will delete the Glue job, connector, the S3 bucket, and the ledger table:
```shell
cqlreplicator --state cleanup --landing-zone s3://cql-replicator-1234567890-us-west-2 --region "us-west-2"
Expand Down
58 changes: 39 additions & 19 deletions glue/bin/cqlreplicator
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ GLUE_IAM_ROLE=""
AWS_ACCOUNT=""
KEYS_PER_TILE=0
ROWS_PER_WORKER=250000
# Target type might be keyspace or parquet
TARGET_TYPE=keyspaces
SKIP_GLUE_CONNECTOR=false
SKIP_KEYSPACES_LEDGER=false
JSON_MAPPING=""
REPLICATION_POINT_IN_TIME=0
REPLICATION_STATS_ENABLED=false
GLUE_MONITORING=false
SAFE_MODE=true
OS=$(uname -a | awk '{print $1}')

# Progress bar configuration
Expand Down Expand Up @@ -90,6 +91,7 @@ log() {

if [[ "$OS" == Linux || "$OS" == Darwin ]]; then
log "OS: $OS"
log "AWS CLI: $(aws --version)"
else
log "ERROR: Please run this script in AWS CloudShell or Linux/Darwin"
exit 1
Expand Down Expand Up @@ -193,10 +195,10 @@ function uploader_helper() {
local next_pos=$3
local final_pos=$4
check_file_exists "$path_to_conf/$artifact_name"
progress $curr_pos $final_pos "Uploading $artifact_name "
progress $curr_pos $final_pos "Uploading $artifact_name "
if ls "$path_to_conf/$artifact_name" > /dev/null
then
progress $next_pos $final_pos "Uploading $artifact_name "
progress $next_pos $final_pos "Uploading $artifact_name "
aws s3 cp "$path_to_conf"/"$artifact_name" "$S3_LANDING_ZONE"/artifacts/"$artifact_name" > /dev/null
else
log "ERROR: $path_to_conf/$artifact_name not found"
Expand Down Expand Up @@ -228,14 +230,14 @@ function barrier() {
}

function Usage_Exit {
log "$0 [--state init/run/request-stop|--tiles number of tiles|--landing-zone s3Uri|--writetime-column col3|\
echo "$0 [--state init/run/request-stop|--tiles number of tiles|--landing-zone s3Uri|--writetime-column col3|\
--src-keyspace keyspace_name|--src-table table_name|--trg-keyspace keyspace_name|--trg-table table_name]"
log "Script version:" ${MIGRATOR_VERSION}
log "init - Deploy CQLReplicator Glue job, and download jars"
log "run - Start migration process"
log "stats - Upload progress. Only for historical workload"
log "request-stop - Stop migration process"
log "cleanup - Delete all CQLReplicator artifacts"
echo "Script version:" ${MIGRATOR_VERSION}
echo "init - Deploy CQLReplicator Glue job, and download jars"
echo "run - Start migration process"
echo "stats - Upload progress. Only for historical workload"
echo "request-stop - Stop migration process"
echo "cleanup - Delete all the CQLReplicator's artifacts"
exit 1
}

Expand All @@ -246,9 +248,9 @@ function Clean_Up {
aws s3 rb "$S3_LANDING_ZONE"
local connection_name
connection_name=$(aws glue get-job --job-name CQLReplicator --query 'Job.Connections.Connections[0]' --output text)
aws glue delete-connection --connection-name "$connection_name" --region "$AWS_REGION"
aws glue delete-connection --connection-name cql-replicator-memorydb-integration --region "$AWS_REGION" > /dev/null
aws glue delete-connection --connection-name cql-replicator-opensearch-integration --region "$AWS_REGION" > /dev/null
aws glue delete-connection --connection-name "$connection_name" --region "$AWS_REGION" > /dev/null 2>&1
aws glue delete-connection --connection-name cql-replicator-memorydb-integration --region "$AWS_REGION" > /dev/null 2>&1
aws glue delete-connection --connection-name cql-replicator-opensearch-integration --region "$AWS_REGION" > /dev/null 2>&1
aws glue delete-job --job-name CQLReplicator --region "$AWS_REGION"
if [[ $SKIP_KEYSPACES_LEDGER == false ]]; then
aws keyspaces delete-keyspace --keyspace-name migration --region "$AWS_REGION"
Expand Down Expand Up @@ -335,8 +337,13 @@ function Init {

# Create Glue Connector
local glue_conn_name
local enhanced_monitoring=""
if [[ "$GLUE_MONITORING" == true ]]; then
enhanced_monitoring=',"--enable-continuous-cloudwatch-log":"true","--enable-continuous-log-filter":"true","--enable-metrics":"true","--enable-observability-metrics":"true"'
fi

if [[ $SKIP_GLUE_CONNECTOR == false ]]; then
progress 3 5 "Creating Glue connector and CQLReplicator job "
progress 3 5 "Creating Glue artifacts "
glue_conn_name=$(echo cql-replicator-"$(uuidgen)" | tr ' [:upper:]' ' [:lower:]')
aws glue create-connection --connection-input '{
"Name":"'$glue_conn_name'",
Expand Down Expand Up @@ -390,11 +397,12 @@ function Init {
"--extra-jars":"'$S3_LANDING_ZONE'/artifacts/jedis-4.4.6.jar,'$S3_LANDING_ZONE'/artifacts/spark-cassandra-connector-assembly_2.12-3.4.1.jar,'$S3_LANDING_ZONE'/artifacts/resilience4j-retry-1.7.1.jar,'$S3_LANDING_ZONE'/artifacts/resilience4j-core-1.7.1.jar,'$S3_LANDING_ZONE'/artifacts/vavr-0.10.4.jar,'$S3_LANDING_ZONE'/artifacts/aws-sigv4-auth-cassandra-java-driver-plugin-4.0.9.jar,'$S3_LANDING_ZONE'/artifacts/opensearch-spark-30_2.12-1.0.1.jar",
"--conf":"spark.files='$S3_LANDING_ZONE'/artifacts/KeyspacesConnector.conf,'$S3_LANDING_ZONE'/artifacts/CassandraConnector.conf --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions --conf spark.kryoserializer.buffer.max=128m --conf spark.rdd.compress=true --conf spark.cleaner.periodicGC.interval=1min --conf spark.kryo.referenceTracking=false --conf spark.cleaner.referenceTracking.cleanCheckpoints=true --conf spark.task.maxFailures=64",
"--class":"GlueApp"
'$enhanced_monitoring'
}' > /dev/null
fi

if [[ $SKIP_GLUE_CONNECTOR == true ]]; then
progress 3 5 "Creating CQLReplicator job "
progress 3 5 "Creating Glue artifacts "
aws glue create-job \
--name "CQLReplicator" \
--role "$GLUE_IAM_ROLE" \
Expand All @@ -411,16 +419,17 @@ function Init {
"--extra-jars":"'$S3_LANDING_ZONE'/artifacts/jedis-4.4.6.jar,'$S3_LANDING_ZONE'/artifacts/spark-cassandra-connector-assembly_2.12-3.4.1.jar,'$S3_LANDING_ZONE'/artifacts/resilience4j-retry-1.7.1.jar,'$S3_LANDING_ZONE'/artifacts/resilience4j-core-1.7.1.jar,'$S3_LANDING_ZONE'/artifacts/vavr-0.10.4.jar,'$S3_LANDING_ZONE'/artifacts/aws-sigv4-auth-cassandra-java-driver-plugin-4.0.9.jar,'$S3_LANDING_ZONE'/artifacts/opensearch-spark-30_2.12-1.0.1.jar",
"--conf":"spark.files='$S3_LANDING_ZONE'/artifacts/KeyspacesConnector.conf,'$S3_LANDING_ZONE'/artifacts/CassandraConnector.conf --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions --conf spark.kryoserializer.buffer.max=128m --conf spark.rdd.compress=true --conf spark.cleaner.periodicGC.interval=1min --conf spark.kryo.referenceTracking=false --conf spark.cleaner.referenceTracking.cleanCheckpoints=true --conf spark.task.maxFailures=64",
"--class":"GlueApp"
'$enhanced_monitoring'
}' > /dev/null
fi

if [[ $SKIP_KEYSPACES_LEDGER == true ]]; then
progress 4 5 "Skipping CQLReplicator's internal keyspace "
progress 5 5 "Skipping CQLReplicator's internal table "
progress 4 5 "Skipping CQLReplicator's internal keyspace "
progress 5 5 "Skipping CQLReplicator's internal table "
fi

if [[ $SKIP_KEYSPACES_LEDGER == false ]]; then
progress 4 5 "Creating CQLReplicator's internal resources "
progress 4 5 "Creating CQLReplicator's internal resources "
# Create a keyspace - migration
aws keyspaces create-keyspace --keyspace-name migration --region "$AWS_REGION" > /dev/null
sleep 20
Expand All @@ -438,7 +447,7 @@ function Init {
{ "name": "offload_status", "type": "text" } ],
"partitionKeys": [ { "name": "ks" }, { "name": "tbl" } ],
"clusteringKeys": [ { "name": "tile", "orderBy": "ASC" }, { "name": "ver", "orderBy": "ASC" } ] }' --region "$AWS_REGION" > /dev/null
progress 5 5 "Creating CQLReplicator's internal resources "
progress 5 5 "Created the CQLReplicator internal resources "
fi

log "Deploy is completed"
Expand All @@ -463,6 +472,7 @@ function Start_Discovery {
log "TTL COLUMN:" $TTL_COLUMN
log "ROWS PER DPU:" $ROWS_PER_WORKER
log "START REPLICATING FROM: $REPLICATION_POINT_IN_TIME (0 is disabled)"
log "SAFE MODE: $SAFE_MODE"
local workers=$((1 + TILES / 2))
log "Checking if the discovery job is already running..."
check_discovery_runs "true"
Expand All @@ -478,6 +488,7 @@ function Start_Discovery {
"--TARGET_KS":"'$TARGET_KS'",
"--TARGET_TBL":"'$TARGET_TBL'",
"--WRITETIME_COLUMN":"'$WRITETIME_COLUMN'",
"--SAFE_MODE":"'$SAFE_MODE'",
"--OFFLOAD_LARGE_OBJECTS":"'$OFFLOAD_LARGE_OBJECTS_B64'",
"--REPLICATION_POINT_IN_TIME":"'$REPLICATION_POINT_IN_TIME'",
"--TTL_COLUMN":"'$TTL_COLUMN'"}' --output text)
Expand All @@ -504,6 +515,7 @@ function Start_Replication {
"--TARGET_KS":"'$TARGET_KS'",
"--TARGET_TBL":"'$TARGET_TBL'",
"--WRITETIME_COLUMN":"'$WRITETIME_COLUMN'",
"--SAFE_MODE":"'$SAFE_MODE'",
"--OFFLOAD_LARGE_OBJECTS":"'$OFFLOAD_LARGE_OBJECTS_B64'",
"--REPLICATION_POINT_IN_TIME":"'$REPLICATION_POINT_IN_TIME'",
"--TTL_COLUMN":"'$TTL_COLUMN'"}' --output text)
Expand Down Expand Up @@ -705,6 +717,14 @@ while (( "$#" )); do
REPLICATION_STATS_ENABLED=true
shift 1
;;
--enhanced-monitoring-enabled)
GLUE_MONITORING=true
shift 1
;;
--safe-mode-disabled)
SAFE_MODE=false
shift 1
;;
--)
shift
break
Expand Down
Loading

0 comments on commit 71b5b8d

Please sign in to comment.