diff --git a/dist/aws/aws_requirements.txt b/dist/aws/aws_requirements.txt
index 206ae85ff..95fd4ca7e 100644
--- a/dist/aws/aws_requirements.txt
+++ b/dist/aws/aws_requirements.txt
@@ -10,7 +10,7 @@ h5py>=2.9
 lxml>=4.2
 pydap>=3.2
 rasterio>=0.36
-pyproj>=2.2
+pyproj>=2.4
 requests>=2.18
 numexpr>=2.6
 lazy-import>=0.2.2
diff --git a/dist/aws/handler.py b/dist/aws/handler.py
index cfafccfce..bb3a5a982 100644
--- a/dist/aws/handler.py
+++ b/dist/aws/handler.py
@@ -1,144 +1,222 @@
 """
 PODPAC AWS Handler
-
-Attributes
-----------
-s3 : TYPE
-    Description
-settings_json : dict
-    Description
 """
-from __future__ import absolute_import, division, print_function, unicode_literals
 
 import json
 import subprocess
 import sys
 import urllib.parse as urllib
-from six import string_types
-from collections import OrderedDict
-
-import _pickle as cPickle
+import os
 
 import boto3
 import botocore
+from six import string_types
 
 
-def is_s3_trigger(event):
-    """
-    Helper method to determine if the given event was triggered by an S3 event
+def default_pipeline(pipeline=None):
+    """Get default pipeline definition, merging with input pipeline if supplied
 
     Parameters
     ----------
-    event : dict
-        Event dict from AWS. See [TODO: add link reference]
+    pipeline : dict, optional
+        Input pipeline. Will fill in any missing defaults.
 
     Returns
     -------
-    Bool
-        True if the event is an S3 trigger
+    dict
+        pipeline dict
     """
-    return "Records" in event and event["Records"][0]["eventSource"] == "aws:s3"
+    defaults = {
+        "pipeline": {},
+        "settings": {},
+        "output": {"format": "netcdf", "filename": None, "format_kwargs": {}},
+        # API Gateway
+        "url": "",
+        "params": {},
+    }
+
+    # merge defaults with input pipelines, if supplied
+    if pipeline is not None:
+        pipeline = {**defaults, **pipeline}
+        pipeline["output"] = {**defaults["output"], **pipeline["output"]}
+        pipeline["settings"] = {**defaults["settings"], **pipeline["settings"]}
+    else:
+        pipeline = defaults
 
+    return pipeline
 
-def handler(event, context, get_deps=True, ret_pipeline=False):
-    """Lambda function handler
+
+def get_trigger(event):
+    """
+    Helper method to determine the trigger for the lambda invocation
 
     Parameters
     ----------
-    event : TYPE
-        Description
-    context : TYPE
-        Description
-    get_deps : bool, optional
-        Description
-    ret_pipeline : bool, optional
-        Description
+    event : dict
+        Event dict from AWS. See [TODO: add link reference]
 
     Returns
     -------
-    TYPE
-        Description
+    str
+        One of "S3", "eval", or "APIGateway"
     """
-    print(event)
+    if "Records" in event and event["Records"][0]["eventSource"] == "aws:s3":
+        return "S3"
+    elif "queryStringParameters" in event:
+        return "APIGateway"
+    else:
+        return "eval"
 
-    # Add /tmp/ path to handle python path for dependencies
-    sys.path.append("/tmp/")
 
-    # get S3 client - note this will have the same access that the current "function role" does
-    s3 = boto3.client("s3")
+def parse_event(trigger, event):
+    """Parse pipeline, settings, and output details from event depending on trigger
+
+    Parameters
+    ----------
+    trigger : str
+        One of "S3", "eval", or "APIGateway"
+    event : dict
+        Event dict from AWS. See [TODO: add link reference]
+    """
 
-    args = []
-    kwargs = {}
+    if trigger == "eval":
+        print("Triggered by Invoke")
 
-    # This event was triggered by S3, so let's see if the requested output is already cached.
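# --- Illustrative sketch (not part of the patch): how the new get_trigger()
# --- helper classifies the three supported invocation types. The event bodies
# --- below are hypothetical minimal examples, trimmed to the keys it inspects.
#
#   get_trigger({"Records": [{"eventSource": "aws:s3"}]})          -> "S3"
#   get_trigger({"queryStringParameters": {"SERVICE": "WMS"}})     -> "APIGateway"
#   get_trigger({"pipeline": {}, "output": {"format": "netcdf"}})  -> "eval"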
-    if is_s3_trigger(event):
+        # event is the pipeline, provide consistent pipeline defaults
+        pipeline = default_pipeline(event)
 
-        # We always have to look to the bucket that triggered the event for the input
-        bucket = event["Records"][0]["s3"]["bucket"]["name"]
+        return pipeline
 
-        file_key = urllib.unquote_plus(event["Records"][0]["s3"]["object"]["key"])
-        _json = ""
+    elif trigger == "S3":
+        print("Triggered from S3")
+
+        # get boto s3 client
+        s3 = boto3.client("s3")
+
+        # We always have to look to the bucket that triggered the event for the input
+        triggered_bucket = event["Records"][0]["s3"]["bucket"]["name"]
 
         # get the pipeline object and read
-        pipeline_obj = s3.get_object(Bucket=bucket, Key=file_key)
-        pipeline = json.loads(pipeline_obj["Body"].read().decode("utf-8"))
+        file_key = urllib.unquote_plus(event["Records"][0]["s3"]["object"]["key"])
+        pipeline_obj = s3.get_object(Bucket=triggered_bucket, Key=file_key)
+        pipeline = json.loads(pipeline_obj["Body"].read().decode("utf-8"))
 
-        # We can return if there is a valid cached output to save compute time.
-        settings_trigger = pipeline["settings"]
+        # provide consistent pipeline defaults
+        pipeline = default_pipeline(pipeline)
 
-        # check for cached output
-        cached = False
-        output_filename = file_key.replace(".json", "." + pipeline["output"]["format"]).replace(
-            settings_trigger["FUNCTION_S3_INPUT"], settings_trigger["FUNCTION_S3_OUTPUT"]
+        # create output filename
+        pipeline["output"]["filename"] = file_key.replace(".json", "." + pipeline["output"]["format"]).replace(
+            pipeline["settings"]["FUNCTION_S3_INPUT"], pipeline["settings"]["FUNCTION_S3_OUTPUT"]
         )
 
-        try:
-            s3.head_object(Bucket=bucket, Key=output_filename)
+        if not pipeline["settings"]["FUNCTION_FORCE_COMPUTE"]:
+
+            # get configured s3 bucket to check for cache
+            bucket = pipeline["settings"]["S3_BUCKET_NAME"]
+
+            # We can return if there is a valid cached output to save compute time.
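# (Illustrative note, not part of the patch: for a hypothetical trigger object
#  "<FUNCTION_S3_INPUT>/request.json", the filename computed above becomes
#  "<FUNCTION_S3_OUTPUT>/request.netcdf" with the default output format, and the
#  head_object call that follows is a cheap existence check for that cached result.)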
+ try: + s3.head_object(Bucket=bucket, Key=pipeline["output"]["filename"]) + return None + + # throws ClientError if no file is found + except botocore.exceptions.ClientError: + pass + + # return pipeline definition + return pipeline + + elif trigger == "APIGateway": + print("Triggered from API Gateway") + + pipeline = default_pipeline() + pipeline["url"] = event["queryStringParameters"] + if isinstance(pipeline["url"], string_types): + pipeline["url"] = urllib.parse_qs(urllib.urlparse(pipeline["url"]).query) + pipeline["params"] = pipeline["url"] + + # look for specific parameter definitions in query parameters + for param in pipeline["params"]: + + # handle SETTINGS in query parameters + if param == "settings": + # Try loading this settings string into a dict to merge with default settings + try: + api_settings = json.loads(pipeline["params"][param]) + # If we get here, the api settings were loaded + pipeline["settings"] = {**pipeline["settings"], **api_settings} + except Exception as e: + print("Got an exception when attempting to load api settings: ", e) + print(pipeline) + + + # handle OUTPUT in query parameters + elif param == "output": + pipeline["output"] = pipeline["params"][param] + + # handle FORMAT in query parameters + elif param == "format": + pipeline["output"][param] = pipeline["params"][param].split("/")[-1] + # handle image returns + if pipeline["output"][param] in ["png", "jpg", "jpeg"]: + pipeline["output"]["format_kwargs"]["return_base64"] = True + + return pipeline + + else: + raise Exception("Unsupported trigger") - # Object exists, so we don't have to recompute - # TODO: the "force_compute" parameter will never work as is written in aws.py - if not pipeline.get("force_compute", False): - cached = True - except botocore.exceptions.ClientError: - pass +def handler(event, context): + """Lambda function handler + + Parameters + ---------- + event : dict + Description + context : TYPE + Description + get_deps : bool, optional + Description + ret_pipeline : bool, optional + Description + """ + print(event) + + # Add /tmp/ path to handle python path for dependencies + sys.path.append("/tmp/") - if cached: - return + # handle triggers + trigger = get_trigger(event) - # TODO: handle "invoke" and "APIGateway" triggers explicitly - else: - print("Not triggered by S3") + # parse event + pipeline = parse_event(trigger, event) - url = event["queryStringParameters"] - if isinstance(url, string_types): - url = urllib.parse_qs(urllib.urlparse(url).query) + # bail if we can't parse + if pipeline is None: + return - # Capitalize the keywords for consistency - settings_trigger = {} - for k in url: - if k.upper() == "SETTINGS": - settings_trigger = url[k] - bucket = settings_trigger["S3_BUCKET_NAME"] + # ----- + # TODO: remove when layers is configured + # get configured bucket to download dependencies + bucket = pipeline["settings"]["S3_BUCKET_NAME"] # get dependencies path - if "FUNCTION_DEPENDENCIES_KEY" in settings_trigger: - dependencies = settings_trigger["FUNCTION_DEPENDENCIES_KEY"] + if "FUNCTION_DEPENDENCIES_KEY" in pipeline["settings"]: + dependencies = pipeline["settings"]["FUNCTION_DEPENDENCIES_KEY"] else: - # TODO: this could be a problem, since we can't import podpac settings yet - # which means the input event might have to include the version or - # "FUNCTION_DEPENDENCIES_KEY". 
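# (Illustrative note, not part of the patch: when no explicit key is supplied,
#  the fallback below builds a versioned key such as "podpac_deps_2.3.0.zip",
#  where the version string is hypothetical. That archive is then downloaded
#  from the configured bucket and unzipped into /tmp so it can be added to
#  sys.path.)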
dependencies = "podpac_deps_{}.zip".format( - settings_trigger["PODPAC_VERSION"] + pipeline["settings"]["PODPAC_VERSION"] ) # this should be equivalent to version.semver() # Download dependencies from specific bucket/object - if get_deps: - s3.download_file(bucket, dependencies, "/tmp/" + dependencies) - subprocess.call(["unzip", "/tmp/" + dependencies, "-d", "/tmp"]) - sys.path.append("/tmp/") - subprocess.call(["rm", "/tmp/" + dependencies]) + s3 = boto3.client("s3") + s3.download_file(bucket, dependencies, "/tmp/" + dependencies) + subprocess.call(["unzip", "/tmp/" + dependencies, "-d", "/tmp"]) + sys.path.append("/tmp/") + subprocess.call(["rm", "/tmp/" + dependencies]) + # ----- # Load PODPAC @@ -153,38 +231,40 @@ def handler(event, context, get_deps=True, ret_pipeline=False): import podpac.datalib # update podpac settings with inputs from the trigger - for key in settings_trigger: - settings[key] = settings_trigger[key] + settings = {**settings, **pipeline["settings"]} - if is_s3_trigger(event): + # build the Node and Coordinates + if trigger in ("eval", "S3"): node = Node.from_definition(pipeline["pipeline"]) coords = Coordinates.from_json(json.dumps(pipeline["coordinates"], indent=4, cls=JSONEncoder)) - fmt = pipeline["output"]["format"] - kwargs = pipeline["output"].copy() - kwargs.pop("format") - # TODO: handle "invoke" and "APIGateway" triggers explicitly - else: - coords = Coordinates.from_url(event["queryStringParameters"]) - node = Node.from_url(event["queryStringParameters"]) - fmt = _get_query_params_from_url(event["queryStringParameters"])["FORMAT"].split("/")[-1] - if fmt in ["png", "jpg", "jpeg"]: - kwargs["return_base64"] = True + # TODO: handle API Gateway better - is this always going to be WCS? + elif trigger == "APIGateway": + node = Node.from_url(pipeline["url"]) + coords = Coordinates.from_url(pipeline["url"]) + + # make sure pipeline is allowed to be run + if "PODPAC_RESTRICT_PIPELINES" in os.environ: + whitelist = json.loads(os.environ["PODPAC_RESTRICT_PIPELINES"]) + if node.hash not in whitelist: + raise ValueError("Node hash is not in the whitelist for this function") + # run analysis output = node.eval(coords) - # FOR DEBUGGING - if ret_pipeline: - return node + # convert to output format + body = output.to_format(pipeline["output"]["format"], **pipeline["output"]["format_kwargs"]) - body = output.to_format(fmt, *args, **kwargs) + # Response + if trigger == "eval": + return body - # output_filename only exists if this was an S3 triggered event. - if is_s3_trigger(event): - s3.put_object(Bucket=bucket, Key=output_filename, Body=body) + elif trigger == "S3": + s3.put_object(Bucket=settings["S3_BUCKET_NAME"], Key=pipeline["output"]["filename"], Body=body) - # TODO: handle "invoke" and "APIGateway" triggers explicitly - else: + elif trigger == "APIGateway": + + # TODO: can we handle the deserialization better? 
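# (Illustrative note, not part of the patch: for image formats the body built
#  above is already base64-encoded text, because parse_event() set
#  format_kwargs["return_base64"] = True for "png"/"jpg"/"jpeg"; that appears to
#  be what keeps the payload serializable for the API Gateway response handling
#  below.)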
try: json.dumps(body) except Exception as e: diff --git a/podpac/core/managers/aws.py b/podpac/core/managers/aws.py index 1e73707ef..b22f2919a 100644 --- a/podpac/core/managers/aws.py +++ b/podpac/core/managers/aws.py @@ -8,12 +8,15 @@ import time import re from copy import deepcopy +import base64 +from datetime import datetime import boto3 import botocore import traitlets as tl import numpy as np +from podpac.core.units import UnitsDataArray from podpac.core.settings import settings from podpac.core.node import COMMON_NODE_DOC, Node from podpac.core.utils import common_doc, JSONEncoder @@ -22,14 +25,15 @@ # Set up logging _log = logging.getLogger(__name__) -try: - import cPickle # Python 2.7 -except: - import _pickle as cPickle - COMMON_DOC = COMMON_NODE_DOC.copy() +class LambdaException(Exception): + """ Exception during execution of a Lambda node""" + + pass + + class Lambda(Node): """A `Node` wrapper to evaluate source on AWS Lambda function @@ -79,8 +83,11 @@ class Lambda(Node): Function trigger to use during node eval process. Must be on of "eval" (default), "S3", or "APIGateway". function_handler : str, optional Handler method in Lambda function. Defaults to "handler.handler". - function_memory : int, option + function_memory : int, optional Memory allocated for each Lambda function. Defaults to 2048 MB. + function_restrict_pipelines : list, optional + List of Node hashes (see :class:`podpac.Node.hash`). + Restricts lambda function evaluation to specific Node definitions. function_role_assume_policy_document : dict, optional. Assume policy document for role created. Defaults to allowing role to assume Lambda function. function_role_description : str, optional @@ -141,7 +148,7 @@ def _session_default(self): # lambda function parameters function_name = tl.Unicode().tag(attr=True, readonly=True) # see default below - function_triggers = tl.List(tl.Enum(["eval", "S3", "APIGateway"]), default_value=["eval"]).tag(readonly=True) + function_triggers = tl.List(tl.Enum(["eval", "S3", "APIGateway"])).tag(readonly=True) function_handler = tl.Unicode(default_value="handler.handler").tag(readonly=True) function_description = tl.Unicode(default_value="PODPAC Lambda Function (https://podpac.org)").tag(readonly=True) function_env_variables = tl.Dict(default_value={}).tag(readonly=True) # environment vars in function @@ -158,6 +165,7 @@ def _session_default(self): function_source_dist_key = tl.Unicode().tag(readonly=True) # see default below function_source_dependencies_key = tl.Unicode().tag(readonly=True) # see default below function_allow_unsafe_eval = tl.Bool(default_value=False).tag(readonly=True) + function_restrict_pipelines = tl.List(tl.Unicode(), default_value=[]).tag(readonly=True) _function_arn = tl.Unicode(default_value=None, allow_none=True) _function_last_modified = tl.Unicode(default_value=None, allow_none=True) _function_version = tl.Unicode(default_value=None, allow_none=True) @@ -173,6 +181,13 @@ def _function_name_default(self): return settings["FUNCTION_NAME"] + @tl.default("function_triggers") + def _function_triggers_default(self): + if self.function_eval_trigger != "eval": + return ["eval", self.function_eval_trigger] + else: + return ["eval"] + @tl.default("function_source_dist_key") def _function_source_dist_key_default(self): v = version.version() @@ -321,17 +336,24 @@ def _function_api_tags_default(self): return self.function_tags # podpac node parameters - source = tl.Instance(Node, help="Node to evaluate in a Lambda function.", allow_none=True).tag(attr=True) - 
source_output_format = tl.Unicode(default_value="pkl", help="Output format.") - source_output_name = tl.Unicode(help="Image output name.") - attrs = tl.Dict() # TODO: are we still using this? + source = tl.Instance(Node, allow_none=True).tag(attr=True) + source_output_format = tl.Unicode(default_value="netcdf") + source_output_name = tl.Unicode() + attrs = tl.Dict() download_result = tl.Bool(True).tag(attr=True) + force_compute = tl.Bool().tag(attr=True) @tl.default("source_output_name") def _source_output_name_default(self): return self.source.__class__.__name__ - # TODO: are this still being used? + @tl.default("force_compute") + def _force_compute_default(self): + if settings["FUNCTION_FORCE_COMPUTE"] is None: + settings["FUNCTION_FORCE_COMPUTE"] = False + + return settings["FUNCTION_FORCE_COMPUTE"] + @property def pipeline(self): """ @@ -353,16 +375,23 @@ def eval(self, coordinates, output=None): if self.source is None: raise ValueError("'source' node must be defined to eval") - if self.function_eval_trigger == "eval" or self.function_eval_trigger == "S3": - return self._eval_s3(coordinates, output=None) - else: + if self.function_eval_trigger == "eval": + return self._eval_invoke(coordinates, output) + elif self.function_eval_trigger == "S3": + return self._eval_s3(coordinates, output) + elif self.function_eval_trigger == "APIGateway": raise NotImplementedError("APIGateway trigger not yet implemented through eval") + else: + raise ValueError("Function trigger is not one of 'eval', 'S3', or 'APIGateway'") def build(self): """Build Lambda function and associated resources on AWS to run PODPAC pipelines """ + # TODO: move towards an architecture where the "create_" functions repair on each build + # and skip when the resources already exist + # see if current setup is valid, if so just return valid = self.validate() if valid: @@ -397,9 +426,8 @@ def build(self): # TODO: check to make sure function and API work together - # create S3 bucket - if self._bucket is None: - self.create_bucket() + # create S3 bucket - this will skip pieces that already exist + self.create_bucket() # check to see if setup is valid after creation # TODO: remove this in favor of something more granular?? 
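A minimal usage sketch of the node attributes introduced above. The import path, the Arange
source node, and the constructor-style assignment of these traits are assumptions for
illustration, not part of this patch:

    import podpac

    source_node = podpac.algorithm.Arange()  # any PODPAC Node definition works here

    node = podpac.managers.aws.Lambda(
        source=source_node,
        function_eval_trigger="eval",                    # "eval" invokes directly; "S3" round-trips through the bucket
        force_compute=True,                              # skip any cached S3 output (FUNCTION_FORCE_COMPUTE)
        function_restrict_pipelines=[source_node.hash],  # whitelist only this pipeline definition
    )
    # output = node.eval(coordinates)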
@@ -545,6 +573,7 @@ def describe(self): Source Dependencies: {source_deps} Last Modified: {_function_last_modified} Version: {_function_version} + Restrict Evaluation: {function_restrict_pipelines} S3 Bucket: {function_s3_bucket} @@ -577,6 +606,7 @@ def describe(self): _function_arn=self._function_arn, _function_last_modified=self._function_last_modified, _function_version=self._function_version, + function_restrict_pipelines=self.function_restrict_pipelines, function_s3_bucket=self.function_s3_bucket, function_s3_tags=self.function_s3_tags, function_s3_input=self.function_s3_input, @@ -604,6 +634,10 @@ def create_function(self): _log.info("Lambda function will allow unsafe evaluation of Nodes with the current settings") self.function_env_variables["PODPAC_UNSAFE_EVAL"] = settings["UNSAFE_EVAL_HASH"] + if self.function_restrict_pipelines: + _log.info("Lambda function will only run for pipelines: {}".format(self.function_restrict_pipelines)) + self.function_env_variables["PODPAC_RESTRICT_PIPELINES"] = json.dumps(self.function_restrict_pipelines) + # if function already exists, this will return existing function function = create_function( self.session, @@ -818,26 +852,35 @@ def create_bucket(self): # after creating a bucket, you need to wait ~2 seconds before its active and can be uploaded to # this is not cool - time.sleep(2) + time.sleep(5) + + # get reference to s3 client for session + s3 = self.session.client("s3") # add podpac deps to bucket for version - # copy from user supplied dependencies - if self.function_source_dependencies_zip is not None: - put_object( - self.session, - self.function_s3_bucket, - self.function_s3_dependencies_key, - file=self.function_source_dependencies_zip, - ) + # see if the function depedencies exist in bucket + try: + s3.head_object(Bucket=self.function_s3_bucket, Key=self.function_s3_dependencies_key) + except botocore.exceptions.ClientError: + + # copy from user supplied dependencies + if self.function_source_dependencies_zip is not None: + put_object( + self.session, + self.function_s3_bucket, + self.function_s3_dependencies_key, + file=self.function_source_dependencies_zip, + ) - # copy resources from podpac dist - else: - s3resource = self.session.resource("s3") - copy_source = {"Bucket": self.function_source_bucket, "Key": self.function_source_dependencies_key} - s3resource.meta.client.copy(copy_source, self.function_s3_bucket, self.function_s3_dependencies_key) + # copy resources from podpac dist + else: + s3resource = self.session.resource("s3") + copy_source = {"Bucket": self.function_source_bucket, "Key": self.function_source_dependencies_key} + s3resource.meta.client.copy(copy_source, self.function_s3_bucket, self.function_s3_dependencies_key) - # TODO: remove eval from here once we implement "eval" through invoke - if "eval" in self.function_triggers or "S3" in self.function_triggers: + # Add S3 Function triggers, if they don't exist already + # TODO: add validition to see if trigger already exists + if "S3" in self.function_triggers: # add permission to invoke call lambda - this feels brittle due to source_arn statement_id = re.sub("[-_.]", "", self.function_s3_bucket) principle = "s3.amazonaws.com" @@ -845,7 +888,6 @@ def create_bucket(self): self.add_trigger(statement_id, principle, source_arn) # lambda integration on object creation events - s3 = self.session.client("s3") s3.put_bucket_notification_configuration( Bucket=self.function_s3_bucket, NotificationConfiguration={ @@ -859,6 +901,8 @@ def create_bucket(self): ] }, ) + else: + 
_log.debug("Skipping S3 trigger because 'S3' not in the function triggers") return bucket @@ -881,10 +925,22 @@ def validate_bucket(self): This should only be run after running `self.get_bucket()` """ - # TODO: needs to be implemented if self._bucket is None: return False + s3 = self.session.client("s3") + + # make sure dependencies are in there + try: + s3.head_object(Bucket=self.function_s3_bucket, Key=self.function_s3_dependencies_key) + except botocore.exceptions.ClientError: + _log.error("Failed to find PODPAC dependencies in bucket") + return False + + # TODO: make sure trigger exists + if "S3" in self.function_triggers: + pass + return True def delete_bucket(self, delete_objects=False): @@ -1017,7 +1073,7 @@ def delete_api(self): self._function_api_resource_id = None # Logs - def get_logs(self, limit=100, start=None, end=None): + def get_logs(self, limit=5, start=None, end=None): """Get Cloudwatch logs from lambda function execution See :func:`podpac.managers.aws.get_logs` @@ -1134,6 +1190,56 @@ def _set_api(self, api): # store a copy of the whole response from AWS self._api = api + def _create_eval_pipeline(self, coordinates): + """shorthand to create pipeline on eval""" + + # add coordinates to the pipeline + pipeline = self.pipeline # contains "pipeline" and "output" keys + pipeline["coordinates"] = json.loads(coordinates.json) + + # TODO: should we move this to `self.pipeline`? + pipeline["settings"] = settings.copy() # TODO: we should not wholesale copy settings here !! + pipeline["settings"][ + "FUNCTION_DEPENDENCIES_KEY" + ] = self.function_s3_dependencies_key # overwrite in case this is specified explicitly by class + + return pipeline + + def _eval_invoke(self, coordinates, output=None): + """eval node through invoke trigger""" + _log.debug("Evaluating pipeline via invoke") + + # create eval pipeline + pipeline = self._create_eval_pipeline(coordinates) + + # create lambda client + awslambda = self.session.client("lambda") + + # invoke + payload = bytes(json.dumps(pipeline, indent=4, cls=JSONEncoder).encode("UTF-8")) + response = awslambda.invoke( + FunctionName=self.function_name, + LogType="Tail", # include the execution log in the response. 
+ Payload=payload, + ) + + _log.debug("Received response from lambda function") + + if "FunctionError" in response: + _log.error("Unhandled error from lambda function") + # logs = base64.b64decode(response["LogResult"]).decode("UTF-8").split('\n') + payload = json.loads(response["Payload"].read().decode("UTF-8")) + raise LambdaException( + "Error in lambda function evaluation:\n\nError Type: {}\nError Message: {}\nStack Trace: {}".format( + payload["errorType"], payload["errorMessage"], "\n".join(payload["stackTrace"]) + ) + ) + + # After waiting, load the pickle file like this: + payload = response["Payload"].read() + self._output = UnitsDataArray.open(payload) + return self._output + def _eval_s3(self, coordinates, output=None): """Evaluate node through s3 trigger""" @@ -1142,15 +1248,15 @@ def _eval_s3(self, coordinates, output=None): input_folder = "{}{}".format(self.function_s3_input, "/" if not self.function_s3_input.endswith("/") else "") output_folder = "{}{}".format(self.function_s3_output, "/" if not self.function_s3_output.endswith("/") else "") - # add coordinates to the pipeline - pipeline = self.pipeline - pipeline["coordinates"] = json.loads(coordinates.json) - pipeline["settings"] = { - "FUNCTION_S3_INPUT": input_folder, - "FUNCTION_S3_OUTPUT": output_folder, - "FUNCTION_DEPENDENCIES_KEY": self.function_s3_dependencies_key, - "UNSAFE_EVAL_HASH": settings["UNSAFE_EVAL_HASH"], - } + # create eval pipeline + pipeline = self._create_eval_pipeline(coordinates) + pipeline["settings"]["FUNCTION_FORCE_COMPUTE"] = self.force_compute + pipeline["settings"][ + "FUNCTION_S3_INPUT" + ] = input_folder # overwrite in case this is specified explicitly by class + pipeline["settings"][ + "FUNCTION_S3_OUTPUT" + ] = output_folder # overwrite in case this is specified explicitly by class # filename filename = "{folder}{output}_{source}_{coordinates}.{suffix}".format( @@ -1194,7 +1300,7 @@ def _eval_s3(self, coordinates, output=None): _log.debug("Received response from lambda function") response = s3.get_object(Key=filename, Bucket=self.function_s3_bucket) body = response["Body"].read() - self._output = cPickle.loads(body) + self._output = UnitsDataArray.open(body) return self._output def _eval_api(self, coordinates, output=None): @@ -1595,17 +1701,18 @@ def create_function( "Tags": function_tags, } + # read function from zip file + if function_source_dist_zip is not None: + raise NotImplementedError("Supplying a source dist zip from a local file is not yet supported") + # TODO: this fails when the file size is over a certain limit + # with open(function_source_dist_zip, "rb") as f: + # lambda_config["Code"]["ZipFile"] = f.read() + # read function from S3 (Default) - if function_source_bucket is not None and function_source_dist_key is not None: + elif function_source_bucket is not None and function_source_dist_key is not None: lambda_config["Code"]["S3Bucket"] = function_source_bucket lambda_config["Code"]["S3Key"] = function_source_dist_key - # read function from zip file - elif function_source_dist_zip is not None: - with open(function_source_dist_zip, "rb") as f: - lambda_config["Code"] = {} # reset the code dict to make sure S3Bucket and S3Key are overridden - lambda_config["Code"]["ZipFile"] = f.read() - else: raise ValueError("Function source is not defined") @@ -1697,8 +1804,10 @@ def update_function( # read function from zip file elif function_source_dist_zip is not None: - with open(function_source_dist_zip, "rb") as f: - lambda_config["ZipFile"] = f.read() + raise 
NotImplementedError("Supplying a source dist zip from a local file is not yet supported") + # TODO: this fails when the file size is over a certain limit + # with open(function_source_dist_zip, "rb") as f: + # lambda_config["ZipFile"] = f.read() else: raise ValueError("Function source is not defined") @@ -2292,4 +2401,16 @@ def get_logs(session, log_group_name, limit=100, start=None, end=None): ) logs += response["events"] + # sort logs + logs.sort(key=lambda k: k["timestamp"]) + + # take only the last "limit" + logs = logs[-limit:] + + # add time easier to read + for log in logs: + log["time"] = "{}.{:03d}".format( + datetime.fromtimestamp(log["timestamp"] / 1000).strftime("%Y-%m-%d %H:%M:%S"), log["timestamp"] % 1000 + ) + return logs diff --git a/podpac/core/settings.py b/podpac/core/settings.py index d11184a84..f5de84afc 100644 --- a/podpac/core/settings.py +++ b/podpac/core/settings.py @@ -53,6 +53,7 @@ "FUNCTION_DEPENDENCIES_KEY": None, "FUNCTION_S3_INPUT": None, "FUNCTION_S3_OUTPUT": None, + "FUNCTION_FORCE_COMPUTE": False, } @@ -134,6 +135,8 @@ class PodpacSettings(dict): Folder within :attr:`S3_BUCKET_NAME` to use for triggering node evaluation. FUNCTION_S3_OUTPUT : str Folder within :attr:`S3_BUCKET_NAME` to use for outputs. + FUNCTION_FORCE_COMPUTE : bool + Force the lambda function to compute pipeline, even if result is already cached. AUTOSAVE_SETTINGS: bool Save settings automatically as they are changed during runtime. Defaults to ``False``. MULTITHREADING: bool
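A short sketch of how the new FUNCTION_FORCE_COMPUTE setting is expected to flow end to end;
the value shown is an assumption for illustration, not part of this patch:

    import podpac

    # Force the next Lambda evaluation to recompute, even if an output for the same
    # request already exists in the FUNCTION_S3_OUTPUT folder.
    podpac.settings["FUNCTION_FORCE_COMPUTE"] = True

    # The Lambda node picks this up as its force_compute default, copies it into
    # pipeline["settings"], and the handler then skips the cached-output check.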