Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[arrow-ballista] enable delete log periodically (#280) #285

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions ballista/rust/core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::execution_plans::{
};
use crate::serde::scheduler::PartitionStats;
use async_trait::async_trait;
use chrono::{DateTime, Duration as Cduration, Utc};
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::{ipc::writer::FileWriter, record_batch::RecordBatch};
use datafusion::datasource::object_store::{ObjectStoreProvider, ObjectStoreRegistry};
Expand Down Expand Up @@ -49,13 +50,15 @@ use datafusion_proto::logical_plan::{
AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
};
use futures::StreamExt;
use log::{info, warn};
use object_store::ObjectStore;
use std::io::{BufWriter, Write};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::{fs::File, pin::Pin};
use tokio::fs;
use tonic::codegen::StdError;
use tonic::transport::{Channel, Error, Server};
use url::Url;
Expand Down Expand Up @@ -401,3 +404,72 @@ pub fn collect_plan_metrics(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
});
metrics_array
}

/// This function will scheduled periodically for cleanup log.
pub async fn clean_log_loop(work_dir: &str, ttl_seconds: u32) -> Result<()> {
let mut dir = fs::read_dir(work_dir).await?;
let mut to_deleted = Vec::new();
let cutoff = Utc::now() - Cduration::seconds(ttl_seconds as i64);
let mut need_delete;
while let Some(child) = dir.next_entry().await? {
if let Ok(metadata) = child.metadata().await {
// only delete the log file
if metadata.is_file() {
let modified_time: DateTime<Utc> =
metadata.modified().map(chrono::DateTime::from)?;
if modified_time < cutoff {
need_delete = child.path().into_os_string();
to_deleted.push(need_delete)
}
}
} else {
warn!("Found a dir {:?} in clean log skip it.", child)
}
}
info!(
"The log files {:?} that have not been modified for {:?} seconds will be deleted",
&to_deleted, ttl_seconds
);
for del in to_deleted {
fs::remove_file(del).await?;
}
Ok(())
}

#[cfg(test)]
mod tests {
    use crate::utils::clean_log_loop;
    use std::fs;
    use std::fs::File;
    use std::io::Write;
    use std::path::Path;
    use std::time::Duration;
    use tempfile::TempDir;

    /// Creates `name` inside `dir` and fills it with a little sample content.
    fn write_log_file(dir: &Path, name: &str) {
        let data = "Jorge,2018-12-13T12:12:10.011Z\n\
                    Andrew,2018-11-13T17:11:10.011Z";
        File::create(dir.join(name))
            .expect("creating temp file")
            .write_all(data.as_bytes())
            .expect("writing data");
    }

    /// A file older than the TTL must be removed by the cleanup.
    #[tokio::test]
    async fn test_clean_up_log() {
        // Keep the `TempDir` guard alive so the directory is removed when the
        // test finishes instead of being leaked on disk.
        let work_dir = TempDir::new().unwrap();
        let job_dir = work_dir.path().join("log");
        fs::create_dir(&job_dir).unwrap();
        write_log_file(&job_dir, "1.log");
        assert_eq!(fs::read_dir(&job_dir).unwrap().count(), 1);

        // Let the file age past the one second TTL used below.
        tokio::time::sleep(Duration::from_secs(2)).await;
        clean_log_loop(job_dir.to_str().unwrap(), 1)
            .await
            .unwrap();

        assert_eq!(fs::read_dir(&job_dir).unwrap().count(), 0);
    }

    /// A file younger than the TTL must survive the cleanup.
    #[tokio::test]
    async fn test_clean_up_log_keeps_recent_files() {
        let work_dir = TempDir::new().unwrap();
        let job_dir = work_dir.path().join("log");
        fs::create_dir(&job_dir).unwrap();
        write_log_file(&job_dir, "1.log");

        clean_log_loop(job_dir.to_str().unwrap(), 60 * 60)
            .await
            .unwrap();

        assert_eq!(fs::read_dir(&job_dir).unwrap().count(), 1);
    }
}
6 changes: 6 additions & 0 deletions ballista/rust/executor/executor_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,9 @@ name = "log_level_setting"
type = "String"
doc = "special log level for sub mod. link: https://docs.rs/env_logger/latest/env_logger/#enabling-logging. For example we want whole level is INFO but datafusion mode is DEBUG"
default = "std::string::String::from(\"INFO,datafusion=INFO\")"

