From bc4ab5b9796fee0e69790673d548a90db81d40e9 Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Thu, 5 Nov 2020 12:58:34 +0400 Subject: [PATCH 01/14] Implement Job Service control loop for stream ingestion jobs Signed-off-by: Tsotne Tabidze --- sdk/python/feast/constants.py | 3 + sdk/python/feast/job_service.py | 87 ++++++++++++++++- sdk/python/feast/pyspark/abc.py | 15 +++ sdk/python/feast/pyspark/launcher.py | 45 ++++++--- sdk/python/feast/pyspark/launchers/aws/emr.py | 15 +++ .../feast/pyspark/launchers/aws/emr_utils.py | 11 ++- .../pyspark/launchers/gcloud/dataproc.py | 3 + .../pyspark/launchers/standalone/local.py | 94 +++++++++++++++++-- 8 files changed, 243 insertions(+), 30 deletions(-) diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 0a4b4f52e8..3f0614b097 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -52,6 +52,7 @@ class AuthProvider(Enum): CONFIG_JOB_SERVICE_URL_KEY = "job_service_url" CONFIG_JOB_SERVICE_ENABLE_SSL_KEY = "job_service_enable_ssl" CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY = "job_service_server_ssl_cert" +CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP = "job_service_enable_control_loop" CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY = "grpc_connection_timeout_default" CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY = "grpc_connection_timeout_apply" CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY = ( @@ -143,6 +144,8 @@ class AuthProvider(Enum): CONFIG_JOB_SERVICE_ENABLE_SSL_KEY: "False", # Path to certificate(s) to secure connection to Feast Job Service CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "", + # Disable control loop by default for now + CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP: "False", CONFIG_STATSD_ENABLED: "False", # IngestionJob DeadLetter Destination CONFIG_DEADLETTER_PATH: "", diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py index 62f8f3bed9..6b0024ad5e 100644 --- a/sdk/python/feast/job_service.py +++ b/sdk/python/feast/job_service.py @@ -1,9 +1,13 @@ import logging +import threading +import time from concurrent.futures import ThreadPoolExecutor +from typing import Dict, Set, Tuple import grpc import feast +from feast.constants import CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP from feast.core import JobService_pb2_grpc from feast.core.JobService_pb2 import ( CancelJobResponse, @@ -31,7 +35,9 @@ ) from feast.pyspark.launcher import ( get_job_by_id, + get_stream_to_online_ingestion_params, list_jobs, + list_jobs_by_hash, start_historical_feature_retrieval_job, start_offline_to_online_ingestion, start_stream_to_online_ingestion, @@ -44,8 +50,8 @@ class JobServiceServicer(JobService_pb2_grpc.JobServiceServicer): - def __init__(self): - self.client = feast.Client() + def __init__(self, client): + self.client = client def _job_to_proto(self, spark_job: SparkJob) -> JobProto: job = JobProto() @@ -117,6 +123,22 @@ def StartStreamToOnlineIngestionJob( feature_table = self.client.get_feature_table( request.table_name, request.project ) + + if self.client._config.get(CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP) != "False": + # If the control loop is enabled, return existing stream ingestion job id instead of starting a new one + params = get_stream_to_online_ingestion_params( + self.client, request.project, feature_table, [] + ) + job_hash = params.get_job_hash() + jobs_by_hash = list_jobs_by_hash(self.client, include_terminated=False) + if job_hash not in jobs_by_hash: + raise RuntimeError( + "Feast Job Service has control loop enabled, but couldn't find the existing stream ingestion job for the 
given FeatureTable" + ) + return StartStreamToOnlineIngestionJobResponse( + id=jobs_by_hash[job_hash].get_id() + ) + # TODO: add extra_jars to request job = start_stream_to_online_ingestion( client=self.client, @@ -145,6 +167,54 @@ def GetJob(self, request, context): return GetJobResponse(job=self._job_to_proto(job)) +def run_control_loop(): + """Runs control loop that continuously ensures that correct jobs are being run.""" + logging.info( + "Feast Job Service is starting a control loop in a background thread..." + ) + client = feast.Client() + while True: + # Map of job hash -> (project, table_name) + table_metadata: Dict[str, Tuple[str, str]] = dict() + # Job hashes that should exist + final_job_hashes: Set[str] = set() + + for project in client.list_projects(): + feature_tables = client.list_feature_tables(project) + for feature_table in feature_tables: + if feature_table.stream_source is not None: + params = get_stream_to_online_ingestion_params( + client, project, feature_table, [] + ) + job_hash = params.get_job_hash() + final_job_hashes.add(job_hash) + table_metadata[job_hash] = (project, feature_table.name) + + jobs_by_hash = list_jobs_by_hash(client, include_terminated=False) + # Job hashes that currently exist + existing_job_hashes = set(jobs_by_hash.keys()) + + job_hashes_to_cancel = existing_job_hashes - final_job_hashes + job_hashes_to_start = final_job_hashes - existing_job_hashes + + for job_hash in job_hashes_to_cancel: + job = jobs_by_hash[job_hash] + logging.info( + f"Cancelling a stream ingestion job with job_hash={job_hash} job_id={job.get_id()} status={job.get_status()}" + ) + job.cancel() + + for job_hash in job_hashes_to_start: + project, table_name = table_metadata[job_hash] + logging.info( + f"Starting a stream ingestion job for project={project}, table_name={table_name} with job_hash={job_hash}" + ) + feature_table = client.get_feature_table(name=table_name, project=project) + start_stream_to_online_ingestion(client, project, feature_table, []) + + time.sleep(1) + + class HealthServicer(HealthService_pb2_grpc.HealthServicer): def Check(self, request, context): return HealthCheckResponse(status=ServingStatus.SERVING) @@ -164,10 +234,19 @@ def start_job_service(): log_fmt = "%(asctime)s %(levelname)s %(message)s" logging.basicConfig(level=logging.INFO, format=log_fmt) + client = feast.Client() + + if client._config.get(CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP) != "False": + # Start the control loop thread only if it's enabled from configs + thread = threading.Thread(target=run_control_loop, daemon=True) + thread.start() + server = grpc.server(ThreadPoolExecutor(), interceptors=(LoggingInterceptor(),)) - JobService_pb2_grpc.add_JobServiceServicer_to_server(JobServiceServicer(), server) + JobService_pb2_grpc.add_JobServiceServicer_to_server( + JobServiceServicer(client), server + ) HealthService_pb2_grpc.add_HealthServicer_to_server(HealthServicer(), server) server.add_insecure_port("[::]:6568") server.start() - print("Feast job server listening on port :6568") + logging.info("Feast Job Service is listening on port :6568") server.wait_for_termination() diff --git a/sdk/python/feast/pyspark/abc.py b/sdk/python/feast/pyspark/abc.py index d3935ff65b..efa7083426 100644 --- a/sdk/python/feast/pyspark/abc.py +++ b/sdk/python/feast/pyspark/abc.py @@ -1,4 +1,5 @@ import abc +import hashlib import json import os from datetime import datetime @@ -360,6 +361,9 @@ def get_arguments(self) -> List[str]: return args + def get_job_hash(self) -> str: + raise NotImplementedError + class 
BatchIngestionJobParameters(IngestionJobParameters): def __init__( @@ -456,6 +460,13 @@ def get_arguments(self) -> List[str]: "online", ] + def get_job_hash(self) -> str: + job_json = json.dumps( + {"source": self._source, "feature_table": self._feature_table}, + sort_keys=True, + ) + return hashlib.md5(job_json.encode()).hexdigest() + class BatchIngestionJob(SparkJob): """ @@ -541,3 +552,7 @@ def get_job_by_id(self, job_id: str) -> SparkJob: @abc.abstractmethod def list_jobs(self, include_terminated: bool) -> List[SparkJob]: raise NotImplementedError + + @abc.abstractmethod + def list_jobs_by_hash(self, include_terminated: bool) -> Dict[str, SparkJob]: + raise NotImplementedError diff --git a/sdk/python/feast/pyspark/launcher.py b/sdk/python/feast/pyspark/launcher.py index e3e7c03e51..cafe2b1d46 100644 --- a/sdk/python/feast/pyspark/launcher.py +++ b/sdk/python/feast/pyspark/launcher.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import TYPE_CHECKING, List, Union +from typing import TYPE_CHECKING, Dict, List, Union from feast.config import Config from feast.constants import ( @@ -263,6 +263,26 @@ def start_offline_to_online_ingestion( ) +def get_stream_to_online_ingestion_params( + client: "Client", project: str, feature_table: FeatureTable, extra_jars: List[str] +) -> StreamIngestionJobParameters: + return StreamIngestionJobParameters( + jar=client._config.get(CONFIG_SPARK_INGESTION_JOB_JAR), + extra_jars=extra_jars, + source=_source_to_argument(feature_table.stream_source), + feature_table=_feature_table_to_argument(client, project, feature_table), + redis_host=client._config.get(CONFIG_REDIS_HOST), + redis_port=client._config.getint(CONFIG_REDIS_PORT), + redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL), + statsd_host=client._config.getboolean(CONFIG_STATSD_ENABLED) + and client._config.get(CONFIG_STATSD_HOST), + statsd_port=client._config.getboolean(CONFIG_STATSD_ENABLED) + and client._config.getint(CONFIG_STATSD_PORT), + deadletter_path=client._config.get(CONFIG_DEADLETTER_PATH), + stencil_url=client._config.get(CONFIG_STENCIL_URL), + ) + + def start_stream_to_online_ingestion( client: "Client", project: str, feature_table: FeatureTable, extra_jars: List[str] ) -> StreamIngestionJob: @@ -270,20 +290,8 @@ def start_stream_to_online_ingestion( launcher = resolve_launcher(client._config) return launcher.start_stream_to_online_ingestion( - StreamIngestionJobParameters( - jar=client._config.get(CONFIG_SPARK_INGESTION_JOB_JAR), - extra_jars=extra_jars, - source=_source_to_argument(feature_table.stream_source), - feature_table=_feature_table_to_argument(client, project, feature_table), - redis_host=client._config.get(CONFIG_REDIS_HOST), - redis_port=client._config.getint(CONFIG_REDIS_PORT), - redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL), - statsd_host=client._config.getboolean(CONFIG_STATSD_ENABLED) - and client._config.get(CONFIG_STATSD_HOST), - statsd_port=client._config.getboolean(CONFIG_STATSD_ENABLED) - and client._config.getint(CONFIG_STATSD_PORT), - deadletter_path=client._config.get(CONFIG_DEADLETTER_PATH), - stencil_url=client._config.get(CONFIG_STENCIL_URL), + get_stream_to_online_ingestion_params( + client, project, feature_table, extra_jars ) ) @@ -301,3 +309,10 @@ def get_job_by_id(job_id: str, client: "Client") -> SparkJob: def stage_dataframe(df, event_timestamp_column: str, client: "Client") -> FileSource: launcher = resolve_launcher(client._config) return launcher.stage_dataframe(df, event_timestamp_column) + + +def list_jobs_by_hash( + client: 
"Client", include_terminated: bool +) -> Dict[str, SparkJob]: + launcher = resolve_launcher(client._config) + return launcher.list_jobs_by_hash(include_terminated=include_terminated) diff --git a/sdk/python/feast/pyspark/launchers/aws/emr.py b/sdk/python/feast/pyspark/launchers/aws/emr.py index 02cb59c12c..94f866b787 100644 --- a/sdk/python/feast/pyspark/launchers/aws/emr.py +++ b/sdk/python/feast/pyspark/launchers/aws/emr.py @@ -288,6 +288,7 @@ def start_stream_to_online_ingestion( extra_jar_paths, ingestion_job_params.get_feature_table_name(), args=ingestion_job_params.get_arguments(), + job_hash=ingestion_job_params.get_job_hash(), ) job_ref = self._submit_emr_job(step) @@ -372,3 +373,17 @@ def get_job_by_id(self, job_id: str) -> SparkJob: return self._job_from_job_info(job_info) else: raise KeyError(f"Job not found {job_id}") + + def list_jobs_by_hash(self, include_terminated: bool) -> Dict[str, SparkJob]: + jobs = _list_jobs( + emr_client=self._emr_client(), + job_type=None, + table_name=None, + active_only=not include_terminated, + ) + + result = {} + for job_info in jobs: + if job_info.job_hash is not None: + result[job_info.job_hash] = self._job_from_job_info(job_info) + return result diff --git a/sdk/python/feast/pyspark/launchers/aws/emr_utils.py b/sdk/python/feast/pyspark/launchers/aws/emr_utils.py index 634d82ce78..72df3f92d8 100644 --- a/sdk/python/feast/pyspark/launchers/aws/emr_utils.py +++ b/sdk/python/feast/pyspark/launchers/aws/emr_utils.py @@ -183,6 +183,7 @@ class JobInfo(NamedTuple): state: str table_name: Optional[str] output_file_uri: Optional[str] + job_hash: Optional[str] def _list_jobs( @@ -228,6 +229,8 @@ def _list_jobs( "feast.step_metadata.historical_retrieval.output_file_uri" ) + job_hash = props.get("feast.step_metadata.job_hash") + if table_name and step_table_name != table_name: continue @@ -241,6 +244,7 @@ def _list_jobs( state=step["Status"]["State"], table_name=step_table_name, output_file_uri=output_file_uri, + job_hash=job_hash, ) ) return res @@ -364,7 +368,11 @@ def _historical_retrieval_step( def _stream_ingestion_step( - jar_path: str, extra_jar_paths: List[str], feature_table_name: str, args: List[str], + jar_path: str, + extra_jar_paths: List[str], + feature_table_name: str, + args: List[str], + job_hash: str, ) -> Dict[str, Any]: if extra_jar_paths: @@ -384,6 +392,7 @@ def _stream_ingestion_step( "Key": "feast.step_metadata.stream_to_online.table_name", "Value": feature_table_name, }, + {"Key": "feast.step_metadata.job_hash", "Value": job_hash}, ], "Args": ["spark-submit", "--class", "feast.ingestion.IngestionJob"] + jars_args diff --git a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py index 1dfce4ce44..d6fe45009b 100644 --- a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py +++ b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py @@ -345,3 +345,6 @@ def list_jobs(self, include_terminated: bool) -> List[SparkJob]: project_id=self.project_id, region=self.region, filter=job_filter ) ] + + def list_jobs_by_hash(self, include_terminated: bool) -> Dict[str, SparkJob]: + raise NotImplementedError diff --git a/sdk/python/feast/pyspark/launchers/standalone/local.py b/sdk/python/feast/pyspark/launchers/standalone/local.py index 821a962ed3..45d9205093 100644 --- a/sdk/python/feast/pyspark/launchers/standalone/local.py +++ b/sdk/python/feast/pyspark/launchers/standalone/local.py @@ -1,9 +1,10 @@ import os import socket import subprocess +import threading import uuid from contextlib 
import closing -from typing import Dict, List +from typing import Dict, List, Optional import requests from requests.exceptions import RequestException @@ -22,9 +23,71 @@ StreamIngestionJobParameters, ) -# In-memory cache of Spark jobs -# This is necessary since we can't query Spark jobs in local mode -JOB_CACHE: Dict[str, SparkJob] = {} + +class JobCache: + """In-memory cache of Spark jobs. + + This is necessary since we can't query Spark jobs in local mode + + """ + + # Map of job_id -> spark job + job_by_id: Dict[str, SparkJob] + + # Map of job_id -> job_hash. The value can be None, indicating this job was + # manually created and Job Service isn't maintaining the state of this job + hash_by_id: Dict[str, Optional[str]] + + # This reentrant lock is necessary for multi-threading access + lock: threading.RLock + + def __init__(self): + self.job_by_id = {} + self.hash_by_id = {} + self.lock = threading.RLock() + + def add_job(self, job_id: str, job_hash: Optional[str], job: SparkJob) -> None: + """Add a Spark job to the cache with given job_id and job_cache. + + Args: + job_id (str): External ID of the Spark Job. + job_hash (Optional[str]): Computed hash of the Spark job. If None, Job Service + will ignore maintaining the state of this Spark job. + job (SparkJob): The new Spark job to add. + """ + with self.lock: + self.job_by_id[job_id] = job + self.hash_by_id[job_id] = job_hash + + def list_jobs(self) -> List[SparkJob]: + """List all Spark jobs in the cache.""" + with self.lock: + return list(self.job_by_id.values()) + + def get_job_by_id(self, job_id: str) -> SparkJob: + """Get a Spark job with the given ID. Throws an exception if such job doesn't exist. + + Args: + job_id (str): External ID of the Spark job to get. + + Returns: + SparkJob: The Spark job with the given ID. + """ + with self.lock: + return self.job_by_id[job_id] + + def list_jobs_by_hash(self) -> Dict[str, SparkJob]: + """Get a map of job_hash -> Spark job. 
Returns only jobs with non-None job hashes.""" + with self.lock: + jobs_by_hash = {} + for job_id, job in self.job_by_id.items(): + job_hash = self.hash_by_id[job_id] + if job_hash is not None: + jobs_by_hash[job_hash] = job + return jobs_by_hash + + +JOB_CACHE = JobCache() def _find_free_port(): @@ -230,7 +293,7 @@ def historical_feature_retrieval( self.spark_submit(job_params), job_params.get_destination_path(), ) - JOB_CACHE[job_id] = job + JOB_CACHE.add_job(job_id, None, job) return job def offline_to_online_ingestion( @@ -244,7 +307,7 @@ def offline_to_online_ingestion( self.spark_submit(ingestion_job_params, ui_port), ui_port, ) - JOB_CACHE[job_id] = job + JOB_CACHE.add_job(job_id, None, job) return job def start_stream_to_online_ingestion( @@ -258,22 +321,33 @@ def start_stream_to_online_ingestion( self.spark_submit(ingestion_job_params, ui_port), ui_port, ) - JOB_CACHE[job_id] = job + JOB_CACHE.add_job(job_id, ingestion_job_params.get_job_hash(), job) return job def stage_dataframe(self, df, event_timestamp_column: str): raise NotImplementedError def get_job_by_id(self, job_id: str) -> SparkJob: - return JOB_CACHE[job_id] + return JOB_CACHE.get_job_by_id(job_id) def list_jobs(self, include_terminated: bool) -> List[SparkJob]: if include_terminated is True: - return list(JOB_CACHE.values()) + return JOB_CACHE.list_jobs() else: return [ job - for job in JOB_CACHE.values() + for job in JOB_CACHE.list_jobs() if job.get_status() in (SparkJobStatus.STARTING, SparkJobStatus.IN_PROGRESS) ] + + def list_jobs_by_hash(self, include_terminated: bool) -> Dict[str, SparkJob]: + if include_terminated is True: + return JOB_CACHE.list_jobs_by_hash() + else: + return { + job_hash: job + for job_hash, job in JOB_CACHE.list_jobs_by_hash().items() + if job.get_status() + in (SparkJobStatus.STARTING, SparkJobStatus.IN_PROGRESS) + } From f01f55516af2d777f0e1382850987e84b080065b Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Thu, 5 Nov 2020 13:38:53 +0400 Subject: [PATCH 02/14] Update sdk/python/feast/job_service.py Co-authored-by: Oleg Avdeev Signed-off-by: Tsotne Tabidze --- sdk/python/feast/job_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py index 6b0024ad5e..6021962dab 100644 --- a/sdk/python/feast/job_service.py +++ b/sdk/python/feast/job_service.py @@ -124,7 +124,7 @@ def StartStreamToOnlineIngestionJob( request.table_name, request.project ) - if self.client._config.get(CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP) != "False": + if self.client._config.getboolean(CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP): # If the control loop is enabled, return existing stream ingestion job id instead of starting a new one params = get_stream_to_online_ingestion_params( self.client, request.project, feature_table, [] From 32bc580f88457017f0abfdaf6717df90ec5df0f4 Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Thu, 5 Nov 2020 13:39:01 +0400 Subject: [PATCH 03/14] Update sdk/python/feast/job_service.py Co-authored-by: Oleg Avdeev Signed-off-by: Tsotne Tabidze --- sdk/python/feast/job_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py index 6021962dab..f44874b3da 100644 --- a/sdk/python/feast/job_service.py +++ b/sdk/python/feast/job_service.py @@ -236,7 +236,7 @@ def start_job_service(): client = feast.Client() - if client._config.get(CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP) != "False": + if 
client._config.getboolean(CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP): # Start the control loop thread only if it's enabled from configs thread = threading.Thread(target=run_control_loop, daemon=True) thread.start() From 853ff8a47856278cd8914ceea8dc5d50dc76023d Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Thu, 5 Nov 2020 22:34:56 +0400 Subject: [PATCH 04/14] Ensure jobservice isn't running with failed control loop; also other small fixes Signed-off-by: Tsotne Tabidze --- sdk/python/feast/job_service.py | 119 +++++++++++------- sdk/python/feast/pyspark/abc.py | 3 + .../pyspark/launchers/standalone/local.py | 18 +-- 3 files changed, 85 insertions(+), 55 deletions(-) diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py index f44874b3da..e03be40e61 100644 --- a/sdk/python/feast/job_service.py +++ b/sdk/python/feast/job_service.py @@ -1,8 +1,11 @@ import logging +import os +import signal import threading import time +import traceback from concurrent.futures import ThreadPoolExecutor -from typing import Dict, Set, Tuple +from typing import Dict, Tuple import grpc @@ -167,52 +170,76 @@ def GetJob(self, request, context): return GetJobResponse(job=self._job_to_proto(job)) -def run_control_loop(): - """Runs control loop that continuously ensures that correct jobs are being run.""" - logging.info( - "Feast Job Service is starting a control loop in a background thread..." - ) - client = feast.Client() - while True: - # Map of job hash -> (project, table_name) - table_metadata: Dict[str, Tuple[str, str]] = dict() - # Job hashes that should exist - final_job_hashes: Set[str] = set() - - for project in client.list_projects(): - feature_tables = client.list_feature_tables(project) - for feature_table in feature_tables: - if feature_table.stream_source is not None: - params = get_stream_to_online_ingestion_params( - client, project, feature_table, [] - ) - job_hash = params.get_job_hash() - final_job_hashes.add(job_hash) - table_metadata[job_hash] = (project, feature_table.name) - - jobs_by_hash = list_jobs_by_hash(client, include_terminated=False) - # Job hashes that currently exist - existing_job_hashes = set(jobs_by_hash.keys()) - - job_hashes_to_cancel = existing_job_hashes - final_job_hashes - job_hashes_to_start = final_job_hashes - existing_job_hashes - - for job_hash in job_hashes_to_cancel: - job = jobs_by_hash[job_hash] - logging.info( - f"Cancelling a stream ingestion job with job_hash={job_hash} job_id={job.get_id()} status={job.get_status()}" - ) - job.cancel() +def get_expected_job_hash_to_table_refs(client) -> Dict[str, Tuple[str, str]]: + """Get the map of job hash -> (project, table_name) for expected stream ingestion jobs.""" + job_hash_to_table_refs = {} - for job_hash in job_hashes_to_start: - project, table_name = table_metadata[job_hash] - logging.info( - f"Starting a stream ingestion job for project={project}, table_name={table_name} with job_hash={job_hash}" - ) - feature_table = client.get_feature_table(name=table_name, project=project) - start_stream_to_online_ingestion(client, project, feature_table, []) + for project in client.list_projects(): + feature_tables = client.list_feature_tables(project) + for feature_table in feature_tables: + if feature_table.stream_source is not None: + params = get_stream_to_online_ingestion_params( + client, project, feature_table, [] + ) + job_hash = params.get_job_hash() + job_hash_to_table_refs[job_hash] = (project, feature_table.name) + + return job_hash_to_table_refs + + +def ensure_stream_ingestion_jobs(client: 
feast.Client): + """Runs a single iteration of the control loop to ensure correct set of stream ingestion jobs are running.""" + expected_job_hash_to_table_refs = get_expected_job_hash_to_table_refs(client) + expected_job_hashes = set(expected_job_hash_to_table_refs.keys()) + + jobs_by_hash = list_jobs_by_hash(client, include_terminated=False) + # Job hashes that currently exist + existing_job_hashes = set(jobs_by_hash.keys()) + + job_hashes_to_cancel = existing_job_hashes - expected_job_hashes + job_hashes_to_start = expected_job_hashes - existing_job_hashes - time.sleep(1) + for job_hash in job_hashes_to_cancel: + job = jobs_by_hash[job_hash] + logging.info( + f"Cancelling a stream ingestion job with job_hash={job_hash} job_id={job.get_id()} status={job.get_status()}" + ) + job.cancel() + + for job_hash in job_hashes_to_start: + # Any job that we wish to start should be among expected table refs map + project, table_name = expected_job_hash_to_table_refs[job_hash] + logging.info( + f"Starting a stream ingestion job for project={project}, table_name={table_name} with job_hash={job_hash}" + ) + feature_table = client.get_feature_table(name=table_name, project=project) + start_stream_to_online_ingestion(client, project, feature_table, []) + + +def start_control_loop(): + """Starts control loop that continuously ensures that correct jobs are being run. + + Currently this includes stream ingestion jobs. Every second the control loop will determine + - which stream ingestion jobs are running (from launcher layer) + - which stream ingestion jobs should be running (from feast core) + And it'll do 2 kinds of operations + - Cancel all running jobs that should not be running + - Start all non-existent jobs that should be running + """ + logging.info( + "Feast Job Service is starting a control loop in a background thread, " + "which will ensure that stream ingestion jobs are successfully running." 
+ ) + try: + client = feast.Client() + while True: + ensure_stream_ingestion_jobs(client) + time.sleep(1) + except Exception: + traceback.print_exc() + finally: + # Send interrupt signal to the main thread to kill the server if control loop fails + os.kill(os.getpid(), signal.SIGINT) class HealthServicer(HealthService_pb2_grpc.HealthServicer): @@ -238,7 +265,7 @@ def start_job_service(): if client._config.getboolean(CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP): # Start the control loop thread only if it's enabled from configs - thread = threading.Thread(target=run_control_loop, daemon=True) + thread = threading.Thread(target=start_control_loop, daemon=True) thread.start() server = grpc.server(ThreadPoolExecutor(), interceptors=(LoggingInterceptor(),)) diff --git a/sdk/python/feast/pyspark/abc.py b/sdk/python/feast/pyspark/abc.py index efa7083426..c833b2d7d3 100644 --- a/sdk/python/feast/pyspark/abc.py +++ b/sdk/python/feast/pyspark/abc.py @@ -479,6 +479,9 @@ class StreamIngestionJob(SparkJob): Container for the streaming ingestion job result """ + def get_job_hash(self): + raise NotImplementedError + class JobLauncher(abc.ABC): """ diff --git a/sdk/python/feast/pyspark/launchers/standalone/local.py b/sdk/python/feast/pyspark/launchers/standalone/local.py index 45d9205093..3ff56839fb 100644 --- a/sdk/python/feast/pyspark/launchers/standalone/local.py +++ b/sdk/python/feast/pyspark/launchers/standalone/local.py @@ -87,7 +87,7 @@ def list_jobs_by_hash(self) -> Dict[str, SparkJob]: return jobs_by_hash -JOB_CACHE = JobCache() +job_cache = JobCache() def _find_free_port(): @@ -293,7 +293,7 @@ def historical_feature_retrieval( self.spark_submit(job_params), job_params.get_destination_path(), ) - JOB_CACHE.add_job(job_id, None, job) + job_cache.add_job(job_id, None, job) return job def offline_to_online_ingestion( @@ -307,7 +307,7 @@ def offline_to_online_ingestion( self.spark_submit(ingestion_job_params, ui_port), ui_port, ) - JOB_CACHE.add_job(job_id, None, job) + job_cache.add_job(job_id, None, job) return job def start_stream_to_online_ingestion( @@ -321,33 +321,33 @@ def start_stream_to_online_ingestion( self.spark_submit(ingestion_job_params, ui_port), ui_port, ) - JOB_CACHE.add_job(job_id, ingestion_job_params.get_job_hash(), job) + job_cache.add_job(job_id, ingestion_job_params.get_job_hash(), job) return job def stage_dataframe(self, df, event_timestamp_column: str): raise NotImplementedError def get_job_by_id(self, job_id: str) -> SparkJob: - return JOB_CACHE.get_job_by_id(job_id) + return job_cache.get_job_by_id(job_id) def list_jobs(self, include_terminated: bool) -> List[SparkJob]: if include_terminated is True: - return JOB_CACHE.list_jobs() + return job_cache.list_jobs() else: return [ job - for job in JOB_CACHE.list_jobs() + for job in job_cache.list_jobs() if job.get_status() in (SparkJobStatus.STARTING, SparkJobStatus.IN_PROGRESS) ] def list_jobs_by_hash(self, include_terminated: bool) -> Dict[str, SparkJob]: if include_terminated is True: - return JOB_CACHE.list_jobs_by_hash() + return job_cache.list_jobs_by_hash() else: return { job_hash: job - for job_hash, job in JOB_CACHE.list_jobs_by_hash().items() + for job_hash, job in job_cache.list_jobs_by_hash().items() if job.get_status() in (SparkJobStatus.STARTING, SparkJobStatus.IN_PROGRESS) } From 7f8c34a949d932310d0e979666ffb8a63e48e352 Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Fri, 6 Nov 2020 20:44:30 +0400 Subject: [PATCH 05/14] Code restructure (see commit message details) - Move ensure_stream_ingestion_jobs to client - 
Move _job_to_proto to SparkJob & its subclasses (as .to_proto methods) - Remove list_jobs_by_hash from job launcher layer - Add .get_hash() to StreamIngestionJob objects, which replaces list_jobs_by_hash in a much nicer way - Add logic in dataproc launcher (completes all 3 spark modes) - Add hash to Job proto & RemoteStreamIngestionJob - Add bunch of docstrings Signed-off-by: Tsotne Tabidze --- protos/feast/core/JobService.proto | 2 + sdk/python/feast/client.py | 92 +++++++++++++- sdk/python/feast/job_service.py | 115 ++---------------- sdk/python/feast/pyspark/abc.py | 58 +++++++-- sdk/python/feast/pyspark/launcher.py | 9 +- sdk/python/feast/pyspark/launchers/aws/emr.py | 32 +++-- .../pyspark/launchers/gcloud/dataproc.py | 28 ++++- .../pyspark/launchers/standalone/local.py | 54 ++++---- sdk/python/feast/remote_job.py | 13 +- 9 files changed, 217 insertions(+), 186 deletions(-) diff --git a/protos/feast/core/JobService.proto b/protos/feast/core/JobService.proto index d3924ecc71..d0ae6ac05f 100644 --- a/protos/feast/core/JobService.proto +++ b/protos/feast/core/JobService.proto @@ -72,6 +72,8 @@ message Job { JobType type = 2; // Current job status JobStatus status = 3; + // Deterministic hash of the Job + string hash = 8; message RetrievalJobMeta { string output_location = 4; diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 5e4b78b5ff..ca70d2c82b 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -18,7 +18,7 @@ import uuid from datetime import datetime from itertools import groupby -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Tuple, Union import grpc import pandas as pd @@ -90,9 +90,10 @@ _write_partitioned_table_from_source, ) from feast.online_response import OnlineResponse, _infer_online_entity_rows -from feast.pyspark.abc import RetrievalJob, SparkJob +from feast.pyspark.abc import RetrievalJob, SparkJob, StreamIngestionJob from feast.pyspark.launcher import ( get_job_by_id, + get_stream_to_online_ingestion_params, list_jobs, stage_dataframe, start_historical_feature_retrieval_job, @@ -1098,12 +1099,15 @@ def start_offline_to_online_ingestion( ) def start_stream_to_online_ingestion( - self, feature_table: FeatureTable, extra_jars: Optional[List[str]] = None, + self, + feature_table: FeatureTable, + extra_jars: Optional[List[str]] = None, + project: str = None, ) -> SparkJob: if not self._use_job_service: return start_stream_to_online_ingestion( client=self, - project=self.project, + project=project or self.project, feature_table=feature_table, extra_jars=extra_jars or [], ) @@ -1113,8 +1117,86 @@ def start_stream_to_online_ingestion( ) response = self._job_service.StartStreamToOnlineIngestionJob(request) return RemoteStreamIngestionJob( - self._job_service, self._extra_grpc_params, response.id, + self._job_service, self._extra_grpc_params, response.id + ) + + def _get_expected_job_hash_to_table_refs( + self, all_projects: bool + ) -> Dict[str, Tuple[str, str]]: + """ + Checks all feature tables for the requires project(s) and determines all required stream + ingestion jobs from them. Outputs a map of the expected job_hash to a tuple of (project, table_name). + + Args: + all_projects (bool): If true, runs the check for all project. + Otherwise only checks the current project. 
+ + Returns: + Dict[str, Tuple[str, str]]: Map of job_hash -> (project, table_name) for expected stream ingestion jobs + """ + job_hash_to_table_refs = {} + + projects = self.list_projects() if all_projects else [self.project] + for project in projects: + feature_tables = self.list_feature_tables(project) + for feature_table in feature_tables: + if feature_table.stream_source is not None: + params = get_stream_to_online_ingestion_params( + self, project, feature_table, [] + ) + job_hash = params.get_job_hash() + job_hash_to_table_refs[job_hash] = (project, feature_table.name) + + return job_hash_to_table_refs + + def ensure_stream_ingestion_jobs(self, all_projects: bool = False): + """Ensures all required stream ingestion jobs are running and cleans up the unnecessary jobs. + + More concretely, it will determine + - which stream ingestion jobs are running + - which stream ingestion jobs should be running + And it'll do 2 kinds of operations + - Cancel all running jobs that should not be running + - Start all non-existent jobs that should be running + + Args: + all_projects (bool, optional): If true, runs the check for all project. + Otherwise only checks the current project. Defaults to False. + """ + expected_job_hash_to_table_refs = self._get_expected_job_hash_to_table_refs( + all_projects + ) + expected_job_hashes = set(expected_job_hash_to_table_refs.keys()) + + jobs_by_hash: Dict[str, StreamIngestionJob] = {} + for job in self.list_jobs(include_terminated=False): + if isinstance(job, StreamIngestionJob): + jobs_by_hash[job.get_hash()] = job + + existing_job_hashes = set(jobs_by_hash.keys()) + + job_hashes_to_cancel = existing_job_hashes - expected_job_hashes + job_hashes_to_start = expected_job_hashes - existing_job_hashes + + logging.info( + f"existing_job_hashes = {sorted(list(existing_job_hashes))} expected_job_hashes = {sorted(list(expected_job_hashes))}" + ) + + for job_hash in job_hashes_to_cancel: + job = jobs_by_hash[job_hash] + logging.info( + f"Cancelling a stream ingestion job with job_hash={job_hash} job_id={job.get_id()} status={job.get_status()}" + ) + job.cancel() + + for job_hash in job_hashes_to_start: + # Any job that we wish to start should be among expected table refs map + project, table_name = expected_job_hash_to_table_refs[job_hash] + logging.info( + f"Starting a stream ingestion job for project={project}, table_name={table_name} with job_hash={job_hash}" ) + feature_table = self.get_feature_table(name=table_name, project=project) + self.start_stream_to_online_ingestion(feature_table, [], project=project) def list_jobs(self, include_terminated: bool) -> List[SparkJob]: if not self._use_job_service: diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py index e03be40e61..859478508f 100644 --- a/sdk/python/feast/job_service.py +++ b/sdk/python/feast/job_service.py @@ -5,7 +5,6 @@ import time import traceback from concurrent.futures import ThreadPoolExecutor -from typing import Dict, Tuple import grpc @@ -17,11 +16,6 @@ GetHistoricalFeaturesRequest, GetHistoricalFeaturesResponse, GetJobResponse, -) -from feast.core.JobService_pb2 import Job as JobProto -from feast.core.JobService_pb2 import ( - JobStatus, - JobType, ListJobsResponse, StartOfflineToOnlineIngestionJobRequest, StartOfflineToOnlineIngestionJobResponse, @@ -29,18 +23,11 @@ StartStreamToOnlineIngestionJobResponse, ) from feast.data_source import DataSource -from feast.pyspark.abc import ( - BatchIngestionJob, - RetrievalJob, - SparkJob, - SparkJobStatus, - StreamIngestionJob, -) 
+from feast.pyspark.abc import StreamIngestionJob from feast.pyspark.launcher import ( get_job_by_id, get_stream_to_online_ingestion_params, list_jobs, - list_jobs_by_hash, start_historical_feature_retrieval_job, start_offline_to_online_ingestion, start_stream_to_online_ingestion, @@ -56,33 +43,6 @@ class JobServiceServicer(JobService_pb2_grpc.JobServiceServicer): def __init__(self, client): self.client = client - def _job_to_proto(self, spark_job: SparkJob) -> JobProto: - job = JobProto() - job.id = spark_job.get_id() - status = spark_job.get_status() - if status == SparkJobStatus.COMPLETED: - job.status = JobStatus.JOB_STATUS_DONE - elif status == SparkJobStatus.IN_PROGRESS: - job.status = JobStatus.JOB_STATUS_RUNNING - elif status == SparkJobStatus.FAILED: - job.status = JobStatus.JOB_STATUS_ERROR - elif status == SparkJobStatus.STARTING: - job.status = JobStatus.JOB_STATUS_PENDING - else: - raise ValueError(f"Invalid job status {status}") - - if isinstance(spark_job, RetrievalJob): - job.type = JobType.RETRIEVAL_JOB - job.retrieval.output_location = spark_job.get_output_file_uri(block=False) - elif isinstance(spark_job, BatchIngestionJob): - job.type = JobType.BATCH_INGESTION_JOB - elif isinstance(spark_job, StreamIngestionJob): - job.type = JobType.STREAM_INGESTION_JOB - else: - raise ValueError(f"Invalid job type {job}") - - return job - def StartOfflineToOnlineIngestionJob( self, request: StartOfflineToOnlineIngestionJobRequest, context ): @@ -133,13 +93,11 @@ def StartStreamToOnlineIngestionJob( self.client, request.project, feature_table, [] ) job_hash = params.get_job_hash() - jobs_by_hash = list_jobs_by_hash(self.client, include_terminated=False) - if job_hash not in jobs_by_hash: - raise RuntimeError( - "Feast Job Service has control loop enabled, but couldn't find the existing stream ingestion job for the given FeatureTable" - ) - return StartStreamToOnlineIngestionJobResponse( - id=jobs_by_hash[job_hash].get_id() + for job in list_jobs(include_terminated=True, client=self.client): + if isinstance(job, StreamIngestionJob) and job.get_hash() == job_hash: + return StartStreamToOnlineIngestionJobResponse(id=job.get_id()) + raise RuntimeError( + "Feast Job Service has control loop enabled, but couldn't find the existing stream ingestion job for the given FeatureTable" ) # TODO: add extra_jars to request @@ -156,7 +114,7 @@ def ListJobs(self, request, context): jobs = list_jobs( include_terminated=request.include_terminated, client=self.client ) - return ListJobsResponse(jobs=[self._job_to_proto(job) for job in jobs]) + return ListJobsResponse(jobs=[job.to_proto() for job in jobs]) def CancelJob(self, request, context): """Stop a single job""" @@ -167,64 +125,15 @@ def CancelJob(self, request, context): def GetJob(self, request, context): """Get details of a single job""" job = get_job_by_id(request.job_id, client=self.client) - return GetJobResponse(job=self._job_to_proto(job)) - - -def get_expected_job_hash_to_table_refs(client) -> Dict[str, Tuple[str, str]]: - """Get the map of job hash -> (project, table_name) for expected stream ingestion jobs.""" - job_hash_to_table_refs = {} - - for project in client.list_projects(): - feature_tables = client.list_feature_tables(project) - for feature_table in feature_tables: - if feature_table.stream_source is not None: - params = get_stream_to_online_ingestion_params( - client, project, feature_table, [] - ) - job_hash = params.get_job_hash() - job_hash_to_table_refs[job_hash] = (project, feature_table.name) - - return 
job_hash_to_table_refs - - -def ensure_stream_ingestion_jobs(client: feast.Client): - """Runs a single iteration of the control loop to ensure correct set of stream ingestion jobs are running.""" - expected_job_hash_to_table_refs = get_expected_job_hash_to_table_refs(client) - expected_job_hashes = set(expected_job_hash_to_table_refs.keys()) - - jobs_by_hash = list_jobs_by_hash(client, include_terminated=False) - # Job hashes that currently exist - existing_job_hashes = set(jobs_by_hash.keys()) - - job_hashes_to_cancel = existing_job_hashes - expected_job_hashes - job_hashes_to_start = expected_job_hashes - existing_job_hashes - - for job_hash in job_hashes_to_cancel: - job = jobs_by_hash[job_hash] - logging.info( - f"Cancelling a stream ingestion job with job_hash={job_hash} job_id={job.get_id()} status={job.get_status()}" - ) - job.cancel() - - for job_hash in job_hashes_to_start: - # Any job that we wish to start should be among expected table refs map - project, table_name = expected_job_hash_to_table_refs[job_hash] - logging.info( - f"Starting a stream ingestion job for project={project}, table_name={table_name} with job_hash={job_hash}" - ) - feature_table = client.get_feature_table(name=table_name, project=project) - start_stream_to_online_ingestion(client, project, feature_table, []) + return GetJobResponse(job=job.to_proto()) def start_control_loop(): """Starts control loop that continuously ensures that correct jobs are being run. - Currently this includes stream ingestion jobs. Every second the control loop will determine - - which stream ingestion jobs are running (from launcher layer) - - which stream ingestion jobs should be running (from feast core) - And it'll do 2 kinds of operations - - Cancel all running jobs that should not be running - - Start all non-existent jobs that should be running + Currently this affects only the stream ingestion jobs. Please refer to + Client:ensure_stream_ingestion_jobs for full documentation on how the check works. + """ logging.info( "Feast Job Service is starting a control loop in a background thread, " @@ -233,7 +142,7 @@ def start_control_loop(): try: client = feast.Client() while True: - ensure_stream_ingestion_jobs(client) + client.ensure_stream_ingestion_jobs(all_projects=True) time.sleep(1) except Exception: traceback.print_exc() diff --git a/sdk/python/feast/pyspark/abc.py b/sdk/python/feast/pyspark/abc.py index c833b2d7d3..d58019c2f9 100644 --- a/sdk/python/feast/pyspark/abc.py +++ b/sdk/python/feast/pyspark/abc.py @@ -8,6 +8,9 @@ import pandas +from feast.core.JobService_pb2 import Job as JobProto +from feast.core.JobService_pb2 import JobStatus as JobStatusProto +from feast.core.JobService_pb2 import JobType as JobTypeProto from feast.data_source import FileSource @@ -67,6 +70,27 @@ def cancel(self): """ raise NotImplementedError + def to_proto(self) -> JobProto: + """Converts SparkJob to the protobuf object. + + Returns: + JobProto: Converted protobuf object. 
+ """ + job = JobProto() + job.id = self.get_id() + status = self.get_status() + if status == SparkJobStatus.COMPLETED: + job.status = JobStatusProto.JOB_STATUS_DONE + elif status == SparkJobStatus.IN_PROGRESS: + job.status = JobStatusProto.JOB_STATUS_RUNNING + elif status == SparkJobStatus.FAILED: + job.status = JobStatusProto.JOB_STATUS_ERROR + elif status == SparkJobStatus.STARTING: + job.status = JobStatusProto.JOB_STATUS_PENDING + else: + raise ValueError(f"Invalid job status {status}") + return job + class SparkJobParameters(abc.ABC): @abc.abstractmethod @@ -290,6 +314,12 @@ def get_output_file_uri(self, timeout_sec=None, block=True): """ raise NotImplementedError + def to_proto(self) -> JobProto: + job = super().to_proto() + job.type = JobTypeProto.RETRIEVAL_JOB + job.retrieval.output_location = self.get_output_file_uri(block=False) + return job + class IngestionJobParameters(SparkJobParameters): def __init__( @@ -361,9 +391,6 @@ def get_arguments(self) -> List[str]: return args - def get_job_hash(self) -> str: - raise NotImplementedError - class BatchIngestionJobParameters(IngestionJobParameters): def __init__( @@ -473,15 +500,34 @@ class BatchIngestionJob(SparkJob): Container for the ingestion job result """ + def to_proto(self) -> JobProto: + job = super().to_proto() + job.type = JobTypeProto.BATCH_INGESTION_JOB + return job + class StreamIngestionJob(SparkJob): """ Container for the streaming ingestion job result """ - def get_job_hash(self): + def get_hash(self) -> str: + """Gets the consistent hash of this stream ingestion job. + + The hash needs to be persisted at the data processing layer, s.t. + we can get the same hash when retrieving the job from Spark. + + Returns: + str: The hash for this streaming ingestion job + """ raise NotImplementedError + def to_proto(self) -> JobProto: + job = super().to_proto() + job.type = JobTypeProto.STREAM_INGESTION_JOB + job.hash = self.get_hash() + return job + class JobLauncher(abc.ABC): """ @@ -555,7 +601,3 @@ def get_job_by_id(self, job_id: str) -> SparkJob: @abc.abstractmethod def list_jobs(self, include_terminated: bool) -> List[SparkJob]: raise NotImplementedError - - @abc.abstractmethod - def list_jobs_by_hash(self, include_terminated: bool) -> Dict[str, SparkJob]: - raise NotImplementedError diff --git a/sdk/python/feast/pyspark/launcher.py b/sdk/python/feast/pyspark/launcher.py index cafe2b1d46..c73472b770 100644 --- a/sdk/python/feast/pyspark/launcher.py +++ b/sdk/python/feast/pyspark/launcher.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import TYPE_CHECKING, Dict, List, Union +from typing import TYPE_CHECKING, List, Union from feast.config import Config from feast.constants import ( @@ -309,10 +309,3 @@ def get_job_by_id(job_id: str, client: "Client") -> SparkJob: def stage_dataframe(df, event_timestamp_column: str, client: "Client") -> FileSource: launcher = resolve_launcher(client._config) return launcher.stage_dataframe(df, event_timestamp_column) - - -def list_jobs_by_hash( - client: "Client", include_terminated: bool -) -> Dict[str, SparkJob]: - launcher = resolve_launcher(client._config) - return launcher.list_jobs_by_hash(include_terminated=include_terminated) diff --git a/sdk/python/feast/pyspark/launchers/aws/emr.py b/sdk/python/feast/pyspark/launchers/aws/emr.py index 94f866b787..42b5348c29 100644 --- a/sdk/python/feast/pyspark/launchers/aws/emr.py +++ b/sdk/python/feast/pyspark/launchers/aws/emr.py @@ -118,8 +118,12 @@ class EmrStreamIngestionJob(EmrJobMixin, StreamIngestionJob): Ingestion streaming 
job for a EMR cluster """ - def __init__(self, emr_client, job_ref: EmrJobRef): + def __init__(self, emr_client, job_ref: EmrJobRef, job_hash: str): super().__init__(emr_client, job_ref) + self._job_hash = job_hash + + def get_hash(self) -> str: + return self._job_hash class EmrClusterLauncher(JobLauncher): @@ -283,17 +287,19 @@ def start_stream_to_online_ingestion( else: extra_jar_paths.append(_upload_jar(self._staging_location, extra_jar)) + job_hash = ingestion_job_params.get_job_hash() + step = _stream_ingestion_step( jar_s3_path, extra_jar_paths, ingestion_job_params.get_feature_table_name(), args=ingestion_job_params.get_arguments(), - job_hash=ingestion_job_params.get_job_hash(), + job_hash=job_hash, ) job_ref = self._submit_emr_job(step) - return EmrStreamIngestionJob(self._emr_client(), job_ref) + return EmrStreamIngestionJob(self._emr_client(), job_ref, job_hash) def stage_dataframe(self, df: pandas.DataFrame, event_timestamp: str) -> FileSource: with tempfile.NamedTemporaryFile() as f: @@ -323,8 +329,12 @@ def _job_from_job_info(self, job_info: JobInfo) -> SparkJob: emr_client=self._emr_client(), job_ref=job_info.job_ref, ) elif job_info.job_type == STREAM_TO_ONLINE_JOB_TYPE: + # job_hash must not be None for stream ingestion jobs + assert job_info.job_hash is not None return EmrStreamIngestionJob( - emr_client=self._emr_client(), job_ref=job_info.job_ref, + emr_client=self._emr_client(), + job_ref=job_info.job_ref, + job_hash=job_info.job_hash, ) else: # We should never get here @@ -373,17 +383,3 @@ def get_job_by_id(self, job_id: str) -> SparkJob: return self._job_from_job_info(job_info) else: raise KeyError(f"Job not found {job_id}") - - def list_jobs_by_hash(self, include_terminated: bool) -> Dict[str, SparkJob]: - jobs = _list_jobs( - emr_client=self._emr_client(), - job_type=None, - table_name=None, - active_only=not include_terminated, - ) - - result = {} - for job_info in jobs: - if job_info.job_hash is not None: - result[job_info.job_hash] = self._job_from_job_info(job_info) - return result diff --git a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py index d6fe45009b..32e54cf5e6 100644 --- a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py +++ b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py @@ -174,6 +174,19 @@ class DataprocStreamingIngestionJob(DataprocJobMixin, StreamIngestionJob): Streaming Ingestion job result for a Dataproc cluster """ + def __init__( + self, + job: Job, + refresh_fn: Callable[[], Job], + cancel_fn: Callable[[], None], + job_hash: str, + ) -> None: + super.__init__(job, refresh_fn, cancel_fn) + self._job_hash = job_hash + + def get_hash(self) -> str: + return self._job_hash + class DataprocClusterLauncher(JobLauncher): """ @@ -184,6 +197,7 @@ class DataprocClusterLauncher(JobLauncher): EXTERNAL_JARS = ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"] JOB_TYPE_LABEL_KEY = "feast_job_type" + JOB_HASH_LABEL_KEY = "feast_job_hash" def __init__( self, cluster_name: str, staging_location: str, region: str, project_id: str, @@ -238,6 +252,11 @@ def dataproc_submit( "placement": {"cluster_name": self.cluster_name}, "labels": {self.JOB_TYPE_LABEL_KEY: job_params.get_job_type().name.lower()}, } + + # Add job hash to labels only for the stream ingestion job + if isinstance(job_params, StreamIngestionJobParameters): + job_config["labels"][self.JOB_HASH_LABEL_KEY] = job_params.get_job_hash() + if job_params.get_class_name(): job_config.update( { @@ -301,7 +320,8 @@ def 
start_stream_to_online_ingestion( self, ingestion_job_params: StreamIngestionJobParameters ) -> StreamIngestionJob: job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params) - return DataprocStreamingIngestionJob(job, refresh_fn, cancel_fn) + job_hash = ingestion_job_params.get_job_hash() + return DataprocStreamingIngestionJob(job, refresh_fn, cancel_fn, job_hash) def stage_dataframe(self, df, event_timestamp_column: str): raise NotImplementedError @@ -331,7 +351,8 @@ def _dataproc_job_to_spark_job(self, job: Job) -> SparkJob: return DataprocBatchIngestionJob(job, refresh_fn, cancel_fn) if job_type == SparkJobType.STREAM_INGESTION.name.lower(): - return DataprocStreamingIngestionJob(job, refresh_fn, cancel_fn) + job_hash = job.labels[self.JOB_HASH_LABEL_KEY] + return DataprocStreamingIngestionJob(job, refresh_fn, cancel_fn, job_hash) raise ValueError(f"Unrecognized job type: {job_type}") @@ -345,6 +366,3 @@ def list_jobs(self, include_terminated: bool) -> List[SparkJob]: project_id=self.project_id, region=self.region, filter=job_filter ) ] - - def list_jobs_by_hash(self, include_terminated: bool) -> Dict[str, SparkJob]: - raise NotImplementedError diff --git a/sdk/python/feast/pyspark/launchers/standalone/local.py b/sdk/python/feast/pyspark/launchers/standalone/local.py index 3ff56839fb..793842a939 100644 --- a/sdk/python/feast/pyspark/launchers/standalone/local.py +++ b/sdk/python/feast/pyspark/launchers/standalone/local.py @@ -46,18 +46,16 @@ def __init__(self): self.hash_by_id = {} self.lock = threading.RLock() - def add_job(self, job_id: str, job_hash: Optional[str], job: SparkJob) -> None: - """Add a Spark job to the cache with given job_id and job_cache. + def add_job(self, job: SparkJob) -> None: + """Add a Spark job to the cache. Args: - job_id (str): External ID of the Spark Job. - job_hash (Optional[str]): Computed hash of the Spark job. If None, Job Service - will ignore maintaining the state of this Spark job. job (SparkJob): The new Spark job to add. """ with self.lock: - self.job_by_id[job_id] = job - self.hash_by_id[job_id] = job_hash + self.job_by_id[job.get_id()] = job + if isinstance(job, StreamIngestionJob): + self.hash_by_id[job.get_id()] = job.get_hash() def list_jobs(self) -> List[SparkJob]: """List all Spark jobs in the cache.""" @@ -76,16 +74,6 @@ def get_job_by_id(self, job_id: str) -> SparkJob: with self.lock: return self.job_by_id[job_id] - def list_jobs_by_hash(self) -> Dict[str, SparkJob]: - """Get a map of job_hash -> Spark job. 
Returns only jobs with non-None job hashes.""" - with self.lock: - jobs_by_hash = {} - for job_id, job in self.job_by_id.items(): - job_hash = self.hash_by_id[job_id] - if job_hash is not None: - jobs_by_hash[job_hash] = job - return jobs_by_hash - job_cache = JobCache() @@ -163,7 +151,19 @@ class StandaloneClusterStreamingIngestionJob( Streaming Ingestion job result for a standalone spark cluster """ - pass + def __init__( + self, + job_id: str, + job_name: str, + process: subprocess.Popen, + ui_port: int, + job_hash: str, + ) -> None: + super().__init__(job_id, job_name, process, ui_port) + self._job_hash = job_hash + + def get_hash(self) -> str: + return self._job_hash class StandaloneClusterRetrievalJob(StandaloneClusterJobMixin, RetrievalJob): @@ -293,7 +293,7 @@ def historical_feature_retrieval( self.spark_submit(job_params), job_params.get_destination_path(), ) - job_cache.add_job(job_id, None, job) + job_cache.add_job(job) return job def offline_to_online_ingestion( @@ -307,7 +307,7 @@ def offline_to_online_ingestion( self.spark_submit(ingestion_job_params, ui_port), ui_port, ) - job_cache.add_job(job_id, None, job) + job_cache.add_job(job) return job def start_stream_to_online_ingestion( @@ -320,8 +320,9 @@ def start_stream_to_online_ingestion( ingestion_job_params.get_name(), self.spark_submit(ingestion_job_params, ui_port), ui_port, + ingestion_job_params.get_job_hash(), ) - job_cache.add_job(job_id, ingestion_job_params.get_job_hash(), job) + job_cache.add_job(job) return job def stage_dataframe(self, df, event_timestamp_column: str): @@ -340,14 +341,3 @@ def list_jobs(self, include_terminated: bool) -> List[SparkJob]: if job.get_status() in (SparkJobStatus.STARTING, SparkJobStatus.IN_PROGRESS) ] - - def list_jobs_by_hash(self, include_terminated: bool) -> Dict[str, SparkJob]: - if include_terminated is True: - return job_cache.list_jobs_by_hash() - else: - return { - job_hash: job - for job_hash, job in job_cache.list_jobs_by_hash().items() - if job.get_status() - in (SparkJobStatus.STARTING, SparkJobStatus.IN_PROGRESS) - } diff --git a/sdk/python/feast/remote_job.py b/sdk/python/feast/remote_job.py index 0a766cf796..6af6081749 100644 --- a/sdk/python/feast/remote_job.py +++ b/sdk/python/feast/remote_job.py @@ -124,13 +124,12 @@ class RemoteStreamIngestionJob(RemoteJobMixin, StreamIngestionJob): Stream ingestion job result. 
""" - def __init__( - self, - service: JobServiceStub, - grpc_extra_param_provider: GrpcExtraParamProvider, - job_id: str, - ): - super().__init__(service, grpc_extra_param_provider, job_id) + def get_hash(self) -> str: + response = self._service.GetJob( + GetJobRequest(job_id=self._job_id), **self._grpc_extra_param_provider() + ) + + return response.job.hash def get_remote_job_from_proto( From 0f6d8d71ad4d33df43c7f0b7fc81ec41623462ee Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Fri, 6 Nov 2020 20:54:40 +0400 Subject: [PATCH 06/14] Fix super -> super() Signed-off-by: Tsotne Tabidze --- sdk/python/feast/pyspark/launchers/gcloud/dataproc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py index 32e54cf5e6..9123829851 100644 --- a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py +++ b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py @@ -181,7 +181,7 @@ def __init__( cancel_fn: Callable[[], None], job_hash: str, ) -> None: - super.__init__(job, refresh_fn, cancel_fn) + super().__init__(job, refresh_fn, cancel_fn) self._job_hash = job_hash def get_hash(self) -> str: From f1173e4309e583381cfef1e079e116cc6ebe037d Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Tue, 10 Nov 2020 19:28:17 +0300 Subject: [PATCH 07/14] make _job_to_proto a function Signed-off-by: Oleg Avdeev --- sdk/python/feast/job_service.py | 43 +++++++++++++++++++++++++++++++-- sdk/python/feast/pyspark/abc.py | 41 ------------------------------- 2 files changed, 41 insertions(+), 43 deletions(-) diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py index 859478508f..89eb03e770 100644 --- a/sdk/python/feast/job_service.py +++ b/sdk/python/feast/job_service.py @@ -16,6 +16,11 @@ GetHistoricalFeaturesRequest, GetHistoricalFeaturesResponse, GetJobResponse, +) +from feast.core.JobService_pb2 import Job as JobProto +from feast.core.JobService_pb2 import ( + JobStatus, + JobType, ListJobsResponse, StartOfflineToOnlineIngestionJobRequest, StartOfflineToOnlineIngestionJobResponse, @@ -23,7 +28,13 @@ StartStreamToOnlineIngestionJobResponse, ) from feast.data_source import DataSource -from feast.pyspark.abc import StreamIngestionJob +from feast.pyspark.abc import ( + BatchIngestionJob, + RetrievalJob, + SparkJob, + SparkJobStatus, + StreamIngestionJob, +) from feast.pyspark.launcher import ( get_job_by_id, get_stream_to_online_ingestion_params, @@ -39,6 +50,34 @@ ) +def _job_to_proto(self, spark_job: SparkJob) -> JobProto: + job = JobProto() + job.id = spark_job.get_id() + status = spark_job.get_status() + if status == SparkJobStatus.COMPLETED: + job.status = JobStatus.JOB_STATUS_DONE + elif status == SparkJobStatus.IN_PROGRESS: + job.status = JobStatus.JOB_STATUS_RUNNING + elif status == SparkJobStatus.FAILED: + job.status = JobStatus.JOB_STATUS_ERROR + elif status == SparkJobStatus.STARTING: + job.status = JobStatus.JOB_STATUS_PENDING + else: + raise ValueError(f"Invalid job status {status}") + + if isinstance(spark_job, RetrievalJob): + job.type = JobType.RETRIEVAL_JOB + job.retrieval.output_location = spark_job.get_output_file_uri(block=False) + elif isinstance(spark_job, BatchIngestionJob): + job.type = JobType.BATCH_INGESTION_JOB + elif isinstance(spark_job, StreamIngestionJob): + job.type = JobType.STREAM_INGESTION_JOB + else: + raise ValueError(f"Invalid job type {job}") + + return job + + class JobServiceServicer(JobService_pb2_grpc.JobServiceServicer): def 
__init__(self, client): self.client = client @@ -114,7 +153,7 @@ def ListJobs(self, request, context): jobs = list_jobs( include_terminated=request.include_terminated, client=self.client ) - return ListJobsResponse(jobs=[job.to_proto() for job in jobs]) + return ListJobsResponse(jobs=[_job_to_proto(job) for job in jobs]) def CancelJob(self, request, context): """Stop a single job""" diff --git a/sdk/python/feast/pyspark/abc.py b/sdk/python/feast/pyspark/abc.py index d58019c2f9..b1c847b824 100644 --- a/sdk/python/feast/pyspark/abc.py +++ b/sdk/python/feast/pyspark/abc.py @@ -8,9 +8,6 @@ import pandas -from feast.core.JobService_pb2 import Job as JobProto -from feast.core.JobService_pb2 import JobStatus as JobStatusProto -from feast.core.JobService_pb2 import JobType as JobTypeProto from feast.data_source import FileSource @@ -70,27 +67,6 @@ def cancel(self): """ raise NotImplementedError - def to_proto(self) -> JobProto: - """Converts SparkJob to the protobuf object. - - Returns: - JobProto: Converted protobuf object. - """ - job = JobProto() - job.id = self.get_id() - status = self.get_status() - if status == SparkJobStatus.COMPLETED: - job.status = JobStatusProto.JOB_STATUS_DONE - elif status == SparkJobStatus.IN_PROGRESS: - job.status = JobStatusProto.JOB_STATUS_RUNNING - elif status == SparkJobStatus.FAILED: - job.status = JobStatusProto.JOB_STATUS_ERROR - elif status == SparkJobStatus.STARTING: - job.status = JobStatusProto.JOB_STATUS_PENDING - else: - raise ValueError(f"Invalid job status {status}") - return job - class SparkJobParameters(abc.ABC): @abc.abstractmethod @@ -314,12 +290,6 @@ def get_output_file_uri(self, timeout_sec=None, block=True): """ raise NotImplementedError - def to_proto(self) -> JobProto: - job = super().to_proto() - job.type = JobTypeProto.RETRIEVAL_JOB - job.retrieval.output_location = self.get_output_file_uri(block=False) - return job - class IngestionJobParameters(SparkJobParameters): def __init__( @@ -500,11 +470,6 @@ class BatchIngestionJob(SparkJob): Container for the ingestion job result """ - def to_proto(self) -> JobProto: - job = super().to_proto() - job.type = JobTypeProto.BATCH_INGESTION_JOB - return job - class StreamIngestionJob(SparkJob): """ @@ -522,12 +487,6 @@ def get_hash(self) -> str: """ raise NotImplementedError - def to_proto(self) -> JobProto: - job = super().to_proto() - job.type = JobTypeProto.STREAM_INGESTION_JOB - job.hash = self.get_hash() - return job - class JobLauncher(abc.ABC): """ From c82f188dbcba3f079355df06dfcceff3b79a7e61 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Tue, 10 Nov 2020 19:34:34 +0300 Subject: [PATCH 08/14] move ensure_stream_ingestion_jobs back out of the client Signed-off-by: Oleg Avdeev --- sdk/python/feast/client.py | 83 +---------------------------- sdk/python/feast/job_service.py | 92 +++++++++++++++++++++++++++++++-- sdk/python/feast/pyspark/abc.py | 4 +- 3 files changed, 92 insertions(+), 87 deletions(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index ca70d2c82b..45e6706c9f 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -18,7 +18,7 @@ import uuid from datetime import datetime from itertools import groupby -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Union import grpc import pandas as pd @@ -90,10 +90,9 @@ _write_partitioned_table_from_source, ) from feast.online_response import OnlineResponse, _infer_online_entity_rows -from feast.pyspark.abc import RetrievalJob, SparkJob, 
StreamIngestionJob +from feast.pyspark.abc import RetrievalJob, SparkJob from feast.pyspark.launcher import ( get_job_by_id, - get_stream_to_online_ingestion_params, list_jobs, stage_dataframe, start_historical_feature_retrieval_job, @@ -1120,84 +1119,6 @@ def start_stream_to_online_ingestion( self._job_service, self._extra_grpc_params, response.id ) - def _get_expected_job_hash_to_table_refs( - self, all_projects: bool - ) -> Dict[str, Tuple[str, str]]: - """ - Checks all feature tables for the requires project(s) and determines all required stream - ingestion jobs from them. Outputs a map of the expected job_hash to a tuple of (project, table_name). - - Args: - all_projects (bool): If true, runs the check for all project. - Otherwise only checks the current project. - - Returns: - Dict[str, Tuple[str, str]]: Map of job_hash -> (project, table_name) for expected stream ingestion jobs - """ - job_hash_to_table_refs = {} - - projects = self.list_projects() if all_projects else [self.project] - for project in projects: - feature_tables = self.list_feature_tables(project) - for feature_table in feature_tables: - if feature_table.stream_source is not None: - params = get_stream_to_online_ingestion_params( - self, project, feature_table, [] - ) - job_hash = params.get_job_hash() - job_hash_to_table_refs[job_hash] = (project, feature_table.name) - - return job_hash_to_table_refs - - def ensure_stream_ingestion_jobs(self, all_projects: bool = False): - """Ensures all required stream ingestion jobs are running and cleans up the unnecessary jobs. - - More concretely, it will determine - - which stream ingestion jobs are running - - which stream ingestion jobs should be running - And it'll do 2 kinds of operations - - Cancel all running jobs that should not be running - - Start all non-existent jobs that should be running - - Args: - all_projects (bool, optional): If true, runs the check for all project. - Otherwise only checks the current project. Defaults to False. 
- """ - expected_job_hash_to_table_refs = self._get_expected_job_hash_to_table_refs( - all_projects - ) - expected_job_hashes = set(expected_job_hash_to_table_refs.keys()) - - jobs_by_hash: Dict[str, StreamIngestionJob] = {} - for job in self.list_jobs(include_terminated=False): - if isinstance(job, StreamIngestionJob): - jobs_by_hash[job.get_hash()] = job - - existing_job_hashes = set(jobs_by_hash.keys()) - - job_hashes_to_cancel = existing_job_hashes - expected_job_hashes - job_hashes_to_start = expected_job_hashes - existing_job_hashes - - logging.info( - f"existing_job_hashes = {sorted(list(existing_job_hashes))} expected_job_hashes = {sorted(list(expected_job_hashes))}" - ) - - for job_hash in job_hashes_to_cancel: - job = jobs_by_hash[job_hash] - logging.info( - f"Cancelling a stream ingestion job with job_hash={job_hash} job_id={job.get_id()} status={job.get_status()}" - ) - job.cancel() - - for job_hash in job_hashes_to_start: - # Any job that we wish to start should be among expected table refs map - project, table_name = expected_job_hash_to_table_refs[job_hash] - logging.info( - f"Starting a stream ingestion job for project={project}, table_name={table_name} with job_hash={job_hash}" - ) - feature_table = self.get_feature_table(name=table_name, project=project) - self.start_stream_to_online_ingestion(feature_table, [], project=project) - def list_jobs(self, include_terminated: bool) -> List[SparkJob]: if not self._use_job_service: return list_jobs(include_terminated, self) diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py index 89eb03e770..1798fcede8 100644 --- a/sdk/python/feast/job_service.py +++ b/sdk/python/feast/job_service.py @@ -5,6 +5,7 @@ import time import traceback from concurrent.futures import ThreadPoolExecutor +from typing import Dict, List, Tuple import grpc @@ -167,11 +168,11 @@ def GetJob(self, request, context): return GetJobResponse(job=job.to_proto()) -def start_control_loop(): +def start_control_loop() -> None: """Starts control loop that continuously ensures that correct jobs are being run. Currently this affects only the stream ingestion jobs. Please refer to - Client:ensure_stream_ingestion_jobs for full documentation on how the check works. + ensure_stream_ingestion_jobs for full documentation on how the check works. """ logging.info( @@ -181,7 +182,7 @@ def start_control_loop(): try: client = feast.Client() while True: - client.ensure_stream_ingestion_jobs(all_projects=True) + ensure_stream_ingestion_jobs(client, all_projects=True) time.sleep(1) except Exception: traceback.print_exc() @@ -201,7 +202,7 @@ def intercept_service(self, continuation, handler_call_details): return continuation(handler_call_details) -def start_job_service(): +def start_job_service() -> None: """ Start Feast Job Service """ @@ -225,3 +226,86 @@ def start_job_service(): server.start() logging.info("Feast Job Service is listening on port :6568") server.wait_for_termination() + + +def _get_expected_job_hash_to_table_refs( + client: feast.Client, projects: List[str] +) -> Dict[str, Tuple[str, str]]: + """ + Checks all feature tables for the requires project(s) and determines all required stream + ingestion jobs from them. Outputs a map of the expected job_hash to a tuple of (project, table_name). + + Args: + all_projects (bool): If true, runs the check for all project. + Otherwise only checks the current project. 
+ + Returns: + Dict[str, Tuple[str, str]]: Map of job_hash -> (project, table_name) for expected stream ingestion jobs + """ + job_hash_to_table_refs = {} + + for project in projects: + feature_tables = client.list_feature_tables(project) + for feature_table in feature_tables: + if feature_table.stream_source is not None: + params = get_stream_to_online_ingestion_params( + client, project, feature_table, [] + ) + job_hash = params.get_job_hash() + job_hash_to_table_refs[job_hash] = (project, feature_table.name) + + return job_hash_to_table_refs + + +def ensure_stream_ingestion_jobs(client: feast.Client, all_projects: bool): + """Ensures all required stream ingestion jobs are running and cleans up the unnecessary jobs. + + More concretely, it will determine + - which stream ingestion jobs are running + - which stream ingestion jobs should be running + And it'll do 2 kinds of operations + - Cancel all running jobs that should not be running + - Start all non-existent jobs that should be running + + Args: + all_projects (bool): If true, runs the check for all project. + Otherwise only checks the client's current project. + """ + + projects = client.list_projects() if all_projects else [client.project] + + expected_job_hash_to_table_refs = _get_expected_job_hash_to_table_refs( + client, projects + ) + + expected_job_hashes = set(expected_job_hash_to_table_refs.keys()) + + jobs_by_hash: Dict[str, StreamIngestionJob] = {} + for job in client.list_jobs(include_terminated=False): + if isinstance(job, StreamIngestionJob): + jobs_by_hash[job.get_hash()] = job + + existing_job_hashes = set(jobs_by_hash.keys()) + + job_hashes_to_cancel = existing_job_hashes - expected_job_hashes + job_hashes_to_start = expected_job_hashes - existing_job_hashes + + logging.debug( + f"existing_job_hashes = {sorted(list(existing_job_hashes))} expected_job_hashes = {sorted(list(expected_job_hashes))}" + ) + + for job_hash in job_hashes_to_cancel: + job = jobs_by_hash[job_hash] + logging.info( + f"Cancelling a stream ingestion job with job_hash={job_hash} job_id={job.get_id()} status={job.get_status()}" + ) + job.cancel() + + for job_hash in job_hashes_to_start: + # Any job that we wish to start should be among expected table refs map + project, table_name = expected_job_hash_to_table_refs[job_hash] + logging.info( + f"Starting a stream ingestion job for project={project}, table_name={table_name} with job_hash={job_hash}" + ) + feature_table = client.get_feature_table(name=table_name, project=project) + client.start_stream_to_online_ingestion(feature_table, [], project=project) diff --git a/sdk/python/feast/pyspark/abc.py b/sdk/python/feast/pyspark/abc.py index b1c847b824..19ba9ab174 100644 --- a/sdk/python/feast/pyspark/abc.py +++ b/sdk/python/feast/pyspark/abc.py @@ -479,8 +479,8 @@ class StreamIngestionJob(SparkJob): def get_hash(self) -> str: """Gets the consistent hash of this stream ingestion job. - The hash needs to be persisted at the data processing layer, s.t. - we can get the same hash when retrieving the job from Spark. + The hash needs to be persisted at the data processing layer, so that we can get the same + hash when retrieving the job from Spark. 
Returns: str: The hash for this streaming ingestion job From 8a4348430903d3476d21cdb2e520800fdbb86b1a Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Wed, 11 Nov 2020 13:52:23 +0300 Subject: [PATCH 09/14] add tests for the job control loop Signed-off-by: Oleg Avdeev --- sdk/python/tests/feast_core_server.py | 11 ++ .../tests/test_streaming_control_loop.py | 184 ++++++++++++++++++ 2 files changed, 195 insertions(+) create mode 100644 sdk/python/tests/test_streaming_control_loop.py diff --git a/sdk/python/tests/feast_core_server.py b/sdk/python/tests/feast_core_server.py index 85f09175bf..0c7191f35b 100644 --- a/sdk/python/tests/feast_core_server.py +++ b/sdk/python/tests/feast_core_server.py @@ -11,6 +11,8 @@ ApplyEntityResponse, ApplyFeatureTableRequest, ApplyFeatureTableResponse, + DeleteFeatureTableRequest, + DeleteFeatureTableResponse, GetEntityRequest, GetEntityResponse, GetFeastCoreVersionResponse, @@ -20,6 +22,7 @@ ListEntitiesResponse, ListFeatureTablesRequest, ListFeatureTablesResponse, + ListProjectsResponse, ) from feast.core.Entity_pb2 import Entity as EntityProto from feast.core.Entity_pb2 import EntityMeta @@ -66,6 +69,7 @@ class CoreServicer(Core.CoreServiceServicer): def __init__(self): self._feature_tables = dict() self._entities = dict() + self._projects = ["default"] def GetFeastCoreVersion(self, request, context): return GetFeastCoreVersionResponse(version="0.10.0") @@ -105,6 +109,10 @@ def ApplyFeatureTable(self, request: ApplyFeatureTableRequest, context): return ApplyFeatureTableResponse(table=applied_feature_table,) + def DeleteFeatureTable(self, request: DeleteFeatureTableRequest, context): + del self._feature_tables[request.name] + return DeleteFeatureTableResponse() + def GetEntity(self, request: GetEntityRequest, context): filtered_entities = [ entity @@ -119,6 +127,9 @@ def ListEntities(self, request: ListEntitiesRequest, context): return ListEntitiesResponse(entities=filtered_entities_response) + def ListProjects(self, request, context): + return ListProjectsResponse(projects=self._projects) + def ApplyEntity(self, request: ApplyEntityRequest, context): entity_spec = request.spec diff --git a/sdk/python/tests/test_streaming_control_loop.py b/sdk/python/tests/test_streaming_control_loop.py new file mode 100644 index 0000000000..99f21f1ec8 --- /dev/null +++ b/sdk/python/tests/test_streaming_control_loop.py @@ -0,0 +1,184 @@ +import os +import subprocess +from concurrent import futures +from contextlib import contextmanager +from typing import List +from unittest.mock import patch + +import grpc +import pyspark + +from feast.client import Client +from feast.core.CoreService_pb2_grpc import add_CoreServiceServicer_to_server +from feast.data_format import ParquetFormat, ProtoFormat +from feast.data_source import FileSource, KafkaSource +from feast.entity import Entity +from feast.feature import Feature +from feast.feature_table import FeatureTable +from feast.job_service import ensure_stream_ingestion_jobs +from feast.pyspark.launchers.standalone import StandaloneClusterLauncher +from feast.value_type import ValueType +from tests.feast_core_server import CoreServicer as MockCoreServicer + + +@contextmanager +def mock_server(servicer, add_fn): + """Instantiate a server and return its address for use in tests""" + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + add_fn(servicer, server) + port = server.add_insecure_port("[::]:0") + server.start() + + try: + address = "localhost:%d" % port + with grpc.insecure_channel(address): + yield address + 
finally: + server.stop(None) + + +CORE_URL = "core.feast.example.com" +SERVING_URL = "serving.example.com" + + +class TestStreamingControlLoop: + table_name = "my-feature-table-1" + + features_1 = [ + Feature(name="fs1-my-feature-1", dtype=ValueType.INT64), + Feature(name="fs1-my-feature-2", dtype=ValueType.STRING), + Feature(name="fs1-my-feature-3", dtype=ValueType.STRING_LIST), + Feature(name="fs1-my-feature-4", dtype=ValueType.BYTES_LIST), + ] + + features_2 = features_1 + [ + Feature(name="fs1-my-feature-5", dtype=ValueType.BYTES_LIST), + ] + + def _create_ft(self, client: Client, features) -> None: + entity = Entity( + name="driver_car_id", + description="Car driver id", + value_type=ValueType.STRING, + labels={"team": "matchmaking"}, + ) + + # Register Entity with Core + client.apply_entity(entity) + + # Create Feature Tables + batch_source = FileSource( + file_format=ParquetFormat(), + file_url="file://feast/*", + event_timestamp_column="ts_col", + created_timestamp_column="timestamp", + date_partition_column="date_partition_col", + ) + + stream_source = KafkaSource( + bootstrap_servers="localhost:9094", + message_format=ProtoFormat("class.path"), + topic="test_topic", + event_timestamp_column="ts_col", + created_timestamp_column="timestamp", + ) + + ft1 = FeatureTable( + name=self.table_name, + features=features, + entities=["driver_car_id"], + labels={"team": "matchmaking"}, + batch_source=batch_source, + stream_source=stream_source, + ) + + # Register Feature Table with Core + client.apply_feature_table(ft1) + + def _delete_ft(self, client: Client): + client.delete_feature_table(self.table_name) + + def test_streaming_job_control_loop(self) -> None: + """ Test streaming job control loop logic. """ + + core_servicer = MockCoreServicer() + + processes: List[subprocess.Popen] = [] + + def _mock_spark_submit(self, *args, **kwargs) -> subprocess.Popen: + # We mock StandaloneClusterLauncher.spark_submit to run a dummy process and pretend + # that this is a spark structured streaming process. In addition, this implementation + # will keep track of launched processes in an array. + result = subprocess.Popen(args=["/bin/bash", "-c", "sleep 600"]) + processes.append(result) + return result + + with patch.object( + StandaloneClusterLauncher, "spark_submit", new=_mock_spark_submit + ), mock_server( + core_servicer, add_CoreServiceServicer_to_server + ) as core_service_url: + client = Client( + core_url=core_service_url, + serving_url=SERVING_URL, + spark_launcher="standalone", + spark_home=os.path.dirname(pyspark.__file__), + ) + + # Run one iteration of the control loop. It should do nothing since we have no + # feature tables. + ensure_stream_ingestion_jobs(client=client, all_projects=True) + + # No jobs should be running at this point. + assert len(client.list_jobs(include_terminated=True)) == 0 + + # Now, create a new feature table. + self._create_ft(client, self.features_1) + + # Run another iteration of the control loop. + ensure_stream_ingestion_jobs(client=client, all_projects=True) + + # We expect a streaming job to be created for the new Feature Table. + assert len(client.list_jobs(include_terminated=False)) == 1 + assert len(processes) == 1 + + first_job_id = client.list_jobs(include_terminated=False)[0].get_id() + + # Pretend that the streaming job has terminated for no reason. + processes[0].kill() + + # The control loop is expected to notice the killed job and start it again. 
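+            # (Why the restart is expected: the killed process makes the standalone launcher
+            # report the job as terminated, so list_jobs(include_terminated=False) no longer
+            # returns its hash, and the reconciliation pass below sees a feature table with a
+            # stream source but no running job.)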
+ ensure_stream_ingestion_jobs(client=client, all_projects=True) + + # We expect to find one terminated job and one restarted job. + assert len(client.list_jobs(include_terminated=False)) == 1 + assert len(client.list_jobs(include_terminated=True)) == 2 + + id_after_restart = client.list_jobs(include_terminated=False)[0].get_id() + + # Indeed it is a new job with a new id. + assert id_after_restart != first_job_id + + # Update the feature table. + self._create_ft(client, self.features_2) + + # Run another iteration of the job control loop. We expect to restart the streaming + # job since the feature table has changed. + ensure_stream_ingestion_jobs(client=client, all_projects=True) + + # We expect to find two terminated job and one live job. + assert len(client.list_jobs(include_terminated=False)) == 1 + assert len(client.list_jobs(include_terminated=True)) == 3 + + id_after_change = client.list_jobs(include_terminated=False)[0].get_id() + assert id_after_restart != id_after_change + + # Delete the feature table. + self._delete_ft(client) + + # Run another iteration of the job control loop. We expect it to terminate the streaming + # job. + ensure_stream_ingestion_jobs(client=client, all_projects=True) + + assert len(client.list_jobs(include_terminated=False)) == 0 + assert len(client.list_jobs(include_terminated=True)) == 3 From 937a036f78205a564c1ecd810d61bf4e0e935150 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Wed, 11 Nov 2020 15:00:06 +0300 Subject: [PATCH 10/14] fix _job_to_proto Signed-off-by: Oleg Avdeev --- sdk/python/feast/job_service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py index 1798fcede8..1daf335337 100644 --- a/sdk/python/feast/job_service.py +++ b/sdk/python/feast/job_service.py @@ -51,7 +51,7 @@ ) -def _job_to_proto(self, spark_job: SparkJob) -> JobProto: +def _job_to_proto(spark_job: SparkJob) -> JobProto: job = JobProto() job.id = spark_job.get_id() status = spark_job.get_status() @@ -165,7 +165,7 @@ def CancelJob(self, request, context): def GetJob(self, request, context): """Get details of a single job""" job = get_job_by_id(request.job_id, client=self.client) - return GetJobResponse(job=job.to_proto()) + return GetJobResponse(job=_job_to_proto(job)) def start_control_loop() -> None: From 495da58db38c20ffe0912b8b1544b0939b943bf2 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Wed, 11 Nov 2020 20:31:13 +0300 Subject: [PATCH 11/14] fix job cache state leak between tests Signed-off-by: Oleg Avdeev --- .../pyspark/launchers/standalone/__init__.py | 12 +++++++-- .../pyspark/launchers/standalone/local.py | 27 ++++++++++++------- .../tests/test_streaming_control_loop.py | 8 ++++-- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/sdk/python/feast/pyspark/launchers/standalone/__init__.py b/sdk/python/feast/pyspark/launchers/standalone/__init__.py index 1c44e5497f..433d9ed124 100644 --- a/sdk/python/feast/pyspark/launchers/standalone/__init__.py +++ b/sdk/python/feast/pyspark/launchers/standalone/__init__.py @@ -1,3 +1,11 @@ -from .local import StandaloneClusterLauncher, StandaloneClusterRetrievalJob +from .local import ( + StandaloneClusterLauncher, + StandaloneClusterRetrievalJob, + reset_job_cache, +) -__all__ = ["StandaloneClusterRetrievalJob", "StandaloneClusterLauncher"] +__all__ = [ + "StandaloneClusterRetrievalJob", + "StandaloneClusterLauncher", + "reset_job_cache", +] diff --git a/sdk/python/feast/pyspark/launchers/standalone/local.py 
b/sdk/python/feast/pyspark/launchers/standalone/local.py index 793842a939..9783cbe52e 100644 --- a/sdk/python/feast/pyspark/launchers/standalone/local.py +++ b/sdk/python/feast/pyspark/launchers/standalone/local.py @@ -25,10 +25,12 @@ class JobCache: - """In-memory cache of Spark jobs. - - This is necessary since we can't query Spark jobs in local mode + """ + A *global* in-memory cache of Spark jobs. + This is necessary since we can't easily keep track of running Spark jobs in local mode, since + there is no external state (unlike EMR and Dataproc which keep track of the running jobs for + us). """ # Map of job_id -> spark job @@ -75,7 +77,12 @@ def get_job_by_id(self, job_id: str) -> SparkJob: return self.job_by_id[job_id] -job_cache = JobCache() +global_job_cache = JobCache() + + +def reset_job_cache(): + global global_job_cache + global_job_cache = JobCache() def _find_free_port(): @@ -293,7 +300,7 @@ def historical_feature_retrieval( self.spark_submit(job_params), job_params.get_destination_path(), ) - job_cache.add_job(job) + global_job_cache.add_job(job) return job def offline_to_online_ingestion( @@ -307,7 +314,7 @@ def offline_to_online_ingestion( self.spark_submit(ingestion_job_params, ui_port), ui_port, ) - job_cache.add_job(job) + global_job_cache.add_job(job) return job def start_stream_to_online_ingestion( @@ -322,22 +329,22 @@ def start_stream_to_online_ingestion( ui_port, ingestion_job_params.get_job_hash(), ) - job_cache.add_job(job) + global_job_cache.add_job(job) return job def stage_dataframe(self, df, event_timestamp_column: str): raise NotImplementedError def get_job_by_id(self, job_id: str) -> SparkJob: - return job_cache.get_job_by_id(job_id) + return global_job_cache.get_job_by_id(job_id) def list_jobs(self, include_terminated: bool) -> List[SparkJob]: if include_terminated is True: - return job_cache.list_jobs() + return global_job_cache.list_jobs() else: return [ job - for job in job_cache.list_jobs() + for job in global_job_cache.list_jobs() if job.get_status() in (SparkJobStatus.STARTING, SparkJobStatus.IN_PROGRESS) ] diff --git a/sdk/python/tests/test_streaming_control_loop.py b/sdk/python/tests/test_streaming_control_loop.py index 99f21f1ec8..1c85efc2b0 100644 --- a/sdk/python/tests/test_streaming_control_loop.py +++ b/sdk/python/tests/test_streaming_control_loop.py @@ -16,7 +16,10 @@ from feast.feature import Feature from feast.feature_table import FeatureTable from feast.job_service import ensure_stream_ingestion_jobs -from feast.pyspark.launchers.standalone import StandaloneClusterLauncher +from feast.pyspark.launchers.standalone import ( + StandaloneClusterLauncher, + reset_job_cache, +) from feast.value_type import ValueType from tests.feast_core_server import CoreServicer as MockCoreServicer @@ -37,7 +40,6 @@ def mock_server(servicer, add_fn): server.stop(None) -CORE_URL = "core.feast.example.com" SERVING_URL = "serving.example.com" @@ -101,6 +103,8 @@ def _delete_ft(self, client: Client): def test_streaming_job_control_loop(self) -> None: """ Test streaming job control loop logic. 
""" + reset_job_cache() + core_servicer = MockCoreServicer() processes: List[subprocess.Popen] = [] From 54190e640efce41dc6472d6da20ffbac1812669d Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Fri, 13 Nov 2020 08:36:34 -0800 Subject: [PATCH 12/14] fix docker-compose test Signed-off-by: Oleg Avdeev --- infra/scripts/test-docker-compose.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infra/scripts/test-docker-compose.sh b/infra/scripts/test-docker-compose.sh index 6ac950aec0..348444956f 100755 --- a/infra/scripts/test-docker-compose.sh +++ b/infra/scripts/test-docker-compose.sh @@ -69,4 +69,4 @@ docker exec \ -e DISABLE_FEAST_SERVICE_FIXTURES=true \ --user root \ feast_jupyter_1 bash \ - -c 'cd /feast/tests && python -m pip install -r requirements.txt && pytest e2e/ --ingestion-jar https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-${FEAST_VERSION}.jar --redis-url redis:6379 --core-url core:6565 --serving-url online_serving:6566 --job-service-url jobservice:6568 --staging-path file:///shared/staging/ --kafka-brokers kafka:9092' + -c 'cd /feast/tests && python -m pip install -r requirements.txt && pytest e2e/ --ingestion-jar https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-${FEAST_VERSION}.jar --redis-url redis:6379 --core-url core:6565 --serving-url online_serving:6566 --job-service-url jobservice:6568 --staging-path file:///shared/staging/ --kafka-brokers kafka:9092 --feast-version develop' From bf44bef2cf21b2cc7903468fcdda1a9ea44f914b Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Sat, 14 Nov 2020 20:50:22 +0800 Subject: [PATCH 13/14] Add retries to prevent download failure Signed-off-by: Willem Pienaar --- .github/workflows/complete.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/complete.yml b/.github/workflows/complete.yml index 5222d9716b..778be2fc20 100644 --- a/.github/workflows/complete.yml +++ b/.github/workflows/complete.yml @@ -152,6 +152,10 @@ jobs: with: maven-version: 3.6.3 - name: build-jar + env: + # Try to add retries to prevent download failure + # https://github.community/t/getting-maven-could-not-transfer-artifact-with-500-error-when-using-github-actions/17570 + MAVEN_OPTS: -Dmaven.wagon.httpconnectionManager.ttlSeconds=25 -Dmaven.wagon.http.retryHandler.count=3 run: make build-java-no-tests REVISION=${GITHUB_SHA} - name: copy to gs run: gsutil cp ./spark/ingestion/target/feast-ingestion-spark-${GITHUB_SHA}.jar gs://feast-jobs/spark/ingestion/ From 87a866d9695f15bb09fd80b89073f641a76e7c56 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Sat, 14 Nov 2020 21:01:56 +0800 Subject: [PATCH 14/14] Remove MAVEN_OPTS again Signed-off-by: Willem Pienaar --- .github/workflows/complete.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.github/workflows/complete.yml b/.github/workflows/complete.yml index 778be2fc20..5222d9716b 100644 --- a/.github/workflows/complete.yml +++ b/.github/workflows/complete.yml @@ -152,10 +152,6 @@ jobs: with: maven-version: 3.6.3 - name: build-jar - env: - # Try to add retries to prevent download failure - # https://github.community/t/getting-maven-could-not-transfer-artifact-with-500-error-when-using-github-actions/17570 - MAVEN_OPTS: -Dmaven.wagon.httpconnectionManager.ttlSeconds=25 -Dmaven.wagon.http.retryHandler.count=3 run: make build-java-no-tests REVISION=${GITHUB_SHA} - name: copy to gs run: gsutil cp ./spark/ingestion/target/feast-ingestion-spark-${GITHUB_SHA}.jar gs://feast-jobs/spark/ingestion/