storcon_cli: remove old tenant-scatter command
jcsp committed Jun 21, 2024
1 parent 0139962 commit 5120eb4
Showing 1 changed file with 2 additions and 90 deletions.
92 changes: 2 additions & 90 deletions control_plane/storcon_cli/src/main.rs
@@ -1,5 +1,5 @@
 use futures::StreamExt;
-use std::{collections::HashMap, str::FromStr, time::Duration};
+use std::{str::FromStr, time::Duration};
 
 use clap::{Parser, Subcommand};
 use pageserver_api::{
@@ -21,7 +21,7 @@ use utils::id::{NodeId, TenantId};
 
 use pageserver_api::controller_api::{
     NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
-    TenantLocateResponse, TenantShardMigrateRequest, TenantShardMigrateResponse,
+    TenantShardMigrateRequest, TenantShardMigrateResponse,
 };
 
 #[derive(Subcommand, Debug)]
@@ -110,12 +110,6 @@ enum Command {
         #[arg(long)]
         config: String,
     },
-    /// Attempt to balance the locations for a tenant across pageservers. This is a client-side
-    /// alternative to the storage controller's scheduling optimization behavior.
-    TenantScatter {
-        #[arg(long)]
-        tenant_id: TenantId,
-    },
     /// Print details about a particular tenant, including all its shards' states.
     TenantDescribe {
         #[arg(long)]
@@ -498,88 +492,6 @@ async fn main() -> anyhow::Result<()> {
                 })
                 .await?;
         }
-        Command::TenantScatter { tenant_id } => {
-            // Find the shards
-            let locate_response = storcon_client
-                .dispatch::<(), TenantLocateResponse>(
-                    Method::GET,
-                    format!("control/v1/tenant/{tenant_id}/locate"),
-                    None,
-                )
-                .await?;
-            let shards = locate_response.shards;
-
-            let mut node_to_shards: HashMap<NodeId, Vec<TenantShardId>> = HashMap::new();
-            let shard_count = shards.len();
-            for s in shards {
-                let entry = node_to_shards.entry(s.node_id).or_default();
-                entry.push(s.shard_id);
-            }
-
-            // Load list of available nodes
-            let nodes_resp = storcon_client
-                .dispatch::<(), Vec<NodeDescribeResponse>>(
-                    Method::GET,
-                    "control/v1/node".to_string(),
-                    None,
-                )
-                .await?;
-
-            for node in nodes_resp {
-                if matches!(node.availability, NodeAvailabilityWrapper::Active) {
-                    node_to_shards.entry(node.id).or_default();
-                }
-            }
-
-            let max_shard_per_node = shard_count / node_to_shards.len();
-
-            loop {
-                let mut migrate_shard = None;
-                for shards in node_to_shards.values_mut() {
-                    if shards.len() > max_shard_per_node {
-                        // Pick the emptiest
-                        migrate_shard = Some(shards.pop().unwrap());
-                    }
-                }
-                let Some(migrate_shard) = migrate_shard else {
-                    break;
-                };
-
-                // Pick the emptiest node to migrate to
-                let mut destinations = node_to_shards
-                    .iter()
-                    .map(|(k, v)| (k, v.len()))
-                    .collect::<Vec<_>>();
-                destinations.sort_by_key(|i| i.1);
-                let (destination_node, destination_count) = *destinations.first().unwrap();
-                if destination_count + 1 > max_shard_per_node {
-                    // Even the emptiest destination doesn't have space: we're done
-                    break;
-                }
-                let destination_node = *destination_node;
-
-                node_to_shards
-                    .get_mut(&destination_node)
-                    .unwrap()
-                    .push(migrate_shard);
-
-                println!("Migrate {} -> {} ...", migrate_shard, destination_node);
-
-                storcon_client
-                    .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
-                        Method::PUT,
-                        format!("control/v1/tenant/{migrate_shard}/migrate"),
-                        Some(TenantShardMigrateRequest {
-                            tenant_shard_id: migrate_shard,
-                            node_id: destination_node,
-                        }),
-                    )
-                    .await?;
-                println!("Migrate {} -> {} OK", migrate_shard, destination_node);
-            }
-
-            // Spread the shards across the nodes
-        }
         Command::TenantDescribe { tenant_id } => {
             let describe_response = storcon_client
                 .dispatch::<(), TenantDescribeResponse>(
…
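
For context, the deleted TenantScatter arm was a greedy client-side rebalance: locate the tenant's shards, bucket them by pageserver node, then repeatedly pop a shard from any node holding more than shard_count / node_count shards and push it onto the emptiest node, issuing one PUT control/v1/tenant/{shard}/migrate call per move. What follows is a minimal, lightly restructured sketch of that heuristic, not the committed code: the u64/u32 aliases are illustrative stand-ins for the real NodeId and TenantShardId types, and the HTTP dispatch is reduced to a comment.

use std::collections::HashMap;

// Illustrative stand-ins for pageserver_api's NodeId / TenantShardId.
type NodeId = u64;
type ShardId = u32;

/// Greedily move shards off overloaded nodes until every node holds at
/// most shard_count / node_count shards, mirroring the deleted loop.
/// Returns the (shard, destination) migrations that would be issued.
fn scatter(node_to_shards: &mut HashMap<NodeId, Vec<ShardId>>) -> Vec<(ShardId, NodeId)> {
    let shard_count: usize = node_to_shards.values().map(Vec::len).sum();
    let max_per_node = shard_count / node_to_shards.len();
    let mut migrations = Vec::new();
    loop {
        // Find a node holding more than its fair share.
        let Some((&src, _)) = node_to_shards.iter().find(|(_, v)| v.len() > max_per_node)
        else {
            break; // balanced: no node is above the cap
        };
        // Pick the emptiest node as the destination.
        let (&dest, dest_len) = node_to_shards
            .iter()
            .map(|(k, v)| (k, v.len()))
            .min_by_key(|&(_, len)| len)
            .unwrap();
        if dest_len + 1 > max_per_node {
            break; // even the emptiest destination has no room: we're done
        }
        let shard = node_to_shards.get_mut(&src).unwrap().pop().unwrap();
        node_to_shards.get_mut(&dest).unwrap().push(shard);
        // The real command issued the migration here:
        // PUT control/v1/tenant/{shard}/migrate
        migrations.push((shard, dest));
    }
    migrations
}

fn main() {
    let mut placement = HashMap::from([(1, vec![10, 11, 12, 13]), (2, vec![]), (3, vec![])]);
    for (shard, node) in scatter(&mut placement) {
        println!("Migrate {shard} -> node {node}");
    }
}

Because max_per_node uses integer division, the loop exits as soon as even the emptiest node would go over the cap, which is why the original code broke out of the loop rather than over-filling a destination.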
