# storcon: handle entire cluster going unavailable correctly (#8060)
## Problem
A period of unavailability for all pageservers in a cluster produced the following fallout in staging: all tenants became detached and required manual intervention to re-attach. Manually restarting the storage controller re-attached all tenants, but only due to a consistency bug.

It turns out there were two related bugs that caused the issue:
1. A pageserver re-attach can be processed before the first heartbeat. Hence, when handling the availability delta produced by the heartbeater, `Node::get_availability_transition` claims that there is no need to reconfigure the node.
2. We would still attempt to reschedule tenant shards when handling offline transitions, even if the entire cluster was down. This puts tenant shards into a state where the reconciler believes they have to be detached (no pageserver shows up in their intent state); the sketch below illustrates the mechanism. This is doubly wrong because we do not mark the tenant shards as detached in the database, thus causing memory vs. database consistency issues. Luckily, this bug allowed all tenant shards to re-attach after the restart.
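
The following is a minimal, self-contained sketch of the failure mode in (2). The types (`IntentState`, `Node`, `reschedule`) are hypothetical simplifications for illustration, not the storage controller's real API: once every node is unschedulable, demoting the attachment leaves the shard's intent empty, which a later reconcile reads as "detach everywhere".

```rust
use std::collections::HashMap;

#[derive(Debug)]
struct IntentState {
    // Node id the shard should be attached to, if any.
    attached: Option<u64>,
}

struct Node {
    may_schedule: bool,
}

fn reschedule(intent: &mut IntentState, offline_node: u64, nodes: &HashMap<u64, Node>) {
    if intent.attached == Some(offline_node) {
        // Demote the attachment on the offline node ...
        intent.attached = None;
        // ... and try to pick a replacement. With the whole cluster unavailable
        // there is no candidate, so the intent stays empty and a later reconcile
        // would issue a detach.
        intent.attached = nodes
            .iter()
            .find(|(_, n)| n.may_schedule)
            .map(|(id, _)| *id);
    }
}

fn main() {
    let nodes: HashMap<u64, Node> = HashMap::from([
        (1, Node { may_schedule: false }),
        (2, Node { may_schedule: false }),
    ]);
    let mut intent = IntentState { attached: Some(1) };

    reschedule(&mut intent, 1, &nodes);

    // No schedulable node exists, so the intent ends up empty: to the
    // reconciler this looks like "detach this shard everywhere".
    assert_eq!(intent.attached, None);
    println!("intent after reschedule: {intent:?}");
}
```

The fix for (2) simply skips this rescheduling branch when no node in the cluster can currently schedule anything (see the `MaySchedule` check in the `service.rs` hunk below).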

## Summary of changes
* For (1), abuse the fact that re-attach requests do not contain a utilisation score and use that to differentiate them from a node that replied to heartbeats (see the sketch below).
* For (2), introduce a special case that skips any rescheduling if the entire cluster is unavailable.
* Update the storage controller heartbeat test with an extra scenario where the entire cluster goes for lunch.

Fixes #8044
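
As a companion to the first bullet, here is a minimal sketch of the sentinel idea. The types below are simplified stand-ins for illustration, not the real `UtilizationScore`/`NodeAvailability`/`AvailabilityTransition` definitions: a node that re-attaches is recorded as `Active` with the worst possible utilisation score (the re-attach request carries none), so a later heartbeat reply with a real, better score is still recognised as the node coming online.

```rust
#[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
struct UtilizationScore(u64);

impl UtilizationScore {
    // Sentinel recorded on re-attach, since re-attach requests carry no score.
    fn worst() -> Self {
        UtilizationScore(u64::MAX)
    }
}

#[derive(Clone, Copy, Debug)]
enum NodeAvailability {
    Active(UtilizationScore),
    Offline,
}

#[derive(Debug, PartialEq)]
enum AvailabilityTransition {
    ToActive,
    ToOffline,
    Unchanged,
}

fn availability_transition(
    current: NodeAvailability,
    incoming: NodeAvailability,
) -> AvailabilityTransition {
    use AvailabilityTransition::*;
    use NodeAvailability::*;
    match (current, incoming) {
        (Offline, Active(_)) => ToActive,
        (Active(_), Offline) => ToOffline,
        // The node was marked Active by a re-attach (sentinel score); the first
        // real heartbeat brings a better score, so treat it as coming online.
        (Active(lhs), Active(rhs))
            if lhs == UtilizationScore::worst() && rhs < UtilizationScore::worst() =>
        {
            ToActive
        }
        _ => Unchanged,
    }
}

fn main() {
    let after_reattach = NodeAvailability::Active(UtilizationScore::worst());
    let first_heartbeat = NodeAvailability::Active(UtilizationScore(10));
    assert_eq!(
        availability_transition(after_reattach, first_heartbeat),
        AvailabilityTransition::ToActive
    );
    assert_eq!(
        availability_transition(first_heartbeat, NodeAvailability::Offline),
        AvailabilityTransition::ToOffline
    );
    println!("sentinel handling works as expected");
}
```

The real version of this logic is the extra match arm added to `Node::get_availability_transition` in the `node.rs` hunk below.
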
VladLazar authored Jun 17, 2024
1 parent 2ba4145 commit 16d8012
Showing 4 changed files with 141 additions and 57 deletions.
4 changes: 4 additions & 0 deletions storage_controller/src/heartbeater.rs
@@ -31,6 +31,7 @@ pub(crate) enum PageserverState {
Available {
last_seen_at: Instant,
utilization: PageserverUtilization,
new: bool,
},
Offline,
}
@@ -127,6 +128,7 @@ impl HeartbeaterTask {
heartbeat_futs.push({
let jwt_token = self.jwt_token.clone();
let cancel = self.cancel.clone();
let new_node = !self.state.contains_key(node_id);

// Clone the node and mark it as available such that the request
// goes through to the pageserver even when the node is marked offline.
@@ -159,6 +161,7 @@ impl HeartbeaterTask {
PageserverState::Available {
last_seen_at: Instant::now(),
utilization,
new: new_node,
}
} else {
PageserverState::Offline
@@ -220,6 +223,7 @@ impl HeartbeaterTask {
}
},
Vacant(_) => {
// This is a new node. Don't generate a delta for it.
deltas.push((node_id, ps_state.clone()));
}
}
12 changes: 11 additions & 1 deletion storage_controller/src/node.rs
@@ -3,7 +3,7 @@ use std::{str::FromStr, time::Duration};
use pageserver_api::{
controller_api::{
NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
TenantLocateResponseShard,
TenantLocateResponseShard, UtilizationScore,
},
shard::TenantShardId,
};
@@ -116,6 +116,16 @@ impl Node {
match (self.availability, availability) {
(Offline, Active(_)) => ToActive,
(Active(_), Offline) => ToOffline,
// Consider the case when the storage controller handles the re-attach of a node
// before the heartbeats detect that the node is back online. We still need
// [`Service::node_configure`] to attempt reconciliations for shards with an
// unknown observed location.
// The unsavoury match arm below handles this situation.
(Active(lhs), Active(rhs))
if lhs == UtilizationScore::worst() && rhs < UtilizationScore::worst() =>
{
ToActive
}
_ => Unchanged,
}
}
88 changes: 68 additions & 20 deletions storage_controller/src/service.rs
@@ -12,7 +12,7 @@ use crate::{
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
persistence::{AbortShardSplitStatus, TenantFilter},
reconciler::{ReconcileError, ReconcileUnits},
scheduler::{ScheduleContext, ScheduleMode},
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
tenant_shard::{
MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction,
},
@@ -747,29 +747,61 @@ impl Service {
let res = self.heartbeater.heartbeat(nodes).await;
if let Ok(deltas) = res {
for (node_id, state) in deltas.0 {
let new_availability = match state {
PageserverState::Available { utilization, .. } => NodeAvailability::Active(
UtilizationScore(utilization.utilization_score),
let (new_node, new_availability) = match state {
PageserverState::Available {
utilization, new, ..
} => (
new,
NodeAvailability::Active(UtilizationScore(
utilization.utilization_score,
)),
),
PageserverState::Offline => NodeAvailability::Offline,
PageserverState::Offline => (false, NodeAvailability::Offline),
};
let res = self
.node_configure(node_id, Some(new_availability), None)
.await;

match res {
Ok(()) => {}
Err(ApiError::NotFound(_)) => {
// This should be rare, but legitimate since the heartbeats are done
// on a snapshot of the nodes.
tracing::info!("Node {} was not found after heartbeat round", node_id);
if new_node {
// When the heartbeats detect a newly added node, we don't wish
// to attempt to reconcile the shards assigned to it. The node
// is likely handling its re-attach response, so reconciling now
// would be counterproductive.
//
// Instead, update the in-memory state with the details learned about the
// node.
let mut locked = self.inner.write().unwrap();
let (nodes, _tenants, scheduler) = locked.parts_mut();

let mut new_nodes = (**nodes).clone();

if let Some(node) = new_nodes.get_mut(&node_id) {
node.set_availability(new_availability);
scheduler.node_upsert(node);
}
Err(err) => {
tracing::error!(
"Failed to update node {} after heartbeat round: {}",
node_id,
err
);

locked.nodes = Arc::new(new_nodes);
} else {
// This is the code path for genuine availability transitions (i.e. node
// goes unavailable and/or comes back online).
let res = self
.node_configure(node_id, Some(new_availability), None)
.await;

match res {
Ok(()) => {}
Err(ApiError::NotFound(_)) => {
// This should be rare, but legitimate since the heartbeats are done
// on a snapshot of the nodes.
tracing::info!(
"Node {} was not found after heartbeat round",
node_id
);
}
Err(err) => {
tracing::error!(
"Failed to update node {} after heartbeat round: {}",
node_id,
err
);
}
}
}
}
@@ -4316,6 +4348,16 @@ impl Service {
continue;
}

if !new_nodes
.values()
.any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
{
// Special case for when all nodes are unavailable and/or unschedulable: there is no point
// trying to reschedule since there's nowhere else to go. Without this
// branch we incorrectly detach tenants in response to node unavailability.
continue;
}

if tenant_shard.intent.demote_attached(scheduler, node_id) {
tenant_shard.sequence = tenant_shard.sequence.next();

@@ -4353,6 +4395,12 @@ impl Service {
// When a node comes back online, we must reconcile any tenant that has a None observed
// location on the node.
for tenant_shard in locked.tenants.values_mut() {
// If a reconciliation is already in progress, rely on the previous scheduling
// decision and skip triggering a new reconciliation.
if tenant_shard.reconciler.is_some() {
continue;
}

if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
if observed_loc.conf.is_none() {
self.maybe_reconcile_shard(tenant_shard, &new_nodes);
94 changes: 58 additions & 36 deletions test_runner/regress/test_storage_controller.py
@@ -934,19 +934,27 @@ def apply(self, env: NeonEnv):
def clear(self, env: NeonEnv):
raise NotImplementedError()

def nodes(self):
raise NotImplementedError()


class NodeStop(Failure):
def __init__(self, pageserver_id, immediate):
self.pageserver_id = pageserver_id
def __init__(self, pageserver_ids, immediate):
self.pageserver_ids = pageserver_ids
self.immediate = immediate

def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.stop(immediate=self.immediate)
for ps_id in self.pageserver_ids:
pageserver = env.get_pageserver(ps_id)
pageserver.stop(immediate=self.immediate)

def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.start()
for ps_id in self.pageserver_ids:
pageserver = env.get_pageserver(ps_id)
pageserver.start()

def nodes(self):
return self.pageserver_ids


class PageserverFailpoint(Failure):
@@ -962,6 +970,9 @@ def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.http_client().configure_failpoints((self.failpoint, "off"))

def nodes(self):
return [self.pageserver_id]


def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
tenants = env.storage_controller.tenant_list()
@@ -985,8 +996,9 @@ def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
@pytest.mark.parametrize(
"failure",
[
NodeStop(pageserver_id=1, immediate=False),
NodeStop(pageserver_id=1, immediate=True),
NodeStop(pageserver_ids=[1], immediate=False),
NodeStop(pageserver_ids=[1], immediate=True),
NodeStop(pageserver_ids=[1, 2], immediate=True),
PageserverFailpoint(pageserver_id=1, failpoint="get-utilization-http-handler"),
],
)
@@ -1039,65 +1051,75 @@ def tenants_placed():
wait_until(10, 1, tenants_placed)

# ... then we apply the failure
offline_node_id = failure.pageserver_id
online_node_id = (set(range(1, len(env.pageservers) + 1)) - {offline_node_id}).pop()
env.get_pageserver(offline_node_id).allowed_errors.append(
# In the case of the failpoint failure, the impacted pageserver
# still believes it has the tenant attached since location
# config calls into it will fail due to being marked offline.
".*Dropped remote consistent LSN updates.*",
)
offline_node_ids = set(failure.nodes())
online_node_ids = set(range(1, len(env.pageservers) + 1)) - offline_node_ids

for node_id in offline_node_ids:
env.get_pageserver(node_id).allowed_errors.append(
# In the case of the failpoint failure, the impacted pageserver
# still believes it has the tenant attached since location
# config calls into it will fail due to being marked offline.
".*Dropped remote consistent LSN updates.*",
)

if len(offline_node_ids) > 1:
env.get_pageserver(node_id).allowed_errors.append(
".*Scheduling error when marking pageserver.*offline.*",
)

failure.apply(env)

# ... expecting the heartbeats to mark it offline
def node_offline():
def nodes_offline():
nodes = env.storage_controller.node_list()
log.info(f"{nodes=}")
target = next(n for n in nodes if n["id"] == offline_node_id)
assert target["availability"] == "Offline"
for node in nodes:
if node["id"] in offline_node_ids:
assert node["availability"] == "Offline"

# A node is considered offline if the last successful heartbeat
# was more than 10 seconds ago (hardcoded in the storage controller).
wait_until(20, 1, node_offline)
wait_until(20, 1, nodes_offline)

# ... expecting the tenant on the offline node to be migrated
def tenant_migrated():
if len(online_node_ids) == 0:
time.sleep(5)
return

node_to_tenants = build_node_to_tenants_map(env)
log.info(f"{node_to_tenants=}")
assert set(node_to_tenants[online_node_id]) == set(tenant_ids)

observed_tenants = set()
for node_id in online_node_ids:
observed_tenants |= set(node_to_tenants[node_id])

assert observed_tenants == set(tenant_ids)

wait_until(10, 1, tenant_migrated)

# ... then we clear the failure
failure.clear(env)

# ... expecting the offline node to become active again
def node_online():
def nodes_online():
nodes = env.storage_controller.node_list()
target = next(n for n in nodes if n["id"] == offline_node_id)
assert target["availability"] == "Active"
for node in nodes:
if node["id"] in online_node_ids:
assert node["availability"] == "Active"

wait_until(10, 1, node_online)
wait_until(10, 1, nodes_online)

time.sleep(5)

# ... then we create a new tenant
tid = TenantId.generate()
env.storage_controller.tenant_create(tid)

# ... expecting it to be placed on the node that just came back online
tenants = env.storage_controller.tenant_list()
newest_tenant = next(t for t in tenants if t["tenant_shard_id"] == str(tid))
locations = list(newest_tenant["observed"]["locations"].keys())
locations = [int(node_id) for node_id in locations]
assert locations == [offline_node_id]
node_to_tenants = build_node_to_tenants_map(env)
log.info(f"Back online: {node_to_tenants=}")

# ... expecting the storage controller to reach a consistent state
def storage_controller_consistent():
env.storage_controller.consistency_check()

wait_until(10, 1, storage_controller_consistent)
wait_until(30, 1, storage_controller_consistent)


def test_storage_controller_re_attach(neon_env_builder: NeonEnvBuilder):

1 comment on commit 16d8012

@github-actions

3310 tests run: 3172 passed, 0 failed, 138 skipped (full report)


Flaky tests (1)

Postgres 16

  • test_metric_collection: debug

Code coverage* (full report)

  • functions: 32.4% (6831 of 21065 functions)
  • lines: 50.0% (53173 of 106317 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
16d8012 at 2024-06-17T12:01:30.913Z :recycle:
