Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add server command and fix ci build #42

Merged
merged 7 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
push:
branches:
- master
workflow_dispatch:
pull_request:
branches:
- master
Expand All @@ -16,14 +17,14 @@ jobs:
matrix:
os: [ubuntu-latest]
rust: [nightly]
orientdb-server: [3.1.11]
orientdb-server: [3.2.34]
steps:


- uses: actions/checkout@v2
- name: Starting Gremlin Servers
run: |
docker-compose -f ./docker-compose/docker-compose.yaml up -d
docker compose -f ./docker-compose/docker-compose.yaml up -d
env:
ORIENTDB_VERSION: ${{ matrix.orientdb-server }}

Expand Down
5 changes: 3 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
push:
branches:
- "*"
workflow_dispatch:
pull_request:
branches:
- master
Expand All @@ -16,13 +17,13 @@ jobs:
matrix:
os: [ubuntu-latest]
rust: [stable]
orientdb-server: [3.1.11]
orientdb-server: [3.2.34]
steps:

- uses: actions/checkout@v2
- name: Starting OrientDB Server
run: |
docker-compose -f ./docker-compose/docker-compose.yaml up -d
docker compose -f ./docker-compose/docker-compose.yaml up -d
env:
ORIENTDB_VERSION: ${{ matrix.orientdb-server }}

Expand Down
6 changes: 3 additions & 3 deletions orientdb-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ async-trait = { version = "0.1.10", optional = true }
futures = { version="0.3", optional=true }
mobc = {version = "0.7", optional = true, default-features=false, features = ["unstable"] }
tokio = { version = "1", optional=true, features = ["full"] }
uuid = { version = "1.6", optional=true }
uuid = { version = "1.10", optional=true }
orientdb-macro = { path="../orientdb-macro", version="0.2", optional=true }



[dev-dependencies]
dotenv = "0.14.1"
uuid = { version = "1.6", features=["v4"] }
dotenv = "0.15.0"
uuid = { version = "1.10", features=["v4"] }

[[example]]
name="async-simple"
Expand Down
34 changes: 34 additions & 0 deletions orientdb-client/src/asynchronous/client.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use super::network::cluster::AsyncConnection;
use super::network::cluster::{Cluster, Server};
use super::session::{OSession, SessionPool, SessionPoolManager};
use crate::asynchronous::server_statement::ServerStatement;
use crate::asynchronous::types::resultset::ServerResultSet;
use crate::common::protocol::messages::request::{
Close, Connect, CreateDB, DropDB, ExistDB, MsgHeader, Open,
};
use crate::common::protocol::messages::response;
use crate::common::types::result::OResult;
use crate::common::ConnectionOptions;
use crate::{DatabaseType, OrientResult};
use futures::Stream;
use std::future::Future;
use std::net::SocketAddr;
use std::net::ToSocketAddrs;
Expand Down Expand Up @@ -205,4 +209,34 @@ impl OrientDBClientInternal {
})
.await
}

pub async fn execute(
&self,
user: &str,
password: &str,
query: &str,
) -> OrientResult<ServerStatement> {
Ok(ServerStatement::new(
self,
user.to_string(),
password.to_string(),
query.to_string(),
))
}

pub(crate) async fn run(
&self,
stmt: ServerStatement<'_>,
) -> OrientResult<impl Stream<Item = OrientResult<OResult>>> {
let user = stmt.user.clone();
let pwd = stmt.password.clone();
self.run_as_admin(&user, &pwd, move |session, mut conn| async move {
let response: response::ServerQuery = conn
.send(stmt.into_query(session.session_id, session.token).into())
.await?
.payload();
Ok((conn, ServerResultSet::new(response)))
})
.await
}
}
1 change: 1 addition & 0 deletions orientdb-client/src/asynchronous/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod client;
pub mod live;
pub mod live_statement;
pub mod network;
pub mod server_statement;
pub mod session;
pub mod statement;
pub mod types;
Expand Down
38 changes: 37 additions & 1 deletion orientdb-client/src/asynchronous/network/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use crate::common::protocol::deserializer::DocumentDeserializer;
use crate::common::protocol::messages::response::Response;
use crate::common::protocol::messages::response::Status;
use crate::common::protocol::messages::response::{
Connect, CreateDB, DropDB, ExistDB, Header, LiveQuery, LiveQueryResult, Open, Query, QueryClose,
Connect, CreateDB, DropDB, ExistDB, Header, LiveQuery, LiveQueryResult, Open, Query,
QueryClose, ServerQuery,
};
use crate::common::types::error::{OError, RequestError};
use crate::common::types::OResult;
Expand Down Expand Up @@ -174,6 +175,36 @@ impl VersionedDecoder for Protocol37 {
stats,
))
}

