diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index e3bc2d0..dd48c53 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -4,6 +4,7 @@ on: push: branches: - master + workflow_dispatch: pull_request: branches: - master @@ -16,14 +17,14 @@ jobs: matrix: os: [ubuntu-latest] rust: [nightly] - orientdb-server: [3.1.11] + orientdb-server: [3.2.34] steps: - uses: actions/checkout@v2 - name: Starting Gremlin Servers run: | - docker-compose -f ./docker-compose/docker-compose.yaml up -d + docker compose -f ./docker-compose/docker-compose.yaml up -d env: ORIENTDB_VERSION: ${{ matrix.orientdb-server }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 66f4128..554e557 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -4,6 +4,7 @@ on: push: branches: - "*" + workflow_dispatch: pull_request: branches: - master @@ -16,13 +17,13 @@ jobs: matrix: os: [ubuntu-latest] rust: [stable] - orientdb-server: [3.1.11] + orientdb-server: [3.2.34] steps: - uses: actions/checkout@v2 - name: Starting OrientDB Server run: | - docker-compose -f ./docker-compose/docker-compose.yaml up -d + docker compose -f ./docker-compose/docker-compose.yaml up -d env: ORIENTDB_VERSION: ${{ matrix.orientdb-server }} diff --git a/orientdb-client/Cargo.toml b/orientdb-client/Cargo.toml index c369802..9fbafaf 100644 --- a/orientdb-client/Cargo.toml +++ b/orientdb-client/Cargo.toml @@ -34,14 +34,14 @@ async-trait = { version = "0.1.10", optional = true } futures = { version="0.3", optional=true } mobc = {version = "0.7", optional = true, default-features=false, features = ["unstable"] } tokio = { version = "1", optional=true, features = ["full"] } -uuid = { version = "1.6", optional=true } +uuid = { version = "1.10", optional=true } orientdb-macro = { path="../orientdb-macro", version="0.2", optional=true } [dev-dependencies] -dotenv = "0.14.1" -uuid = { version = "1.6", features=["v4"] } +dotenv = "0.15.0" +uuid = { version = "1.10", features=["v4"] } [[example]] name="async-simple" diff --git a/orientdb-client/src/asynchronous/client.rs b/orientdb-client/src/asynchronous/client.rs index ef3a67a..e1dea56 100644 --- a/orientdb-client/src/asynchronous/client.rs +++ b/orientdb-client/src/asynchronous/client.rs @@ -1,12 +1,16 @@ use super::network::cluster::AsyncConnection; use super::network::cluster::{Cluster, Server}; use super::session::{OSession, SessionPool, SessionPoolManager}; +use crate::asynchronous::server_statement::ServerStatement; +use crate::asynchronous::types::resultset::ServerResultSet; use crate::common::protocol::messages::request::{ Close, Connect, CreateDB, DropDB, ExistDB, MsgHeader, Open, }; use crate::common::protocol::messages::response; +use crate::common::types::result::OResult; use crate::common::ConnectionOptions; use crate::{DatabaseType, OrientResult}; +use futures::Stream; use std::future::Future; use std::net::SocketAddr; use std::net::ToSocketAddrs; @@ -205,4 +209,34 @@ impl OrientDBClientInternal { }) .await } + + pub async fn execute( + &self, + user: &str, + password: &str, + query: &str, + ) -> OrientResult { + Ok(ServerStatement::new( + self, + user.to_string(), + password.to_string(), + query.to_string(), + )) + } + + pub(crate) async fn run( + &self, + stmt: ServerStatement<'_>, + ) -> OrientResult>> { + let user = stmt.user.clone(); + let pwd = stmt.password.clone(); + self.run_as_admin(&user, &pwd, move |session, mut conn| async move { + let response: response::ServerQuery = conn + .send(stmt.into_query(session.session_id, session.token).into()) + .await? + .payload(); + Ok((conn, ServerResultSet::new(response))) + }) + .await + } } diff --git a/orientdb-client/src/asynchronous/mod.rs b/orientdb-client/src/asynchronous/mod.rs index d43f104..3501370 100644 --- a/orientdb-client/src/asynchronous/mod.rs +++ b/orientdb-client/src/asynchronous/mod.rs @@ -2,6 +2,7 @@ pub mod client; pub mod live; pub mod live_statement; pub mod network; +pub mod server_statement; pub mod session; pub mod statement; pub mod types; diff --git a/orientdb-client/src/asynchronous/network/decoder.rs b/orientdb-client/src/asynchronous/network/decoder.rs index 6f5bd19..06c11bf 100644 --- a/orientdb-client/src/asynchronous/network/decoder.rs +++ b/orientdb-client/src/asynchronous/network/decoder.rs @@ -3,7 +3,8 @@ use crate::common::protocol::deserializer::DocumentDeserializer; use crate::common::protocol::messages::response::Response; use crate::common::protocol::messages::response::Status; use crate::common::protocol::messages::response::{ - Connect, CreateDB, DropDB, ExistDB, Header, LiveQuery, LiveQueryResult, Open, Query, QueryClose, + Connect, CreateDB, DropDB, ExistDB, Header, LiveQuery, LiveQueryResult, Open, Query, + QueryClose, ServerQuery, }; use crate::common::types::error::{OError, RequestError}; use crate::common::types::OResult; @@ -174,6 +175,36 @@ impl VersionedDecoder for Protocol37 { stats, )) } + + async fn decode_server_query(buf: &mut T) -> OrientResult + where + T: AsyncRead + Unpin + Send, + { + let query_id = reader::read_string(buf).await?; + let changes = reader::read_bool(buf).await?; + let has_plan = reader::read_bool(buf).await?; + + let execution_plan = if has_plan { + Some(read_result(buf).await?) + } else { + None + }; + + let _prefetched = reader::read_i32(buf).await?; + let records = read_result_set(buf).await?; + let has_next = reader::read_bool(buf).await?; + let stats = read_query_stats(buf).await?; + let _reaload_metadata = reader::read_bool(buf).await?; + + Ok(ServerQuery::new( + query_id, + changes, + execution_plan, + records, + has_next, + stats, + )) + } async fn decode_connect(buf: &mut T) -> OrientResult where T: AsyncRead + Unpin + Send, @@ -209,6 +240,10 @@ pub trait VersionedDecoder { where T: AsyncRead + Unpin + Send; + async fn decode_server_query(buf: &mut T) -> OrientResult + where + T: AsyncRead + Unpin + Send; + async fn decode_live_query(buf: &mut T) -> OrientResult where T: AsyncRead + Unpin + Send; @@ -259,6 +294,7 @@ where 4 => T::decode_create_db(buf).await?.into(), 6 => T::decode_exist(buf).await?.into(), 7 => T::decode_drop_db(buf).await?.into(), + 50 => T::decode_server_query(buf).await?.into(), 45 => T::decode_query(buf).await?.into(), 46 => T::decode_query_close(buf).await?.into(), 47 => T::decode_query(buf).await?.into(), diff --git a/orientdb-client/src/asynchronous/server_statement.rs b/orientdb-client/src/asynchronous/server_statement.rs new file mode 100644 index 0000000..9f94047 --- /dev/null +++ b/orientdb-client/src/asynchronous/server_statement.rs @@ -0,0 +1,129 @@ +use super::client::OrientDBClientInternal; +use crate::common::protocol::messages::request::ServerQuery; +use crate::common::types::value::{IntoOValue, OValue}; +use crate::common::types::OResult; +#[cfg(feature = "sugar")] +use crate::types::result::FromResult; +use crate::OrientResult; +use futures::Stream; +use std::collections::HashMap; + +#[cfg(feature = "sugar")] +use futures::StreamExt; + +pub struct ServerStatement<'a> { + client: &'a OrientDBClientInternal, + pub(crate) user: String, + pub(crate) password: String, + stm: String, + params: HashMap, + language: String, + page_size: i32, + mode: i8, + named: bool, +} + +impl<'a> ServerStatement<'a> { + pub(crate) fn new( + client: &'a OrientDBClientInternal, + user: String, + password: String, + stm: String, + ) -> ServerStatement<'a> { + ServerStatement { + client, + user, + password, + stm, + params: HashMap::new(), + named: true, + mode: 1, + language: String::from("sql"), + page_size: 150, + } + } + + pub fn positional(mut self, params: &[&dyn IntoOValue]) -> Self { + let mut p = HashMap::new(); + for (i, elem) in params.iter().enumerate() { + p.insert(i.to_string(), elem.into_ovalue()); + } + self.params = p; + self.named = false; + self + } + pub fn named(mut self, params: &[(&str, &dyn IntoOValue)]) -> Self { + self.params = params + .iter() + .map(|&(k, ref v)| (String::from(k), v.into_ovalue())) + .collect(); + + self.named = true; + self + } + + pub async fn run(self) -> OrientResult>> { + self.client.run(self.into()).await + } + + #[cfg(feature = "sugar")] + pub async fn fetch_one(self) -> OrientResult> + where + T: FromResult, + { + let mut stream = self + .client + .run(self.into()) + .await? + .map(|r| r.and_then(T::from_result)); + + match stream.next().await { + Some(r) => Ok(Some(r?)), + None => Ok(None), + } + } + + #[cfg(feature = "sugar")] + pub async fn fetch(self) -> OrientResult> + where + T: FromResult, + { + let mut stream = self + .client + .run(self.into()) + .await? + .map(|r| r.and_then(T::from_result)); + + let mut results = Vec::new(); + + while let Some(r) = stream.next().await { + results.push(r?); + } + Ok(results) + } + + #[cfg(feature = "sugar")] + pub async fn stream(self) -> OrientResult>> + where + T: FromResult, + { + Ok(self + .client + .run(self.into()) + .await? + .map(|r| r.and_then(T::from_result))) + } + + pub(crate) fn into_query(self, session_id: i32, token: Option>) -> ServerQuery { + ServerQuery { + session_id, + token, + query: self.stm, + language: self.language, + named: self.named, + parameters: self.params, + page_size: self.page_size, + mode: self.mode, + } + } +} diff --git a/orientdb-client/src/asynchronous/types/resultset.rs b/orientdb-client/src/asynchronous/types/resultset.rs index b2fb201..202a523 100644 --- a/orientdb-client/src/asynchronous/types/resultset.rs +++ b/orientdb-client/src/asynchronous/types/resultset.rs @@ -1,6 +1,6 @@ use crate::asynchronous::network::cluster::Server; use crate::common::protocol::messages::request::{QueryClose, QueryNext}; -use crate::common::protocol::messages::response::Query; +use crate::common::protocol::messages::response::{Query, ServerQuery}; use crate::common::types::result::OResult; use crate::OrientResult; #[cfg(feature = "async-std-runtime")] @@ -123,3 +123,21 @@ async fn close_result( conn.send(msg.into()).await?; Ok(()) } + +pub struct ServerResultSet { + response: ServerQuery, +} + +impl ServerResultSet { + pub(crate) fn new(response: ServerQuery) -> ServerResultSet { + ServerResultSet { response } + } +} + +impl futures::Stream for ServerResultSet { + type Item = OrientResult; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.response.records.pop_front().map(|x| Ok(x))) + } +} diff --git a/orientdb-client/src/common/protocol/messages/request.rs b/orientdb-client/src/common/protocol/messages/request.rs index f59349c..1288682 100644 --- a/orientdb-client/src/common/protocol/messages/request.rs +++ b/orientdb-client/src/common/protocol/messages/request.rs @@ -347,6 +347,53 @@ impl From for Request { } } +// Server Query Message +#[derive(Debug)] +pub struct ServerQuery { + pub session_id: i32, + pub token: Option>, + pub query: String, + pub parameters: HashMap, + pub named: bool, + pub language: String, + pub mode: i8, + pub page_size: i32, +} + +impl ServerQuery { + #[allow(clippy::too_many_arguments)] + pub fn new( + session_id: i32, + token: Option>, + query: T, + parameters: HashMap, + named: bool, + language: T, + mode: i8, + page_size: i32, + ) -> ServerQuery + where + T: Into, + { + ServerQuery { + session_id, + token, + query: query.into(), + parameters, + named, + language: language.into(), + mode, + page_size, + } + } +} + +impl From for Request { + fn from(input: ServerQuery) -> Request { + Request::ServerQuery(input) + } +} + #[derive(Debug)] pub enum Request { HandShake(HandShake), @@ -354,6 +401,7 @@ pub enum Request { CreateDB(CreateDB), ExistDB(ExistDB), DropDB(DropDB), + ServerQuery(ServerQuery), Open(Open), Query(Query), LiveQuery(LiveQuery), diff --git a/orientdb-client/src/common/protocol/messages/response.rs b/orientdb-client/src/common/protocol/messages/response.rs index 21ddd99..5fa5cac 100644 --- a/orientdb-client/src/common/protocol/messages/response.rs +++ b/orientdb-client/src/common/protocol/messages/response.rs @@ -190,6 +190,45 @@ impl From for ResponseType { } } +#[derive(Debug)] +pub struct ServerQuery { + pub query_id: String, + pub tx_changes: bool, + pub execution_plan: Option, + pub records: VecDeque, + pub has_next: bool, + pub stats: HashMap, +} + +impl ServerQuery { + pub fn new( + query_id: T, + tx_changes: bool, + execution_plan: Option, + records: VecDeque, + has_next: bool, + stats: HashMap, + ) -> ServerQuery + where + T: Into, + { + ServerQuery { + query_id: query_id.into(), + tx_changes, + execution_plan, + records, + has_next, + stats, + } + } +} + +impl From for ResponseType { + fn from(input: ServerQuery) -> ResponseType { + ResponseType::ServerQuery(Some(input)) + } +} + #[allow(clippy::large_enum_variant)] #[derive(Debug)] pub enum ResponseType { @@ -200,6 +239,7 @@ pub enum ResponseType { CreateDB(Option), ExistDB(Option), DropDB(Option), + ServerQuery(Option), LiveQuery(Option), LiveQueryResult(Option), QueryClose(Option), @@ -259,3 +299,4 @@ impl_payload!(ExistDB); impl_payload!(Connect); impl_payload!(LiveQuery); impl_payload!(LiveQueryResult); +impl_payload!(ServerQuery); diff --git a/orientdb-client/src/sync/client.rs b/orientdb-client/src/sync/client.rs index 0ac9f48..0e74ef6 100644 --- a/orientdb-client/src/sync/client.rs +++ b/orientdb-client/src/sync/client.rs @@ -1,11 +1,13 @@ use super::network::cluster::SyncConnection; use super::network::cluster::{Cluster, Server}; use crate::common::protocol::messages::request::{ - Close, Connect, CreateDB, DropDB, ExistDB, MsgHeader, Open, + Close, Connect, CreateDB, DropDB, ExistDB, MsgHeader, Open, ServerQuery, }; use crate::common::protocol::messages::response; use crate::common::ConnectionOptions; +use crate::sync::server_statement::ServerStatement; use crate::sync::session::{OSession, SessionPool, SessionPoolManager}; +use crate::sync::types::resultset::{ResultSet, ServerResultSet}; use crate::{DatabaseType, OrientResult}; use std::net::SocketAddr; use std::net::ToSocketAddrs; @@ -187,4 +189,29 @@ impl OrientDBClientInternal { Ok(()) }) } + + pub fn execute( + &self, + user: &str, + password: &str, + query: &str, + ) -> OrientResult { + Ok(ServerStatement::new( + self, + user.to_string(), + password.to_string(), + query.to_string(), + )) + } + + pub(crate) fn run(&self, stmt: ServerStatement) -> OrientResult { + let user = stmt.user.clone(); + let pwd = stmt.password.clone(); + self.run_as_admin(&user, &pwd, move |session, conn| { + let response: response::ServerQuery = conn + .send(stmt.into_query(session.session_id, session.token).into())? + .payload(); + Ok(ServerResultSet::new(response)) + }) + } } diff --git a/orientdb-client/src/sync/mod.rs b/orientdb-client/src/sync/mod.rs index 4e1708a..891fe6d 100644 --- a/orientdb-client/src/sync/mod.rs +++ b/orientdb-client/src/sync/mod.rs @@ -1,6 +1,7 @@ pub mod client; pub mod network; pub(crate) mod protocol; +pub mod server_statement; pub mod session; pub mod statement; pub mod types; diff --git a/orientdb-client/src/sync/protocol/decoder.rs b/orientdb-client/src/sync/protocol/decoder.rs index 4f789c5..106e621 100644 --- a/orientdb-client/src/sync/protocol/decoder.rs +++ b/orientdb-client/src/sync/protocol/decoder.rs @@ -1,5 +1,5 @@ use crate::common::protocol::messages::response::{ - Connect, CreateDB, DropDB, ExistDB, Header, Open, Query, QueryClose, + Connect, CreateDB, DropDB, ExistDB, Header, Open, Query, QueryClose, ServerQuery, }; use crate::common::types::error::RequestError; use crate::OrientResult; @@ -21,4 +21,6 @@ pub trait VersionedDecoder { fn decode_query_close(_buf: &mut R) -> OrientResult { Ok(QueryClose {}) } + + fn decode_server_query(buf: &mut R) -> OrientResult; } diff --git a/orientdb-client/src/sync/protocol/encoder.rs b/orientdb-client/src/sync/protocol/encoder.rs index c88872f..40ee619 100644 --- a/orientdb-client/src/sync/protocol/encoder.rs +++ b/orientdb-client/src/sync/protocol/encoder.rs @@ -1,7 +1,7 @@ use crate::common::protocol::buffer::OBuffer; use crate::common::protocol::messages::request::{ Close as ReqClose, Connect, CreateDB, DropDB, ExistDB, HandShake, LiveQuery, Open as ReqOpen, - Query as ReqQuery, QueryClose, QueryNext, UnsubscribeLiveQuery, + Query as ReqQuery, QueryClose, QueryNext, ServerQuery, UnsubscribeLiveQuery, }; use crate::OrientError; @@ -22,4 +22,5 @@ pub trait VersionedEncoder { fn encode_create_db(buf: &mut OBuffer, close: CreateDB) -> Result<(), OrientError>; fn encode_exist_db(buf: &mut OBuffer, close: ExistDB) -> Result<(), OrientError>; fn encode_drop_db(buf: &mut OBuffer, close: DropDB) -> Result<(), OrientError>; + fn encode_server_query(buf: &mut OBuffer, query: ServerQuery) -> Result<(), OrientError>; } diff --git a/orientdb-client/src/sync/protocol/mod.rs b/orientdb-client/src/sync/protocol/mod.rs index 63f9a38..3b96b3b 100644 --- a/orientdb-client/src/sync/protocol/mod.rs +++ b/orientdb-client/src/sync/protocol/mod.rs @@ -57,6 +57,7 @@ impl WiredProtocol { Request::CreateDB(create) => T::encode_create_db(&mut buffer, create), Request::ExistDB(exist) => T::encode_exist_db(&mut buffer, exist), Request::DropDB(drop) => T::encode_drop_db(&mut buffer, drop), + Request::ServerQuery(query) => T::encode_server_query(&mut buffer, query), Request::Close(close) => T::encode_close(&mut buffer, close), Request::Query(query) => T::encode_query(&mut buffer, query), Request::LiveQuery(query) => T::encode_live_query(&mut buffer, query), @@ -97,6 +98,7 @@ impl WiredProtocol { 45 => T::decode_query(buf)?.into(), 46 => T::decode_query_close(buf)?.into(), 47 => T::decode_query(buf)?.into(), + 50 => T::decode_server_query(buf)?.into(), _ => panic!("Request {} not supported", header.op), }, }; diff --git a/orientdb-client/src/sync/protocol/v37/decoder.rs b/orientdb-client/src/sync/protocol/v37/decoder.rs index 38b07bd..c44c9a2 100644 --- a/orientdb-client/src/sync/protocol/v37/decoder.rs +++ b/orientdb-client/src/sync/protocol/v37/decoder.rs @@ -1,6 +1,8 @@ use super::super::v37::Protocol37; use crate::common::protocol::deserializer::DocumentDeserializer; -use crate::common::protocol::messages::response::{Connect, ExistDB, Header, Open, Query, Status}; +use crate::common::protocol::messages::response::{ + Connect, ExistDB, Header, Open, Query, ServerQuery, Status, +}; use crate::common::types::error::{OError, RequestError}; use crate::common::types::OResult; use crate::sync::protocol::decoder::VersionedDecoder; @@ -67,6 +69,33 @@ impl VersionedDecoder for Protocol37 { )) } + fn decode_server_query(buf: &mut R) -> OrientResult { + let query_id = reader::read_string(buf)?; + let changes = reader::read_bool(buf)?; + let has_plan = reader::read_bool(buf)?; + + let execution_plan = if has_plan { + Some(read_result(buf)?) + } else { + None + }; + + let _prefetched = reader::read_i32(buf)?; + let records = read_result_set(buf)?; + let has_next = reader::read_bool(buf)?; + let stats = read_query_stats(buf)?; + let _reaload_metadata = reader::read_bool(buf)?; + + Ok(ServerQuery::new( + query_id, + changes, + execution_plan, + records, + has_next, + stats, + )) + } + fn decode_errors(buf: &mut R) -> OrientResult { let code = reader::read_i32(buf)?; let identifier = reader::read_i32(buf)?; diff --git a/orientdb-client/src/sync/protocol/v37/encoder.rs b/orientdb-client/src/sync/protocol/v37/encoder.rs index a2c5c46..e780540 100644 --- a/orientdb-client/src/sync/protocol/v37/encoder.rs +++ b/orientdb-client/src/sync/protocol/v37/encoder.rs @@ -2,7 +2,7 @@ use super::super::v37::Protocol37; use crate::common::protocol::buffer::OBuffer; use crate::common::protocol::messages::request::{ Close, Connect, CreateDB, DropDB, ExistDB, HandShake, LiveQuery, Open, Query, QueryClose, - QueryNext, UnsubscribeLiveQuery, + QueryNext, ServerQuery, UnsubscribeLiveQuery, }; use crate::common::protocol::serializer::DocumentSerializer; use crate::common::types::document::ODocument; @@ -139,6 +139,27 @@ impl VersionedEncoder for Protocol37 { Ok(()) } + fn encode_server_query(buf: &mut OBuffer, query: ServerQuery) -> OrientResult<()> { + buf.put_i8(50)?; + buf.put_i32(query.session_id)?; + + if let Some(t) = query.token { + buf.write_slice(&t)?; + } + buf.write_str(&query.language)?; + buf.write_str(&query.query)?; + buf.put_i8(query.mode)?; + buf.put_i32(query.page_size)?; + buf.write_str("")?; + let mut document = ODocument::empty(); + document.set("params", query.parameters); + let encoded = Protocol37::encode_document(&document)?; + buf.write_slice(encoded.as_slice())?; + buf.write_bool(query.named)?; + + Ok(()) + } + fn encode_query_next(buf: &mut OBuffer, next: QueryNext) -> OrientResult<()> { buf.put_i8(47)?; buf.put_i32(next.session_id)?; diff --git a/orientdb-client/src/sync/server_statement.rs b/orientdb-client/src/sync/server_statement.rs new file mode 100644 index 0000000..e25b479 --- /dev/null +++ b/orientdb-client/src/sync/server_statement.rs @@ -0,0 +1,125 @@ +use super::client::OrientDBClientInternal; +use crate::common::protocol::messages::request::ServerQuery; +use crate::common::types::value::{IntoOValue, OValue}; +use crate::sync::types::resultset::ResultSet; +#[cfg(feature = "sugar")] +use crate::types::result::FromResult; +use crate::OrientResult; +use std::collections::HashMap; + +pub struct ServerStatement<'a> { + session: &'a OrientDBClientInternal, + pub(crate) user: String, + pub(crate) password: String, + stm: String, + params: HashMap, + language: String, + page_size: i32, + mode: i8, + named: bool, +} + +impl<'a> ServerStatement<'a> { + pub(crate) fn new( + session: &'a OrientDBClientInternal, + user: String, + password: String, + stm: String, + ) -> ServerStatement<'a> { + ServerStatement { + session, + user, + password, + stm, + params: HashMap::new(), + named: true, + mode: 1, + language: String::from("sql"), + page_size: 150, + } + } + pub(crate) fn mode(mut self, mode: i8) -> Self { + self.mode = mode; + self + } + + pub(crate) fn language(mut self, language: String) -> Self { + self.language = language; + self + } + + pub fn positional(mut self, params: &[&dyn IntoOValue]) -> Self { + let mut p = HashMap::new(); + for (i, elem) in params.iter().enumerate() { + p.insert(i.to_string(), elem.into_ovalue()); + } + self.params = p; + self.named = false; + self + } + pub fn named(mut self, params: &[(&str, &dyn IntoOValue)]) -> Self { + self.params = params + .iter() + .map(|&(k, ref v)| (String::from(k), v.into_ovalue())) + .collect(); + + self.named = true; + self + } + + pub fn page_size(mut self, page_size: i32) -> Self { + self.page_size = page_size; + self + } + pub fn run(self) -> OrientResult { + self.session.run(self) + } + + #[cfg(feature = "sugar")] + pub fn fetch_one(self) -> OrientResult> + where + T: FromResult, + { + match self + .session + .run(self)? + .map(|r| r.and_then(T::from_result)) + .next() + { + Some(s) => Ok(Some(s?)), + None => Ok(None), + } + } + + #[cfg(feature = "sugar")] + pub fn fetch(self) -> OrientResult> + where + T: FromResult, + { + self.session + .run(self)? + .map(|r| r.and_then(T::from_result)) + .collect() + } + + #[cfg(feature = "sugar")] + pub fn iter(self) -> OrientResult>> + where + T: FromResult, + { + Ok(self.session.run(self)?.map(|r| r.and_then(T::from_result))) + } + + pub(crate) fn into_query(self, session_id: i32, token: Option>) -> ServerQuery { + ServerQuery { + session_id, + token, + query: self.stm, + language: self.language, + named: self.named, + parameters: self.params, + page_size: self.page_size, + mode: self.mode, + } + } +} diff --git a/orientdb-client/src/sync/types/resultset.rs b/orientdb-client/src/sync/types/resultset.rs index fda8b14..f08e157 100644 --- a/orientdb-client/src/sync/types/resultset.rs +++ b/orientdb-client/src/sync/types/resultset.rs @@ -1,5 +1,5 @@ use crate::common::protocol::messages::request::{QueryClose, QueryNext}; -use crate::common::protocol::messages::response::Query; +use crate::common::protocol::messages::response::{Query, ServerQuery}; use crate::common::types::result::OResult; use crate::sync::network::cluster::Server; use crate::OrientResult; @@ -100,3 +100,27 @@ impl Drop for PagedResultSet { self.close_result(); } } + +pub struct ServerResultSet { + response: ServerQuery, +} + +impl ServerResultSet { + pub(crate) fn new(response: ServerQuery) -> ServerResultSet { + ServerResultSet { response } + } +} + +impl ResultSet for ServerResultSet { + fn close(self) -> OrientResult<()> { + Ok(()) + } +} + +impl Iterator for ServerResultSet { + type Item = OrientResult; + + fn next(&mut self) -> Option { + self.response.records.pop_front().map(|x| Ok(x)) + } +} diff --git a/orientdb-client/tests/common/mod.rs b/orientdb-client/tests/common/mod.rs index 0677e37..1c29393 100644 --- a/orientdb-client/tests/common/mod.rs +++ b/orientdb-client/tests/common/mod.rs @@ -1,9 +1,9 @@ use dotenv::dotenv; -use std::env; - +use orientdb_client::sync::types::resultset::ResultSet; use orientdb_client::DatabaseType; use orientdb_client::OSession; use orientdb_client::OrientDB; +use std::env; #[derive(Debug)] pub struct OrientDBTest { @@ -98,13 +98,17 @@ pub fn create_database(db: &str, odb: &OrientDB, config: &OrientDBTest) { ) .expect(&format!("Cannot drop database with name {}", db)); } - odb.create_database( - db, + odb.execute( &config.r_password, &config.r_password, - DatabaseType::Memory, + &format!("create database {} memory users(admin identified by 'admin' role admin, reader identified by 'reader' role reader, writer identified by 'writer' role writer)", db) ) - .expect(&format!("Cannot create database with name {}", db)); + .expect(&format!("Cannot create database with name {}", db)). + run() + .expect(&format!("Cannot create database with name {}", db)) + .close() + .expect(&format!("Cannot create database with name {}", db)) + ; } #[allow(dead_code)] @@ -219,13 +223,15 @@ pub mod asynchronous { .await .expect(&format!("Cannot drop database with name {}", db)); } - odb.create_database( - db, - &config.r_password, - &config.r_password, - DatabaseType::Memory, - ) - .await - .expect(&format!("Cannot create database with name {}", db)); + + let _ = odb.execute( + &config.r_password, + &config.r_password, + &format!("create database {} memory users(admin identified by 'admin' role admin, reader identified by 'reader' role reader, writer identified by 'writer' role writer)", db) + ).await + .expect(&format!("Cannot create database with name {}", db)). + run().await + .expect(&format!("Cannot create database with name {}", db)) + ; } }