From f93a2898bbac2ccf407851d3d5a95522f5212e31 Mon Sep 17 00:00:00 2001 From: Daniel Guo Date: Mon, 27 Jan 2025 09:25:11 -0500 Subject: [PATCH] add gate for requesting streams during bootstrap --- .../cassandra/service/StorageService.java | 27 ++++++++++++++++++- .../service/StorageServiceMBean.java | 5 ++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 65e31db29b..ea6562e072 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -110,8 +110,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { private static final Logger logger = LoggerFactory.getLogger(StorageService.class); private static final boolean DISABLE_WAIT_TO_BOOTSTRAP = Boolean.getBoolean("palantir_cassandra.disable_wait_to_bootstrap"); + private static final boolean DISABLE_WAIT_TO_REQUEST_STREAMS = Boolean.getBoolean("palantir_cassandra.disable_wait_to_request_streams"); private static final boolean DISABLE_WAIT_TO_FINISH_BOOTSTRAP = Boolean.getBoolean("palantir_cassandra.disable_wait_to_finish_bootstrap"); private static final Integer BOOTSTRAP_DISK_USAGE_THRESHOLD = Integer.getInteger("palantir_cassandra.bootstrap_disk_usage_threshold_percentage"); + private static final Integer STREAMS_REQUEST_SAFETY_CHECK_GRACE_PERIOD_MINUTES = Integer.getInteger("palantir_cassandra.streams_request_safety_check_grace_period_minutes", 30); private static final Integer BOOTSTRAP_SAFETY_CHECK_GRACE_PERIOD_MINUTES = Integer.getInteger("palantir_cassandra.bootstrap_safety_check_grace_period_minutes", 30); public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized @@ -124,6 +126,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private final List bootstrapListeners = new CopyOnWriteArrayList<>(); private final Condition startBootstrapCondition = new SimpleCondition(DISABLE_WAIT_TO_BOOTSTRAP); + private final Condition startRequestStreamsCondition = new SimpleCondition(DISABLE_WAIT_TO_REQUEST_STREAMS); private final Condition finishBootstrapCondition = new SimpleCondition(DISABLE_WAIT_TO_FINISH_BOOTSTRAP); /** @@ -201,7 +204,7 @@ public Collection> getPrimaryRangesWithinDC(String keyspace) private double traceProbability = 0.0; @VisibleForTesting - static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, ZOMBIE, NON_TRANSIENT_ERROR, TRANSIENT_ERROR, WAITING_TO_BOOTSTRAP, WAITING_TO_FINISH_BOOTSTRAP, DISABLED } + static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, ZOMBIE, NON_TRANSIENT_ERROR, TRANSIENT_ERROR, WAITING_TO_BOOTSTRAP, WAITING_TO_REQUEST_STREAMS, WAITING_TO_FINISH_BOOTSTRAP, DISABLED } private volatile Mode operationMode = Mode.STARTING; /* Used for tracking drain progress */ @@ -1586,6 +1589,22 @@ private boolean bootstrap(final Collection tokens) SystemKeyspace.resetAvailableRanges(); } + try + { + setMode(Mode.WAITING_TO_REQUEST_STREAMS, "Awaiting call to proceed with requesting streams during bootstrap", true); + boolean timeoutExceeded = !startRequestStreamsCondition.await(STREAMS_REQUEST_SAFETY_CHECK_GRACE_PERIOD_MINUTES, MINUTES); + if (timeoutExceeded) + { + logger.error("Start signal to request streams was not given within 30 minutes. Streams request safety check failed."); + recordBootstrapErrorAndThrow("streamsRequestSafetyCheckFailed"); + } + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + logger.info("Received signal to start requesting streams."); + setMode(Mode.JOINING, "Starting to bootstrap...", true); BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata); bootstrapper.addProgressListener(progressSupport); @@ -1695,6 +1714,12 @@ public void startBootstrap() startBootstrapCondition.signalAll(); } + @Override + public void startRequestingStreams() + { + startRequestStreamsCondition.signalAll(); + } + @Override public void finishBootstrap() { diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 318549ed94..6ce32d25f0 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -815,6 +815,11 @@ public enum ProgressState */ public void startBootstrap(); + /** + * Send signal to start requesting streams for bootstrap + */ + public void startRequestingStreams(); + /** * Send signal to finalize the bootstrap process and finish joining the ring. */