From b57ffb55115c935d52f0a322bfa73f6e22f72e12 Mon Sep 17 00:00:00 2001 From: luofucong Date: Wed, 13 Mar 2024 17:19:53 +0800 Subject: [PATCH] fix: correctly generate sequences when the value is pre-existed --- src/common/meta/src/sequence.rs | 116 ++++++++++++++++++++++++++++---- 1 file changed, 102 insertions(+), 14 deletions(-) diff --git a/src/common/meta/src/sequence.rs b/src/common/meta/src/sequence.rs index d297ab30bd3d..9e2469332a26 100644 --- a/src/common/meta/src/sequence.rs +++ b/src/common/meta/src/sequence.rs @@ -34,10 +34,14 @@ pub struct SequenceBuilder { max: u64, } +fn seq_name(name: impl AsRef) -> String { + format!("{}-{}", SEQ_PREFIX, name.as_ref()) +} + impl SequenceBuilder { pub fn new(name: impl AsRef, generator: KvBackendRef) -> Self { Self { - name: format!("{}-{}", SEQ_PREFIX, name.as_ref()), + name: seq_name(name), initial: 0, step: 1, generator, @@ -138,13 +142,14 @@ impl Inner { pub async fn next_range(&self) -> Result> { let key = self.name.as_bytes(); let mut start = self.next; - for _ in 0..self.force_quit { - let expect = if start == self.initial { - vec![] - } else { - u64::to_le_bytes(start).to_vec() - }; + let mut expect = if start == self.initial { + vec![] + } else { + u64::to_le_bytes(start).to_vec() + }; + + for _ in 0..self.force_quit { let step = self.step.min(self.max - start); ensure!( @@ -167,15 +172,24 @@ impl Inner { if !res.success { if let Some(kv) = res.prev_kv { - let value = kv.value; - ensure!( - value.len() == std::mem::size_of::(), - error::UnexpectedSequenceValueSnafu { - err_msg: format!("key={}, unexpected value={:?}", self.name, value) + expect = kv.value.clone(); + + let v: [u8; 8] = match kv.value.try_into() { + Ok(a) => a, + Err(v) => { + return error::UnexpectedSequenceValueSnafu { + err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name), + } + .fail() } - ); - start = u64::from_le_bytes(value.try_into().unwrap()); + }; + let v = u64::from_le_bytes(v); + + // If the existed value is smaller than the initial, we should start from the initial. + start = v.max(self.initial); } else { + expect = vec![]; + start = self.initial; } continue; @@ -197,8 +211,12 @@ impl Inner { #[cfg(test)] mod tests { use std::any::Any; + use std::collections::HashSet; use std::sync::Arc; + use itertools::{Itertools, MinMaxResult}; + use tokio::sync::mpsc; + use super::*; use crate::error::Error; use crate::kv_backend::memory::MemoryKvBackend; @@ -209,6 +227,76 @@ mod tests { DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; + #[tokio::test] + async fn test_sequence_with_existed_value() { + async fn test(exist: u64, expected: Vec) { + let kv_backend = Arc::new(MemoryKvBackend::default()); + + let exist = u64::to_le_bytes(exist); + kv_backend + .put(PutRequest::new().with_key(seq_name("s")).with_value(exist)) + .await + .unwrap(); + + let initial = 100; + let seq = SequenceBuilder::new("s", kv_backend) + .initial(initial) + .build(); + + let mut actual = Vec::with_capacity(expected.len()); + for _ in 0..expected.len() { + actual.push(seq.next().await.unwrap()); + } + assert_eq!(actual, expected); + } + + // put a value not greater than the "initial", the sequence should start from "initial" + test(1, vec![100, 101, 102]).await; + test(100, vec![100, 101, 102]).await; + + // put a value greater than the "initial", the sequence should start from the put value + test(200, vec![200, 201, 202]).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_sequence_with_contention() { + let seq = Arc::new( + SequenceBuilder::new("s", Arc::new(MemoryKvBackend::default())) + .initial(1024) + .build(), + ); + + let (tx, mut rx) = mpsc::unbounded_channel(); + // Spawn 10 tasks to concurrently get the next sequence. Each task will get 100 sequences. + for _ in 0..10 { + tokio::spawn({ + let seq = seq.clone(); + let tx = tx.clone(); + async move { + for _ in 0..100 { + tx.send(seq.next().await.unwrap()).unwrap() + } + } + }); + } + + // Test that we get 1000 unique sequences, and start from 1024 to 2023. + let mut nums = HashSet::new(); + let mut c = 0; + while c < 1000 + && let Some(x) = rx.recv().await + { + nums.insert(x); + c += 1; + } + assert_eq!(nums.len(), 1000); + let MinMaxResult::MinMax(min, max) = nums.iter().minmax() else { + unreachable!("nums has more than one elements"); + }; + assert_eq!(*min, 1024); + assert_eq!(*max, 2023); + } + #[tokio::test] async fn test_sequence() { let kv_backend = Arc::new(MemoryKvBackend::default());