Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some abstraction to avoid direct generated object use #20905

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol.objects;

import io.airbyte.protocol.models.AirbyteTraceMessage;

public interface AirbyteMessage {

AirbyteMessageType getType();

ConnectorSpecification getSpec();

// TODO should be an interface
AirbyteTraceMessage getTrace();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol.objects;

public enum AirbyteMessageType {
RECORD,
STATE,
LOG,
CONNECTION_STATUS,
CATALOG,
TRACE,
SPEC,
CONTROL

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol.objects;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.airbyte.commons.protocol.objects.serde.ConnectionSpecificationDeserializer;
import io.airbyte.commons.protocol.objects.serde.ConnectionSpecificationSerializer;
import io.airbyte.commons.protocol.objects.serde.JsonSerializable;
import io.airbyte.protocol.models.AdvancedAuth;
import io.airbyte.protocol.models.AuthSpecification;
import java.net.URI;
import java.util.List;

@JsonDeserialize(using = ConnectionSpecificationDeserializer.class)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should these be declared on the adapters? E.g. if the platform is on protocol v42, but an older connector still uses protocol v41, and it has a call to Jsons.serialize(new ConnectorSpecificationAdapterV41(...))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those are needed for jackson to know how to serialize the objects of the model. In this case, it goes on the interface because I declared the interface in the ConnectorJobOutput.yaml rather than the implementation.

@JsonSerialize(using = ConnectionSpecificationSerializer.class)
public interface ConnectorSpecification extends JsonSerializable {

URI getDocumentationUrl();

URI getChangelogUrl();

JsonNode getConnectorSpecification();

Boolean isSupportingIncremental();

Boolean isSupportingNormalization();

Boolean isSupportingDBT();

List<DestinationSyncMode> getSupportedDestinationSyncModes();

// TODO introduce specific interfaces
AuthSpecification getAuthSpecification();

// TODO introduce specific interfaces
AdvancedAuth getAdvancedAuth();

// Clients should use this interface rather than the underlying object
@Deprecated
io.airbyte.protocol.models.ConnectorSpecification getRaw();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol.objects;

public enum DestinationSyncMode {
APPEND,
OVERWRITE,
APPEND_DEDUP
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol.objects.impl;

import io.airbyte.commons.protocol.objects.AirbyteMessage;
import io.airbyte.commons.protocol.objects.AirbyteMessageType;
import io.airbyte.commons.protocol.objects.ConnectorSpecification;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import java.util.Map;
import lombok.EqualsAndHashCode;

@EqualsAndHashCode
public class AirbyteMessageAdapter implements AirbyteMessage {

private final io.airbyte.protocol.models.AirbyteMessage airbyteMessage;

public AirbyteMessageAdapter(final io.airbyte.protocol.models.AirbyteMessage airbyteMessage) {
this.airbyteMessage = airbyteMessage;
}

@Override
public AirbyteMessageType getType() {
return fromProtocolObject(airbyteMessage.getType());
}

@Override
public ConnectorSpecification getSpec() {
return new ConnectorSpecificationAdapter(airbyteMessage.getSpec());
}

@Override
public AirbyteTraceMessage getTrace() {
return airbyteMessage.getTrace();
}

public static AirbyteMessageType fromProtocolObject(final Type type) {
return fromProtocolObject.get(type);
}

private final static Map<Type, AirbyteMessageType> fromProtocolObject = Map.of(
Type.RECORD, AirbyteMessageType.RECORD,
Type.STATE, AirbyteMessageType.STATE,
Type.LOG, AirbyteMessageType.RECORD,
Type.CONNECTION_STATUS, AirbyteMessageType.CONNECTION_STATUS,
Type.CATALOG, AirbyteMessageType.CATALOG,
Type.TRACE, AirbyteMessageType.TRACE,
Type.SPEC, AirbyteMessageType.SPEC,
Type.CONTROL, AirbyteMessageType.CONTROL);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol.objects.impl;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.protocol.objects.ConnectorSpecification;
import io.airbyte.commons.protocol.objects.DestinationSyncMode;
import io.airbyte.protocol.models.AdvancedAuth;
import io.airbyte.protocol.models.AuthSpecification;
import java.net.URI;
import java.util.List;
import java.util.Map;
import lombok.EqualsAndHashCode;

@EqualsAndHashCode
public class ConnectorSpecificationAdapter implements ConnectorSpecification {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checking my understanding: is the idea that we would have an adapter for every protocol version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily, the interface gives us this flexibility, the idea is that the adapter should be the only place where we need bump the objects.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, how sketchy is it to have the generated protocol models directly implement the interfaces? I.e.

  • interface declares default Whatever getWhatever() { return null; }
  • and the generated classes look like public class AirbyteMessage implements io.airbyte.commons.protocol.objects.AirbyteMessage

then:

