Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable smartSegmentLoading on the Coordinator #13197

Merged
merged 109 commits into from
Jun 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
109 commits
Select commit Hold shift + click to select a range
43cb8b8
Allow cancellation of load queue items
kfaraz Oct 10, 2022
c1af872
Fix tests
kfaraz Oct 10, 2022
1992777
Emit broadcast metrics
kfaraz Oct 12, 2022
4a22f94
Add ServerHolder.canLoadSegment
kfaraz Oct 13, 2022
22413c6
Maintain replication level during tier shift
kfaraz Oct 13, 2022
40ac8f7
Fix injection and initialization of SegmentStateManager
kfaraz Oct 15, 2022
60801df
Emit some more metrics
kfaraz Oct 16, 2022
d2bde9d
Fix LGTM failure
kfaraz Oct 16, 2022
f5b9f19
Fix tests
kfaraz Oct 16, 2022
bd0db30
Enable priorization of segment actions
kfaraz Oct 18, 2022
b7a8c01
Clean up replication throttling
kfaraz Oct 18, 2022
ee80210
Fix inspection
kfaraz Oct 18, 2022
7a6318c
Simplify SegmentLoader logic, fix under-replication bug
kfaraz Oct 19, 2022
030d2df
Maintain count of loading, dropping, moving segments in lookup
kfaraz Oct 30, 2022
6d964c3
Rename segment actions
kfaraz Oct 31, 2022
0d91c2c
Remove SegmentState and use SegmentAction instead
kfaraz Nov 1, 2022
5816396
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Nov 1, 2022
7f6b4ef
Fix imports
kfaraz Nov 2, 2022
2b61baa
Fix minor bug in cancellation
kfaraz Nov 3, 2022
0c70c10
Minor fixes
kfaraz Nov 3, 2022
5e39b3c
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Nov 9, 2022
d4bf514
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Nov 20, 2022
804056b
Allow move of loading segments
kfaraz Nov 20, 2022
0adbf55
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Nov 23, 2022
7c85131
Fix tests
kfaraz Nov 23, 2022
7a9a51c
Revert change to indexing-service
kfaraz Nov 23, 2022
a05eba9
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Jan 11, 2023
bf87d31
Fix tests
kfaraz Jan 11, 2023
94ae076
Fix more tests
kfaraz Jan 11, 2023
cd23d6a
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Feb 16, 2023
ed02b9f
Fix imports
kfaraz Feb 16, 2023
2f3ab6d
Revert timeline changes
kfaraz Feb 22, 2023
0bedbbe
Remove unused import
kfaraz Feb 22, 2023
4e3b8d3
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Feb 22, 2023
0df1f87
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Feb 23, 2023
adf4c7c
Fix cachingCost to subtract self cost of segment
kfaraz Feb 24, 2023
bd1c549
Update BalancerStrategy javadocs
kfaraz Feb 24, 2023
d763e06
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Feb 24, 2023
ae135df
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Feb 27, 2023
5c6a6d4
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Feb 28, 2023
9709a8f
Missing change from merge
kfaraz Feb 28, 2023
c4d4df2
Fix violation of maxSegmentsInNodeLoadingQueue
kfaraz Mar 1, 2023
d055982
Allow maxSegmentsToMove to be automatically calculated
kfaraz Mar 6, 2023
a0effed
Update tests for maxSegmentsToMove
kfaraz Mar 6, 2023
f0f9a9d
Fix violation of replicationThrottleLimit
kfaraz Mar 7, 2023
dce67e3
Fix HttpLoadQueuePeon bug
kfaraz Mar 7, 2023
b8b0d99
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Mar 7, 2023
dfa4701
Remove unused imports
kfaraz Mar 7, 2023
470cb6b
Remove unused import
kfaraz Mar 7, 2023
c1386ec
Fix segment balancing loadSpec bug
kfaraz Mar 10, 2023
932036d
Fix segment balancing loadSpec bug
kfaraz Mar 16, 2023
fcd969d
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Apr 5, 2023
2ad1869
Cleanup CoordinatorRunStats
kfaraz Apr 9, 2023
b4f434a
Add dimension datasource for segment stats
kfaraz Apr 9, 2023
ca03ab8
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Apr 10, 2023
04b0050
Prioritize move of loading segments, allow moves when tier is busy
kfaraz Apr 11, 2023
4bef581
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Apr 11, 2023
b1dda80
Fix tests and checkstyle
kfaraz Apr 11, 2023
a2e8575
Fix flakiness in test
kfaraz Apr 11, 2023
b948874
Fix tests, add logs
kfaraz Apr 11, 2023
fc3ac3a
Cleanup configs, LoadRuleTest
kfaraz Apr 11, 2023
82b0589
Cleanup DruidClusterBuilder, remove unused code
kfaraz Apr 12, 2023
56e4a56
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Apr 12, 2023
49bce2e
Add TierSegmentBalancer, remove EmitClusterStats, add CollectSegmentS…
kfaraz Apr 13, 2023
d79b270
Speed up decommissioning of servers by cancelling loads
kfaraz Apr 14, 2023
c4e653e
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Apr 14, 2023
69f2ef8
Create package coordinator.balancer
kfaraz Apr 14, 2023
e42f5d3
Fix tests
kfaraz Apr 14, 2023
52ba2c5
Fix errors
kfaraz Apr 15, 2023
7266800
Add SegmentLoader.moveSegment
kfaraz Apr 15, 2023
c8928d9
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Apr 15, 2023
000e967
Track assignment errors and log
kfaraz Apr 15, 2023
6e557d5
Cleanup logs
kfaraz Apr 16, 2023
9239a8b
Add SegmentAction.MOVE_FROM and use that in ServerHolder.getProjectSe…
kfaraz Apr 19, 2023
05d0d63
Create uniformIntervalStrategy
kfaraz Apr 19, 2023
d1895dc
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Apr 19, 2023
fe093b3
Clean up reporting of debug stats
kfaraz Apr 21, 2023
a3015ea
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Apr 21, 2023
d1bc527
Test CoordinatorDynamicConfig.debugDimensions
kfaraz Apr 21, 2023
7995c9f
Remove unused import
kfaraz Apr 21, 2023
fc874b3
Fix issues from review
kfaraz Apr 21, 2023
4aa2688
Fix minor bugs, limit movement if already moving
kfaraz Apr 26, 2023
21bf59a
Use configured replicantLifetime
kfaraz Apr 26, 2023
4432cf8
Fix compilation error
kfaraz Apr 27, 2023
2244425
Cleanup reporting of under-replicated segments
kfaraz Apr 27, 2023
b38f999
Add UniformIntervalStrategy.pickServersToDrop
kfaraz May 1, 2023
870940b
fix usage of StrategicSegmentAssigner
kfaraz May 2, 2023
2bce764
Fix tests
kfaraz May 2, 2023
ac2e28f
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz May 2, 2023
1ed7b3c
Remove unused code
kfaraz May 2, 2023
3f826f7
Remove unused code
kfaraz May 2, 2023
415a4a6
Report under-replicated count as 0 for fully replicated datasources f…
kfaraz May 11, 2023
7d9be8e
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz May 11, 2023
ec555bf
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz May 19, 2023
ae3d045
Fix reporting of segment/unavailable/count
kfaraz May 20, 2023
da5edd7
Improve perf of ServerHolder.getProjectedSegments()
kfaraz May 20, 2023
5fd1521
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz May 22, 2023
7d6b020
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz May 30, 2023
6c6633b
Add CoordinatorRunStats.getSnapshot()
kfaraz Jun 5, 2023
79110c9
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Jun 5, 2023
9374780
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Jun 8, 2023
aa3db93
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Jun 10, 2023
9d0c756
Emit metrics moveSkipped/count, assignSkipped/count. Add config smart…
kfaraz Jun 13, 2023
6f4b306
Add SegmentReplicaCount, SegmentLoadingConfig
kfaraz Jun 17, 2023
e4cd4f4
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Jun 17, 2023
da88ec4
Break up SegmentReplicantLookup into SegmentReplicationStatus and Seg…
kfaraz Jun 18, 2023
51e9dd0
Merge branch 'master' of github.com:apache/druid into improve_load_queue
kfaraz Jun 19, 2023
a661837
Fix test, address feedback
kfaraz Jun 19, 2023
1d785ca
Fix dependency
kfaraz Jun 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,14 @@
package org.apache.druid.server.coordinator;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.balancer.BalancerSegmentHolder;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy;
import org.apache.druid.server.coordinator.loadqueue.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.balancer.ReservoirSegmentSampler;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down Expand Up @@ -67,7 +64,7 @@ public class BalancerStrategyBenchmark
private static final Interval TEST_SEGMENT_INTERVAL = Intervals.of("2012-03-15T00:00:00.000/2012-03-16T00:00:00.000");
private static final int NUMBER_OF_SERVERS = 20;

