Skip to content

Commit

Permalink
EMR support for offline-to-online job (#1026)
Browse files Browse the repository at this point in the history
Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
  • Loading branch information
oavdeev authored Oct 12, 2020
1 parent d607d53 commit 42bf26a
Show file tree
Hide file tree
Showing 4 changed files with 341 additions and 4 deletions.
20 changes: 20 additions & 0 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,5 +351,25 @@ def project_list():
print(tabulate(table, headers=["NAME"], tablefmt="plain"))


@cli.command()
@click.option(
"--feature-table",
"-t",
help="Feature table name to ingest data into",
required=True,
)
@click.option("--start-time", "-s", help="Interval start", required=True)
@click.option("--end-time", "-e", help="Interval end", required=True)
def sync_offline_to_online(feature_table: str, start_time: str, end_time: str):
"""
Sync offline store to online.
"""
import feast.pyspark.aws.jobs

client = Client()
table = client.get_feature_table(feature_table)
feast.pyspark.aws.jobs.sync_offline_to_online(client, table, start_time, end_time)


if __name__ == "__main__":
cli()
6 changes: 2 additions & 4 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ def list_entities(
entities.append(entity)
return entities

def get_entity(self, name: str, project: str = None) -> Union[Entity, None]:
def get_entity(self, name: str, project: str = None) -> Entity:
"""
Retrieves an entity.
Expand Down Expand Up @@ -548,9 +548,7 @@ def list_feature_tables(
feature_tables.append(feature_table)
return feature_tables

def get_feature_table(
self, name: str, project: str = None
) -> Union[FeatureTable, None]:
def get_feature_table(self, name: str, project: str = None) -> FeatureTable:
"""
Retrieves a feature table.
Expand Down
Empty file.
319 changes: 319 additions & 0 deletions sdk/python/feast/pyspark/aws/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
import hashlib
import json
import logging
import os
import random
import string
from typing import Any, Dict, Tuple

import boto3
import botocore
import yaml

from feast.client import Client
from feast.feature_table import FeatureTable
from feast.value_type import ValueType

log = logging.getLogger("aws")

# Config example:
#
# aws:
# logS3Prefix: "..a prefix for logs.."
# artifactS3Prefix: "..a prefix for jars.."
# existingClusterId: "..." # You need to set either existingClusterId
# runJobFlowTemplate: # or runJobFlowTemplate
# Name: "feast-ingestion-test"
# ReleaseLabel: emr-6.0.0
# Instances:
# InstanceFleets:
# - InstanceFleetType: MASTER
# TargetOnDemandCapacity: 0
# TargetSpotCapacity: 1
# LaunchSpecifications:
# SpotSpecification:
# TimeoutDurationMinutes: 60
# TimeoutAction: TERMINATE_CLUSTER
# InstanceTypeConfigs:
# - WeightedCapacity: 1
# EbsConfiguration:
# EbsBlockDeviceConfigs:
# - VolumeSpecification:
# SizeInGB: 32
# VolumeType: gp2
# VolumesPerInstance: 2
# BidPriceAsPercentageOfOnDemandPrice: 100
# InstanceType: m4.xlarge
# - InstanceFleetType: CORE
# TargetOnDemandCapacity: 0
# TargetSpotCapacity: 2
# LaunchSpecifications:
# SpotSpecification:
# TimeoutDurationMinutes: 60
# TimeoutAction: TERMINATE_CLUSTER
# InstanceTypeConfigs:
# - WeightedCapacity: 1
# EbsConfiguration:
# EbsBlockDeviceConfigs:
# - VolumeSpecification:
# SizeInGB: 32
# VolumeType: gp2
# VolumesPerInstance: 2
# BidPriceAsPercentageOfOnDemandPrice: 100
# InstanceType: m4.xlarge
# Ec2SubnetIds:
# - "..a subnet id within a VPC with a route to redis..."
# AdditionalMasterSecurityGroups:
# - "..a security group that allows access to redis..."
# AdditionalSlaveSecurityGroups:
# - "..a security group that allows access to redis..."
# KeepJobFlowAliveWhenNoSteps: false
# BootstrapActions:
# - Name: "s3://aws-bigdata-blog/artifacts/resize_storage/resize_storage.sh"
# ScriptBootstrapAction:
# Path: "s3://aws-bigdata-blog/artifacts/resize_storage/resize_storage.sh"
# Args:
# - "--scaling-factor"
# - "1.5"
# Applications:
# - Name: Hadoop
# - Name: Hive
# - Name: Spark
# - Name: Livy
# JobFlowRole: my-spark-node
# ServiceRole: my-worker-node
# ScaleDownBehavior: TERMINATE_AT_TASK_COMPLETION
# redisConfig:
# host: my.redis.com
# port: 6379
# ssl: true

SUPPORTED_EMR_VERSION = "emr-6.0.0"


def _sanity_check_config(config, config_path: str):
"""
Sanity check the config. We don't really have to do this here but if the spark job fails
you'll only find out much later and this is annoying. Those are not exhaustive, just
some checks to help debugging common configuration issues.
"""
aws_config = config.get("aws", {})

if ("runJobFlowTemplate" not in aws_config) and (
"existingClusterId" not in aws_config
):
log.error("{config_path}: either clusterId or runJobFlowTemplate should be set")
elif "runJobFlowTemplate" in aws_config:
runJobFlowTemplate = aws_config["runJobFlowTemplate"]
releaseLabel = runJobFlowTemplate.get("ReleaseLabel")
if releaseLabel != SUPPORTED_EMR_VERSION:
log.warn(
f"{config_path}: ReleaseLabel is set to {releaseLabel}. Recommended: {SUPPORTED_EMR_VERSION}"
)

if "redisConfig" not in config:
log.error("{config_path}: redisConfig is not set")


def _get_config_path() -> str:
return os.environ["JOB_SERVICE_CONFIG_PATH"]


def _load_job_service_config(config_path: str):
with open(config_path) as f:
config = yaml.load(f)
_sanity_check_config(config, config_path)
return config


def _random_string(length) -> str:
return "".join(random.choice(string.ascii_letters) for _ in range(length))


def _batch_source_to_json(batch_source):
return {
"file": {
"path": batch_source.file_options.file_url,
"mapping": dict(batch_source.field_mapping),
"timestampColumn": batch_source.timestamp_column,
}
}


def _feature_table_to_json(client: Client, feature_table):
return {
"features": [
{"name": f.name, "type": ValueType(f.dtype).name}
for f in feature_table.features
],
"project": "default",
"name": feature_table.name,
"entities": [
{"name": n, "type": client.get_entity(n).value_type}
for n in feature_table.entities
],
}


def _s3_split_path(path: str) -> Tuple[str, str]:
""" Convert s3:// url to (bucket, key) """
assert path.startswith("s3://")
_, _, bucket, key = path.split("/", 3)
return bucket, key


def _hash_file(local_path: str) -> str:
""" Compute sha256 hash of a file """
h = hashlib.sha256()
with open(local_path, "rb") as f:
for block in iter(lambda: f.read(2 ** 20), b""):
h.update(block)
return h.hexdigest()


def _s3_upload(local_path: str, remote_path: str) -> str:
"""
Upload a local file to S3. We store the file sha256 sum in S3 metadata and skip the upload
if the file hasn't changed.
"""
bucket, key = _s3_split_path(remote_path)
client = boto3.client("s3")

sha256sum = _hash_file(local_path)

try:
head_response = client.head_object(Bucket=bucket, Key=key)
if head_response["Metadata"]["sha256sum"] == sha256sum:
# File already exists
return remote_path
else:
log.info("Uploading {local_path} to {remote_path}")
client.upload_file(
local_path,
bucket,
key,
ExtraArgs={"Metadata": {"sha256sum": sha256sum}},
)
return remote_path
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "404":
log.info("Uploading {local_path} to {remote_path}")
client.upload_file(
local_path,
bucket,
key,
ExtraArgs={"Metadata": {"sha256sum": sha256sum}},
)
return remote_path
else:
raise


def _upload_jar(jar_s3_prefix: str, local_path: str) -> str:
return _s3_upload(
local_path, os.path.join(jar_s3_prefix, os.path.basename(local_path))
)


def _get_jar_s3_path(config) -> str:
"""
Extract job jar path from the configuration, upload it to S3 if necessary and return S3 path.
"""
jar_path = os.environ.get("INGESTION_JOB_JAR_PATH")
if jar_path is None:
raise ValueError("INGESTION_JOB_JAR_PATH not set")
elif jar_path.startswith("s3://"):
return jar_path
else:
artifactS3Prefix = config.get("aws").get("artifactS3Prefix")
if artifactS3Prefix:
return _upload_jar(artifactS3Prefix, jar_path)
else:
raise ValueError("artifactS3Prefix must be set")


def _sync_offline_to_online_step(
client: Client, config, feature_table, start_ts: str, end_ts: str
) -> Dict[str, Any]:
feature_table_json = _feature_table_to_json(client, feature_table)
source_json = _batch_source_to_json(feature_table.batch_source)

return {
"Name": "Feast Ingestion",
"HadoopJarStep": {
# TODO: generate those from proto
"Properties": [
{
"Key": "feast.step_metadata.job_type",
"Value": "OFFLINE_TO_ONLINE_JOB",
},
{
"Key": "feast.step_metadata.offline_to_online.table_name",
"Value": feature_table.name,
},
{
"Key": "feast.step_metadata.offline_to_online.start_ts",
"Value": start_ts,
},
{
"Key": "feast.step_metadata.offline_to_online.end_ts",
"Value": end_ts,
},
],
"Args": [
"spark-submit",
"--class",
"feast.ingestion.IngestionJob",
"--packages",
"com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.2",
_get_jar_s3_path(config),
"--mode",
"offline",
"--feature-table",
json.dumps(feature_table_json),
"--source",
json.dumps(source_json),
"--redis",
json.dumps(config["redisConfig"]),
"--start",
start_ts,
"--end",
end_ts,
],
"Jar": "command-runner.jar",
},
}


def _submit_emr_job(step: Dict[str, Any], config: Dict[str, Any]):
aws_config = config.get("aws", {})

emr = boto3.client("emr", region_name=aws_config.get("region"))

if "existingClusterId" in aws_config:
step["ActionOnFailure"] = "CONTINUE"
step_ids = emr.add_job_flow_steps(
JobFlowId=aws_config["existingClusterId"], Steps=[step],
)
print(step_ids)
else:
jobTemplate = aws_config["runJobFlowTemplate"]
step["ActionOnFailure"] = "TERMINATE_CLUSTER"

jobTemplate["Steps"] = [step]

if aws_config.get("logS3Prefix"):
jobTemplate["LogUri"] = os.path.join(
aws_config["logS3Prefix"], _random_string(5)
)

job = emr.run_job_flow(**jobTemplate)
print(job)


def sync_offline_to_online(
client: Client, feature_table: FeatureTable, start_ts: str, end_ts: str
):
config = _load_job_service_config(_get_config_path())
step = _sync_offline_to_online_step(client, config, feature_table, start_ts, end_ts)
_submit_emr_job(step, config)

0 comments on commit 42bf26a

Please sign in to comment.