-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🎉 CDK: Added support for efficient parent/child streams using cache #6057
Conversation
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None | ||
) -> Iterable[Optional[Mapping[str, Any]]]: | ||
parent_stream_slices = self.parent.stream_slices( | ||
sync_mode=sync_mode, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unfortunately, we can't use the same sync_mode
, because in the case of incremental we might miss records from the child stream (if updating/creating child record doesn't update cursor_field
on parent record)
|
||
# iterate over all parent records with current stream_slice | ||
for record in parent_records: | ||
yield record |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yield record | |
yield {'parent': record} |
in the general case, the slice is a dict, because we might want to extend slices (slice of the slice, etc):
slice read by date and by parent record:
{
"date": "2020-10-10 03:00:00",
"parent": <parent_record>,
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
few comments
…-parent-child-streams # Conflicts: # airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py # airbyte-cdk/python/setup.py # airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really good to see this getting added to CDK!
Couple of open questions around slicing.
sync_mode=sync_mode, | ||
cursor_field=cursor_field, | ||
stream_slice=stream_slice, | ||
stream_state=stream_state |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we're making the read_records call on the parent stream using the substream's sync mode and state. Are there scenarios where this means we could miss relevant data if we're in incremental and have a recent state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, just fixed it
|
||
# iterate over all parent records with current stream_slice | ||
for record in parent_records: | ||
yield {"parent": record} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't guarantee time-order of slices right? We can assume that the iteration for stream_slice in parent_stream_slices:
is ordered but the records within that slice aren't guaranteed to iterate in order I don't think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's right
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok now that you've changed above to full_refresh
this is safe because we're going to grab all parent records on every sync.
if self.use_cache: | ||
self.cache_file = self.request_cache() | ||
# we need this attr to get metadata about cassettes, such as record play count, all records played, etc. | ||
self.cass = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WDYT?
self.cass = None | |
self.cassete = None |
/publish-cdk dry-run=false
|
What
Added the ability to use caching for efficient synchronization of nested streams.
How
The vcrpy library was used to add a caching mechanism.
HttpStream class:
A new class HttpSubStream.
This class should be used as the base class for "child" streams. There is a stream_slices method that gets the "parent" records from the cache files.