storcon: track number of attached shards for each node #8011

Merged
merged 1 commit on Jun 11, 2024
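In brief, the change teaches the storage controller's scheduler to track, per node, not only how many shards reference the node but also how many are attached there. A minimal sketch of the idea, using simplified, hypothetical types (the real SchedulerNode in scheduler.rs below also carries a may_schedule field):

```rust
// Hypothetical, simplified stand-in for the per-node bookkeeping added by this PR
// (the real SchedulerNode lives in storage_controller/src/scheduler.rs).
#[derive(Default, Debug)]
struct NodeCounts {
    /// Every shard whose IntentState references this node, attached or secondary.
    shard_count: usize,
    /// The subset of `shard_count` whose IntentState is attached to this node.
    attached_shard_count: usize,
}

fn main() {
    let mut node = NodeCounts::default();
    // Attaching a shard bumps both counters ...
    node.shard_count += 1;
    node.attached_shard_count += 1;
    // ... while adding a secondary location bumps only the total.
    node.shard_count += 1;
    // Intended invariant: attached locations are a subset of all locations.
    assert!(node.attached_shard_count <= node.shard_count);
    println!("{node:?}");
}
```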
101 changes: 75 additions & 26 deletions storage_controller/src/scheduler.rs
@@ -29,6 +29,8 @@ pub enum MaySchedule {
struct SchedulerNode {
/// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
shard_count: usize,
/// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
attached_shard_count: usize,

/// Whether this node is currently eligible to have new shards scheduled (this is derived
/// from a node's availability state and scheduling policy).
@@ -42,7 +44,9 @@ impl PartialEq for SchedulerNode {
(MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
);

may_schedule_matches && self.shard_count == other.shard_count
may_schedule_matches
&& self.shard_count == other.shard_count
&& self.attached_shard_count == other.attached_shard_count
}
}

@@ -138,6 +142,15 @@ impl ScheduleContext {
}
}

pub(crate) enum RefCountUpdate {
PromoteSecondary,
Attach,
Detach,
DemoteAttached,
AddSecondary,
RemoveSecondary,
}

impl Scheduler {
pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
let mut scheduler_nodes = HashMap::new();
@@ -146,6 +159,7 @@ impl Scheduler {
node.get_id(),
SchedulerNode {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
},
);
@@ -171,6 +185,7 @@
node.get_id(),
SchedulerNode {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
},
);
@@ -179,7 +194,10 @@
for shard in shards {
if let Some(node_id) = shard.intent.get_attached() {
match expect_nodes.get_mut(node_id) {
Some(node) => node.shard_count += 1,
Some(node) => {
node.shard_count += 1;
node.attached_shard_count += 1;
}
None => anyhow::bail!(
"Tenant {} references nonexistent node {}",
shard.tenant_shard_id,
@@ -227,31 +245,42 @@ impl Scheduler {
Ok(())
}

/// Increment the reference count of a node. This reference count is used to guide scheduling
/// decisions, not for memory management: it represents one tenant shard whose IntentState targets
/// this node.
/// Update the reference counts of a node. These reference counts are used to guide scheduling
/// decisions, not for memory management: they represent the number of tenant shards whose IntentState
/// targets this node and the number of tenant shards whose IntentState is attached to this
/// node.
///
/// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
/// [`Self::new`] or [`Self::node_upsert`])
pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) {
let Some(node) = self.nodes.get_mut(&node_id) else {
tracing::error!("Scheduler missing node {node_id}");
debug_assert!(false);
return;
};

node.shard_count += 1;
}

/// Decrement a node's reference count. Inverse of [`Self::node_inc_ref`].
pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) {
pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
let Some(node) = self.nodes.get_mut(&node_id) else {
debug_assert!(false);
tracing::error!("Scheduler missing node {node_id}");
return;
};

node.shard_count -= 1;
match update {
RefCountUpdate::PromoteSecondary => {
node.attached_shard_count += 1;
}
RefCountUpdate::Attach => {
node.shard_count += 1;
node.attached_shard_count += 1;
}
RefCountUpdate::Detach => {
node.shard_count -= 1;
node.attached_shard_count -= 1;
}
RefCountUpdate::DemoteAttached => {
node.attached_shard_count -= 1;
}
RefCountUpdate::AddSecondary => {
node.shard_count += 1;
}
RefCountUpdate::RemoveSecondary => {
node.shard_count -= 1;
}
}
}

pub(crate) fn node_upsert(&mut self, node: &Node) {
@@ -263,6 +292,7 @@
Vacant(entry) => {
entry.insert(SchedulerNode {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
});
}
@@ -385,6 +415,11 @@ impl Scheduler {
pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
self.nodes.get(&node_id).unwrap().shard_count
}

#[cfg(test)]
pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
self.nodes.get(&node_id).unwrap().attached_shard_count
}
}

#[cfg(test)]
@@ -437,18 +472,28 @@ mod tests {
let scheduled = scheduler.schedule_shard(&[], &context)?;
t2_intent.set_attached(&mut scheduler, Some(scheduled));

assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);

let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?;
t1_intent.push_secondary(&mut scheduler, scheduled);

assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 2);
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);

t1_intent.clear(&mut scheduler);
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);

let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
+ scheduler.get_node_attached_shard_count(NodeId(2));
assert_eq!(total_attached, 1);

if cfg!(debug_assertions) {
// Dropping an IntentState without clearing it causes a panic in debug mode,
@@ -459,8 +504,12 @@ mod tests {
assert!(result.is_err());
} else {
t2_intent.clear(&mut scheduler);
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 0);

assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);

assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
}

Ok(())
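For reference, the six RefCountUpdate variants introduced above adjust the two per-node counters as summarized below. This is a standalone sketch derived from the match arms in update_node_ref_counts, not the controller's actual code:

```rust
// Sketch of the counter deltas applied by update_node_ref_counts
// (derived from the match arms in the diff; not the real implementation).
#[derive(Clone, Copy)]
enum RefCountUpdate {
    PromoteSecondary,
    Attach,
    Detach,
    DemoteAttached,
    AddSecondary,
    RemoveSecondary,
}

/// Returns (delta to shard_count, delta to attached_shard_count).
fn deltas(update: RefCountUpdate) -> (i64, i64) {
    match update {
        RefCountUpdate::Attach => (1, 1),           // new attached location on this node
        RefCountUpdate::Detach => (-1, -1),         // attached location removed entirely
        RefCountUpdate::AddSecondary => (1, 0),     // new secondary location
        RefCountUpdate::RemoveSecondary => (-1, 0), // secondary location removed
        RefCountUpdate::PromoteSecondary => (0, 1), // secondary becomes attached; total unchanged
        RefCountUpdate::DemoteAttached => (0, -1),  // attached becomes secondary; total unchanged
    }
}

fn main() {
    use RefCountUpdate::*;
    // Each pair below is an inverse: applying both leaves a node's counters unchanged.
    for (up, down) in [
        (Attach, Detach),
        (AddSecondary, RemoveSecondary),
        (PromoteSecondary, DemoteAttached),
    ] {
        let (s1, a1) = deltas(up);
        let (s2, a2) = deltas(down);
        assert_eq!((s1 + s2, a1 + a2), (0, 0));
    }
}
```

Because promote and demote leave shard_count untouched, an attached-to-secondary transition on a node never double counts that location.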
2 changes: 1 addition & 1 deletion storage_controller/src/service.rs
@@ -4312,7 +4312,7 @@ impl Service {
continue;
}

if tenant_shard.intent.demote_attached(node_id) {
if tenant_shard.intent.demote_attached(scheduler, node_id) {
tenant_shard.sequence = tenant_shard.sequence.next();

// TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
56 changes: 38 additions & 18 deletions storage_controller/src/tenant_shard.rs
@@ -8,7 +8,7 @@ use crate::{
metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
persistence::TenantShardPersistence,
reconciler::ReconcileUnits,
scheduler::{AffinityScore, MaySchedule, ScheduleContext},
scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
};
use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
use pageserver_api::{
@@ -153,7 +153,7 @@ impl IntentState {
}
pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
if let Some(node_id) = node_id {
scheduler.node_inc_ref(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
}
Self {
attached: node_id,
@@ -164,10 +164,10 @@ impl IntentState {
pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
if self.attached != new_attached {
if let Some(old_attached) = self.attached.take() {
scheduler.node_dec_ref(old_attached);
scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
}
if let Some(new_attached) = &new_attached {
scheduler.node_inc_ref(*new_attached);
scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
}
self.attached = new_attached;
}
@@ -177,50 +177,55 @@ impl IntentState {
/// secondary to attached while maintaining the scheduler's reference counts.
pub(crate) fn promote_attached(
&mut self,
_scheduler: &mut Scheduler,
scheduler: &mut Scheduler,
promote_secondary: NodeId,
) {
// If we call this with a node that isn't in secondary, it would cause incorrect
// scheduler reference counting, since we assume the node is already referenced as a secondary.
debug_assert!(self.secondary.contains(&promote_secondary));

// TODO: when scheduler starts tracking attached + secondary counts separately, we will
// need to call into it here.
self.secondary.retain(|n| n != &promote_secondary);

let demoted = self.attached;
self.attached = Some(promote_secondary);

scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
if let Some(demoted) = demoted {
scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
}
}

pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
debug_assert!(!self.secondary.contains(&new_secondary));
scheduler.node_inc_ref(new_secondary);
scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
self.secondary.push(new_secondary);
}

/// It is legal to call this with a node that is not currently a secondary: that is a no-op
pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
let index = self.secondary.iter().position(|n| *n == node_id);
if let Some(index) = index {
scheduler.node_dec_ref(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
self.secondary.remove(index);
}
}

pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
for secondary in self.secondary.drain(..) {
scheduler.node_dec_ref(secondary);
scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
}
}

/// Remove the last secondary node from the list of secondaries
pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
if let Some(node_id) = self.secondary.pop() {
scheduler.node_dec_ref(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
}
}

pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
if let Some(old_attached) = self.attached.take() {
scheduler.node_dec_ref(old_attached);
scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
}

self.clear_secondary(scheduler);
@@ -251,12 +256,11 @@ impl IntentState {
/// forget the location on the offline node.
///
/// Returns true if a change was made
pub(crate) fn demote_attached(&mut self, node_id: NodeId) -> bool {
pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
if self.attached == Some(node_id) {
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
// need to call into it here.
self.attached = None;
self.secondary.push(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
true
} else {
false
@@ -593,7 +597,7 @@ impl TenantShard {
Secondary => {
if let Some(node_id) = self.intent.get_attached() {
// Populate secondary by demoting the attached node
self.intent.demote_attached(*node_id);
self.intent.demote_attached(scheduler, *node_id);
modified = true;
} else if self.intent.secondary.is_empty() {
// Populate secondary by scheduling a fresh node
@@ -783,7 +787,7 @@ impl TenantShard {
old_attached_node_id,
new_attached_node_id,
}) => {
self.intent.demote_attached(old_attached_node_id);
self.intent.demote_attached(scheduler, old_attached_node_id);
self.intent
.promote_attached(scheduler, new_attached_node_id);
}
@@ -1321,7 +1325,9 @@ pub(crate) mod tests {
assert_ne!(attached_node_id, secondary_node_id);

// Notifying the attached node is offline should demote it to a secondary
let changed = tenant_shard.intent.demote_attached(attached_node_id);
let changed = tenant_shard
.intent
.demote_attached(&mut scheduler, attached_node_id);
assert!(changed);
assert!(tenant_shard.intent.attached.is_none());
assert_eq!(tenant_shard.intent.secondary.len(), 2);
@@ -1604,7 +1610,14 @@

// We should see equal number of locations on the two nodes.
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
// Scheduling does not consider the number of attachments when picking the initial
// pageserver to attach to (hence the assertion that all primaries are on the
// same node).
// TODO: Tweak the scheduling to evenly distribute attachments for new shards.
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 4);

assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);

// Add another two nodes: we should see the shards spread out when their optimize
// methods are called
@@ -1613,9 +1626,16 @@
optimize_til_idle(&nodes, &mut scheduler, &mut shards);

assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);

assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);

assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);

for shard in shards.iter_mut() {
shard.intent.clear(&mut scheduler);
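Taken together, the IntentState changes thread the scheduler through every transition, so a migration (demote the old attached location, then promote a secondary) keeps both counters consistent on every node involved. A self-contained sketch under simplified assumptions; MiniScheduler and its methods are hypothetical stand-ins, not the controller's real Scheduler/IntentState API:

```rust
use std::collections::HashMap;

// Hypothetical, simplified stand-ins for the controller's ref counting;
// the real logic lives in Scheduler::update_node_ref_counts and IntentState.
#[derive(Default, Debug, PartialEq)]
struct Counts {
    shard_count: usize,
    attached_shard_count: usize,
}

#[derive(Default)]
struct MiniScheduler {
    nodes: HashMap<u64, Counts>,
}

impl MiniScheduler {
    fn attach(&mut self, node: u64) {
        let c = self.nodes.entry(node).or_default();
        c.shard_count += 1;
        c.attached_shard_count += 1;
    }
    fn add_secondary(&mut self, node: u64) {
        self.nodes.entry(node).or_default().shard_count += 1;
    }
    fn demote_attached(&mut self, node: u64) {
        // Attached -> secondary on this node: total stays, attached count drops.
        self.nodes.entry(node).or_default().attached_shard_count -= 1;
    }
    fn promote_secondary(&mut self, node: u64) {
        // Secondary -> attached on this node: total stays, attached count rises.
        self.nodes.entry(node).or_default().attached_shard_count += 1;
    }
}

fn main() {
    let mut s = MiniScheduler::default();
    // A shard starts attached on node 1 with a secondary on node 2.
    s.attach(1);
    s.add_secondary(2);

    // Migration: demote the old attached node, promote the secondary
    // (mirrors the optimisation path in tenant_shard.rs that calls
    // demote_attached followed by promote_attached).
    s.demote_attached(1);
    s.promote_secondary(2);

    assert_eq!(
        s.nodes[&1],
        Counts { shard_count: 1, attached_shard_count: 0 }
    );
    assert_eq!(
        s.nodes[&2],
        Counts { shard_count: 1, attached_shard_count: 1 }
    );
}
```

This mirrors what the updated tests above assert: per-node totals stay balanced while the attached count shifts between the nodes taking part in the migration.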