storage controller: use proper ScheduleContext when evacuating a node (#9908)

## Problem

When picking locations for a shard, we should use a ScheduleContext that
includes all the other shards in the tenant, so that we apply proper
anti-affinity between shards. If we don't do this, then it can lead to
unstable scheduling, where we place a shard somewhere that the optimizer
will then immediately move it away from.
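
As a rough illustration (toy types below, not the real `ScheduleContext`/`Scheduler` API, which tracks an `AffinityScore` per node): when all shards of a tenant share one context, each placement makes the already-used nodes less attractive, so the shards spread out; with a fresh context per shard they all pile onto the same node and the optimizer has to undo it later.

```rust
// Toy illustration of the anti-affinity idea: every shard we place calls `avoid()`
// on the nodes it used, so later shards of the same tenant prefer other nodes.
use std::collections::HashMap;

type NodeId = u64;

#[derive(Default)]
struct ToyScheduleContext {
    /// How many times each node has already been used within this tenant.
    affinity: HashMap<NodeId, u64>,
}

impl ToyScheduleContext {
    fn avoid(&mut self, nodes: &[NodeId]) {
        for n in nodes {
            *self.affinity.entry(*n).or_insert(0) += 1;
        }
    }

    /// Pick the candidate node with the lowest accumulated affinity score.
    fn pick(&self, candidates: &[NodeId]) -> NodeId {
        *candidates
            .iter()
            .min_by_key(|n| self.affinity.get(*n).copied().unwrap_or(0))
            .expect("at least one candidate node")
    }
}

fn main() {
    let nodes = [1, 2, 3];

    // One context shared by all three shards of a tenant: the shards spread out.
    let mut ctx = ToyScheduleContext::default();
    let spread: Vec<NodeId> = (0..3)
        .map(|_| {
            let n = ctx.pick(&nodes);
            ctx.avoid(&[n]);
            n
        })
        .collect();
    assert_eq!(spread, vec![1, 2, 3]);

    // A fresh context per shard (what the evacuation path did before this change):
    // every shard lands on the same node, and the optimizer then wants to move them.
    let piled: Vec<NodeId> = (0..3)
        .map(|_| ToyScheduleContext::default().pick(&nodes))
        .collect();
    assert_eq!(piled, vec![1, 1, 1]);
}
```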

We didn't always do this, because it was a bit awkward to accumulate the
context for each tenant rather than simply walking the flat map of shards.

This was a TODO in `handle_node_availability_transition`:
```
                        // TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
                        // for tenants without secondary locations: if they have a secondary location, then this
                        // schedule() call is just promoting an existing secondary)
```

This is a precursor to #8264,
where the current imperfect scheduling during node evacuation hampers
testing.

## Summary of changes

- Add an iterator type that yields each shard along with a
`ScheduleContext` that includes all the other shards from the same tenant
(a usage sketch follows this list)
- Use the iterator to replace hand-crafted logic in `optimize_all_plan`
(functionally identical)
- Use the iterator in `handle_node_availability_transition` to apply
proper anti-affinity during node evacuation.
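
For reference, the diff below consumes the new iterator as `for (_tenant_id, mut schedule_context, shards) in TenantShardContextIterator::new(tenants, ScheduleMode::Normal)`. A self-contained sketch of the grouping idea it provides (toy types standing in for `TenantShardId`/`TenantShard`/`ScheduleContext`; the real iterator presumably also seeds the context with `avoid()`/`push_attached()` for every shard of the tenant before yielding):

```rust
// Toy sketch of per-tenant grouping over an ordered shard map. Types are
// stand-ins; the real iterator walks the controller's shard map, in which all
// shards of a tenant are iterated contiguously.
use std::collections::BTreeMap;

type TenantId = u32;
type ShardNumber = u8;
type NodeId = u64;

#[derive(Debug, Default)]
struct ToyContext {
    avoided: Vec<NodeId>,
}

fn main() {
    // (tenant, shard number) -> node currently holding the shard's attachment.
    let mut shards: BTreeMap<(TenantId, ShardNumber), NodeId> = BTreeMap::new();
    shards.insert((1, 0), 10);
    shards.insert((1, 1), 11);
    shards.insert((2, 0), 12);

    let mut iter = shards.iter().peekable();
    while let Some((&(tenant, _), _)) = iter.peek() {
        // Collect every shard belonging to this tenant...
        let mut tenant_shards = Vec::new();
        while let Some((&(t, shard), &node)) = iter.peek() {
            if t != tenant {
                break;
            }
            tenant_shards.push((shard, node));
            iter.next();
        }

        // ...and build one context covering all of them, so that scheduling any
        // single shard "sees" its siblings (anti-affinity within the tenant).
        let mut ctx = ToyContext::default();
        for (_, node) in &tenant_shards {
            ctx.avoided.push(*node);
        }
        println!("tenant {tenant}: shards {tenant_shards:?}, context {ctx:?}");
    }
}
```

The grouping relies on a tenant's shards being adjacent in the map, which the old code already assumed via its `is_shard_zero()` "reset accumulators" logic.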
jcsp authored and awarus committed Dec 5, 2024
1 parent e61ec94 commit 008616c
Showing 4 changed files with 245 additions and 122 deletions.
storage_controller/src/scheduler.rs (17 changes: 15 additions & 2 deletions)

@@ -305,7 +305,7 @@ impl std::ops::Add for AffinityScore {

/// Hint for whether this is a sincere attempt to schedule, or a speculative
/// check for where we _would_ schedule (done during optimization)
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub(crate) enum ScheduleMode {
    Normal,
    Speculative,
@@ -319,7 +319,7 @@ impl Default for ScheduleMode {

// For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
// it for many shards in the same tenant.
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
pub(crate) struct ScheduleContext {
    /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
    pub(crate) nodes: HashMap<NodeId, AffinityScore>,
@@ -331,6 +331,14 @@ pub(crate) struct ScheduleContext {
}

impl ScheduleContext {
+    pub(crate) fn new(mode: ScheduleMode) -> Self {
+        Self {
+            nodes: HashMap::new(),
+            attached_nodes: HashMap::new(),
+            mode,
+        }
+    }
+
    /// Input is a list of nodes we would like to avoid using again within this context. The more
    /// times a node is passed into this call, the less inclined we are to use it.
    pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
@@ -355,6 +363,11 @@ impl ScheduleContext {
    pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
        self.attached_nodes.get(&node_id).copied().unwrap_or(0)
    }
+
+    #[cfg(test)]
+    pub(crate) fn attach_count(&self) -> usize {
+        self.attached_nodes.values().sum()
+    }
}

pub(crate) enum RefCountUpdate {
storage_controller/src/service.rs (200 changes: 82 additions & 118 deletions)

@@ -1,3 +1,6 @@
+pub mod chaos_injector;
+mod context_iterator;
+
use hyper::Uri;
use std::{
    borrow::Cow,
@@ -95,7 +98,7 @@ use crate::{
    },
};

-pub mod chaos_injector;
+use context_iterator::TenantShardContextIterator;

// For operations that should be quick, like attaching a new tenant
const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
@@ -5498,49 +5501,51 @@ impl Service {

            let mut tenants_affected: usize = 0;

-            for (tenant_shard_id, tenant_shard) in tenants {
-                if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
-                    // When a node goes offline, we set its observed configuration to None, indicating unknown: we will
-                    // not assume our knowledge of the node's configuration is accurate until it comes back online
-                    observed_loc.conf = None;
-                }

-                if nodes.len() == 1 {
-                    // Special case for single-node cluster: there is no point trying to reschedule
-                    // any tenant shards: avoid doing so, in order to avoid spewing warnings about
-                    // failures to schedule them.
-                    continue;
-                }
+            for (_tenant_id, mut schedule_context, shards) in
+                TenantShardContextIterator::new(tenants, ScheduleMode::Normal)
+            {
+                for tenant_shard in shards {
+                    let tenant_shard_id = tenant_shard.tenant_shard_id;
+                    if let Some(observed_loc) =
+                        tenant_shard.observed.locations.get_mut(&node_id)
+                    {
+                        // When a node goes offline, we set its observed configuration to None, indicating unknown: we will
+                        // not assume our knowledge of the node's configuration is accurate until it comes back online
+                        observed_loc.conf = None;
+                    }

-                if !nodes
-                    .values()
-                    .any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
-                {
-                    // Special case for when all nodes are unavailable and/or unschedulable: there is no point
-                    // trying to reschedule since there's nowhere else to go. Without this
-                    // branch we incorrectly detach tenants in response to node unavailability.
-                    continue;
-                }
+                    if nodes.len() == 1 {
+                        // Special case for single-node cluster: there is no point trying to reschedule
+                        // any tenant shards: avoid doing so, in order to avoid spewing warnings about
+                        // failures to schedule them.
+                        continue;
+                    }

-                if tenant_shard.intent.demote_attached(scheduler, node_id) {
-                    tenant_shard.sequence = tenant_shard.sequence.next();
+                    if !nodes
+                        .values()
+                        .any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
+                    {
+                        // Special case for when all nodes are unavailable and/or unschedulable: there is no point
+                        // trying to reschedule since there's nowhere else to go. Without this
+                        // branch we incorrectly detach tenants in response to node unavailability.
+                        continue;
+                    }

-                    // TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
-                    // for tenants without secondary locations: if they have a secondary location, then this
-                    // schedule() call is just promoting an existing secondary)
-                    let mut schedule_context = ScheduleContext::default();
+                    if tenant_shard.intent.demote_attached(scheduler, node_id) {
+                        tenant_shard.sequence = tenant_shard.sequence.next();

-                    match tenant_shard.schedule(scheduler, &mut schedule_context) {
-                        Err(e) => {
-                            // It is possible that some tenants will become unschedulable when too many pageservers
-                            // go offline: in this case there isn't much we can do other than make the issue observable.
-                            // TODO: give TenantShard a scheduling error attribute to be queried later.
-                            tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
-                        }
-                        Ok(()) => {
-                            if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
-                                tenants_affected += 1;
-                            };
+                        match tenant_shard.schedule(scheduler, &mut schedule_context) {
+                            Err(e) => {
+                                // It is possible that some tenants will become unschedulable when too many pageservers
+                                // go offline: in this case there isn't much we can do other than make the issue observable.
+                                // TODO: give TenantShard a scheduling error attribute to be queried later.
+                                tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
+                            }
+                            Ok(()) => {
+                                if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
+                                    tenants_affected += 1;
+                                };
+                            }
+                        }
                    }
                }
@@ -6011,14 +6016,8 @@ impl Service {
        let (nodes, tenants, _scheduler) = locked.parts_mut();
        let pageservers = nodes.clone();

-        let mut schedule_context = ScheduleContext::default();
-
        let mut reconciles_spawned = 0;
-        for (tenant_shard_id, shard) in tenants.iter_mut() {
-            if tenant_shard_id.is_shard_zero() {
-                schedule_context = ScheduleContext::default();
-            }
-
+        for shard in tenants.values_mut() {
            // Skip checking if this shard is already enqueued for reconciliation
            if shard.delayed_reconcile && self.reconciler_concurrency.available_permits() == 0 {
                // If there is something delayed, then return a nonzero count so that
@@ -6033,8 +6032,6 @@
            if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
                reconciles_spawned += 1;
            }
-
-            schedule_context.avoid(&shard.intent.all_pageservers());
        }

        reconciles_spawned
@@ -6103,95 +6100,62 @@
    }

    fn optimize_all_plan(&self) -> Vec<(TenantShardId, ScheduleOptimization)> {
-        let mut schedule_context = ScheduleContext::default();
-
-        let mut tenant_shards: Vec<&TenantShard> = Vec::new();
-
        // How many candidate optimizations we will generate, before evaluating them for readniess: setting
        // this higher than the execution limit gives us a chance to execute some work even if the first
        // few optimizations we find are not ready.
        const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 8;

        let mut work = Vec::new();

        let mut locked = self.inner.write().unwrap();
        let (nodes, tenants, scheduler) = locked.parts_mut();
-        for (tenant_shard_id, shard) in tenants.iter() {
-            if tenant_shard_id.is_shard_zero() {
-                // Reset accumulators on the first shard in a tenant
-                schedule_context = ScheduleContext::default();
-                schedule_context.mode = ScheduleMode::Speculative;
-                tenant_shards.clear();
-            }
-
-            if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
-                break;
-            }
-
-            match shard.get_scheduling_policy() {
-                ShardSchedulingPolicy::Active => {
-                    // Ok to do optimization
+        for (_tenant_id, schedule_context, shards) in
+            TenantShardContextIterator::new(tenants, ScheduleMode::Speculative)
+        {
+            for shard in shards {
+                if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
+                    break;
+                }
-                ShardSchedulingPolicy::Essential
-                | ShardSchedulingPolicy::Pause
-                | ShardSchedulingPolicy::Stop => {
-                    // Policy prevents optimizing this shard.
-                    continue;
+                match shard.get_scheduling_policy() {
+                    ShardSchedulingPolicy::Active => {
+                        // Ok to do optimization
+                    }
+                    ShardSchedulingPolicy::Essential
+                    | ShardSchedulingPolicy::Pause
+                    | ShardSchedulingPolicy::Stop => {
+                        // Policy prevents optimizing this shard.
+                        continue;
+                    }
+                }
                }

-            // Accumulate the schedule context for all the shards in a tenant: we must have
-            // the total view of all shards before we can try to optimize any of them.
-            schedule_context.avoid(&shard.intent.all_pageservers());
-            if let Some(attached) = shard.intent.get_attached() {
-                schedule_context.push_attached(*attached);
-            }
-            tenant_shards.push(shard);
-
-            // Once we have seen the last shard in the tenant, proceed to search across all shards
-            // in the tenant for optimizations
-            if shard.shard.number.0 == shard.shard.count.count() - 1 {
-                if tenant_shards.iter().any(|s| s.reconciler.is_some()) {
+                if !matches!(shard.splitting, SplitState::Idle)
+                    || matches!(shard.policy, PlacementPolicy::Detached)
+                    || shard.reconciler.is_some()
+                {
                    // Do not start any optimizations while another change to the tenant is ongoing: this
                    // is not necessary for correctness, but simplifies operations and implicitly throttles
                    // optimization changes to happen in a "trickle" over time.
                    continue;
                }

-                if tenant_shards.iter().any(|s| {
-                    !matches!(s.splitting, SplitState::Idle)
-                        || matches!(s.policy, PlacementPolicy::Detached)
-                }) {
-                    // Never attempt to optimize a tenant that is currently being split, or
-                    // a tenant that is meant to be detached
-                    continue;
-                }
-
                // TODO: optimization calculations are relatively expensive: create some fast-path for
                // the common idle case (avoiding the search on tenants that we have recently checked)
-
-                for shard in &tenant_shards {
-                    if let Some(optimization) =
-                        // If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to
-                        // its primary location based on soft constraints, cut it over.
-                        shard.optimize_attachment(nodes, &schedule_context)
-                    {
-                        work.push((shard.tenant_shard_id, optimization));
-                        break;
-                    } else if let Some(optimization) =
-                        // If idle, maybe optimize secondary locations: if a shard has a secondary location that would be
-                        // better placed on another node, based on ScheduleContext, then adjust it. This
-                        // covers cases like after a shard split, where we might have too many shards
-                        // in the same tenant with secondary locations on the node where they originally split.
-                        shard.optimize_secondary(scheduler, &schedule_context)
-                    {
-                        work.push((shard.tenant_shard_id, optimization));
-                        break;
-                    }
-
-                    // TODO: extend this mechanism to prefer attaching on nodes with fewer attached
-                    // tenants (i.e. extend schedule state to distinguish attached from secondary counts),
-                    // for the total number of attachments on a node (not just within a tenant.)
+                if let Some(optimization) =
+                    // If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to
+                    // its primary location based on soft constraints, cut it over.
+                    shard.optimize_attachment(nodes, &schedule_context)
+                {
+                    work.push((shard.tenant_shard_id, optimization));
+                    break;
+                } else if let Some(optimization) =
+                    // If idle, maybe optimize secondary locations: if a shard has a secondary location that would be
+                    // better placed on another node, based on ScheduleContext, then adjust it. This
+                    // covers cases like after a shard split, where we might have too many shards
+                    // in the same tenant with secondary locations on the node where they originally split.
+                    shard.optimize_secondary(scheduler, &schedule_context)
+                {
+                    work.push((shard.tenant_shard_id, optimization));
+                    break;
+                }
            }
        }
