From 150fa31548aa9d04072299dde63f9f6439c240e2 Mon Sep 17 00:00:00 2001 From: ymgyt Date: Thu, 19 Sep 2024 23:17:06 +0900 Subject: [PATCH] refactor(kvsd): wrap uow mpsc sender and receiver --- crates/synd_kvsd/src/boot/mod.rs | 6 +-- crates/synd_kvsd/src/middleware/dispatcher.rs | 8 ++- crates/synd_kvsd/src/table/table.rs | 50 +++++-------------- crates/synd_kvsd/src/uow/mod.rs | 23 ++++++++- 4 files changed, 41 insertions(+), 46 deletions(-) diff --git a/crates/synd_kvsd/src/boot/mod.rs b/crates/synd_kvsd/src/boot/mod.rs index a176057..f6262ef 100644 --- a/crates/synd_kvsd/src/boot/mod.rs +++ b/crates/synd_kvsd/src/boot/mod.rs @@ -2,12 +2,12 @@ use std::path::PathBuf; use thiserror::Error; -use tokio::sync::mpsc; use crate::{ boot::provision::{ProvisionError, Provisioner}, middleware::Dispatcher, table::{Table, TableRef}, + uow::UnitOfWork, }; mod provision; @@ -43,7 +43,7 @@ impl Boot { message: err.to_string(), })?; // TODO: configure buffer size - let (tx, _) = mpsc::channel(1024); + let (tx, rx) = UnitOfWork::channel(1024); let table_ref = TableRef { namespace, name: table.name().into(), @@ -51,7 +51,7 @@ impl Boot { dispatcher.add_table(table_ref, tx); // TODO: abstract async runtime - // tokio::spawn(table.run(rx)); + tokio::spawn(table.run(rx)); } // Create Middleware // Create Kvsd diff --git a/crates/synd_kvsd/src/middleware/dispatcher.rs b/crates/synd_kvsd/src/middleware/dispatcher.rs index 520d342..f4136a3 100644 --- a/crates/synd_kvsd/src/middleware/dispatcher.rs +++ b/crates/synd_kvsd/src/middleware/dispatcher.rs @@ -1,15 +1,13 @@ use std::collections::HashMap; -use tokio::sync::mpsc; - use crate::{ table::{Namespace, TableRef}, - uow::UnitOfWork, + uow::UowSender, }; pub(crate) struct Dispatcher { // TODO: use TableName - table: HashMap>>, + table: HashMap>, } impl Dispatcher { @@ -19,7 +17,7 @@ impl Dispatcher { } } - pub(crate) fn add_table(&mut self, table_ref: TableRef<'_>, sender: mpsc::Sender) { + pub(crate) fn add_table(&mut self, table_ref: TableRef<'_>, sender: UowSender) { self.table .entry(table_ref.namespace) .or_default() diff --git a/crates/synd_kvsd/src/table/table.rs b/crates/synd_kvsd/src/table/table.rs index e39f146..969a4d8 100644 --- a/crates/synd_kvsd/src/table/table.rs +++ b/crates/synd_kvsd/src/table/table.rs @@ -8,9 +8,12 @@ use tokio::{ }; use tracing::debug; -use crate::table::{ - index::{Index, IndexError}, - Namespace, +use crate::{ + table::{ + index::{Index, IndexError}, + Namespace, + }, + uow::{UnitOfWork, UowError, UowReceiver}, }; #[derive(Error, Debug)] @@ -109,8 +112,7 @@ where }) } - /* - pub(crate) async fn run(mut self, mut receiver: Receiver) { + pub(crate) async fn run(mut self, mut receiver: UowReceiver) { while let Some(uow) = receiver.recv().await { if let Err(err) = self.handle_uow(uow).await { error!("handle uow {}", err); @@ -118,7 +120,10 @@ where } } - async fn handle_uow(&mut self, uow: UnitOfWork) -> Result<()> { + #[expect(clippy::unused_async)] + async fn handle_uow(&mut self, _uow: UnitOfWork) -> Result<(), UowError> { + todo!() + /* match uow { UnitOfWork::Set(set) => { info!("{}", set.request); @@ -176,8 +181,10 @@ where } _ => unreachable!(), } + */ } + /* fn send_value( &self, sender: Option>>>, @@ -207,34 +214,3 @@ where } */ } - -/* -impl Table -where - File: AsyncRead + AsyncSeek + Unpin, -{ - pub(crate) async fn dump(&mut self, mut callback: F) -> Result<()> - where - F: FnMut(EntryDump), - { - let current = self.file.seek(SeekFrom::Current(0)).await?; - - self.file.seek(SeekFrom::Start(0)).await?; - - loop { - match Entry::decode_from(&mut self.file).await { - Ok((_, entry)) => { - callback(entry.into()); - } - Err(err) if err.is_eof() => break, - Err(err) => { - tracing::error!("{err}"); - } - } - } - - self.file.seek(SeekFrom::Start(current)).await?; - Ok(()) - } -} -*/ diff --git a/crates/synd_kvsd/src/uow/mod.rs b/crates/synd_kvsd/src/uow/mod.rs index 2e70856..2a9efbf 100644 --- a/crates/synd_kvsd/src/uow/mod.rs +++ b/crates/synd_kvsd/src/uow/mod.rs @@ -15,7 +15,7 @@ use thiserror::Error; use std::sync::Arc; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot}; use crate::authn::principal::Principal; @@ -25,6 +25,20 @@ pub(crate) enum UowError { SendResponse, } +pub(crate) struct UowSender { + tx: mpsc::Sender, +} + +pub(crate) struct UowReceiver { + rx: mpsc::Receiver, +} + +impl UowReceiver { + pub(crate) async fn recv(&mut self) -> Option { + self.rx.recv().await + } +} + pub(crate) enum UnitOfWork { Authenticate(AuthenticateWork), Ping(PingWork), @@ -33,6 +47,13 @@ pub(crate) enum UnitOfWork { Delete(DeleteWork), } +impl UnitOfWork { + pub(crate) fn channel(buffer: usize) -> (UowSender, UowReceiver) { + let (tx, rx) = mpsc::channel(buffer); + (UowSender { tx }, UowReceiver { rx }) + } +} + pub(crate) struct Work { pub(crate) principal: Arc, pub(crate) request: Req,