Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ta10 trackstars #260

Open
wants to merge 2 commits into
base: etl-ta8
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions StepFunctionsStateMachine.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
{
"Comment": "State Machine for Trackstars ta10",
"StartAt": "Extract",
"States": {
"Extract": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:us-east-2:430118838802:function:TrackstarsFunction:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2,
"JitterStrategy": "FULL"
}
],
"Next": "Transform",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Fail"
}
]
},
"Fail": {
"Type": "Fail"
},
"Transform": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:us-east-2:430118838802:function:Trackstars_2:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2,
"JitterStrategy": "FULL"
}
],
"Next": "load",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Fail"
}
]
},
"load": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:us-east-2:430118838802:function:Trackstars_3:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2,
"JitterStrategy": "FULL"
}
],
"End": true,
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Fail"
}
]
}
}
}
36 changes: 36 additions & 0 deletions lambda_functions/Extract/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import json
import urllib.request

def lambda_handler(event, context):
"""
This handler implements the extraction state of the step function. Here we access a portion
of the Fingertips dataset on situational life expectancy for men and women at the age of 65.
Successful retrieval and reading of the dataset will pass control to the Transform step.
"""
try:
# URL to fetch data
url = "https://fingertips.phe.org.uk/api/all_data/csv/for_one_indicator?indicator_id=93505"

# Fetch the data
response = urllib.request.urlopen(url)
if response.getcode() == 200:
data = response.read().decode('utf-8')[:100000] # Limit the payload size
print("Successfully fetched data")

# Return data for the Step Function
return {
'statusCode': 200,
'body': data
}
else:
print(f"Error: API request failed with code: {response.getcode()}")
return {
'statusCode': response.getcode(),
'body': json.dumps({"error": "API request failed"})
}
except Exception as e:
print(f"Exception occurred: {e}")
return {
'statusCode': 500,
'body': json.dumps({"error": str(e)})
}
58 changes: 58 additions & 0 deletions lambda_functions/Load/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import json
import boto3
from uuid import uuid4
from decimal import Decimal

# Initialize the DynamoDB client
dynamodb = boto3.resource('dynamodb')

# Set the table name
TABLE_NAME = 'LifeExpData'

def convert_to_decimal(obj):
"""
Recursively converts float values in a dictionary or list to Decimal for DynamoDB.
"""
if isinstance(obj, list):
return [convert_to_decimal(i) for i in obj]
elif isinstance(obj, dict):
return {k: convert_to_decimal(v) for k, v in obj.items()}
elif isinstance(obj, float):
return Decimal(str(obj)) # Convert float to Decimal
return obj

def lambda_handler(event, context):
"""
This handler implements the final stage of the pipeline where data is loaded into
DynamoDB for use by notional follow-on processors.
"""
try:
data = event.get("body")
if not data:
raise ValueError("No data received.")

data_items = json.loads(data)

table = dynamodb.Table(TABLE_NAME)

for item in data_items:

item['id'] = str(uuid4())

item = convert_to_decimal(item)

item['AreaName'] = item.get('Area Name', 'Unknown')

table.put_item(Item=item)

return {
'statusCode': 200,
'body': json.dumps(data_items)
}

except Exception as e:
print(f"Exception occurred: {e}")
return {
'statusCode': 500,
'body': json.dumps({"error": str(e)})
}
34 changes: 34 additions & 0 deletions lambda_functions/Transform/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from io import StringIO
import pandas as pd
import json

def lambda_handler(event, context):
"""
This handler implements the transformation stage of the pipeline, accepting
a subset of a public dataset and transforming it for downstream use by th e
load stage of the pipeline.
"""
try:
# Extract data from the event
data = event.get("body")
if not data:
raise ValueError("No data received.")

print("Successfully loaded data.")

data = pd.read_csv(StringIO(data))

data = data[['Area Name', 'Sex', 'Value']]

data = data.sort_values(by="Value", ascending=False)

serialized_data = data.to_json(orient="records")

return {
'statusCode': 200,
'body': serialized_data
}

except Exception as e:
print(f"Exception occurred: {e}")
return {'statusCode': 500, 'body': str(e)}