From 008616cfe696b219d39a53baee396748e75f9477 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Fri, 29 Nov 2024 13:27:49 +0000
Subject: [PATCH] storage controller: use proper ScheduleContext when
 evacuating a node (#9908)

## Problem

When picking locations for a shard, we should use a ScheduleContext that
includes all the other shards in the tenant, so that we apply proper
anti-affinity between shards. If we don't do this, it can lead to unstable
scheduling, where we place a shard somewhere that the optimizer will then
immediately move it away from.

We didn't always do this, because it was a bit awkward to accumulate the
context for a tenant rather than just walking tenants. This was a TODO in
`handle_node_availability_transition`:

```
// TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
// for tenants without secondary locations: if they have a secondary location, then this
// schedule() call is just promoting an existing secondary)
```

This is a precursor to https://github.com/neondatabase/neon/issues/8264,
where the current imperfect scheduling during node evacuation hampers
testing.

## Summary of changes

- Add an iterator type that yields each shard along with a ScheduleContext
  that includes all the other shards from the same tenant
- Use the iterator to replace hand-crafted logic in `optimize_all_plan`
  (functionally identical)
- Use the iterator in `handle_node_availability_transition` to apply proper
  anti-affinity during node evacuation.
---
 storage_controller/src/scheduler.rs           |  17 +-
 storage_controller/src/service.rs             | 200 +++++++-----------
 .../src/service/context_iterator.rs           | 139 ++++++++++++
 storage_controller/src/tenant_shard.rs        |  11 +-
 4 files changed, 245 insertions(+), 122 deletions(-)
 create mode 100644 storage_controller/src/service/context_iterator.rs

diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs
index 2414d95eb89b..ecc6b11e4758 100644
--- a/storage_controller/src/scheduler.rs
+++ b/storage_controller/src/scheduler.rs
@@ -305,7 +305,7 @@ impl std::ops::Add for AffinityScore {
 
 /// Hint for whether this is a sincere attempt to schedule, or a speculative
 /// check for where we _would_ schedule (done during optimization)
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub(crate) enum ScheduleMode {
     Normal,
     Speculative,
@@ -319,7 +319,7 @@ impl Default for ScheduleMode {
 
 // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
 // it for many shards in the same tenant.
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
 pub(crate) struct ScheduleContext {
     /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
     pub(crate) nodes: HashMap<NodeId, AffinityScore>,
@@ -331,6 +331,14 @@ pub(crate) struct ScheduleContext {
 }
 
 impl ScheduleContext {
+    pub(crate) fn new(mode: ScheduleMode) -> Self {
+        Self {
+            nodes: HashMap::new(),
+            attached_nodes: HashMap::new(),
+            mode,
+        }
+    }
+
     /// Input is a list of nodes we would like to avoid using again within this context. The more
     /// times a node is passed into this call, the less inclined we are to use it.
     pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
@@ -355,6 +363,11 @@ impl ScheduleContext {
     pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
         self.attached_nodes.get(&node_id).copied().unwrap_or(0)
     }
+
+    #[cfg(test)]
+    pub(crate) fn attach_count(&self) -> usize {
+        self.attached_nodes.values().sum()
+    }
 }
 
 pub(crate) enum RefCountUpdate {
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 446c476b99c5..636ccf11a120 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -1,3 +1,6 @@
+pub mod chaos_injector;
+mod context_iterator;
+
 use hyper::Uri;
 use std::{
     borrow::Cow,
@@ -95,7 +98,7 @@ use crate::{
     },
 };
 
-pub mod chaos_injector;
+use context_iterator::TenantShardContextIterator;
 
 // For operations that should be quick, like attaching a new tenant
 const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
@@ -5498,49 +5501,51 @@ impl Service {
 
             let mut tenants_affected: usize = 0;
 
-            for (tenant_shard_id, tenant_shard) in tenants {
-                if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
-                    // When a node goes offline, we set its observed configuration to None, indicating unknown: we will
-                    // not assume our knowledge of the node's configuration is accurate until it comes back online
-                    observed_loc.conf = None;
-                }
-
-                if nodes.len() == 1 {
-                    // Special case for single-node cluster: there is no point trying to reschedule
-                    // any tenant shards: avoid doing so, in order to avoid spewing warnings about
-                    // failures to schedule them.
-                    continue;
-                }
+            for (_tenant_id, mut schedule_context, shards) in
+                TenantShardContextIterator::new(tenants, ScheduleMode::Normal)
+            {
+                for tenant_shard in shards {
+                    let tenant_shard_id = tenant_shard.tenant_shard_id;
+                    if let Some(observed_loc) =
+                        tenant_shard.observed.locations.get_mut(&node_id)
+                    {
+                        // When a node goes offline, we set its observed configuration to None, indicating unknown: we will
+                        // not assume our knowledge of the node's configuration is accurate until it comes back online
+                        observed_loc.conf = None;
+                    }
 
-                if !nodes
-                    .values()
-                    .any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
-                {
-                    // Special case for when all nodes are unavailable and/or unschedulable: there is no point
-                    // trying to reschedule since there's nowhere else to go. Without this
-                    // branch we incorrectly detach tenants in response to node unavailability.
-                    continue;
-                }
+                    if nodes.len() == 1 {
+                        // Special case for single-node cluster: there is no point trying to reschedule
+                        // any tenant shards: avoid doing so, in order to avoid spewing warnings about
+                        // failures to schedule them.
+                        continue;
+                    }
 
-                if tenant_shard.intent.demote_attached(scheduler, node_id) {
-                    tenant_shard.sequence = tenant_shard.sequence.next();
+                    if !nodes
+                        .values()
+                        .any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
+                    {
+                        // Special case for when all nodes are unavailable and/or unschedulable: there is no point
+                        // trying to reschedule since there's nowhere else to go. Without this
+                        // branch we incorrectly detach tenants in response to node unavailability.
+                        continue;
+                    }
 
-                    // TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
-                    // for tenants without secondary locations: if they have a secondary location, then this
-                    // schedule() call is just promoting an existing secondary)
-                    let mut schedule_context = ScheduleContext::default();
+                    if tenant_shard.intent.demote_attached(scheduler, node_id) {
+                        tenant_shard.sequence = tenant_shard.sequence.next();
 
-                    match tenant_shard.schedule(scheduler, &mut schedule_context) {
-                        Err(e) => {
-                            // It is possible that some tenants will become unschedulable when too many pageservers
-                            // go offline: in this case there isn't much we can do other than make the issue observable.
-                            // TODO: give TenantShard a scheduling error attribute to be queried later.
-                            tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
-                        }
-                        Ok(()) => {
-                            if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
-                                tenants_affected += 1;
-                            };
+                        match tenant_shard.schedule(scheduler, &mut schedule_context) {
+                            Err(e) => {
+                                // It is possible that some tenants will become unschedulable when too many pageservers
+                                // go offline: in this case there isn't much we can do other than make the issue observable.
+                                // TODO: give TenantShard a scheduling error attribute to be queried later.
+                                tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
+                            }
+                            Ok(()) => {
+                                if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
+                                    tenants_affected += 1;
+                                };
+                            }
                         }
                     }
                 }
@@ -6011,14 +6016,8 @@ impl Service {
         let (nodes, tenants, _scheduler) = locked.parts_mut();
         let pageservers = nodes.clone();
 
-        let mut schedule_context = ScheduleContext::default();
-
         let mut reconciles_spawned = 0;
-        for (tenant_shard_id, shard) in tenants.iter_mut() {
-            if tenant_shard_id.is_shard_zero() {
-                schedule_context = ScheduleContext::default();
-            }
-
+        for shard in tenants.values_mut() {
             // Skip checking if this shard is already enqueued for reconciliation
             if shard.delayed_reconcile && self.reconciler_concurrency.available_permits() == 0 {
                 // If there is something delayed, then return a nonzero count so that
@@ -6033,8 +6032,6 @@ impl Service {
             if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
                 reconciles_spawned += 1;
             }
-
-            schedule_context.avoid(&shard.intent.all_pageservers());
         }
 
         reconciles_spawned
@@ -6103,95 +6100,62 @@ impl Service {
     }
 
     fn optimize_all_plan(&self) -> Vec<(TenantShardId, ScheduleOptimization)> {
-        let mut schedule_context = ScheduleContext::default();
-
-        let mut tenant_shards: Vec<&TenantShard> = Vec::new();
-
         // How many candidate optimizations we will generate, before evaluating them for readniess: setting
         // this higher than the execution limit gives us a chance to execute some work even if the first
         // few optimizations we find are not ready.
         const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 8;
 
         let mut work = Vec::new();
-
         let mut locked = self.inner.write().unwrap();
         let (nodes, tenants, scheduler) = locked.parts_mut();
 
-        for (tenant_shard_id, shard) in tenants.iter() {
-            if tenant_shard_id.is_shard_zero() {
-                // Reset accumulators on the first shard in a tenant
-                schedule_context = ScheduleContext::default();
-                schedule_context.mode = ScheduleMode::Speculative;
-                tenant_shards.clear();
-            }
-
-            if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
-                break;
-            }
-            match shard.get_scheduling_policy() {
-                ShardSchedulingPolicy::Active => {
-                    // Ok to do optimization
+        for (_tenant_id, schedule_context, shards) in
+            TenantShardContextIterator::new(tenants, ScheduleMode::Speculative)
+        {
+            for shard in shards {
+                if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
+                    break;
                 }
-                ShardSchedulingPolicy::Essential
-                | ShardSchedulingPolicy::Pause
-                | ShardSchedulingPolicy::Stop => {
-                    // Policy prevents optimizing this shard.
-                    continue;
+                match shard.get_scheduling_policy() {
+                    ShardSchedulingPolicy::Active => {
+                        // Ok to do optimization
+                    }
+                    ShardSchedulingPolicy::Essential
+                    | ShardSchedulingPolicy::Pause
+                    | ShardSchedulingPolicy::Stop => {
+                        // Policy prevents optimizing this shard.
+                        continue;
+                    }
                 }
-            }
-
-            // Accumulate the schedule context for all the shards in a tenant: we must have
-            // the total view of all shards before we can try to optimize any of them.
-            schedule_context.avoid(&shard.intent.all_pageservers());
-            if let Some(attached) = shard.intent.get_attached() {
-                schedule_context.push_attached(*attached);
-            }
-            tenant_shards.push(shard);
-
-            // Once we have seen the last shard in the tenant, proceed to search across all shards
-            // in the tenant for optimizations
-            if shard.shard.number.0 == shard.shard.count.count() - 1 {
-                if tenant_shards.iter().any(|s| s.reconciler.is_some()) {
+                if !matches!(shard.splitting, SplitState::Idle)
+                    || matches!(shard.policy, PlacementPolicy::Detached)
+                    || shard.reconciler.is_some()
+                {
                     // Do not start any optimizations while another change to the tenant is ongoing: this
                     // is not necessary for correctness, but simplifies operations and implicitly throttles
                     // optimization changes to happen in a "trickle" over time.
                     continue;
                 }
 
-                if tenant_shards.iter().any(|s| {
-                    !matches!(s.splitting, SplitState::Idle)
-                        || matches!(s.policy, PlacementPolicy::Detached)
-                }) {
-                    // Never attempt to optimize a tenant that is currently being split, or
-                    // a tenant that is meant to be detached
-                    continue;
-                }
-
                 // TODO: optimization calculations are relatively expensive: create some fast-path for
                 // the common idle case (avoiding the search on tenants that we have recently checked)
-
-                for shard in &tenant_shards {
-                    if let Some(optimization) =
-                        // If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to
-                        // its primary location based on soft constraints, cut it over.
-                        shard.optimize_attachment(nodes, &schedule_context)
-                    {
-                        work.push((shard.tenant_shard_id, optimization));
-                        break;
-                    } else if let Some(optimization) =
-                        // If idle, maybe optimize secondary locations: if a shard has a secondary location that would be
-                        // better placed on another node, based on ScheduleContext, then adjust it. This
-                        // covers cases like after a shard split, where we might have too many shards
-                        // in the same tenant with secondary locations on the node where they originally split.
-                        shard.optimize_secondary(scheduler, &schedule_context)
-                    {
-                        work.push((shard.tenant_shard_id, optimization));
-                        break;
-                    }
-
-                    // TODO: extend this mechanism to prefer attaching on nodes with fewer attached
-                    // tenants (i.e. extend schedule state to distinguish attached from secondary counts),
-                    // for the total number of attachments on a node (not just within a tenant.)
+                if let Some(optimization) =
+                    // If idle, maybe optimize attachments: if a shard has a secondary location that is preferable to
+                    // its primary location based on soft constraints, cut it over.
+                    shard.optimize_attachment(nodes, &schedule_context)
+                {
+                    work.push((shard.tenant_shard_id, optimization));
+                    break;
+                } else if let Some(optimization) =
+                    // If idle, maybe optimize secondary locations: if a shard has a secondary location that would be
+                    // better placed on another node, based on ScheduleContext, then adjust it. This
+                    // covers cases like after a shard split, where we might have too many shards
+                    // in the same tenant with secondary locations on the node where they originally split.
+                    shard.optimize_secondary(scheduler, &schedule_context)
+                {
+                    work.push((shard.tenant_shard_id, optimization));
+                    break;
                 }
             }
         }
diff --git a/storage_controller/src/service/context_iterator.rs b/storage_controller/src/service/context_iterator.rs
new file mode 100644
index 000000000000..d38010a27eca
--- /dev/null
+++ b/storage_controller/src/service/context_iterator.rs
@@ -0,0 +1,139 @@
+use std::collections::BTreeMap;
+
+use utils::id::TenantId;
+use utils::shard::TenantShardId;
+
+use crate::scheduler::{ScheduleContext, ScheduleMode};
+use crate::tenant_shard::TenantShard;
+
+/// When making scheduling decisions, it is useful to have the ScheduleContext for a whole
+/// tenant while considering the individual shards within it. This iterator is a helper
+/// that gathers all the shards in a tenant and then yields them together with a ScheduleContext
+/// for the tenant.
+pub(super) struct TenantShardContextIterator<'a> {
+    schedule_mode: ScheduleMode,
+    inner: std::collections::btree_map::IterMut<'a, TenantShardId, TenantShard>,
+}
+
+impl<'a> TenantShardContextIterator<'a> {
+    pub(super) fn new(
+        tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
+        schedule_mode: ScheduleMode,
+    ) -> Self {
+        Self {
+            schedule_mode,
+            inner: tenants.iter_mut(),
+        }
+    }
+}
+
+impl<'a> Iterator for TenantShardContextIterator<'a> {
+    type Item = (TenantId, ScheduleContext, Vec<&'a mut TenantShard>);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut tenant_shards = Vec::new();
+        let mut schedule_context = ScheduleContext::new(self.schedule_mode.clone());
+        loop {
+            let (tenant_shard_id, shard) = self.inner.next()?;
+
+            if tenant_shard_id.is_shard_zero() {
+                // Cleared on last shard of previous tenant
+                assert!(tenant_shards.is_empty());
+            }
+
+            // Accumulate the schedule context for all the shards in a tenant
+            schedule_context.avoid(&shard.intent.all_pageservers());
+            if let Some(attached) = shard.intent.get_attached() {
+                schedule_context.push_attached(*attached);
+            }
+            tenant_shards.push(shard);
+
+            if tenant_shard_id.shard_number.0 == tenant_shard_id.shard_count.count() - 1 {
+                return Some((tenant_shard_id.tenant_id, schedule_context, tenant_shards));
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::BTreeMap, str::FromStr};
+
+    use pageserver_api::controller_api::PlacementPolicy;
+    use utils::shard::{ShardCount, ShardNumber};
+
+    use crate::{
+        scheduler::test_utils::make_test_nodes, service::Scheduler,
+        tenant_shard::tests::make_test_tenant_with_id,
+    };
+
+    use super::*;
+
+    #[test]
+    fn test_context_iterator() {
+        // Hand-crafted tenant IDs to ensure they appear in the expected order when put into
+        // a btreemap & iterated
+        let mut t_1_shards = make_test_tenant_with_id(
+            TenantId::from_str("af0480929707ee75372337efaa5ecf96").unwrap(),
+            PlacementPolicy::Attached(1),
+            ShardCount(1),
+            None,
+        );
+        let t_2_shards = make_test_tenant_with_id(
+            TenantId::from_str("bf0480929707ee75372337efaa5ecf96").unwrap(),
+            PlacementPolicy::Attached(1),
+            ShardCount(4),
+            None,
+        );
+        let mut t_3_shards = make_test_tenant_with_id(
+            TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap(),
+            PlacementPolicy::Attached(1),
+            ShardCount(1),
+            None,
+        );
+
+        let t1_id = t_1_shards[0].tenant_shard_id.tenant_id;
+        let t2_id = t_2_shards[0].tenant_shard_id.tenant_id;
+        let t3_id = t_3_shards[0].tenant_shard_id.tenant_id;
+
+        let mut tenants = BTreeMap::new();
+        tenants.insert(t_1_shards[0].tenant_shard_id, t_1_shards.pop().unwrap());
+        for shard in t_2_shards {
+            tenants.insert(shard.tenant_shard_id, shard);
+        }
+        tenants.insert(t_3_shards[0].tenant_shard_id, t_3_shards.pop().unwrap());
+
+        let nodes = make_test_nodes(3, &[]);
+        let mut scheduler = Scheduler::new(nodes.values());
+        let mut context = ScheduleContext::default();
+        for shard in tenants.values_mut() {
+            shard.schedule(&mut scheduler, &mut context).unwrap();
+        }
+
+        let mut iter = TenantShardContextIterator::new(&mut tenants, ScheduleMode::Speculative);
+        let (tenant_id, context, shards) = iter.next().unwrap();
+        assert_eq!(tenant_id, t1_id);
+        assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
+        assert_eq!(shards.len(), 1);
+        assert_eq!(context.attach_count(), 1);
+
+        let (tenant_id, context, shards) = iter.next().unwrap();
+        assert_eq!(tenant_id, t2_id);
+        assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
+        assert_eq!(shards[1].tenant_shard_id.shard_number, ShardNumber(1));
+        assert_eq!(shards[2].tenant_shard_id.shard_number, ShardNumber(2));
+        assert_eq!(shards[3].tenant_shard_id.shard_number, ShardNumber(3));
+        assert_eq!(shards.len(), 4);
+        assert_eq!(context.attach_count(), 4);
+
+        let (tenant_id, context, shards) = iter.next().unwrap();
+        assert_eq!(tenant_id, t3_id);
+        assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
+        assert_eq!(shards.len(), 1);
+        assert_eq!(context.attach_count(), 1);
+
+        for shard in tenants.values_mut() {
+            shard.intent.clear(&mut scheduler);
+        }
+    }
+}
diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs
index 27c97d3b864e..2eb98ee82545 100644
--- a/storage_controller/src/tenant_shard.rs
+++ b/storage_controller/src/tenant_shard.rs
@@ -1574,13 +1574,20 @@ pub(crate) mod tests {
         )
     }
 
-    fn make_test_tenant(
+    pub(crate) fn make_test_tenant(
        policy: PlacementPolicy,
        shard_count: ShardCount,
        preferred_az: Option<AvailabilityZone>,
     ) -> Vec<TenantShard> {
-        let tenant_id = TenantId::generate();
+        make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
+    }
 
+    pub(crate) fn make_test_tenant_with_id(
+        tenant_id: TenantId,
+        policy: PlacementPolicy,
+        shard_count: ShardCount,
+        preferred_az: Option<AvailabilityZone>,
+    ) -> Vec<TenantShard> {
         (0..shard_count.count())
             .map(|i| {
                 let shard_number = ShardNumber(i);