From f58df31075fe4b32eb8d489af9204a9dabfd3bd2 Mon Sep 17 00:00:00 2001 From: Billy Chan Date: Mon, 21 Nov 2022 20:52:30 +0800 Subject: [PATCH 1/2] event stream (brainstorming) --- Cargo.toml | 3 ++ src/database/db_connection.rs | 13 +++++- src/database/event.rs | 81 +++++++++++++++++++++++++++++++++++ src/database/mod.rs | 2 + tests/basic.rs | 7 ++- 5 files changed, 103 insertions(+), 3 deletions(-) create mode 100644 src/database/event.rs diff --git a/Cargo.toml b/Cargo.toml index e06deb06d..6a4fa0b25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,8 @@ uuid = { version = "^1", default-features = false, optional = true } ouroboros = { version = "0.15", default-features = false } url = { version = "^2.2", default-features = false } thiserror = { version = "^1", default-features = false } +tokio = { version = "^1.6", default-features = false, features = ["sync"] } +async-channel = { version = "^1.7", default-features = false } [dev-dependencies] smol = { version = "^1.2" } @@ -59,6 +61,7 @@ pretty_assertions = { version = "^0.7" } time = { version = "^0.3", features = ["macros"] } uuid = { version = "^1", features = ["v4"] } once_cell = "1.8" +async-channel = { version = "^1.7" } [features] debug-print = [] diff --git a/src/database/db_connection.rs b/src/database/db_connection.rs index 7fd51633b..f38e2f951 100644 --- a/src/database/db_connection.rs +++ b/src/database/db_connection.rs @@ -1,6 +1,6 @@ use crate::{ - error::*, ConnectionTrait, DatabaseTransaction, ExecResult, QueryResult, Statement, - StatementBuilder, StreamTrait, TransactionError, TransactionTrait, + error::*, ConnectionTrait, DatabaseTransaction, EventStream, ExecResult, QueryResult, + Statement, StatementBuilder, StreamTrait, TransactionError, TransactionTrait, }; use sea_query::{MysqlQueryBuilder, PostgresQueryBuilder, QueryBuilder, SqliteQueryBuilder}; use std::{future::Future, pin::Pin}; @@ -283,6 +283,15 @@ impl DatabaseConnection { _ => {} } } + + pub fn set_event_stream(&mut self, event_stream: E) -> E::Receiver + where + E: EventStream, + { + let (sender, receiver) = event_stream.subscribe(); + // TODO: Save the `sender` in `DatabaseConnection` + receiver + } } impl DbBackend { diff --git a/src/database/event.rs b/src/database/event.rs new file mode 100644 index 000000000..f4bab9cc5 --- /dev/null +++ b/src/database/event.rs @@ -0,0 +1,81 @@ +use crate::DbErr; + +pub trait EventStream { + type Sender: EventSender; + type Receiver: EventReceiver; + + fn subscribe(self) -> (Self::Sender, Self::Receiver); +} + +#[async_trait::async_trait] +pub trait EventSender { + async fn send(&self, event: Event) -> Result<(), DbErr>; +} + +#[async_trait::async_trait] +pub trait EventReceiver { + async fn recv(&mut self) -> Result; +} + +#[derive(Debug, Clone)] +pub enum Event { + Insert, + Update, + Delete, +} + +mod event_stream_tokio { + use super::*; + use tokio::sync::broadcast::{Receiver, Sender}; + + impl EventStream for (Sender, Receiver) { + type Sender = Sender; + type Receiver = Receiver; + + fn subscribe(self) -> (Self::Sender, Self::Receiver) { + self + } + } + + #[async_trait::async_trait] + impl EventSender for Sender { + async fn send(&self, event: Event) -> Result<(), DbErr> { + self.send(event).map(|_| ()).map_err(|e| todo!()) + } + } + + #[async_trait::async_trait] + impl EventReceiver for Receiver { + async fn recv(&mut self) -> Result { + self.recv().await.map_err(|e| todo!()) + } + } +} + +mod event_stream_async_channel { + use super::*; + use async_channel::{Receiver, Sender}; + + impl EventStream for (Sender, Receiver) { + type Sender = Sender; + type Receiver = Receiver; + + fn subscribe(self) -> (Self::Sender, Self::Receiver) { + self + } + } + + #[async_trait::async_trait] + impl EventSender for Sender { + async fn send(&self, event: Event) -> Result<(), DbErr> { + self.send(event).await.map_err(|e| todo!()) + } + } + + #[async_trait::async_trait] + impl EventReceiver for Receiver { + async fn recv(&mut self) -> Result { + self.recv().await.map_err(|e| todo!()) + } + } +} diff --git a/src/database/mod.rs b/src/database/mod.rs index 795789e6f..6f2706f8e 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -2,6 +2,7 @@ use std::time::Duration; mod connection; mod db_connection; +mod event; #[cfg(feature = "mock")] mod mock; mod statement; @@ -10,6 +11,7 @@ mod transaction; pub use connection::*; pub use db_connection::*; +pub use event::*; #[cfg(feature = "mock")] pub use mock::*; pub use statement::*; diff --git a/tests/basic.rs b/tests/basic.rs index a7ec72f26..e4423b4ee 100644 --- a/tests/basic.rs +++ b/tests/basic.rs @@ -10,7 +10,12 @@ pub use sea_orm::{entity::*, error::*, query::*, sea_query, tests_cfg::*, Databa async fn main() -> Result<(), DbErr> { let base_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| "sqlite::memory:".to_owned()); - let db: DbConn = Database::connect(&base_url).await?; + let mut db: DbConn = Database::connect(&base_url).await?; + + let tokio_receiver = db.set_event_stream(tokio::sync::broadcast::channel(10)); + + let async_channel_receiver = db.set_event_stream(async_channel::bounded(10)); + setup_schema(&db).await?; crud_cake(&db).await?; From 5d2543490b38f027f956fd6dc391f5e164f1fc27 Mon Sep 17 00:00:00 2001 From: Billy Chan Date: Fri, 27 Jan 2023 22:46:37 +0800 Subject: [PATCH 2/2] Drafting... --- Cargo.toml | 3 +-- src/database/event.rs | 60 +++++++++++++++++++------------------------ tests/basic.rs | 10 ++++++-- 3 files changed, 35 insertions(+), 38 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c233505c3..1876e61e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,8 +44,7 @@ uuid = { version = "1", default-features = false, optional = true } ouroboros = { version = "0.15", default-features = false } url = { version = "2.2", default-features = false } thiserror = { version = "1", default-features = false } -tokio = { version = "1.6", default-features = false, features = ["sync"] } -async-channel = { version = "1.7", default-features = false } +async-broadcast = { version = "0.5" } [dev-dependencies] smol = { version = "1.2" } diff --git a/src/database/event.rs b/src/database/event.rs index f4bab9cc5..15fc1af8f 100644 --- a/src/database/event.rs +++ b/src/database/event.rs @@ -1,4 +1,7 @@ -use crate::DbErr; +use crate::{DbErr, EntityTrait}; +use async_trait::async_trait; +use sea_query::{DynIden, Value}; +use std::{any::TypeId, collections::HashMap, fmt::Debug}; pub trait EventStream { type Sender: EventSender; @@ -7,54 +10,43 @@ pub trait EventStream { fn subscribe(self) -> (Self::Sender, Self::Receiver); } -#[async_trait::async_trait] +#[async_trait] pub trait EventSender { async fn send(&self, event: Event) -> Result<(), DbErr>; } -#[async_trait::async_trait] +#[async_trait] pub trait EventReceiver { async fn recv(&mut self) -> Result; } #[derive(Debug, Clone)] -pub enum Event { +pub struct Event { + pub entity_type_id: TypeId, + pub action: EventAction, + pub values: HashMap, +} + +#[derive(Debug, Clone)] +pub enum EventAction { Insert, Update, Delete, } -mod event_stream_tokio { - use super::*; - use tokio::sync::broadcast::{Receiver, Sender}; - - impl EventStream for (Sender, Receiver) { - type Sender = Sender; - type Receiver = Receiver; - - fn subscribe(self) -> (Self::Sender, Self::Receiver) { - self - } - } - - #[async_trait::async_trait] - impl EventSender for Sender { - async fn send(&self, event: Event) -> Result<(), DbErr> { - self.send(event).map(|_| ()).map_err(|e| todo!()) - } - } - - #[async_trait::async_trait] - impl EventReceiver for Receiver { - async fn recv(&mut self) -> Result { - self.recv().await.map_err(|e| todo!()) - } +impl Event { + pub fn of_entity(&self) -> bool + where + E: EntityTrait, + { + self.entity_type_id == TypeId::of::() } } -mod event_stream_async_channel { +mod impl_event_stream_for_async_broadcast { use super::*; - use async_channel::{Receiver, Sender}; + use async_broadcast::{Receiver, Sender}; + use futures::FutureExt; impl EventStream for (Sender, Receiver) { type Sender = Sender; @@ -65,14 +57,14 @@ mod event_stream_async_channel { } } - #[async_trait::async_trait] + #[async_trait] impl EventSender for Sender { async fn send(&self, event: Event) -> Result<(), DbErr> { - self.send(event).await.map_err(|e| todo!()) + self.broadcast(event).await.map(|_| ()).map_err(|e| todo!()) } } - #[async_trait::async_trait] + #[async_trait] impl EventReceiver for Receiver { async fn recv(&mut self) -> Result { self.recv().await.map_err(|e| todo!()) diff --git a/tests/basic.rs b/tests/basic.rs index e4423b4ee..f99235e27 100644 --- a/tests/basic.rs +++ b/tests/basic.rs @@ -12,9 +12,15 @@ async fn main() -> Result<(), DbErr> { let mut db: DbConn = Database::connect(&base_url).await?; - let tokio_receiver = db.set_event_stream(tokio::sync::broadcast::channel(10)); + let mut tokio_receiver = db.set_event_stream(async_broadcast::broadcast(10)); - let async_channel_receiver = db.set_event_stream(async_channel::bounded(10)); + while let Ok(event) = tokio_receiver.recv().await { + if event.of_entity::() { + if let Some(val) = event.values.get(cake::Column::Name.as_str()) { + todo!() + } + } + } setup_schema(&db).await?; crud_cake(&db).await?;