Skip to content

Commit

Permalink
HDDS-11413. PipelineManagerImpl lock optimization reduces AllocateBlo…
Browse files Browse the repository at this point in the history
…ck latency (apache#7160)
  • Loading branch information
guohao-rosicky authored Dec 13, 2024
1 parent 853d657 commit bf6f323
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public ContainerInfo allocateContainer(
// Acquire pipeline manager lock, to avoid any updates to pipeline
// while allocate container happens. This is to avoid scenario like
// mentioned in HDDS-5655.
pipelineManager.acquireReadLock();
pipelineManager.acquireReadLock(replicationConfig);
lock.lock();
List<Pipeline> pipelines;
Pipeline pipeline;
Expand All @@ -196,7 +196,7 @@ public ContainerInfo allocateContainer(
}
} finally {
lock.unlock();
pipelineManager.releaseReadLock();
pipelineManager.releaseReadLock(replicationConfig);
}

if (pipelines.isEmpty()) {
Expand All @@ -209,7 +209,7 @@ public ContainerInfo allocateContainer(
" matching pipeline for replicationConfig: " + replicationConfig
+ ", State:PipelineState.OPEN", e);
}
pipelineManager.acquireReadLock();
pipelineManager.acquireReadLock(replicationConfig);
lock.lock();
try {
pipelines = pipelineManager
Expand All @@ -224,7 +224,7 @@ public ContainerInfo allocateContainer(
}
} finally {
lock.unlock();
pipelineManager.releaseReadLock();
pipelineManager.releaseReadLock(replicationConfig);
}
}
return containerInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,20 +207,20 @@ void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
/**
* Acquire read lock.
*/
void acquireReadLock();
void acquireReadLock(ReplicationConfig replicationConfig);

/**
* Release read lock.
*/
void releaseReadLock();
void releaseReadLock(ReplicationConfig replicationConfig);

/**
* Acquire write lock.
*/
void acquireWriteLock();
void acquireWriteLock(ReplicationConfig replicationConfig);

