diff --git a/StepFunctionsStateMachine.json b/StepFunctionsStateMachine.json new file mode 100644 index 0000000000..d3cccf91c3 --- /dev/null +++ b/StepFunctionsStateMachine.json @@ -0,0 +1,105 @@ +{ + "Comment": "State Machine for Trackstars ta10", + "StartAt": "Extract", + "States": { + "Extract": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "Payload.$": "$", + "FunctionName": "arn:aws:lambda:us-east-2:430118838802:function:TrackstarsFunction:$LATEST" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2, + "JitterStrategy": "FULL" + } + ], + "Next": "Transform", + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Next": "Fail" + } + ] + }, + "Fail": { + "Type": "Fail" + }, + "Transform": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "Payload.$": "$", + "FunctionName": "arn:aws:lambda:us-east-2:430118838802:function:Trackstars_2:$LATEST" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2, + "JitterStrategy": "FULL" + } + ], + "Next": "load", + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Next": "Fail" + } + ] + }, + "load": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "Payload.$": "$", + "FunctionName": "arn:aws:lambda:us-east-2:430118838802:function:Trackstars_3:$LATEST" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2, + "JitterStrategy": "FULL" + } + ], + "End": true, + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Next": "Fail" + } + ] + } + } + } \ No newline at end of file diff --git a/lambda_functions/Extract/lambda_function.py b/lambda_functions/Extract/lambda_function.py new file mode 100644 index 0000000000..76bdf6a453 --- /dev/null +++ b/lambda_functions/Extract/lambda_function.py @@ -0,0 +1,36 @@ +import json +import urllib.request + +def lambda_handler(event, context): + """ + This handler implements the extraction state of the step function. Here we access a portion + of the Fingertips dataset on situational life expectancy for men and women at the age of 65. + Successful retrieval and reading of the dataset will pass control to the Transform step. + """ + try: + # URL to fetch data + url = "https://fingertips.phe.org.uk/api/all_data/csv/for_one_indicator?indicator_id=93505" + + # Fetch the data + response = urllib.request.urlopen(url) + if response.getcode() == 200: + data = response.read().decode('utf-8')[:100000] # Limit the payload size + print("Successfully fetched data") + + # Return data for the Step Function + return { + 'statusCode': 200, + 'body': data + } + else: + print(f"Error: API request failed with code: {response.getcode()}") + return { + 'statusCode': response.getcode(), + 'body': json.dumps({"error": "API request failed"}) + } + except Exception as e: + print(f"Exception occurred: {e}") + return { + 'statusCode': 500, + 'body': json.dumps({"error": str(e)}) + } \ No newline at end of file diff --git a/lambda_functions/Load/lambda_function.py b/lambda_functions/Load/lambda_function.py new file mode 100644 index 0000000000..9f3b767ce6 --- /dev/null +++ b/lambda_functions/Load/lambda_function.py @@ -0,0 +1,58 @@ +import json +import boto3 +from uuid import uuid4 +from decimal import Decimal + +# Initialize the DynamoDB client +dynamodb = boto3.resource('dynamodb') + +# Set the table name +TABLE_NAME = 'LifeExpData' + +def convert_to_decimal(obj): + """ + Recursively converts float values in a dictionary or list to Decimal for DynamoDB. + """ + if isinstance(obj, list): + return [convert_to_decimal(i) for i in obj] + elif isinstance(obj, dict): + return {k: convert_to_decimal(v) for k, v in obj.items()} + elif isinstance(obj, float): + return Decimal(str(obj)) # Convert float to Decimal + return obj + +def lambda_handler(event, context): + """ + This handler implements the final stage of the pipeline where data is loaded into + DynamoDB for use by notional follow-on processors. + """ + try: + data = event.get("body") + if not data: + raise ValueError("No data received.") + + data_items = json.loads(data) + + table = dynamodb.Table(TABLE_NAME) + + for item in data_items: + + item['id'] = str(uuid4()) + + item = convert_to_decimal(item) + + item['AreaName'] = item.get('Area Name', 'Unknown') + + table.put_item(Item=item) + + return { + 'statusCode': 200, + 'body': json.dumps(data_items) + } + + except Exception as e: + print(f"Exception occurred: {e}") + return { + 'statusCode': 500, + 'body': json.dumps({"error": str(e)}) + } \ No newline at end of file diff --git a/lambda_functions/Transform/lambda_function.py b/lambda_functions/Transform/lambda_function.py new file mode 100644 index 0000000000..c7b70efbe6 --- /dev/null +++ b/lambda_functions/Transform/lambda_function.py @@ -0,0 +1,34 @@ +from io import StringIO +import pandas as pd +import json + +def lambda_handler(event, context): + """ + This handler implements the transformation stage of the pipeline, accepting + a subset of a public dataset and transforming it for downstream use by th e + load stage of the pipeline. + """ + try: + # Extract data from the event + data = event.get("body") + if not data: + raise ValueError("No data received.") + + print("Successfully loaded data.") + + data = pd.read_csv(StringIO(data)) + + data = data[['Area Name', 'Sex', 'Value']] + + data = data.sort_values(by="Value", ascending=False) + + serialized_data = data.to_json(orient="records") + + return { + 'statusCode': 200, + 'body': serialized_data + } + + except Exception as e: + print(f"Exception occurred: {e}") + return {'statusCode': 500, 'body': str(e)}