Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/s3): Partition support #11083

Merged
merged 31 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1bb1626
Initial commit for s3 partition support
treff7es Aug 2, 2024
7774e5f
Merge branch 'master' into s3_partition_support
treff7es Aug 2, 2024
7db9079
black linting
treff7es Aug 2, 2024
e8d6f3b
Merge branch 'master' into s3_partition_support
treff7es Aug 5, 2024
45efdb7
Changing enum to check if doc build succeeds
treff7es Aug 5, 2024
664bcd7
Update to use the latest model
treff7es Aug 5, 2024
af6df2a
Update golden files
treff7es Aug 5, 2024
6f1de05
Update golden files
treff7es Aug 5, 2024
05a132b
Stabilizing s3 integration tests
treff7es Aug 5, 2024
34f629b
Updating golden again
treff7es Aug 5, 2024
c247299
Updating goldens again
treff7es Aug 5, 2024
35bc791
Fix formatting
treff7es Aug 5, 2024
8d20952
Merge branch 'master' into s3_partition_support
treff7es Aug 5, 2024
45ee511
- Adding option to disable partition aspect generation for backward c…
treff7es Aug 5, 2024
f51a3e9
Black formatting
treff7es Aug 5, 2024
20c54e2
Update doc
treff7es Aug 5, 2024
e401ee9
Merge branch 'master' into s3_partition_support
treff7es Aug 5, 2024
56e14e2
Fix typos
treff7es Aug 5, 2024
a4cbdb9
Addressing pr review comments
treff7es Aug 12, 2024
d5aee9d
Merge branch 'master' into s3_partition_support
treff7es Aug 12, 2024
b0c8799
Merge branch 'master' into s3_partition_support
treff7es Aug 13, 2024
4d6502a
Fix linter issues
treff7es Aug 13, 2024
cb56197
Update metadata-ingestion/docs/sources/s3/s3.md
treff7es Aug 21, 2024
ab36e00
Update metadata-ingestion/src/datahub/ingestion/source/data_lake_comm…
treff7es Aug 21, 2024
662fa52
Update metadata-ingestion/src/datahub/ingestion/source/data_lake_comm…
treff7es Aug 21, 2024
12d6150
Update metadata-ingestion/src/datahub/ingestion/source/s3/source.py
treff7es Aug 21, 2024
a07216e
Update metadata-ingestion/src/datahub/ingestion/source/s3/source.py
treff7es Aug 21, 2024
e8c4f74
Update metadata-ingestion/docs/sources/s3/s3.md
treff7es Aug 21, 2024
b929f6a
Addressing pr review comments
treff7es Aug 21, 2024
ee2b827
Merge branch 'master' into s3_partition_support
treff7es Aug 21, 2024
0ce1174
Update golden files
treff7es Aug 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions metadata-ingestion/docs/sources/s3/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,31 @@

