[#5507] feat(python): Support Azure blob storage for GVFS python client #5538

Merged (23 commits, Nov 26, 2024)

Changes from 16 commits
89 changes: 69 additions & 20 deletions clients/client-python/gravitino/filesystem/gvfs.py
@@ -50,6 +50,7 @@ class StorageType(Enum):
GCS = "gs"
S3A = "s3a"
OSS = "oss"
ABS = "abfss"


class FilesetContextPair:
@@ -320,6 +321,7 @@ def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
            StorageType.GCS,
            StorageType.S3A,
            StorageType.OSS,
            StorageType.ABS,
        ]:
            src_context_pair.filesystem().mv(
                self._strip_storage_protocol(storage_type, src_actual_path),
@@ -577,6 +579,30 @@ def _convert_actual_path(
            )

            actual_prefix = ops["host"] + ops["path"]
        elif storage_location.startswith(f"{StorageType.ABS.value}://"):
            ops = infer_storage_options(storage_location)
            if "username" not in ops or "host" not in ops or "path" not in ops:
                raise GravitinoRuntimeException(
                    f"Storage location: {storage_location} is not supported now, as the username, "
                    f"host and path are required in the storage location."
                )

Contributor: "Username, host...are required"
actual_prefix = f"{StorageType.ABS.value}://{ops['username']}@{ops['host']}{ops['path']}"
Contributor: Should we add / before {ops['path']}?

Contributor (author): Actually not, please see L611; other file systems like HDFS and local file also omit the trailing slash.

Contributor: I mean "before", not "after".

Contributor (author): The value of path starts with /.
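
For reference, a minimal sketch (not part of the PR) of what fsspec's infer_storage_options returns for an abfss location; the account and container names below are made up. It shows that ops['path'] already carries the leading slash:

from fsspec.utils import infer_storage_options

ops = infer_storage_options(
    "abfss://container1@myaccount.dfs.core.windows.net/dir/sub"
)
print(ops["username"])  # 'container1' (the container)
print(ops["host"])      # 'myaccount.dfs.core.windows.net'
print(ops["path"])      # '/dir/sub' (note the leading '/')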


            # For ABS, the actual path should be the same as the virtual path, e.g.
            # 'wasbs//bucket1@xiaoyu123.blob.core.windows.net/test_gvfs_catalog6588/test_gvfs_schema/
            # test_gvfs_fileset/test_cat/test.file';
            # we need to add ':' after the scheme prefix.
            if actual_path.startswith(f"{StorageType.ABS.value}//"):
                actual_path = actual_path.replace(
                    f"{StorageType.ABS.value}//", f"{StorageType.ABS.value}://"
                )

Contributor: Why do we specify "wasbs"? Also, what's the meaning here?

Contributor (author): I have removed the code here; the logic was specific to paths with wasbs prefixes.

            # The actual path may be '{container}/{path}', so we need to add the
            # host and username back, extracting the path from '{container}/{path}'.
            if not actual_path.startswith(f"{StorageType.ABS.value}"):
                path_without_username = actual_path[actual_path.index("/") + 1 :]
                actual_path = f"{StorageType.ABS.value}://{ops['username']}@{ops['host']}/{path_without_username}"

        elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
            actual_prefix = storage_location[len(f"{StorageType.LOCAL.value}:") :]
        else:
@@ -613,33 +639,22 @@ def _convert_actual_info(
entry["name"], storage_location, virtual_location
)

# if entry contains 'mtime', then return the entry with 'mtime' else
# if entry contains 'LastModified', then return the entry with 'LastModified'

last_modified = None
if "mtime" in entry:
# HDFS and GCS
return {
"name": path,
"size": entry["size"],
"type": entry["type"],
"mtime": entry["mtime"],
}

if "LastModified" in entry:
last_modified = entry["mtime"]
elif "LastModified" in entry:
# S3 and OSS
return {
"name": path,
"size": entry["size"],
"type": entry["type"],
"mtime": entry["LastModified"],
}

# Unknown
last_modified = entry["LastModified"]
elif "last_modified" in entry:
# Azure Blob Storage
last_modified = entry["last_modified"]

return {
"name": path,
"size": entry["size"],
"type": entry["type"],
"mtime": None,
"mtime": last_modified,
}
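
For context, a standalone sketch (not from the PR) of the same normalization, using the backend-specific keys named above; the sample values are made up:

def normalize_mtime(entry: dict):
    # HDFS/GCS use 'mtime', S3/OSS use 'LastModified', Azure Blob uses 'last_modified'.
    for key in ("mtime", "LastModified", "last_modified"):
        if key in entry:
            return entry[key]
    return None

assert normalize_mtime({"mtime": 1732608000}) == 1732608000
assert normalize_mtime({"LastModified": "2024-11-26T08:00:00Z"}) == "2024-11-26T08:00:00Z"
assert normalize_mtime({"last_modified": "2024-11-26T08:00:00Z"}) == "2024-11-26T08:00:00Z"
assert normalize_mtime({}) is None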

    def _get_fileset_context(self, virtual_path: str, operation: FilesetDataOperation):
@@ -745,6 +760,8 @@ def _recognize_storage_type(path: str):
            return StorageType.S3A
        if path.startswith(f"{StorageType.OSS.value}://"):
            return StorageType.OSS
        if path.startswith(f"{StorageType.ABS.value}://"):
            return StorageType.ABS
        raise GravitinoRuntimeException(
            f"Storage type is not supported now. Path: {path}"
        )
Expand Down Expand Up @@ -801,6 +818,12 @@ def _strip_storage_protocol(storage_type: StorageType, path: str):
        if storage_type == StorageType.LOCAL:
            return path[len(f"{StorageType.LOCAL.value}:") :]

        # We need to remove the protocol and host from the path, for instance
        # from 'abfss://container@account/path' to 'container/path'.
        if storage_type == StorageType.ABS:
            ops = infer_storage_options(path)
            return ops["username"] + ops["path"]

        # OSS has different behavior than S3 and GCS; if we do not remove the
        # protocol, it will always return an empty array.
        if storage_type == StorageType.OSS:
@@ -883,6 +906,8 @@ def _get_filesystem(self, actual_file_location: str):
            fs = self._get_s3_filesystem()
        elif storage_type == StorageType.OSS:
            fs = self._get_oss_filesystem()
        elif storage_type == StorageType.ABS:
            fs = self._get_abs_filesystem()
        else:
            raise GravitinoRuntimeException(
                f"Storage type: `{storage_type}` is not supported now."
Expand Down Expand Up @@ -965,5 +990,29 @@ def _get_oss_filesystem(self):
                endpoint=oss_endpoint_url,
            )

    def _get_abs_filesystem(self):
        # Get 'abs_account_name' from the options; if the key is not found, raise an exception.
        abs_account_name = self._options.get(
            GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME
        )
        if abs_account_name is None:
            raise GravitinoRuntimeException(
                "ABS account name is not found in the options."
            )

        # Get 'abs_account_key' from the options; if the key is not found, raise an exception.
        abs_account_key = self._options.get(
            GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY
        )
        if abs_account_key is None:
            raise GravitinoRuntimeException(
                "ABS account key is not found in the options."
            )

        return importlib.import_module("adlfs").AzureBlobFileSystem(
            account_name=abs_account_name,
            account_key=abs_account_key,
        )
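
A quick standalone sketch (not from the PR) of the adlfs filesystem this method constructs; the account name and key are placeholders:

from adlfs import AzureBlobFileSystem

fs = AzureBlobFileSystem(
    account_name="myaccount",     # placeholder
    account_key="<account-key>",  # placeholder
)
print(fs.ls("mycontainer"))  # lists blobs under the container

Note that the PR imports adlfs lazily via importlib.import_module, so the dependency is only loaded when ABS is actually used.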


fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
3 changes: 3 additions & 0 deletions clients/client-python/gravitino/filesystem/gvfs_config.py
@@ -41,3 +41,6 @@ class GVFSConfig:
    GVFS_FILESYSTEM_OSS_ACCESS_KEY = "oss_access_key_id"
    GVFS_FILESYSTEM_OSS_SECRET_KEY = "oss_secret_access_key"
    GVFS_FILESYSTEM_OSS_ENDPOINT = "oss_endpoint"

    GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME = "abs_account_name"
    GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY = "abs_account_key"
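
A hypothetical end-to-end configuration sketch: the option keys come from this PR, while the server URI, metalake, and fileset names (and the constructor keywords, taken from the Gravitino Python client docs) are assumptions:

from gravitino.filesystem import gvfs
from gravitino.filesystem.gvfs_config import GVFSConfig

options = {
    GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME: "myaccount",     # placeholder
    GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY: "<account-key>",  # placeholder
}
fs = gvfs.GravitinoVirtualFileSystem(
    server_uri="http://localhost:8090",
    metalake_name="my_metalake",
    options=options,
)
fs.ls("fileset/my_catalog/my_schema/my_fileset")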
3 changes: 2 additions & 1 deletion clients/client-python/requirements.txt
@@ -25,4 +25,5 @@ pyarrow==15.0.2
cachetools==5.3.3
gcsfs==2024.3.1
s3fs==2024.3.1
ossfs==2023.12.0
adlfs==2023.12.0