Skip to content

Commit

Permalink
nfs4: improve state handler concurrency
Browse files Browse the repository at this point in the history
Motivation:
The state handler keeps track of NFSv4 clients. This duty is not
symmetric - updates happen when a client is added or removed, but
queried almost for any compound operation in a session. Thus on a
busy system we can observe thread serialization.

Modification:
replace synchronized blocks with read/write locks.

Result:
a better concurrency in a multi-client environment.

Acked-by: Paul Millar
Acked-by: Albert Rossi
Target: master
  • Loading branch information
kofemann committed Jan 7, 2021
1 parent ea6f71a commit 44fa34c
Showing 1 changed file with 103 additions and 50 deletions.
153 changes: 103 additions & 50 deletions core/src/main/java/org/dcache/nfs/v4/NFSv4StateHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2009 - 2020 Deutsches Elektronen-Synchroton,
* Copyright (c) 2009 - 2021 Deutsches Elektronen-Synchroton,
* Member of the Helmholtz Association, (DESY), HAMBURG, GERMANY
*
* This library is free software; you can redistribute it and/or modify
Expand Down Expand Up @@ -32,6 +32,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import org.dcache.nfs.ChimeraNFSException;
Expand All @@ -47,6 +49,8 @@
import org.dcache.nfs.util.Cache;
import org.dcache.nfs.util.NopCacheEventListener;

import javax.annotation.concurrent.GuardedBy;

import static com.google.common.base.Preconditions.checkState;

