Feature/aws improvements #336

Merged: 16 commits, Nov 22, 2019
2 changes: 1 addition & 1 deletion dist/aws/aws_requirements.txt
@@ -10,7 +10,7 @@ h5py>=2.9
lxml>=4.2
pydap>=3.2
rasterio>=0.36
-pyproj>=2.2
+pyproj>=2.4
requests>=2.18
numexpr>=2.6
lazy-import>=0.2.2
296 changes: 188 additions & 108 deletions dist/aws/handler.py
@@ -1,144 +1,222 @@
"""
PODPAC AWS Handler

Attributes
----------
s3 : TYPE
Description
settings_json : dict
Description
"""
from __future__ import absolute_import, division, print_function, unicode_literals

import json
import subprocess
import sys
import urllib.parse as urllib
from six import string_types
from collections import OrderedDict

import _pickle as cPickle
import os

import boto3
import botocore

from six import string_types

def is_s3_trigger(event):
"""
Helper method to determine if the given event was triggered by an S3 event
def default_pipeline(pipeline=None):
"""Get default pipeline definiton, merging with input pipline if supplied

Parameters
----------
event : dict
Event dict from AWS. See [TODO: add link reference]
pipeline : dict, optional
Input pipeline. Will fill in any missing defaults.

Returns
-------
Bool
True if the event is an S3 trigger
dict
pipeline dict
"""
return "Records" in event and event["Records"][0]["eventSource"] == "aws:s3"
defaults = {
"pipeline": {},
"settings": {},
"output": {"format": "netcdf", "filename": None, "format_kwargs": {}},
# API Gateway
"url": "",
"params": {},
}

# merge defaults with the input pipeline, if supplied
if pipeline is not None:
pipeline = {**defaults, **pipeline}
pipeline["output"] = {**defaults["output"], **pipeline["output"]}
pipeline["settings"] = {**defaults["settings"], **pipeline["settings"]}
else:
pipeline = defaults

return pipeline
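
For reference, the dict-unpacking merge in default_pipeline keeps whatever the caller supplied and fills in the rest, including the nested "output" and "settings" dicts. A minimal standalone sketch of that behavior (the partial pipeline below is made up for illustration):

```python
# Sketch of the defaults merge used by default_pipeline (illustrative values only).
defaults = {
    "pipeline": {},
    "settings": {},
    "output": {"format": "netcdf", "filename": None, "format_kwargs": {}},
    "url": "",
    "params": {},
}
partial = {"output": {"format": "png"}, "settings": {"S3_BUCKET_NAME": "my-bucket"}}

merged = {**defaults, **partial}
merged["output"] = {**defaults["output"], **merged["output"]}
merged["settings"] = {**defaults["settings"], **merged["settings"]}

print(merged["output"])    # {'format': 'png', 'filename': None, 'format_kwargs': {}}
print(merged["settings"])  # {'S3_BUCKET_NAME': 'my-bucket'}
```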

def handler(event, context, get_deps=True, ret_pipeline=False):
"""Lambda function handler

def get_trigger(event):
"""
Helper method to determine the trigger for the lambda invocation

Parameters
----------
event : TYPE
Description
context : TYPE
Description
get_deps : bool, optional
Description
ret_pipeline : bool, optional
Description
event : dict
Event dict from AWS. See [TODO: add link reference]

Returns
-------
TYPE
Description
str
One of "S3", "eval", or "APIGateway"
"""

print(event)
if "Records" in event and event["Records"][0]["eventSource"] == "aws:s3":
return "S3"
elif "queryStringParameters" in event:
return "APIGateway"
else:
return "eval"

# Add /tmp/ path to handle python path for dependencies
sys.path.append("/tmp/")

# get S3 client - note this will have the same access that the current "function role" does
s3 = boto3.client("s3")
def parse_event(trigger, event):
"""Parse pipeline, settings, and output details from event depending on trigger

Parameters
----------
trigger : str
One of "S3", "eval", or "APIGateway"
event : dict
Event dict from AWS. See [TODO: add link reference]
"""

args = []
kwargs = {}
if trigger == "eval":
print("Triggered by Invoke")

# This event was triggered by S3, so let's see if the requested output is already cached.
if is_s3_trigger(event):
# event is the pipeline, provide consistent pipeline defaults
pipeline = default_pipeline(event)

# We always have to look to the bucket that triggered the event for the input
bucket = event["Records"][0]["s3"]["bucket"]["name"]
return pipeline

file_key = urllib.unquote_plus(event["Records"][0]["s3"]["object"]["key"])
_json = ""
elif trigger == "S3":
print("Triggered from S3")

# get boto s3 client
s3 = boto3.client("s3")

# We always have to look to the bucket that triggered the event for the input
triggered_bucket = event["Records"][0]["s3"]["bucket"]["name"]

# get the pipeline object and read
pipeline_obj = s3.get_object(Bucket=bucket, Key=file_key)
pipeline = json.loads(pipeline_obj["Body"].read().decode("utf-8"))
file_key = urllib.unquote_plus(event["Records"][0]["s3"]["object"]["key"])
pipeline_obj = s3.get_object(Bucket=triggered_bucket, Key=file_key)
pipeline = json.loads(pipeline_obj["Body"].read().decode("utf-8"))

# We can return if there is a valid cached output to save compute time.
settings_trigger = pipeline["settings"]
# provide consistent pipeline defaults
pipeline = default_pipeline(pipeline)

# check for cached output
cached = False
output_filename = file_key.replace(".json", "." + pipeline["output"]["format"]).replace(
settings_trigger["FUNCTION_S3_INPUT"], settings_trigger["FUNCTION_S3_OUTPUT"]
# create output filename
pipeline["output"]["filename"] = file_key.replace(".json", "." + pipeline["output"]["format"]).replace(
pipeline["settings"]["FUNCTION_S3_INPUT"], pipeline["settings"]["FUNCTION_S3_OUTPUT"]
)

try:
s3.head_object(Bucket=bucket, Key=output_filename)
if not pipeline["settings"]["FUNCTION_FORCE_COMPUTE"]:

# get configured s3 bucket to check for cache
bucket = pipeline["settings"]["S3_BUCKET_NAME"]

# We can return if there is a valid cached output to save compute time.
try:
s3.head_object(Bucket=bucket, Key=pipeline["output"]["filename"])
return None

# throws ClientError if no file is found
except botocore.exceptions.ClientError:
pass

# return pipeline definition
return pipeline

elif trigger == "APIGateway":
print("Triggered from API Gateway")

pipeline = default_pipeline()
pipeline["url"] = event["queryStringParameters"]
if isinstance(pipeline["url"], string_types):
pipeline["url"] = urllib.parse_qs(urllib.urlparse(pipeline["url"]).query)
pipeline["params"] = pipeline["url"]

# look for specific parameter definitions in query parameters
for param in pipeline["params"]:

# handle SETTINGS in query parameters
if param == "settings":
# Try loading this settings string into a dict to merge with default settings
try:
api_settings = json.loads(pipeline["params"][param])
# If we get here, the api settings were loaded
pipeline["settings"] = {**pipeline["settings"], **api_settings}
except Exception as e:
print("Got an exception when attempting to load api settings: ", e)
print(pipeline)


# handle OUTPUT in query parameters
elif param == "output":
pipeline["output"] = pipeline["params"][param]

# handle FORMAT in query parameters
elif param == "format":
pipeline["output"][param] = pipeline["params"][param].split("/")[-1]
# handle image returns
if pipeline["output"][param] in ["png", "jpg", "jpeg"]:
pipeline["output"]["format_kwargs"]["return_base64"] = True

return pipeline

else:
raise Exception("Unsupported trigger")
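
In the API Gateway branch, event["queryStringParameters"] is normally already a dict, but when it arrives as a raw URL or query string the urlparse/parse_qs fallback above is used; note that parse_qs returns a list of values per key. A small illustration with a made-up URL:

```python
import urllib.parse as urllib

# Hypothetical API Gateway URL (the endpoint and parameters are placeholders).
raw = "https://example.execute-api.amazonaws.com/prod?SERVICE=WMS&FORMAT=image%2Fpng"
params = urllib.parse_qs(urllib.urlparse(raw).query)
print(params)  # {'SERVICE': ['WMS'], 'FORMAT': ['image/png']}
```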

# Object exists, so we don't have to recompute
# TODO: the "force_compute" parameter will never work as is written in aws.py
if not pipeline.get("force_compute", False):
cached = True

except botocore.exceptions.ClientError:
pass
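
Both the old block above and the new S3 branch in parse_event rely on the same caching idiom: s3.head_object raises botocore.exceptions.ClientError when the key does not exist, so the handler only recomputes in that case. A standalone sketch of the pattern (bucket and key names are placeholders):

```python
import boto3
import botocore

def output_is_cached(bucket, key):
    """Return True if an output object already exists at s3://bucket/key."""
    s3 = boto3.client("s3")
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except botocore.exceptions.ClientError:
        # head_object raises ClientError (e.g. 404) when the key is absent
        return False

# e.g. skip evaluation entirely when a previous run already wrote the output:
# if output_is_cached("my-podpac-bucket", "output/pipeline.netcdf"):
#     return None
```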
def handler(event, context):
"""Lambda function handler

Parameters
----------
event : dict
Event dict from the AWS trigger (S3, API Gateway, or direct invocation)
context : object
AWS Lambda context object
"""
print(event)

# Add /tmp/ path to handle python path for dependencies
sys.path.append("/tmp/")

if cached:
return
# handle triggers
trigger = get_trigger(event)

# TODO: handle "invoke" and "APIGateway" triggers explicitly
else:
print("Not triggered by S3")
# parse event
pipeline = parse_event(trigger, event)

url = event["queryStringParameters"]

Review comment (Contributor): It looks like it was in the porting of this block that we lost the isinstance check.

if isinstance(url, string_types):
url = urllib.parse_qs(urllib.urlparse(url).query)
# bail if we can't parse
if pipeline is None:
return

# Capitalize the keywords for consistency
settings_trigger = {}
for k in url:
if k.upper() == "SETTINGS":
settings_trigger = url[k]
bucket = settings_trigger["S3_BUCKET_NAME"]
# -----
# TODO: remove when layers is configured
# get configured bucket to download dependencies
bucket = pipeline["settings"]["S3_BUCKET_NAME"]

# get dependencies path
if "FUNCTION_DEPENDENCIES_KEY" in settings_trigger:
dependencies = settings_trigger["FUNCTION_DEPENDENCIES_KEY"]
if "FUNCTION_DEPENDENCIES_KEY" in pipeline["settings"]:
dependencies = pipeline["settings"]["FUNCTION_DEPENDENCIES_KEY"]
else:
# TODO: this could be a problem, since we can't import podpac settings yet
# which means the input event might have to include the version or
# "FUNCTION_DEPENDENCIES_KEY".
dependencies = "podpac_deps_{}.zip".format(
settings_trigger["PODPAC_VERSION"]
pipeline["settings"]["PODPAC_VERSION"]
) # this should be equivalent to version.semver()

# Download dependencies from specific bucket/object
if get_deps:
s3.download_file(bucket, dependencies, "/tmp/" + dependencies)
subprocess.call(["unzip", "/tmp/" + dependencies, "-d", "/tmp"])
sys.path.append("/tmp/")
subprocess.call(["rm", "/tmp/" + dependencies])
s3 = boto3.client("s3")
s3.download_file(bucket, dependencies, "/tmp/" + dependencies)
subprocess.call(["unzip", "/tmp/" + dependencies, "-d", "/tmp"])
sys.path.append("/tmp/")
subprocess.call(["rm", "/tmp/" + dependencies])
# -----
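
Until the Lambda layer mentioned in the TODO is configured, the bootstrap above boils down to: download the podpac_deps zip from S3, extract it into /tmp, and put /tmp on sys.path. A sketch of the same steps as a helper, using Python's zipfile module in place of the unzip subprocess call (function name and defaults are illustrative):

```python
import os
import sys
import zipfile

import boto3

def bootstrap_dependencies(bucket, key, target="/tmp"):
    """Download a dependency zip from S3, extract it, and add it to sys.path."""
    archive = os.path.join(target, os.path.basename(key))
    boto3.client("s3").download_file(bucket, key, archive)
    with zipfile.ZipFile(archive) as zf:
        zf.extractall(target)
    os.remove(archive)  # free the limited /tmp space
    if target not in sys.path:
        sys.path.append(target)
```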

# Load PODPAC

@@ -153,38 +231,40 @@ def handler(event, context, get_deps=True, ret_pipeline=False):
import podpac.datalib

# update podpac settings with inputs from the trigger
for key in settings_trigger:
settings[key] = settings_trigger[key]
settings = {**settings, **pipeline["settings"]}

if is_s3_trigger(event):
# build the Node and Coordinates
if trigger in ("eval", "S3"):
node = Node.from_definition(pipeline["pipeline"])
coords = Coordinates.from_json(json.dumps(pipeline["coordinates"], indent=4, cls=JSONEncoder))
fmt = pipeline["output"]["format"]
kwargs = pipeline["output"].copy()
kwargs.pop("format")

# TODO: handle "invoke" and "APIGateway" triggers explicitly
else:
coords = Coordinates.from_url(event["queryStringParameters"])
node = Node.from_url(event["queryStringParameters"])
fmt = _get_query_params_from_url(event["queryStringParameters"])["FORMAT"].split("/")[-1]
if fmt in ["png", "jpg", "jpeg"]:
kwargs["return_base64"] = True
# TODO: handle API Gateway better - is this always going to be WCS?
elif trigger == "APIGateway":
node = Node.from_url(pipeline["url"])
coords = Coordinates.from_url(pipeline["url"])

# make sure pipeline is allowed to be run
if "PODPAC_RESTRICT_PIPELINES" in os.environ:
whitelist = json.loads(os.environ["PODPAC_RESTRICT_PIPELINES"])
if node.hash not in whitelist:
raise ValueError("Node hash is not in the whitelist for this function")
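
The whitelist is read from an environment variable containing a JSON list of allowed node hashes; for illustration (the hash value is a placeholder):

```python
import json
import os

# e.g. configured on the Lambda function:
#   PODPAC_RESTRICT_PIPELINES='["3f7b9c2d..."]'
whitelist = json.loads(os.environ.get("PODPAC_RESTRICT_PIPELINES", "[]"))
```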

# run analysis
output = node.eval(coords)

# FOR DEBUGGING
if ret_pipeline:
return node
# convert to output format
body = output.to_format(pipeline["output"]["format"], **pipeline["output"]["format_kwargs"])

body = output.to_format(fmt, *args, **kwargs)
# Response
if trigger == "eval":
return body

# output_filename only exists if this was an S3 triggered event.
if is_s3_trigger(event):
s3.put_object(Bucket=bucket, Key=output_filename, Body=body)
elif trigger == "S3":
s3.put_object(Bucket=settings["S3_BUCKET_NAME"], Key=pipeline["output"]["filename"], Body=body)

# TODO: handle "invoke" and "APIGateway" triggers explicitly
else:
elif trigger == "APIGateway":

# TODO: can we handle the deserialization better?
try:
json.dumps(body)
except Exception as e: