Skip to content

Commit

Permalink
Work around panic in tokio::time::sleep()
Browse files Browse the repository at this point in the history
A user showed a stack trace of a crash that resulted from the key-value
background task attempting to sleep forever by passing a large Duration.
The example the user was running doesn't use expiration at all, so the
loop is meant to sleep forever.

I'm uncertain what is causing this to panic on their machine and not on
any of the test environments I've run on except that they're on Windows.
The tokio code as far as I can tell has no platform-specific code in
that area, so I'm baffled. I reported tokio-rs/tokio#4494.

In the meantime, I've changed the logic to support a "Never" sleep
target which avoids calling sleep() when there's no expiration target.
This makes the code a little more complicated (the reason I went with
the original approach), but it should fix the panic the user was seeing.
  • Loading branch information
ecton committed Feb 12, 2022
1 parent 78ec702 commit 874dbe8
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 17 deletions.
4 changes: 3 additions & 1 deletion crates/bonsaidb-local/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use tokio::sync::watch;
use crate::vault::TreeVault;
use crate::{
config::{Builder, KeyValuePersistence, StorageConfiguration},
database::keyvalue::BackgroundWorkerProcessTarget,
error::Error,
open_trees::OpenTrees,
views::{
Expand Down Expand Up @@ -1436,7 +1437,8 @@ impl Borrow<Roots<AnyFile>> for Context {

impl Context {
pub(crate) fn new(roots: Roots<AnyFile>, key_value_persistence: KeyValuePersistence) -> Self {
let (background_sender, background_receiver) = watch::channel(None);
let (background_sender, background_receiver) =
watch::channel(BackgroundWorkerProcessTarget::Never);
let key_value_state = Arc::new(Mutex::new(keyvalue::KeyValueState::new(
key_value_persistence,
roots.clone(),
Expand Down
58 changes: 42 additions & 16 deletions crates/bonsaidb-local/src/database/keyvalue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ pub struct KeyValueState {
roots: Roots<AnyFile>,
persistence: KeyValuePersistence,
last_commit: Timestamp,
background_worker_target: watch::Sender<Option<Timestamp>>,
background_worker_target: watch::Sender<BackgroundWorkerProcessTarget>,
expiring_keys: BTreeMap<String, Timestamp>,
expiration_order: VecDeque<String>,
dirty_keys: BTreeMap<String, Option<Entry>>,
Expand All @@ -235,7 +235,7 @@ impl KeyValueState {
pub fn new(
persistence: KeyValuePersistence,
roots: Roots<AnyFile>,
background_worker_target: watch::Sender<Option<Timestamp>>,
background_worker_target: watch::Sender<BackgroundWorkerProcessTarget>,
) -> Self {
Self {
roots,
Expand Down Expand Up @@ -592,18 +592,33 @@ impl KeyValueState {
*expiration_timeout
});
let now = Timestamp::now();
let duration_until_commit = if self.keys_being_persisted.is_some() {
Duration::MAX
if self.keys_being_persisted.is_some() {
drop(
self.background_worker_target
.send(BackgroundWorkerProcessTarget::Never),
);
return;
}
let duration_until_commit = self.persistence.duration_until_next_commit(
self.dirty_keys.len(),
(now - self.last_commit).unwrap_or_default(),
);
if duration_until_commit == Duration::ZERO {
drop(
self.background_worker_target
.send(BackgroundWorkerProcessTarget::Now),
);
} else {
self.persistence.duration_until_next_commit(
self.dirty_keys.len(),
(now - self.last_commit).unwrap_or_default(),
)
};
let commit_target = now + duration_until_commit;
let closest_target = key_expiration_target.min(commit_target);
if *self.background_worker_target.borrow() != Some(closest_target) {
drop(self.background_worker_target.send(Some(closest_target)));
let commit_target = now + duration_until_commit;
let closest_target = key_expiration_target.min(commit_target);
if *self.background_worker_target.borrow()
!= BackgroundWorkerProcessTarget::Timestamp(closest_target)
{
drop(
self.background_worker_target
.send(BackgroundWorkerProcessTarget::Timestamp(closest_target)),
);
}
}
}

Expand Down Expand Up @@ -729,13 +744,14 @@ impl KeyValueState {

pub async fn background_worker(
key_value_state: Arc<Mutex<KeyValueState>>,
mut timestamp_receiver: watch::Receiver<Option<Timestamp>>,
mut timestamp_receiver: watch::Receiver<BackgroundWorkerProcessTarget>,
) -> Result<(), Error> {
loop {
let mut perform_operations = false;
let current_timestamp = *timestamp_receiver.borrow_and_update();
let changed_result = match current_timestamp {
Some(target) => {
BackgroundWorkerProcessTarget::Never => timestamp_receiver.changed().await,
BackgroundWorkerProcessTarget::Timestamp(target) => {
let remaining = target - Timestamp::now();
if let Some(remaining) = remaining {
tokio::select! {
Expand All @@ -750,7 +766,10 @@ pub async fn background_worker(
Ok(())
}
}
None => timestamp_receiver.changed().await,
BackgroundWorkerProcessTarget::Now => {
perform_operations = true;
Ok(())
}
};

if changed_result.is_err() {
Expand All @@ -771,6 +790,13 @@ pub async fn background_worker(
Ok(())
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum BackgroundWorkerProcessTarget {
Now,
Timestamp(Timestamp),
Never,
}

#[derive(Debug)]
pub struct ExpirationLoader {
pub database: Database,
Expand Down

0 comments on commit 874dbe8

Please sign in to comment.