-
Notifications
You must be signed in to change notification settings - Fork 4.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
🎉 S3 destination: Avro & Jsonl output (#4227)
* Add jsonl format to spec.json
* Implement jsonl writer
* Add documentation
* Add acceptance test
* Update document
* Bump version
* Update document example
* Implement avro writer
* Implement compression codec
* Update documentation
* Revise documentation
* Add more tests
* Add acceptance test
* Format code
* Create helper method for name updater
* Update csv doc with normalization
* Update version date
- Loading branch information
Showing
33 changed files
with
1,233 additions
and
200 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
66 changes: 66 additions & 0 deletions
66
...ation-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroRecordFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2020 Airbyte | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in all | ||
* copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
* SOFTWARE. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.s3.avro; | ||
|
||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.fasterxml.jackson.databind.ObjectWriter; | ||
import com.fasterxml.jackson.databind.node.ObjectNode; | ||
import io.airbyte.commons.jackson.MoreMappers; | ||
import io.airbyte.integrations.base.JavaBaseConstants; | ||
import io.airbyte.protocol.models.AirbyteRecordMessage; | ||
import java.util.UUID; | ||
import org.apache.avro.Schema; | ||
import org.apache.avro.generic.GenericData; | ||
import tech.allegro.schema.json2avro.converter.JsonAvroConverter; | ||
|
||
public class AvroRecordFactory { | ||
|
||
private static final ObjectMapper MAPPER = MoreMappers.initMapper(); | ||
private static final ObjectWriter WRITER = MAPPER.writer(); | ||
|
||
private final Schema schema; | ||
private final JsonFieldNameUpdater nameUpdater; | ||
private final JsonAvroConverter converter = new JsonAvroConverter(); | ||
|
||
public AvroRecordFactory(Schema schema, JsonFieldNameUpdater nameUpdater) { | ||
this.schema = schema; | ||
this.nameUpdater = nameUpdater; | ||
} | ||
|
||
public GenericData.Record getAvroRecord(UUID id, AirbyteRecordMessage recordMessage) throws JsonProcessingException { | ||
JsonNode inputData = recordMessage.getData(); | ||
inputData = nameUpdater.getJsonWithStandardizedFieldNames(inputData); | ||
|
||
ObjectNode jsonRecord = MAPPER.createObjectNode(); | ||
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_ID, id.toString()); | ||
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt()); | ||
jsonRecord.setAll((ObjectNode) inputData); | ||
|
||
return converter.convertToGenericDataRecord(WRITER.writeValueAsBytes(jsonRecord), schema); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
33 changes: 33 additions & 0 deletions
33
...ination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroConstants.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2020 Airbyte | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in all | ||
* copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
* SOFTWARE. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.s3.avro; | ||
|
||
/**
 * String constants shared by the S3 Avro format classes.
 *
 * <p>
 * NOTE(review): both constants appear to relate to field names containing special characters,
 * judging by the original comment; confirm against their call sites.
 */
public class S3AvroConstants {

  /** Key literal {@code "_airbyte_original_name"}; presumably labels a field's original name. */
  public static final String DOC_KEY_ORIGINAL_NAME = "_airbyte_original_name";

  /** Delimiter literal {@code ":"}; presumably separates a doc key from its value. */
  public static final String DOC_KEY_VALUE_DELIMITER = ":";

}
136 changes: 136 additions & 0 deletions
136
...tion-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroFormatConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2020 Airbyte | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in all | ||
* copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
* SOFTWARE. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.s3.avro; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import io.airbyte.integrations.destination.s3.S3Format; | ||
import io.airbyte.integrations.destination.s3.S3FormatConfig; | ||
import org.apache.avro.file.CodecFactory; | ||
|
||
public class S3AvroFormatConfig implements S3FormatConfig { | ||
|
||
private final CodecFactory codecFactory; | ||
|
||
public S3AvroFormatConfig(JsonNode formatConfig) { | ||
this.codecFactory = parseCodecConfig(formatConfig.get("compression_codec")); | ||
} | ||
|
||
public static CodecFactory parseCodecConfig(JsonNode compressionCodecConfig) { | ||
if (compressionCodecConfig == null || compressionCodecConfig.isNull()) { | ||
return CodecFactory.nullCodec(); | ||
} | ||
|
||
JsonNode codecConfig = compressionCodecConfig.get("codec"); | ||
if (codecConfig == null || codecConfig.isNull() || !codecConfig.isTextual()) { | ||
return CodecFactory.nullCodec(); | ||
} | ||
String codecType = codecConfig.asText(); | ||
CompressionCodec codec = CompressionCodec.fromConfigValue(codecConfig.asText()); | ||
switch (codec) { | ||
case NULL -> { | ||
return CodecFactory.nullCodec(); | ||
} | ||
case DEFLATE -> { | ||
int compressionLevel = getCompressionLevel(compressionCodecConfig, 0, 0, 9); | ||
return CodecFactory.deflateCodec(compressionLevel); | ||
} | ||
case BZIP2 -> { | ||
return CodecFactory.bzip2Codec(); | ||
} | ||
case XZ -> { | ||
int compressionLevel = getCompressionLevel(compressionCodecConfig, 6, 0, 9); | ||
return CodecFactory.xzCodec(compressionLevel); | ||
} | ||
case ZSTANDARD -> { | ||
int compressionLevel = getCompressionLevel(compressionCodecConfig, 3, -5, 22); | ||
boolean includeChecksum = getIncludeChecksum(compressionCodecConfig, false); | ||
return CodecFactory.zstandardCodec(compressionLevel, includeChecksum); | ||
} | ||
case SNAPPY -> { | ||
return CodecFactory.snappyCodec(); | ||
} | ||
default -> { | ||
throw new IllegalArgumentException("Unsupported compression codec: " + codecType); | ||
} | ||
} | ||
} | ||
|
||
public static int getCompressionLevel(JsonNode compressionCodecConfig, int defaultLevel, int minLevel, int maxLevel) { | ||
JsonNode levelConfig = compressionCodecConfig.get("compression_level"); | ||
if (levelConfig == null || levelConfig.isNull() || !levelConfig.isIntegralNumber()) { | ||
return defaultLevel; | ||
} | ||
int level = levelConfig.asInt(); | ||
if (level < minLevel || level > maxLevel) { | ||
throw new IllegalArgumentException( | ||
String.format("Invalid compression level: %d, expected an integer in range [%d, %d]", level, minLevel, maxLevel)); | ||
} | ||
return level; | ||
} | ||
|
||
public static boolean getIncludeChecksum(JsonNode compressionCodecConfig, boolean defaultValue) { | ||
JsonNode checksumConfig = compressionCodecConfig.get("include_checksum"); | ||
if (checksumConfig == null || checksumConfig.isNumber() || !checksumConfig.isBoolean()) { | ||
return defaultValue; | ||
} | ||
return checksumConfig.asBoolean(); | ||
} | ||
|
||
public CodecFactory getCodecFactory() { | ||
return codecFactory; | ||
} | ||
|
||
@Override | ||
public S3Format getFormat() { | ||
return S3Format.AVRO; | ||
} | ||
|
||
public enum CompressionCodec { | ||
|
||
NULL("no compression"), | ||
DEFLATE("deflate"), | ||
BZIP2("bzip2"), | ||
XZ("xz"), | ||
ZSTANDARD("zstandard"), | ||
SNAPPY("snappy"); | ||
|
||
private final String configValue; | ||
|
||
CompressionCodec(String configValue) { | ||
this.configValue = configValue; | ||
} | ||
|
||
public static CompressionCodec fromConfigValue(String configValue) { | ||
for (CompressionCodec codec : values()) { | ||
if (configValue.equalsIgnoreCase(codec.configValue)) { | ||
return codec; | ||
} | ||
} | ||
throw new IllegalArgumentException("Unknown codec config value: " + configValue); | ||
} | ||
|
||
} | ||
|
||
} |
Oops, something went wrong.