Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: correctly generate sequences when the value is pre-existed #3502

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 102 additions & 14 deletions src/common/meta/src/sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@ pub struct SequenceBuilder {
max: u64,
}

fn seq_name(name: impl AsRef<str>) -> String {
format!("{}-{}", SEQ_PREFIX, name.as_ref())
}

impl SequenceBuilder {
pub fn new(name: impl AsRef<str>, generator: KvBackendRef) -> Self {
Self {
name: format!("{}-{}", SEQ_PREFIX, name.as_ref()),
name: seq_name(name),
initial: 0,
step: 1,
generator,
Expand Down Expand Up @@ -138,13 +142,14 @@ impl Inner {
pub async fn next_range(&self) -> Result<Range<u64>> {
let key = self.name.as_bytes();
let mut start = self.next;
for _ in 0..self.force_quit {
let expect = if start == self.initial {
vec![]
} else {
u64::to_le_bytes(start).to_vec()
};

let mut expect = if start == self.initial {
vec![]
} else {
u64::to_le_bytes(start).to_vec()
};

for _ in 0..self.force_quit {
let step = self.step.min(self.max - start);

ensure!(
Expand All @@ -167,15 +172,24 @@ impl Inner {

if !res.success {
if let Some(kv) = res.prev_kv {
let value = kv.value;
ensure!(
value.len() == std::mem::size_of::<u64>(),
error::UnexpectedSequenceValueSnafu {
err_msg: format!("key={}, unexpected value={:?}", self.name, value)
expect = kv.value.clone();

let v: [u8; 8] = match kv.value.try_into() {
Ok(a) => a,
Err(v) => {
return error::UnexpectedSequenceValueSnafu {
err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name),
}
.fail()
}
);
start = u64::from_le_bytes(value.try_into().unwrap());
};
let v = u64::from_le_bytes(v);

// If the existed value is smaller than the initial, we should start from the initial.
start = v.max(self.initial);
} else {
expect = vec![];

start = self.initial;
}
continue;
Expand All @@ -197,8 +211,12 @@ impl Inner {
#[cfg(test)]
mod tests {
use std::any::Any;
use std::collections::HashSet;
use std::sync::Arc;

use itertools::{Itertools, MinMaxResult};
use tokio::sync::mpsc;

use super::*;
use crate::error::Error;
use crate::kv_backend::memory::MemoryKvBackend;
Expand All @@ -209,6 +227,76 @@ mod tests {
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};

#[tokio::test]
async fn test_sequence_with_existed_value() {
async fn test(exist: u64, expected: Vec<u64>) {
let kv_backend = Arc::new(MemoryKvBackend::default());

let exist = u64::to_le_bytes(exist);
kv_backend
.put(PutRequest::new().with_key(seq_name("s")).with_value(exist))
.await
.unwrap();

let initial = 100;
let seq = SequenceBuilder::new("s", kv_backend)
.initial(initial)
.build();

let mut actual = Vec::with_capacity(expected.len());
for _ in 0..expected.len() {
actual.push(seq.next().await.unwrap());
}
assert_eq!(actual, expected);
}

// put a value not greater than the "initial", the sequence should start from "initial"
test(1, vec![100, 101, 102]).await;
test(100, vec![100, 101, 102]).await;

// put a value greater than the "initial", the sequence should start from the put value
test(200, vec![200, 201, 202]).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_sequence_with_contention() {
let seq = Arc::new(
SequenceBuilder::new("s", Arc::new(MemoryKvBackend::default()))
.initial(1024)
.build(),
);

let (tx, mut rx) = mpsc::unbounded_channel();
// Spawn 10 tasks to concurrently get the next sequence. Each task will get 100 sequences.
for _ in 0..10 {
tokio::spawn({
let seq = seq.clone();
let tx = tx.clone();
async move {
for _ in 0..100 {
tx.send(seq.next().await.unwrap()).unwrap()
}
}
});
}

// Test that we get 1000 unique sequences, and start from 1024 to 2023.
let mut nums = HashSet::new();
let mut c = 0;
while c < 1000
&& let Some(x) = rx.recv().await
{
nums.insert(x);
c += 1;
}
assert_eq!(nums.len(), 1000);
let MinMaxResult::MinMax(min, max) = nums.iter().minmax() else {
unreachable!("nums has more than one elements");
};
assert_eq!(*min, 1024);
assert_eq!(*max, 2023);
}

#[tokio::test]
async fn test_sequence() {
let kv_backend = Arc::new(MemoryKvBackend::default());
Expand Down