diff --git a/agent/Cargo.toml b/agent/Cargo.toml
index 1666e20c7..16efa261b 100644
--- a/agent/Cargo.toml
+++ b/agent/Cargo.toml
@@ -36,7 +36,7 @@ serde_derive = "1.0.104"
 serde_json = "1.0.45"
 serde_yaml = { version = "0.8.11", optional = true }
 thiserror = "1.0.50"
-tokio = { version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net", "signal"] }
+tokio = { version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net"] }
 tokio-stream = { version = "0.1", features = ["net", "sync"] }
 tonic = "0.10"
 tower = "0.4.8"
diff --git a/agent/src/discovery_handler_manager/discovery_handler_registry.rs b/agent/src/discovery_handler_manager/discovery_handler_registry.rs
index f12a24c06..d81c2bb3d 100644
--- a/agent/src/discovery_handler_manager/discovery_handler_registry.rs
+++ b/agent/src/discovery_handler_manager/discovery_handler_registry.rs
@@ -403,14 +403,10 @@ impl DiscoveryHandlerRegistry for DHRegistryImpl {
         let notifier_receiver = self.endpoint_notifier.subscribe();
         let local_req = self.requests.clone();
         tokio::spawn(async move {
-            let mut signal =
-                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
-                    .unwrap();
             select! {
                 _ = dh_req_ref
                     .watch_devices(notifier_receiver) => {},
                 _ = terminated.notified() => {},
-                _ = signal.recv() => {},
             }
             local_req.write().await.remove(&local_key);
         });
diff --git a/agent/src/discovery_handler_manager/embedded_handler.rs b/agent/src/discovery_handler_manager/embedded_handler.rs
index c82344320..6d0320892 100644
--- a/agent/src/discovery_handler_manager/embedded_handler.rs
+++ b/agent/src/discovery_handler_manager/embedded_handler.rs
@@ -36,12 +36,9 @@ impl EmbeddedHandlerEndpoint {
         sender: watch::Sender>>,
         mut stream: ReceiverStream>,
     ) {
-        let mut signal =
-            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
         loop {
             let msg = select! {
                 _ = sender.closed() => return,
-                _ = signal.recv() => return,
                 msg = stream.try_next() => match msg {
                     Ok(Some(msg)) => msg,
                     Ok(None) => {
diff --git a/agent/src/discovery_handler_manager/registration_socket.rs b/agent/src/discovery_handler_manager/registration_socket.rs
index ffdefae34..f1a401282 100644
--- a/agent/src/discovery_handler_manager/registration_socket.rs
+++ b/agent/src/discovery_handler_manager/registration_socket.rs
@@ -67,15 +67,12 @@ impl NetworkEndpoint {
         sender: watch::Sender>>,
         mut stream: Pin> + Send>>,
     ) {
-        let mut signal =
-            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
         loop {
             let msg = select! {
                 // This means all queries for this endpoint must end.
                 _ = stopper.stopped() => return,
                 // This means all receiver dropped (i.e no one cares about this query anymore)
                 _ = sender.closed() => return,
-                _ = signal.recv() => return,
                 msg = stream.try_next() => match msg {
                     Ok(Some(msg)) => msg,
                     Ok(None) => {
@@ -216,8 +213,6 @@ pub async fn run_registration_server(
             }
         }
     };
-    let mut signal =
-        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
     tonic::transport::Server::builder()
         .add_service(
             akri_discovery_utils::discovery::v0::registration_server::RegistrationServer::new(
@@ -227,7 +222,7 @@
                 },
             ),
         )
-        .serve_with_incoming_shutdown(incoming, signal.recv().map(|_| ()))
+        .serve_with_incoming(incoming)
         .await?;
     trace!(
         "internal_run_registration_server - gracefully shutdown ... deleting socket {}",
deleting socket {}", diff --git a/agent/src/main.rs b/agent/src/main.rs index 3ebc6f9bd..75e1851d6 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -71,10 +71,11 @@ async fn main() -> Result<(), Box ), ); - let (instances_cache, task) = plugin_manager::device_plugin_instance_controller::start_dpm( - device_plugin_manager.clone(), - ); - tasks.push(task); + let (instances_cache, device_plugin_controller_task) = + plugin_manager::device_plugin_instance_controller::start_dpm( + device_plugin_manager.clone(), + ); + tasks.push(device_plugin_controller_task); tasks.push(tokio::spawn( plugin_manager::device_plugin_slot_reclaimer::start_reclaimer(device_plugin_manager), diff --git a/agent/src/plugin_manager/device_plugin_instance_controller.rs b/agent/src/plugin_manager/device_plugin_instance_controller.rs index 30368637f..9ec4c8fc5 100644 --- a/agent/src/plugin_manager/device_plugin_instance_controller.rs +++ b/agent/src/plugin_manager/device_plugin_instance_controller.rs @@ -4,12 +4,13 @@ use std::str::FromStr; use std::{collections::HashMap, sync::Arc, time::Duration}; use akri_shared::{akri::instance::Instance, k8s::api::IntoApi}; +use anyhow::Context; use async_trait::async_trait; use futures::StreamExt; use itertools::Itertools; use kube::api::{Patch, PatchParams}; use kube::core::{NotUsed, Object, ObjectMeta, TypeMeta}; -use kube::ResourceExt; +use kube::{Resource, ResourceExt}; use kube_runtime::controller::Action; use kube_runtime::reflector::Store; use kube_runtime::Controller; @@ -27,6 +28,8 @@ use super::device_plugin_runner::{ }; use super::v1beta1::{AllocateRequest, AllocateResponse, ListAndWatchResponse}; +pub const DP_SLOT_PREFIX: &str = "akri.sh/"; + #[derive(Error, Debug)] pub enum DevicePluginError { #[error("Slot already in use")] @@ -223,8 +226,8 @@ impl InstanceDevicePlugin { let patch = Patch::Apply( serde_json::to_value(Object { types: Some(TypeMeta { - api_version: "akri.sh/v0".to_owned(), - kind: "Instance".to_owned(), + api_version: Instance::api_version(&()).to_string(), + kind: Instance::kind(&()).to_string(), }), status: None::, spec: PartialInstanceSlotUsage { device_usage }, @@ -233,7 +236,7 @@ impl InstanceDevicePlugin { ..Default::default() }, }) - .unwrap(), + .context("Could not create instance patch")?, ); api.raw_patch( &self.instance_name, @@ -280,8 +283,8 @@ impl InstanceDevicePlugin { let patch = Patch::Apply( serde_json::to_value(Object { types: Some(TypeMeta { - api_version: "akri.sh/v0".to_owned(), - kind: "Instance".to_owned(), + api_version: Instance::api_version(&()).to_string(), + kind: Instance::kind(&()).to_string(), }), status: None::, spec: PartialInstanceSlotUsage { device_usage }, @@ -290,7 +293,7 @@ impl InstanceDevicePlugin { ..Default::default() }, }) - .unwrap(), + .context("Could not create instance patch")?, ); api.raw_patch( &self.instance_name, @@ -487,8 +490,6 @@ impl ConfigurationDevicePlugin { let instance_name = plugin.instance_name.clone(); let mut receiver = plugin.slots_status.lock().await.subscribe(); tokio::spawn(async move { - let mut signal = - tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap(); loop { { let (has_free, used_config_slots) = { @@ -568,7 +569,6 @@ impl ConfigurationDevicePlugin { break; } }, - _ = signal.recv() => {break} } } slots_ref.write().await.send_modify(|slots| { @@ -779,7 +779,7 @@ impl DevicePluginManager { .enumerate() .filter_map(|(i, u)| match u { DeviceUsage::Node(n) if *n == self.node_name => { - Some(format!("akri.sh/{}-{}", instance, i)) + 
Some(format!("{}{}-{}", DP_SLOT_PREFIX, instance, i)) } DeviceUsage::Configuration { vdev, node } if *node == self.node_name => { Some(vdev.to_string()) @@ -798,12 +798,6 @@ pub fn start_dpm(dpm: Arc) -> (Store, JoinHandle< let store = controller.store(); let task = tokio::spawn(async { controller - .graceful_shutdown_on(async { - let mut signal = - tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) - .unwrap(); - signal.recv().await; - }) .run(reconcile, error_policy, dpm) .for_each(|_| futures::future::ready(())) .await diff --git a/agent/src/plugin_manager/device_plugin_slot_reclaimer.rs b/agent/src/plugin_manager/device_plugin_slot_reclaimer.rs index 6a0e04a0f..1c9d41e34 100644 --- a/agent/src/plugin_manager/device_plugin_slot_reclaimer.rs +++ b/agent/src/plugin_manager/device_plugin_slot_reclaimer.rs @@ -9,7 +9,9 @@ use tokio::net::UnixStream; use tonic::transport::{Endpoint, Uri}; use tower::service_fn; -use crate::plugin_manager::v1::ListPodResourcesRequest; +use crate::plugin_manager::{ + device_plugin_instance_controller::DP_SLOT_PREFIX, v1::ListPodResourcesRequest, +}; use super::{ device_plugin_instance_controller::DevicePluginManager, @@ -21,6 +23,10 @@ pub const KUBELET_SOCKET: &str = "/var/lib/kubelet/pod-resources/kubelet.sock"; const SLOT_GRACE_PERIOD: Duration = Duration::from_secs(20); const SLOT_RECLAIM_INTERVAL: Duration = Duration::from_secs(10); +/// This function connects to kubelet's resource monitoring interface and extracts +/// the set of resources currently used by pods on the node. +/// It uses this Kubelet interface: +/// https://kubernetes.io/docs/concepts/extend-kubernetes/compute-storage-net/device-plugins/#grpc-endpoint-list async fn get_used_slots() -> Result, anyhow::Error> { // We will ignore this dummy uri because UDS does not use it. // Some servers will check the uri content so the uri needs to @@ -51,7 +57,7 @@ async fn get_used_slots() -> Result, anyhow::Error> { .flat_map(|pr| { pr.containers.into_iter().flat_map(|cr| { cr.devices.into_iter().flat_map(|cd| { - if cd.resource_name.starts_with("akri.sh/") { + if cd.resource_name.starts_with(DP_SLOT_PREFIX) { cd.device_ids } else { vec![] @@ -66,8 +72,6 @@ async fn get_used_slots() -> Result, anyhow::Error> { pub async fn start_reclaimer(dp_manager: Arc) { let mut stalled_slots: HashMap = HashMap::new(); - let mut signal = - tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap(); loop { trace!("reclaiming unused slots - start"); if let Ok(used_slots) = get_used_slots().await { @@ -102,7 +106,6 @@ pub async fn start_reclaimer(dp_manager: Arc) { } tokio::select! 
             _ = tokio::time::sleep(SLOT_RECLAIM_INTERVAL) => {},
-            _ = signal.recv() => return,
         };
     }
 }
diff --git a/agent/src/util/discovery_configuration_controller.rs b/agent/src/util/discovery_configuration_controller.rs
index 723d615dd..496a783fa 100644
--- a/agent/src/util/discovery_configuration_controller.rs
+++ b/agent/src/util/discovery_configuration_controller.rs
@@ -58,11 +58,6 @@ pub async fn start_controller(
     let controller = Controller::new(api, Default::default());

     controller
-        .graceful_shutdown_on(async {
-            let mut signal =
-                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
-            signal.recv().await;
-        })
         // Reconcile the Configuration when the discovery handler manager signals a change
         .reconcile_on(tokio_stream::wrappers::ReceiverStream::new(rec))
         .run(reconcile, error_policy, ctx)
@@ -70,6 +65,17 @@ pub async fn start_controller(
         .await;
 }

+/// This function is the main reconcile function for Configuration resources.
+/// It gets called every time a Configuration is added or changed, and it is also called
+/// for every existing Configuration on startup.
+/// The discovery handler manager is also set up to trigger reconciliation upon discovery state changes.
+///
+/// Here the function will (in order):
+/// - Check if the Configuration awaits deletion; if so, terminate pending discovery, remove the finalizer and return early
+/// - Add the finalizer if not already present
+/// - Start discovery if not already started
+/// - Get discovery results (empty list if just started)
+/// - Create/Delete Instances according to discovery results
 pub async fn reconcile(
     dc: Arc,
     ctx: Arc,
@@ -515,7 +521,7 @@
                     namespace: Some("namespace-a".to_string()),
                     name: Some("instance-1".to_string()),
                     owner_references: Some(vec![OwnerReference {
-                        api_version: "akri.sh/v0".to_string(),
+                        api_version: Instance::api_version(&()).to_string(),
                         block_owner_deletion: None,
                         controller: Some(true),
                         kind: "Configuration".to_string(),
@@ -539,7 +545,7 @@
                     namespace: Some("namespace-a".to_string()),
                     name: Some("instance-2".to_string()),
                     owner_references: Some(vec![OwnerReference {
-                        api_version: "akri.sh/v0".to_string(),
+                        api_version: Instance::api_version(&()).to_string(),
                         block_owner_deletion: None,
                         controller: Some(true),
                         kind: "Configuration".to_string(),
@@ -563,7 +569,7 @@
                     namespace: Some("namespace-a".to_string()),
                     name: Some("instance-3".to_string()),
                     owner_references: Some(vec![OwnerReference {
-                        api_version: "akri.sh/v0".to_string(),
+                        api_version: Instance::api_version(&()).to_string(),
                         block_owner_deletion: None,
                         controller: Some(true),
                         kind: "Configuration".to_string(),
diff --git a/agent/src/util/stopper.rs b/agent/src/util/stopper.rs
index 0649cb6a5..386faa142 100644
--- a/agent/src/util/stopper.rs
+++ b/agent/src/util/stopper.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;

 use futures::stream::{AbortHandle, Abortable};
-use tokio::{signal::unix::SignalKind, sync::watch};
+use tokio::sync::watch;

 #[derive(Clone)]
 pub struct Stopper {
@@ -16,10 +16,8 @@ impl Stopper {
         };
         let local_s = s.clone();
         tokio::spawn(async move {
-            let mut signal = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
             tokio::select! {
                 _ = local_s.stopped() => {},
-                _ = signal.recv() => local_s.stop()
             }
         });
         s