[[param]]
name = "cleanup_log_ttl"
type = "u32"
doc = "The number of hours to retain log files on each node, zero means disable."
default = "0"
30 changes: 29 additions & 1 deletion ballista/rust/executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,47 @@ async fn main() -> Result<()> {
let grpc_port = opt.bind_grpc_port;
let log_dir = opt.log_dir;
let print_thread_info = opt.print_thread_info;
let cleanup_log_ttl = opt.cleanup_log_ttl;

let scheduler_name = format!("executor_{}_{}", bind_host, port);

// File layer
if let Some(log_dir) = log_dir {
let log_file = tracing_appender::rolling::daily(log_dir, &scheduler_name);
let log_file = if cleanup_log_ttl > 24 {
tracing_appender::rolling::daily(log_dir.clone(), &scheduler_name)
} else {
tracing_appender::rolling::hourly(log_dir.clone(), &scheduler_name)
};
tracing_subscriber::fmt()
.with_ansi(true)
.with_thread_names(print_thread_info)
.with_thread_ids(print_thread_info)
.with_writer(log_file)
.with_env_filter(special_mod_log_level)
.init();

if cleanup_log_ttl > 0 {
// cleanup_log_ttl unit is hour
let ttl_seconds = opt.cleanup_log_ttl * 60 * 60;
let mut interval_time =
time::interval(Core_Duration::from_secs((ttl_seconds / 2) as u64));

info!(
"Enable cleanup log loop which cleanup_log_ttl is {} hours in log_dir {}",
cleanup_log_ttl, &log_dir
);

tokio::spawn(async move {
loop {
interval_time.tick().await;
if let Err(e) =
ballista_core::utils::clean_log_loop(&log_dir, ttl_seconds).await
{
error!("Ballista executor fail to clean_log {:?}", e)
}
}
});
}
} else {
//Console layer
let rust_log = env::var(EnvFilter::DEFAULT_ENV);
Expand Down
6 changes: 6 additions & 0 deletions ballista/rust/scheduler/scheduler_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,9 @@ name = "log_level_setting"
type = "String"
doc = "special log level for sub mod. link: https://docs.rs/env_logger/latest/env_logger/#enabling-logging. For example we want whole level is INFO but datafusion mode is DEBUG"
default = "std::string::String::from(\"INFO,datafusion=INFO\")"

[[param]]
name = "cleanup_log_ttl"
type = "u32"
doc = "The number of hours to retain log files on each node, zero means disable."
default = "0"
29 changes: 27 additions & 2 deletions ballista/rust/scheduler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use ballista_scheduler::scheduler_server::externalscaler::external_scaler_server
use futures::future::{self, Either, TryFutureExt};
use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
use std::convert::Infallible;
use std::time::Duration;
use std::{env, io, net::SocketAddr, sync::Arc};
use tonic::transport::server::Connected;
use tower::Service;
Expand All @@ -46,7 +47,8 @@ use ballista_scheduler::state::backend::{StateBackend, StateBackendClient};
use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::default_session_builder;
use log::info;
use log::{error, info};
use tokio::time;

#[macro_use]
extern crate configure_me;
Expand Down Expand Up @@ -168,20 +170,43 @@ async fn main() -> Result<()> {
let port = opt.bind_port;
let log_dir = opt.log_dir;
let print_thread_info = opt.print_thread_info;
let cleanup_log_ttl = opt.cleanup_log_ttl;
let log_file_name_prefix =
format!("scheduler_{}_{}_{}", namespace, external_host, port);
let scheduler_name = format!("{}:{}", external_host, port);

// File layer
if let Some(log_dir) = log_dir {
let log_file = tracing_appender::rolling::daily(log_dir, &log_file_name_prefix);
let log_file = tracing_appender::rolling::daily(&log_dir, &log_file_name_prefix);
tracing_subscriber::fmt()
.with_ansi(true)
.with_thread_names(print_thread_info)
.with_thread_ids(print_thread_info)
.with_writer(log_file)
.with_env_filter(special_mod_log_level)
.init();

if cleanup_log_ttl > 0 {
// cleanup_log_ttl unit is hour
let ttl_seconds = opt.cleanup_log_ttl * 60 * 60;
let mut interval_time =
time::interval(Duration::from_secs((ttl_seconds / 2) as u64));
info!(
"Enable cleanup log loop which cleanup_log_ttl is {} hours in log_dir {}",
cleanup_log_ttl, &log_dir
);

tokio::spawn(async move {
loop {
interval_time.tick().await;
if let Err(e) =
ballista_core::utils::clean_log_loop(&log_dir, ttl_seconds).await
{
error!("Ballista executor fail to clean_log {:?}", e)
}
}
});
}
} else {
//Console layer
let rust_log = env::var(EnvFilter::DEFAULT_ENV);
Expand Down