Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/abs): Splitting abs utils into multiple files to not include abs specific includes which broke path_spec includes #10945

Merged
merged 2 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/abs/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.abs.config import DataLakeSourceConfig, PathSpec
from datahub.ingestion.source.abs.report import DataLakeSourceReport
from datahub.ingestion.source.azure.abs_util import (
from datahub.ingestion.source.azure.abs_folder_utils import (
get_abs_properties,
get_abs_tags,
list_folders,
)
from datahub.ingestion.source.azure.abs_utils import (
get_container_name,
get_container_relative_path,
get_key_prefix,
list_folders,
strip_abs_prefix,
)
from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import logging
import os
import re
from typing import Dict, Iterable, List, Optional

from azure.storage.blob import BlobProperties
Expand All @@ -10,67 +8,10 @@
from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass

ABS_PREFIXES_REGEX = re.compile(
r"(http[s]?://[a-z0-9]{3,24}\.blob\.core\.windows\.net/)"
)

logging.getLogger("py4j").setLevel(logging.ERROR)
logger: logging.Logger = logging.getLogger(__name__)


def is_abs_uri(uri: str) -> bool:
return bool(ABS_PREFIXES_REGEX.match(uri))


def get_abs_prefix(abs_uri: str) -> Optional[str]:
result = re.search(ABS_PREFIXES_REGEX, abs_uri)
if result and result.groups():
return result.group(1)
return None


def strip_abs_prefix(abs_uri: str) -> str:
# remove abs prefix https://<storage-account>.blob.core.windows.net
abs_prefix = get_abs_prefix(abs_uri)
if not abs_prefix:
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
length_abs_prefix = len(abs_prefix)
return abs_uri[length_abs_prefix:]


def make_abs_urn(abs_uri: str, env: str) -> str:
abs_name = strip_abs_prefix(abs_uri)

if abs_name.endswith("/"):
abs_name = abs_name[:-1]

name, extension = os.path.splitext(abs_name)

if extension != "":
extension = extension[1:] # remove the dot
return f"urn:li:dataset:(urn:li:dataPlatform:abs,{name}_{extension},{env})"

return f"urn:li:dataset:(urn:li:dataPlatform:abs,{abs_name},{env})"


def get_container_name(abs_uri: str) -> str:
if not is_abs_uri(abs_uri):
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
return strip_abs_prefix(abs_uri).split("/")[0]


def get_key_prefix(abs_uri: str) -> str:
if not is_abs_uri(abs_uri):
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
return strip_abs_prefix(abs_uri).split("/", maxsplit=1)[1]


def get_abs_properties(
container_name: str,
blob_name: Optional[str],
Expand Down Expand Up @@ -280,7 +221,3 @@ def list_folders(
this_dict[folder_name] = folder_name

yield f"{folder_name}"


def get_container_relative_path(abs_uri: str) -> str:
return "/".join(strip_abs_prefix(abs_uri).split("/")[1:])
66 changes: 66 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/azure/abs_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import os
import re
from typing import Optional

# This file should not import any abs spectific modules as we import it in path_spec.py in datat_lake_common.py

ABS_PREFIXES_REGEX = re.compile(
r"(http[s]?://[a-z0-9]{3,24}\.blob\.core\.windows\.net/)"
)


def is_abs_uri(uri: str) -> bool:
return bool(ABS_PREFIXES_REGEX.match(uri))


def get_abs_prefix(abs_uri: str) -> Optional[str]:
result = re.search(ABS_PREFIXES_REGEX, abs_uri)
if result and result.groups():
return result.group(1)
return None


def strip_abs_prefix(abs_uri: str) -> str:
# remove abs prefix https://<storage-account>.blob.core.windows.net
abs_prefix = get_abs_prefix(abs_uri)
if not abs_prefix:
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
length_abs_prefix = len(abs_prefix)
return abs_uri[length_abs_prefix:]


def make_abs_urn(abs_uri: str, env: str) -> str:
abs_name = strip_abs_prefix(abs_uri)

if abs_name.endswith("/"):
abs_name = abs_name[:-1]

name, extension = os.path.splitext(abs_name)

if extension != "":
extension = extension[1:] # remove the dot
return f"urn:li:dataset:(urn:li:dataPlatform:abs,{name}_{extension},{env})"

return f"urn:li:dataset:(urn:li:dataPlatform:abs,{abs_name},{env})"


def get_container_name(abs_uri: str) -> str:
if not is_abs_uri(abs_uri):
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
return strip_abs_prefix(abs_uri).split("/")[0]


def get_key_prefix(abs_uri: str) -> str:
if not is_abs_uri(abs_uri):
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
return strip_abs_prefix(abs_uri).split("/", maxsplit=1)[1]


def get_container_relative_path(abs_uri: str) -> str:
return "/".join(strip_abs_prefix(abs_uri).split("/")[1:])
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
get_s3_prefix,
is_s3_uri,
)
from datahub.ingestion.source.azure.abs_util import (
from datahub.ingestion.source.azure.abs_utils import (
get_abs_prefix,
get_container_name,
get_container_relative_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from datahub.configuration.common import ConfigModel
from datahub.ingestion.source.aws.s3_util import is_s3_uri
from datahub.ingestion.source.azure.abs_util import is_abs_uri
from datahub.ingestion.source.azure.abs_utils import is_abs_uri
from datahub.ingestion.source.gcs.gcs_utils import is_gcs_uri

# hide annoying debug errors from py4j
Expand Down
Loading