Skip to content

Commit

Permalink
Merge branch 'main' into wff/sqlx
Browse files Browse the repository at this point in the history
  • Loading branch information
wangrunji0408 authored Apr 9, 2024
2 parents 583afa4 + ff89182 commit 34db9b8
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 16 deletions.
24 changes: 24 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,30 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## tokio [0.2.25] - 2024-04-08

### Removed

- Remove `stats` feature to allow tokio >=1.33.

## madsim [0.2.27] - 2024-04-07

### Fixed

- Fix the problem that `getrandom` returns different values in multiple runs with the same seed.

## rdkafka [0.3.4] - 2024-03-22

### Fixed

- Fix unintended drop of client in `fetch_watermarks`.

## madsim [0.2.26] - 2024-03-18

### Fixed

- `sleep` and `sleep_until` now sleep for at least 1ms to be consistent with tokio's behavior.

## rdkafka [0.3.3] - 2024-02-28

### Changed
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ members = [
"tonic-example",
]
resolver = "2"

[patch.crates-io]
getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "6c9d9e9" }
2 changes: 1 addition & 1 deletion madsim-rdkafka/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "madsim-rdkafka"
version = "0.3.3+0.34.0"
version = "0.3.4+0.34.0"
edition = "2021"
authors = ["Runji Wang <wangrunji0408@163.com>"]
description = "The rdkafka simulator on madsim."
Expand Down
14 changes: 10 additions & 4 deletions madsim-rdkafka/src/std/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,17 +360,23 @@ impl<C: ClientContext> Client<C> {
partition: i32,
timeout: T,
) -> KafkaResult<(i64, i64)> {
// XXX: to move the raw pointer into spawn_blocking
struct NativePtr(*mut RDKafka);
unsafe impl Send for NativePtr {}

let topic_c = CString::new(topic.to_string())?;
let native_client = unsafe { NativeClient::from_ptr(self.native_ptr()) };
let native_ptr = NativePtr(self.native_ptr());

tokio::task::spawn_blocking(move || unsafe {
let mut low = -1;
let mut high = -1;
let native_ptr = native_ptr;
let ret = rdsys::rd_kafka_query_watermark_offsets(
native_client.ptr(),
native_ptr.0,
topic_c.as_ptr(),
partition,
&mut low as *mut i64,
&mut high as *mut i64,
&mut low,
&mut high,
timeout.into().as_millis(),
);
if ret.is_error() {
Expand Down
1 change: 1 addition & 0 deletions madsim-rdkafka/src/std/mocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::ClientContext;
///
/// In this case, we **must neither** destroy the mock cluster in `MockCluster`'s `drop()`,
/// **nor** outlive the `Client` from which the reference is obtained, hence the lifetime.
#[allow(dead_code)]
enum MockClusterClient<'c, C: ClientContext> {
Owned(Client<C>),
Borrowed(&'c Client<C>),
Expand Down
3 changes: 1 addition & 2 deletions madsim-tokio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "madsim-tokio"
version = "0.2.24"
version = "0.2.25"
edition = "2021"
authors = ["Runji Wang <wangrunji0408@163.com>"]
description = "The `tokio` simulator on madsim."
Expand Down Expand Up @@ -43,7 +43,6 @@ signal = ["tokio/signal"]
sync = ["tokio/sync"]
test-util = ["tokio/test-util"]
time = ["tokio/time"]
stats = ["tokio/stats"]
tracing = ["tokio/tracing"]

# tokio optional dependencies
Expand Down
1 change: 1 addition & 0 deletions madsim-tokio/src/sim/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub struct Runtime {
abort_handles: Mutex<Vec<AbortHandle>>,
}

#[allow(dead_code)]
pub struct EnterGuard<'a>(&'a Runtime);

impl Runtime {
Expand Down
5 changes: 3 additions & 2 deletions madsim/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "madsim"
version = "0.2.25"
version = "0.2.27"
edition = "2021"
authors = ["Runji Wang <wangrunji0408@163.com>"]
description = "Deterministic Simulator for distributed systems."
Expand Down Expand Up @@ -62,7 +62,8 @@ tokio-util = { version = "0.7", features = ["codec"] }
[dev-dependencies]
criterion = "0.5"
structopt = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "io-util"] }
getrandom = "=0.2.13"

[[bench]]
name = "rpc"
Expand Down
37 changes: 33 additions & 4 deletions madsim/src/sim/rand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ fn init_std_random_state(seed: u64) -> bool {
}

thread_local! {
static SEED: Cell<Option<u64>> = Cell::new(None);
static SEED: Cell<Option<u64>> = const { Cell::new(None) };
}

/// Obtain a series of random bytes.
Expand Down Expand Up @@ -208,8 +208,11 @@ unsafe extern "C" fn getrandom(mut buf: *mut u8, mut buflen: usize, _flags: u32)
buf = buf.add(std::mem::size_of::<u64>());
buflen -= std::mem::size_of::<u64>();
}
let val = rand.with(|rng| rng.gen::<u64>().to_ne_bytes());
core::ptr::copy(val.as_ptr(), buf, buflen);
// note: do not modify state if buflen == 0
if buflen != 0 {
let val = rand.with(|rng| rng.gen::<u64>().to_ne_bytes());
core::ptr::copy(val.as_ptr(), buf, buflen);
}
return len as _;
}
#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -265,7 +268,7 @@ unsafe extern "C" fn getentropy(buf: *mut u8, buflen: usize) -> i32 {
#[cfg(test)]
mod tests {
use crate::runtime::Runtime;
use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap, HashSet};

#[test]
#[cfg_attr(target_os = "linux", ignore)]
Expand Down Expand Up @@ -305,4 +308,30 @@ mod tests {
}
assert_eq!(seqs.len(), 3, "hashmap is not deterministic");
}

