Skip to content

Commit

Permalink
fix(ext/kv): send queue wake messages accross different kv instances (#…
Browse files Browse the repository at this point in the history
…20465)

fixes #20454

Current KV queues implementation assumes that `enqueue` and
`listenQueue` are called on the same instance of `Deno.Kv`. It's
possible that the same Deno process opens multiple KV instances pointing
to the same fs path, and in that case `listenQueue` should still get
notified of messages enqueued through a different KV instance.
  • Loading branch information
Igor Zinkovsky authored Sep 29, 2023
1 parent 5edd102 commit 61b91e1
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 34 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions cli/tests/unit/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1819,6 +1819,40 @@ Deno.test({
},
});

Deno.test({
name: "different kv instances for enqueue and queueListen",
async fn() {
const filename = await Deno.makeTempFile({ prefix: "queue_db" });
try {
const db0 = await Deno.openKv(filename);
const db1 = await Deno.openKv(filename);
const promise = deferred();
let dequeuedMessage: unknown = null;
const listener = db0.listenQueue((msg) => {
dequeuedMessage = msg;
promise.resolve();
});
try {
const res = await db1.enqueue("test");
assert(res.ok);
assertNotEquals(res.versionstamp, null);
await promise;
assertEquals(dequeuedMessage, "test");
} finally {
db0.close();
await listener;
db1.close();
}
} finally {
try {
await Deno.remove(filename);
} catch {
// pass
}
}
},
});

