Skip to content

Commit

Permalink
process config control messages during check and discover (#20894)
Browse files Browse the repository at this point in the history
* track latest config message

* pass new config as part of outputs

* persist new config

* persist config as the messages come through, dont set output

* clean up old implementation

* accept control messages for destinations

* get api client from micronaut

* mask instance-wide oauth params when updating configs

* defaultreplicationworker tests

* formatting

* tests for source/destination handlers

* rm todo

* refactor test a bit to fix pmd

* fix pmd

* fix test

* add PersistConfigHelperTest

* update message tracker comment

* fix pmd

* format

* move ApiClientBeanFactory to commons-worker, use in container-orchestrator

* pull out config updating to separate methods

* add jitter

* rename PersistConfigHelper -> UpdateConnectorConfigHelper, docs

* fix exception type

* fmt

* move message type check into runnable

* formatting

* pass api client env vars to container orchestrator

* pass micronaut envs to container orchestrator

* print stacktrace for debugging

* different api host for container orchestrator

* fix default env var

* format

* fix errors after merge

* set source and destination actor id as part of the sync input

* fix: get destination definition

* fix null ptr

* remove "actor" from naming

* fix missing change from rename

* revert ContainerOrchestratorConfigBeanFactory changes

* inject sourceapi/destinationapi directly rather than airbyteapiclient

* UpdateConnectorConfigHelper -> ConnectorConfigUpdater

* rm log

* fix test

* dont fail on config update error

* process control messages for discover jobs

* process control messages for CHECK

* persist config updates on check_connection_for_update

* get last config message rather than first

* fix pmd

* fix failing tests

* add tests

* source id not required for check connection (create case)

* suppress pmd warning for BusyWait literal

* source id not required for checkc onnection (create case) (p2)

* pass id, not full config to runnables/accept control message

* add new config required for api client

* add test file

* remove debugging logs

* rename method (getLast -> getMostRecent)

* rm version check (re-added this in by mistake on merge)

* fix test compatibility

* simplify
  • Loading branch information
pedroslopez authored Jan 6, 2023
1 parent 31c28f5 commit 01e256f
Show file tree
Hide file tree
Showing 19 changed files with 265 additions and 38 deletions.
4 changes: 4 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2690,6 +2690,8 @@ components:
- connectionConfiguration
- workspaceId
properties:
sourceId:
$ref: "#/components/schemas/SourceId"
sourceDefinitionId:
$ref: "#/components/schemas/SourceDefinitionId"
connectionConfiguration:
Expand Down Expand Up @@ -3000,6 +3002,8 @@ components:
- destinationDefinitionId
- connectionConfiguration
properties:
destinationId:
$ref: "#/components/schemas/DestinationId"
destinationDefinitionId:
$ref: "#/components/schemas/DestinationDefinitionId"
connectionConfiguration:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,10 @@ public TemporalResponse<ConnectorJobOutput> submitCheckConnection(final UUID job
.withDockerImage(config.getDockerImage())
.withProtocolVersion(config.getProtocolVersion())
.withIsCustomConnector(config.getIsCustomConnector());
final StandardCheckConnectionInput input = new StandardCheckConnectionInput().withConnectionConfiguration(config.getConnectionConfiguration());
final StandardCheckConnectionInput input = new StandardCheckConnectionInput()
.withActorType(config.getActorType())
.withActorId(config.getActorId())
.withConnectionConfiguration(config.getConnectionConfiguration());

return execute(jobRunConfig,
() -> getWorkflowStub(CheckConnectionWorkflow.class, TemporalJobType.CHECK_CONNECTION).run(jobRunConfig, launcherConfig, input));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.protocol.models.AirbyteControlConnectorConfigMessage;
import io.airbyte.protocol.models.AirbyteControlMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
Expand Down Expand Up @@ -109,6 +111,14 @@ public static WorkerDestinationConfig syncToWorkerDestinationConfig(final Standa
.withState(sync.getState());
}

public static Optional<AirbyteControlConnectorConfigMessage> getMostRecentConfigControlMessage(final Map<Type, List<AirbyteMessage>> messagesByType) {
return messagesByType.getOrDefault(Type.CONTROL, new ArrayList<>()).stream()
.map(AirbyteMessage::getControl)
.filter(control -> control.getType() == AirbyteControlMessage.Type.CONNECTOR_CONFIG)
.map(AirbyteControlMessage::getConnectorConfig)
.reduce((first, second) -> second);
}

public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType outputType,
final Map<Type, List<AirbyteMessage>> messagesByType,
final String defaultErrorMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteControlConnectorConfigMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.ConnectorConfigUpdater;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
import io.airbyte.workers.process.IntegrationLauncher;
Expand All @@ -43,18 +45,21 @@ public class DefaultCheckConnectionWorker implements CheckConnectionWorker {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultCheckConnectionWorker.class);

private final IntegrationLauncher integrationLauncher;
private final ConnectorConfigUpdater connectorConfigUpdater;
private final AirbyteStreamFactory streamFactory;

private Process process;

public DefaultCheckConnectionWorker(final IntegrationLauncher integrationLauncher,
final ConnectorConfigUpdater connectorConfigUpdater,
final AirbyteStreamFactory streamFactory) {
this.integrationLauncher = integrationLauncher;
this.connectorConfigUpdater = connectorConfigUpdater;
this.streamFactory = streamFactory;
}

public DefaultCheckConnectionWorker(final IntegrationLauncher integrationLauncher) {
this(integrationLauncher, new DefaultAirbyteStreamFactory());
public DefaultCheckConnectionWorker(final IntegrationLauncher integrationLauncher, final ConnectorConfigUpdater connectorConfigUpdater) {
this(integrationLauncher, connectorConfigUpdater, new DefaultAirbyteStreamFactory());
}

@Trace(operationName = WORKER_OPERATION_NAME)
Expand Down Expand Up @@ -84,6 +89,21 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa
.map(AirbyteMessage::getConnectionStatus)
.findFirst();

if (input.getActorId() != null && input.getActorType() != null) {
final Optional<AirbyteControlConnectorConfigMessage> optionalConfigMsg = WorkerUtils.getMostRecentConfigControlMessage(messagesByType);
optionalConfigMsg.ifPresent(
configMessage -> {
switch (input.getActorType()) {
case SOURCE -> connectorConfigUpdater.updateSource(
input.getActorId(),
configMessage.getConfig());
case DESTINATION -> connectorConfigUpdater.updateDestination(
input.getActorId(),
configMessage.getConfig());
}
});
}

if (status.isPresent() && exitCode == 0) {
final StandardCheckConnectionOutput output = new StandardCheckConnectionOutput()
.withStatus(Enums.convertTo(status.get().getStatus(), Status.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteControlConnectorConfigMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.ConnectorConfigUpdater;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
import io.airbyte.workers.process.IntegrationLauncher;
Expand All @@ -48,20 +50,24 @@ public class DefaultDiscoverCatalogWorker implements DiscoverCatalogWorker {

private final IntegrationLauncher integrationLauncher;
private final AirbyteStreamFactory streamFactory;
private final ConnectorConfigUpdater connectorConfigUpdater;

private volatile Process process;

public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
final IntegrationLauncher integrationLauncher,
final ConnectorConfigUpdater connectorConfigUpdater,
final AirbyteStreamFactory streamFactory) {
this.configRepository = configRepository;
this.integrationLauncher = integrationLauncher;
this.streamFactory = streamFactory;
this.connectorConfigUpdater = connectorConfigUpdater;
}

public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
final IntegrationLauncher integrationLauncher) {
this(configRepository, integrationLauncher, new DefaultAirbyteStreamFactory());
final IntegrationLauncher integrationLauncher,
final ConnectorConfigUpdater connectorConfigUpdater) {
this(configRepository, integrationLauncher, connectorConfigUpdater, new DefaultAirbyteStreamFactory());
}

@Trace(operationName = WORKER_OPERATION_NAME)
Expand Down Expand Up @@ -90,6 +96,12 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
.map(AirbyteMessage::getCatalog)
.findFirst();

final Optional<AirbyteControlConnectorConfigMessage> optionalConfigMsg = WorkerUtils.getMostRecentConfigControlMessage(messagesByType);
optionalConfigMsg.ifPresent(
configMessage -> connectorConfigUpdater.updateSource(
UUID.fromString(discoverSchemaInput.getSourceId()),
configMessage.getConfig()));

final int exitCode = process.exitValue();
if (exitCode == 0) {
if (catalog.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ required:
- connectionConfiguration
- dockerImage
properties:
actorType:
"$ref": ActorType.yaml
actorId:
description: The ID of the actor being checked, so we can persist config updates
type: string
format: uuid
connectionConfiguration:
description: Integration specific blob. Must be a valid JSON string.
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ required:
- connectionConfiguration
additionalProperties: false
properties:
actorType:
"$ref": ActorType.yaml
actorId:
description: The ID of the actor being checked, so we can persist config updates
type: string
format: uuid
connectionConfiguration:
description: Integration specific blob. Must be a valid JSON string.
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import io.airbyte.workers.general.DbtTransformationRunner;
import io.airbyte.workers.general.DefaultCheckConnectionWorker;
import io.airbyte.workers.general.DefaultGetSpecWorker;
import io.airbyte.workers.helper.ConnectorConfigUpdater;
import io.airbyte.workers.helper.EntrypointEnvChecker;
import io.airbyte.workers.internal.AirbyteDestination;
import io.airbyte.workers.internal.DefaultAirbyteDestination;
Expand Down Expand Up @@ -94,6 +95,7 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -114,6 +116,7 @@ public abstract class DestinationAcceptanceTest {
private Path jobRoot;
private ProcessFactory processFactory;
private WorkerConfigs workerConfigs;
private ConnectorConfigUpdater mConnectorConfigUpdater;

protected Path localRoot;
protected TestDataComparator testDataComparator = getTestDataComparator();
Expand All @@ -131,11 +134,11 @@ private String getImageNameWithoutTag() {

private Optional<StandardDestinationDefinition> getOptionalDestinationDefinitionFromProvider(final String imageNameWithoutTag) {
try {
LocalDefinitionsProvider provider = new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS);
final LocalDefinitionsProvider provider = new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS);
return provider.getDestinationDefinitions().stream()
.filter(definition -> imageNameWithoutTag.equalsIgnoreCase(definition.getDockerRepository()))
.findFirst();
} catch (IOException e) {
} catch (final IOException e) {
return Optional.empty();
}
}
Expand Down Expand Up @@ -362,6 +365,7 @@ void setUpInternal() throws Exception {
LOGGER.info("localRoot: {}", localRoot);
testEnv = new TestDestinationEnv(localRoot);
workerConfigs = new WorkerConfigs(new EnvConfigs());
mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater.class);

setup(testEnv);

Expand Down Expand Up @@ -1210,7 +1214,8 @@ private ConnectorSpecification runSpec() throws WorkerException {

protected StandardCheckConnectionOutput runCheck(final JsonNode config) throws WorkerException {
return new DefaultCheckConnectionWorker(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false))
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false),
mConnectorConfigUpdater)
.run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot)
.getCheckConnection();
}
Expand All @@ -1219,7 +1224,8 @@ protected StandardCheckConnectionOutput.Status runCheckWithCatchedException(
final JsonNode config) {
try {
final StandardCheckConnectionOutput standardCheckConnectionOutput = new DefaultCheckConnectionWorker(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false))
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false),
mConnectorConfigUpdater)
.run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot)
.getCheckConnection();
return standardCheckConnectionOutput.getStatus();
Expand Down Expand Up @@ -1648,7 +1654,7 @@ public void testDataTypeTestWithNormalization(final String messagesFilename,
@Test
public void testSyncNumberNanDataType() throws Exception {
// NaN/Infinity protocol supports started from V1 version or higher
SpecialNumericTypes numericTypesSupport = getSpecialNumericTypesSupportTest();
final SpecialNumericTypes numericTypesSupport = getSpecialNumericTypesSupportTest();
if (getProtocolVersion().equals(ProtocolVersion.V0) || !numericTypesSupport.isSupportNumberNan()) {
return;
}
Expand All @@ -1664,7 +1670,7 @@ public void testSyncNumberNanDataType() throws Exception {
@Test
public void testSyncIntegerNanDataType() throws Exception {
// NaN/Infinity protocol supports started from V1 version or higher
SpecialNumericTypes numericTypesSupport = getSpecialNumericTypesSupportTest();
final SpecialNumericTypes numericTypesSupport = getSpecialNumericTypesSupportTest();
if (getProtocolVersion().equals(ProtocolVersion.V0) || !numericTypesSupport.isSupportIntegerNan()) {
return;
}
Expand All @@ -1680,7 +1686,7 @@ public void testSyncIntegerNanDataType() throws Exception {
@Test
public void testSyncNumberInfinityDataType() throws Exception {
// NaN/Infinity protocol supports started from V1 version or higher
SpecialNumericTypes numericTypesSupport = getSpecialNumericTypesSupportTest();
final SpecialNumericTypes numericTypesSupport = getSpecialNumericTypesSupportTest();
if (getProtocolVersion().equals(ProtocolVersion.V0) || !numericTypesSupport.isSupportNumberInfinity()) {
return;
}
Expand All @@ -1696,7 +1702,7 @@ public void testSyncNumberInfinityDataType() throws Exception {
@Test
public void testSyncIntegerInfinityDataType() throws Exception {
// NaN/Infinity protocol supports started from V1 version or higher
SpecialNumericTypes numericTypesSupport = getSpecialNumericTypesSupportTest();
final SpecialNumericTypes numericTypesSupport = getSpecialNumericTypesSupportTest();
if (getProtocolVersion().equals(ProtocolVersion.V0) || !numericTypesSupport.isSupportIntegerInfinity()) {
return;
}
Expand All @@ -1709,7 +1715,8 @@ public void testSyncIntegerInfinityDataType() throws Exception {
runAndCheck(catalog, configuredCatalog, messages);
}

private void runAndCheck(AirbyteCatalog catalog, ConfiguredAirbyteCatalog configuredCatalog, List<AirbyteMessage> messages) throws Exception {
private void runAndCheck(final AirbyteCatalog catalog, final ConfiguredAirbyteCatalog configuredCatalog, final List<AirbyteMessage> messages)
throws Exception {
if (supportsNormalization()) {
LOGGER.info("Normalization is supported! Run test with normalization.");
runAndCheckWithNormalization(messages, configuredCatalog, catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.airbyte.workers.general.DefaultCheckConnectionWorker;
import io.airbyte.workers.general.DefaultDiscoverCatalogWorker;
import io.airbyte.workers.general.DefaultGetSpecWorker;
import io.airbyte.workers.helper.ConnectorConfigUpdater;
import io.airbyte.workers.helper.EntrypointEnvChecker;
import io.airbyte.workers.internal.AirbyteSource;
import io.airbyte.workers.internal.DefaultAirbyteSource;
Expand Down Expand Up @@ -111,6 +112,7 @@ public abstract class AbstractSourceConnectorTest {
private WorkerConfigs workerConfigs;

private ConfigRepository mConfigRepository;
private ConnectorConfigUpdater mConnectorConfigUpdater;

// This has to be using the protocol version of the platform in order to capture the arg
private final ArgumentCaptor<io.airbyte.protocol.models.AirbyteCatalog> lastPersistedCatalog =
Expand All @@ -131,6 +133,7 @@ public void setUpInternal() throws Exception {
setupEnvironment(environment);
workerConfigs = new WorkerConfigs(new EnvConfigs());
mConfigRepository = mock(ConfigRepository.class);
mConnectorConfigUpdater = mock(ConnectorConfigUpdater.class);
processFactory = new DockerProcessFactory(
workerConfigs,
workspaceRoot,
Expand Down Expand Up @@ -161,20 +164,23 @@ protected ConnectorSpecification runSpec() throws WorkerException {

protected StandardCheckConnectionOutput runCheck() throws Exception {
return new DefaultCheckConnectionWorker(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false))
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false),
mConnectorConfigUpdater)
.run(new StandardCheckConnectionInput().withConnectionConfiguration(getConfig()), jobRoot).getCheckConnection();
}

protected String runCheckAndGetStatusAsString(final JsonNode config) throws Exception {
return new DefaultCheckConnectionWorker(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false))
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false),
mConnectorConfigUpdater)
.run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot).getCheckConnection().getStatus().toString();
}

protected UUID runDiscover() throws Exception {
final UUID toReturn = new DefaultDiscoverCatalogWorker(
mConfigRepository,
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false))
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false),
mConnectorConfigUpdater)
.run(new StandardDiscoverCatalogInput().withSourceId(SOURCE_ID.toString()).withConnectionConfiguration(getConfig()), jobRoot)
.getDiscoverCatalogId();
verify(mConfigRepository).writeActorCatalogFetchEvent(lastPersistedCatalog.capture(), any(), any(), any());
Expand Down
Loading

0 comments on commit 01e256f

Please sign in to comment.