Skip to content

Commit

Permalink
fix(config): Fix preloading log_schema (vectordotdev#17759)
Browse files Browse the repository at this point in the history
Preload global parameter log_schema before compiling sources config.
It fixes an issue with globally configured log_schema and sources in
separate files.
Revert 3e971fb

---------

Signed-off-by: Artur Malchanau <artur.molchanov@bolt.eu>
Co-authored-by: Bruce Guenter <bruce@untroubled.org>
  • Loading branch information
Hexta and bruceg authored Jun 29, 2023
1 parent ed59f37 commit 659e1e6
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 18 deletions.
6 changes: 4 additions & 2 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,12 +466,14 @@ pub async fn load_configs(
paths = ?config_paths.iter().map(<&PathBuf>::from).collect::<Vec<_>>()
);

// config::init_log_schema should be called before initializing sources.
#[cfg(not(feature = "enterprise-tests"))]
config::init_log_schema(&config_paths, true).map_err(handle_config_errors)?;

let mut config =
config::load_from_paths_with_provider_and_secrets(&config_paths, signal_handler)
.await
.map_err(handle_config_errors)?;
#[cfg(not(feature = "enterprise-tests"))]
config::init_log_schema(config.global.log_schema.clone(), true);

config::init_telemetry(config.global.telemetry.clone(), true);

Expand Down
12 changes: 11 additions & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,19 @@ pub use unit_test::{build_unit_tests, build_unit_tests_main, UnitTestResult};
pub use validation::warnings;
pub use vars::{interpolate, ENVIRONMENT_VARIABLE_INTERPOLATION_REGEX};
pub use vector_core::config::{
init_log_schema, init_telemetry, log_schema, proxy::ProxyConfig, telemetry, LogSchema, OutputId,
init_telemetry, log_schema, proxy::ProxyConfig, telemetry, LogSchema, OutputId,
};

/// Loads Log Schema from configurations and sets global schema.
/// Once this is done, configurations can be correctly loaded using
/// configured log schema defaults.
/// If deny is set, will panic if schema has already been set.
pub fn init_log_schema(config_paths: &[ConfigPath], deny_if_set: bool) -> Result<(), Vec<String>> {
let (builder, _) = load_builder_from_paths(config_paths)?;
vector_core::config::init_log_schema(builder.global.log_schema, deny_if_set);
Ok(())
}

#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum ConfigPath {
File(PathBuf, FormatHint),
Expand Down
15 changes: 1 addition & 14 deletions src/config/unit_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,11 @@ impl UnitTest {
}
}

/// Loads Log Schema from configurations and sets global schema.
/// Once this is done, configurations can be correctly loaded using
/// configured log schema defaults.
/// If deny is set, will panic if schema has already been set.
fn init_log_schema_from_paths(
config_paths: &[ConfigPath],
deny_if_set: bool,
) -> Result<(), Vec<String>> {
let (builder, _) = config::loading::load_builder_from_paths(config_paths)?;
vector_core::config::init_log_schema(builder.global.log_schema, deny_if_set);
Ok(())
}

pub async fn build_unit_tests_main(
paths: &[ConfigPath],
signal_handler: &mut signal::SignalHandler,
) -> Result<Vec<UnitTest>, Vec<String>> {
init_log_schema_from_paths(paths, false)?;
config::init_log_schema(paths, false)?;
let (mut secrets_backends_loader, _) = loading::load_secret_backends_from_paths(paths)?;
let (config_builder, _) = if secrets_backends_loader.has_secrets_to_retrieve() {
let resolved_secrets = secrets_backends_loader
Expand Down
4 changes: 3 additions & 1 deletion src/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,12 @@ pub fn validate_config(opts: &Opts, fmt: &mut Formatter) -> Option<Config> {
fmt.title(format!("Failed to load {:?}", &paths_list));
fmt.sub_error(errors);
};
config::init_log_schema(&paths, true)
.map_err(&mut report_error)
.ok()?;
let (builder, load_warnings) = config::load_builder_from_paths(&paths)
.map_err(&mut report_error)
.ok()?;
config::init_log_schema(builder.global.log_schema.clone(), true);

// Build
let (config, build_warnings) = builder
Expand Down
72 changes: 72 additions & 0 deletions tests/integration/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
fs::create_dir,
fs::read_dir,
io::Write,
net::SocketAddr,
Expand Down Expand Up @@ -200,6 +201,77 @@ fn log_schema() {
assert_eq!(event["test_msg"], json!("42"));
}

#[test]
fn log_schema_multiple_config_files() {
// Vector command
let mut cmd = Command::cargo_bin("vector").unwrap();

let config_dir = create_directory();

let sinks_config_dir = config_dir.join("sinks");
create_dir(sinks_config_dir.clone()).unwrap();

let sources_config_dir = config_dir.join("sources");
create_dir(sources_config_dir.clone()).unwrap();

let input_dir = create_directory();
let input_file = input_dir.join("input_file");

overwrite_file(
config_dir.join("vector.toml"),
r#"
data_dir = "${VECTOR_DATA_DIR}"
log_schema.host_key = "test_host"
"#,
);

overwrite_file(
sources_config_dir.join("in_file.toml"),
r#"
type = "file"
include = ["${VECTOR_TEST_INPUT_FILE}"]
"#,
);

overwrite_file(
sinks_config_dir.join("out_console.toml"),
r#"
inputs = ["in_file"]
type = "console"
encoding.codec = "json"
"#,
);

overwrite_file(
input_file.clone(),
r#"42
"#,
);

cmd.arg("--quiet")
.env("VECTOR_CONFIG_DIR", config_dir)
.env("VECTOR_DATA_DIR", create_directory())
.env("VECTOR_TEST_INPUT_FILE", input_file.clone());

// Run vector
let vector = cmd.stdout(std::process::Stdio::piped()).spawn().unwrap();

// Give vector time to start.
sleep(STARTUP_TIME);

// Signal shutdown
kill(Pid::from_raw(vector.id() as i32), Signal::SIGTERM).unwrap();

// Wait for shutdown
let output = vector.wait_with_output().unwrap();
assert!(output.status.success(), "Vector didn't exit successfully.");

// Output
let event: Value = serde_json::from_slice(output.stdout.as_slice()).unwrap();
assert_eq!(event["message"], json!("42"));
assert_eq!(event["test_host"], json!("runner"));
}

#[test]
fn configuration_path_recomputed() {
// Directory with configuration files
Expand Down

0 comments on commit 659e1e6

Please sign in to comment.