From 32438029ca0f88477238c856164b453ada89d55a Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Tue, 11 Jun 2024 12:17:20 +0100
Subject: [PATCH] storcon: track number of attached shards for each node

Problem

The storage controller does not track the number of shards attached to a given
pageserver. This is a requirement for various scheduling operations (e.g.
draining and filling will use this to figure out if the cluster is balanced).

Summary of Changes

Track the number of shards attached to each node.
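
To make the new bookkeeping concrete, here is a minimal standalone sketch of
the two per-node counters and how each update affects them. The `NodeCounts`
and `apply` names are illustrative only; the real types are `SchedulerNode`,
`RefCountUpdate` and `Scheduler::update_node_ref_counts` in
storage_controller/src/scheduler.rs.

```rust
// Illustrative sketch: simplified stand-ins for the controller's
// SchedulerNode and RefCountUpdate, showing how each intent change
// affects the two per-node counters.
struct NodeCounts {
    shard_count: usize,          // attached + secondary locations
    attached_shard_count: usize, // attached locations only
}

enum RefCountUpdate {
    PromoteSecondary, // secondary -> attached: already counted in shard_count
    Attach,
    Detach,
    DemoteAttached, // attached -> secondary: stays counted in shard_count
    AddSecondary,
    RemoveSecondary,
}

fn apply(counts: &mut NodeCounts, update: RefCountUpdate) {
    match update {
        RefCountUpdate::PromoteSecondary => counts.attached_shard_count += 1,
        RefCountUpdate::Attach => {
            counts.shard_count += 1;
            counts.attached_shard_count += 1;
        }
        RefCountUpdate::Detach => {
            counts.shard_count -= 1;
            counts.attached_shard_count -= 1;
        }
        RefCountUpdate::DemoteAttached => counts.attached_shard_count -= 1,
        RefCountUpdate::AddSecondary => counts.shard_count += 1,
        RefCountUpdate::RemoveSecondary => counts.shard_count -= 1,
    }
}

fn main() {
    let mut node = NodeCounts { shard_count: 0, attached_shard_count: 0 };
    apply(&mut node, RefCountUpdate::Attach);
    apply(&mut node, RefCountUpdate::DemoteAttached);
    // The shard is still scheduled on the node (as a secondary), but no longer attached.
    assert_eq!((node.shard_count, node.attached_shard_count), (1, 0));

    apply(&mut node, RefCountUpdate::PromoteSecondary);
    apply(&mut node, RefCountUpdate::Detach);
    apply(&mut node, RefCountUpdate::AddSecondary);
    apply(&mut node, RefCountUpdate::RemoveSecondary);
    assert_eq!((node.shard_count, node.attached_shard_count), (0, 0));
}
```

The intended invariant is that attached_shard_count <= shard_count on every
node, since an attached location is also counted as a scheduled location.
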
---
 storage_controller/src/scheduler.rs    | 101 ++++++++++++++++++-------
 storage_controller/src/service.rs      |   2 +-
 storage_controller/src/tenant_shard.rs |  56 +++++++++-----
 3 files changed, 114 insertions(+), 45 deletions(-)

diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs
index 3ff0d87988d7..4ab85509dc4b 100644
--- a/storage_controller/src/scheduler.rs
+++ b/storage_controller/src/scheduler.rs
@@ -29,6 +29,8 @@ pub enum MaySchedule {
 struct SchedulerNode {
     /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
     shard_count: usize,
+    /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
+    attached_shard_count: usize,
 
     /// Whether this node is currently elegible to have new shards scheduled (this is derived
     /// from a node's availability state and scheduling policy).
@@ -42,7 +44,9 @@ impl PartialEq for SchedulerNode {
             (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
         );
 
-        may_schedule_matches && self.shard_count == other.shard_count
+        may_schedule_matches
+            && self.shard_count == other.shard_count
+            && self.attached_shard_count == other.attached_shard_count
     }
 }
 
@@ -138,6 +142,15 @@ impl ScheduleContext {
     }
 }
 
+pub(crate) enum RefCountUpdate {
+    PromoteSecondary,
+    Attach,
+    Detach,
+    DemoteAttached,
+    AddSecondary,
+    RemoveSecondary,
+}
+
 impl Scheduler {
     pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
         let mut scheduler_nodes = HashMap::new();
@@ -146,6 +159,7 @@ impl Scheduler {
                 node.get_id(),
                 SchedulerNode {
                     shard_count: 0,
+                    attached_shard_count: 0,
                     may_schedule: node.may_schedule(),
                 },
             );
@@ -171,6 +185,7 @@ impl Scheduler {
                 node.get_id(),
                 SchedulerNode {
                     shard_count: 0,
+                    attached_shard_count: 0,
                     may_schedule: node.may_schedule(),
                 },
             );
@@ -179,7 +194,10 @@ impl Scheduler {
         for shard in shards {
             if let Some(node_id) = shard.intent.get_attached() {
                 match expect_nodes.get_mut(node_id) {
-                    Some(node) => node.shard_count += 1,
+                    Some(node) => {
+                        node.shard_count += 1;
+                        node.attached_shard_count += 1;
+                    }
                     None => anyhow::bail!(
                         "Tenant {} references nonexistent node {}",
                         shard.tenant_shard_id,
@@ -227,31 +245,42 @@ impl Scheduler {
         Ok(())
     }
 
-    /// Increment the reference count of a node. This reference count is used to guide scheduling
-    /// decisions, not for memory management: it represents one tenant shard whose IntentState targets
-    /// this node.
+    /// Update the reference counts of a node. These reference counts are used to guide scheduling
+    /// decisions, not for memory management: they represent the number of tenant shards whose IntentState
+    /// targets this node and the number of tenant shards whose IntentState is attached to this
+    /// node.
     ///
     /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
     /// [`Self::new`] or [`Self::node_upsert`])
-    pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) {
-        let Some(node) = self.nodes.get_mut(&node_id) else {
-            tracing::error!("Scheduler missing node {node_id}");
-            debug_assert!(false);
-            return;
-        };
-
-        node.shard_count += 1;
-    }
-
-    /// Decrement a node's reference count. Inverse of [`Self::node_inc_ref`].
-    pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) {
+    pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
         let Some(node) = self.nodes.get_mut(&node_id) else {
             debug_assert!(false);
             tracing::error!("Scheduler missing node {node_id}");
             return;
         };
 
-        node.shard_count -= 1;
+        match update {
+            RefCountUpdate::PromoteSecondary => {
+                node.attached_shard_count += 1;
+            }
+            RefCountUpdate::Attach => {
+                node.shard_count += 1;
+                node.attached_shard_count += 1;
+            }
+            RefCountUpdate::Detach => {
+                node.shard_count -= 1;
+                node.attached_shard_count -= 1;
+            }
+            RefCountUpdate::DemoteAttached => {
+                node.attached_shard_count -= 1;
+            }
+            RefCountUpdate::AddSecondary => {
+                node.shard_count += 1;
+            }
+            RefCountUpdate::RemoveSecondary => {
+                node.shard_count -= 1;
+            }
+        }
     }
 
     pub(crate) fn node_upsert(&mut self, node: &Node) {
@@ -263,6 +292,7 @@ impl Scheduler {
             Vacant(entry) => {
                 entry.insert(SchedulerNode {
                     shard_count: 0,
+                    attached_shard_count: 0,
                     may_schedule: node.may_schedule(),
                 });
             }
@@ -385,6 +415,11 @@ impl Scheduler {
     pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
         self.nodes.get(&node_id).unwrap().shard_count
     }
+
+    #[cfg(test)]
+    pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
+        self.nodes.get(&node_id).unwrap().attached_shard_count
+    }
 }
 
 #[cfg(test)]
@@ -437,18 +472,28 @@ mod tests {
         let scheduled = scheduler.schedule_shard(&[], &context)?;
         t2_intent.set_attached(&mut scheduler, Some(scheduled));
-        assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
-        assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
+        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
+        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
+
+        assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
+        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
 
         let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?;
         t1_intent.push_secondary(&mut scheduler, scheduled);
-        assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
-        assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 2);
+        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
+        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
+
+        assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
+        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
 
         t1_intent.clear(&mut scheduler);
-        assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
-        assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
+
+        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
+        assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
+
+        let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
+            + scheduler.get_node_attached_shard_count(NodeId(2));
+        assert_eq!(total_attached, 1);
 
         if cfg!(debug_assertions) {
             // Dropping an IntentState without clearing it causes a panic in debug mode,
             let result = std::panic::catch_unwind(move || drop(t2_intent));
             assert!(result.is_err());
         } else {
             t2_intent.clear(&mut scheduler);
-            assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
-            assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 0);
+
+            assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
+            assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);
+
+            assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
+            assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
         }
 
         Ok(())
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 756dc10a2ab6..1e81b5c5a2ca 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -4312,7 +4312,7 @@ impl Service {
                     continue;
                 }
 
-                if tenant_shard.intent.demote_attached(node_id) {
+                if tenant_shard.intent.demote_attached(scheduler, node_id) {
                     tenant_shard.sequence = tenant_shard.sequence.next();
 
                     // TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs
index dda17f9887dd..77bbf4c60491 100644
--- a/storage_controller/src/tenant_shard.rs
+++ b/storage_controller/src/tenant_shard.rs
@@ -8,7 +8,7 @@ use crate::{
     metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
     persistence::TenantShardPersistence,
     reconciler::ReconcileUnits,
-    scheduler::{AffinityScore, MaySchedule, ScheduleContext},
+    scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
 };
 use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
 use pageserver_api::{
@@ -153,7 +153,7 @@ impl IntentState {
     }
     pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
         if let Some(node_id) = node_id {
-            scheduler.node_inc_ref(node_id);
+            scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
         }
         Self {
             attached: node_id,
@@ -164,10 +164,10 @@ impl IntentState {
     pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
         if self.attached != new_attached {
             if let Some(old_attached) = self.attached.take() {
-                scheduler.node_dec_ref(old_attached);
+                scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
             }
             if let Some(new_attached) = &new_attached {
-                scheduler.node_inc_ref(*new_attached);
+                scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
             }
             self.attached = new_attached;
         }
@@ -177,22 +177,27 @@ impl IntentState {
     /// secondary to attached while maintaining the scheduler's reference counts.
     pub(crate) fn promote_attached(
         &mut self,
-        _scheduler: &mut Scheduler,
+        scheduler: &mut Scheduler,
         promote_secondary: NodeId,
     ) {
         // If we call this with a node that isn't in secondary, it would cause incorrect
         // scheduler reference counting, since we assume the node is already referenced as a secondary.
         debug_assert!(self.secondary.contains(&promote_secondary));
 
-        // TODO: when scheduler starts tracking attached + secondary counts separately, we will
-        // need to call into it here.
         self.secondary.retain(|n| n != &promote_secondary);
+
+        let demoted = self.attached;
         self.attached = Some(promote_secondary);
+
+        scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
+        if let Some(demoted) = demoted {
+            scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
+        }
     }
 
     pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
         debug_assert!(!self.secondary.contains(&new_secondary));
-        scheduler.node_inc_ref(new_secondary);
+        scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
         self.secondary.push(new_secondary);
     }
@@ -200,27 +205,27 @@ impl IntentState {
     pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
         let index = self.secondary.iter().position(|n| *n == node_id);
         if let Some(index) = index {
-            scheduler.node_dec_ref(node_id);
+            scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
             self.secondary.remove(index);
         }
     }
 
     pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
         for secondary in self.secondary.drain(..) {
-            scheduler.node_dec_ref(secondary);
+            scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
         }
     }
 
     /// Remove the last secondary node from the list of secondaries
     pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
         if let Some(node_id) = self.secondary.pop() {
-            scheduler.node_dec_ref(node_id);
+            scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
         }
     }
 
     pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
         if let Some(old_attached) = self.attached.take() {
-            scheduler.node_dec_ref(old_attached);
+            scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
         }
 
         self.clear_secondary(scheduler);
@@ -251,12 +256,11 @@ impl IntentState {
     /// forget the location on the offline node.
     ///
     /// Returns true if a change was made
-    pub(crate) fn demote_attached(&mut self, node_id: NodeId) -> bool {
+    pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
         if self.attached == Some(node_id) {
-            // TODO: when scheduler starts tracking attached + secondary counts separately, we will
-            // need to call into it here.
             self.attached = None;
             self.secondary.push(node_id);
+            scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
             true
         } else {
             false
@@ -593,7 +597,7 @@ impl TenantShard {
             Secondary => {
                 if let Some(node_id) = self.intent.get_attached() {
                     // Populate secondary by demoting the attached node
-                    self.intent.demote_attached(*node_id);
+                    self.intent.demote_attached(scheduler, *node_id);
                     modified = true;
                 } else if self.intent.secondary.is_empty() {
                     // Populate secondary by scheduling a fresh node
@@ -783,7 +787,7 @@ impl TenantShard {
                 old_attached_node_id,
                 new_attached_node_id,
             }) => {
-                self.intent.demote_attached(old_attached_node_id);
+                self.intent.demote_attached(scheduler, old_attached_node_id);
                 self.intent
                     .promote_attached(scheduler, new_attached_node_id);
             }
@@ -1321,7 +1325,9 @@ pub(crate) mod tests {
         assert_ne!(attached_node_id, secondary_node_id);
 
         // Notifying the attached node is offline should demote it to a secondary
-        let changed = tenant_shard.intent.demote_attached(attached_node_id);
+        let changed = tenant_shard
+            .intent
+            .demote_attached(&mut scheduler, attached_node_id);
         assert!(changed);
         assert!(tenant_shard.intent.attached.is_none());
         assert_eq!(tenant_shard.intent.secondary.len(), 2);
@@ -1604,7 +1610,14 @@ pub(crate) mod tests {
 
         // We should see equal number of locations on the two nodes.
         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
+        // Scheduling does not consider the number of attachments when picking the
+        // initial pageserver to attach to (hence the assertion that all primaries are
+        // on the same node).
+        // TODO: Tweak the scheduling to evenly distribute attachments for new shards.
+        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 4);
+
         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
+        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
 
         // Add another two nodes: we should see the shards spread out when their optimize
         // methods are called
         optimize_til_idle(&nodes, &mut scheduler, &mut shards);
 
         assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
+        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
+
         assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
+        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
+
         assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
+        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);
+
         assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
+        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);
 
         for shard in shards.iter_mut() {
             shard.intent.clear(&mut scheduler);