
🎉 Destination S3: parquet output #3908

Merged: 41 commits merged into master from liren/s3-destination-parquet on Jun 14, 2021

Conversation

@tuliren (Contributor) commented Jun 5, 2021:

What

  • Address Destination S3: support writing Parquet data format #3642.
  • Refactor S3 destination so that it is easier to add new formats.
    • Rename S3OutputFormatter to S3Writer, and add a writer package.
    • Extract shared writer logic to an abstract class BaseS3Writer.
    • Move CSV logic to its own package.
    • Move CSV-specific constants to their own constants file.
    • Add a util package.
    • Extract shared acceptance code to an abstract class S3DestinationAcceptanceTest.
  • Support parquet file output:
    • Use AvroParquetWriter to output Parquet files on S3.
      • The hadoop-aws dependency lets the writer stream data to S3 as it is generated, instead of buffering whole files locally.
    • Create JsonSchemaConverter to convert a JSON schema to an Avro schema.
    • Use json2avro.converter to convert each JSON object to an Avro record based on that schema (a rough sketch of this path follows below).
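
For readers new to this stack, here is a minimal, self-contained sketch of the Parquet path, assuming an S3A path, a hand-written Avro schema, and a hypothetical bucket name; the connector's real configuration and schema conversion differ from this:

// Minimal sketch: write Avro records to S3 as Parquet through the s3a filesystem.
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopOutputFile;

public class ParquetUploadSketch {

  public static void main(String[] args) throws Exception {
    // hadoop-aws provides the "s3a" filesystem, so parts are uploaded to S3
    // while records are written, instead of buffering a whole file locally.
    Configuration hadoopConfig = new Configuration();
    hadoopConfig.set("fs.s3a.access.key", System.getenv("AWS_ACCESS_KEY_ID"));
    hadoopConfig.set("fs.s3a.secret.key", System.getenv("AWS_SECRET_ACCESS_KEY"));

    // In the connector, the schema comes from JsonSchemaConverter; here it is hand-written.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"exchange_rate\",\"fields\":["
            + "{\"name\":\"currency\",\"type\":\"string\"},"
            + "{\"name\":\"rate\",\"type\":\"double\"}]}");

    Path path = new Path("s3a://my-test-bucket/exchange_rate.parquet"); // hypothetical bucket
    try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter
        .<GenericData.Record>builder(HadoopOutputFile.fromPath(path, hadoopConfig))
        .withSchema(schema)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build()) {
      GenericData.Record record = new GenericRecordBuilder(schema)
          .set("currency", "HKD")
          .set("rate", 10.0)
          .build();
      writer.write(record); // rows are grouped into Parquet row groups by the writer
    }
  }
}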

Recommended reading order

  1. spec.json
  2. S3ParquetWriter
  3. JsonSchemaConverter

Pre-merge Checklist

Expand the checklist which is relevant for this PR.

Connector checklist

  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • Secrets are annotated with airbyte_secret in output spec
  • Unit & integration tests added as appropriate (and are passing)
    • Community members: please provide proof of this succeeding locally, e.g. a screenshot or copy-pasted acceptance test output. To run acceptance tests for a Python connector, follow the instructions in the README. For Java connectors, run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • /test connector=connectors/<name> command as documented here is passing.
    • Community members can skip this, Airbyters will run this for you.
  • Code reviews completed
  • Credentials added to GitHub CI if needed and not already present (see the instructions for injecting secrets into CI).
  • Documentation updated
    • README
    • CHANGELOG.md
    • Reference docs in the docs/integrations/ directory.
  • Build is successful
  • Connector version bumped as described here
  • New Connector version released on Dockerhub by running the /publish command described here
  • No major blockers
  • PR merged into master branch
  • Follow up tickets have been created
  • Associated tickets have been closed & stakeholders notified

@marcosmarxm marcosmarxm mentioned this pull request Jun 8, 2021
@tuliren tuliren force-pushed the liren/s3-destination-parquet branch from bd97fe8 to 176db13 on June 10, 2021 17:17
.put("HKD", 10)
.put("NZD", 700)
.put("HKD", 10.0)
.put("NZD", 700.0)
tuliren (PR author):
HKD and NZD are typed as number in the catalog. All other entries have decimal values for these two fields, so I'd like to change these values to decimals as well to keep the types consistent.

In Parquet (and probably other formats) we need strict type mappings, and number is mapped to double. If these two fields flip between integers and decimals, I have to do arbitrary conversions to pass the acceptance test, which seems unnecessary.

} catch (Exception e) {
return Optional.empty();
}
}
tuliren (PR author):
This is the arbitrary type conversion I mentioned in the comment about changing the integer HKD values to decimals. Once the test data has consistent typing, this conversion can be removed.
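
For illustration, the conversion being referred to looks roughly like this sketch (the class and method names here are hypothetical, not the PR's actual code): coerce a value declared as number into a double, and return Optional.empty() when that is not possible, which is what the quoted catch block does.

import java.util.Optional;
import com.fasterxml.jackson.databind.JsonNode;

public class LenientNumberConversionSketch {

  // Try to read a JSON value declared as "number" as a double.
  static Optional<Double> toDouble(JsonNode value) {
    try {
      if (value == null || value.isNull()) {
        return Optional.empty();
      }
      if (value.isNumber()) {
        // Covers both integer-looking (10) and decimal (10.0) test values.
        return Optional.of(value.asDouble());
      }
      // Fall back to parsing string representations such as "10".
      return Optional.of(Double.parseDouble(value.asText()));
    } catch (Exception e) {
      return Optional.empty();
    }
  }
}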

.numStreams(S3CsvConstants.DEFAULT_NUM_STREAMS)
.queueCapacity(S3DestinationConstants.DEFAULT_QUEUE_CAPACITY)
.numUploadThreads(S3DestinationConstants.DEFAULT_UPLOAD_THREADS)
.partSize(S3DestinationConstants.DEFAULT_PART_SIZE_MD);
Contributor:
Why don't we use the same Hadoop S3 uploader here?

tuliren (PR author):
Which uploader do you mean by "hadoop s3 uploader"?

Contributor:
If I am reading the PR correctly, it seems we are using two different ways to push data to S3:

  • ParquetWriter
  • StreamTransferManager

I am just curious whether it is possible to use a similar approach from the Hadoop package to push the CSV data.

tuliren (PR author):
Got it. The two writers output data in different data structures, and we do need them for different formats. The Parquet writer organizes data in Parquet row groups, while the stream transfer manager writes data line by line.
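
To make the contrast concrete, here is a rough sketch of the StreamTransferManager-based CSV path (illustrative settings and names, not the PR's exact code): records are printed to a multipart-upload output stream line by line, whereas the Parquet writer buffers rows into row groups before flushing.

import alex.mojaki.s3upload.MultiPartOutputStream;
import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.services.s3.AmazonS3;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;

public class CsvUploadSketch {

  static void upload(AmazonS3 s3Client, String bucket, String objectKey) throws IOException {
    // The manager exposes output streams backed by an S3 multipart upload.
    StreamTransferManager uploadManager =
        new StreamTransferManager(bucket, objectKey, s3Client).numUploadThreads(2); // illustrative setting
    MultiPartOutputStream outputStream = uploadManager.getMultiPartOutputStreams().get(0);
    try (CSVPrinter csvPrinter = new CSVPrinter(
        new PrintWriter(outputStream, true, StandardCharsets.UTF_8), CSVFormat.DEFAULT)) {
      csvPrinter.printRecord("2021-06-14", "HKD", 10.0); // one CSV line per record
    }
    // Closing the printer closes the underlying stream; then finish the multipart upload.
    uploadManager.complete();
  }
}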

@github-actions github-actions bot added the area/documentation Improvements or additions to documentation label Jun 10, 2021
@tuliren tuliren force-pushed the liren/s3-destination-parquet branch from a9790dd to b369dd7 on June 11, 2021 05:01
@tuliren tuliren changed the title 🎉 S3 destination parquet format 🎉 Destination S3: parquet output Jun 11, 2021
@tuliren tuliren marked this pull request as ready for review June 11, 2021 05:49
@github-actions github-actions bot added area/documentation Improvements or additions to documentation and removed area/connectors Connector related issues labels Jun 11, 2021
@@ -15,10 +15,18 @@ dependencies {
implementation project(':airbyte-integrations:connectors:destination-jdbc')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)

// csv
Contributor:
I appreciate the comments here to make clear what dependencies are for what!

@davinchia (Contributor) left a comment:
Nice work! Appreciate the comments + the extensive tests.

My comments are:

  1. Minor readability changes.
  2. Some better commenting to help future OSS contributors. I know there are some who want to contribute other formats.
  3. Possibility of using an OSS tool to do the Json -> Avro conversion. Not a blocker. Thought it would be nice to not write our own tool.
  4. Possibility of sharing the CsvWriter stream transfer manager construction with the CopyConsumer.
  5. Possibility of using the PrimitiveJsonSchema class as an enum instead of having a separate listing in the S3 directory.

The last 2 points are more me thinking out loud. I'm not entirely sure they are good ideas. These changes can be done in follow-up PRs since this one is getting big as is.

Comment on lines 98 to 111
  if (hasFailed) {
    LOGGER.warn("Failure detected. Aborting upload of stream '{}'...", stream.getName());
    csvPrinter.close();
    outputStream.close();
    uploadManager.abort();
    LOGGER.warn("Upload of stream '{}' aborted.", stream.getName());
  } else {
    LOGGER.info("Uploading remaining data for stream '{}'.", stream.getName());
    csvPrinter.close();
    outputStream.close();
    uploadManager.complete();
    LOGGER.info("Upload completed for stream '{}'.", stream.getName());
  }
}
Contributor:
Suggested change (replacing the block above with the following):
  csvPrinter.close();
  outputStream.close();
  if (hasFailed) {
    LOGGER.warn("Failure detected. Aborting upload of stream '{}'...", stream.getName());
    uploadManager.abort();
    LOGGER.warn("Upload of stream '{}' aborted.", stream.getName());
    return;
  }
  LOGGER.info("Uploading remaining data for stream '{}'.", stream.getName());
  uploadManager.complete();
  LOGGER.info("Upload completed for stream '{}'.", stream.getName());
}

Contributor:
Just slightly easier to read.

tuliren (PR author):
I will move the two close statements before the if check.

Usually I am a fan of returning early. However, given how short the if blocks are, the code is already pretty readable, and I think returning early would make it slightly more confusing.

import java.util.Map;

/**
* This helper class tracks whether a Json has special field name that needs to be replaced with a
Contributor:
Nit: can we also add why this is required? As is, I'm not sure whether this is because Parquet expects it this way or because we want to standardise things to make them simpler (it looks like the latter).

@tuliren (PR author) commented Jun 14, 2021:
It is the former. Parquet only allows these characters in a record name: [a-zA-Z0-9_]. Otherwise I wouldn't have gone through all the trouble of doing this. The necessity of this tracker actually complicates things a lot.

Will update the comment to reflect that.
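
As a rough illustration of the tracking being discussed (hypothetical class and method names, not the PR's): replace any character outside [a-zA-Z0-9_] with an underscore and remember the original name so the substitution can be reported or reversed later.

import java.util.HashMap;
import java.util.Map;

public class FieldNameSanitizerSketch {

  private final Map<String, String> sanitizedToOriginal = new HashMap<>();

  // Replace characters Parquet does not allow in record/field names with underscores.
  public String sanitize(String originalName) {
    String sanitized = originalName.replaceAll("[^a-zA-Z0-9_]", "_");
    if (!sanitized.equals(originalName)) {
      sanitizedToOriginal.put(sanitized, originalName);
    }
    return sanitized;
  }

  // Tracks which fields were renamed, so the mapping can be surfaced later.
  public Map<String, String> getRenamedFields() {
    return sanitizedToOriginal;
  }
}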

@ChristopheDuong (Contributor) commented Jun 14, 2021:
Parquet only allows these characters in a record name: [a-zA-Z0-9_].

We currently have to deal with some naming conventions because some destinations allow different subsets of characters in identifier names. These are handled in classes deriving from airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/NamingConventionTransformer.java.

It seems that for S3 Parquet, the difference is that it needs to apply those conventions to field names too. Other destinations only care about conventions for stream names and namespaces (table & schema).

But would it make sense to regroup them in the same kind of class/hierarchy too?

tuliren (PR author):
But would it make sense to regroup them in the same kind of class/hierarchy too?

@ChristopheDuong, can you elaborate on this? What do you mean by "regroup them in the same kind of class"?

@ChristopheDuong (Contributor) commented Jun 15, 2021:
@ChristopheDuong, can you elaborate on this? What do you mean by "regroup them in the same kind of class"?

Should we move some of the string-transformation logic in this code to a class named S3NameTransformer that extends NamingConventionTransformer, like we do with SnowflakeSQLNameTransformer or RedshiftSQLNameTransformer, etc.?

tuliren (PR author):
I see.

The name conversion logic for Parquet is exactly the same as the one in StandardNameTransformer. So there is nothing to override. It seems unnecessary to create a new class.


S3ParquetFormatConfig formatConfig = (S3ParquetFormatConfig) config.getFormatConfig();
Configuration hadoopConfig = getHadoopConfig(config);
this.parquetWriter = AvroParquetWriter.<GenericData.Record>builder(HadoopOutputFile.fromPath(path, hadoopConfig))
Contributor:
I am sad we have to rely on hadoop libraries to do this: https://issues.apache.org/jira/browse/PARQUET-1822

Gah.

Review thread on airbyte-integrations/connectors/destination-s3/README.md (outdated, resolved).

@Override
public void close(boolean hasFailed) throws IOException {
if (hasFailed) {
Contributor:
Same thought as on the CsvWriter close method. I prefer returning early instead of an else block.

@tuliren (PR author) commented Jun 14, 2021:

@davinchia, thanks for the code review. I know it's a long PR.

@subodh1810 (Contributor) left a comment:
This looks great. I just have one comment.
I would have loved it if we could somehow have reused the Hadoop library. Is there not a CSV version of HadoopOutputFile so that we could have reused a lot of code?

@@ -109,7 +109,8 @@ jobs:
ZENDESK_TALK_TEST_CREDS: ${{ secrets.ZENDESK_TALK_TEST_CREDS }}
ZOOM_INTEGRATION_TEST_CREDS: ${{ secrets.ZOOM_INTEGRATION_TEST_CREDS }}
PLAID_INTEGRATION_TEST_CREDS: ${{ secrets.PLAID_INTEGRATION_TEST_CREDS }}
DESTINATION_S3_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_INTEGRATION_TEST_CREDS }}
DESTINATION_S3_CSV_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_CSV_INTEGRATION_TEST_CREDS }}
Contributor:
I think we can have only one set of credentials and change the config method to read only the relevant information from the credentials. For instance, the format part can be hardcoded in the test class's config, i.e. for the CSV test it can be

"format": {
    "format_type": "CSV",
    "flattening": "Root level flattening"
  }

and for the Parquet test it can be

"format": {
    "format_type": "Parquet",
    "compression_codec": "GZIP"
  }

and the sensitive information can be populated from the credentials.
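
A sketch of how that could look in the acceptance test, under the assumption that a single shared S3 secret holds only the sensitive settings and the test hardcodes the non-sensitive format block (class and method names here are illustrative):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.nio.file.Files;
import java.nio.file.Path;

public class S3TestConfigSketch {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  // baseConfigPath points at the shared S3 credentials JSON (bucket, keys, region).
  static ObjectNode parquetTestConfig(Path baseConfigPath) throws Exception {
    ObjectNode config = (ObjectNode) MAPPER.readTree(Files.readString(baseConfigPath));
    ObjectNode format = MAPPER.createObjectNode()
        .put("format_type", "Parquet")
        .put("compression_codec", "GZIP");
    config.set("format", format); // hardcode the non-sensitive format block in the test
    return config;
  }
}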

@github-actions github-actions bot added the area/connectors Connector related issues label Jun 14, 2021
@tuliren (PR author) commented Jun 14, 2021:

/test connector=connectors/destination-s3

🕑 connectors/destination-s3 https://github.com/airbytehq/airbyte/actions/runs/937430136
✅ connectors/destination-s3 https://github.com/airbytehq/airbyte/actions/runs/937430136

@tuliren (PR author) commented Jun 14, 2021:

/publish connector=connectors/destination-s3

🕑 connectors/destination-s3 https://github.com/airbytehq/airbyte/actions/runs/937443364
❌ connectors/destination-s3 https://github.com/airbytehq/airbyte/actions/runs/937443364

@tuliren (PR author) commented Jun 14, 2021:

/publish connector=connectors/destination-s3

🕑 connectors/destination-s3 https://github.com/airbytehq/airbyte/actions/runs/937491071
✅ connectors/destination-s3 https://github.com/airbytehq/airbyte/actions/runs/937491071

@davinchia (Contributor) left a comment:
Great stuff @tuliren!

Nothing else from me; feel free to merge whenever!

@tuliren (PR author) commented Jun 14, 2021:

/publish connector=connectors/destination-s3

🕑 connectors/destination-s3 https://github.com/airbytehq/airbyte/actions/runs/937515876
✅ connectors/destination-s3 https://github.com/airbytehq/airbyte/actions/runs/937515876

@tuliren tuliren merged commit 87552b2 into master Jun 14, 2021
@tuliren tuliren deleted the liren/s3-destination-parquet branch June 14, 2021 23:49

static List<JsonSchemaType> getTypes(String fieldName, JsonNode typeProperty) {
if (typeProperty == null) {
throw new IllegalStateException(String.format("Field %s has no type", fieldName));
Contributor:
BTW some catalogs are producing fields without types, so it's not so uncommon...

For example, the source-facebook does this I think... Does this exception cancel the sync of such catalogs? Should the fields be ignored or defaulted to a string for example instead?

@tuliren (PR author) commented Jun 15, 2021:
Got it. This is good to know.

In general, a schemaless source is not suitable for Parquet. There is another big problem regarding how we are going to handle additionalProperties whose types are known.

I will submit a follow-up PR to take care of it.
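
For illustration, the fallback suggested above (defaulting an untyped field to string instead of failing the sync) could look roughly like the following; the enum and method here are stand-ins for the connector's JsonSchemaType and getTypes, not the change that eventually landed.

import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MissingTypeFallbackSketch {

  enum JsonSchemaType { NULL, STRING, NUMBER, INTEGER, BOOLEAN, OBJECT, ARRAY }

  static List<JsonSchemaType> getTypes(String fieldName, JsonNode typeProperty) {
    if (typeProperty == null) {
      // Schemaless or under-specified catalogs can emit fields without a type;
      // default to string here rather than throwing and cancelling the sync.
      return Collections.singletonList(JsonSchemaType.STRING);
    }
    if (typeProperty.isArray()) {
      // e.g. "type": ["null", "string"]
      List<JsonSchemaType> types = new ArrayList<>();
      typeProperty.forEach(node -> types.add(JsonSchemaType.valueOf(node.asText().toUpperCase())));
      return types;
    }
    return Collections.singletonList(JsonSchemaType.valueOf(typeProperty.asText().toUpperCase()));
  }
}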

tuliren (PR author):
Created an issue: #4124
