Skip to content

Commit

Permalink
Merge branch 'master' into 830-lsof-zombie-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ikopylov authored Sep 12, 2023
2 parents 170ba36 + 6096279 commit bb3aa85
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 37 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Bob versions changelog

## [Unreleased]
#### Added
- Bobd test mode (#550)
- Added optional get & exist optimization that skips old partitions by its timestamp (#702)
- Added mimalloc allocator for musl target (#688)
- Added jemalloc-profile for memory profiling (#797)
Expand All @@ -20,6 +21,7 @@ Bob versions changelog
#### Fixed
- Fix memory leak due to prometheus lib (#788)
- Fix for grinder delete metrics not being initialized (#824)
- Fix chrono deprecated function warning (#832)
- Fix lsof zombie spawn (#830)

#### Updated
Expand Down
1 change: 1 addition & 0 deletions bob-apps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ lazy_static = "1.4"
log = "0.4"
log4rs = "1.2"
metrics = { version = "0.17", features = ["std"] }
network-interface = "0.1.2"
mockall = "0.11"
prost = "0.11"
regex = "1.6.0"
Expand Down
187 changes: 155 additions & 32 deletions bob-apps/bin/bobd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use bob::{
VirtualMapper, BackendType, FactoryTlsConfig,
};
use bob_access::{Authenticator, BasicAuthenticator, DeclaredCredentials, StubAuthenticator, UsersMap, AuthenticationType};
use clap::{crate_version, App, Arg, ArgMatches};
use clap::{crate_version, App, Arg, ArgMatches, SubCommand};
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
error::Error as ErrorTrait,
net::{IpAddr, Ipv4Addr, SocketAddr},
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}, str::FromStr,
};
use tokio::runtime::Handle;
use tonic::transport::Server;
Expand All @@ -25,35 +25,65 @@ use std::fs::create_dir;
#[macro_use]
extern crate log;

use log4rs::append::console::ConsoleAppender;
use log4rs::encode::pattern::PatternEncoder;
use log4rs::config::{Appender, Config, Root};

use network_interface::{NetworkInterface, NetworkInterfaceConfig, Addr};

use anyhow::{anyhow, Context, Result as AnyResult};

#[tokio::main]
async fn main() {
let matches = get_matches();

if matches.value_of("cluster").is_none() {
eprintln!("Expect cluster config");
eprintln!("use --help");
return;
}
let cluster;
let node;
if let (sc, Some(sub_matches)) = matches.subcommand() {
match sc {
"testmode" => match configure_testmode(sub_matches) {
Ok((c, n)) => {
cluster = c;
node = n;
}
Err(e) => {
eprintln!("Initialization error: {}", e);
eprintln!("use --help");
return;
}
},
_ => unreachable!("unknown command"),
}
} else {
if matches.value_of("cluster").is_none() {
eprintln!("Expect cluster config");
eprintln!("use --help");
return;
}

if matches.value_of("node").is_none() {
eprintln!("Expect node config");
eprintln!("use --help");
return;
}
if matches.value_of("node").is_none() {
eprintln!("Expect node config");
eprintln!("use --help");
return;
}

let cluster_config = matches.value_of("cluster").unwrap();
println!("Cluster config: {:?}", cluster_config);
cluster = ClusterConfig::try_get(cluster_config).await.map_err(|err| {
eprintln!("Cluster config parsing error: {}", err);
err
}).expect("Cluster config parsing error");

let cluster_config = matches.value_of("cluster").expect("'cluster' argument is required");
println!("Cluster config: {:?}", cluster_config);
let cluster = ClusterConfig::try_get(cluster_config).await.map_err(|err| {
eprintln!("Cluster config parsing error: {}", err);
err
}).expect("Cluster config parsing error");

let node_config_file = matches.value_of("node").expect("'node' argument is required");
println!("Node config: {:?}", node_config_file);
let node = cluster.get(node_config_file).await.map_err(|err| {
eprintln!("Node config parsing error: {}", err);
err
}).expect("Node config parsing error");
let node_config_file = matches.value_of("node").unwrap();
println!("Node config: {:?}", node_config_file);
node = cluster.get(node_config_file).await.map_err(|err| {
eprintln!("Node config parsing error: {}", err);
err
}).expect("Node config parsing error");

check_folders(&node, matches.is_present("init_folders"));
}

let mut extra_logstash_fields = HashMap::new();
extra_logstash_fields.insert("node_name".to_string(), serde_json::Value::String(node.name().to_string()));
Expand All @@ -63,8 +93,6 @@ async fn main() {
log4rs::init_file(node.log_config(), log4rs::config::Deserializers::default().with_logstash_extra(extra_logstash_fields))
.expect("can't find log config");

check_folders(&node, matches.is_present("init_folders"));

let mut mapper = VirtualMapper::new(&node, &cluster);

let bind = node.bind();
Expand Down Expand Up @@ -139,6 +167,73 @@ async fn main() {
}
}

fn configure_testmode(sub_matches: &ArgMatches) -> AnyResult<(ClusterConfig, NodeConfig)> {
let mut addresses = Vec::with_capacity(1);
let port = match sub_matches.value_of("grpc-port") {
Some(v) => v.parse().context("could not parse --grpc-port")?,
None => 20000
};
let mut this_node = None;
if let Some(node_list) = sub_matches.value_of("nodes") {
let available_ips: HashSet<_> = NetworkInterface::show()?.into_iter().filter_map(|itf|
match itf.addr? {
Addr::V4(addr) => {
Some(addr.ip)
},
_ => None
}).collect();

for (index, addr) in node_list.split(",").enumerate() {
let addr = addr.trim();
let v4addr = SocketAddrV4::from_str(addr)?;
if this_node.is_none() {
if port == v4addr.port() && available_ips.contains(v4addr.ip()) {
this_node = Some(index)
}
}
addresses.push(String::from(addr));
}
} else {
this_node = Some(0);
addresses.push(format!("127.0.0.1:{port}"))
}
let this_node_index = this_node.ok_or(anyhow!("current node address not found"))?;
let cluster = ClusterConfig::get_testmode(
sub_matches.value_of("data").unwrap_or(format!("data_{this_node_index}").as_str()).to_string(),
addresses)?;
let http_api_port = match sub_matches.value_of("restapi-port") {
Some(v) => Some(v.parse().context("could not parse --restapi-port")?),
None => None
};
let node = cluster.get_testmode_node_config(this_node_index, http_api_port)?;

init_testmode_logger(log::LevelFilter::Error);

check_folders(&node, true);

println!("Bob is starting");
let n = &cluster.nodes()[this_node_index];
println!("Data directory: {}", n.disks()[0].path());
println!("gRPC API available at: {}", n.address());
let rest_api_address = node.http_api_address();
let rest_api_port = node.http_api_port();
println!("REST API available at: http://{rest_api_address}:{rest_api_port}");
println!("REST API Put and Get available at: http://{rest_api_address}:{rest_api_port}/data");

Ok((cluster, node))
}

fn init_testmode_logger(loglevel: log::LevelFilter) {
let stdout = ConsoleAppender::builder()
.encoder(Box::new(PatternEncoder::new( "{d(%Y-%m-%d %H:%M:%S):<20} {M:>20.30}:{L:>3} {h({l})} {m}\n")))
.build();
let config = Config::builder()
.appender(Appender::builder().build("stdout", Box::new(stdout)))
.build(Root::builder().appender("stdout").build(loglevel))
.unwrap();
log4rs::init_config(config).unwrap();
}

async fn run_server<A: Authenticator>(node: NodeConfig, authenticator: A, mapper: VirtualMapper, address: IpAddr, port: u16, addr: SocketAddr) {
let (metrics, shared_metrics) = init_counters(&node, &addr.to_string()).await;
let handle = Handle::current();
Expand Down Expand Up @@ -288,39 +383,66 @@ fn check_folders(node: &NodeConfig, init_flag: bool) {

fn get_matches<'a>() -> ArgMatches<'a> {
let ver = format!("{}\n{}", crate_version!(), BuildInfo::default());
let testmode_sc = SubCommand::with_name("testmode")
.about("Bob's test mode")
.arg(
Arg::with_name("data")
.help("Path to bob data directory")
.takes_value(true)
.long("data")
)
.arg(
Arg::with_name("grpc-port")
.help("gRPC API port")
.takes_value(true)
.long("grpc-port")
)
.arg(
Arg::with_name("restapi-port")
.help("REST API port")
.takes_value(true)
.long("restapi-port")
)
.arg(
Arg::with_name("nodes")
.help("Comma separated node addresses. Example: 127.0.0.1:20000,127.0.0.1:20001")
.takes_value(true)
.long("nodes")
);

App::new("bobd")
.version(ver.as_str())
.arg(
Arg::with_name("cluster")
.help("cluster config file")
.help("Cluster config file")
.takes_value(true)
.short("c")
.long("cluster"),
)
.arg(
Arg::with_name("node")
.help("node config file")
.help("Node config file")
.takes_value(true)
.short("n")
.long("node"),
)
.arg(
Arg::with_name("name")
.help("node name")
.help("Node name")
.takes_value(true)
.short("a")
.long("name"),
)
.arg(
Arg::with_name("http_api_address")
.help("http api address")
.help("Http api address")
.short("h")
.long("host")
.takes_value(true),
)
.arg(
Arg::with_name("http_api_port")
.help("http api port")
.help("Http api port")
.short("p")
.long("port")
.takes_value(true),
Expand All @@ -331,5 +453,6 @@ fn get_matches<'a>() -> ArgMatches<'a> {
.long("init_folders")
.takes_value(false),
)
.subcommand(testmode_sc)
.get_matches()
}
2 changes: 1 addition & 1 deletion bob-backend/src/pearl/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl Utils {
Error::failed(format!("smth wrong with time: {:?}, error: {}", period, e))
})
.map(|period| {
let time = DateTime::from_utc(
let time = DateTime::from_naive_utc_and_offset(
NaiveDateTime::from_timestamp_opt(time.try_into().unwrap(), 0).expect("time out of range"),
Utc,
);
Expand Down
46 changes: 45 additions & 1 deletion bob-common/src/configs/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
node::NodeName,
core_types::{DiskPath, VDiskId, NodeDisk, DiskName},
};
use anyhow::Result as AnyResult;
use anyhow::{Result as AnyResult, anyhow};
use http::Uri;
use std::collections::{ HashMap, HashSet };

Expand Down Expand Up @@ -394,6 +394,50 @@ impl Cluster {
Ok(config)
}
}

pub fn get_testmode(path: String, addresses: Vec<String>) -> AnyResult<Self> {
let disks = vec![DiskPath::new("disk_0".into(), &path)];
let mut nodes = Vec::with_capacity(addresses.len());
let mut vdisks = Vec::with_capacity(addresses.len());
for (i, address) in addresses.into_iter().enumerate() {
let node = Node {
name: format!("node_{i}"),
address,
disks: disks.clone()
};
let replica = Replica::new(node.name().to_string(), node.disks()[0].name().to_string());
let mut vdisk = VDisk::new(i as u32);
vdisk.push_replica(replica);
nodes.push(node);
vdisks.push(vdisk);
}
let dist_func = DistributionFunc::default();
let config = Cluster {
nodes,
vdisks,
racks: vec![],
distribution_func: dist_func
};

if let Err(e) = config.validate() {
let msg = format!("config is not valid: {e}");
Err(anyhow!(msg))
} else {
Ok(config)
}
}

pub fn get_testmode_node_config(&self, n_node: usize, rest_port: Option<u16>) -> AnyResult<NodeConfig> {
let node = &self.nodes().get(n_node).ok_or(anyhow!("node with index {} not found", n_node))?;
let config = NodeConfig::get_testmode(node.name(), node.disks()[0].name(), rest_port);
if let Err(e) = config.validate() {
Err(anyhow!("config is not valid: {e}"))
} else {
self.check(&config)
.map_err(|e| anyhow!("node config check failed: {e}"))?;
Ok(config)
}
}
}

impl Validatable for Cluster {
Expand Down
Loading

0 comments on commit bb3aa85

Please sign in to comment.