From 6204a3f748fb87750cd55beb9a85ea0598738680 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Mon, 9 Dec 2024 09:25:32 +0000
Subject: [PATCH 1/4] storcon: do up-front AZ selection during creation

---
 storage_controller/src/scheduler.rs    |  4 ++
 storage_controller/src/service.rs      | 65 ++++++++++----------------
 storage_controller/src/tenant_shard.rs | 15 +++---
 3 files changed, 34 insertions(+), 50 deletions(-)

diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs
index ecc6b11e4758..1fb2d3eb9b9c 100644
--- a/storage_controller/src/scheduler.rs
+++ b/storage_controller/src/scheduler.rs
@@ -742,6 +742,10 @@ impl Scheduler {
         self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
     }

+    pub(crate) fn get_node_az(&self, node_id: &NodeId) -> Option<AvailabilityZone> {
+        self.nodes.get(node_id).map(|n| n.az.clone())
+    }
+
     /// Unit test access to internal state
     #[cfg(test)]
     pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 7e4ee53b4cbf..bb189c2a7239 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -29,7 +29,7 @@ use crate::{
         ShardGenerationState, TenantFilter,
     },
     reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
-    scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
+    scheduler::{AttachedShardTag, MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
     tenant_shard::{
         MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
         ScheduleOptimization, ScheduleOptimizationAction,
@@ -1579,6 +1579,7 @@ impl Service {
                 attach_req.tenant_shard_id,
                 ShardIdentity::unsharded(),
                 PlacementPolicy::Attached(0),
+                None,
             ),
         );
         tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
@@ -2106,6 +2107,21 @@ impl Service {
            )
        };

+        let preferred_az_id: Option<AvailabilityZone> = {
+            let mut locked = self.inner.write().unwrap();
+
+            // Idempotency: take the existing value if the tenant already exists
+            if let Some(shard) = locked.tenants.get(create_ids.first().unwrap()) {
+                shard.preferred_az().cloned()
+            } else {
+                locked
+                    .scheduler
+                    .schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
+                    .ok()
+                    .and_then(|n_id| locked.scheduler.get_node_az(&n_id))
+            }
+        };
+
        // Ordering: we persist tenant shards before creating them on the pageserver. This enables a caller
        // to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
        // during the creation, rather than risking leaving orphan objects in S3.
@@ -2125,7 +2141,7 @@ impl Service {
                 splitting: SplitState::default(),
                 scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
                     .unwrap(),
-                preferred_az_id: None,
+                preferred_az_id: preferred_az_id.as_ref().map(|az| az.to_string()),
             })
             .collect();

@@ -2161,6 +2177,7 @@ impl Service {
                     &create_req.shard_parameters,
                     create_req.config.clone(),
                     placement_policy.clone(),
+                    preferred_az_id.as_ref(),
                     &mut schedule_context,
                 )
                 .await;
@@ -2174,44 +2191,6 @@ impl Service {
             }
         }

-        let preferred_azs = {
-            let locked = self.inner.read().unwrap();
-            response_shards
-                .iter()
-                .filter_map(|resp| {
-                    let az_id = locked
-                        .nodes
-                        .get(&resp.node_id)
-                        .map(|n| n.get_availability_zone_id().clone())?;
-
-                    Some((resp.shard_id, az_id))
-                })
-                .collect::<Vec<_>>()
-        };
-
-        // Note that we persist the preferred AZ for the new shards separately.
-        // In theory, we could "peek" the scheduler to determine where the shard will
-        // land, but the subsequent "real" call into the scheduler might select a different
-        // node. Hence, we do this awkward update to keep things consistent.
-        let updated = self
-            .persistence
-            .set_tenant_shard_preferred_azs(preferred_azs)
-            .await
-            .map_err(|err| {
-                ApiError::InternalServerError(anyhow::anyhow!(
-                    "Failed to persist preferred az ids: {err}"
-                ))
-            })?;
-
-        {
-            let mut locked = self.inner.write().unwrap();
-            for (tid, az_id) in updated {
-                if let Some(shard) = locked.tenants.get_mut(&tid) {
-                    shard.set_preferred_az(az_id);
-                }
-            }
-        }
-
        // If we failed to schedule shards, then they are still created in the controller,
        // but we return an error to the requester to avoid a silent failure when someone
        // tries to e.g. create a tenant whose placement policy requires more nodes than
@@ -2242,6 +2221,7 @@ impl Service {

    /// Helper for tenant creation that does the scheduling for an individual shard. Covers both the
    /// case of a new tenant and a pre-existing one.
+    #[allow(clippy::too_many_arguments)]
    async fn do_initial_shard_scheduling(
        &self,
        tenant_shard_id: TenantShardId,
@@ -2249,6 +2229,7 @@ impl Service {
        shard_params: &ShardParameters,
        config: TenantConfig,
        placement_policy: PlacementPolicy,
+        preferred_az_id: Option<&AvailabilityZone>,
        schedule_context: &mut ScheduleContext,
    ) -> InitialShardScheduleOutcome {
        let mut locked = self.inner.write().unwrap();
@@ -2286,6 +2267,7 @@ impl Service {
                    tenant_shard_id,
                    ShardIdentity::from_params(tenant_shard_id.shard_number, shard_params),
                    placement_policy,
+                    preferred_az_id.cloned(),
                ));

                state.generation = initial_generation;
@@ -4184,7 +4166,8 @@ impl Service {
                },
            );

-            let mut child_state = TenantShard::new(child, child_shard, policy.clone());
+            let mut child_state =
+                TenantShard::new(child, child_shard, policy.clone(), preferred_az.clone());
            child_state.intent = IntentState::single(scheduler, Some(pageserver));
            child_state.observed = ObservedState {
                locations: child_observed,
diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs
index 2eb98ee82545..f1b921646f44 100644
--- a/storage_controller/src/tenant_shard.rs
+++ b/storage_controller/src/tenant_shard.rs
@@ -472,6 +472,7 @@ impl TenantShard {
        tenant_shard_id: TenantShardId,
        shard: ShardIdentity,
        policy: PlacementPolicy,
+        preferred_az_id: Option<AvailabilityZone>,
    ) -> Self {
        metrics::METRICS_REGISTRY
            .metrics_group
@@ -495,7 +496,7 @@ impl TenantShard {
            last_error: Arc::default(),
            pending_compute_notification: false,
            scheduling_policy: ShardSchedulingPolicy::default(),
-            preferred_az_id: None,
+            preferred_az_id,
        }
    }

@@ -1571,6 +1572,7 @@ pub(crate) mod tests {
            )
            .unwrap(),
            policy,
+            None,
        )
    }

@@ -1597,7 +1599,7 @@ pub(crate) mod tests {
                    shard_number,
                    shard_count,
                };
-                let mut ts = TenantShard::new(
+                TenantShard::new(
                    tenant_shard_id,
                    ShardIdentity::new(
                        shard_number,
@@ -1606,13 +1608,8 @@ pub(crate) mod tests {
                    )
                    .unwrap(),
                    policy.clone(),
-                );
-
-                if let Some(az) = &preferred_az {
-                    ts.set_preferred_az(az.clone());
-                }
-
-                ts
+                    preferred_az.clone(),
+                )
            })
            .collect()
    }

From c5f2415b9ce23a0ae29643a3c83658be9a7a3fe2 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Mon, 9 Dec 2024 09:26:43 +0000
Subject: [PATCH 2/4] storcon: remove stale comment

---
 storage_controller/src/service.rs | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index bb189c2a7239..b3b9bc15dafc 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -2240,10 +2240,6 @@ impl Service {
            Entry::Occupied(mut entry) => {
                tracing::info!("Tenant shard {tenant_shard_id} already exists while creating");

-                // TODO: schedule() should take an anti-affinity expression that pushes
-                // attached and secondary locations (independently) away frorm those
-                // pageservers also holding a shard for this tenant.
-
                if let Err(err) = entry.get_mut().schedule(scheduler, schedule_context) {
                    return InitialShardScheduleOutcome::ShardScheduleError(err);
                }

From ecbe8e8c0cb02d4ebda70ca853a130f8ca235def Mon Sep 17 00:00:00 2001
From: John Spray
Date: Mon, 9 Dec 2024 09:56:27 +0000
Subject: [PATCH 3/4] storcon: pick AZ based on median utilization

---
 libs/pageserver_api/src/controller_api.rs     |  2 +-
 libs/pageserver_api/src/models/utilization.rs |  7 ++
 storage_controller/src/scheduler.rs           | 93 ++++++++++++++++++-
 storage_controller/src/service.rs             | 13 +--
 4 files changed, 103 insertions(+), 12 deletions(-)

diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs
index 6839ef69f592..ec7b81423a44 100644
--- a/libs/pageserver_api/src/controller_api.rs
+++ b/libs/pageserver_api/src/controller_api.rs
@@ -75,7 +75,7 @@ pub struct TenantPolicyRequest {
    pub scheduling: Option<ShardSchedulingPolicy>,
 }

-#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, PartialOrd, Ord)]
 pub struct AvailabilityZone(pub String);

 impl Display for AvailabilityZone {
diff --git a/libs/pageserver_api/src/models/utilization.rs b/libs/pageserver_api/src/models/utilization.rs
index 641aa51989ed..62eb0aeb0fcd 100644
--- a/libs/pageserver_api/src/models/utilization.rs
+++ b/libs/pageserver_api/src/models/utilization.rs
@@ -104,6 +104,13 @@ impl PageserverUtilization {
         score >= 2 * Self::UTILIZATION_FULL
     }

+    /// The inverse of [`Self::score`]. Higher values are more affine to scheduling more work on this node.
+    ///
+    /// This uses the same threshold as [`Self::is_overloaded`] as a definition of "full".
+    pub fn free(&self) -> RawScore {
+        self.free_space_bytes
+    }
+
     pub fn adjust_shard_count_max(&mut self, shard_count: u32) {
         if self.shard_count < shard_count {
             self.shard_count = shard_count;
diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs
index 1fb2d3eb9b9c..51a4cf35be0a 100644
--- a/storage_controller/src/scheduler.rs
+++ b/storage_controller/src/scheduler.rs
@@ -742,8 +742,48 @@ impl Scheduler {
         self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
     }

-    pub(crate) fn get_node_az(&self, node_id: &NodeId) -> Option<AvailabilityZone> {
-        self.nodes.get(node_id).map(|n| n.az.clone())
+    /// For choosing which AZ to schedule a new shard into, use this. It will return the
+    /// AZ with the lowest median utilization.
+    ///
+    /// We use an AZ-wide measure rather than simply selecting the AZ of the least-loaded
+    /// node, because while tenants start out single sharded, when they grow and undergo
+    /// shard-split, they will occupy space on many nodes within an AZ.
+    ///
+    /// We use median rather than total free space or mean utilization, because
+    /// we wish to avoid preferring AZs that have low-load nodes resulting from
+    /// recent replacements.
+    ///
+    /// The practical result is that we will pick an AZ based on its median node, and
+    /// then actually _schedule_ the new shard onto the lowest-loaded node in that AZ.
+    pub(crate) fn get_az_for_new_tenant(&self) -> Option<AvailabilityZone> {
+        if self.nodes.is_empty() {
+            return None;
+        }
+
+        let mut scores_by_az = HashMap::new();
+        for (node_id, node) in &self.nodes {
+            let az_scores = scores_by_az.entry(&node.az).or_insert_with(Vec::new);
+            let score = match &node.may_schedule {
+                MaySchedule::Yes(utilization) => utilization.score(),
+                MaySchedule::No => PageserverUtilization::full().score(),
+            };
+            az_scores.push((node_id, node, score));
+        }
+
+        // Sort by utilization.  Also include the node ID to break ties.
+        for scores in scores_by_az.values_mut() {
+            scores.sort_by_key(|i| (i.2, i.0));
+        }
+
+        let mut median_by_az = scores_by_az
+            .iter()
+            .map(|(az, nodes)| (*az, nodes.get(nodes.len() / 2).unwrap().2))
+            .collect::<Vec<_>>();
+        // Sort by utilization.  Also include the AZ to break ties.
+        median_by_az.sort_by_key(|i| (i.1, i.0));
+
+        // Return the AZ with the lowest median utilization
+        Some(median_by_az.first().unwrap().0.clone())
     }

     /// Unit test access to internal state
@@ -1091,4 +1131,53 @@ mod tests {
             intent.clear(&mut scheduler);
         }
     }
+
+    #[test]
+    fn az_scheduling_for_new_tenant() {
+        let az_a_tag = AvailabilityZone("az-a".to_string());
+        let az_b_tag = AvailabilityZone("az-b".to_string());
+        let nodes = test_utils::make_test_nodes(
+            6,
+            &[
+                az_a_tag.clone(),
+                az_a_tag.clone(),
+                az_a_tag.clone(),
+                az_b_tag.clone(),
+                az_b_tag.clone(),
+                az_b_tag.clone(),
+            ],
+        );
+
+        let mut scheduler = Scheduler::new(nodes.values());
+
+        /// Force the utilization of a node in Scheduler's state to a particular
+        /// number of bytes used.
+        fn set_utilization(scheduler: &mut Scheduler, node_id: NodeId, shard_count: u32) {
+            let mut node = Node::new(
+                node_id,
+                "".to_string(),
+                0,
+                "".to_string(),
+                0,
+                scheduler.nodes.get(&node_id).unwrap().az.clone(),
+            );
+            node.set_availability(NodeAvailability::Active(test_utilization::simple(
+                shard_count,
+                0,
+            )));
+            scheduler.node_upsert(&node);
+        }
+
+        // Initial empty state.  Scores are tied, scheduler prefers lower AZ ID.
+        assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));
+
+        // Put some utilization on one node in AZ A: this should change nothing, as the median hasn't changed
+        set_utilization(&mut scheduler, NodeId(1), 1000000);
+        assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone()));
+
+        // Put some utilization on a second node in AZ A: now the median has changed, so the scheduler
+        // should prefer the other AZ.
+        set_utilization(&mut scheduler, NodeId(2), 1000000);
+        assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_b_tag.clone()));
+    }
 }
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index b3b9bc15dafc..51370e0144fd 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -29,7 +29,7 @@ use crate::{
         ShardGenerationState, TenantFilter,
     },
     reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
-    scheduler::{AttachedShardTag, MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
+    scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
     tenant_shard::{
         MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
         ScheduleOptimization, ScheduleOptimizationAction,
@@ -2107,18 +2107,13 @@ impl Service {
            )
        };

-        let preferred_az_id: Option<AvailabilityZone> = {
-            let mut locked = self.inner.write().unwrap();
-
+        let preferred_az_id = {
+            let locked = self.inner.read().unwrap();
            // Idempotency: take the existing value if the tenant already exists
            if let Some(shard) = locked.tenants.get(create_ids.first().unwrap()) {
                shard.preferred_az().cloned()
            } else {
-                locked
-                    .scheduler
-                    .schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
-                    .ok()
-                    .and_then(|n_id| locked.scheduler.get_node_az(&n_id))
+                locked.scheduler.get_az_for_new_tenant()
            }
        };

From 873c4e5e3ae878f825d8487abcb2bf018371797d Mon Sep 17 00:00:00 2001
From: John Spray
Date: Thu, 12 Dec 2024 18:23:02 +0000
Subject: [PATCH 4/4] remove unused function

---
 libs/pageserver_api/src/models/utilization.rs | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/libs/pageserver_api/src/models/utilization.rs b/libs/pageserver_api/src/models/utilization.rs
index 62eb0aeb0fcd..641aa51989ed 100644
--- a/libs/pageserver_api/src/models/utilization.rs
+++ b/libs/pageserver_api/src/models/utilization.rs
@@ -104,13 +104,6 @@ impl PageserverUtilization {
         score >= 2 * Self::UTILIZATION_FULL
     }

-    /// The inverse of [`Self::score`]. Higher values are more affine to scheduling more work on this node.
-    ///
-    /// This uses the same threshold as [`Self::is_overloaded`] as a definition of "full".
-    pub fn free(&self) -> RawScore {
-        self.free_space_bytes
-    }
-
     pub fn adjust_shard_count_max(&mut self, shard_count: u32) {
         if self.shard_count < shard_count {
             self.shard_count = shard_count;
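
Aside (not part of the series): the rule patch 3 introduces -- per AZ, sort node utilization scores, take the element at index len / 2 as the median, then pick the AZ whose median is lowest, breaking ties on the AZ name -- can be sketched in isolation. The snippet below is a minimal illustration, assuming plain (az, score) pairs and a hypothetical helper name az_with_lowest_median rather than the controller's Node and PageserverUtilization types.

use std::collections::BTreeMap;

/// Hypothetical standalone version of the median rule: group per-node scores
/// by AZ, take each AZ's median (element at len / 2 of the sorted scores, as
/// in the patch), and return the AZ with the lowest median, tie-breaking on
/// the AZ name.
fn az_with_lowest_median(nodes: &[(&str, u64)]) -> Option<String> {
    let mut scores_by_az: BTreeMap<&str, Vec<u64>> = BTreeMap::new();
    for &(az, score) in nodes {
        scores_by_az.entry(az).or_default().push(score);
    }

    scores_by_az
        .into_iter()
        .map(|(az, mut scores)| {
            scores.sort();
            (scores[scores.len() / 2], az)
        })
        .min() // tuples compare lexicographically: lowest median, then AZ name
        .map(|(_median, az)| az.to_string())
}

fn main() {
    // Three nodes per AZ, all empty: tie broken towards the lower AZ name.
    let mut nodes = vec![
        ("az-a", 0),
        ("az-a", 0),
        ("az-a", 0),
        ("az-b", 0),
        ("az-b", 0),
        ("az-b", 0),
    ];
    assert_eq!(az_with_lowest_median(&nodes), Some("az-a".to_string()));

    // Loading one az-a node does not move its median...
    nodes[0].1 = 1_000_000;
    assert_eq!(az_with_lowest_median(&nodes), Some("az-a".to_string()));

    // ...loading a second az-a node does, so az-b wins.
    nodes[1].1 = 1_000_000;
    assert_eq!(az_with_lowest_median(&nodes), Some("az-b".to_string()));
}

The assertions mirror the unit test in patch 3: one loaded node out of three leaves az-a's median at zero, while a second loaded node raises the median and flips the choice to az-b.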