Skip to content

Commit

Permalink
Merge pull request #37 from AdExNetwork/bugfixes
Browse files Browse the repository at this point in the history
Bug fixes
  • Loading branch information
Ivo Georgiev authored Dec 20, 2020
2 parents 7cb5017 + e122a14 commit b152b6d
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 233 deletions.
24 changes: 19 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ authors = ["Lachezar Lechev <lachezar@adex.network>"]
edition = "2018"

[dependencies]
primitives = { git = "https://github.com/AdExNetwork/adex-validator-stack-rust", branch = "dev" }
primitives = { git = "https://github.com/AdExNetwork/adex-validator-stack-rust", branch = "bugfixes" }
chrono = { version = "0.4" }
futures = { version = "0.3" }
async-trait = { version = "^0.1" }
thiserror = "^1.0"

# Server
tokio = { version = "0.2", features = ["macros", "rt-threaded", "sync"] }
tokio = { version = "0.2", features = ["macros", "rt-threaded", "sync", "signal"] }
hyper-tls = "0.4"
hyper = { version = "0.13", features = ["stream"] }
http = "0.2"

Expand All @@ -27,8 +28,8 @@ clap = "2.33"
toml = "0.5"

# Logging
slog = { version = "2.5" }
slog-term = "2.5"
slog = { version = "2.7" }
slog-term = "2.6"
slog-async = "2.5"
# Other
lazy_static = "1.4"
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ docker build -t adex-supermarket .

2. After building the image you can start a container (`production`):

- with production `https://market.adex.network`:

```
docker run --detach -e ENV=production -p 3000:3000 -e MARKET_URL=https://market.adex.network/ adex-supermarket
```

- with locally running `adex-market`:

