EMR launcher and configuration (#1061)
Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
oavdeev committed Oct 16, 2020
1 parent 9decb2d commit 7f2e40c
Showing 11 changed files with 752 additions and 9 deletions.
41 changes: 39 additions & 2 deletions sdk/python/feast/cli.py
@@ -364,11 +364,13 @@ def sync_offline_to_online(feature_table: str, start_time: str, end_time: str):
     """
     Sync offline store to online.
     """
-    import feast.pyspark.aws.jobs
+    from datetime import datetime
 
     client = Client()
     table = client.get_feature_table(feature_table)
-    feast.pyspark.aws.jobs.sync_offline_to_online(client, table, start_time, end_time)
+    client.start_offline_to_online_ingestion(
+        table, datetime.fromisoformat(start_time), datetime.fromisoformat(end_time)
+    )
 
 
 @cli.command()
@@ -424,5 +426,40 @@ def list_emr_jobs():
     )
 
 
+@cli.command()
+@click.option(
+    "--features",
+    "-f",
+    help="Features in feature_table:feature format, comma separated",
+    required=True,
+)
+@click.option(
+    "--entity-df-path",
+    "-e",
+    help="Path to entity df in CSV format. It is assumed to have event_timestamp column and a header.",
+    required=True,
+)
+@click.option("--destination", "-d", help="Destination", default="")
+def get_historical_features(features: str, entity_df_path: str, destination: str):
+    """
+    Get historical features
+    """
+    import pandas
+
+    client = Client()
+
+    # TODO: clean this up
+    entity_df = pandas.read_csv(entity_df_path, sep=None, engine="python",)
+
+    entity_df["event_timestamp"] = pandas.to_datetime(entity_df["event_timestamp"])
+
+    uploaded_df = client.stage_dataframe(
+        entity_df, "event_timestamp", "created_timestamp"
+    )
+
+    job = client.get_historical_features(features.split(","), uploaded_df,)
+    print(job.get_output_file_uri())
+
+
 if __name__ == "__main__":
     cli()
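
Note: the new get_historical_features command is a thin wrapper over the client API added in this commit. A minimal sketch of the equivalent Python calls (the CSV path and feature reference are hypothetical):

    import pandas
    from feast import Client

    client = Client()
    entity_df = pandas.read_csv("entities.csv", sep=None, engine="python")
    entity_df["event_timestamp"] = pandas.to_datetime(entity_df["event_timestamp"])

    # Upload the entity dataframe so the Spark cluster can read it ...
    uploaded = client.stage_dataframe(entity_df, "event_timestamp", "created_timestamp")
    # ... then run the retrieval job and print where the output landed.
    job = client.get_historical_features(["driver_stats:avg_daily_trips"], uploaded)
    print(job.get_output_file_uri())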
16 changes: 12 additions & 4 deletions sdk/python/feast/client.py
@@ -76,6 +76,7 @@
 from feast.online_response import OnlineResponse, _infer_online_entity_rows
 from feast.pyspark.abc import RetrievalJob, SparkJob
 from feast.pyspark.launcher import (
+    stage_dataframe,
     start_historical_feature_retrieval_job,
     start_historical_feature_retrieval_spark_session,
     start_offline_to_online_ingestion,
@@ -885,9 +886,16 @@ def _get_feature_tables_from_feature_refs(
         return feature_tables
 
     def start_offline_to_online_ingestion(
-        self,
-        feature_table: Union[FeatureTable, str],
-        start: Union[datetime, str],
-        end: Union[datetime, str],
+        self, feature_table: Union[FeatureTable, str], start: datetime, end: datetime,
     ) -> SparkJob:
         return start_offline_to_online_ingestion(feature_table, start, end, self)  # type: ignore
+
+    def stage_dataframe(
+        self,
+        df: pd.DataFrame,
+        event_timestamp_column: str,
+        created_timestamp_column: str,
+    ) -> FileSource:
+        return stage_dataframe(
+            df, event_timestamp_column, created_timestamp_column, self
+        )
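
Note: start_offline_to_online_ingestion now takes datetime objects rather than strings, so callers parse timestamps first, as the CLI does above with datetime.fromisoformat. A minimal usage sketch (the table name and date range are hypothetical):

    from datetime import datetime

    from feast import Client

    client = Client()
    job = client.start_offline_to_online_ingestion(
        "driver_statistics", datetime(2020, 10, 1), datetime(2020, 10, 16)
    )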
10 changes: 10 additions & 0 deletions sdk/python/feast/constants.py
@@ -80,6 +80,16 @@ class AuthProvider(Enum):
 CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT = "historical_feature_output_format"
 CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION = "historical_feature_output_location"
 
+CONFIG_REDIS_HOST = "redis_host"
+CONFIG_REDIS_PORT = "redis_port"
+CONFIG_REDIS_SSL = "redis_ssl"
+
+CONFIG_SPARK_EMR_REGION = "emr_region"
+CONFIG_SPARK_EMR_CLUSTER_ID = "emr_cluster_id"
+CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH = "emr_cluster_template_path"
+CONFIG_SPARK_EMR_STAGING_LOCATION = "emr_staging_location"
+CONFIG_SPARK_EMR_LOG_LOCATION = "emr_log_location"
+
 
 # Configuration option default values
 FEAST_DEFAULT_OPTIONS = {
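
Note: each constant above maps a user-facing option key to the new EMR launcher and Redis settings. A sketch of supplying them when constructing a client — values are hypothetical, and this assumes Client forwards keyword arguments into its Config, as it does for existing options:

    from feast import Client

    # spark_launcher selects the EMR launcher; the emr_* and redis_*
    # keys correspond to the constants defined above
    client = Client(
        spark_launcher="emr",
        emr_region="us-west-2",
        emr_staging_location="s3://my-bucket/staging/",
        emr_log_location="s3://my-bucket/logs/",
        redis_host="my-redis.internal",
        redis_port="6379",
    )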
35 changes: 34 additions & 1 deletion sdk/python/feast/pyspark/abc.py
@@ -5,6 +5,10 @@
 from enum import Enum
 from typing import Dict, List, Optional
 
+import pandas
+
+from feast.data_source import FileSource
+
 
 class SparkJobFailure(Exception):
     """
@@ -258,19 +262,31 @@ def __init__(
         start: datetime,
         end: datetime,
         jar: str,
+        redis_host: str,
+        redis_port: int,
+        redis_ssl: bool,
     ):
         self._feature_table = feature_table
         self._source = source
         self._start = start
         self._end = end
         self._jar = jar
+        self._redis_host = redis_host
+        self._redis_port = redis_port
+        self._redis_ssl = redis_ssl
 
     def get_name(self) -> str:
         return (
-            f"BatchIngestion-{self._feature_table['name']}-"
+            f"BatchIngestion-{self.get_feature_table_name()}-"
            f"{self._start.strftime('%Y-%m-%d')}-{self._end.strftime('%Y-%m-%d')}"
         )
 
+    def _get_redis_config(self):
+        return dict(host=self._redis_host, port=self._redis_port, ssl=self._redis_ssl)
+
+    def get_feature_table_name(self) -> str:
+        return self._feature_table["name"]
+
     def get_main_file_path(self) -> str:
         return self._jar
@@ -289,6 +305,8 @@ def get_arguments(self) -> List[str]:
             self._start.strftime("%Y-%m-%dT%H:%M:%S"),
             "--end",
             self._end.strftime("%Y-%m-%dT%H:%M:%S"),
+            "--redis",
+            json.dumps(self._get_redis_config()),
         ]


@@ -334,3 +352,18 @@ def offline_to_online_ingestion(
             IngestionJob: wrapper around remote job that can be used to check when job completed.
         """
         raise NotImplementedError
+
+    @abc.abstractmethod
+    def stage_dataframe(
+        self,
+        df: pandas.DataFrame,
+        event_timestamp_column: str,
+        created_timestamp_column: str,
+    ) -> FileSource:
+        """
+        Upload a pandas dataframe so it is available to the Spark cluster.
+
+        Returns:
+            FileSource: representing the uploaded dataframe.
+        """
+        raise NotImplementedError
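
Note: the new --redis argument carries the online store connection settings as JSON, produced by _get_redis_config() above. A runnable sketch of what the Spark entrypoint receives (host and port are hypothetical):

    import json

    # same dict shape as _get_redis_config() builds
    redis_config = dict(host="my-redis.internal", port=6379, ssl=False)
    print(json.dumps(redis_config))
    # {"host": "my-redis.internal", "port": 6379, "ssl": false}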
sdk/python/feast/pyspark/historical_feature_retrieval_job.py
@@ -773,7 +773,7 @@ def _feature_table_from_dict(dct: Dict[str, Any]) -> FeatureTable:
     spark = SparkSession.builder.getOrCreate()
     args = _get_args()
     feature_tables_conf = json.loads(args.feature_tables)
-    feature_tables_sources_conf = json.loads(args.feature_tables_source)
+    feature_tables_sources_conf = json.loads(args.feature_tables_sources)
     entity_source_conf = json.loads(args.entity_source)
     destination_conf = json.loads(args.destination)
     start_job(
42 changes: 41 additions & 1 deletion sdk/python/feast/pyspark/launcher.py
@@ -6,10 +6,18 @@
 
 from feast.config import Config
 from feast.constants import (
+    CONFIG_REDIS_HOST,
+    CONFIG_REDIS_PORT,
+    CONFIG_REDIS_SSL,
     CONFIG_SPARK_DATAPROC_CLUSTER_NAME,
     CONFIG_SPARK_DATAPROC_PROJECT,
     CONFIG_SPARK_DATAPROC_REGION,
     CONFIG_SPARK_DATAPROC_STAGING_LOCATION,
+    CONFIG_SPARK_EMR_CLUSTER_ID,
+    CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH,
+    CONFIG_SPARK_EMR_LOG_LOCATION,
+    CONFIG_SPARK_EMR_REGION,
+    CONFIG_SPARK_EMR_STAGING_LOCATION,
     CONFIG_SPARK_HOME,
     CONFIG_SPARK_INGESTION_JOB_JAR,
     CONFIG_SPARK_LAUNCHER,
@@ -50,7 +58,27 @@ def _dataproc_launcher(config: Config) -> JobLauncher:
     )
 
 
-_launchers = {"standalone": _standalone_launcher, "dataproc": _dataproc_launcher}
+def _emr_launcher(config: Config) -> JobLauncher:
+    from feast.pyspark.launchers import aws
+
+    def _get_optional(option):
+        if config.exists(option):
+            return config.get(option)
+
+    return aws.EmrClusterLauncher(
+        region=config.get(CONFIG_SPARK_EMR_REGION),
+        existing_cluster_id=_get_optional(CONFIG_SPARK_EMR_CLUSTER_ID),
+        new_cluster_template_path=_get_optional(CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH),
+        staging_location=config.get(CONFIG_SPARK_EMR_STAGING_LOCATION),
+        emr_log_location=config.get(CONFIG_SPARK_EMR_LOG_LOCATION),
+    )
+
+
+_launchers = {
+    "standalone": _standalone_launcher,
+    "dataproc": _dataproc_launcher,
+    "emr": _emr_launcher,
+}
 
 
 def resolve_launcher(config: Config) -> JobLauncher:
@@ -177,5 +205,17 @@ def start_offline_to_online_ingestion(
             feature_table=_feature_table_to_argument(client, feature_table),
             start=start,
             end=end,
+            redis_host=client._config.get(CONFIG_REDIS_HOST),
+            redis_port=client._config.getint(CONFIG_REDIS_PORT),
+            redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL),
         )
     )
+
+
+def stage_dataframe(
+    df, event_timestamp_column: str, created_timestamp_column: str, client: "Client"
+) -> FileSource:
+    launcher = resolve_launcher(client._config)
+    return launcher.stage_dataframe(
+        df, event_timestamp_column, created_timestamp_column,
+    )
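
Note: a sketch of resolving the new EMR launcher from configuration. Option values are hypothetical, and this assumes feast.config.Config accepts an options dict; emr_cluster_id and emr_cluster_template_path are both optional, with one or the other normally supplied to reuse or create a cluster:

    from feast.config import Config
    from feast.pyspark.launcher import resolve_launcher

    config = Config(
        options={
            "spark_launcher": "emr",
            "emr_region": "us-west-2",
            "emr_cluster_id": "j-2ABCDE12345",  # reuse an existing cluster
            "emr_staging_location": "s3://my-bucket/staging/",
            "emr_log_location": "s3://my-bucket/logs/",
        }
    )
    launcher = resolve_launcher(config)  # an aws.EmrClusterLauncher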
3 changes: 3 additions & 0 deletions sdk/python/feast/pyspark/launchers/aws/__init__.py
@@ -0,0 +1,3 @@
+from .emr import EmrClusterLauncher, EmrIngestionJob, EmrRetrievalJob
+
+__all__ = ["EmrRetrievalJob", "EmrIngestionJob", "EmrClusterLauncher"]
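
Note: EmrClusterLauncher can also be constructed directly with the same keyword arguments _emr_launcher passes. Values below are hypothetical, and the cluster template format is defined in emr.py, which is not shown in this diff:

    from feast.pyspark.launchers.aws import EmrClusterLauncher

    # create new clusters from a template instead of reusing one
    launcher = EmrClusterLauncher(
        region="us-west-2",
        existing_cluster_id=None,
        new_cluster_template_path="./emr_cluster_template.yaml",
        staging_location="s3://my-bucket/staging/",
        emr_log_location="s3://my-bucket/logs/",
    )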
(Diffs for the remaining changed files, including sdk/python/feast/pyspark/launchers/aws/emr.py, are not shown.)
