Skip to content

Commit

Permalink
add gate for requesting streams during bootstrap
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Guo committed Jan 27, 2025
1 parent 1e1ec72 commit f93a289
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
27 changes: 26 additions & 1 deletion src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
private static final Logger logger = LoggerFactory.getLogger(StorageService.class);
private static final boolean DISABLE_WAIT_TO_BOOTSTRAP = Boolean.getBoolean("palantir_cassandra.disable_wait_to_bootstrap");
private static final boolean DISABLE_WAIT_TO_REQUEST_STREAMS = Boolean.getBoolean("palantir_cassandra.disable_wait_to_request_streams");
private static final boolean DISABLE_WAIT_TO_FINISH_BOOTSTRAP = Boolean.getBoolean("palantir_cassandra.disable_wait_to_finish_bootstrap");
private static final Integer BOOTSTRAP_DISK_USAGE_THRESHOLD = Integer.getInteger("palantir_cassandra.bootstrap_disk_usage_threshold_percentage");
private static final Integer STREAMS_REQUEST_SAFETY_CHECK_GRACE_PERIOD_MINUTES = Integer.getInteger("palantir_cassandra.streams_request_safety_check_grace_period_minutes", 30);
private static final Integer BOOTSTRAP_SAFETY_CHECK_GRACE_PERIOD_MINUTES = Integer.getInteger("palantir_cassandra.bootstrap_safety_check_grace_period_minutes", 30);

public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized
Expand All @@ -124,6 +126,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private final List<ProgressListener> bootstrapListeners = new CopyOnWriteArrayList<>();

private final Condition startBootstrapCondition = new SimpleCondition(DISABLE_WAIT_TO_BOOTSTRAP);
private final Condition startRequestStreamsCondition = new SimpleCondition(DISABLE_WAIT_TO_REQUEST_STREAMS);
private final Condition finishBootstrapCondition = new SimpleCondition(DISABLE_WAIT_TO_FINISH_BOOTSTRAP);

/**
Expand Down Expand Up @@ -201,7 +204,7 @@ public Collection<Range<Token>> getPrimaryRangesWithinDC(String keyspace)
private double traceProbability = 0.0;

@VisibleForTesting
static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, ZOMBIE, NON_TRANSIENT_ERROR, TRANSIENT_ERROR, WAITING_TO_BOOTSTRAP, WAITING_TO_FINISH_BOOTSTRAP, DISABLED }
static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, ZOMBIE, NON_TRANSIENT_ERROR, TRANSIENT_ERROR, WAITING_TO_BOOTSTRAP, WAITING_TO_REQUEST_STREAMS, WAITING_TO_FINISH_BOOTSTRAP, DISABLED }
private volatile Mode operationMode = Mode.STARTING;

/* Used for tracking drain progress */
Expand Down Expand Up @@ -1586,6 +1589,22 @@ private boolean bootstrap(final Collection<Token> tokens)
SystemKeyspace.resetAvailableRanges();
}

try
{
setMode(Mode.WAITING_TO_REQUEST_STREAMS, "Awaiting call to proceed with requesting streams during bootstrap", true);
boolean timeoutExceeded = !startRequestStreamsCondition.await(STREAMS_REQUEST_SAFETY_CHECK_GRACE_PERIOD_MINUTES, MINUTES);
if (timeoutExceeded)
{
logger.error("Start signal to request streams was not given within 30 minutes. Streams request safety check failed.");
recordBootstrapErrorAndThrow("streamsRequestSafetyCheckFailed");
}
}
catch (InterruptedException e)
{
throw new AssertionError(e);
}
logger.info("Received signal to start requesting streams.");

setMode(Mode.JOINING, "Starting to bootstrap...", true);
BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata);
bootstrapper.addProgressListener(progressSupport);
Expand Down Expand Up @@ -1695,6 +1714,12 @@ public void startBootstrap()
startBootstrapCondition.signalAll();
}

@Override
public void startRequestingStreams()
{
startRequestStreamsCondition.signalAll();
}

@Override
public void finishBootstrap()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,11 @@ public enum ProgressState
*/
public void startBootstrap();

/**
* Send signal to start requesting streams for bootstrap
*/
public void startRequestingStreams();

/**
* Send signal to finalize the bootstrap process and finish joining the ring.
*/
Expand Down

0 comments on commit f93a289

Please sign in to comment.