Merge pull request #336 from creare-com/feature/aws-improvements
Feature/aws improvements
David Sullivan authored Nov 22, 2019
2 parents 8ae3494 + d05f5b3 commit 3c52a2b
Showing 4 changed files with 369 additions and 165 deletions.
2 changes: 1 addition & 1 deletion dist/aws/aws_requirements.txt
@@ -10,7 +10,7 @@ h5py>=2.9
lxml>=4.2
pydap>=3.2
rasterio>=0.36
pyproj>=2.2
pyproj>=2.4
requests>=2.18
numexpr>=2.6
lazy-import>=0.2.2
296 changes: 188 additions & 108 deletions dist/aws/handler.py
@@ -1,144 +1,222 @@
"""
PODPAC AWS Handler
Attributes
----------
s3 : TYPE
Description
settings_json : dict
Description
"""
from __future__ import absolute_import, division, print_function, unicode_literals

import json
import subprocess
import sys
import urllib.parse as urllib
from six import string_types
from collections import OrderedDict

import _pickle as cPickle
import os

import boto3
import botocore

from six import string_types

def is_s3_trigger(event):
"""
Helper method to determine if the given event was triggered by an S3 event
def default_pipeline(pipeline=None):
"""Get default pipeline definiton, merging with input pipline if supplied
Parameters
----------
event : dict
Event dict from AWS. See [TODO: add link reference]
pipeline : dict, optional
Input pipeline. Will fill in any missing defaults.
Returns
-------
Bool
True if the event is an S3 trigger
dict
pipeline dict
"""
return "Records" in event and event["Records"][0]["eventSource"] == "aws:s3"
defaults = {
"pipeline": {},
"settings": {},
"output": {"format": "netcdf", "filename": None, "format_kwargs": {}},
# API Gateway
"url": "",
"params": {},
}

# merge defaults with input pipelines, if supplied
if pipeline is not None:
pipeline = {**defaults, **pipeline}
pipeline["output"] = {**defaults["output"], **pipeline["output"]}
pipeline["settings"] = {**defaults["settings"], **pipeline["settings"]}
else:
pipeline = defaults

return pipeline
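
A quick sketch of how the merge above behaves; the node name, flag, and output format below are illustrative values, not defaults from this PR:

    partial = {
        "pipeline": {"my_node": {}},                    # placeholder node definition
        "settings": {"FUNCTION_FORCE_COMPUTE": True},
        "output": {"format": "png"},
    }
    merged = default_pipeline(partial)

    assert merged["output"]["format"] == "png"            # caller's value wins
    assert merged["output"]["format_kwargs"] == {}         # missing output keys filled from defaults
    assert merged["settings"] == {"FUNCTION_FORCE_COMPUTE": True}
    assert merged["url"] == "" and merged["params"] == {}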

def handler(event, context, get_deps=True, ret_pipeline=False):
"""Lambda function handler

def get_trigger(event):
"""
Helper method to determine the trigger for the lambda invocation
Parameters
----------
event : TYPE
Description
context : TYPE
Description
get_deps : bool, optional
Description
ret_pipeline : bool, optional
Description
event : dict
Event dict from AWS. See [TODO: add link reference]
Returns
-------
TYPE
Description
str
One of "S3", "eval", or "APIGateway"
"""

print(event)
if "Records" in event and event["Records"][0]["eventSource"] == "aws:s3":
return "S3"
elif "queryStringParameters" in event:
return "APIGateway"
else:
return "eval"

# Add /tmp/ path to handle python path for dependencies
sys.path.append("/tmp/")

# get S3 client - note this will have the same access that the current "function role" does
s3 = boto3.client("s3")
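
To illustrate the dispatch in get_trigger above, here are minimal hand-built events for each trigger type (bucket, key, and query values are placeholders):

    # S3 event: has "Records" whose first record has an "aws:s3" eventSource
    s3_event = {
        "Records": [
            {"eventSource": "aws:s3", "s3": {"bucket": {"name": "my-bucket"}, "object": {"key": "json/pipeline.json"}}}
        ]
    }
    assert get_trigger(s3_event) == "S3"

    # API Gateway event: has "queryStringParameters"
    api_event = {"queryStringParameters": {"SERVICE": "WMS", "FORMAT": "image/png"}}
    assert get_trigger(api_event) == "APIGateway"

    # anything else is treated as a direct Invoke ("eval"), where the event is the pipeline itself
    invoke_event = {"pipeline": {}, "output": {"format": "netcdf"}}
    assert get_trigger(invoke_event) == "eval"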
def parse_event(trigger, event):
"""Parse pipeline, settings, and output details from event depending on trigger
Parameters
----------
trigger : str
One of "S3", "eval", or "APIGateway"
event : dict
Event dict from AWS. See [TODO: add link reference]
"""

args = []
kwargs = {}
if trigger == "eval":
print("Triggered by Invoke")

# This event was triggered by S3, so let's see if the requested output is already cached.
if is_s3_trigger(event):
# event is the pipeline, provide consistent pipeline defaults
pipeline = default_pipeline(event)

# We always have to look to the bucket that triggered the event for the input
bucket = event["Records"][0]["s3"]["bucket"]["name"]
return pipeline

file_key = urllib.unquote_plus(event["Records"][0]["s3"]["object"]["key"])
_json = ""
elif trigger == "S3":
print("Triggered from S3")

# get boto s3 client
s3 = boto3.client("s3")

# We always have to look to the bucket that triggered the event for the input
triggered_bucket = event["Records"][0]["s3"]["bucket"]["name"]

# get the pipeline object and read
pipeline_obj = s3.get_object(Bucket=bucket, Key=file_key)
pipeline = json.loads(pipeline_obj["Body"].read().decode("utf-8"))
file_key = urllib.unquote_plus(event["Records"][0]["s3"]["object"]["key"])
pipeline_obj = s3.get_object(Bucket=triggered_bucket, Key=file_key)
pipeline = json.loads(pipeline_obj["Body"].read().decode("utf-8"))

# We can return if there is a valid cached output to save compute time.
settings_trigger = pipeline["settings"]
# provide consistent pipeline defaults
pipeline = default_pipeline(pipeline)

# check for cached output
cached = False
output_filename = file_key.replace(".json", "." + pipeline["output"]["format"]).replace(
settings_trigger["FUNCTION_S3_INPUT"], settings_trigger["FUNCTION_S3_OUTPUT"]
# create output filename
pipeline["output"]["filename"] = file_key.replace(".json", "." + pipeline["output"]["format"]).replace(
pipeline["settings"]["FUNCTION_S3_INPUT"], pipeline["settings"]["FUNCTION_S3_OUTPUT"]
)
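# for example (illustrative key and prefixes): an input key "json/my-pipeline.json" with
# FUNCTION_S3_INPUT "json/", FUNCTION_S3_OUTPUT "output/", and output format "netcdf"
# becomes the output filename "output/my-pipeline.netcdf"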

try:
s3.head_object(Bucket=bucket, Key=output_filename)
if not pipeline["settings"]["FUNCTION_FORCE_COMPUTE"]:

# get configured s3 bucket to check for cache
bucket = pipeline["settings"]["S3_BUCKET_NAME"]

# We can return if there is a valid cached output to save compute time.
try:
s3.head_object(Bucket=bucket, Key=pipeline["output"]["filename"])
return None

# throws ClientError if no file is found
except botocore.exceptions.ClientError:
pass

# return pipeline definition
return pipeline

elif trigger == "APIGateway":
print("Triggered from API Gateway")

pipeline = default_pipeline()
pipeline["url"] = event["queryStringParameters"]
if isinstance(pipeline["url"], string_types):
pipeline["url"] = urllib.parse_qs(urllib.urlparse(pipeline["url"]).query)
pipeline["params"] = pipeline["url"]

# look for specific parameter definitions in query parameters
for param in pipeline["params"]:

# handle SETTINGS in query parameters
if param == "settings":
# Try loading this settings string into a dict to merge with default settings
try:
api_settings = json.loads(pipeline["params"][param])
# If we get here, the api settings were loaded
pipeline["settings"] = {**pipeline["settings"], **api_settings}
except Exception as e:
print("Got an exception when attempting to load api settings: ", e)
print(pipeline)


# handle OUTPUT in query parameters
elif param == "output":
pipeline["output"] = pipeline["params"][param]

# handle FORMAT in query parameters
elif param == "format":
pipeline["output"][param] = pipeline["params"][param].split("/")[-1]
# handle image returns
if pipeline["output"][param] in ["png", "jpg", "jpeg"]:
pipeline["output"]["format_kwargs"]["return_base64"] = True

return pipeline

else:
raise Exception("Unsupported trigger")

# Object exists, so we don't have to recompute
# TODO: the "force_compute" parameter will never work as is written in aws.py
if not pipeline.get("force_compute", False):
cached = True

except botocore.exceptions.ClientError:
pass
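
A short sketch of the API Gateway branch of parse_event above; the query values are illustrative, not taken from this PR:

    event = {
        "queryStringParameters": {
            "settings": '{"FUNCTION_FORCE_COMPUTE": true}',  # JSON string merged into pipeline["settings"]
            "format": "image/png",                           # MIME-style format; only "png" is kept
        }
    }
    pipeline = parse_event("APIGateway", event)

    assert pipeline["settings"]["FUNCTION_FORCE_COMPUTE"] is True
    assert pipeline["output"]["format"] == "png"
    assert pipeline["output"]["format_kwargs"]["return_base64"] is True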
def handler(event, context):
"""Lambda function handler
Parameters
----------
event : dict
Description
context : TYPE
Description
get_deps : bool, optional
Description
ret_pipeline : bool, optional
Description
"""
print(event)

# Add /tmp/ path to handle python path for dependencies
sys.path.append("/tmp/")

if cached:
return
# handle triggers
trigger = get_trigger(event)

# TODO: handle "invoke" and "APIGateway" triggers explicitly
else:
print("Not triggered by S3")
# parse event
pipeline = parse_event(trigger, event)

url = event["queryStringParameters"]
if isinstance(url, string_types):
url = urllib.parse_qs(urllib.urlparse(url).query)
# bail if we can't parse
if pipeline is None:
return

# Capitalize the keywords for consistency
settings_trigger = {}
for k in url:
if k.upper() == "SETTINGS":
settings_trigger = url[k]
bucket = settings_trigger["S3_BUCKET_NAME"]
# -----
# TODO: remove when layers is configured
# get configured bucket to download dependencies
bucket = pipeline["settings"]["S3_BUCKET_NAME"]

# get dependencies path
if "FUNCTION_DEPENDENCIES_KEY" in settings_trigger:
dependencies = settings_trigger["FUNCTION_DEPENDENCIES_KEY"]
if "FUNCTION_DEPENDENCIES_KEY" in pipeline["settings"]:
dependencies = pipeline["settings"]["FUNCTION_DEPENDENCIES_KEY"]
else:
# TODO: this could be a problem, since we can't import podpac settings yet
# which means the input event might have to include the version or
# "FUNCTION_DEPENDENCIES_KEY".
dependencies = "podpac_deps_{}.zip".format(
settings_trigger["PODPAC_VERSION"]
pipeline["settings"]["PODPAC_VERSION"]
) # this should be equivalent to version.semver()

# Download dependencies from specific bucket/object
if get_deps:
s3.download_file(bucket, dependencies, "/tmp/" + dependencies)
subprocess.call(["unzip", "/tmp/" + dependencies, "-d", "/tmp"])
sys.path.append("/tmp/")
subprocess.call(["rm", "/tmp/" + dependencies])
s3 = boto3.client("s3")
s3.download_file(bucket, dependencies, "/tmp/" + dependencies)
subprocess.call(["unzip", "/tmp/" + dependencies, "-d", "/tmp"])
sys.path.append("/tmp/")
subprocess.call(["rm", "/tmp/" + dependencies])
# -----
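# e.g. with pipeline["settings"]["PODPAC_VERSION"] == "2.0.0" (an illustrative version), the
# fallback above resolves to "podpac_deps_2.0.0.zip", which is downloaded from the configured
# bucket and unzipped into /tmp so that podpac can be imported below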

# Load PODPAC

@@ -153,38 +231,40 @@ def handler(event, context, get_deps=True, ret_pipeline=False):
import podpac.datalib

# update podpac settings with inputs from the trigger
for key in settings_trigger:
settings[key] = settings_trigger[key]
settings = {**settings, **pipeline["settings"]}

if is_s3_trigger(event):
# build the Node and Coordinates
if trigger in ("eval", "S3"):
node = Node.from_definition(pipeline["pipeline"])
coords = Coordinates.from_json(json.dumps(pipeline["coordinates"], indent=4, cls=JSONEncoder))
fmt = pipeline["output"]["format"]
kwargs = pipeline["output"].copy()
kwargs.pop("format")

# TODO: handle "invoke" and "APIGateway" triggers explicitly
else:
coords = Coordinates.from_url(event["queryStringParameters"])
node = Node.from_url(event["queryStringParameters"])
fmt = _get_query_params_from_url(event["queryStringParameters"])["FORMAT"].split("/")[-1]
if fmt in ["png", "jpg", "jpeg"]:
kwargs["return_base64"] = True
# TODO: handle API Gateway better - is this always going to be WCS?
elif trigger == "APIGateway":
node = Node.from_url(pipeline["url"])
coords = Coordinates.from_url(pipeline["url"])

# make sure pipeline is allowed to be run
if "PODPAC_RESTRICT_PIPELINES" in os.environ:
whitelist = json.loads(os.environ["PODPAC_RESTRICT_PIPELINES"])
if node.hash not in whitelist:
raise ValueError("Node hash is not in the whitelist for this function")
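# for illustration, restricting this function to two pipelines means deploying it with an
# environment variable such as (hashes are placeholders):
#   PODPAC_RESTRICT_PIPELINES='["d41d8cd98f00b204", "9e107d9d372bb682"]'
# any node whose hash is not in that JSON list raises the ValueError above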

# run analysis
output = node.eval(coords)

# FOR DEBUGGING
if ret_pipeline:
return node
# convert to output format
body = output.to_format(pipeline["output"]["format"], **pipeline["output"]["format_kwargs"])

body = output.to_format(fmt, *args, **kwargs)
# Response
if trigger == "eval":
return body

# output_filename only exists if this was an S3 triggered event.
if is_s3_trigger(event):
s3.put_object(Bucket=bucket, Key=output_filename, Body=body)
elif trigger == "S3":
s3.put_object(Bucket=settings["S3_BUCKET_NAME"], Key=pipeline["output"]["filename"], Body=body)

# TODO: handle "invoke" and "APIGateway" triggers explicitly
else:
elif trigger == "APIGateway":

# TODO: can we handle the deserialization better?
try:
json.dumps(body)
except Exception as e:
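
For context, a minimal sketch of calling the new handler directly (the "eval" trigger), as one might from a unit test or a Lambda console test event. The node definition, coordinates, bucket, and dependency key are placeholders rather than values from this PR, and a real invocation still needs the dependency archive available in the configured bucket:

    from handler import handler  # assuming dist/aws/handler.py is importable as "handler"

    event = {
        # node definition as produced by podpac (placeholder single-node definition)
        "pipeline": {"my_node": {"node": "algorithm.Arange"}},
        # coordinates as produced by podpac Coordinates JSON (placeholder)
        "coordinates": {"coords": [], "crs": "EPSG:4326"},
        "settings": {
            "S3_BUCKET_NAME": "my-podpac-bucket",                  # placeholder bucket
            "FUNCTION_DEPENDENCIES_KEY": "podpac_deps_2.0.0.zip",  # placeholder key
        },
        "output": {"format": "netcdf", "format_kwargs": {}},
    }

    # "eval" trigger: the event itself is the pipeline, and the serialized output is returned
    body = handler(event, context=None)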