```bash
docker run --detach -e ENV=production -e MARKET_URL=https://localhost:4000 adex-supermarket
```
Expand Down
59 changes: 47 additions & 12 deletions src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ mod test {
.expect("Valid URL");

let channel = setup_channel(&leader_url, &follower_url);
let channel_id = channel.id;
let leader_id = channel.spec.validators.leader().id;
let follower_id = channel.spec.validators.follower().id;

Expand Down Expand Up @@ -286,7 +287,11 @@ mod test {
validator_messages: vec![ValidatorMessage {
from: follower_id,
received: Utc::now(),
msg: get_new_state_msg().msg,
msg: MessageTypes::NewState(NewState {
signature: String::from("0x0"),
state_root: String::from("0x0"),
balances: expected_balances.clone(),
}),
}],
};

Expand Down Expand Up @@ -316,17 +321,22 @@ mod test {
.await;

Mock::given(method("GET"))
.and(path("/leader/last-approved"))
.and(path(format!(
"/leader/channel/{}/last-approved",
channel_id
)))
.and(query_param("withHeartbeat", "true"))
.respond_with(ResponseTemplate::new(200).set_body_json(&leader_last_approved))
.expect(2_u64)
.mount(&mock_server)
.await;

Mock::given(method("GET"))
.and(path(format!(
"/leader/validator-messages/{}/NewState",
leader_id
"/leader/channel/{}/validator-messages/{}/NewState",
channel_id, leader_id
)))
.and(query_param("limit", "1"))
.respond_with(ResponseTemplate::new(200).set_body_json(&leader_latest_new_state))
.expect(2_u64)
.mount(&mock_server)
Expand All @@ -340,7 +350,11 @@ mod test {
.await;

Mock::given(method("GET"))
.and(path("/follower/last-approved"))
.and(path(format!(
"/follower/channel/{}/last-approved",
channel_id
)))
.and(query_param("withHeartbeat", "true"))
.respond_with(ResponseTemplate::new(200).set_body_json(&follower_last_approved))
.expect(2_u64)
.mount(&mock_server)
Expand All @@ -356,7 +370,7 @@ mod test {
.get(&channel.id)
.expect("This Campaign should exist and should be active");

assert_eq!(Status::Waiting, active_campaign.status);
assert_eq!(Status::Active, active_campaign.status);
assert_eq!(expected_balances, active_campaign.balances);
}

Expand Down Expand Up @@ -418,14 +432,22 @@ mod test {
};

Mock::given(method("GET"))
.and(path("/leader/last-approved"))
.and(path(format!(
"/leader/channel/{}/last-approved",
channel_id
)))
.and(query_param("withHeartbeat", "true"))
.respond_with(ResponseTemplate::new(200).set_body_json(&leader_last_approved))
.expect(1_u64)
.mount(&mock_server)
.await;

Mock::given(method("GET"))
.and(path("/follower/last-approved"))
.and(path(format!(
"/follower/channel/{}/last-approved",
channel_id
)))
.and(query_param("withHeartbeat", "true"))
.respond_with(ResponseTemplate::new(200).set_body_json(&follower_last_approved))
.expect(1_u64)
.mount(&mock_server)
Expand Down Expand Up @@ -487,7 +509,11 @@ mod test {
};

Mock::given(method("GET"))
.and(path("/leader/last-approved"))
.and(path(format!(
"/leader/channel/{}/last-approved",
channel_id
)))
.and(query_param("withHeartbeat", "true"))
.respond_with(ResponseTemplate::new(200).set_body_json(&leader_last_approved))
.expect(1_u64)
.mount(&mock_server)
Expand Down Expand Up @@ -588,7 +614,10 @@ mod test {
.await;

Mock::given(method("GET"))
.and(path("/leader/last-approved"))
.and(path(format!(
"/leader/channel/{}/last-approved",
channel_id
)))
.and(query_param("withHeartbeat", "true"))
.respond_with(ResponseTemplate::new(200).set_body_json(&leader_last_approved))
// The second time we call is from the Follower Validator to get up to date Status of the Campaign
Expand All @@ -604,7 +633,10 @@ mod test {
.await;

Mock::given(method("GET"))
.and(path("/follower/last-approved"))
.and(path(format!(
"/follower/channel/{}/last-approved",
channel_id
)))
.and(query_param("withHeartbeat", "true"))
.respond_with(ResponseTemplate::new(200).set_body_json(&follower_last_approved))
.expect(2_u64)
Expand Down Expand Up @@ -706,7 +738,10 @@ mod test {
};

Mock::given(method("GET"))
.and(path("/leader/last-approved"))
.and(path(format!(
"/leader/channel/{}/last-approved",
channel_id
)))
.and(query_param("withHeartbeat", "true"))
.respond_with(ResponseTemplate::new(200).set_body_json(&leader_last_approved))
// The second time we call is from the Follower Validator to get up to date Status of the Campaign
Expand Down
57 changes: 48 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#![deny(clippy::all)]
#![deny(rust_2018_idioms)]
pub use cache::Cache;
use hyper::{client::HttpConnector, Body, Method, Request, Response, Server};
use hyper::{Body, Client, Method, Request, Response, Server};
use hyper_tls::HttpsConnector;
use std::net::SocketAddr;
use std::sync::Arc;
use thiserror::Error;

use http::{StatusCode, Uri};
use http::{header::HOST, StatusCode, Uri};
use slog::{error, info, Logger};

pub mod cache;
Expand All @@ -25,6 +26,9 @@ pub use sentry_api::SentryApi;

pub(crate) static ROUTE_UNITS_FOR_SLOT: &str = "/units-for-slot/";

// Uses Https Client
type HyperClient = Client<HttpsConnector<hyper::client::connect::HttpConnector>>;

#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
Expand Down Expand Up @@ -56,7 +60,9 @@ pub async fn serve(
) -> Result<(), Error> {
use hyper::service::{make_service_fn, service_fn};

let client = hyper::Client::new();
let https = HttpsConnector::new();
let client: HyperClient = Client::builder().build(https);

let market = Arc::new(MarketApi::new(market_url, logger.clone())?);

let cache = spawn_fetch_campaigns(logger.clone(), config.clone()).await?;
Expand Down Expand Up @@ -91,8 +97,10 @@ pub async fn serve(
// Then bind and serve...
let server = Server::bind(&addr).serve(make_service);

let graceful = server.with_graceful_shutdown(shutdown_signal());

// And run forever...
if let Err(e) = server.await {
if let Err(e) = graceful.await {
error!(&logger, "server error: {}", e);
}

Expand All @@ -103,7 +111,7 @@ async fn handle<C: cache::Client>(
mut req: Request<Body>,
config: Config,
cache: Cache<C>,
client: hyper::Client<HttpConnector>,
client: HyperClient,
logger: Logger,
market: Arc<MarketApi>,
) -> Result<Response<Body>, Error> {
Expand All @@ -114,20 +122,44 @@ async fn handle<C: cache::Client>(
get_units_for_slot(&logger, market.clone(), &config, &cache, req).await
}
(_, method) => {
use http::uri::PathAndQuery;

let method = method.clone();

let path_and_query = req
.uri()
.path_and_query()
.map(ToOwned::to_owned)
.unwrap_or_else(|| PathAndQuery::from_static(""));
.map(|p_q| {
let string = p_q.to_string();
// the MarketUrl (i.e. ApiUrl) always suffixes the path
string
.strip_prefix('/')
.map(ToString::to_string)
.unwrap_or(string)
})
.unwrap_or_default();

let uri = format!("{}{}", market.market_url, path_and_query);

*req.uri_mut() = uri.parse::<Uri>()?;

// for Cloudflare we need to add a HOST header
let market_host_header = {
let url = market.market_url.to_url();
let host = url
.host_str()
.expect("MarketUrl always has a host")
.to_string();

match url.port() {
Some(port) => format!("{}:{}", host, port),
None => host,
}
};

let host = market_host_header
.parse()
.expect("The MarketUrl should be valid HOST header");
req.headers_mut().insert(HOST, host);

let proxy_response = match client.request(req).await {
Ok(response) => {
info!(&logger, "Proxied request to market"; "uri" => uri, "method" => %method);
Expand All @@ -146,6 +178,13 @@ async fn handle<C: cache::Client>(
}
}

async fn shutdown_signal() {
// Wait for the CTRL+C signal
tokio::signal::ctrl_c()
.await
.expect("failed to install CTRL+C signal handler");
}

async fn spawn_fetch_campaigns(
logger: Logger,
config: Config,
Expand Down
Loading

0 comments on commit b152b6d

Please sign in to comment.