Skip to content

Commit

Permalink
Merge pull request #70 from mgcuthbert/master
Browse files Browse the repository at this point in the history
Adding database locking functionality
  • Loading branch information
migurski authored Jun 10, 2020
2 parents 22fc47c + a401ada commit 107d52a
Show file tree
Hide file tree
Showing 15 changed files with 440 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.sql.DataSource;

import org.apache.commons.dbcp.BasicDataSource;
import org.openstreetmap.osmosis.core.OsmosisRuntimeException;
import org.openstreetmap.osmosis.core.database.DatabaseLoginCredentials;
Expand Down Expand Up @@ -64,6 +66,14 @@ public DatabaseContext2(DatabaseLoginCredentials loginCredentials) {
}
}

/**
* Retrieves the data source associated with this database context.
*
* @return {@link DataSource}
*/
public DataSource getDataSource() {
return this.dataSource;
}

private OsmosisRuntimeException createUnknownDbTypeException() {
return new OsmosisRuntimeException("Unknown database type " + dbType + ".");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.openstreetmap.osmosis.apidb.v0_6.impl.DeltaToDiffReader;
import org.openstreetmap.osmosis.apidb.v0_6.impl.SchemaVersionValidator;
import org.openstreetmap.osmosis.core.container.v0_6.ChangeContainer;
import org.openstreetmap.osmosis.core.database.DatabaseLocker;
import org.openstreetmap.osmosis.core.database.DatabaseLoginCredentials;
import org.openstreetmap.osmosis.core.database.DatabasePreferences;
import org.openstreetmap.osmosis.core.lifecycle.ReleasableIterator;
Expand Down Expand Up @@ -109,13 +110,17 @@ protected void runImpl(DatabaseContext2 dbCtx) {
* Reads all data from the database and send it to the sink.
*/
public void run() {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials)) {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials);
DatabaseLocker locker = new DatabaseLocker(dbCtx.getDataSource(), false)) {
dbCtx.executeWithinTransaction(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus arg0) {
locker.lockDatabase(this.getClass().getSimpleName());
runImpl(dbCtx);
} });

}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import java.util.HashMap;
import java.util.Map;

