Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jobservice control loop (based on #1140) #1156

Merged
merged 14 commits into from
Nov 14, 2020
Merged
2 changes: 1 addition & 1 deletion infra/scripts/test-docker-compose.sh
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,4 @@ docker exec \
-e DISABLE_FEAST_SERVICE_FIXTURES=true \
--user root \
feast_jupyter_1 bash \
-c 'cd /feast/tests && python -m pip install -r requirements.txt && pytest e2e/ --ingestion-jar https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-${FEAST_VERSION}.jar --redis-url redis:6379 --core-url core:6565 --serving-url online_serving:6566 --job-service-url jobservice:6568 --staging-path file:///shared/staging/ --kafka-brokers kafka:9092'
-c 'cd /feast/tests && python -m pip install -r requirements.txt && pytest e2e/ --ingestion-jar https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-${FEAST_VERSION}.jar --redis-url redis:6379 --core-url core:6565 --serving-url online_serving:6566 --job-service-url jobservice:6568 --staging-path file:///shared/staging/ --kafka-brokers kafka:9092 --feast-version develop'
2 changes: 2 additions & 0 deletions protos/feast/core/JobService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ message Job {
JobType type = 2;
// Current job status
JobStatus status = 3;
// Deterministic hash of the Job
string hash = 8;

message RetrievalJobMeta {
string output_location = 4;
Expand Down
9 changes: 6 additions & 3 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1098,12 +1098,15 @@ def start_offline_to_online_ingestion(
)

def start_stream_to_online_ingestion(
self, feature_table: FeatureTable, extra_jars: Optional[List[str]] = None,
self,
feature_table: FeatureTable,
extra_jars: Optional[List[str]] = None,
project: str = None,
) -> SparkJob:
if not self._use_job_service:
return start_stream_to_online_ingestion(
client=self,
project=self.project,
project=project or self.project,
feature_table=feature_table,
extra_jars=extra_jars or [],
)
Expand All @@ -1113,7 +1116,7 @@ def start_stream_to_online_ingestion(
)
response = self._job_service.StartStreamToOnlineIngestionJob(request)
return RemoteStreamIngestionJob(
self._job_service, self._extra_grpc_params, response.id,
self._job_service, self._extra_grpc_params, response.id
)

def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class AuthProvider(Enum):
CONFIG_JOB_SERVICE_URL_KEY = "job_service_url"
CONFIG_JOB_SERVICE_ENABLE_SSL_KEY = "job_service_enable_ssl"
CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY = "job_service_server_ssl_cert"
CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP = "job_service_enable_control_loop"
CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY = "grpc_connection_timeout_default"
CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY = "grpc_connection_timeout_apply"
CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY = (
Expand Down Expand Up @@ -143,6 +144,8 @@ class AuthProvider(Enum):
CONFIG_JOB_SERVICE_ENABLE_SSL_KEY: "False",
# Path to certificate(s) to secure connection to Feast Job Service
CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "",
# Disable control loop by default for now
CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP: "False",
CONFIG_STATSD_ENABLED: "False",
# IngestionJob DeadLetter Destination
CONFIG_DEADLETTER_PATH: "",
Expand Down
206 changes: 172 additions & 34 deletions sdk/python/feast/job_service.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import logging
import os
import signal
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple

import grpc

import feast
from feast.constants import CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP
from feast.core import JobService_pb2_grpc
from feast.core.JobService_pb2 import (
CancelJobResponse,
Expand Down Expand Up @@ -31,6 +38,7 @@
)
from feast.pyspark.launcher import (
get_job_by_id,
get_stream_to_online_ingestion_params,
list_jobs,
start_historical_feature_retrieval_job,
start_offline_to_online_ingestion,
Expand All @@ -43,36 +51,37 @@
)


def _job_to_proto(spark_job: SparkJob) -> JobProto:
job = JobProto()
job.id = spark_job.get_id()
status = spark_job.get_status()
if status == SparkJobStatus.COMPLETED:
job.status = JobStatus.JOB_STATUS_DONE
elif status == SparkJobStatus.IN_PROGRESS:
job.status = JobStatus.JOB_STATUS_RUNNING
elif status == SparkJobStatus.FAILED:
job.status = JobStatus.JOB_STATUS_ERROR
elif status == SparkJobStatus.STARTING:
job.status = JobStatus.JOB_STATUS_PENDING
else:
raise ValueError(f"Invalid job status {status}")

if isinstance(spark_job, RetrievalJob):
job.type = JobType.RETRIEVAL_JOB
job.retrieval.output_location = spark_job.get_output_file_uri(block=False)
elif isinstance(spark_job, BatchIngestionJob):
job.type = JobType.BATCH_INGESTION_JOB
elif isinstance(spark_job, StreamIngestionJob):
job.type = JobType.STREAM_INGESTION_JOB
else:
raise ValueError(f"Invalid job type {job}")

return job


class JobServiceServicer(JobService_pb2_grpc.JobServiceServicer):
def __init__(self):
self.client = feast.Client()

def _job_to_proto(self, spark_job: SparkJob) -> JobProto:
job = JobProto()
job.id = spark_job.get_id()
status = spark_job.get_status()
if status == SparkJobStatus.COMPLETED:
job.status = JobStatus.JOB_STATUS_DONE
elif status == SparkJobStatus.IN_PROGRESS:
job.status = JobStatus.JOB_STATUS_RUNNING
elif status == SparkJobStatus.FAILED:
job.status = JobStatus.JOB_STATUS_ERROR
elif status == SparkJobStatus.STARTING:
job.status = JobStatus.JOB_STATUS_PENDING
else:
raise ValueError(f"Invalid job status {status}")

if isinstance(spark_job, RetrievalJob):
job.type = JobType.RETRIEVAL_JOB
job.retrieval.output_location = spark_job.get_output_file_uri(block=False)
elif isinstance(spark_job, BatchIngestionJob):
job.type = JobType.BATCH_INGESTION_JOB
elif isinstance(spark_job, StreamIngestionJob):
job.type = JobType.STREAM_INGESTION_JOB
else:
raise ValueError(f"Invalid job type {job}")

return job
def __init__(self, client):
self.client = client

def StartOfflineToOnlineIngestionJob(
self, request: StartOfflineToOnlineIngestionJobRequest, context
Expand Down Expand Up @@ -117,6 +126,20 @@ def StartStreamToOnlineIngestionJob(
feature_table = self.client.get_feature_table(
request.table_name, request.project
)

if self.client._config.getboolean(CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP):
# If the control loop is enabled, return existing stream ingestion job id instead of starting a new one
params = get_stream_to_online_ingestion_params(
self.client, request.project, feature_table, []
)
job_hash = params.get_job_hash()
for job in list_jobs(include_terminated=True, client=self.client):
if isinstance(job, StreamIngestionJob) and job.get_hash() == job_hash:
return StartStreamToOnlineIngestionJobResponse(id=job.get_id())
raise RuntimeError(
"Feast Job Service has control loop enabled, but couldn't find the existing stream ingestion job for the given FeatureTable"
)

# TODO: add extra_jars to request
job = start_stream_to_online_ingestion(
client=self.client,
Expand All @@ -131,7 +154,7 @@ def ListJobs(self, request, context):
jobs = list_jobs(
include_terminated=request.include_terminated, client=self.client
)
return ListJobsResponse(jobs=[self._job_to_proto(job) for job in jobs])
return ListJobsResponse(jobs=[_job_to_proto(job) for job in jobs])

def CancelJob(self, request, context):
"""Stop a single job"""
Expand All @@ -142,7 +165,30 @@ def CancelJob(self, request, context):
def GetJob(self, request, context):
"""Get details of a single job"""
job = get_job_by_id(request.job_id, client=self.client)
return GetJobResponse(job=self._job_to_proto(job))
return GetJobResponse(job=_job_to_proto(job))


def start_control_loop() -> None:
"""Starts control loop that continuously ensures that correct jobs are being run.

Currently this affects only the stream ingestion jobs. Please refer to
ensure_stream_ingestion_jobs for full documentation on how the check works.

"""
logging.info(
"Feast Job Service is starting a control loop in a background thread, "
"which will ensure that stream ingestion jobs are successfully running."
)
try:
client = feast.Client()
while True:
ensure_stream_ingestion_jobs(client, all_projects=True)
time.sleep(1)
except Exception:
traceback.print_exc()
finally:
# Send interrupt signal to the main thread to kill the server if control loop fails
os.kill(os.getpid(), signal.SIGINT)


class HealthServicer(HealthService_pb2_grpc.HealthServicer):
Expand All @@ -156,18 +202,110 @@ def intercept_service(self, continuation, handler_call_details):
return continuation(handler_call_details)


def start_job_service():
def start_job_service() -> None:
"""
Start Feast Job Service
"""

log_fmt = "%(asctime)s %(levelname)s %(message)s"
logging.basicConfig(level=logging.INFO, format=log_fmt)

client = feast.Client()

if client._config.getboolean(CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP):
# Start the control loop thread only if it's enabled from configs
thread = threading.Thread(target=start_control_loop, daemon=True)
thread.start()

server = grpc.server(ThreadPoolExecutor(), interceptors=(LoggingInterceptor(),))
JobService_pb2_grpc.add_JobServiceServicer_to_server(JobServiceServicer(), server)
JobService_pb2_grpc.add_JobServiceServicer_to_server(
JobServiceServicer(client), server
)
HealthService_pb2_grpc.add_HealthServicer_to_server(HealthServicer(), server)
server.add_insecure_port("[::]:6568")
server.start()
print("Feast job server listening on port :6568")
logging.info("Feast Job Service is listening on port :6568")
server.wait_for_termination()


def _get_expected_job_hash_to_table_refs(
client: feast.Client, projects: List[str]
) -> Dict[str, Tuple[str, str]]:
"""
Checks all feature tables for the requires project(s) and determines all required stream
ingestion jobs from them. Outputs a map of the expected job_hash to a tuple of (project, table_name).

Args:
all_projects (bool): If true, runs the check for all project.
Otherwise only checks the current project.

Returns:
Dict[str, Tuple[str, str]]: Map of job_hash -> (project, table_name) for expected stream ingestion jobs
"""
job_hash_to_table_refs = {}

for project in projects:
feature_tables = client.list_feature_tables(project)
for feature_table in feature_tables:
if feature_table.stream_source is not None:
params = get_stream_to_online_ingestion_params(
client, project, feature_table, []
)
job_hash = params.get_job_hash()
job_hash_to_table_refs[job_hash] = (project, feature_table.name)

return job_hash_to_table_refs


def ensure_stream_ingestion_jobs(client: feast.Client, all_projects: bool):
"""Ensures all required stream ingestion jobs are running and cleans up the unnecessary jobs.

More concretely, it will determine
- which stream ingestion jobs are running
- which stream ingestion jobs should be running
And it'll do 2 kinds of operations
- Cancel all running jobs that should not be running
- Start all non-existent jobs that should be running

Args:
all_projects (bool): If true, runs the check for all project.
Otherwise only checks the client's current project.
"""

projects = client.list_projects() if all_projects else [client.project]

expected_job_hash_to_table_refs = _get_expected_job_hash_to_table_refs(
client, projects
)

expected_job_hashes = set(expected_job_hash_to_table_refs.keys())

jobs_by_hash: Dict[str, StreamIngestionJob] = {}
for job in client.list_jobs(include_terminated=False):
if isinstance(job, StreamIngestionJob):
jobs_by_hash[job.get_hash()] = job

existing_job_hashes = set(jobs_by_hash.keys())

job_hashes_to_cancel = existing_job_hashes - expected_job_hashes
job_hashes_to_start = expected_job_hashes - existing_job_hashes

logging.debug(
f"existing_job_hashes = {sorted(list(existing_job_hashes))} expected_job_hashes = {sorted(list(expected_job_hashes))}"
)

for job_hash in job_hashes_to_cancel:
job = jobs_by_hash[job_hash]
logging.info(
f"Cancelling a stream ingestion job with job_hash={job_hash} job_id={job.get_id()} status={job.get_status()}"
)
job.cancel()

for job_hash in job_hashes_to_start:
# Any job that we wish to start should be among expected table refs map
project, table_name = expected_job_hash_to_table_refs[job_hash]
logging.info(
f"Starting a stream ingestion job for project={project}, table_name={table_name} with job_hash={job_hash}"
)
feature_table = client.get_feature_table(name=table_name, project=project)
client.start_stream_to_online_ingestion(feature_table, [], project=project)
19 changes: 19 additions & 0 deletions sdk/python/feast/pyspark/abc.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import abc
import hashlib
import json
import os
from datetime import datetime
Expand Down Expand Up @@ -456,6 +457,13 @@ def get_arguments(self) -> List[str]:
"online",
]

def get_job_hash(self) -> str:
job_json = json.dumps(
{"source": self._source, "feature_table": self._feature_table},
sort_keys=True,
)
return hashlib.md5(job_json.encode()).hexdigest()


class BatchIngestionJob(SparkJob):
"""
Expand All @@ -468,6 +476,17 @@ class StreamIngestionJob(SparkJob):
Container for the streaming ingestion job result
"""

def get_hash(self) -> str:
"""Gets the consistent hash of this stream ingestion job.

The hash needs to be persisted at the data processing layer, so that we can get the same
hash when retrieving the job from Spark.

Returns:
str: The hash for this streaming ingestion job
"""
raise NotImplementedError


class JobLauncher(abc.ABC):
"""
Expand Down
Loading