Skip to content

Commit

Permalink
Merge pull request apache#11 from kiegroup/JBPM-10187
Browse files Browse the repository at this point in the history
[JBPM-10187] Sorting resources to avoid deadlock
  • Loading branch information
fjtirado authored Nov 6, 2023
1 parent 8021164 commit 5eb0c4f
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
package org.drools.persistence.api;

import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

public class TransactionManagerHelper {

private static final String APP_UPDETEABLE_RESOURCE = "app-updateable-resource";
private static final String CMD_UPDETEABLE_RESOURCE = "cmd-updateable-resource";

public static void registerTransactionSyncInContainer(TransactionManager txm, OrderedTransactionSynchronization synchronization) {
TransactionSynchronizationContainer container = (TransactionSynchronizationContainer)txm.getResource(TransactionSynchronizationContainer.RESOURCE_KEY);
Expand All @@ -41,7 +43,7 @@ public static void addToUpdatableSet(TransactionManager txm, Transformable trans
}
Set<Transformable> toBeUpdated = (Set<Transformable>) txm.getResource(APP_UPDETEABLE_RESOURCE);
if (toBeUpdated == null) {
toBeUpdated = new LinkedHashSet<Transformable>();
toBeUpdated = new LinkedHashSet<>();
txm.putResource(APP_UPDETEABLE_RESOURCE, toBeUpdated);
}
toBeUpdated.add(transformable);
Expand All @@ -58,11 +60,12 @@ public static void removeFromUpdatableSet(TransactionManager txm, Transformable

@SuppressWarnings("unchecked")
public static Set<Transformable> getUpdateableSet(TransactionManager txm) {
Set<Transformable> toBeUpdated = (Set<Transformable>) txm.getResource(APP_UPDETEABLE_RESOURCE);
if (toBeUpdated == null) {
return Collections.emptySet();
Set<Transformable> result = (Set<Transformable>) txm.getResource(APP_UPDETEABLE_RESOURCE);
if (result != null) {
SortedSet<Transformable> sorted = new TreeSet<>(Comparator.comparing(o -> o.getClass().getSimpleName()));
sorted.addAll(result);
return sorted;
}

return new LinkedHashSet<Transformable>(toBeUpdated);
return Collections.emptySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ protected javax.transaction.TransactionManager findTransactionManager(UserTransa
jndiName );
return tm;
} catch ( NamingException ex ) {
logger.debug( "No JTA TransactionManager found at fallback JNDI location [{}]",
logger.debug( "No JTA TransactionManager found at fallback JNDI location [{}]. Exception message {}",
jndiName,
ex);
ex.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ protected void initNewKnowledgeSession(KieBase kbase, KieSessionConfiguration co

((InternalKnowledgeRuntime) this.ksession).setEndOperationListener( new EndOperationListenerImpl(this.txm, this.sessionInfo ) );

this.runner = new TransactionInterceptor();
this.runner = new TransactionInterceptor(sessionInfo);

TimerJobFactoryManager timerJobFactoryManager = ((InternalKnowledgeRuntime) ksession ).getTimerService().getTimerJobFactoryManager();
if (timerJobFactoryManager instanceof CommandServiceTimerJobFactoryManager) {
Expand Down Expand Up @@ -258,7 +258,7 @@ protected void initExistingKnowledgeSession(Long sessionId,
kruntime.setIdentifier( this.sessionInfo.getId() );
kruntime.setEndOperationListener( new EndOperationListenerImpl( this.txm, this.sessionInfo ) );

this.runner = new TransactionInterceptor();
this.runner = new TransactionInterceptor(sessionInfo);
// apply interceptors
Iterator<ChainableRunner> iterator = this.interceptors.descendingIterator();
while (iterator.hasNext()) {
Expand Down Expand Up @@ -504,6 +504,7 @@ public void afterCompletion(int status) {
this.service.jpm.endCommandScopedEntityManager();

KieSession ksession = this.service.ksession;
logger.debug("Cleaning up session {} information", ksession != null ? ksession.getIdentifier() : null);
// clean up cached process and work item instances
if ( ksession != null ) {
InternalProcessRuntime internalProcessRuntime = ((InternalWorkingMemory) ksession).internalGetProcessRuntime();
Expand All @@ -513,8 +514,11 @@ public void afterCompletion(int status) {
}

internalProcessRuntime.clearProcessInstances();
logger.debug("Cached process instances after clean up {}",internalProcessRuntime.getProcessInstances());
}

((JPAWorkItemManager) ksession.getWorkItemManager()).clearWorkItems();

}
if (status != TransactionManager.STATUS_COMMITTED) {
this.service.jpm.resetApplicationScopedPersistenceContext();
Expand Down Expand Up @@ -567,14 +571,16 @@ private void registerUpdateSync() {

private class TransactionInterceptor extends AbstractInterceptor {

private SessionInfo sessionInfo;


public TransactionInterceptor() {
public TransactionInterceptor(SessionInfo sessionInfo) {
this.sessionInfo = sessionInfo;
setNext(new PseudoClockRunner());
}

@Override
public RequestContext execute( Executable executable, RequestContext context ) {

if ( !( (InternalExecutable) executable ).canRunInTransaction() ) {
executeNext(executable, context);
if (((InternalExecutable) executable ).requiresDispose()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

public class JpaPersistenceContext implements PersistenceContext {

private static Logger logger = LoggerFactory.getLogger(JpaPersistenceContext.class);
protected static Logger logger = LoggerFactory.getLogger(JpaPersistenceContext.class);

private EntityManager em;
protected final boolean isJTA;
Expand Down Expand Up @@ -68,7 +68,7 @@ public PersistentSession persist(PersistentSession entity) {
}

public PersistentSession findSession(Long id) {

logger.trace("Reading session info {}",id);
SessionInfo sessionInfo = null;
if( this.pessimisticLocking ) {
sessionInfo = this.em.find( SessionInfo.class, id, lockMode );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected RequestContext internalExecute( Executable executable, RequestContext
RuntimeException originException = null;

while (true) {
if (attempt > 1) {
if (attempt > 0) {
logger.trace("retrying (attempt {})...", attempt);
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,19 @@ private boolean isEntity(Object o){
public void onStart(TransactionManager txm) {
if (persister.get() == null) {
EntityManager em = emf.createEntityManager();
log.trace ("Created EM {} for JPAPlaceHolder {}:{}",em, this, name);
persister.set(new EntityPersister(em));
}
}

@Override
public void onEnd(TransactionManager txm) {
EntityPersister em = persister.get();
if (em == null) {
log.warn ("EM is null for {}:{} and status {}", this, name, txm.getStatus());
return;
}
log.trace ("Executing onEnd for {}:{} with status {}", this, name, txm.getStatus());
if(txm.getStatus() == TransactionManager.STATUS_ROLLEDBACK) {
// this is pretty much of a hack but for avoiding issues when rolling back we need to set to null
// the primary key of the entities (simple types)
Expand Down Expand Up @@ -239,6 +245,7 @@ public void onEnd(TransactionManager txm) {
});
}
if (em != null) {
log.trace ("Closing EM {} for JPAPlaceHolder {}:{}",em.getEntityManager(), this, name);
em.close();
persister.set(null);
}
Expand Down

0 comments on commit 5eb0c4f

Please sign in to comment.