diff --git a/CHANGELOG.md b/CHANGELOG.md index 201a3544..96ef6ecb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Bob versions changelog ## [Unreleased] #### Added +- Bobd test mode (#550) - Added optional get & exist optimization that skips old partitions by its timestamp (#702) - Added mimalloc allocator for musl target (#688) - Added jemalloc-profile for memory profiling (#797) @@ -20,6 +21,7 @@ Bob versions changelog #### Fixed - Fix memory leak due to prometheus lib (#788) - Fix for grinder delete metrics not being initialized (#824) +- Fix chrono deprecated function warning (#832) - Fix lsof zombie spawn (#830) #### Updated diff --git a/bob-apps/Cargo.toml b/bob-apps/Cargo.toml index d76a17f9..cc361a62 100644 --- a/bob-apps/Cargo.toml +++ b/bob-apps/Cargo.toml @@ -47,6 +47,7 @@ lazy_static = "1.4" log = "0.4" log4rs = "1.2" metrics = { version = "0.17", features = ["std"] } +network-interface = "0.1.2" mockall = "0.11" prost = "0.11" regex = "1.6.0" diff --git a/bob-apps/bin/bobd.rs b/bob-apps/bin/bobd.rs index ebe2b631..5167c47b 100755 --- a/bob-apps/bin/bobd.rs +++ b/bob-apps/bin/bobd.rs @@ -10,11 +10,11 @@ use bob::{ VirtualMapper, BackendType, FactoryTlsConfig, }; use bob_access::{Authenticator, BasicAuthenticator, DeclaredCredentials, StubAuthenticator, UsersMap, AuthenticationType}; -use clap::{crate_version, App, Arg, ArgMatches}; +use clap::{crate_version, App, Arg, ArgMatches, SubCommand}; use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, error::Error as ErrorTrait, - net::{IpAddr, Ipv4Addr, SocketAddr}, + net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}, str::FromStr, }; use tokio::runtime::Handle; use tonic::transport::Server; @@ -25,35 +25,65 @@ use std::fs::create_dir; #[macro_use] extern crate log; +use log4rs::append::console::ConsoleAppender; +use log4rs::encode::pattern::PatternEncoder; +use log4rs::config::{Appender, Config, Root}; + +use network_interface::{NetworkInterface, NetworkInterfaceConfig, Addr}; + +use anyhow::{anyhow, Context, Result as AnyResult}; + #[tokio::main] async fn main() { let matches = get_matches(); - if matches.value_of("cluster").is_none() { - eprintln!("Expect cluster config"); - eprintln!("use --help"); - return; - } + let cluster; + let node; + if let (sc, Some(sub_matches)) = matches.subcommand() { + match sc { + "testmode" => match configure_testmode(sub_matches) { + Ok((c, n)) => { + cluster = c; + node = n; + } + Err(e) => { + eprintln!("Initialization error: {}", e); + eprintln!("use --help"); + return; + } + }, + _ => unreachable!("unknown command"), + } + } else { + if matches.value_of("cluster").is_none() { + eprintln!("Expect cluster config"); + eprintln!("use --help"); + return; + } - if matches.value_of("node").is_none() { - eprintln!("Expect node config"); - eprintln!("use --help"); - return; - } + if matches.value_of("node").is_none() { + eprintln!("Expect node config"); + eprintln!("use --help"); + return; + } + + let cluster_config = matches.value_of("cluster").unwrap(); + println!("Cluster config: {:?}", cluster_config); + cluster = ClusterConfig::try_get(cluster_config).await.map_err(|err| { + eprintln!("Cluster config parsing error: {}", err); + err + }).expect("Cluster config parsing error"); - let cluster_config = matches.value_of("cluster").expect("'cluster' argument is required"); - println!("Cluster config: {:?}", cluster_config); - let cluster = ClusterConfig::try_get(cluster_config).await.map_err(|err| { - eprintln!("Cluster config parsing error: {}", err); - err - }).expect("Cluster config parsing error"); - let node_config_file = matches.value_of("node").expect("'node' argument is required"); - println!("Node config: {:?}", node_config_file); - let node = cluster.get(node_config_file).await.map_err(|err| { - eprintln!("Node config parsing error: {}", err); - err - }).expect("Node config parsing error"); + let node_config_file = matches.value_of("node").unwrap(); + println!("Node config: {:?}", node_config_file); + node = cluster.get(node_config_file).await.map_err(|err| { + eprintln!("Node config parsing error: {}", err); + err + }).expect("Node config parsing error"); + + check_folders(&node, matches.is_present("init_folders")); + } let mut extra_logstash_fields = HashMap::new(); extra_logstash_fields.insert("node_name".to_string(), serde_json::Value::String(node.name().to_string())); @@ -63,8 +93,6 @@ async fn main() { log4rs::init_file(node.log_config(), log4rs::config::Deserializers::default().with_logstash_extra(extra_logstash_fields)) .expect("can't find log config"); - check_folders(&node, matches.is_present("init_folders")); - let mut mapper = VirtualMapper::new(&node, &cluster); let bind = node.bind(); @@ -139,6 +167,73 @@ async fn main() { } } +fn configure_testmode(sub_matches: &ArgMatches) -> AnyResult<(ClusterConfig, NodeConfig)> { + let mut addresses = Vec::with_capacity(1); + let port = match sub_matches.value_of("grpc-port") { + Some(v) => v.parse().context("could not parse --grpc-port")?, + None => 20000 + }; + let mut this_node = None; + if let Some(node_list) = sub_matches.value_of("nodes") { + let available_ips: HashSet<_> = NetworkInterface::show()?.into_iter().filter_map(|itf| + match itf.addr? { + Addr::V4(addr) => { + Some(addr.ip) + }, + _ => None + }).collect(); + + for (index, addr) in node_list.split(",").enumerate() { + let addr = addr.trim(); + let v4addr = SocketAddrV4::from_str(addr)?; + if this_node.is_none() { + if port == v4addr.port() && available_ips.contains(v4addr.ip()) { + this_node = Some(index) + } + } + addresses.push(String::from(addr)); + } + } else { + this_node = Some(0); + addresses.push(format!("127.0.0.1:{port}")) + } + let this_node_index = this_node.ok_or(anyhow!("current node address not found"))?; + let cluster = ClusterConfig::get_testmode( + sub_matches.value_of("data").unwrap_or(format!("data_{this_node_index}").as_str()).to_string(), + addresses)?; + let http_api_port = match sub_matches.value_of("restapi-port") { + Some(v) => Some(v.parse().context("could not parse --restapi-port")?), + None => None + }; + let node = cluster.get_testmode_node_config(this_node_index, http_api_port)?; + + init_testmode_logger(log::LevelFilter::Error); + + check_folders(&node, true); + + println!("Bob is starting"); + let n = &cluster.nodes()[this_node_index]; + println!("Data directory: {}", n.disks()[0].path()); + println!("gRPC API available at: {}", n.address()); + let rest_api_address = node.http_api_address(); + let rest_api_port = node.http_api_port(); + println!("REST API available at: http://{rest_api_address}:{rest_api_port}"); + println!("REST API Put and Get available at: http://{rest_api_address}:{rest_api_port}/data"); + + Ok((cluster, node)) +} + +fn init_testmode_logger(loglevel: log::LevelFilter) { + let stdout = ConsoleAppender::builder() + .encoder(Box::new(PatternEncoder::new( "{d(%Y-%m-%d %H:%M:%S):<20} {M:>20.30}:{L:>3} {h({l})} {m}\n"))) + .build(); + let config = Config::builder() + .appender(Appender::builder().build("stdout", Box::new(stdout))) + .build(Root::builder().appender("stdout").build(loglevel)) + .unwrap(); + log4rs::init_config(config).unwrap(); +} + async fn run_server(node: NodeConfig, authenticator: A, mapper: VirtualMapper, address: IpAddr, port: u16, addr: SocketAddr) { let (metrics, shared_metrics) = init_counters(&node, &addr.to_string()).await; let handle = Handle::current(); @@ -288,39 +383,66 @@ fn check_folders(node: &NodeConfig, init_flag: bool) { fn get_matches<'a>() -> ArgMatches<'a> { let ver = format!("{}\n{}", crate_version!(), BuildInfo::default()); + let testmode_sc = SubCommand::with_name("testmode") + .about("Bob's test mode") + .arg( + Arg::with_name("data") + .help("Path to bob data directory") + .takes_value(true) + .long("data") + ) + .arg( + Arg::with_name("grpc-port") + .help("gRPC API port") + .takes_value(true) + .long("grpc-port") + ) + .arg( + Arg::with_name("restapi-port") + .help("REST API port") + .takes_value(true) + .long("restapi-port") + ) + .arg( + Arg::with_name("nodes") + .help("Comma separated node addresses. Example: 127.0.0.1:20000,127.0.0.1:20001") + .takes_value(true) + .long("nodes") + ); + App::new("bobd") .version(ver.as_str()) .arg( Arg::with_name("cluster") - .help("cluster config file") + .help("Cluster config file") .takes_value(true) .short("c") .long("cluster"), ) .arg( Arg::with_name("node") - .help("node config file") + .help("Node config file") .takes_value(true) .short("n") .long("node"), ) .arg( Arg::with_name("name") - .help("node name") + .help("Node name") .takes_value(true) .short("a") .long("name"), ) .arg( Arg::with_name("http_api_address") - .help("http api address") + .help("Http api address") .short("h") .long("host") .takes_value(true), ) .arg( Arg::with_name("http_api_port") - .help("http api port") + .help("Http api port") .short("p") .long("port") .takes_value(true), @@ -331,5 +453,6 @@ fn get_matches<'a>() -> ArgMatches<'a> { .long("init_folders") .takes_value(false), ) + .subcommand(testmode_sc) .get_matches() } diff --git a/bob-backend/src/pearl/utils.rs b/bob-backend/src/pearl/utils.rs index 72b1569c..98b8f1d5 100755 --- a/bob-backend/src/pearl/utils.rs +++ b/bob-backend/src/pearl/utils.rs @@ -93,7 +93,7 @@ impl Utils { Error::failed(format!("smth wrong with time: {:?}, error: {}", period, e)) }) .map(|period| { - let time = DateTime::from_utc( + let time = DateTime::from_naive_utc_and_offset( NaiveDateTime::from_timestamp_opt(time.try_into().unwrap(), 0).expect("time out of range"), Utc, ); diff --git a/bob-common/src/configs/cluster.rs b/bob-common/src/configs/cluster.rs index 9f34f6dd..206cb375 100755 --- a/bob-common/src/configs/cluster.rs +++ b/bob-common/src/configs/cluster.rs @@ -8,7 +8,7 @@ use crate::{ node::NodeName, core_types::{DiskPath, VDiskId, NodeDisk, DiskName}, }; -use anyhow::Result as AnyResult; +use anyhow::{Result as AnyResult, anyhow}; use http::Uri; use std::collections::{ HashMap, HashSet }; @@ -394,6 +394,50 @@ impl Cluster { Ok(config) } } + + pub fn get_testmode(path: String, addresses: Vec) -> AnyResult { + let disks = vec![DiskPath::new("disk_0".into(), &path)]; + let mut nodes = Vec::with_capacity(addresses.len()); + let mut vdisks = Vec::with_capacity(addresses.len()); + for (i, address) in addresses.into_iter().enumerate() { + let node = Node { + name: format!("node_{i}"), + address, + disks: disks.clone() + }; + let replica = Replica::new(node.name().to_string(), node.disks()[0].name().to_string()); + let mut vdisk = VDisk::new(i as u32); + vdisk.push_replica(replica); + nodes.push(node); + vdisks.push(vdisk); + } + let dist_func = DistributionFunc::default(); + let config = Cluster { + nodes, + vdisks, + racks: vec![], + distribution_func: dist_func + }; + + if let Err(e) = config.validate() { + let msg = format!("config is not valid: {e}"); + Err(anyhow!(msg)) + } else { + Ok(config) + } + } + + pub fn get_testmode_node_config(&self, n_node: usize, rest_port: Option) -> AnyResult { + let node = &self.nodes().get(n_node).ok_or(anyhow!("node with index {} not found", n_node))?; + let config = NodeConfig::get_testmode(node.name(), node.disks()[0].name(), rest_port); + if let Err(e) = config.validate() { + Err(anyhow!("config is not valid: {e}")) + } else { + self.check(&config) + .map_err(|e| anyhow!("node config check failed: {e}"))?; + Ok(config) + } + } } impl Validatable for Cluster { diff --git a/bob-common/src/configs/node.rs b/bob-common/src/configs/node.rs index d3c345c2..b6033b79 100755 --- a/bob-common/src/configs/node.rs +++ b/bob-common/src/configs/node.rs @@ -5,7 +5,7 @@ use super::{ validation::Validatable }; use bob_access::AuthenticationType; -use crate::core_types::DiskPath; +use crate::core_types::{DiskPath, DiskName}; use futures::Future; use humantime::Duration as HumanDuration; use std::{ @@ -21,7 +21,7 @@ use std::{net::Ipv4Addr, sync::Arc, fs}; use tokio::time::sleep; use tonic::transport::{ServerTlsConfig, Identity}; -use ubyte::ByteUnit; +use ubyte::{ByteUnit, ToByteUnit}; const AIO_FLAG_ORDERING: Ordering = Ordering::Relaxed; @@ -214,6 +214,25 @@ impl MetricsConfig { } } +impl Default for MetricsConfig { + fn default() -> Self { + let name = Some(String::from("bob")); + let prometheus_addr = String::default(); + let prometheus_enabled = false; + let graphite_enabled = false; + let graphite = None; + let prefix = None; + Self { + name, + prometheus_addr, + prometheus_enabled, + graphite_enabled, + graphite, + prefix + } + } +} + impl Validatable for MetricsConfig { fn validate(&self) -> Result<(), String> { self.check_unset()?; @@ -336,7 +355,7 @@ impl Pearl { } fn default_max_blob_size() -> ByteUnit { - ByteUnit::MB + ByteUnit::GB } pub fn max_blob_size(&self) -> u64 { @@ -441,6 +460,30 @@ impl Pearl { } unreachable!() } + + pub fn get_testmode(alien_disk: &DiskName) -> Self { + Self { + max_blob_size: ByteUnit::GB, + max_data_in_blob: 100_000, + blob_file_name_prefix: Pearl::default_blob_file_name_prefix(), + fail_retry_timeout: Pearl::default_fail_retry_timeout(), + fail_retry_count: Pearl::default_fail_retry_count(), + alien_disk: Some(alien_disk.to_string()), + allow_duplicates: Pearl::default_allow_duplicates(), + settings: BackendSettings { + root_dir_name: String::from("bob"), + alien_root_dir_name: String::from("alien"), + timestamp_period: String::from("1d"), + create_pearl_wait_delay: String::from("100ms") + }, + hash_chars_count: Pearl::default_hash_chars_count(), + enable_aio: Pearl::default_enable_aio(), + disks_events_logfile: Pearl::default_disks_events_logfile(), + bloom_filter_max_buf_bits_count: Some(1_000_000), + validate_data_checksum_during_index_regen: Pearl::default_validate_data_checksum_during_index_regen(), + skip_holders_by_timestamp_step_when_reading: None + } + } } impl Validatable for Pearl { @@ -799,6 +842,39 @@ impl NodeConfig { pub fn default_holder_group_size() -> usize { 8 } + + pub fn get_testmode(node_name: &str, disk_name: &DiskName, rest_port: Option) -> Self { + Self { + log_config: String::from("dummy"), + users_config: String::from("dummy"), + name: String::from(node_name), + quorum: 1, + operation_timeout: String::from("60sec"), + check_interval: String::from("5000ms"), + count_interval: NodeConfig::default_count_interval(), + cluster_policy: String::from("quorum"), + backend_type: String::from("pearl"), + pearl: Some(Pearl::get_testmode(disk_name)), + metrics: Some(MetricsConfig::default()), + bind_ref: Arc::default(), + disks_ref: Arc::default(), + cleanup_interval: String::from("1h"), + open_blobs_soft_limit: None, + open_blobs_hard_limit: None, + bloom_filter_memory_limit: Some(8.gibibytes()), + index_memory_limit: Some(8.gibibytes()), + index_memory_limit_soft: None, + init_par_degree: NodeConfig::default_init_par_degree(), + disk_access_par_degree: NodeConfig::default_disk_access_par_degree(), + http_api_port: rest_port.unwrap_or_else(|| Node::default_http_api_port()), + http_api_address: Node::default_http_api_address(), + bind_to_ip_address: None, + holder_group_size: NodeConfig::default_holder_group_size(), + authentication_type: NodeConfig::default_authentication_type(), + tls: None, + hostname_resolve_period_ms: NodeConfig::default_hostname_resolve_period_ms() + } + } } impl Validatable for NodeConfig {