Skip to content

Commit

Permalink
feat(kvsd): impl accept connection
Browse files Browse the repository at this point in the history
  • Loading branch information
ymgyt committed Sep 23, 2024
1 parent 925ff79 commit 8ff5dd1
Show file tree
Hide file tree
Showing 17 changed files with 336 additions and 40 deletions.
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async-trait = { version = "0.1.82", default-features = false }
axum = { version = "0.7.6", default-features = false }
axum-server = { version = "0.6.0", features = ["tls-rustls"] }
bitflags = { version = "2.6.0", default-features = false }
bytes = { version = "1.7.2" }
chrono = { version = "0.4.38", default-features = false }
clap = { version = "4.5", default-features = false }
criterion = { version = "0.5.1", features = ["async_tokio"] }
Expand Down
17 changes: 9 additions & 8 deletions crates/synd_kvsd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ synd-kvsd-protocol = { path = "../synd_kvsd_protocol", version = "0.1.0" }
synd-o11y = { path = "../synd_o11y", version = "0.1.8" }
synd-stdx = { path = "../synd_stdx", version = "0.1.0", features = ["color", "humantime", "byte", "conf"] }

chrono = { workspace = true }
clap = { workspace = true, features = ["derive", "env", "std"] }
crc32fast = "1.4.2"
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "io-util", "net", "signal", "fs", "sync", "macros", "time"] }
toml = { workspace = true }
tracing = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true, features = ["derive", "env", "std"] }
crc32fast = "1.4.2"
futures-util = { workspace = true }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "io-util", "net", "signal", "fs", "sync", "macros", "time"] }
toml = { workspace = true }
tracing = { workspace = true }

[features]

Expand Down
20 changes: 12 additions & 8 deletions crates/synd_kvsd/src/boot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use thiserror::Error;

use crate::{
boot::provision::{ProvisionError, Provisioner},
middleware::Dispatcher,
kvsd::Kvsd,
middleware::{Dispatcher, MiddlewareStack},
table::{Table, TableRef},
uow::UnitOfWork,
};
Expand All @@ -31,30 +32,33 @@ impl Boot {
}
}

pub async fn boot(self) -> Result<(), BootError> {
pub async fn boot(self) -> Result<Kvsd, BootError> {
let prov = Provisioner::new(self.root_dir).provision()?;
let mut dispatcher = Dispatcher::new();

// Create dispatcher
for (namespace, table_dir) in prov.table_dirs()? {
let table = Table::try_from_dir(table_dir)
.await
.map_err(|err| BootError::Table {
message: err.to_string(),
})?;
// TODO: configure buffer size
let (tx, rx) = UnitOfWork::channel(1024);
let (tx, rx) = UnitOfWork::channel(1024).split();
let table_ref = TableRef {
namespace,
name: table.name().into(),
};
dispatcher.add_table(table_ref, tx);

// TODO: abstract async runtime
tokio::spawn(table.run(rx));
}
// Create Middleware
// Create Kvsd
Ok(())

let kvsd = {
let mw = MiddlewareStack::new(dispatcher);
let uow_ch = UnitOfWork::channel(1024);
Kvsd::new(uow_ch, mw)
};

Ok(kvsd)
}
}
16 changes: 16 additions & 0 deletions crates/synd_kvsd/src/kvsd/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use crate::{middleware::MiddlewareStack, uow::UowChannel};

#[expect(dead_code)]
pub struct Kvsd {
channel: UowChannel,
middlewares: MiddlewareStack,
}

impl Kvsd {
pub(crate) fn new(channel: UowChannel, middlewares: MiddlewareStack) -> Self {
Self {
channel,
middlewares,
}
}
}
2 changes: 2 additions & 0 deletions crates/synd_kvsd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ pub mod boot;
pub mod cli;
pub mod config;
pub mod error;
pub mod server;
pub mod types;

mod authn;
mod kvsd;
mod middleware;
mod table;
mod uow;
3 changes: 1 addition & 2 deletions crates/synd_kvsd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ async fn main() {
};
let _guard = init_tracing(&args.o11y);

