Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-11413. PipelineManagerImpl lock optimization reduces AllocateBlock latency #7160

Merged
merged 2 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public ContainerInfo allocateContainer(
// Acquire pipeline manager lock, to avoid any updates to pipeline
// while allocate container happens. This is to avoid scenario like
// mentioned in HDDS-5655.
pipelineManager.acquireReadLock();
pipelineManager.acquireReadLock(replicationConfig);
lock.lock();
List<Pipeline> pipelines;
Pipeline pipeline;
Expand All @@ -190,7 +190,7 @@ public ContainerInfo allocateContainer(
}
} finally {
lock.unlock();
pipelineManager.releaseReadLock();
pipelineManager.releaseReadLock(replicationConfig);
}

if (pipelines.isEmpty()) {
Expand All @@ -203,7 +203,7 @@ public ContainerInfo allocateContainer(
" matching pipeline for replicationConfig: " + replicationConfig
+ ", State:PipelineState.OPEN", e);
}
pipelineManager.acquireReadLock();
pipelineManager.acquireReadLock(replicationConfig);
lock.lock();
try {
pipelines = pipelineManager
Expand All @@ -218,7 +218,7 @@ public ContainerInfo allocateContainer(
}
} finally {
lock.unlock();
pipelineManager.releaseReadLock();
pipelineManager.releaseReadLock(replicationConfig);
}
}
return containerInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,20 +207,20 @@ void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
/**
* Acquire read lock.
*/
void acquireReadLock();
void acquireReadLock(ReplicationConfig replicationConfig);

/**
* Release read lock.
*/
void releaseReadLock();
void releaseReadLock(ReplicationConfig replicationConfig);

/**
* Acquire write lock.
*/
void acquireWriteLock();
void acquireWriteLock(ReplicationConfig replicationConfig);

