Skip to content

Commit

Permalink
intermediate merge (#3597)
Browse files Browse the repository at this point in the history
* test checkpointing

* code review feedback

* migrate copy destinations

* meilisearch

* Checkpointing: migrate remaining destinations (#3551)
  • Loading branch information
cgardens committed May 25, 2021
1 parent 4f912f3 commit bdb60ab
Show file tree
Hide file tree
Showing 13 changed files with 357 additions and 263 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.base;

import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Minimal abstract class intended to handle the case where the destination can commit records every
* time a state message appears. This class does that commit and then immediately emits the state
* message. This should only be used in cases when the commit is relatively cheap. immediately.
*/
public abstract class CommitOnStateAirbyteMessageConsumer extends FailureTrackingAirbyteMessageConsumer implements AirbyteMessageConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(CommitOnStateAirbyteMessageConsumer.class);

private final Consumer<AirbyteMessage> outputRecordCollector;

public CommitOnStateAirbyteMessageConsumer(Consumer<AirbyteMessage> outputRecordCollector) {
this.outputRecordCollector = outputRecordCollector;
}

@Override
public void accept(AirbyteMessage message) throws Exception {
if (message.getType() == Type.STATE) {
commit();
outputRecordCollector.accept(message);
}
super.accept(message);
}

public abstract void commit() throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsumer implements AirbyteMessageConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(BufferedStreamConsumer.class);
private static final int BATCH_SIZE = 10000;

private final VoidCallable onStart;
private final RecordWriter recordWriter;
Expand All @@ -81,6 +80,7 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume
private final CheckedFunction<String, Boolean, Exception> isValidRecord;
private final Map<AirbyteStreamNameNamespacePair, Long> pairToIgnoredRecordCount;
private final Consumer<AirbyteMessage> outputRecordCollector;
private final int queueBatchSize;

private boolean hasStarted;
private boolean hasClosed;
Expand All @@ -93,8 +93,10 @@ public BufferedStreamConsumer(Consumer<AirbyteMessage> outputRecordCollector,
RecordWriter recordWriter,
CheckedConsumer<Boolean, Exception> onClose,
ConfiguredAirbyteCatalog catalog,
CheckedFunction<String, Boolean, Exception> isValidRecord) {
CheckedFunction<String, Boolean, Exception> isValidRecord,
int queueBatchSize) {
this.outputRecordCollector = outputRecordCollector;
this.queueBatchSize = queueBatchSize;
this.hasStarted = false;
this.hasClosed = false;
this.onStart = onStart;
Expand All @@ -103,7 +105,7 @@ public BufferedStreamConsumer(Consumer<AirbyteMessage> outputRecordCollector,
this.catalog = catalog;
this.streamNames = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog);
this.isValidRecord = isValidRecord;
this.buffer = new ArrayList<>(BATCH_SIZE);
this.buffer = new ArrayList<>(queueBatchSize);

this.pairToIgnoredRecordCount = new HashMap<>();
}
Expand Down Expand Up @@ -140,7 +142,7 @@ protected void acceptTracked(AirbyteMessage message) throws Exception {

buffer.add(message);

if (buffer.size() == BATCH_SIZE) {
if (buffer.size() == queueBatchSize) {
flushQueueToDestination();
}
} else if (message.getType() == Type.STATE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.CommitOnStateAirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.StandardNameTransformer;
Expand Down Expand Up @@ -125,7 +125,7 @@ public AirbyteMessageConsumer getConsumer(JsonNode config,
writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath));
}

return new CsvConsumer(writeConfigs, catalog);
return new CsvConsumer(writeConfigs, catalog, outputRecordCollector);
}

/**
Expand Down Expand Up @@ -153,12 +153,13 @@ protected Path getDestinationPath(JsonNode config) {
* successfully, it moves the tmp files to files named by their respective stream. If there are any
* failures, nothing is written.
*/
private static class CsvConsumer extends FailureTrackingAirbyteMessageConsumer {
private static class CsvConsumer extends CommitOnStateAirbyteMessageConsumer {

private final Map<String, WriteConfig> writeConfigs;
private final ConfiguredAirbyteCatalog catalog;

public CsvConsumer(Map<String, WriteConfig> writeConfigs, ConfiguredAirbyteCatalog catalog) {
public CsvConsumer(Map<String, WriteConfig> writeConfigs, ConfiguredAirbyteCatalog catalog, Consumer<AirbyteMessage> outputRecordCollector) {
super(outputRecordCollector);
this.catalog = catalog;
LOGGER.info("initializing consumer.");

Expand Down Expand Up @@ -190,6 +191,13 @@ protected void acceptTracked(AirbyteMessage message) throws Exception {
Jsons.serialize(recordMessage.getData()));
}

@Override
public void commit() throws Exception {
for (WriteConfig writeConfig : writeConfigs.values()) {
writeConfig.getWriter().flush();
}
}

@Override
protected void close(boolean hasFailed) throws IOException {
LOGGER.info("finalizing consumer.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class JdbcBufferedConsumerFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBufferedConsumerFactory.class);

private static final int MAX_BATCH_SIZE = 10000;

public static AirbyteMessageConsumer create(Consumer<AirbyteMessage> outputRecordCollector,
JdbcDatabase database,
SqlOperations sqlOperations,
Expand All @@ -78,7 +80,8 @@ public static AirbyteMessageConsumer create(Consumer<AirbyteMessage> outputRecor
recordWriterFunction(database, sqlOperations, writeConfigs, catalog),
onCloseFunction(database, sqlOperations, writeConfigs),
catalog,
sqlOperations::isValidData);
sqlOperations::isValidData,
MAX_BATCH_SIZE);
}

private static List<WriteConfig> createWriteConfigs(NamingConventionTransformer namingResolver,
Expand Down

This file was deleted.

Loading

0 comments on commit bdb60ab

Please sign in to comment.