[OPIK-38] Deduplicate items before inserting them in a dataset #340
Conversation
Very nice work. Nice abstraction in compute_content_hash, making it reusable across different object types, and thanks for adding tests for it!
As discussed, this approach:
- Doesn't handle concurrent scenarios properly.
- Adds round trips that affect scalability.
However, we agreed to compromise on those points and that it's good enough.
Overall the implementation is heading in the right direction, but one bug has slipped through the cracks.
I recommend a follow-up PR to fix it, as it's low-hanging fruit.
self._hash_to_id: Dict[str, str] = {}
self._id_to_hash: Dict[str, str] = {}
Minor: there's no need to use two dictionaries to track this; a single set of the seen hashes is enough to detect duplicated items. As it stands, it just makes the logic a bit less straightforward and uses more space.
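For illustration, a minimal sketch of the set-based alternative (the class and method names here are illustrative, not the SDK's):

```python
from typing import Set


class DeduplicationTracker:
    """Minimal sketch: a single set of content hashes is enough to detect duplicates."""

    def __init__(self) -> None:
        self._seen_hashes: Set[str] = set()

    def is_duplicate(self, item_hash: str) -> bool:
        """Return True if this content hash was seen before; otherwise record it."""
        if item_hash in self._seen_hashes:
            return True
        self._seen_hashes.add(item_hash)
        return False
```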
# Remove duplicates if they already exist
deduplicated_items = []
for item in items:
    item_hash = utils.compute_content_hash(item)
Minor: you don't need to compute a hash for this; you could compare the content payloads directly, assuming Python equality works correctly for this particular payload. The hashing approach is valid anyway; it's just a matter of space vs. computation.
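A minimal sketch of the equality-based variant, assuming the payload is a plain dict (illustrative helper, not the SDK's code):

```python
from typing import Any, Dict, List


def deduplicate_by_payload(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Sketch: deduplicate by comparing payloads directly via Python equality.

    Dicts are not hashable, so seen payloads are kept in a list and membership
    checks are linear per item: more computation, but no hashing and no extra
    hash storage, which is the space-vs-computation trade-off mentioned above.
    """
    seen: List[Dict[str, Any]] = []
    unique: List[Dict[str, Any]] = []
    for item in items:
        if item in seen:  # relies on dict equality being well-defined for the payload
            continue
        seen.append(item)
        unique.append(item)
    return unique
```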
if item_hash in self._hash_to_id:
    if item.id is None or self._hash_to_id[item_hash] == item.id:  # type: ignore
There's a bug here: two items with the same payload but different IDs aren't deduplicated. Not a big deal, but it should be fixed in a follow-up PR.
def _sync_hashes(self) -> None:
    """Updates all the hashes in the dataset"""
    LOGGER.debug("Start hash sync in dataset")
    all_items = self.get_all_items()
Minor: this call seems to have worked well per your testing, which is great because it's internally paginated. In the future it might consume too much memory, but it's fine for now and there's no action point yet.
    assert len(inserted_items) == 1, "Only one item should be inserted"


def test_insert_deduplication_with_different_items():
There's a missing test for the case of the same payload with a different item id.
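A rough sketch of what that test could look like, mirroring the assertion style of the existing tests; the dataset fixture and the item shape are assumptions, and per the bug noted above this test would currently fail:

```python
def test_insert_deduplication__same_payload_different_ids__only_one_item_inserted(
    dataset,  # assumed fixture providing an empty dataset, as in the other tests
):
    payload = {"input": {"question": "What is the capital of France?"}}

    dataset.insert([{**payload, "id": "id-1"}])
    dataset.insert([{**payload, "id": "id-2"}])

    inserted_items = dataset.get_all_items()
    assert len(inserted_items) == 1, "Only one item should be inserted"
```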
# Remove duplicates if they already exist
deduplicated_items = []
for item in items:
    item_hash = utils.compute_content_hash(item)

    if item_hash in self._hash_to_id:
        if item.id is None or self._hash_to_id[item_hash] == item.id:  # type: ignore
            LOGGER.debug(
                "Duplicate item found with hash: %s - ignored the event",
                item_hash,
            )
            continue

    deduplicated_items.append(item)
    self._hash_to_id[item_hash] = item.id  # type: ignore
    self._id_to_hash[item.id] = item_hash  # type: ignore
I would suggest extracting this code into a separate method. This would slightly reduce the method's complexity (McCabe/Cyclomatic Complexity) and improve testability by allowing you to test just the deduplication logic without needing to mock the Opik client or other dependencies.
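One possible shape of that extraction, as a sketch: a pure function that takes the tracking maps and hash function explicitly, so the deduplication logic can be unit-tested on its own (names other than those quoted from the diff are illustrative):

```python
import logging
from typing import Any, Callable, Dict, List

LOGGER = logging.getLogger(__name__)


def deduplicate_items(
    items: List[Any],
    hash_to_id: Dict[str, str],
    id_to_hash: Dict[str, str],
    compute_content_hash: Callable[[Any], str],
) -> List[Any]:
    """Return only items whose content hash is new, updating the tracking maps in place."""
    deduplicated_items = []
    for item in items:
        item_hash = compute_content_hash(item)

        if item_hash in hash_to_id:
            if item.id is None or hash_to_id[item_hash] == item.id:
                LOGGER.debug(
                    "Duplicate item found with hash: %s - ignored the event",
                    item_hash,
                )
                continue

        deduplicated_items.append(item)
        hash_to_id[item_hash] = item.id
        id_to_hash[item.id] = item_hash

    return deduplicated_items
```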
content = item

# Convert the dictionary to a JSON string with sorted keys for consistency
json_string = json.dumps(content, sort_keys=True)
I'd like to point out that if a key's value is a collection, such as an array or a set, and the order of its elements differs, the items will be considered different even though their content is nearly identical. Though it might not be relevant to our use cases.
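For illustration (the hash function below is an assumption; only the json.dumps(..., sort_keys=True) step comes from the diff): sort_keys normalizes dictionary key order but preserves element order inside lists, so payloads that differ only in list order hash differently:

```python
import hashlib
import json


def content_hash(content: dict) -> str:
    # sort_keys=True normalizes dict key order only; list element order is kept as-is.
    json_string = json.dumps(content, sort_keys=True)
    return hashlib.sha256(json_string.encode("utf-8")).hexdigest()


a = {"tags": ["red", "blue"]}
b = {"tags": ["blue", "red"]}

assert content_hash(a) != content_hash(b)  # nearly identical content, different hashes
```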
* Implemented deduplication
* Added documentation
* Added update unit test
* Add support for delete method
* Fix linter
* Fix python 3.8 tests
Details
The SDK has been updated so that items are not duplicated if they already exist in the dataset. This assumes that only one user is updating a dataset at a time; we can move the hash_sync function to be called before each insert if we want to resolve this edge case, but it would come at a small performance cost.
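If that edge case ever needs addressing, the trade-off could look roughly like this sketch (only _sync_hashes is a name from this PR; the rest of the class, including _send_to_backend, is illustrative):

```python
class Dataset:
    def _sync_hashes(self) -> None:
        """Refresh the local hash state from the backend (as in this PR)."""
        ...

    def _deduplicate_items(self, items: list) -> list:
        ...

    def insert(self, items: list) -> None:
        # Stricter variant: re-sync before every insert so concurrent writers are
        # tolerated, at the cost of one extra round trip per call.
        self._sync_hashes()
        deduplicated_items = self._deduplicate_items(items)
        if deduplicated_items:
            self._send_to_backend(deduplicated_items)  # hypothetical backend call

    def _send_to_backend(self, items: list) -> None:
        ...
```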
Documentation
The documentation has been updated accordingly.