  • adding new fields is still easy (just add a new default method to the interface)
  • deleting fields is still easy (mark the interface method as deprecated, or just delete it entirely)
  • renaming fields is... probably fine? (default Whatever getWhatever() { return getOldWhatever(); })

(I forget if the issue was that there's a technical problem, or that the code generator just doesn't support this - if the latter, we could maaaaybe do something similar to https://github.com/LiveRamp/extravagance/ and edit the generated code)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One objective is to decouple the generated code from the how we use it.
For example, we may decide to rename field in the protocol, by decoupling this, we can control if/when we want to propagate this. Typical reason could be to limit the amount of changes at the same time.
When it comes to deprecation, newer version could remove some fields, those fields could remain in the interface, but we would have the option to throw exceptions depending on the adapter implementation.


final io.airbyte.protocol.models.ConnectorSpecification connectorSpecification;

public ConnectorSpecificationAdapter(final io.airbyte.protocol.models.ConnectorSpecification connectorSpecification) {
this.connectorSpecification = connectorSpecification;
}

@Override
public URI getDocumentationUrl() {
return connectorSpecification.getDocumentationUrl();
}

@Override
public URI getChangelogUrl() {
return connectorSpecification.getChangelogUrl();
}

@Override
public JsonNode getConnectorSpecification() {
return connectorSpecification.getConnectionSpecification();
}

@Override
public Boolean isSupportingIncremental() {
return connectorSpecification.getSupportsIncremental();
}

@Override
public Boolean isSupportingNormalization() {
return connectorSpecification.getSupportsNormalization();
}

@Override
public Boolean isSupportingDBT() {
return connectorSpecification.getSupportsDBT();
}

@Override
public List<DestinationSyncMode> getSupportedDestinationSyncModes() {
return connectorSpecification.getSupportedDestinationSyncModes().stream().map(fromProtocolObjects::get).toList();
}

@Override
public AuthSpecification getAuthSpecification() {
return connectorSpecification.getAuthSpecification();
}

@Override
public AdvancedAuth getAdvancedAuth() {
return connectorSpecification.getAdvancedAuth();
}

@Override
public String toJson() {
return Jsons.serialize(connectorSpecification);
}

static public ConnectorSpecification fromJson(final String jsonString) {
return new ConnectorSpecificationAdapter(Jsons.deserialize(jsonString, io.airbyte.protocol.models.ConnectorSpecification.class));
}

@Override
public io.airbyte.protocol.models.ConnectorSpecification getRaw() {
return connectorSpecification;
}

private final static Map<io.airbyte.protocol.models.DestinationSyncMode, DestinationSyncMode> fromProtocolObjects = Map.of(
io.airbyte.protocol.models.DestinationSyncMode.APPEND, DestinationSyncMode.APPEND,
io.airbyte.protocol.models.DestinationSyncMode.APPEND_DEDUP, DestinationSyncMode.APPEND_DEDUP,
io.airbyte.protocol.models.DestinationSyncMode.OVERWRITE, DestinationSyncMode.OVERWRITE);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol.objects.serde;

import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import io.airbyte.commons.protocol.objects.ConnectorSpecification;
import io.airbyte.commons.protocol.objects.impl.ConnectorSpecificationAdapter;
import java.io.IOException;

public class ConnectionSpecificationDeserializer extends StdDeserializer<ConnectorSpecification> {

public ConnectionSpecificationDeserializer() {
this(null);
}

protected ConnectionSpecificationDeserializer(Class<?> vc) {
super(vc);
}

@Override
public ConnectorSpecification deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JacksonException {
final JsonNode node = p.getCodec().readTree(p);
final String jsonString = node.get("object").asText();
return ConnectorSpecificationAdapter.fromJson(jsonString);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol.objects.serde;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import io.airbyte.commons.protocol.objects.ConnectorSpecification;
import java.io.IOException;

public class ConnectionSpecificationSerializer extends StdSerializer<ConnectorSpecification> {

public ConnectionSpecificationSerializer() {
this(null);
}

protected ConnectionSpecificationSerializer(Class<ConnectorSpecification> t) {
super(t);
}

@Override
public void serialize(ConnectorSpecification value, JsonGenerator gen, SerializerProvider provider) throws IOException {
gen.writeStartObject();
gen.writeStringField("object", value.toJson());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which components would receive these {object: {spec...}} messages? I.e. do connectors need to produce this format? Or are connectors still expected to produce protocol messages, and then the worker would translate them into this format? (or something else?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think those should remain within the platform, those objects should be mostly used for temporal payloads which is generated by the clients we use and the activity. It currently leaks to the connector's tests because some tests rely on the workers.
Connectors having to generate those should be a red flag.

gen.writeEndObject();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol.objects.serde;

public interface JsonSerializable {

String toJson();

}
Loading