
Commit

logs stats
ragibkl committed Aug 28, 2024
1 parent 4644615 commit e2c33e7
Showing 7 changed files with 205 additions and 89 deletions.
1 change: 1 addition & 0 deletions src/get_logs.hbs
@@ -17,6 +17,7 @@

<body>
<p>ip address: {{ip}}</p>
<p>active ips in last day: {{ active_ips_last_day }}</p>

<table>
<tr>
46 changes: 38 additions & 8 deletions src/handler.rs
@@ -7,14 +7,36 @@ use axum::{
};
use handlebars::Handlebars;

use crate::logs_store::{LogsStore, QueryLog};
use crate::logs::{QueryLog, QueryLogs, UsageStats};

static GET_LOGS_TEMPLATE: &str = include_str!("./get_logs.hbs");

#[derive(serde::Serialize, Debug, Clone)]
pub struct GetLogsApiOutput {
ip: String,
queries: Vec<QueryLog>,
}

#[derive(serde::Serialize, Debug, Clone)]
pub struct GetLogsOutput {
ip: String,
queries: Vec<QueryLog>,
active_ips_last_day: usize,
}

#[derive(Clone)]
pub struct AppState {
logs_store: QueryLogs,
usage_stats: UsageStats,
}

impl AppState {
pub fn new(logs_store: QueryLogs, usage_stats: UsageStats) -> Self {
Self {
logs_store,
usage_stats,
}
}
}

fn get_ip(addr: SocketAddr) -> String {
@@ -28,29 +50,37 @@ fn get_ip(addr: SocketAddr) -> String {
#[axum_macros::debug_handler]
pub async fn get_logs_api(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(logs_store): State<LogsStore>,
) -> Json<GetLogsOutput> {
State(app_state): State<AppState>,
) -> Json<GetLogsApiOutput> {
tracing::info!("get_logs_api - addr: {addr}");

let ip = get_ip(addr);
let queries = logs_store.get_logs_for_ip(&ip);
let queries = app_state.logs_store.get_logs_for_ip(&ip);

Json(GetLogsOutput { ip, queries })
Json(GetLogsApiOutput { ip, queries })
}

#[axum_macros::debug_handler]
pub async fn get_logs(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(logs_store): State<LogsStore>,
State(app_state): State<AppState>,
) -> Html<String> {
tracing::info!("get_logs - addr: {addr}");

let ip = get_ip(addr);
let queries = logs_store.get_logs_for_ip(&ip);
let queries = app_state.logs_store.get_logs_for_ip(&ip);
let active_ips_last_day = app_state.usage_stats.get_active_ips_in_last_day();

let reg = Handlebars::new();
let response = reg
.render_template(GET_LOGS_TEMPLATE, &GetLogsOutput { ip, queries })
.render_template(
GET_LOGS_TEMPLATE,
&GetLogsOutput {
ip,
queries,
active_ips_last_day,
},
)
.unwrap();

Html(response)
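
Both handlers now take their dependencies from a shared AppState through axum's State extractor. The router wiring itself is not part of the visible diff; below is a minimal sketch of how it could look, assuming axum 0.7 with tokio, and treating the route paths, port, and serve function name as placeholders.

// Hypothetical wiring sketch, not part of this commit's visible diff.
// Assumes axum 0.7 and tokio; route paths and port are placeholders.
use std::net::SocketAddr;

use axum::{routing::get, Router};

use crate::handler::{get_logs, get_logs_api, AppState};
use crate::logs::{QueryLogs, UsageStats};

pub async fn serve() {
    let app_state = AppState::new(QueryLogs::default(), UsageStats::default());

    let app = Router::new()
        .route("/", get(get_logs))
        .route("/api/logs", get(get_logs_api))
        .with_state(app_state);

    // ConnectInfo<SocketAddr> in the handlers requires into_make_service_with_connect_info.
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(
        listener,
        app.into_make_service_with_connect_info::<SocketAddr>(),
    )
    .await
    .unwrap();
}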
49 changes: 49 additions & 0 deletions src/logs/mod.rs
@@ -0,0 +1,49 @@
mod query_log;
mod query_logs;
mod usage_stats;

pub use query_log::*;
pub use query_logs::*;
pub use usage_stats::*;

use crate::tasks::dnstap::read_dnstap_logs;

#[derive(Debug, Clone)]
pub struct LogsConsumer {
logs_store: QueryLogs,
usage_stats: UsageStats,
}

impl LogsConsumer {
pub fn new(logs_store: QueryLogs, usage_stats: UsageStats) -> Self {
Self {
logs_store,
usage_stats,
}
}

pub async fn ingest_logs_from_file(&self) {
tracing::trace!("LogsStore remove_expired_logs");
self.logs_store.remove_expired_logs();
tracing::trace!("LogsStore remove_expired_logs. DONE");

tracing::trace!("LogsStore read_dnstap_logs");
let content = read_dnstap_logs().await;
let content_len = content.len();
tracing::trace!("LogsStore read_dnstap_logs. DONE, content_len={content_len}");

tracing::trace!("LogsStore extract_query_logs");
let logs_hash_map = extract_query_logs(&content);
let logs_hash_map_len = logs_hash_map.len();
tracing::trace!(
"LogsStore extract_query_logs. DONE, logs_hash_map_len={logs_hash_map_len}"
);

tracing::trace!("LogsStore logs_hash_map");
self.logs_store.merge_logs(&logs_hash_map);
self.logs_store.remove_expired_logs();

self.usage_stats.merge_logs(&logs_hash_map);
tracing::trace!("LogsStore logs_hash_map. DONE");
}
}
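
The new LogsConsumer bundles the two stores and drains the dnstap output into them. How it gets scheduled is not shown in this excerpt; a plausible sketch, assuming a tokio interval task and treating the 30-second period as a placeholder:

// Hypothetical scheduling sketch, not part of this commit's visible diff.
// Calls ingest_logs_from_file on a fixed interval so both stores stay current.
use std::time::Duration;

use crate::logs::LogsConsumer;

pub async fn run_logs_consumer(consumer: LogsConsumer) {
    let mut interval = tokio::time::interval(Duration::from_secs(30));
    loop {
        interval.tick().await;
        consumer.ingest_logs_from_file().await;
    }
}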
81 changes: 8 additions & 73 deletions src/logs_store.rs → src/logs/query_log.rs
@@ -1,11 +1,6 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use std::collections::HashMap;

use chrono::{DateTime, Duration, NaiveDateTime, Utc};

use crate::tasks::dnstap::read_dnstap_logs;
use chrono::{DateTime, NaiveDateTime, Utc};

#[allow(dead_code)]
#[derive(serde::Deserialize, Debug, Clone, PartialEq)]
@@ -39,10 +34,10 @@ fn parse_query_time(query_time: &str) -> DateTime<Utc> {

#[derive(serde::Serialize, Debug, Clone, PartialEq)]
pub struct QueryLog {
ip: String,
query_time: chrono::DateTime<Utc>,
question: String,
answers: Vec<String>,
pub ip: String,
pub query_time: chrono::DateTime<Utc>,
pub question: String,
pub answers: Vec<String>,
}

impl From<&RawLog> for QueryLog {
@@ -78,7 +73,7 @@ impl From<&RawLog> for QueryLog {
}
}

fn extract_query_logs(content: &str) -> HashMap<String, Vec<QueryLog>> {
pub fn extract_query_logs(content: &str) -> HashMap<String, Vec<QueryLog>> {
let mut logs_store: HashMap<String, Vec<QueryLog>> = HashMap::new();

let content_parts = content
@@ -106,73 +101,13 @@ fn extract_query_logs(content: &str) -> HashMap<String, Vec<QueryLog>> {
logs_store
}

#[derive(Debug, Clone, Default)]
pub struct LogsStore {
logs_store: Arc<Mutex<HashMap<String, Vec<QueryLog>>>>,
}

impl LogsStore {
pub fn remove_expired_logs(&self) {
let query_time_cutoff = Utc::now() - Duration::minutes(10);

let mut logs_store_guard = self.logs_store.lock().unwrap();
for query_logs in logs_store_guard.values_mut() {
query_logs.retain(|q| q.query_time > query_time_cutoff);
}
}

pub fn merge_logs(&self, logs_hash_map: HashMap<String, Vec<QueryLog>>) {
let mut logs_store_guard = self.logs_store.lock().unwrap();
for (ip, logs) in logs_hash_map.into_iter() {
match logs_store_guard.get_mut(&ip) {
Some(existing_logs) => {
existing_logs.extend(logs);
}
None => {
logs_store_guard.insert(ip, logs);
}
}
}
}

pub async fn ingest_logs_from_file(&self) {
tracing::info!("LogsStore remove_expired_logs");
self.remove_expired_logs();
tracing::info!("LogsStore remove_expired_logs. DONE");

tracing::info!("LogsStore read_dnstap_logs");
let content = read_dnstap_logs().await;
let content_len = content.len();
tracing::info!("LogsStore read_dnstap_logs. DONE, content_len={content_len}");

tracing::info!("LogsStore extract_query_logs");
let logs_hash_map = extract_query_logs(&content);
let logs_hash_map_len = logs_hash_map.len();
tracing::info!("LogsStore extract_query_logs. DONE, logs_hash_map_len={logs_hash_map_len}");

tracing::info!("LogsStore logs_hash_map");
self.merge_logs(logs_hash_map);
self.remove_expired_logs();
tracing::info!("LogsStore logs_hash_map. DONE");
}

pub fn get_logs_for_ip(&self, ip: &str) -> Vec<QueryLog> {
match self.logs_store.lock().unwrap().get(ip).cloned() {
Some(logs) => logs,
None => Vec::new(),
}
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use chrono::TimeZone;

use crate::logs_store::{parse_query_time, QueryLog};

use super::extract_query_logs;
use super::{extract_query_logs, parse_query_time, QueryLog};

#[test]
fn test_parse_query_time() {
45 changes: 45 additions & 0 deletions src/logs/query_logs.rs
@@ -0,0 +1,45 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

use chrono::{Duration, Utc};

use super::QueryLog;

#[derive(Debug, Clone, Default)]
pub struct QueryLogs {
logs_store: Arc<Mutex<HashMap<String, Vec<QueryLog>>>>,
}

impl QueryLogs {
pub fn remove_expired_logs(&self) {
let query_time_cutoff = Utc::now() - Duration::minutes(10);

let mut logs_store_guard = self.logs_store.lock().unwrap();
for query_logs in logs_store_guard.values_mut() {
query_logs.retain(|q| q.query_time > query_time_cutoff);
}
}

pub fn merge_logs(&self, logs_hash_map: &HashMap<String, Vec<QueryLog>>) {
let mut logs_store_guard = self.logs_store.lock().unwrap();
for (ip, logs) in logs_hash_map.iter() {
match logs_store_guard.get_mut(ip) {
Some(existing_logs) => {
existing_logs.extend(logs.clone());
}
None => {
logs_store_guard.insert(ip.to_string(), logs.clone());
}
}
}
}

pub fn get_logs_for_ip(&self, ip: &str) -> Vec<QueryLog> {
match self.logs_store.lock().unwrap().get(ip).cloned() {
Some(logs) => logs,
None => Vec::new(),
}
}
}
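
QueryLogs is the thread-safe per-IP query store with a ten-minute retention window. A brief usage sketch, not part of the commit, with the IP address and call site invented for illustration:

// Hypothetical usage sketch for QueryLogs, not part of the commit.
use std::collections::HashMap;

use crate::logs::{QueryLog, QueryLogs};

fn demo(fresh_batch: HashMap<String, Vec<QueryLog>>) {
    let store = QueryLogs::default();

    // Append the new batch to whatever is already held for each IP.
    store.merge_logs(&fresh_batch);

    // Drop entries older than the 10-minute retention window.
    store.remove_expired_logs();

    // Read back one client's logs; unknown IPs yield an empty Vec.
    let logs = store.get_logs_for_ip("10.0.0.1");
    println!("kept {} queries for 10.0.0.1", logs.len());
}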
45 changes: 45 additions & 0 deletions src/logs/usage_stats.rs
@@ -0,0 +1,45 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

use chrono::{DateTime, Duration, Utc};

use super::QueryLog;

#[derive(Debug, Clone, Default)]
pub struct UsageStats {
last_query_times: Arc<Mutex<HashMap<String, DateTime<Utc>>>>,
}

impl UsageStats {
pub fn merge_logs(&self, logs_hash_map: &HashMap<String, Vec<QueryLog>>) {
let mut last_query_times = self.last_query_times.lock().unwrap().clone();

for (ip, queries) in logs_hash_map.iter() {
let Some(last_qt) = queries.last().map(|q| q.query_time) else {
continue;
};

match last_query_times.get_mut(ip) {
Some(qt) => *qt = last_qt,
None => {
last_query_times.insert(ip.to_string(), last_qt);
}
}
}

*self.last_query_times.lock().unwrap() = last_query_times;
}

pub fn get_active_ips_in_last_day(&self) -> usize {
let one_day = Utc::now() - Duration::days(1);

self.last_query_times
.lock()
.unwrap()
.iter()
.filter(|(_, qt)| **qt > one_day)
.count()
}
}
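
UsageStats keeps only the latest query time per IP and counts the IPs seen within the past 24 hours. A hypothetical test, not part of the commit, illustrating that only recently active IPs are counted:

// Hypothetical test sketch, not part of the commit: only IPs whose last
// query falls within the past day should be counted.
#[cfg(test)]
mod usage_stats_sketch {
    use std::collections::HashMap;

    use chrono::{Duration, Utc};

    use crate::logs::{QueryLog, UsageStats};

    #[test]
    fn counts_only_recently_active_ips() {
        let recent = QueryLog {
            ip: "10.0.0.1".to_string(),
            query_time: Utc::now(),
            question: "example.com.".to_string(),
            answers: vec!["93.184.215.14".to_string()],
        };
        let stale = QueryLog {
            ip: "10.0.0.2".to_string(),
            query_time: Utc::now() - Duration::days(2),
            question: "example.org.".to_string(),
            answers: vec![],
        };

        let mut logs = HashMap::new();
        logs.insert(recent.ip.clone(), vec![recent]);
        logs.insert(stale.ip.clone(), vec![stale]);

        let stats = UsageStats::default();
        stats.merge_logs(&logs);

        assert_eq!(stats.get_active_ips_in_last_day(), 1);
    }
}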