/**
* Release write lock.
*/
void releaseWriteLock();
void releaseWriteLock(ReplicationConfig replicationConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class PipelineManagerImpl implements PipelineManager {

// Limit the number of on-going ratis operation to be 1.
private final ReentrantReadWriteLock lock;
private final ReentrantReadWriteLock ecPipelineLock;
private PipelineFactory pipelineFactory;
private PipelineStateManager stateManager;
private BackgroundPipelineCreator backgroundPipelineCreator;
Expand Down Expand Up @@ -105,6 +106,7 @@ protected PipelineManagerImpl(ConfigurationSource conf,
SCMContext scmContext,
Clock clock) {
this.lock = new ReentrantReadWriteLock();
this.ecPipelineLock = new ReentrantReadWriteLock();
this.pipelineFactory = pipelineFactory;
this.stateManager = pipelineStateManager;
this.conf = conf;
Expand Down Expand Up @@ -248,7 +250,7 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig,
throws IOException {
checkIfPipelineCreationIsAllowed(replicationConfig);

acquireWriteLock();
acquireWriteLock(replicationConfig);
final Pipeline pipeline;
try {
try {
Expand All @@ -261,7 +263,7 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig,
addPipelineToManager(pipeline);
return pipeline;
} finally {
releaseWriteLock();
releaseWriteLock(replicationConfig);
}
}

Expand All @@ -286,15 +288,16 @@ private void addPipelineToManager(Pipeline pipeline)
throws IOException {
HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
ClientVersion.CURRENT_VERSION);
acquireWriteLock();
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
try {
stateManager.addPipeline(pipelineProto);
} catch (IOException ex) {
LOG.debug("Failed to add pipeline {}.", pipeline, ex);
metrics.incNumPipelineCreationFailed();
throw ex;
} finally {
releaseWriteLock();
releaseWriteLock(replicationConfig);
}
recordMetricsForPipeline(pipeline);
}
Expand Down Expand Up @@ -419,19 +422,23 @@ public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
public void openPipeline(PipelineID pipelineId)
throws IOException {
HddsProtos.PipelineID pipelineIdProtobuf = pipelineId.getProtobuf();
acquireWriteLock();
final Pipeline pipeline;

final Pipeline pipeline = getPipeline(pipelineId);
ReplicationConfig replicationConfig = null;
try {
pipeline = stateManager.getPipeline(pipelineId);
if (pipeline.isClosed()) {
throw new IOException("Closed pipeline can not be opened");
}
replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
stateManager.updatePipelineState(pipelineIdProtobuf,
HddsProtos.PipelineState.PIPELINE_OPEN);
}
} finally {
releaseWriteLock();
if (replicationConfig != null) {
releaseWriteLock(replicationConfig);
}
}
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
Expand All @@ -447,14 +454,15 @@ protected void removePipeline(Pipeline pipeline)
throws IOException {
pipelineFactory.close(pipeline.getType(), pipeline);
HddsProtos.PipelineID pipelineID = pipeline.getId().getProtobuf();
acquireWriteLock();
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
try {
stateManager.removePipeline(pipelineID);
} catch (IOException ex) {
metrics.incNumPipelineDestroyFailed();
throw ex;
} finally {
releaseWriteLock();
releaseWriteLock(replicationConfig);
}
LOG.info("Pipeline {} removed.", pipeline);
metrics.incNumPipelineDestroyed();
Expand Down Expand Up @@ -507,19 +515,20 @@ public void closePipeline(PipelineID pipelineID) throws IOException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
// close containers.
closeContainersForPipeline(pipelineID);
if (!getPipeline(pipelineID).isClosed()) {
acquireWriteLock();
Pipeline pipeline = getPipeline(pipelineID);
if (!pipeline.isClosed()) {
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_CLOSED);
} finally {
releaseWriteLock();
releaseWriteLock(replicationConfig);
}
LOG.info("Pipeline {} moved to CLOSED state", pipelineID);
}

metrics.removePipelineMetrics(pipelineID);

}

/**
Expand Down Expand Up @@ -684,12 +693,14 @@ public int minPipelineLimit(Pipeline pipeline) {
public void activatePipeline(PipelineID pipelineID)
throws IOException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
acquireWriteLock();
Pipeline pipeline = getPipeline(pipelineID);
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_OPEN);
} finally {
releaseWriteLock();
releaseWriteLock(replicationConfig);
}
}

Expand All @@ -703,12 +714,14 @@ public void activatePipeline(PipelineID pipelineID)
public void deactivatePipeline(PipelineID pipelineID)
throws IOException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
acquireWriteLock();
Pipeline pipeline = getPipeline(pipelineID);
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_DORMANT);
} finally {
releaseWriteLock();
releaseWriteLock(replicationConfig);
}
}

Expand Down Expand Up @@ -931,22 +944,38 @@ private void recordMetricsForPipeline(Pipeline pipeline) {
}

@Override
public void acquireReadLock() {
lock.readLock().lock();
public void acquireReadLock(ReplicationConfig replicationConfig) {
if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
ecPipelineLock.readLock().lock();
} else {
lock.readLock().lock();
}
}

@Override
public void releaseReadLock() {
lock.readLock().unlock();
public void releaseReadLock(ReplicationConfig replicationConfig) {
if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
ecPipelineLock.readLock().unlock();
} else {
lock.readLock().unlock();
}
}

@Override
public void acquireWriteLock() {
lock.writeLock().lock();
public void acquireWriteLock(ReplicationConfig replicationConfig) {
if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
ecPipelineLock.writeLock().lock();
} else {
lock.writeLock().lock();
}
}

@Override
public void releaseWriteLock() {
lock.writeLock().unlock();
public void releaseWriteLock(ReplicationConfig replicationConfig) {
if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
ecPipelineLock.writeLock().unlock();
} else {
lock.writeLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,13 @@ private ContainerInfo getContainer(ReplicationConfig repConfig, String owner,
// Acquire pipeline manager lock, to avoid any updates to pipeline
// while allocate container happens. This is to avoid scenario like
// mentioned in HDDS-5655.
pipelineManager.acquireReadLock();
pipelineManager.acquireReadLock(repConfig);
try {
List<Pipeline> availablePipelines = findPipelinesByState(repConfig,
excludeList, Pipeline.PipelineState.OPEN);
return selectContainer(availablePipelines, req, owner, excludeList);
} finally {
pipelineManager.releaseReadLock();
pipelineManager.releaseReadLock(repConfig);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,22 +332,22 @@ public Map<String, Integer> getPipelineInfo() {
}

@Override
public void acquireReadLock() {
public void acquireReadLock(ReplicationConfig replicationConfig) {

}

@Override
public void releaseReadLock() {
public void releaseReadLock(ReplicationConfig replicationConfig) {

}

@Override
public void acquireWriteLock() {
public void acquireWriteLock(ReplicationConfig replicationConfig) {

}

@Override
public void releaseWriteLock() {
public void releaseWriteLock(ReplicationConfig replicationConfig) {

}

Expand Down
Loading

0 comments on commit bf6f323

Please sign in to comment.