Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prefer AirbyteVersion over String #7310

Merged
merged 3 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public AirbyteVersion(final String major, final String minor, final String patch
this.patch = patch;
}

public String getVersion() {
public String serialize() {
return version;
}

Expand Down Expand Up @@ -74,6 +74,27 @@ public int compatibleVersionCompareTo(final AirbyteVersion another) {
return compareVersion(minor, another.minor);
}

/**
* @return true if this is greater than other. otherwise false.
*/
public boolean greaterThan(final AirbyteVersion other) {
return patchVersionCompareTo(other) > 0;
}

/**
* @return true if this is greater than or equal toother. otherwise false.
*/
public boolean greaterThanOrEqualTo(final AirbyteVersion other) {
return patchVersionCompareTo(other) >= 0;
}

/**
* @return true if this is less than other. otherwise false.
*/
public boolean lessThan(final AirbyteVersion other) {
return patchVersionCompareTo(other) < 0;
}

/**
* Compares two Airbyte Version to check if they are equivalent (including patch version).
*/
Expand All @@ -92,6 +113,10 @@ public int patchVersionCompareTo(final AirbyteVersion another) {
return compareVersion(patch, another.patch);
}

public boolean isDev() {
return version.equals(DEV_VERSION);
}

/**
* Version string needs to be converted to integer for comparison, because string comparison does
* not handle version string with different digits correctly. For example:
Expand All @@ -101,25 +126,21 @@ private static int compareVersion(final String v1, final String v2) {
return Integer.compare(Integer.parseInt(v1), Integer.parseInt(v2));
}

public static void assertIsCompatible(final String version1, final String version2) throws IllegalStateException {
public static void assertIsCompatible(final AirbyteVersion version1, final AirbyteVersion version2) throws IllegalStateException {
if (!isCompatible(version1, version2)) {
throw new IllegalStateException(getErrorMessage(version1, version2));
}
}

public static String getErrorMessage(final String version1, final String version2) {
final String cleanVersion1 = version1.replace("\n", "").strip();
final String cleanVersion2 = version2.replace("\n", "").strip();
public static String getErrorMessage(final AirbyteVersion version1, final AirbyteVersion version2) {
return String.format(
"Version mismatch between %s and %s.\n" +
"Please upgrade or reset your Airbyte Database, see more at https://docs.airbyte.io/operator-guides/upgrading-airbyte",
cleanVersion1, cleanVersion2);
version1.serialize(), version2.serialize());
}

public static boolean isCompatible(final String v1, final String v2) {
final AirbyteVersion version1 = new AirbyteVersion(v1);
final AirbyteVersion version2 = new AirbyteVersion(v2);
return version1.compatibleVersionCompareTo(version2) == 0;
public static boolean isCompatible(final AirbyteVersion v1, final AirbyteVersion v2) {
return v1.compatibleVersionCompareTo(v2) == 0;
}

@Override
Expand All @@ -137,7 +158,7 @@ public static AirbyteVersion versionWithoutPatch(final AirbyteVersion airbyteVer
+ "."
+ airbyteVersion.getMinorVersion()
+ ".0-"
+ airbyteVersion.getVersion().replace("\n", "").strip().split("-")[1];
+ airbyteVersion.serialize().replace("\n", "").strip().split("-")[1];
return new AirbyteVersion(versionWithoutPatch);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
package io.airbyte.commons.version;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

public class TestAirbyteVersion {
public class AirbyteVersionTest {

@Test
public void testParseVersion() {
Expand Down Expand Up @@ -53,6 +54,32 @@ public void testPatchVersionCompareTo() {
assertEquals(0, new AirbyteVersion("dev").patchVersionCompareTo(new AirbyteVersion("1.2.3-prod")));
}

@Test
public void testGreaterThan() {
assertFalse(new AirbyteVersion("6.7.8-omega").greaterThan(new AirbyteVersion("6.7.8-gamma")));
assertFalse(new AirbyteVersion("6.7.8-alpha").greaterThan(new AirbyteVersion("6.7.9-alpha")));
assertFalse(new AirbyteVersion("6.7.8-alpha").greaterThan(new AirbyteVersion("6.7.11-alpha")));
assertTrue(new AirbyteVersion("6.8.0-alpha").greaterThan(new AirbyteVersion("6.7.8-alpha")));
assertTrue(new AirbyteVersion("6.11.0-alpha").greaterThan(new AirbyteVersion("6.7.8-alpha")));
assertFalse(new AirbyteVersion("3.8.0-alpha").greaterThan(new AirbyteVersion("6.7.8-alpha")));
assertFalse(new AirbyteVersion("3.8.0-alpha").greaterThan(new AirbyteVersion("11.7.8-alpha")));
assertFalse(new AirbyteVersion("1.2.3-prod").greaterThan(new AirbyteVersion("dev")));
assertFalse(new AirbyteVersion("dev").greaterThan(new AirbyteVersion("1.2.3-prod")));
}

@Test
public void testLessThan() {
assertFalse(new AirbyteVersion("6.7.8-omega").lessThan(new AirbyteVersion("6.7.8-gamma")));
assertTrue(new AirbyteVersion("6.7.8-alpha").lessThan(new AirbyteVersion("6.7.9-alpha")));
assertTrue(new AirbyteVersion("6.7.8-alpha").lessThan(new AirbyteVersion("6.7.11-alpha")));
assertFalse(new AirbyteVersion("6.8.0-alpha").lessThan(new AirbyteVersion("6.7.8-alpha")));
assertFalse(new AirbyteVersion("6.11.0-alpha").lessThan(new AirbyteVersion("6.7.8-alpha")));
assertTrue(new AirbyteVersion("3.8.0-alpha").lessThan(new AirbyteVersion("6.7.8-alpha")));
assertTrue(new AirbyteVersion("3.8.0-alpha").lessThan(new AirbyteVersion("11.7.8-alpha")));
assertFalse(new AirbyteVersion("1.2.3-prod").lessThan(new AirbyteVersion("dev")));
assertFalse(new AirbyteVersion("dev").lessThan(new AirbyteVersion("1.2.3-prod")));
}

@Test
public void testInvalidVersions() {
assertThrows(NullPointerException.class, () -> new AirbyteVersion(null));
Expand All @@ -61,8 +88,8 @@ public void testInvalidVersions() {

@Test
public void testCheckVersion() {
AirbyteVersion.assertIsCompatible("3.2.1", "3.2.1");
assertThrows(IllegalStateException.class, () -> AirbyteVersion.assertIsCompatible("1.2.3", "3.2.1"));
AirbyteVersion.assertIsCompatible(new AirbyteVersion("3.2.1"), new AirbyteVersion("3.2.1"));
assertThrows(IllegalStateException.class, () -> AirbyteVersion.assertIsCompatible(new AirbyteVersion("1.2.3"), new AirbyteVersion("3.2.1")));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ static MigrationVersion getNextMigrationVersion(final AirbyteVersion currentAirb
System.out.println("lastMigrationId: " + lastMigrationId);
final String nextMigrationId = String.format("%03d", Integer.parseInt(lastMigrationId) + 1);
System.out.println("nextMigrationId: " + nextMigrationId);
return MigrationVersion.fromVersion(String.format("%s_%s", migrationAirbyteVersion.getVersion(), nextMigrationId));
return MigrationVersion.fromVersion(String.format("%s_%s", migrationAirbyteVersion.serialize(), nextMigrationId));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public void testGetCurrentAirbyteVersion() {
public void testGetAirbyteVersion() {
final MigrationVersion migrationVersion = MigrationVersion.fromVersion("0.11.3.010");
final AirbyteVersion airbyteVersion = MigrationDevHelper.getAirbyteVersion(migrationVersion);
assertEquals("0.11.3", airbyteVersion.getVersion());
assertEquals("0.11.3", airbyteVersion.serialize());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ public static void run(final String[] args) throws IOException {

public static void run(MigrateConfig migrateConfig) throws IOException {
final Path workspaceRoot = Files.createTempDirectory(Path.of("/tmp"), "airbyte_migrate");
migrateConfig = new MigrateConfig(migrateConfig.getInputPath(), migrateConfig.getOutputPath(),
AirbyteVersion.versionWithoutPatch(migrateConfig.getTargetVersion()).getVersion());
migrateConfig = new MigrateConfig(migrateConfig.getInputPath(),
migrateConfig.getOutputPath(),
AirbyteVersion.versionWithoutPatch(migrateConfig.getTargetVersion()).serialize());

if (migrateConfig.getInputPath().toString().endsWith(".gz")) {
LOGGER.info("Unpacking tarball");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ public static void main(final String[] args) throws IOException, InterruptedExce
configs.getWorkspaceRetentionConfig(),
workspaceRoot,
jobPersistence);
AirbyteVersion.assertIsCompatible(configs.getAirbyteVersion(), jobPersistence.getVersion().orElseThrow());
AirbyteVersion.assertIsCompatible(
new AirbyteVersion(configs.getAirbyteVersion()),
jobPersistence.getVersion().map(AirbyteVersion::new).orElseThrow());

TrackingClientSingleton.initialize(
configs.getTrackingStrategy(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public static void initStagedResourceFolder() {
}
}

public void importDataWithSeed(final String targetVersion, final File archive, final ConfigPersistence seedPersistence)
public void importDataWithSeed(final AirbyteVersion targetVersion, final File archive, final ConfigPersistence seedPersistence)
throws IOException, JsonValidationException {
final Path sourceRoot = Files.createTempDirectory(Path.of("/tmp"), "airbyte_archive");
try {
Expand All @@ -145,7 +145,7 @@ public void importDataWithSeed(final String targetVersion, final File archive, f

// 5. Set DB version
LOGGER.info("Setting the DB Airbyte version to : " + targetVersion);
jobPersistence.setVersion(targetVersion);
jobPersistence.setVersion(targetVersion.serialize());

// 6. check db version
checkDBVersion(targetVersion);
Expand All @@ -158,10 +158,12 @@ public void importDataWithSeed(final String targetVersion, final File archive, f
configRepository.listStandardWorkspaces(true).forEach(workspace -> TrackingClientSingleton.get().identify(workspace.getWorkspaceId()));
}

private void checkImport(final String targetVersion, final Path tempFolder) throws IOException {
private void checkImport(final AirbyteVersion targetVersion, final Path tempFolder) throws IOException {
final Path versionFile = tempFolder.resolve(VERSION_FILE_NAME);
final String importVersion = Files.readString(versionFile, Charset.defaultCharset())
.replace("\n", "").strip();
final AirbyteVersion importVersion = new AirbyteVersion(Files
.readString(versionFile, Charset.defaultCharset())
.replace("\n", "")
.strip());
LOGGER.info(String.format("Checking Airbyte Version to import %s", importVersion));
if (!AirbyteVersion.isCompatible(targetVersion, importVersion)) {
throw new IOException(String
Expand Down Expand Up @@ -232,7 +234,7 @@ protected static Path buildConfigPath(final Path storageRoot, final ConfigSchema
}

// Postgres Portion
public void importDatabaseFromArchive(final Path storageRoot, final String airbyteVersion) throws IOException {
public void importDatabaseFromArchive(final Path storageRoot, final AirbyteVersion airbyteVersion) throws IOException {
try {
final Map<JobsDatabaseSchema, Stream<JsonNode>> data = new HashMap<>();
for (final JobsDatabaseSchema tableType : JobsDatabaseSchema.values()) {
Expand All @@ -245,7 +247,7 @@ public void importDatabaseFromArchive(final Path storageRoot, final String airby

data.put(tableType, tableStream);
}
jobPersistence.importDatabase(airbyteVersion, data);
jobPersistence.importDatabase(airbyteVersion.serialize(), data);
LOGGER.info("Successful upgrade of airbyte postgres database from archive");
} catch (final Exception e) {
LOGGER.warn("Postgres database version upgrade failed, reverting to state previous to migration.");
Expand Down Expand Up @@ -309,10 +311,10 @@ private Stream<JsonNode> readTableFromArchive(final JobsDatabaseSchema tableSche
}
}

private void checkDBVersion(final String airbyteVersion) throws IOException {
final Optional<String> airbyteDatabaseVersion = jobPersistence.getVersion();
private void checkDBVersion(final AirbyteVersion airbyteVersion) throws IOException {
final Optional<AirbyteVersion> airbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
airbyteDatabaseVersion
.ifPresent(dbversion -> AirbyteVersion.assertIsCompatible(airbyteVersion, dbversion));
.ifPresent(dbVersion -> AirbyteVersion.assertIsCompatible(airbyteVersion, dbVersion));
}

public UploadRead uploadArchiveResource(final File archive) {
Expand Down Expand Up @@ -341,7 +343,7 @@ public void deleteArchiveResource(final UUID resourceId) {
FileUtils.deleteQuietly(archive);
}

public void importIntoWorkspace(final String targetVersion, final UUID workspaceId, final File archive)
public void importIntoWorkspace(final AirbyteVersion targetVersion, final UUID workspaceId, final File archive)
throws IOException, JsonValidationException, ConfigNotFoundException {
final Path sourceRoot = Files.createTempDirectory(Path.of("/tmp"), "airbyte_archive");
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.server;

import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.migrate.MigrateConfig;
Expand All @@ -24,15 +25,15 @@
public class RunMigration implements Runnable, AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(RunMigration.class);
private final String targetVersion;
private final AirbyteVersion targetVersion;
private final ConfigPersistence seedPersistence;
private final ConfigDumpExporter configDumpExporter;
private final ConfigDumpImporter configDumpImporter;
private final List<File> filesToBeCleanedUp = new ArrayList<>();

public RunMigration(final JobPersistence jobPersistence,
final ConfigRepository configRepository,
final String targetVersion,
final AirbyteVersion targetVersion,
final ConfigPersistence seedPersistence,
final SpecFetcher specFetcher) {
this.targetVersion = targetVersion;
Expand All @@ -55,7 +56,7 @@ public void run() {
filesToBeCleanedUp.add(tempFolder.toFile());

// Run Migration
final MigrateConfig migrateConfig = new MigrateConfig(exportData.toPath(), output.toPath(), targetVersion);
final MigrateConfig migrateConfig = new MigrateConfig(exportData.toPath(), output.toPath(), targetVersion.serialize());
MigrationRunner.run(migrateConfig);

// Import data
Expand Down
26 changes: 11 additions & 15 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
// if no workspace exists, we create one so the user starts out with a place to add configuration.
createWorkspaceIfNoneExists(configRepository);

final String airbyteVersion = configs.getAirbyteVersion();
final AirbyteVersion airbyteVersion = new AirbyteVersion(configs.getAirbyteVersion());
if (jobPersistence.getVersion().isEmpty()) {
LOGGER.info(String.format("Setting Database version to %s...", airbyteVersion));
jobPersistence.setVersion(airbyteVersion);
jobPersistence.setVersion(airbyteVersion.serialize());
}

final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient);
Expand All @@ -222,17 +222,16 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
// TODO: remove this specFetcherFn logic once file migrations are deprecated
configRepository.setSpecFetcher(dockerImage -> Exceptions.toRuntime(() -> specFetcher.getSpec(dockerImage)));

Optional<String> airbyteDatabaseVersion = jobPersistence.getVersion();
Optional<AirbyteVersion> airbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
if (airbyteDatabaseVersion.isPresent() && isDatabaseVersionBehindAppVersion(airbyteVersion, airbyteDatabaseVersion.get())) {
final boolean isKubernetes = configs.getWorkerEnvironment() == WorkerEnvironment.KUBERNETES;
final boolean versionSupportsAutoMigrate =
new AirbyteVersion(airbyteDatabaseVersion.get()).patchVersionCompareTo(KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION) >= 0;
final boolean versionSupportsAutoMigrate = airbyteDatabaseVersion.get().greaterThanOrEqualTo(KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION);
if (!isKubernetes || versionSupportsAutoMigrate) {
runAutomaticMigration(configRepository, jobPersistence, seed, specFetcher, airbyteVersion, airbyteDatabaseVersion.get());
// After migration, upgrade the DB version
airbyteDatabaseVersion = jobPersistence.getVersion();
airbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
} else {
LOGGER.info("Can not run automatic migration for Airbyte on KUBERNETES before version " + KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION.getVersion());
LOGGER.info("Can not run automatic migration for Airbyte on KUBERNETES before version " + KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION.serialize());
}
}

Expand Down Expand Up @@ -271,9 +270,9 @@ private static void runAutomaticMigration(final ConfigRepository configRepositor
final JobPersistence jobPersistence,
final ConfigPersistence seed,
final SpecFetcher specFetcher,
final String airbyteVersion,
final String airbyteDatabaseVersion) {
LOGGER.info("Running Automatic Migration from version : " + airbyteDatabaseVersion + " to version : " + airbyteVersion);
final AirbyteVersion airbyteVersion,
final AirbyteVersion airbyteDatabaseVersion) {
LOGGER.info("Running Automatic Migration from version : " + airbyteDatabaseVersion.serialize() + " to version : " + airbyteVersion.serialize());
try (final RunMigration runMigration = new RunMigration(
jobPersistence,
configRepository,
Expand All @@ -286,15 +285,12 @@ private static void runAutomaticMigration(final ConfigRepository configRepositor
}
}

public static boolean isDatabaseVersionBehindAppVersion(final String airbyteVersion, final String airbyteDatabaseVersion) {
final boolean bothVersionsCompatible = AirbyteVersion.isCompatible(airbyteVersion, airbyteDatabaseVersion);
public static boolean isDatabaseVersionBehindAppVersion(final AirbyteVersion serverVersion, final AirbyteVersion databaseVersion) {
final boolean bothVersionsCompatible = AirbyteVersion.isCompatible(serverVersion, databaseVersion);
if (bothVersionsCompatible) {
return false;
}

final AirbyteVersion serverVersion = new AirbyteVersion(airbyteVersion);
final AirbyteVersion databaseVersion = new AirbyteVersion(airbyteDatabaseVersion);

if (databaseVersion.getMajorVersion().compareTo(serverVersion.getMajorVersion()) < 0) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import io.airbyte.api.model.WorkspaceReadList;
import io.airbyte.api.model.WorkspaceUpdate;
import io.airbyte.commons.io.FileTtlManager;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigPersistence;
Expand Down Expand Up @@ -192,7 +193,7 @@ public ConfigurationApi(final ConfigRepository configRepository,
webBackendDestinationsHandler = new WebBackendDestinationsHandler(destinationHandler, configRepository, trackingClient);
healthCheckHandler = new HealthCheckHandler(configRepository);
archiveHandler = new ArchiveHandler(
configs.getAirbyteVersion(),
new AirbyteVersion(configs.getAirbyteVersion()),
configRepository,
jobPersistence,
seed,
Expand Down
Loading