Skip to content

Commit

Permalink
can enforce a timeout when reserving a consumer or a producer
Browse files Browse the repository at this point in the history
  • Loading branch information
campeis committed Oct 8, 2023
1 parent e096bd0 commit a75ee1a
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 0 deletions.
27 changes: 27 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::cell::UnsafeCell;
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Duration;

use tracking_cursor::{ReservedForCursor, TrackingCursor};

Expand Down Expand Up @@ -145,6 +146,19 @@ where
Ok(ProduceGuard::new(self, reservation))
}

/// Returns a ProduceGuard that could be used to push a new value in the queue. If the queue is full it will wait for a free spot
/// until timeout expires. If no spot frees before timeout expires will return ReservationErr::NoAvailableSlot.
/// The timeout is not an hard timeout, but just represents the minimum time this function is going to wait.
pub fn try_reserve_produce_with_timeout(
&self,
timeout: Duration,
) -> Result<ProduceGuard<T>, ReservationErr> {
let reservation = self
.produce_tracker
.try_advance_cursor_with_timeout(timeout)?;
Ok(ProduceGuard::new(self, reservation))
}

/// Returns the number of available entities in the producer
pub fn size(&self) -> usize {
self.buffer.len()
Expand Down Expand Up @@ -191,6 +205,19 @@ where
let reservation = self.consume_tracker.try_advance_cursor()?;
Ok(ConsumeGuard::new(self, reservation))
}

/// Returns ConsumeGuard that could be used to pull a new value from the queue. It will wait for an available entity until timeout expires.
/// If the queue is empty when timeout expires will return ReservationErr::NoAvailableSlot.
/// /// The timeout is not an hard timeout, but just represents the minimum time this function is going to wait.
pub fn try_reserve_consume_with_timeout(
&self,
timeout: Duration,
) -> Result<ConsumeGuard<T>, ReservationErr> {
let reservation = self
.consume_tracker
.try_advance_cursor_with_timeout(timeout)?;
Ok(ConsumeGuard::new(self, reservation))
}
}

/// A guard that will keep the reservation valid until the guard is in scope.
Expand Down
20 changes: 20 additions & 0 deletions src/tracking_cursor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crossbeam_utils::CachePadded;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};

type AvailabilityStrategy<T> = dyn Fn(&T, usize, usize) -> bool;

Expand Down Expand Up @@ -97,6 +98,25 @@ impl TrackingCursor {
}
}

pub(crate) fn try_advance_cursor_with_timeout(
&self,
timeout: Duration,
) -> Result<ReservedForCursor, ReservationErr> {
let start_time = Instant::now();
loop {
match self.try_advance_cursor() {
Ok(reserved) => return Ok(reserved),
Err(err) => match err {
ReservationErr::NoAvailableSlot => {
if start_time.elapsed() > timeout {
return Err(ReservationErr::NoAvailableSlot);
}
}
},
}
}
}

pub(crate) fn advance_target(&self, reserved: &ReservedForCursor) {
loop {
if self
Expand Down
20 changes: 20 additions & 0 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use ring_buffer::RingBuffer;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

#[derive(Debug, Default, Clone)]
struct TestStruct {
Expand Down Expand Up @@ -72,6 +73,18 @@ fn cant_reserve_for_production_if_all_slots_are_full() {
assert!(res.is_err())
}

#[test]
fn cant_reserve_for_production_if_all_slots_are_full_after_timeut() {
let (producer, _) = RingBuffer::create::<TestStruct>(1024);

for i in 0..producer.size() {
producer.reserve_produce().value = i;
}

let res = producer.try_reserve_produce_with_timeout(Duration::from_millis(1));
assert!(res.is_err())
}

#[test]
fn can_reserve_for_consuming_if_there_is_an_available_slot() {
let (producer, consumer) = RingBuffer::create::<TestStruct>(1024);
Expand All @@ -90,6 +103,13 @@ fn cant_reserve_for_consuming_if_all_slots_are_empty() {
assert!(res.is_err())
}

#[test]
fn cant_reserve_for_consuming_if_all_slots_are_empty_before_timeout_expires() {
let (_, consumer) = RingBuffer::create::<TestStruct>(1024);
let res = consumer.try_reserve_consume_with_timeout(Duration::from_millis(1));
assert!(res.is_err())
}

#[test]
fn cant_reserve_for_consuming_if_intermediate_stage_did_not_consume() {
let (producer, _, consumer) = RingBuffer::create_with_intermediate_stage::<TestStruct>(1024);
Expand Down

0 comments on commit a75ee1a

Please sign in to comment.