diff --git a/agdb/src/query.rs b/agdb/src/query.rs index bdc4b703..b5897955 100644 --- a/agdb/src/query.rs +++ b/agdb/src/query.rs @@ -56,7 +56,7 @@ use crate::{ #[cfg(any(feature = "serde", feature = "openapi"))] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] #[expect(clippy::large_enum_variant)] pub enum QueryType { InsertAlias(InsertAliasesQuery), diff --git a/agdb/src/query/insert_aliases_query.rs b/agdb/src/query/insert_aliases_query.rs index 0fcc6907..1aa65130 100644 --- a/agdb/src/query/insert_aliases_query.rs +++ b/agdb/src/query/insert_aliases_query.rs @@ -15,7 +15,7 @@ use crate::StorageData; /// The result will contain number of aliases inserted/updated but no elements. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct InsertAliasesQuery { /// Ids to be aliased pub ids: QueryIds, diff --git a/agdb/src/query/insert_edges_query.rs b/agdb/src/query/insert_edges_query.rs index 30e370d9..3a4d81c1 100644 --- a/agdb/src/query/insert_edges_query.rs +++ b/agdb/src/query/insert_edges_query.rs @@ -28,7 +28,7 @@ use crate::StorageData; /// with their ids, origin and destination, but no properties. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct InsertEdgesQuery { /// Origins pub from: QueryIds, diff --git a/agdb/src/query/insert_index_query.rs b/agdb/src/query/insert_index_query.rs index 436f466f..7c9889a8 100644 --- a/agdb/src/query/insert_index_query.rs +++ b/agdb/src/query/insert_index_query.rs @@ -9,7 +9,7 @@ use crate::StorageData; /// a given key. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct InsertIndexQuery(pub DbValue); impl QueryMut for InsertIndexQuery { diff --git a/agdb/src/query/insert_nodes_query.rs b/agdb/src/query/insert_nodes_query.rs index e34ea10d..110d8f9b 100644 --- a/agdb/src/query/insert_nodes_query.rs +++ b/agdb/src/query/insert_nodes_query.rs @@ -27,7 +27,7 @@ use crate::StorageData; /// with their ids but no properties. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct InsertNodesQuery { /// Number of nodes to be inserted. pub count: u64, diff --git a/agdb/src/query/insert_values_query.rs b/agdb/src/query/insert_values_query.rs index 2eab5582..3880c441 100644 --- a/agdb/src/query/insert_values_query.rs +++ b/agdb/src/query/insert_values_query.rs @@ -24,7 +24,7 @@ use crate::StorageData; /// NOTE: The result is NOT number of affected elements but individual properties. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct InsertValuesQuery { /// Ids whose properties should be updated pub ids: QueryIds, diff --git a/agdb/src/query/query_values.rs b/agdb/src/query/query_values.rs index 679a2185..df804ec2 100644 --- a/agdb/src/query/query_values.rs +++ b/agdb/src/query/query_values.rs @@ -5,7 +5,7 @@ use crate::DbUserValue; /// and multiple (`Multi`) values in database queries. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub enum QueryValues { /// Single list of properties (key-value pairs) /// to be applied to all elements in a query. diff --git a/agdb/src/query/remove_aliases_query.rs b/agdb/src/query/remove_aliases_query.rs index 5ffb0e3c..e2424bb9 100644 --- a/agdb/src/query/remove_aliases_query.rs +++ b/agdb/src/query/remove_aliases_query.rs @@ -12,7 +12,7 @@ use crate::StorageData; /// many aliases have been actually removed. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct RemoveAliasesQuery(pub Vec); impl QueryMut for RemoveAliasesQuery { diff --git a/agdb/src/query/remove_index_query.rs b/agdb/src/query/remove_index_query.rs index 4d55f163..18a92271 100644 --- a/agdb/src/query/remove_index_query.rs +++ b/agdb/src/query/remove_index_query.rs @@ -9,7 +9,7 @@ use crate::StorageData; /// a given key. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct RemoveIndexQuery(pub DbValue); impl QueryMut for RemoveIndexQuery { diff --git a/agdb/src/query/remove_query.rs b/agdb/src/query/remove_query.rs index ab18d4f2..c9791fe9 100644 --- a/agdb/src/query/remove_query.rs +++ b/agdb/src/query/remove_query.rs @@ -16,7 +16,7 @@ use crate::StorageData; /// also removed along with their properties. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct RemoveQuery(pub QueryIds); impl QueryMut for RemoveQuery { diff --git a/agdb/src/query/remove_values_query.rs b/agdb/src/query/remove_values_query.rs index 992f0ffc..ee43236e 100644 --- a/agdb/src/query/remove_values_query.rs +++ b/agdb/src/query/remove_values_query.rs @@ -15,7 +15,7 @@ use crate::StorageData; /// do not exist on any of the elements). #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct RemoveValuesQuery(pub SelectValuesQuery); impl QueryMut for RemoveValuesQuery { diff --git a/agdb/src/query/select_aliases_query.rs b/agdb/src/query/select_aliases_query.rs index d331d3fc..a6f83b78 100644 --- a/agdb/src/query/select_aliases_query.rs +++ b/agdb/src/query/select_aliases_query.rs @@ -17,7 +17,7 @@ use crate::StorageData; /// the value `String`. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct SelectAliasesQuery(pub QueryIds); impl Query for SelectAliasesQuery { diff --git a/agdb/src/query/select_all_aliases_query.rs b/agdb/src/query/select_all_aliases_query.rs index d5662908..cd0694d8 100644 --- a/agdb/src/query/select_all_aliases_query.rs +++ b/agdb/src/query/select_all_aliases_query.rs @@ -12,7 +12,7 @@ use crate::StorageData; /// the value `String`. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct SelectAllAliasesQuery {} impl Query for SelectAllAliasesQuery { diff --git a/agdb/src/query/select_edge_count_query.rs b/agdb/src/query/select_edge_count_query.rs index df88dddc..429216e8 100644 --- a/agdb/src/query/select_edge_count_query.rs +++ b/agdb/src/query/select_edge_count_query.rs @@ -22,7 +22,7 @@ use crate::StorageData; /// might be greater than number of unique db elements. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct SelectEdgeCountQuery { /// Ids of the nodes to select edge count for. pub ids: QueryIds, diff --git a/agdb/src/query/select_indexes_query.rs b/agdb/src/query/select_indexes_query.rs index db621519..9f2eb5fc 100644 --- a/agdb/src/query/select_indexes_query.rs +++ b/agdb/src/query/select_indexes_query.rs @@ -14,7 +14,7 @@ use crate::StorageData; /// index. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct SelectIndexesQuery {} impl Query for SelectIndexesQuery { diff --git a/agdb/src/query/select_key_count_query.rs b/agdb/src/query/select_key_count_query.rs index e319883a..1b587f69 100644 --- a/agdb/src/query/select_key_count_query.rs +++ b/agdb/src/query/select_key_count_query.rs @@ -16,7 +16,7 @@ use crate::StorageData; /// a value `u64`. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct SelectKeyCountQuery(pub QueryIds); impl Query for SelectKeyCountQuery { diff --git a/agdb/src/query/select_keys_query.rs b/agdb/src/query/select_keys_query.rs index f5d892df..1c349756 100644 --- a/agdb/src/query/select_keys_query.rs +++ b/agdb/src/query/select_keys_query.rs @@ -15,7 +15,7 @@ use crate::StorageData; /// of elements with all properties except all values will be empty. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct SelectKeysQuery(pub QueryIds); impl Query for SelectKeysQuery { diff --git a/agdb/src/query/select_node_count.rs b/agdb/src/query/select_node_count.rs index c9311668..e753b726 100644 --- a/agdb/src/query/select_node_count.rs +++ b/agdb/src/query/select_node_count.rs @@ -13,7 +13,7 @@ use crate::StorageData; /// a value `u64` represneting number of nodes in teh database. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct SelectNodeCountQuery {} impl Query for SelectNodeCountQuery { diff --git a/agdb/src/query/select_values_query.rs b/agdb/src/query/select_values_query.rs index 998cccca..22eb80ad 100644 --- a/agdb/src/query/select_values_query.rs +++ b/agdb/src/query/select_values_query.rs @@ -17,7 +17,7 @@ use crate::StorageData; /// list of elements with the requested properties. #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct SelectValuesQuery { pub keys: Vec, pub ids: QueryIds, diff --git a/agdb/storage2 b/agdb/storage2 new file mode 100644 index 00000000..6a47775b Binary files /dev/null and b/agdb/storage2 differ diff --git a/agdb_api/rust/src/api.rs b/agdb_api/rust/src/api.rs index 2dd7cc93..e02a033e 100644 --- a/agdb_api/rust/src/api.rs +++ b/agdb_api/rust/src/api.rs @@ -156,7 +156,7 @@ impl AgdbApi { &self, owner: &str, db: &str, - queries: &Vec, + queries: &[QueryType], ) -> AgdbApiResult<(u16, Vec)> { self.client .post( diff --git a/agdb_api/rust/src/api_types.rs b/agdb_api/rust/src/api_types.rs index 3891d987..17e08e5d 100644 --- a/agdb_api/rust/src/api_types.rs +++ b/agdb_api/rust/src/api_types.rs @@ -1,11 +1,10 @@ -use std::fmt::Display; - use agdb::DbError; use agdb::DbValue; use agdb::QueryResult; use agdb::QueryType; use serde::Deserialize; use serde::Serialize; +use std::fmt::Display; use utoipa::ToSchema; #[derive( @@ -70,7 +69,7 @@ pub struct AdminStatus { pub size: u64, } -#[derive(Deserialize, ToSchema)] +#[derive(Clone, Deserialize, Serialize, ToSchema)] pub struct Queries(pub Vec); #[derive(Serialize, ToSchema)] @@ -116,9 +115,20 @@ pub struct UserStatus { impl From<&str> for DbType { fn from(value: &str) -> Self { match value { - "mapped" => DbType::Mapped, - "file" => DbType::File, - _ => DbType::Memory, + "mapped" => Self::Mapped, + "file" => Self::File, + _ => Self::Memory, + } + } +} + +impl From<&str> for DbResource { + fn from(value: &str) -> Self { + match value { + "db" => Self::Db, + "audit" => Self::Audit, + "backup" => Self::Backup, + _ => Self::All, } } } @@ -131,12 +141,26 @@ impl TryFrom for DbType { } } +impl TryFrom for DbResource { + type Error = DbError; + + fn try_from(value: DbValue) -> Result { + Ok(Self::from(value.to_string().as_str())) + } +} + impl From for DbValue { fn from(value: DbType) -> Self { value.to_string().into() } } +impl From for DbValue { + fn from(value: DbResource) -> Self { + value.to_string().into() + } +} + impl Display for DbType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { diff --git a/agdb_server/openapi.json b/agdb_server/openapi.json index 0858b84b..c0703445 100644 --- a/agdb_server/openapi.json +++ b/agdb_server/openapi.json @@ -15,6 +15,41 @@ } ], "paths": { + "/api/v1/admin/cluster/{username}/logout": { + "post": { + "tags": [ + "agdb" + ], + "operationId": "admin_cluster_logout", + "parameters": [ + { + "name": "username", + "in": "path", + "description": "user name", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "201": { + "description": "user logged out" + }, + "401": { + "description": "admin only" + }, + "404": { + "description": "user not found" + } + }, + "security": [ + { + "Token": [] + } + ] + } + }, "/api/v1/admin/db/list": { "get": { "tags": [ @@ -1062,6 +1097,60 @@ ] } }, + "/api/v1/cluster/login": { + "post": { + "tags": [ + "agdb" + ], + "operationId": "cluster_login", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/UserLogin" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "login successful", + "content": { + "text/plain": { + "schema": { + "type": "string" + } + } + } + }, + "401": { + "description": "invalid credentials" + } + } + } + }, + "/api/v1/cluster/logout": { + "post": { + "tags": [ + "agdb" + ], + "operationId": "cluster_logout", + "responses": { + "201": { + "description": "user logged out" + }, + "401": { + "description": "invalid credentials" + } + }, + "security": [ + { + "Token": [] + } + ] + } + }, "/api/v1/cluster/status": { "get": { "tags": [ @@ -1119,7 +1208,7 @@ "description": "unauthorized" }, "404": { - "description": "backup not found" + "description": "db or backup not found" } }, "security": [ @@ -1647,6 +1736,9 @@ }, "403": { "description": "must have write permissions" + }, + "404": { + "description": "db not found" } }, "security": [ diff --git a/agdb_server/src/action.rs b/agdb_server/src/action.rs index 80c996d5..426838a8 100644 --- a/agdb_server/src/action.rs +++ b/agdb_server/src/action.rs @@ -1,16 +1,39 @@ pub(crate) mod change_password; pub(crate) mod cluster_login; +pub(crate) mod db_add; +pub(crate) mod db_backup; +pub(crate) mod db_clear; +pub(crate) mod db_convert; +pub(crate) mod db_copy; +pub(crate) mod db_delete; +pub(crate) mod db_exec; +pub(crate) mod db_optimize; +pub(crate) mod db_remove; +pub(crate) mod db_rename; +pub(crate) mod db_restore; pub(crate) mod user_add; pub(crate) mod user_remove; use crate::action::change_password::ChangePassword; use crate::action::cluster_login::ClusterLogin; +use crate::action::db_add::DbAdd; +use crate::action::db_backup::DbBackup; +use crate::action::db_clear::DbClear; +use crate::action::db_convert::DbConvert; +use crate::action::db_copy::DbCopy; +use crate::action::db_delete::DbDelete; +use crate::action::db_exec::DbExec; +use crate::action::db_optimize::DbOptimize; +use crate::action::db_remove::DbRemove; +use crate::action::db_rename::DbRename; +use crate::action::db_restore::DbRestore; use crate::action::user_add::UserAdd; use crate::action::user_remove::UserRemove; use crate::config::Config; use crate::db_pool::DbPool; use crate::server_db::ServerDb; use crate::server_error::ServerResult; +use agdb::QueryResult; use serde::Deserialize; use serde::Serialize; @@ -20,24 +43,56 @@ pub(crate) enum ClusterAction { ClusterLogin(ClusterLogin), ChangePassword(ChangePassword), UserRemove(UserRemove), + DbAdd(DbAdd), + DbBackup(DbBackup), + DbClear(DbClear), + DbConvert(DbConvert), + DbCopy(DbCopy), + DbDelete(DbDelete), + DbRemove(DbRemove), + DbExec(DbExec), + DbOptimize(DbOptimize), + DbRestore(DbRestore), + DbRename(DbRename), +} + +pub(crate) enum ClusterActionResult { + None, + QueryResults(Vec), } pub(crate) trait Action: Sized { - async fn exec(self, db: &mut ServerDb, db_pool: &mut DbPool, config: &Config) -> ServerResult; + async fn exec( + self, + db: ServerDb, + db_pool: DbPool, + config: &Config, + ) -> ServerResult; } impl ClusterAction { pub(crate) async fn exec( self, - db: &mut ServerDb, - db_pool: &mut DbPool, + db: ServerDb, + db_pool: DbPool, config: &Config, - ) -> ServerResult<()> { + ) -> ServerResult { match self { ClusterAction::UserAdd(action) => action.exec(db, db_pool, config).await, ClusterAction::ClusterLogin(action) => action.exec(db, db_pool, config).await, ClusterAction::ChangePassword(action) => action.exec(db, db_pool, config).await, ClusterAction::UserRemove(action) => action.exec(db, db_pool, config).await, + ClusterAction::DbAdd(action) => action.exec(db, db_pool, config).await, + ClusterAction::DbBackup(action) => action.exec(db, db_pool, config).await, + ClusterAction::DbClear(action) => action.exec(db, db_pool, config).await, + ClusterAction::DbConvert(action) => action.exec(db, db_pool, config).await, + ClusterAction::DbCopy(action) => action.exec(db, db_pool, config).await, + ClusterAction::DbDelete(action) => action.exec(db, db_pool, config).await, + ClusterAction::DbRemove(action) => action.exec(db, db_pool, config).await, + ClusterAction::DbExec(action) => action.exec(db, db_pool, config).await, + ClusterAction::DbOptimize(action) => action.exec(db, db_pool, config).await, + ClusterAction::DbRestore(action) => action.exec(db, db_pool, config).await, + ClusterAction::DbRename(action) => action.exec(db, db_pool, config).await, } } } @@ -65,3 +120,69 @@ impl From for ClusterAction { ClusterAction::UserRemove(value) } } + +impl From for ClusterAction { + fn from(value: DbAdd) -> Self { + ClusterAction::DbAdd(value) + } +} + +impl From for ClusterAction { + fn from(value: DbBackup) -> Self { + ClusterAction::DbBackup(value) + } +} + +impl From for ClusterAction { + fn from(value: DbClear) -> Self { + ClusterAction::DbClear(value) + } +} + +impl From for ClusterAction { + fn from(value: DbConvert) -> Self { + ClusterAction::DbConvert(value) + } +} + +impl From for ClusterAction { + fn from(value: DbCopy) -> Self { + ClusterAction::DbCopy(value) + } +} + +impl From for ClusterAction { + fn from(value: DbDelete) -> Self { + ClusterAction::DbDelete(value) + } +} + +impl From for ClusterAction { + fn from(value: DbRemove) -> Self { + ClusterAction::DbRemove(value) + } +} + +impl From for ClusterAction { + fn from(value: DbExec) -> Self { + ClusterAction::DbExec(value) + } +} + +impl From for ClusterAction { + fn from(value: DbOptimize) -> Self { + ClusterAction::DbOptimize(value) + } +} + +impl From for ClusterAction { + fn from(value: DbRestore) -> Self { + ClusterAction::DbRestore(value) + } +} + +impl From for ClusterAction { + fn from(value: DbRename) -> Self { + ClusterAction::DbRename(value) + } +} diff --git a/agdb_server/src/action/change_password.rs b/agdb_server/src/action/change_password.rs index 7301fbb2..5602e082 100644 --- a/agdb_server/src/action/change_password.rs +++ b/agdb_server/src/action/change_password.rs @@ -1,6 +1,7 @@ use super::DbPool; use super::ServerDb; use crate::action::Action; +use crate::action::ClusterActionResult; use crate::action::Config; use crate::server_error::ServerResult; use agdb::UserValue; @@ -17,15 +18,15 @@ pub(crate) struct ChangePassword { impl Action for ChangePassword { async fn exec( self, - db: &mut ServerDb, - _db_pool: &mut DbPool, + db: ServerDb, + _db_pool: DbPool, _config: &Config, - ) -> ServerResult { + ) -> ServerResult { let mut user = db.user(&self.user).await?; user.password = self.new_password; user.salt = self.new_salt; db.save_user(user).await?; - Ok(()) + Ok(ClusterActionResult::None) } } diff --git a/agdb_server/src/action/cluster_login.rs b/agdb_server/src/action/cluster_login.rs index 21f3a437..40600ce4 100644 --- a/agdb_server/src/action/cluster_login.rs +++ b/agdb_server/src/action/cluster_login.rs @@ -1,6 +1,7 @@ use super::DbPool; use super::ServerDb; use crate::action::Action; +use crate::action::ClusterActionResult; use crate::action::Config; use crate::server_error::ServerResult; use agdb::UserValue; @@ -16,13 +17,13 @@ pub(crate) struct ClusterLogin { impl Action for ClusterLogin { async fn exec( self, - db: &mut ServerDb, - _db_pool: &mut DbPool, + db: ServerDb, + _db_pool: DbPool, _config: &Config, - ) -> ServerResult { + ) -> ServerResult { let user_id = db.user_id(&self.user).await?; db.save_token(user_id, &self.new_token).await?; - Ok(()) + Ok(ClusterActionResult::None) } } diff --git a/agdb_server/src/action/db_add.rs b/agdb_server/src/action/db_add.rs new file mode 100644 index 00000000..0600141f --- /dev/null +++ b/agdb_server/src/action/db_add.rs @@ -0,0 +1,49 @@ +use super::DbPool; +use super::ServerDb; +use crate::action::Action; +use crate::action::ClusterActionResult; +use crate::action::Config; +use crate::server_db::Database; +use crate::server_error::ServerResult; +use crate::utilities::db_name; +use agdb::UserValue; +use agdb_api::DbType; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Serialize, Deserialize, UserValue)] +pub(crate) struct DbAdd { + pub(crate) owner: String, + pub(crate) db: String, + pub(crate) db_type: DbType, +} + +impl Action for DbAdd { + async fn exec( + self, + db: ServerDb, + db_pool: DbPool, + config: &Config, + ) -> ServerResult { + let name = db_name(&self.owner, &self.db); + + let backup = db_pool + .add_db(&self.owner, &self.db, &name, self.db_type, config) + .await?; + + let owner = db.user_id(&self.owner).await?; + + db.insert_db( + owner, + Database { + db_id: None, + name, + db_type: self.db_type, + backup, + }, + ) + .await?; + + Ok(ClusterActionResult::None) + } +} diff --git a/agdb_server/src/action/db_backup.rs b/agdb_server/src/action/db_backup.rs new file mode 100644 index 00000000..930e352d --- /dev/null +++ b/agdb_server/src/action/db_backup.rs @@ -0,0 +1,34 @@ +use super::DbPool; +use super::ServerDb; +use crate::action::Action; +use crate::action::ClusterActionResult; +use crate::action::Config; +use crate::server_error::ServerResult; +use crate::utilities::db_name; +use agdb::UserValue; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Serialize, Deserialize, UserValue)] +pub(crate) struct DbBackup { + pub(crate) owner: String, + pub(crate) db: String, +} + +impl Action for DbBackup { + async fn exec( + self, + db: ServerDb, + db_pool: DbPool, + config: &Config, + ) -> ServerResult { + let owner = db.user_id(&self.owner).await?; + let name = db_name(&self.owner, &self.db); + let mut database = db.user_db(owner, &name).await?; + database.backup = db_pool + .backup_db(&self.owner, &self.db, &name, database.db_type, config) + .await?; + db.save_db(&database).await?; + Ok(ClusterActionResult::None) + } +} diff --git a/agdb_server/src/action/db_clear.rs b/agdb_server/src/action/db_clear.rs new file mode 100644 index 00000000..2db31eef --- /dev/null +++ b/agdb_server/src/action/db_clear.rs @@ -0,0 +1,36 @@ +use super::DbPool; +use super::ServerDb; +use crate::action::Action; +use crate::action::ClusterActionResult; +use crate::action::Config; +use crate::server_error::ServerResult; +use crate::utilities::db_name; +use agdb::UserValue; +use agdb_api::DbResource; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Serialize, Deserialize, UserValue)] +pub(crate) struct DbClear { + pub(crate) owner: String, + pub(crate) db: String, + pub(crate) resource: DbResource, +} + +impl Action for DbClear { + async fn exec( + self, + db: ServerDb, + db_pool: DbPool, + config: &Config, + ) -> ServerResult { + let owner = db.user_id(&self.owner).await?; + let name = db_name(&self.owner, &self.db); + let mut database = db.user_db(owner, &name).await?; + db_pool + .clear_db(&self.owner, &self.db, &mut database, config, self.resource) + .await?; + db.save_db(&database).await?; + Ok(ClusterActionResult::None) + } +} diff --git a/agdb_server/src/action/db_convert.rs b/agdb_server/src/action/db_convert.rs new file mode 100644 index 00000000..bfcd618b --- /dev/null +++ b/agdb_server/src/action/db_convert.rs @@ -0,0 +1,44 @@ +use super::DbPool; +use super::ServerDb; +use crate::action::Action; +use crate::action::ClusterActionResult; +use crate::action::Config; +use crate::server_error::ServerResult; +use crate::utilities::db_name; +use agdb::UserValue; +use agdb_api::DbType; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Serialize, Deserialize, UserValue)] +pub(crate) struct DbConvert { + pub(crate) owner: String, + pub(crate) db: String, + pub(crate) db_type: DbType, +} + +impl Action for DbConvert { + async fn exec( + self, + db: ServerDb, + db_pool: DbPool, + config: &Config, + ) -> ServerResult { + let owner = db.user_id(&self.owner).await?; + let name = db_name(&self.owner, &self.db); + let mut database = db.user_db(owner, &name).await?; + db_pool + .convert_db( + &self.owner, + &self.db, + &name, + database.db_type, + self.db_type, + config, + ) + .await?; + database.db_type = self.db_type; + db.save_db(&database).await?; + Ok(ClusterActionResult::None) + } +} diff --git a/agdb_server/src/action/db_copy.rs b/agdb_server/src/action/db_copy.rs new file mode 100644 index 00000000..2379b559 --- /dev/null +++ b/agdb_server/src/action/db_copy.rs @@ -0,0 +1,48 @@ +use super::DbPool; +use super::ServerDb; +use crate::action::Action; +use crate::action::ClusterActionResult; +use crate::action::Config; +use crate::server_db::Database; +use crate::server_error::ServerResult; +use crate::utilities::db_name; +use agdb::UserValue; +use agdb_api::DbType; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Serialize, Deserialize, UserValue)] +pub(crate) struct DbCopy { + pub(crate) owner: String, + pub(crate) db: String, + pub(crate) new_owner: String, + pub(crate) new_db: String, + pub(crate) db_type: DbType, +} + +impl Action for DbCopy { + async fn exec( + self, + db: ServerDb, + db_pool: DbPool, + config: &Config, + ) -> ServerResult { + let name = db_name(&self.owner, &self.db); + let target_name = db_name(&self.new_owner, &self.new_db); + let new_owner_id = db.user_id(&self.new_owner).await?; + db_pool + .copy_db(&name, &self.new_owner, &self.new_db, &target_name, config) + .await?; + db.insert_db( + new_owner_id, + Database { + db_id: None, + name: target_name, + db_type: self.db_type, + backup: 0, + }, + ) + .await?; + Ok(ClusterActionResult::None) + } +} diff --git a/agdb_server/src/action/db_delete.rs b/agdb_server/src/action/db_delete.rs new file mode 100644 index 00000000..ddd249ac --- /dev/null +++ b/agdb_server/src/action/db_delete.rs @@ -0,0 +1,33 @@ +use super::DbPool; +use super::ServerDb; +use crate::action::Action; +use crate::action::ClusterActionResult; +use crate::action::Config; +use crate::server_error::ServerResult; +use crate::utilities::db_name; +use agdb::UserValue; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Serialize, Deserialize, UserValue)] +pub(crate) struct DbDelete { + pub(crate) owner: String, + pub(crate) db: String, +} + +impl Action for DbDelete { + async fn exec( + self, + db: ServerDb, + db_pool: DbPool, + config: &Config, + ) -> ServerResult { + let name = db_name(&self.owner, &self.db); + let user_id = db.user_id(&self.owner).await?; + db.remove_db(user_id, &name).await?; + db_pool + .delete_db(&self.owner, &self.db, &name, config) + .await?; + Ok(ClusterActionResult::None) + } +} diff --git a/agdb_server/src/action/db_exec.rs b/agdb_server/src/action/db_exec.rs new file mode 100644 index 00000000..19f58e41 --- /dev/null +++ b/agdb_server/src/action/db_exec.rs @@ -0,0 +1,98 @@ +use super::DbPool; +use super::ServerDb; +use crate::action::Action; +use crate::action::ClusterActionResult; +use crate::action::Config; +use crate::server_error::ServerResult; +use crate::utilities::db_name; +use agdb::DbUserValue; +use agdb::QueryResult; +use agdb_api::Queries; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Serialize, Deserialize)] +pub(crate) struct DbExec { + pub(crate) user: String, + pub(crate) owner: String, + pub(crate) db: String, + pub(crate) queries: Queries, +} + +impl Action for DbExec { + async fn exec( + self, + _db: ServerDb, + db_pool: DbPool, + config: &Config, + ) -> ServerResult { + let name = db_name(&self.owner, &self.db); + Ok(ClusterActionResult::QueryResults( + db_pool + .exec_mut( + &self.owner, + &self.db, + &name, + &self.user, + self.queries, + config, + ) + .await?, + )) + } +} + +impl DbUserValue for DbExec { + type ValueType = DbExec; + + fn db_id(&self) -> Option { + None + } + + fn db_keys() -> Vec { + vec!["user".into(), "owner".into(), "db".into(), "queries".into()] + } + + fn from_db_element(element: &agdb::DbElement) -> Result { + Ok(Self { + user: element.values[0].value.string()?.clone(), + owner: element.values[1].value.string()?.clone(), + db: element.values[2].value.string()?.clone(), + queries: serde_json::from_str(element.values[3].value.string()?) + .map_err(|e| agdb::DbError::from(e.to_string()))?, + }) + } + + fn to_db_values(&self) -> Vec { + let queries = serde_json::to_string(&self.queries).unwrap_or_default(); + + vec![ + ("user", &self.user).into(), + ("owner", &self.owner).into(), + ("db", &self.db).into(), + ("queries", queries).into(), + ] + } +} + +impl TryFrom<&agdb::DbElement> for DbExec { + type Error = agdb::DbError; + + #[track_caller] + fn try_from(value: &agdb::DbElement) -> std::result::Result { + DbExec::from_db_element(value) + } +} + +impl TryFrom for DbExec { + type Error = agdb::DbError; + + #[track_caller] + fn try_from(value: QueryResult) -> Result { + value + .elements + .first() + .ok_or(Self::Error::from("No element found"))? + .try_into() + } +} diff --git a/agdb_server/src/action/db_optimize.rs b/agdb_server/src/action/db_optimize.rs new file mode 100644 index 00000000..518e9659 --- /dev/null +++ b/agdb_server/src/action/db_optimize.rs @@ -0,0 +1,30 @@ +use super::DbPool; +use super::ServerDb; +use crate::action::Action; +use crate::action::ClusterActionResult; +use crate::action::Config; +use crate::server_error::ServerResult; +use crate::utilities::db_name; +use agdb::UserValue; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Serialize, Deserialize, UserValue)] +pub(crate) struct DbOptimize { + pub(crate) owner: String, + pub(crate) db: String, +} + +impl Action for DbOptimize { + async fn exec( + self, + _db: ServerDb, + db_pool: DbPool, + _config: &Config, + ) -> ServerResult { + let name = db_name(&self.owner, &self.db); + db_pool.optimize_db(&name).await?; + + Ok(ClusterActionResult::None) + } +} diff --git a/agdb_server/src/action/db_remove.rs b/agdb_server/src/action/db_remove.rs new file mode 100644 index 00000000..351ed3a3 --- /dev/null +++ b/agdb_server/src/action/db_remove.rs @@ -0,0 +1,31 @@ +use super::DbPool; +use super::ServerDb; +use crate::action::Action; +use crate::action::ClusterActionResult; +use crate::action::Config; +use crate::server_error::ServerResult; +use crate::utilities::db_name; +use agdb::UserValue; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Serialize, Deserialize, UserValue)] +pub(crate) struct DbRemove { + pub(crate) owner: String, + pub(crate) db: String, +} + +impl Action for DbRemove { + async fn exec( + self, + db: ServerDb, + db_pool: DbPool, + _config: &Config, + ) -> ServerResult { + let name = db_name(&self.owner, &self.db); + let user_id = db.user_id(&self.owner).await?; + db.remove_db(user_id, &name).await?; + db_pool.remove_db(&name).await?; + Ok(ClusterActionResult::None) + } +} diff --git a/agdb_server/src/action/db_rename.rs b/agdb_server/src/action/db_rename.rs new file mode 100644 index 00000000..449ed0f3 --- /dev/null +++ b/agdb_server/src/action/db_rename.rs @@ -0,0 +1,58 @@ +use super::DbPool; +use super::ServerDb; +use crate::action::Action; +use crate::action::ClusterActionResult; +use crate::action::Config; +use crate::server_error::ServerResult; +use crate::utilities::db_name; +use agdb::UserValue; +use agdb_api::DbUserRole; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Serialize, Deserialize, UserValue)] +pub(crate) struct DbRename { + pub(crate) owner: String, + pub(crate) db: String, + pub(crate) new_owner: String, + pub(crate) new_db: String, +} + +impl Action for DbRename { + async fn exec( + self, + db: ServerDb, + db_pool: DbPool, + config: &Config, + ) -> ServerResult { + let owner_id = db.user_id(&self.owner).await?; + let name = db_name(&self.owner, &self.db); + let new_name = db_name(&self.new_owner, &self.new_db); + let mut database = db.user_db(owner_id, &name).await?; + db_pool + .rename_db( + &self.owner, + &self.db, + &name, + &self.new_owner, + &self.new_db, + &new_name, + config, + ) + .await?; + database.name = new_name; + db.save_db(&database).await?; + + if self.owner != self.new_owner { + let new_owner_id = db.user_id(&self.new_owner).await?; + db.insert_db_user( + database.db_id.expect("database should have db_id"), + new_owner_id, + DbUserRole::Admin, + ) + .await?; + } + + Ok(ClusterActionResult::None) + } +} diff --git a/agdb_server/src/action/db_restore.rs b/agdb_server/src/action/db_restore.rs new file mode 100644 index 00000000..e01b27cc --- /dev/null +++ b/agdb_server/src/action/db_restore.rs @@ -0,0 +1,39 @@ +use super::DbPool; +use super::ServerDb; +use crate::action::Action; +use crate::action::ClusterActionResult; +use crate::action::Config; +use crate::server_error::ServerResult; +use crate::utilities::db_name; +use agdb::UserValue; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Serialize, Deserialize, UserValue)] +pub(crate) struct DbRestore { + pub(crate) owner: String, + pub(crate) db: String, +} + +impl Action for DbRestore { + async fn exec( + self, + db: ServerDb, + db_pool: DbPool, + config: &Config, + ) -> ServerResult { + let owner = db.user_id(&self.owner).await?; + let name = db_name(&self.owner, &self.db); + let mut database = db.user_db(owner, &name).await?; + + if let Some(backup) = db_pool + .restore_db(&self.owner, &self.db, &name, database.db_type, config) + .await? + { + database.backup = backup; + db.save_db(&database).await?; + } + + Ok(ClusterActionResult::None) + } +} diff --git a/agdb_server/src/action/user_add.rs b/agdb_server/src/action/user_add.rs index 3cba0d69..cd03247a 100644 --- a/agdb_server/src/action/user_add.rs +++ b/agdb_server/src/action/user_add.rs @@ -1,6 +1,7 @@ use super::DbPool; use super::ServerDb; use crate::action::Action; +use crate::action::ClusterActionResult; use crate::action::Config; use crate::server_db::ServerUser; use crate::server_error::ServerResult; @@ -18,10 +19,10 @@ pub(crate) struct UserAdd { impl Action for UserAdd { async fn exec( self, - db: &mut ServerDb, - _db_pool: &mut DbPool, + db: ServerDb, + _db_pool: DbPool, _config: &Config, - ) -> ServerResult { + ) -> ServerResult { db.insert_user(ServerUser { db_id: None, username: self.user, @@ -30,7 +31,6 @@ impl Action for UserAdd { token: String::new(), }) .await?; - - Ok(()) + Ok(ClusterActionResult::None) } } diff --git a/agdb_server/src/action/user_remove.rs b/agdb_server/src/action/user_remove.rs index e6dc76ae..1141497f 100644 --- a/agdb_server/src/action/user_remove.rs +++ b/agdb_server/src/action/user_remove.rs @@ -1,6 +1,7 @@ use super::DbPool; use super::ServerDb; use crate::action::Action; +use crate::action::ClusterActionResult; use crate::config::Config; use crate::server_error::ServerResult; use agdb::UserValue; @@ -13,10 +14,14 @@ pub(crate) struct UserRemove { } impl Action for UserRemove { - async fn exec(self, db: &mut ServerDb, db_pool: &mut DbPool, config: &Config) -> ServerResult { + async fn exec( + self, + db: ServerDb, + db_pool: DbPool, + config: &Config, + ) -> ServerResult { let dbs = db.remove_user(&self.user).await?; db_pool.remove_user_dbs(&self.user, &dbs, config).await?; - - Ok(()) + Ok(ClusterActionResult::None) } } diff --git a/agdb_server/src/cluster.rs b/agdb_server/src/cluster.rs index 6434ba10..b4d80441 100644 --- a/agdb_server/src/cluster.rs +++ b/agdb_server/src/cluster.rs @@ -1,5 +1,6 @@ use crate::action::Action; use crate::action::ClusterAction; +use crate::action::ClusterActionResult; use crate::config::Config; use crate::db_pool::DbPool; use crate::raft; @@ -33,7 +34,7 @@ use tokio::sync::RwLock; pub(crate) type Cluster = Arc; type ClusterNode = Arc; -type ResultNotifier = tokio::sync::oneshot::Sender>; +type ResultNotifier = tokio::sync::oneshot::Sender>; type ClusterResponseReceiver = UnboundedReceiver<(Request, Response)>; pub(crate) struct ClusterNodeImpl { @@ -54,11 +55,12 @@ pub(crate) struct ClusterImpl { } impl ClusterImpl { - pub(crate) async fn append>( + pub(crate) async fn exec>( &self, action: T, - ) -> ServerResult { - let (sender, receiver) = tokio::sync::oneshot::channel::>(); + ) -> ServerResult<(u64, ClusterActionResult)> { + let (sender, receiver) = + tokio::sync::oneshot::channel::>(); let requests = self .raft .write() @@ -304,7 +306,7 @@ pub(crate) struct ClusterStorage { impl ClusterStorage { async fn new(db: ServerDb, db_pool: DbPool, config: Config) -> ServerResult { let (index, term, commit) = db.cluster_log().await?; - let logs = db.logs_unexecuted_until(commit).await?; + let logs = db.logs_unexecuted(commit).await?; let mut storage = Self { result_notifiers: HashMap::new(), @@ -327,18 +329,18 @@ impl ClusterStorage { async fn execute_log(&mut self, log: Log) -> ServerResult<()> { let log_id = log.db_id.unwrap_or_default(); let executed = self.executed.clone(); - let mut db = self.db.clone(); - let mut db_pool = self.db_pool.clone(); + let db = self.db.clone(); + let db_pool = self.db_pool.clone(); let config = self.config.clone(); let notifier = self.result_notifiers.remove(&log_id); tokio::spawn(async move { - let result = log.data.exec(&mut db, &mut db_pool, &config).await; + let result = log.data.exec(db.clone(), db_pool, &config).await; executed.fetch_max(log.index, Ordering::Relaxed); let _ = db.log_executed(log_id).await; if let Some(notifier) = notifier { - let _ = notifier.send(result.map(|_| log.index)); + let _ = notifier.send(result.map(|r| (log.index, r))); } }); @@ -352,7 +354,7 @@ impl Storage for ClusterStorage { log: Log, notifier: Option, ) -> ServerResult<()> { - self.db.remove_unexecuted_logs_since(log.index).await?; + self.db.remove_uncommitted_logs_since(log.index).await?; let log_id = self.db.append_log(&log).await?; self.index = log.index; self.term = log.term; @@ -365,7 +367,7 @@ impl Storage for ClusterStorage { } async fn commit(&mut self, index: u64) -> ServerResult<()> { - for log in self.db.logs_unexecuted_until(index).await? { + for log in self.db.logs_uncommitted(index).await? { self.commit = index; self.db .log_committed(log.db_id.expect("log should have db_id")) diff --git a/agdb_server/src/db_pool.rs b/agdb_server/src/db_pool.rs index 834cfc35..ab671f92 100644 --- a/agdb_server/src/db_pool.rs +++ b/agdb_server/src/db_pool.rs @@ -12,9 +12,7 @@ use agdb::QueryResult; use agdb_api::DbAudit; use agdb_api::DbResource; use agdb_api::DbType; -use agdb_api::DbUserRole; use agdb_api::Queries; -use agdb_api::ServerDatabase; use axum::http::StatusCode; use std::collections::HashMap; use std::io::Seek; @@ -133,10 +131,9 @@ impl DbPool { owner: &str, db: &str, database: &mut Database, - role: DbUserRole, config: &Config, resource: DbResource, - ) -> ServerResult { + ) -> ServerResult { match resource { DbResource::All => { self.do_clear_db(owner, db, database, config).await?; @@ -154,15 +151,7 @@ impl DbPool { } } - let size = self.db_size(&database.name).await?; - - Ok(ServerDatabase { - name: database.name.clone(), - db_type: database.db_type, - role, - size, - backup: database.backup, - }) + Ok(()) } pub(crate) async fn db_size(&self, name: &str) -> ServerResult { @@ -328,10 +317,10 @@ impl DbPool { Ok(r) } - pub(crate) async fn optimize_db(&self, db_name: &str) -> ServerResult { + pub(crate) async fn optimize_db(&self, db_name: &str) -> ServerResult { let user_db = self.db(db_name).await?; user_db.optimize_storage().await?; - Ok(user_db.size().await) + Ok(()) } pub(crate) async fn remove_db(&self, db_name: &str) -> ServerResult { diff --git a/agdb_server/src/routes/admin/db.rs b/agdb_server/src/routes/admin/db.rs index b833eeec..7eb8c26a 100644 --- a/agdb_server/src/routes/admin/db.rs +++ b/agdb_server/src/routes/admin/db.rs @@ -1,12 +1,24 @@ pub(crate) mod user; +use crate::action::db_add::DbAdd; +use crate::action::db_backup::DbBackup; +use crate::action::db_clear::DbClear; +use crate::action::db_convert::DbConvert; +use crate::action::db_copy::DbCopy; +use crate::action::db_delete::DbDelete; +use crate::action::db_exec::DbExec; +use crate::action::db_optimize::DbOptimize; +use crate::action::db_remove::DbRemove; +use crate::action::db_rename::DbRename; +use crate::action::db_restore::DbRestore; +use crate::action::ClusterActionResult; +use crate::cluster::Cluster; use crate::config::Config; use crate::db_pool::DbPool; use crate::error_code::ErrorCode; use crate::routes::db::DbTypeParam; use crate::routes::db::ServerDatabaseRename; use crate::routes::db::ServerDatabaseResource; -use crate::server_db::Database; use crate::server_db::ServerDb; use crate::server_error::ServerResponse; use crate::user_id::AdminId; @@ -21,6 +33,7 @@ use axum::extract::Path; use axum::extract::Query; use axum::extract::State; use axum::http::StatusCode; +use axum::response::IntoResponse; use axum::Json; #[utoipa::path(post, @@ -42,12 +55,11 @@ use axum::Json; )] pub(crate) async fn add( _admin: AdminId, - State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, request: Query, -) -> ServerResponse { +) -> ServerResponse { let name = db_name(&owner, &db); let owner_id = server_db.user_id(&owner).await?; @@ -55,23 +67,18 @@ pub(crate) async fn add( return Err(ErrorCode::DbExists.into()); } - let backup = db_pool - .add_db(&owner, &db, &name, request.db_type, &config) - .await?; - - server_db - .insert_db( - owner_id, - Database { - db_id: None, - name, - db_type: request.db_type, - backup, - }, - ) + let (commit_index, _result) = cluster + .exec(DbAdd { + owner: owner.to_string(), + db, + db_type: request.db_type, + }) .await?; - Ok(StatusCode::CREATED) + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(get, @@ -123,22 +130,20 @@ pub(crate) async fn audit( )] pub(crate) async fn backup( _admin: AdminId, - State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, -) -> ServerResponse { +) -> ServerResponse { let db_name = db_name(&owner, &db); let owner_id = server_db.user_id(&owner).await?; - let mut database = server_db.user_db(owner_id, &db_name).await?; - - database.backup = db_pool - .backup_db(&owner, &db, &db_name, database.db_type, &config) - .await?; + server_db.user_db_id(owner_id, &db_name).await?; - server_db.save_db(&database).await?; + let (commit_index, _result) = cluster.exec(DbBackup { owner, db }).await?; - Ok(StatusCode::CREATED) + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(post, @@ -160,21 +165,38 @@ pub(crate) async fn backup( pub(crate) async fn clear( _admin: AdminId, State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, request: Query, -) -> ServerResponse<(StatusCode, Json)> { +) -> ServerResponse { let db_name = db_name(&owner, &db); let owner_id = server_db.user_id(&owner).await?; - let mut database = server_db.user_db(owner_id, &db_name).await?; let role = server_db.user_db_role(owner_id, &db_name).await?; - let db = db_pool - .clear_db(&owner, &db, &mut database, role, &config, request.resource) + + let (commit_index, _result) = cluster + .exec(DbClear { + owner, + db, + resource: request.resource, + }) .await?; - server_db.save_db(&database).await?; - Ok((StatusCode::OK, Json(db))) + let size = db_pool.db_size(&db_name).await.unwrap_or(0); + let database = server_db.user_db(owner_id, &db_name).await?; + let db = ServerDatabase { + name: db_name, + db_type: database.db_type, + role, + backup: database.backup, + size, + }; + + Ok(( + StatusCode::OK, + [("commit-index", commit_index.to_string())], + Json(db), + )) } #[utoipa::path(post, @@ -196,35 +218,31 @@ pub(crate) async fn clear( )] pub(crate) async fn convert( _admin: AdminId, - State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, request: Query, -) -> ServerResponse { +) -> ServerResponse { let db_name = db_name(&owner, &db); let owner_id = server_db.user_id(&owner).await?; - let mut database = server_db.user_db(owner_id, &db_name).await?; + let db_type = server_db.user_db(owner_id, &db_name).await?.db_type; - if database.db_type == request.db_type { - return Ok(StatusCode::CREATED); + if db_type == request.db_type { + return Ok((StatusCode::CREATED, [("commit-index", String::new())])); } - db_pool - .convert_db( - &owner, - &db, - &db_name, - database.db_type, - request.db_type, - &config, - ) + let (commit_index, _result) = cluster + .exec(DbConvert { + owner, + db, + db_type: request.db_type, + }) .await?; - database.db_type = request.db_type; - server_db.save_db(&database).await?; - - Ok(StatusCode::CREATED) + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(post, @@ -247,12 +265,11 @@ pub(crate) async fn convert( )] pub(crate) async fn copy( _admin: AdminId, - State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, request: Query, -) -> ServerResponse { +) -> ServerResponse { let (new_owner, new_db) = request .new_name .split_once('/') @@ -271,20 +288,20 @@ pub(crate) async fn copy( return Err(ErrorCode::DbExists.into()); } - db_pool - .copy_db(&source_db, new_owner, new_db, &target_db, &config) - .await?; - - server_db - .save_db(&Database { - db_id: None, - name: target_db, + let (commit_index, _result) = cluster + .exec(DbCopy { + owner, + db, + new_owner: new_owner.to_string(), + new_db: new_db.to_string(), db_type, - backup: 0, }) .await?; - Ok(StatusCode::CREATED) + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(delete, @@ -304,17 +321,19 @@ pub(crate) async fn copy( )] pub(crate) async fn delete( _admin: AdminId, - State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, -) -> ServerResponse { - let owner_id = server_db.user_id(&owner).await?; - let db_name = db_name(&owner, &db); - server_db.remove_db(owner_id, &db_name).await?; - db_pool.delete_db(&owner, &db, &db_name, &config).await?; +) -> ServerResponse { + let user_id = server_db.user_id(&owner).await?; + let _ = server_db.user_db_id(user_id, &db_name(&owner, &db)).await?; - Ok(StatusCode::NO_CONTENT) + let (commit_index, _result) = cluster.exec(DbDelete { owner, db }).await?; + + Ok(( + StatusCode::NO_CONTENT, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(post, @@ -337,22 +356,41 @@ pub(crate) async fn delete( pub(crate) async fn exec( _admin: AdminId, State(db_pool): State, + State(cluster): State, State(config): State, Path((owner, db)): Path<(String, String)>, Json(queries): Json, -) -> ServerResponse<(StatusCode, Json)> { +) -> ServerResponse { let db_name = db_name(&owner, &db); let required_role = required_role(&queries); - let results = if required_role == DbUserRole::Read { - db_pool.exec(&db_name, queries).await? + let (commit_index, results) = if required_role == DbUserRole::Read { + (0, db_pool.exec(&db_name, queries).await?) } else { - db_pool - .exec_mut(&owner, &db, &db_name, &config.admin, queries, &config) + let mut index = 0; + let mut results = Vec::new(); + + if let (i, ClusterActionResult::QueryResults(r)) = cluster + .exec(DbExec { + user: config.admin.clone(), + owner, + db, + queries, + }) .await? + { + index = i; + results = r; + } + + (index, results) }; - Ok((StatusCode::OK, Json(QueriesResults(results)))) + Ok(( + StatusCode::OK, + [("commit-index", commit_index.to_string())], + Json(QueriesResults(results)), + )) } #[utoipa::path(get, @@ -403,17 +441,21 @@ pub(crate) async fn list( pub(crate) async fn optimize( _admin: AdminId, State(db_pool): State, + State(cluster): State, State(server_db): State, Path((owner, db)): Path<(String, String)>, -) -> ServerResponse<(StatusCode, Json)> { +) -> ServerResponse { let db_name = db_name(&owner, &db); let owner_id = server_db.user_id(&owner).await?; let database = server_db.user_db(owner_id, &db_name).await?; let role = server_db.user_db_role(owner_id, &db_name).await?; - let size = db_pool.optimize_db(&db_name).await?; + + let (commit_index, _result) = cluster.exec(DbOptimize { owner, db }).await?; + let size = db_pool.db_size(&db_name).await?; Ok(( StatusCode::OK, + [("commit-index", commit_index.to_string())], Json(ServerDatabase { name: db_name, db_type: database.db_type, @@ -441,16 +483,19 @@ pub(crate) async fn optimize( )] pub(crate) async fn remove( _admin: AdminId, - State(db_pool): State, + State(cluster): State, State(server_db): State, Path((owner, db)): Path<(String, String)>, -) -> ServerResponse { - let db_name = db_name(&owner, &db); - let owner_id = server_db.user_id(&owner).await?; - server_db.remove_db(owner_id, &db_name).await?; - db_pool.remove_db(&db_name).await?; +) -> ServerResponse { + let user_id = server_db.user_id(&owner).await?; + let _ = server_db.user_db_id(user_id, &db_name(&owner, &db)).await?; + + let (commit_index, _result) = cluster.exec(DbRemove { owner, db }).await?; - Ok(StatusCode::NO_CONTENT) + Ok(( + StatusCode::NO_CONTENT, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(post, @@ -473,48 +518,46 @@ pub(crate) async fn remove( )] pub(crate) async fn rename( _admin: AdminId, - State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, request: Query, -) -> ServerResponse { +) -> ServerResponse { let db_name = db_name(&owner, &db); + let owner_id = server_db.user_id(&owner).await?; + let _ = server_db.user_db_id(owner_id, &db_name).await?; if db_name == request.new_name { - return Ok(StatusCode::CREATED); + return Ok((StatusCode::CREATED, [("commit-index", String::new())])); } let (new_owner, new_db) = request .new_name .split_once('/') .ok_or(ErrorCode::DbInvalid)?; - let new_owner_id = server_db.user_id(new_owner).await?; - let owner_id = server_db.user_id(&owner).await?; - let mut database = server_db.user_db(owner_id, &db_name).await?; - - db_pool - .rename_db( - &owner, - &db, - &db_name, - new_owner, - new_db, - &request.new_name, - &config, - ) - .await?; - database.name = request.new_name.clone(); - server_db.save_db(&database).await?; - - if new_owner != owner { - server_db - .insert_db_user(database.db_id.unwrap(), new_owner_id, DbUserRole::Admin) - .await?; + let new_owner_id = server_db.user_id(new_owner).await?; + if server_db + .find_user_db_id(new_owner_id, &request.new_name) + .await? + .is_some() + { + return Err(ErrorCode::DbExists.into()); } - Ok(StatusCode::CREATED) + let (commit_index, _result) = cluster + .exec(DbRename { + owner, + db, + new_owner: new_owner.to_string(), + new_db: new_db.to_string(), + }) + .await?; + + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(post, @@ -529,27 +572,23 @@ pub(crate) async fn rename( responses( (status = 201, description = "db restored"), (status = 401, description = "unauthorized"), - (status = 404, description = "backup not found"), + (status = 404, description = "db or backup not found"), ) )] pub(crate) async fn restore( _admin: AdminId, - State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, -) -> ServerResponse { +) -> ServerResponse { let db_name = db_name(&owner, &db); let owner_id = server_db.user_id(&owner).await?; - let mut database = server_db.user_db(owner_id, &db_name).await?; + let _ = server_db.user_db_id(owner_id, &db_name).await?; - if let Some(backup) = db_pool - .restore_db(&owner, &db, &db_name, database.db_type, &config) - .await? - { - database.backup = backup; - server_db.save_db(&database).await?; - } + let (commit_index, _result) = cluster.exec(DbRestore { owner, db }).await?; - Ok(StatusCode::CREATED) + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } diff --git a/agdb_server/src/routes/admin/user.rs b/agdb_server/src/routes/admin/user.rs index 9757128b..bc6c41b1 100644 --- a/agdb_server/src/routes/admin/user.rs +++ b/agdb_server/src/routes/admin/user.rs @@ -49,8 +49,8 @@ pub(crate) async fn add( let pswd = Password::create(&username, &request.password); - let commit_index = cluster - .append(UserAdd { + let (commit_index, _result) = cluster + .exec(UserAdd { user: username, password: pswd.password.to_vec(), salt: pswd.user_salt.to_vec(), @@ -90,8 +90,8 @@ pub(crate) async fn change_password( password::validate_password(&request.password)?; let pswd = Password::create(&username, &request.password); - let commit_index = cluster - .append(ChangePasswordAction { + let (commit_index, _result) = cluster + .exec(ChangePasswordAction { user: username.to_string(), new_password: pswd.password.to_vec(), new_salt: pswd.user_salt.to_vec(), @@ -168,8 +168,8 @@ pub(crate) async fn remove( ) -> ServerResponse { server_db.user_id(&username).await?; - let commit_index = cluster - .append(UserRemove { + let (commit_index, _result) = cluster + .exec(UserRemove { user: username.to_string(), }) .await?; diff --git a/agdb_server/src/routes/cluster.rs b/agdb_server/src/routes/cluster.rs index f266868d..d4703877 100644 --- a/agdb_server/src/routes/cluster.rs +++ b/agdb_server/src/routes/cluster.rs @@ -50,8 +50,8 @@ pub(crate) async fn admin_logout( ) -> ServerResponse { let _user_id = server_db.user_id(&username).await?; - let commit_index = cluster - .append(ClusterLogin { + let (commit_index, _result) = cluster + .exec(ClusterLogin { user: username, new_token: String::new(), }) @@ -82,12 +82,13 @@ pub(crate) async fn login( let mut commit_index = 0; if user_id.is_some() { - commit_index = cluster - .append(ClusterLogin { + let (index, _result) = cluster + .exec(ClusterLogin { user: request.username, new_token: token.clone(), }) .await?; + commit_index = index; } Ok(( @@ -116,12 +117,13 @@ pub(crate) async fn logout( let mut commit_index = 0; if !token.is_empty() { - commit_index = cluster - .append(ClusterLogin { + let (index, _result) = cluster + .exec(ClusterLogin { user: server_db.user_name(user.0).await?, new_token: String::new(), }) .await?; + commit_index = index; } Ok(( diff --git a/agdb_server/src/routes/db.rs b/agdb_server/src/routes/db.rs index e8ac0551..99f29da8 100644 --- a/agdb_server/src/routes/db.rs +++ b/agdb_server/src/routes/db.rs @@ -1,9 +1,21 @@ pub(crate) mod user; +use crate::action::db_add::DbAdd; +use crate::action::db_backup::DbBackup; +use crate::action::db_clear::DbClear; +use crate::action::db_convert::DbConvert; +use crate::action::db_copy::DbCopy; +use crate::action::db_delete::DbDelete; +use crate::action::db_exec::DbExec; +use crate::action::db_optimize::DbOptimize; +use crate::action::db_remove::DbRemove; +use crate::action::db_rename::DbRename; +use crate::action::db_restore::DbRestore; +use crate::action::ClusterActionResult; +use crate::cluster::Cluster; use crate::config::Config; use crate::db_pool::DbPool; use crate::error_code::ErrorCode; -use crate::server_db::Database; use crate::server_db::ServerDb; use crate::server_error::permission_denied; use crate::server_error::ServerError; @@ -22,6 +34,7 @@ use axum::extract::Path; use axum::extract::Query; use axum::extract::State; use axum::http::StatusCode; +use axum::response::IntoResponse; use axum::Json; use serde::Deserialize; use utoipa::IntoParams; @@ -65,12 +78,11 @@ pub struct ServerDatabaseResource { )] pub(crate) async fn add( user: UserId, - State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, request: Query, -) -> ServerResponse { +) -> ServerResponse { let username = server_db.user_name(user.0).await?; if username != owner { @@ -86,23 +98,18 @@ pub(crate) async fn add( return Err(ErrorCode::DbExists.into()); } - let backup = db_pool - .add_db(&owner, &db, &name, request.db_type, &config) - .await?; - - server_db - .insert_db( - user.0, - Database { - db_id: None, - name, - db_type: request.db_type, - backup, - }, - ) + let (commit_index, _result) = cluster + .exec(DbAdd { + owner: username, + db, + db_type: request.db_type, + }) .await?; - Ok(StatusCode::CREATED) + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(get, @@ -154,28 +161,23 @@ pub(crate) async fn audit( )] pub(crate) async fn backup( user: UserId, - State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, -) -> ServerResponse { +) -> ServerResponse { let db_name = db_name(&owner, &db); - let mut database = server_db.user_db(user.0, &db_name).await?; + let db_id = server_db.user_db_id(user.0, &db_name).await?; - if !server_db - .is_db_admin(user.0, database.db_id.unwrap()) - .await? - { + if !server_db.is_db_admin(user.0, db_id).await? { return Err(permission_denied("admin only")); } - database.backup = db_pool - .backup_db(&owner, &db, &db_name, database.db_type, &config) - .await?; - - server_db.save_db(&database).await?; + let (commit_index, _result) = cluster.exec(DbBackup { owner, db }).await?; - Ok(StatusCode::CREATED) + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(post, @@ -197,30 +199,43 @@ pub(crate) async fn backup( )] pub(crate) async fn clear( user: UserId, + State(cluster): State, State(db_pool): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, request: Query, -) -> ServerResponse<(StatusCode, Json)> { +) -> ServerResponse { let db_name = db_name(&owner, &db); - let mut database = server_db.user_db(user.0, &db_name).await?; + let db_id = server_db.user_db_id(user.0, &db_name).await?; let role = server_db.user_db_role(user.0, &db_name).await?; - if !server_db - .is_db_admin(user.0, database.db_id.unwrap()) - .await? - { + if !server_db.is_db_admin(user.0, db_id).await? { return Err(permission_denied("admin only")); } - let db = db_pool - .clear_db(&owner, &db, &mut database, role, &config, request.resource) + let (commit_index, _result) = cluster + .exec(DbClear { + owner, + db, + resource: request.resource, + }) .await?; - server_db.save_db(&database).await?; + let size = db_pool.db_size(&db_name).await.unwrap_or(0); + let database = server_db.user_db(user.0, &db_name).await?; + let db = ServerDatabase { + name: db_name, + db_type: database.db_type, + role, + backup: database.backup, + size, + }; - Ok((StatusCode::OK, Json(db))) + Ok(( + StatusCode::OK, + [("commit-index", commit_index.to_string())], + Json(db), + )) } #[utoipa::path(post, @@ -242,14 +257,13 @@ pub(crate) async fn clear( )] pub(crate) async fn convert( user: UserId, - State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, request: Query, -) -> ServerResponse { +) -> ServerResponse { let db_name = db_name(&owner, &db); - let mut database = server_db.user_db(user.0, &db_name).await?; + let database = server_db.user_db(user.0, &db_name).await?; if !server_db .is_db_admin(user.0, database.db_id.unwrap()) @@ -259,24 +273,21 @@ pub(crate) async fn convert( } if database.db_type == request.db_type { - return Ok(StatusCode::CREATED); + return Ok((StatusCode::CREATED, [("commit-index", String::new())])); } - db_pool - .convert_db( - &owner, - &db, - &db_name, - database.db_type, - request.db_type, - &config, - ) + let (commit_index, _result) = cluster + .exec(DbConvert { + owner, + db, + db_type: request.db_type, + }) .await?; - database.db_type = request.db_type; - server_db.save_db(&database).await?; - - Ok(StatusCode::CREATED) + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(post, @@ -300,12 +311,11 @@ pub(crate) async fn convert( )] pub(crate) async fn copy( user: UserId, - State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, request: Query, -) -> ServerResponse { +) -> ServerResponse { let (new_owner, new_db) = request .new_name .split_once('/') @@ -314,11 +324,6 @@ pub(crate) async fn copy( let target_db = db_name(new_owner, new_db); let db_type = server_db.user_db(user.0, &source_db).await?.db_type; let username = server_db.user_name(user.0).await?; - let new_owner_id = if username == new_owner { - user.0 - } else { - server_db.user_id(new_owner).await? - }; if new_owner != username { return Err(permission_denied("cannot copy db to another user")); @@ -332,23 +337,20 @@ pub(crate) async fn copy( return Err(ErrorCode::DbExists.into()); } - db_pool - .copy_db(&source_db, new_owner, new_db, &target_db, &config) - .await?; - - server_db - .insert_db( - new_owner_id, - Database { - db_id: None, - name: target_db, - db_type, - backup: 0, - }, - ) + let (commit_index, _result) = cluster + .exec(DbCopy { + owner, + db, + new_owner: new_owner.to_string(), + new_db: new_db.to_string(), + db_type, + }) .await?; - Ok(StatusCode::CREATED) + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(delete, @@ -369,22 +371,24 @@ pub(crate) async fn copy( )] pub(crate) async fn delete( user: UserId, - State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, -) -> ServerResponse { - let user_name = server_db.user_name(user.0).await?; +) -> ServerResponse { + let username = server_db.user_name(user.0).await?; - if owner != user_name { + if owner != username { return Err(permission_denied("owner only")); } - let db_name = db_name(&owner, &db); - server_db.remove_db(user.0, &db_name).await?; - db_pool.delete_db(&owner, &db, &db_name, &config).await?; + let _ = server_db.user_db_id(user.0, &db_name(&owner, &db)).await?; + + let (commit_index, _result) = cluster.exec(DbDelete { owner, db }).await?; - Ok(StatusCode::NO_CONTENT) + Ok(( + StatusCode::NO_CONTENT, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(post, @@ -407,11 +411,11 @@ pub(crate) async fn delete( pub(crate) async fn exec( user: UserId, State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, Json(queries): Json, -) -> ServerResponse<(StatusCode, Json)> { +) -> ServerResponse { let db_name = db_name(&owner, &db); let role = server_db.user_db_role(user.0, &db_name).await?; let required_role = required_role(&queries); @@ -420,16 +424,34 @@ pub(crate) async fn exec( return Err(permission_denied("write rights required")); } - let results = if required_role == DbUserRole::Read { - db_pool.exec(&db_name, queries).await? + let (commit_index, results) = if required_role == DbUserRole::Read { + (0, db_pool.exec(&db_name, queries).await?) } else { let username = server_db.user_name(user.0).await?; - db_pool - .exec_mut(&owner, &db, &db_name, &username, queries, &config) + let mut index = 0; + let mut results = Vec::new(); + + if let (i, ClusterActionResult::QueryResults(r)) = cluster + .exec(DbExec { + user: username, + owner, + db, + queries, + }) .await? + { + index = i; + results = r; + } + + (index, results) }; - Ok((StatusCode::OK, Json(QueriesResults(results)))) + Ok(( + StatusCode::OK, + [("commit-index", commit_index.to_string())], + Json(QueriesResults(results)), + )) } #[utoipa::path(get, @@ -488,14 +510,16 @@ pub(crate) async fn list( (status = 200, description = "ok", body = ServerDatabase), (status = 401, description = "unauthorized"), (status = 403, description = "must have write permissions"), + (status = 404, description = "db not found"), ) )] pub(crate) async fn optimize( user: UserId, State(db_pool): State, + State(cluster): State, State(server_db): State, Path((owner, db)): Path<(String, String)>, -) -> ServerResponse<(StatusCode, Json)> { +) -> ServerResponse { let db_name = db_name(&owner, &db); let database = server_db.user_db(user.0, &db_name).await?; let role = server_db.user_db_role(user.0, &db_name).await?; @@ -504,10 +528,12 @@ pub(crate) async fn optimize( return Err(permission_denied("write rights required")); } - let size = db_pool.optimize_db(&db_name).await?; + let (commit_index, _result) = cluster.exec(DbOptimize { owner, db }).await?; + let size = db_pool.db_size(&db_name).await?; Ok(( StatusCode::OK, + [("commit-index", commit_index.to_string())], Json(ServerDatabase { name: db_name, db_type: database.db_type, @@ -536,21 +562,24 @@ pub(crate) async fn optimize( )] pub(crate) async fn remove( user: UserId, - State(db_pool): State, + State(cluster): State, State(server_db): State, Path((owner, db)): Path<(String, String)>, -) -> ServerResponse { +) -> ServerResponse { let user_name = server_db.user_name(user.0).await?; if owner != user_name { return Err(permission_denied("owner only")); } - let db_name: String = db_name(&owner, &db); - server_db.remove_db(user.0, &db_name).await?; - db_pool.remove_db(&db_name).await?; + let _ = server_db.user_db_id(user.0, &db_name(&owner, &db)).await?; + + let (commit_index, _result) = cluster.exec(DbRemove { owner, db }).await?; - Ok(StatusCode::NO_CONTENT) + Ok(( + StatusCode::NO_CONTENT, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(post, @@ -574,52 +603,49 @@ pub(crate) async fn remove( )] pub(crate) async fn rename( user: UserId, - State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, request: Query, -) -> ServerResponse { +) -> ServerResponse { let db_name = db_name(&owner, &db); + let _ = server_db.user_db_id(user.0, &db_name).await?; if db_name == request.new_name { - return Ok(StatusCode::CREATED); + return Ok((StatusCode::CREATED, [("commit-index", String::new())])); } let (new_owner, new_db) = request .new_name .split_once('/') .ok_or(ErrorCode::DbInvalid)?; - let new_owner_id = server_db.user_id(new_owner).await?; if owner != server_db.user_name(user.0).await? { return Err(permission_denied("owner only")); } - let mut database = server_db.user_db(user.0, &db_name).await?; - - db_pool - .rename_db( - &owner, - &db, - &db_name, - new_owner, - new_db, - &request.new_name, - &config, - ) - .await?; - - database.name = request.new_name.clone(); - server_db.save_db(&database).await?; - - if new_owner != owner { - server_db - .insert_db_user(database.db_id.unwrap(), new_owner_id, DbUserRole::Admin) - .await?; + let new_owner_id = server_db.user_id(new_owner).await?; + if server_db + .find_user_db_id(new_owner_id, &request.new_name) + .await? + .is_some() + { + return Err(ErrorCode::DbExists.into()); } - Ok(StatusCode::CREATED) + let (commit_index, _result) = cluster + .exec(DbRename { + owner, + db, + new_owner: new_owner.to_string(), + new_db: new_db.to_string(), + }) + .await?; + + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(post, @@ -640,28 +666,21 @@ pub(crate) async fn rename( )] pub(crate) async fn restore( user: UserId, - State(db_pool): State, + State(cluster): State, State(server_db): State, - State(config): State, Path((owner, db)): Path<(String, String)>, -) -> ServerResponse { +) -> ServerResponse { let db_name = db_name(&owner, &db); - let mut database = server_db.user_db(user.0, &db_name).await?; + let db_id = server_db.user_db_id(user.0, &db_name).await?; - if !server_db - .is_db_admin(user.0, database.db_id.unwrap()) - .await? - { + if !server_db.is_db_admin(user.0, db_id).await? { return Err(permission_denied("admin only")); } - if let Some(backup) = db_pool - .restore_db(&owner, &db, &db_name, database.db_type, &config) - .await? - { - database.backup = backup; - server_db.save_db(&database).await?; - } + let (commit_index, _result) = cluster.exec(DbRestore { owner, db }).await?; - Ok(StatusCode::CREATED) + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } diff --git a/agdb_server/src/routes/user.rs b/agdb_server/src/routes/user.rs index ee293944..757324e0 100644 --- a/agdb_server/src/routes/user.rs +++ b/agdb_server/src/routes/user.rs @@ -112,8 +112,8 @@ pub(crate) async fn change_password( password::validate_password(&request.new_password)?; let pswd = Password::create(&user.username, &request.new_password); - let commit_index = cluster - .append(ChangePasswordAction { + let (commit_index, _result) = cluster + .exec(ChangePasswordAction { user: user.username, new_password: pswd.password.to_vec(), new_salt: pswd.user_salt.to_vec(), diff --git a/agdb_server/src/server_db.rs b/agdb_server/src/server_db.rs index ffa19bd9..2156d0ed 100644 --- a/agdb_server/src/server_db.rs +++ b/agdb_server/src/server_db.rs @@ -1,5 +1,16 @@ use crate::action::change_password::ChangePassword; use crate::action::cluster_login::ClusterLogin; +use crate::action::db_add::DbAdd; +use crate::action::db_backup::DbBackup; +use crate::action::db_clear::DbClear; +use crate::action::db_convert::DbConvert; +use crate::action::db_copy::DbCopy; +use crate::action::db_delete::DbDelete; +use crate::action::db_exec::DbExec; +use crate::action::db_optimize::DbOptimize; +use crate::action::db_remove::DbRemove; +use crate::action::db_rename::DbRename; +use crate::action::db_restore::DbRestore; use crate::action::user_add::UserAdd; use crate::action::user_remove::UserRemove; use crate::action::ClusterAction; @@ -111,6 +122,10 @@ impl ServerDb { t.exec_mut(QueryBuilder::insert().index(EXECUTED).query())?; } + if !indexes.iter().any(|i| i == COMMITTED) { + t.exec_mut(QueryBuilder::insert().index(COMMITTED).query())?; + } + if t.exec(QueryBuilder::select().ids(USERS).query()).is_err() { t.exec_mut(QueryBuilder::insert().nodes().aliases(USERS).query())?; } @@ -409,17 +424,28 @@ impl ServerDb { ) } - pub(crate) async fn logs_unexecuted_until( + pub(crate) async fn logs_unexecuted( + &self, + index: u64, + ) -> ServerResult>> { + self.logs_until(index, EXECUTED).await + } + + pub(crate) async fn logs_uncommitted( &self, index: u64, ) -> ServerResult>> { + self.logs_until(index, COMMITTED).await + } + + async fn logs_until(&self, index: u64, label: &str) -> ServerResult>> { self.0.read().await.transaction(|t| { let mut log_ids: Vec<(u64, DbId)> = t .exec( QueryBuilder::select() .values("index") .search() - .index(EXECUTED) + .index(label) .value(false) .query(), )? @@ -436,18 +462,18 @@ impl ServerDb { }) .collect(); log_ids.sort_by_key(|l| l.0); - Self::logs(t, log_ids.into_iter().map(|l| l.1).collect()) + logs(t, log_ids.into_iter().map(|l| l.1).collect()) }) } - pub(crate) async fn remove_unexecuted_logs_since(&self, since_index: u64) -> ServerResult<()> { + pub(crate) async fn remove_uncommitted_logs_since(&self, since_index: u64) -> ServerResult<()> { self.0.write().await.transaction_mut(|t| { let logs: Vec = t .exec( QueryBuilder::select() .values(["index", "term"]) .search() - .index(EXECUTED) + .index(COMMITTED) .value(false) .query(), )? @@ -499,83 +525,10 @@ impl ServerDb { .ids(); log_ids.reverse(); - Self::logs(t, log_ids) + logs(t, log_ids) }) } - fn logs( - t: &Transaction, - log_ids: Vec, - ) -> ServerResult>> { - let mut actions = Vec::new(); - - for element in t - .exec( - QueryBuilder::select() - .values(["index", "term", "action"]) - .ids(log_ids) - .query(), - )? - .elements - { - let index = element.values[0].value.to_u64()?; - let term = element.values[1].value.to_u64()?; - let action = element.values[2].value.string()?; - - let data = match action.as_str() { - "UserAdd" => Ok(ClusterAction::UserAdd( - t.exec( - QueryBuilder::select() - .elements::() - .ids(element.id) - .query(), - )? - .try_into()?, - )), - "ClusterLogin" => Ok(ClusterAction::ClusterLogin( - t.exec( - QueryBuilder::select() - .elements::() - .ids(element.id) - .query(), - )? - .try_into()?, - )), - "ChangePassword" => Ok(ClusterAction::ChangePassword( - t.exec( - QueryBuilder::select() - .elements::() - .ids(element.id) - .query(), - )? - .try_into()?, - )), - "UserRemove" => Ok(ClusterAction::UserRemove( - t.exec( - QueryBuilder::select() - .elements::() - .ids(element.id) - .query(), - )? - .try_into()?, - )), - _ => Err(ServerError::new( - StatusCode::INTERNAL_SERVER_ERROR, - &format!("unknown action: {action}"), - )), - }?; - - actions.push(Log { - db_id: Some(element.id), - index, - term, - data, - }); - } - - Ok(actions) - } - pub(crate) async fn remove_db(&self, user: DbId, db: &str) -> ServerResult<()> { self.0.write().await.transaction_mut(|t| { let db_id = t @@ -927,7 +880,223 @@ fn log_db_values(log: &Log) -> Vec { values.push(("action", "UserRemove").into()); values.extend(action.to_db_values()); } + ClusterAction::DbAdd(action) => { + values.push(("action", "DbAdd").into()); + values.extend(action.to_db_values()); + } + ClusterAction::DbBackup(db_backup) => { + values.push(("action", "DbBackup").into()); + values.extend(db_backup.to_db_values()); + } + ClusterAction::DbClear(db_clear) => { + values.push(("action", "DbClear").into()); + values.extend(db_clear.to_db_values()); + } + ClusterAction::DbConvert(db_convert) => { + values.push(("action", "DbConvert").into()); + values.extend(db_convert.to_db_values()); + } + ClusterAction::DbCopy(db_copy) => { + values.push(("action", "DbCopy").into()); + values.extend(db_copy.to_db_values()); + } + ClusterAction::DbDelete(db_delete) => { + values.push(("action", "DbDelete").into()); + values.extend(db_delete.to_db_values()); + } + ClusterAction::DbRemove(db_remove) => { + values.push(("action", "DbRemove").into()); + values.extend(db_remove.to_db_values()); + } + ClusterAction::DbExec(db_exec) => { + values.push(("action", "DbExec").into()); + values.extend(db_exec.to_db_values()); + } + ClusterAction::DbOptimize(db_optimize) => { + values.push(("action", "DbOptimize").into()); + values.extend(db_optimize.to_db_values()); + } + ClusterAction::DbRestore(db_restore) => { + values.push(("action", "DbRestore").into()); + values.extend(db_restore.to_db_values()); + } + ClusterAction::DbRename(db_rename) => { + values.push(("action", "DbRename").into()); + values.extend(db_rename.to_db_values()); + } } values } + +fn logs( + t: &Transaction, + log_ids: Vec, +) -> ServerResult>> { + let mut actions = Vec::new(); + + for element in t + .exec( + QueryBuilder::select() + .values(["index", "term", "action"]) + .ids(log_ids) + .query(), + )? + .elements + { + let index = element.values[0].value.to_u64()?; + let term = element.values[1].value.to_u64()?; + let action = element.values[2].value.string()?; + + let data = match action.as_str() { + "UserAdd" => Ok(ClusterAction::UserAdd( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "ClusterLogin" => Ok(ClusterAction::ClusterLogin( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "ChangePassword" => Ok(ClusterAction::ChangePassword( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "UserRemove" => Ok(ClusterAction::UserRemove( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "DbAdd" => Ok(ClusterAction::DbAdd( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "DbBackup" => Ok(ClusterAction::DbBackup( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "DbClear" => Ok(ClusterAction::DbClear( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "DbConvert" => Ok(ClusterAction::DbConvert( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "DbCopy" => Ok(ClusterAction::DbCopy( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "DbDelete" => Ok(ClusterAction::DbDelete( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "DbRemove" => Ok(ClusterAction::DbRemove( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "DbExec" => Ok(ClusterAction::DbExec( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "DbOptimize" => Ok(ClusterAction::DbOptimize( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "DbRestore" => Ok(ClusterAction::DbRestore( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "DbRename" => Ok(ClusterAction::DbRename( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + _ => Err(ServerError::new( + StatusCode::INTERNAL_SERVER_ERROR, + &format!("unknown action: {action}"), + )), + }?; + + actions.push(Log { + db_id: Some(element.id), + index, + term, + data, + }); + } + + Ok(actions) +} diff --git a/agdb_server/tests/routes/admin_db_exec_test.rs b/agdb_server/tests/routes/admin_db_exec_test.rs index f8cbcf28..e441f993 100644 --- a/agdb_server/tests/routes/admin_db_exec_test.rs +++ b/agdb_server/tests/routes/admin_db_exec_test.rs @@ -113,7 +113,7 @@ async fn db_not_found() -> anyhow::Result<()> { server.api.user_login(ADMIN, ADMIN).await?; let status = server .api - .admin_db_exec("owner", "db", &vec![]) + .admin_db_exec("owner", "db", &[]) .await .unwrap_err() .status; @@ -130,7 +130,7 @@ async fn non_admin() -> anyhow::Result<()> { server.api.user_login(owner, owner).await?; let status = server .api - .admin_db_exec(owner, "db", &vec![]) + .admin_db_exec(owner, "db", &[]) .await .unwrap_err() .status; @@ -143,7 +143,7 @@ async fn no_token() -> anyhow::Result<()> { let server = TestServer::new().await?; let status = server .api - .admin_db_exec("owner", "db", &vec![]) + .admin_db_exec("owner", "db", &[]) .await .unwrap_err() .status; diff --git a/agdb_server/tests/routes/cluster_test.rs b/agdb_server/tests/routes/cluster_test.rs index acfdcdb4..b50d6a00 100644 --- a/agdb_server/tests/routes/cluster_test.rs +++ b/agdb_server/tests/routes/cluster_test.rs @@ -3,8 +3,11 @@ use crate::TestServerImpl; use crate::ADMIN; use crate::HOST; use crate::SERVER_DATA_DIR; +use agdb::QueryBuilder; use agdb_api::AgdbApi; use agdb_api::ClusterStatus; +use agdb_api::DbResource; +use agdb_api::DbType; use agdb_api::ReqwestClient; use std::collections::HashMap; use std::sync::Arc; @@ -142,7 +145,7 @@ async fn user() -> anyhow::Result<()> { leader.admin_user_remove("user1").await?; client.write().await.user_login(ADMIN, ADMIN).await?; client.read().await.user_status().await?; - client.write().await.admin_cluster_logout(ADMIN).await?; + client.write().await.cluster_logout().await?; assert_eq!( client.read().await.user_status().await.unwrap_err().status, 401 @@ -151,6 +154,162 @@ async fn user() -> anyhow::Result<()> { Ok(()) } +#[tokio::test] +async fn db() -> anyhow::Result<()> { + let (_leader, servers) = create_cluster(2).await?; + servers[0] + .client + .write() + .await + .cluster_login(ADMIN, ADMIN) + .await?; + + let client = servers[0].client.read().await; + client.db_add(ADMIN, "db1", DbType::Memory).await?; + let db = &client.db_list().await?.1[0]; + assert_eq!(db.name, "admin/db1"); + assert_eq!(db.db_type, DbType::Memory); + + client.db_backup(ADMIN, "db1").await?; + let db = &client.db_list().await?.1[0]; + assert_ne!(db.backup, 0); + client.db_restore(ADMIN, "db1").await?; + + let db = client.db_clear(ADMIN, "db1", DbResource::Backup).await?.1; + assert_eq!(db.backup, 0); + + client.db_convert(ADMIN, "db1", DbType::Mapped).await?; + let db = &client.db_list().await?.1[0]; + assert_eq!(db.db_type, DbType::Mapped); + + client.db_copy(ADMIN, "db1", ADMIN, "db2").await?; + let db = &client.db_list().await?.1[1]; + assert_eq!(db.name, "admin/db2"); + client.db_backup(ADMIN, "db2").await?; + + client.db_remove(ADMIN, "db2").await?; + assert_eq!(client.db_list().await?.1.len(), 1); + + client.db_add(ADMIN, "db2", DbType::Memory).await?; + let db = &client.db_list().await?.1[1]; + assert_eq!(db.name, "admin/db2"); + assert_ne!(db.backup, 0); + + client.db_delete(ADMIN, "db2").await?; + assert_eq!(client.db_list().await?.1.len(), 1); + + client + .db_exec( + ADMIN, + "db1", + &[QueryBuilder::insert().nodes().count(100).query().into()], + ) + .await?; + let node_count = client + .db_exec( + ADMIN, + "db1", + &[QueryBuilder::select().node_count().query().into()], + ) + .await? + .1[0] + .elements[0] + .values[0] + .value + .to_u64() + .unwrap(); + assert_eq!(node_count, 100); + + let orig_size = client.db_list().await?.1[0].size; + let db_size = client.db_optimize(ADMIN, "db1").await?.1.size; + assert!(db_size < orig_size); + + client.db_rename(ADMIN, "db1", ADMIN, "db2").await?; + let db = &client.db_list().await?.1[0]; + assert_eq!(db.name, "admin/db2"); + + Ok(()) +} + +#[tokio::test] +async fn db_admin() -> anyhow::Result<()> { + let (_leader, servers) = create_cluster(2).await?; + servers[0] + .client + .write() + .await + .cluster_login(ADMIN, ADMIN) + .await?; + + let client = servers[0].client.read().await; + client.admin_db_add(ADMIN, "db1", DbType::Memory).await?; + let db = &client.db_list().await?.1[0]; + assert_eq!(db.name, "admin/db1"); + assert_eq!(db.db_type, DbType::Memory); + + client.admin_db_backup(ADMIN, "db1").await?; + let db = &client.db_list().await?.1[0]; + assert_ne!(db.backup, 0); + client.admin_db_restore(ADMIN, "db1").await?; + + let db = client.db_clear(ADMIN, "db1", DbResource::Backup).await?.1; + assert_eq!(db.backup, 0); + + client + .admin_db_convert(ADMIN, "db1", DbType::Mapped) + .await?; + let db = &client.db_list().await?.1[0]; + assert_eq!(db.db_type, DbType::Mapped); + + client.admin_db_copy(ADMIN, "db1", ADMIN, "db2").await?; + let db = &client.db_list().await?.1[1]; + assert_eq!(db.name, "admin/db2"); + client.admin_db_backup(ADMIN, "db2").await?; + + client.admin_db_remove(ADMIN, "db2").await?; + assert_eq!(client.db_list().await?.1.len(), 1); + + client.admin_db_add(ADMIN, "db2", DbType::Memory).await?; + let db = &client.db_list().await?.1[1]; + assert_eq!(db.name, "admin/db2"); + assert_ne!(db.backup, 0); + + client.admin_db_delete(ADMIN, "db2").await?; + assert_eq!(client.db_list().await?.1.len(), 1); + + client + .admin_db_exec( + ADMIN, + "db1", + &[QueryBuilder::insert().nodes().count(100).query().into()], + ) + .await?; + let node_count = client + .admin_db_exec( + ADMIN, + "db1", + &[QueryBuilder::select().node_count().query().into()], + ) + .await? + .1[0] + .elements[0] + .values[0] + .value + .to_u64() + .unwrap(); + assert_eq!(node_count, 100); + + let orig_size = client.admin_db_list().await?.1[0].size; + let db_size = client.admin_db_optimize(ADMIN, "db1").await?.1.size; + assert!(db_size < orig_size); + + client.admin_db_rename(ADMIN, "db1", ADMIN, "db2").await?; + let db = &client.db_list().await?.1[0]; + assert_eq!(db.name, "admin/db2"); + + Ok(()) +} + #[tokio::test] async fn status() { let server = TestServer::new().await.unwrap();