Skip to content

Commit

Permalink
Add support for floating point node.processors setting (#89281)
Browse files Browse the repository at this point in the history
This commit adds support for floating point node.processors setting.
This is useful when the nodes run in an environment where the CPU
time assigned to the ES node process is limited (i.e. using cgroups).
With this change, the system would be able to size the thread pools
accordingly, in this case it would round up the provided setting
to the closest integer.
  • Loading branch information
fcofdez authored Aug 17, 2022
1 parent 695d1a8 commit 837a8d7
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 13 deletions.
4 changes: 3 additions & 1 deletion docs/reference/modules/threadpool.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ thread_pool:
The number of processors is automatically detected, and the thread pool settings
are automatically set based on it. In some cases it can be useful to override
the number of detected processors. This can be done by explicitly setting the
`node.processors` setting.
`node.processors` setting. This setting accepts floating point numbers, this
can be useful in environments where the Elasticsearch nodes are configured
to run with CPU limits, such as cpu shares or quota under `Cgroups`.

[source,yaml]
--------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public Netty4HttpServerTransport(
clusterSettings,
tracer
);
Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
Netty4Utils.setAvailableProcessors(EsExecutors.allocatedProcessors(settings));
NettyAllocator.logAllocatorDescriptionIfNeeded();
this.sharedGroupFactory = sharedGroupFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Netty4Transport(
SharedGroupFactory sharedGroupFactory
) {
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
Netty4Utils.setAvailableProcessors(EsExecutors.allocatedProcessors(settings));
NettyAllocator.logAllocatorDescriptionIfNeeded();
this.sharedGroupFactory = sharedGroupFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public static BytesReference toBytesReference(final ByteBuf buffer) {
public static Recycler<BytesRef> createRecycler(Settings settings) {
// If this method is called by super ctor the processors will not be set. Accessing NettyAllocator initializes netty's internals
// setting the processors. We must do it ourselves first just in case.
setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
setAvailableProcessors(EsExecutors.allocatedProcessors(settings));
return NettyAllocator.getRecycler();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,31 @@ public class EsExecutors {
/**
* Setting to manually control the number of allocated processors. This setting is used to adjust thread pool sizes per node. The
* default value is {@link Runtime#availableProcessors()} but should be manually controlled if not all processors on the machine are
* available to Elasticsearch (e.g., because of CPU limits).
* available to Elasticsearch (e.g., because of CPU limits). Note that this setting accepts floating point processors.
* If a rounded number is needed, always use {@link EsExecutors#allocatedProcessors(Settings)}.
*/
public static final Setting<Integer> NODE_PROCESSORS_SETTING = Setting.intSetting(
public static final Setting<Double> NODE_PROCESSORS_SETTING = new Setting<>(
"node.processors",
Runtime.getRuntime().availableProcessors(),
1,
Runtime.getRuntime().availableProcessors(),
Double.toString(Runtime.getRuntime().availableProcessors()),
textValue -> {
double numberOfProcessors = Double.parseDouble(textValue);
if (Double.isNaN(numberOfProcessors) || Double.isInfinite(numberOfProcessors)) {
String err = "Failed to parse value [" + textValue + "] for setting [node.processors]";
throw new IllegalArgumentException(err);
}

if (numberOfProcessors <= 0.0) {
String err = "Failed to parse value [" + textValue + "] for setting [node.processors] must be > 0";
throw new IllegalArgumentException(err);
}

final int maxNumberOfProcessors = Runtime.getRuntime().availableProcessors();
if (numberOfProcessors > maxNumberOfProcessors) {
String err = "Failed to parse value [" + textValue + "] for setting [node.processors] must be <= " + maxNumberOfProcessors;
throw new IllegalArgumentException(err);
}
return numberOfProcessors;
},
Property.NodeScope
);

Expand All @@ -55,7 +73,7 @@ public class EsExecutors {
* @return the number of allocated processors
*/
public static int allocatedProcessors(final Settings settings) {
return NODE_PROCESSORS_SETTING.get(settings);
return (int) Math.ceil(NODE_PROCESSORS_SETTING.get(settings));
}

public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;

/**
Expand Down Expand Up @@ -437,19 +438,72 @@ public void testGetTasks() throws InterruptedException {
}

public void testNodeProcessorsBound() {
final Setting<Integer> processorsSetting = EsExecutors.NODE_PROCESSORS_SETTING;
final Setting<Double> processorsSetting = EsExecutors.NODE_PROCESSORS_SETTING;
final int available = Runtime.getRuntime().availableProcessors();
final int processors = randomIntBetween(available + 1, Integer.MAX_VALUE);
final double processors = randomDoubleBetween(available + Math.ulp(available), Float.MAX_VALUE, true);
final Settings settings = Settings.builder().put(processorsSetting.getKey(), processors).build();
final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processorsSetting.get(settings));
final String expected = String.format(
Locale.ROOT,
"Failed to parse value [%d] for setting [%s] must be <= %d",
"Failed to parse value [%s] for setting [%s] must be <= %d",
processors,
processorsSetting.getKey(),
available
);
assertThat(e, hasToString(containsString(expected)));
}

public void testNodeProcessorsIsRoundedUpWhenUsingFloats() {
assertThat(
EsExecutors.allocatedProcessors(Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), Double.MIN_VALUE).build()),
is(equalTo(1))
);

assertThat(
EsExecutors.allocatedProcessors(Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 0.2).build()),
is(equalTo(1))
);

assertThat(
EsExecutors.allocatedProcessors(Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 1.2).build()),
is(equalTo(2))
);

assertThat(
EsExecutors.allocatedProcessors(
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), Runtime.getRuntime().availableProcessors()).build()
),
is(equalTo(Runtime.getRuntime().availableProcessors()))
);
}

public void testNodeProcessorsFloatValidation() {
final Setting<Double> processorsSetting = EsExecutors.NODE_PROCESSORS_SETTING;

{
final Settings settings = Settings.builder().put(processorsSetting.getKey(), 0.0).build();
expectThrows(IllegalArgumentException.class, () -> processorsSetting.get(settings));
}

{
final Settings settings = Settings.builder().put(processorsSetting.getKey(), Double.NaN).build();
expectThrows(IllegalArgumentException.class, () -> processorsSetting.get(settings));
}

{
final Settings settings = Settings.builder().put(processorsSetting.getKey(), Double.POSITIVE_INFINITY).build();
expectThrows(IllegalArgumentException.class, () -> processorsSetting.get(settings));
}

{
final Settings settings = Settings.builder().put(processorsSetting.getKey(), Double.NEGATIVE_INFINITY).build();
expectThrows(IllegalArgumentException.class, () -> processorsSetting.get(settings));
}

{
final Settings settings = Settings.builder().put(processorsSetting.getKey(), -1.5).build();
expectThrows(IllegalArgumentException.class, () -> processorsSetting.get(settings));
}
}

}

0 comments on commit 837a8d7

Please sign in to comment.