Skip to content

Commit

Permalink
Multiple tiny AWS-related fixes (#1083)
Browse files Browse the repository at this point in the history
* multiple tiny AWS-related fixes

* Fix object path math in client staging uploader
* Add random suffix to the historical retrieval output location
* EMR job now actually checks the job status
* add boto3 dep

Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>

* a couple more bugfixes

Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
  • Loading branch information
oavdeev authored Oct 23, 2020
1 parent e426ab8 commit d779567
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 11 deletions.
1 change: 1 addition & 0 deletions infra/docker/jobservice/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ RUN make compile-protos-python
COPY .git .git
COPY README.md README.md
RUN pip install -e sdk/python -U
RUN pip install "s3fs" "boto3" "urllib3>=1.25.4"

#
# Download grpc_health_probe to run health checks
Expand Down
1 change: 1 addition & 0 deletions infra/docker/jupyter/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ RUN pip install -r sdk/python/requirements-ci.txt
COPY .git .git
COPY README.md README.md
RUN pip install -e sdk/python -U
RUN pip install "s3fs" "boto3" "urllib3>=1.25.4"

# Switch back to original user and workdir
USER $NB_UID
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def job():
pass


@job.command(name="start-offline-to-online")
@job.command(name="sync-offline-to-online")
@click.option(
"--feature-table",
"-t",
Expand Down
8 changes: 6 additions & 2 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ def get_historical_features(
else entity_staging_uri.netloc
)
staging_client.upload_file(
df_export_path.name, bucket, entity_staging_uri.path
df_export_path.name, bucket, entity_staging_uri.path.lstrip("/")
)
entity_source = FileSource(
"event_timestamp",
Expand All @@ -923,7 +923,11 @@ def get_historical_features(
)

return start_historical_feature_retrieval_job(
self, entity_source, feature_tables, output_format, output_location
self,
entity_source,
feature_tables,
output_format,
os.path.join(output_location, str(uuid.uuid4())),
)

def get_historical_features_df(
Expand Down
10 changes: 7 additions & 3 deletions sdk/python/feast/pyspark/launchers/aws/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
JobLauncher,
RetrievalJob,
RetrievalJobParameters,
SparkJobFailure,
SparkJobStatus,
StreamIngestionJob,
StreamIngestionJobParameters,
Expand Down Expand Up @@ -83,10 +84,13 @@ def __init__(self, emr_client, job_ref: EmrJobRef, output_file_uri: str):
self._output_file_uri = output_file_uri

def get_output_file_uri(self, timeout_sec=None):
_wait_for_job_state(
state = _wait_for_job_state(
self._emr_client, self._job_ref, TERMINAL_STEP_STATES, timeout_sec
)
return self._output_file_uri
if state in SUCCEEDED_STEP_STATES:
return self._output_file_uri
else:
raise SparkJobFailure("Spark job failed")


class EmrBatchIngestionJob(EmrJobMixin, BatchIngestionJob):
Expand Down Expand Up @@ -215,7 +219,7 @@ def historical_feature_retrieval(
return EmrRetrievalJob(
self._emr_client(),
job_ref,
os.path.join(job_params.get_destination_path(), _random_string(8)),
os.path.join(job_params.get_destination_path()),
)

def offline_to_online_ingestion(
Expand Down
10 changes: 5 additions & 5 deletions sdk/python/feast/pyspark/launchers/aws/emr_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,13 @@ def _wait_for_job_state(
job: EmrJobRef,
desired_states: List[str],
timeout_seconds: Optional[int],
):
) -> str:
if job.step_id is None:
step_id = _get_first_step_id(emr_client, job.cluster_id)
else:
step_id = job.step_id

_wait_for_step_state(
return _wait_for_step_state(
emr_client, job.cluster_id, step_id, desired_states, timeout_seconds
)

Expand All @@ -290,17 +290,17 @@ def _wait_for_step_state(
step_id: str,
desired_states: List[str],
timeout_seconds: Optional[int],
):
) -> str:
"""
Wait up to timeout seconds for job to go into one of the desired states.
"""
start_time = time.time()
while (timeout_seconds is None) or (time.time() - start_time < timeout_seconds):
state = _get_step_state(emr_client, cluster_id, step_id)
if state in desired_states:
return
return state
else:
time.sleep(0.5)
time.sleep(1)
else:
raise TimeoutError(
f'Timeout waiting for job state to become {"|".join(desired_states)}'
Expand Down

0 comments on commit d779567

Please sign in to comment.