Skip to content

Commit

Permalink
HDDS-11200. Hsync client-side metrics (apache#7371)
Browse files Browse the repository at this point in the history
  • Loading branch information
duongkame authored Nov 4, 2024
1 parent 58d1443 commit 6f9db61
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.ozone.OzoneConsts;

import java.util.Map;
Expand All @@ -52,6 +53,21 @@ public final class ContainerClientMetrics {
private MutableCounterLong totalWriteChunkCalls;
@Metric
private MutableCounterLong totalWriteChunkBytes;

@Metric
private MutableRate hsyncSynchronizedWorkNs;
@Metric
private MutableRate hsyncSendWriteChunkNs;
@Metric
private MutableRate hsyncWaitForFlushNs;
@Metric
private MutableRate hsyncWatchForCommitNs;
@Metric
private MutableCounterLong writeChunksDuringWrite;
@Metric
private MutableCounterLong flushesDuringWrite;


private MutableQuantiles[] listBlockLatency;
private MutableQuantiles[] getBlockLatency;
private MutableQuantiles[] getCommittedBlockLengthLatency;
Expand Down Expand Up @@ -249,4 +265,28 @@ Map<PipelineID, MutableCounterLong> getWriteChunkCallsByPipeline() {
/**
 * Returns the per-leader WriteChunk call counters, keyed by leader datanode UUID.
 * Package-private: intended for use within this metrics package (e.g. tests).
 *
 * @return map from leader UUID to its WriteChunk call counter
 */
Map<UUID, MutableCounterLong> getWriteChunksCallsByLeaders() {
return writeChunksCallsByLeaders;
}

/**
 * Returns the rate metric recording the latency, in nanoseconds, of the
 * synchronized portion of hsync/flush handling (the time spent inside
 * {@code handleFlushInternalSynchronized} under the stream lock).
 *
 * @return mutable rate metric fed with nanosecond latencies
 */
public MutableRate getHsyncSynchronizedWorkNs() {
return hsyncSynchronizedWorkNs;
}

/**
 * Returns the rate metric recording, in nanoseconds, the time spent sending
 * WriteChunk/PutBlock requests during an hsync.
 * NOTE(review): the caller measures the whole synchronized flush body, not
 * only the WriteChunk send — confirm the intended measurement scope.
 *
 * @return mutable rate metric fed with nanosecond latencies
 */
public MutableRate getHsyncSendWriteChunkNs() {
return hsyncSendWriteChunkNs;
}

/**
 * Returns the rate metric recording the time, in nanoseconds, an hsync call
 * blocks waiting for the outstanding flush future to complete.
 *
 * @return mutable rate metric fed with nanosecond latencies
 */
public MutableRate getHsyncWaitForFlushNs() {
return hsyncWaitForFlushNs;
}

/**
 * Returns the rate metric recording the time, in nanoseconds, spent in
 * watchForCommit — waiting for the written data to be committed/replicated
 * up to the requested commit index.
 *
 * @return mutable rate metric fed with nanosecond latencies
 */
public MutableRate getHsyncWatchForCommitNs() {
return hsyncWatchForCommitNs;
}

/**
 * Returns the counter of WriteChunk operations triggered from the write()
 * path (i.e. when the current buffer fills up during a write), as opposed to
 * those triggered by an explicit flush/hsync.
 *
 * @return mutable counter of write-path WriteChunk operations
 */
public MutableCounterLong getWriteChunksDuringWrite() {
return writeChunksDuringWrite;
}

/**
 * Returns the counter of implicit flushes (PutBlock submissions) triggered
 * from the write() path when buffered data reaches the flush threshold,
 * as opposed to flushes requested explicitly by the caller.
 *
 * @return mutable counter of write-path flush operations
 */
public MutableCounterLong getFlushesDuringWrite() {
return flushesDuringWrite;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync;
import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;

import org.apache.hadoop.util.Time;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -360,6 +362,7 @@ public void write(int b) throws IOException {
private void writeChunkIfNeeded() throws IOException {
if (currentBufferRemaining == 0) {
LOG.debug("WriteChunk from write(), buffer = {}", currentBuffer);
clientMetrics.getWriteChunksDuringWrite().incr();
writeChunk(currentBuffer);
updateWriteChunkLength();
}
Expand Down Expand Up @@ -404,6 +407,7 @@ private void doFlushOrWatchIfNeeded() throws IOException {
updatePutBlockLength();
CompletableFuture<PutBlockResult> putBlockFuture = executePutBlock(false, false);
recordWatchForCommitAsync(putBlockFuture);
clientMetrics.getFlushesDuringWrite().incr();
}

if (bufferPool.isAtCapacity()) {
Expand Down Expand Up @@ -532,12 +536,16 @@ private CompletableFuture<Void> watchForCommit(long commitIndex) {
}

LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex);
final long start = Time.monotonicNowNanos();
return sendWatchForCommit(commitIndex)
.thenAccept(this::checkReply)
.exceptionally(e -> {
throw new FlushRuntimeException(setIoException(e));
})
.whenComplete((r, e) -> LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex));
.whenComplete((r, e) -> {
LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex);
clientMetrics.getHsyncWatchForCommitNs().add(Time.monotonicNowNanos() - start);
});
}

private void checkReply(XceiverClientReply reply) {
Expand Down Expand Up @@ -693,12 +701,15 @@ private void handleFlushInternal(boolean close)
throws IOException, InterruptedException, ExecutionException {
checkOpen();
LOG.debug("Start handleFlushInternal close={}", close);
CompletableFuture<Void> toWaitFor = handleFlushInternalSynchronized(close);
CompletableFuture<Void> toWaitFor = captureLatencyNs(clientMetrics.getHsyncSynchronizedWorkNs(),
() -> handleFlushInternalSynchronized(close));

if (toWaitFor != null) {
LOG.debug("Waiting for flush");
try {
long startWaiting = Time.monotonicNowNanos();
toWaitFor.get();
clientMetrics.getHsyncWaitForFlushNs().add(Time.monotonicNowNanos() - startWaiting);
} catch (ExecutionException ex) {
if (ex.getCause() instanceof FlushRuntimeException) {
throw ((FlushRuntimeException) ex.getCause()).cause;
Expand Down Expand Up @@ -727,6 +738,7 @@ public void waitForAllPendingFlushes() throws IOException {
}

private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boolean close) throws IOException {
long start = Time.monotonicNowNanos();
CompletableFuture<PutBlockResult> putBlockResultFuture = null;
// flush the last chunk data residing on the currentBuffer
if (totalWriteChunkLength < writtenDataLength) {
Expand Down Expand Up @@ -768,6 +780,7 @@ private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boo
if (putBlockResultFuture != null) {
recordWatchForCommitAsync(putBlockResultFuture);
}
clientMetrics.getHsyncSendWriteChunkNs().add(Time.monotonicNowNanos() - start);
return lastFlushFuture;
}

Expand Down

0 comments on commit 6f9db61

Please sign in to comment.