@Param({"default", "50percentOfSegmentsToConsiderPerMove", "useBatchedSegmentSampler"})
@Param({"default", "useBatchedSegmentSampler"})
private String mode;

@Param({"10000", "100000", "1000000"})
Expand All @@ -77,26 +74,13 @@ public class BalancerStrategyBenchmark
private int maxSegmentsToMove;

private final List<ServerHolder> serverHolders = new ArrayList<>();
private boolean useBatchedSegmentSampler;
private int reservoirSize = 1;
private double percentOfSegmentsToConsider = 100;
private final BalancerStrategy balancerStrategy = new CostBalancerStrategy(
MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "BalancerStrategyBenchmark-%d"))
);

@Setup(Level.Trial)
public void setup()
{
switch (mode) {
case "50percentOfSegmentsToConsiderPerMove":
percentOfSegmentsToConsider = 50;
useBatchedSegmentSampler = false;
break;
case "useBatchedSegmentSampler":
reservoirSize = maxSegmentsToMove;
useBatchedSegmentSampler = true;
break;
default:
if ("useBatchedSegmentSampler".equals(mode)) {
reservoirSize = maxSegmentsToMove;
}

List<List<DataSegment>> segmentList = new ArrayList<>(NUMBER_OF_SERVERS);
Expand Down Expand Up @@ -135,21 +119,9 @@ public void setup()
@Benchmark
public void pickSegmentsToMove(Blackhole blackhole)
{
Iterator<BalancerSegmentHolder> iterator;
if (useBatchedSegmentSampler) {
iterator = balancerStrategy.pickSegmentsToMove(
serverHolders,
Collections.emptySet(),
reservoirSize,
false
);
} else {
iterator = balancerStrategy.pickSegmentsToMove(
serverHolders,
Collections.emptySet(),
percentOfSegmentsToConsider
);
}
Iterator<BalancerSegmentHolder> iterator = ReservoirSegmentSampler
.pickMovableLoadedSegmentsFrom(serverHolders, reservoirSize, Collections.emptySet())
.iterator();

for (int i = 0; i < maxSegmentsToMove && iterator.hasNext(); i++) {
blackhole.consume(iterator.next());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.coordinator.loadqueue.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.stats.Dimension;

import javax.annotation.Nonnull;
Expand All @@ -37,7 +37,6 @@
import javax.validation.constraints.NotNull;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -236,20 +235,13 @@ private Map<Dimension, String> validateDebugDimensions(Map<String, String> debug
return validDebugDimensions;
}

final Map<String, Dimension> nameToDimension = new HashMap<>();
for (Dimension dimension : Dimension.values()) {
nameToDimension.put(dimension.reportedName(), dimension);
final String dimensionValue = debugDimensions.get(dimension.reportedName());
if (dimensionValue != null) {
validDebugDimensions.put(dimension, dimensionValue);
}
}

debugDimensions.forEach(
(dimensionName, value) -> {
Dimension dimension = nameToDimension.get(dimensionName);
if (dimension != null && value != null) {
validDebugDimensions.put(dimension, value);
}
}
);

return validDebugDimensions;
}

Expand Down Expand Up @@ -511,7 +503,8 @@ public boolean equals(Object o)
&& Objects.equals(
dataSourcesToNotKillStalePendingSegmentsIn,
that.dataSourcesToNotKillStalePendingSegmentsIn)
&& Objects.equals(decommissioningNodes, that.decommissioningNodes);
&& Objects.equals(decommissioningNodes, that.decommissioningNodes)
&& Objects.equals(debugDimensions, that.debugDimensions);
}

@Override
Expand All @@ -534,7 +527,8 @@ public int hashCode()
decommissioningNodes,
decommissioningMaxPercentOfMaxSegmentsToMove,
pauseCoordination,
maxNonPrimaryReplicantsToLoad
maxNonPrimaryReplicantsToLoad,
debugDimensions
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,13 @@
import org.apache.druid.server.coordinator.duty.MarkOvershadowedSegmentsAsUnused;
import org.apache.druid.server.coordinator.duty.RunRules;
import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
import org.apache.druid.server.coordinator.loadqueue.LoadQueuePeon;
import org.apache.druid.server.coordinator.loadqueue.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.loadqueue.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.loading.ReplicationThrottler;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
import org.apache.druid.server.coordinator.loading.SegmentReplicantLookup;
import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.CoordinatorStat;
Expand Down Expand Up @@ -258,11 +262,11 @@ public Object2IntMap<String> getDatasourceToUnavailableSegmentCount()

final Iterable<DataSegment> dataSegments = segmentsMetadataManager.iterateAllUsedSegments();
for (DataSegment segment : dataSegments) {
if (segmentReplicantLookup.getLoadedReplicas(segment.getId(), true) == 0) {
datasourceToUnavailableSegments.addTo(segment.getDataSource(), 1);
} else {
datasourceToUnavailableSegments.addTo(segment.getDataSource(), 0);
}
SegmentReplicaCount replicaCount = segmentReplicantLookup.getReplicaCountsInCluster(segment.getId());
datasourceToUnavailableSegments.addTo(
segment.getDataSource(),
replicaCount.loaded() == 0 ? 1 : 0
);
}

return datasourceToUnavailableSegments;
Expand Down Expand Up @@ -801,22 +805,23 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
startPeonsForNewServers(currentServers);
stopPeonsForDisappearedServers(currentServers);

final DruidCluster cluster = prepareCluster(params.getCoordinatorDynamicConfig(), currentServers);
cancelLoadsOnDecommissioningServers(cluster);

final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
final SegmentLoadingConfig segmentLoadingConfig = params.getSegmentLoadingConfig();

final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers);
cancelLoadsOnDecommissioningServers(cluster);

initBalancerExecutor();
final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(balancerExec);
log.info(
"Using balancer strategy [%s] with round-robin assignment [%s] and debug dimensions [%s].",
balancerStrategy.getClass().getSimpleName(),
dynamicConfig.isUseRoundRobinSegmentAssignment(), dynamicConfig.getDebugDimensions()
segmentLoadingConfig.isUseRoundRobinSegmentAssignment(),
dynamicConfig.getDebugDimensions()
);

params = params.buildFromExisting()
.withDruidCluster(cluster)
.withDynamicConfigs(recomputeDynamicConfig(params))
.withBalancerStrategy(balancerStrategy)
.withSegmentAssignerUsing(loadQueueManager)
.build();
Expand All @@ -826,44 +831,6 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
return params;
}

