Skip to content

Commit

Permalink
Merge pull request #149 from graben/JBTM-3879
Browse files Browse the repository at this point in the history
Redesign DataSourceXAResourceRecoveryHelper to be compatible with JBTM-3879 (based on org.jboss.narayana.jta.jms.JmsXAResourceRecoveryHelper)
  • Loading branch information
geoand committed Jul 5, 2024
2 parents ff2ea68 + 87d880c commit 88042a9
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 163 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 2020 Red Hat, Inc, and individual contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.snowdrop.boot.narayana.core.jdbc;

import java.sql.SQLException;

import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;

public class ConnectionManager {

private final XADataSource xaDataSource;
private final String user;
private final String password;
private XAConnection xaConnection;

public ConnectionManager(XADataSource xaDataSource, String user, String password) {
this.xaDataSource = xaDataSource;
this.user = user;
this.password = password;
}

public void connectAndAccept(XAResourceConsumer consumer) throws XAException {
if (isConnected()) {
try {
consumer.accept(this.xaConnection.getXAResource());
} catch (SQLException ex) {
throw createXAException(ex.getMessage());
}
return;
}

connect();
try {
consumer.accept(this.xaConnection.getXAResource());
} catch (SQLException ex) {
throw createXAException(ex.getMessage());
} finally {
disconnect();
}
}

public <T> T connectAndApply(XAResourceFunction<T> function) throws XAException {
if (isConnected()) {
try {
return function.apply(this.xaConnection.getXAResource());
} catch (SQLException ex) {
throw createXAException(ex.getMessage());
}
}

connect();
try {
return function.apply(this.xaConnection.getXAResource());
} catch (SQLException ex) {
throw createXAException(ex.getMessage());
} finally {
disconnect();
}
}

public void connect() throws XAException {
if (isConnected()) {
return;
}

try {
this.xaConnection = createXAConnection();
} catch (SQLException ex) {
if (this.xaConnection != null) {
try {
this.xaConnection.close();
} catch (SQLException ignore) {
}
}
throw createXAException(ex.getMessage());
}
}

public void disconnect() {
if (!isConnected()) {
return;
}

try {
this.xaConnection.close();
} catch (SQLException e) {
} finally {
this.xaConnection = null;
}
}

public boolean isConnected() {
return this.xaConnection != null;
}

private XAConnection createXAConnection() throws SQLException {
if (this.user == null && this.password == null) {
return this.xaDataSource.getXAConnection();
}

return this.xaDataSource.getXAConnection(this.user, this.password);
}

private XAException createXAException(String message) {
XAException xaException = new XAException(message);
xaException.errorCode = XAException.XAER_RMFAIL;
return xaException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,12 @@

package dev.snowdrop.boot.narayana.core.jdbc;

import java.sql.SQLException;

import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import com.arjuna.ats.jta.recovery.XAResourceRecoveryHelper;
import org.jboss.logging.Logger;

/**
* XAResourceRecoveryHelper implementation which gets XIDs, which needs to be recovered, from the database.
Expand All @@ -34,19 +30,7 @@
*/
public class DataSourceXAResourceRecoveryHelper implements XAResourceRecoveryHelper, XAResource {

private static final XAResource[] NO_XA_RESOURCES = {};

private static final Logger logger = Logger.getLogger(DataSourceXAResourceRecoveryHelper.class);

private final XADataSource xaDataSource;

private final String user;

private final String password;

private XAConnection xaConnection;

private XAResource delegate;
private final ConnectionManager connectionManager;

/**
* Create a new {@link DataSourceXAResourceRecoveryHelper} instance.
Expand All @@ -65,9 +49,7 @@ public DataSourceXAResourceRecoveryHelper(XADataSource xaDataSource) {
* @param password the database password or {@code null}
*/
public DataSourceXAResourceRecoveryHelper(XADataSource xaDataSource, String user, String password) {
this.xaDataSource = xaDataSource;
this.user = user;
this.password = password;
this.connectionManager = new ConnectionManager(xaDataSource, user, password);
}

@Override
Expand All @@ -77,104 +59,70 @@ public boolean initialise(String properties) {

@Override
public XAResource[] getXAResources() {
if (connect()) {
return new XAResource[]{ this };
}
return NO_XA_RESOURCES;
}

private boolean connect() {
if (this.delegate == null) {
if (!this.connectionManager.isConnected()) {
try {
this.xaConnection = getXaConnection();
this.delegate = this.xaConnection.getXAResource();
} catch (SQLException ex) {
logger.warn("Failed to create connection", ex);
return false;
this.connectionManager.connect();
} catch (XAException ignored) {
return new XAResource[0];
}
}
return true;
}

private XAConnection getXaConnection() throws SQLException {
if (this.user == null && this.password == null) {
return this.xaDataSource.getXAConnection();
}
return this.xaDataSource.getXAConnection(this.user, this.password);
return new XAResource[]{this};
}

@Override
public Xid[] recover(int flag) throws XAException {
try {
return getDelegate().recover(flag);
return this.connectionManager.connectAndApply(delegate -> delegate.recover(flag));
} finally {
if (flag == XAResource.TMENDRSCAN) {
disconnect();
this.connectionManager.disconnect();
}
}
}

private void disconnect() throws XAException {
try {
this.xaConnection.close();
} catch (SQLException e) {
logger.warn("Failed to close connection", e);
} finally {
this.xaConnection = null;
this.delegate = null;
}
}

@Override
public void start(Xid xid, int flags) throws XAException {
getDelegate().start(xid, flags);
this.connectionManager.connectAndAccept(delegate -> delegate.start(xid, flags));
}

@Override
public void end(Xid xid, int flags) throws XAException {
getDelegate().end(xid, flags);
this.connectionManager.connectAndAccept(delegate -> delegate.end(xid, flags));
}

@Override
public int prepare(Xid xid) throws XAException {
return getDelegate().prepare(xid);
return this.connectionManager.connectAndApply(delegate -> delegate.prepare(xid));
}

@Override
public void commit(Xid xid, boolean onePhase) throws XAException {
getDelegate().commit(xid, onePhase);
this.connectionManager.connectAndAccept(delegate -> delegate.commit(xid, onePhase));
}

@Override
public void rollback(Xid xid) throws XAException {
getDelegate().rollback(xid);
this.connectionManager.connectAndAccept(delegate -> delegate.rollback(xid));
}

@Override
public boolean isSameRM(XAResource xaResource) throws XAException {
return getDelegate().isSameRM(xaResource);
return this.connectionManager.connectAndApply(delegate -> delegate.isSameRM(xaResource));
}

@Override
public void forget(Xid xid) throws XAException {
getDelegate().forget(xid);
this.connectionManager.connectAndAccept(delegate -> delegate.forget(xid));
}

@Override
public int getTransactionTimeout() throws XAException {
return getDelegate().getTransactionTimeout();
return this.connectionManager.connectAndApply(XAResource::getTransactionTimeout);
}

@Override
public boolean setTransactionTimeout(int seconds) throws XAException {
return getDelegate().setTransactionTimeout(seconds);
return this.connectionManager.connectAndApply(delegate -> delegate.setTransactionTimeout(seconds));
}

private XAResource getDelegate() {
if (this.delegate == null) {
throw new IllegalStateException("Connection has not been opened");
}
return this.delegate;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2020 Red Hat, Inc, and individual contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.snowdrop.boot.narayana.core.jdbc;

import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;

@FunctionalInterface
public interface XAResourceConsumer {

void accept(XAResource xaResource) throws XAException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2020 Red Hat, Inc, and individual contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.snowdrop.boot.narayana.core.jdbc;

import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;

@FunctionalInterface
public interface XAResourceFunction<T> {

T apply(XAResource xaResource) throws XAException;
}
Loading

0 comments on commit 88042a9

Please sign in to comment.