diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/WorkerLocationPolicy.java b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/WorkerLocationPolicy.java index da8f681a9edd..5272d25e1005 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/WorkerLocationPolicy.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/WorkerLocationPolicy.java @@ -17,6 +17,9 @@ import alluxio.exception.status.ResourceExhaustedException; import alluxio.util.CommonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.List; /** @@ -53,6 +56,8 @@ List getPreferredWorkers(List blockWorkerInfos * The factory for the {@link WorkerLocationPolicy}. */ class Factory { + private static final Logger LOG = LoggerFactory.getLogger(Factory.class); + private Factory() {} // prevent instantiation /** @@ -63,9 +68,12 @@ private Factory() {} // prevent instantiation */ public static WorkerLocationPolicy create(AlluxioConfiguration conf) { try { - return CommonUtils.createNewClassInstance( + WorkerLocationPolicy workerLocationPolicy = CommonUtils.createNewClassInstance( conf.getClass(PropertyKey.USER_WORKER_SELECTION_POLICY), new Class[] {AlluxioConfiguration.class}, new Object[] {conf}); + LOG.debug("Using worker location policy: {}", + workerLocationPolicy.getClass().getSimpleName()); + return workerLocationPolicy; } catch (ClassCastException e) { throw new RuntimeException(e); } diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchMode.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchMode.java new file mode 100644 index 000000000000..0efe7834334d --- /dev/null +++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchMode.java @@ -0,0 +1,36 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.stress.worker; + +/** + * WorkerBenchMode, HASH or LOCAL_ONLY. + */ +public enum WorkerBenchMode { + HASH("HASH"), + LOCAL_ONLY("LOCAL_ONLY"); + + private final String mName; + + /** + * Constructor. + * + * @param name of the client type + */ + WorkerBenchMode(String name) { + mName = name; + } + + @Override + public String toString() { + return mName; + } +} diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java index 2ada4c6569d9..a59368e024ca 100644 --- a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java +++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java @@ -97,6 +97,14 @@ public final class WorkerBenchParameters extends FileSystemParameters { + "This argument sets the size of that window.") public String mSliceSize = "1s"; + @Parameter(names = {"--mode"}, + description = "Specifies which worker the test process reads from." + + "Possible values are: [HASH, LOCAL_ONLY]" + + "HASH -> alluxio.client.file.dora.ConsistentHashPolicy" + + "LOCAL_ONLY -> alluxio.client.file.dora.LocalWorkerPolicy" + + "The default is HASH.") + public WorkerBenchMode mMode = WorkerBenchMode.HASH; + @DynamicParameter(names = "--conf", description = "HDFS client configuration. Can be repeated.") public Map mConf = new HashMap<>(); } diff --git a/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java b/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java index 1cc62d83dc85..a6de4da39c99 100644 --- a/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java +++ b/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java @@ -188,9 +188,23 @@ public void prepare() throws Exception { hdfsConf.set( String.format("fs.%s.impl.disable.cache", (new URI(mParameters.mBasePath)).getScheme()), "true"); + + // default mode value: hash, using consistent hash // TODO(jiacheng): we may need a policy to only IO to remote worker - hdfsConf.set(PropertyKey.Name.USER_WORKER_SELECTION_POLICY, - "alluxio.client.file.dora.LocalWorkerPolicy"); + switch (mParameters.mMode) { + case HASH: + hdfsConf.set(PropertyKey.Name.USER_WORKER_SELECTION_POLICY, + "alluxio.client.file.dora.ConsistentHashPolicy"); + break; + case LOCAL_ONLY: + hdfsConf.set(PropertyKey.Name.USER_WORKER_SELECTION_POLICY, + "alluxio.client.file.dora.LocalWorkerPolicy"); + break; + default: + throw new IllegalArgumentException("Unrecognized mode" + mParameters.mMode); + } + LOG.info("User worker selection policy: {}", mParameters.mMode); + for (Map.Entry entry : mParameters.mConf.entrySet()) { hdfsConf.set(entry.getKey(), entry.getValue()); }