Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 New destination: S3 #3672

Merged
merged 28 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1728c21
Update README icon links
tuliren May 25, 2021
88044a5
Update airbyte-specification doc
tuliren May 25, 2021
b6084bf
Extend base connector
tuliren May 25, 2021
0818163
Remove redundant region
tuliren May 27, 2021
6f4aa9c
Separate warning from info
tuliren May 27, 2021
fdaf213
Implement s3 destination
tuliren May 27, 2021
2d90864
Run format
tuliren May 27, 2021
dc1220e
Clarify logging message
tuliren May 29, 2021
f9d07a0
Rename variables and functions
tuliren May 29, 2021
df040f7
Update documentation
tuliren May 29, 2021
8ec265a
Rename and annotate interface
tuliren May 29, 2021
0048bf7
Inject formatter factory
tuliren May 30, 2021
dffae5a
Remove part size
tuliren May 30, 2021
b026598
Fix spec field names and add unit tests
tuliren May 31, 2021
fc3bba7
Add unit tests for csv output formatter
tuliren May 31, 2021
4efa629
Format code
tuliren May 31, 2021
a6e6856
Complete acceptance test and fix bugs
tuliren Jun 1, 2021
9ac8fcc
Fix uuid
tuliren Jun 2, 2021
d3949f0
Merge branch 'master' into liren/s3-destination-mvp
tuliren Jun 2, 2021
d840504
Remove generator template files
tuliren Jun 2, 2021
dbb2f79
Add unhappy test case
tuliren Jun 2, 2021
434c8a4
Checkin airbyte state message
tuliren Jun 2, 2021
edbcfec
Adjust stream transfer manager parameters
tuliren Jun 2, 2021
8f940a3
Use underscore in filename
tuliren Jun 2, 2021
6cb86c0
Create csv sheet generator to handle data processing
tuliren Jun 2, 2021
60b252a
Format code
tuliren Jun 2, 2021
5373cd7
Add partition id to filename
tuliren Jun 2, 2021
caa44f4
Rename date format variable
tuliren Jun 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Introduction

