Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ease the bound for reflector to only request identifying metadata #1393

Merged
merged 6 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kube-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ pub use request::Request;

mod resource;
pub use resource::{
ClusterResourceScope, DynamicResourceScope, NamespaceResourceScope, Resource, ResourceExt, ResourceScope,
SubResourceScope,
api_version_from_group_version, ClusterResourceScope, DynamicResourceScope, NamespaceResourceScope,
Resource, ResourceExt, ResourceScope, SubResourceScope,
};

pub mod response;
Expand Down
21 changes: 13 additions & 8 deletions kube-core/src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,7 @@ pub trait Resource {
fn version(dt: &Self::DynamicType) -> Cow<'_, str>;
/// Returns apiVersion of this object
fn api_version(dt: &Self::DynamicType) -> Cow<'_, str> {
let group = Self::group(dt);
if group.is_empty() {
return Self::version(dt);
}
let mut group = group.into_owned();
group.push('/');
group.push_str(&Self::version(dt));
group.into()
api_version_from_group_version(Self::group(dt), Self::version(dt))
}
/// Returns the plural name of the kind
///
Expand Down Expand Up @@ -115,6 +108,18 @@ pub trait Resource {
}
}

/// Helper function that creates the `apiVersion` field from the group and version strings.
pub fn api_version_from_group_version<'a>(group: Cow<'a, str>, version: Cow<'a, str>) -> Cow<'a, str> {
if group.is_empty() {
return version;
}

let mut output = group;
output.to_mut().push('/');
output.to_mut().push_str(&version);
output
}

/// Implement accessor trait for any ObjectMeta-using Kubernetes Resource
impl<K, S> Resource for K
where
Expand Down
5 changes: 2 additions & 3 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
mod object_ref;
pub mod store;

pub use self::object_ref::{Extra as ObjectRefExtra, ObjectRef};
pub use self::object_ref::{Extra as ObjectRefExtra, ObjectRef, Lookup};
use crate::watcher;
use futures::{Stream, TryStreamExt};
use kube_client::Resource;
use std::hash::Hash;
pub use store::{store, Store};

Expand Down Expand Up @@ -91,7 +90,7 @@ pub use store::{store, Store};
/// Additionally, only `labels`, `annotations` and `managed_fields` are safe to drop from `ObjectMeta`.
pub fn reflector<K, W>(mut writer: store::Writer<K>, stream: W) -> impl Stream<Item = W::Item>
where
K: Resource + Clone,
K: Lookup + Clone,
K::DynamicType: Eq + Hash + Clone,
W: Stream<Item = watcher::Result<watcher::Event<K>>>,
{
Expand Down
135 changes: 107 additions & 28 deletions kube-runtime/src/reflector/object_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,102 @@
use k8s_openapi::{api::core::v1::ObjectReference, apimachinery::pkg::apis::meta::v1::OwnerReference};
use kube_client::{
api::{DynamicObject, Resource},
core::ObjectMeta,
ResourceExt,
core::{api_version_from_group_version, ObjectMeta},

Check warning on line 5 in kube-runtime/src/reflector/object_ref.rs

View workflow job for this annotation

GitHub Actions / msrv

unused import: `ObjectMeta`
};
use std::{
borrow::Cow,
fmt::{Debug, Display},
hash::Hash,
};

/// Minimal lookup behaviour needed by a [reflector store](super::Store).
///
/// This trait is blanket-implemented for all [`Resource`] objects.
#[allow(clippy::module_name_repetitions)]
SOF3 marked this conversation as resolved.
Show resolved Hide resolved
pub trait Lookup {
/// Type information for types that do not know their resource information at compile time.
/// This is equivalent to [`Resource::DynamicType`].
type DynamicType;

/// The [kind](Resource::kind) for this object.
fn kind(dyntype: &Self::DynamicType) -> Cow<'_, str>;

/// The [group](Resource::group) for this object.
fn group(dyntype: &Self::DynamicType) -> Cow<'_, str>;

/// The [version](Resource::version) for this object.
fn version(dyntype: &Self::DynamicType) -> Cow<'_, str>;

/// The [apiVersion](Resource::_version) for this object.
fn api_version(dyntype: &Self::DynamicType) -> Cow<'_, str> {
api_version_from_group_version(Self::group(dyntype), Self::version(dyntype))
}

/// The [plural](Resource::plural) for this object.
fn plural(dyntype: &Self::DynamicType) -> Cow<'_, str>;

/// The [name](ObjectMeta#structfield.name) of the object.
fn name(&self) -> Option<Cow<'_, str>>;

/// The [namespace](ObjectMeta#structfield.namespace) of the object.
fn namespace(&self) -> Option<Cow<'_, str>>;

/// The [resource version](ObjectMeta#structfield.resource_version) of the object.
fn resource_version(&self) -> Option<Cow<'_, str>>;

/// The [UID](ObjectMeta#structfield.uid) of the object.
fn uid(&self) -> Option<Cow<'_, str>>;

/// Constructs an [`ObjectRef`] for this object.
fn to_object_ref(&self, dyntype: Self::DynamicType) -> ObjectRef<Self> {
ObjectRef {
dyntype,
name: self.name().expect(".metadata.name missing").into_owned(),
namespace: self.namespace().map(Cow::into_owned),
extra: Extra {
resource_version: self.resource_version().map(Cow::into_owned),
uid: self.uid().map(Cow::into_owned),
},
}
}
}

