Skip to content

Commit

Permalink
Check protocol version compatibility during a platform update (#19200)
Browse files Browse the repository at this point in the history
* Refactoring to improve code re-use

* Add ProtocolVersionChecker

* Add an option to configure if we are automatically upgrading connectors

* Add airbyte version check to pass the fresh install case

* Inject DefinitionsProvider in the BootloaderApp

* Remove AutoUpgradeConnector config

* Improve logging

* Use named argument rather than positional

* Make DefinitionsProvider optional

* Format
  • Loading branch information
gosusnp authored Nov 14, 2022
1 parent 77243e4 commit 8683bc8
Show file tree
Hide file tree
Showing 7 changed files with 703 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.lang.CloseableShutdownHook;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.Geography;
Expand Down Expand Up @@ -70,7 +70,7 @@ public class BootloaderApp {
private final FeatureFlags featureFlags;
private final SecretMigrator secretMigrator;
private ConfigRepository configRepository;
private DefinitionsProvider localDefinitionsProvider;
private Optional<DefinitionsProvider> definitionsProvider;
private Database configDatabase;
private Database jobDatabase;
private JobPersistence jobPersistence;
Expand All @@ -79,6 +79,13 @@ public class BootloaderApp {
private final DSLContext configsDslContext;
private final DSLContext jobsDslContext;

// This controls how we check the protocol version compatibility
// True means that the connectors will be forcefully upgraded regardless of whether they are used in
// an active sync or not.
// This should be moved to a Configs, however, this behavior is currently forced through hooks that
// are passed as the postLoadExecution.
private final boolean autoUpgradeConnectors;

/**
* This method is exposed for Airbyte Cloud consumption. This lets us override the seed loading
* logic and customise Cloud connector versions. Please check with the Platform team before making
Expand All @@ -97,7 +104,9 @@ public BootloaderApp(final Configs configs,
final DSLContext configsDslContext,
final DSLContext jobsDslContext,
final Flyway configsFlyway,
final Flyway jobsFlyway) {
final Flyway jobsFlyway,
final Optional<DefinitionsProvider> definitionsProvider,
final boolean autoUpgradeConnectors) {
this.configs = configs;
this.postLoadExecution = postLoadExecution;
this.featureFlags = featureFlags;
Expand All @@ -106,30 +115,65 @@ public BootloaderApp(final Configs configs,
this.configsFlyway = configsFlyway;
this.jobsDslContext = jobsDslContext;
this.jobsFlyway = jobsFlyway;
this.definitionsProvider = definitionsProvider;
this.autoUpgradeConnectors = autoUpgradeConnectors;

initPersistences(configsDslContext, jobsDslContext);
}

// Temporary duplication of constructor, to remove once Cloud has been migrated to the one above.
@Deprecated(forRemoval = true)
public BootloaderApp(final Configs configs,
final Runnable postLoadExecution,
final FeatureFlags featureFlags,
final SecretMigrator secretMigrator,
final DSLContext configsDslContext,
final DSLContext jobsDslContext,
final Flyway configsFlyway,
final Flyway jobsFlyway) {
this.configs = configs;
this.postLoadExecution = postLoadExecution;
this.featureFlags = featureFlags;
this.secretMigrator = secretMigrator;
this.configsDslContext = configsDslContext;
this.configsFlyway = configsFlyway;
this.jobsDslContext = jobsDslContext;
this.jobsFlyway = jobsFlyway;
this.autoUpgradeConnectors = false;

try {
this.definitionsProvider = Optional.of(getLocalDefinitionsProvider());
} catch (final IOException e) {
LOGGER.error("Unable to initialize persistence.", e);
}

initPersistences(configsDslContext, jobsDslContext);
}

public BootloaderApp(final Configs configs,
final FeatureFlags featureFlags,
final SecretMigrator secretMigrator,
final DSLContext configsDslContext,
final DSLContext jobsDslContext,
final Flyway configsFlyway,
final Flyway jobsFlyway,
final DefinitionsProvider definitionsProvider,
final boolean autoUpgradeConnectors) {
this.configs = configs;
this.featureFlags = featureFlags;
this.secretMigrator = secretMigrator;
this.configsDslContext = configsDslContext;
this.configsFlyway = configsFlyway;
this.jobsDslContext = jobsDslContext;
this.jobsFlyway = jobsFlyway;
this.definitionsProvider = Optional.of(definitionsProvider);
this.autoUpgradeConnectors = autoUpgradeConnectors;

initPersistences(configsDslContext, jobsDslContext);

postLoadExecution = () -> {
try {
final ApplyDefinitionsHelper applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, localDefinitionsProvider);
final ApplyDefinitionsHelper applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, this.definitionsProvider.get());
applyDefinitionsHelper.apply();

if (featureFlags.forceSecretMigration() || !jobPersistence.isSecretMigrated()) {
Expand Down Expand Up @@ -159,10 +203,9 @@ public void load() throws Exception {
final AirbyteVersion currAirbyteVersion = configs.getAirbyteVersion();
assertNonBreakingMigration(jobPersistence, currAirbyteVersion);

final Version airbyteProtocolVersionMax = configs.getAirbyteProtocolVersionMax();
final Version airbyteProtocolVersionMin = configs.getAirbyteProtocolVersionMin();
// TODO ProtocolVersion validation should happen here
trackProtocolVersion(airbyteProtocolVersionMin, airbyteProtocolVersionMax);
final ProtocolVersionChecker protocolVersionChecker =
new ProtocolVersionChecker(jobPersistence, configs, configRepository, definitionsProvider);
assertNonBreakingProtocolVersionConstraints(protocolVersionChecker, jobPersistence, autoUpgradeConnectors);

// TODO Will be converted to an injected singleton during DI migration
final DatabaseMigrator configDbMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
Expand Down Expand Up @@ -191,7 +234,7 @@ private static Database getConfigDatabase(final DSLContext dslContext) throws IO
return new Database(dslContext);
}

private static DefinitionsProvider getLocalDefinitionsProvider() throws IOException {
static DefinitionsProvider getLocalDefinitionsProvider() throws IOException {
return new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS);
}

Expand All @@ -207,7 +250,6 @@ private void initPersistences(final DSLContext configsDslContext, final DSLConte
try {
configDatabase = getConfigDatabase(configsDslContext);
configRepository = new ConfigRepository(configDatabase);
localDefinitionsProvider = getLocalDefinitionsProvider();
jobDatabase = getJobDatabase(jobsDslContext);
jobPersistence = getJobPersistence(jobDatabase);
} catch (final IOException e) {
Expand Down Expand Up @@ -249,7 +291,11 @@ public static void main(final String[] args) throws Exception {
// Ensure that the database resources are closed on application shutdown
CloseableShutdownHook.registerRuntimeShutdownHook(configsDataSource, jobsDataSource, configsDslContext, jobsDslContext);

final var bootloader = new BootloaderApp(configs, featureFlags, secretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway);
final DefinitionsProvider definitionsProvider = getLocalDefinitionsProvider();

final var bootloader =
new BootloaderApp(configs, featureFlags, secretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway, definitionsProvider,
false);
bootloader.load();
}
}
Expand Down Expand Up @@ -307,10 +353,24 @@ private static void assertNonBreakingMigration(final JobPersistence jobPersisten
}
}

private void trackProtocolVersion(final Version airbyteProtocolVersionMin, final Version airbyteProtocolVersionMax) throws IOException {
jobPersistence.setAirbyteProtocolVersionMin(airbyteProtocolVersionMin);
jobPersistence.setAirbyteProtocolVersionMax(airbyteProtocolVersionMax);
LOGGER.info("AirbyteProtocol version support range [{}:{}]", airbyteProtocolVersionMin.serialize(), airbyteProtocolVersionMax.serialize());
private static void assertNonBreakingProtocolVersionConstraints(final ProtocolVersionChecker protocolVersionChecker,
final JobPersistence jobPersistence,
final boolean autoUpgradeConnectors)
throws Exception {
final Optional<AirbyteProtocolVersionRange> newProtocolRange = protocolVersionChecker.validate(autoUpgradeConnectors);
if (newProtocolRange.isEmpty()) {
throw new RuntimeException(
"Aborting bootloader to avoid breaking existing connection after an upgrade. " +
"Please address airbyte protocol version support issues in the connectors before retrying.");
}
trackProtocolVersion(jobPersistence, newProtocolRange.get());
}

private static void trackProtocolVersion(final JobPersistence jobPersistence, final AirbyteProtocolVersionRange protocolVersionRange)
throws IOException {
jobPersistence.setAirbyteProtocolVersionMin(protocolVersionRange.min());
jobPersistence.setAirbyteProtocolVersionMax(protocolVersionRange.max());
LOGGER.info("AirbyteProtocol version support range [{}:{}]", protocolVersionRange.min().serialize(), protocolVersionRange.max().serialize());
}

static boolean isLegalUpgrade(final AirbyteVersion airbyteDatabaseVersion, final AirbyteVersion airbyteVersion) {
Expand Down
Loading

0 comments on commit 8683bc8

Please sign in to comment.