destination-s3: don't reuse names of existing objects (#45143)
stephane-airbyte authored Sep 5, 2024
1 parent e6d6b8e commit 6730a3b
Showing 8 changed files with 76 additions and 48 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
| :--------- | :--------- | :----------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| 0.44.21 | 2024-09-04 | [\#45143](https://github.com/airbytehq/airbyte/pull/45143) | S3 destination: don't overwrite existing files; skip those file indexes instead |
| 0.44.20 | 2024-08-30 | [\#44933](https://github.com/airbytehq/airbyte/pull/44933) | Avro/Parquet destinations: handle `{}` schemas inside objects/arrays |
| 0.44.19 | 2024-08-20 | [\#44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase Jackson message length limit to 100mb |
| 0.44.18 | 2024-08-22 | [\#44505](https://github.com/airbytehq/airbyte/pull/44505) | Improve handling of incoming debezium change events |
@@ -1 +1 @@
-version=0.44.20
+version=0.44.21
@@ -45,6 +45,7 @@ open class S3StorageOperations(
private val s3FilenameTemplateManager: S3FilenameTemplateManager = S3FilenameTemplateManager()

private val partCounts: ConcurrentMap<String, AtomicInteger> = ConcurrentHashMap()
+    private val objectNameByPrefix: ConcurrentMap<String, Set<String>> = ConcurrentHashMap()

override fun getBucketObjectPath(
namespace: String?,
@@ -167,6 +168,32 @@
* @return the uploaded filename, which is different from the serialized buffer filename
* </extension></partId>
*/
+    @VisibleForTesting
+    fun getFileName(
+        objectPath: String,
+        recordsData: SerializableBuffer,
+    ): String {
+        var fullObjectKey: String
+        do {
+            val partId: String = getPartId(objectPath)
+            val fileExtension: String = getExtension(recordsData.filename)
+            fullObjectKey =
+                if (!s3Config.fileNamePattern.isNullOrBlank()) {
+                    s3FilenameTemplateManager.applyPatternToFilename(
+                        S3FilenameTemplateParameterObject.builder()
+                            .partId(partId)
+                            .recordsData(recordsData)
+                            .objectPath(objectPath)
+                            .fileExtension(fileExtension)
+                            .fileNamePattern(s3Config.fileNamePattern)
+                            .build(),
+                    )
+                } else {
+                    objectPath + partId + fileExtension
+                }
+        } while (objectNameByPrefix.getValue(objectPath).contains(fullObjectKey))
+        return fullObjectKey
+    }
@Throws(IOException::class)
private fun loadDataIntoBucket(
objectPath: String,
@@ ... @@
): String {
val partSize: Long = DEFAULT_PART_SIZE.toLong()
val bucket: String? = s3Config.bucketName
-        val partId: String = getPartId(objectPath)
-        val fileExtension: String = getExtension(recordsData.filename)
-        val fullObjectKey: String =
-            if (!s3Config.fileNamePattern.isNullOrBlank()) {
-                s3FilenameTemplateManager.applyPatternToFilename(
-                    S3FilenameTemplateParameterObject.builder()
-                        .partId(partId)
-                        .recordsData(recordsData)
-                        .objectPath(objectPath)
-                        .fileExtension(fileExtension)
-                        .fileNamePattern(s3Config.fileNamePattern)
-                        .build(),
-                )
-            } else {
-                objectPath + partId + fileExtension
-            }
+        val fullObjectKey: String = getFileName(objectPath, recordsData)
val metadata: MutableMap<String, String> = HashMap()
for (blobDecorator: BlobDecorator in blobDecorators) {
blobDecorator.updateMetadata(metadata, getMetadataMapping())
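To make the new behavior concrete, here is a minimal, self-contained Kotlin sketch of the skip-existing-indexes loop introduced above. The names `nextObjectKey`, `partCounter`, and `existingKeys` are illustrative stand-ins for `getFileName`, the per-prefix part counter, and the `objectNameByPrefix` cache; the filename-template branch is omitted.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Sketch only: no template support, one prefix, in-memory set of existing keys.
fun nextObjectKey(
    objectPath: String,
    extension: String,
    partCounter: AtomicInteger,
    existingKeys: Set<String>,
): String {
    var candidate: String
    do {
        // Every attempt consumes a part id, so indexes that already exist
        // in the bucket are skipped rather than reused (and overwritten).
        candidate = objectPath + partCounter.getAndIncrement() + extension
    } while (candidate in existingKeys)
    return candidate
}

fun main() {
    val existing = setOf("ns/stream/2022_04_04_123456789_1.csv.gz")
    val counter = AtomicInteger(0)
    val prefix = "ns/stream/2022_04_04_123456789_"
    println(nextObjectKey(prefix, ".csv.gz", counter, existing)) // ..._0.csv.gz
    println(nextObjectKey(prefix, ".csv.gz", counter, existing)) // ..._2.csv.gz (index 1 skipped)
}
```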
@@ -263,31 +275,14 @@
        ) {
            AtomicInteger(0)
        }

-        if (partCount.get() == 0) {
-            var objects: ObjectListing?
-            var objectCount = 0
-
-            val bucket: String? = s3Config.bucketName
-            objects = s3Client.listObjects(bucket, objectPath)
-
-            if (objects != null) {
-                objectCount += objects.objectSummaries.size
-                while (objects != null && objects.nextMarker != null) {
-                    objects =
-                        s3Client.listObjects(
-                            ListObjectsRequest()
-                                .withBucketName(bucket)
-                                .withPrefix(objectPath)
-                                .withMarker(objects.nextMarker),
-                        )
-                    if (objects != null) {
-                        objectCount += objects.objectSummaries.size
-                    }
-                }
-                partCount.set(objectCount)
-            }
-        }
+        objectNameByPrefix.computeIfAbsent(
+            objectPath,
+        ) {
+            var objectList: Set<String> = setOf()
+            forObjectsByPage(objectPath) { objectSummaries ->
+                objectList = objectList + objectSummaries.map { it.key }
+            }
+            objectList
+        }

return partCount.getAndIncrement().toString()
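The rewritten `getPartId` above delegates pagination to a `forObjectsByPage` helper whose body lies outside this diff. A plausible sketch, assuming the same AWS SDK v1 calls the removed code used (`listObjects`, `nextMarker`, `ListObjectsRequest`); in the real class the client and bucket would presumably be members rather than parameters:

```kotlin
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.ListObjectsRequest
import com.amazonaws.services.s3.model.S3ObjectSummary

// Hypothetical helper (name from the diff, body assumed): walks every page
// of a prefix listing and hands each page's summaries to the consumer,
// following nextMarker exactly as the removed inline loop did.
fun forObjectsByPage(
    s3Client: AmazonS3,
    bucket: String,
    objectPath: String,
    consumer: (List<S3ObjectSummary>) -> Unit,
) {
    var listing = s3Client.listObjects(bucket, objectPath)
    while (true) {
        consumer(listing.objectSummaries)
        val marker = listing.nextMarker ?: break
        listing =
            s3Client.listObjects(
                ListObjectsRequest()
                    .withBucketName(bucket)
                    .withPrefix(objectPath)
                    .withMarker(marker),
            )
    }
}
```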
@@ -12,7 +12,7 @@ import io.airbyte.cdk.integrations.destination.s3.jsonschema.JsonRecordIdentityMapper
import io.airbyte.commons.jackson.MoreMappers

class JsonRecordParquetPreprocessor : JsonRecordIdentityMapper() {
-    private fun mapCommon(record: JsonNode?, matchingOption: ObjectNode): JsonNode? {
+    private fun mapCommon(record: JsonNode?, matchingOption: ObjectNode): ObjectNode {
val newObj = MoreMappers.initMapper().createObjectNode()

val propertyName = JsonSchemaParquetPreprocessor.typeFieldName(matchingOption)
@@ -24,7 +24,7 @@ class JsonRecordParquetPreprocessor : JsonRecordIdentityMapper() {
return newObj
}

-    override fun mapUnion(record: JsonNode?, schema: ObjectNode): JsonNode? {
+    override fun mapUnion(record: JsonNode?, schema: ObjectNode): ObjectNode? {
if (record == null || record.isNull) {
return null
}
@@ ... @@
return mapCommon(record, matchingOption)
}

-    override fun mapCombined(record: JsonNode?, schema: ObjectNode): JsonNode? {
+    override fun mapCombined(record: JsonNode?, schema: ObjectNode): ObjectNode? {
if (record == null || record.isNull) {
return null
}
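The signature changes in this file are covariant-return refactors: `mapCommon` always builds and returns an `ObjectNode`, so the union/combined overrides can narrow their declared return type from `JsonNode?` to `ObjectNode?`. Kotlin allows an override to refine the supertype's return type, as this minimal illustration (names are illustrative only) shows:

```kotlin
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode

open class Mapper {
    open fun map(record: JsonNode?): JsonNode? = record
}

class NarrowingMapper : Mapper() {
    // Returning the more specific ObjectNode? is a legal covariant override.
    override fun map(record: JsonNode?): ObjectNode? = record as? ObjectNode
}
```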
@@ -10,6 +10,7 @@ import com.amazonaws.services.s3.model.ListObjectsRequest
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
+import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
import io.airbyte.cdk.integrations.destination.s3.util.S3NameTransformer
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
@@ -23,6 +24,7 @@ import org.junit.jupiter.api.Test
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers
import org.mockito.Mockito
+import org.mockito.kotlin.eq

class S3StorageOperationsTest {

@@ -31,7 +33,9 @@ class S3StorageOperationsTest {
private const val FAKE_BUCKET_PATH = "fake-bucketPath"
private const val NAMESPACE = "namespace"
private const val STREAM_NAME = "stream_name1"
-        private const val OBJECT_TO_DELETE = "$NAMESPACE/$STREAM_NAME/2022_04_04_123456789_0.csv.gz"
+        private const val OBJECT_PREFIX = "$NAMESPACE/$STREAM_NAME/2022_04_04_123456789_"
+        private const val OBJECT_EXTENSION = ".csv.gz"
+        private const val OBJECT_TO_DELETE = "${OBJECT_PREFIX}1$OBJECT_EXTENSION"
}

private lateinit var s3Client: AmazonS3
@@ -74,6 +78,15 @@
),
)
.thenReturn(results)
+        Mockito.`when`(
+            s3Client.listObjects(
+                eq(BUCKET_NAME),
+                ArgumentMatchers.any(
+                    String::class.java,
+                ),
+            ),
+        )
+            .thenReturn(results)

val s3Config =
S3DestinationConfig.create(BUCKET_NAME, FAKE_BUCKET_PATH, "fake-region")
@@ -210,4 +223,22 @@
assertEquals("1", s3StorageOperations.getPartId(FAKE_BUCKET_PATH))
assertEquals("0", s3StorageOperations.getPartId("other_path"))
}

+    @Test
+    fun testGetFileName() {
+        val recordsData =
+            Mockito.mock(
+                SerializableBuffer::class.java,
+            )
+        Mockito.`when`(recordsData.filename).thenReturn(".csv.gz")
+        assertEquals(
+            OBJECT_PREFIX + 0 + OBJECT_EXTENSION,
+            s3StorageOperations.getFileName(OBJECT_PREFIX, recordsData)
+        )
+        // part id 1 is skipped because that object already exists in the bucket
+        assertEquals(
+            OBJECT_PREFIX + 2 + OBJECT_EXTENSION,
+            s3StorageOperations.getFileName(OBJECT_PREFIX, recordsData)
+        )
+    }
}
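Why the second assertion expects index 2: the stubbed `listObjects` calls return a listing containing `OBJECT_TO_DELETE`, i.e. `${OBJECT_PREFIX}1$OBJECT_EXTENSION`, so `getPartId` caches part index 1 as taken and `getFileName` skips over it. A hedged sketch of the listing stub this relies on (the actual `@BeforeEach` setup is truncated in this diff, so details may differ):

```kotlin
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import org.mockito.Mockito

// Assumed setup: the mocked client reports exactly one pre-existing object,
// "<prefix>1<extension>", which makes part index 1 read as taken.
fun stubListing(prefix: String, extension: String): ObjectListing {
    val summary = S3ObjectSummary().apply { key = prefix + 1 + extension }
    val results = Mockito.mock(ObjectListing::class.java)
    Mockito.`when`(results.objectSummaries).thenReturn(listOf(summary))
    return results
}
```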
@@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
-    cdkVersionRequired = '0.44.20'
+    cdkVersionRequired = '0.44.21'
features = ['db-destinations', 's3-destinations']
useLocalCdk = false
}
@@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
-  dockerImageTag: 1.0.4
+  dockerImageTag: 1.0.5
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
3 changes: 2 additions & 1 deletion docs/integrations/destinations/s3.md
@@ -535,7 +535,8 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
-| :------ | :--------- | :--------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------- |
+|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
+| 1.0.5 | 2024-09-05 | [45143](https://github.com/airbytehq/airbyte/pull/45143) | Don't overwrite (and delete) existing files; skip those file indexes instead |
| 1.0.4 | 2024-08-30 | [44933](https://github.com/airbytehq/airbyte/pull/44933) | Fix: Avro/Parquet: handle empty schemas in nested objects/lists |
| 1.0.3 | 2024-08-20 | [44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb |
| 1.0.2 | 2024-08-19 | [44401](https://github.com/airbytehq/airbyte/pull/44401) | Fix: S3 Avro/Parquet: handle nullable top-level schema |