async fn decode_server_query<T>(buf: &mut T) -> OrientResult<ServerQuery>
where
T: AsyncRead + Unpin + Send,
{
let query_id = reader::read_string(buf).await?;
let changes = reader::read_bool(buf).await?;
let has_plan = reader::read_bool(buf).await?;

let execution_plan = if has_plan {
Some(read_result(buf).await?)
} else {
None
};

let _prefetched = reader::read_i32(buf).await?;
let records = read_result_set(buf).await?;
let has_next = reader::read_bool(buf).await?;
let stats = read_query_stats(buf).await?;
let _reaload_metadata = reader::read_bool(buf).await?;

Ok(ServerQuery::new(
query_id,
changes,
execution_plan,
records,
has_next,
stats,
))
}
async fn decode_connect<T>(buf: &mut T) -> OrientResult<Connect>
where
T: AsyncRead + Unpin + Send,
Expand Down Expand Up @@ -209,6 +240,10 @@ pub trait VersionedDecoder {
where
T: AsyncRead + Unpin + Send;

async fn decode_server_query<T>(buf: &mut T) -> OrientResult<ServerQuery>
where
T: AsyncRead + Unpin + Send;

async fn decode_live_query<T>(buf: &mut T) -> OrientResult<LiveQuery>
where
T: AsyncRead + Unpin + Send;
Expand Down Expand Up @@ -259,6 +294,7 @@ where
4 => T::decode_create_db(buf).await?.into(),
6 => T::decode_exist(buf).await?.into(),
7 => T::decode_drop_db(buf).await?.into(),
50 => T::decode_server_query(buf).await?.into(),
45 => T::decode_query(buf).await?.into(),
46 => T::decode_query_close(buf).await?.into(),
47 => T::decode_query(buf).await?.into(),
Expand Down
129 changes: 129 additions & 0 deletions orientdb-client/src/asynchronous/server_statement.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use super::client::OrientDBClientInternal;
use crate::common::protocol::messages::request::ServerQuery;
use crate::common::types::value::{IntoOValue, OValue};
use crate::common::types::OResult;
#[cfg(feature = "sugar")]
use crate::types::result::FromResult;
use crate::OrientResult;
use futures::Stream;
use std::collections::HashMap;

#[cfg(feature = "sugar")]
use futures::StreamExt;

pub struct ServerStatement<'a> {
client: &'a OrientDBClientInternal,
pub(crate) user: String,
pub(crate) password: String,
stm: String,
params: HashMap<String, OValue>,
language: String,
page_size: i32,
mode: i8,
named: bool,
}

impl<'a> ServerStatement<'a> {
pub(crate) fn new(
client: &'a OrientDBClientInternal,
user: String,
password: String,
stm: String,
) -> ServerStatement<'a> {
ServerStatement {
client,
user,
password,
stm,
params: HashMap::new(),
named: true,
mode: 1,
language: String::from("sql"),
page_size: 150,
}
}

pub fn positional(mut self, params: &[&dyn IntoOValue]) -> Self {
let mut p = HashMap::new();
for (i, elem) in params.iter().enumerate() {
p.insert(i.to_string(), elem.into_ovalue());
}
self.params = p;
self.named = false;
self
}
pub fn named(mut self, params: &[(&str, &dyn IntoOValue)]) -> Self {
self.params = params
.iter()
.map(|&(k, ref v)| (String::from(k), v.into_ovalue()))
.collect();

self.named = true;
self
}

pub async fn run(self) -> OrientResult<impl Stream<Item = OrientResult<OResult>>> {
self.client.run(self.into()).await
}

#[cfg(feature = "sugar")]
pub async fn fetch_one<T>(self) -> OrientResult<Option<T>>
where
T: FromResult,
{
let mut stream = self
.client
.run(self.into())
.await?
.map(|r| r.and_then(T::from_result));

match stream.next().await {
Some(r) => Ok(Some(r?)),
None => Ok(None),
}
}

#[cfg(feature = "sugar")]
pub async fn fetch<T>(self) -> OrientResult<Vec<T>>
where
T: FromResult,
{
let mut stream = self
.client
.run(self.into())
.await?
.map(|r| r.and_then(T::from_result));

let mut results = Vec::new();

while let Some(r) = stream.next().await {
results.push(r?);
}
Ok(results)
}

#[cfg(feature = "sugar")]
pub async fn stream<T>(self) -> OrientResult<impl Stream<Item = OrientResult<T>>>
where
T: FromResult,
{
Ok(self
.client
.run(self.into())
.await?
.map(|r| r.and_then(T::from_result)))
}

pub(crate) fn into_query(self, session_id: i32, token: Option<Vec<u8>>) -> ServerQuery {
ServerQuery {
session_id,
token,
query: self.stm,
language: self.language,
named: self.named,
parameters: self.params,
page_size: self.page_size,
mode: self.mode,
}
}
}
20 changes: 19 additions & 1 deletion orientdb-client/src/asynchronous/types/resultset.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::asynchronous::network::cluster::Server;
use crate::common::protocol::messages::request::{QueryClose, QueryNext};
use crate::common::protocol::messages::response::Query;
use crate::common::protocol::messages::response::{Query, ServerQuery};
use crate::common::types::result::OResult;
use crate::OrientResult;
#[cfg(feature = "async-std-runtime")]
Expand Down Expand Up @@ -123,3 +123,21 @@ async fn close_result(
conn.send(msg.into()).await?;
Ok(())
}

pub struct ServerResultSet {
response: ServerQuery,
}

impl ServerResultSet {
pub(crate) fn new(response: ServerQuery) -> ServerResultSet {
ServerResultSet { response }
}
}

impl futures::Stream for ServerResultSet {
type Item = OrientResult<OResult>;

fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.response.records.pop_front().map(|x| Ok(x)))
}
}
Loading
Loading