
Streaming Amazon DynamoDB data into a centralized data lake (S3)

Amazon DynamoDB Streams captures item-level changes in the DynamoDB table. Kinesis Data Streams and Kinesis Data Firehose then deliver those changes to an S3 bucket, with a Lambda function transforming each record before it is written to S3.

(Architecture diagram: DynamoDB table → Kinesis Data Streams → Kinesis Data Firehose with Lambda transform → S3)

Step 1: Create a DynamoDB table.

Create a customers table with customer_id as the partition key.
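The table can also be created programmatically. Below is a minimal boto3 sketch, assuming on-demand capacity and a numeric customer_id partition key as described above (the PartiQL examples later in this guide use customers_id, so match the key name to your own table):

    # Sketch: create the customers table with boto3 (assumes on-demand billing)
    import boto3

    dynamodb = boto3.client('dynamodb')

    dynamodb.create_table(
        TableName='customers',
        AttributeDefinitions=[{'AttributeName': 'customer_id', 'AttributeType': 'N'}],
        KeySchema=[{'AttributeName': 'customer_id', 'KeyType': 'HASH'}],
        BillingMode='PAY_PER_REQUEST'
    )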

Step 2: Create a Kinesis data stream

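A boto3 sketch of the same step; the stream name and shard count are assumptions, adjust them to your own setup:

    # Sketch: create the Kinesis data stream that will receive the DynamoDB changes
    import boto3

    kinesis = boto3.client('kinesis')

    kinesis.create_stream(StreamName='customers-stream', ShardCount=1)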

Step 3: Create a Lambda function and an S3 bucket

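The destination bucket can be created with boto3 as well; the bucket name below is a placeholder:

    # Sketch: create the destination S3 bucket (bucket name is hypothetical)
    import boto3

    s3 = boto3.client('s3')
    # Outside us-east-1, also pass CreateBucketConfiguration={'LocationConstraint': '<region>'}
    s3.create_bucket(Bucket='customers-data-lake-demo')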

Lambda function script

    # This Firehose transformation function appends a newline to each record
    # coming from the Kinesis data stream, so that the objects written to S3
    # contain one JSON document per line.
    import base64

    def lambda_handler(event, context):
        output = []  # collect the transformed records for this invocation only

        for record in event['records']:
            # Firehose delivers each record's data base64-encoded
            payload = base64.b64decode(record['data']).decode('utf-8')
            print('payload:', payload)

            # Append a newline, then re-encode the payload for Firehose
            row_w_newline = payload + "\n"
            row_w_newline = base64.b64encode(row_w_newline.encode('utf-8')).decode('utf-8')

            output.append({
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': row_w_newline
            })

        print('Processed {} records.'.format(len(event['records'])))

        return {'records': output}
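To sanity-check the transformation before wiring it into Firehose, the handler can be invoked locally with a fabricated test event. The record contents and module name here are assumptions:

    # Sketch: invoke the handler locally with a fabricated Firehose test event
    import base64
    from lambda_function import lambda_handler  # assumes the script above is saved as lambda_function.py

    test_event = {
        'records': [{
            'recordId': '1',
            'data': base64.b64encode(b'{"customer_id": 1}').decode('utf-8')
        }]
    }
    print(lambda_handler(test_event, None))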

Step 4: Create the Kinesis Data Firehose delivery stream


Set the source to the Kinesis data stream and the destination to Amazon S3.

Enable Transform source records with AWS Lambda and select the function created in Step 3.

Select the S3 bucket created in Step 3.

Configure the buffer size and interval so that data is delivered once the buffer reaches 1 MB or every 60 seconds, whichever comes first.

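The same delivery stream can be defined with boto3. This is a sketch of the configuration described above; every ARN and name is a placeholder for your own resources and IAM role:

    # Sketch: create the Firehose delivery stream with a Kinesis source,
    # Lambda transformation, and the 1 MB / 60 second buffering described above.
    import boto3

    firehose = boto3.client('firehose')

    firehose.create_delivery_stream(
        DeliveryStreamName='customers-delivery-stream',
        DeliveryStreamType='KinesisStreamAsSource',
        KinesisStreamSourceConfiguration={
            'KinesisStreamARN': 'arn:aws:kinesis:us-east-1:123456789012:stream/customers-stream',
            'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role'
        },
        ExtendedS3DestinationConfiguration={
            'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',
            'BucketARN': 'arn:aws:s3:::customers-data-lake-demo',
            'BufferingHints': {'SizeInMBs': 1, 'IntervalInSeconds': 60},
            'ProcessingConfiguration': {
                'Enabled': True,
                'Processors': [{
                    'Type': 'Lambda',
                    'Parameters': [{
                        'ParameterName': 'LambdaArn',
                        'ParameterValue': 'arn:aws:lambda:us-east-1:123456789012:function:firehose-transform'
                    }]
                }]
            }
        }
    )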

Step 5: Set up the DynamoDB stream with Kinesis

Turn on the Amazon Kinesis data stream destination from the DynamoDB table settings, pointing it at the stream created in Step 2.
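A boto3 sketch of the same step; the stream ARN is a placeholder:

    # Sketch: enable streaming of the table's item-level changes to the Kinesis stream
    import boto3

    dynamodb = boto3.client('dynamodb')

    dynamodb.enable_kinesis_streaming_destination(
        TableName='customers',
        StreamArn='arn:aws:kinesis:us-east-1:123456789012:stream/customers-stream'
    )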

Step 6: Insert data into the table


PartiQL queries for DynamoDB

INSERT INTO "customers" value {'customers_id':1, 'name':'Baba Li', 'age':20,'gender':'M'}
INSERT INTO "customers" value {'customers_id':2, 'name':'Lucky Bill', 'age':24,'gender':'M'}
INSERT INTO "customers" value {'customers_id':3, 'name':'Mom Ma', 'age':50,'gender':'F'}
INSERT INTO "customers" value {'customers_id':4, 'name':'Locker Su', 'age':30,'gender':'M'}
INSERT INTO "customers" value {'customers_id':5, 'name':'Abdel ly', 'age':41,'gender':'F'}
INSERT INTO "customers" value {'customers_id':6, 'name':'Abou Sar', 'age':35,'gender':'F'}

update customers set age=26 where customers_id=3

select * from customers;
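These statements can be run from the console's PartiQL editor or programmatically. A minimal boto3 sketch of the latter:

    # Sketch: run the same PartiQL statements against DynamoDB with boto3
    import boto3

    dynamodb = boto3.client('dynamodb')

    dynamodb.execute_statement(
        Statement="INSERT INTO \"customers\" VALUE {'customers_id': 1, 'name': 'Baba Li', 'age': 20, 'gender': 'M'}"
    )

    response = dynamodb.execute_statement(Statement='SELECT * FROM "customers"')
    print(response['Items'])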

Step 7: Check the data in the S3 bucket

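After the buffer interval elapses, the delivered objects should appear in the bucket. A quick boto3 sketch for listing them (bucket name is a placeholder):

    # Sketch: list the objects Firehose delivered to the destination bucket
    import boto3

    s3 = boto3.client('s3')

    response = s3.list_objects_v2(Bucket='customers-data-lake-demo')
    for obj in response.get('Contents', []):
        print(obj['Key'], obj['Size'])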