Skip to content

Commit

Permalink
Prepare for release 0.79.0
Browse files Browse the repository at this point in the history
Signed-off-by: clux <sszynrae@gmail.com>
  • Loading branch information
clux committed Feb 23, 2023
1 parent b822ee6 commit 058307a
Showing 1 changed file with 13 additions and 18 deletions.
31 changes: 13 additions & 18 deletions examples/dynamic_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use futures::{Stream, StreamExt, TryStreamExt};
use kube::{
api::{Api, DynamicObject, GroupVersionKind, ListParams, ResourceExt},
discovery::{self, ApiCapabilities, Scope},
runtime::{metadata_watcher, watcher, WatchStreamExt},
Client,
api::{Api, DynamicObject, GroupVersionKind, ListParams, Resource, ResourceExt},
runtime::{metadata_watcher, watcher, watcher::Event, WatchStreamExt},
};
use serde::de::DeserializeOwned;
use tracing::*;
Expand All @@ -13,7 +11,7 @@ use std::{env, fmt::Debug};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;
let client = kube::Client::try_default().await?;

// If set will receive only the metadata for watched resources
let watch_metadata = env::var("WATCH_METADATA").map(|s| s == "1").unwrap_or(false);
Expand All @@ -26,34 +24,31 @@ async fn main() -> anyhow::Result<()> {
// Turn them into a GVK
let gvk = GroupVersionKind::gvk(&group, &version, &kind);
// Use API discovery to identify more information about the type (like its plural)
let (ar, caps) = discovery::pinned_kind(&client, &gvk).await?;
let (ar, _caps) = kube::discovery::pinned_kind(&client, &gvk).await?;

// Use the full resource info to create an Api with the ApiResource as its DynamicType
let api = Api::<DynamicObject>::all_with(client, &ar);
let lp = ListParams::default();

// Start a metadata or a full resource watch
if watch_metadata {
handle_events(metadata_watcher(api, ListParams::default()), caps).await?
handle_events(metadata_watcher(api, lp)).await
} else {
handle_events(watcher(api, ListParams::default()), caps).await?
handle_events(watcher(api, lp)).await
}

Ok(())
}

async fn handle_events<K: kube::Resource + Clone + Debug + Send + DeserializeOwned + 'static>(
stream: impl Stream<Item = watcher::Result<watcher::Event<K>>> + Send + 'static,
api_caps: ApiCapabilities,
async fn handle_events<K: Resource + Clone + Debug + Send + DeserializeOwned + 'static>(
stream: impl Stream<Item = watcher::Result<Event<K>>> + Send + 'static,
) -> anyhow::Result<()> {
// Fully compatible with kube-runtime
let mut items = stream.applied_objects().boxed();
while let Some(p) = items.try_next().await? {
if api_caps.scope == Scope::Cluster {
info!("saw {}", p.name_any());
if let Some(ns) = p.namespace() {
info!("saw {} in {ns}", p.name_any());
} else {
info!("saw {} in {}", p.name_any(), p.namespace().unwrap());
info!("saw {}", p.name_any());
}
trace!("full obj: {p:?}");
}

Ok(())
}

0 comments on commit 058307a

Please sign in to comment.