Skip to content

Commit

Permalink
Merge pull request #234 from karinushka/split-output
Browse files Browse the repository at this point in the history
Allow chunking of stdout output files.
  • Loading branch information
FedericoPonzi authored Jun 2, 2024
2 parents 31c8297 + 714700c commit ebba0da
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 15 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ maplit = "~1.0"
shellexpand = "~3.1"
anyhow = "~1.0"
thiserror = "~1.0"
bytefmt = "0.1.7"

[features]
default = ["http-healthcheck"]
Expand Down
6 changes: 6 additions & 0 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ If service `a` should start after service `b`, then `a` will be started as soon
If `b` goes in a `FinishedFailed` state (finished in an unsuccessful manner), `a` might not start at all.
* **`start-delay` = `time`**: Start this service with the specified delay. Check how to specify times [here](https://github.com/tailhook/humantime/blob/49f11fdc2a59746085d2457cb46bce204dec746a/src/duration.rs#L338)
* **`stdout` = `STDOUT|STDERR|file-path`**: Redirect stdout of this service. STDOUT and STDERR are special strings, pointing to stdout and stderr respectively. Otherwise, a file path is assumed.
* **`stdout-rotate-size` = `string`**: Chunk size of the file specified in `stdout`.
Once the file grows above the specified size it will be closed and a new file will be created with a suffix `.1`.
Once the new file also grows above the specified size it will also be closed and a next one will be created with the next suffix `.2`.
This allows adding external log rotation script, which can compress the old logs and maybe move them out to a different storage location.
The size is parsed using `bytefmt` - for example `100 MB`, `200 KB`, `110 MIB` or `200 GIB`.
If unset, the default value will be `100 MB`.
* **`stderr` = `STDOUT|STDERR|file-path`**: Redirect stderr of this service. Read `stdout` above for a complete reference.
* **`user` = `uid|username`**: Will run this service as this user. Either an uid or a username (check it in /etc/passwd)
* **`working-directory` = `string`**: Will run this command in this directory. Defaults to the working directory of the horust process.
Expand Down
14 changes: 8 additions & 6 deletions example_services/sample_service.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
command = "/bin/bash -c 'echo hello world'"
start-delay = "2s"
start-after = ["database", "backend.toml"]
stdout = "STDOUT"
stderr = "/var/logs/hello_world_svc/stderr.log"
stdout = "/var/logs/hello_world_svc/stdout.log"
stdout-rotate-size = "100 MB"
stderr = "STDERR"
# Check also `templating.toml`
user = "${USER}"
working-directory = "/tmp/"
Expand All @@ -25,22 +26,23 @@ max-failed = 3

[failure]
# by convention, zero conveys successful execution. Use this parameter to add more successfull exit codes.
successful-exit-code = [ 0, 1, 255]
successful-exit-code = [0, 1, 255]
# Don't shut all the services down if this service fails.
strategy = "ignore"

[environment]
# Regardless of this value, the programm will get `USER`, `HOSTNAME`, `HOME` and `PATH`.
keep-env = false
# Use for fine-grained re-exports.
re-export = [ "PATH", "DB_PASS"]
re-export = ["PATH", "DB_PASS"]
# You can provide additional env variables using a map.
additional = { key = "value"}
additional = { key = "value" }

[termination]
# Signal to use for termination.
signal = "TERM"
# Timeout before shutting the service down.
wait = "10s"
# If any of the services in the list has failed, shut down this service.
die-if-failed = [ "db.toml"]
die-if-failed = ["db.toml"]

20 changes: 17 additions & 3 deletions src/horust/formats/service.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::collections::HashMap;
use std::env;
use std::ffi::OsStr;
use std::fmt::{Debug, Formatter};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;
use std::{env, os::fd::RawFd};

use anyhow::{Context, Error, Result};
use nix::sys::signal::Signal;
Expand Down Expand Up @@ -33,6 +33,8 @@ pub struct Service {
pub working_directory: PathBuf,
#[serde(default = "Service::default_stdout_log")]
pub stdout: LogOutput,
#[serde(default, skip_serializing, deserialize_with = "str_to_bytes")]
pub stdout_rotate_size: u64,
#[serde(default = "Service::default_stderr_log")]
pub stderr: LogOutput,
#[serde(default, with = "humantime_serde")]
Expand Down Expand Up @@ -104,6 +106,7 @@ impl Default for Service {
start_after: Default::default(),
working_directory: env::current_dir().unwrap(),
stdout: Default::default(),
stdout_rotate_size: 0,
stderr: Default::default(),
user: Default::default(),
restart: Default::default(),
Expand Down Expand Up @@ -133,6 +136,7 @@ pub enum LogOutput {
#[default]
Stdout,
Path(PathBuf),
Pipe(RawFd),
}

impl Serialize for LogOutput {
Expand Down Expand Up @@ -186,6 +190,7 @@ impl From<LogOutput> for String {
let path = path.display();
path.to_string()
}
Pipe(fd) => format!("{fd}"),
}
}
}
Expand Down Expand Up @@ -643,6 +648,14 @@ pub fn validate(services: Vec<Service>) -> Result<Vec<Service>, ValidationErrors
}
}

