diff --git a/Cargo.lock b/Cargo.lock index 7096729..fff5eb5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ [root] name = "beamium" -version = "1.4.2" +version = "1.5.0" dependencies = [ "cast 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.20.1 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 84f7394..2ff2320 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "beamium" -version = "1.4.2" +version = "1.5.0" authors = [ "d33d33 " ] build = "build.rs" diff --git a/README.md b/README.md index cc1c022..755e0e9 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ -# Beamium - Prometheus to Warp10 metrics forwarder +# Beamium - /metrics scraper (Warp10 & Prometheus) with DFO buffering and Warp10 forwarding. [![version](https://img.shields.io/badge/status-alpha-orange.svg)](https://github.com/runabove/beamium) [![Build Status](https://travis-ci.org/runabove/beamium.svg?branch=master)](https://travis-ci.org/runabove/beamium) -Beamium collect metrics from Prometheus endpoints and forward them to Warp10 data platform. +Beamium collects metrics from /metrics HTTP endpoints (with support for Prometheus & Warp10/Sensision format) and forwards them to the Warp10 data platform. While acquiring metrics, Beamium uses DFO (Disk Fail Over) to prevent metrics loss due to eventual network issues or unavailable service. Beamium is written in Rust to ensure efficiency, a very low footprint and deterministic performances. @@ -12,6 +12,20 @@ Beamium key points: - **Versatile**: Beamium can also fetch metrics from a directory. - **Powerful**: Beamium is able to filter metrics and to send them to multiple Warp10 platforms. +## How it works? + +Scrapers (optional) will collect metrics from defined endpoints. They will store them into the source_dir. +Beamium will read files inside source_dir, and will fan them out according to the provided selector into sink_dir. 
+Finally, Beamium will push files from the sink_dir to the defined sinks. + +The pipeline can be described this way: + + HTTP /metrics endpoint --scrape--> source_dir --route--> sink_dir --forward--> warp10 + +It also means that, given your needs, you could produce metrics directly to the source/sink directory, for example: + + $ TS=`date +%s` && echo $TS"000000// metrics{} true" >> /opt/beamium/data/sources/prefix-$TS.metrics + ## Status Beamium is currently under development. @@ -27,11 +41,11 @@ Beamium come with a [sample config file](config.sample.yaml). Simply copy the sa ### Definitions Config is composed of four parts: -#### Sources -Beamium can have none to many Prometheus endpoints. A *source* is defined as follow: +#### Scrapers +Beamium can have none to many Prometheus or Warp10/Sensision endpoints. A *scraper* is defined as follows: ``` yaml -sources: # Sources definitions (Optional) - source1: # Source name (Required) +scrapers: # Scrapers definitions (Optional) + scraper1: # Scraper name (Required) url: http://127.0.0.1:9100/metrics # Prometheus endpoint (Required) period: 10000 # Polling interval(ms) (Required) format: prometheus # Polling format (Optional, default: prometheus, value: [prometheus, sensision]) diff --git a/build.rs b/build.rs index fb43863..f4e52b6 100644 --- a/build.rs +++ b/build.rs @@ -5,10 +5,10 @@ use std::io::prelude::*; fn main() { let output = Command::new("git") - .arg("rev-parse") - .arg("HEAD") - .output() - .expect("failed to execute process"); + .arg("rev-parse") + .arg("HEAD") + .output() + .expect("failed to execute process"); let hash = String::from_utf8_lossy(&output.stdout); let content = format!("static COMMIT: &'static str = {:?};\n", hash.trim()); @@ -18,10 +18,11 @@ fn main() { if path.exists() { let mut f = File::open(path).expect("fail to open result.rs"); let mut current = String::new(); - f.read_to_string(&mut current).expect("fail to read result.rs"); + f.read_to_string(&mut current) + .expect("fail to read result.rs"); 
if current == content { - return + return; } }; diff --git a/src/config.rs b/src/config.rs index 5aec115..4ba2f9e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,6 +2,7 @@ //! //! The Config module provides the beamium configuration. //! It set defaults and then load config from '/etc', local dir and provided path. + use std::fs::File; use std::io::Read; use std::io; @@ -20,7 +21,7 @@ use slog; #[derive(Clone)] /// Config root. pub struct Config { - pub sources: Vec, + pub scrapers: Vec, pub sinks: Vec, pub labels: HashMap, pub parameters: Parameters, @@ -28,19 +29,19 @@ pub struct Config { #[derive(Debug)] #[derive(Clone)] -/// Source config. -pub struct Source { +/// Scraper config. +pub struct Scraper { pub name: String, pub url: String, pub period: u64, - pub format: SourceFormat, + pub format: ScraperFormat, pub metrics: Option, } #[derive(Debug)] #[derive(Clone)] -/// Source format. -pub enum SourceFormat { +/// Scraper format. +pub enum ScraperFormat { Prometheus, Sensision, } @@ -150,7 +151,7 @@ impl error::Error for ConfigError { pub fn load_config(config_path: &str) -> Result { // Defaults let mut config = Config { - sources: Vec::new(), + scrapers: Vec::new(), labels: HashMap::new(), sinks: Vec::new(), parameters: Parameters { @@ -189,63 +190,77 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co let mut contents = String::new(); try!(file.read_to_string(&mut contents)); let docs = try!(YamlLoader::load_from_str(&contents)); - + let config_scraper_keys = ["sources", "scrapers"]; for doc in &docs { - if !doc["sources"].is_badvalue() { - let sources = try!(doc["sources"] - .as_hash() - .ok_or("sources should be a map")); - - for (k, v) in sources { - let name = try!(k.as_str() - .ok_or("sources keys should be a string")); - let url = try!(v["url"] - .as_str() - .ok_or(format!("sources.{}.url is required and should be a string", name))); - let period = try!(v["period"] - .as_i64() - .ok_or(format!("sources.{}.period is required and 
should be a number", name))); - let period = try!(cast::u64(period) - .map_err(|_| format!("sources.{}.period is invalid", name))); - let format = if v["format"].is_badvalue() { - SourceFormat::Prometheus - } else { - let f = try!(v["format"] - .as_str() - .ok_or(format!("sources.{}.format should be a string", name))); - - if f == "prometheus" { - SourceFormat::Prometheus - } else if f == "sensision" { - SourceFormat::Sensision + for config_scraper_key in config_scraper_keys.iter() { + let key = *config_scraper_key; + if !doc[key].is_badvalue() { + if "sources" == key { + warn!("'sources' is deprecated and will be removed in further revision. \ + Please use 'scrapers' instead.",) + } + + let scrapers = try!(doc[key].as_hash().ok_or(format!("{} should be a map", key))); + + for (k, v) in scrapers { + let name = try!(k.as_str().ok_or(format!("{} keys should be a string", key))); + let url = try!(v["url"] + .as_str() + .ok_or(format!("{}.{}.url is required and should be a string", + key, + name))); + let period = try!(v["period"] + .as_i64() + .ok_or(format!("{}.{}.period is required and should be a number", + key, + name))); + let period = try!(cast::u64(period).map_err(|_| { + format!("scrapers.{}.period is invalid", name) + })); + let format = if v["format"].is_badvalue() { + ScraperFormat::Prometheus } else { - return Err(format!("sources.{}.format should be 'prometheus' or \ - 'sensision'", - name) - .into()); - } - }; - let metrics = if v["metrics"].is_badvalue() { - None - } else { - let mut metrics = Vec::new(); - let values = try!(v["metrics"].as_vec().ok_or("metrics should be an array")); - for v in values { - let value = try!(regex::Regex::new(try!(v.as_str() - .ok_or(format!("metrics.{} is invalid", name))))); - metrics.push(String::from(r"^(\S*)\s") + value.as_str()); - } - - Some(try!(regex::RegexSet::new(&metrics))) - }; - - config.sources.push(Source { - name: String::from(name), - url: String::from(url), - period: period, - format: format, - metrics: 
metrics, - }) + let f = try!(v["format"] + .as_str() + .ok_or(format!("scrapers.{}.format should be a string", + name))); + + if f == "prometheus" { + ScraperFormat::Prometheus + } else if f == "sensision" { + ScraperFormat::Sensision + } else { + return Err(format!("scrapers.{}.format should be 'prometheus' or \ + 'sensision'", + name) + .into()); + } + }; + let metrics = if v["metrics"].is_badvalue() { + None + } else { + let mut metrics = Vec::new(); + let values = + try!(v["metrics"].as_vec().ok_or("metrics should be an array")); + for v in values { + let value = try!(regex::Regex::new(try!(v.as_str() + .ok_or(format!("metrics.{} is invalid", name))))); + metrics.push(String::from(r"^(\S*)\s") + value.as_str()); + } + + Some(try!(regex::RegexSet::new(&metrics))) + }; + + config + .scrapers + .push(Scraper { + name: String::from(name), + url: String::from(url), + period: period, + format: format, + metrics: metrics, + }) + } } } @@ -254,17 +269,19 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co for (k, v) in sinks { let name = try!(k.as_str().ok_or("sinks keys should be a string")); let url = try!(v["url"] - .as_str() - .ok_or(format!("sinks.{}.url is required and should be a string", name))); + .as_str() + .ok_or(format!("sinks.{}.url is required and should be a string", + name))); let token = try!(v["token"] - .as_str() - .ok_or(format!("sinks.{}.token is required and should be a string", name))); + .as_str() + .ok_or(format!("sinks.{}.token is required and should be a string", + name))); let token_header = if v["token-header"].is_badvalue() { "X-Warp10-Token" } else { try!(v["token-header"] - .as_str() - .ok_or(format!("sinks.{}.token-header should be a string", name))) + .as_str() + .ok_or(format!("sinks.{}.token-header should be a string", name))) }; let selector = if v["selector"].is_badvalue() { @@ -272,42 +289,48 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co } else { Some(try!(regex::Regex::new(format!("^{}", 
try!(v["selector"] - .as_str() - .ok_or(format!("sinks.{}.selector \ + .as_str() + .ok_or(format!("sinks.{}.selector \ is invalid", - name)))) - .as_str()))) + name)))) + .as_str()))) }; let ttl = if v["ttl"].is_badvalue() { 3600 } else { let ttl = try!(v["ttl"] - .as_i64() - .ok_or(format!("sinks.{}.ttl should be a number", name))); - try!(cast::u64(ttl) - .map_err(|_| format!("sinks.{}.ttl should be a positive number", name))) + .as_i64() + .ok_or(format!("sinks.{}.ttl should be a number", + name))); + try!(cast::u64(ttl).map_err(|_| { + format!("sinks.{}.ttl should be a positive number", name) + })) }; let size = if v["size"].is_badvalue() { 1073741824 } else { let size = try!(v["size"] - .as_i64() - .ok_or(format!("sinks.{}.size should be a number", name))); - try!(cast::u64(size) - .map_err(|_| format!("sinks.{}.size should be a positive number", name))) + .as_i64() + .ok_or(format!("sinks.{}.size should be a number", + name))); + try!(cast::u64(size).map_err(|_| { + format!("sinks.{}.size should be a positive number", name) + })) }; - config.sinks.push(Sink { - name: String::from(name), - url: String::from(url), - token: String::from(token), - token_header: String::from(token_header), - selector: selector, - ttl: ttl, - size: size, - }) + config + .sinks + .push(Sink { + name: String::from(name), + url: String::from(url), + token: String::from(token), + token_header: String::from(token_header), + selector: selector, + ttl: ttl, + size: size, + }) } } @@ -316,8 +339,11 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co for (k, v) in labels { let name = try!(k.as_str().ok_or("labels keys should be a string")); let value = try!(v.as_str() - .ok_or(format!("labels.{} value should be a string", name))); - config.labels.insert(String::from(name), String::from(value)); + .ok_or(format!("labels.{} value should be a string", + name))); + config + .labels + .insert(String::from(name), String::from(value)); } } @@ -377,20 +403,20 @@ fn 
load_path>(file_path: P, config: &mut Config) -> Result<(), Co let log_level = try!(cast::u64(log_level) .map_err(|_| format!("parameters.log-level is invalid"))); let log_level = try!(slog::Level::from_usize(log_level as usize) - .ok_or(format!("parameters.log-level is invalid"))); + .ok_or(format!("parameters.log-level is invalid"))); config.parameters.log_level = log_level; } if !doc["parameters"]["timeout"].is_badvalue() { - let timeout = try!(doc["parameters"]["timeout"] - .as_i64() - .ok_or(format!("parameters.timeout should be a number"))); + let timeout = + try!(doc["parameters"]["timeout"] + .as_i64() + .ok_or(format!("parameters.timeout should be a number",))); let timeout = try!(cast::u64(timeout) - .map_err(|_| format!("parameters.timeout is invalid"))); + .map_err(|_| format!("parameters.timeout is invalid"))); config.parameters.timeout = timeout; } } } - Ok(()) } diff --git a/src/log.rs b/src/log.rs index 8663508..fa7e87e 100644 --- a/src/log.rs +++ b/src/log.rs @@ -28,7 +28,10 @@ pub fn log(parameters: &config::Parameters, verbose: u64) { let drain_term = slog_term::streamer().full().build().ignore_err(); // File drain - let log_file = OpenOptions::new().create(true).append(true).open(¶meters.log_file); + let log_file = OpenOptions::new() + .create(true) + .append(true) + .open(¶meters.log_file); if log_file.is_err() { crit!("Fail to open log file at {:?}: {}", ¶meters.log_file, @@ -46,7 +49,7 @@ pub fn log(parameters: &config::Parameters, verbose: u64) { // Setup root logger let root_log = Logger::root(Duplicate::new(LevelFilter::new(drain_term, console_level), LevelFilter::new(file_drain, parameters.log_level)) - .ignore_err(), + .ignore_err(), o!()); slog_scope::set_global_logger(root_log); diff --git a/src/main.rs b/src/main.rs index 86eb275..347f8d5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,7 +26,7 @@ use nix::sys::signal; use std::time::Duration; mod config; -mod source; +mod scraper; mod router; mod sink; mod log; @@ -96,15 +96,15 @@ 
fn main() { // Synchronisation stuff let sigint = Arc::new(AtomicBool::new(false)); - let mut handles = Vec::with_capacity(config.sources.len() + 1 + config.sinks.len()); + let mut handles = Vec::with_capacity(config.scrapers.len() + 1 + config.sinks.len()); - // Spawn sources - info!("spawning sources"); - for source in config.sources { + // Spawn scrapers + info!("spawning scrapers"); + for scraper in config.scrapers { let (parameters, sigint) = (config.parameters.clone(), sigint.clone()); handles.push(thread::spawn(move || { - slog_scope::scope(slog_scope::logger().new(o!("source" => source.name.clone())), - || source::source(&source, ¶meters, sigint)); + slog_scope::scope(slog_scope::logger().new(o!("scraper" => scraper.name.clone())), + || scraper::scraper(&scraper, ¶meters, sigint)); })); } @@ -116,9 +116,10 @@ fn main() { config.parameters.clone(), sigint.clone()); handles.push(thread::spawn(move || { - slog_scope::scope(slog_scope::logger().new(o!()), - || router::router(&sinks, &labels, ¶meters, sigint)); - })); + slog_scope::scope(slog_scope::logger().new(o!()), || { + router::router(&sinks, &labels, ¶meters, sigint) + }); + })); } // Spawn sinks diff --git a/src/router.rs b/src/router.rs index 9a270dc..079436d 100644 --- a/src/router.rs +++ b/src/router.rs @@ -26,7 +26,8 @@ pub fn router(sinks: &Vec, parameters: &config::Parameters, sigint: Arc) { - let labels: String = labels.iter() + let labels: String = labels + .iter() .fold(String::new(), |acc, (k, v)| { let sep = if acc.is_empty() { "" } else { "," }; acc + sep + k + "=" + v @@ -126,10 +127,10 @@ fn route(sinks: &Vec, let slabels = labels.clone() + if plabels.trim().starts_with("}") { - "" - } else { - "," - } + &plabels; + "" + } else { + "," + } + &plabels; metrics.push(format!("{}{{{}", class, slabels)) } @@ -169,8 +170,8 @@ fn route(sinks: &Vec, if sink.selector.is_some() { let selector = sink.selector.as_ref().unwrap(); if !line.split_whitespace() - .nth(1) - .map_or(false, |class| 
selector.is_match(class)) { + .nth(1) + .map_or(false, |class| selector.is_match(class)) { continue; } } diff --git a/src/source.rs b/src/scraper.rs similarity index 78% rename from src/source.rs rename to src/scraper.rs index 72529bc..e812613 100644 --- a/src/source.rs +++ b/src/scraper.rs @@ -1,6 +1,6 @@ -//! # Source module. +//! # Scraper module. //! -//! The Source module fetch metrics to Prometheus. +//! The Scraper module fetch metrics to Prometheus. use std::thread; use std::time::Duration; use std::sync::Arc; @@ -19,21 +19,23 @@ use config; /// Thread sleeping time. const REST_TIME: u64 = 10; -/// Source loop. -pub fn source(source: &config::Source, parameters: &config::Parameters, sigint: Arc) { +/// Scraper loop. +pub fn scraper(scraper: &config::Scraper, + parameters: &config::Parameters, + sigint: Arc) { loop { let start = time::now_utc(); - match fetch(source, parameters) { + match fetch(scraper, parameters) { Err(err) => error!("fetch fail: {}", err), Ok(_) => info!("fetch success"), } let elapsed = (time::now_utc() - start).num_milliseconds() as u64; - let sleep_time = if elapsed > source.period { + let sleep_time = if elapsed > scraper.period { REST_TIME } else { - cmp::max(source.period - elapsed, REST_TIME) + cmp::max(scraper.period - elapsed, REST_TIME) }; for _ in 0..sleep_time / REST_TIME { thread::sleep(Duration::from_millis(REST_TIME)); @@ -45,15 +47,15 @@ pub fn source(source: &config::Source, parameters: &config::Parameters, sigint: } /// Fetch retrieve metrics from Prometheus. 
-fn fetch(source: &config::Source, parameters: &config::Parameters) -> Result<(), Box> { - debug!("fetch {}", &source.url); +fn fetch(scraper: &config::Scraper, parameters: &config::Parameters) -> Result<(), Box> { + debug!("fetch {}", &scraper.url); // Fetch metrics let mut client = hyper::Client::new(); client.set_write_timeout(Some(Duration::from_secs(parameters.timeout))); client.set_read_timeout(Some(Duration::from_secs(parameters.timeout))); - let mut res = try!(client.get(&source.url).send()); + let mut res = try!(client.get(&scraper.url).send()); if !res.status.is_success() { return Err(From::from("non 200 received")); } @@ -69,16 +71,16 @@ fn fetch(source: &config::Source, parameters: &config::Parameters) -> Result<(), let now = start.to_timespec().sec * 1000 * 1000 + (start.to_timespec().nsec as i64 / 1000); let dir = Path::new(¶meters.source_dir); - let temp_file = dir.join(format!("{}.tmp", source.name)); + let temp_file = dir.join(format!("{}.tmp", scraper.name)); debug!("write to tmp file {}", format!("{:?}", temp_file)); { // Open tmp file let mut file = try!(File::create(&temp_file)); for line in body.lines() { - let line = match source.format { - config::SourceFormat::Sensision => String::from(line.trim()), - config::SourceFormat::Prometheus => { + let line = match scraper.format { + config::ScraperFormat::Sensision => String::from(line.trim()), + config::ScraperFormat::Prometheus => { match format_prometheus(line.trim(), now) { Err(_) => { warn!("bad row {}", &line); @@ -93,8 +95,8 @@ fn fetch(source: &config::Source, parameters: &config::Parameters) -> Result<(), continue; } - if source.metrics.is_some() { - if !source.metrics.as_ref().unwrap().is_match(&line) { + if scraper.metrics.is_some() { + if !scraper.metrics.as_ref().unwrap().is_match(&line) { continue; } } @@ -106,8 +108,8 @@ fn fetch(source: &config::Source, parameters: &config::Parameters) -> Result<(), try!(file.flush()); } - // Rotate source file - let dest_file = 
dir.join(format!("{}-{}.metrics", source.name, now)); + // Rotate scraped file + let dest_file = dir.join(format!("{}-{}.metrics", scraper.name, now)); debug!("rotate tmp file to {}", format!("{:?}", dest_file)); try!(fs::rename(&temp_file, &dest_file)); @@ -131,12 +133,9 @@ fn format_prometheus(line: &str, now: i64) -> Result> { let mut tokens = v.split_whitespace(); let value = try!(tokens.next().ok_or("no value")); - let timestamp = tokens.next() - .map(|v| { - i64::from_str_radix(v, 10) - .map(|v| v * 1000) - .unwrap_or(now) - }) + let timestamp = tokens + .next() + .map(|v| i64::from_str_radix(v, 10).map(|v| v * 1000).unwrap_or(now)) .unwrap_or(now); // Format class