Feature(#299): Update the handler to receive settings from AWS trigger
David Sullivan committed Nov 2, 2019
1 parent bb5d3af commit 78b4d6d
Showing 3 changed files with 85 additions and 65 deletions.
146 changes: 81 additions & 65 deletions dist/aws/handler.py
@@ -14,56 +14,67 @@
import subprocess
import sys
import urllib.parse as urllib
from six import string_types
from collections import OrderedDict

import _pickle as cPickle

import boto3
import botocore

# Add /tmp/ path to handle python path for dependencies
sys.path.append("/tmp/")

# TODO: move to s3fs for simpler access
# not sure if this is possible?
# continue to assume that lambda handler has access to all s3 resources
s3 = boto3.client("s3")
"""
Helper method to determine if the given event was triggered by an S3 event
"""
def is_s3_trigger(event):
return "Records" in event and event["Records"][0]["eventSource"] == "aws:s3"

# TODO: handle settings from within the API gateway call or dynamically from bucket
# this is currently bundled with lambda function
settings_json = {}  # default to empty
try:
    # currently loaded into lambda function
    with open("settings.json", "r") as f:
        settings_json = json.load(f)
    print(settings_json)  # TODO: remove
except:
    pass

"""
Helper method to merge two local settings json/dict objects, then update the `PodpacSettings`.
The settings variable here is podpac.settings.
"""
def update_podpac_settings(old_settings_json, new_settings_json):
    updated_settings = {**old_settings_json, **new_settings_json}
    for key in updated_settings:
        settings[key] = updated_settings[key]
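# For illustration (hypothetical keys): new values win on conflict, old keys
# are preserved, and the merged result is written into podpac.settings:
#
#   update_podpac_settings({"CACHE_DIR": "/tmp", "DEBUG": False}, {"DEBUG": True})
#   # settings["CACHE_DIR"] == "/tmp", settings["DEBUG"] == True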

"""
Returns true if the requested output is already computed (and force_compute is false.)
"""
def check_for_cached_output(input_file_key, pipeline, settings_json, bucket):
output_filename = input_file_key.replace(".json", "." + pipeline["output"]["format"])
output_filename = output_filename.replace(settings_json["S3_INPUT_FOLDER"], settings_json["S3_OUTPUT_FOLDER"])
try:
s3.head_object(Bucket=bucket, Key=output_filename)
# Object exists, so we don't have to recompute
if not pipeline.get("force_compute", False):
return True, output_filename
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "404":
# It does not exist, so we should proceed
return False, output_filename
# Something else has gone wrong... not handling this case.
return False, output_filename
return False, output_filename
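# For illustration (hypothetical settings): with S3_INPUT_FOLDER "input",
# S3_OUTPUT_FOLDER "output", and an output format of "png", the input key
# "input/request.json" maps to the cached output key "output/request.png";
# a 404 from head_object means the output still has to be computed.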

def handler(event, context, get_deps=True, ret_pipeline=False):

    # TODO: remove
    print(event)
    # Add /tmp/ path to handle python path for dependencies
    sys.path.append("/tmp/")

    # TODO: move to s3fs for simpler access
    # not sure if this is possible?
    # continue to assume that lambda handler has access to all s3 resources
    s3 = boto3.client("s3")

    args = []
    kwargs = {}

if "Records" in event and event["Records"][0]["eventSource"] == "aws:s3":
# <start S3 trigger specific>
# This event was triggered by S3, so let's see if the requested output is already cached.
if is_s3_trigger(event):

# TODO: get the name of the calling bucket from event for s3 triggers?
# bucket = event['Records'][0]['s3']['bucket']['name']
# We always have to look to the bucket that triggered the event for the input
bucket = event['Records'][0]['s3']['bucket']['name']

file_key = urllib.unquote_plus(event["Records"][0]["s3"]["object"]["key"])
_json = ""
@@ -78,15 +89,38 @@ def handler(event, context, get_deps=True, ret_pipeline=False):
_json += "\n"
_json += r.decode()
pipeline = json.loads(_json, object_pairs_hook=OrderedDict)

        # We can return if there is a valid cached output to save compute time.
        settings_json = pipeline["settings"]
        cached, output_filename = check_for_cached_output(file_key, pipeline, settings_json, bucket)
        if cached:
            return
    else:
        # elif ('pathParameters' in event and event['pathParameters'] is not None and 'proxy' in event['pathParameters']) or ('authorizationToken' in event and event['authorizationToken'] == "incoming-client-token"):
        # TODO: Need to get the pipeline from the event...
        print("DSullivan: we have an API Gateway event. Will now get deps in order to proceed.")
        pipeline = None
        output_filename = None  # so the S3-only check near the end of the handler does not raise NameError
        url = event["queryStringParameters"]
        if isinstance(url, string_types):
            url = urllib.parse_qs(urllib.urlparse(url).query)

        # Compare keywords case-insensitively for consistency
        settings_json = {}
        for k in url:
            if k.upper() == "SETTINGS":
                settings_json = url[k]
        bucket = settings_json["S3_BUCKET_NAME"]
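        # For illustration (hypothetical request): parse_qs returns a dict of
        # lists with values percent-decoded, e.g.
        #   urllib.parse_qs(urllib.urlparse("https://host/eval?SETTINGS=%7B%7D").query)
        #   # -> {"SETTINGS": ["{}"]}
        # so the matched value is a list whose first element is the JSON text.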

    # get dependencies path
    if "FUNCTION_DEPENDENCIES_KEY" in settings_json:
        dependencies = settings_json["FUNCTION_DEPENDENCIES_KEY"]
    else:
        # TODO: this could be a problem, since we can't import podpac settings yet,
        # which means the input event might have to include the version or
        # "FUNCTION_DEPENDENCIES_KEY".
        dependencies = "podpac_deps_{}.zip".format(
            settings_json["PODPAC_VERSION"]
        )  # this should be equivalent to version.semver()
    # Download dependencies from specific bucket/object
    if get_deps:
        s3.download_file(bucket, dependencies, "/tmp/" + dependencies)
        subprocess.call(["unzip", "/tmp/" + dependencies, "-d", "/tmp"])
        sys.path.append("/tmp/")
        subprocess.call(["rm", "/tmp/" + dependencies])
@@ -101,38 +135,18 @@ def handler(event, context, get_deps=True, ret_pipeline=False):
    from podpac.core.utils import JSONEncoder, _get_query_params_from_url
    import podpac.datalib

    try:
        update_podpac_settings(settings, settings_json)
    except Exception:
        print("The settings could not be updated.")

    # if from S3 trigger
    if is_s3_trigger(event):
        node = Node.from_definition(pipeline["pipeline"])
        coords = Coordinates.from_json(json.dumps(pipeline["coordinates"], indent=4, cls=JSONEncoder))
        fmt = pipeline["output"]["format"]
        kwargs = pipeline["output"].copy()
        kwargs.pop("format")

    # else from api gateway and it's a WMS/WCS request
    else:
        print(_get_query_params_from_url(event["queryStringParameters"]))
        coords = Coordinates.from_url(event["queryStringParameters"])
        node = Node.from_url(event["queryStringParameters"])
        fmt = _get_query_params_from_url(event["queryStringParameters"])["FORMAT"].split("/")[-1]
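    # For illustration: a WMS-style FORMAT parameter such as "image/png"
    # yields fmt == "png" after the split.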
@@ -144,8 +158,10 @@ def handler(event, context, get_deps=True, ret_pipeline=False):
        return node

    body = output.to_format(fmt, *args, **kwargs)

    # output_filename is only set for S3 triggered events.
    if output_filename is not None:
        s3.put_object(Bucket=bucket, Key=output_filename, Body=body)
    else:
        try:
            json.dumps(body)
2 changes: 2 additions & 0 deletions doc/source/developer/aws.md
@@ -1,5 +1,7 @@
# AWS Lambda #

TODO: This needs to be rewritten.

Podpac includes a package to create an Amazon Web Services Lambda function that executes nodes in a serverless environment. This package can be altered to handle events according to the developer's use case.
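As a quick illustration (not from the podpac docs; the function name and payload shape here are assumptions), a deployed handler can be invoked directly with boto3:

```python
import json

import boto3

# Hypothetical function name and payload; match these to your deployment.
client = boto3.client("lambda", region_name="us-east-1")
response = client.invoke(
    FunctionName="podpac-lambda",
    Payload=json.dumps({"pipeline": {}, "coordinates": {}, "settings": {}}),
)
print(json.loads(response["Payload"].read()))
```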

## AWS Architecture ##
2 changes: 2 additions & 0 deletions podpac/core/managers/aws.py
@@ -1168,6 +1168,8 @@ def get_bucket(session, bucket_name):
    # init empty object
    bucket = {"name": bucket_name}

    # TODO: this is usually None even though the bucket has a region. Buckets in
    # us-east-1 report a null LocationConstraint, so this may be that default
    # rather than a bug in getting the region.
    # get location constraint. this will be None for no location constraint
    bucket["region"] = s3.get_bucket_location(Bucket=bucket_name)["LocationConstraint"]