public class NFSv4StateHandler {
Expand All @@ -63,9 +67,19 @@ public class NFSv4StateHandler {
*/
private final AtomicInteger _clientId = new AtomicInteger(0);

// mapping between server generated clietid and nfs_client_id, not confirmed yet
/**
* Mapping between server generated clietid and nfs_client_id, not confirmed yet.
*/
private final ClientCache _clientsByServerId;

/**
* Read/write lock that guards the access to {@link #_clientsByServerId}.
*/
private final ReentrantReadWriteLock _accessLock = new ReentrantReadWriteLock();

private final Lock _readLock = _accessLock.readLock();
private final Lock _writeLock = _accessLock.writeLock();

/**
* Client's lease expiration time in seconds.
*/
Expand Down Expand Up @@ -129,19 +143,27 @@ public NFSv4StateHandler(int leaseTime, int instanceId, ClientRecoveryStore clie

public void removeClient(NFS4Client client) {

synchronized (this) {
_writeLock.lock();
try {
checkState(_running, "NFS state handler not running");
_clientsByServerId.remove(client.getId());
clientStore.removeClient(client.getOwnerId());
} finally {
_writeLock.unlock();
}
client.tryDispose();
}

private synchronized void addClient(NFS4Client newClient) {
private void addClient(NFS4Client newClient) {

checkState(_running, "NFS state handler not running");
_clientsByServerId.put(newClient.getId(), newClient);
clientStore.addClient(newClient.getOwnerId());
_writeLock.lock();
try {
checkState(_running, "NFS state handler not running");
_clientsByServerId.put(newClient.getId(), newClient);
clientStore.addClient(newClient.getOwnerId());
} finally {
_writeLock.unlock();
}
}

/**
Expand All @@ -152,14 +174,12 @@ private synchronized void addClient(NFS4Client newClient) {
* @throws StaleClientidException if there are no corresponding verified
* valid record exist.
*/
public synchronized NFS4Client getConfirmedClient(clientid4 clientid) throws StaleClientidException {
public NFS4Client getConfirmedClient(clientid4 clientid) throws StaleClientidException {

NFS4Client client = getValidClient(clientid);

if (!client.isConfirmed()) {
throw new StaleClientidException("client not confirmed.");
}

return client;
}

Expand All @@ -172,14 +192,12 @@ public synchronized NFS4Client getConfirmedClient(clientid4 clientid) throws Sta
* @throws StaleClientidException if there are no corresponding verified
* valid record exist.
*/
public synchronized NFS4Client getValidClient(clientid4 clientid) throws StaleClientidException {
public NFS4Client getValidClient(clientid4 clientid) throws StaleClientidException {

NFS4Client client = getClient(clientid);

if (!client.isLeaseValid()) {
throw new StaleClientidException("client expired.");
}

return client;
}

Expand All @@ -191,37 +209,53 @@ public synchronized NFS4Client getValidClient(clientid4 clientid) throws StaleCl
* @return nfs client associated with clientid.
* @throws StaleClientidException if there are no corresponding record exist.
*/
public synchronized NFS4Client getClient(clientid4 clientid) throws StaleClientidException {
public NFS4Client getClient(clientid4 clientid) throws StaleClientidException {

checkState(_running, "NFS state handler not running");
_readLock.lock();
try {
checkState(_running, "NFS state handler not running");

NFS4Client client = _clientsByServerId.get(clientid);
if(client == null) {
throw new StaleClientidException("bad client id.");
NFS4Client client = _clientsByServerId.get(clientid);
if (client == null) {
throw new StaleClientidException("bad client id.");
}
return client;
} finally {
_readLock.unlock();
}
return client;
}

public synchronized NFS4Client getClientIdByStateId(stateid4 stateId) throws ChimeraNFSException {
public NFS4Client getClientIdByStateId(stateid4 stateId) throws ChimeraNFSException {

checkState(_running, "NFS state handler not running");
_readLock.lock();
try {
checkState(_running, "NFS state handler not running");

clientid4 clientId = new clientid4(Bytes.getLong(stateId.other, 0));
NFS4Client client = _clientsByServerId.get(clientId);
if (client == null) {
throw new BadStateidException("no client for stateid: " + stateId);
clientid4 clientId = new clientid4(Bytes.getLong(stateId.other, 0));
NFS4Client client = _clientsByServerId.get(clientId);
if (client == null) {
throw new BadStateidException("no client for stateid: " + stateId);
}
return client;
} finally {
_readLock.unlock();
}
return client;
}

public synchronized NFS4Client getClient(sessionid4 id) throws ChimeraNFSException {
checkState(_running, "NFS state handler not running");
clientid4 clientId = new clientid4(Bytes.getLong(id.value, 0));
NFS4Client client = _clientsByServerId.get(clientId);
if (client == null) {
throw new BadSessionException("session not found: " + id);
public NFS4Client getClient(sessionid4 id) throws ChimeraNFSException {

_readLock.lock();
try {
checkState(_running, "NFS state handler not running");
clientid4 clientId = new clientid4(Bytes.getLong(id.value, 0));
NFS4Client client = _clientsByServerId.get(clientId);
if (client == null) {
throw new BadSessionException("session not found: " + id);
}
return client;
} finally {
_readLock.unlock();
}
return client;
}

/**
Expand All @@ -230,12 +264,18 @@ public synchronized NFS4Client getClient(sessionid4 id) throws ChimeraNFSExcepti
*
* @return an existing client record or null, if not matching record found.
*/
public synchronized NFS4Client clientByOwner(byte[] ownerid) {
return _clientsByServerId
.stream()
.filter(c -> Arrays.equals(c.getOwnerId(), ownerid))
.findAny()
.orElse(null);
public NFS4Client clientByOwner(byte[] ownerid) {

_readLock.lock();
try {
return _clientsByServerId
.stream()
.filter(c -> Arrays.equals(c.getOwnerId(), ownerid))
.findAny()
.orElse(null);
} finally {
_readLock.unlock();
}
}

public void updateClientLeaseTime(stateid4 stateid) throws ChimeraNFSException {
Expand All @@ -252,10 +292,16 @@ public void updateClientLeaseTime(stateid4 stateid) throws ChimeraNFSException
client.updateLeaseTime();
}

public synchronized List<NFS4Client> getClients() {
checkState(_running, "NFS state handler not running");
return _clientsByServerId.peek()
.collect(Collectors.toList());
public List<NFS4Client> getClients() {

_readLock.lock();
try {
checkState(_running, "NFS state handler not running");
return _clientsByServerId.peek()
.collect(Collectors.toList());
} finally {
_readLock.unlock();
}
}

public NFS4Client createClient(InetSocketAddress clientAddress, InetSocketAddress localAddress, int minorVersion,
Expand Down Expand Up @@ -315,7 +361,8 @@ public synchronized void wantReclaim(byte[] owner) throws ChimeraNFSException {
clientStore.wantReclaim(owner);
}

private synchronized void drainClients() {
@GuardedBy("_writeLock")
private void drainClients() {
_clientsByServerId.stream()
.forEach(c -> {
c.tryDispose();
Expand All @@ -326,12 +373,18 @@ private synchronized void drainClients() {
/**
* Shutdown session lease time watchdog thread.
*/
public synchronized void shutdown() throws IOException {
checkState(_running, "NFS state handler not running");
_running = false;
drainClients();
_cleanerScheduler.shutdown();
clientStore.close();
public void shutdown() throws IOException {

_writeLock.lock();
try {
checkState(_running, "NFS state handler not running");
_running = false;
drainClients();
_cleanerScheduler.shutdown();
clientStore.close();
} finally {
_writeLock.unlock();
}
}

/**
Expand Down

0 comments on commit 44fa34c

Please sign in to comment.