Multiple tiny AWS related fixes #1083

Merged · 2 commits · Oct 23, 2020
1 change: 1 addition & 0 deletions infra/docker/jobservice/Dockerfile
@@ -17,6 +17,7 @@ RUN make compile-protos-python
COPY .git .git
COPY README.md README.md
RUN pip install -e sdk/python -U
+RUN pip install "s3fs" "boto3" "urllib3>=1.25.4"

#
# Download grpc_health_probe to run health checks
1 change: 1 addition & 0 deletions infra/docker/jupyter/Dockerfile
@@ -18,6 +18,7 @@ RUN pip install -r sdk/python/requirements-ci.txt
COPY .git .git
COPY README.md README.md
RUN pip install -e sdk/python -U
+RUN pip install "s3fs" "boto3" "urllib3>=1.25.4"

# Switch back to original user and workdir
USER $NB_UID
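Both images gain the same extra Python dependencies: s3fs (so fsspec-aware code such as pandas can read and write s3:// paths), boto3 (the AWS SDK used to talk to S3 and EMR), and urllib3>=1.25.4, presumably to satisfy botocore's version requirement. A rough sketch of the kind of access these packages enable inside the containers; the bucket and key below are hypothetical, not Feast code:

```python
import boto3
import pandas as pd

# s3fs lets pandas (via fsspec) resolve s3:// URLs directly; reading parquet
# additionally needs a parquet engine such as pyarrow.
df = pd.read_parquet("s3://my-bucket/features/entities.parquet")

# boto3 gives lower-level access to the same objects and to the EMR API.
s3 = boto3.client("s3")
s3.download_file("my-bucket", "features/entities.parquet", "/tmp/entities.parquet")
```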
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
@@ -362,7 +362,7 @@ def job():
pass


-@job.command(name="start-offline-to-online")
+@job.command(name="sync-offline-to-online")
@click.option(
"--feature-table",
"-t",
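The cli.py change only renames the CLI command; the underlying function is untouched. A minimal, self-contained click sketch (not Feast's actual command body) showing how name= decouples the command's CLI name from its Python function name:

```python
import click

@click.group()
def job():
    """Job-related commands."""

# Renaming the CLI command only requires changing `name=`; the function
# itself (and any code that imports it) stays the same.
@job.command(name="sync-offline-to-online")
@click.option("--feature-table", "-t", required=True, help="Feature table to sync")
def sync_offline_to_online(feature_table: str):
    click.echo(f"Syncing offline data to the online store for {feature_table}")

if __name__ == "__main__":
    job()
```

Saved as, say, cli_sketch.py, this would be invoked as `python cli_sketch.py sync-offline-to-online -t my_table`.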
8 changes: 6 additions & 2 deletions sdk/python/feast/client.py
@@ -913,7 +913,7 @@ def get_historical_features(
else entity_staging_uri.netloc
)
staging_client.upload_file(
-df_export_path.name, bucket, entity_staging_uri.path
+df_export_path.name, bucket, entity_staging_uri.path.lstrip("/")
Collaborator:
Shouldn't that strip be part of the implementation (GS, S3)?

@oavdeev (Author), Oct 22, 2020:
We can put the strip into both implementations too (GCP actually has it already). It just goes a bit against S3 (and, I suspect, GCP) semantics, where /a and a are separate objects.

Collaborator:
I will look into the GCP part again.
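For context on the lstrip("/"): urlparse keeps the leading slash in .path, while S3 object keys conventionally do not start with one, so without the strip the upload would target a key beginning with "/". A quick illustration with a hypothetical staging URI:

```python
from urllib.parse import urlparse

entity_staging_uri = urlparse("s3://my-staging-bucket/feast/entities/part-0.parquet")

print(entity_staging_uri.netloc)            # my-staging-bucket
print(entity_staging_uri.path)              # /feast/entities/part-0.parquet
print(entity_staging_uri.path.lstrip("/"))  # feast/entities/part-0.parquet

# As noted in the thread, S3 treats "/a" and "a" as two different keys,
# which is why the caller strips the slash rather than relying on each
# staging-client implementation to do it.
```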

)
entity_source = FileSource(
"event_timestamp",
@@ -923,7 +923,11 @@
)

return start_historical_feature_retrieval_job(
-self, entity_source, feature_tables, output_format, output_location
+self,
+entity_source,
+feature_tables,
+output_format,
+os.path.join(output_location, str(uuid.uuid4())),
)

def get_historical_features_df(
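The second hunk appends a random UUID to output_location, giving every get_historical_features call its own output subdirectory; the matching _random_string(8) suffix is removed from the EMR launcher below. A small sketch of the effect, with a hypothetical prefix:

```python
import os
import uuid

output_location = "s3://my-bucket/historical-retrieval"  # hypothetical prefix
# os.path.join keeps the forward slashes on a POSIX host.
job_output = os.path.join(output_location, str(uuid.uuid4()))
print(job_output)
# e.g. s3://my-bucket/historical-retrieval/7f4a2c9e-1d3b-4e8f-9a6b-0c5d2e7f8a9b
```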
10 changes: 7 additions & 3 deletions sdk/python/feast/pyspark/launchers/aws/emr.py
@@ -14,6 +14,7 @@
JobLauncher,
RetrievalJob,
RetrievalJobParameters,
+SparkJobFailure,
SparkJobStatus,
StreamIngestionJob,
StreamIngestionJobParameters,
@@ -83,10 +84,13 @@ def __init__(self, emr_client, job_ref: EmrJobRef, output_file_uri: str):
self._output_file_uri = output_file_uri

def get_output_file_uri(self, timeout_sec=None):
-_wait_for_job_state(
+state = _wait_for_job_state(
self._emr_client, self._job_ref, TERMINAL_STEP_STATES, timeout_sec
)
-return self._output_file_uri
+if state in SUCCEEDED_STEP_STATES:
+    return self._output_file_uri
+else:
+    raise SparkJobFailure("Spark job failed")


class EmrBatchIngestionJob(EmrJobMixin, BatchIngestionJob):
@@ -215,7 +219,7 @@ def historical_feature_retrieval(
return EmrRetrievalJob(
self._emr_client(),
job_ref,
-os.path.join(job_params.get_destination_path(), _random_string(8)),
+os.path.join(job_params.get_destination_path()),
)

def offline_to_online_ingestion(
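With the emr.py change, get_output_file_uri returns the output location only when the step actually succeeded and raises SparkJobFailure otherwise. A minimal sketch of that check; the state sets are assumptions standing in for Feast's SUCCEEDED_STEP_STATES and TERMINAL_STEP_STATES (EMR step states include PENDING, RUNNING, COMPLETED, CANCELLED, FAILED, and INTERRUPTED):

```python
class SparkJobFailure(Exception):
    """Raised when a Spark step ends in a terminal state other than success."""

# Assumed stand-ins for Feast's constants, based on the EMR step states.
SUCCEEDED_STEP_STATES = {"COMPLETED"}
TERMINAL_STEP_STATES = {"COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED"}

def output_uri_or_raise(final_state: str, output_file_uri: str) -> str:
    # Hand back the output location only for a successful step; any other
    # terminal state surfaces as an explicit failure instead of a bogus URI.
    if final_state in SUCCEEDED_STEP_STATES:
        return output_file_uri
    raise SparkJobFailure(f"Spark job finished in state {final_state}")

print(output_uri_or_raise("COMPLETED", "s3://my-bucket/output/"))  # hypothetical URI
```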
10 changes: 5 additions & 5 deletions sdk/python/feast/pyspark/launchers/aws/emr_utils.py
@@ -258,13 +258,13 @@ def _wait_for_job_state(
job: EmrJobRef,
desired_states: List[str],
timeout_seconds: Optional[int],
-):
+) -> str:
if job.step_id is None:
step_id = _get_first_step_id(emr_client, job.cluster_id)
else:
step_id = job.step_id

-_wait_for_step_state(
+return _wait_for_step_state(
emr_client, job.cluster_id, step_id, desired_states, timeout_seconds
)

@@ -290,17 +290,17 @@ def _wait_for_step_state(
step_id: str,
desired_states: List[str],
timeout_seconds: Optional[int],
-):
+) -> str:
"""
Wait up to timeout seconds for job to go into one of the desired states.
"""
start_time = time.time()
while (timeout_seconds is None) or (time.time() - start_time < timeout_seconds):
state = _get_step_state(emr_client, cluster_id, step_id)
if state in desired_states:
-return
+return state
else:
-time.sleep(0.5)
+time.sleep(1)
else:
raise TimeoutError(
f'Timeout waiting for job state to become {"|".join(desired_states)}'
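The emr_utils.py changes make _wait_for_step_state return the state it observed (so the caller above can distinguish success from failure) and slow the polling interval from 0.5 s to 1 s. A self-contained sketch of that polling pattern, using a hypothetical get_state callable in place of the real EMR client:

```python
import time
from typing import Callable, Collection, Optional

def wait_for_state(
    get_state: Callable[[], str],
    desired_states: Collection[str],
    timeout_seconds: Optional[int],
) -> str:
    """Poll until the job reaches one of desired_states, then return that state."""
    start_time = time.time()
    while timeout_seconds is None or time.time() - start_time < timeout_seconds:
        state = get_state()
        if state in desired_states:
            return state
        time.sleep(1)  # poll once per second, as in the change above
    raise TimeoutError(
        f'Timeout waiting for job state to become {"|".join(desired_states)}'
    )

# Example: simulate a step that completes after a few polls.
_states = iter(["PENDING", "RUNNING", "RUNNING", "COMPLETED"])
print(wait_for_state(lambda: next(_states), {"COMPLETED", "FAILED"}, timeout_seconds=30))
```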