diff --git a/dist/aws/handler.py b/dist/aws/handler.py
index ebeb931eb..45f763837 100644
--- a/dist/aws/handler.py
+++ b/dist/aws/handler.py
@@ -14,6 +14,7 @@
 import subprocess
 import sys
 import urllib.parse as urllib
+from six import string_types
 from collections import OrderedDict
 
 import _pickle as cPickle
@@ -21,49 +22,59 @@
 import boto3
 import botocore
 
-# Add /tmp/ path to handle python path for dependencies
-sys.path.append("/tmp/")
-
-# TODO: move to s3fs for simpler access
-# not sure if this is possible?
-# continue to assume that lambda handler has access to all s3 resources
-s3 = boto3.client("s3")
+def is_s3_trigger(event):
+    """
+    Helper method to determine if the given event was triggered by an S3 event.
+    """
+    return "Records" in event and event["Records"][0]["eventSource"] == "aws:s3"
 
-# TODO: handle settings from within the API gateway call or dynamically from bucket
-# this is currently bundled with lambda function
-settings_json = {} # default to empty
-try:
-    # currently loaded into lambda function
-    with open("settings.json", "r") as f:
-        settings_json = json.load(f)
+def update_podpac_settings(old_settings_json, new_settings_json):
+    """
+    Helper method to merge two local settings json/dict objects, then update the `PodpacSettings`.
 
-    print (settings_json) # TODO: remove
-except:
-    pass
+    In practice, `old_settings_json` is `podpac.settings`, so the merged values are written back into it.
+    """
+    updated_settings = {**old_settings_json, **new_settings_json}
+    for key in updated_settings:
+        old_settings_json[key] = updated_settings[key]
 
+def check_for_cached_output(s3, input_file_key, pipeline, settings_json, bucket):
+    """
+    Return (True, output_filename) if the requested output is already computed and force_compute is false.
+    """
+    output_filename = input_file_key.replace(".json", "." + pipeline["output"]["format"])
+    output_filename = output_filename.replace(settings_json["S3_INPUT_FOLDER"], settings_json["S3_OUTPUT_FOLDER"])
+    try:
+        s3.head_object(Bucket=bucket, Key=output_filename)
+        # Object exists, so we don't have to recompute
+        if not pipeline.get("force_compute", False):
+            return True, output_filename
+    except botocore.exceptions.ClientError as e:
+        if e.response["Error"]["Code"] == "404":
+            # It does not exist, so we should proceed
+            return False, output_filename
+        # Something else has gone wrong... not handling this case.
+        return False, output_filename
+    return False, output_filename
 
 def handler(event, context, get_deps=True, ret_pipeline=False):
-    # TODO: remove
-    print (event)
+    # Add /tmp/ path to handle python path for dependencies
+    sys.path.append("/tmp/")
 
-    # get bucket path
-    # this should be defined by S3 trigger or API gateway trigger
-    bucket = settings_json["S3_BUCKET_NAME"]
+    # TODO: move to s3fs for simpler access
+    # not sure if this is possible?
+    # continue to assume that lambda handler has access to all s3 resources
+    s3 = boto3.client("s3")
 
-    # get dependencies path
-    if "FUNCTION_DEPENDENCIES_KEY" in settings_json:
-        dependencies = settings_json["FUNCTION_DEPENDENCIES_KEY"]
-    else:
-        dependencies = "podpac_deps_{}.zip".format(
-            settings_json["PODPAC_VERSION"]
-        ) # this should be equivalent to version.semver()
+    args = []
+    kwargs = {}
 
-    if "Records" in event and event["Records"][0]["eventSource"] == "aws:s3":
-        #
+    # This event was triggered by S3, so let's see if the requested output is already cached.
+    if is_s3_trigger(event):
 
-        # TODO: get the name of the calling bucket from event for s3 triggers?
-        # bucket = event['Records'][0]['s3']['bucket']['name']
+        # We always have to look to the bucket that triggered the event for the input
+        bucket = event['Records'][0]['s3']['bucket']['name']
         file_key = urllib.unquote_plus(event["Records"][0]["s3"]["object"]["key"])
         _json = ""
@@ -78,15 +89,38 @@ def handler(event, context, get_deps=True, ret_pipeline=False):
                 _json += "\n"
             _json += r.decode()
         pipeline = json.loads(_json, object_pairs_hook=OrderedDict)
+
+        # We can return if there is a valid cached output to save compute time.
+        settings_json = pipeline["settings"]
+        cached, output_filename = check_for_cached_output(s3, file_key, pipeline, settings_json, bucket)
+        if cached:
+            return
     else:
-        # elif ('pathParameters' in event and event['pathParameters'] is not None and 'proxy' in event['pathParameters']) or ('authorizationToken' in event and event['authorizationToken'] == "incoming-client-token"):
-        # TODO: Need to get the pipeline from the event...
-        print ("DSullivan: we have an API Gateway event")
-        pipeline = None
+        print("Received an API Gateway event. Fetching dependencies in order to proceed.")
+        url = event["queryStringParameters"]
+        if isinstance(url, string_types):
+            url = urllib.parse_qs(urllib.urlparse(url).query)
+
+        # Compare keys case-insensitively when looking for the SETTINGS parameter
+        settings_json = {}
+        for k in url:
+            if k.upper() == "SETTINGS":
+                settings_json = url[k]
+        bucket = settings_json["S3_BUCKET_NAME"]
+
+    # get dependencies path
+    if "FUNCTION_DEPENDENCIES_KEY" in settings_json:
+        dependencies = settings_json["FUNCTION_DEPENDENCIES_KEY"]
+    else:
+        # TODO: this could be a problem, since we can't import podpac settings yet,
+        # which means the input event might have to include the version or
+        # "FUNCTION_DEPENDENCIES_KEY".
+        dependencies = "podpac_deps_{}.zip".format(
+            settings_json["PODPAC_VERSION"]
+        ) # this should be equivalent to version.semver()
 
     # Download dependencies from specific bucket/object
     if get_deps:
-        s3.download_file(bucket, "podpac/" + dependencies, "/tmp/" + dependencies)
+        s3.download_file(bucket, dependencies, "/tmp/" + dependencies)
         subprocess.call(["unzip", "/tmp/" + dependencies, "-d", "/tmp"])
         sys.path.append("/tmp/")
         subprocess.call(["rm", "/tmp/" + dependencies])
@@ -101,38 +135,18 @@ def handler(event, context, get_deps=True, ret_pipeline=False):
     from podpac.core.utils import JSONEncoder, _get_query_params_from_url
     import podpac.datalib
 
-    # check if file exists
-    if pipeline is not None:
-        filename = file_key.replace(".json", "." + pipeline["output"]["format"])
-        filename = filename.replace(settings["S3_INPUT_FOLDER"], settings["S3_OUTPUT_FOLDER"])
-        try:
-            s3.head_object(Bucket=bucket, Key=filename)
-            # Object exists, so we don't have to recompute
-            if not pipeline.get("force_compute", False):
-                return
-        except botocore.exceptions.ClientError as e:
-            if e.response["Error"]["Code"] == "404":
-                # It does not exist, so we should proceed
-                pass
-            else:
-                # Something else has gone wrong... not handling this case.
-                pass
+    try:
+        update_podpac_settings(settings, settings_json)
+    except Exception:
+        print("The settings could not be updated.")
 
-    args = []
-    kwargs = {}
-    # if from S3 trigger
-    if pipeline is not None:
-        # if 'Records' in event and event['Records'][0]['eventSource'] == 'aws:s3':
+    if is_s3_trigger(event):
         node = Node.from_definition(pipeline["pipeline"])
         coords = Coordinates.from_json(json.dumps(pipeline["coordinates"], indent=4, cls=JSONEncoder))
         fmt = pipeline["output"]["format"]
         kwargs = pipeline["output"].copy()
         kwargs.pop("format")
-
-    # else from api gateway and it's a WMS/WCS request
     else:
-        # elif ('pathParameters' in event and event['pathParameters'] is not None and 'proxy' in event['pathParameters']) or ('authorizationToken' in event and event['authorizationToken'] == "incoming-client-token"):
-        print (_get_query_params_from_url(event["queryStringParameters"]))
         coords = Coordinates.from_url(event["queryStringParameters"])
         node = Node.from_url(event["queryStringParameters"])
         fmt = _get_query_params_from_url(event["queryStringParameters"])["FORMAT"].split("/")[-1]
@@ -144,8 +158,10 @@ def handler(event, context, get_deps=True, ret_pipeline=False):
         return node
 
     body = output.to_format(fmt, *args, **kwargs)
-    if pipeline is not None:
-        s3.put_object(Bucket=bucket, Key=filename, Body=body)
+
+    # Only S3-triggered events write their output back to the bucket.
+    if is_s3_trigger(event):
+        s3.put_object(Bucket=bucket, Key=output_filename, Body=body)
     else:
         try:
             json.dumps(body)
diff --git a/doc/source/developer/aws.md b/doc/source/developer/aws.md
index 1f1dd725d..aabd06881 100644
--- a/doc/source/developer/aws.md
+++ b/doc/source/developer/aws.md
@@ -1,5 +1,7 @@
 # AWS Lambda #
 
+TODO: This needs to be rewritten.
+
 Podpac includes a package to create an Amazon Web Services Lambda function to execute nodes in a server-less environment. This package can be altered to handle events according to the developer's use case.
 
 ## AWS Architecture ##
diff --git a/podpac/core/managers/aws.py b/podpac/core/managers/aws.py
index dc47237de..21adedfba 100644
--- a/podpac/core/managers/aws.py
+++ b/podpac/core/managers/aws.py
@@ -1168,6 +1168,8 @@ def get_bucket(session, bucket_name):
 
     # init empty object
     bucket = {"name": bucket_name}
 
+    # TODO: this is usually None, even though the bucket has a region. It could either be a bug
+    # in getting the region/LocationConstraint, or just misleading.
     # get location constraint. this will be None for no location constraint
     bucket["region"] = s3.get_bucket_location(Bucket=bucket_name)["LocationConstraint"]
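
Note on the `is_s3_trigger` helper added above: S3 puts a `Records` list in the events it sends to Lambda, while API Gateway events carry `queryStringParameters` instead, so the check only needs to look at the first record's `eventSource`. A minimal sketch of the two shapes (both payloads below are trimmed, hypothetical examples, not complete Lambda events):

    def is_s3_trigger(event):
        return "Records" in event and event["Records"][0]["eventSource"] == "aws:s3"

    s3_event = {"Records": [{"eventSource": "aws:s3",
                             "s3": {"bucket": {"name": "my-bucket"},
                                    "object": {"key": "json/pipeline.json"}}}]}
    api_event = {"queryStringParameters": {"SERVICE": "WMS", "FORMAT": "image/png"}}

    assert is_s3_trigger(s3_event)
    assert not is_s3_trigger(api_event)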
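In the API Gateway branch, `event["queryStringParameters"]` is usually already a dict, but the handler also tolerates a raw URL string by running it through `urllib.parse`. Worth remembering that `parse_qs` maps each key to a *list* of values, so anything pulled out of the parsed dict (such as the `SETTINGS` value) may need unwrapping before it can be indexed like a dict. A quick sketch with a made-up URL:

    import urllib.parse as urllib  # aliased the same way as in handler.py

    url = "https://abc123.execute-api.us-east-1.amazonaws.com/prod?SERVICE=WMS&FORMAT=image/png"
    params = urllib.parse_qs(urllib.urlparse(url).query)
    print(params)  # {'SERVICE': ['WMS'], 'FORMAT': ['image/png']} -- note the list values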
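`check_for_cached_output` derives the output key from the input key by plain string substitution, then issues a `head_object` call to see whether that key already exists. A worked example of the key derivation (the folder names here are hypothetical):

    settings_json = {"S3_INPUT_FOLDER": "json", "S3_OUTPUT_FOLDER": "output"}
    input_file_key = "json/my_pipeline.json"
    fmt = "png"  # pipeline["output"]["format"]

    output_filename = input_file_key.replace(".json", "." + fmt)
    output_filename = output_filename.replace(settings_json["S3_INPUT_FOLDER"], settings_json["S3_OUTPUT_FOLDER"])
    print(output_filename)  # output/my_pipeline.png

Because `str.replace` substitutes every occurrence, an `S3_INPUT_FOLDER` name that also appears inside the object's file name would be rewritten as well.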
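On the `get_bucket` TODO: `get_bucket_location` returns a null `LocationConstraint` for buckets in us-east-1, which is the most likely reason the region often comes back as `None` even though the bucket has a region. A common workaround (a sketch, not part of this diff; the bucket name is hypothetical):

    import boto3

    s3 = boto3.client("s3")
    location = s3.get_bucket_location(Bucket="my-bucket")["LocationConstraint"]
    region = location or "us-east-1"  # S3 reports a null constraint for us-east-1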