From 5983f40f8f5df5dbbcd2640f83ef82c19cdb4d19 Mon Sep 17 00:00:00 2001 From: Hai Nguyen Date: Mon, 6 Nov 2023 16:19:22 +0700 Subject: [PATCH] fix: improve parsing bytewax job status Signed-off-by: Hai Nguyen --- .../bytewax/bytewax_materialization_job.py | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py index 4105be90ee..da969d5a88 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py @@ -35,13 +35,22 @@ def status(self): if job_status.active is not None: if job_status.completion_time is None: return MaterializationJobStatus.RUNNING - elif job_status.failed is not None: - self._error = Exception(f"Job {self.job_id()} failed") - return MaterializationJobStatus.ERROR - elif job_status.active is None: - if job_status.completion_time is not None: - if job_status.conditions[0].type == "Complete": - return MaterializationJobStatus.SUCCEEDED + else: + if ( + job_status.completion_time is not None + and job_status.conditions[0].type == "Complete" + ): + return MaterializationJobStatus.SUCCEEDED + + if ( + job_status.conditions is not None + and job_status.conditions[0].type == "Failed" + ): + self._error = Exception( + f"Job {self.job_id()} failed with reason: " + f"{job_status.conditions[0].message}" + ) + return MaterializationJobStatus.ERROR return MaterializationJobStatus.WAITING def should_be_retried(self):