import org.openstreetmap.osmosis.apidb.common.DatabaseContext2;
import org.openstreetmap.osmosis.core.OsmosisRuntimeException;
import org.openstreetmap.osmosis.apidb.v0_6.impl.ActionChangeWriter;
import org.openstreetmap.osmosis.apidb.v0_6.impl.ChangeWriter;
import org.openstreetmap.osmosis.apidb.v0_6.impl.SchemaVersionValidator;
import org.openstreetmap.osmosis.core.container.v0_6.ChangeContainer;
import org.openstreetmap.osmosis.core.database.DatabaseLocker;
import org.openstreetmap.osmosis.core.database.DatabaseLoginCredentials;
import org.openstreetmap.osmosis.core.database.DatabasePreferences;
import org.openstreetmap.osmosis.core.task.common.ChangeAction;
Expand All @@ -27,6 +29,8 @@ public class ApidbChangeWriter implements ChangeSink {
private final Map<ChangeAction, ActionChangeWriter> actionWriterMap;

private final SchemaVersionValidator schemaVersionValidator;
private final DatabaseContext2 dbCtx;
private final DatabaseLocker locker;

/**
* Creates a new instance.
Expand All @@ -39,12 +43,14 @@ public class ApidbChangeWriter implements ChangeSink {
public ApidbChangeWriter(DatabaseLoginCredentials loginCredentials, DatabasePreferences preferences,
boolean populateCurrentTables) {
changeWriter = new ChangeWriter(loginCredentials, populateCurrentTables);
actionWriterMap = new HashMap<ChangeAction, ActionChangeWriter>();
actionWriterMap = new HashMap<>();
actionWriterMap.put(ChangeAction.Create, new ActionChangeWriter(changeWriter, ChangeAction.Create));
actionWriterMap.put(ChangeAction.Modify, new ActionChangeWriter(changeWriter, ChangeAction.Modify));
actionWriterMap.put(ChangeAction.Delete, new ActionChangeWriter(changeWriter, ChangeAction.Delete));

schemaVersionValidator = new SchemaVersionValidator(loginCredentials, preferences);
dbCtx = new DatabaseContext2(loginCredentials);
locker = new DatabaseLocker(dbCtx.getDataSource(), true);
}

/**
Expand All @@ -59,6 +65,7 @@ public void initialize(Map<String, Object> metaData) {
*/
public void process(ChangeContainer change) {
ChangeAction action;
this.locker.lockDatabase(this.getClass().getSimpleName());

// Verify that the schema version is supported.
schemaVersionValidator.validateVersion(ApidbVersionConstants.SCHEMA_MIGRATIONS);
Expand All @@ -85,6 +92,8 @@ public void complete() {
* {@inheritDoc}
*/
public void close() {
this.locker.unlockDatabase();
changeWriter.close();
this.dbCtx.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.openstreetmap.osmosis.apidb.v0_6.impl.SchemaVersionValidator;
import org.openstreetmap.osmosis.core.container.v0_6.BoundContainer;
import org.openstreetmap.osmosis.core.container.v0_6.EntityContainer;
import org.openstreetmap.osmosis.core.database.DatabaseLocker;
import org.openstreetmap.osmosis.core.database.DatabaseLoginCredentials;
import org.openstreetmap.osmosis.core.database.DatabasePreferences;
import org.openstreetmap.osmosis.core.domain.v0_6.Bound;
Expand Down Expand Up @@ -90,13 +91,16 @@ protected void runImpl(DatabaseContext2 dbCtx) {
* Reads all data from the database and send it to the sink.
*/
public void run() {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials)) {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials);
DatabaseLocker locker = new DatabaseLocker(dbCtx.getDataSource(), false)) {
dbCtx.executeWithinTransaction(new TransactionCallbackWithoutResult() {

@Override
protected void doInTransactionWithoutResult(TransactionStatus arg0) {
locker.lockDatabase(this.getClass().getSimpleName());
runImpl(dbCtx);
} });
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.openstreetmap.osmosis.apidb.v0_6.impl.TimeDao;
import org.openstreetmap.osmosis.apidb.v0_6.impl.TransactionDao;
import org.openstreetmap.osmosis.apidb.v0_6.impl.TransactionManager;
import org.openstreetmap.osmosis.core.database.DatabaseLocker;
import org.openstreetmap.osmosis.core.database.DatabaseLoginCredentials;
import org.openstreetmap.osmosis.core.database.DatabasePreferences;
import org.openstreetmap.osmosis.core.task.v0_6.ChangeSink;
Expand Down Expand Up @@ -95,8 +96,12 @@ protected void runImpl(DatabaseContext2 dbCtx) {
*/
@Override
public void run() {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials)) {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials);
DatabaseLocker locker = new DatabaseLocker(dbCtx.getDataSource(), false)) {
locker.lockDatabase(this.getClass().getSimpleName());
runImpl(dbCtx);
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.openstreetmap.osmosis.apidb.v0_6.impl.SchemaVersionValidator;
import org.openstreetmap.osmosis.core.container.v0_6.BoundContainer;
import org.openstreetmap.osmosis.core.container.v0_6.EntityContainer;
import org.openstreetmap.osmosis.core.database.DatabaseLocker;
import org.openstreetmap.osmosis.core.database.DatabaseLoginCredentials;
import org.openstreetmap.osmosis.core.database.DatabasePreferences;
import org.openstreetmap.osmosis.core.domain.v0_6.Bound;
Expand Down Expand Up @@ -95,13 +96,17 @@ protected void runImpl(DatabaseContext2 dbCtx) {
* Reads all data from the database and send it to the sink.
*/
public void run() {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials)) {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials);
DatabaseLocker locker = new DatabaseLocker(dbCtx.getDataSource(), false)) {
dbCtx.executeWithinTransaction(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus arg0) {
locker.lockDatabase(this.getClass().getSimpleName());
runImpl(dbCtx);
}
});
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}
2 changes: 2 additions & 0 deletions osmosis-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ dependencies {
compile group: 'com.fasterxml.woodstox', name: 'woodstox-core', version: dependencyVersionWoodstoxCore
compile group: 'org.codehaus.woodstox', name: 'stax2-api', version: dependencyVersionWoodstoxStax2
compile group: 'org.apache.commons', name: 'commons-compress', version: dependencyVersionCommonsCompress
compile group: 'xerces', name: 'xercesImpl', version: dependencyVersionXerces
compile group: 'org.springframework', name: 'spring-jdbc', version: dependencyVersionSpring
}

/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// This software is released into the Public Domain. See copying.txt for details.
package org.openstreetmap.osmosis.core.database;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Logger;

import javax.sql.DataSource;

/**
* A check against the database to see if it is locked.
*
* @author mcuthbert
*/
public class DatabaseLocker implements AutoCloseable {

private static Logger logger = Logger.getLogger(DatabaseLocker.class.getSimpleName());
private final DataSource source;
private final boolean writeLock;
private boolean enabled = true;
private int lockedIdentifier = -1;

/**
* Static function to fully unlock the database.
*
* @param source {@link DataSource} to execute the query
*/
public static void fullUnlockDatabase(final DataSource source) {
unlockDatabase(-1, source);
}

private static void unlockDatabase(final int identifier, final DataSource source) {
try (Connection connection = source.getConnection();
PreparedStatement statement = connection.prepareStatement("SELECT unlock_database(?)")) {
statement.setInt(1, identifier);
final ResultSet result = statement.executeQuery();
if (result.next()) {
final boolean unlocked = result.getBoolean(1);
if (unlocked) {
logger.info(String.format("Unlocked database using identifier %d.", identifier));
return;
}
}
throw new RuntimeException("Failed to unlock the database");
} catch (final SQLException e) {
throw new RuntimeException("Failed to unlock the database", e);
}
}

/**
* Default Constructor.
*
* @param source The DataSource for the connection
* @param writeLock Whether the lock that is being requested is a write lock
*/
public DatabaseLocker(final DataSource source, final boolean writeLock) {
this.source = source;
this.writeLock = writeLock;
// check to see if the function exists in the database and if it doesn't then throw a
// warning on the log and don't try any locking
try (Connection connection = source.getConnection();
Statement statement = connection.createStatement()) {
statement.executeQuery("SELECT 'lock_database'::regproc, 'unlock_database'::regproc");
} catch (final Exception e) {
logger.warning("Locking functions do not exist in database. Disabling locking.");
this.enabled = false;
}
}

/**
* Helper function for primary lock database function that simply sets the source to empty string.
*
* @param process The process that is locking the database
*/
public void lockDatabase(final String process) {
this.lockDatabase(process, "");
}

/**
* Will attempt to lock the database.
*
* @param process
* The process that will lock the database. This would be something like "Extracts" or "Replication".
* @param description
* The source of the process. This is basically a description for the process, like
* "Pipeline Extraction for Build X".
*/
public void lockDatabase(final String process, final String description) {
if (this.enabled) {
String lockName = "read";
if (this.writeLock) {
lockName = "write";
}
try (Connection connection = source.getConnection();
PreparedStatement statement = connection.prepareStatement("SELECT lock_database(?, ?, ?, ?)")) {
statement.setString(1, process);
statement.setString(2, description);
statement.setString(3, InetAddress.getLocalHost().getHostName());
statement.setBoolean(4, this.writeLock);
final ResultSet result = statement.executeQuery();
if (result.next()) {
this.lockedIdentifier = result.getInt(1);
logger.info(String.format("Obtained %s lock to database for process: %s "
+ "from source: '%s', with lockedID: %d",
lockName, process, description, this.lockedIdentifier));
} else {
throw new RuntimeException("Failed to retrieve lock for the database.");
}
} catch (final SQLException | UnknownHostException e) {
throw new RuntimeException("Failed to lock the database.", e);
}
}
}

/**
* Will attempt to unlock the database with the given locked identifier.
*/
public void unlockDatabase() {
if (this.enabled && this.lockedIdentifier > 0) {
unlockDatabase(this.lockedIdentifier, this.source);
}
}

/**
* Does a full unlock of the database. So no matter what is locking it, this will unlock it.
*/
public void fullUnlockDatabase() {
if (this.enabled) {
fullUnlockDatabase(this.source);
}
}

@Override
public void close() throws Exception {
this.unlockDatabase();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ public DatabaseContext(DatabaseLoginCredentials loginCredentials) {
}
}

/**
* Returns the DataSource associated with this database context.
*
* @return {@link DataSource}
*/
public DataSource getDataSource() {
return this.dataSource;
}

/**
* Begins a new database transaction. This is not required if
Expand Down
Loading

0 comments on commit 107d52a

Please sign in to comment.