![GitHub Workflow Status](https://img.shields.io/github/workflow/status/airbytehq/airbyte/Airbyte%20CI) ![License](https://img.shields.io/github/license/airbytehq/airbyte)
[![GitHub Workflow Status](https://img.shields.io/github/workflow/status/airbytehq/airbyte/Airbyte%20CI)](https://github.com/airbytehq/airbyte/actions/workflows/gradle.yml) [![License](https://img.shields.io/github/license/airbytehq/airbyte)](./LICENSE)

![](docs/.gitbook/assets/airbyte_horizontal_color_white-background.svg)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"destinationDefinitionId": "7679c5ea-e829-4202-bd47-61ae9debdds3",
"name": "S3",
"dockerRepository": "airbyte/destination-s3",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3"
}
7 changes: 7 additions & 0 deletions airbyte-config/init/src/main/resources/icons/s3.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.3.6
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
- destinationDefinitionId: 7679c5ea-e829-4202-bd47-61ae9debdds3
name: S3
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ public void accept(AirbyteMessage msg) throws Exception {

@Override
public void close() throws Exception {
LOGGER.info("hasFailed: {}.", hasFailed);
if (hasFailed) {
LOGGER.warn("hasFailed: true.");
tuliren marked this conversation as resolved.
Show resolved Hide resolved
} else {
LOGGER.info("hasFailed: false.");
}
close(hasFailed);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.Destination;
Expand All @@ -68,7 +68,6 @@
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -84,7 +83,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryDestination implements Destination {
public class BigQueryDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestination.class);
static final String CONFIG_DATASET_ID = "dataset_id";
Expand All @@ -102,13 +101,6 @@ public BigQueryDestination() {
namingResolver = new StandardNameTransformer();
}

@Override
public ConnectorSpecification spec() throws IOException {
// return a jsonschema representation of the spec for the integration.
final String resourceString = MoreResources.readResource("spec.json");
return Jsons.deserialize(resourceString, ConnectorSpecification.class);
}

@Override
public AirbyteConnectionStatus check(JsonNode config) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
Expand All @@ -41,7 +41,6 @@
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.io.FileWriter;
import java.io.IOException;
Expand All @@ -59,7 +58,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CsvDestination implements Destination {
public class CsvDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(CsvDestination.class);

Expand All @@ -71,12 +70,6 @@ public CsvDestination() {
namingResolver = new StandardNameTransformer();
}

@Override
public ConnectorSpecification spec() throws IOException {
final String resourceString = MoreResources.readResource("spec.json");
return Jsons.deserialize(resourceString, ConnectorSpecification.class);
}

@Override
public AirbyteConnectionStatus check(JsonNode config) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
Expand All @@ -42,7 +42,6 @@
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.io.FileWriter;
import java.io.IOException;
Expand All @@ -59,7 +58,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalJsonDestination implements Destination {
public class LocalJsonDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(LocalJsonDestination.class);

Expand All @@ -71,12 +70,6 @@ public LocalJsonDestination() {
namingResolver = new StandardNameTransformer();
}

@Override
public ConnectorSpecification spec() throws IOException {
final String resourceString = MoreResources.readResource("spec.json");
return Jsons.deserialize(resourceString, ConnectorSpecification.class);
}

@Override
public AirbyteConnectionStatus check(JsonNode config) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
"ap-northeast-3",
"ap-southeast-1",
"ap-southeast-2",
"ap-northeast-1",
"ca-central-1",
"cn-north-1",
"cn-northwest-1",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Exclude everything from the Docker build context except the Dockerfile
# and the Gradle build output (which holds the distribution tarball).
*
!Dockerfile
!build
11 changes: 11 additions & 0 deletions airbyte-integrations/connectors/destination-s3/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Image for the Airbyte S3 destination connector, layered on the shared
# Java integration base image.
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte
ENV APPLICATION destination-s3

# Copy the Gradle distribution tarball produced by the 'application' plugin.
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

# Unpack in place; --strip-components=1 drops the versioned top-level directory.
RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-s3
23 changes: 23 additions & 0 deletions airbyte-integrations/connectors/destination-s3/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Build configuration for the S3 destination connector.
plugins {
    id 'application'
    id 'airbyte-docker'
    id 'airbyte-integration-test-java'
}

application {
    // Entry point invoked by the integration-base-java launcher.
    mainClass = 'io.airbyte.integrations.destination.s3.S3Destination'
}

dependencies {
    implementation project(':airbyte-config:models')
    implementation project(':airbyte-protocol:models')
    implementation project(':airbyte-integrations:bases:base-java')
    implementation project(':airbyte-integrations:connectors:destination-jdbc')
    implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)

    // AWS SDK for S3 access, commons-csv for CSV output, and a streaming
    // multipart-upload helper so records never need to be buffered on disk.
    implementation 'com.amazonaws:aws-java-sdk-s3:1.11.978'
    implementation 'org.apache.commons:commons-csv:1.4'
    implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2'

    integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.s3;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class S3Consumer extends FailureTrackingAirbyteMessageConsumer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this a distinct class from the one in JDBC? should it reuse that class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JDBC one has lots of database operations in it. I tried to reuse that one at the beginning of last week, but it was unnecessarily complicated. So I decided to create a separate one that only deals with S3 logic.


private final S3DestinationConfig s3DestinationConfig;
private final ConfiguredAirbyteCatalog catalog;
private final Map<AirbyteStreamNameNamespacePair, S3Handler> pairToHandlers;

public S3Consumer(
S3DestinationConfig s3DestinationConfig,
ConfiguredAirbyteCatalog catalog) {
this.s3DestinationConfig = s3DestinationConfig;
this.catalog = catalog;
tuliren marked this conversation as resolved.
Show resolved Hide resolved
this.pairToHandlers = new HashMap<>(catalog.getStreams().size());
tuliren marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
protected void startTracked() throws Exception {
AWSCredentials awsCreds = new BasicAWSCredentials(s3DestinationConfig.getAccessKeyId(),
s3DestinationConfig.getSecretAccessKey());
AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.withRegion(s3DestinationConfig.getBucketRegion())
.build();
Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());

for (ConfiguredAirbyteStream configuredStream : catalog.getStreams()) {
S3Handler handler = S3Handlers.getS3Handler(s3DestinationConfig, s3Client, configuredStream, uploadTimestamp);
tuliren marked this conversation as resolved.
Show resolved Hide resolved
handler.initialize();

AirbyteStream stream = configuredStream.getStream();
AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream);
pairToHandlers.put(pair, handler);
}
}

@Override
protected void acceptTracked(AirbyteMessage airbyteMessage) throws Exception {
if (airbyteMessage.getType() != Type.RECORD) {
return;
}

AirbyteRecordMessage recordMessage = airbyteMessage.getRecord();
AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair
.fromRecordMessage(recordMessage);

if (!pairToHandlers.containsKey(pair)) {
throw new IllegalArgumentException(
String.format(
"Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s",
Jsons.serialize(catalog), Jsons.serialize(recordMessage)));
}

UUID id = UUID.randomUUID();
pairToHandlers.get(pair).write(id, recordMessage);
}

@Override
protected void close(boolean hasFailed) throws Exception {
for (S3Handler handler : pairToHandlers.values()) {
handler.close(hasFailed);
}
}

}
Loading