
Implement databricks connector as a copy destination #5748

Merged: 1 commit merged into george/hotload-jar from liren/databricks-s3-writer on Aug 31, 2021

Conversation

@tuliren (Contributor) commented on Aug 31, 2021

@github-actions bot added the area/connectors (Connector related issues) label on Aug 31, 2021
@@ -15,6 +15,14 @@ dependencies {
implementation project(':airbyte-integrations:bases:base-java')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
implementation project(':airbyte-integrations:connectors:destination-jdbc')
implementation project(':airbyte-integrations:connectors:destination-s3')
@tuliren (Contributor, author) commented on this diff:

Let's depend on the s3 destination for now and DRY it later.

@tuliren changed the title from "Implement databricks destination as a stream copier" to "Implement databricks destination as a copy destination" on Aug 31, 2021
@tuliren changed the title from "Implement databricks destination as a copy destination" to "Implement databricks connector as a copy destination" on Aug 31, 2021

@Override
public AirbyteConnectionStatus check(JsonNode config) {
@tuliren (Contributor, author) commented on this diff:

The CopyDestination base class checks and creates the SQL tables through its abstract methods, so this implementation only needs to check the staging persistence.
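
As a rough illustration of what that split implies (a minimal sketch under assumptions, not the code in this PR; the "s3_bucket_name" field and attemptWriteAndDelete helper are hypothetical):

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;

public class DatabricksCheckSketch {

  // CopyDestination's abstract methods already cover the SQL-side checks, so this
  // sketch only verifies that the S3 staging location is writable.
  public AirbyteConnectionStatus check(JsonNode config) {
    try {
      // "s3_bucket_name" and attemptWriteAndDelete are hypothetical, for illustration only.
      attemptWriteAndDelete(config.get("s3_bucket_name").asText());
      return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
    } catch (Exception e) {
      return new AirbyteConnectionStatus()
          .withStatus(Status.FAILED)
          .withMessage("Could not write to the staging persistence: " + e.getMessage());
    }
  }

  private void attemptWriteAndDelete(String bucket) {
    // Placeholder: upload and then delete a small test object in the given bucket.
  }
}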

* This implementation is similar to {@link io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier}.
* The difference is that this implementation creates Parquet staging files, instead of CSV ones.
*/
public class DatabricksStreamCopier implements StreamCopier {
@tuliren (Contributor, author) commented on this diff:

We can focus on implementing the methods in this class to transfer the data from S3 to Databricks.
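
A minimal sketch of the intended data flow (not the PR implementation; a local StagingParquetWriter interface stands in for the real Parquet writer, and copyStagedFilesIntoDatabricks is a hypothetical name for the load step):

import java.util.UUID;
import io.airbyte.protocol.models.AirbyteRecordMessage;

public class DatabricksStreamCopierSketch {

  // Local stand-in for the S3 Parquet writer from destination-s3; only write() is modeled.
  interface StagingParquetWriter {
    void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception;
  }

  private final StagingParquetWriter parquetWriter;

  public DatabricksStreamCopierSketch(StagingParquetWriter parquetWriter) {
    this.parquetWriter = parquetWriter;
  }

  // Step 1: stream each record into a Parquet staging file on S3.
  public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
    parquetWriter.write(id, recordMessage);
  }

  // Step 2 (hypothetical method name): load the staged Parquet files into Databricks,
  // e.g. via a COPY INTO statement against the destination table.
  public void copyStagedFilesIntoDatabricks() {
    // Placeholder for the S3-to-Databricks load.
  }
}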

@tuliren tuliren requested a review from Phlair August 31, 2021 06:02
@@ -36,7 +37,7 @@
/**
* Writes a value to a staging file for the stream.
*/
void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception;
void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception;
@tuliren (Contributor, author) commented on this diff:

This interface change is necessary to drop in the S3ParquetWriter directly for convenience.
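
As a hedged aside on compatibility (not code from this PR): the arguments of the old signature can still be derived from the AirbyteRecordMessage, so existing CSV-based copiers can adapt along these lines; writeCsvRow is a hypothetical stand-in for the pre-existing staging logic.

import java.sql.Timestamp;
import java.time.Instant;
import java.util.UUID;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteRecordMessage;

public class CsvStyleWriteAdapterSketch {

  // Both the JSON payload and the emitted_at timestamp from the old signature are still
  // available on the record, so nothing is lost under the new signature.
  public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
    String jsonDataString = Jsons.serialize(recordMessage.getData());
    Timestamp emittedAt = Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt()));
    writeCsvRow(id, jsonDataString, emittedAt); // hypothetical: the pre-existing CSV staging logic
  }

  private void writeCsvRow(UUID id, String jsonDataString, Timestamp emittedAt) {
    // Placeholder for the CSV row write.
  }
}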

import io.airbyte.protocol.models.DestinationSyncMode;

public interface StreamCopierFactory<T> {

StreamCopier create(String configuredSchema,
T config,
String stagingFolder,
DestinationSyncMode syncMode,
AirbyteStream stream,
ConfiguredAirbyteStream configuredStream,
@tuliren (Contributor, author) commented on this diff:

This interface change is necessary to reuse the S3Writer.

@tuliren linked an issue on Aug 31, 2021 that may be closed by this pull request
@Phlair (Contributor) commented on Aug 31, 2021

@tuliren I'll merge this in and then work off it

@Phlair merged commit d7db844 into george/hotload-jar on Aug 31, 2021
@Phlair deleted the liren/databricks-s3-writer branch on August 31, 2021 12:39
Successfully merging this pull request may close these issues:

New destination: Databricks Delta Lake