// 1. Resolve Config
let config = match ConfigResolver::from_args(args.kvsd).resolve() {
Ok(config) => config,
Err(err) => {
Expand All @@ -45,7 +44,7 @@ async fn main() {
}
};

Boot::new(config.root_dir()).boot().await.unwrap();
let _kvsd = Boot::new(config.root_dir()).boot().await.unwrap();

// 7. Spawn Kvsd
// 8. Run Server
Expand Down
20 changes: 19 additions & 1 deletion crates/synd_kvsd/src/middleware/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
use std::collections::HashMap;

use thiserror::Error;

use crate::{
middleware::Middleware,
table::{Namespace, TableRef},
uow::UowSender,
uow::{UnitOfWork, UowSender},
};

#[derive(Error, Debug)]
pub(crate) enum DispatchError {
#[error("table not found")]
TableNotFound,
}

pub(crate) struct Dispatcher {
// TODO: use TableName
table: HashMap<Namespace, HashMap<String, UowSender>>,
Expand Down Expand Up @@ -33,3 +42,12 @@ impl Dispatcher {
}
*/
}

impl Middleware for Dispatcher {
type Error = DispatchError;

async fn handle(&mut self, _uow: UnitOfWork) -> Result<(), Self::Error> {
// TODO: dispatch
Err(DispatchError::TableNotFound)
}
}
10 changes: 10 additions & 0 deletions crates/synd_kvsd/src/middleware/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,12 @@
mod dispatcher;
pub(crate) use dispatcher::Dispatcher;
mod stack;
pub(crate) use stack::MiddlewareStack;
mod telemetry;

use crate::uow::UnitOfWork;

pub(crate) trait Middleware {
type Error;
async fn handle(&mut self, uow: UnitOfWork) -> Result<(), Self::Error>;
}
21 changes: 21 additions & 0 deletions crates/synd_kvsd/src/middleware/stack.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use crate::{
middleware::{telemetry::Telemetry, Dispatcher, Middleware},
uow::UnitOfWork,
};

pub(crate) struct MiddlewareStack {
root: Telemetry<Dispatcher>,
}

impl MiddlewareStack {
pub(crate) fn new(dispatcher: Dispatcher) -> Self {
let telemetry = Telemetry::new(dispatcher);

Self { root: telemetry }
}

#[expect(dead_code)]
pub(crate) async fn handle(&mut self, uow: UnitOfWork) {
self.root.handle(uow).await.unwrap();
}
}
35 changes: 35 additions & 0 deletions crates/synd_kvsd/src/middleware/telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::fmt;

use crate::{middleware::Middleware, uow::UnitOfWork};

use synd_stdx::prelude::*;

pub(crate) struct Telemetry<MW> {
next: MW,
}

impl<MW> Telemetry<MW> {
pub(super) fn new(next: MW) -> Self {
Self { next }
}
}

impl<MW> Middleware for Telemetry<MW>
where
MW: Middleware + Send + 'static,
<MW as Middleware>::Error: fmt::Display,
{
type Error = MW::Error;

async fn handle(&mut self, uow: UnitOfWork) -> Result<(), Self::Error> {
// TODO: emit metrics
let result = self.next.handle(uow).await;
match result {
Ok(()) => info!("Handle uow"),
// Should handle in Error mw ?
Err(ref err) => error!("{err}"),
}

result
}
}
25 changes: 25 additions & 0 deletions crates/synd_kvsd/src/server/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use synd_kvsd_protocol::Connection;

use crate::{authn::principal::Principal, server::IncommingConnection, uow::UowSender};

#[expect(dead_code)]
pub(super) struct Handler {
pub(super) principal: Principal,
pub(super) connection: IncommingConnection<Connection>,
pub(super) sender: UowSender,
}

impl Handler {
pub(super) fn new(connection: IncommingConnection<Connection>, sender: UowSender) -> Self {
Self {
principal: Principal::AnonymousUser,
connection,
sender,
}
}

#[expect(clippy::unused_async)]
pub(super) async fn handle(self) {
todo!()
}
}
35 changes: 35 additions & 0 deletions crates/synd_kvsd/src/server/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::sync::Arc;

use tokio::{
net::{TcpListener, TcpStream},
sync::Semaphore,
};

use crate::server::{IncommingConnection, ServerError};

pub(super) struct ConcurrencyLimited<Listener> {
listener: Listener,
semaphore: Arc<Semaphore>,
}

impl<Listener> ConcurrencyLimited<Listener> {
pub(super) fn new(listener: Listener, max_connections: usize) -> Self {
Self {
listener,
semaphore: Arc::new(Semaphore::new(max_connections)),
}
}
}

impl ConcurrencyLimited<TcpListener> {
pub(super) async fn accept(&mut self) -> Result<IncommingConnection<TcpStream>, ServerError> {
let permit = self.semaphore.clone().acquire_owned().await?;
let (stream, peer_addr) = self.listener.accept().await.map_err(ServerError::accept)?;

Ok(IncommingConnection {
connection: stream,
peer_addr,
permit,
})
}
}
Loading

0 comments on commit 8ff5dd1

Please sign in to comment.