
fix(ingestion/s3): groupby group-splitting issue #12254

Conversation

@eagle-25 eagle-25 commented Jan 1, 2025

Summary

Fix the group-splitting issue caused by itertools.groupby when the input data is not ordered by the group key.

Changes

As-is

  • Use itertools.groupby for grouping. (group_key: s3_object.key)
  • Sort s3_objects before grouping. (sort_key: s3_object.last_modified)

To-be

  • Use defaultdict for grouping without pre-sorting.

Background

Behavior of itertools.groupby

  • itertools.groupby() generates a new group every time the value of the key function changes, which requires the input data to be sorted.

  • This means the input data should be sorted and the group key must be equal to the sort key for correct grouping.

    from itertools import groupby
    
    data = [1, 1, 2, 1]
    
    # Without sorting
    for key, val in groupby(data, key=lambda x: x):
        print(key, list(val))
    
    # --- output --- (Group Splitting)
    # 1 [1, 1] 
    # 2 [2]
    # 1 [1]
    
    # With sorting
    sorted_data = sorted(data, key=lambda x: x)
    
    for key, val in groupby(sorted_data, key=lambda x: x):
        print(key, list(val))
    
    # --- output ---
    # 1 [1, 1, 1]
    # 2 [2]

Problem

  • In our code, the files are sorted by file.last_modified even though the group key is derived from file.key. This mismatch causes the group-splitting issue.
    # Current code
    files = sorted(files, key=lambda a: a.last_modified) 
    grouped_files = groupby(files, lambda x: x.key.rsplit("/", 1)[0])
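The fix replaces groupby with a plain dictionary accumulation, so the input order no longer matters. A minimal sketch of the approach (simplified here; the actual helper added by this PR appears in the diff below):

    from collections import defaultdict

    # Sketch of the fix: accumulate files per directory without pre-sorting.
    def group_files_by_dirname(files):
        grouped = defaultdict(list)
        for f in files:
            grouped[f.key.rsplit("/", 1)[0]].append(f)
        return grouped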

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

@github-actions github-actions bot added the ingestion and community-contribution labels Jan 1, 2025
files = list(
    bucket.objects.filter(Prefix=f"{prefix_to_list}").page_size(PAGE_SIZE)
)
files = sorted(files, key=lambda a: a.last_modified)
@eagle-25 eagle-25 Jan 1, 2025

I think there is no need to sort the s3_objects by last_modified, as the logic below iterates through all files within the same folder and determines the maximum value.
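For context, a minimal sketch of why the sort is redundant, assuming the downstream logic only needs the most recent file per folder (the function name here is illustrative, not the actual source):

    def latest_file(group):
        # max() scans every element, so it works on unsorted input.
        return max(group, key=lambda f: f.last_modified)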

Contributor

Sounds fair to me. @treff7es , since you were involved in the development here, are you okay with this?

@@ -139,6 +139,14 @@ def partitioned_folder_comparator(folder1: str, folder2: str) -> int:
    return 1 if folder1 > folder2 else -1


def _group_s3_objects_by_dirname(s3_objects: Any) -> Dict[str, List[Any]]:
    grouped_objects = defaultdict(list)
@eagle-25 eagle-25 Jan 1, 2025

I used defaultdict for grouping to eliminate the need to consider the order of the collection.
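A minimal illustration of that order-insensitivity, reusing the data from the groupby example in the PR description:

    from collections import defaultdict

    data = [1, 1, 2, 1]
    groups = defaultdict(list)
    for x in data:
        groups[x].append(x)

    print(dict(groups))
    # {1: [1, 1, 1], 2: [2]} -- no group splitting, no pre-sorting needed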

@eagle-25 eagle-25 force-pushed the fix/ingestion/s3/groupby-group-splitting-issue branch from cc6f3b8 to 57d76cb on January 1, 2025 10:57
@eagle-25 eagle-25 force-pushed the fix/ingestion/s3/groupby-group-splitting-issue branch from 57d76cb to 52bb264 on January 1, 2025 11:24
@eagle-25 eagle-25 marked this pull request as ready for review January 1, 2025 11:25
@datahub-cyborg datahub-cyborg bot added the needs-review label Jan 1, 2025
@eagle-25 eagle-25 force-pushed the fix/ingestion/s3/groupby-group-splitting-issue branch from 52bb264 to bef800f on January 1, 2025 14:10
@eagle-25 eagle-25 force-pushed the fix/ingestion/s3/groupby-group-splitting-issue branch from bef800f to 4019f41 on January 4, 2025 10:15
@eagle-25 eagle-25 force-pushed the fix/ingestion/s3/groupby-group-splitting-issue branch from 4019f41 to f12a066 on January 4, 2025 17:21
Comment on lines +321 to +305
assert len(res) == 2
assert res[0].sample_file == "s3://my-bucket/my-folder/dir1/0002.csv"
assert res[1].sample_file == "s3://my-bucket/my-folder/dir2/0001.csv"
@eagle-25 eagle-25 Jan 4, 2025

The unfixed code returns:

len(res): 3
res[0].sample_file: s3://my-bucket/my-folder/dir1/0001.csv
res[1].sample_file: s3://my-bucket/my-folder/dir2/0001.csv
res[2].sample_file: s3://my-bucket/my-folder/dir1/0002.csv
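For illustration, a minimal reproduction of that splitting; the keys and timestamps below are assumed, not the actual test fixtures:

    from itertools import groupby

    # (key, last_modified) pairs, already sorted by last_modified
    files = [
        ("my-folder/dir1/0001.csv", 1),
        ("my-folder/dir2/0001.csv", 2),
        ("my-folder/dir1/0002.csv", 3),
    ]
    for dirname, group in groupby(files, key=lambda f: f[0].rsplit("/", 1)[0]):
        print(dirname, [name for name, _ in group])
    # my-folder/dir1 ['my-folder/dir1/0001.csv']
    # my-folder/dir2 ['my-folder/dir2/0001.csv']
    # my-folder/dir1 ['my-folder/dir1/0002.csv']   <- dir1 is split in two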

Contributor

yes, nice catch!

def _group_s3_objects_by_dirname(s3_objects: Any) -> Dict[str, List[Any]]:
    grouped_objects = defaultdict(list)
    for obj in s3_objects:
        dirname = obj.key.rsplit("/", 1)[0]
Contributor

The _group_s3_objects_by_dirname function could be placed in either s3_util.py or s3_boto_utils.py within metadata-ingestion/src/datahub/ingestion/source/aws.

To keep things as generic as possible, we should consider what happens if there's no directory (i.e., no /) in the key. In such cases, we could include the file in the group None or /. WDYT?

Contributor

Also, regarding s3_objects: Any, could you specify a more precise type?

@sgomezvillamor sgomezvillamor Jan 7, 2025

Similar functionality existed in the past
https://github.com/datahub-project/datahub/pull/4011/files

def groupby_unsorted(
    iterable: Iterable[T], key: Callable[[T], K]
) -> Iterable[Tuple[K, Iterable[T]]]:

Yet another example to keep this logic as generic as possible
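Only the signature is quoted above; a body along the following lines would be one way to implement it (an assumed sketch, not the code from the linked PR):

    from collections import defaultdict
    from typing import Callable, Dict, Iterable, List, Tuple, TypeVar

    T = TypeVar("T")
    K = TypeVar("K")

    def groupby_unsorted(
        iterable: Iterable[T], key: Callable[[T], K]
    ) -> Iterable[Tuple[K, Iterable[T]]]:
        # Accumulate items per key; input order is irrelevant.
        groups: Dict[K, List[T]] = defaultdict(list)
        for item in iterable:
            groups[key(item)].append(item)
        return groups.items()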

@eagle-25 eagle-25 Jan 8, 2025

@sgomezvillamor

Thank you for the detailed review. I’ll make the changes you suggested and request your feedback again.

Regarding your question, here’s my response:

To keep things as generic as possible, we should consider what happens if there's no directory (i.e., no /) in the key. In such cases, we could include the file in the group None or /. WDYT?

-> I think / is the better choice because it intuitively represents the root directory, whereas it's harder to understand the meaning of None in this context. I'll add the necessary logic and tests for this case. Thanks for pointing this out. 👍
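A minimal sketch of that fallback (the helper name is hypothetical; the final logic is whatever landed in the merged diff):

    def get_dirname(key: str) -> str:
        # Keys at the bucket root contain no "/"; group them under "/".
        return key.rsplit("/", 1)[0] if "/" in key else "/"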

The _group_s3_objects_by_dirname function could be placed in either s3_util.py or s3_boto_utils.py within metadata-ingestion/src/datahub/ingestion/source/aws.

-> Sounds good. I'll move it to s3_util.py

Also, regarding s3_objects: Any, could you specify a more precise type?

-> Okay, I'll specify the types with mypy_boto3_s3.
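A sketch of what the typed signature could look like with the mypy-boto3-s3 stubs (an assumption on my part; the final annotations are in the merged diff):

    from typing import Dict, Iterable, List

    # Requires the mypy-boto3-s3 stubs package (assumed type choice).
    from mypy_boto3_s3.service_resource import ObjectSummary

    def group_s3_objects_by_dirname(
        s3_objects: Iterable[ObjectSummary],
    ) -> Dict[str, List[ObjectSummary]]:
        ...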

@sgomezvillamor sgomezvillamor left a comment

LGTM
Let's double confirm with @treff7es 's review, if possible


@datahub-cyborg datahub-cyborg bot added pending-submitter-response and removed needs-review labels Jan 7, 2025
@datahub-cyborg datahub-cyborg bot added pending-submitter-response and removed needs-review labels Jan 8, 2025
@eagle-25 eagle-25 force-pushed the fix/ingestion/s3/groupby-group-splitting-issue branch from 20c8acc to b323dc1 on January 9, 2025 01:49
@eagle-25 eagle-25 force-pushed the fix/ingestion/s3/groupby-group-splitting-issue branch from b323dc1 to 96bb991 on January 9, 2025 02:06
@eagle-25 eagle-25 commented Jan 9, 2025

Changes

  • changed the function name to group_s3_objects_by_dirname

@datahub-cyborg datahub-cyborg bot added needs-review and removed pending-submitter-response labels Jan 9, 2025
@sgomezvillamor sgomezvillamor left a comment

LGTM
Thanks for the contrib!

@datahub-cyborg datahub-cyborg bot added merge-pending-ci and removed needs-review labels Jan 9, 2025
@eagle-25 eagle-25 force-pushed the fix/ingestion/s3/groupby-group-splitting-issue branch from 96bb991 to 61ffdcb on January 9, 2025 14:27
@eagle-25 eagle-25 force-pushed the fix/ingestion/s3/groupby-group-splitting-issue branch from 61ffdcb to 732fa19 on January 10, 2025 01:42
codecov bot commented Jan 10, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Files with missing lines                               | Coverage Δ
...estion/src/datahub/ingestion/source/aws/s3_util.py | 91.66% <100.00%> (+1.92%) ⬆️
...ngestion/src/datahub/ingestion/source/s3/source.py | 86.82% <100.00%> (-0.11%) ⬇️

... and 636 files with indirect coverage changes



Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 3b827f3...2412ab9.

@eagle-25 eagle-25 commented

@sgomezvillamor Could you please merge this PR? 🙏

@eagle-25 eagle-25 force-pushed the fix/ingestion/s3/groupby-group-splitting-issue branch from 732fa19 to ad69996 on January 10, 2025 06:16
@sgomezvillamor sgomezvillamor merged commit d8e7cb2 into datahub-project:master Jan 10, 2025
192 checks passed
llance pushed a commit to llance/datahub that referenced this pull request Jan 13, 2025
Co-authored-by: Sergio Gómez Villamor <sgomezvillamor@gmail.com>
brock-acryl pushed a commit to brock-acryl/datahub that referenced this pull request Jan 30, 2025
Co-authored-by: Sergio Gómez Villamor <sgomezvillamor@gmail.com>