Path Specs (`path_specs`) is a list of Path Spec (`path_spec`) objects where each individual `path_spec` represents one or more datasets. Include path (`path_spec.include`) represents formatted path to the dataset. This path must end with `*.*` or `*.[ext]` to represent leaf level. If `*.[ext]` is provided then files with only specified extension type will be scanned. "`.[ext]`" can be any of [supported file types](#supported-file-types). Refer [example 1](#example-1---individual-file-as-dataset) below for more details.

All folder levels need to be specified in include path. You can use `/*/` to represent a folder level and avoid specifying exact folder name. To map folder as a dataset, use `{table}` placeholder to represent folder level for which dataset is to be created. For a partitioned dataset, you can use placeholder `{partition_key[i]}` to represent name of `i`th partition and `{partition[i]}` to represent value of `i`th partition. During ingestion, `i` will be used to match partition_key to partition. Refer [example 2 and 3](#example-2---folder-of-files-as-dataset-without-partitions) below for more details.
All folder levels need to be specified in include path. You can use `/*/` to represent a folder level and avoid specifying exact folder name. To map folder as a dataset, use `{table}` placeholder to represent folder level for which dataset is to be created. For a partitioned dataset, you can use placeholder `{partition_key[i]}` to represent name of `i`th partition and `{partition_value[i]}` to represent value of `i`th partition. During ingestion, `i` will be used to match partition_key to partition. Refer [example 2 and 3](#example-2---folder-of-files-as-dataset-without-partitions) below for more details.

Exclude paths (`path_spec.exclude`) can be used to ignore paths that are not relevant to current `path_spec`. This path cannot have named variables ( `{}` ). Exclude path can have `**` to represent multiple folder levels. Refer [example 4](#example-4---folder-of-files-as-dataset-with-partitions-and-exclude-filter) below for more details.

Refer [example 5](#example-5---advanced---either-individual-file-or-folder-of-files-as-dataset) if your bucket has more complex dataset representation.


**Additional points to note**
- Folder names should not contain {, }, *, / in their names.
- Named variable {folder} is reserved for internal working. please do not use in named variables.

#### Partitioned Dataset support
If your dataset is partitioned by the `partition_key`=`partition_value` format, then the partition values are auto-detected.

Otherwise, you can specify partitions in the following way in the path_spec:
1. Specify partition_key and partition_value in the path like => {partition_key[0]}={partition_value[0]}{partition_key[1]}={partition_value[1]}{partition_key[2]}={partition_value[2]}
treff7es marked this conversation as resolved.
Show resolved Hide resolved
2. Partition key can be specify using named variables in the path_spec like => year={year}month={month}day={day}
treff7es marked this conversation as resolved.
Show resolved Hide resolved
3 if the path is in the form of /value1/value2/value3 the source infer partition value from the path and assign partition_0, partition_1, partition_2 etc

Dataset creation time is determined by the creation time of earliest created file in the lowest partition while last updated time is determined by the last updated time of the latest updated file in the highest partition.

How the source determines the highest/lowest partition it is based on the traversal method set in the path_spec.
- If the traversal method is set to `MAX` then the source will try to find the latest partition by ordering the partitions each level and find the latest partiton. This traversal method won't look for earilest partition/creation time but this is the fastest.
- If the traversal method is set to `MIN_MAX` then the source will try to find the latest and earliest partition by ordering the partitions each level and find the latest/earliest partiton. This traversal sort folders purely by name therefor it is fast but it doesn't guarantee the latest partition will have the latest created file.
- If the traversal method is set to `ALL` then the source will try to find the latest and earliest partition by listing all the files in all the partitions and find the creation/last modification time based on the file creations. This is the slowest but for non time partitioned datasets this is the only way to find the latest/earliest partition.

### Path Specs - Examples
#### Example 1 - Individual file as Dataset
Expand Down Expand Up @@ -73,7 +88,7 @@ test-bucket
Path specs config to ingest folders `orders` and `returns` as datasets:
```
path_specs:
- include: s3://test-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
- include: s3://test-bucket/{table}/{partition_key[0]}={partition_value[0]}/{partition_key[1]}={partition_value[1]}/*.parquet
```
or with partition auto-detection:
```
Expand Down Expand Up @@ -104,11 +119,15 @@ test-bucket
Path specs config to ingest folder `orders` as dataset but not folder `tmp_orders`:
```
path_specs:
- include: s3://test-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
- include: s3://test-bucket/{table}/{partition_key[0]}={partition_value[0]}/{partition_key[1]}={partition_value[1]}/*.parquet
exclude:
- **/tmp_orders/**
```

or with partition auto-detection:
```
path_specs:
- include: s3://test-bucket/{table}/
```

#### Example 5 - Advanced - Either Individual file OR Folder of files as Dataset

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,10 @@ def get_parsable_include(cls, include: str) -> str:
return parsable_include

def get_named_vars(self, path: str) -> Union[None, parse.Result, parse.Match]:
if self.autodetect_partitions and self.include.endswith("{table}/**"):
if self.include.endswith("{table}/**"):
# If we have a partial path with ** at the end, we need to truncate the path to parse correctly
# parse needs to have exact number of folders to parse correctly and in case of ** we don't know the number of folders
# so we need to truncate the path to the last folder before ** to parse and get named vars correctly
splits = len(self.include[: self.include.find("{table}/")].split("/"))
treff7es marked this conversation as resolved.
Show resolved Hide resolved
path = "/".join(path.split("/", splits)[:-1]) + "/"

Expand Down Expand Up @@ -358,23 +360,45 @@ def extract_variable_names(self):
return matches

def get_partition_from_path(self, path: str) -> Optional[List[Tuple[str, str]]]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some description and examples here, as to what input would generate what output ?

# Automatic partition detection supports four methods to get partiton keys and values from path:
# Let's say we have the following path => year=2024/month=10/day=11 for this example you can specify the following path spec expressions:
# 1. User can specify partition_key and partition_value in the path like => {partition_key[0]}={partition_value[0]}{partition_key[1]}={partition_value[1]}{partition_key[2]}={partition_value[2]}
treff7es marked this conversation as resolved.
Show resolved Hide resolved
# 2. User can specify only partition key and the partition key will be used as partition name like => year={year}month={month}day={day}
treff7es marked this conversation as resolved.
Show resolved Hide resolved
# 3. You omit specifying anything and it will detect partiton key and value based on the equal signs (this only works if partitioned are specified in the key=value way.
# 4. if the path is in the form of /value1/value2/value3 we infer it from the path and assign partition_0, partition_1, partition_2 etc

partition_keys: List[Tuple[str, str]] = []
if self.include.find("{table}/"):
named_vars = self.get_named_vars(path)
if named_vars:
# If user has specified partition_key and partition_value in the path_spec then we use it to get partition keys
if (
"partition_key" in named_vars.named
and "partition_value" in named_vars.named
and len(named_vars.named["partition_key"])
== len(named_vars.named["partition_value"])
if "partition_key" in named_vars.named and (
(
"partition_value" in named_vars.named
and len(named_vars.named["partition_key"])
== len(named_vars.named["partition_value"])
)
or (
"partition" in named_vars.named
and len(named_vars.named["partition_key"])
== len(named_vars.named["partition"])
)
):
for key in named_vars.named["partition_key"]:
if key in named_vars.named["partition_value"]:
# We need to support both partition_value and partition as both were in our docs
if (
"partition_value" in named_vars
and key in named_vars.named["partition_value"]
) or (
"partition" in named_vars
and key in named_vars.named["partition"]
):
partition_keys.append(
(
named_vars.named["partition_key"][key],
named_vars.named["partition_value"][key],
named_vars.named["partition_value"][key]
if "partition_value" in named_vars.named
else named_vars.named["partition"][key],
)
)
return partition_keys
Expand All @@ -387,20 +411,27 @@ def get_partition_from_path(self, path: str) -> Optional[List[Tuple[str, str]]]:
partition_vars = self.extract_variable_names
if partition_vars:
for partition_key in partition_vars:
treff7es marked this conversation as resolved.
Show resolved Hide resolved
[key, index] = partition_key.strip("]").split("[")

if key in named_vars.named:
if index and index in named_vars.named[key]:
pkey: str = partition_key
index: Optional[int] = None
# We need to recreate the key and index from the partition_key
if partition_key.find("[") != -1:
pkey, index = partition_key.strip("]").split("[")
else:
pkey = partition_key
index = None

if pkey in named_vars.named:
if index and index in named_vars.named[pkey]:
partition_keys.append(
(partition_key, named_vars.named[key][index])
(f"{pkey}_{index}", named_vars.named[pkey][index])
)
else:
partition_keys.append(
(partition_key, named_vars.named[partition_key])
)
return partition_keys

# If user has not specified partition_key and partition_value in the path_spec then we use the default mechanism to get partition keys
# If user did not specified partition_key and partition_value in the path_spec then we use the default mechanism to get partition keys
if len(self.include.split("{table}/")) == 2:
num_slash = len(self.include.split("{table}/")[0].split("/"))
partition = path.split("/", num_slash)[num_slash]
Expand Down Expand Up @@ -526,4 +557,4 @@ def extract_table_name_and_path(self, path: str) -> Tuple[str, str]:
table_path = (
"/".join(path.split("/")[:depth]) + "/" + parsed_vars.named["table"]
)
return self._extract_table_name(parsed_vars.named), table_path
return self._extract_table_name(parsed_vars.named), table_path
5 changes: 0 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/s3/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ class DataLakeSourceConfig(
description="Whether or not to create tags in datahub from the s3 object",
)

get_all_partitions: bool = Field(
default=False,
description="Whether to list all partitions in the table, or only the latest",
)

# Whether to update the table schema when schema in files within the partitions are updated
_update_schema_on_partition_file_updates_deprecation = pydantic_field_deprecated(
"update_schema_on_partition_file_updates",
Expand Down
Loading