Step 2: Add Namespace to Airbyte Protocol #2228
When we make changes to the Airbyte Protocol, old source connectors are incompatible with newer versions of the protocol: If I run a connector with a version from before my change to the protocol where I added the
How should I deal with this?
Looks really cool!
...t-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java
airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java
@@ -34,9 +34,34 @@
*/
public class CatalogConverter {

private static io.airbyte.api.model.AirbyteStreamName toApi(final String name, final io.airbyte.protocol.models.AirbyteStreamName streamName) {
streamName should be the first parameter since it is the one that will remain.
Can you add a Preconditions check that only one of the two is set? (assuming this is correct)
For the moment both are set, but the StreamName object can sometimes be null (if it was serialized before the namespace field was introduced).
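The fallback discussed in this thread could look roughly like the sketch below. It is only an illustration: `ProtocolStreamName`, `ApiStreamName`, and `StreamNameConverter` are hypothetical stand-ins for the generated model classes, and the parameter order follows the reviewer's suggestion rather than the code under review.

```java
// Hypothetical stand-in for the protocol-side stream name object.
final class ProtocolStreamName {
  final String namespace; // may be null for sources without schemas
  final String name;

  ProtocolStreamName(String namespace, String name) {
    this.namespace = namespace;
    this.name = name;
  }
}

// Hypothetical stand-in for the API-side stream name object.
final class ApiStreamName {
  final String namespace;
  final String name;

  ApiStreamName(String namespace, String name) {
    this.namespace = namespace;
    this.name = name;
  }
}

final class StreamNameConverter {

  // streamName comes first because it is the field meant to remain;
  // legacyName is the deprecated flat name kept for backward compatibility.
  static ApiStreamName toApi(ProtocolStreamName streamName, String legacyName) {
    if (streamName != null) {
      return new ApiStreamName(streamName.namespace, streamName.name);
    }
    // The catalog was serialized before the namespace field existed,
    // so only the legacy name is available.
    if (legacyName == null) {
      throw new IllegalArgumentException("either streamName or legacyName must be set");
    }
    return new ApiStreamName(null, legacyName);
  }
}
```

With this shape, a strict "only one of the two is set" precondition would reject current catalogs, since both fields are populated for now; the check above only rejects the case where neither is available.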
airbyte-server/src/main/java/io/airbyte/server/converters/CatalogConverter.java
@@ -281,6 +283,7 @@ public void testDiscoverSourceSchema() throws ApiException {
.build());
final AirbyteStream stream = new AirbyteStream()
.name(STREAM_NAME)
should we unset this?
No, this is required: we are currently backward compatible, so everything keeps working as it did before.
The new AirbyteStreamName field does not actually influence the stream names for the moment.
I created this issue to remove the usage of such methods, though: #2240
...yte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java
…AM_NAMESPACE, STREAM_NAME)
Looks good! can you also get a review from someone else in the team before you merge?
-private static final String STREAM_NAME = "dbo.id_and_name";
+private static final String STREAM_NAMESPACE = "dbo";
+private static final String TABLE_NAME = "id_and_name";
+private static final String STREAM_NAME = STREAM_NAMESPACE + "." + TABLE_NAME;
shouldn't the stream name just be tablename?
The previous STREAM_NAME was "dbo.id_and_name"
We didn't introduce big changes on how things work so the stream name is still the same as before (but now it is also just "parsed" and split in two in the new AirbyteStreamName struct)
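To make the relationship concrete, here is a minimal sketch of a stream that carries both the unchanged flat name and the separate namespace/table fields. `NamedStream` and its fields are hypothetical names for illustration only; they are not the classes added in this PR.

```java
// Hypothetical holder showing the old flat name next to the new split fields.
final class NamedStream {
  final String name;      // deprecated flat name, unchanged for backward compatibility
  final String namespace; // new: schema the table lives in, null when there is none
  final String tableName; // new: table name without the schema prefix

  private NamedStream(String name, String namespace, String tableName) {
    this.name = name;
    this.namespace = namespace;
    this.tableName = tableName;
  }

  // Builds the stream from separate parts, as a JDBC source that already
  // knows schema and table separately could do.
  static NamedStream of(String namespace, String tableName) {
    String flatName = namespace == null ? tableName : namespace + "." + tableName;
    return new NamedStream(flatName, namespace, tableName);
  }
}
```

For the test constants in this thread, `NamedStream.of("dbo", "id_and_name")` keeps `name` equal to `"dbo.id_and_name"` while exposing `"dbo"` and `"id_and_name"` separately.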
@@ -79,10 +123,15 @@
final List<io.airbyte.api.model.AirbyteStreamAndConfiguration> streams = catalog.getStreams()
.stream()
.map(configuredStream -> {
final String streamName;
if (configuredStream.getStream().getStreamName() != null)
NIT You're missing {
@@ -79,10 +123,15 @@
final List<io.airbyte.api.model.AirbyteStreamAndConfiguration> streams = catalog.getStreams()
.stream()
.map(configuredStream -> {
final String streamName;
this variable name is super confusing. I think you should just call it name
This seems like it should work okay (as long as all of the tests for the jdbc sources are passing).
I am not convinced that adding the AirbyteStreamName adds anything over just adding a namespace field. It seems like the extra object is kinda a headache. I think it would also be easier from a backwards compatibility point of view.
@@ -127,6 +127,9 @@ definitions:
properties:
name:
type: string
description: Stream's name (deprecated, will be replaced by stream_name).
stream_name:
Would it be simpler if, instead of adding this AirbyteStreamName object, you just added namespace as a property to this top level object? Then we don't need to deprecate name. It also saves us from ever having name and stream_name at the same time.
I've got to think it makes the backwards compatibility story way simpler too.
in my last PR, @michel-tricot was mentioning this: #1993 (comment)
Did I misunderstand his comment?
kk. i don't think you misunderstood. up to you.
@@ -134,7 +134,9 @@ public AirbyteCatalog discover(JsonNode config) throws Exception {
 Optional.ofNullable(config.get("database")).map(JsonNode::asText),
 Optional.ofNullable(config.get("schema")).map(JsonNode::asText))
 .stream()
-.map(t -> CatalogHelpers.createAirbyteStream(t.getName(), t.getFields())
+.map(t -> CatalogHelpers.createAirbyteStream(CatalogHelpers.createAirbyteStreamName(t.getSchemaName(), t.getName()), t.getFields())
+// TODO: Switch fully to StreamName instead of temporarily setName() for backward compatibility
can you add your github user name, or something else identifying when you add todos?
 private final String name;
 private final List<Field> fields;

-public TableInfo(String name, List<Field> fields) {
+public TableInfo(String schemaName, String name, List<Field> fields) {
does this all work okay with MySQL (which doesn't have schemas)?
Yes, I think we already figured it out when we made the PR to discover tables from schemas other than just the public one.
kk
What I'm describing won't work, because you're changing behavior in the sources before the destinations are compatible with namespaces, so you're a little boxed in. If you were doing destinations first, I think you could do the in-place replacement (i.e. just add namespace). In that process, each destination would first be made to support namespaces when they are present; each one could be migrated. Once that was done, we could start migrating sources, and as they produced non-null values for namespaces, the destinations would respect them. I don't think I love having this extra object, but it would take a pretty involved execution plan to get around it. Probably fine to keep it as is, but I wanted to at least offer this different perspective.
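The destinations-first rollout described in this comment relies on each destination treating a missing namespace as "behave as before". A minimal sketch of that rule, assuming a hypothetical `DestinationTableResolver` and a destination-level default schema (neither comes from the Airbyte codebase):

```java
import java.util.Optional;

// Hypothetical helper: decides where a destination writes a stream.
final class DestinationTableResolver {

  // If the source supplied a namespace, respect it; otherwise fall back to the
  // schema configured on the destination, which is the pre-namespace behavior.
  static String resolve(String defaultSchema, String namespace, String streamName) {
    String schema = Optional.ofNullable(namespace).orElse(defaultSchema);
    return schema + "." + streamName;
  }
}
```

Under this rule, a source that has not been migrated yet (null namespace) keeps writing to the configured schema, and a migrated source changes the target only once it starts emitting a non-null namespace.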
Co-authored-by: Charles <giardina.charles@gmail.com>
What
Describe what the change is solving
Same as #1993 to implement #1921
How
Describe the solution
This introduces a new object AirbyteStreamName in both the API and the Airbyte Protocol to represent namespace and table name in a single struct.
The old name of AirbyteStream is kept as it was, but the new fields are properly populated with separate namespace/table names.
So, for example, the catalog produced by the JDBC sources would be:
Non-jdbc sources are now producing:
These new fields won't be used right away so they shouldn't introduce any change to the product until the last step is implemented.
Pre-merge Checklist
Recommended reading order
airbyte-api/src/main/openapi/config.yaml
airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml
airbyte-server/src/main/java/io/airbyte/server/converters/CatalogConverter.java
airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java