EMR launcher #1061

Merged: 1 commit, Oct 16, 2020
41 changes: 39 additions & 2 deletions sdk/python/feast/cli.py
@@ -364,11 +364,13 @@ def sync_offline_to_online(feature_table: str, start_time: str, end_time: str):
"""
Sync offline store to online.
"""
import feast.pyspark.aws.jobs
from datetime import datetime

client = Client()
table = client.get_feature_table(feature_table)
feast.pyspark.aws.jobs.sync_offline_to_online(client, table, start_time, end_time)
client.start_offline_to_online_ingestion(
table, datetime.fromisoformat(start_time), datetime.fromisoformat(end_time)
)


@cli.command()
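The hunk above switches the CLI from calling the EMR-specific module directly to the launcher-agnostic client API. A minimal sketch of the equivalent programmatic call, assuming a default `Client()` and with an illustrative feature table name and timestamps (not taken from this PR):

```python
from datetime import datetime

from feast import Client

client = Client()  # picks up Feast configuration (core URL, Spark launcher, etc.)
table = client.get_feature_table("driver_statistics")  # hypothetical table name

# Launch the Spark job that copies offline data into the online store
job = client.start_offline_to_online_ingestion(
    table,
    datetime.fromisoformat("2020-10-01T00:00:00"),
    datetime.fromisoformat("2020-10-02T00:00:00"),
)
```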
@@ -424,5 +426,40 @@ def list_emr_jobs():
)


@cli.command()
@click.option(
"--features",
"-f",
help="Features in feature_table:feature format, comma separated",
required=True,
)
@click.option(
"--entity-df-path",
"-e",
help="Path to entity df in CSV format. It is assumed to have event_timestamp column and a header.",
required=True,
Collaborator:
I wonder whether it would be better to expect users to provide a URI that the Spark launcher can recognize, such as s3:// for EMR, gs:// for Dataproc, and file:// for standalone cluster launchers running locally. That way we would skip reading the file into a pandas DataFrame and converting it again.

Staging a pandas DataFrame is still a useful method to have, though, because we plan to support pandas DataFrames as an input argument to the historical feature retrieval method on the Feast Client.

Collaborator (Author):
I agree; it is mostly for convenience/testing for now, to reduce the number of steps someone has to go through to see whether historical retrieval works. I wouldn't expect people to normally use a local CSV for entity DataFrames. I'd tweak this interface in later PRs, though.

)
@click.option("--destination", "-d", help="Destination", default="")
def get_historical_features(features: str, entity_df_path: str, destination: str):
"""
Get historical features
"""
import pandas

client = Client()

# TODO: clean this up
entity_df = pandas.read_csv(entity_df_path, sep=None, engine="python",)

entity_df["event_timestamp"] = pandas.to_datetime(entity_df["event_timestamp"])

uploaded_df = client.stage_dataframe(
entity_df, "event_timestamp", "created_timestamp"
Collaborator:
Just a heads up: "created_timestamp" is actually supposed to be optional for the entity, so this is a bug we need to resolve in another PR.

)

job = client.get_historical_features(features.split(","), uploaded_df,)
print(job.get_output_file_uri())


if __name__ == "__main__":
cli()
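The new command reads a local CSV into pandas, stages it through the client, and runs historical retrieval. A rough Python equivalent, using a hypothetical entity DataFrame and feature reference (column and feature names below are illustrative):

```python
import pandas as pd

from feast import Client

# Hypothetical entity rows; the CSV accepted by the CLI must have a header
# and an event_timestamp column (created_timestamp is also required by
# stage_dataframe for now, per the review note above).
entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": pd.to_datetime(["2020-10-01", "2020-10-02"]),
        "created_timestamp": pd.to_datetime(["2020-10-01", "2020-10-02"]),
    }
)

client = Client()
uploaded_df = client.stage_dataframe(entity_df, "event_timestamp", "created_timestamp")
job = client.get_historical_features(["driver_statistics:avg_daily_trips"], uploaded_df)
print(job.get_output_file_uri())
```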
16 changes: 12 additions & 4 deletions sdk/python/feast/client.py
@@ -76,6 +76,7 @@
from feast.online_response import OnlineResponse, _infer_online_entity_rows
from feast.pyspark.abc import RetrievalJob, SparkJob
from feast.pyspark.launcher import (
stage_dataframe,
start_historical_feature_retrieval_job,
start_historical_feature_retrieval_spark_session,
start_offline_to_online_ingestion,
@@ -885,9 +886,16 @@ def _get_feature_tables_from_feature_refs(
return feature_tables

def start_offline_to_online_ingestion(
self,
feature_table: Union[FeatureTable, str],
start: Union[datetime, str],
end: Union[datetime, str],
self, feature_table: Union[FeatureTable, str], start: datetime, end: datetime,
) -> SparkJob:
return start_offline_to_online_ingestion(feature_table, start, end, self) # type: ignore

def stage_dataframe(
Collaborator:
We actually already have a method in the Feast Client that takes a dataframe and puts it into offline storage. Currently it's called ingest, but I guess we'll rename it. Anyway, this shouldn't be part of the Spark interop, since it's not related to Spark.

Collaborator (Author):
In my mind this is somewhat different from ingest. It is not intended for permanent storage; it's a convenience method: "take this dataframe, put it in some temp location where Spark can access it". I agree the launcher might not be the best place for it; it just has to be some code that can read staging_location from the config to construct the temp path.

Collaborator:
If it's an upload-to-temp-location function, it probably shouldn't be part of the Feast Client API. Maybe contrib? Or just keep it internal. What's the user use case?

Collaborator (Author):
Just user convenience: if you're getting started with Feast and want to run historical retrieval, we can upload your pandas entity dataframe to S3 so you don't have to think about how to upload it and which bucket to use. We'll just put it in the staging location for you. Basically trying to remove an extra friction point in onboarding and tutorials. Right now it is only used by the CLI historical-retrieval command.

I agree this may not be the best place for it, but at the same time it needs access to the config to figure out where to upload the dataframe, so I can't make it completely detached from the client (which has the config object).

self,
df: pd.DataFrame,
event_timestamp_column: str,
created_timestamp_column: str,
) -> FileSource:
return stage_dataframe(
df, event_timestamp_column, created_timestamp_column, self
)
10 changes: 10 additions & 0 deletions sdk/python/feast/constants.py
@@ -80,6 +80,16 @@ class AuthProvider(Enum):
CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT = "historical_feature_output_format"
CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION = "historical_feature_output_location"

CONFIG_REDIS_HOST = "redis_host"
CONFIG_REDIS_PORT = "redis_port"
CONFIG_REDIS_SSL = "redis_ssl"

CONFIG_SPARK_EMR_REGION = "emr_region"
CONFIG_SPARK_EMR_CLUSTER_ID = "emr_cluster_id"
CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH = "emr_cluster_template_path"
CONFIG_SPARK_EMR_STAGING_LOCATION = "emr_staging_location"
CONFIG_SPARK_EMR_LOG_LOCATION = "emr_log_location"


# Configuration option default values
FEAST_DEFAULT_OPTIONS = {
35 changes: 34 additions & 1 deletion sdk/python/feast/pyspark/abc.py
@@ -5,6 +5,10 @@
from enum import Enum
from typing import Dict, List, Optional

import pandas

from feast.data_source import FileSource


class SparkJobFailure(Exception):
"""
@@ -258,19 +262,31 @@ def __init__(
start: datetime,
end: datetime,
jar: str,
redis_host: str,
redis_port: int,
redis_ssl: bool,
):
self._feature_table = feature_table
self._source = source
self._start = start
self._end = end
self._jar = jar
self._redis_host = redis_host
self._redis_port = redis_port
self._redis_ssl = redis_ssl

def get_name(self) -> str:
return (
f"BatchIngestion-{self._feature_table['name']}-"
f"BatchIngestion-{self.get_feature_table_name()}-"
f"{self._start.strftime('%Y-%m-%d')}-{self._end.strftime('%Y-%m-%d')}"
)

def _get_redis_config(self):
return dict(host=self._redis_host, port=self._redis_port, ssl=self._redis_ssl)

def get_feature_table_name(self) -> str:
return self._feature_table["name"]

def get_main_file_path(self) -> str:
return self._jar

@@ -289,6 +305,8 @@ def get_arguments(self) -> List[str]:
self._start.strftime("%Y-%m-%dT%H:%M:%S"),
"--end",
self._end.strftime("%Y-%m-%dT%H:%M:%S"),
"--redis",
json.dumps(self._get_redis_config()),
]
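
For illustration only: the value passed to the new `--redis` argument is the JSON-serialized dict built by `_get_redis_config` above. With hypothetical connection settings it would look like this:

```python
import json

# Mirrors dict(host=..., port=..., ssl=...) from _get_redis_config;
# the host/port/ssl values here are placeholders.
print(json.dumps(dict(host="localhost", port=6379, ssl=False)))
# {"host": "localhost", "port": 6379, "ssl": false}
```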


@@ -334,3 +352,18 @@ def offline_to_online_ingestion(
IngestionJob: wrapper around remote job that can be used to check when job completed.
"""
raise NotImplementedError

@abc.abstractmethod
def stage_dataframe(
self,
df: pandas.DataFrame,
event_timestamp_column: str,
created_timestamp_column: str,
) -> FileSource:
"""
Upload a pandas dataframe so it is available to the Spark cluster.

Returns:
FileSource: representing the uploaded dataframe.
"""
raise NotImplementedError
@@ -773,7 +773,7 @@ def _feature_table_from_dict(dct: Dict[str, Any]) -> FeatureTable:
spark = SparkSession.builder.getOrCreate()
args = _get_args()
feature_tables_conf = json.loads(args.feature_tables)
feature_tables_sources_conf = json.loads(args.feature_tables_source)
feature_tables_sources_conf = json.loads(args.feature_tables_sources)
Collaborator:
Thanks for catching the typo.

entity_source_conf = json.loads(args.entity_source)
destination_conf = json.loads(args.destination)
start_job(
42 changes: 41 additions & 1 deletion sdk/python/feast/pyspark/launcher.py
@@ -6,10 +6,18 @@

from feast.config import Config
from feast.constants import (
CONFIG_REDIS_HOST,
CONFIG_REDIS_PORT,
CONFIG_REDIS_SSL,
CONFIG_SPARK_DATAPROC_CLUSTER_NAME,
CONFIG_SPARK_DATAPROC_PROJECT,
CONFIG_SPARK_DATAPROC_REGION,
CONFIG_SPARK_DATAPROC_STAGING_LOCATION,
CONFIG_SPARK_EMR_CLUSTER_ID,
CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH,
CONFIG_SPARK_EMR_LOG_LOCATION,
CONFIG_SPARK_EMR_REGION,
CONFIG_SPARK_EMR_STAGING_LOCATION,
CONFIG_SPARK_HOME,
CONFIG_SPARK_INGESTION_JOB_JAR,
CONFIG_SPARK_LAUNCHER,
@@ -50,7 +58,27 @@ def _dataproc_launcher(config: Config) -> JobLauncher:
)


_launchers = {"standalone": _standalone_launcher, "dataproc": _dataproc_launcher}
def _emr_launcher(config: Config) -> JobLauncher:
from feast.pyspark.launchers import aws

def _get_optional(option):
if config.exists(option):
return config.get(option)

return aws.EmrClusterLauncher(
region=config.get(CONFIG_SPARK_EMR_REGION),
existing_cluster_id=_get_optional(CONFIG_SPARK_EMR_CLUSTER_ID),
new_cluster_template_path=_get_optional(CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH),
staging_location=config.get(CONFIG_SPARK_EMR_STAGING_LOCATION),
emr_log_location=config.get(CONFIG_SPARK_EMR_LOG_LOCATION),
)


_launchers = {
"standalone": _standalone_launcher,
"dataproc": _dataproc_launcher,
"emr": _emr_launcher,
}


def resolve_launcher(config: Config) -> JobLauncher:
@@ -177,5 +205,17 @@ def start_offline_to_online_ingestion(
feature_table=_feature_table_to_argument(client, feature_table),
start=start,
end=end,
redis_host=client._config.get(CONFIG_REDIS_HOST),
redis_port=client._config.getint(CONFIG_REDIS_PORT),
redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL),
)
)


def stage_dataframe(
df, event_timestamp_column: str, created_timestamp_column: str, client: "Client"
) -> FileSource:
launcher = resolve_launcher(client._config)
return launcher.stage_dataframe(
df, event_timestamp_column, created_timestamp_column,
)
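
Taken together, the EMR launcher is selected and configured entirely through Feast config options. A sketch of what that configuration might look like, under the assumption that these options can be passed as keyword arguments to `Client` (they can equally be set in the Feast config file or environment); all values below are placeholders:

```python
from feast import Client

client = Client(
    spark_launcher="emr",                      # assumed key for CONFIG_SPARK_LAUNCHER
    emr_region="us-west-2",
    # Either reuse an existing cluster or point at a template for new ones;
    # both are optional in _emr_launcher above.
    emr_cluster_id="j-XXXXXXXXXXXX",
    emr_staging_location="s3://my-bucket/feast-staging/",
    emr_log_location="s3://my-bucket/feast-emr-logs/",
    # Redis settings forwarded to the ingestion job; passed as strings since
    # the launcher reads them with getint/getboolean.
    redis_host="my-redis.example.com",
    redis_port="6379",
    redis_ssl="False",
)
```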
3 changes: 3 additions & 0 deletions sdk/python/feast/pyspark/launchers/aws/__init__.py
@@ -0,0 +1,3 @@
from .emr import EmrClusterLauncher, EmrIngestionJob, EmrRetrievalJob

__all__ = ["EmrRetrievalJob", "EmrIngestionJob", "EmrClusterLauncher"]