/**
* Recomputes dynamic config values if {@code smartLoadQueue} is enabled.
*/
private CoordinatorDynamicConfig recomputeDynamicConfig(DruidCoordinatorRuntimeParams params)
{
final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
if (!dynamicConfig.isSmartSegmentLoading()) {
return dynamicConfig;
}

// Impose a lower bound on both replicationThrottleLimit and maxSegmentsToMove
final int throttlePercentage = 2;
final int replicationThrottleLimit = Math.max(
100,
params.getUsedSegments().size() * throttlePercentage / 100
);

// Impose an upper bound on maxSegmentsToMove to ensure that coordinator
// run times are bounded. This limit can be relaxed as performance of
// the CostBalancerStrategy.computeCost() is improved.
final int maxSegmentsToMove = Math.min(1000, replicationThrottleLimit);

log.info(
"Smart segment loading is enabled. Recomputed replicationThrottleLimit"
+ " [%d] (%d%% of used segments) and maxSegmentsToMove [%d].",
replicationThrottleLimit, throttlePercentage, maxSegmentsToMove
);

return CoordinatorDynamicConfig.builder()
.withMaxSegmentsInNodeLoadingQueue(0)
.withReplicationThrottleLimit(replicationThrottleLimit)
.withMaxSegmentsToMove(maxSegmentsToMove)
.withUseRoundRobinSegmentAssignment(true)
.withUseBatchedSegmentSampler(true)
.withEmitBalancingStats(false)
.build(dynamicConfig);
}

