
feat: sync and drain initial implementation #8

Draft · wants to merge 1 commit into base: main
141 changes: 139 additions & 2 deletions contactor/src/broadcast.rs
@@ -14,15 +14,19 @@ use yrs_warp::{
    AwarenessRef,
};

use crate::{DrainingState, RoomState, SyncingState};

/// The default capacity for the broadcast buffer.
const DEFAULT_BUFFER_CAPACITY: usize = 16;

/// Represents a chat room with a broadcast group and listener count.
-struct Room {
+pub struct Room {
    /// The broadcast group for the room.
    bcast: Arc<BroadcastGroup>,
    /// The number of listeners connected to the room.
    listeners: AtomicUsize,
    /// The current state of the room.
    pub(crate) state: Arc<Mutex<RoomState>>,
}

/// Errors that can occur when managing broadcasts.
@@ -48,7 +52,7 @@ pub enum BroadcastManagerError {
/// Manages broadcasting messages to subscribers across multiple rooms.
pub struct BroadcastManager {
    /// A thread-safe map of room names to their corresponding `Room` instances.
-    rooms: DashMap<String, Room>,
+    pub(crate) rooms: DashMap<String, Room>,
    /// A mutex-protected map for room shutdown signals.
    room_shutdown_signals: Mutex<HashMap<String, watch::Sender<()>>>,
}
@@ -63,6 +67,9 @@ impl Debug for BroadcastManager {
}

impl BroadcastManager {
    /// Maximum number of retries for the initial load during syncing.
    const MAX_SYNC_RETRIES: u32 = 3;
    /// Maximum number of retries for the final store during draining.
    const MAX_DRAIN_RETRIES: u32 = 3;

    /// Creates a new `BroadcastManager`.
    pub fn new() -> Self {
        Self {
@@ -95,6 +102,10 @@ impl BroadcastManager {
            .map(|room| room.listeners.load(Ordering::Relaxed))
    }

    /// Returns a reference to the named room's entry, if it exists.
    pub fn get_room(&self, room_name: &str) -> Option<dashmap::mapref::one::Ref<String, Room>> {
        self.rooms.get(room_name)
    }

    /// Subscribes a client to a room's broadcast group.
    ///
    /// # Type Parameters
@@ -211,8 +222,13 @@
        let room = Room {
            bcast: Arc::new(BroadcastGroup::new(awareness, DEFAULT_BUFFER_CAPACITY).await),
            listeners: AtomicUsize::new(0),
            state: Arc::new(Mutex::new(RoomState::Down)),
        };
        self.rooms.insert(room_name.to_string(), room);

        // Start syncing process
        self.start_syncing(room_name).await;

        Ok(())
    }

@@ -244,6 +260,127 @@ impl BroadcastManager {
        let mut signals = self.room_shutdown_signals.lock().await;
        signals.remove(room_name)
    }

    /// Spawns a background task that loads the room's data, retrying up to
    /// `MAX_SYNC_RETRIES` times; on success the room transitions to `Up`,
    /// otherwise to `SyncingState::Fail`.
    pub async fn start_syncing(&self, room_name: &str) {
        let room = match self.rooms.get(room_name) {
            Some(room) => room,
            None => return, // Room doesn't exist
        };

        // Clone what the task needs so the DashMap guard is not moved
        // into the spawned future.
        let state = room.state.clone();
        let room_name = room_name.to_string();

        tokio::spawn(async move {
            let mut retry_count = 0;

            loop {
                {
                    let mut state_guard = state.lock().await;
                    *state_guard = RoomState::Syncing(SyncingState::Load(retry_count));
                }

                // Attempt to load data (replace with your actual load logic)
                match attempt_load(&room_name).await {
                    Ok(_) => {
                        let mut state_guard = state.lock().await;
                        *state_guard = RoomState::Syncing(SyncingState::Success);
                        break;
                    }
                    Err(_) if retry_count < Self::MAX_SYNC_RETRIES => {
                        retry_count += 1;
                        {
                            let mut state_guard = state.lock().await;
                            *state_guard = RoomState::Syncing(SyncingState::RetryLoad(retry_count));
                        }
                        // Wait before retrying
                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    }
                    Err(_) => {
                        let mut state_guard = state.lock().await;
                        *state_guard = RoomState::Syncing(SyncingState::Fail);
                        // Handle failure (e.g., log, alert)
                        return;
                    }
                }
            }

            // Transition to UP state
            {
                let mut state_guard = state.lock().await;
                *state_guard = RoomState::Up;
            }
        });
    }

    /// Spawns a background task that stores the room's data, retrying up to
    /// `MAX_DRAIN_RETRIES` times; on success the room transitions to `Down`
    /// and is removed from the manager.
    pub async fn start_draining(self: Arc<Self>, room_name: &str) {
        let room = match self.rooms.get(room_name) {
            Some(room) => room,
            None => return, // Room doesn't exist
        };

        let state = room.state.clone();
        let room_name = room_name.to_string();
        let self_clone = self.clone();
        let max_retries = Self::MAX_DRAIN_RETRIES;
        // Release the DashMap guard before spawning: the task removes this
        // entry, and a live reference into the map could block the removal.
        drop(room);

        tokio::spawn(async move {
            let mut retry_count = 0;

            {
                let mut state_guard = state.lock().await;
                *state_guard = RoomState::Draining(DrainingState::Store(retry_count));
            }

            loop {
                // Attempt to store data (replace with your actual store logic)
                match attempt_store(&room_name).await {
                    Ok(_) => {
                        let mut state_guard = state.lock().await;
                        *state_guard = RoomState::Draining(DrainingState::Success);
                        break;
                    }
                    Err(_) if retry_count < max_retries => {
                        retry_count += 1;
                        {
                            let mut state_guard = state.lock().await;
                            *state_guard =
                                RoomState::Draining(DrainingState::RetryStore(retry_count));
                        }
                        // Wait before retrying
                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    }
                    Err(_) => {
                        let mut state_guard = state.lock().await;
                        *state_guard = RoomState::Draining(DrainingState::Fail);
                        // Handle failure (e.g., log, alert)
                        return;
                    }
                }
            }

            // Transition to DOWN state and remove room
            {
                let mut state_guard = state.lock().await;
                *state_guard = RoomState::Down;
            }

            // Remove room from manager
            self_clone.rooms.remove(&room_name);
        });
    }
}

// Mock function to simulate loading
async fn attempt_load(_room_name: &str) -> Result<(), ()> {
    // Implement your actual data loading logic here
    Ok(())
}

// Mock function to simulate storing
async fn attempt_store(_room_name: &str) -> Result<(), ()> {
    // Implement your actual data storing logic here
    Ok(())
}

#[cfg(test)]
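Review note: `start_syncing` and `start_draining` duplicate the same retry loop. Below is a minimal sketch of a shared helper they could delegate to — not part of this PR; `with_retries` is a hypothetical name, and the fixed one-second delay simply mirrors the current code. The attempt counter is passed to the operation so callers could still record `Load`/`RetryLoad` (or `Store`/`RetryStore`) state transitions.

use std::future::Future;
use std::time::Duration;

/// Hypothetical shared retry loop: runs `op` until it succeeds or
/// `max_retries` additional attempts have failed, sleeping between tries.
async fn with_retries<F, Fut, T, E>(max_retries: u32, mut op: F) -> Result<T, E>
where
    F: FnMut(u32) -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut attempt = 0;
    loop {
        match op(attempt).await {
            Ok(value) => return Ok(value),
            Err(_) if attempt < max_retries => {
                attempt += 1;
                // Fixed 1s delay, matching the current implementation.
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
            Err(err) => return Err(err),
        }
    }
}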
124 changes: 124 additions & 0 deletions contactor/src/common.rs
@@ -0,0 +1,124 @@
use serde::de::{self, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt;

/// Struct for serializing node information.
#[derive(Serialize)]
pub struct NodeInfo {
    pub address: String,
    pub num_rooms: usize,
    pub num_connections: usize,
    pub cpu_usage: f32,
    pub total_memory: u64,
    pub used_memory: u64,
}

/// Struct for serializing and deserializing room information.
#[derive(Serialize, Deserialize)]
pub struct RoomInfo {
    pub address: String,
    pub node_id: String,
    pub participants: Option<usize>,
    pub status: RoomState,
}

/// Lifecycle state of a room: down, syncing up, serving, or draining.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoomState {
    Down,
    Syncing(SyncingState),
    Up,
    Draining(DrainingState),
}

/// Progress of the initial load; the `u32` carries the attempt count.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncingState {
    Load(u32),
    RetryLoad(u32),
    Success,
    Fail,
}

/// Progress of the final store; the `u32` carries the attempt count.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DrainingState {
    Store(u32),
    RetryStore(u32),
    Success,
    Fail,
}

impl Serialize for RoomState {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // Serialized as a SCREAMING_SNAKE_CASE string; retry counts are
        // intentionally dropped from the wire format.
        let s = match self {
            RoomState::Down => "DOWN",
            RoomState::Syncing(sync_state) => match sync_state {
                SyncingState::Load(_) => "SYNCING_LOAD",
                SyncingState::RetryLoad(_) => "SYNCING_RETRY_LOAD",
                SyncingState::Success => "SYNCING_SUCCESS",
                SyncingState::Fail => "SYNCING_FAIL",
            },
            RoomState::Up => "UP",
            RoomState::Draining(draining_state) => match draining_state {
                DrainingState::Store(_) => "DRAINING_STORE",
                DrainingState::RetryStore(_) => "DRAINING_RETRY_STORE",
                DrainingState::Success => "DRAINING_SUCCESS",
                DrainingState::Fail => "DRAINING_FAIL",
            },
        };
        serializer.serialize_str(s)
    }
}

impl<'de> Deserialize<'de> for RoomState {
    fn deserialize<D>(deserializer: D) -> Result<RoomState, D::Error>
    where
        D: Deserializer<'de>,
    {
        struct RoomStateVisitor;

        impl<'de> Visitor<'de> for RoomStateVisitor {
            type Value = RoomState;

            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
                formatter.write_str("a SCREAMING_SNAKE_CASE string representing RoomState")
            }

            fn visit_str<E>(self, value: &str) -> Result<RoomState, E>
            where
                E: de::Error,
            {
                match value {
                    "DOWN" => Ok(RoomState::Down),
                    // Retry counts are not serialized, so they default to 0.
                    "SYNCING_LOAD" => Ok(RoomState::Syncing(SyncingState::Load(0))),
                    "SYNCING_RETRY_LOAD" => Ok(RoomState::Syncing(SyncingState::RetryLoad(0))),
                    "SYNCING_SUCCESS" => Ok(RoomState::Syncing(SyncingState::Success)),
                    "SYNCING_FAIL" => Ok(RoomState::Syncing(SyncingState::Fail)),
                    "UP" => Ok(RoomState::Up),
                    "DRAINING_STORE" => Ok(RoomState::Draining(DrainingState::Store(0))),
                    "DRAINING_RETRY_STORE" => Ok(RoomState::Draining(DrainingState::RetryStore(0))),
                    "DRAINING_SUCCESS" => Ok(RoomState::Draining(DrainingState::Success)),
                    "DRAINING_FAIL" => Ok(RoomState::Draining(DrainingState::Fail)),
                    _ => Err(de::Error::unknown_variant(
                        value,
                        &[
                            "DOWN",
                            "SYNCING_LOAD",
                            "SYNCING_RETRY_LOAD",
                            "SYNCING_SUCCESS",
                            "SYNCING_FAIL",
                            "UP",
                            "DRAINING_STORE",
                            "DRAINING_RETRY_STORE",
                            "DRAINING_SUCCESS",
                            "DRAINING_FAIL",
                        ],
                    )),
                }
            }
        }

        deserializer.deserialize_str(RoomStateVisitor)
    }
}
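Review note: because the retry counters are dropped during serialization, a round trip through this format is lossy. A minimal sketch, assuming `serde_json` is available (e.g., as a dev-dependency); `round_trip_demo` is a hypothetical name:

use crate::common::{RoomState, SyncingState};

fn round_trip_demo() -> serde_json::Result<()> {
    let state = RoomState::Syncing(SyncingState::Load(2));
    let json = serde_json::to_string(&state)?; // "\"SYNCING_LOAD\""
    let back: RoomState = serde_json::from_str(&json)?;
    // The attempt count is not on the wire, so it comes back as 0.
    assert_eq!(back, RoomState::Syncing(SyncingState::Load(0)));
    Ok(())
}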
3 changes: 3 additions & 0 deletions contactor/src/lib.rs
@@ -6,8 +6,11 @@ pub mod api;
pub mod broadcast;
pub mod relay;

pub(crate) mod common;
pub(crate) mod ids;

pub use crate::common::*;

/// A utility struct for generating Redis keys for rooms and nodes.
pub(crate) struct RedisKeygenerator {}

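Review note: an in-crate usage sketch of the new lifecycle. `Room.state` is `pub(crate)`, so this would have to live inside the crate (e.g., in a test module); `up_then_drain` and the polling intervals are hypothetical, and the room is assumed to have been created earlier, which kicks off `start_syncing` automatically per the broadcast.rs diff above.

use std::sync::Arc;
use std::time::Duration;

use crate::broadcast::BroadcastManager;
use crate::RoomState;

/// Poll until the room reaches `Up`, then drain it and wait for the
/// draining task to remove it from the manager.
async fn up_then_drain(manager: Arc<BroadcastManager>, room_name: &str) {
    loop {
        let state = match manager.get_room(room_name) {
            Some(room) => room.state.clone(),
            None => return, // never created, or already removed
        };
        if *state.lock().await == RoomState::Up {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    // The spawned task walks the state through the Draining variants and
    // removes the room once the store succeeds.
    manager.clone().start_draining(room_name).await;
    while manager.get_room(room_name).is_some() {
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}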