Skip to content

Commit

Permalink
Merge branch 'gamerhash/master' of github.com:golemfactory/yagna into…
Browse files Browse the repository at this point in the history
… transfers/add-deploy-sha2
  • Loading branch information
nieznanysprawiciel committed Sep 9, 2024
2 parents 9c0ef7f + 7b1f1c0 commit a0f9856
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 71 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ static-openssl = ["openssl/vendored", "openssl-probe"]
dummy-driver = ['ya-dummy-driver']
erc20-driver = ['ya-erc20-driver']
tos = []
framework-test = ['ya-exe-unit/framework-test']
framework-test = ['ya-exe-unit/framework-test', 'ya-activity/framework-test']
# Temporary to make goth integration tests work
central-net = ['ya-net/central-net']
packet-trace-enable = [
Expand Down Expand Up @@ -236,7 +236,7 @@ members = [
# this entry is needed to make sqlx version >=0.5.9 work with diesel 1.4.*
# diesel 1.4.* supports up to 0.23.0, but sqlx 0.5.9 requires 0.22.0
# sqlx 0.5.10 need 0.23.2, so 0.5.9 is last version possible
derive_more = "0.99.11"
derive_more = "0.99"
erc20_payment_lib = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "4eb076ec19bf58cf4063b4cdb4cf370473892203" }
erc20_processor = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "4eb076ec19bf58cf4063b4cdb4cf370473892203" }
erc20_payment_lib_common = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "4eb076ec19bf58cf4063b4cdb4cf370473892203" }
Expand Down
3 changes: 3 additions & 0 deletions core/activity/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ version = "0.4.0"
authors = ["Golem Factory <contact@golem.network>"]
edition = "2018"

[features]
framework-test = ['ya-gsb-http-proxy/framework-test']

