Skip to content

Commit

Permalink
HDDS-11665. Minor optimizations on the write path (apache#7407)
Browse files Browse the repository at this point in the history
  • Loading branch information
duongkame authored Nov 12, 2024
1 parent cb81f0c commit 5663971
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ private CompletableFuture<RaftClientReply> sendRequestAsync(
// gets the minimum log index replicated to all servers
@Override
public long getReplicatedMinCommitIndex() {
return commitInfoMap.values().parallelStream()
return commitInfoMap.values().stream()
.mapToLong(Long::longValue).min().orElse(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;

import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
Expand All @@ -35,7 +34,6 @@
final class ChunkBufferImplWithByteBuffer implements ChunkBuffer {
private final ByteBuffer buffer;
private final UncheckedAutoCloseable underlying;
private final UUID identity = UUID.randomUUID();

ChunkBufferImplWithByteBuffer(ByteBuffer buffer) {
this(buffer, null);
Expand Down Expand Up @@ -163,6 +161,6 @@ public int hashCode() {
@Override
public String toString() {
return getClass().getSimpleName() + ":limit=" + buffer.limit()
+ "@" + identity;
+ "@" + Integer.toHexString(super.hashCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public void setChunks(List<ContainerProtos.ChunkInfo> chunks) {
size = singleChunk.getLen();
} else {
chunkList = chunks;
size = chunks.parallelStream()
size = chunks.stream()
.mapToLong(ContainerProtos.ChunkInfo::getLen)
.sum();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ private int writeToOutputStream(BlockOutputStreamEntry current,
if (retry) {
current.writeOnRetry(len);
} else {
waitForRetryHandling(current);
current.waitForRetryHandling(retryHandlingCondition);
current.write(b, off, writeLen);
offset += writeLen;
}
Expand Down Expand Up @@ -584,7 +584,7 @@ private void handleFlushOrClose(StreamAction op) throws IOException {
blockOutputStreamEntryPool.getCurrentStreamEntry();
if (entry != null) {
// If the current block is to handle retries, wait until all the retries are done.
waitForRetryHandling(entry);
doInWriteLock(() -> entry.waitForRetryHandling(retryHandlingCondition));
entry.registerCallReceived();
try {
handleStreamAction(entry, op);
Expand All @@ -608,10 +608,6 @@ private void handleFlushOrClose(StreamAction op) throws IOException {
}
}

private void waitForRetryHandling(BlockOutputStreamEntry currentEntry) throws InterruptedException {
doInWriteLock(() -> currentEntry.waitForRetryHandling(retryHandlingCondition));
}

private void handleStreamAction(BlockOutputStreamEntry entry,
StreamAction op) throws IOException {
Collection<DatanodeDetails> failedServers = entry.getFailedServers();
Expand Down

0 comments on commit 5663971

Please sign in to comment.