-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
IndexingPressureService.java
84 lines (70 loc) · 3.2 KB
/
IndexingPressureService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/*
* Copyright OpenSearch Contributors.
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.index;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats;
/**
 * Entry point for indexing pressure accounting at node/shard level, used by Transport Actions and by Stats.
 * Wraps a single {@link ShardIndexingPressure} instance and, on every call, routes the request either to the
 * overload that takes a {@link ShardId} or to the overload that does not, depending on whether shard indexing
 * pressure is reported as enabled at the time of the call.
 */
public class IndexingPressureService {

    /** Delegate that performs all of the actual accounting and stats collection. */
    private final ShardIndexingPressure shardIndexingPressure;

    /**
     * Builds the service together with its underlying {@link ShardIndexingPressure}.
     *
     * @param settings       settings handed to the delegate's constructor
     * @param clusterService cluster service handed to the delegate's constructor
     */
    public IndexingPressureService(Settings settings, ClusterService clusterService) {
        this.shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
    }

    /**
     * Marks the start of a coordinating operation without shard information.
     * The delegate is only invoked while shard indexing pressure is disabled; when it is enabled this
     * overload does nothing and hands back a no-op releasable.
     *
     * @param bytes          size value forwarded to the delegate
     * @param forceExecution flag forwarded to the delegate
     * @return the delegate's releasable, or a no-op releasable when shard indexing pressure is enabled
     */
    public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) {
        if (isShardIndexingPressureEnabled()) {
            return () -> {};
        }
        return this.shardIndexingPressure.markCoordinatingOperationStarted(bytes, forceExecution);
    }

    /**
     * Marks the start of a coordinating operation for a specific shard.
     * The delegate is only invoked while shard indexing pressure is enabled; when it is disabled this
     * overload does nothing and hands back a no-op releasable.
     *
     * @param shardId        shard the operation targets
     * @param bytes          size value forwarded to the delegate
     * @param forceExecution flag forwarded to the delegate
     * @return the delegate's releasable, or a no-op releasable when shard indexing pressure is disabled
     */
    public Releasable markCoordinatingOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
        if (!isShardIndexingPressureEnabled()) {
            return () -> {};
        }
        return this.shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes, forceExecution);
    }

    /**
     * Marks the start of a primary operation.
     * Uses the delegate's shard-aware overload when shard indexing pressure is enabled, and the overload
     * without a shard id otherwise.
     *
     * @param shardId        shard the operation targets (ignored when shard indexing pressure is disabled)
     * @param bytes          size value forwarded to the delegate
     * @param forceExecution flag forwarded to the delegate
     * @return the releasable returned by the delegate
     */
    public Releasable markPrimaryOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
        if (!isShardIndexingPressureEnabled()) {
            return this.shardIndexingPressure.markPrimaryOperationStarted(bytes, forceExecution);
        }
        return this.shardIndexingPressure.markPrimaryOperationStarted(shardId, bytes, forceExecution);
    }

    /**
     * Marks the start of a primary operation that is local to the coordinating node.
     * Uses the delegate's shard-aware overload when shard indexing pressure is enabled, and the overload
     * without a shard id otherwise.
     *
     * @param shardId shard the operation targets (ignored when shard indexing pressure is disabled)
     * @param bytes   size value forwarded to the delegate
     * @return the releasable returned by the delegate
     */
    public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(ShardId shardId, long bytes) {
        if (!isShardIndexingPressureEnabled()) {
            return this.shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(bytes);
        }
        return this.shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, bytes);
    }

    /**
     * Marks the start of a replica operation.
     * Uses the delegate's shard-aware overload when shard indexing pressure is enabled, and the overload
     * without a shard id otherwise.
     *
     * @param shardId        shard the operation targets (ignored when shard indexing pressure is disabled)
     * @param bytes          size value forwarded to the delegate
     * @param forceExecution flag forwarded to the delegate
     * @return the releasable returned by the delegate
     */
    public Releasable markReplicaOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
        if (!isShardIndexingPressureEnabled()) {
            return this.shardIndexingPressure.markReplicaOperationStarted(bytes, forceExecution);
        }
        return this.shardIndexingPressure.markReplicaOperationStarted(shardId, bytes, forceExecution);
    }

    /**
     * @return the stats object produced by the delegate's {@code stats()} call
     */
    public IndexingPressureStats nodeStats() {
        return this.shardIndexingPressure.stats();
    }

    /**
     * @param statsFlags flags forwarded unchanged to the delegate
     * @return the shard stats object produced by the delegate for the given flags
     */
    public ShardIndexingPressureStats shardStats(CommonStatsFlags statsFlags) {
        return this.shardIndexingPressure.shardStats(statsFlags);
    }

    /** Asks the delegate whether shard indexing pressure is enabled; queried afresh on every call. */
    private boolean isShardIndexingPressureEnabled() {
        return this.shardIndexingPressure.isShardIndexingPressureEnabled();
    }

    /** Exposes the underlying delegate; package-private so that it is visible for testing only. */
    ShardIndexingPressure getShardIndexingPressure() {
        return this.shardIndexingPressure;
    }
}