Skip to content

Commit

Permalink
Merge pull request #617 from shivdeep-singh-ibm/handle_exception_without_crashing
Browse files Browse the repository at this point in the history

log error in case of exception
  • Loading branch information
Param-S authored Sep 24, 2024
2 parents 56a0cb1 + 6960272 commit d159d61
Showing 1 changed file with 14 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,24 @@ def _default_mapper_func(self, table, file_name):
]

def process(self, repo: str, files: List[str]):
    """Read, map and write the rows belonging to one repository.

    The table for ``repo`` is obtained from ``self._read_table_for_group``,
    handed to ``self.table_mapper`` and every ``(table, filename)`` pair it
    yields is written with ``self._write_parquet``. Any exception raised
    along the way is logged and not re-raised.

    Args:
        repo: Repository name. Every "/" is replaced by "%2F" before the
            name is passed to ``self.table_mapper``.
        files: Files forwarded unchanged to ``self._read_table_for_group``.

    Returns:
        None. A table of length 0 is skipped without logging or writing.
    """
    try:
        repo_table = self._read_table_for_group(self.repo_column_name, repo, files)
        if len(repo_table) == 0:
            # not processing empty table
            return

        # Keep ``repo`` untouched: the error log below should name the repo
        # exactly as the caller passed it, wherever the failure happened.
        sanitized_repo = repo.replace("/", "%2F")
        tables = self.table_mapper(repo_table, sanitized_repo)

        for out_table, filename in tables:
            self.logger.info(f"Write {filename}, tables: {len(out_table)}")
            self._write_parquet(out_table, filename)
    except Exception as e:
        # Deliberate best-effort boundary: the failure is reported, not
        # propagated. ``exception`` logs at ERROR level and also records
        # the traceback, which the bare message would lose.
        # NOTE(review): assumes self.logger is a stdlib-compatible logger.
        self.logger.exception(f"Failed processing repo: {repo}. {e}")

def _write_parquet(self, table, repo_name):
# since we already know the repo
Expand Down

0 comments on commit d159d61

Please sign in to comment.