fn str_to_bytes<'de, D>(deserializer: D) -> Result<u64, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
bytefmt::parse(s).map_err(de::Error::custom)
}

#[cfg(test)]
mod test {
use std::str::FromStr;
Expand Down Expand Up @@ -683,8 +696,9 @@ mod test {
.collect(),
},
working_directory: "/tmp/".into(),
stdout: "STDOUT".into(),
stderr: "/var/logs/hello_world_svc/stderr.log".into(),
stdout: "/var/logs/hello_world_svc/stdout.log".into(),
stdout_rotate_size: 100_000_000,
stderr: "STDERR".into(),
start_delay: Duration::from_secs(2),
start_after: vec!["database".into(), "backend.toml".into()],
restart: Restart {
Expand Down
75 changes: 69 additions & 6 deletions src/horust/supervisor/process_spawner.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::ffi::{CStr, CString};
use std::io;
use std::ops::Add;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::time::Duration;
use std::{fs::File, io::BufReader};
use std::{fs::OpenOptions, ops::Add};
use std::{
io::{self, Read},
os::fd::OwnedFd,
};

use anyhow::{anyhow, Context, Result};
use crossbeam::channel::{after, tick};
Expand Down Expand Up @@ -61,7 +65,7 @@ pub(crate) fn spawn_fork_exec_handler(
fn exec_args(service: &Service) -> Result<(CString, Vec<CString>, Vec<CString>)> {
let chunks: Vec<String> =
shlex::split(&service.command).context(format!("Invalid command: {}", service.command,))?;
let program_name = String::from(chunks.get(0).unwrap());
let program_name = String::from(chunks.first().unwrap());
let to_cstring = |s: Vec<String>| {
s.into_iter()
.map(|arg| CString::new(arg).map_err(Into::into))
Expand All @@ -70,7 +74,7 @@ fn exec_args(service: &Service) -> Result<(CString, Vec<CString>, Vec<CString>)>
let arg_cstrings = to_cstring(chunks)?;
let environment = service.get_environment()?;
let env_cstrings = to_cstring(environment)?;
let path = if program_name.contains("/") {
let path = if program_name.contains('/') {
program_name.to_string()
} else {
find_program(&program_name)?
Expand Down Expand Up @@ -121,12 +125,33 @@ fn spawn_process(service: &Service) -> Result<Pid> {
let cwd = service.working_directory.clone();
let arg_cptr: Vec<&CStr> = arg_cstrings.iter().map(|c| c.as_c_str()).collect();
let env_cptr: Vec<&CStr> = env_cstrings.iter().map(|c| c.as_c_str()).collect();
let mut service_copy = service.clone();
let (pipe_read, pipe_write) = if service.stdout_rotate_size > 0 {
let (pipe_read, pipe_write) = unistd::pipe()?;
(Some(pipe_read), Some(pipe_write))
} else {
(None, None)
};
match unsafe { fork() } {
Ok(ForkResult::Child) => {
child_process_main(service, path, cwd, uid, arg_cptr, env_cptr);
if let Some(pipe_write) = &pipe_write {
drop(pipe_read.unwrap());
service_copy.stdout = LogOutput::Pipe(pipe_write.as_raw_fd());
}
child_process_main(&service_copy, path, cwd, uid, arg_cptr, env_cptr);
unreachable!();
// Here the "pipe_write" would go out of scope and its descriptor would be closed.
// But because child_process_main() does an exec() and never returns, the raw
// descriptor inside the LogOutput::Pipe stays open.
}
Ok(ForkResult::Parent { child, .. }) => {
pipe_read.and_then(|pipe| {
drop(pipe_write.unwrap());
std::thread::spawn(move || {
chunked_writer(pipe, service_copy).map_err(|e| error!("{e}"))
});
None::<()>
});
debug!("Spawned child with PID {}.", child);
Ok(child)
}
Expand All @@ -151,6 +176,12 @@ fn redirect_output(
// Redirect stdout to stderr
unistd::dup2(stderr, stdout)?;
}
(LogOutput::Pipe(pipe), LogOutput::Stderr) => {
unistd::dup2(*pipe, stderr)?;
}
(LogOutput::Pipe(pipe), LogOutput::Stdout) => {
unistd::dup2(*pipe, stdout)?;
}
(LogOutput::Path(path), LogOutput::Stdout) => {
let raw_fd = fcntl::open(
path,
Expand All @@ -173,6 +204,38 @@ fn redirect_output(
Ok(())
}

fn open_next_chunk(base_path: &Path) -> io::Result<File> {
let mut count = 1;
let mut path = base_path;
let mut path_str;

while path.is_file() {
path_str = format!("{}.{count}", base_path.to_string_lossy());
path = Path::new(&path_str);
count += 1;
}
debug!("Opening next log output: {}", path.display());
OpenOptions::new().create(true).append(true).open(path)
}

fn chunked_writer(fd: OwnedFd, service: Service) -> Result<()> {
let source = File::from(fd);
let path = match &service.stdout {
LogOutput::Path(path) => path,
_ => return Err(anyhow!("Log output path is not set")),
};
loop {
let mut reader = BufReader::new(&source).take(service.stdout_rotate_size);
let mut output = open_next_chunk(path)?;
let copied = io::copy(&mut reader, &mut output)?;
if copied < service.stdout_rotate_size {
debug!("EOF reached");
break;
}
}
Ok(())
}

/// Find program on PATH.
///
fn find_program(program_name: &String) -> Result<String> {
Expand Down
40 changes: 40 additions & 0 deletions tests/section_general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ printf "{}" {}"#,
assert_eq!(content, pattern);
}
}

#[test]
fn test_output_redirection() {
let from = ["stdout", "stderr"];
Expand All @@ -56,6 +57,45 @@ fn test_output_redirection() {
.for_each(|(stream, to)| test_single_output_redirection(stream, to));
}

#[test]
fn test_output_log_rotation() {
let pattern = "Hello";
let max_size = 50;
let num_logs = 4;
let (mut cmd, temp_dir) = get_cli();
let output = temp_dir.path().join("out.log").display().to_string();
let last_output = temp_dir
.path()
.join(format!("out.log.{}", num_logs - 2))
.display()
.to_string();
let script = format!(
r#"#!/usr/bin/env bash
for i in {{1..{}}}; do echo {} ; done
sync
sleep 10
exit 0
"#,
// How many patterns do we need to repeat to reach required file size.
10 + (max_size * num_logs) / (pattern.len() + 1),
pattern,
);
let service = [
format!(r#"stdout="{}""#, output),
format!(r#"stdout-rotate-size="{}""#, max_size),
]
.join("\n");
store_service_script(
temp_dir.path(),
script.as_str(),
Some(service.as_str()),
None,
);
cmd.assert().success().stdout(is_empty());
let content = std::fs::read_to_string(last_output).unwrap();
assert!(content.starts_with(pattern));
}

#[test]
fn test_search_path_not_found() {
let (mut cmd, temp_dir) = get_cli();
Expand Down

0 comments on commit ebba0da

Please sign in to comment.