🎉Source Hubspot: Add contacts associations to Deals stream. #5693
Conversation
@marcosmarxm Hello Marc, could you take a look at this?
"to": {
  "type": ["null", "array"],
  "items": {
    "type": ["null", "Object"],
-    "type": ["null", "Object"],
+    "type": ["null", "object"],
@@ -14,5 +14,5 @@ RUN pip install .

 ENV AIRBYTE_ENTRYPOINT "/airbyte/base.sh"

-LABEL io.airbyte.version=0.1.10
+LABEL io.airbyte.version=0.1.11
You need to bump to 0.1.12.
Also pull the latest code from master, because a change to the Hubspot connector was merged yesterday.
@vladimir-remar please bump the version to 0.1.12 in all required places.
Thanks @vladimir-remar, I made a few suggestions and requested a review!
Great addition to the connector, added comments and questions around reusing existing parts of code. Looks promising!
        self._endpoint = endpoint

    def _transform(self, records: Iterable) -> Iterable:
        """Preprocess record """
Please add more description to the docstring: why do we need this method if it just passes the records through?
            yield record

    def _filter_old_records(self, records: Iterable) -> Iterable:
        """Skip """
Same as comment above.
            yield record


class DealToContactAssociationsStream(CRMAssociationStream):
This is a useful part of the connector; however, I have a question about reusing existing parts: we have class CRMObjectStream(Stream), which should provide the necessary functionality for flat associations for other HubSpot objects. Have you tried to reuse it for your stream? Were there any problems with using it?
Hello @bazarnov, thanks for the comment and the advice. You are right, the class CRMObjectStream(Stream) works more than fine. It should be something like this:
class DealToContactAssociationsStream(CRMObjectStream):
    entity = "deal"
    associations = ["contacts"]
Now my question is how to write the right JSON schema. Since it's almost the same as the deals schema, is it okay to duplicate it?
You can always go with $ref: schema.json inside of your_stream_schema.json.
But if you want, you can duplicate it; not a big deal.
Sounds great, can we proceed with reusing those parts instead?
Sure, I'm going to update the PR with the new changes.
    def _filter_old_records(self, records: Iterable) -> Iterable:
        """Skip """
        for record in records:
I believe you can simply use yield from records in this case; there is no need for the loop.
    def _transform(self, records: Iterable) -> Iterable:
        """Preprocess record """
        for record in records:
Same here: use yield from records; there is no need for the loop.
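A minimal sketch of the `yield from` suggestion, using hypothetical function names rather than the connector's actual methods. It only illustrates that a pass-through generator written with an explicit loop and one written with `yield from` produce the same records:

```python
from typing import Iterable, Iterator


def passthrough_loop(records: Iterable[dict]) -> Iterator[dict]:
    """Pass records through unchanged using an explicit loop."""
    for record in records:
        yield record


def passthrough_yield_from(records: Iterable[dict]) -> Iterator[dict]:
    """Pass records through unchanged by delegating to the iterable."""
    yield from records


# Both generators emit the same records in the same order.
sample = [{"id": 1}, {"id": 2}]
assert list(passthrough_loop(sample)) == list(passthrough_yield_from(sample))
```

The second form is shorter and states the intent (delegation) directly, which is presumably why it was suggested for both `_transform` and `_filter_old_records`.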
Thanks for the fast push! Please make sure your local acceptance tests are passing as expected.
Well, it seems something is wrong with the tests; I attach the logs. This command works as expected. These ones do not.
According to the logs, you need to increase the
After this, please run the following command from the Airbyte root. If this command is successful:
If the command fails:
@bazarnov I ran gradlew, but it keeps failing.
Make sure:
version
version
version
@vladimir-remar sorry to ask again, but you need to bump the version again. After that we're ready to merge this!
I bumped the versions.
@vladimir-remar can you check your latest commit?
        """Entity URL"""
        return f"/crm/v3/associations/{self._relationship_from}/{self._relationship_to}/{self._endpoint}"

    def __init__(self, relationship_from: str = None, relationship_to: str = None, endpoint: str = None, **kwargs):
I recommend the following ordering convention:
- properties
- init
- abstract methods
- concrete methods
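The ordering convention above could look like this in practice. This is only a sketch: `ExampleStream`, `PassThroughStream`, and their members are hypothetical and are not taken from the connector.

```python
from abc import ABC, abstractmethod
from typing import Iterable, Iterator


class ExampleStream(ABC):
    """Hypothetical stream used only to illustrate member ordering."""

    # 1. properties
    @property
    def url(self) -> str:
        return f"/example/{self._endpoint}"

    # 2. init
    def __init__(self, endpoint: str):
        self._endpoint = endpoint

    # 3. abstract methods
    @abstractmethod
    def _transform(self, records: Iterable[dict]) -> Iterator[dict]:
        """Subclasses decide how records are preprocessed."""

    # 4. concrete methods
    def read(self, records: Iterable[dict]) -> Iterator[dict]:
        yield from self._transform(records)


class PassThroughStream(ExampleStream):
    """Concrete subclass that leaves records untouched."""

    def _transform(self, records: Iterable[dict]) -> Iterator[dict]:
        yield from records
```

Keeping the same order in every stream class makes it easier to find the constructor and the extension points when reviewing.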
@@ -636,6 +636,14 @@ def list(self, fields) -> Iterable:
             yield record


+class DealToContactAssociationsStream(CRMObjectStream):
@vladimir-remar it seems better from a UX standpoint to include the deal-to-contact association as a field on the deals stream. This way in the destination it will be normalized to a deal_contacts table. WDYT?
Hi @sherifnada, thanks for the review. The suggestion makes sense to me. I will include the field as "associations", since "contacts" is one of them. For me it would be something like this:
class DealStream(CRMObjectStream):
    """Deals, API v3"""

    def __init__(self, associations: List[str] = None, **kwargs):
        super().__init__(entity="deal", associations=associations, **kwargs)
        self._stage_history = DealStageHistoryStream(**kwargs)

    def list(self, fields) -> Iterable:
        history_by_id = {}
        for record in self._stage_history.list(fields):
            if all(field in record for field in ("id", "dealstage")):
                history_by_id[record["id"]] = record["dealstage"]
        for record in super().list(fields):
            if record.get("id") and int(record["id"]) in history_by_id:
                record["dealstage"] = history_by_id[int(record["id"])]
            yield record
and I will call it in client.py
"deal_to_contact_associations": DealStream(associations=["contacts"], **common_params),
Sorry, I was a little unclear: why wouldn't we include this by default in the Deals stream instead of a separate deals_to_contacts_associations? That would be my first inclination, although there may be a reason why that's not favorable.
CRMObjectStream already accepts associations as input, so you actually don't need to change anything about DealStream; you only need to change the calling context to initialize it with associations=['contacts'].
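To illustrate the point about changing only the calling context, here is a rough sketch. The classes below are simplified stand-ins, not the connector's real code: it is an assumption that `DealStream` forwards extra keyword arguments to `CRMObjectStream`, and `build_streams` is a hypothetical helper standing in for the place where streams are instantiated.

```python
from typing import Dict, List, Optional


class CRMObjectStream:
    """Simplified stand-in for the connector's class (assumed shape)."""

    def __init__(self, entity: str, associations: Optional[List[str]] = None, **kwargs):
        self.entity = entity
        self.associations = associations or []


class DealStream(CRMObjectStream):
    """Left unchanged: keyword arguments are forwarded to the parent class."""

    def __init__(self, **kwargs):
        super().__init__(entity="deal", **kwargs)


def build_streams(**common_params) -> Dict[str, CRMObjectStream]:
    # Only the calling context changes: associations are supplied here.
    return {"deals": DealStream(associations=["contacts"], **common_params)}


streams = build_streams()
```

Under these assumptions, the existing deals stream carries the contacts association without a new stream class or a new schema file.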
Hello @sherifnada and thanks again.
On the first question: my first approach was to replicate the functionality of the Deals stream instead of modifying it. Since associations were not included by default in the Deals stream, I did it with the possible inclusion of the remaining associations in mind. Why they were not included, I really do not know. Perhaps the deal history is not needed to obtain the association values, or the rest of the associations do not generate good output; I do not know the result of iterating over the rest of the associations. Maybe someone on the team can answer that question.
Finally, calling CRMObjectStream like this would also get the response: "deal_to_contact_associations": CRMObjectStream(entity="deal", associations=['contacts'], **common_params)
So my question is: at this point, what is the right approach to implement the associations in the current streams?
@vladimir-remar that approach (including associations=['contacts']) is the one I'd recommend!
…o associations in Deal stream
…RMObjectStream to deal_to_contact_associations
@vladimir-remar can you update the schema for deals to include the contacts association if it's not already there, and remove the newly added schema (which is no longer needed if we say associations=['contacts'])? We should be good to go afterwards.
And also update the PR title, since we're no longer adding a new stream :)
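When editing the deals schema, a quick check of the `type` keywords can catch casing mistakes such as the "Object" flagged earlier in this review, because JSON Schema type names are case-sensitive. The helper below is a hypothetical, standard-library-only sketch and not part of the connector; the fragment is the partial snippet quoted in the review, not the full schema.

```python
from typing import Any, Iterator, Tuple

# Primitive type names defined by JSON Schema; they are case-sensitive.
VALID_TYPES = {"null", "boolean", "object", "array", "number", "integer", "string"}


def invalid_types(schema: Any, path: str = "$") -> Iterator[Tuple[str, str]]:
    """Yield (path, type_name) for every unrecognised "type" entry."""
    if isinstance(schema, dict):
        declared = schema.get("type")
        names = declared if isinstance(declared, list) else [declared]
        for name in names:
            if isinstance(name, str) and name not in VALID_TYPES:
                yield path, name
        # Recurse into nested schema objects.
        for key, value in schema.items():
            yield from invalid_types(value, f"{path}.{key}")
    elif isinstance(schema, list):
        for index, item in enumerate(schema):
            yield from invalid_types(item, f"{path}[{index}]")


# Partial fragment as quoted in the review, before the suggested fix.
fragment = {"to": {"type": ["null", "array"], "items": {"type": ["null", "Object"]}}}
problems = list(invalid_types(fragment))
```

A full validator library would do this more thoroughly; the sketch only shows the kind of check that would have flagged the capitalised type name.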
What
Add the contacts association
How
By adding the contacts associations to the Deals stream.
Pre-merge Checklist
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
docs/integrations/<source or destination>/<name>.md
including changelog. See changelog example
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing.
/publish
command described here