Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GSB API response payload any data type support #2636

Merged
merged 4 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion core/gsb-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use test_case::test_case;
use ya_core_model::gftp::{GetChunk, GftpChunk};
use ya_core_model::gftp::{GetChunk, GftpChunk, UploadChunk};
use ya_core_model::NodeId;
use ya_service_api_interfaces::Provider;
use ya_service_api_web::middleware::auth::dummy::DummyAuth;
Expand Down Expand Up @@ -255,6 +255,72 @@ mod tests {
);
}

#[actix_web::test]
#[serial]
async fn upload_chunk_test() {
let mut api = dummy_api();

let service_number = SERVICE_COUNTER.fetch_add(1, Ordering::SeqCst);
let service_addr = format!("{SERVICE_ADDR}_{service_number}");

let bind_req = api
.post(format!("/{}/{}", GSB_API_PATH, "services"))
.send_json(&ServiceRequest {
listen: ServiceListenRequest {
components: vec!["UploadChunk".to_string()],
on: service_addr.clone(),
},
});

let body =
verify_bind_service_response(bind_req, vec!["UploadChunk".to_string()], &service_addr)
.await;

let services_path = format!("gsb-api/v1/services/{}", body.services_id);
let mut ws_frames = api.ws_at(&services_path).await.unwrap();

let gsb_endpoint = ya_service_bus::typed::service(service_addr.clone());

let (gsb_res, ws_res) = tokio::join!(
async {
gsb_endpoint
.call(UploadChunk {
chunk: GftpChunk {
offset: 0,
content: vec![1, 2, 3],
},
})
.await
},
async {
println!("WS sleep");
tokio::time::sleep(Duration::from_millis(100)).await;
let ws_req = ws_frames.next().await;
let ws_req = match ws_req {
Some(Ok(Frame::Binary(ws_req))) => {
flexbuffers::from_slice::<TestWsRequest<UploadChunk>>(&ws_req).unwrap()
}
msg => panic!("Unexpected msg: {:?}", msg),
};
let ws_res = TestWsResponse {
id: ws_req.id,
payload: (),
};
let ws_res = flexbuffers::to_vec(ws_res).unwrap();
ws_frames
.send(ws::Message::Binary(Bytes::from(ws_res)))
.await
}
);

ws_res.unwrap();
gsb_res
.expect("Response is unit type")
.expect("Response is ok");

verify_delete_service(&mut api, &service_addr).await;
}

#[actix_web::test]
#[serial]
async fn ok_payload_test() {
Expand Down
48 changes: 30 additions & 18 deletions core/gsb-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use actix_http::ws::CloseCode;
use actix_http::ws::{CloseReason, ProtocolError};
use actix_web_actors::ws::{self, WebsocketContext};

use flexbuffers::{BuilderOptions, MapReader, Reader};
use flexbuffers::{BuilderOptions, Reader};
use futures::FutureExt;
use serde::{Deserialize, Serialize};
use service::Service;
Expand Down Expand Up @@ -41,15 +41,15 @@ impl GsbApiService {
pub(crate) type GsbError = ya_service_bus::Error;

#[derive(Message, Serialize, Deserialize, Debug)]
#[rtype(result = "Result<(), anyhow::Error>")]
#[rtype(result = "anyhow::Result<()>")]
struct WsRequest {
id: String,
component: String,
payload: Vec<u8>,
}

#[derive(Message, Debug)]
#[rtype(result = "Result<(), anyhow::Error>")]
#[rtype(result = "anyhow::Result<()>")]
pub(crate) struct WsResponse {
pub id: String,
pub response: WsResponseMsg,
Expand All @@ -59,14 +59,12 @@ impl WsResponse {
pub(crate) fn try_new(
response_key: &str,
id: &str,
payload: &MapReader<&[u8]>,
payload: &Reader<&[u8]>,
) -> Result<WsResponse, String> {
let mut response_builder = flexbuffers::Builder::new(BuilderOptions::empty());
let mut response_map_builder = response_builder.start_map();
let response_map_field_builder = response_map_builder.start_map(response_key);
match flexbuffer_util::clone_map(response_map_field_builder, payload) {
let response_map_builder = response_builder.start_map();
match flexbuffer_util::clone_field(response_map_builder, payload, response_key) {
Ok(_) => {
response_map_builder.end_map();
let response = WsResponse {
id: id.to_string(),
response: WsResponseMsg::Message(response_builder.view().to_vec()),
Expand Down Expand Up @@ -197,10 +195,10 @@ fn read_ws_response(buffer: &bytes::Bytes) -> Result<WsResponse, String> {
.map_err(|err| format!("Missing root map. Err: {err}"))?;
let id = flexbuffer_util::read_string(&response, "id")
.map_err(|err| format!("Missing response id. Err: {err}"))?;
if let Ok(error_payload) = flexbuffer_util::read_map(&response, "error", false) {
if let Ok(error_payload) = flexbuffer_util::read_field(&response, "error", false) {
WsResponse::try_new("Err", &id, &error_payload)
.map_err(|err| format!("Failed to read error payload. Id: {id}. Err: {err}"))
} else if let Ok(payload) = flexbuffer_util::read_map(&response, "payload", true) {
} else if let Ok(payload) = flexbuffer_util::read_field(&response, "payload", true) {
WsResponse::try_new("Ok", &id, &payload)
.map_err(|err| format!("Failed to read payload. Id: {id}. Err: {err}"))
} else {
Expand Down Expand Up @@ -375,10 +373,7 @@ mod flexbuffer_util {
}
}

pub(crate) fn read_string(
reader: &MapReader<&[u8]>,
key: &str,
) -> Result<String, anyhow::Error> {
pub(crate) fn read_string(reader: &MapReader<&[u8]>, key: &str) -> anyhow::Result<String> {
match reader.index(key) {
Ok(field) => match field.get_str() {
Ok(txt) => Ok(txt.to_string()),
Expand All @@ -391,7 +386,7 @@ mod flexbuffer_util {
pub(crate) fn as_map<'a>(
reader: &Reader<&'a [u8]>,
allow_empty: bool,
) -> Result<MapReader<&'a [u8]>, anyhow::Error> {
) -> anyhow::Result<MapReader<&'a [u8]>> {
match reader.get_map() {
Ok(map) => {
if allow_empty || !map.is_empty() {
Expand All @@ -403,17 +398,34 @@ mod flexbuffer_util {
}
}

pub(crate) fn read_map<'a>(
pub(crate) fn read_field<'a>(
reader: &MapReader<&'a [u8]>,
key: &str,
allow_empty: bool,
) -> Result<MapReader<&'a [u8]>, anyhow::Error> {
) -> anyhow::Result<Reader<&'a [u8]>> {
match reader.index(key) {
Ok(reader) => as_map(&reader, allow_empty),
Ok(reader) => {
if reader.length() > 0 || allow_empty {
return Ok(reader);
}
anyhow::bail!("Empty response field {key}");
}
Err(err) => anyhow::bail!("Failed to find response field: {}. Err: {}", key, err),
}
}

pub(crate) fn clone_field(
builder: MapBuilder,
reader: &Reader<&[u8]>,
key: &str,
) -> Result<(), flexbuffers::ReaderError> {
let mut pusher = FlexMapPusher { builder, key };
let value_type = reader.flexbuffer_type();
pusher = push(reader, value_type, pusher)?;
pusher.end();
Ok(())
}

pub(crate) fn clone_map(
builder: MapBuilder,
map_reader: &MapReader<&[u8]>,
Expand Down