// https://github.com/madsim-rs/madsim/issues/201
#[test]
fn getrandom_should_be_deterministic() {
let rnd_fn = || async {
let mut dst = [0];
getrandom::getrandom(&mut dst).unwrap();
dst
};
let builder = crate::runtime::Builder::from_env();
let seed = builder.seed;
let set = (0..10)
.map(|_| {
crate::runtime::Builder {
seed,
count: 1,
jobs: 1,
config: crate::Config::default(),
time_limit: None,
check: false,
}
.run(rnd_fn)
})
.collect::<HashSet<_>>();
assert_eq!(set.len(), 1);
}
}
4 changes: 2 additions & 2 deletions madsim/src/sim/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::{
use std::{cell::RefCell, sync::Arc};

thread_local! {
static CONTEXT: RefCell<Option<Handle>> = RefCell::new(None);
static TASK: RefCell<Option<Arc<TaskInfo>>> = RefCell::new(None);
static CONTEXT: RefCell<Option<Handle>> = const { RefCell::new(None) };
static TASK: RefCell<Option<Arc<TaskInfo>>> = const { RefCell::new(None) };
}

pub(crate) fn current<T>(map: impl FnOnce(&Handle) -> T) -> T {
Expand Down
16 changes: 15 additions & 1 deletion madsim/src/sim/time/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,20 @@ impl TimeHandle {
}

/// Waits until `duration` has elapsed.
///
/// It will sleep for at least 1ms to be consistent with the behavior of `tokio::time::sleep`.
pub fn sleep(&self, duration: Duration) -> Sleep {
self.sleep_until(self.clock.now_instant() + duration)
}

/// Waits until `deadline` is reached.
///
/// It will sleep for at least 1ms to be consistent with the behavior of `tokio::time::sleep_until`.
pub fn sleep_until(&self, deadline: Instant) -> Sleep {
let min_deadline = self.clock.now_instant() + Duration::from_millis(1);
Sleep {
handle: self.clone(),
deadline,
deadline: deadline.max(min_deadline),
}
}

Expand Down Expand Up @@ -233,6 +238,15 @@ mod tests {
fn time() {
let runtime = Runtime::new();
runtime.block_on(async {
// sleep for at least 1ms
let t0 = Instant::now();
sleep(Duration::default()).await;
assert!(t0.elapsed() >= Duration::from_millis(1));

let t0 = Instant::now();
sleep_until(t0).await;
assert!(t0.elapsed() >= Duration::from_millis(1));

let t0 = Instant::now();

sleep(Duration::from_secs(1)).await;
Expand Down

0 comments on commit 34db9b8

Please sign in to comment.