
cdc-parquet-to-apache-iceberg

AWS Glue Job to ingest Change Data Capture(CDC) to Apache Iceberg table in Amazon S3

(Architecture diagram: glue-job-cdc-parquet-to-iceberg-arch)

This is an AWS Glue job project for CDK development with Python.

The cdk.json file tells the CDK Toolkit how to execute your app.

This project is set up like a standard Python project. The initialization process also creates a virtualenv within this project, stored under the .venv directory. To create the virtualenv it assumes that there is a python3 (or python for Windows) executable in your path with access to the venv package. If for any reason the automatic creation of the virtualenv fails, you can create the virtualenv manually.

To manually create a virtualenv on macOS and Linux:

$ python3 -m venv .venv

After the init process completes and the virtualenv is created, you can use the following step to activate your virtualenv.

$ source .venv/bin/activate

If you are on a Windows platform, you activate the virtualenv like this:

% .venv\Scripts\activate.bat

Once the virtualenv is activated, you can install the required dependencies.

(.venv) $ pip install -r requirements.txt

AWS Glue 4.0 added support for Apache Iceberg natively, so no Apache Iceberg connector for AWS Glue from marketplace is needed.

If you use AWS Glue 3.0, you first need to set up the Apache Iceberg connector for AWS Glue before synthesizing the CloudFormation template, so that your AWS Glue jobs can use Apache Iceberg. (For more information, see References (3), (4), or (5))

Then you should set the CDK context configuration file, cdk.context.json, appropriately. For example:

{
  "vpc_name": "default",
  "glue_assets_s3_bucket_name": "aws-glue-assets-123456789012-us-east-1",
  "glue_job_script_file_name": "employee-details-full-etl.py",
  "glue_job_input_arguments": {
    "--raw_s3_path": "s3://aws-glue-input-parquet-atq4q5u/full-load",
    "--iceberg_s3_path": "s3://aws-glue-output-iceberg-atq4q5u",
    "--database": "human_resources",
    "--partition_key": "department",
    "--primary_key": "emp_no",
    "--table_name": "employee_details"
  }
}

ℹ️ The --primary_key option should be set to the Iceberg table's primary key column name. ℹ️ The --partition_key option should be set to the column used for the Iceberg table partition.

⚠️ You should create an S3 bucket for the Glue job script and upload the script file into that bucket.
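
As a rough sketch of how the context values above feed the deployment (assuming the stack passes glue_job_input_arguments straight through as the Glue job's default arguments; the actual stack code may wire things differently, and in a CDK app these values would come from self.node.try_get_context(...) rather than reading the file directly):

```python
import json

# The same context shown above, inlined here so the sketch is
# self-contained (in the repo it lives in cdk.context.json).
context = json.loads("""
{
  "glue_assets_s3_bucket_name": "aws-glue-assets-123456789012-us-east-1",
  "glue_job_script_file_name": "employee-details-full-etl.py",
  "glue_job_input_arguments": {
    "--raw_s3_path": "s3://aws-glue-input-parquet-atq4q5u/full-load",
    "--iceberg_s3_path": "s3://aws-glue-output-iceberg-atq4q5u",
    "--database": "human_resources",
    "--partition_key": "department",
    "--primary_key": "emp_no",
    "--table_name": "employee_details"
  }
}
""")

# The job script lives under scripts/ in the Glue assets bucket
# (matching the upload path used later in this README).
script_location = "s3://{}/scripts/{}".format(
    context["glue_assets_s3_bucket_name"],
    context["glue_job_script_file_name"],
)

# A Glue job's DefaultArguments is a flat string-to-string map,
# so the context block can be passed through unchanged.
default_arguments = context["glue_job_input_arguments"]

print(script_location)
# s3://aws-glue-assets-123456789012-us-east-1/scripts/employee-details-full-etl.py
```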

At this point you can now synthesize the CloudFormation template for this code.

(.venv) $ export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
(.venv) $ export CDK_DEFAULT_REGION=$(aws configure get region)
(.venv) $ cdk synth

Use cdk deploy command to create the stack shown above.

(.venv) $ cdk deploy --require-approval never

To add additional dependencies, for example other CDK libraries, just add them to your setup.py file and rerun the pip install -r requirements.txt command.

Run Test

  1. Generate fake parquet files
    (.venv) $ pwd
    ~/my-aws-cdk-examples/glue
    (.venv) $ pip install boto3 Faker fastparquet pyarrow pandas # pip install -r requirements-dev.txt
    (.venv) $ python src/utils/gen_fake_cdc_parquet.py
    
    [full-load data]
       Op  emp_no       name     department     city  salary              m_time
    0   I     129    Tiffany             IT    Tokyo   49882 1973-10-15 12:32:25
    1   I     204      Oscar             IT  Chicago   93507 2006-03-17 23:21:06
    2   I     252      Julia     Purchasing    Seoul   41204 2007-04-26 12:00:28
    3   I     288       Chad             IT    Tokyo   89084 2002-02-06 13:06:02
    4   I     347      James  Manufacturing      SFO   62261 1988-09-23 20:13:34
    5   I     377     Nathan  Manufacturing       NY   45970 1971-03-03 06:06:03
    6   I     434      Emily          Sales      SFO   20443 1994-03-27 02:22:03
    7   I     558     Edward  Manufacturing    Tokyo   85874 1985-08-18 11:37:01
    8   I     633   Danielle          Sales    Seoul   65974 2020-02-16 20:01:22
    9   I     682       Anne     Purchasing      SFO   36606 2000-07-31 17:35:01
    10  I     695       Gina             IT    Tokyo   93670 2006-02-07 23:05:40
    11  I     695    Richard        Finance    Seoul   37034 1998-12-09 20:18:12
    12  I     924  Frederick          Sales  Chicago   48173 1974-05-01 01:23:15
    13  I     951     Hannah     Purchasing       NY   71689 1993-03-07 04:18:21
    14  I     998  Elizabeth  Manufacturing    Seoul   46318 1971-05-27 14:07:43
    
    [cdc data]
    Op  emp_no      name     department     city  salary                  m_time
    0  U     377    Nathan       Security   Lisbon   50210 2022-07-11 15:12:31.189
    1  U     347     James            R&D   Sydney   56497 2022-07-11 08:48:31.189
    2  I    8826     Kelly        Finance    Tokyo   52185 2006-06-03 17:46:51.000
    3  U     252     Julia             FC   Sydney   89129 2022-07-11 13:07:31.189
    4  I    8787     Chris             IT  Chicago   30662 1991-08-04 05:10:38.000
    5  D     951    Hannah     Purchasing       NY   71689 2022-07-11 08:48:31.189
    6  I    7339  Jonathan          Sales    Seoul   33806 1972-08-24 22:44:20.000
    7  I    7441  Kristine  Manufacturing    Seoul   87117 1990-08-19 21:13:20.000
    (.venv) $ ls *.parquet
     cdc-load-20220730173650.parquet
     full-load-20220730173650.parquet
    
  2. Create S3 buckets for input and output data, and copy the fake parquet files into the input S3 bucket
    (.venv) $ aws s3 mb s3://aws-glue-input-parquet-atq4q5u --region us-east-1
    (.venv) $ aws s3 cp full-load-20220730173650.parquet s3://aws-glue-input-parquet-atq4q5u/full-load/human_resources/employee_details/full-load-20220730173650.parquet
    (.venv) $ aws s3 mb s3://aws-glue-output-iceberg-atq4q5u --region us-east-1
    
  3. Create an Iceberg table using Athena - To create an Iceberg table in the AWS Glue Data Catalog, open the Athena console and run the following queries in sequence:
    -- Create database for the demo
    CREATE DATABASE IF NOT EXISTS human_resources;
    
    -- Create output Iceberg table with partitioning. Replace the S3 bucket name with your bucket name
    CREATE TABLE human_resources.employee_details_iceberg (
       emp_no bigint,
       name string,
       department string,
       city string,
       salary int,
       m_time timestamp,
       last_applied_date timestamp)
    PARTITIONED BY (`department`)
    LOCATION 's3://aws-glue-output-iceberg-atq4q5u/human_resources/employee_details_iceberg'
    TBLPROPERTIES (
       'table_type'='iceberg'
    );
    
    ℹ️ If you fail to create the table, grant Athena users access permissions on the human_resources database through AWS Lake Formation.
  4. Deploy the Glue job using cdk deploy
    (.venv) $ ls src/main/python/etl/
     employee-details-cdc-etl.py
    (.venv) $ aws s3 mb s3://aws-glue-assets-123456789012-us-east-1 --region us-east-1
    (.venv) $ aws s3 cp employee-details-cdc-etl.py s3://aws-glue-assets-123456789012-us-east-1/scripts/employee-details-cdc-etl.py
    (.venv) $ cdk deploy --require-approval never
    
  5. Make sure the Glue job can access the Iceberg tables in the database; otherwise, grant the necessary permissions to the Glue job role
    (.venv) $ aws lakeformation grant-permissions \
                --principal DataLakePrincipalIdentifier=arn:aws:iam::account-id:role/GlueJobRole \
                --permissions SELECT INSERT DELETE DESCRIBE ALTER \
                --resource '{ "Table": {"DatabaseName": "human_resources", "TableWildcard": {}} }'
    
  6. Run the Glue job to fully load data into the Iceberg table
    (.venv) $ aws glue start-job-run --job-name employee-details-cdc-etl
    
  7. Check the output logs of the glue job and results in S3
    (.venv) $ aws s3 ls s3://aws-glue-output-iceberg-atq4q5u/human_resources/employee_details_iceberg/
                            PRE data/
                            PRE metadata/
    
  8. Query the Iceberg table using Athena - After you have successfully run the AWS Glue job, you can validate the output in Athena with the following SQL query:
    SELECT * FROM human_resources.employee_details_iceberg LIMIT 10;
    
  9. Upload incremental (CDC) data for further processing - After processing the initial full load file, let’s upload the following incremental files, which include insert, update, and delete records.
    (.venv) $ aws s3 cp cdc-load-20220730173650.parquet s3://aws-glue-input-parquet-atq4q5u/cdc-load/human_resources/employee_details/cdc-load-20220730173650.parquet
    
  10. Run the AWS Glue job again to process incremental files
    (.venv) $ aws glue start-job-run \
                --job-name employee-details-cdc-etl \
                --arguments='--raw_s3_path="s3://aws-glue-input-parquet-atq4q5u/cdc-load/human_resources/employee_details/"'
    
  11. Query the Iceberg table using Athena, after incremental data processing - After incremental data processing is complete, you can run the same SELECT statement again
    SELECT * FROM human_resources.employee_details_iceberg LIMIT 10;
    
  12. Query the previous version of data with Iceberg’s time travel feature - You can run the following SQL query in Athena that uses the AS OF TIME statement of Iceberg to query the previous version of the data:
    -- Replace the timestamp with an appropriate one
    SELECT * FROM human_resources.employee_details_iceberg FOR SYSTEM_TIME AS OF TIMESTAMP '2022-07-30 17:36:00'
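
The CDC apply step above (conceptually a merge keyed on the --primary_key column, where Op is I for insert, U for update, and D for delete) can be sketched in plain Python, independent of Spark and Iceberg. This is only an illustration of the semantics; the actual Glue job applies the changes to the Iceberg table with Spark:

```python
def apply_cdc(table, cdc_rows, primary_key="emp_no"):
    """Apply I/U/D change records to a table keyed on primary_key.

    Conceptual sketch of the CDC merge: I and U upsert the latest
    image of the row, D removes the row if present.
    """
    merged = {row[primary_key]: row for row in table}
    for change in cdc_rows:
        op = change["Op"]
        key = change[primary_key]
        row = {k: v for k, v in change.items() if k != "Op"}
        if op in ("I", "U"):
            # Insert or update: keep the latest image of the row.
            merged[key] = row
        elif op == "D":
            # Delete: drop the row if it exists.
            merged.pop(key, None)
    return list(merged.values())

# Full-load rows (a subset of the fake data shown earlier).
full_load = [
    {"emp_no": 377, "name": "Nathan", "department": "Manufacturing"},
    {"emp_no": 951, "name": "Hannah", "department": "Purchasing"},
]

# CDC rows: update Nathan's department, delete Hannah, insert Kelly.
cdc = [
    {"Op": "U", "emp_no": 377, "name": "Nathan", "department": "Security"},
    {"Op": "D", "emp_no": 951, "name": "Hannah", "department": "Purchasing"},
    {"Op": "I", "emp_no": 8826, "name": "Kelly", "department": "Finance"},
]

result = apply_cdc(full_load, cdc)
```

Each branch corresponds to a WHEN MATCHED / WHEN NOT MATCHED clause of a SQL MERGE INTO statement against the Iceberg table.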
    

Clean Up

Delete the CloudFormation stack by running the below command.

(.venv) $ cdk destroy

Useful commands

  • cdk ls list all stacks in the app
  • cdk synth emits the synthesized CloudFormation template
  • cdk deploy deploy this stack to your default AWS account/region
  • cdk diff compare deployed stack with current state
  • cdk docs open CDK documentation

Enjoy!

References

Troubleshooting

  • When you encounter the following error, see Why does my AWS Glue crawler or ETL job fail with the error "Insufficient Lake Formation permission(s)"?

     An error occurred while calling o100.getCatalogSource. Insufficient Lake Formation permission(s) on your-table-name (Service: AWSGlue; Status Code: 400; Error Code: AccessDeniedException; Request ID: 2623b59f-b5d2-497b-bed6-a7b2bcc32ba8; Proxy: null)
     
  • How do I resolve "MalformedPolicyDocument" errors in AWS CloudFormation?

    PM 1:46:58 | CREATE_FAILED        | AWS::IAM::Policy   | GlueJobRoleDefaultPolicy94EFA0CF IAM resource arn:aws:iam:us-east-1:123456789012:role/GlueJobRole cannot contain region information. (Service: AmazonIdentityManagement; Status Code: 400; Error Code: MalformedPolicyDocument; Request ID: 4e64e3c1-555f-44d1-b7e6-a170514e1ce6; Proxy: null)
     

    See cdk.Arn.format: adds region automatically for services not requiring one