Skip to content

Commit

Permalink
fix: Resolve PR Comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware committed Jul 26, 2024
1 parent cad9e8e commit 1b2f7a6
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 40 deletions.
46 changes: 15 additions & 31 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -948,22 +948,21 @@ def get_profile_if_enabled(
partitions = response["Partitions"]
partition_keys = [k["Name"] for k in partition_keys]

with ThreadPoolExecutor(
max_workers=self.source_config.profiling.max_workers
) as executor:
futures = [
executor.submit(
self._create_partition_profile_mcp, mce, partition_keys, p
)
for p in partitions
]
for future in as_completed(futures):
try:
result = future.result()
if result:
yield result
except Exception as e:
logger.error(f"Error profiling partition: {e}")
for p in partitions:
table_stats = p.get("Parameters", {})
column_stats = p["StorageDescriptor"]["Columns"]

# only support single partition key
partition_spec = str({partition_keys[0]: p["Values"][0]})

if self.source_config.profiling.partition_patterns.allowed(
partition_spec
):
yield self._create_profile_mcp(
mce, table_stats, column_stats, partition_spec
).as_workunit()
else:
continue
else:
# ingest data profile without partition
table_stats = response["Table"]["Parameters"]
Expand All @@ -972,21 +971,6 @@ def get_profile_if_enabled(
mce, table_stats, column_stats
).as_workunit()

def _create_partition_profile_mcp(
    self,
    mce: MetadataChangeEventClass,
    partition_keys: List[str],
    partition: Dict[str, Any],
) -> Optional[MetadataWorkUnit]:
    """Build the profile work unit for a single partition, if it is allowed.

    Args:
        mce: Passed through unchanged to ``self._create_profile_mcp``.
        partition_keys: Partition key names; only the first entry is used.
        partition: Partition record. ``"Parameters"`` is optional (defaults
            to an empty dict); ``"StorageDescriptor"`` -> ``"Columns"`` and
            ``"Values"`` are read directly, so missing keys or empty lists
            are not handled here and will raise.

    Returns:
        The result of ``as_workunit()`` on the profile built by
        ``self._create_profile_mcp`` when the partition spec is accepted by
        ``self.source_config.profiling.partition_patterns``; otherwise None.
    """
    stats_for_table = partition.get("Parameters", {})
    stats_for_columns = partition["StorageDescriptor"]["Columns"]

    # Only a single partition key is supported: the spec is the string form
    # of a one-entry dict mapping the first key name to the first value.
    first_key = partition_keys[0]
    first_value = partition["Values"][0]
    spec = str({first_key: first_value})

    patterns = self.source_config.profiling.partition_patterns
    if not patterns.allowed(spec):
        return None

    profile_mcp = self._create_profile_mcp(
        mce, stats_for_table, stats_for_columns, spec
    )
    return profile_mcp.as_workunit()

def gen_database_key(self, database: str) -> DatabaseKey:
return DatabaseKey(
database=database,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
from typing import Optional

from pydantic.fields import Field
Expand All @@ -16,14 +15,6 @@ class GlueProfilingConfig(ConfigModel):
default=False,
description="Whether to perform profiling at table-level only, or include column-level profiling as well.",
)
# The default of (5 * cpu_count) mirrors the default max_workers of
# ThreadPoolExecutor in Python 3.5-3.7 (Python 3.8+ instead defaults to
# min(32, cpu_count + 4)); the fallback of 4 applies when os.cpu_count()
# returns None. Given that profiling is often an I/O-bound task, it may make
# sense to increase this default value in the future.
# https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
max_workers: int = Field(
default=5 * (os.cpu_count() or 4),
description="Number of worker threads to use for profiling. Set to 1 to disable.",
)
row_count: Optional[str] = Field(
default=None,
description="The parameter name for row count in glue table.",
Expand Down

0 comments on commit 1b2f7a6

Please sign in to comment.