/**
* Cancels all load/move operations on decommissioning servers. This should
* be done before initializing the SegmentReplicantLookup so that
Expand Down Expand Up @@ -931,7 +898,11 @@ void startPeonsForNewServers(List<ImmutableDruidServer> currentServers)
}
}

DruidCluster prepareCluster(CoordinatorDynamicConfig dynamicConfig, List<ImmutableDruidServer> currentServers)
DruidCluster prepareCluster(
CoordinatorDynamicConfig dynamicConfig,
SegmentLoadingConfig segmentLoadingConfig,
List<ImmutableDruidServer> currentServers
)
{
final Set<String> decommissioningServers = dynamicConfig.getDecommissioningNodes();
final DruidCluster.Builder cluster = DruidCluster.builder();
Expand All @@ -941,8 +912,8 @@ DruidCluster prepareCluster(CoordinatorDynamicConfig dynamicConfig, List<Immutab
server,
loadManagementPeons.get(server.getName()),
decommissioningServers.contains(server.getHost()),
dynamicConfig.getMaxSegmentsInNodeLoadingQueue(),
dynamicConfig.getReplicantLifetime()
segmentLoadingConfig.getMaxSegmentsInLoadQueue(),
segmentLoadingConfig.getMaxLifetimeInLoadQueue()
)
);
}
Expand Down
Loading