Skip to content

Commit

Permalink
Fix concurrent upgrades
Browse files Browse the repository at this point in the history
  • Loading branch information
badgerwithagun committed Dec 21, 2023
1 parent 6622356 commit e29b932
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public String booleanValue(boolean criteriaValue) {
@Override
public void createVersionTableIfNotExists(Connection connection) throws SQLException {
try (Statement s = connection.createStatement()) {
s.execute("CREATE TABLE IF NOT EXISTS TXNO_VERSION (version INT)");
s.execute(
"CREATE TABLE IF NOT EXISTS TXNO_VERSION (id INT DEFAULT 0, version INT, PRIMARY KEY (id))");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
import java.io.PrintWriter;
import java.io.Writer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -17,6 +22,25 @@
@Slf4j
class DefaultMigrationManager {

private static final Executor basicExecutor =
runnable -> {
new Thread(runnable).start();
};

private static CountDownLatch waitLatch;
private static CountDownLatch readyLatch;

static void withLatch(CountDownLatch readyLatch, Consumer<CountDownLatch> runnable) {
waitLatch = new CountDownLatch(1);
DefaultMigrationManager.readyLatch = readyLatch;
try {
runnable.accept(waitLatch);
} finally {
waitLatch = null;
DefaultMigrationManager.readyLatch = null;
}
}

static void migrate(TransactionManager transactionManager, Dialect dialect) {
transactionManager.inTransaction(
transaction -> {
Expand All @@ -25,7 +49,10 @@ static void migrate(TransactionManager transactionManager, Dialect dialect) {
dialect
.getMigrations()
.filter(migration -> migration.getVersion() > currentVersion)
.forEach(migration -> uncheck(() -> runSql(transaction.connection(), migration)));
.forEach(
migration ->
uncheck(
() -> runSql(transactionManager, transaction.connection(), migration)));
} catch (Exception e) {
throw new RuntimeException("Migrations failed", e);
}
Expand All @@ -52,28 +79,83 @@ static void writeSchema(Writer writer, Dialect dialect) {
printWriter.flush();
}

private static void runSql(Connection connection, Migration migration) throws SQLException {
log.info("Running migration: {}", migration.getName());
try (Statement s = connection.createStatement()) {
if (migration.getSql() != null && !migration.getSql().isEmpty()) {
s.execute(migration.getSql());
}
if (s.executeUpdate("UPDATE TXNO_VERSION SET version = " + migration.getVersion()) != 1) {
// TODO shouldn't be necessary if the lock is done correctly
s.execute("INSERT INTO TXNO_VERSION VALUES (" + migration.getVersion() + ")");
private static void runSql(TransactionManager txm, Connection connection, Migration migration)
throws SQLException {
log.info("Running migration {}: {}", migration.getVersion(), migration.getName());

if (migration.getSql() != null && !migration.getSql().isEmpty()) {
CompletableFuture.runAsync(
() -> {
try {
txm.inTransactionThrows(
tx -> {
try (var s = tx.connection().prepareStatement(migration.getSql())) {
s.execute();
}
});
} catch (SQLException e) {
throw new RuntimeException(e);
}
},
basicExecutor)
.join();
}

try (var s = connection.prepareStatement("UPDATE TXNO_VERSION SET version = ?")) {
s.setInt(1, migration.getVersion());
if (s.executeUpdate() != 1) {
throw new IllegalStateException("Version table should already exist");
}
}
}

private static int currentVersion(Connection connection, Dialect dialect) throws SQLException {
dialect.createVersionTableIfNotExists(connection);
try (Statement s = connection.createStatement();
ResultSet rs = s.executeQuery("SELECT version FROM TXNO_VERSION FOR UPDATE")) {
if (!rs.next()) {
// TODO should attempt to "win" at creating the record and then lock it
return 0;
int version = fetchCurrentVersion(connection);
if (version >= 0) {
return version;
}
try {
log.info("No version record found. Attempting to create");
if (waitLatch != null) {
log.info("Stopping at latch");
readyLatch.countDown();
if (!waitLatch.await(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("Latch not released in 10 seconds");
}
log.info("Latch released");
}
try (var s = connection.prepareStatement("INSERT INTO TXNO_VERSION (version) VALUES (0)")) {
s.executeUpdate();
}
log.info("Created version record.");
return fetchCurrentVersion(connection);
} catch (Exception e) {
log.info(
"Error attempting to create ({} - {}). May have been beaten to it, attempting second fetch",
e.getClass().getSimpleName(),
e.getMessage());
version = fetchCurrentVersion(connection);
if (version >= 0) {
return version;
}
throw new IllegalStateException("Unable to read or create version record", e);
}
}

private static int fetchCurrentVersion(Connection connection) throws SQLException {
try (PreparedStatement s =
connection.prepareStatement("SELECT version FROM TXNO_VERSION FOR UPDATE");
ResultSet rs = s.executeQuery()) {
if (rs.next()) {
var version = rs.getInt(1);
log.info("Current version is {}, obtained lock", version);
if (rs.next()) {
throw new IllegalStateException("More than one version record");
}
return version;
}
return rs.getInt(1);
return -1;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.gruelbox.transactionoutbox;

import static com.gruelbox.transactionoutbox.Dialect.H2;
import static org.junit.jupiter.api.Assertions.fail;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.util.concurrent.*;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

@Slf4j
public class TestDefaultMigrationManager {

private static HikariDataSource dataSource;

@BeforeAll
static void beforeAll() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DEFAULT_LOCK_TIMEOUT=60000;LOB_TIMEOUT=2000;MV_STORE=TRUE;DATABASE_TO_UPPER=FALSE");
config.setUsername("test");
config.setPassword("test");
config.addDataSourceProperty("cachePrepStmts", "true");
dataSource = new HikariDataSource(config);
}

@AfterAll
static void afterAll() {
dataSource.close();
}

@Test
void parallelMigrations() {
CountDownLatch readyLatch = new CountDownLatch(2);
DefaultMigrationManager.withLatch(
readyLatch,
waitLatch -> {
Executor executor = runnable -> new Thread(runnable).start();
TransactionManager txm = TransactionManager.fromDataSource(dataSource);
CompletableFuture<?> threads =
CompletableFuture.allOf(
CompletableFuture.runAsync(
() -> {
try {
DefaultMigrationManager.migrate(txm, H2);
} catch (Exception e) {
log.error("Thread 1 failed", e);
throw e;
}
},
executor),
CompletableFuture.runAsync(
() -> {
try {
DefaultMigrationManager.migrate(txm, H2);
} catch (Exception e) {
log.error("Thread 2 failed", e);
throw e;
}
},
executor));
try {
if (!readyLatch.await(15, TimeUnit.SECONDS)) {
throw new TimeoutException();
}
waitLatch.countDown();
} catch (InterruptedException | TimeoutException e) {
fail("Timed out or interrupted waiting for ready latch");
} finally {
threads.join();
}
});
}
}

0 comments on commit e29b932

Please sign in to comment.