Deno.test({
name: "queue graceful close",
async fn() {
Expand Down
1 change: 1 addition & 0 deletions ext/kv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ async-trait.workspace = true
base64.workspace = true
chrono.workspace = true
deno_core.workspace = true
deno_node.workspace = true
deno_unsync = "0.1.1"
hex.workspace = true
log.workspace = true
Expand Down
163 changes: 129 additions & 34 deletions ext/kv/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::HashMap;
use std::env::current_dir;
use std::future::Future;
use std::io::ErrorKind;
use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
Expand All @@ -23,11 +26,14 @@ use deno_core::unsync::spawn;
use deno_core::unsync::spawn_blocking;
use deno_core::AsyncRefCell;
use deno_core::OpState;
use deno_node::PathClean;
use rand::Rng;
use rusqlite::params;
use rusqlite::OpenFlags;
use rusqlite::OptionalExtension;
use rusqlite::Transaction;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::sync::OnceCell;
Expand Down Expand Up @@ -212,30 +218,35 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
}
}

let conn = sqlite_retry_loop(|| {
let (conn, queue_waker_key) = sqlite_retry_loop(|| {
let path = path.clone();
let default_storage_dir = self.default_storage_dir.clone();
async move {
spawn_blocking(move || {
let conn = match (path.as_deref(), &default_storage_dir) {
(Some(":memory:"), _) | (None, None) => {
rusqlite::Connection::open_in_memory()?
}
(Some(path), _) => {
let flags =
OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
rusqlite::Connection::open_with_flags(path, flags)?
}
(None, Some(path)) => {
std::fs::create_dir_all(path)?;
let path = path.join("kv.sqlite3");
rusqlite::Connection::open(path)?
}
};
let (conn, queue_waker_key) =
match (path.as_deref(), &default_storage_dir) {
(Some(":memory:"), _) | (None, None) => {
(rusqlite::Connection::open_in_memory()?, None)
}
(Some(path), _) => {
let flags =
OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
let resolved_path = canonicalize_path(&PathBuf::from(path))?;
(
rusqlite::Connection::open_with_flags(path, flags)?,
Some(resolved_path),
)
}
(None, Some(path)) => {
std::fs::create_dir_all(path)?;
let path = path.join("kv.sqlite3");
(rusqlite::Connection::open(path.clone())?, Some(path))
}
};

conn.pragma_update(None, "journal_mode", "wal")?;

Ok::<_, AnyError>(conn)
Ok::<_, AnyError>((conn, queue_waker_key))
})
.await
.unwrap()
Expand Down Expand Up @@ -277,6 +288,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
Ok(SqliteDb {
conn,
queue: OnceCell::new(),
queue_waker_key,
expiration_watcher,
})
}
Expand All @@ -285,6 +297,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
pub struct SqliteDb {
conn: ProtectedConn,
queue: OnceCell<SqliteQueue>,
queue_waker_key: Option<PathBuf>,
expiration_watcher: deno_core::unsync::JoinHandle<()>,
}

Expand Down Expand Up @@ -363,7 +376,7 @@ pub struct DequeuedMessage {
conn: WeakProtectedConn,
id: String,
payload: Option<Vec<u8>>,
waker_tx: mpsc::Sender<()>,
waker_tx: broadcast::Sender<()>,
_permit: OwnedSemaphorePermit,
}

Expand Down Expand Up @@ -403,7 +416,7 @@ impl QueueMessageHandle for DequeuedMessage {
};
if requeued {
// If the message was requeued, wake up the dequeue loop.
self.waker_tx.send(()).await?;
let _ = self.waker_tx.send(());
}
Ok(())
}
Expand All @@ -422,15 +435,18 @@ struct SqliteQueue {
conn: ProtectedConn,
dequeue_rx: Rc<AsyncRefCell<DequeueReceiver>>,
concurrency_limiter: Arc<Semaphore>,
waker_tx: mpsc::Sender<()>,
waker_tx: broadcast::Sender<()>,
shutdown_tx: watch::Sender<()>,
}

impl SqliteQueue {
fn new(conn: ProtectedConn) -> Self {
fn new(
conn: ProtectedConn,
waker_tx: broadcast::Sender<()>,
waker_rx: broadcast::Receiver<()>,
) -> Self {
let conn_clone = conn.clone();
let (shutdown_tx, shutdown_rx) = watch::channel::<()>(());
let (waker_tx, waker_rx) = mpsc::channel::<()>(1);
let (dequeue_tx, dequeue_rx) = mpsc::channel::<(Vec<u8>, String)>(64);

spawn(async move {
Expand Down Expand Up @@ -486,11 +502,6 @@ impl SqliteQueue {
}))
}

async fn wake(&self) -> Result<(), AnyError> {
self.waker_tx.send(()).await?;
Ok(())
}

fn shutdown(&self) {
let _ = self.shutdown_tx.send(());
}
Expand All @@ -499,7 +510,7 @@ impl SqliteQueue {
conn: ProtectedConn,
dequeue_tx: mpsc::Sender<(Vec<u8>, String)>,
mut shutdown_rx: watch::Receiver<()>,
mut waker_rx: mpsc::Receiver<()>,
mut waker_rx: broadcast::Receiver<()>,
) -> Result<(), AnyError> {
loop {
let messages = SqliteDb::run_tx(conn.clone(), move |tx| {
Expand Down Expand Up @@ -575,7 +586,9 @@ impl SqliteQueue {
};
tokio::select! {
_ = sleep_fut => {}
x = waker_rx.recv() => if x.is_none() {return Ok(());},
x = waker_rx.recv() => {
if let Err(RecvError::Closed) = x {return Ok(());}
},
_ = shutdown_rx.changed() => return Ok(())
}
}
Expand Down Expand Up @@ -773,7 +786,7 @@ impl Database for SqliteDb {

async fn atomic_write(
&self,
_state: Rc<RefCell<OpState>>,
state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
let write = Arc::new(write);
Expand Down Expand Up @@ -892,20 +905,39 @@ impl Database for SqliteDb {
.await?;

if has_enqueues {
if let Some(queue) = self.queue.get() {
queue.wake().await?;
match self.queue.get() {
Some(queue) => {
let _ = queue.waker_tx.send(());
}
None => {
if let Some(waker_key) = &self.queue_waker_key {
let (waker_tx, _) =
shared_queue_waker_channel(waker_key, state.clone());
let _ = waker_tx.send(());
}
}
}
}
Ok(commit_result)
}

async fn dequeue_next_message(
&self,
_state: Rc<RefCell<OpState>>,
state: Rc<RefCell<OpState>>,
) -> Result<Option<Self::QMH>, AnyError> {
let queue = self
.queue
.get_or_init(|| async move { SqliteQueue::new(self.conn.clone()) })
.get_or_init(|| async move {
let (waker_tx, waker_rx) = {
match &self.queue_waker_key {
Some(waker_key) => {
shared_queue_waker_channel(waker_key, state.clone())
}
None => broadcast::channel(1),
}
};
SqliteQueue::new(self.conn.clone(), waker_tx, waker_rx)
})
.await;
let handle = queue.dequeue().await?;
Ok(handle)
Expand Down Expand Up @@ -1012,6 +1044,69 @@ fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) {
}
}

pub struct QueueWaker {
wakers_tx: HashMap<PathBuf, broadcast::Sender<()>>,
}

fn shared_queue_waker_channel(
waker_key: &Path,
state: Rc<RefCell<OpState>>,
) -> (broadcast::Sender<()>, broadcast::Receiver<()>) {
let mut state = state.borrow_mut();
let waker = {
let waker = state.try_borrow_mut::<QueueWaker>();
match waker {
Some(waker) => waker,
None => {
let waker = QueueWaker {
wakers_tx: HashMap::new(),
};
state.put::<QueueWaker>(waker);
state.borrow_mut::<QueueWaker>()
}
}
};

let waker_tx = waker
.wakers_tx
.entry(waker_key.to_path_buf())
.or_insert_with(|| {
let (waker_tx, _) = broadcast::channel(1);
waker_tx
});

(waker_tx.clone(), waker_tx.subscribe())
}

/// Same as Path::canonicalize, but also handles non-existing paths.
fn canonicalize_path(path: &Path) -> Result<PathBuf, AnyError> {
let path = path.to_path_buf().clean();
let mut path = path;
let mut names_stack = Vec::new();
loop {
match path.canonicalize() {
Ok(mut canonicalized_path) => {
for name in names_stack.into_iter().rev() {
canonicalized_path = canonicalized_path.join(name);
}
return Ok(canonicalized_path);
}
Err(err) if err.kind() == ErrorKind::NotFound => {
let file_name = path.file_name().map(|os_str| os_str.to_os_string());
if let Some(file_name) = file_name {
names_stack.push(file_name.to_str().unwrap().to_string());
path = path.parent().unwrap().to_path_buf();
} else {
names_stack.push(path.to_str().unwrap().to_string());
let current_dir = current_dir()?;
path = current_dir.clone();
}
}
Err(err) => return Err(err.into()),
}
}
}

fn is_conn_closed_error(e: &AnyError) -> bool {
get_custom_error_class(e) == Some("TypeError")
&& e.to_string() == ERROR_USING_CLOSED_DATABASE
Expand Down

0 comments on commit 61b91e1

Please sign in to comment.