Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default pagination to watcher #1249

Merged
merged 12 commits into from
Jul 13, 2023
4 changes: 4 additions & 0 deletions examples/pod_paged.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use kube::{
};
use tracing::*;

// This example shows how to do pagination with the raw `Api` only.
// In many realistic setups that need a continual, paginated, safe list-watch,
// the `watcher` is an easier abstraction that has configurable pagination built in.

const PAGE_SIZE: u32 = 5;

#[tokio::main]
Expand Down
8 changes: 4 additions & 4 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,12 @@ const APPLIER_REQUEUE_BUF_SIZE: usize = 100;

/// Apply a reconciler to an input stream, with a given retry policy
///
/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector`].
/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector()`].
///
/// The `queue` indicates which objects should be reconciled. For the core objects this will usually be
/// the [`reflector`] (piped through [`trigger_self`]). If your core objects own any subobjects then you
/// can also make them trigger reconciliations by [merging](`futures::stream::select`) the [`reflector`]
/// with a [`watcher`](watcher()) or [`reflector`](reflector()) for the subobject.
/// the [`reflector()`] (piped through [`trigger_self`]). If your core objects own any subobjects then you
/// can also make them trigger reconciliations by [merging](`futures::stream::select`) the [`reflector()`]
/// with a [`watcher()`] or [`reflector()`] for the subobject.
///
/// This is the "hard-mode" version of [`Controller`], which allows you some more customization
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
Expand Down
11 changes: 3 additions & 8 deletions kube-runtime/src/controller/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,13 @@ mod tests {
};
use futures::{
channel::{mpsc, oneshot},
future, poll, stream, FutureExt, SinkExt, StreamExt, TryStreamExt,
};
use std::{
cell::RefCell,
collections::{HashMap, HashSet},
sync::Mutex,
time::Duration,
future, poll, stream, SinkExt, StreamExt, TryStreamExt,
};
use std::{cell::RefCell, collections::HashMap, sync::Mutex, time::Duration};
use tokio::{
runtime::Handle,
task::yield_now,
time::{error::Elapsed, pause, sleep, timeout, Instant},
time::{pause, sleep, timeout, Instant},
};