impl<K: Resource> Lookup for K {
type DynamicType = K::DynamicType;

fn kind(dyntype: &Self::DynamicType) -> Cow<'_, str> {
K::kind(dyntype)
}

fn version(dyntype: &Self::DynamicType) -> Cow<'_, str> {
K::version(dyntype)
}

fn group(dyntype: &Self::DynamicType) -> Cow<'_, str> {
K::group(dyntype)
}

fn plural(dyntype: &Self::DynamicType) -> Cow<'_, str> {
K::plural(dyntype)
}

fn name(&self) -> Option<Cow<'_, str>> {
self.meta().name.as_deref().map(Cow::Borrowed)
}

fn namespace(&self) -> Option<Cow<'_, str>> {
self.meta().namespace.as_deref().map(Cow::Borrowed)
}

fn resource_version(&self) -> Option<Cow<'_, str>> {
self.meta().resource_version.as_deref().map(Cow::Borrowed)
}

fn uid(&self) -> Option<Cow<'_, str>> {
self.meta().uid.as_deref().map(Cow::Borrowed)
}
}

#[derive(Derivative)]
#[derivative(
Debug(bound = "K::DynamicType: Debug"),
Expand All @@ -33,7 +121,7 @@
/// );
/// ```
#[non_exhaustive]
pub struct ObjectRef<K: Resource> {
pub struct ObjectRef<K: Lookup + ?Sized> {
pub dyntype: K::DynamicType,
/// The name of the object
pub name: String,
Expand Down Expand Up @@ -69,7 +157,7 @@
pub uid: Option<String>,
}

impl<K: Resource> ObjectRef<K>
impl<K: Lookup> ObjectRef<K>
where
K::DynamicType: Default,
{
Expand All @@ -81,13 +169,13 @@
#[must_use]
pub fn from_obj(obj: &K) -> Self
where
K: Resource,
K: Lookup,
{
Self::from_obj_with(obj, Default::default())
obj.to_object_ref(Default::default())
}
}

impl<K: Resource> ObjectRef<K> {
impl<K: Lookup> ObjectRef<K> {
#[must_use]
pub fn new_with(name: &str, dyntype: K::DynamicType) -> Self {
Self {
Expand All @@ -108,15 +196,9 @@
#[must_use]
pub fn from_obj_with(obj: &K, dyntype: K::DynamicType) -> Self
where
K: Resource,
K: Lookup,
{
let meta = obj.meta();
Self {
dyntype,
name: obj.name_unchecked(),
namespace: meta.namespace.clone(),
extra: Extra::from_obj_meta(meta),
}
obj.to_object_ref(dyntype)

Check warning on line 201 in kube-runtime/src/reflector/object_ref.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/object_ref.rs#L201

Added line #L201 was not covered by tests
}

/// Create an `ObjectRef` from an `OwnerReference`
Expand Down Expand Up @@ -148,7 +230,7 @@
/// Note that no checking is done on whether this conversion makes sense. For example, every `Service`
/// has a corresponding `Endpoints`, but it wouldn't make sense to convert a `Pod` into a `Deployment`.
#[must_use]
pub fn into_kind_unchecked<K2: Resource>(self, dt2: K2::DynamicType) -> ObjectRef<K2> {
pub fn into_kind_unchecked<K2: Lookup>(self, dt2: K2::DynamicType) -> ObjectRef<K2> {

Check warning on line 233 in kube-runtime/src/reflector/object_ref.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/object_ref.rs#L233

Added line #L233 was not covered by tests
ObjectRef {
dyntype: dt2,
name: self.name,
Expand All @@ -159,15 +241,21 @@

pub fn erase(self) -> ObjectRef<DynamicObject> {
ObjectRef {
dyntype: kube_client::api::ApiResource::erase::<K>(&self.dyntype),
dyntype: kube_client::api::ApiResource {
group: K::group(&self.dyntype).to_string(),
version: K::version(&self.dyntype).to_string(),
api_version: K::api_version(&self.dyntype).to_string(),
kind: K::kind(&self.dyntype).to_string(),
plural: K::plural(&self.dyntype).to_string(),
},
name: self.name,
namespace: self.namespace,
extra: self.extra,
}
}
}

impl<K: Resource> From<ObjectRef<K>> for ObjectReference {
impl<K: Lookup> From<ObjectRef<K>> for ObjectReference {
fn from(val: ObjectRef<K>) -> Self {
let ObjectRef {
dyntype: dt,
Expand All @@ -190,7 +278,7 @@
}
}

impl<K: Resource> Display for ObjectRef<K> {
impl<K: Lookup> Display for ObjectRef<K> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
Expand All @@ -207,15 +295,6 @@
}
}

impl Extra {
fn from_obj_meta(obj_meta: &ObjectMeta) -> Self {
Self {
resource_version: obj_meta.resource_version.clone(),
uid: obj_meta.uid.clone(),
}
}
}

#[cfg(test)]
mod tests {
use std::{
Expand Down
26 changes: 10 additions & 16 deletions kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use super::ObjectRef;
use super::{ObjectRef, Lookup};
use crate::{
utils::delayed_init::{self, DelayedInit},
watcher,
};
use ahash::AHashMap;
use derivative::Derivative;
use kube_client::Resource;
use parking_lot::RwLock;
use std::{fmt::Debug, hash::Hash, sync::Arc};
use thiserror::Error;
Expand All @@ -17,7 +16,7 @@ type Cache<K> = Arc<RwLock<AHashMap<ObjectRef<K>, Arc<K>>>>;
/// This is exclusive since it's not safe to share a single `Store` between multiple reflectors.
/// In particular, `Restarted` events will clobber the state of other connected reflectors.
#[derive(Debug)]
pub struct Writer<K: 'static + Resource>
pub struct Writer<K: 'static + Lookup>
where
K::DynamicType: Eq + Hash,
{
Expand All @@ -27,7 +26,7 @@ where
ready_rx: Arc<DelayedInit<()>>,
}

impl<K: 'static + Resource + Clone> Writer<K>
impl<K: 'static + Lookup + Clone> Writer<K>
where
K::DynamicType: Eq + Hash + Clone,
{
Expand Down Expand Up @@ -61,23 +60,18 @@ where
pub fn apply_watcher_event(&mut self, event: &watcher::Event<K>) {
match event {
watcher::Event::Applied(obj) => {
let key = ObjectRef::from_obj_with(obj, self.dyntype.clone());
let key = obj.to_object_ref(self.dyntype.clone());
let obj = Arc::new(obj.clone());
self.store.write().insert(key, obj);
}
watcher::Event::Deleted(obj) => {
let key = ObjectRef::from_obj_with(obj, self.dyntype.clone());
let key = obj.to_object_ref(self.dyntype.clone());
self.store.write().remove(&key);
}
watcher::Event::Restarted(new_objs) => {
let new_objs = new_objs
.iter()
.map(|obj| {
(
ObjectRef::from_obj_with(obj, self.dyntype.clone()),
Arc::new(obj.clone()),
)
})
.map(|obj| (obj.to_object_ref(self.dyntype.clone()), Arc::new(obj.clone())))
.collect::<AHashMap<_, _>>();
*self.store.write() = new_objs;
}
Expand All @@ -91,7 +85,7 @@ where
}
impl<K> Default for Writer<K>
where
K: Resource + Clone + 'static,
K: Lookup + Clone + 'static,
K::DynamicType: Default + Eq + Hash + Clone,
{
fn default() -> Self {
Expand All @@ -107,7 +101,7 @@ where
/// use `Writer::as_reader()` instead.
#[derive(Derivative)]
#[derivative(Debug(bound = "K: Debug, K::DynamicType: Debug"), Clone)]
pub struct Store<K: 'static + Resource>
pub struct Store<K: 'static + Lookup>
where
K::DynamicType: Hash + Eq,
{
Expand All @@ -119,7 +113,7 @@ where
#[error("writer was dropped before store became ready")]
pub struct WriterDropped(delayed_init::InitDropped);

impl<K: 'static + Clone + Resource> Store<K>
impl<K: 'static + Clone + Lookup> Store<K>
where
K::DynamicType: Eq + Hash + Clone,
{
Expand Down Expand Up @@ -201,7 +195,7 @@ where
#[must_use]
pub fn store<K>() -> (Store<K>, Writer<K>)
where
K: Resource + Clone + 'static,
K: Lookup + Clone + 'static,
K::DynamicType: Eq + Hash + Clone + Default,
{
let w = Writer::<K>::default();
Expand Down
Loading