From 758f0e961f20944528490fca4fea8e880a2818a3 Mon Sep 17 00:00:00 2001 From: Marc Shapiro Date: Mon, 11 Nov 2019 15:21:02 -0500 Subject: [PATCH 01/12] ENH: add eval_invoke template --- podpac/core/managers/aws.py | 55 +++++++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 11 deletions(-) diff --git a/podpac/core/managers/aws.py b/podpac/core/managers/aws.py index 323456328..203d9a095 100644 --- a/podpac/core/managers/aws.py +++ b/podpac/core/managers/aws.py @@ -321,9 +321,9 @@ def _function_api_tags_default(self): return self.function_tags # podpac node parameters - source = tl.Instance(Node, help="Node to evaluate in a Lambda function.", allow_none=True).tag(attr=True) - source_output_format = tl.Unicode(default_value="pkl", help="Output format.") - source_output_name = tl.Unicode(help="Image output name.") + source = tl.Instance(Node, allow_none=True).tag(attr=True) + source_output_format = tl.Unicode(default_value="netcdf") + source_output_name = tl.Unicode() attrs = tl.Dict() # TODO: are we still using this? download_result = tl.Bool(True).tag(attr=True) @@ -331,7 +331,6 @@ def _function_api_tags_default(self): def _source_output_name_default(self): return self.source.__class__.__name__ - # TODO: are this still being used? @property def pipeline(self): """ @@ -1134,6 +1133,36 @@ def _set_api(self, api): # store a copy of the whole response from AWS self._api = api + def _eval_invoke(self, coordinates, output=None): + """eval node through invoke trigger""" + _log.debug("Evaluating pipeline via invoke") + + # add coordinates to the pipeline + pipeline = self.pipeline + pipeline["coordinates"] = json.loads(coordinates.json) + pipeline["settings"] = settings.copy() + pipeline["settings"][ + "FUNCTION_DEPENDENCIES_KEY" + ] = self.function_s3_dependencies_key # overwrite in case this is specified explicitly by class + + # create lambda client + awslambda = self.session.client("lambda") + + # invoke + payload = bytes(json.dumps(pipeline, indent=4, cls=JSONEncoder).encode("UTF-8")) + response = awslambda.invoke( + FunctionName=self.function_name, + LogType="Tail", # include the execution log in the response. 
+ Payload=payload, + ) + print(response) + + # After waiting, load the pickle file like this: + _log.debug("Received response from lambda function") + payload = response["Payload"].read() + self._output = UnitsDataArray(payload) + return self._output + def _eval_s3(self, coordinates, output=None): """Evaluate node through s3 trigger""" @@ -1142,15 +1171,19 @@ def _eval_s3(self, coordinates, output=None): input_folder = "{}{}".format(self.function_s3_input, "/" if not self.function_s3_input.endswith("/") else "") output_folder = "{}{}".format(self.function_s3_output, "/" if not self.function_s3_output.endswith("/") else "") - # add coordinates to the pipeline + # add coordinates and settings to the pipeline pipeline = self.pipeline pipeline["coordinates"] = json.loads(coordinates.json) - pipeline["settings"] = { - "FUNCTION_S3_INPUT": input_folder, - "FUNCTION_S3_OUTPUT": output_folder, - "FUNCTION_DEPENDENCIES_KEY": self.function_s3_dependencies_key, - "UNSAFE_EVAL_HASH": settings["UNSAFE_EVAL_HASH"], - } + pipeline["settings"] = settings.copy() + pipeline["settings"][ + "FUNCTION_DEPENDENCIES_KEY" + ] = self.function_s3_dependencies_key # overwrite in case this is specified explicitly by class + pipeline["settings"][ + "FUNCTION_S3_INPUT" + ] = input_folder # overwrite in case this is specified explicitly by class + pipeline["settings"][ + "FUNCTION_S3_OUTPUT" + ] = output_folder # overwrite in case this is specified explicitly by class # filename filename = "{folder}{output}_{source}_{coordinates}.{suffix}".format( From bf6462cc0078a1b56e000715d74544e40ef07005 Mon Sep 17 00:00:00 2001 From: Marc Shapiro Date: Mon, 11 Nov 2019 16:51:42 -0500 Subject: [PATCH 02/12] CHECKPOINT: improving handler and aws manager to be more consistent and easy to update --- dist/aws/handler.py | 227 +++++++++++++++++++++--------------- podpac/core/managers/aws.py | 47 +++++--- 2 files changed, 162 insertions(+), 112 deletions(-) diff --git a/dist/aws/handler.py b/dist/aws/handler.py index cfafccfce..d083ed860 100644 --- a/dist/aws/handler.py +++ b/dist/aws/handler.py @@ -23,9 +23,9 @@ import botocore -def is_s3_trigger(event): +def get_trigger(event): """ - Helper method to determine if the given event was triggered by an S3 event + Helper method to determine the trigger for the lambda invocation Parameters ---------- @@ -34,111 +34,145 @@ def is_s3_trigger(event): Returns ------- - Bool - True if the event is an S3 trigger + str + One of "S3", "eval", or "APIGateway" """ - return "Records" in event and event["Records"][0]["eventSource"] == "aws:s3" + if "Records" in event and event["Records"][0]["eventSource"] == "aws:s3": + return "S3" + elif "queryStringParameters" in event: + return "APIGateway" + else: + return "eval" -def handler(event, context, get_deps=True, ret_pipeline=False): - """Lambda function handler + +def parse_event(trigger, event): + """Parse pipeline, settings, and output details from event depending on trigger Parameters ---------- - event : TYPE - Description - context : TYPE - Description - get_deps : bool, optional - Description - ret_pipeline : bool, optional - Description - - Returns - ------- - TYPE - Description + trigger : str + One of "S3", "eval", or "APIGateway" + event : dict + Event dict from AWS. 
See [TODO: add link reference] """ - print(event) - - # Add /tmp/ path to handle python path for dependencies - sys.path.append("/tmp/") + if trigger == "eval": + print("Triggered by Invoke") - # get S3 client - note this will have the same access that the current "function role" does - s3 = boto3.client("s3") + # TODO: implement + return None + elif trigger == "S3": + print("Triggered from S3") - args = [] - kwargs = {} - - # This event was triggered by S3, so let's see if the requested output is already cached. - if is_s3_trigger(event): + # get boto s3 client + s3 = boto3.client("s3") # We always have to look to the bucket that triggered the event for the input - bucket = event["Records"][0]["s3"]["bucket"]["name"] - - file_key = urllib.unquote_plus(event["Records"][0]["s3"]["object"]["key"]) - _json = "" + triggered_bucket = event["Records"][0]["s3"]["bucket"]["name"] # get the pipeline object and read - pipeline_obj = s3.get_object(Bucket=bucket, Key=file_key) - pipeline = json.loads(pipeline_obj["Body"].read().decode("utf-8")) - - # We can return if there is a valid cached output to save compute time. - settings_trigger = pipeline["settings"] + file_key = urllib.unquote_plus(event["Records"][0]["s3"]["object"]["key"]) + pipline_obj = s3.get_object(Bucket=triggered_bucket, Key=file_key) + pipeline = json.loads(pipline_obj["Body"].read().decode("utf-8")) - # check for cached output - cached = False - output_filename = file_key.replace(".json", "." + pipeline["output"]["format"]).replace( - settings_trigger["FUNCTION_S3_INPUT"], settings_trigger["FUNCTION_S3_OUTPUT"] + # create output filename + pipeline["output_filename"] = file_key.replace(".json", "." + pipeline["output"]["format"]).replace( + pipeline["settings"]["FUNCTION_S3_INPUT"], pipeline["settings"]["FUNCTION_S3_OUTPUT"] ) - try: - s3.head_object(Bucket=bucket, Key=output_filename) + if not pipeline["settings"]["force_compute"]: + # get configured s3 bucket to check for cache + bucket = pipeline["settings"]["S3_BUCKET_NAME"] - # Object exists, so we don't have to recompute - # TODO: the "force_compute" parameter will never work as is written in aws.py - if not pipeline.get("force_compute", False): - cached = True + # We can return if there is a valid cached output to save compute time. + try: + s3.head_object(Bucket=bucket, Key=pipeline["output_filename"]) + return None - except botocore.exceptions.ClientError: - pass + # throws ClientError if no file is found + except botocore.exceptions.ClientError: + pass - if cached: - return + # return pipeline definition + return pipeline - # TODO: handle "invoke" and "APIGateway" triggers explicitly - else: - print("Not triggered by S3") + elif trigger == "APIGateway": + print("Triggered from API Gateway") url = event["queryStringParameters"] if isinstance(url, string_types): url = urllib.parse_qs(urllib.urlparse(url).query) # Capitalize the keywords for consistency - settings_trigger = {} - for k in url: - if k.upper() == "SETTINGS": - settings_trigger = url[k] - bucket = settings_trigger["S3_BUCKET_NAME"] + pipeline = {} + for param in url: + + if param.upper() == "PIPELINE": + pipeline["pipeline"] = url[param] + + # TODO: do we still need this? 
will overwrite pipeline above + if param.upper() == "SETTINGS": + pipeline["settings"] = url[param] + + if param.upper() == "OUTPUT": + pipeline["output"] = url[param] + + return pipeline + + else: + raise Exception("Unsupported trigger") + + +def handler(event, context, get_deps=True, ret_pipeline=False): + """Lambda function handler + + Parameters + ---------- + event : dict + Description + context : TYPE + Description + get_deps : bool, optional + Description + ret_pipeline : bool, optional + Description + """ + print(event) + + # Add /tmp/ path to handle python path for dependencies + sys.path.append("/tmp/") + + # handle triggers + trigger = get_trigger(event) + + # parse event + pipeline = parse_event(trigger, event) + + # bail if we can't parse + if pipeline is None: + return + + # ----- + # TODO: remove when layers is configured + # get configured bucket to download dependencies + bucket = pipeline["settings"]["S3_BUCKET_NAME"] # get dependencies path - if "FUNCTION_DEPENDENCIES_KEY" in settings_trigger: - dependencies = settings_trigger["FUNCTION_DEPENDENCIES_KEY"] + if "FUNCTION_DEPENDENCIES_KEY" in pipeline["settings"]: + dependencies = pipeline["settings"]["FUNCTION_DEPENDENCIES_KEY"] else: - # TODO: this could be a problem, since we can't import podpac settings yet - # which means the input event might have to include the version or - # "FUNCTION_DEPENDENCIES_KEY". dependencies = "podpac_deps_{}.zip".format( - settings_trigger["PODPAC_VERSION"] + pipeline["settings"]["PODPAC_VERSION"] ) # this should be equivalent to version.semver() # Download dependencies from specific bucket/object - if get_deps: - s3.download_file(bucket, dependencies, "/tmp/" + dependencies) - subprocess.call(["unzip", "/tmp/" + dependencies, "-d", "/tmp"]) - sys.path.append("/tmp/") - subprocess.call(["rm", "/tmp/" + dependencies]) + s3 = boto3.client("s3") + s3.download_file(bucket, dependencies, "/tmp/" + dependencies) + subprocess.call(["unzip", "/tmp/" + dependencies, "-d", "/tmp"]) + sys.path.append("/tmp/") + subprocess.call(["rm", "/tmp/" + dependencies]) + # ----- # Load PODPAC @@ -153,38 +187,45 @@ def handler(event, context, get_deps=True, ret_pipeline=False): import podpac.datalib # update podpac settings with inputs from the trigger - for key in settings_trigger: - settings[key] = settings_trigger[key] + for key in pipeline["settings"]: + settings[key] = pipeline["settings"][key] + + # TODO: load this into pipeline["output"] + kwargs = {} + if trigger == "eval": + pass - if is_s3_trigger(event): + elif trigger == "S3": node = Node.from_definition(pipeline["pipeline"]) coords = Coordinates.from_json(json.dumps(pipeline["coordinates"], indent=4, cls=JSONEncoder)) fmt = pipeline["output"]["format"] kwargs = pipeline["output"].copy() - kwargs.pop("format") + kwargs.pop("format") # get rid of format - # TODO: handle "invoke" and "APIGateway" triggers explicitly - else: - coords = Coordinates.from_url(event["queryStringParameters"]) + # TODO: handle API Gateway better - is this always going to be WCS? 
+ elif trigger == "APIGateway": + # TODO: handle this in the parser above - not sure what the spec should be here node = Node.from_url(event["queryStringParameters"]) + coords = Coordinates.from_url(event["queryStringParameters"]) fmt = _get_query_params_from_url(event["queryStringParameters"])["FORMAT"].split("/")[-1] if fmt in ["png", "jpg", "jpeg"]: kwargs["return_base64"] = True + # run analysis and covert to output format output = node.eval(coords) - - # FOR DEBUGGING - if ret_pipeline: - return node - - body = output.to_format(fmt, *args, **kwargs) - - # output_filename only exists if this was an S3 triggered event. - if is_s3_trigger(event): - s3.put_object(Bucket=bucket, Key=output_filename, Body=body) - - # TODO: handle "invoke" and "APIGateway" triggers explicitly - else: + body = output.to_format(fmt, **kwargs) + + # ######## + # Response + # ######## + if trigger == "eval": + pass + # return {"statusCode": 200, "headers": {"Content-Type": "image/png"}, "isBase64Encoded": True, "body": body} + elif trigger == "S3": + s3.put_object(Bucket=settings["S3_BUCKET_NAME"], Key=pipeline["output_filename"], Body=body) + + elif trigger == "APIGateway": + # TODO: can we handle the deserialization better? try: json.dumps(body) except Exception as e: diff --git a/podpac/core/managers/aws.py b/podpac/core/managers/aws.py index 203d9a095..489da2fb1 100644 --- a/podpac/core/managers/aws.py +++ b/podpac/core/managers/aws.py @@ -14,6 +14,7 @@ import traitlets as tl import numpy as np +from podpac.core.units import open_dataarray from podpac.core.settings import settings from podpac.core.node import COMMON_NODE_DOC, Node from podpac.core.utils import common_doc, JSONEncoder @@ -326,6 +327,7 @@ def _function_api_tags_default(self): source_output_name = tl.Unicode() attrs = tl.Dict() # TODO: are we still using this? 
download_result = tl.Bool(True).tag(attr=True) + force_compute = tl.Bool(default_value=False).tag(attr=True) @tl.default("source_output_name") def _source_output_name_default(self): @@ -352,10 +354,14 @@ def eval(self, coordinates, output=None): if self.source is None: raise ValueError("'source' node must be defined to eval") - if self.function_eval_trigger == "eval" or self.function_eval_trigger == "S3": - return self._eval_s3(coordinates, output=None) - else: + if self.function_eval_trigger == "eval": + return self._eval_invoke(coordinates, output) + elif self.function_eval_trigger == "S3": + return self._eval_s3(coordinates, output) + elif self.function_eval_trigger == "APIGateway": raise NotImplementedError("APIGateway trigger not yet implemented through eval") + else: + raise ValueError("Function trigger is not one of 'eval', 'S3', or 'APIGateway'") def build(self): """Build Lambda function and associated resources on AWS @@ -835,8 +841,7 @@ def create_bucket(self): copy_source = {"Bucket": self.function_source_bucket, "Key": self.function_source_dependencies_key} s3resource.meta.client.copy(copy_source, self.function_s3_bucket, self.function_s3_dependencies_key) - # TODO: remove eval from here once we implement "eval" through invoke - if "eval" in self.function_triggers or "S3" in self.function_triggers: + if "S3" in self.function_triggers: # add permission to invoke call lambda - this feels brittle due to source_arn statement_id = re.sub("[-_.]", "", self.function_s3_bucket) principle = "s3.amazonaws.com" @@ -1133,18 +1138,26 @@ def _set_api(self, api): # store a copy of the whole response from AWS self._api = api - def _eval_invoke(self, coordinates, output=None): - """eval node through invoke trigger""" - _log.debug("Evaluating pipeline via invoke") + def _create_eval_pipeline(self, coordinates): + """shorthand to create pipeline on eval""" # add coordinates to the pipeline - pipeline = self.pipeline + pipeline = self.pipeline # contains "pipeline" and "output" keys pipeline["coordinates"] = json.loads(coordinates.json) - pipeline["settings"] = settings.copy() + pipeline["settings"] = settings.copy() # TODO: we should not wholesale copy settings here !! 
pipeline["settings"][ "FUNCTION_DEPENDENCIES_KEY" ] = self.function_s3_dependencies_key # overwrite in case this is specified explicitly by class + return pipeline + + def _eval_invoke(self, coordinates, output=None): + """eval node through invoke trigger""" + _log.debug("Evaluating pipeline via invoke") + + # create eval pipeline + pipeline = self._create_eval_pipeline(coordinates) + # create lambda client awslambda = self.session.client("lambda") @@ -1160,7 +1173,7 @@ def _eval_invoke(self, coordinates, output=None): # After waiting, load the pickle file like this: _log.debug("Received response from lambda function") payload = response["Payload"].read() - self._output = UnitsDataArray(payload) + self._output = open_dataarray(payload) return self._output def _eval_s3(self, coordinates, output=None): @@ -1171,13 +1184,9 @@ def _eval_s3(self, coordinates, output=None): input_folder = "{}{}".format(self.function_s3_input, "/" if not self.function_s3_input.endswith("/") else "") output_folder = "{}{}".format(self.function_s3_output, "/" if not self.function_s3_output.endswith("/") else "") - # add coordinates and settings to the pipeline - pipeline = self.pipeline - pipeline["coordinates"] = json.loads(coordinates.json) - pipeline["settings"] = settings.copy() - pipeline["settings"][ - "FUNCTION_DEPENDENCIES_KEY" - ] = self.function_s3_dependencies_key # overwrite in case this is specified explicitly by class + # create eval pipeline + pipeline = self._create_eval_pipeline(coordinates) + pipeline["settings"]["force_compute"] = self.force_compute pipeline["settings"][ "FUNCTION_S3_INPUT" ] = input_folder # overwrite in case this is specified explicitly by class @@ -1227,7 +1236,7 @@ def _eval_s3(self, coordinates, output=None): _log.debug("Received response from lambda function") response = s3.get_object(Key=filename, Bucket=self.function_s3_bucket) body = response["Body"].read() - self._output = cPickle.loads(body) + self._output = open_dataarray(body) return self._output def _eval_api(self, coordinates, output=None): From 175e5f980993ecdecc0794b7a736f725d8201040 Mon Sep 17 00:00:00 2001 From: Marc Shapiro Date: Tue, 12 Nov 2019 17:21:06 -0500 Subject: [PATCH 03/12] FIX: handler/aws improvements Lambda node by default uses "invoke" instead of S3. Includes handler improvements to consolidate the "pipeline" data model that is passed amongst the various triggers. Data is passed back and forth via netcdf. API Gateway and S3 triggers still needs testing. In the future, I think we should deprecate the "eval_s3" and "eval_api" methods since these are no longer relevant. We should keep the ability to add extra triggers for S3 and APIGateway since there may be other use cases that need this, but the PODPAC only evaluation will be done through invoke. 
--- dist/aws/handler.py | 134 +++++++++++++++++++-------------- podpac/core/managers/aws.py | 143 ++++++++++++++++++++++++++---------- podpac/core/settings.py | 3 + 3 files changed, 187 insertions(+), 93 deletions(-) diff --git a/dist/aws/handler.py b/dist/aws/handler.py index d083ed860..02bf896b9 100644 --- a/dist/aws/handler.py +++ b/dist/aws/handler.py @@ -1,30 +1,53 @@ """ PODPAC AWS Handler - -Attributes ----------- -s3 : TYPE - Description -settings_json : dict - Description """ -from __future__ import absolute_import, division, print_function, unicode_literals import json import subprocess import sys import urllib.parse as urllib -from six import string_types from collections import OrderedDict -import _pickle as cPickle import boto3 import botocore -def get_trigger(event): +def default_pipeline(pipeline=None): + """Get default pipeline definiton, merging with input pipline if supplied + + Parameters + ---------- + pipeline : dict, optional + Input pipline. Will fill in any missing defaults. + + Returns + ------- + dict + pipeline dict """ + defaults = { + "pipeline": {}, + "settings": {}, + "output": {"format": "netcdf", "filename": None, "format_kwargs": {}}, + # API Gateway + "url": "", + "params": {}, + } + + # merge defaults with input pipelines, if supplied + if pipeline is not None: + pipeline = {**defaults, **pipeline} + pipeline["output"] = {**defaults["output"], **pipeline["output"]} + pipeline["settings"] = {**defaults["settings"], **pipeline["settings"]} + else: + pipeline = defaults + + return pipeline + + +def get_trigger(event): + """ Helper method to determine the trigger for the lambda invocation Parameters @@ -60,8 +83,11 @@ def parse_event(trigger, event): if trigger == "eval": print("Triggered by Invoke") - # TODO: implement - return None + # event is the pipeline, provide consistent pipeline defaults + pipeline = default_pipeline(event) + + return pipeline + elif trigger == "S3": print("Triggered from S3") @@ -76,18 +102,22 @@ def parse_event(trigger, event): pipline_obj = s3.get_object(Bucket=triggered_bucket, Key=file_key) pipeline = json.loads(pipline_obj["Body"].read().decode("utf-8")) + # provide consistent pipeline defaults + pipeline = default_pipeline(pipeline) + # create output filename - pipeline["output_filename"] = file_key.replace(".json", "." + pipeline["output"]["format"]).replace( + pipeline["output"]["filename"] = file_key.replace(".json", "." + pipeline["output"]["format"]).replace( pipeline["settings"]["FUNCTION_S3_INPUT"], pipeline["settings"]["FUNCTION_S3_OUTPUT"] ) - if not pipeline["settings"]["force_compute"]: + if not pipeline["settings"]["FUNCTION_FORCE_COMPUTE"]: + # get configured s3 bucket to check for cache bucket = pipeline["settings"]["S3_BUCKET_NAME"] # We can return if there is a valid cached output to save compute time. 
try: - s3.head_object(Bucket=bucket, Key=pipeline["output_filename"]) + s3.head_object(Bucket=bucket, Key=pipeline["output"]["filename"]) return None # throws ClientError if no file is found @@ -100,23 +130,28 @@ def parse_event(trigger, event): elif trigger == "APIGateway": print("Triggered from API Gateway") - url = event["queryStringParameters"] - if isinstance(url, string_types): - url = urllib.parse_qs(urllib.urlparse(url).query) + pipeline = default_pipeline() + pipeline["url"] = event["queryStringParameters"] + pipeline["params"] = urllib.parse_qs(urllib.urlparse(pipeline["url"]).query) + + # make all params lowercase + pipeline["params"] = [param.lower() for param in pipeline["params"]] - # Capitalize the keywords for consistency - pipeline = {} - for param in url: + # look for specific parameter definitions in query parameters + for param in pipeline["params"]: + if param == "settings": + pipeline["settings"] = pipeline["params"][param] - if param.upper() == "PIPELINE": - pipeline["pipeline"] = url[param] + if param == "output": + pipeline["output"] = pipeline["params"][param] - # TODO: do we still need this? will overwrite pipeline above - if param.upper() == "SETTINGS": - pipeline["settings"] = url[param] + # handle FORMAT in query parameters + if "format" in pipeline["params"]: + pipeline["output"]["format"] = pipeline["params"]["format"].split("/")[-1] - if param.upper() == "OUTPUT": - pipeline["output"] = url[param] + # handle image returns + if pipeline["output"]["format"] in ["png", "jpg", "jpeg"]: + pipeline["output"]["format_kwargs"]["return_base64"] = True return pipeline @@ -124,7 +159,7 @@ def parse_event(trigger, event): raise Exception("Unsupported trigger") -def handler(event, context, get_deps=True, ret_pipeline=False): +def handler(event, context): """Lambda function handler Parameters @@ -187,44 +222,33 @@ def handler(event, context, get_deps=True, ret_pipeline=False): import podpac.datalib # update podpac settings with inputs from the trigger - for key in pipeline["settings"]: - settings[key] = pipeline["settings"][key] + settings = {**settings, **pipeline["settings"]} - # TODO: load this into pipeline["output"] - kwargs = {} - if trigger == "eval": - pass - - elif trigger == "S3": + # build the Node and Coordinates + if trigger in ("eval", "S3"): node = Node.from_definition(pipeline["pipeline"]) coords = Coordinates.from_json(json.dumps(pipeline["coordinates"], indent=4, cls=JSONEncoder)) - fmt = pipeline["output"]["format"] - kwargs = pipeline["output"].copy() - kwargs.pop("format") # get rid of format # TODO: handle API Gateway better - is this always going to be WCS? 
elif trigger == "APIGateway": - # TODO: handle this in the parser above - not sure what the spec should be here - node = Node.from_url(event["queryStringParameters"]) - coords = Coordinates.from_url(event["queryStringParameters"]) - fmt = _get_query_params_from_url(event["queryStringParameters"])["FORMAT"].split("/")[-1] - if fmt in ["png", "jpg", "jpeg"]: - kwargs["return_base64"] = True - - # run analysis and covert to output format + node = Node.from_url(pipeline["url"]) + coords = Coordinates.from_url(pipeline["url"]) + + # run analysis output = node.eval(coords) - body = output.to_format(fmt, **kwargs) - # ######## + # convert to output format + body = output.to_format(pipeline["output"]["format"], **pipeline["output"]["format_kwargs"]) + # Response - # ######## if trigger == "eval": - pass - # return {"statusCode": 200, "headers": {"Content-Type": "image/png"}, "isBase64Encoded": True, "body": body} + return body + elif trigger == "S3": - s3.put_object(Bucket=settings["S3_BUCKET_NAME"], Key=pipeline["output_filename"], Body=body) + s3.put_object(Bucket=settings["S3_BUCKET_NAME"], Key=pipeline["output"]["filename"], Body=body) elif trigger == "APIGateway": + # TODO: can we handle the deserialization better? try: json.dumps(body) diff --git a/podpac/core/managers/aws.py b/podpac/core/managers/aws.py index 489da2fb1..cc224ad8a 100644 --- a/podpac/core/managers/aws.py +++ b/podpac/core/managers/aws.py @@ -8,6 +8,8 @@ import time import re from copy import deepcopy +import base64 +from datetime import datetime import boto3 import botocore @@ -23,14 +25,15 @@ # Set up logging _log = logging.getLogger(__name__) -try: - import cPickle # Python 2.7 -except: - import _pickle as cPickle - COMMON_DOC = COMMON_NODE_DOC.copy() +class LambdaException(Exception): + """ Exception during execution of a Lambda node""" + + pass + + class Lambda(Node): """A `Node` wrapper to evaluate source on AWS Lambda function @@ -142,7 +145,7 @@ def _session_default(self): # lambda function parameters function_name = tl.Unicode().tag(attr=True, readonly=True) # see default below - function_triggers = tl.List(tl.Enum(["eval", "S3", "APIGateway"]), default_value=["eval"]).tag(readonly=True) + function_triggers = tl.List(tl.Enum(["eval", "S3", "APIGateway"])).tag(readonly=True) function_handler = tl.Unicode(default_value="handler.handler").tag(readonly=True) function_description = tl.Unicode(default_value="PODPAC Lambda Function (https://podpac.org)").tag(readonly=True) function_env_variables = tl.Dict(default_value={}).tag(readonly=True) # environment vars in function @@ -174,6 +177,13 @@ def _function_name_default(self): return settings["FUNCTION_NAME"] + @tl.default("function_triggers") + def _function_triggers_default(self): + if self.function_eval_trigger != "eval": + return ["eval", self.function_eval_trigger] + else: + return ["eval"] + @tl.default("function_source_dist_key") def _function_source_dist_key_default(self): v = version.version() @@ -325,14 +335,21 @@ def _function_api_tags_default(self): source = tl.Instance(Node, allow_none=True).tag(attr=True) source_output_format = tl.Unicode(default_value="netcdf") source_output_name = tl.Unicode() - attrs = tl.Dict() # TODO: are we still using this? 
+ attrs = tl.Dict() download_result = tl.Bool(True).tag(attr=True) - force_compute = tl.Bool(default_value=False).tag(attr=True) + force_compute = tl.Bool().tag(attr=True) @tl.default("source_output_name") def _source_output_name_default(self): return self.source.__class__.__name__ + @tl.default("force_compute") + def _force_compute_default(self): + if settings["FUNCTION_FORCE_COMPUTE"] is None: + settings["FUNCTION_FORCE_COMPUTE"] = False + + return settings["FUNCTION_FORCE_COMPUTE"] + @property def pipeline(self): """ @@ -368,6 +385,9 @@ def build(self): to run PODPAC pipelines """ + # TODO: move towards an architecture where the "create_" functions repair on each build + # and skip when the resources already exist + # see if current setup is valid, if so just return valid = self.validate() if valid: @@ -402,9 +422,8 @@ def build(self): # TODO: check to make sure function and API work together - # create S3 bucket - if self._bucket is None: - self.create_bucket() + # create S3 bucket - this will skip pieces that already exist + self.create_bucket() # check to see if setup is valid after creation # TODO: remove this in favor of something more granular?? @@ -823,24 +842,34 @@ def create_bucket(self): # after creating a bucket, you need to wait ~2 seconds before its active and can be uploaded to # this is not cool - time.sleep(2) + time.sleep(5) + + # get reference to s3 client for session + s3 = self.session.client("s3") # add podpac deps to bucket for version - # copy from user supplied dependencies - if self.function_source_dependencies_zip is not None: - put_object( - self.session, - self.function_s3_bucket, - self.function_s3_dependencies_key, - file=self.function_source_dependencies_zip, - ) + # see if the function depedencies exist in bucket + try: + s3.head_object(Bucket=self.function_s3_bucket, Key=self.function_s3_dependencies_key) + except botocore.exceptions.ClientError: + + # copy from user supplied dependencies + if self.function_source_dependencies_zip is not None: + put_object( + self.session, + self.function_s3_bucket, + self.function_s3_dependencies_key, + file=self.function_source_dependencies_zip, + ) - # copy resources from podpac dist - else: - s3resource = self.session.resource("s3") - copy_source = {"Bucket": self.function_source_bucket, "Key": self.function_source_dependencies_key} - s3resource.meta.client.copy(copy_source, self.function_s3_bucket, self.function_s3_dependencies_key) + # copy resources from podpac dist + else: + s3resource = self.session.resource("s3") + copy_source = {"Bucket": self.function_source_bucket, "Key": self.function_source_dependencies_key} + s3resource.meta.client.copy(copy_source, self.function_s3_bucket, self.function_s3_dependencies_key) + # Add S3 Function triggers, if they don't exist already + # TODO: add validition to see if trigger already exists if "S3" in self.function_triggers: # add permission to invoke call lambda - this feels brittle due to source_arn statement_id = re.sub("[-_.]", "", self.function_s3_bucket) @@ -849,7 +878,6 @@ def create_bucket(self): self.add_trigger(statement_id, principle, source_arn) # lambda integration on object creation events - s3 = self.session.client("s3") s3.put_bucket_notification_configuration( Bucket=self.function_s3_bucket, NotificationConfiguration={ @@ -863,6 +891,8 @@ def create_bucket(self): ] }, ) + else: + _log.debug("Skipping S3 trigger because 'S3' not in the function triggers") return bucket @@ -885,10 +915,22 @@ def validate_bucket(self): This should only be run after running 
`self.get_bucket()` """ - # TODO: needs to be implemented if self._bucket is None: return False + s3 = self.session.client("s3") + + # make sure dependencies are in there + try: + s3.head_object(Bucket=self.function_s3_bucket, Key=self.function_s3_dependencies_key) + except botocore.exceptions.ClientError: + _log.error("Failed to find PODPAC dependencies in bucket") + return False + + # TODO: make sure trigger exists + if "S3" in self.function_triggers: + pass + return True def delete_bucket(self, delete_objects=False): @@ -1021,7 +1063,7 @@ def delete_api(self): self._function_api_resource_id = None # Logs - def get_logs(self, limit=100, start=None, end=None): + def get_logs(self, limit=5, start=None, end=None): """Get Cloudwatch logs from lambda function execution See :func:`podpac.managers.aws.get_logs` @@ -1144,6 +1186,8 @@ def _create_eval_pipeline(self, coordinates): # add coordinates to the pipeline pipeline = self.pipeline # contains "pipeline" and "output" keys pipeline["coordinates"] = json.loads(coordinates.json) + + # TODO: should we move this to `self.pipeline`? pipeline["settings"] = settings.copy() # TODO: we should not wholesale copy settings here !! pipeline["settings"][ "FUNCTION_DEPENDENCIES_KEY" @@ -1168,10 +1212,20 @@ def _eval_invoke(self, coordinates, output=None): LogType="Tail", # include the execution log in the response. Payload=payload, ) - print(response) - # After waiting, load the pickle file like this: _log.debug("Received response from lambda function") + + if "FunctionError" in response: + _log.error("Unhandled error from lambda function") + # logs = base64.b64decode(response["LogResult"]).decode("UTF-8").split('\n') + payload = json.loads(response["Payload"].read().decode("UTF-8")) + raise LambdaException( + "Error in lambda function evaluation:\n\nError Type: {}\nError Message: {}\nStack Trace: {}".format( + payload["errorType"], payload["errorMessage"], "\n".join(payload["stackTrace"]) + ) + ) + + # After waiting, load the pickle file like this: payload = response["Payload"].read() self._output = open_dataarray(payload) return self._output @@ -1186,7 +1240,7 @@ def _eval_s3(self, coordinates, output=None): # create eval pipeline pipeline = self._create_eval_pipeline(coordinates) - pipeline["settings"]["force_compute"] = self.force_compute + pipeline["settings"]["FUNCTION_FORCE_COMPUTE"] = self.force_compute pipeline["settings"][ "FUNCTION_S3_INPUT" ] = input_folder # overwrite in case this is specified explicitly by class @@ -1637,17 +1691,18 @@ def create_function( "Tags": function_tags, } + # read function from zip file + if function_source_dist_zip is not None: + raise NotImplementedError("Supplying a source dist zip from a local file is not yet supported") + # TODO: this fails when the file size is over a certain limit + # with open(function_source_dist_zip, "rb") as f: + # lambda_config["Code"]["ZipFile"]: f.read() + # read function from S3 (Default) - if function_source_bucket is not None and function_source_dist_key is not None: + elif function_source_bucket is not None and function_source_dist_key is not None: lambda_config["Code"]["S3Bucket"] = function_source_bucket lambda_config["Code"]["S3Key"] = function_source_dist_key - # read function from zip file - elif function_source_dist_zip is not None: - with open(function_source_dist_zip, "rb") as f: - lambda_config["Code"] = {} # reset the code dict to make sure S3Bucket and S3Key are overridden - lambda_config["Code"]["ZipFile"]: f.read() - else: raise ValueError("Function source is not 
defined") @@ -2334,4 +2389,16 @@ def get_logs(session, log_group_name, limit=100, start=None, end=None): ) logs += response["events"] + # sort logs + logs.sort(key=lambda k: k["timestamp"]) + + # take only the last "limit" + logs = logs[-limit:] + + # add time easier to read + for log in logs: + log["time"] = "{}.{:03d}".format( + datetime.fromtimestamp(log["timestamp"] / 1000).strftime("%Y-%m-%d %H:%M:%S"), log["timestamp"] % 1000 + ) + return logs diff --git a/podpac/core/settings.py b/podpac/core/settings.py index d11184a84..f5de84afc 100644 --- a/podpac/core/settings.py +++ b/podpac/core/settings.py @@ -53,6 +53,7 @@ "FUNCTION_DEPENDENCIES_KEY": None, "FUNCTION_S3_INPUT": None, "FUNCTION_S3_OUTPUT": None, + "FUNCTION_FORCE_COMPUTE": False, } @@ -134,6 +135,8 @@ class PodpacSettings(dict): Folder within :attr:`S3_BUCKET_NAME` to use for triggering node evaluation. FUNCTION_S3_OUTPUT : str Folder within :attr:`S3_BUCKET_NAME` to use for outputs. + FUNCTION_FORCE_COMPUTE : bool + Force the lambda function to compute pipeline, even if result is already cached. AUTOSAVE_SETTINGS: bool Save settings automatically as they are changed during runtime. Defaults to ``False``. MULTITHREADING: bool From ed82395147cb004b79c7dcf072e17fdfd6fa1d62 Mon Sep 17 00:00:00 2001 From: David Sullivan Date: Wed, 13 Nov 2019 11:19:41 -0500 Subject: [PATCH 04/12] FIX: Optimize the handler's parsing of params --- dist/aws/handler.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dist/aws/handler.py b/dist/aws/handler.py index 02bf896b9..f0ebd18a1 100644 --- a/dist/aws/handler.py +++ b/dist/aws/handler.py @@ -145,13 +145,13 @@ def parse_event(trigger, event): if param == "output": pipeline["output"] = pipeline["params"][param] - # handle FORMAT in query parameters - if "format" in pipeline["params"]: - pipeline["output"]["format"] = pipeline["params"]["format"].split("/")[-1] + # handle FORMAT in query parameters + if param == "format": + pipeline["output"][param] = pipeline["params"][param].split("/")[-1] + # handle image returns + if pipeline["output"][param] in ["png", "jpg", "jpeg"]: + pipeline["output"]["format_kwargs"]["return_base64"] = True - # handle image returns - if pipeline["output"]["format"] in ["png", "jpg", "jpeg"]: - pipeline["output"]["format_kwargs"]["return_base64"] = True return pipeline From b9c6567b3f4a60bd4afa8db9289f33c8b0e48228 Mon Sep 17 00:00:00 2001 From: Marc Shapiro Date: Wed, 13 Nov 2019 13:12:49 -0500 Subject: [PATCH 05/12] FIX: typo, though it was commented out at this point --- podpac/core/managers/aws.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/podpac/core/managers/aws.py b/podpac/core/managers/aws.py index cc224ad8a..0a31e440c 100644 --- a/podpac/core/managers/aws.py +++ b/podpac/core/managers/aws.py @@ -1696,7 +1696,7 @@ def create_function( raise NotImplementedError("Supplying a source dist zip from a local file is not yet supported") # TODO: this fails when the file size is over a certain limit # with open(function_source_dist_zip, "rb") as f: - # lambda_config["Code"]["ZipFile"]: f.read() + # lambda_config["Code"]["ZipFile"] = f.read() # read function from S3 (Default) elif function_source_bucket is not None and function_source_dist_key is not None: From 06c9edee9fcae8be3de16bc10fc95c5227e3ea11 Mon Sep 17 00:00:00 2001 From: Marc Shapiro Date: Wed, 13 Nov 2019 13:21:01 -0500 Subject: [PATCH 06/12] FIX: replace `open_dataarray` with `UnitsDataArray.open()` (see #334) --- podpac/core/managers/aws.py | 6 +++--- 1 file 
changed, 3 insertions(+), 3 deletions(-) diff --git a/podpac/core/managers/aws.py b/podpac/core/managers/aws.py index 0a31e440c..f387555ad 100644 --- a/podpac/core/managers/aws.py +++ b/podpac/core/managers/aws.py @@ -16,7 +16,7 @@ import traitlets as tl import numpy as np -from podpac.core.units import open_dataarray +from podpac.core.units import UnitsDataArray from podpac.core.settings import settings from podpac.core.node import COMMON_NODE_DOC, Node from podpac.core.utils import common_doc, JSONEncoder @@ -1227,7 +1227,7 @@ def _eval_invoke(self, coordinates, output=None): # After waiting, load the pickle file like this: payload = response["Payload"].read() - self._output = open_dataarray(payload) + self._output = UnitsDataArray.open(payload) return self._output def _eval_s3(self, coordinates, output=None): @@ -1290,7 +1290,7 @@ def _eval_s3(self, coordinates, output=None): _log.debug("Received response from lambda function") response = s3.get_object(Key=filename, Bucket=self.function_s3_bucket) body = response["Body"].read() - self._output = open_dataarray(body) + self._output = UnitsDataArray.open(body) return self._output def _eval_api(self, coordinates, output=None): From 15398c4ab0060ccff78cc0877406c2d74423a847 Mon Sep 17 00:00:00 2001 From: Marc Shapiro Date: Wed, 13 Nov 2019 15:23:57 -0500 Subject: [PATCH 07/12] ENH: add ability to restrict evaluation based on node hash --- dist/aws/handler.py | 10 +++++++--- podpac/core/managers/aws.py | 5 +++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/dist/aws/handler.py b/dist/aws/handler.py index f0ebd18a1..7c7bcea77 100644 --- a/dist/aws/handler.py +++ b/dist/aws/handler.py @@ -6,8 +6,7 @@ import subprocess import sys import urllib.parse as urllib -from collections import OrderedDict - +import os import boto3 import botocore @@ -152,7 +151,6 @@ def parse_event(trigger, event): if pipeline["output"][param] in ["png", "jpg", "jpeg"]: pipeline["output"]["format_kwargs"]["return_base64"] = True - return pipeline else: @@ -234,6 +232,12 @@ def handler(event, context): node = Node.from_url(pipeline["url"]) coords = Coordinates.from_url(pipeline["url"]) + # make sure pipeline is allowed to be run + if "PODPAC_RESTRICT_PIPELINES" in os.environ: + whitelist = json.loads(os.environ["PODPAC_RESTRICT_PIPELINES"]) + if node.hash not in whitelist: + raise ValueError("Node hash is not in the whitelist for this function") + # run analysis output = node.eval(coords) diff --git a/podpac/core/managers/aws.py b/podpac/core/managers/aws.py index f387555ad..c6622d193 100644 --- a/podpac/core/managers/aws.py +++ b/podpac/core/managers/aws.py @@ -162,6 +162,7 @@ def _session_default(self): function_source_dist_key = tl.Unicode().tag(readonly=True) # see default below function_source_dependencies_key = tl.Unicode().tag(readonly=True) # see default below function_allow_unsafe_eval = tl.Bool(default_value=False).tag(readonly=True) + function_restrict_pipelines = tl.List(tl.Unicode(), default_value=[]).tag(readonly=True) _function_arn = tl.Unicode(default_value=None, allow_none=True) _function_last_modified = tl.Unicode(default_value=None, allow_none=True) _function_version = tl.Unicode(default_value=None, allow_none=True) @@ -628,6 +629,10 @@ def create_function(self): _log.info("Lambda function will allow unsafe evaluation of Nodes with the current settings") self.function_env_variables["PODPAC_UNSAFE_EVAL"] = settings["UNSAFE_EVAL_HASH"] + if self.function_restrict_pipelines: + _log.info("Lambda function will only run for pipelines: 
{}".format(self.function_restrict_pipelines)) + self.function_env_variables["PODPAC_RESTRICT_PIPELINES"] = self.function_restrict_pipelines + # if function already exists, this will return existing function function = create_function( self.session, From 0cac3a01dd7e3e8e5b87ced6c4ef6c1705844314 Mon Sep 17 00:00:00 2001 From: David Sullivan Date: Wed, 13 Nov 2019 16:00:30 -0500 Subject: [PATCH 08/12] FIX: clean up handler.py --- dist/aws/handler.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dist/aws/handler.py b/dist/aws/handler.py index 7c7bcea77..193fc6ee2 100644 --- a/dist/aws/handler.py +++ b/dist/aws/handler.py @@ -138,14 +138,18 @@ def parse_event(trigger, event): # look for specific parameter definitions in query parameters for param in pipeline["params"]: + + # handle SETTINGS in query parameters + # TODO: shouldn't this merge trigger-provided settings with default? if param == "settings": pipeline["settings"] = pipeline["params"][param] - if param == "output": + # handle OUTPUT in query parameters + elif param == "output": pipeline["output"] = pipeline["params"][param] # handle FORMAT in query parameters - if param == "format": + elif param == "format": pipeline["output"][param] = pipeline["params"][param].split("/")[-1] # handle image returns if pipeline["output"][param] in ["png", "jpg", "jpeg"]: From 3d2bf9d993136087cbdedec317d723e699925bd9 Mon Sep 17 00:00:00 2001 From: Marc Shapiro Date: Wed, 13 Nov 2019 16:16:45 -0500 Subject: [PATCH 09/12] FIX: add function_restricts_pipelines to docstring and make env var a json string --- dist/aws/aws_requirements.txt | 2 +- podpac/core/managers/aws.py | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/dist/aws/aws_requirements.txt b/dist/aws/aws_requirements.txt index 206ae85ff..95fd4ca7e 100644 --- a/dist/aws/aws_requirements.txt +++ b/dist/aws/aws_requirements.txt @@ -10,7 +10,7 @@ h5py>=2.9 lxml>=4.2 pydap>=3.2 rasterio>=0.36 -pyproj>=2.2 +pyproj>=2.4 requests>=2.18 numexpr>=2.6 lazy-import>=0.2.2 diff --git a/podpac/core/managers/aws.py b/podpac/core/managers/aws.py index c6622d193..cd266024a 100644 --- a/podpac/core/managers/aws.py +++ b/podpac/core/managers/aws.py @@ -83,8 +83,11 @@ class Lambda(Node): Function trigger to use during node eval process. Must be on of "eval" (default), "S3", or "APIGateway". function_handler : str, optional Handler method in Lambda function. Defaults to "handler.handler". - function_memory : int, option + function_memory : int, optional Memory allocated for each Lambda function. Defaults to 2048 MB. + function_restrict_pipelines : list, optional + List of Node hashes (see :class:`podpac.Node.hash`). + Restricts lambda function evaluation to specific Node definitions. function_role_assume_policy_document : dict, optional. Assume policy document for role created. Defaults to allowing role to assume Lambda function. 
function_role_description : str, optional @@ -570,6 +573,7 @@ def describe(self): Source Dependencies: {source_deps} Last Modified: {_function_last_modified} Version: {_function_version} + Restrict Evaluation: {function_restrict_pipelines} S3 Bucket: {function_s3_bucket} @@ -602,6 +606,7 @@ def describe(self): _function_arn=self._function_arn, _function_last_modified=self._function_last_modified, _function_version=self._function_version, + function_restrict_pipelines=self.function_restrict_pipelines, function_s3_bucket=self.function_s3_bucket, function_s3_tags=self.function_s3_tags, function_s3_input=self.function_s3_input, @@ -631,7 +636,7 @@ def create_function(self): if self.function_restrict_pipelines: _log.info("Lambda function will only run for pipelines: {}".format(self.function_restrict_pipelines)) - self.function_env_variables["PODPAC_RESTRICT_PIPELINES"] = self.function_restrict_pipelines + self.function_env_variables["PODPAC_RESTRICT_PIPELINES"] = json.dumps(self.function_restrict_pipelines) # if function already exists, this will return existing function function = create_function( From d71df38804f182c4b8944b7993ebc49590f8b1e1 Mon Sep 17 00:00:00 2001 From: David Sullivan Date: Mon, 18 Nov 2019 16:26:03 -0500 Subject: [PATCH 10/12] FIX: Fix APIGateway triggered events - correctly get params The `queryStringParameters` are already a dict, so they don't need parsed with `urllib`. This also merges any settings passed in the event with the default pipeline, then again with the settings on the AWS instance. --- dist/aws/handler.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/dist/aws/handler.py b/dist/aws/handler.py index 193fc6ee2..8a35de961 100644 --- a/dist/aws/handler.py +++ b/dist/aws/handler.py @@ -11,6 +11,7 @@ import boto3 import botocore +from six import string_types def default_pipeline(pipeline=None): """Get default pipeline definiton, merging with input pipline if supplied @@ -131,18 +132,16 @@ def parse_event(trigger, event): pipeline = default_pipeline() pipeline["url"] = event["queryStringParameters"] - pipeline["params"] = urllib.parse_qs(urllib.urlparse(pipeline["url"]).query) - - # make all params lowercase - pipeline["params"] = [param.lower() for param in pipeline["params"]] + if isinstance(pipeline["url"], string_types): + pipeline["url"] = urllib.parse_qs(urllib.urlparse(pipeline["url"]).query) + pipeline["params"] = pipeline["url"] # look for specific parameter definitions in query parameters for param in pipeline["params"]: # handle SETTINGS in query parameters - # TODO: shouldn't this merge trigger-provided settings with default? 
if param == "settings": - pipeline["settings"] = pipeline["params"][param] + pipeline["settings"] = {**pipeline["settings"], **pipeline["params"][param]} # handle OUTPUT in query parameters elif param == "output": From 41a25cd4cc88ec6200f4848e91618a05d0674751 Mon Sep 17 00:00:00 2001 From: David Sullivan Date: Tue, 19 Nov 2019 16:57:07 -0500 Subject: [PATCH 11/12] FIX: Update the handler to load settings json from APIGateway --- dist/aws/handler.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/dist/aws/handler.py b/dist/aws/handler.py index 8a35de961..bb3a5a982 100644 --- a/dist/aws/handler.py +++ b/dist/aws/handler.py @@ -140,8 +140,16 @@ def parse_event(trigger, event): for param in pipeline["params"]: # handle SETTINGS in query parameters - if param == "settings": - pipeline["settings"] = {**pipeline["settings"], **pipeline["params"][param]} + if param == "settings": + # Try loading this settings string into a dict to merge with default settings + try: + api_settings = json.loads(pipeline["params"][param]) + # If we get here, the api settings were loaded + pipeline["settings"] = {**pipeline["settings"], **api_settings} + except Exception as e: + print("Got an exception when attempting to load api settings: ", e) + print(pipeline) + # handle OUTPUT in query parameters elif param == "output": From 9fa9d4dbf94e7610772966f34a7e5ed98d1af4e5 Mon Sep 17 00:00:00 2001 From: Marc Shapiro Date: Thu, 21 Nov 2019 08:50:12 -0500 Subject: [PATCH 12/12] FIX: add NotImplementedError for ZipFile --- podpac/core/managers/aws.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/podpac/core/managers/aws.py b/podpac/core/managers/aws.py index cd266024a..b22f2919a 100644 --- a/podpac/core/managers/aws.py +++ b/podpac/core/managers/aws.py @@ -1804,8 +1804,10 @@ def update_function( # read function from zip file elif function_source_dist_zip is not None: - with open(function_source_dist_zip, "rb") as f: - lambda_config["ZipFile"]: f.read() + raise NotImplementedError("Supplying a source dist zip from a local file is not yet supported") + # TODO: this fails when the file size is over a certain limit + # with open(function_source_dist_zip, "rb") as f: + # lambda_config["ZipFile"] = f.read() else: raise ValueError("Function source is not defined")