#[tokio::test]
Expand Down
3 changes: 2 additions & 1 deletion kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ where
{
/// Wait for the store to be populated by Kubernetes.
///
/// Note that this will _not_ await the source calling the associated [`Writer`] (such as the [`reflector`]).
/// Note that polling this will _not_ await the source of the stream that populates the [`Writer`].
/// The [`reflector`](crate::reflector()) stream must be awaited separately.
///
/// # Errors
/// Returns an error if the [`Writer`] was dropped before any value was written.
Expand Down
10 changes: 2 additions & 8 deletions kube-runtime/src/utils/delayed_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,13 @@ pub struct InitDropped;

#[cfg(test)]
mod tests {
use std::{
sync::{Arc, Mutex},
task::Poll,
};
use std::task::Poll;

use super::DelayedInit;
use futures::{pin_mut, poll};
use tracing::Level;
use tracing_subscriber::util::SubscriberInitExt;

use crate::utils::delayed_init::ReceiverState;

use super::DelayedInit;

fn setup_tracing() -> tracing::dispatcher::DefaultGuard {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
Expand Down
3 changes: 1 addition & 2 deletions kube-runtime/src/utils/event_modify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use core::{
pin::Pin,
task::{Context, Poll},
};
use std::task::ready;

use futures::{Stream, TryStream};
use pin_project::pin_project;
Expand Down Expand Up @@ -50,7 +49,7 @@ pub(crate) mod test {
use std::{task::Poll, vec};

use super::{Error, Event, EventModify};
use futures::{pin_mut, poll, stream, Stream, StreamExt};
use futures::{pin_mut, poll, stream, StreamExt};

#[tokio::test]
async fn eventmodify_modifies_innner_value_of_event() {
Expand Down
92 changes: 69 additions & 23 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ impl<K> Event<K> {
/// The internal finite state machine driving the [`watcher`]
enum State<K> {
/// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST to get all existing objects
Empty,
Empty {
continue_token: Option<String>,
objects: Vec<K>,
},
/// The initial LIST was successful, so we should move on to starting the actual watch.
InitListed { resource_version: String },
/// The watch is in progress, from this point we just return events from the server.
Expand All @@ -134,6 +137,15 @@ enum State<K> {
},
}

/// The default [`State`] is the `Empty` variant with no continue token and no buffered objects.
impl<K: Resource + Clone> Default for State<K> {
    fn default() -> Self {
        // Nothing has been listed yet: there is no token to resume from and
        // no objects have been accumulated.
        let continue_token = None;
        let objects = Vec::new();
        Self::Empty {
            continue_token,
            objects,
        }
    }
}

/// Used to control whether the watcher receives the full object, or only the
/// metadata
#[async_trait]
Expand Down Expand Up @@ -205,6 +217,14 @@ pub struct Config {
/// Configures re-list for performance vs. consistency.
pub list_semantic: ListSemantic,

/// Maximum number of objects retrieved per list operation during resyncs.
///
/// This can reduce the memory consumption during resyncs, at the cost of requiring more
/// API roundtrips to complete.
///
/// Defaults to 500. Note that `None` represents unbounded.
pub page_size: Option<u32>,

/// Enables watch events with type "BOOKMARK".
///
/// Requests watch bookmarks from the apiserver when enabled for improved watch precision and reduced list calls.
Expand All @@ -220,6 +240,9 @@ impl Default for Config {
field_selector: None,
timeout: None,
list_semantic: ListSemantic::default(),
// same default page size limit as client-go
// https://github.com/kubernetes/client-go/blob/aed71fa5cf054e1c196d67b2e21f66fd967b8ab1/tools/pager/pager.go#L31
page_size: Some(500),
}
}
}
Expand Down Expand Up @@ -288,6 +311,16 @@ impl Config {
self
}

/// Limits the number of objects retrieved in each list operation during resync.
///
/// This can reduce the memory consumption during resyncs, at the cost of requiring more
/// API roundtrips to complete.
#[must_use]
pub fn page_size(mut self, page_size: u32) -> Self {
self.page_size = Some(page_size);
self
}

/// Converts generic `watcher::Config` structure to the instance of `ListParams` used for list requests.
fn to_list_params(&self) -> ListParams {
let (resource_version, version_match) = match self.list_semantic {
Expand All @@ -300,9 +333,8 @@ impl Config {
timeout: self.timeout,
version_match,
resource_version,
// It is not permissible for users to configure the continue token and limit for the watcher, as these parameters are associated with paging.
// The watcher must handle paging internally.
limit: None,
// The watcher handles pagination internally.
limit: self.page_size,
continue_token: None,
}
}
Expand Down Expand Up @@ -368,6 +400,7 @@ where
///
/// This function should be trampolined: if event == `None`
/// then the function should be called again until it returns a Some.
#[allow(clippy::too_many_lines)] // for now
async fn step_trampolined<A>(
api: &A,
wc: &Config,
Expand All @@ -378,25 +411,38 @@ where
A::Value: Resource + 'static,
{
match state {
State::Empty => match api.list(&wc.to_list_params()).await {
Ok(list) => {
if let Some(resource_version) = list.metadata.resource_version {
(Some(Ok(Event::Restarted(list.items))), State::InitListed {
resource_version,
})
} else {
(Some(Err(Error::NoResourceVersion)), State::Empty)
State::Empty {
continue_token,
mut objects,
} => {
let mut lp = wc.to_list_params();
lp.continue_token = continue_token;
match api.list(&lp).await {
Ok(list) => {
objects.extend(list.items);
if let Some(continue_token) = list.metadata.continue_ {
(None, State::Empty {
continue_token: Some(continue_token),
objects,
})
} else if let Some(resource_version) = list.metadata.resource_version {
(Some(Ok(Event::Restarted(objects))), State::InitListed {
resource_version,
})
} else {
(Some(Err(Error::NoResourceVersion)), State::default())
}
}
}
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watch list error with 403: {err:?}");
} else {
debug!("watch list error: {err:?}");
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watch list error with 403: {err:?}");
} else {
debug!("watch list error: {err:?}");
}
(Some(Err(err).map_err(Error::InitialListFailed)), State::default())
}
(Some(Err(err).map_err(Error::InitialListFailed)), State::Empty)
}
},
}
State::InitListed { resource_version } => {
match api.watch(&wc.to_watch_params(), &resource_version).await {
Ok(stream) => (None, State::Watching {
Expand Down Expand Up @@ -441,7 +487,7 @@ where
Some(Ok(WatchEvent::Error(err))) => {
// HTTP GONE, means we have desynced and need to start over and re-list :(
let new_state = if err.code == 410 {
State::Empty
State::default()
} else {
State::Watching {
resource_version,
Expand Down Expand Up @@ -543,7 +589,7 @@ pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
watcher_config: Config,
) -> impl Stream<Item = Result<Event<K>>> + Send {
futures::stream::unfold(
(api, watcher_config, State::Empty),
(api, watcher_config, State::default()),
|(api, watcher_config, state)| async {
let (event, state) = step(&FullObject { api: &api }, &watcher_config, state).await;
Some((event, (api, watcher_config, state)))
Expand Down Expand Up @@ -607,7 +653,7 @@ pub fn metadata_watcher<K: Resource + Clone + DeserializeOwned + Debug + Send +
watcher_config: Config,
) -> impl Stream<Item = Result<Event<PartialObjectMeta<K>>>> + Send {
futures::stream::unfold(
(api, watcher_config, State::Empty),
(api, watcher_config, State::default()),
|(api, watcher_config, state)| async {
let (event, state) = step(&MetaOnly { api: &api }, &watcher_config, state).await;
Some((event, (api, watcher_config, state)))
Expand Down
4 changes: 4 additions & 0 deletions kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ futures = "0.3.17"
serde_json = "1.0.68"
serde = { version = "1.0.130", features = ["derive"] }
schemars = "0.8.6"
hyper = "0.14.27"
http = "0.2.9"
tower-test = "0.4.0"
anyhow = "1.0.71"

[dev-dependencies.k8s-openapi]
version = "0.18.0"
Expand Down
5 changes: 5 additions & 0 deletions kube/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ pub use crate::core::{CustomResourceExt, Resource, ResourceExt};
#[doc(inline)]
pub use kube_core as core;

// Mock tests for the runtime
#[cfg(test)]
#[cfg(all(feature = "derive", feature = "runtime"))]
mod mock_tests;

// Tests that require a cluster and the complete feature set
// Can be run with `cargo test -p kube --lib --features=runtime,derive -- --ignored`
#[cfg(test)]
Expand Down
Loading