diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py
index 8a3e1d2bf5..488f3011bd 100644
--- a/sdk/python/feast/cli.py
+++ b/sdk/python/feast/cli.py
@@ -364,11 +364,13 @@ def sync_offline_to_online(feature_table: str, start_time: str, end_time: str):
     """
     Sync offline store to online.
     """
-    import feast.pyspark.aws.jobs
+    from datetime import datetime
 
     client = Client()
     table = client.get_feature_table(feature_table)
-    feast.pyspark.aws.jobs.sync_offline_to_online(client, table, start_time, end_time)
+    client.start_offline_to_online_ingestion(
+        table, datetime.fromisoformat(start_time), datetime.fromisoformat(end_time)
+    )
 
 
 @cli.command()
@@ -424,5 +426,40 @@ def list_emr_jobs():
         )
 
 
+@cli.command()
+@click.option(
+    "--features",
+    "-f",
+    help="Features in feature_table:feature format, comma separated",
+    required=True,
+)
+@click.option(
+    "--entity-df-path",
+    "-e",
+    help="Path to the entity dataframe in CSV format. It is assumed to have an event_timestamp column and a header.",
+    required=True,
+)
+@click.option("--destination", "-d", help="Destination", default="")
+def get_historical_features(features: str, entity_df_path: str, destination: str):
+    """
+    Get historical features
+    """
+    import pandas
+
+    client = Client()
+
+    # TODO: clean this up
+    entity_df = pandas.read_csv(entity_df_path, sep=None, engine="python",)
+
+    entity_df["event_timestamp"] = pandas.to_datetime(entity_df["event_timestamp"])
+
+    uploaded_df = client.stage_dataframe(
+        entity_df, "event_timestamp", "created_timestamp"
+    )
+
+    job = client.get_historical_features(features.split(","), uploaded_df,)
+    print(job.get_output_file_uri())
+
+
 if __name__ == "__main__":
     cli()
diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py
index 7e9e7bdeea..ed80714845 100644
--- a/sdk/python/feast/client.py
+++ b/sdk/python/feast/client.py
@@ -76,6 +76,7 @@ from feast.online_response import OnlineResponse, _infer_online_entity_rows
 from feast.pyspark.abc import RetrievalJob, SparkJob
 from feast.pyspark.launcher import (
+    stage_dataframe,
     start_historical_feature_retrieval_job,
     start_historical_feature_retrieval_spark_session,
     start_offline_to_online_ingestion,
@@ -885,9 +886,16 @@ def _get_feature_tables_from_feature_refs(
         return feature_tables
 
     def start_offline_to_online_ingestion(
-        self,
-        feature_table: Union[FeatureTable, str],
-        start: Union[datetime, str],
-        end: Union[datetime, str],
+        self, feature_table: Union[FeatureTable, str], start: datetime, end: datetime,
     ) -> SparkJob:
         return start_offline_to_online_ingestion(feature_table, start, end, self)  # type: ignore
+
+    def stage_dataframe(
+        self,
+        df: pd.DataFrame,
+        event_timestamp_column: str,
+        created_timestamp_column: str,
+    ) -> FileSource:
+        return stage_dataframe(
+            df, event_timestamp_column, created_timestamp_column, self
+        )
diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py
index a432fd1cc8..78f353d280 100644
--- a/sdk/python/feast/constants.py
+++ b/sdk/python/feast/constants.py
@@ -80,6 +80,16 @@ class AuthProvider(Enum):
 CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT = "historical_feature_output_format"
 CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION = "historical_feature_output_location"
 
+CONFIG_REDIS_HOST = "redis_host"
+CONFIG_REDIS_PORT = "redis_port"
+CONFIG_REDIS_SSL = "redis_ssl"
+
+CONFIG_SPARK_EMR_REGION = "emr_region"
+CONFIG_SPARK_EMR_CLUSTER_ID = "emr_cluster_id"
+CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH = "emr_cluster_template_path"
+CONFIG_SPARK_EMR_STAGING_LOCATION = "emr_staging_location"
+CONFIG_SPARK_EMR_LOG_LOCATION = "emr_log_location"
+
 # Configuration option default values
 FEAST_DEFAULT_OPTIONS = {
diff --git a/sdk/python/feast/pyspark/abc.py b/sdk/python/feast/pyspark/abc.py
index 15af1bd78a..fc162f0b2f 100644
--- a/sdk/python/feast/pyspark/abc.py
+++ b/sdk/python/feast/pyspark/abc.py
@@ -5,6 +5,10 @@ from enum import Enum
 from typing import Dict, List, Optional
 
+import pandas
+
+from feast.data_source import FileSource
+
 
 class SparkJobFailure(Exception):
     """
@@ -258,19 +262,31 @@ def __init__(
         start: datetime,
         end: datetime,
         jar: str,
+        redis_host: str,
+        redis_port: int,
+        redis_ssl: bool,
     ):
         self._feature_table = feature_table
         self._source = source
         self._start = start
         self._end = end
         self._jar = jar
+        self._redis_host = redis_host
+        self._redis_port = redis_port
+        self._redis_ssl = redis_ssl
 
     def get_name(self) -> str:
         return (
-            f"BatchIngestion-{self._feature_table['name']}-"
+            f"BatchIngestion-{self.get_feature_table_name()}-"
             f"{self._start.strftime('%Y-%m-%d')}-{self._end.strftime('%Y-%m-%d')}"
         )
 
+    def _get_redis_config(self):
+        return dict(host=self._redis_host, port=self._redis_port, ssl=self._redis_ssl)
+
+    def get_feature_table_name(self) -> str:
+        return self._feature_table["name"]
+
     def get_main_file_path(self) -> str:
         return self._jar
 
@@ -289,6 +305,8 @@ def get_arguments(self) -> List[str]:
             self._start.strftime("%Y-%m-%dT%H:%M:%S"),
             "--end",
             self._end.strftime("%Y-%m-%dT%H:%M:%S"),
+            "--redis",
+            json.dumps(self._get_redis_config()),
         ]
 
 
@@ -334,3 +352,18 @@ def offline_to_online_ingestion(
             IngestionJob: wrapper around remote job that can be used to check when job completed.
         """
         raise NotImplementedError
+
+    @abc.abstractmethod
+    def stage_dataframe(
+        self,
+        df: pandas.DataFrame,
+        event_timestamp_column: str,
+        created_timestamp_column: str,
+    ) -> FileSource:
+        """
+        Upload a pandas dataframe so it is available to the Spark cluster.
+
+        Returns:
+            FileSource: representing the uploaded dataframe.
+ """ + raise NotImplementedError diff --git a/sdk/python/feast/pyspark/historical_feature_retrieval_job.py b/sdk/python/feast/pyspark/historical_feature_retrieval_job.py index e65c200896..1d221e0f32 100644 --- a/sdk/python/feast/pyspark/historical_feature_retrieval_job.py +++ b/sdk/python/feast/pyspark/historical_feature_retrieval_job.py @@ -773,7 +773,7 @@ def _feature_table_from_dict(dct: Dict[str, Any]) -> FeatureTable: spark = SparkSession.builder.getOrCreate() args = _get_args() feature_tables_conf = json.loads(args.feature_tables) - feature_tables_sources_conf = json.loads(args.feature_tables_source) + feature_tables_sources_conf = json.loads(args.feature_tables_sources) entity_source_conf = json.loads(args.entity_source) destination_conf = json.loads(args.destination) start_job( diff --git a/sdk/python/feast/pyspark/launcher.py b/sdk/python/feast/pyspark/launcher.py index 9a0a3bfc63..61bf16cef6 100644 --- a/sdk/python/feast/pyspark/launcher.py +++ b/sdk/python/feast/pyspark/launcher.py @@ -6,10 +6,18 @@ from feast.config import Config from feast.constants import ( + CONFIG_REDIS_HOST, + CONFIG_REDIS_PORT, + CONFIG_REDIS_SSL, CONFIG_SPARK_DATAPROC_CLUSTER_NAME, CONFIG_SPARK_DATAPROC_PROJECT, CONFIG_SPARK_DATAPROC_REGION, CONFIG_SPARK_DATAPROC_STAGING_LOCATION, + CONFIG_SPARK_EMR_CLUSTER_ID, + CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH, + CONFIG_SPARK_EMR_LOG_LOCATION, + CONFIG_SPARK_EMR_REGION, + CONFIG_SPARK_EMR_STAGING_LOCATION, CONFIG_SPARK_HOME, CONFIG_SPARK_INGESTION_JOB_JAR, CONFIG_SPARK_LAUNCHER, @@ -50,7 +58,27 @@ def _dataproc_launcher(config: Config) -> JobLauncher: ) -_launchers = {"standalone": _standalone_launcher, "dataproc": _dataproc_launcher} +def _emr_launcher(config: Config) -> JobLauncher: + from feast.pyspark.launchers import aws + + def _get_optional(option): + if config.exists(option): + return config.get(option) + + return aws.EmrClusterLauncher( + region=config.get(CONFIG_SPARK_EMR_REGION), + existing_cluster_id=_get_optional(CONFIG_SPARK_EMR_CLUSTER_ID), + new_cluster_template_path=_get_optional(CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH), + staging_location=config.get(CONFIG_SPARK_EMR_STAGING_LOCATION), + emr_log_location=config.get(CONFIG_SPARK_EMR_LOG_LOCATION), + ) + + +_launchers = { + "standalone": _standalone_launcher, + "dataproc": _dataproc_launcher, + "emr": _emr_launcher, +} def resolve_launcher(config: Config) -> JobLauncher: @@ -177,5 +205,17 @@ def start_offline_to_online_ingestion( feature_table=_feature_table_to_argument(client, feature_table), start=start, end=end, + redis_host=client._config.get(CONFIG_REDIS_HOST), + redis_port=client._config.getint(CONFIG_REDIS_PORT), + redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL), ) ) + + +def stage_dataframe( + df, event_timestamp_column: str, created_timestamp_column: str, client: "Client" +) -> FileSource: + launcher = resolve_launcher(client._config) + return launcher.stage_dataframe( + df, event_timestamp_column, created_timestamp_column, + ) diff --git a/sdk/python/feast/pyspark/launchers/aws/__init__.py b/sdk/python/feast/pyspark/launchers/aws/__init__.py index e69de29bb2..5bfb24f018 100644 --- a/sdk/python/feast/pyspark/launchers/aws/__init__.py +++ b/sdk/python/feast/pyspark/launchers/aws/__init__.py @@ -0,0 +1,3 @@ +from .emr import EmrClusterLauncher, EmrIngestionJob, EmrRetrievalJob + +__all__ = ["EmrRetrievalJob", "EmrIngestionJob", "EmrClusterLauncher"] diff --git a/sdk/python/feast/pyspark/launchers/aws/emr.py b/sdk/python/feast/pyspark/launchers/aws/emr.py new file mode 100644 index 
--- /dev/null
+++ b/sdk/python/feast/pyspark/launchers/aws/emr.py
@@ -0,0 +1,247 @@
+import os
+import tempfile
+from io import BytesIO
+from typing import Any, Dict, Optional
+
+import boto3
+import pandas
+
+from feast.data_source import FileSource
+from feast.pyspark.abc import (
+    IngestionJob,
+    IngestionJobParameters,
+    JobLauncher,
+    RetrievalJob,
+    RetrievalJobParameters,
+    SparkJobStatus,
+)
+
+from .emr_utils import (
+    FAILED_STEP_STATES,
+    IN_PROGRESS_STEP_STATES,
+    SUCCEEDED_STEP_STATES,
+    TERMINAL_STEP_STATES,
+    EmrJobRef,
+    _get_job_state,
+    _historical_retrieval_step,
+    _load_new_cluster_template,
+    _random_string,
+    _s3_upload,
+    _sync_offline_to_online_step,
+    _upload_jar,
+    _wait_for_job_state,
+)
+
+
+class EmrJobMixin:
+    def __init__(self, emr_client, job_ref: EmrJobRef):
+        """
+        Args:
+            emr_client: boto3 emr client
+            job_ref: job reference
+        """
+        self._job_ref = job_ref
+        self._emr_client = emr_client
+
+    def get_id(self) -> str:
+        return f'{self._job_ref.cluster_id}:{self._job_ref.step_id or ""}'
+
+    def get_status(self) -> SparkJobStatus:
+        emr_state = _get_job_state(self._emr_client, self._job_ref)
+        if emr_state in IN_PROGRESS_STEP_STATES:
+            return SparkJobStatus.IN_PROGRESS
+        elif emr_state in SUCCEEDED_STEP_STATES:
+            return SparkJobStatus.COMPLETED
+        elif emr_state in FAILED_STEP_STATES:
+            return SparkJobStatus.FAILED
+        else:
+            # we should never get here
+            raise Exception("Invalid EMR state")
+
+
+class EmrRetrievalJob(EmrJobMixin, RetrievalJob):
+    """
+    Historical feature retrieval job result for an EMR cluster.
+    """
+
+    def __init__(self, emr_client, job_ref: EmrJobRef, output_file_uri: str):
+        """
+        This is the job object representing the historical retrieval job, returned by EmrClusterLauncher.
+
+        Args:
+            output_file_uri (str): URI of the historical feature retrieval job output file.
+        """
+        super().__init__(emr_client, job_ref)
+        self._output_file_uri = output_file_uri
+
+    def get_output_file_uri(self, timeout_sec=None):
+        _wait_for_job_state(
+            self._emr_client, self._job_ref, TERMINAL_STEP_STATES, timeout_sec
+        )
+        return self._output_file_uri
+
+
+class EmrIngestionJob(EmrJobMixin, IngestionJob):
+    """
+    Ingestion job result for an EMR cluster.
+    """
+
+    def __init__(self, emr_client, job_ref: EmrJobRef):
+        super().__init__(emr_client, job_ref)
+
+
+class EmrClusterLauncher(JobLauncher):
+    """
+    Submits jobs to an existing or new EMR cluster. Requires boto3 as an additional dependency.
+    """
+
+    _existing_cluster_id: Optional[str]
+    _new_cluster_template: Optional[Dict[str, Any]]
+    _staging_location: str
+    _emr_log_location: str
+    _region: str
+
+    def __init__(
+        self,
+        *,
+        region: str,
+        existing_cluster_id: Optional[str],
+        new_cluster_template_path: Optional[str],
+        staging_location: str,
+        emr_log_location: str,
+    ):
+        """
+        Initialize an EMR job launcher, used internally for job submission and result
+        retrieval. It can either submit to an existing EMR cluster or create a cluster
+        on demand for each job.
+
+        Args:
+            region (str):
+                AWS region name.
+            existing_cluster_id (str):
+                Existing EMR cluster id, if using an existing cluster.
+            new_cluster_template_path (str):
+                Path to a YAML cluster template, if launching a new cluster per job.
+            staging_location:
+                An S3 staging location for artifacts.
+            emr_log_location:
+                S3 location for EMR logs.
+ """ + + assert existing_cluster_id or new_cluster_template_path + + self._existing_cluster_id = existing_cluster_id + if new_cluster_template_path: + self._new_cluster_template = _load_new_cluster_template( + new_cluster_template_path + ) + else: + self._new_cluster_template = None + + self._staging_location = staging_location + self._emr_log_location = emr_log_location + self._region = region + + def _emr_client(self): + return boto3.client("emr", region_name=self._region) + + def _submit_emr_job(self, step: Dict[str, Any]) -> EmrJobRef: + """ + Submit EMR job using a new or existing cluster. + + Returns a job reference (cluster_id and step_id). + """ + + emr = self._emr_client() + + if self._existing_cluster_id: + step["ActionOnFailure"] = "CONTINUE" + step_ids = emr.add_job_flow_steps( + JobFlowId=self._existing_cluster_id, Steps=[step], + ) + return EmrJobRef(self._existing_cluster_id, step_ids["StepIds"][0]) + else: + assert self._new_cluster_template is not None + jobTemplate = self._new_cluster_template + step["ActionOnFailure"] = "TERMINATE_CLUSTER" + + jobTemplate["Steps"] = [step] + + if self._emr_log_location: + jobTemplate["LogUri"] = os.path.join( + self._emr_log_location, _random_string(5) + ) + + job = emr.run_job_flow(**jobTemplate) + return EmrJobRef(job["JobFlowId"], None) + + def historical_feature_retrieval( + self, job_params: RetrievalJobParameters + ) -> RetrievalJob: + + with open(job_params.get_main_file_path()) as f: + pyspark_script = f.read() + + pyspark_script_path = _s3_upload( + BytesIO(pyspark_script.encode("utf8")), + local_path="historical_retrieval.py", + remote_path_prefix=self._staging_location, + remote_path_suffix=".py", + ) + + step = _historical_retrieval_step( + pyspark_script_path, args=job_params.get_arguments() + ) + + job_ref = self._submit_emr_job(step) + + return EmrRetrievalJob( + self._emr_client(), + job_ref, + os.path.join(job_params.get_destination_path(), _random_string(8)), + ) + + def offline_to_online_ingestion( + self, ingestion_job_params: IngestionJobParameters + ) -> IngestionJob: + """ + Submits a batch ingestion job to a Spark cluster. + + Raises: + SparkJobFailure: The spark job submission failed, encountered error + during execution, or timeout. + + Returns: + IngestionJob: wrapper around remote job that can be used to check when job completed. 
+ """ + + jar_s3_path = _upload_jar( + self._staging_location, ingestion_job_params.get_main_file_path() + ) + step = _sync_offline_to_online_step( + jar_s3_path, + ingestion_job_params.get_feature_table_name(), + args=ingestion_job_params.get_arguments(), + ) + + job_ref = self._submit_emr_job(step) + + return EmrIngestionJob(self._emr_client(), job_ref) + + def stage_dataframe( + self, df: pandas.DataFrame, event_timestamp: str, created_timestamp_column: str + ) -> FileSource: + with tempfile.NamedTemporaryFile() as f: + df.to_parquet(f) + file_url = _s3_upload( + f, + f.name, + remote_path_prefix=os.path.join(self._staging_location, "dataframes"), + remote_path_suffix=".parquet", + ) + return FileSource( + event_timestamp_column=event_timestamp, + created_timestamp_column=created_timestamp_column, + file_format="parquet", + file_url=file_url, + ) diff --git a/sdk/python/feast/pyspark/launchers/aws/emr_utils.py b/sdk/python/feast/pyspark/launchers/aws/emr_utils.py new file mode 100644 index 0000000000..3652c37f72 --- /dev/null +++ b/sdk/python/feast/pyspark/launchers/aws/emr_utils.py @@ -0,0 +1,355 @@ +import hashlib +import logging +import os +import random +import string +import tempfile +import time +from typing import IO, Any, Dict, List, NamedTuple, Optional, Tuple + +import boto3 +import botocore +import pandas +import yaml + +log = logging.getLogger("aws") + +SUPPORTED_EMR_VERSION = "emr-6.0.0" +STREAM_TO_ONLINE_JOB_TYPE = "STREAM_TO_ONLINE_JOB" +OFFLINE_TO_ONLINE_JOB_TYPE = "OFFLINE_TO_ONLINE_JOB" +HISTORICAL_RETRIEVAL_JOB_TYPE = "HISTORICAL_RETRIEVAL_JOB" + + +# Mapping of EMR states to "active" vs terminated for whatever reason +ACTIVE_STEP_STATES = ["PENDING", "CANCEL_PENDING", "RUNNING"] +TERMINAL_STEP_STATES = ["COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED"] + +# Mapping of EMR states to generic states +IN_PROGRESS_STEP_STATES = ["PENDING", "CANCEL_PENDING", "RUNNING"] +SUCCEEDED_STEP_STATES = ["COMPLETED"] +FAILED_STEP_STATES = ["CANCELLED", "FAILED", "INTERRUPTED"] + + +def _sanity_check_cluster_template(template: Dict[str, Any], template_path: str): + """ + Sanity check the run job flow template. We don't really have to do this here but if the spark + job fails you'll only find out much later and this is annoying. Those are not exhaustive, just + some checks to help debugging common configuration issues. + """ + + releaseLabel = template.get("ReleaseLabel") + if releaseLabel != SUPPORTED_EMR_VERSION: + log.warn( + f"{template_path}: ReleaseLabel is set to {releaseLabel}. Recommended: {SUPPORTED_EMR_VERSION}" + ) + + +def _load_new_cluster_template(path: str) -> Dict[str, Any]: + with open(path) as f: + template = yaml.safe_load(f) + _sanity_check_cluster_template(template, path) + return template + + +def _random_string(length) -> str: + return "".join(random.choice(string.ascii_lowercase) for _ in range(length)) + + +def _s3_split_path(path: str) -> Tuple[str, str]: + """ Convert s3:// url to (bucket, key) """ + assert path.startswith("s3://") + _, _, bucket, key = path.split("/", 3) + return bucket, key + + +def _hash_fileobj(fileobj: IO[bytes]) -> str: + """ Compute sha256 hash of a file. File pointer will be reset to 0 on return. 
""" + fileobj.seek(0) + h = hashlib.sha256() + for block in iter(lambda: fileobj.read(2 ** 20), b""): + h.update(block) + fileobj.seek(0) + return h.hexdigest() + + +def _s3_upload( + fileobj: IO[bytes], + local_path: str, + *, + remote_path: Optional[str] = None, + remote_path_prefix: Optional[str] = None, + remote_path_suffix: Optional[str] = None, +) -> str: + """ + Upload a local file to S3. We store the file sha256 sum in S3 metadata and skip the upload + if the file hasn't changed. + + You can either specify remote_path or remote_path_prefix+remote_path_suffix. In the latter case, + the remote path will be computed as $remote_path_prefix/$sha256$remote_path_suffix + """ + + assert (remote_path is not None) or ( + remote_path_prefix is not None and remote_path_suffix is not None + ) + + sha256sum = _hash_fileobj(fileobj) + + if remote_path is None: + assert remote_path_prefix is not None + remote_path = os.path.join( + remote_path_prefix, f"{sha256sum}{remote_path_suffix}" + ) + + bucket, key = _s3_split_path(remote_path) + client = boto3.client("s3") + + try: + head_response = client.head_object(Bucket=bucket, Key=key) + if head_response["Metadata"]["sha256sum"] == sha256sum: + # File already exists + return remote_path + else: + log.info("Uploading {local_path} to {remote_path}") + client.upload_fileobj( + fileobj, bucket, key, ExtraArgs={"Metadata": {"sha256sum": sha256sum}}, + ) + return remote_path + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] == "404": + log.info("Uploading {local_path} to {remote_path}") + client.upload_fileobj( + fileobj, bucket, key, ExtraArgs={"Metadata": {"sha256sum": sha256sum}}, + ) + return remote_path + else: + raise + + +def _upload_jar(jar_s3_prefix: str, local_path: str) -> str: + with open(local_path, "rb") as f: + return _s3_upload( + f, + local_path, + remote_path=os.path.join(jar_s3_prefix, os.path.basename(local_path)), + ) + + +def _sync_offline_to_online_step( + jar_path: str, feature_table_name: str, args: List[str], +) -> Dict[str, Any]: + + return { + "Name": "Feast Ingestion", + "HadoopJarStep": { + "Properties": [ + { + "Key": "feast.step_metadata.job_type", + "Value": OFFLINE_TO_ONLINE_JOB_TYPE, + }, + { + "Key": "feast.step_metadata.offline_to_online.table_name", + "Value": feature_table_name, + }, + ], + "Args": [ + "spark-submit", + "--class", + "feast.ingestion.IngestionJob", + "--packages", + "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.2", + jar_path, + ] + + args, + "Jar": "command-runner.jar", + }, + } + + +class JobInfo(NamedTuple): + job_type: str + cluster_id: str + step_id: str + table_name: str + state: str + + +def list_jobs( + emr_client, job_type: Optional[str], table_name: Optional[str], active_only=True +) -> List[JobInfo]: + """ + List Feast EMR jobs. + + Args: + job_type: optional filter by job type + table_name: optional filter by table name + active_only: filter only for "active" jobs, that is the ones that are running or pending, not terminated + + Returns: + A list of jobs. 
+ """ + + paginator = emr_client.get_paginator("list_clusters") + res: List[JobInfo] = [] + for page in paginator.paginate( + ClusterStates=["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING", "TERMINATING"] + ): + for cluster in page["Clusters"]: + cluster_id = cluster["Id"] + step_paginator = emr_client.get_paginator("list_steps") + + list_steps_params = dict(ClusterId=cluster_id) + if active_only: + list_steps_params["StepStates"] = ACTIVE_STEP_STATES + + for step_page in step_paginator.paginate(**list_steps_params): + for step in step_page["Steps"]: + props = step["Config"]["Properties"] + if "feast.step_metadata.job_type" not in props: + continue + + step_table_name = props.get( + "feast.step_metadata.stream_to_online.table_name" + ) or props.get("feast.step_metadata.offline_to_online.table_name") + step_job_type = props["feast.step_metadata.job_type"] + + if table_name and step_table_name != table_name: + continue + + if job_type and step_job_type != job_type: + continue + + res.append( + JobInfo( + job_type=step_job_type, + cluster_id=cluster_id, + step_id=step["Id"], + state=step["Status"]["State"], + table_name=step_table_name, + ) + ) + return res + + +def _get_stream_to_online_job(emr_client, table_name: str) -> List[JobInfo]: + return list_jobs( + emr_client, + job_type=STREAM_TO_ONLINE_JOB_TYPE, + table_name=table_name, + active_only=True, + ) + + +class EmrJobRef(NamedTuple): + cluster_id: str + step_id: Optional[str] + + +def _get_first_step_id(emr_client, cluster_id: str) -> str: + response = emr_client.list_steps(ClusterId=cluster_id,) + assert len(response["Steps"]) == 1 + return response["Steps"][0]["Id"] + + +def _wait_for_job_state( + emr_client, + job: EmrJobRef, + desired_states: List[str], + timeout_seconds: Optional[int], +): + if job.step_id is None: + step_id = _get_first_step_id(emr_client, job.cluster_id) + else: + step_id = job.step_id + + _wait_for_step_state( + emr_client, job.cluster_id, step_id, desired_states, timeout_seconds + ) + + +def _get_job_state(emr_client, job: EmrJobRef): + if job.step_id is None: + step_id = _get_first_step_id(emr_client, job.cluster_id) + else: + step_id = job.step_id + + return _get_step_state(emr_client, job.cluster_id, step_id) + + +def _get_step_state(emr_client, cluster_id: str, step_id: str) -> str: + response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id) + state = response["Step"]["Status"]["State"] + return state + + +def _wait_for_step_state( + emr_client, + cluster_id: str, + step_id: str, + desired_states: List[str], + timeout_seconds: Optional[int], +): + """ + Wait up to timeout seconds for job to go into one of the desired states. + """ + start_time = time.time() + while (timeout_seconds is None) or (time.time() - start_time < timeout_seconds): + state = _get_step_state(emr_client, cluster_id, step_id) + if state in desired_states: + return + else: + time.sleep(0.5) + else: + raise TimeoutError( + f'Timeout waiting for job state to become {"|".join(desired_states)}' + ) + + +def _cancel_job(emr_client, job_type: str, table_name: str): + """ + Cancel a EMR job. 
+ """ + jobs = list_jobs( + emr_client, job_type=job_type, table_name=table_name, active_only=True + ) + + for job in jobs: + emr_client.cancel_steps(ClusterId=job.cluster_id, StepIds=[job.step_id]) + + for job in jobs: + _wait_for_job_state( + emr_client, EmrJobRef(job.cluster_id, job.step_id), TERMINAL_STEP_STATES, 90 + ) + + +def stop_stream_to_online(emr_client, table_name: str): + """ + Stop offline-to-online ingestion job for the table. + """ + _cancel_job(emr_client, STREAM_TO_ONLINE_JOB_TYPE, table_name) + + +def _upload_dataframe(s3prefix: str, df: pandas.DataFrame) -> str: + with tempfile.NamedTemporaryFile() as f: + df.to_parquet(f) + return _s3_upload( + f, f.name, remote_path_prefix=s3prefix, remote_path_suffix=".parquet" + ) + + +def _historical_retrieval_step( + pyspark_script_path: str, args: List[str], +) -> Dict[str, Any]: + + return { + "Name": "Feast Historical Retrieval", + "HadoopJarStep": { + "Properties": [ + { + "Key": "feast.step_metadata.job_type", + "Value": HISTORICAL_RETRIEVAL_JOB_TYPE, + }, + ], + "Args": ["spark-submit", pyspark_script_path] + args, + "Jar": "command-runner.jar", + }, + } diff --git a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py index 3bea32b60f..79bf8c2aaf 100644 --- a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py +++ b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py @@ -153,3 +153,8 @@ def offline_to_online_ingestion( self, job_params: IngestionJobParameters ) -> IngestionJob: return DataprocIngestionJob(self.dataproc_submit(job_params)) + + def stage_dataframe( + self, df, event_timestamp_column: str, created_timestamp_column: str, + ): + raise NotImplementedError diff --git a/sdk/python/feast/pyspark/launchers/standalone/local.py b/sdk/python/feast/pyspark/launchers/standalone/local.py index 58fa84643e..f1b10f8539 100644 --- a/sdk/python/feast/pyspark/launchers/standalone/local.py +++ b/sdk/python/feast/pyspark/launchers/standalone/local.py @@ -128,3 +128,8 @@ def offline_to_online_ingestion( ) -> IngestionJob: job_id = str(uuid.uuid4()) return StandaloneClusterIngestionJob(job_id, self.spark_submit(job_params)) + + def stage_dataframe( + self, df, event_timestamp_column: str, created_timestamp_column: str, + ): + raise NotImplementedError