Source Hubspot: add incremental streams #2425
Conversation
LGTM
/test connector=source-hubspot
/test connector=source-hubspot
/test connector=source-hubspot
```json
},
"updatedAt": {
  "type": "string"
},
```
The `cursor_field` and `default_cursor_field` should be set on any stream that is using incremental. Is discover not outputting a default cursor field?
But if `source_defined_cursor` is `True`, why should `cursor_field` be in the catalog? I thought this one comes from the user. The same question about `default_cursor_field`: I thought that one is used in case `source_defined_cursor` is `False` and the user didn't provide a `cursor_field`.
The current implementation of the `BaseClient` returns only the JSON schema.
`cursor_field` is used by destinations when deduping records. For example, if the records `user=eugene,score=300,updatedAt=1` and `user=eugene,score=200,updatedAt=2` appear in the destination, and assuming `user` is the primary key, then we need to know which field to dedupe on, i.e. the cursor field.
This will also be used by the standard tests to fix the incremental problem you are running into, by verifying that all records have a cursor value >= the state.
But these are two completely different use cases. I don't see how this will work in case there is no `primary_key`, or the `primary_key` is composite (multiple keys). `updated_at`/`bookmark_field` can't be used instead of `primary_key` in many cases because it means a different thing (it could have duplicates, etc.).
```diff
@@ -98,16 +98,15 @@
 private Set<String> IMAGES_TO_SKIP_SECOND_INCREMENTAL_READ = Sets.newHashSet(
     "airbyte/source-intercom-singer",
     "airbyte/source-exchangeratesapi-singer",
     "airbyte/source-hubspot-singer",
     "airbyte/source-hubspot",
```
Is the reason we include this here that the cursor comparison logic is inclusive? How can we verify that incremental is working? Can we compare cursor values in a custom test?
Yes, it is inclusive, as we use the `startTimestamp` param to filter records.
We could verify that we don't produce records older than the state value in a custom test.
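Such a check could be sketched as follows (the helper name and record shape are hypothetical, not the actual test-suite API): after a sync that starts from a saved state, assert that no emitted record has a cursor value older than that state.

```python
from typing import Any, Iterable, Mapping


def no_records_older_than_state(
    records: Iterable[Mapping[str, Any]], cursor_field: str, state_value: Any
) -> bool:
    """True iff every record's cursor value is at least the saved state value."""
    return all(record[cursor_field] >= state_value for record in records)


# ISO-8601 timestamps compare correctly as strings, so a plain >= works here.
records = [
    {"id": 1, "updatedAt": "2021-03-01T00:00:00Z"},
    {"id": 2, "updatedAt": "2021-03-02T00:00:00Z"},
]
assert no_records_older_than_state(records, "updatedAt", "2021-03-01T00:00:00Z")
```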
can we do that?
I have added a custom test for incremental. Unfortunately, the `subscription_changes` stream doesn't have data, and all my attempts to subscribe and unsubscribe from emails didn't trigger any event there. I remember I was using demo credentials during development, but obviously demo creds can't be used in the test, as they mostly give you a random response.
no problem.
```python
def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator:
    """Apply state filter to set of records, update cursor(state) if necessary in the end"""
    latest_cursor = None
    for record in self.read_chunked(getter, params):
```
Why do we read chunked instead of just paginating over a single request on the entire date range? (I assume it's for a good reason, it's just not obvious. Can you leave a comment?)
To track state: there is no guarantee that the returned records are sorted in ascending order. Having an exact boundary, we can always ensure we don't miss records between states. In the future, if we would like to save the state more often, we can do this every batch.
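A rough illustration of this idea (not the connector's actual code; `fetch` and the window size are assumptions): reading in bounded date windows means each completed window's upper bound is a safe state checkpoint, even when records within a window arrive unsorted.

```python
from datetime import datetime, timedelta
from typing import Any, Callable, Iterator, List, Mapping


def read_chunked(
    fetch: Callable[[datetime, datetime], List[Mapping[str, Any]]],
    start: datetime,
    end: datetime,
    chunk: timedelta = timedelta(days=30),
) -> Iterator[Mapping[str, Any]]:
    """Read the range [start, end) in bounded windows of size `chunk`."""
    lower = start
    while lower < end:
        upper = min(lower + chunk, end)
        # Records inside [lower, upper) may come back in any order, but once
        # the window completes we know nothing before `upper` was missed, so
        # `upper` could be persisted as state after each batch.
        yield from fetch(lower, upper)
        lower = upper
```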
```diff
@@ -300,7 +365,7 @@ class CRMObjectStream(Stream):

     entity: Optional[str] = None
     associations: List[str] = []
     updated_field = "updatedAt"
     updated_at_fields = ["updatedAt", "createdAt"]
```
Why are we doing this? This is really dangerous, because we are not using a consistent field for the cursor value. This means we could miss records.
We use a fallback because `updatedAt` can be null if there are no updates yet.
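That fallback might look like the following sketch (a hypothetical helper, not the connector's exact code): prefer `updatedAt`, and fall back to `createdAt` when `updatedAt` is null.

```python
from typing import Any, Mapping, Optional

# Fields to try in order, mirroring the updated_at_fields list in the diff above.
updated_at_fields = ["updatedAt", "createdAt"]


def cursor_value(record: Mapping[str, Any]) -> Optional[Any]:
    """Return the first non-null candidate cursor field value, or None."""
    for field in updated_at_fields:
        value = record.get(field)
        if value is not None:
            return value
    return None


print(cursor_value({"updatedAt": None, "createdAt": 5}))  # falls back to createdAt -> 5
```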
nvm, I thought more about it; this makes sense. `created_at` is always <= `updated_at`, and as long as we only persist state at the very end of syncing a stream, then we should be fine.
@keu lmk when this is ready for another look
* #2150 Issue: created native connector with schema folder populated
* #2150 Issue: make format code
* first version
* fix few issues
* fix issues
* fix read issue
* format
* docs
* docker tags
* extend configured catalog for testing
* fix source definitions
* format
* fix call rate issue, add backoff for retry after
* add general backoff
* write secrets for new connector
* drop singer connector registration
* refactor streams, resolve properties in schemas at runtime
* replace deprecated endpoint for company contacts
* replace deprecated pipeline endpoint
* update comments
* update docs
* fix typo
* fix stream contact lists
* fix pagination and forms result fetching
* fix health_check
* format and update catalog
* revert changes
* drop singer based hubspot
* fix company contacts substream
* move deals to separate test
* fix deals tests
* remove dynamic fields from records
* move deals to catalog again
* extend CRMObjectStream with associations
* format
* update schemas with updated field, change engagement layout
* fix Campaign stream
* remove custom tests
* remove dependency
* remove oauth
* Source Hubspot: add incremental streams (#2425)
  * add incremental
  * add incremental
  * polishing
  * update docs
  * fix docstring
  * clean up
  * fix incremental bookmark access
  * fix incremental tests
  * clean up
  * add custom test for incremental, improve logging
  * format
  * Update airbyte-integrations/connectors/source-hubspot/source_hubspot/api.py

  Co-authored-by: Eugene Kulak <kulak.eugene@gmail.com>
  Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
* Source Hubspot: best practices (#2537)
  * fix error reporting and add unit tests
  * fix test and refactor cursor fields
  * format

  Co-authored-by: Eugene Kulak <kulak.eugene@gmail.com>
* restored configured_catalog.json

Co-authored-by: ykurochkin <y.kurochkin@zazmic.com>
Co-authored-by: Eugene Kulak <widowmakerreborn@gmail.com>
Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
What
#2222