-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(Platform): update actor configuration when receiving control messages from connectors during sync #19811
Changes from 52 commits
736ac57
715c170
2df5039
59122d7
7f5a4e7
2637e62
289672d
9c2bded
1ebed55
6ec3db5
1bad8e9
753dafe
ab8e2fc
de0d638
d159366
0f1c122
519b920
4ef8b91
24c2968
dda5861
c30e8f6
51f1976
8203432
bff1d44
9563ddc
44be06d
5227c73
f2ec8dd
329f641
b94a8f8
6fbbfbe
b486f6d
2710f27
adabd35
27ebbfb
2fc87c6
d88f12f
ef656dc
a8a0496
8a67e13
8d3deab
0f81248
8b64f4a
e4e3584
875228e
4e6ff2e
01e3aee
6ecac44
c20f61e
b3a443c
079c25d
e7f4736
d6eb05f
860a478
87c8fe2
beb9f70
e1261d8
ad8d6b0
6e42699
6554039
7a86e6e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
import com.auth0.jwt.algorithms.Algorithm; | ||
import com.google.auth.oauth2.ServiceAccountCredentials; | ||
import io.airbyte.api.client.AirbyteApiClient; | ||
import io.airbyte.api.client.generated.DestinationApi; | ||
import io.airbyte.api.client.generated.SourceApi; | ||
import io.airbyte.api.client.invoker.generated.ApiClient; | ||
import io.airbyte.commons.temporal.config.WorkerMode; | ||
|
@@ -58,7 +59,7 @@ public ApiClient apiClient(@Value("${airbyte.internal.api.auth-header.name}") fi | |
} | ||
|
||
@Singleton | ||
public AirbyteApiClient airbyteApiClient(ApiClient apiClient) { | ||
public AirbyteApiClient airbyteApiClient(final ApiClient apiClient) { | ||
return new AirbyteApiClient(apiClient); | ||
} | ||
|
||
|
@@ -67,6 +68,11 @@ public SourceApi sourceApi(final ApiClient apiClient) { | |
return new SourceApi(apiClient); | ||
} | ||
|
||
@Singleton | ||
public DestinationApi destinationApi(final ApiClient apiClient) { | ||
return new DestinationApi(apiClient); | ||
} | ||
Comment on lines
+73
to
+76
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added so we have the singleton available for injection directly - followed what was already being done for |
||
|
||
@Singleton | ||
public HttpClient httpClient() { | ||
return HttpClient.newHttpClient(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ | |
import io.airbyte.config.WorkerDestinationConfig; | ||
import io.airbyte.config.WorkerSourceConfig; | ||
import io.airbyte.metrics.lib.ApmTraceUtils; | ||
import io.airbyte.protocol.models.AirbyteControlMessage; | ||
import io.airbyte.protocol.models.AirbyteMessage; | ||
import io.airbyte.protocol.models.AirbyteMessage.Type; | ||
import io.airbyte.protocol.models.AirbyteRecordMessage; | ||
|
@@ -36,6 +37,7 @@ | |
import io.airbyte.workers.WorkerUtils; | ||
import io.airbyte.workers.exception.RecordSchemaValidationException; | ||
import io.airbyte.workers.exception.WorkerException; | ||
import io.airbyte.workers.helper.ConnectorConfigUpdater; | ||
import io.airbyte.workers.helper.FailureHelper; | ||
import io.airbyte.workers.helper.ThreadedTimeTracker; | ||
import io.airbyte.workers.internal.AirbyteDestination; | ||
|
@@ -98,6 +100,7 @@ public class DefaultReplicationWorker implements ReplicationWorker { | |
private final AtomicBoolean hasFailed; | ||
private final RecordSchemaValidator recordSchemaValidator; | ||
private final WorkerMetricReporter metricReporter; | ||
private final ConnectorConfigUpdater connectorConfigUpdater; | ||
private final boolean fieldSelectionEnabled; | ||
|
||
public DefaultReplicationWorker(final String jobId, | ||
|
@@ -108,6 +111,7 @@ public DefaultReplicationWorker(final String jobId, | |
final MessageTracker messageTracker, | ||
final RecordSchemaValidator recordSchemaValidator, | ||
final WorkerMetricReporter metricReporter, | ||
final ConnectorConfigUpdater connectorConfigUpdater, | ||
final boolean fieldSelectionEnabled) { | ||
this.jobId = jobId; | ||
this.attempt = attempt; | ||
|
@@ -118,6 +122,7 @@ public DefaultReplicationWorker(final String jobId, | |
this.executors = Executors.newFixedThreadPool(2); | ||
this.recordSchemaValidator = recordSchemaValidator; | ||
this.metricReporter = metricReporter; | ||
this.connectorConfigUpdater = connectorConfigUpdater; | ||
this.fieldSelectionEnabled = fieldSelectionEnabled; | ||
|
||
this.cancelled = new AtomicBoolean(false); | ||
|
@@ -191,7 +196,7 @@ private void replicate(final Path jobRoot, | |
// note: `whenComplete` is used instead of `exceptionally` so that the original exception is still | ||
// thrown | ||
final CompletableFuture<?> readFromDstThread = CompletableFuture.runAsync( | ||
readFromDstRunnable(destination, cancelled, messageTracker, mdc, timeTracker), | ||
readFromDstRunnable(destination, cancelled, messageTracker, connectorConfigUpdater, mdc, timeTracker, destinationConfig), | ||
executors) | ||
.whenComplete((msg, ex) -> { | ||
if (ex != null) { | ||
|
@@ -212,10 +217,12 @@ private void replicate(final Path jobRoot, | |
cancelled, | ||
mapper, | ||
messageTracker, | ||
connectorConfigUpdater, | ||
mdc, | ||
recordSchemaValidator, | ||
metricReporter, | ||
timeTracker, | ||
sourceConfig, | ||
fieldSelectionEnabled), | ||
executors) | ||
.whenComplete((msg, ex) -> { | ||
|
@@ -253,8 +260,10 @@ private void replicate(final Path jobRoot, | |
private static Runnable readFromDstRunnable(final AirbyteDestination destination, | ||
final AtomicBoolean cancelled, | ||
final MessageTracker messageTracker, | ||
final ConnectorConfigUpdater connectorConfigUpdater, | ||
final Map<String, String> mdc, | ||
final ThreadedTimeTracker timeHolder) { | ||
final ThreadedTimeTracker timeHolder, | ||
final WorkerDestinationConfig destinationConfig) { | ||
return () -> { | ||
MDC.setContextMap(mdc); | ||
LOGGER.info("Destination output thread started."); | ||
|
@@ -267,8 +276,18 @@ private static Runnable readFromDstRunnable(final AirbyteDestination destination | |
throw new DestinationException("Destination process read attempt failed", e); | ||
} | ||
if (messageOptional.isPresent()) { | ||
LOGGER.info("State in DefaultReplicationWorker from destination: {}", messageOptional.get()); | ||
messageTracker.acceptFromDestination(messageOptional.get()); | ||
final AirbyteMessage message = messageOptional.get(); | ||
LOGGER.info("State in DefaultReplicationWorker from destination: {}", message); | ||
|
||
messageTracker.acceptFromDestination(message); | ||
|
||
try { | ||
if (message.getType() == Type.CONTROL) { | ||
acceptDstControlMessage(destinationConfig, message.getControl(), connectorConfigUpdater); | ||
} | ||
} catch (final Exception e) { | ||
LOGGER.error("Error updating destination configuration", e); | ||
} | ||
} | ||
} | ||
timeHolder.trackDestinationWriteEndTime(); | ||
|
@@ -300,10 +319,12 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou | |
final AtomicBoolean cancelled, | ||
final AirbyteMapper mapper, | ||
final MessageTracker messageTracker, | ||
final ConnectorConfigUpdater connectorConfigUpdater, | ||
final Map<String, String> mdc, | ||
final RecordSchemaValidator recordSchemaValidator, | ||
final WorkerMetricReporter metricReporter, | ||
final ThreadedTimeTracker timeHolder, | ||
final WorkerSourceConfig sourceConfig, | ||
final boolean fieldSelectionEnabled) { | ||
return () -> { | ||
MDC.setContextMap(mdc); | ||
|
@@ -333,6 +354,14 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou | |
|
||
messageTracker.acceptFromSource(message); | ||
|
||
try { | ||
if (message.getType() == Type.CONTROL) { | ||
acceptSrcControlMessage(sourceConfig, message.getControl(), connectorConfigUpdater); | ||
} | ||
} catch (final Exception e) { | ||
LOGGER.error("Error updating source configuration", e); | ||
} | ||
|
||
try { | ||
if (message.getType() == Type.RECORD || message.getType() == Type.STATE) { | ||
destination.accept(message); | ||
|
@@ -391,6 +420,22 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou | |
}; | ||
} | ||
|
||
private static void acceptSrcControlMessage(final WorkerSourceConfig sourceConfig, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can simplify this method signature by passing in the id instead of the entire config There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this simplification also extends to the various runnable method interfaces e.g There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍🏻 updated in d6eb05f |
||
final AirbyteControlMessage controlMessage, | ||
final ConnectorConfigUpdater connectorConfigUpdater) { | ||
if (controlMessage.getType() == AirbyteControlMessage.Type.CONNECTOR_CONFIG) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are there other kinds of control messages today? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope, this is the only one for now! |
||
connectorConfigUpdater.updateSource(sourceConfig.getSourceId(), controlMessage.getConnectorConfig().getConfig()); | ||
} | ||
} | ||
|
||
private static void acceptDstControlMessage(final WorkerDestinationConfig destinationConfig, | ||
final AirbyteControlMessage controlMessage, | ||
final ConnectorConfigUpdater connectorConfigUpdater) { | ||
if (controlMessage.getType() == AirbyteControlMessage.Type.CONNECTOR_CONFIG) { | ||
connectorConfigUpdater.updateDestination(destinationConfig.getDestinationId(), controlMessage.getConnectorConfig().getConfig()); | ||
} | ||
} | ||
|
||
private ReplicationOutput getReplicationOutput(final StandardSyncInput syncInput, | ||
final WorkerDestinationConfig destinationConfig, | ||
final AtomicReference<FailureReason> replicationRunnableFailureRef, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.helper; | ||
|
||
import com.google.common.hash.Hashing; | ||
import io.airbyte.api.client.AirbyteApiClient; | ||
import io.airbyte.api.client.generated.DestinationApi; | ||
import io.airbyte.api.client.generated.SourceApi; | ||
import io.airbyte.api.client.model.generated.DestinationIdRequestBody; | ||
import io.airbyte.api.client.model.generated.DestinationRead; | ||
import io.airbyte.api.client.model.generated.DestinationUpdate; | ||
import io.airbyte.api.client.model.generated.SourceIdRequestBody; | ||
import io.airbyte.api.client.model.generated.SourceRead; | ||
import io.airbyte.api.client.model.generated.SourceUpdate; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.protocol.models.Config; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.UUID; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 for the javadocs! |
||
* Helper class for workers to persist updates to Source/Destination configs emitted from | ||
* AirbyteControlMessages. | ||
* | ||
* This is in order to support connectors updating configs when running commands, which is specially | ||
* useful for migrating configuration to a new version or for enabling connectors that require | ||
* single-use or short-lived OAuth tokens. | ||
*/ | ||
public class ConnectorConfigUpdater { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectorConfigUpdater.class); | ||
|
||
private final SourceApi sourceApi; | ||
private final DestinationApi destinationApi; | ||
|
||
public ConnectorConfigUpdater(final SourceApi sourceApi, final DestinationApi destinationApi) { | ||
this.sourceApi = sourceApi; | ||
this.destinationApi = destinationApi; | ||
} | ||
|
||
/** | ||
* Updates the Source from a sync job ID with the provided Configuration. Secrets and OAuth | ||
* parameters will be masked when saving. | ||
*/ | ||
public void updateSource(final UUID sourceId, final Config config) { | ||
final SourceRead source = AirbyteApiClient.retryWithJitter( | ||
() -> sourceApi.getSource(new SourceIdRequestBody().sourceId(sourceId)), | ||
"get source"); | ||
|
||
final SourceRead updatedSource = AirbyteApiClient.retryWithJitter( | ||
() -> sourceApi | ||
.updateSource(new SourceUpdate() | ||
.sourceId(sourceId) | ||
.name(source.getName()) | ||
.connectionConfiguration(Jsons.jsonNode(config.getAdditionalProperties()))), | ||
"update source"); | ||
|
||
LOGGER.info("Persisted updated configuration for source {}. New config hash: {}.", sourceId, | ||
Hashing.sha256().hashString(updatedSource.getConnectionConfiguration().asText(), StandardCharsets.UTF_8)); | ||
|
||
} | ||
|
||
/** | ||
* Updates the Destination from a sync job ID with the provided Configuration. Secrets and OAuth | ||
* parameters will be masked when saving. | ||
*/ | ||
public void updateDestination(final UUID destinationId, final Config config) { | ||
final DestinationRead destination = AirbyteApiClient.retryWithJitter( | ||
() -> destinationApi.getDestination(new DestinationIdRequestBody().destinationId(destinationId)), | ||
"get destination"); | ||
|
||
final DestinationRead updatedDestination = AirbyteApiClient.retryWithJitter( | ||
() -> destinationApi | ||
.updateDestination(new DestinationUpdate() | ||
.destinationId(destinationId) | ||
.name(destination.getName()) | ||
.connectionConfiguration(Jsons.jsonNode(config.getAdditionalProperties()))), | ||
"update destination"); | ||
|
||
LOGGER.info("Persisted updated configuration for destination {}. New config hash: {}.", destinationId, | ||
Hashing.sha256().hashString(updatedDestination.getConnectionConfiguration().asText(), StandardCharsets.UTF_8)); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
import java.io.OutputStreamWriter; | ||
import java.nio.file.Path; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
@@ -81,8 +82,9 @@ public void start(final WorkerDestinationConfig destinationConfig, final Path jo | |
|
||
writer = messageWriterFactory.createWriter(new BufferedWriter(new OutputStreamWriter(destinationProcess.getOutputStream(), Charsets.UTF_8))); | ||
|
||
final List<Type> acceptedMessageTypes = List.of(Type.STATE, Type.TRACE, Type.CONTROL); | ||
messageIterator = streamFactory.create(IOs.newBufferedReader(destinationProcess.getInputStream())) | ||
.filter(message -> message.getType() == Type.STATE || message.getType() == Type.TRACE) | ||
.filter(message -> acceptedMessageTypes.contains(message.getType())) | ||
.iterator(); | ||
Comment on lines
+90
to
93
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Enable reading control messages from destinations |
||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
import java.time.Duration; | ||
import java.time.temporal.ChronoUnit; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.concurrent.TimeUnit; | ||
import org.slf4j.Logger; | ||
|
@@ -86,9 +87,10 @@ public void start(final WorkerSourceConfig sourceConfig, final Path jobRoot) thr | |
|
||
logInitialStateAsJSON(sourceConfig); | ||
|
||
final List<Type> acceptedMessageTypes = List.of(Type.RECORD, Type.STATE, Type.TRACE, Type.CONTROL); | ||
messageIterator = streamFactory.create(IOs.newBufferedReader(sourceProcess.getInputStream())) | ||
.peek(message -> heartbeatMonitor.beat()) | ||
.filter(message -> message.getType() == Type.RECORD || message.getType() == Type.STATE || message.getType() == Type.TRACE) | ||
.filter(message -> acceptedMessageTypes.contains(message.getType())) | ||
Comment on lines
+95
to
+98
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Enable reading control messages from sources |
||
.iterator(); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curious, what is this for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was necessary in order to move the ApiClientBeanFactory into commons-worker (to reuse it in both airbyte-workers and container-orchestrator). It does some token generation that needs this dependency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might be an issue, we split up
airbyte-commons-worker
recently due to some initialization issues of the beans when we added cron.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gosusnp Could you expand a bit on what the issue might be? My change moved the
AirbyteAPiClientBeanFactory
fromairbyte-workers
intoairbyte-commons-worker
. I don't seeairbyte-workers
norairbyte-commons-worker
being a dependency ofairbyte-cron
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was looking for an issue to link for better context but I cannot find it...
I think the problem we had is that micronaut discovers all singleton upon starts, so by having some services in a shared lib led us to start more services than intended. For example, the cron would pick up some beans from the workers and would effectively behave as an extra worker.
@jdpgrailsdev or @benmoriceau might have better context since they were more closely working on this.