/**
* Release write lock.
*/
void releaseWriteLock();
void releaseWriteLock(ReplicationConfig replicationConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class PipelineManagerImpl implements PipelineManager {

// Limit the number of on-going ratis operation to be 1.
private final ReentrantReadWriteLock lock;
private final ReentrantReadWriteLock ecPipelineLock;
private PipelineFactory pipelineFactory;
private PipelineStateManager stateManager;
private BackgroundPipelineCreator backgroundPipelineCreator;
Expand Down Expand Up @@ -105,6 +106,7 @@ protected PipelineManagerImpl(ConfigurationSource conf,
SCMContext scmContext,
Clock clock) {
this.lock = new ReentrantReadWriteLock();
this.ecPipelineLock = new ReentrantReadWriteLock();
this.pipelineFactory = pipelineFactory;
this.stateManager = pipelineStateManager;
this.conf = conf;
Expand Down Expand Up @@ -248,7 +250,7 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig,
throws IOException {
checkIfPipelineCreationIsAllowed(replicationConfig);

acquireWriteLock();
acquireWriteLock(replicationConfig);
final Pipeline pipeline;
try {
try {
Expand All @@ -261,7 +263,7 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig,
addPipelineToManager(pipeline);
return pipeline;
} finally {
releaseWriteLock();
releaseWriteLock(replicationConfig);
}
}

Expand All @@ -286,15 +288,16 @@ private void addPipelineToManager(Pipeline pipeline)
throws IOException {
HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
ClientVersion.CURRENT_VERSION);
acquireWriteLock();
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
try {
stateManager.addPipeline(pipelineProto);
} catch (IOException ex) {
LOG.debug("Failed to add pipeline {}.", pipeline, ex);
metrics.incNumPipelineCreationFailed();
throw ex;
} finally {
releaseWriteLock();
releaseWriteLock(replicationConfig);
}
recordMetricsForPipeline(pipeline);
}
Expand Down Expand Up @@ -419,19 +422,23 @@ public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
public void openPipeline(PipelineID pipelineId)
throws IOException {
HddsProtos.PipelineID pipelineIdProtobuf = pipelineId.getProtobuf();
acquireWriteLock();
final Pipeline pipeline;

final Pipeline pipeline = getPipeline(pipelineId);
ReplicationConfig replicationConfig = null;
try {
pipeline = stateManager.getPipeline(pipelineId);
if (pipeline.isClosed()) {
throw new IOException("Closed pipeline can not be opened");
}
replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
stateManager.updatePipelineState(pipelineIdProtobuf,
HddsProtos.PipelineState.PIPELINE_OPEN);
}
} finally {
releaseWriteLock();
if (replicationConfig != null) {
releaseWriteLock(replicationConfig);
}
}
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
Expand All @@ -447,14 +454,15 @@ protected void removePipeline(Pipeline pipeline)
throws IOException {
pipelineFactory.close(pipeline.getType(), pipeline);
HddsProtos.PipelineID pipelineID = pipeline.getId().getProtobuf();
acquireWriteLock();
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
try {
stateManager.removePipeline(pipelineID);
} catch (IOException ex) {
metrics.incNumPipelineDestroyFailed();
throw ex;
} finally {
releaseWriteLock();
releaseWriteLock(replicationConfig);
}
LOG.info("Pipeline {} removed.", pipeline);
metrics.incNumPipelineDestroyed();
Expand Down Expand Up @@ -507,19 +515,20 @@ public void closePipeline(PipelineID pipelineID) throws IOException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
// close containers.
closeContainersForPipeline(pipelineID);
if (!getPipeline(pipelineID).isClosed()) {
acquireWriteLock();
Pipeline pipeline = getPipeline(pipelineID);
if (!pipeline.isClosed()) {
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_CLOSED);
} finally {
releaseWriteLock();
releaseWriteLock(replicationConfig);
}
LOG.info("Pipeline {} moved to CLOSED state", pipelineID);
}

metrics.removePipelineMetrics(pipelineID);

}

/**
Expand Down Expand Up @@ -684,12 +693,14 @@ public int minPipelineLimit(Pipeline pipeline) {
public void activatePipeline(PipelineID pipelineID)
throws IOException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
acquireWriteLock();
Pipeline pipeline = getPipeline(pipelineID);
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_OPEN);
} finally {
releaseWriteLock();
releaseWriteLock(replicationConfig);
}
}

Expand All @@ -703,12 +714,14 @@ public void activatePipeline(PipelineID pipelineID)
public void deactivatePipeline(PipelineID pipelineID)
throws IOException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
acquireWriteLock();
Pipeline pipeline = getPipeline(pipelineID);
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_DORMANT);
} finally {
releaseWriteLock();
releaseWriteLock(replicationConfig);
}
}

Expand Down Expand Up @@ -931,22 +944,38 @@ private void recordMetricsForPipeline(Pipeline pipeline) {
}

@Override
public void acquireReadLock() {
lock.readLock().lock();
public void acquireReadLock(ReplicationConfig replicationConfig) {
if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
ecPipelineLock.readLock().lock();
} else {
lock.readLock().lock();
}
}

@Override
public void releaseReadLock() {
lock.readLock().unlock();
public void releaseReadLock(ReplicationConfig replicationConfig) {
if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
ecPipelineLock.readLock().unlock();
} else {
lock.readLock().unlock();
}
}

@Override
public void acquireWriteLock() {
lock.writeLock().lock();
public void acquireWriteLock(ReplicationConfig replicationConfig) {
if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
ecPipelineLock.writeLock().lock();
} else {
lock.writeLock().lock();
}
}

@Override
public void releaseWriteLock() {
lock.writeLock().unlock();
public void releaseWriteLock(ReplicationConfig replicationConfig) {
if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
ecPipelineLock.writeLock().unlock();
} else {
lock.writeLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,13 @@ private ContainerInfo getContainer(ReplicationConfig repConfig, String owner,
// Acquire pipeline manager lock, to avoid any updates to pipeline
// while allocate container happens. This is to avoid scenario like
// mentioned in HDDS-5655.
pipelineManager.acquireReadLock();
pipelineManager.acquireReadLock(repConfig);
try {
List<Pipeline> availablePipelines = findPipelinesByState(repConfig,
excludeList, Pipeline.PipelineState.OPEN);
return selectContainer(availablePipelines, req, owner, excludeList);
} finally {
pipelineManager.releaseReadLock();
pipelineManager.releaseReadLock(repConfig);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,22 +332,22 @@ public Map<String, Integer> getPipelineInfo() {
}

@Override
public void acquireReadLock() {
public void acquireReadLock(ReplicationConfig replicationConfig) {

}

@Override
public void releaseReadLock() {
public void releaseReadLock(ReplicationConfig replicationConfig) {

}

@Override
public void acquireWriteLock() {
public void acquireWriteLock(ReplicationConfig replicationConfig) {

}

@Override
public void releaseWriteLock() {
public void releaseWriteLock(ReplicationConfig replicationConfig) {

}

Expand Down
Loading
Loading