[dependencies]
ya-core-model = { version = "0.9", features = ["activity", "market"] }
ya-client-model = { version = "0.6", features = ["sgx"] }
Expand Down
7 changes: 5 additions & 2 deletions core/activity/src/http_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ async fn proxy_http_request(
let activity_id = path_activity_url.activity_id;
let path = path_activity_url.url;

let result = authorize_activity_executor(&db, id.identity, &activity_id, Role::Requestor).await;
let result =
authorize_activity_initiator(&db, id.identity, &activity_id, Role::Requestor).await;
if let Err(e) = result {
log::error!("Authorize error {}", e);
log::info!(
"Proxy authorize error (currently not authorized requests are not rejected): {e}"
);
}

let agreement = get_activity_agreement(&db, &activity_id, Role::Requestor).await?;
Expand Down
35 changes: 19 additions & 16 deletions exe-unit/components/gsb-http-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,37 @@ version = "0.1.0"
edition = "2021"


[features]
framework-test = []

[dependencies]
ya-service-bus = { workspace = true }
ya-counters = { path = "../counters" }
ya-client-model = "0.6"
ya-core-model = { version = "^0.9" }

thiserror = "1.0"
serde = { version = "1.0", features = ["derive"] }
chrono = "0.4"
http = "1.0"
serde_json = "1.0"
tokio = { version = "1.35", features = ["full"] }
reqwest = { version = "0.11", features = ["json", "stream"] }
log = { version = "0.4", features = [] }
async-stream = "0.3"
futures = { version = "0.3", features = [] }
futures-core = "0.3"
serde_derive = "1.0"
actix = "0.13"
actix-http = "3"
actix-web = "4"
actix-rt = "2.7"
anyhow = "1.0"
rand = "0.8.5"
hex = "0.4.3"
env_logger = "0.10.2"
async-stream = "0.3"
bytes = "1.6.0"
derive_more = "0.99.17"
chrono = "0.4"
derive_more = { workspace = true }
env_logger = "0.10.2"
futures = { version = "0.3", features = [] }
futures-core = "0.3"
hex = "0.4.3"
http = "1.0"
log = { version = "0.4", features = [] }
rand = { workspace = true }
reqwest = { version = "0.11", features = ["json", "stream"] }
serde = { version = "1.0", features = ["derive"] }
serde_derive = "1.0"
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1", features = ["full"] }

[dev-dependencies]
mockito = "1.2"
Expand Down
49 changes: 32 additions & 17 deletions exe-unit/components/gsb-http-proxy/src/gsb_to_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ impl GsbToHttpProxy {

pub fn bind(&mut self, gsb_path: &str) -> Handle {
let this = self.clone();
bus::bind(gsb_path, move |message: GsbHttpCallMessage| {
bus::bind_with_caller(gsb_path, move |caller, message: GsbHttpCallMessage| {
let mut this = this.clone();
async move { Ok(this.pass(message).await) }
async move { Ok(this.pass(caller, message).await) }
})
}

Expand All @@ -67,10 +67,13 @@ impl GsbToHttpProxy {
})
}

pub async fn pass(&mut self, message: GsbHttpCallMessage) -> GsbHttpCallResponse {
pub async fn pass(
&mut self,
caller: String,
message: GsbHttpCallMessage,
) -> GsbHttpCallResponse {
let url = format!("{}{}", self.base_url, message.path);
log::info!("Gsb to http call - Url: {url}");

let path = message.path.clone();
let mut counters = self.counters.clone();

let method = match Method::from_bytes(message.method.to_uppercase().as_bytes()) {
Expand All @@ -82,9 +85,10 @@ impl GsbToHttpProxy {
)
}
};
let builder = Self::create_request_builder(method, &url, message.headers, message.body);
let builder =
Self::create_request_builder(method.clone(), &url, message.headers, message.body);

log::debug!("Calling {}", &url);
log::info!("Gsb proxy http call {method} to {url} from {caller}");
let response_handler = counters.on_request();
let response = builder
.send()
Expand All @@ -98,18 +102,27 @@ impl GsbToHttpProxy {
response_handler.on_response();
match response.bytes().await {
Ok(bytes) => {
log::info!(
"GSB http proxy: response for {method} `{path}` status: {status_code}"
);
GsbHttpCallResponse::new(bytes.to_vec(), response_headers, status_code)
}
Err(err) => GsbHttpCallResponse::with_message(
format!("Error in response: {err}").into_bytes(),
StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
),
Err(err) => {
log::info!("GSB http proxy: response for {method} `{path}` status: {status_code}, error: {err}");
GsbHttpCallResponse::with_message(
format!("Error in response: {err}").into_bytes(),
StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
)
}
}
}
Err(err) => GsbHttpCallResponse::with_message(
format!("Error in response: {err}").into_bytes(),
StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
),
Err(err) => {
log::info!("GSB http proxy: error calling {method} `{path}`: {err}");
GsbHttpCallResponse::with_message(
format!("Error in response: {err}").into_bytes(),
StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
)
}
}
}

Expand Down Expand Up @@ -272,8 +285,9 @@ mod tests {
let mut requests_counter = gsb_call.requests_counter();
let mut requests_duration_counter = gsb_call.requests_duration_counter();

let caller = "0x0000000000000000000000000000000000000000".to_string();
let message = message();
let response = gsb_call.pass(message).await;
let response = gsb_call.pass(caller.clone(), message).await;

let mut headers = vec![];

Expand Down Expand Up @@ -377,9 +391,10 @@ mod tests {

async fn run_10_requests(mut gsb_call_proxy: GsbToHttpProxy) {
let message = message();
let caller = "0x0000000000000000000000000000000000000000".to_string();
for _ in 0..10 {
let message = message.clone();
let response = gsb_call_proxy.pass(message).await;
let response = gsb_call_proxy.pass(caller.clone(), message).await;
assert_eq!("response".as_bytes(), response.body.msg_bytes);
}
}
Expand Down
79 changes: 48 additions & 31 deletions exe-unit/components/gsb-http-proxy/src/http_to_gsb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ impl HttpToGsbProxy {
bus_addr: bus_addr.to_string(),
}
}

pub fn endpoint(&self) -> bus::Endpoint {
match &self.binding {
BindingMode::Local => bus::service(&self.bus_addr),
BindingMode::Net(binding) => ya_net::from(binding.from)
.to(binding.to)
.service(&self.bus_addr),
}
}
}

pub struct HttpToGsbProxyResponse<T> {
Expand Down Expand Up @@ -74,44 +83,52 @@ impl HttpToGsbProxy {
};

let msg = GsbHttpCallMessage {
method,
path,
method: method.clone(),
path: path.clone(),
body,
headers: Headers::default().filter(&headers),
};

let response = match &self.binding {
BindingMode::Local => bus::service(&self.bus_addr).call(msg).await,
BindingMode::Net(binding) => {
ya_net::from(binding.from)
.to(binding.to)
.service(&self.bus_addr)
.call(msg)
.await
}
};
let endpoint = self.endpoint();

let result = response.unwrap_or_else(|e| Err(HttpProxyStatusError::from(e)));
log::info!("Proxy http {msg} call to [{}]", endpoint.addr());
let result = endpoint
.call(msg)
.await
.unwrap_or_else(|e| Err(HttpProxyStatusError::from(e)));

match result {
Ok(r) => HttpToGsbProxyResponse {
body: actix_web::web::Bytes::from(r.body.msg_bytes)
.try_into_bytes()
.map_err(|_| {
Error::GsbFailure("Failed to invoke GsbHttpProxy call".to_string())
}),
status_code: r.header.status_code,
response_headers: r.header.response_headers,
},
Err(err) => HttpToGsbProxyResponse {
body: actix_web::web::Bytes::from(format!("Error: {}", err))
.try_into_bytes()
.map_err(|_| {
Error::GsbFailure("Failed to invoke GsbHttpProxy call".to_string())
}),
status_code: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
response_headers: HashMap::new(),
},
Ok(r) => {
log::info!(
"Http proxy: response for {method} `{path}` call to [{}]: status: {}",
endpoint.addr(),
r.header.status_code
);
HttpToGsbProxyResponse {
body: actix_web::web::Bytes::from(r.body.msg_bytes)
.try_into_bytes()
.map_err(|_| {
Error::GsbFailure("Failed to invoke GsbHttpProxy call".to_string())
}),
status_code: r.header.status_code,
response_headers: r.header.response_headers,
}
}
Err(err) => {
log::warn!(
"Http proxy: error calling {method} `{path}` at [{}]: error: {err}",
endpoint.addr()
);
HttpToGsbProxyResponse {
body: actix_web::web::Bytes::from(format!("Error: {err}"))
.try_into_bytes()
.map_err(|_| {
Error::GsbFailure("Failed to invoke GsbHttpProxy call".to_string())
}),
status_code: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
response_headers: HashMap::new(),
}
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion exe-unit/components/gsb-http-proxy/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use crate::error::HttpProxyStatusError;
use crate::response::{GsbHttpCallResponse, GsbHttpCallResponseStreamChunk};

use derive_more::Display;
use serde_derive::{Deserialize, Serialize};
use std::collections::HashMap;

use ya_service_bus::{RpcMessage, RpcStreamMessage};

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Display)]
#[serde(rename_all = "camelCase")]
#[display(fmt = "{method} `{path}`")]
pub struct GsbHttpCallMessage {
pub method: String,
pub path: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ clap = { version = "4.3.4", features = ["derive"] }
rand = "0.8.5"
serde = { version = "1.0.164", features = ["derive"] }
serde_json = "1"
tokio = { version = "1.28.2", features = ["full"] }
reqwest = "0.11.18"
tokio = { version = "1", features = ["full"] }
reqwest = "0.11"
anyhow = "1.0.71"

0 comments on commit a0f9856

Please sign in to comment.