Skip to content

Commit

Permalink
Send UNWATCH when recycling redis connection
Browse files Browse the repository at this point in the history
This should allow safe recycling of connection after starting transaction with WATCH, but before EXEC/DISCARD/UNWATCH
This closes #258
  • Loading branch information
mcheshkov authored and bikeshedder committed Sep 6, 2023
1 parent 3e1778b commit 8f666b9
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 2 deletions.
8 changes: 6 additions & 2 deletions redis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,13 @@ impl managed::Manager for Manager {

async fn recycle(&self, conn: &mut RedisConnection, _: &Metrics) -> RecycleResult {
let ping_number = self.ping_number.fetch_add(1, Ordering::Relaxed).to_string();
let n = redis::cmd("PING")
// Using pipeline to avoid roundtrip for UNWATCH
let (n,) = redis::Pipeline::with_capacity(2)
.cmd("UNWATCH")
.ignore()
.cmd("PING")
.arg(&ping_number)
.query_async::<_, String>(conn)
.query_async::<_, (String,)>(conn)
.await?;
if n == ping_number {
Ok(())
Expand Down
77 changes: 77 additions & 0 deletions redis/tests/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,80 @@ async fn test_recycled() {
);
}
}

#[tokio::test]
async fn test_recycled_with_watch() {
use deadpool_redis::redis::{pipe, Value};

let pool = create_pool();

const WATCHED_KEY: &str = "deadpool/watched_test_key";
const TXN_KEY: &str = "deadpool/txn_test_key";

// Start transaction on one key and return connection to pool
let client_with_watch_id = {
let mut conn = pool.get().await.unwrap();

let client_id = cmd("CLIENT")
.arg("ID")
.query_async::<_, i64>(&mut conn)
.await
.unwrap();

cmd("WATCH")
.arg(WATCHED_KEY)
.query_async::<_, ()>(&mut conn)
.await
.unwrap();

client_id
};

{
let mut txn_conn = pool.get().await.unwrap();

let new_client_id = cmd("CLIENT")
.arg("ID")
.query_async::<_, i64>(&mut txn_conn)
.await
.unwrap();

// Ensure that's the same connection as the one in first transaction
assert_eq!(
client_with_watch_id, new_client_id,
"the redis connection with transaction was not recycled"
);

// Start transaction on another key
cmd("WATCH")
.arg(TXN_KEY)
.query_async::<_, ()>(&mut txn_conn)
.await
.unwrap();

{
let mut writer_conn = pool.get().await.unwrap();

// Overwrite key from first transaction from another connection
cmd("SET")
.arg(WATCHED_KEY)
.arg("v")
.query_async::<_, ()>(&mut writer_conn)
.await
.unwrap();
}

// Expect that new transaction is not aborted by irrelevant key
let txn_result = pipe()
.atomic()
.set(TXN_KEY, "foo")
.query_async::<_, Value>(&mut txn_conn)
.await
.unwrap();
assert_eq!(
txn_result,
Value::Bulk(vec![Value::Okay]),
"redis transaction in recycled connection aborted",
);
}
}

0 comments on commit 8f666b9

Please sign in to comment.