Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/redshift): Adding way to filter s3 paths in Redshift Source #10622

Merged
merged 2 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ class S3LineageProviderConfig(ConfigModel):
description="Strip filename from s3 url. It only applies if path_specs are not specified.",
)

ignore_non_path_spec_path: bool = Field(
default=False,
description="Ignore paths that do not match any of the path_specs. It only applies if path_specs are specified.",
)


class S3DatasetLineageProviderConfigBase(ConfigModel):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,23 @@ def warn(self, log: logging.Logger, key: str, reason: str) -> None:
# TODO: Remove this method.
self.report.warning(key, reason)

def _get_s3_path(self, path: str) -> str:
def _get_s3_path(self, path: str) -> Optional[str]:
if self.config.s3_lineage_config:
for path_spec in self.config.s3_lineage_config.path_specs:
if path_spec.allowed(path):
_, table_path = path_spec.extract_table_name_and_path(path)
return table_path

if (
self.config.s3_lineage_config.ignore_non_path_spec_path
and len(self.config.s3_lineage_config.path_specs) > 0
):
self.report.num_lineage_dropped_s3_path += 1
logger.debug(
f"Skipping s3 path {path} as it does not match any path spec."
)
return None

if self.config.s3_lineage_config.strip_urls:
if "/" in urlparse(path).path:
return str(path.rsplit("/", 1)[0])
Expand Down Expand Up @@ -323,13 +333,14 @@ def _get_sources_from_query(
),
)

def _build_s3_path_from_row(self, filename: str) -> str:
def _build_s3_path_from_row(self, filename: str) -> Optional[str]:
path = filename.strip()
if urlparse(path).scheme != "s3":
raise ValueError(
f"Only s3 source supported with copy/unload. The source was: {path}"
)
return strip_s3_prefix(self._get_s3_path(path))
s3_path = self._get_s3_path(path)
return strip_s3_prefix(s3_path) if s3_path else None

def _get_sources(
self,
Expand Down Expand Up @@ -369,7 +380,11 @@ def _get_sources(
)
self.report.num_lineage_dropped_not_support_copy_path += 1
return [], None
path = strip_s3_prefix(self._get_s3_path(path))
s3_path = self._get_s3_path(path)
if s3_path is None:
return [], None

path = strip_s3_prefix(s3_path)
urn = make_dataset_urn_with_platform_instance(
platform=platform.value,
name=path,
Expand Down Expand Up @@ -539,6 +554,8 @@ def _get_target_lineage(
target_platform = LineageDatasetPlatform.S3
# Following call requires 'filename' key in lineage_row
target_path = self._build_s3_path_from_row(lineage_row.filename)
if target_path is None:
return None
urn = make_dataset_urn_with_platform_instance(
platform=target_platform.value,
name=target_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class RedshiftReport(
num_lineage_dropped_query_parser: int = 0
num_lineage_dropped_not_support_copy_path: int = 0
num_lineage_processed_temp_tables = 0
num_lineage_dropped_s3_path: int = 0

lineage_start_time: Optional[datetime] = None
lineage_end_time: Optional[datetime] = None
Expand Down
Loading