Skip to content

Commit

Permalink
AG-239 - Recovery connections from the pool
Browse files Browse the repository at this point in the history
  • Loading branch information
barreiro committed May 21, 2024
1 parent 151d15c commit 896c51a
Show file tree
Hide file tree
Showing 11 changed files with 552 additions and 46 deletions.
12 changes: 8 additions & 4 deletions agroal-pool/src/main/java/io/agroal/pool/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ private Properties jdbcProperties() {

private Properties recoveryProperties() {
Properties properties = new Properties();
// use the main credentials when recovery credentials are not provided
if ( configuration.recoveryPrincipal() == null && ( configuration.recoveryCredentials() == null || configuration.recoveryCredentials().isEmpty() ) ) {
properties.putAll( securityProperties( configuration.principal(), configuration.credentials() ) );
} else {
if ( hasRecoveryCredentials() ) {
properties.putAll( securityProperties( configuration.recoveryPrincipal(), configuration.recoveryCredentials() ) );
} else {
// use the main credentials when recovery credentials are not provided
properties.putAll( securityProperties( configuration.principal(), configuration.credentials() ) );
}
return properties;
}
Expand Down Expand Up @@ -269,6 +269,10 @@ private XAConnection xaConnectionSetup(XAConnection xaConnection) throws SQLExce

// --- //

public boolean hasRecoveryCredentials() {
return configuration.recoveryPrincipal() != null || ( configuration.recoveryCredentials() != null && !configuration.recoveryCredentials().isEmpty() ) ;
}

@Override
public boolean isRecoverable() {
if ( factoryMode == Mode.XA_DATASOURCE ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.agroal.pool.util.AutoCloseableElement;
import io.agroal.pool.util.UncheckedArrayList;
import io.agroal.pool.wrapper.ConnectionWrapper;
import io.agroal.pool.wrapper.XAConnectionWrapper;

import javax.sql.XAConnection;
import javax.transaction.xa.XAResource;
Expand Down Expand Up @@ -94,6 +95,10 @@ public ConnectionHandler(XAConnection xa, Pool pool) throws SQLException {
touch();
}

public XAConnectionWrapper xaConnectionWrapper() {
return new XAConnectionWrapper( this, xaConnection, connectionPool.getConfiguration().connectionFactoryConfiguration().trackJdbcResources() );
}

public ConnectionWrapper connectionWrapper() {
return new ConnectionWrapper( this, connectionPool.getConfiguration().connectionFactoryConfiguration().trackJdbcResources(), enlisted ? enlistedOpenWrappers : null );
}
Expand Down
51 changes: 43 additions & 8 deletions agroal-pool/src/main/java/io/agroal/pool/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.agroal.pool.util.PriorityScheduledExecutor;
import io.agroal.pool.util.StampedCopyOnWriteArrayList;

import javax.sql.XAConnection;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
Expand Down Expand Up @@ -133,8 +134,7 @@ public void init() {
if ( reapEnabled ) {
housekeepingExecutor.schedule( new ReapTask(), configuration.reapTimeout().toNanos(), NANOSECONDS );
}

transactionIntegration.addResourceRecoveryFactory( connectionFactory );
transactionIntegration.addResourceRecoveryFactory( connectionFactory.hasRecoveryCredentials() ? connectionFactory : this );

// fill to the initial size
if ( configuration.initialSize() < configuration.minSize() ) {
Expand Down Expand Up @@ -187,7 +187,7 @@ public void flushPool(AgroalDataSource.FlushMode mode) {

@Override
public void close() {
transactionIntegration.removeResourceRecoveryFactory( connectionFactory );
transactionIntegration.removeResourceRecoveryFactory( connectionFactory.hasRecoveryCredentials() ? connectionFactory : this );

for ( Runnable task : housekeepingExecutor.shutdownNow() ) {
if ( task instanceof DestroyConnectionTask ) {
Expand All @@ -207,6 +207,40 @@ public void close() {

// --- //

@Override
public boolean isRecoverable() {
return connectionFactory.isRecoverable();
}

@Override
public XAConnection getRecoveryConnection() throws SQLException {
long stamp = beforeAcquire();
checkMultipleAcquisition();
ConnectionHandler checkedOutHandler = null;

try {
do {
checkedOutHandler = (ConnectionHandler) localCache.get();
if ( checkedOutHandler == null ) {
checkedOutHandler = handlerFromSharedCache();
}
} while ( ( borrowValidationEnabled && !borrowValidation( checkedOutHandler ) )
|| ( idleValidationEnabled && !idleValidation( checkedOutHandler ) ) );

activeCount.increment();
fireOnConnectionAcquiredInterceptor( interceptors, checkedOutHandler );
afterAcquire( stamp, checkedOutHandler, false );
return checkedOutHandler.xaConnectionWrapper();
} catch ( Throwable t ) {
if ( checkedOutHandler != null ) {
checkedOutHandler.setState( CHECKED_OUT, CHECKED_IN );
}
throw t;
}
}

// --- //

private long beforeAcquire() throws SQLException {
fireBeforeConnectionAcquire( listeners );
if ( housekeepingExecutor.isShutdown() ) {
Expand Down Expand Up @@ -242,7 +276,8 @@ public Connection getConnection() throws SQLException {
if ( checkedOutHandler != null ) {
// AG-140 - If associate throws here is fine, it's assumed the synchronization that returns the connection has been registered
transactionIntegration.associate( checkedOutHandler, checkedOutHandler.getXaResource() );
return afterAcquire( stamp, checkedOutHandler );
afterAcquire( stamp, checkedOutHandler, true);
return checkedOutHandler.connectionWrapper();
}
checkMultipleAcquisition();

Expand All @@ -258,7 +293,8 @@ public Connection getConnection() throws SQLException {

activeCount.increment();
fireOnConnectionAcquiredInterceptor( interceptors, checkedOutHandler );
return afterAcquire( stamp, checkedOutHandler );
afterAcquire( stamp, checkedOutHandler, true );
return checkedOutHandler.connectionWrapper();
} catch ( Throwable t ) {
if ( checkedOutHandler != null ) {
// AG-140 - Return the connection to the pool to prevent leak
Expand Down Expand Up @@ -368,11 +404,11 @@ private boolean performValidation(ConnectionHandler handler, ConnectionHandler.S
}
}

private Connection afterAcquire(long metricsStamp, ConnectionHandler checkedOutHandler) throws SQLException {
private void afterAcquire(long metricsStamp, ConnectionHandler checkedOutHandler, boolean verifyEnlistment) throws SQLException {
metricsRepository.afterConnectionAcquire( metricsStamp );
fireOnConnectionAcquired( listeners, checkedOutHandler );

if ( !checkedOutHandler.isEnlisted() ) {
if ( verifyEnlistment && !checkedOutHandler.isEnlisted() ) {
switch ( configuration.transactionRequirement() ) {
case STRICT:
returnConnectionHandler( checkedOutHandler );
Expand All @@ -397,7 +433,6 @@ private Connection afterAcquire(long metricsStamp, ConnectionHandler checkedOutH
checkedOutHandler.setAcquisitionStackTrace( currentThread().getStackTrace() );
}
}
return checkedOutHandler.connectionWrapper();
}

// --- //
Expand Down
3 changes: 2 additions & 1 deletion agroal-pool/src/main/java/io/agroal/pool/Pool.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.agroal.api.AgroalPoolInterceptor;
import io.agroal.api.configuration.AgroalConnectionPoolConfiguration;
import io.agroal.api.configuration.AgroalDataSourceConfiguration.MetricsEnabledListener;
import io.agroal.api.transaction.TransactionIntegration.ResourceRecoveryFactory;

import java.sql.Connection;
import java.sql.SQLException;
Expand All @@ -18,7 +19,7 @@
/**
* @author <a href="lbarreiro@redhat.com">Luis Barreiro</a>
*/
public interface Pool extends MetricsEnabledListener, AutoCloseable {
public interface Pool extends MetricsEnabledListener, AutoCloseable, ResourceRecoveryFactory {

void init();

Expand Down
37 changes: 28 additions & 9 deletions agroal-pool/src/main/java/io/agroal/pool/Poolless.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.agroal.pool.util.AgroalSynchronizer;
import io.agroal.pool.util.StampedCopyOnWriteArrayList;

import javax.sql.XAConnection;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
Expand Down Expand Up @@ -56,7 +57,7 @@

/**
* Alternative implementation of ConnectionPool for the special case of flush-on-close (and min-size == 0)
* In particular, this removes the need for the executor. Also there is no thread-local connection cache as connections are not reused
* In particular, this removes the need for the executor. Also, there is no thread-local connection cache as connections are not reused
*
* @author <a href="lbarreiro@redhat.com">Luis Barreiro</a>
*/
Expand Down Expand Up @@ -111,8 +112,7 @@ public void init() {
if ( configuration.minSize() != 0 ) {
fireOnInfo( listeners, "Min size always zero in pool-less mode" );
}

transactionIntegration.addResourceRecoveryFactory( connectionFactory );
transactionIntegration.addResourceRecoveryFactory( connectionFactory.hasRecoveryCredentials() ? connectionFactory : this );
}

public AgroalConnectionPoolConfiguration getConfiguration() {
Expand Down Expand Up @@ -147,9 +147,27 @@ public void setPoolInterceptors(Collection<? extends AgroalPoolInterceptor> list

// --- //

@Override
public boolean isRecoverable() {
return connectionFactory.isRecoverable();
}

@Override
public XAConnection getRecoveryConnection() throws SQLException {
long stamp = beforeAcquire();
checkMultipleAcquisition();

ConnectionHandler checkedOutHandler = handlerFromSharedCache();
fireOnConnectionAcquiredInterceptor( interceptors, checkedOutHandler );
afterAcquire( stamp, checkedOutHandler, false );
return checkedOutHandler.xaConnectionWrapper();
}

// --- //

@Override
public void close() {
transactionIntegration.removeResourceRecoveryFactory( connectionFactory );
transactionIntegration.removeResourceRecoveryFactory( connectionFactory.hasRecoveryCredentials() ? connectionFactory : this );
shutdown = true;

for ( ConnectionHandler handler : allConnections ) {
Expand Down Expand Up @@ -197,15 +215,17 @@ public Connection getConnection() throws SQLException {
if ( checkedOutHandler != null ) {
// AG-140 - If associate throws here is fine, it's assumed the synchronization that returns the connection has been registered
transactionIntegration.associate( checkedOutHandler, checkedOutHandler.getXaResource() );
return afterAcquire( stamp, checkedOutHandler );
afterAcquire( stamp, checkedOutHandler, false );
return checkedOutHandler.connectionWrapper();
}
checkMultipleAcquisition();

try {
checkedOutHandler = handlerFromSharedCache();
transactionIntegration.associate( checkedOutHandler, checkedOutHandler.getXaResource() );
fireOnConnectionAcquiredInterceptor( interceptors, checkedOutHandler );
return afterAcquire( stamp, checkedOutHandler );
afterAcquire( stamp, checkedOutHandler, true );
return checkedOutHandler.connectionWrapper();
} catch ( Throwable t ) {
if ( checkedOutHandler != null ) {
// AG-140 - Flush handler to prevent leak
Expand Down Expand Up @@ -254,11 +274,11 @@ private ConnectionHandler handlerFromSharedCache() throws SQLException {
}

@SuppressWarnings( "SingleCharacterStringConcatenation" )
private Connection afterAcquire(long metricsStamp, ConnectionHandler checkedOutHandler) throws SQLException {
private void afterAcquire(long metricsStamp, ConnectionHandler checkedOutHandler, boolean verifyEnlistment) throws SQLException {
metricsRepository.afterConnectionAcquire( metricsStamp );
fireOnConnectionAcquired( listeners, checkedOutHandler );

if ( !checkedOutHandler.isEnlisted() ) {
if ( verifyEnlistment && !checkedOutHandler.isEnlisted() ) {
switch ( configuration.transactionRequirement() ) {
case STRICT:
returnConnectionHandler( checkedOutHandler );
Expand All @@ -282,7 +302,6 @@ private Connection afterAcquire(long metricsStamp, ConnectionHandler checkedOutH
checkedOutHandler.setAcquisitionStackTrace( copyOfRange( stackTrace, 4, stackTrace.length ) );
}
}
return checkedOutHandler.connectionWrapper();
}

// --- //
Expand Down
Loading

0 comments on commit 896c51a

Please sign in to comment.