Skip to content

Commit

Permalink
prefer AirbyteVersion over String (airbytehq#7310)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored and schlattk committed Jan 4, 2022
1 parent 90e4d31 commit 835f49f
Show file tree
Hide file tree
Showing 19 changed files with 125 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public AirbyteVersion(final String major, final String minor, final String patch
this.patch = patch;
}

public String getVersion() {
public String serialize() {
return version;
}

Expand Down Expand Up @@ -74,6 +74,27 @@ public int compatibleVersionCompareTo(final AirbyteVersion another) {
return compareVersion(minor, another.minor);
}

/**
* @return true if this is greater than other. otherwise false.
*/
public boolean greaterThan(final AirbyteVersion other) {
return patchVersionCompareTo(other) > 0;
}

/**
* @return true if this is greater than or equal toother. otherwise false.
*/
public boolean greaterThanOrEqualTo(final AirbyteVersion other) {
return patchVersionCompareTo(other) >= 0;
}

/**
* @return true if this is less than other. otherwise false.
*/
public boolean lessThan(final AirbyteVersion other) {
return patchVersionCompareTo(other) < 0;
}

/**
* Compares two Airbyte Version to check if they are equivalent (including patch version).
*/
Expand All @@ -92,6 +113,10 @@ public int patchVersionCompareTo(final AirbyteVersion another) {
return compareVersion(patch, another.patch);
}

public boolean isDev() {
return version.equals(DEV_VERSION);
}

/**
* Version string needs to be converted to integer for comparison, because string comparison does
* not handle version string with different digits correctly. For example:
Expand All @@ -101,25 +126,21 @@ private static int compareVersion(final String v1, final String v2) {
return Integer.compare(Integer.parseInt(v1), Integer.parseInt(v2));
}

public static void assertIsCompatible(final String version1, final String version2) throws IllegalStateException {
public static void assertIsCompatible(final AirbyteVersion version1, final AirbyteVersion version2) throws IllegalStateException {
if (!isCompatible(version1, version2)) {
throw new IllegalStateException(getErrorMessage(version1, version2));
}
}

public static String getErrorMessage(final String version1, final String version2) {
final String cleanVersion1 = version1.replace("\n", "").strip();
final String cleanVersion2 = version2.replace("\n", "").strip();
public static String getErrorMessage(final AirbyteVersion version1, final AirbyteVersion version2) {
return String.format(
"Version mismatch between %s and %s.\n" +
"Please upgrade or reset your Airbyte Database, see more at https://docs.airbyte.io/operator-guides/upgrading-airbyte",
cleanVersion1, cleanVersion2);
version1.serialize(), version2.serialize());
}

public static boolean isCompatible(final String v1, final String v2) {
final AirbyteVersion version1 = new AirbyteVersion(v1);
final AirbyteVersion version2 = new AirbyteVersion(v2);
return version1.compatibleVersionCompareTo(version2) == 0;
public static boolean isCompatible(final AirbyteVersion v1, final AirbyteVersion v2) {
return v1.compatibleVersionCompareTo(v2) == 0;
}

@Override
Expand All @@ -137,7 +158,7 @@ public static AirbyteVersion versionWithoutPatch(final AirbyteVersion airbyteVer
+ "."
+ airbyteVersion.getMinorVersion()
+ ".0-"
+ airbyteVersion.getVersion().replace("\n", "").strip().split("-")[1];
+ airbyteVersion.serialize().replace("\n", "").strip().split("-")[1];
return new AirbyteVersion(versionWithoutPatch);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
package io.airbyte.commons.version;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

public class TestAirbyteVersion {
public class AirbyteVersionTest {

@Test
public void testParseVersion() {
Expand Down Expand Up @@ -53,6 +54,32 @@ public void testPatchVersionCompareTo() {
assertEquals(0, new AirbyteVersion("dev").patchVersionCompareTo(new AirbyteVersion("1.2.3-prod")));
}

@Test
public void testGreaterThan() {
assertFalse(new AirbyteVersion("6.7.8-omega").greaterThan(new AirbyteVersion("6.7.8-gamma")));
assertFalse(new AirbyteVersion("6.7.8-alpha").greaterThan(new AirbyteVersion("6.7.9-alpha")));
assertFalse(new AirbyteVersion("6.7.8-alpha").greaterThan(new AirbyteVersion("6.7.11-alpha")));
assertTrue(new AirbyteVersion("6.8.0-alpha").greaterThan(new AirbyteVersion("6.7.8-alpha")));
assertTrue(new AirbyteVersion("6.11.0-alpha").greaterThan(new AirbyteVersion("6.7.8-alpha")));
assertFalse(new AirbyteVersion("3.8.0-alpha").greaterThan(new AirbyteVersion("6.7.8-alpha")));
assertFalse(new AirbyteVersion("3.8.0-alpha").greaterThan(new AirbyteVersion("11.7.8-alpha")));
assertFalse(new AirbyteVersion("1.2.3-prod").greaterThan(new AirbyteVersion("dev")));
assertFalse(new AirbyteVersion("dev").greaterThan(new AirbyteVersion("1.2.3-prod")));
}

@Test
public void testLessThan() {
assertFalse(new AirbyteVersion("6.7.8-omega").lessThan(new AirbyteVersion("6.7.8-gamma")));
assertTrue(new AirbyteVersion("6.7.8-alpha").lessThan(new AirbyteVersion("6.7.9-alpha")));
assertTrue(new AirbyteVersion("6.7.8-alpha").lessThan(new AirbyteVersion("6.7.11-alpha")));
assertFalse(new AirbyteVersion("6.8.0-alpha").lessThan(new AirbyteVersion("6.7.8-alpha")));
assertFalse(new AirbyteVersion("6.11.0-alpha").lessThan(new AirbyteVersion("6.7.8-alpha")));
assertTrue(new AirbyteVersion("3.8.0-alpha").lessThan(new AirbyteVersion("6.7.8-alpha")));
assertTrue(new AirbyteVersion("3.8.0-alpha").lessThan(new AirbyteVersion("11.7.8-alpha")));
assertFalse(new AirbyteVersion("1.2.3-prod").lessThan(new AirbyteVersion("dev")));
assertFalse(new AirbyteVersion("dev").lessThan(new AirbyteVersion("1.2.3-prod")));
}

@Test
public void testInvalidVersions() {
assertThrows(NullPointerException.class, () -> new AirbyteVersion(null));
Expand All @@ -61,8 +88,8 @@ public void testInvalidVersions() {

@Test
public void testCheckVersion() {
AirbyteVersion.assertIsCompatible("3.2.1", "3.2.1");
assertThrows(IllegalStateException.class, () -> AirbyteVersion.assertIsCompatible("1.2.3", "3.2.1"));
AirbyteVersion.assertIsCompatible(new AirbyteVersion("3.2.1"), new AirbyteVersion("3.2.1"));
assertThrows(IllegalStateException.class, () -> AirbyteVersion.assertIsCompatible(new AirbyteVersion("1.2.3"), new AirbyteVersion("3.2.1")));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ static MigrationVersion getNextMigrationVersion(final AirbyteVersion currentAirb
System.out.println("lastMigrationId: " + lastMigrationId);
final String nextMigrationId = String.format("%03d", Integer.parseInt(lastMigrationId) + 1);
System.out.println("nextMigrationId: " + nextMigrationId);
return MigrationVersion.fromVersion(String.format("%s_%s", migrationAirbyteVersion.getVersion(), nextMigrationId));
return MigrationVersion.fromVersion(String.format("%s_%s", migrationAirbyteVersion.serialize(), nextMigrationId));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public void testGetCurrentAirbyteVersion() {
public void testGetAirbyteVersion() {
final MigrationVersion migrationVersion = MigrationVersion.fromVersion("0.11.3.010");
final AirbyteVersion airbyteVersion = MigrationDevHelper.getAirbyteVersion(migrationVersion);
assertEquals("0.11.3", airbyteVersion.getVersion());
assertEquals("0.11.3", airbyteVersion.serialize());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ public static void run(final String[] args) throws IOException {

public static void run(MigrateConfig migrateConfig) throws IOException {
final Path workspaceRoot = Files.createTempDirectory(Path.of("/tmp"), "airbyte_migrate");
migrateConfig = new MigrateConfig(migrateConfig.getInputPath(), migrateConfig.getOutputPath(),
AirbyteVersion.versionWithoutPatch(migrateConfig.getTargetVersion()).getVersion());
migrateConfig = new MigrateConfig(migrateConfig.getInputPath(),
migrateConfig.getOutputPath(),
AirbyteVersion.versionWithoutPatch(migrateConfig.getTargetVersion()).serialize());

if (migrateConfig.getInputPath().toString().endsWith(".gz")) {
LOGGER.info("Unpacking tarball");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ public static void main(final String[] args) throws IOException, InterruptedExce
configs.getWorkspaceRetentionConfig(),
workspaceRoot,
jobPersistence);
AirbyteVersion.assertIsCompatible(configs.getAirbyteVersion(), jobPersistence.getVersion().orElseThrow());
AirbyteVersion.assertIsCompatible(
new AirbyteVersion(configs.getAirbyteVersion()),
jobPersistence.getVersion().map(AirbyteVersion::new).orElseThrow());

TrackingClientSingleton.initialize(
configs.getTrackingStrategy(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public static void initStagedResourceFolder() {
}
}

public void importDataWithSeed(final String targetVersion, final File archive, final ConfigPersistence seedPersistence)
public void importDataWithSeed(final AirbyteVersion targetVersion, final File archive, final ConfigPersistence seedPersistence)
throws IOException, JsonValidationException {
final Path sourceRoot = Files.createTempDirectory(Path.of("/tmp"), "airbyte_archive");
try {
Expand All @@ -145,7 +145,7 @@ public void importDataWithSeed(final String targetVersion, final File archive, f

// 5. Set DB version
LOGGER.info("Setting the DB Airbyte version to : " + targetVersion);
jobPersistence.setVersion(targetVersion);
jobPersistence.setVersion(targetVersion.serialize());

// 6. check db version
checkDBVersion(targetVersion);
Expand All @@ -158,10 +158,12 @@ public void importDataWithSeed(final String targetVersion, final File archive, f
configRepository.listStandardWorkspaces(true).forEach(workspace -> TrackingClientSingleton.get().identify(workspace.getWorkspaceId()));
}

private void checkImport(final String targetVersion, final Path tempFolder) throws IOException {
private void checkImport(final AirbyteVersion targetVersion, final Path tempFolder) throws IOException {
final Path versionFile = tempFolder.resolve(VERSION_FILE_NAME);
final String importVersion = Files.readString(versionFile, Charset.defaultCharset())
.replace("\n", "").strip();
final AirbyteVersion importVersion = new AirbyteVersion(Files
.readString(versionFile, Charset.defaultCharset())
.replace("\n", "")
.strip());
LOGGER.info(String.format("Checking Airbyte Version to import %s", importVersion));
if (!AirbyteVersion.isCompatible(targetVersion, importVersion)) {
throw new IOException(String
Expand Down Expand Up @@ -232,7 +234,7 @@ protected static Path buildConfigPath(final Path storageRoot, final ConfigSchema
}

// Postgres Portion
public void importDatabaseFromArchive(final Path storageRoot, final String airbyteVersion) throws IOException {
public void importDatabaseFromArchive(final Path storageRoot, final AirbyteVersion airbyteVersion) throws IOException {
try {
final Map<JobsDatabaseSchema, Stream<JsonNode>> data = new HashMap<>();
for (final JobsDatabaseSchema tableType : JobsDatabaseSchema.values()) {
Expand All @@ -245,7 +247,7 @@ public void importDatabaseFromArchive(final Path storageRoot, final String airby

data.put(tableType, tableStream);
}
jobPersistence.importDatabase(airbyteVersion, data);
jobPersistence.importDatabase(airbyteVersion.serialize(), data);
LOGGER.info("Successful upgrade of airbyte postgres database from archive");
} catch (final Exception e) {
LOGGER.warn("Postgres database version upgrade failed, reverting to state previous to migration.");
Expand Down Expand Up @@ -309,10 +311,10 @@ private Stream<JsonNode> readTableFromArchive(final JobsDatabaseSchema tableSche
}
}

private void checkDBVersion(final String airbyteVersion) throws IOException {
final Optional<String> airbyteDatabaseVersion = jobPersistence.getVersion();
private void checkDBVersion(final AirbyteVersion airbyteVersion) throws IOException {
final Optional<AirbyteVersion> airbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
airbyteDatabaseVersion
.ifPresent(dbversion -> AirbyteVersion.assertIsCompatible(airbyteVersion, dbversion));
.ifPresent(dbVersion -> AirbyteVersion.assertIsCompatible(airbyteVersion, dbVersion));
}

public UploadRead uploadArchiveResource(final File archive) {
Expand Down Expand Up @@ -341,7 +343,7 @@ public void deleteArchiveResource(final UUID resourceId) {
FileUtils.deleteQuietly(archive);
}

public void importIntoWorkspace(final String targetVersion, final UUID workspaceId, final File archive)
public void importIntoWorkspace(final AirbyteVersion targetVersion, final UUID workspaceId, final File archive)
throws IOException, JsonValidationException, ConfigNotFoundException {
final Path sourceRoot = Files.createTempDirectory(Path.of("/tmp"), "airbyte_archive");
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.server;

import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.migrate.MigrateConfig;
Expand All @@ -24,15 +25,15 @@
public class RunMigration implements Runnable, AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(RunMigration.class);
private final String targetVersion;
private final AirbyteVersion targetVersion;
private final ConfigPersistence seedPersistence;
private final ConfigDumpExporter configDumpExporter;
private final ConfigDumpImporter configDumpImporter;
private final List<File> filesToBeCleanedUp = new ArrayList<>();

public RunMigration(final JobPersistence jobPersistence,
final ConfigRepository configRepository,
final String targetVersion,
final AirbyteVersion targetVersion,
final ConfigPersistence seedPersistence,
final SpecFetcher specFetcher) {
this.targetVersion = targetVersion;
Expand All @@ -55,7 +56,7 @@ public void run() {
filesToBeCleanedUp.add(tempFolder.toFile());

// Run Migration
final MigrateConfig migrateConfig = new MigrateConfig(exportData.toPath(), output.toPath(), targetVersion);
final MigrateConfig migrateConfig = new MigrateConfig(exportData.toPath(), output.toPath(), targetVersion.serialize());
MigrationRunner.run(migrateConfig);

// Import data
Expand Down
26 changes: 11 additions & 15 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
// if no workspace exists, we create one so the user starts out with a place to add configuration.
createWorkspaceIfNoneExists(configRepository);

final String airbyteVersion = configs.getAirbyteVersion();
final AirbyteVersion airbyteVersion = new AirbyteVersion(configs.getAirbyteVersion());
if (jobPersistence.getVersion().isEmpty()) {
LOGGER.info(String.format("Setting Database version to %s...", airbyteVersion));
jobPersistence.setVersion(airbyteVersion);
jobPersistence.setVersion(airbyteVersion.serialize());
}

final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient);
Expand All @@ -222,17 +222,16 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
// TODO: remove this specFetcherFn logic once file migrations are deprecated
configRepository.setSpecFetcher(dockerImage -> Exceptions.toRuntime(() -> specFetcher.getSpec(dockerImage)));

Optional<String> airbyteDatabaseVersion = jobPersistence.getVersion();
Optional<AirbyteVersion> airbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
if (airbyteDatabaseVersion.isPresent() && isDatabaseVersionBehindAppVersion(airbyteVersion, airbyteDatabaseVersion.get())) {
final boolean isKubernetes = configs.getWorkerEnvironment() == WorkerEnvironment.KUBERNETES;
final boolean versionSupportsAutoMigrate =
new AirbyteVersion(airbyteDatabaseVersion.get()).patchVersionCompareTo(KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION) >= 0;
final boolean versionSupportsAutoMigrate = airbyteDatabaseVersion.get().greaterThanOrEqualTo(KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION);
if (!isKubernetes || versionSupportsAutoMigrate) {
runAutomaticMigration(configRepository, jobPersistence, seed, specFetcher, airbyteVersion, airbyteDatabaseVersion.get());
// After migration, upgrade the DB version
airbyteDatabaseVersion = jobPersistence.getVersion();
airbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
} else {
LOGGER.info("Can not run automatic migration for Airbyte on KUBERNETES before version " + KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION.getVersion());
LOGGER.info("Can not run automatic migration for Airbyte on KUBERNETES before version " + KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION.serialize());
}
}

Expand Down Expand Up @@ -271,9 +270,9 @@ private static void runAutomaticMigration(final ConfigRepository configRepositor
final JobPersistence jobPersistence,
final ConfigPersistence seed,
final SpecFetcher specFetcher,
final String airbyteVersion,
final String airbyteDatabaseVersion) {
LOGGER.info("Running Automatic Migration from version : " + airbyteDatabaseVersion + " to version : " + airbyteVersion);
final AirbyteVersion airbyteVersion,
final AirbyteVersion airbyteDatabaseVersion) {
LOGGER.info("Running Automatic Migration from version : " + airbyteDatabaseVersion.serialize() + " to version : " + airbyteVersion.serialize());
try (final RunMigration runMigration = new RunMigration(
jobPersistence,
configRepository,
Expand All @@ -286,15 +285,12 @@ private static void runAutomaticMigration(final ConfigRepository configRepositor
}
}

public static boolean isDatabaseVersionBehindAppVersion(final String airbyteVersion, final String airbyteDatabaseVersion) {
final boolean bothVersionsCompatible = AirbyteVersion.isCompatible(airbyteVersion, airbyteDatabaseVersion);
public static boolean isDatabaseVersionBehindAppVersion(final AirbyteVersion serverVersion, final AirbyteVersion databaseVersion) {
final boolean bothVersionsCompatible = AirbyteVersion.isCompatible(serverVersion, databaseVersion);
if (bothVersionsCompatible) {
return false;
}

final AirbyteVersion serverVersion = new AirbyteVersion(airbyteVersion);
final AirbyteVersion databaseVersion = new AirbyteVersion(airbyteDatabaseVersion);

if (databaseVersion.getMajorVersion().compareTo(serverVersion.getMajorVersion()) < 0) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import io.airbyte.api.model.WorkspaceReadList;
import io.airbyte.api.model.WorkspaceUpdate;
import io.airbyte.commons.io.FileTtlManager;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigPersistence;
Expand Down Expand Up @@ -192,7 +193,7 @@ public ConfigurationApi(final ConfigRepository configRepository,
webBackendDestinationsHandler = new WebBackendDestinationsHandler(destinationHandler, configRepository, trackingClient);
healthCheckHandler = new HealthCheckHandler(configRepository);
archiveHandler = new ArchiveHandler(
configs.getAirbyteVersion(),
new AirbyteVersion(configs.getAirbyteVersion()),
configRepository,
jobPersistence,
seed,
Expand Down
Loading

0 comments on commit 835f49f

Please sign in to comment.