Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Default Replication Worker Performance Test Harness #20956

Merged
merged 19 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airbyte-commons-worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ dependencies {

testAnnotationProcessor platform(libs.micronaut.bom)
testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor
testAnnotationProcessor libs.jmh.annotations

testImplementation libs.bundles.micronaut.test
testImplementation 'com.jayway.jsonpath:json-path:2.7.0'
testImplementation 'org.mockito:mockito-inline:4.7.0'
testImplementation libs.postgresql
testImplementation libs.platform.testcontainers
testImplementation libs.platform.testcontainers.postgresql
testImplementation libs.jmh.core
testImplementation libs.jmh.annotations

testImplementation project(':airbyte-commons-docker')
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,14 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Replication thread started.");
Long recordsRead = 0L;
long recordsRead = 0L;
final Map<AirbyteStreamNameNamespacePair, ImmutablePair<Set<String>, Integer>> validationErrors = new HashMap<>();
final Map<AirbyteStreamNameNamespacePair, List<String>> streamToSelectedFields = new HashMap<>();
if (fieldSelectionEnabled) {
populatedStreamToSelectedFields(catalog, streamToSelectedFields);
}
try {
// can this while be handled by a virtual thread too?
while (!cancelled.get() && !source.isFinished()) {
final Optional<AirbyteMessage> messageOptional;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,8 @@ void testCancellation() throws InterruptedException {
sleep(100);
}

LOGGER.info("total records emitted: {}, total bytes emitted: {}", messageTracker.getTotalRecordsEmitted(), messageTracker.getTotalBytesEmitted());

worker.cancel();
Assertions.assertTimeout(Duration.ofSeconds(5), (Executable) workerThread::join);
assertNotNull(output.get());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.general;

import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.internal.AirbyteDestination;
import java.nio.file.Path;
import java.util.Optional;

/**
* Empty Airbyte Destination. Does nothing with messages. Intended for performance testing.
*/
public class EmptyAirbyteDestination implements AirbyteDestination {

@Override
public void start(WorkerDestinationConfig destinationConfig, Path jobRoot) throws Exception {

}

@Override
public void accept(AirbyteMessage message) throws Exception {

}

@Override
public void notifyEndOfInput() throws Exception {

}

@Override
public boolean isFinished() {
return true;
}

@Override
public int getExitValue() {
return 0;
}

@Override
public Optional<AirbyteMessage> attemptRead() {
return Optional.empty();
}

@Override
public void close() throws Exception {}

@Override
public void cancel() throws Exception {

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.general;

import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.internal.AirbyteSource;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import java.nio.file.Path;
import java.util.Optional;

/**
* Basic Airbyte Source that emits {@link LimitedAirbyteSource#TOTAL_RECORDS} before finishing.
* Intended for performance testing.
*/
public class LimitedAirbyteSource implements AirbyteSource {

private static final int TOTAL_RECORDS = 1_000_000;

private int currentRecords = 0;

@Override
public void start(WorkerSourceConfig sourceConfig, Path jobRoot) throws Exception {

}

@Override
public boolean isFinished() {
return currentRecords == TOTAL_RECORDS;
}

@Override
public int getExitValue() {
return 0;
}

@Override
public Optional<AirbyteMessage> attemptRead() {
currentRecords++;
return Optional.of(AirbyteMessageUtils.createRecordMessage("s1", "data",
"This is a fairly long sentence to provide some bytes here. More bytes is better as it helps us measure performance."
+ "Random append to prevent dead code generation: " + currentRecords));
}

@Override
public void close() throws Exception {

}

@Override
public void cancel() throws Exception {

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.general;

import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.ReplicationOutput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.metrics.lib.NotImplementedMetricClient;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.workers.RecordSchemaValidator;
import io.airbyte.workers.WorkerMetricReporter;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.internal.NamespacingMapper;
import io.airbyte.workers.internal.book_keeping.AirbyteMessageTracker;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Warmup;

@Slf4j
public class ReplicationWorkerPerformanceTest {

/**
* Hook up the DefaultReplicationWorker to a test harness with an insanely quick Source
* {@link LimitedAirbyteSource} and Destination {@link EmptyAirbyteDestination}.
* <p>
* Harness uses Java Micro Benchmark to run the E2E sync a configured number of times. It then
* reports a time distribution for the time taken to run the E2E sync.
* <p>
* Because the reported time does not explicitly include throughput numbers, throughput logging has
* been added. This class is intended to help devs understand the impact of changes on throughput.
* <p>
* To run this, simply run the main method and run the logs.
*/
@Benchmark
// SampleTime = the time taken to run the benchmarked method. Use this because we only care about
// the time taken to sync the entire dataset.
@BenchmarkMode(Mode.SampleTime)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@colesnodgrass unfortunately because these annotations don't have the target annotation type, we cannot fold them into a meta annotation. I think this is fine for now.

// Warming up the JVM stabilises results however takes longer. Skip this for now since we don't need
// that fine a result.
@Warmup(iterations = 0)
// How many runs to do.
@Fork(value = 1)
// Within each run, how many iterations to do.
@Measurement(iterations = 2)
public void executeOneSync() throws InterruptedException {
final var perSource = new LimitedAirbyteSource();
final var perDestination = new EmptyAirbyteDestination();
final var messageTracker = new AirbyteMessageTracker();
final var metricReporter = new WorkerMetricReporter(new NotImplementedMetricClient(), "test-image:0.01");
final var dstNamespaceMapper = new NamespacingMapper(NamespaceDefinitionType.DESTINATION, "", "");
final var validator = new RecordSchemaValidator(Map.of(
new AirbyteStreamNameNamespacePair("s1", null),
CatalogHelpers.fieldsToJsonSchema(io.airbyte.protocol.models.Field.of("data", JsonSchemaType.STRING))));

final var worker = new DefaultReplicationWorker("1", 0,
perSource,
dstNamespaceMapper,
perDestination,
messageTracker,
validator,
metricReporter,
false);
final AtomicReference<ReplicationOutput> output = new AtomicReference<>();
final Thread workerThread = new Thread(() -> {
try {
output.set(worker.run(new StandardSyncInput().withCatalog(new ConfiguredAirbyteCatalog()
.withStreams(List.of(new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream(new AirbyteStream().withName("s1"))))),
Path.of("/")));
} catch (final WorkerException e) {
throw new RuntimeException(e);
}
});

workerThread.start();
workerThread.join();
Comment on lines +92 to +93
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this test in the same process, but using different threads? If that's the case, then this also bypasses the stdio streams as well, which also present their own bottlenecks (cc @colesnodgrass)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Let me see if I can quickly work something up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does show that the core serialising isn't the bottleneck like we thought.

final var summary = output.get().getReplicationAttemptSummary();
final var mbRead = summary.getBytesSynced() / 1_000_000;
final var timeTakenSec = (summary.getEndTime() - summary.getStartTime()) / 1000.0;
log.info("MBs read: {}, Time taken sec: {}, MB/s: {}", mbRead, timeTakenSec, mbRead / timeTakenSec);
}

public static void main(String[] args) throws IOException {
// Run this main class to start benchmarking.
org.openjdk.jmh.Main.main(args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.general;

import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.internal.AirbyteMapper;

/**
* Stub mapper testing what happens without any mapping.
*/
public class StubAirbyteMapper implements AirbyteMapper {
davinchia marked this conversation as resolved.
Show resolved Hide resolved

@Override
public ConfiguredAirbyteCatalog mapCatalog(ConfiguredAirbyteCatalog catalog) {
return null;
}

@Override
public AirbyteMessage mapMessage(AirbyteMessage message) {
return message;
}

}
3 changes: 3 additions & 0 deletions deps.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ fasterxml_version = "2.14.0"
flyway = "7.14.0"
glassfish_version = "2.31"
hikaricp = "5.0.1"
jmh = "1.36"
jooq = "3.13.4"
junit-jupiter = "5.9.1"
log4j = "2.17.2"
Expand Down Expand Up @@ -68,6 +69,8 @@ jackson-datatype = { module = "com.fasterxml.jackson.datatype:jackson-datatype-j
java-dogstatsd-client = { module = "com.datadoghq:java-dogstatsd-client", version = "4.1.0" }
javax-databind = { module = "javax.xml.bind:jaxb-api", version = "2.4.0-b180830.0359" }
jcl-over-slf4j = { module = "org.slf4j:jcl-over-slf4j", version.ref = "slf4j" }
jmh-core = { module = "org.openjdk.jmh:jmh-core", version.ref = "jmh" }
jmh-annotations = { module = "org.openjdk.jmh:jmh-generator-annprocess", version.ref = "jmh" }
jooq = { module = "org.jooq:jooq", version.ref = "jooq" }
jooq-codegen = { module = "org.jooq:jooq-codegen", version.ref = "jooq" }
jooq-meta = { module = "org.jooq:jooq-meta", version.ref = "jooq" }
Expand Down