Skip to content

Commit

Permalink
tracing and request timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
maxmindlin committed Feb 26, 2024
1 parent 7b5de0b commit 4709bd1
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 32 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ ci = ["github"]
# The installers to generate for each app
installers = ["shell"]
# Target platforms to build apps for (Rust target-triple syntax)
targets = ["aarch64-apple-darwin", "x86_64-apple-darwin", "x86_64-unknown-linux-gnu", "x86_64-pc-windows-msvc"]
targets = [
"aarch64-apple-darwin",
"x86_64-apple-darwin",
"x86_64-unknown-linux-gnu",
"x86_64-pc-windows-msvc",
]
# Publish jobs to run in CI
pr-run-mode = "plan"

Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ See [releases](https://github.com/maxmindlin/locust/releases/latest) for binarie
- Configuration commands that make sure your proxies work out of the box.
- Querying via tags that gives visibility into proxies without having to execute SQL.

Locust is a super proxy that maintains a pool of upstream proxies that it uses to route traffic to the requested web resources. As a user, you use it like any other proxy. When Locust receives a web request it analyzes the target URL and determines which proxy is best for the given request. Locust keeps track of metadata and metrics about every web request it completes in order to continually fine tune which proxies to use and remove bad ones. You can also tag web domains in order to instruct Locust to limit the pool of proxies it will choose from for requests to these domains. Via the CLI, you can also create and cycle Squid proxies in your GCP account which Locust will automatically start using as they are created.
Locust is a super proxy that maintains a pool of upstream proxies that it uses to route traffic to the requested web resources. As a user, you use it like any other proxy. When Locust receives a web request it analyzes the target URL and determines which proxy is best for the given request.

<h1 align="center">
<img src="https://raw.githubusercontent.com/maxmindlin/locust/main/assets/diagram.png" width="700"><br>
</h1>

Locust keeps track of metadata and metrics about every web request it completes in order to continually fine tune which proxies to use and remove bad ones. You can also tag web domains in order to instruct Locust to limit the pool of proxies it will choose from for requests to these domains.

Via the CLI, you can also create and cycle Squid proxies in your GCP account which Locust will automatically start using as they are created.

## Requirements

### CLI
Expand Down
13 changes: 7 additions & 6 deletions locust-core/src/crud/proxies.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use sqlx::{
postgres::{PgPool, PgRow},
Error, Row,
};
use sqlx::{postgres::PgPool, Error, Row};

use crate::models::proxies::{NewProxy, Proxy, ProxyMetric, ProxySession};

Expand Down Expand Up @@ -58,7 +55,7 @@ pub async fn get_general_proxy(pool: &PgPool) -> Result<Proxy, Error> {
}

pub async fn get_proxy_by_id(pool: &PgPool, id: i32) -> Result<Proxy, Error> {
sqlx::query_as::<_, Proxy>(
let proxy = sqlx::query_as::<_, Proxy>(
r#"
SELECT
id, protocol, host, port, username, password, provider
Expand All @@ -68,7 +65,11 @@ pub async fn get_proxy_by_id(pool: &PgPool, id: i32) -> Result<Proxy, Error> {
)
.bind(id)
.fetch_one(pool)
.await
.await?;

update_proxy_last_used(pool, id).await?;

Ok(proxy)
}

async fn update_proxy_last_used(pool: &PgPool, id: i32) -> Result<(), Error> {
Expand Down
2 changes: 1 addition & 1 deletion locust-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub mod crud;
pub mod models;

pub async fn new_pool() -> Result<PgPool, Error> {
let user = env::var("POSTGRES_USER").unwrap();
let user = env::var("POSTGRES_USER").unwrap_or("postgres".into());
let pwd = env::var("POSTGRES_PASSWORD").unwrap_or("password".into());
let db = env::var("POSTGRES_DB").unwrap_or("postgres".into());
let host = env::var("POSTGRES_HOST").unwrap_or("localhost".into());
Expand Down
1 change: 1 addition & 0 deletions src/ca/openssl_authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct OpensslAuthority {
cache: Cache<Authority, Arc<ServerConfig>>,
}

#[allow(dead_code)]
impl OpensslAuthority {
/// Creates a new openssl authority.
pub fn new(pkey: PKey<Private>, ca_cert: X509, hash: MessageDigest, cache_size: u64) -> Self {
Expand Down
1 change: 1 addition & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use thiserror::Error;

#[derive(Debug, Error)]
#[non_exhaustive]
#[allow(dead_code)]
pub enum Error {
#[error("invalid CA")]
Tls(#[from] rcgen::Error),
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod worker;

use crate::worker::DBWorker;
use ca::RcgenAuthority;
use futures::{executor::block_on, Future};
use futures::Future;
use hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
Expand Down Expand Up @@ -105,7 +105,7 @@ async fn main() {
db_job_chan: tx,
};

println!("Starting up proxy server!");
info!("Starting up proxy server!");
if let Err(e) = wrapper.start(shutdown_signal()).await {
error!("{}", e);
}
Expand Down
78 changes: 59 additions & 19 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@ use hyper::{
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use locust_core::{
crud::{
self,
proxies::{
create_proxy_session, get_general_proxy, get_proxy_by_domain, get_proxy_by_id,
get_proxy_session,
},
crud::proxies::{
create_proxy_session, get_general_proxy, get_proxy_by_domain, get_proxy_by_id,
get_proxy_session,
},
models,
};
Expand All @@ -27,17 +24,19 @@ use std::{
convert::Infallible,
future::Future,
sync::{mpsc, Arc},
time::Instant,
time::{Duration, Instant},
};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite},
net::TcpStream,
task::JoinHandle,
time::timeout,
};
use tokio_rustls::TlsAcceptor;
use tracing::{error, info_span, warn, Instrument, Span};
use tracing::{error, info, info_span, warn, Instrument, Span};

const SESSION_KEY: &str = "locust_session";
const DEFAULT_TIMEOUT_SECS: u64 = 30;

fn bad_request() -> Response<Body> {
Response::builder()
Expand Down Expand Up @@ -81,29 +80,41 @@ where
}
}

/// The main method for the proxy Service. Determines what type of network request
/// is being requested and handles it. Acts as a MITM proxy and decrypts the content
/// of the request.
///
/// Modifies the request with required information for the Locust service and stores
/// metrics and metadata about proxy responses.
pub async fn proxy(self, req: Request<Body>) -> Result<Response<Body>, Infallible> {
println!("REQUEST: {req:?}");
info!("REQUEST: {req:?}");
if req.method() == Method::CONNECT {
Ok(self.process_connect(req))
} else if hyper_tungstenite::is_upgrade_request(&req) {
unimplemented!()
} else {
let req = normalize_request(req);
let maybe_session = extract_session_cookie(&req);
let host: Option<String> = req.uri().host().map(Into::into);
let (upstream_proxy, session_id) = match maybe_session {
            // If we don't already have a session, get a proxy
// from the db and create a new session with it.
None => {
let proxy = self
.get_upstream_proxy(host.clone())
.await
.expect("Error getting proxy for client");
println!("CREATING SESSION");
info!("CREATING SESSION");
let session = create_proxy_session(&self.db, proxy.id)
.await
.expect("Error creation proxy session");
(proxy, session.id)
}

// If we already have a session going then look it up
// and look up the proxy associated with it.
Some(id) => {
println!("USING SESSION");
info!("USING SESSION");
let session = get_proxy_session(&self.db, id)
.await
.expect("Error getting proxy session");
Expand All @@ -113,24 +124,51 @@ where
(proxy, session.id)
}
};
// @TODO perhaps cache clients to various proxies? TBD how much
// overhead creating a client every time creates. Caching would
// increase memory usage but perhaps lower latency.
let client = build_client(&upstream_proxy);
let start_time = Instant::now();
println!("SENDING REQ");
let mut res = client
.request(normalize_request(req))
.await
.expect("Error with request");

// Make the upstream request, but wrap it in
// a timeout. If the timeout completes first,
// then return a gateway timeout response.
let mut res = match timeout(
Duration::from_secs(DEFAULT_TIMEOUT_SECS),
client.request(req),
)
.await
{
Ok(res) => match res {
Ok(res) => res,
Err(e) => {
error!("Error making request {e}");
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(hyper::Body::empty())
.unwrap()
}
},
Err(_) => Response::builder()
.status(StatusCode::GATEWAY_TIMEOUT)
.body(hyper::Body::empty())
.unwrap(),
};
let duration = start_time.elapsed().as_millis();
println!("RESPONSE: {res:?}");
info!("RESPONSE STATUS: {}", res.status());
if let Err(e) = self.db_job_chan.send(DBJob::ProxyResponse {
proxy_id: upstream_proxy.id,
status: res.status(),
response_time: duration as u32,
domain: host.map(Into::into),
}) {
println!("Error sending proxy response job: {e}");
warn!("Error sending proxy response job: {e}");
}
res.headers_mut().insert(

// Instruct the client to add the session cookie
            // so that it's included in the subsequent requests
// and we can make sure to use the same proxy.
res.headers_mut().append(
"Set-Cookie",
HeaderValue::from_str(format!("{SESSION_KEY}={session_id}").as_ref()).unwrap(),
);
Expand Down Expand Up @@ -269,6 +307,8 @@ where
}
}

/// Creates an HTTPS client that proxies traffic to the provided
/// Proxy.
fn build_client(
upstream_proxy: &models::proxies::Proxy,
) -> Client<ProxyConnector<HttpsConnector<HttpConnector>>> {
Expand Down
5 changes: 3 additions & 2 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::{mpsc, Arc};
use http::StatusCode;
use locust_core::{crud::proxies::add_proxy_metric, models::proxies::ProxyMetric};
use sqlx::PgPool;
use tracing::warn;

pub struct DBWorker {
pool: Arc<PgPool>,
Expand All @@ -19,7 +20,7 @@ impl DBWorker {
self.process_job(job).await;
}

println!("Error receiving worker job. Exiting");
warn!("Error receiving worker job. Exiting");
}

async fn process_job(&self, job: DBJob) {
Expand All @@ -41,7 +42,7 @@ impl DBWorker {
)
.await
{
println!("Error processing db job {e}");
warn!("Error processing db job {e}");
}
}
}
Expand Down

0 comments on commit 4709bd1

Please sign in to comment.