Skip to content

Commit

Permalink
aggregate query
Browse files Browse the repository at this point in the history
  • Loading branch information
maxmindlin committed Feb 25, 2024
1 parent 38a2bed commit 1e25201
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 9 deletions.
10 changes: 6 additions & 4 deletions locust-core/src/crud/proxies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub async fn get_proxy_by_domain(pool: &PgPool, domain: &str) -> Result<Proxy, E
JOIN domain_tag_map as dtm ON t.id = dtm.tag_id
JOIN domains as d ON d.id = dtm.domain_id
WHERE d.host = $1 AND p.date_deleted IS NULL
ORDER BY p.date_last_used
ORDER BY p.date_last_used DESC
"#,
)
.bind(domain)
Expand All @@ -43,7 +43,7 @@ pub async fn get_general_proxy(pool: &PgPool) -> Result<Proxy, Error> {
p.id, p.protocol, p.host, p.port, p.username, p.password, p.provider
FROM proxies as p
WHERE p.date_deleted IS NULL
ORDER BY p.date_last_used
ORDER BY p.date_last_used DESC
"#,
)
.fetch_one(pool)
Expand Down Expand Up @@ -171,13 +171,15 @@ pub async fn add_proxy_metric(pool: &PgPool, metric: ProxyMetric) -> Result<(),
sqlx::query(
r#"
INSERT INTO
proxy_metrics (time, proxy_id, status, response_time)
values (NOW(), $1, $2, $3)
proxy_metrics (time, proxy_id, status, success, response_time, domain)
values (NOW(), $1, $2, $3, $4, $5)
"#,
)
.bind(metric.proxy_id)
.bind(metric.status as i32)
.bind(metric.status < 400)
.bind(metric.response_time as i32)
.bind(metric.domain.to_owned())
.execute(pool)
.await?;
Ok(())
Expand Down
1 change: 1 addition & 0 deletions locust-core/src/models/proxies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ pub struct ProxyMetric {
pub proxy_id: i32,
pub status: u16,
pub response_time: u32,
pub domain: Option<String>,
}
25 changes: 25 additions & 0 deletions migrations/V2__proxy_metrics.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ CREATE TABLE IF NOT EXISTS proxy_metrics (
time TIMESTAMPTZ NOT NULL,
proxy_id integer NOT NULL,
status integer NOT NULL,
success boolean NOT NULL,
response_time integer NOT NULL,
domain varchar NOT NULL,
CONSTRAINT proxy_id_fkey FOREIGN KEY (proxy_id) REFERENCES proxies(id)
);

Expand All @@ -12,5 +14,28 @@ SELECT add_retention_policy('proxy_metrics', INTERVAL '1 month');
CREATE INDEX idx_status_time ON proxy_metrics(status, time DESC);
CREATE INDEX idx_proxy_id_time ON proxy_metrics(proxy_id, time DESC);
CREATE INDEX idx_response_time_time ON proxy_metrics(response_time, time DESC);
CREATE INDEX idx_domain_time ON proxy_metrics(domain, time DESC);
CREATE INDEX idx_success_time ON proxy_metrics(success, time DESC);
CREATE INDEX idx_proxy_metrics_proxy_id ON proxy_metrics(proxy_id);
CREATE INDEX idx_proxy_metrics_status ON proxy_metrics(status);
CREATE INDEX idx_proxy_metrics_domain ON proxy_metrics(domain);

CREATE MATERIALIZED VIEW proxy_metrics_daily
WITH (timescaledb.continuous) AS
SELECT
domain,
proxy_id,
success,
time_bucket(INTERVAL '1 day', time) as bucket,
AVG(response_time),
MAX(response_time),
MIN(response_time)
FROM proxy_metrics
GROUP BY domain, proxy_id, success, bucket
WITH NO DATA;

SELECT add_continuous_aggregate_policy('proxy_metrics_daily',
start_offset => INTERVAL '1 month',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 hour');

16 changes: 11 additions & 5 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,15 @@ where
}

pub async fn proxy(self, req: Request<Body>) -> Result<Response<Body>, Infallible> {
println!("REQUEST: {:?}", req);
println!("REQUEST: {req:?}");
if req.method() == Method::CONNECT {
Ok(self.process_connect(req))
} else if hyper_tungstenite::is_upgrade_request(&req) {
unimplemented!()
} else {
let host: Option<String> = req.uri().host().map(Into::into);
let upstream_proxy = self
.get_upstream_proxy(req.uri())
.get_upstream_proxy(host.clone())
.await
.expect("Error getting proxy for client");
let client = build_client(&upstream_proxy);
Expand All @@ -89,20 +90,25 @@ where
.await
.expect("Error with request");
let duration = start_time.elapsed().as_millis();
println!("RESPONSE: {res:?}");
if let Err(e) = self.db_job_chan.send(DBJob::ProxyResponse {
proxy_id: upstream_proxy.id,
status: res.status(),
response_time: duration as u32,
domain: host.map(Into::into),
}) {
println!("Error sending proxy response job: {e}");
}
Ok(res)
}
}

async fn get_upstream_proxy(&self, uri: &Uri) -> Result<models::proxies::Proxy, sqlx::Error> {
match uri.host() {
Some(host) => get_proxy_by_domain(&self.db, host).await,
async fn get_upstream_proxy(
&self,
host: Option<String>,
) -> Result<models::proxies::Proxy, sqlx::Error> {
match host {
Some(host) => get_proxy_by_domain(&self.db, &host).await,
None => get_general_proxy(&self.db).await,
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ impl DBWorker {
proxy_id,
status,
response_time,
domain,
} => {
if let Err(e) = add_proxy_metric(
&self.pool,
ProxyMetric {
proxy_id,
response_time,
status: status.as_u16(),
domain,
},
)
.await
Expand All @@ -51,5 +53,6 @@ pub enum DBJob {
proxy_id: i32,
status: StatusCode,
response_time: u32,
domain: Option<String>,
},
}

0 comments on commit 1e25201

Please sign in to comment.