Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support StressWorkerBench using consistent hash policy #18246

Merged
merged 6 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.util.CommonUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
Expand Down Expand Up @@ -53,6 +56,8 @@ List<BlockWorkerInfo> getPreferredWorkers(List<BlockWorkerInfo> blockWorkerInfos
* The factory for the {@link WorkerLocationPolicy}.
*/
class Factory {
private static final Logger LOG = LoggerFactory.getLogger(Factory.class);

private Factory() {} // prevent instantiation

/**
Expand All @@ -63,9 +68,12 @@ private Factory() {} // prevent instantiation
*/
public static WorkerLocationPolicy create(AlluxioConfiguration conf) {
try {
return CommonUtils.createNewClassInstance(
WorkerLocationPolicy workerLocationPolicy = CommonUtils.createNewClassInstance(
conf.getClass(PropertyKey.USER_WORKER_SELECTION_POLICY),
new Class[] {AlluxioConfiguration.class}, new Object[] {conf});
LOG.debug(String.format("Using worker location policy: %s",
twalluxio marked this conversation as resolved.
Show resolved Hide resolved
workerLocationPolicy.getClass().getSimpleName()));
return workerLocationPolicy;
} catch (ClassCastException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ public final class WorkerBenchParameters extends FileSystemParameters {
+ "This argument sets the size of that window.")
public String mSliceSize = "1s";

@Parameter(names = {"--mode"},
description = "Specifies which worker the test process reads from."
+ "Possible values are: [HASH, LOCAL_ONLY]"
+ "HASH - alluxio.client.file.dora.ConsistentHashPolicy"
+ "LOCAL_ONLY - alluxio.client.file.dora.LocalWorkerPolicy"
+ "The default is HASH.")
public String mMode = "HASH";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you use enum here instead of a string? Refer to my previous comment on how.


@DynamicParameter(names = "--conf", description = "HDFS client configuration. Can be repeated.")
public Map<String, String> mConf = new HashMap<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -188,9 +189,23 @@ public void prepare() throws Exception {
hdfsConf.set(
String.format("fs.%s.impl.disable.cache", (new URI(mParameters.mBasePath)).getScheme()),
"true");

// default mode value: hash, using consistent hash
// TODO(jiacheng): we may need a policy to only IO to remote worker
hdfsConf.set(PropertyKey.Name.USER_WORKER_SELECTION_POLICY,
"alluxio.client.file.dora.LocalWorkerPolicy");
if (Objects.equals(mParameters.mMode, "LOCAL_ONLY")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here mParameters.mMode should be an enum so you just

switch (mParameters.mMode) {
  case LOCAL_ONLY: ...; break;
  case HASH: ...;break;
  default: throw new IllegalArgumentException("Unrecognized mode" + mParameters.mMode);
}

hdfsConf.set(PropertyKey.Name.USER_WORKER_SELECTION_POLICY,
"alluxio.client.file.dora.LocalWorkerPolicy");
} else if (Objects.equals(mParameters.mMode, "HASH")) {
hdfsConf.set(PropertyKey.Name.USER_WORKER_SELECTION_POLICY,
"alluxio.client.file.dora.ConsistentHashPolicy");
} else {
LOG.warn("Invalid mode. Switching to consistent hash policy.");
twalluxio marked this conversation as resolved.
Show resolved Hide resolved
hdfsConf.set(PropertyKey.Name.USER_WORKER_SELECTION_POLICY,
"alluxio.client.file.dora.ConsistentHashPolicy");
mParameters.mMode = "HASH";
}
LOG.info("User worker selection policy: {}", mParameters.mMode);

for (Map.Entry<String, String> entry : mParameters.mConf.entrySet()) {
hdfsConf.set(entry.getKey(), entry.getValue());
}
Expand Down
Loading