Skip to content

Commit

Permalink
Migrate secret from a non secret store to a secret store (#12088)
Browse files Browse the repository at this point in the history
Introduce a migration to a secret manager

If a secret manager is specify, it will go though all the config, save the secret in the configured secret store. If the secret is already in a store, it will not migrate the secret to the secret store.
  • Loading branch information
benmoriceau authored May 4, 2022
1 parent 3c1eab3 commit 07359ff
Show file tree
Hide file tree
Showing 11 changed files with 651 additions and 61 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,4 @@ MAX_DISCOVER_WORKERS=5
NEW_SCHEDULER=false
AUTO_DISABLE_FAILING_CONNECTIONS=false
EXPOSE_SECRETS_IN_EXPORT=false
MIGRATE_SECRET_STORE=false
2 changes: 1 addition & 1 deletion airbyte-bootloader/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ dependencies {
implementation project(':airbyte-config:persistence')
implementation project(':airbyte-db:lib')
implementation project(":airbyte-json-validation")
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-scheduler:persistence')
implementation project(':airbyte-scheduler:models')

implementation 'io.temporal:temporal-sdk:1.8.1'
implementation "org.flywaydb:flyway-core:7.14.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,16 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.db.Database;
import io.airbyte.db.instance.DatabaseMigrator;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.jobs.JobsDatabaseInstance;
import io.airbyte.db.instance.jobs.JobsDatabaseMigrator;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -55,13 +51,15 @@ public class BootloaderApp {
private static final AirbyteVersion VERSION_BREAK = new AirbyteVersion("0.32.0-alpha");

private final Configs configs;
private Runnable postLoadExecution;
private final Runnable postLoadExecution;
private final FeatureFlags featureFlags;
private final SecretMigrator secretMigrator;
private ConfigPersistence configPersistence;
private Database configDatabase;

@VisibleForTesting
public BootloaderApp(final Configs configs, final FeatureFlags featureFlags) {
this.configs = configs;
this.featureFlags = featureFlags;
public BootloaderApp(final Configs configs, final FeatureFlags featureFlags, final SecretMigrator secretMigrator) {
this(configs, () -> {}, featureFlags, secretMigrator);
}

/**
Expand All @@ -72,41 +70,54 @@ public BootloaderApp(final Configs configs, final FeatureFlags featureFlags) {
* @param configs
* @param postLoadExecution
*/
public BootloaderApp(final Configs configs, final Runnable postLoadExecution, final FeatureFlags featureFlags) {
public BootloaderApp(final Configs configs,
final Runnable postLoadExecution,
final FeatureFlags featureFlags,
final SecretMigrator secretMigrator) {
this.configs = configs;
this.postLoadExecution = postLoadExecution;
this.featureFlags = featureFlags;
this.secretMigrator = secretMigrator;

initPersistences();
}

public BootloaderApp(final Configs configs, final FeatureFlags featureFlags) {
this.configs = configs;
this.featureFlags = featureFlags;

try {
initPersistences();
final Optional<SecretPersistence> secretPersistence = SecretPersistence.getLongLived(configs);
secretMigrator = new SecretMigrator(configPersistence, secretPersistence);

postLoadExecution = () -> {
try {
configPersistence.loadData(YamlSeedConfigPersistence.getDefault());

if (featureFlags.runSecretMigration()) {
secretMigrator.migrateSecrets();
}
LOGGER.info("Loaded seed data..");
} catch (final IOException | JsonValidationException e) {
throw new RuntimeException(e);
}
};

} catch (final IOException e) {
throw new RuntimeException(e);
}
}

public BootloaderApp() {
configs = new EnvConfigs();
featureFlags = new EnvVariableFeatureFlags();
postLoadExecution = () -> {
try {
final Database configDatabase =
new ConfigsDatabaseInstance(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl())
.getAndInitialize();
final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
.maskSecrets(!featureFlags.exposeSecretsInExport())
.copySecrets(true)
.build();
final ConfigPersistence configPersistence =
DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor);
configPersistence.loadData(YamlSeedConfigPersistence.getDefault());
LOGGER.info("Loaded seed data..");
} catch (final IOException e) {
e.printStackTrace();
}
};
this(new EnvConfigs(), new EnvVariableFeatureFlags());
}

public void load() throws Exception {
LOGGER.info("Setting up config database and default workspace..");

try (
final Database configDatabase =
new ConfigsDatabaseInstance(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl())
.getAndInitialize();

final Database jobDatabase =
new JobsDatabaseInstance(configs.getDatabaseUser(), configs.getDatabasePassword(), configs.getDatabaseUrl()).getAndInitialize()) {
LOGGER.info("Created initial jobs and configs database...");
Expand All @@ -118,11 +129,6 @@ public void load() throws Exception {
runFlywayMigration(configs, configDatabase, jobDatabase);
LOGGER.info("Ran Flyway migrations...");

final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
.maskSecrets(!featureFlags.exposeSecretsInExport())
.copySecrets(false)
.build();
final ConfigPersistence configPersistence = DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor);
final ConfigRepository configRepository =
new ConfigRepository(configPersistence, configDatabase);

Expand All @@ -136,14 +142,30 @@ public void load() throws Exception {
LOGGER.info("Set version to {}", currAirbyteVersion);
}

if (postLoadExecution != null) {
postLoadExecution.run();
LOGGER.info("Finished running post load Execution..");
}
postLoadExecution.run();
LOGGER.info("Finished running post load Execution..");

LOGGER.info("Finished bootstrapping Airbyte environment..");
}

private void initPersistences() {
try {
configDatabase = new ConfigsDatabaseInstance(
configs.getConfigDatabaseUser(),
configs.getConfigDatabasePassword(),
configs.getConfigDatabaseUrl()).getAndInitialize();

final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
.maskSecrets(true)
.copySecrets(true)
.build();

configPersistence = DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor);
} catch (final IOException e) {
e.printStackTrace();
}
}

public static void main(final String[] args) throws Exception {
final var bootloader = new BootloaderApp();
bootloader.load();
Expand Down Expand Up @@ -230,16 +252,4 @@ private static void runFlywayMigration(final Configs configs, final Database con
}
}

private static void cleanupZombies(final JobPersistence jobPersistence) throws IOException {
final Configs configs = new EnvConfigs();
final WorkflowClient wfClient =
WorkflowClient.newInstance(WorkflowServiceStubs.newInstance(
WorkflowServiceStubsOptions.newBuilder().setTarget(configs.getTemporalHost()).build()));
for (final Job zombieJob : jobPersistence.listJobsWithStatus(JobStatus.RUNNING)) {
LOGGER.info("Kill zombie job {} for connection {}", zombieJob.getId(), zombieJob.getScope());
wfClient.newUntypedWorkflowStub("sync_" + zombieJob.getId())
.terminate("Zombie");
}
}

}
Loading

0 comments on commit 07359ff

Please sign in to comment.