diff --git a/dora/job/server/src/main/java/alluxio/job/plan/stress/StressBenchDefinition.java b/dora/job/server/src/main/java/alluxio/job/plan/stress/StressBenchDefinition.java
index be6d824d3d7c..f938e18d62a9 100644
--- a/dora/job/server/src/main/java/alluxio/job/plan/stress/StressBenchDefinition.java
+++ b/dora/job/server/src/main/java/alluxio/job/plan/stress/StressBenchDefinition.java
@@ -129,8 +129,8 @@ public String runTask(StressBenchConfig config, ArrayList<String> args,
     command.add(Configuration.get(PropertyKey.HOME) + "/bin/alluxio");
     command.add("exec");
     command.add("class");
-    command.add("--");
     command.add(config.getClassName());
+    command.add("--");
 
     // the cluster will run distributed tasks
     command.add(BaseParameters.DISTRIBUTED_FLAG);
diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchCoarseDataPoint.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchCoarseDataPoint.java
new file mode 100644
index 000000000000..3223483ee1fe
--- /dev/null
+++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchCoarseDataPoint.java
@@ -0,0 +1,135 @@
+/*
+ * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
+ * (the "License"). You may not use this work except in compliance with the License, which is
+ * available at www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied, as more fully set forth in the License.
+ *
+ * See the NOTICE file distributed with this work for information regarding copyright ownership.
+ */
+
+package alluxio.stress.worker;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * One coarse data point captures and merges the performance of the I/O operations
+ * in a specified window. The I/O operations are grouped by worker and by thread:
+ * operations that fall into the same window but run in different threads are
+ * recorded in separate coarse data points.
+ */
+@JsonDeserialize(using = WorkerBenchCoarseDataPointDeserializer.class)
+public class WorkerBenchCoarseDataPoint {
+  @JsonProperty("workerId")
+  private Long mWorkerId;
+  @JsonProperty("threadId")
+  private Long mThreadId;
+  @JsonProperty("data")
+  private List<WorkerBenchDataPoint> mData;
+  @JsonProperty("throughput")
+  private List<Long> mThroughput;
+
+  /**
+   * Creates a coarse data point with empty data and throughput lists.
+   *
+   * @param workerID the ID of the worker
+   * @param threadID the ID of the thread
+   */
+  public WorkerBenchCoarseDataPoint(Long workerID, Long threadID) {
+    mWorkerId = workerID;
+    mThreadId = threadID;
+    mData = new ArrayList<>();
+    mThroughput = new ArrayList<>();
+  }
+
+  /**
+   * Creates a coarse data point with the given data and throughput lists.
+   *
+   * @param workerID the ID of the worker
+   * @param threadID the ID of the thread
+   * @param data the list of data points
+   * @param throughput the list of throughput values
+   */
+  public WorkerBenchCoarseDataPoint(Long workerID, Long threadID,
+      List<WorkerBenchDataPoint> data,
+      List<Long> throughput) {
+    mWorkerId = workerID;
+    mThreadId = threadID;
+    mData = data;
+    mThroughput = throughput;
+  }
+
+  /**
+   * @return the ID of the worker
+   */
+  public Long getWid() {
+    return mWorkerId;
+  }
+
+  /**
+   * @param wid the ID of the worker
+   */
+  public void setWid(Long wid) {
+    mWorkerId = wid;
+  }
+
+  /**
+   * @return the ID of the thread
+   */
+  public Long getTid() {
+    return mThreadId;
+  }
+
+  /**
+   * @param tid the ID of the thread
+   */
+  public void setTid(Long tid) {
+    mThreadId = tid;
+  }
+
+  /**
+   * @return the list of data points
+   */
+  public List<WorkerBenchDataPoint> getData() {
+    return mData;
+  }
+
+  /**
+   * @param data the list of data points
+   */
+  public void setData(List<WorkerBenchDataPoint> data) {
+    mData = data;
+  }
+
+  /**
+   * @param data the data point to add to this coarse data point
+   */
+  public void addDataPoint(WorkerBenchDataPoint data) {
+    mData.add(data);
+  }
+
+  /**
+   * @return the list of all throughput values
+   */
+  public List<Long> getThroughput() {
+    return mThroughput;
+  }
+
+  /**
+   * @param throughput the list of all throughput values
+   */
+  public void setThroughput(List<Long> throughput) {
+    mThroughput = throughput;
+  }
+
+  /**
+   * Removes all throughput values after worker-side aggregation.
+   */
+  public void clearThroughput() {
+    mThroughput.clear();
+  }
+}
diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchCoarseDataPointDeserializer.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchCoarseDataPointDeserializer.java
new file mode 100644
index 000000000000..22706f05355e
--- /dev/null
+++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchCoarseDataPointDeserializer.java
@@ -0,0 +1,54 @@
+/*
+ * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
+ * (the "License"). You may not use this work except in compliance with the License, which is
+ * available at www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied, as more fully set forth in the License.
+ *
+ * See the NOTICE file distributed with this work for information regarding copyright ownership.
+ */
+
+package alluxio.stress.worker;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A deserializer converting {@link WorkerBenchCoarseDataPoint} from JSON.
+ */
+public class WorkerBenchCoarseDataPointDeserializer
+    extends JsonDeserializer<WorkerBenchCoarseDataPoint> {
+  @Override
+  public WorkerBenchCoarseDataPoint deserialize(JsonParser parser, DeserializationContext ctx)
+      throws IOException {
+    ObjectMapper mapper = (ObjectMapper) parser.getCodec();
+    JsonNode node = parser.getCodec().readTree(parser);
+    Long wId = node.get("workerId").asLong();
+    Long tId = node.get("threadId").asLong();
+    List<Long> tpList = new ArrayList<>();
+    JsonNode tpNode = node.get("throughput");
+    if (tpNode != null) {
+      for (JsonNode throughput : tpNode) {
+        tpList.add(throughput.asLong());
+      }
+    }
+    List<WorkerBenchDataPoint> data = new ArrayList<>();
+    JsonNode dataNodes = node.get("data");
+    if (dataNodes != null) {
+      for (JsonNode dataNode : dataNodes) {
+        WorkerBenchDataPoint dataPoint = mapper
+            .treeToValue(dataNode, WorkerBenchDataPoint.class);
+        data.add(dataPoint);
+      }
+    }
+    return new WorkerBenchCoarseDataPoint(wId, tId, data, tpList);
+  }
+}
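For reference, a payload that these two deserializers accept looks roughly like the JSON in the sketch below. The values and the class name of the sketch are hypothetical; the snippet only exercises the custom deserializer that `@JsonDeserialize` registers on `WorkerBenchCoarseDataPoint`.

```java
import alluxio.stress.worker.WorkerBenchCoarseDataPoint;

import com.fasterxml.jackson.databind.ObjectMapper;

public class CoarseDataPointJsonSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical payload; the field names match what the two deserializers read
    String json = "{\"workerId\":1,\"threadId\":21,"
        + "\"data\":[{\"count\":120,\"iobytes\":125829120}],"
        + "\"throughput\":[52428800,61865984]}";
    ObjectMapper mapper = new ObjectMapper();
    // @JsonDeserialize on WorkerBenchCoarseDataPoint routes this call through
    // WorkerBenchCoarseDataPointDeserializer
    WorkerBenchCoarseDataPoint dp =
        mapper.readValue(json, WorkerBenchCoarseDataPoint.class);
    System.out.println(dp.getWid());                     // 1
    System.out.println(dp.getData().get(0).getCount());  // 120
    System.out.println(dp.getThroughput().size());       // 2
  }
}
```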
diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPoint.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPoint.java
index 9109e73f06ed..7e36b75abe4c 100644
--- a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPoint.java
+++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPoint.java
@@ -14,7 +14,6 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.google.common.base.MoreObjects;
 
 /**
  * One data point captures the information we collect from one I/O operation to a worker.
@@ -22,99 +21,40 @@
  */
 @JsonDeserialize(using = WorkerBenchDataPointDeserializer.class)
 public class WorkerBenchDataPoint {
-  @JsonProperty("workerID")
-  public String mWorkerID;
-  @JsonProperty("threadID")
-  public long mThreadID;
-
-  @JsonProperty("duration")
-  public long mDuration;
-  @JsonProperty("start")
-  public long mStartMs;
-  @JsonProperty("ioBytes")
+  @JsonProperty("count")
+  public long mCount;
+  @JsonProperty("iobytes")
   public long mIOBytes;
 
   /**
-   * @param workerID the worker this I/O operation reads
-   * @param threadID the thread performing the I/O
-   * @param startMs start timestamp of the I/O
-   * @param duration duration of the file read operation
-   * @param ioBytes bytes read
+   * @param count number of files read
+   * @param ioBytes total bytes read
    */
   @JsonCreator
-  public WorkerBenchDataPoint(@JsonProperty("workerID") String workerID,
-      @JsonProperty("threadID") long threadID,
-      @JsonProperty("start") long startMs,
-      @JsonProperty("duration") long duration,
-      @JsonProperty("ioBytes") long ioBytes) {
-    mWorkerID = workerID;
-    mThreadID = threadID;
-    mStartMs = startMs;
-    mDuration = duration;
+  public WorkerBenchDataPoint(long count, long ioBytes) {
+    mCount = count;
     mIOBytes = ioBytes;
   }
 
   /**
-   * @return worker ID
-   */
-  public String getWorkerID() {
-    return mWorkerID;
-  }
-
-  /**
-   * @return thread ID
-   */
-  public long getThreadID() {
-    return mThreadID;
-  }
-
-  /**
-   * @return duration in ms
-   */
-  public long getDuration() {
-    return mDuration;
-  }
-
-  /**
-   * @return start timestamp in long
+   * @return number of files read
    */
-  public long getStartMs() {
-    return mStartMs;
+  public long getCount() {
+    return mCount;
   }
 
   /**
-   * @return bytes read
+   * @return total bytes read
    */
   public long getIOBytes() {
     return mIOBytes;
   }
 
   /**
-   * @param workerID worker ID
+   * @param count number of files read
    */
-  public void setWorkerID(String workerID) {
-    mWorkerID = workerID;
-  }
-
-  /**
-   * @param threadID the thread ID
-   */
-  public void setThreadID(long threadID) {
-    mThreadID = threadID;
-  }
-
-  /**
-   * @param duration duration in ms
-   */
-  public void setDuration(long duration) {
-    mDuration = duration;
-  }
-
-  /**
-   * @param startMs start timestamp in long
-   */
-  public void setStartMs(long startMs) {
-    mStartMs = startMs;
+  public void setCount(long count) {
+    mCount = count;
   }
 
   /**
@@ -123,13 +63,4 @@ public void setStartMs(long startMs) {
   public void setIOBytes(long ioBytes) {
     mIOBytes = ioBytes;
   }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .add("threadID", mThreadID)
-        .add("ioBytes", mIOBytes)
-        .add("duration", mDuration)
-        .toString();
-  }
 }
diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPointDeserializer.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPointDeserializer.java
index 02859bff36b3..9d0f4cfa8403 100644
--- a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPointDeserializer.java
+++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPointDeserializer.java
@@ -27,9 +27,8 @@ public WorkerBenchDataPoint deserialize(JsonParser parser, DeserializationContex
       throws IOException {
     JsonNode node = parser.getCodec().readTree(parser);
     return new WorkerBenchDataPoint(
-        node.get("workerID").asText(), node.get("threadID").asLong(),
-        node.get("startMs").asLong(), node.get("duration").asLong(),
-        node.get("ioBytes").asLong()
+        node.get("count").asLong(),
+        node.get("iobytes").asLong()
     );
   }
 }
diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java
index f2d682a8792d..3879eb831014 100644
--- a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java
+++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java
@@ -87,6 +87,13 @@ public final class WorkerBenchParameters extends FileSystemParameters {
       description = "If true, skip the data file creation")
   public boolean mSkipCreation = false;
 
+  @Parameter(names = {"--slice-size"},
+      description = "There will be too many I/O operations during the test, "
+          + "so instead of keeping one data point for each operation, "
+          + "the I/O performed in a small window will be tracked in one result. "
+          + "This argument sets the size of that window.")
+  public String mSliceSize = "1s";
+
   @DynamicParameter(names = "--conf",
       description = "HDFS client configuration. Can be repeated.")
   public Map<String, String> mConf = new HashMap<>();
 }
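To illustrate the windowing that `--slice-size` controls, here is a simplified, self-contained sketch (not the benchmark's exact bookkeeping, and all values are made up): every completed read is assigned to slice `elapsedMs / sliceMs`, and each slice is reduced to one count/bytes pair, so empty windows still show up as zero-valued entries.

```java
import java.util.ArrayList;
import java.util.List;

public class SliceBucketingSketch {
  public static void main(String[] args) {
    long sliceMs = 1_000L;                     // --slice-size 1s
    long[] opEndMs = {120, 850, 1_400, 3_200}; // hypothetical ms since recording started
    long[] opBytes = {1 << 20, 1 << 20, 2 << 20, 1 << 20};

    List<long[]> slices = new ArrayList<>();   // each entry: {count, ioBytes}
    int lastSlice = 0;
    long count = 0;
    long ioBytes = 0;
    for (int i = 0; i < opEndMs.length; i++) {
      int currentSlice = (int) (opEndMs[i] / sliceMs);
      // close out windows that fully elapsed before this operation
      while (currentSlice > lastSlice) {
        slices.add(new long[] {count, ioBytes});
        count = 0;
        ioBytes = 0;
        lastSlice++;
      }
      count++;
      ioBytes += opBytes[i];
    }
    slices.add(new long[] {count, ioBytes});   // final, possibly partial window
    for (int s = 0; s < slices.size(); s++) {
      System.out.printf("slice %d: count=%d, ioBytes=%d%n", s, slices.get(s)[0], slices.get(s)[1]);
    }
  }
}
```

With a 1s slice and a 30s duration this yields roughly duration / slice-size entries per thread, which is what keeps the result size bounded.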
diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchSummary.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchSummary.java
index 3194c2ae8a36..f4789b7c963e 100644
--- a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchSummary.java
+++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchSummary.java
@@ -39,7 +39,7 @@ public final class WorkerBenchSummary extends GeneralBenchSummary<WorkerBenchTaskResult>
-  private List<Long> mDurationPercentiles;
+  private List<Long> mThroughputPercentiles;
 
   /**
    * Creates an instance.
@@ -47,7 +47,7 @@ public final class WorkerBenchSummary extends GeneralBenchSummary<WorkerBenchTaskResult>
     mNodeResults = new HashMap<>();
-    mDurationPercentiles = new ArrayList<>();
+    mThroughputPercentiles = new ArrayList<>();
   }
 
   /**
@@ -65,15 +65,13 @@ public WorkerBenchSummary(WorkerBenchTaskResult mergedTaskResults,
     mNodeResults = nodes;
     mThroughput = getIOMBps();
 
-    mDurationPercentiles = new ArrayList<>();
-    Histogram durationHistogram = new Histogram(
-        FormatUtils.parseTimeSize(mParameters.mDuration)
-            + FormatUtils.parseTimeSize(mParameters.mWarmup),
+    mThroughputPercentiles = new ArrayList<>();
+    Histogram throughputHistogram = new Histogram(
+        FormatUtils.parseSpaceSize(mParameters.mFileSize),
         StressConstants.TIME_HISTOGRAM_PRECISION);
-    mergedTaskResults.getDataPoints().forEach(stat ->
-        durationHistogram.recordValue(stat.getDuration()));
+    mergedTaskResults.getAllThroughput().forEach(throughputHistogram::recordValue);
     for (int i = 0; i <= 100; i++) {
-      mDurationPercentiles.add(durationHistogram.getValueAtPercentile(i));
+      mThroughputPercentiles.add(throughputHistogram.getValueAtPercentile(i));
     }
   }
 
@@ -150,15 +148,15 @@ public void setIOBytes(long IOBytes) {
   /**
    * @return 0~100 percentiles of recorded durations
    */
-  public List<Long> getDurationPercentiles() {
-    return mDurationPercentiles;
+  public List<Long> getThroughputPercentiles() {
+    return mThroughputPercentiles;
   }
 
   /**
    * @param percentiles a list of calculated percentiles from recorded durations
    */
-  public void setDurationPercentiles(List<Long> percentiles) {
-    mDurationPercentiles = percentiles;
+  public void setThroughputPercentiles(List<Long> percentiles) {
+    mThroughputPercentiles = percentiles;
   }
 
   @Override
@@ -185,7 +183,7 @@ public List generate(List results) {
       Pair<List<String>, List<String>> fieldNames = Parameters.partitionFieldNames(
           summaries.stream().map(x -> x.mParameters).collect(Collectors.toList()));
 
-      // Split up common description into 100 character chunks, for the sub title
+      // Split up common description into 100 character chunks, for the subtitle
       List<String> subTitle = new ArrayList<>(Splitter.fixedLength(100).splitToList(
           summaries.get(0).mParameters.getDescription(fieldNames.getFirst())));
 
diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchTaskResult.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchTaskResult.java
index ee00f4b4843a..a1755b1af30d 100644
--- a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchTaskResult.java
+++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchTaskResult.java
@@ -39,8 +39,9 @@ public final class WorkerBenchTaskResult implements TaskResult {
   private long mEndMs;
   private long mIOBytes;
   private List<String> mErrors;
-  private List<WorkerBenchDataPoint> mDataPoints;
-  private List<Long> mDurationPercentiles;
+  private final List<WorkerBenchCoarseDataPoint> mDataPoints;
+  private List<Long> mAllThroughputValues;
+  private List<Long> mThroughputPercentiles;
 
   /**
    * Creates an instance.
@@ -49,7 +50,8 @@ public WorkerBenchTaskResult() {
     // Default constructor required for json deserialization
     mErrors = new ArrayList<>();
     mDataPoints = new ArrayList<>();
-    mDurationPercentiles = new ArrayList<>();
+    mAllThroughputValues = new ArrayList<>();
+    mThroughputPercentiles = new ArrayList<>();
   }
 
   /**
@@ -60,6 +62,23 @@ public WorkerBenchTaskResult() {
   public void merge(WorkerBenchTaskResult result) throws Exception {
     // When merging results within a node, we need to merge all the error information.
     mErrors.addAll(result.mErrors);
+
+    if (mAllThroughputValues.isEmpty()) {
+      for (WorkerBenchCoarseDataPoint dataPoint : mDataPoints) {
+        mAllThroughputValues.addAll(new ArrayList<>(dataPoint.getThroughput()));
+        dataPoint.clearThroughput();
+      }
+    }
+
+    if (result.mAllThroughputValues.isEmpty()) {
+      for (WorkerBenchCoarseDataPoint dataPoint : result.mDataPoints) {
+        mAllThroughputValues.addAll(new ArrayList<>(dataPoint.getThroughput()));
+        dataPoint.clearThroughput();
+      }
+    } else {
+      mAllThroughputValues.addAll(result.mAllThroughputValues);
+    }
+
     mDataPoints.addAll(result.mDataPoints);
     aggregateByWorker(result);
   }
@@ -172,27 +191,41 @@ public void setErrors(List<String> errors) {
   /**
    * @return 100 percentiles for durations of all I/O operations
    */
-  public List<Long> getDurationPercentiles() {
-    return mDurationPercentiles;
+  public List<Long> getThroughputPercentiles() {
+    return mThroughputPercentiles;
   }
 
   /**
    * @param percentiles 100 percentiles for durations of all I/O operations
    */
-  public void setDurationPercentiles(List<Long> percentiles) {
-    mDurationPercentiles = percentiles;
+  public void setThroughputPercentiles(List<Long> percentiles) {
+    mThroughputPercentiles = percentiles;
+  }
+
+  /**
+   * @return all instantaneous throughput values of I/O operations
+   */
+  public List<Long> getAllThroughput() {
+    return mAllThroughputValues;
+  }
+
+  /**
+   * @param allThroughputValues all instantaneous throughput values of I/O operations
+   */
+  public void setAllThroughput(List<Long> allThroughputValues) {
+    mAllThroughputValues = allThroughputValues;
   }
 
   /**
    * From the collected operation data, calculates 100 percentiles.
    */
   public void calculatePercentiles() {
-    Histogram durationHistogram = new Histogram(
-        FormatUtils.parseTimeSize(mParameters.mDuration),
-        StressConstants.TIME_HISTOGRAM_PRECISION);
-    mDataPoints.forEach(stat -> durationHistogram.recordValue(stat.getDuration()));
+    Histogram throughputHistogram = new Histogram(
+        FormatUtils.parseSpaceSize(mParameters.mFileSize),
+        StressConstants.TIME_HISTOGRAM_PRECISION);
+    mAllThroughputValues.forEach(throughputHistogram::recordValue);
     for (int i = 0; i <= 100; i++) {
-      mDurationPercentiles.add(durationHistogram.getValueAtPercentile(i));
+      mThroughputPercentiles.add(throughputHistogram.getValueAtPercentile(i));
     }
   }
 
@@ -206,29 +239,29 @@ public void addErrorMessage(String errMessage) {
   /**
    * @return all data points for I/O operations
    */
-  public List<WorkerBenchDataPoint> getDataPoints() {
+  public List<WorkerBenchCoarseDataPoint> getDataPoints() {
     return mDataPoints;
   }
 
   /**
    * @param point one data point for one I/O operation
    */
-  public void addDataPoint(WorkerBenchDataPoint point) {
+  public void addDataPoint(WorkerBenchCoarseDataPoint point) {
     mDataPoints.add(point);
   }
 
   /**
    * @param stats data points for all recorded I/O operations
   */
-  public void addDataPoints(Collection<WorkerBenchDataPoint> stats) {
+  public void addDataPoints(Collection<WorkerBenchCoarseDataPoint> stats) {
     mDataPoints.addAll(stats);
   }
 
   /**
    * Clears all data points from the result.
    */
-  public void clearDataPoints() {
-    mDataPoints.clear();
+  public void clearAllThroughput() {
+    mAllThroughputValues.clear();
   }
 
   @Override
@@ -246,9 +279,10 @@ public WorkerBenchSummary aggregate(Iterable<WorkerBenchTaskResult> results) thr
     for (WorkerBenchTaskResult result : results) {
       result.calculatePercentiles();
       mergedTaskResult.merge(result);
-      LOG.info("Test results from worker {} has been merged, the data points are now cleared.",
+      result.clearAllThroughput();
+      LOG.info("Test results from worker {} have been merged. "
+          + "Individual data points are now cleared from the output.",
           result.getBaseParameters().mId);
-      result.clearDataPoints();
       nodeResults.put(result.getBaseParameters().mId, result);
     }
 
diff --git a/dora/stress/shell/src/main/java/alluxio/stress/cli/Benchmark.java b/dora/stress/shell/src/main/java/alluxio/stress/cli/Benchmark.java
index 24af48baeb33..e4e637edfdb6 100644
--- a/dora/stress/shell/src/main/java/alluxio/stress/cli/Benchmark.java
+++ b/dora/stress/shell/src/main/java/alluxio/stress/cli/Benchmark.java
@@ -195,8 +195,8 @@ protected String runSingleTask(String[] args) throws Exception {
     command.add(conf.get(PropertyKey.HOME) + "/bin/alluxio");
     command.add("exec");
     command.add("class");
-    command.add("--");
     command.add(className);
+    command.add("--");
     command.addAll(Arrays.asList(args));
     command.add(BaseParameters.IN_PROCESS_FLAG);
     command.addAll(mBaseParameters.mJavaOpts.stream().map(String::trim)
diff --git a/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientIOBench.java b/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientIOBench.java
index 3c2238bab7fd..f4de122c0f9c 100644
--- a/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientIOBench.java
+++ b/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientIOBench.java
@@ -103,10 +103,10 @@ public String getBenchDescription() {
         "# This test will run create a 500MB file with block size 15KB on 1 worker,",
         "# then test the ReadArray operation for 30s and calculate the throughput after 10s "
             + "warmup.",
-        "$ bin/alluxio runClass alluxio.stress.cli.client.StressClientIOBench --operation Write "
-            + "--base alluxio:///stress-client-io-base --file-size 500m --buffer-size 64k "
+        "$ bin/alluxio exec class alluxio.stress.cli.client.StressClientIOBench -- --operation "
+            + "Write --base alluxio:///stress-client-io-base --file-size 500m --buffer-size 64k "
             + "--block-size 16k --write-num-workers 1 --cluster --cluster-limit 1",
-        "$ bin/alluxio runClass alluxio.stress.cli.client.StressClientIOBench --operation "
+        "$ bin/alluxio exec class alluxio.stress.cli.client.StressClientIOBench -- --operation "
             + "ReadArray --base alluxio:///stress-client-io-base --file-size 500m --buffer-size "
             + "64k --block-size 16k --warmup 10s --duration 30s --write-num-workers 1 --cluster "
             + "--cluster-limit 1\n"));
diff --git a/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java b/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java
index ad00754cfea9..daeea146dfa4 100644
--- a/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java
+++ b/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java
@@ -22,6 +22,7 @@
 import alluxio.stress.BaseParameters;
 import alluxio.stress.cli.AbstractStressBench;
 import alluxio.stress.common.FileSystemParameters;
+import alluxio.stress.worker.WorkerBenchCoarseDataPoint;
 import alluxio.stress.worker.WorkerBenchDataPoint;
 import alluxio.stress.worker.WorkerBenchParameters;
 import alluxio.stress.worker.WorkerBenchTaskResult;
@@ -123,7 +124,7 @@ public String getBenchDescription() {
             + "be prepared for each test thread."
            + "# The threads will keeping reading for 30s including a 10s warmup."
            + "# So the result captures I/O performance from the last 20s.",
-        "$ bin/alluxio runClass alluxio.stress.cli.worker.StressWorkerBench \\\n"
+        "$ bin/alluxio exec class alluxio.stress.cli.worker.StressWorkerBench -- \\\n"
            + "--threads 32 --base alluxio:///stress-worker-base --file-size 100m \\\n"
            + "--warmup 10s --duration 30s --cluster\n"
         ));
@@ -455,35 +456,74 @@ private void runInternal() throws Exception {
     CommonUtils.sleepMs(waitMs);
     SAMPLING_LOG.info("Test started and recording will be started after the warm up at {}",
         CommonUtils.convertMsToDate(recordMs, dateFormat));
+
+    String workerID = mBaseParameters.mIndex;
+    int lastDashIndex = workerID.lastIndexOf("-");
+    if (lastDashIndex != -1) {
+      workerID = workerID.substring(lastDashIndex + 1);
+    }
+    WorkerBenchCoarseDataPoint dp = new WorkerBenchCoarseDataPoint(
+        Long.parseLong(workerID),
+        Thread.currentThread().getId());
+    int sliceCount = 0;
+    long sliceIoBytes = 0;
+    List<Long> throughputList = new ArrayList<>();
+    int lastSlice = 0;
+
     while (!Thread.currentThread().isInterrupted()
         && CommonUtils.getCurrentMs() < mContext.getEndMs()) {
       // Keep reading the same file
-      WorkerBenchDataPoint dataPoint = applyOperation();
-      long currentMs = CommonUtils.getCurrentMs();
-      // Start recording after the warmup
-      if (currentMs > recordMs) {
-        mResult.addDataPoint(dataPoint);
-        if (dataPoint.getIOBytes() > 0) {
-          mResult.incrementIOBytes(dataPoint.getIOBytes());
+      long startMs = CommonUtils.getCurrentMs() - recordMs;
+      long bytesRead = applyOperation();
+      long duration = CommonUtils.getCurrentMs() - recordMs - startMs;
+      if (startMs > 0) {
+        if (bytesRead > 0) {
+          mResult.setIOBytes(mResult.getIOBytes() + bytesRead);
+          sliceCount += 1;
+          sliceIoBytes += bytesRead;
+          // if duration is 0ms, treat it as 1ms for now
+          if (duration == 0) {
+            duration = 1;
+          }
+          throughputList.add(bytesRead / duration);
+          int currentSlice = (int) (startMs
+              / FormatUtils.parseTimeSize(mParameters.mSliceSize));
+          while (currentSlice > lastSlice) {
+            dp.addDataPoint(new WorkerBenchDataPoint(sliceCount, sliceIoBytes));
+            sliceCount = 0;
+            sliceIoBytes = 0;
+            lastSlice++;
+          }
         } else {
           LOG.warn("Thread for file {} read 0 bytes from I/O", mFilePaths[mTargetFileIndex]);
         }
       } else {
-        SAMPLING_LOG.info("Ignored data point during warmup: {}", dataPoint);
+        SAMPLING_LOG.info("Ignored record during warmup: {} bytes", bytesRead);
       }
     }
+
+    int finalSlice = (int) (FormatUtils.parseTimeSize(mParameters.mDuration)
+        / FormatUtils.parseTimeSize(mParameters.mSliceSize));
+    while (finalSlice > lastSlice) {
+      dp.addDataPoint(new WorkerBenchDataPoint(sliceCount, sliceIoBytes));
+      sliceCount = 0;
+      sliceIoBytes = 0;
+      lastSlice++;
+    }
+
+    dp.setThroughput(throughputList);
+    mResult.addDataPoint(dp);
   }
 
   /**
    * Read the file by the offset and length based on the given index.
   * @return the actual red byte number
   */
-  private WorkerBenchDataPoint applyOperation() throws IOException {
+  private long applyOperation() throws IOException {
     Path filePath = mFilePaths[mTargetFileIndex];
     int offset = mOffsets[mTargetFileIndex];
     int length = mLengths[mTargetFileIndex];
-    long startOperation = CommonUtils.getCurrentMs();
     if (mInStream == null) {
       mInStream = mFs.open(filePath);
     }
@@ -515,10 +555,7 @@ private WorkerBenchDataPoint applyOperation() throws IOException {
         }
       }
     }
-    long endOperation = CommonUtils.getCurrentMs();
-    return new WorkerBenchDataPoint(
-        mBaseParameters.mIndex, Thread.currentThread().getId(),
-        startOperation, endOperation - startOperation, bytesRead);
+    return bytesRead;
   }
 
   private void closeInStream() {
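On the summary side, the per-operation throughput values are now fed into an HdrHistogram and exported as the 0–100th percentiles. The sketch below shows that pattern in isolation; the class name, histogram bounds, and values are placeholders, whereas the real code sizes the histogram from `--file-size` and `StressConstants.TIME_HISTOGRAM_PRECISION`.

```java
import org.HdrHistogram.Histogram;

import java.util.ArrayList;
import java.util.List;

public class ThroughputPercentileSketch {
  public static void main(String[] args) {
    // placeholder bounds: track values up to 100 MB with 3 significant digits
    Histogram histogram = new Histogram(100L * 1024 * 1024, 3);
    long[] throughputValues = {40_000, 52_000, 61_000, 48_000, 55_000}; // hypothetical bytes/ms
    for (long value : throughputValues) {
      histogram.recordValue(value);
    }
    // collect one value per percentile, 0 through 100, as the summary classes do
    List<Long> percentiles = new ArrayList<>();
    for (int i = 0; i <= 100; i++) {
      percentiles.add(histogram.getValueAtPercentile(i));
    }
    System.out.println("p50=" + percentiles.get(50) + " p99=" + percentiles.get(99));
  }
}
```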