Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add throughput distribution and coarse datapoints for StressWorkerBench #18149

Merged
merged 12 commits into from
Sep 18, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ public String runTask(StressBenchConfig config, ArrayList<String> args,
command.add(Configuration.get(PropertyKey.HOME) + "/bin/alluxio");
command.add("exec");
command.add("class");
command.add("--");
command.add(config.getClassName());
command.add("--");

// the cluster will run distributed tasks
command.add(BaseParameters.DISTRIBUTED_FLAG);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.stress.worker;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;

import java.util.List;

/**
* One coarse data point captures a series of data points.
* All data points are group by thread and time slices.
*/
twalluxio marked this conversation as resolved.
Show resolved Hide resolved
@JsonDeserialize(using = WorkerBenchCoarseDataPointDeserializer.class)
public class WorkerBenchCoarseDataPoint {
// properties: workerId, threadId, sliceId, records
twalluxio marked this conversation as resolved.
Show resolved Hide resolved
@JsonProperty("workerId")
private Long mWorkerId;
@JsonProperty("threadId")
private Long mThreadId;
@JsonProperty("data")
private List<WorkerBenchDataPoint> mData;
@JsonProperty("throughput")
private List<Long> mThroughput;

/**
* Creates a coarse data point.
*
* @param workerID the ID of the worker
* @param threadID the ID of the thread
* @param data the list of data point lists
* @param throughput the list of throughput
*/
public WorkerBenchCoarseDataPoint(Long workerID, Long threadID,
List<WorkerBenchDataPoint> data,
List<Long> throughput) {
mWorkerId = workerID;
mThreadId = threadID;
mData = data;
mThroughput = throughput;
}

/**
* @return the ID of the worker
*/
public Long getWid() {
return mWorkerId;
}

/**
* @param wid the ID of the worker
*/
public void setWid(Long wid) {
mWorkerId = wid;
}

/**
* @return the ID of the thread
*/
public Long getTid() {
return mThreadId;
}

/**
* @param tid the ID of the thread
*/
public void setTid(Long tid) {
mThreadId = tid;
}

/**
* @return the list of data point lists
*/
public List<WorkerBenchDataPoint> getData() {
return mData;
}

/**
* @param data the list of data point lists
*/
public void setData(List<WorkerBenchDataPoint> data) {
mData = data;
}

/**
* @param data add a data point list to the list of data point lists
*/
public void addDataPoint(WorkerBenchDataPoint data) {
mData.add(data);
}

/**
* @return the list of all throughput
*/
public List<Long> getThroughput() {
return mThroughput;
}

/**
* @param throughput the list of all throughput
*/
public void setThroughput(List<Long> throughput) {
mThroughput = throughput;
}

/**
* removes the list of all throughput after worker aggregation.
*/
public void clearThroughput() {
mThroughput.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.stress.worker;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * A deserializer converting {@link WorkerBenchCoarseDataPoint} from JSON.
 */
public class WorkerBenchCoarseDataPointDeserializer
    extends JsonDeserializer<WorkerBenchCoarseDataPoint> {
  @Override
  public WorkerBenchCoarseDataPoint deserialize(JsonParser parser, DeserializationContext ctx)
      throws IOException {
    ObjectMapper mapper = (ObjectMapper) parser.getCodec();
    JsonNode node = parser.getCodec().readTree(parser);
    // Use path() instead of get(): it returns a "missing" node for absent fields,
    // so a malformed record yields id 0 instead of a NullPointerException.
    // This matches the null-tolerant handling of "throughput" and "data" below.
    Long wId = node.path("workerId").asLong();
    Long tId = node.path("threadId").asLong();
    List<Long> tpList = new ArrayList<>();
    JsonNode tpNode = node.get("throughput");
    if (tpNode != null) {
      for (JsonNode throughput : tpNode) {
        tpList.add(throughput.asLong());
      }
    }
    List<WorkerBenchDataPoint> data = new ArrayList<>();
    JsonNode dataNodes = node.get("data");
    if (dataNodes != null) {
      for (JsonNode dataNode : dataNodes) {
        // Delegate nested data points to their own (de)serializer.
        data.add(mapper.treeToValue(dataNode, WorkerBenchDataPoint.class));
      }
    }
    return new WorkerBenchCoarseDataPoint(wId, tId, data, tpList);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,107 +14,47 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.common.base.MoreObjects;

/**
* One data point captures the information we collect from one I/O operation to a worker.
* The one operation may be a full scan or positioned read on a file.
*/
@JsonDeserialize(using = WorkerBenchDataPointDeserializer.class)
public class WorkerBenchDataPoint {
@JsonProperty("workerID")
public String mWorkerID;
@JsonProperty("threadID")
public long mThreadID;

@JsonProperty("duration")
public long mDuration;
@JsonProperty("start")
public long mStartMs;
@JsonProperty("ioBytes")
@JsonProperty("count")
public long mCount;
@JsonProperty("iobytes")
public long mIOBytes;

/**
* @param workerID the worker this I/O operation reads
* @param threadID the thread performing the I/O
* @param startMs start timestamp of the I/O
* @param duration duration of the file read operation
* @param ioBytes bytes read
* @param count number of files read
* @param ioBytes total bytes read
*/
@JsonCreator
public WorkerBenchDataPoint(@JsonProperty("workerID") String workerID,
@JsonProperty("threadID") long threadID,
@JsonProperty("start") long startMs,
@JsonProperty("duration") long duration,
@JsonProperty("ioBytes") long ioBytes) {
mWorkerID = workerID;
mThreadID = threadID;
mStartMs = startMs;
mDuration = duration;
public WorkerBenchDataPoint(long count, long ioBytes) {
mCount = count;
mIOBytes = ioBytes;
}

/**
* @return worker ID
*/
public String getWorkerID() {
return mWorkerID;
}

/**
* @return thread ID
*/
public long getThreadID() {
return mThreadID;
}

/**
* @return duration in ms
*/
public long getDuration() {
return mDuration;
}

/**
* @return start timestamp in long
* @return number of files read
*/
public long getStartMs() {
return mStartMs;
public long getCount() {
return mCount;
}

/**
* @return bytes read
* @return total bytes read
*/
public long getIOBytes() {
return mIOBytes;
}

/**
* @param workerID worker ID
* @param count number of files read
*/
public void setWorkerID(String workerID) {
mWorkerID = workerID;
}

/**
* @param threadID the thread ID
*/
public void setThreadID(long threadID) {
mThreadID = threadID;
}

/**
* @param duration duration in ms
*/
public void setDuration(long duration) {
mDuration = duration;
}

/**
* @param startMs start timestamp in long
*/
public void setStartMs(long startMs) {
mStartMs = startMs;
public void setCount(long count) {
mCount = count;
}

/**
Expand All @@ -123,13 +63,4 @@ public void setStartMs(long startMs) {
public void setIOBytes(long ioBytes) {
mIOBytes = ioBytes;
}

@Override
public String toString() {
twalluxio marked this conversation as resolved.
Show resolved Hide resolved
return MoreObjects.toStringHelper(this)
.add("threadID", mThreadID)
.add("ioBytes", mIOBytes)
.add("duration", mDuration)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ public WorkerBenchDataPoint deserialize(JsonParser parser, DeserializationContex
throws IOException {
JsonNode node = parser.getCodec().readTree(parser);
return new WorkerBenchDataPoint(
node.get("workerID").asText(), node.get("threadID").asLong(),
node.get("startMs").asLong(), node.get("duration").asLong(),
node.get("ioBytes").asLong()
node.get("count").asLong(),
node.get("iobytes").asLong()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ public final class WorkerBenchParameters extends FileSystemParameters {
description = "If true, skip the data file creation")
public boolean mSkipCreation = false;

@Parameter(names = {"--slice-size"},
description = "There will be too many I/O operations during the test, " +
"so instead of keeping one data point for each operation, " +
"the I/O performed in a small window will be tracked in one result. " +
"This argument sets the size of that window.")
public String mSliceSize = "1s";

@DynamicParameter(names = "--conf", description = "HDFS client configuration. Can be repeated.")
public Map<String, String> mConf = new HashMap<>();
}
Loading
Loading