Skip to content

Commit

Permalink
Performance improvement: synchronize to java.util.concurrent.locks sw…
Browse files Browse the repository at this point in the history
…itch to improve performance with VirtualThreads (#2116) (#2118)

This change improves EclipseLink performance with higher versions of the JDK (21 and above).
It's about replacement of synchronized keyword and wait(), notify() methods by objects from java.util.concurrent.locks.* package.
Additionally there are some new performance tests to verify it.


(cherry picked from commit 1f0cf75)

Signed-off-by: Radek Felcman <radek.felcman@oracle.com>
  • Loading branch information
rfelcman authored Apr 25, 2024
1 parent 8a993b4 commit c9022b7
Show file tree
Hide file tree
Showing 26 changed files with 1,060 additions and 249 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2020, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
Expand All @@ -14,7 +14,14 @@

import org.eclipse.persistence.internal.helper.type.ReadLockAcquisitionMetadata;

import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReadLockManager {

Expand All @@ -40,21 +47,28 @@ public class ReadLockManager {
*/
private final List<String> removeReadLockProblemsDetected = new ArrayList<>();

private final Lock instanceLock = new ReentrantLock();

/**
* add a concurrency manager as deferred locks to the DLM
*/
public synchronized void addReadLock(ConcurrencyManager concurrencyManager) {
final Thread currentThread = Thread.currentThread();
final long currentThreadId = currentThread.getId();
ReadLockAcquisitionMetadata readLockAcquisitionMetadata = ConcurrencyUtil.SINGLETON.createReadLockAcquisitionMetadata(concurrencyManager);

this.readLocks.add(FIRST_INDEX_OF_COLLECTION, concurrencyManager);
if(!mapThreadToReadLockAcquisitionMetadata.containsKey(currentThreadId)) {
List<ReadLockAcquisitionMetadata> newList = Collections.synchronizedList(new ArrayList<>());
mapThreadToReadLockAcquisitionMetadata.put(currentThreadId, newList );
public void addReadLock(ConcurrencyManager concurrencyManager) {
instanceLock.lock();
try {
final Thread currentThread = Thread.currentThread();
final long currentThreadId = currentThread.getId();
ReadLockAcquisitionMetadata readLockAcquisitionMetadata = ConcurrencyUtil.SINGLETON.createReadLockAcquisitionMetadata(concurrencyManager);

this.readLocks.add(FIRST_INDEX_OF_COLLECTION, concurrencyManager);
if (!mapThreadToReadLockAcquisitionMetadata.containsKey(currentThreadId)) {
List<ReadLockAcquisitionMetadata> newList = Collections.synchronizedList(new ArrayList<>());
mapThreadToReadLockAcquisitionMetadata.put(currentThreadId, newList);
}
List<ReadLockAcquisitionMetadata> acquiredReadLocksInCurrentTransactionList = mapThreadToReadLockAcquisitionMetadata.get(currentThreadId);
acquiredReadLocksInCurrentTransactionList.add(FIRST_INDEX_OF_COLLECTION, readLockAcquisitionMetadata);
} finally {
instanceLock.unlock();
}
List<ReadLockAcquisitionMetadata> acquiredReadLocksInCurrentTransactionList = mapThreadToReadLockAcquisitionMetadata.get(currentThreadId);
acquiredReadLocksInCurrentTransactionList.add(FIRST_INDEX_OF_COLLECTION, readLockAcquisitionMetadata);
}

/**
Expand All @@ -65,46 +79,56 @@ public synchronized void addReadLock(ConcurrencyManager concurrencyManager) {
* @param concurrencyManager
* the concurrency cache key that is about to be decrement in number of readers.
*/
public synchronized void removeReadLock(ConcurrencyManager concurrencyManager) {
final Thread currentThread = Thread.currentThread();
final long currentThreadId = currentThread.getId();
boolean readLockManagerHasTracingAboutAddedReadLocksForCurrentThread = mapThreadToReadLockAcquisitionMetadata.containsKey(currentThreadId);

if (!readLockManagerHasTracingAboutAddedReadLocksForCurrentThread) {
String errorMessage = ConcurrencyUtil.SINGLETON.readLockManagerProblem02ReadLockManageHasNoEntriesForThread(concurrencyManager, currentThreadId);
removeReadLockProblemsDetected.add(errorMessage);
return;
}
public void removeReadLock(ConcurrencyManager concurrencyManager) {
instanceLock.lock();
try {
final Thread currentThread = Thread.currentThread();
final long currentThreadId = currentThread.getId();
boolean readLockManagerHasTracingAboutAddedReadLocksForCurrentThread = mapThreadToReadLockAcquisitionMetadata.containsKey(currentThreadId);

if (!readLockManagerHasTracingAboutAddedReadLocksForCurrentThread) {
String errorMessage = ConcurrencyUtil.SINGLETON.readLockManagerProblem02ReadLockManageHasNoEntriesForThread(concurrencyManager, currentThreadId);
removeReadLockProblemsDetected.add(errorMessage);
return;
}

List<ReadLockAcquisitionMetadata> readLocksAcquiredDuringCurrentThread = mapThreadToReadLockAcquisitionMetadata.get(currentThreadId);
ReadLockAcquisitionMetadata readLockAquisitionMetadataToRemove = null;
for (ReadLockAcquisitionMetadata currentReadLockAcquisitionMetadata : readLocksAcquiredDuringCurrentThread) {
ConcurrencyManager currentCacheKeyObjectToCheck = currentReadLockAcquisitionMetadata.getCacheKeyWhoseNumberOfReadersThreadIsIncrementing();
boolean dtoToRemoveFound = concurrencyManager.getConcurrencyManagerId() == currentCacheKeyObjectToCheck.getConcurrencyManagerId();
if (dtoToRemoveFound) {
readLockAquisitionMetadataToRemove = currentReadLockAcquisitionMetadata;
break;
List<ReadLockAcquisitionMetadata> readLocksAcquiredDuringCurrentThread = mapThreadToReadLockAcquisitionMetadata.get(currentThreadId);
ReadLockAcquisitionMetadata readLockAquisitionMetadataToRemove = null;
for (ReadLockAcquisitionMetadata currentReadLockAcquisitionMetadata : readLocksAcquiredDuringCurrentThread) {
ConcurrencyManager currentCacheKeyObjectToCheck = currentReadLockAcquisitionMetadata.getCacheKeyWhoseNumberOfReadersThreadIsIncrementing();
boolean dtoToRemoveFound = concurrencyManager.getConcurrencyManagerId() == currentCacheKeyObjectToCheck.getConcurrencyManagerId();
if (dtoToRemoveFound) {
readLockAquisitionMetadataToRemove = currentReadLockAcquisitionMetadata;
break;
}
}
}

if (readLockAquisitionMetadataToRemove == null) {
String errorMessage = ConcurrencyUtil.SINGLETON.readLockManagerProblem03ReadLockManageHasNoEntriesForThread(concurrencyManager, currentThreadId);
removeReadLockProblemsDetected.add(errorMessage);
return;
}
this.readLocks.remove(concurrencyManager);
readLocksAcquiredDuringCurrentThread.remove(readLockAquisitionMetadataToRemove);
if (readLockAquisitionMetadataToRemove == null) {
String errorMessage = ConcurrencyUtil.SINGLETON.readLockManagerProblem03ReadLockManageHasNoEntriesForThread(concurrencyManager, currentThreadId);
removeReadLockProblemsDetected.add(errorMessage);
return;
}
this.readLocks.remove(concurrencyManager);
readLocksAcquiredDuringCurrentThread.remove(readLockAquisitionMetadataToRemove);

if (readLocksAcquiredDuringCurrentThread.isEmpty()) {
mapThreadToReadLockAcquisitionMetadata.remove(currentThreadId);
if (readLocksAcquiredDuringCurrentThread.isEmpty()) {
mapThreadToReadLockAcquisitionMetadata.remove(currentThreadId);
}
} finally {
instanceLock.unlock();
}
}

/**
* Return a set of the deferred locks
*/
public synchronized List<ConcurrencyManager> getReadLocks() {
return Collections.unmodifiableList(readLocks);
public List<ConcurrencyManager> getReadLocks() {
instanceLock.lock();
try {
return Collections.unmodifiableList(readLocks);
} finally {
instanceLock.unlock();
}
}

/**
Expand All @@ -114,8 +138,13 @@ public synchronized List<ConcurrencyManager> getReadLocks() {
* @param problemDetected
* the detected problem
*/
public synchronized void addRemoveReadLockProblemsDetected(String problemDetected) {
removeReadLockProblemsDetected.add(problemDetected);
public void addRemoveReadLockProblemsDetected(String problemDetected) {
instanceLock.lock();
try {
removeReadLockProblemsDetected.add(problemDetected);
} finally {
instanceLock.unlock();
}
}

/** Getter for {@link #mapThreadToReadLockAcquisitionMetadata} */
Expand All @@ -137,8 +166,13 @@ public List<String> getRemoveReadLockProblemsDetected() {
* any read lock acquired in the tracing we definitely do not want this object instance to be thrown out
* from our main tracing. It is probably revealing problems in read lock acquisition and released.
*/
public synchronized boolean isEmpty() {
return readLocks.isEmpty() && removeReadLockProblemsDetected.isEmpty();
public boolean isEmpty() {
instanceLock.lock();
try {
return readLocks.isEmpty() && removeReadLockProblemsDetected.isEmpty();
} finally {
instanceLock.unlock();
}
}

/**
Expand All @@ -151,16 +185,20 @@ public synchronized boolean isEmpty() {
* or to go about doing
*/
@Override
public synchronized ReadLockManager clone() {
ReadLockManager clone = new ReadLockManager();
clone.readLocks.addAll(this.readLocks);
for (Map.Entry<Long, List<ReadLockAcquisitionMetadata>> currentEntry : this.mapThreadToReadLockAcquisitionMetadata.entrySet()) {
Long key = currentEntry.getKey();
List<ReadLockAcquisitionMetadata> value = currentEntry.getValue();
clone.mapThreadToReadLockAcquisitionMetadata.put(key, new ArrayList<>(value));
public ReadLockManager clone() {
instanceLock.lock();
try {
ReadLockManager clone = new ReadLockManager();
clone.readLocks.addAll(this.readLocks);
for (Map.Entry<Long, List<ReadLockAcquisitionMetadata>> currentEntry : this.mapThreadToReadLockAcquisitionMetadata.entrySet()) {
Long key = currentEntry.getKey();
List<ReadLockAcquisitionMetadata> value = currentEntry.getValue();
clone.mapThreadToReadLockAcquisitionMetadata.put(key, new ArrayList<>(value));
}
clone.removeReadLockProblemsDetected.addAll(this.removeReadLockProblemsDetected);
return clone;
} finally {
instanceLock.unlock();
}
clone.removeReadLockProblemsDetected.addAll(this.removeReadLockProblemsDetected);
return clone;
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (c) 1998, 2021 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1998, 2021 IBM Corporation. All rights reserved.
* Copyright (c) 1998, 2024 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1998, 2024 IBM Corporation. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
Expand All @@ -22,10 +22,6 @@
// - 526957 : Split the logging and trace messages
package org.eclipse.persistence.internal.helper;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

import org.eclipse.persistence.descriptors.ClassDescriptor;
import org.eclipse.persistence.descriptors.FetchGroupManager;
import org.eclipse.persistence.exceptions.ConcurrencyException;
Expand All @@ -40,6 +36,19 @@
import org.eclipse.persistence.logging.SessionLog;
import org.eclipse.persistence.mappings.DatabaseMapping;

import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static java.util.Collections.unmodifiableMap;

/**
Expand Down Expand Up @@ -122,6 +131,10 @@ public class WriteLockManager {
/* the first element in this list will be the prevailing thread */
protected ExposedNodeLinkedList prevailingQueue;

private final Lock toWaitOnLock = new ReentrantLock();
private final Lock instancePrevailingQueueLock = new ReentrantLock();
private final Condition toWaitOnLockCondition = toWaitOnLock.newCondition();

public WriteLockManager() {
this.prevailingQueue = new ExposedNodeLinkedList();
}
Expand Down Expand Up @@ -167,14 +180,17 @@ public Map acquireLocksForClone(Object objectForClone, ClassDescriptor descripto
// using the exact same approach we have been adding to the concurrency manager
ConcurrencyUtil.SINGLETON.determineIfReleaseDeferredLockAppearsToBeDeadLocked(toWaitOn, whileStartTimeMillis, lockManager, readLockManager, ALLOW_INTERRUPTED_EXCEPTION_TO_BE_FIRED_UP_TRUE);

synchronized (toWaitOn) {
toWaitOnLock.lock();
try {
try {
if (toWaitOn.isAcquired()) {//last minute check to insure it is still locked.
toWaitOn.wait(ConcurrencyUtil.SINGLETON.getAcquireWaitTime());// wait for lock on object to be released
toWaitOnLockCondition.await(ConcurrencyUtil.SINGLETON.getAcquireWaitTime(), TimeUnit.MILLISECONDS);// wait for lock on object to be released
}
} catch (InterruptedException ex) {
// Ignore exception thread should continue.
}
} finally {
toWaitOnLock.unlock();
}
Object waitObject = toWaitOn.getObject();
// Object may be null for loss of identity.
Expand Down Expand Up @@ -419,8 +435,11 @@ private void acquireRequiredLocksInternal(MergeManager mergeManager, UnitOfWorkC
// set the QueueNode to be the node from the
// linked list for quick removal upon
// acquiring all locks
synchronized (this.prevailingQueue) {
instancePrevailingQueueLock.lock();
try {
mergeManager.setQueueNode(this.prevailingQueue.addLast(mergeManager));
} finally {
instancePrevailingQueueLock.unlock();
}
}

Expand All @@ -430,14 +449,15 @@ private void acquireRequiredLocksInternal(MergeManager mergeManager, UnitOfWorkC
try {
if (activeCacheKey != null){
//wait on the lock of the object that we couldn't get.
synchronized (activeCacheKey) {
activeCacheKey.getInstanceLock().lock();
try {
// verify that the cache key is still locked before we wait on it, as
//it may have been released since we tried to acquire it.
if (activeCacheKey.isAcquired() && (activeCacheKey.getActiveThread() != Thread.currentThread())) {
Thread thread = activeCacheKey.getActiveThread();
if (thread.isAlive()){
long time = System.currentTimeMillis();
activeCacheKey.wait(MAX_WAIT);
activeCacheKey.getInstanceLockCondition().await(MAX_WAIT, TimeUnit.MILLISECONDS);
if (System.currentTimeMillis() - time >= MAX_WAIT){
Object[] params = new Object[]{MAX_WAIT /1000, descriptor.getJavaClassName(), activeCacheKey.getKey(), thread.getName()};
StringBuilder buffer = new StringBuilder(TraceLocalization.buildMessage("max_time_exceeded_for_acquirerequiredlocks_wait", params));
Expand All @@ -459,6 +479,8 @@ private void acquireRequiredLocksInternal(MergeManager mergeManager, UnitOfWorkC
}
}
}
} finally {
activeCacheKey.getInstanceLock().unlock();
}
}
} catch (InterruptedException exception) {
Expand Down Expand Up @@ -497,8 +519,11 @@ private void acquireRequiredLocksInternal(MergeManager mergeManager, UnitOfWorkC
}finally {
if (mergeManager.getWriteLockQueued() != null) {
//the merge manager entered the wait queue and must be cleaned up
synchronized(this.prevailingQueue) {
instancePrevailingQueueLock.lock();
try {
this.prevailingQueue.remove(mergeManager.getQueueNode());
} finally {
instancePrevailingQueueLock.unlock();
}
mergeManager.setWriteLockQueued(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.eclipse.persistence.sessions.DataRecord;
import org.eclipse.persistence.sessions.DatabaseRecord;

import java.util.concurrent.TimeUnit;

/**
* <p><b>Purpose</b>: Container class for storing objects in an IdentityMap.
* <p><b>Responsibilities</b>:<ul>
Expand Down Expand Up @@ -604,18 +606,23 @@ public void setTransactionId(Object transactionId) {
this.transactionId = transactionId;
}

public synchronized Object waitForObject(){
public Object waitForObject(){
getInstanceLock().lock();
try {
int count = 0;
while (this.object == null && isAcquired()) {
if (count > MAX_WAIT_TRIES)
throw ConcurrencyException.maxTriesLockOnBuildObjectExceded(getActiveThread(), Thread.currentThread());
wait(10);
++count;
try {
int count = 0;
while (this.object == null && isAcquired()) {
if (count > MAX_WAIT_TRIES)
throw ConcurrencyException.maxTriesLockOnBuildObjectExceded(getActiveThread(), Thread.currentThread());
getInstanceLockCondition().await(10, TimeUnit.MILLISECONDS);
++count;
}
} catch(InterruptedException ex) {
//ignore as the loop is broken
}
} catch(InterruptedException ex) {
//ignore as the loop is broken
return this.object;
} finally {
getInstanceLock().unlock();
}
return this.object;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@

warmup.iterations=20
run.iterations=20
jmh.resultFile=jmh-result.csv
jmh.resultFormat=csv
Loading

0 comments on commit c9022b7

Please sign in to comment.