Skip to content

Commit

Permalink
refactor(kvsd): wrap uow mpsc sender and receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
ymgyt committed Sep 19, 2024
1 parent 9a047bb commit 150fa31
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 46 deletions.
6 changes: 3 additions & 3 deletions crates/synd_kvsd/src/boot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
use std::path::PathBuf;

use thiserror::Error;
use tokio::sync::mpsc;

use crate::{
boot::provision::{ProvisionError, Provisioner},
middleware::Dispatcher,
table::{Table, TableRef},
uow::UnitOfWork,
};

mod provision;
Expand Down Expand Up @@ -43,15 +43,15 @@ impl Boot {
message: err.to_string(),
})?;
// TODO: configure buffer size
let (tx, _) = mpsc::channel(1024);
let (tx, rx) = UnitOfWork::channel(1024);
let table_ref = TableRef {
namespace,
name: table.name().into(),
};
dispatcher.add_table(table_ref, tx);

// TODO: abstract async runtime
// tokio::spawn(table.run(rx));
tokio::spawn(table.run(rx));
}
// Create Middleware
// Create Kvsd
Expand Down
8 changes: 3 additions & 5 deletions crates/synd_kvsd/src/middleware/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use std::collections::HashMap;

use tokio::sync::mpsc;

use crate::{
table::{Namespace, TableRef},
uow::UnitOfWork,
uow::UowSender,
};

pub(crate) struct Dispatcher {
// TODO: use TableName
table: HashMap<Namespace, HashMap<String, mpsc::Sender<UnitOfWork>>>,
table: HashMap<Namespace, HashMap<String, UowSender>>,
}

impl Dispatcher {
Expand All @@ -19,7 +17,7 @@ impl Dispatcher {
}
}

pub(crate) fn add_table(&mut self, table_ref: TableRef<'_>, sender: mpsc::Sender<UnitOfWork>) {
pub(crate) fn add_table(&mut self, table_ref: TableRef<'_>, sender: UowSender) {
self.table
.entry(table_ref.namespace)
.or_default()
Expand Down
50 changes: 13 additions & 37 deletions crates/synd_kvsd/src/table/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ use tokio::{
};
use tracing::debug;

use crate::table::{
index::{Index, IndexError},
Namespace,
use crate::{
table::{
index::{Index, IndexError},
Namespace,
},
uow::{UnitOfWork, UowError, UowReceiver},
};

#[derive(Error, Debug)]
Expand Down Expand Up @@ -109,16 +112,18 @@ where
})
}

/*
pub(crate) async fn run(mut self, mut receiver: Receiver<UnitOfWork>) {
pub(crate) async fn run(mut self, mut receiver: UowReceiver) {
while let Some(uow) = receiver.recv().await {
if let Err(err) = self.handle_uow(uow).await {
error!("handle uow {}", err);
}
}
}

async fn handle_uow(&mut self, uow: UnitOfWork) -> Result<()> {
#[expect(clippy::unused_async)]
async fn handle_uow(&mut self, _uow: UnitOfWork) -> Result<(), UowError> {
todo!()
/*
match uow {
UnitOfWork::Set(set) => {
info!("{}", set.request);
Expand Down Expand Up @@ -176,8 +181,10 @@ where
}
_ => unreachable!(),
}
*/
}

/*
fn send_value(
&self,
sender: Option<oneshot::Sender<Result<Option<Value>>>>,
Expand Down Expand Up @@ -207,34 +214,3 @@ where
}
*/
}

/*
impl<File> Table<File>
where
File: AsyncRead + AsyncSeek + Unpin,
{
pub(crate) async fn dump<F>(&mut self, mut callback: F) -> Result<()>
where
F: FnMut(EntryDump),
{
let current = self.file.seek(SeekFrom::Current(0)).await?;
self.file.seek(SeekFrom::Start(0)).await?;
loop {
match Entry::decode_from(&mut self.file).await {
Ok((_, entry)) => {
callback(entry.into());
}
Err(err) if err.is_eof() => break,
Err(err) => {
tracing::error!("{err}");
}
}
}
self.file.seek(SeekFrom::Start(current)).await?;
Ok(())
}
}
*/
23 changes: 22 additions & 1 deletion crates/synd_kvsd/src/uow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use thiserror::Error;

use std::sync::Arc;

use tokio::sync::oneshot;
use tokio::sync::{mpsc, oneshot};

use crate::authn::principal::Principal;

Expand All @@ -25,6 +25,20 @@ pub(crate) enum UowError {
SendResponse,
}

pub(crate) struct UowSender {
tx: mpsc::Sender<UnitOfWork>,
}

pub(crate) struct UowReceiver {
rx: mpsc::Receiver<UnitOfWork>,
}

impl UowReceiver {
pub(crate) async fn recv(&mut self) -> Option<UnitOfWork> {
self.rx.recv().await
}
}

pub(crate) enum UnitOfWork {
Authenticate(AuthenticateWork),
Ping(PingWork),
Expand All @@ -33,6 +47,13 @@ pub(crate) enum UnitOfWork {
Delete(DeleteWork),
}

impl UnitOfWork {
pub(crate) fn channel(buffer: usize) -> (UowSender, UowReceiver) {
let (tx, rx) = mpsc::channel(buffer);
(UowSender { tx }, UowReceiver { rx })
}
}

pub(crate) struct Work<Req, Res> {
pub(crate) principal: Arc<Principal>,
pub(crate) request: Req,
Expand Down

0 comments on commit 150fa31

Please sign in to comment.