diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index 2564c2cd006ae..a26c8c560906f 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -53,6 +53,7 @@ Enot Evercoss Explay FAQs +FDO FQDNs Fabro Figma diff --git a/Cargo.lock b/Cargo.lock index 3b60529e4be98..b6527c8999cd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ dependencies = [ "tokio-util", "url", "webpki", - "webpki-roots", + "webpki-roots 0.22.5", "winapi", ] @@ -1825,9 +1825,9 @@ dependencies = [ [[package]] name = "bytesize" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38fcc2979eff34a4b84e1cf9a1e3da42a7d44b3b690a40cdcb23e3d556cfb2e5" +checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc" [[package]] name = "cache-padded" @@ -2062,9 +2062,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.3.22" +version = "4.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b417ae4361bca3f5de378294fc7472d3c4ed86a5ef9f49e93ae722f432aae8d2" +checksum = "fb690e81c7840c0d7aade59f242ea3b41b9bc27bcd5997890e7702ae4b32e487" dependencies = [ "clap_builder", "clap_derive", @@ -2077,15 +2077,15 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1eef05769009513df2eb1c3b4613e7fad873a14c600ff025b08f250f59fee7de" dependencies = [ - "clap 4.3.22", + "clap 4.3.24", "log", ] [[package]] name = "clap_builder" -version = "4.3.22" +version = "4.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c90dc0f0e42c64bff177ca9d7be6fcc9ddb0f26a6e062174a61c84dd6c644d4" +checksum = "5ed2e96bc16d8d740f6f48d663eddf4b8a0983e79210fd55479b7bcd0a69860e" dependencies = [ "anstream", "anstyle 1.0.0", @@ -2100,7 +2100,7 @@ version = "4.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fc443334c81a804575546c5a8a79b4913b50e28d69232903604cada1de817ce" dependencies = [ - "clap 4.3.22", + "clap 4.3.24", ] [[package]] @@ -2155,7 +2155,7 @@ dependencies = [ "indoc", "memchr", "once_cell", - "ordered-float 3.9.0", + "ordered-float 3.9.1", "prost", "prost-reflect", "regex", @@ -2405,7 +2405,7 @@ dependencies = [ "anes", "cast", "ciborium", - "clap 4.3.22", + "clap 4.3.24", "criterion-plot", "futures 0.3.28", "is-terminal", @@ -2769,9 +2769,9 @@ dependencies = [ [[package]] name = "dashmap" -version = "5.5.0" +version = "5.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6943ae99c34386c84a470c499d3414f66502a41340aa895406e0d2e4a207b91d" +checksum = "edd72493923899c6f10c641bdbdeddc7183d6396641d99c1a0d1597f37f92e28" dependencies = [ "cfg-if", "hashbrown 0.14.0", @@ -3079,9 +3079,9 @@ checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" [[package]] name = "encoding_rs" -version = "0.8.32" +version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "071a31f4ee85403370b58aca746f01041ede6f0da2730960ad001edc2b71b394" +checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" dependencies = [ "cfg-if", "serde", @@ -4469,7 +4469,7 @@ dependencies = [ "socket2 0.4.9", "widestring 0.5.1", "winapi", - "winreg", + "winreg 0.10.1", ] [[package]] @@ -5192,7 +5192,7 @@ dependencies = [ "indexmap 1.9.3", "metrics", "num_cpus", - "ordered-float 3.9.0", + "ordered-float 3.9.1", "quanta", "radix_trie", "sketches-ddsketch", @@ -5316,7 +5316,7 @@ dependencies = [ 
"trust-dns-resolver", "typed-builder 0.10.0", "uuid", - "webpki-roots", + "webpki-roots 0.22.5", ] [[package]] @@ -5494,9 +5494,9 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" [[package]] name = "notify" -version = "6.1.0" +version = "6.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dbf4f3b058c29e9642cf53217e0531cbfc78bc24e0a212a9837b7415b9d9007" +checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" dependencies = [ "bitflags 2.3.2", "filetime", @@ -5868,7 +5868,7 @@ dependencies = [ "bytes 1.4.0", "chrono", "hex", - "ordered-float 3.9.0", + "ordered-float 3.9.1", "prost", "prost-build", "tonic", @@ -5895,9 +5895,9 @@ dependencies = [ [[package]] name = "ordered-float" -version = "3.9.0" +version = "3.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "126d3e6f3926bfb0fb24495b4f4da50626f547e54956594748e3d8882a0320b4" +checksum = "2a54938017eacd63036332b4ae5c8a49fc8c0c1d6d629893057e4f13609edd06" dependencies = [ "num-traits", ] @@ -7064,9 +7064,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.18" +version = "0.11.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55" +checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" dependencies = [ "base64 0.21.2", "bytes 1.4.0", @@ -7102,8 +7102,8 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots", - "winreg", + "webpki-roots 0.25.2", + "winreg 0.50.0", ] [[package]] @@ -7364,9 +7364,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.2" +version = "0.101.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59" +checksum = "7d93931baf2d282fff8d3a532bbfd7653f734643161b87e3e01e59a04439bf0d" dependencies = [ "ring", "untrusted", @@ -9379,7 +9379,7 @@ dependencies = [ "atty", "cached", "chrono", - "clap 4.3.22", + "clap 4.3.24", "clap-verbosity-flag", "clap_complete", "confy", @@ -9456,7 +9456,7 @@ dependencies = [ "bytesize", "chrono", "cidr-utils", - "clap 4.3.22", + "clap 4.3.24", "codecs", "colored", "console-subscriber", @@ -9525,7 +9525,7 @@ dependencies = [ "openssl-probe", "openssl-src", "opentelemetry-proto", - "ordered-float 3.9.0", + "ordered-float 3.9.1", "paste", "percent-encoding", "pin-project", @@ -9616,7 +9616,7 @@ dependencies = [ "anyhow", "async-trait", "chrono", - "clap 4.3.22", + "clap 4.3.24", "futures 0.3.28", "graphql_client", "indoc", @@ -9639,7 +9639,7 @@ dependencies = [ "async-trait", "bytecheck", "bytes 1.4.0", - "clap 4.3.22", + "clap 4.3.24", "crc32fast", "criterion", "crossbeam-queue", @@ -9688,7 +9688,7 @@ dependencies = [ "indexmap 2.0.0", "metrics", "nom", - "ordered-float 3.9.0", + "ordered-float 3.9.1", "paste", "pin-project", "quickcheck", @@ -9730,6 +9730,7 @@ dependencies = [ "url", "vector-config-common", "vector-config-macros", + "vector-core", "vrl", ] @@ -9798,7 +9799,7 @@ dependencies = [ "noisy_float", "once_cell", "openssl", - "ordered-float 3.9.0", + "ordered-float 3.9.1", "parking_lot", "pin-project", "proptest", @@ -9859,7 +9860,7 @@ dependencies = [ name = "vector-vrl-cli" version = "0.1.0" dependencies = [ - "clap 4.3.22", + "clap 4.3.24", "vector-vrl-functions", "vrl", ] @@ -9878,7 +9879,7 @@ dependencies = [ "ansi_term", "chrono", "chrono-tz", - "clap 4.3.22", + "clap 4.3.24", "enrichment", 
"glob", "prettydiff", @@ -9938,7 +9939,7 @@ dependencies = [ "chrono", "chrono-tz", "cidr-utils", - "clap 4.3.22", + "clap 4.3.24", "codespan-reporting", "crypto_secretbox", "csv", @@ -9963,7 +9964,7 @@ dependencies = [ "ofb", "once_cell", "onig", - "ordered-float 3.9.0", + "ordered-float 3.9.1", "paste", "peeking_take_while", "percent-encoding", @@ -10176,9 +10177,9 @@ checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" [[package]] name = "wasm-streams" -version = "0.2.3" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078" +checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7" dependencies = [ "futures-util", "js-sys", @@ -10233,6 +10234,12 @@ dependencies = [ "webpki", ] +[[package]] +name = "webpki-roots" +version = "0.25.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc" + [[package]] name = "wepoll-ffi" version = "0.1.2" @@ -10488,6 +10495,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "wiremock" version = "0.5.19" diff --git a/Cargo.toml b/Cargo.toml index 17a62c31a20d4..4b9968c0ce52b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -252,16 +252,16 @@ base64 = { version = "0.21.2", default-features = false, optional = true } bloomy = { version = "1.2.0", default-features = false, optional = true } bollard = { version = "0.14.0", default-features = false, features = ["ssl", "chrono"], optional = true } bytes = { version = "1.4.0", default-features = false, features = ["serde"] } -bytesize = { version = "1.2.0", default-features = false } +bytesize = { version = "1.3.0", default-features = false } chrono = { version = "0.4.26", default-features = false, features = ["serde"] } cidr-utils = { version = "0.5.10", default-features = false } -clap = { version = "4.3.22", default-features = false, features = ["derive", "error-context", "env", "help", "std", "string", "usage", "wrap_help"] } +clap = { version = "4.3.24", default-features = false, features = ["derive", "error-context", "env", "help", "std", "string", "usage", "wrap_help"] } colored = { version = "2.0.4", default-features = false } csv = { version = "1.2", default-features = false } derivative = { version = "2.2.0", default-features = false } dirs-next = { version = "2.0.0", default-features = false, optional = true } dyn-clone = { version = "1.0.13", default-features = false } -encoding_rs = { version = "0.8.32", default-features = false, features = ["serde"] } +encoding_rs = { version = "0.8.33", default-features = false, features = ["serde"] } enum_dispatch = { version = "0.3.12", default-features = false } exitcode = { version = "1.1.2", default-features = false } flate2 = { version = "1.0.27", default-features = false, features = ["default"] } @@ -295,11 +295,11 @@ mongodb = { version = "2.6.1", default-features = false, features = ["tokio-runt async-nats = { version = "0.31.0", default-features = false, optional = true } nkeys = { version = "0.3.1", default-features = false, optional = true } nom = { version = "7.1.3", default-features = false, optional = true } -notify = { version = "6.1.0", default-features = false, 
features = ["macos_fsevent"] } +notify = { version = "6.1.1", default-features = false, features = ["macos_fsevent"] } once_cell = { version = "1.18", default-features = false } openssl = { version = "0.10.56", default-features = false, features = ["vendored"] } openssl-probe = { version = "0.1.5", default-features = false } -ordered-float = { version = "3.9.0", default-features = false } +ordered-float = { version = "3.9.1", default-features = false } paste = "1.0.14" percent-encoding = { version = "2.3.0", default-features = false } postgres-openssl = { version = "0.5.0", default-features = false, features = ["runtime"], optional = true } @@ -394,6 +394,7 @@ ntapi = { git = "https://github.com/MSxDOS/ntapi.git", rev = "24fc1e47677fc9f6e3 openssl-sys = { git = "https://github.com/vectordotdev/rust-openssl", tag = "openssl-sys-v0.9.91_3.0.0" } openssl-src = { git = "https://github.com/vectordotdev/openssl-src-rs", tag = "release-300-force-engine_3.1.2" } + [features] # Default features for *-unknown-linux-gnu and *-apple-darwin default = ["api", "api-client", "enrichment-tables", "sinks", "sources", "sources-dnstap", "transforms", "unix", "rdkafka?/gssapi-vendored", "enterprise", "component-validation-runner"] @@ -579,6 +580,7 @@ transforms-logs = [ "transforms-aws_ec2_metadata", "transforms-dedupe", "transforms-filter", + "transforms-log_to_metric", "transforms-lua", "transforms-metric_to_log", "transforms-pipelines", @@ -591,6 +593,7 @@ transforms-logs = [ transforms-metrics = [ "transforms-aggregate", "transforms-filter", + "transforms-log_to_metric", "transforms-lua", "transforms-metric_to_log", "transforms-pipelines", @@ -603,6 +606,7 @@ transforms-aggregate = [] transforms-aws_ec2_metadata = ["dep:arc-swap"] transforms-dedupe = ["dep:lru"] transforms-filter = [] +transforms-log_to_metric = [] transforms-lua = ["dep:mlua", "vector-core/lua"] transforms-metric_to_log = [] transforms-pipelines = ["transforms-filter", "transforms-route"] @@ -847,6 +851,7 @@ shutdown-tests = ["api", "sinks-blackhole", "sinks-console", "sinks-prometheus", cli-tests = ["sinks-blackhole", "sinks-socket", "sources-demo_logs", "sources-file"] vector-api-tests = [ "sources-demo_logs", + "transforms-log_to_metric", "transforms-remap", "sinks-blackhole" ] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 548f62740b012..022c138d1b5eb 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -618,6 +618,7 @@ web-sys,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/web-sys,MIT webbrowser,https://github.com/amodm/webbrowser-rs,MIT OR Apache-2.0,Amod Malviya @amodm webpki,https://github.com/briansmith/webpki,ISC,Brian Smith webpki-roots,https://github.com/rustls/webpki-roots,MPL-2.0,Joseph Birr-Pixton +webpki-roots,https://github.com/rustls/webpki-roots,MPL-2.0,The webpki-roots Authors wepoll-ffi,https://github.com/aclysma/wepoll-ffi,MIT OR Apache-2.0 OR BSD-2-Clause,Philip Degarmo whoami,https://github.com/ardaku/whoami,Apache-2.0 OR BSL-1.0 OR MIT,The whoami Authors widestring,https://github.com/starkat99/widestring-rs,MIT OR Apache-2.0,Kathryn Long diff --git a/deny.toml b/deny.toml index f9453e19a109c..2b3d8991358a8 100644 --- a/deny.toml +++ b/deny.toml @@ -38,4 +38,6 @@ license-files = [ [advisories] ignore = [ + # requires our dependencies to migrate to `rustls-webpki` + "RUSTSEC-2023-0052" ] diff --git a/distribution/kubernetes/vector-agent/README.md b/distribution/kubernetes/vector-agent/README.md index 5ee8fc7c1024e..da0e08ecb1397 100644 --- 
a/distribution/kubernetes/vector-agent/README.md +++ b/distribution/kubernetes/vector-agent/README.md @@ -1,6 +1,6 @@ The kubernetes manifests found in this directory have been automatically generated from the [helm chart `vector/vector`](https://github.com/vectordotdev/helm-charts/tree/master/charts/vector) -version 0.24.0 with the following `values.yaml`: +version 0.24.1 with the following `values.yaml`: ```yaml role: Agent diff --git a/distribution/kubernetes/vector-agent/configmap.yaml b/distribution/kubernetes/vector-agent/configmap.yaml index a9e84166bccc6..ce2fff8c7d8ab 100644 --- a/distribution/kubernetes/vector-agent/configmap.yaml +++ b/distribution/kubernetes/vector-agent/configmap.yaml @@ -8,7 +8,7 @@ metadata: app.kubernetes.io/name: vector app.kubernetes.io/instance: vector app.kubernetes.io/component: Agent - app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" data: agent.yaml: | data_dir: /vector-data-dir diff --git a/distribution/kubernetes/vector-agent/daemonset.yaml b/distribution/kubernetes/vector-agent/daemonset.yaml index a126c066e7179..d2bf2a8819a08 100644 --- a/distribution/kubernetes/vector-agent/daemonset.yaml +++ b/distribution/kubernetes/vector-agent/daemonset.yaml @@ -8,7 +8,7 @@ metadata: app.kubernetes.io/name: vector app.kubernetes.io/instance: vector app.kubernetes.io/component: Agent - app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" annotations: {} spec: selector: @@ -30,7 +30,7 @@ spec: dnsPolicy: ClusterFirst containers: - name: vector - image: "timberio/vector:0.32.0-distroless-libc" + image: "timberio/vector:0.32.1-distroless-libc" imagePullPolicy: IfNotPresent args: - --config-dir diff --git a/distribution/kubernetes/vector-agent/rbac.yaml b/distribution/kubernetes/vector-agent/rbac.yaml index 76a46ad57d691..8f1c94e56578e 100644 --- a/distribution/kubernetes/vector-agent/rbac.yaml +++ b/distribution/kubernetes/vector-agent/rbac.yaml @@ -10,7 +10,7 @@ metadata: app.kubernetes.io/name: vector app.kubernetes.io/instance: vector app.kubernetes.io/component: Agent - app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" rules: - apiGroups: - "" @@ -31,7 +31,7 @@ metadata: app.kubernetes.io/name: vector app.kubernetes.io/instance: vector app.kubernetes.io/component: Agent - app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole diff --git a/distribution/kubernetes/vector-agent/service-headless.yaml b/distribution/kubernetes/vector-agent/service-headless.yaml index 0843111bf6412..341aa58559b94 100644 --- a/distribution/kubernetes/vector-agent/service-headless.yaml +++ b/distribution/kubernetes/vector-agent/service-headless.yaml @@ -8,7 +8,7 @@ metadata: app.kubernetes.io/name: vector app.kubernetes.io/instance: vector app.kubernetes.io/component: Agent - app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" annotations: spec: clusterIP: None diff --git a/distribution/kubernetes/vector-agent/serviceaccount.yaml b/distribution/kubernetes/vector-agent/serviceaccount.yaml index 2067ddb10f697..4f6168cd29661 100644 --- a/distribution/kubernetes/vector-agent/serviceaccount.yaml +++ b/distribution/kubernetes/vector-agent/serviceaccount.yaml @@ -8,5 +8,5 @@ metadata: app.kubernetes.io/name: vector app.kubernetes.io/instance: vector 
app.kubernetes.io/component: Agent - app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" automountServiceAccountToken: true diff --git a/distribution/kubernetes/vector-aggregator/README.md b/distribution/kubernetes/vector-aggregator/README.md index 40e4967b4288a..5a94f3d9fce14 100644 --- a/distribution/kubernetes/vector-aggregator/README.md +++ b/distribution/kubernetes/vector-aggregator/README.md @@ -1,6 +1,6 @@ The kubernetes manifests found in this directory have been automatically generated from the [helm chart `vector/vector`](https://github.com/vectordotdev/helm-charts/tree/master/charts/vector) -version 0.24.0 with the following `values.yaml`: +version 0.24.1 with the following `values.yaml`: ```yaml diff --git a/distribution/kubernetes/vector-aggregator/configmap.yaml b/distribution/kubernetes/vector-aggregator/configmap.yaml index c0b4896af6df1..15cc73d5b32a4 100644 --- a/distribution/kubernetes/vector-aggregator/configmap.yaml +++ b/distribution/kubernetes/vector-aggregator/configmap.yaml @@ -8,7 +8,7 @@ metadata: app.kubernetes.io/name: vector app.kubernetes.io/instance: vector app.kubernetes.io/component: Aggregator - app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" data: aggregator.yaml: | data_dir: /vector-data-dir diff --git a/distribution/kubernetes/vector-aggregator/service-headless.yaml b/distribution/kubernetes/vector-aggregator/service-headless.yaml index 53cf1a70b1666..060893759c6f6 100644 --- a/distribution/kubernetes/vector-aggregator/service-headless.yaml +++ b/distribution/kubernetes/vector-aggregator/service-headless.yaml @@ -8,7 +8,7 @@ metadata: app.kubernetes.io/name: vector app.kubernetes.io/instance: vector app.kubernetes.io/component: Aggregator - app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" annotations: spec: clusterIP: None diff --git a/distribution/kubernetes/vector-aggregator/service.yaml b/distribution/kubernetes/vector-aggregator/service.yaml index 7244804732d44..3945e1989ff6a 100644 --- a/distribution/kubernetes/vector-aggregator/service.yaml +++ b/distribution/kubernetes/vector-aggregator/service.yaml @@ -8,7 +8,7 @@ metadata: app.kubernetes.io/name: vector app.kubernetes.io/instance: vector app.kubernetes.io/component: Aggregator - app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" annotations: spec: ports: diff --git a/distribution/kubernetes/vector-aggregator/serviceaccount.yaml b/distribution/kubernetes/vector-aggregator/serviceaccount.yaml index 0086c778431f8..8f2dd1668c7be 100644 --- a/distribution/kubernetes/vector-aggregator/serviceaccount.yaml +++ b/distribution/kubernetes/vector-aggregator/serviceaccount.yaml @@ -8,5 +8,5 @@ metadata: app.kubernetes.io/name: vector app.kubernetes.io/instance: vector app.kubernetes.io/component: Aggregator - app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" automountServiceAccountToken: true diff --git a/distribution/kubernetes/vector-aggregator/statefulset.yaml b/distribution/kubernetes/vector-aggregator/statefulset.yaml index bc02dcc4cfe2f..eb03b80e10c69 100644 --- a/distribution/kubernetes/vector-aggregator/statefulset.yaml +++ b/distribution/kubernetes/vector-aggregator/statefulset.yaml @@ -8,7 +8,7 @@ metadata: app.kubernetes.io/name: vector app.kubernetes.io/instance: vector app.kubernetes.io/component: Aggregator - 
app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" annotations: {} spec: replicas: 1 @@ -32,7 +32,7 @@ spec: dnsPolicy: ClusterFirst containers: - name: vector - image: "timberio/vector:0.32.0-distroless-libc" + image: "timberio/vector:0.32.1-distroless-libc" imagePullPolicy: IfNotPresent args: - --config-dir diff --git a/distribution/kubernetes/vector-stateless-aggregator/README.md b/distribution/kubernetes/vector-stateless-aggregator/README.md index fc1b9dcde8b6a..d8a71f5e0fc81 100644 --- a/distribution/kubernetes/vector-stateless-aggregator/README.md +++ b/distribution/kubernetes/vector-stateless-aggregator/README.md @@ -1,6 +1,6 @@ The kubernetes manifests found in this directory have been automatically generated from the [helm chart `vector/vector`](https://github.com/vectordotdev/helm-charts/tree/master/charts/vector) -version 0.24.0 with the following `values.yaml`: +version 0.24.1 with the following `values.yaml`: ```yaml role: Stateless-Aggregator diff --git a/distribution/kubernetes/vector-stateless-aggregator/configmap.yaml b/distribution/kubernetes/vector-stateless-aggregator/configmap.yaml index 623457ec6111f..e85fc18d2afac 100644 --- a/distribution/kubernetes/vector-stateless-aggregator/configmap.yaml +++ b/distribution/kubernetes/vector-stateless-aggregator/configmap.yaml @@ -8,7 +8,7 @@ metadata: app.kubernetes.io/name: vector app.kubernetes.io/instance: vector app.kubernetes.io/component: Stateless-Aggregator - app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" data: aggregator.yaml: | data_dir: /vector-data-dir diff --git a/distribution/kubernetes/vector-stateless-aggregator/deployment.yaml b/distribution/kubernetes/vector-stateless-aggregator/deployment.yaml index 223f49dddf17c..d5d391654a422 100644 --- a/distribution/kubernetes/vector-stateless-aggregator/deployment.yaml +++ b/distribution/kubernetes/vector-stateless-aggregator/deployment.yaml @@ -8,7 +8,7 @@ metadata: app.kubernetes.io/name: vector app.kubernetes.io/instance: vector app.kubernetes.io/component: Stateless-Aggregator - app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" annotations: {} spec: replicas: 1 @@ -30,7 +30,7 @@ spec: dnsPolicy: ClusterFirst containers: - name: vector - image: "timberio/vector:0.32.0-distroless-libc" + image: "timberio/vector:0.32.1-distroless-libc" imagePullPolicy: IfNotPresent args: - --config-dir diff --git a/distribution/kubernetes/vector-stateless-aggregator/service-headless.yaml b/distribution/kubernetes/vector-stateless-aggregator/service-headless.yaml index 562b989159047..6ea7268685da2 100644 --- a/distribution/kubernetes/vector-stateless-aggregator/service-headless.yaml +++ b/distribution/kubernetes/vector-stateless-aggregator/service-headless.yaml @@ -8,7 +8,7 @@ metadata: app.kubernetes.io/name: vector app.kubernetes.io/instance: vector app.kubernetes.io/component: Stateless-Aggregator - app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" annotations: spec: clusterIP: None diff --git a/distribution/kubernetes/vector-stateless-aggregator/service.yaml b/distribution/kubernetes/vector-stateless-aggregator/service.yaml index 34b6464ec7d00..c80814951d7a5 100644 --- a/distribution/kubernetes/vector-stateless-aggregator/service.yaml +++ b/distribution/kubernetes/vector-stateless-aggregator/service.yaml @@ -8,7 +8,7 @@ metadata: app.kubernetes.io/name: vector 
app.kubernetes.io/instance: vector app.kubernetes.io/component: Stateless-Aggregator - app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" annotations: spec: ports: diff --git a/distribution/kubernetes/vector-stateless-aggregator/serviceaccount.yaml b/distribution/kubernetes/vector-stateless-aggregator/serviceaccount.yaml index 8c1b67a83a0d8..245c11ac328cd 100644 --- a/distribution/kubernetes/vector-stateless-aggregator/serviceaccount.yaml +++ b/distribution/kubernetes/vector-stateless-aggregator/serviceaccount.yaml @@ -8,5 +8,5 @@ metadata: app.kubernetes.io/name: vector app.kubernetes.io/instance: vector app.kubernetes.io/component: Stateless-Aggregator - app.kubernetes.io/version: "0.32.0-distroless-libc" + app.kubernetes.io/version: "0.32.1-distroless-libc" automountServiceAccountToken: true diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml index 2adaa25737750..c2ce406a130c8 100644 --- a/lib/codecs/Cargo.toml +++ b/lib/codecs/Cargo.toml @@ -15,7 +15,7 @@ dyn-clone = { version = "1", default-features = false } lookup = { package = "vector-lookup", path = "../vector-lookup", default-features = false } memchr = { version = "2", default-features = false } once_cell = { version = "1.18", default-features = false } -ordered-float = { version = "3.9.0", default-features = false } +ordered-float = { version = "3.9.1", default-features = false } prost = { version = "0.11.8", default-features = false, features = ["std"] } prost-reflect = { version = "0.11", default-features = false, features = ["serde"] } regex = { version = "1.9.3", default-features = false, features = ["std", "perf"] } @@ -38,7 +38,7 @@ futures = { version = "0.3", default-features = false } indoc = { version = "2", default-features = false } tokio = { version = "1", features = ["test-util"] } similar-asserts = "1.5.0" -vrl = { version = "0.6.0", features = ["test"]} +vrl = { version = "0.6.0", features = ["cli", "test", "test_framework", "arbitrary"] } vector-core = { path = "../vector-core", default-features = false, features = ["test"] } [features] diff --git a/lib/k8s-e2e-tests/Cargo.toml b/lib/k8s-e2e-tests/Cargo.toml index 9c63afacbc534..811924671ca69 100644 --- a/lib/k8s-e2e-tests/Cargo.toml +++ b/lib/k8s-e2e-tests/Cargo.toml @@ -12,7 +12,7 @@ futures = "0.3" k8s-openapi = { version = "0.16.0", default-features = false, features = ["v1_19"] } k8s-test-framework = { version = "0.1", path = "../k8s-test-framework" } regex = "1" -reqwest = { version = "0.11.18", features = ["json"] } +reqwest = { version = "0.11.20", features = ["json"] } serde_json = "1" tokio = { version = "1.32.0", features = ["full"] } indoc = "2.0.3" diff --git a/lib/opentelemetry-proto/Cargo.toml b/lib/opentelemetry-proto/Cargo.toml index 3b22dbb1b3017..e20acec6c320e 100644 --- a/lib/opentelemetry-proto/Cargo.toml +++ b/lib/opentelemetry-proto/Cargo.toml @@ -14,7 +14,7 @@ bytes = { version = "1.4.0", default-features = false, features = ["serde"] } chrono = { version = "0.4.19", default-features = false, features = ["serde"] } hex = { version = "0.4.3", default-features = false, features = ["std"] } lookup = { package = "vector-lookup", path = "../vector-lookup", default-features = false } -ordered-float = { version = "3.9.0", default-features = false } +ordered-float = { version = "3.9.1", default-features = false } prost = { version = "0.11", default-features = false, features = ["std"] } tonic = { version = "0.9", default-features = false, features = ["codegen", "gzip", "prost", "tls", 
"tls-roots", "transport"] } vrl.workspace = true diff --git a/lib/tracing-limit/Cargo.toml b/lib/tracing-limit/Cargo.toml index 5eb8fe73f0840..19d85737484b5 100644 --- a/lib/tracing-limit/Cargo.toml +++ b/lib/tracing-limit/Cargo.toml @@ -9,7 +9,7 @@ license = "MPL-2.0" [dependencies] tracing-core = { version = "0.1", default-features = false } tracing-subscriber = { version = "0.3", default-features = false, features = ["registry", "std"] } -dashmap = { version = "5.5.0", default-features = false } +dashmap = { version = "5.5.1", default-features = false } [dev-dependencies] criterion = "0.5" diff --git a/lib/vector-api-client/Cargo.toml b/lib/vector-api-client/Cargo.toml index ff42a6d32c62d..fe163f621aec4 100644 --- a/lib/vector-api-client/Cargo.toml +++ b/lib/vector-api-client/Cargo.toml @@ -25,12 +25,12 @@ tokio-stream = { version = "0.1.14", default-features = false, features = ["sync graphql_client = { version = "0.13.0", default-features = false, features = ["graphql_query_derive"] } # HTTP / WebSockets -reqwest = { version = "0.11.18", default-features = false, features = ["json"] } +reqwest = { version = "0.11.20", default-features = false, features = ["json"] } tokio-tungstenite = { version = "0.20.0", default-features = false, features = ["connect", "rustls"] } # External libs chrono = { version = "0.4.6", default-features = false, features = ["serde"] } -clap = { version = "4.3.22", default-features = false, features = ["derive"] } +clap = { version = "4.3.24", default-features = false, features = ["derive"] } url = { version = "2.4.0", default-features = false } uuid = { version = "1", default-features = false, features = ["serde", "v4"] } indoc = { version = "2.0.3", default-features = false } diff --git a/lib/vector-buffers/Cargo.toml b/lib/vector-buffers/Cargo.toml index aa37e0d02b769..d7553e4545836 100644 --- a/lib/vector-buffers/Cargo.toml +++ b/lib/vector-buffers/Cargo.toml @@ -32,7 +32,7 @@ vector-config-macros = { path = "../vector-config-macros", default-features = fa vector-common = { path = "../vector-common", default-features = false, features = ["byte_size_of", "serde"] } [dev-dependencies] -clap = "4.3.22" +clap = "4.3.24" criterion = { version = "0.5", features = ["html_reports", "async_tokio"] } crossbeam-queue = "0.3.8" hdrhistogram = "7.5.2" diff --git a/lib/vector-common/Cargo.toml b/lib/vector-common/Cargo.toml index 3ff2e7a308bd8..d73f74e9532ac 100644 --- a/lib/vector-common/Cargo.toml +++ b/lib/vector-common/Cargo.toml @@ -51,7 +51,7 @@ futures = { version = "0.3.28", default-features = false, features = ["std"] } indexmap = { version = "~2.0.0", default-features = false, features = ["std"] } metrics = "0.21.1" nom = { version = "7", optional = true } -ordered-float = { version = "3.9.0", default-features = false } +ordered-float = { version = "3.9.1", default-features = false } paste = "1.0.14" pin-project.workspace = true ryu = { version = "1", default-features = false } diff --git a/lib/vector-config/Cargo.toml b/lib/vector-config/Cargo.toml index 71f86425f7df6..147e2be98ea90 100644 --- a/lib/vector-config/Cargo.toml +++ b/lib/vector-config/Cargo.toml @@ -34,3 +34,4 @@ vector-config-macros = { path = "../vector-config-macros" } [dev-dependencies] assert-json-diff = { version = "2", default-features = false } serde_with = { version = "3.3.0", default-features = false, features = ["std", "macros"] } +vector-core = { path = "../vector-core", default-features = false, features = ["test"] } diff --git a/lib/vector-core/Cargo.toml b/lib/vector-core/Cargo.toml 
index efa202d54450e..1abd475c0b85b 100644 --- a/lib/vector-core/Cargo.toml +++ b/lib/vector-core/Cargo.toml @@ -31,7 +31,7 @@ metrics-util = { version = "0.15.1", default-features = false, features = ["regi mlua = { version = "0.8.10", default-features = false, features = ["lua54", "send", "vendored"], optional = true } no-proxy = { version = "0.3.4", default-features = false, features = ["serialize"] } once_cell = { version = "1.18", default-features = false } -ordered-float = { version = "3.9.0", default-features = false } +ordered-float = { version = "3.9.1", default-features = false } openssl = { version = "0.10.56", default-features = false, features = ["vendored"] } parking_lot = { version = "0.12.1", default-features = false } pin-project.workspace = true @@ -95,7 +95,7 @@ rand = "0.8.5" rand_distr = "0.4.3" tracing-subscriber = { version = "0.3.17", default-features = false, features = ["env-filter", "fmt", "ansi", "registry"] } vector-common = { path = "../vector-common", default-features = false, features = ["test"] } -vrl.workspace = true +vrl = { version = "0.6.0", features = ["cli", "test", "test_framework", "arbitrary"] } [features] api = ["dep:async-graphql"] diff --git a/lib/vector-vrl/cli/Cargo.toml b/lib/vector-vrl/cli/Cargo.toml index 5ad70ebe60a2b..2d55a431c7b58 100644 --- a/lib/vector-vrl/cli/Cargo.toml +++ b/lib/vector-vrl/cli/Cargo.toml @@ -7,6 +7,6 @@ publish = false license = "MPL-2.0" [dependencies] -clap = { version = "4.3.22", features = ["derive"] } +clap = { version = "4.3.24", features = ["derive"] } vector-vrl-functions = { path = "../functions" } vrl.workspace = true diff --git a/lib/vector-vrl/tests/Cargo.toml b/lib/vector-vrl/tests/Cargo.toml index 59e3a2199c9d3..12ff294e25d85 100644 --- a/lib/vector-vrl/tests/Cargo.toml +++ b/lib/vector-vrl/tests/Cargo.toml @@ -13,7 +13,7 @@ vector-vrl-functions = { path = "../../vector-vrl/functions" } ansi_term = "0.12" chrono = "0.4" chrono-tz = "0.8" -clap = { version = "4.3.22", features = ["derive"] } +clap = { version = "4.3.24", features = ["derive"] } glob = "0.3" prettydiff = "0.6" regex = "1" diff --git a/src/api/tap.rs b/src/api/tap.rs index 38603f8aace4e..15ddfcf4406c0 100644 --- a/src/api/tap.rs +++ b/src/api/tap.rs @@ -415,6 +415,7 @@ async fn tap_handler( test, feature = "sinks-blackhole", feature = "sources-demo_logs", + feature = "transforms-log_to_metric", feature = "transforms-remap", ))] mod tests { diff --git a/src/config/cmd.rs b/src/config/cmd.rs index c5a2c0b79c9aa..078426cbe7bf2 100644 --- a/src/config/cmd.rs +++ b/src/config/cmd.rs @@ -202,8 +202,10 @@ mod tests { use serde_json::json; use vector_config::component::{SinkDescription, SourceDescription, TransformDescription}; + use crate::config::Format; use crate::{ config::{cmd::serialize_to_json, vars, ConfigBuilder}, + generate, generate::{generate_example, TransformInputsStrategy}, }; @@ -325,13 +327,13 @@ .collect::<Vec<_>>() .join(","), ); - generate_example( - false, - generate_config_str.as_ref(), - &None, - TransformInputsStrategy::All, - ) - .expect("invalid config generated") + let opts = generate::Opts { + fragment: true, + expression: generate_config_str.to_string(), + file: None, + format: Format::Toml, + }; + generate_example(&opts, TransformInputsStrategy::All).expect("invalid config generated") } proptest! {
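The hunk above reflects the reworked `generate_example` API: the test now builds the same `generate::Opts` struct the CLI parses, instead of passing positional arguments. Note the inverted flag, where the old `false` for `include_globals` becomes `fragment: true`. A minimal sketch of the new call shape (the expression value is illustrative):

    let opts = generate::Opts {
        fragment: true, // skip global fields such as `data_dir`
        expression: "stdin/remap/console".to_string(),
        file: None,
        format: Format::Toml,
    };
    let example = generate_example(&opts, TransformInputsStrategy::All)
        .expect("invalid config generated");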
diff --git a/src/config/format.rs b/src/config/format.rs index 74d8b5911df49..723394fa53646 100644 --- a/src/config/format.rs +++ b/src/config/format.rs @@ -3,6 +3,7 @@ #![deny(missing_docs, missing_debug_implementations)] use std::path::Path; +use std::str::FromStr; use serde::de; @@ -21,6 +22,19 @@ pub enum Format { Yaml, } +impl FromStr for Format { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_lowercase().as_str() { + "toml" => Ok(Format::Toml), + "yaml" => Ok(Format::Yaml), + "json" => Ok(Format::Json), + _ => Err(format!("Invalid format: {}", s)), + } + } +} + impl Format { /// Obtain the format from the file path using extension as a hint. pub fn from_path<T: AsRef<Path>>(path: T) -> Result<Self, T> { @@ -34,8 +48,6 @@ impl Format { } /// Parse the string represented in the specified format. -/// If the format is unknown - fallback to the default format and attempt -/// parsing using that. pub fn deserialize<T>(content: &str, format: Format) -> Result<T, Vec<String>> where T: de::DeserializeOwned, @@ -47,9 +59,31 @@ where } } +/// Serialize the specified `value` into a string. +pub fn serialize<T>(value: &T, format: Format) -> Result<String, String> +where + T: serde::ser::Serialize, +{ + match format { + Format::Toml => toml::to_string(value).map_err(|e| e.to_string()), + Format::Yaml => serde_yaml::to_string(value).map_err(|e| e.to_string()), + Format::Json => serde_json::to_string_pretty(value).map_err(|e| e.to_string()), + } +} + #[cfg(test)] mod tests { use super::*; + use proptest::prelude::*; + + impl Arbitrary for Format { + type Parameters = (); + fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy { + prop_oneof![Just(Format::Toml), Just(Format::Json), Just(Format::Yaml),].boxed() + } + + type Strategy = BoxedStrategy<Self>; + } /// This test ensures the logic to guess file format from the file path /// works correctly.
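With `FromStr` in place, `Format` can be parsed straight from a CLI flag, and the new `serialize` mirrors `deserialize`, so the two form a round trip for any serde-compatible value. A hypothetical illustration, assuming the two functions and `Format` are in scope and that `Format` is `Copy`, as its by-value use throughout this diff suggests:

    let format: Format = "yaml".parse().expect("known format name");
    let value = serde_json::json!({ "data_dir": "/var/lib/vector/" });
    let text = serialize(&value, format).expect("serializes");
    let back: serde_json::Value = deserialize(&text, format).expect("round-trips");
    assert_eq!(value, back);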
diff --git a/src/generate.rs b/src/generate.rs index 92d1215022a19..91bf0eda77578 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -15,14 +15,14 @@ use vector_config::component::{ }; use vector_core::{buffers::BufferConfig, config::GlobalOptions, default_data_dir}; -use crate::config::SinkHealthcheckOptions; +use crate::config::{format, Format, SinkHealthcheckOptions}; #[derive(Parser, Debug)] #[command(rename_all = "kebab-case")] pub struct Opts { /// Whether to skip the generation of global fields. #[arg(short, long)] - fragment: bool, + pub(crate) fragment: bool, /// Generate expression, e.g. 'stdin/remap,filter/console' /// /// from the last transform or, if none are specified, from all sources. It /// is then up to you to restructure the `inputs` of each component to build /// the topology you need. - expression: String, + pub(crate) expression: String, /// Generate config as a file #[arg(long)] - file: Option<PathBuf>, + pub(crate) file: Option<PathBuf>, + + #[arg(long, default_value = "yaml")] + pub(crate) format: Format, } #[derive(Serialize)] pub struct TransformOuter { #[derive(Serialize, Default)] pub struct Config { + #[serde(skip_serializing_if = "Option::is_none")] pub sources: Option<IndexMap<String, Value>>, + #[serde(skip_serializing_if = "Option::is_none")] pub transforms: Option<IndexMap<String, TransformOuter>>, + #[serde(skip_serializing_if = "Option::is_none")] pub sinks: Option<IndexMap<String, SinkOuter>>, } @@ -99,13 +105,20 @@ pub(crate) enum TransformInputsStrategy { All, } +#[derive(Serialize, Default)] +struct FullConfig { + #[serde(flatten)] + global_options: Option<GlobalOptions>, + #[serde(flatten)] + config: Config, +} + pub(crate) fn generate_example( - include_globals: bool, - expression: &str, - file: &Option<PathBuf>, + opts: &Opts, transform_inputs_strategy: TransformInputsStrategy, ) -> Result<String, Vec<String>> { - let components: Vec<Vec<&str>> = expression + let components: Vec<Vec<&str>> = opts + .expression .split(|c| c == '|' || c == '/') .map(|s| { s.split(',') }) .collect(); - let globals = GlobalOptions { - data_dir: default_data_dir(), - ..Default::default() - }; let mut config = Config::default(); let mut errs = Vec::new(); @@ -305,51 +314,27 @@ return Err(errs); } - let mut builder = if include_globals { - match toml::to_string(&globals) { - Ok(s) => s, - Err(err) => { - errs.push(format!("failed to marshal globals: {}", err)); - return Err(errs); - } - } - } else { - String::new() - }; - if let Some(sources) = config.sources { - match toml::to_string(&{ - Config { - sources: Some(sources), - ..Default::default() - } - }) { - Ok(v) => builder = [builder, v].join("\n"), - Err(e) => errs.push(format!("failed to marshal sources: {}", e)), - } - } - if let Some(transforms) = config.transforms { - match toml::to_string(&{ - Config { - transforms: Some(transforms), + let full_config = FullConfig { + global_options: if !opts.fragment { + Some(GlobalOptions { + data_dir: default_data_dir(), ..Default::default() - } - }) { - Ok(v) => builder = [builder, v].join("\n"), - Err(e) => errs.push(format!("failed to marshal transforms: {}", e)), - } - } - if let Some(sinks) = config.sinks { - match toml::to_string(&{ - Config { - sinks: Some(sinks), - ..Default::default() - } - }) { - Ok(v) => builder = [builder, v].join("\n"), - Err(e) => errs.push(format!("failed to marshal sinks: {}", e)), + }) + } else { + None + }, + config, + }; + + let builder = match format::serialize(&full_config, opts.format) { + Ok(v) => v, + Err(e) => { + errs.push(format!("failed to marshal sources: {e}")); + return Err(errs); } - } + }; + let file = opts.file.as_ref(); if file.is_some() { #[allow(clippy::print_stdout)] match write_config(file.as_ref().unwrap(), &builder) { Ok(_) => { println!( "Config file written to {:?}", &file.as_ref().unwrap().join("\n") ) } - Err(e) => errs.push(format!("failed to write to file: {}", e)), + Err(e) => errs.push(format!("failed to write to file: {e}")), }; };
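The removed block serialized globals, sources, transforms, and sinks in separate `toml::to_string` passes and joined the strings, a trick that only composes for TOML. `FullConfig` instead leans on `#[serde(flatten)]`, so a single `format::serialize` call emits well-formed TOML, YAML, or JSON. Flattening an `Option` writes nothing when it is `None`, which is what lets `--fragment` drop the global `data_dir` key; a self-contained sketch of that behavior (names are local to the sketch):

    use serde::Serialize;

    #[derive(Serialize)]
    struct Globals {
        data_dir: String,
    }

    #[derive(Serialize)]
    struct Full {
        #[serde(flatten)]
        globals: Option<Globals>, // omitted entirely when None
        name: String,
    }

    let full = Full { globals: None, name: "demo".into() };
    assert_eq!(serde_json::to_string(&full).unwrap(), r#"{"name":"demo"}"#);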
@@ -371,12 +356,7 @@ } pub fn cmd(opts: &Opts) -> exitcode::ExitCode { - match generate_example( - !opts.fragment, - &opts.expression, - &opts.file, - TransformInputsStrategy::Auto, - ) { + match generate_example(opts, TransformInputsStrategy::Auto) { Ok(s) => { #[allow(clippy::print_stdout)] { @@ -411,42 +391,40 @@ fn write_config(filepath: &Path, body: &str) -> Result<(), crate::Error> { #[cfg(test)] mod tests { use super::*; - - #[test] - fn generate_all() { - let mut errors = Vec::new(); - - for name in SourceDescription::types() { - let param = format!("{}//", name); - let cfg = generate_example(true, &param, &None, TransformInputsStrategy::Auto).unwrap(); - if let Err(error) = toml::from_str::<ConfigBuilder>(&cfg) { - errors.push((param, error)); - } + use crate::config::ConfigBuilder; + use proptest::prelude::*; + + fn generate_and_deserialize(expression: String, format: Format) { + let opts = Opts { + fragment: false, + expression, + file: None, + format, + }; + let cfg_string = generate_example(&opts, TransformInputsStrategy::Auto).unwrap(); + if let Err(error) = format::deserialize::<ConfigBuilder>(&cfg_string, opts.format) { + panic!( + "Failed to generate example for {} with error: {error:?}", + opts.expression + ); } + } - for name in TransformDescription::types() { - let param = format!("/{}/", name); - let cfg = generate_example(true, &param, &None, TransformInputsStrategy::Auto).unwrap(); - if let Err(error) = toml::from_str::<ConfigBuilder>(&cfg) { - errors.push((param, error)); + proptest! { + #[test] + fn generate_all(format in any::<Format>()) { + for name in SourceDescription::types() { + generate_and_deserialize(format!("{}//", name), format); } - } - for name in SinkDescription::types() { - let param = format!("//{}", name); - let cfg = generate_example(true, &param, &None, TransformInputsStrategy::Auto).unwrap(); - if let Err(error) = toml::from_str::<ConfigBuilder>(&cfg) { - errors.push((param, error)); + for name in TransformDescription::types() { + generate_and_deserialize(format!("/{}/", name), format); } - } - for (component, error) in &errors { - #[allow(clippy::print_stdout)] - { - println!("{:?} : {}", component, error); + for name in SinkDescription::types() { + generate_and_deserialize(format!("//{}", name), format); } } - assert!(errors.is_empty()); } #[cfg(all(feature = "sources-stdin", feature = "sinks-console"))] @@ -458,12 +436,14 @@ mod tests { let tempdir = tempdir().expect("Unable to create tempdir for config"); let filepath = tempdir.path().join("./config.example.toml"); - let cfg = generate_example( - true, - "stdin/test_basic/console", - &Some(filepath.clone()), - TransformInputsStrategy::Auto, - ); + let opts = Opts { + fragment: false, + expression: "stdin/test_basic/console".to_string(), + file: Some(filepath.clone()), + format: Format::Toml, + }; + + let cfg = generate_example(&opts, TransformInputsStrategy::Auto); let filecontents = fs::read_to_string( fs::canonicalize(&filepath).expect("Could not return canonicalized filepath"), ) #[cfg(all(feature = "sources-stdin", feature = "sinks-console"))] #[test] - fn generate_basic() { + fn generate_basic_toml() { + let mut opts = Opts { + fragment: false, + expression: "stdin/test_basic/console".to_string(), + file: None, + format: Format::Toml, + }; + assert_eq!( - generate_example( - true, - "stdin/test_basic/console", - &None, - TransformInputsStrategy::Auto - ), + generate_example(&opts, TransformInputsStrategy::Auto), Ok(indoc::indoc! 
{r#"data_dir = "/var/lib/vector/" [sources.source0] @@ -557,8 +535,9 @@ mod tests { .to_string()) ); + opts.expression = "stdin//console".to_string(); assert_eq!( - generate_example(true, "stdin//console", &None, TransformInputsStrategy::Auto), + generate_example(&opts, TransformInputsStrategy::Auto), Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/" [sources.source0] @@ -587,8 +566,9 @@ mod tests { .to_string()) ); + opts.expression = "//console".to_string(); assert_eq!( - generate_example(true, "//console", &None, TransformInputsStrategy::Auto), + generate_example(&opts, TransformInputsStrategy::Auto), Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/" [sinks.sink0] @@ -610,13 +590,9 @@ mod tests { .to_string()) ); + opts.expression = "/test_basic,test_basic,test_basic".to_string(); assert_eq!( - generate_example( - true, - "/test_basic,test_basic,test_basic", - &None, - TransformInputsStrategy::Auto - ), + generate_example(&opts, TransformInputsStrategy::Auto), Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/" [transforms.transform0] @@ -640,15 +616,11 @@ mod tests { .to_string()) ); + opts.fragment = true; + opts.expression = "/test_basic,test_basic,test_basic".to_string(); assert_eq!( - generate_example( - false, - "/test_basic,test_basic,test_basic", - &None, - TransformInputsStrategy::Auto - ), + generate_example(&opts, TransformInputsStrategy::Auto), Ok(indoc::indoc! {r#" - [transforms.transform0] inputs = [] increase = 0.0 @@ -670,4 +642,122 @@ mod tests { .to_string()) ); } + + #[test] + fn generate_basic_yaml() { + let opts = Opts { + fragment: false, + expression: "demo_logs/remap/console".to_string(), + file: None, + format: Format::Yaml, + }; + + assert_eq!( + generate_example(&opts, TransformInputsStrategy::Auto).unwrap(), + indoc::indoc! {r#" + data_dir: /var/lib/vector/ + sources: + source0: + count: 9223372036854775807 + format: json + interval: 1.0 + type: demo_logs + decoding: + codec: bytes + framing: + method: bytes + transforms: + transform0: + inputs: + - source0 + drop_on_abort: false + drop_on_error: false + metric_tag_values: single + reroute_dropped: false + runtime: ast + type: remap + sinks: + sink0: + inputs: + - transform0 + target: stdout + type: console + encoding: + codec: json + healthcheck: + enabled: true + uri: null + buffer: + type: memory + max_events: 500 + when_full: block + "#} + ); + } + + #[test] + fn generate_basic_json() { + let opts = Opts { + fragment: false, + expression: "demo_logs/remap/console".to_string(), + file: None, + format: Format::Json, + }; + + assert_eq!( + generate_example(&opts, TransformInputsStrategy::Auto).unwrap(), + indoc::indoc! 
{r#" + { + "data_dir": "/var/lib/vector/", + "sources": { + "source0": { + "count": 9223372036854775807, + "format": "json", + "interval": 1.0, + "type": "demo_logs", + "decoding": { + "codec": "bytes" + }, + "framing": { + "method": "bytes" + } + } + }, + "transforms": { + "transform0": { + "inputs": [ + "source0" + ], + "drop_on_abort": false, + "drop_on_error": false, + "metric_tag_values": "single", + "reroute_dropped": false, + "runtime": "ast", + "type": "remap" + } + }, + "sinks": { + "sink0": { + "inputs": [ + "transform0" + ], + "target": "stdout", + "type": "console", + "encoding": { + "codec": "json" + }, + "healthcheck": { + "enabled": true, + "uri": null + }, + "buffer": { + "type": "memory", + "max_events": 500, + "when_full": "block" + } + } + } + }"#} + ); + } } diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index f1073573cb51c..2fec43ea4f7cd 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -76,6 +76,7 @@ mod journald; mod kafka; #[cfg(feature = "sources-kubernetes_logs")] mod kubernetes_logs; +#[cfg(feature = "transforms-log_to_metric")] mod log_to_metric; mod logplex; #[cfg(feature = "sinks-loki")] @@ -213,6 +214,7 @@ pub(crate) use self::journald::*; pub(crate) use self::kafka::*; #[cfg(feature = "sources-kubernetes_logs")] pub(crate) use self::kubernetes_logs::*; +#[cfg(feature = "transforms-log_to_metric")] pub(crate) use self::log_to_metric::*; #[cfg(feature = "sources-heroku_logs")] pub(crate) use self::logplex::*; @@ -224,6 +226,7 @@ pub(crate) use self::lua::*; pub(crate) use self::metric_to_log::*; #[cfg(feature = "sources-nginx_metrics")] pub(crate) use self::nginx_metrics::*; +#[allow(unused_imports)] pub(crate) use self::parser::*; #[cfg(feature = "sources-postgresql_metrics")] pub(crate) use self::postgresql_metrics::*; diff --git a/src/sinks/gcp/mod.rs b/src/sinks/gcp/mod.rs index 022482490fa53..5a7f64b84c076 100644 --- a/src/sinks/gcp/mod.rs +++ b/src/sinks/gcp/mod.rs @@ -6,7 +6,7 @@ use vector_config::configurable_component; pub mod chronicle_unstructured; pub mod cloud_storage; pub mod pubsub; -pub mod stackdriver_logs; +pub mod stackdriver; pub mod stackdriver_metrics; /// A monitored resource. diff --git a/src/sinks/gcp/stackdriver/logs/config.rs b/src/sinks/gcp/stackdriver/logs/config.rs new file mode 100644 index 0000000000000..71adf3d0ef778 --- /dev/null +++ b/src/sinks/gcp/stackdriver/logs/config.rs @@ -0,0 +1,278 @@ +//! Configuration for the `gcp_stackdriver_logs` sink. + +use crate::{ + gcp::{GcpAuthConfig, GcpAuthenticator, Scope}, + http::HttpClient, + schema, + sinks::{ + gcs_common::config::healthcheck_response, + prelude::*, + util::{ + http::{http_response_retry_logic, HttpService}, + BoxedRawValue, RealtimeSizeBasedDefaultBatchSettings, + }, + }, +}; +use http::{Request, Uri}; +use hyper::Body; +use lookup::lookup_v2::ConfigValuePath; +use snafu::Snafu; +use std::collections::HashMap; +use vrl::value::Kind; + +use super::{ + encoder::StackdriverLogsEncoder, request_builder::StackdriverLogsRequestBuilder, + service::StackdriverLogsServiceRequestBuilder, sink::StackdriverLogsSink, +}; + +#[derive(Debug, Snafu)] +enum HealthcheckError { + #[snafu(display("Resource not found"))] + NotFound, +} + +/// Configuration for the `gcp_stackdriver_logs` sink. +#[configurable_component(sink( + "gcp_stackdriver_logs", + "Deliver logs to GCP's Cloud Operations suite." 
+))] +#[derive(Clone, Debug, Default)] +#[serde(deny_unknown_fields)] +pub(super) struct StackdriverConfig { + #[serde(skip, default = "default_endpoint")] + pub(super) endpoint: String, + + #[serde(flatten)] + pub(super) log_name: StackdriverLogName, + + /// The log ID to which to publish logs. + /// + /// This is a name you create to identify this log stream. + pub(super) log_id: Template, + + /// The monitored resource to associate the logs with. + pub(super) resource: StackdriverResource, + + /// The field of the log event from which to take the outgoing log’s `severity` field. + /// + /// The named field is removed from the log event if present, and must be either an integer + /// between 0 and 800 or a string containing one of the [severity level names][sev_names] (case + /// is ignored) or a common prefix such as `err`. + /// + /// If no severity key is specified, the severity of outgoing records is set to 0 (`DEFAULT`). + /// + /// See the [GCP Stackdriver Logging LogSeverity description][logsev_docs] for more details on + /// the value of the `severity` field. + /// + /// [sev_names]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity + /// [logsev_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity + #[configurable(metadata(docs::examples = "severity"))] + pub(super) severity_key: Option<ConfigValuePath>, + + #[serde(flatten)] + pub(super) auth: GcpAuthConfig, + + #[configurable(derived)] + #[serde( + default, + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub(super) encoding: Transformer, + + #[configurable(derived)] + #[serde(default)] + pub(super) batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>, + + #[configurable(derived)] + #[serde(default)] + pub(super) request: TowerRequestConfig, + + #[configurable(derived)] + pub(super) tls: Option<TlsConfig>, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + acknowledgements: AcknowledgementsConfig, +} + +pub(super) fn default_endpoint() -> String { + "https://logging.googleapis.com/v2/entries:write".to_string() +} + +// 10MB limit for entries.write: https://cloud.google.com/logging/quotas#api-limits +const MAX_BATCH_PAYLOAD_SIZE: usize = 10_000_000; + +/// Logging locations. +#[configurable_component] +#[derive(Clone, Debug, Derivative)] +#[derivative(Default)] +pub(super) enum StackdriverLogName { + /// The billing account ID to which to publish logs. + /// + /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. + #[serde(rename = "billing_account_id")] + #[configurable(metadata(docs::examples = "012345-6789AB-CDEF01"))] + BillingAccount(String), + + /// The folder ID to which to publish logs. + /// + /// See the [Google Cloud Platform folder documentation][folder_docs] for more details. + /// + /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. + /// + /// [folder_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-folders + #[serde(rename = "folder_id")] + #[configurable(metadata(docs::examples = "My Folder"))] + Folder(String), + + /// The organization ID to which to publish logs. + /// + /// This would be the identifier assigned to your organization on Google Cloud Platform. + /// + /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. + #[serde(rename = "organization_id")] + #[configurable(metadata(docs::examples = "622418129737"))] + Organization(String), + + /// The project ID to which to publish logs. + /// + /// See the [Google Cloud Platform project management documentation][project_docs] for more details. + /// + /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. + /// + /// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects + #[derivative(Default)] + #[serde(rename = "project_id")] + #[configurable(metadata(docs::examples = "vector-123456"))] + Project(String), +}
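Each variant above selects the parent resource under which entries are written; the encoder introduced later in this diff renders the choice into the `logName` field of the API payload. A sketch of the resulting paths (the helper is hypothetical, not part of the sink):

    fn log_name_path(parent: &str, id: &str, log_id: &str) -> String {
        // e.g. "projects/vector-123456/logs/vector-logs"
        format!("{parent}/{id}/logs/{log_id}")
    }

    assert_eq!(
        log_name_path("projects", "vector-123456", "vector-logs"),
        "projects/vector-123456/logs/vector-logs"
    );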
+ +/// A monitored resource. +/// +/// Monitored resources in GCP allow associating logs and metrics specifically with native resources +/// within Google Cloud Platform. This takes the form of a "type" field which identifies the +/// resource, and a set of type-specific labels to uniquely identify a resource of that type. +/// +/// See [Monitored resource types][mon_docs] for more information. +/// +/// [mon_docs]: https://cloud.google.com/monitoring/api/resources // TODO: this type is specific to the stackdriver logs sink because it allows for template-able // label values, but we should consider replacing `sinks::gcp::GcpTypedResource` with this so both // the stackdriver metrics _and_ logs sink can have template-able label values, and less duplication +#[configurable_component] +#[derive(Clone, Debug, Default)] +pub(super) struct StackdriverResource { + /// The monitored resource type. + /// + /// For example, the type of a Compute Engine VM instance is `gce_instance`. + /// See the [Google Cloud Platform monitored resource documentation][gcp_resources] for + /// more details. + /// + /// [gcp_resources]: https://cloud.google.com/monitoring/api/resources + #[serde(rename = "type")] + pub(super) type_: String, + + /// Type-specific labels. + #[serde(flatten)] + #[configurable(metadata(docs::additional_props_description = "A type-specific label."))] + #[configurable(metadata(docs::examples = "label_examples()"))] + pub(super) labels: HashMap<String, Template>, +} + +fn label_examples() -> HashMap<String, String> { + let mut example = HashMap::new(); + example.insert("instanceId".to_string(), "Twilight".to_string()); + example.insert("zone".to_string(), "{{ zone }}".to_string()); + example +}
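Because each label value is a `Template`, a label like `{{ zone }}` above is rendered against every event before the entry is built; the encoder then attaches the rendered map to the monitored resource. A sketch of the resulting `resource` object in the API payload (values illustrative):

    let resource = serde_json::json!({
        "type": "gce_instance",
        "labels": {
            "instanceId": "Twilight",
            // rendered from the `{{ zone }}` template above
            "zone": "us-central1-a"
        }
    });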
+ .into_batcher_settings()?; + + let request_limits = self.request.unwrap_with( + &TowerRequestConfig::default() + .rate_limit_duration_secs(1) + .rate_limit_num(1000), + ); + + let tls_settings = TlsSettings::from_options(&self.tls)?; + let client = HttpClient::new(tls_settings, cx.proxy())?; + + let uri: Uri = self.endpoint.parse()?; + + let stackdriver_logs_service_request_builder = StackdriverLogsServiceRequestBuilder { + uri: uri.clone(), + auth: auth.clone(), + }; + + let service = HttpService::new(client.clone(), stackdriver_logs_service_request_builder); + + let service = ServiceBuilder::new() + .settings(request_limits, http_response_retry_logic()) + .service(service); + + let sink = StackdriverLogsSink::new(service, batch_settings, request_builder); + + let healthcheck = healthcheck(client, auth.clone(), uri).boxed(); + + auth.spawn_regenerate_token(); + + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + let requirement = + schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp()); + + Input::log().with_schema_requirement(requirement) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +async fn healthcheck(client: HttpClient, auth: GcpAuthenticator, uri: Uri) -> crate::Result<()> { + let entries: Vec = Vec::new(); + let events = serde_json::json!({ "entries": entries }); + + let body = crate::serde::json::to_bytes(&events).unwrap().freeze(); + + let mut request = Request::post(uri) + .header("Content-Type", "application/json") + .body(body) + .unwrap(); + + auth.apply(&mut request); + + let request = request.map(Body::from); + + let response = client.send(request).await?; + + healthcheck_response(response, HealthcheckError::NotFound.into()) +} diff --git a/src/sinks/gcp/stackdriver/logs/encoder.rs b/src/sinks/gcp/stackdriver/logs/encoder.rs new file mode 100644 index 0000000000000..00888cf35997f --- /dev/null +++ b/src/sinks/gcp/stackdriver/logs/encoder.rs @@ -0,0 +1,188 @@ +//! Encoding for the `gcp_stackdriver_logs` sink. + +use std::{collections::HashMap, io}; + +use bytes::BytesMut; +use lookup::lookup_v2::ConfigValuePath; +use serde_json::{json, to_vec, Map}; +use vrl::path::PathPrefix; + +use crate::{ + sinks::{prelude::*, util::encoding::Encoder as SinkEncoder}, + template::TemplateRenderingError, +}; + +use super::config::{StackdriverLogName, StackdriverResource}; + +#[derive(Clone, Debug)] +pub(super) struct StackdriverLogsEncoder { + transformer: Transformer, + log_id: Template, + log_name: StackdriverLogName, + resource: StackdriverResource, + severity_key: Option, +} + +impl StackdriverLogsEncoder { + /// Creates a new `StackdriverLogsEncoder`. 
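With the config type complete, a minimal configuration for this sink looks as follows. This sketch mirrors the shape used by the unit tests later in this diff; the project ID and resource values are hypothetical, and exactly one of the log-name fields (`project_id` here) may be present:

```rust
use indoc::indoc;

// Parse a minimal sink configuration; auth fields may be omitted at parse
// time (credential resolution only happens in `build()`).
fn example_config() -> StackdriverConfig {
    toml::from_str(indoc! {r#"
        project_id = "my-project"
        log_id = "testlogs"
        resource.type = "generic_node"
        resource.namespace = "office"
    "#})
    .expect("example config should parse")
}
```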
+ pub(super) const fn new( + transformer: Transformer, + log_id: Template, + log_name: StackdriverLogName, + resource: StackdriverResource, + severity_key: Option, + ) -> Self { + Self { + transformer, + log_id, + log_name, + resource, + severity_key, + } + } + + pub(super) fn encode_event(&self, event: Event) -> Option { + let mut labels = HashMap::with_capacity(self.resource.labels.len()); + for (key, template) in &self.resource.labels { + let value = template + .render_string(&event) + .map_err(|error| { + emit!(crate::internal_events::TemplateRenderingError { + error, + field: Some("resource.labels"), + drop_event: true, + }); + }) + .ok()?; + labels.insert(key.clone(), value); + } + let log_name = self + .log_name(&event) + .map_err(|error| { + emit!(crate::internal_events::TemplateRenderingError { + error, + field: Some("log_id"), + drop_event: true, + }); + }) + .ok()?; + + let mut log = event.into_log(); + let severity = self + .severity_key + .as_ref() + .and_then(|key| log.remove((PathPrefix::Event, &key.0))) + .map(remap_severity) + .unwrap_or_else(|| 0.into()); + + let mut event = Event::Log(log); + self.transformer.transform(&mut event); + + let log = event.into_log(); + + let mut entry = Map::with_capacity(5); + entry.insert("logName".into(), json!(log_name)); + entry.insert("jsonPayload".into(), json!(log)); + entry.insert("severity".into(), json!(severity)); + entry.insert( + "resource".into(), + json!({ + "type": self.resource.type_, + "labels": labels, + }), + ); + + // If the event contains a timestamp, send it in the main message so gcp can pick it up. + if let Some(timestamp) = log.get_timestamp() { + entry.insert("timestamp".into(), json!(timestamp)); + } + + Some(json!(entry)) + } + + fn log_name(&self, event: &Event) -> Result { + use StackdriverLogName::*; + + let log_id = self.log_id.render_string(event)?; + + Ok(match &self.log_name { + BillingAccount(acct) => format!("billingAccounts/{}/logs/{}", acct, log_id), + Folder(folder) => format!("folders/{}/logs/{}", folder, log_id), + Organization(org) => format!("organizations/{}/logs/{}", org, log_id), + Project(project) => format!("projects/{}/logs/{}", project, log_id), + }) + } +} + +pub(super) fn remap_severity(severity: Value) -> Value { + let n = match severity { + Value::Integer(n) => n - n % 100, + Value::Bytes(s) => { + let s = String::from_utf8_lossy(&s); + match s.parse::() { + Ok(n) => (n - n % 100) as i64, + Err(_) => match s.to_uppercase() { + s if s.starts_with("EMERG") || s.starts_with("FATAL") => 800, + s if s.starts_with("ALERT") => 700, + s if s.starts_with("CRIT") => 600, + s if s.starts_with("ERR") || s == "ER" => 500, + s if s.starts_with("WARN") => 400, + s if s.starts_with("NOTICE") => 300, + s if s.starts_with("INFO") => 200, + s if s.starts_with("DEBUG") || s.starts_with("TRACE") => 100, + s if s.starts_with("DEFAULT") => 0, + _ => { + warn!( + message = "Unknown severity value string, using DEFAULT.", + value = %s, + internal_log_rate_limit = true + ); + 0 + } + }, + } + } + value => { + warn!( + message = "Unknown severity value type, using DEFAULT.", + ?value, + internal_log_rate_limit = true + ); + 0 + } + }; + Value::Integer(n) +} + +impl SinkEncoder> for StackdriverLogsEncoder { + fn encode_input( + &self, + events: Vec, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + let mut byte_size = telemetry().create_request_count_byte_size(); + let mut n_events = events.len(); + let mut body = BytesMut::new(); + + let mut entries = 
Vec::with_capacity(n_events); + for event in &events { + let size = event.estimated_json_encoded_size_of(); + if let Some(data) = self.encode_event(event.clone()) { + byte_size.add_event(event, size); + entries.push(data) + } else { + // encode_event() emits the `TemplateRenderingError` internal event, + // which emits an `EventsDropped`, so no need to here. + n_events -= 1; + } + } + + let events = json!({ "entries": entries }); + + body.extend(&to_vec(&events)?); + + let body = body.freeze(); + + write_all(writer, n_events, body.as_ref()).map(|()| (body.len(), byte_size)) + } +} diff --git a/src/sinks/gcp/stackdriver/logs/mod.rs b/src/sinks/gcp/stackdriver/logs/mod.rs new file mode 100644 index 0000000000000..5f5ca2cf03ae5 --- /dev/null +++ b/src/sinks/gcp/stackdriver/logs/mod.rs @@ -0,0 +1,14 @@ +//! The GCP Stackdriver Logs [`vector_core::sink::VectorSink`]. +//! +//! This module contains the [`vector_core::sink::VectorSink`] instance that is responsible for +//! taking a stream of [`vector_core::event::Event`]s and forwarding them to the GCP +//! Stackdriver service. + +mod config; +mod encoder; +mod request_builder; +mod service; +mod sink; + +#[cfg(test)] +mod tests; diff --git a/src/sinks/gcp/stackdriver/logs/request_builder.rs b/src/sinks/gcp/stackdriver/logs/request_builder.rs new file mode 100644 index 0000000000000..25de168cbe3ac --- /dev/null +++ b/src/sinks/gcp/stackdriver/logs/request_builder.rs @@ -0,0 +1,47 @@ +//! `RequestBuilder` implementation for the `gcp_stackdriver_logs` sink. + +use bytes::Bytes; +use std::io; + +use crate::sinks::{prelude::*, util::http::HttpRequest}; + +use super::encoder::StackdriverLogsEncoder; + +pub(super) struct StackdriverLogsRequestBuilder { + pub(super) encoder: StackdriverLogsEncoder, +} + +impl RequestBuilder> for StackdriverLogsRequestBuilder { + type Metadata = EventFinalizers; + type Events = Vec; + type Encoder = StackdriverLogsEncoder; + type Payload = Bytes; + type Request = HttpRequest; + type Error = io::Error; + + fn compression(&self) -> Compression { + Compression::None + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + mut events: Vec, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let finalizers = events.take_finalizers(); + let builder = RequestMetadataBuilder::from_events(&events); + (finalizers, builder, events) + } + + fn build_request( + &self, + metadata: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + HttpRequest::new(payload.into_payload(), metadata, request_metadata) + } +} diff --git a/src/sinks/gcp/stackdriver/logs/service.rs b/src/sinks/gcp/stackdriver/logs/service.rs new file mode 100644 index 0000000000000..ee0a3d86e223f --- /dev/null +++ b/src/sinks/gcp/stackdriver/logs/service.rs @@ -0,0 +1,25 @@ +//! Service implementation for the `gcp_stackdriver_logs` sink. 
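For reference, this is the illustrative shape of the body the encoder above produces for a single event. The values are hypothetical, but the field layout matches the `correct_request` unit test later in this diff:

```rust
use serde_json::json;

// One batched entries:write payload containing a single log entry.
let expected = json!({
    "entries": [{
        "logName": "projects/my-project/logs/testlogs",
        "severity": 0,
        "jsonPayload": { "message": "hello world" },
        "resource": {
            "type": "generic_node",
            "labels": { "namespace": "office" }
        }
    }]
});
```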
+
+use bytes::Bytes;
+use http::{Request, Uri};
+
+use crate::{gcp::GcpAuthenticator, sinks::util::http::HttpServiceRequestBuilder};
+
+#[derive(Debug, Clone)]
+pub(super) struct StackdriverLogsServiceRequestBuilder {
+    pub(super) uri: Uri,
+    pub(super) auth: GcpAuthenticator,
+}
+
+impl HttpServiceRequestBuilder for StackdriverLogsServiceRequestBuilder {
+    fn build(&self, body: Bytes) -> Request<Bytes> {
+        let mut request = Request::post(self.uri.clone())
+            .header("Content-Type", "application/json")
+            .body(body)
+            .unwrap();
+
+        self.auth.apply(&mut request);
+
+        request
+    }
+}
diff --git a/src/sinks/gcp/stackdriver/logs/sink.rs b/src/sinks/gcp/stackdriver/logs/sink.rs
new file mode 100644
index 0000000000000..c15ed0f342ce6
--- /dev/null
+++ b/src/sinks/gcp/stackdriver/logs/sink.rs
@@ -0,0 +1,77 @@
+//! Implementation of the `gcp_stackdriver_logs` sink.
+
+use crate::sinks::{
+    prelude::*,
+    util::http::{HttpJsonBatchSizer, HttpRequest},
+};
+
+use super::request_builder::StackdriverLogsRequestBuilder;
+
+pub(super) struct StackdriverLogsSink<S> {
+    service: S,
+    batch_settings: BatcherSettings,
+    request_builder: StackdriverLogsRequestBuilder,
+}
+
+impl<S> StackdriverLogsSink<S>
+where
+    S: Service<HttpRequest> + Send + 'static,
+    S::Future: Send + 'static,
+    S::Response: DriverResponse + Send + 'static,
+    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
+{
+    /// Creates a new `StackdriverLogsSink`.
+    pub(super) const fn new(
+        service: S,
+        batch_settings: BatcherSettings,
+        request_builder: StackdriverLogsRequestBuilder,
+    ) -> Self {
+        Self {
+            service,
+            batch_settings,
+            request_builder,
+        }
+    }
+
+    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
+        input
+            // Batch the input stream with size calculation based on the estimated encoded json size
+            .batched(
+                self.batch_settings
+                    .into_item_size_config(HttpJsonBatchSizer),
+            )
+            // Build requests with no concurrency limit.
+            .request_builder(None, self.request_builder)
+            // Filter out any errors that occurred in the request building.
+            .filter_map(|request| async move {
+                match request {
+                    Err(error) => {
+                        emit!(SinkRequestBuildError { error });
+                        None
+                    }
+                    Ok(req) => Some(req),
+                }
+            })
+            // Generate the driver that will send requests and handle retries,
+            // event finalization, and logging/internal metric reporting.
+            .into_driver(self.service)
+            .run()
+            .await
+    }
+}
+
+#[async_trait::async_trait]
+impl<S> StreamSink<Event> for StackdriverLogsSink<S>
+where
+    S: Service<HttpRequest> + Send + 'static,
+    S::Future: Send + 'static,
+    S::Response: DriverResponse + Send + 'static,
+    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
+{
+    async fn run(
+        self: Box<Self>,
+        input: futures_util::stream::BoxStream<'_, Event>,
+    ) -> Result<(), ()> {
+        self.run_inner(input).await
+    }
+}
diff --git a/src/sinks/gcp/stackdriver/logs/tests.rs b/src/sinks/gcp/stackdriver/logs/tests.rs
new file mode 100644
index 0000000000000..ee56c7d1c5424
--- /dev/null
+++ b/src/sinks/gcp/stackdriver/logs/tests.rs
@@ -0,0 +1,297 @@
+//! Unit tests for the `gcp_stackdriver_logs` sink.
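Before the tests, a hedged usage sketch of the service-level request builder defined above, using the no-op authenticator the way the unit tests below do (the body bytes are a hypothetical empty batch):

```rust
use bytes::Bytes;

// Build a request through the service request builder and check that it
// stamps the JSON content type; `GcpAuthenticator::None` makes auth a no-op.
let builder = StackdriverLogsServiceRequestBuilder {
    uri: default_endpoint().parse().expect("static URI is valid"),
    auth: GcpAuthenticator::None,
};
let request = builder.build(Bytes::from_static(b"{\"entries\":[]}"));
assert_eq!(request.headers()["Content-Type"], "application/json");
```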
+ +use bytes::Bytes; +use chrono::{TimeZone, Utc}; +use futures::{future::ready, stream}; +use http::Uri; +use indoc::indoc; +use lookup::lookup_v2::ConfigValuePath; +use serde::Deserialize; +use std::collections::HashMap; + +use crate::{ + config::{GenerateConfig, SinkConfig, SinkContext}, + event::{LogEvent, Value}, + gcp::GcpAuthenticator, + sinks::{ + gcp::stackdriver::logs::{ + config::StackdriverLogName, encoder::remap_severity, + service::StackdriverLogsServiceRequestBuilder, + }, + prelude::*, + util::{encoding::Encoder as _, http::HttpServiceRequestBuilder}, + }, + test_util::{ + components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, + http::{always_200_response, spawn_blackhole_http_server}, + }, +}; + +use super::{ + config::{default_endpoint, StackdriverConfig, StackdriverResource}, + encoder::StackdriverLogsEncoder, +}; + +#[test] +fn generate_config() { + crate::test_util::test_generate_config::(); +} + +#[tokio::test] +async fn component_spec_compliance() { + let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; + + let config = StackdriverConfig::generate_config().to_string(); + let mut config = StackdriverConfig::deserialize(toml::de::ValueDeserializer::new(&config)) + .expect("config should be valid"); + + // If we don't override the credentials path/API key, it tries to directly call out to the Google Instance + // Metadata API, which we clearly don't have in unit tests. :) + config.auth.credentials_path = None; + config.auth.api_key = Some("fake".to_string().into()); + config.endpoint = mock_endpoint.to_string(); + + let context = SinkContext::default(); + let (sink, _healthcheck) = config.build(context).await.unwrap(); + + let event = Event::Log(LogEvent::from("simple message")); + run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await; +} + +#[test] +fn encode_valid() { + let mut transformer = Transformer::default(); + transformer + .set_except_fields(Some(vec![ + "anumber".into(), + "node_id".into(), + "log_id".into(), + ])) + .unwrap(); + + let encoder = StackdriverLogsEncoder::new( + transformer, + Template::try_from("{{ log_id }}").unwrap(), + StackdriverLogName::Project("project".to_owned()), + StackdriverResource { + type_: "generic_node".to_owned(), + labels: HashMap::from([ + ( + "namespace".to_owned(), + Template::try_from("office").unwrap(), + ), + ( + "node_id".to_owned(), + Template::try_from("{{ node_id }}").unwrap(), + ), + ]), + }, + Some(ConfigValuePath::try_from("anumber".to_owned()).unwrap()), + ); + + let log = [ + ("message", "hello world"), + ("anumber", "100"), + ("node_id", "10.10.10.1"), + ("log_id", "testlogs"), + ] + .iter() + .copied() + .collect::(); + let json = encoder.encode_event(Event::from(log)).unwrap(); + assert_eq!( + json, + serde_json::json!({ + "logName":"projects/project/logs/testlogs", + "jsonPayload":{"message":"hello world"}, + "severity":100, + "resource":{ + "type":"generic_node", + "labels":{"namespace":"office","node_id":"10.10.10.1"} + } + }) + ); +} + +#[test] +fn encode_inserts_timestamp() { + let transformer = Transformer::default(); + + let encoder = StackdriverLogsEncoder::new( + transformer, + Template::try_from("testlogs").unwrap(), + StackdriverLogName::Project("project".to_owned()), + StackdriverResource { + type_: "generic_node".to_owned(), + labels: HashMap::from([( + "namespace".to_owned(), + Template::try_from("office").unwrap(), + )]), + }, + Some(ConfigValuePath::try_from("anumber".to_owned()).unwrap()), + ); + + let mut log = 
LogEvent::default(); + log.insert("message", Value::Bytes("hello world".into())); + log.insert("anumber", Value::Bytes("100".into())); + log.insert( + "timestamp", + Value::Timestamp( + Utc.with_ymd_and_hms(2020, 1, 1, 12, 30, 0) + .single() + .expect("invalid timestamp"), + ), + ); + + let json = encoder.encode_event(Event::from(log)).unwrap(); + assert_eq!( + json, + serde_json::json!({ + "logName":"projects/project/logs/testlogs", + "jsonPayload":{"message":"hello world","timestamp":"2020-01-01T12:30:00Z"}, + "severity":100, + "resource":{ + "type":"generic_node", + "labels":{"namespace":"office"}}, + "timestamp":"2020-01-01T12:30:00Z" + }) + ); +} + +#[test] +fn severity_remaps_strings() { + for &(s, n) in &[ + ("EMERGENCY", 800), // Handles full upper case + ("EMERG", 800), // Handles abbreviations + ("FATAL", 800), // Handles highest alternate + ("alert", 700), // Handles lower case + ("CrIt1c", 600), // Handles mixed case and suffixes + ("err404", 500), // Handles lower case and suffixes + ("warnings", 400), + ("notice", 300), + ("info", 200), + ("DEBUG2", 100), // Handles upper case and suffixes + ("trace", 100), // Handles lowest alternate + ("nothing", 0), // Maps unknown terms to DEFAULT + ("123", 100), // Handles numbers in strings + ("-100", 0), // Maps negatives to DEFAULT + ] { + assert_eq!( + remap_severity(s.into()), + Value::Integer(n), + "remap_severity({:?}) != {}", + s, + n + ); + } +} + +#[tokio::test] +async fn correct_request() { + let uri: Uri = default_endpoint().parse().unwrap(); + + let transformer = Transformer::default(); + let encoder = StackdriverLogsEncoder::new( + transformer, + Template::try_from("testlogs").unwrap(), + StackdriverLogName::Project("project".to_owned()), + StackdriverResource { + type_: "generic_node".to_owned(), + labels: HashMap::from([( + "namespace".to_owned(), + Template::try_from("office").unwrap(), + )]), + }, + None, + ); + + let log1 = [("message", "hello")].iter().copied().collect::(); + let log2 = [("message", "world")].iter().copied().collect::(); + + let events = vec![Event::from(log1), Event::from(log2)]; + + let mut writer = Vec::new(); + let (_, _) = encoder.encode_input(events, &mut writer).unwrap(); + + let body = Bytes::copy_from_slice(&writer); + + let stackdriver_logs_service_request_builder = StackdriverLogsServiceRequestBuilder { + uri: uri.clone(), + auth: GcpAuthenticator::None, + }; + + let request = stackdriver_logs_service_request_builder.build(body); + let (parts, body) = request.into_parts(); + let json: serde_json::Value = serde_json::from_slice(&body[..]).unwrap(); + + assert_eq!( + &parts.uri.to_string(), + "https://logging.googleapis.com/v2/entries:write" + ); + assert_eq!( + json, + serde_json::json!({ + "entries": [ + { + "logName": "projects/project/logs/testlogs", + "severity": 0, + "jsonPayload": { + "message": "hello" + }, + "resource": { + "type": "generic_node", + "labels": { + "namespace": "office" + } + } + }, + { + "logName": "projects/project/logs/testlogs", + "severity": 0, + "jsonPayload": { + "message": "world" + }, + "resource": { + "type": "generic_node", + "labels": { + "namespace": "office" + } + } + } + ] + }) + ); +} + +#[tokio::test] +async fn fails_missing_creds() { + let config: StackdriverConfig = toml::from_str(indoc! 
{r#" + project_id = "project" + log_id = "testlogs" + resource.type = "generic_node" + resource.namespace = "office" + "#}) + .unwrap(); + if config.build(SinkContext::default()).await.is_ok() { + panic!("config.build failed to error"); + } +} + +#[test] +fn fails_invalid_log_names() { + toml::from_str::(indoc! {r#" + log_id = "testlogs" + resource.type = "generic_node" + resource.namespace = "office" + "#}) + .expect_err("Config parsing failed to error with missing ids"); + + toml::from_str::(indoc! {r#" + project_id = "project" + folder_id = "folder" + log_id = "testlogs" + resource.type = "generic_node" + resource.namespace = "office" + "#}) + .expect_err("Config parsing failed to error with extraneous ids"); +} diff --git a/src/sinks/gcp/stackdriver/mod.rs b/src/sinks/gcp/stackdriver/mod.rs new file mode 100644 index 0000000000000..dff3ee587ba14 --- /dev/null +++ b/src/sinks/gcp/stackdriver/mod.rs @@ -0,0 +1 @@ +mod logs; diff --git a/src/sinks/gcp/stackdriver_logs.rs b/src/sinks/gcp/stackdriver_logs.rs deleted file mode 100644 index 6cf277405fafd..0000000000000 --- a/src/sinks/gcp/stackdriver_logs.rs +++ /dev/null @@ -1,687 +0,0 @@ -use std::collections::HashMap; - -use bytes::Bytes; -use futures::{FutureExt, SinkExt}; -use http::{Request, Uri}; -use hyper::Body; -use lookup::lookup_v2::ConfigValuePath; -use serde_json::{json, map}; -use snafu::Snafu; -use vector_config::configurable_component; -use vrl::path::PathPrefix; -use vrl::value::Kind; - -use crate::{ - codecs::Transformer, - config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext}, - event::{Event, Value}, - gcp::{GcpAuthConfig, GcpAuthenticator, Scope}, - http::HttpClient, - schema, - sinks::{ - gcs_common::config::healthcheck_response, - util::{ - http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, - BatchConfig, BoxedRawValue, JsonArrayBuffer, RealtimeSizeBasedDefaultBatchSettings, - TowerRequestConfig, - }, - Healthcheck, VectorSink, - }, - template::{Template, TemplateRenderingError}, - tls::{TlsConfig, TlsSettings}, -}; - -#[derive(Debug, Snafu)] -enum HealthcheckError { - #[snafu(display("Resource not found"))] - NotFound, -} - -/// Configuration for the `gcp_stackdriver_logs` sink. -#[configurable_component(sink( - "gcp_stackdriver_logs", - "Deliver logs to GCP's Cloud Operations suite." -))] -#[derive(Clone, Debug, Default)] -#[serde(deny_unknown_fields)] -pub struct StackdriverConfig { - #[serde(skip, default = "default_endpoint")] - endpoint: String, - - #[serde(flatten)] - pub log_name: StackdriverLogName, - - /// The log ID to which to publish logs. - /// - /// This is a name you create to identify this log stream. - pub log_id: Template, - - /// The monitored resource to associate the logs with. - pub resource: StackdriverResource, - - /// The field of the log event from which to take the outgoing log’s `severity` field. - /// - /// The named field is removed from the log event if present, and must be either an integer - /// between 0 and 800 or a string containing one of the [severity level names][sev_names] (case - /// is ignored) or a common prefix such as `err`. - /// - /// If no severity key is specified, the severity of outgoing records is set to 0 (`DEFAULT`). - /// - /// See the [GCP Stackdriver Logging LogSeverity description][logsev_docs] for more details on - /// the value of the `severity` field. 
- /// - /// [sev_names]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity - /// [logsev_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity - #[configurable(metadata(docs::examples = "severity"))] - pub severity_key: Option, - - #[serde(flatten)] - pub auth: GcpAuthConfig, - - #[configurable(derived)] - #[serde( - default, - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - pub encoding: Transformer, - - #[configurable(derived)] - #[serde(default)] - pub batch: BatchConfig, - - #[configurable(derived)] - #[serde(default)] - pub request: TowerRequestConfig, - - #[configurable(derived)] - pub tls: Option, - - #[configurable(derived)] - #[serde( - default, - deserialize_with = "crate::serde::bool_or_struct", - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - acknowledgements: AcknowledgementsConfig, -} - -fn default_endpoint() -> String { - "https://logging.googleapis.com/v2/entries:write".to_string() -} - -#[derive(Clone, Debug)] -struct StackdriverSink { - config: StackdriverConfig, - auth: GcpAuthenticator, - severity_key: Option, - uri: Uri, -} - -// 10MB limit for entries.write: https://cloud.google.com/logging/quotas#api-limits -const MAX_BATCH_PAYLOAD_SIZE: usize = 10_000_000; - -/// Logging locations. -#[configurable_component] -#[derive(Clone, Debug, Derivative)] -#[derivative(Default)] -pub enum StackdriverLogName { - /// The billing account ID to which to publish logs. - /// - /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. - #[serde(rename = "billing_account_id")] - #[configurable(metadata(docs::examples = "012345-6789AB-CDEF01"))] - BillingAccount(String), - - /// The folder ID to which to publish logs. - /// - /// See the [Google Cloud Platform folder documentation][folder_docs] for more details. - /// - /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. - /// - /// [folder_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-folders - #[serde(rename = "folder_id")] - #[configurable(metadata(docs::examples = "My Folder"))] - Folder(String), - - /// The organization ID to which to publish logs. - /// - /// This would be the identifier assigned to your organization on Google Cloud Platform. - /// - /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. - #[serde(rename = "organization_id")] - #[configurable(metadata(docs::examples = "622418129737"))] - Organization(String), - - /// The project ID to which to publish logs. - /// - /// See the [Google Cloud Platform project management documentation][project_docs] for more details. - /// - /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. - /// - /// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects - #[derivative(Default)] - #[serde(rename = "project_id")] - #[configurable(metadata(docs::examples = "vector-123456"))] - Project(String), -} - -/// A monitored resource. -/// -/// Monitored resources in GCP allow associating logs and metrics specifically with native resources -/// within Google Cloud Platform. This takes the form of a "type" field which identifies the -/// resource, and a set of type-specific labels to uniquely identify a resource of that type. -/// -/// See [Monitored resource types][mon_docs] for more information. 
-/// -/// [mon_docs]: https://cloud.google.com/monitoring/api/resources -// TODO: this type is specific to the stackdrivers log sink because it allows for template-able -// label values, but we should consider replacing `sinks::gcp::GcpTypedResource` with this so both -// the stackdriver metrics _and_ logs sink can have template-able label values, and less duplication -#[configurable_component] -#[derive(Clone, Debug, Default)] -pub struct StackdriverResource { - /// The monitored resource type. - /// - /// For example, the type of a Compute Engine VM instance is `gce_instance`. - /// See the [Google Cloud Platform monitored resource documentation][gcp_resources] for - /// more details. - /// - /// [gcp_resources]: https://cloud.google.com/monitoring/api/resources - #[serde(rename = "type")] - pub type_: String, - - /// Type-specific labels. - #[serde(flatten)] - #[configurable(metadata(docs::additional_props_description = "A type-specific label."))] - #[configurable(metadata(docs::examples = "label_examples()"))] - pub labels: HashMap, -} - -fn label_examples() -> HashMap { - let mut example = HashMap::new(); - example.insert("instanceId".to_string(), "Twilight".to_string()); - example.insert("zone".to_string(), "{{ zone }}".to_string()); - example -} - -impl_generate_config_from_default!(StackdriverConfig); - -#[async_trait::async_trait] -#[typetag::serde(name = "gcp_stackdriver_logs")] -impl SinkConfig for StackdriverConfig { - async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let auth = self.auth.build(Scope::LoggingWrite).await?; - - let batch = self - .batch - .validate()? - .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)? - .into_batch_settings()?; - let request = self.request.unwrap_with( - &TowerRequestConfig::default() - .rate_limit_duration_secs(1) - .rate_limit_num(1000), - ); - let tls_settings = TlsSettings::from_options(&self.tls)?; - let client = HttpClient::new(tls_settings, cx.proxy())?; - - let sink = StackdriverSink { - config: self.clone(), - auth: auth.clone(), - severity_key: self.severity_key.clone(), - uri: self.endpoint.parse().unwrap(), - }; - - let healthcheck = healthcheck(client.clone(), sink.clone()).boxed(); - auth.spawn_regenerate_token(); - let sink = BatchedHttpSink::new( - sink, - JsonArrayBuffer::new(batch.size), - request, - batch.timeout, - client, - ) - .sink_map_err(|error| error!(message = "Fatal gcp_stackdriver_logs sink error.", %error)); - - #[allow(deprecated)] - Ok((VectorSink::from_event_sink(sink), healthcheck)) - } - - fn input(&self) -> Input { - let requirement = - schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp()); - - Input::log().with_schema_requirement(requirement) - } - - fn acknowledgements(&self) -> &AcknowledgementsConfig { - &self.acknowledgements - } -} - -struct StackdriverEventEncoder { - config: StackdriverConfig, - severity_key: Option, -} - -impl HttpEventEncoder for StackdriverEventEncoder { - fn encode_event(&mut self, event: Event) -> Option { - let mut labels = HashMap::with_capacity(self.config.resource.labels.len()); - for (key, template) in &self.config.resource.labels { - let value = template - .render_string(&event) - .map_err(|error| { - emit!(crate::internal_events::TemplateRenderingError { - error, - field: Some("resource.labels"), - drop_event: true, - }); - }) - .ok()?; - labels.insert(key.clone(), value); - } - let log_name = self - .config - .log_name(&event) - .map_err(|error| { - emit!(crate::internal_events::TemplateRenderingError { - error, - 
field: Some("log_id"), - drop_event: true, - }); - }) - .ok()?; - - let mut log = event.into_log(); - let severity = self - .severity_key - .as_ref() - .and_then(|key| log.remove((PathPrefix::Event, &key.0))) - .map(remap_severity) - .unwrap_or_else(|| 0.into()); - - let mut event = Event::Log(log); - self.config.encoding.transform(&mut event); - - let log = event.into_log(); - - let mut entry = map::Map::with_capacity(5); - entry.insert("logName".into(), json!(log_name)); - entry.insert("jsonPayload".into(), json!(log)); - entry.insert("severity".into(), json!(severity)); - entry.insert( - "resource".into(), - json!({ - "type": self.config.resource.type_, - "labels": labels, - }), - ); - - // If the event contains a timestamp, send it in the main message so gcp can pick it up. - if let Some(timestamp) = log.get_timestamp() { - entry.insert("timestamp".into(), json!(timestamp)); - } - - Some(json!(entry)) - } -} - -#[async_trait::async_trait] -impl HttpSink for StackdriverSink { - type Input = serde_json::Value; - type Output = Vec; - type Encoder = StackdriverEventEncoder; - - fn build_encoder(&self) -> Self::Encoder { - StackdriverEventEncoder { - config: self.config.clone(), - severity_key: self.severity_key.clone(), - } - } - - async fn build_request(&self, events: Self::Output) -> crate::Result> { - let events = serde_json::json!({ "entries": events }); - - let body = crate::serde::json::to_bytes(&events).unwrap().freeze(); - - let mut request = Request::post(self.uri.clone()) - .header("Content-Type", "application/json") - .body(body) - .unwrap(); - self.auth.apply(&mut request); - - Ok(request) - } -} - -fn remap_severity(severity: Value) -> Value { - let n = match severity { - Value::Integer(n) => n - n % 100, - Value::Bytes(s) => { - let s = String::from_utf8_lossy(&s); - match s.parse::() { - Ok(n) => (n - n % 100) as i64, - Err(_) => match s.to_uppercase() { - s if s.starts_with("EMERG") || s.starts_with("FATAL") => 800, - s if s.starts_with("ALERT") => 700, - s if s.starts_with("CRIT") => 600, - s if s.starts_with("ERR") || s == "ER" => 500, - s if s.starts_with("WARN") => 400, - s if s.starts_with("NOTICE") => 300, - s if s.starts_with("INFO") => 200, - s if s.starts_with("DEBUG") || s.starts_with("TRACE") => 100, - s if s.starts_with("DEFAULT") => 0, - _ => { - warn!( - message = "Unknown severity value string, using DEFAULT.", - value = %s, - internal_log_rate_limit = true - ); - 0 - } - }, - } - } - value => { - warn!( - message = "Unknown severity value type, using DEFAULT.", - ?value, - internal_log_rate_limit = true - ); - 0 - } - }; - Value::Integer(n) -} - -async fn healthcheck(client: HttpClient, sink: StackdriverSink) -> crate::Result<()> { - let request = sink.build_request(vec![]).await?.map(Body::from); - - let response = client.send(request).await?; - healthcheck_response(response, HealthcheckError::NotFound.into()) -} - -impl StackdriverConfig { - fn log_name(&self, event: &Event) -> Result { - use StackdriverLogName::*; - - let log_id = self.log_id.render_string(event)?; - - Ok(match &self.log_name { - BillingAccount(acct) => format!("billingAccounts/{}/logs/{}", acct, log_id), - Folder(folder) => format!("folders/{}/logs/{}", folder, log_id), - Organization(org) => format!("organizations/{}/logs/{}", org, log_id), - Project(project) => format!("projects/{}/logs/{}", project, log_id), - }) - } -} - -#[cfg(test)] -mod tests { - use chrono::{TimeZone, Utc}; - use futures::{future::ready, stream}; - use indoc::indoc; - use serde::Deserialize; - use 
serde_json::value::RawValue; - - use super::*; - use crate::{ - config::{GenerateConfig, SinkConfig, SinkContext}, - event::{LogEvent, Value}, - test_util::{ - components::{run_and_assert_sink_compliance, SINK_TAGS}, - http::{always_200_response, spawn_blackhole_http_server}, - }, - }; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::(); - } - - #[tokio::test] - async fn component_spec_compliance() { - let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; - - let config = StackdriverConfig::generate_config().to_string(); - let mut config = StackdriverConfig::deserialize(toml::de::ValueDeserializer::new(&config)) - .expect("config should be valid"); - - // If we don't override the credentials path/API key, it tries to directly call out to the Google Instance - // Metadata API, which we clearly don't have in unit tests. :) - config.auth.credentials_path = None; - config.auth.api_key = Some("fake".to_string().into()); - config.endpoint = mock_endpoint.to_string(); - - let context = SinkContext::default(); - let (sink, _healthcheck) = config.build(context).await.unwrap(); - - let event = Event::Log(LogEvent::from("simple message")); - run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await; - } - - #[test] - fn encode_valid() { - let config: StackdriverConfig = toml::from_str(indoc! {r#" - project_id = "project" - log_id = "{{ log_id }}" - resource.type = "generic_node" - resource.namespace = "office" - resource.node_id = "{{ node_id }}" - encoding.except_fields = ["anumber", "node_id", "log_id"] - "#}) - .unwrap(); - - let sink = StackdriverSink { - config, - auth: GcpAuthenticator::None, - severity_key: Some("anumber".into()), - uri: default_endpoint().parse().unwrap(), - }; - let mut encoder = sink.build_encoder(); - - let log = [ - ("message", "hello world"), - ("anumber", "100"), - ("node_id", "10.10.10.1"), - ("log_id", "testlogs"), - ] - .iter() - .copied() - .collect::(); - let json = encoder.encode_event(Event::from(log)).unwrap(); - assert_eq!( - json, - serde_json::json!({ - "logName":"projects/project/logs/testlogs", - "jsonPayload":{"message":"hello world"}, - "severity":100, - "resource":{ - "type":"generic_node", - "labels":{"namespace":"office","node_id":"10.10.10.1"} - } - }) - ); - } - - #[test] - fn encode_inserts_timestamp() { - let config: StackdriverConfig = toml::from_str(indoc! 
{r#" - project_id = "project" - log_id = "testlogs" - resource.type = "generic_node" - resource.namespace = "office" - "#}) - .unwrap(); - - let sink = StackdriverSink { - config, - auth: GcpAuthenticator::None, - severity_key: Some("anumber".into()), - uri: default_endpoint().parse().unwrap(), - }; - let mut encoder = sink.build_encoder(); - - let mut log = LogEvent::default(); - log.insert("message", Value::Bytes("hello world".into())); - log.insert("anumber", Value::Bytes("100".into())); - log.insert( - "timestamp", - Value::Timestamp( - Utc.with_ymd_and_hms(2020, 1, 1, 12, 30, 0) - .single() - .expect("invalid timestamp"), - ), - ); - - let json = encoder.encode_event(Event::from(log)).unwrap(); - assert_eq!( - json, - serde_json::json!({ - "logName":"projects/project/logs/testlogs", - "jsonPayload":{"message":"hello world","timestamp":"2020-01-01T12:30:00Z"}, - "severity":100, - "resource":{ - "type":"generic_node", - "labels":{"namespace":"office"}}, - "timestamp":"2020-01-01T12:30:00Z" - }) - ); - } - - #[test] - fn severity_remaps_strings() { - for &(s, n) in &[ - ("EMERGENCY", 800), // Handles full upper case - ("EMERG", 800), // Handles abbreviations - ("FATAL", 800), // Handles highest alternate - ("alert", 700), // Handles lower case - ("CrIt1c", 600), // Handles mixed case and suffixes - ("err404", 500), // Handles lower case and suffixes - ("warnings", 400), - ("notice", 300), - ("info", 200), - ("DEBUG2", 100), // Handles upper case and suffixes - ("trace", 100), // Handles lowest alternate - ("nothing", 0), // Maps unknown terms to DEFAULT - ("123", 100), // Handles numbers in strings - ("-100", 0), // Maps negatives to DEFAULT - ] { - assert_eq!( - remap_severity(s.into()), - Value::Integer(n), - "remap_severity({:?}) != {}", - s, - n - ); - } - } - - #[tokio::test] - async fn correct_request() { - let config: StackdriverConfig = toml::from_str(indoc! 
{r#" - project_id = "project" - log_id = "testlogs" - resource.type = "generic_node" - resource.namespace = "office" - "#}) - .unwrap(); - - let sink = StackdriverSink { - config, - auth: GcpAuthenticator::None, - severity_key: None, - uri: default_endpoint().parse().unwrap(), - }; - let mut encoder = sink.build_encoder(); - - let log1 = [("message", "hello")].iter().copied().collect::(); - let log2 = [("message", "world")].iter().copied().collect::(); - let event1 = encoder.encode_event(Event::from(log1)).unwrap(); - let event2 = encoder.encode_event(Event::from(log2)).unwrap(); - - let json1 = serde_json::to_string(&event1).unwrap(); - let json2 = serde_json::to_string(&event2).unwrap(); - let raw1 = RawValue::from_string(json1).unwrap(); - let raw2 = RawValue::from_string(json2).unwrap(); - - let events = vec![raw1, raw2]; - - let request = sink.build_request(events).await.unwrap(); - - let (parts, body) = request.into_parts(); - - let json: serde_json::Value = serde_json::from_slice(&body[..]).unwrap(); - - assert_eq!( - &parts.uri.to_string(), - "https://logging.googleapis.com/v2/entries:write" - ); - assert_eq!( - json, - serde_json::json!({ - "entries": [ - { - "logName": "projects/project/logs/testlogs", - "severity": 0, - "jsonPayload": { - "message": "hello" - }, - "resource": { - "type": "generic_node", - "labels": { - "namespace": "office" - } - } - }, - { - "logName": "projects/project/logs/testlogs", - "severity": 0, - "jsonPayload": { - "message": "world" - }, - "resource": { - "type": "generic_node", - "labels": { - "namespace": "office" - } - } - } - ] - }) - ); - } - - #[tokio::test] - async fn fails_missing_creds() { - let config: StackdriverConfig = toml::from_str(indoc! {r#" - project_id = "project" - log_id = "testlogs" - resource.type = "generic_node" - resource.namespace = "office" - "#}) - .unwrap(); - if config.build(SinkContext::default()).await.is_ok() { - panic!("config.build failed to error"); - } - } - - #[test] - fn fails_invalid_log_names() { - toml::from_str::(indoc! {r#" - log_id = "testlogs" - resource.type = "generic_node" - resource.namespace = "office" - "#}) - .expect_err("Config parsing failed to error with missing ids"); - - toml::from_str::(indoc! {r#" - project_id = "project" - folder_id = "folder" - log_id = "testlogs" - resource.type = "generic_node" - resource.namespace = "office" - "#}) - .expect_err("Config parsing failed to error with extraneous ids"); - } -} diff --git a/src/sinks/honeycomb.rs b/src/sinks/honeycomb.rs deleted file mode 100644 index 6cba7079a84e0..0000000000000 --- a/src/sinks/honeycomb.rs +++ /dev/null @@ -1,254 +0,0 @@ -use bytes::Bytes; -use futures::{FutureExt, SinkExt}; -use http::{Request, StatusCode, Uri}; -use serde_json::json; -use vector_common::sensitive_string::SensitiveString; -use vector_config::configurable_component; -use vrl::value::Kind; - -use crate::{ - codecs::Transformer, - config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, - event::{Event, Value}, - http::HttpClient, - schema, - sinks::util::{ - http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, - BatchConfig, BoxedRawValue, JsonArrayBuffer, SinkBatchSettings, TowerRequestConfig, - }, -}; - -/// Configuration for the `honeycomb` sink. 
-#[configurable_component(sink("honeycomb", "Deliver log events to Honeycomb."))] -#[derive(Clone, Debug)] -pub struct HoneycombConfig { - // This endpoint is not user-configurable and only exists for testing purposes - #[serde(skip, default = "default_endpoint")] - endpoint: String, - - /// The API key that is used to authenticate against Honeycomb. - #[configurable(metadata(docs::examples = "${HONEYCOMB_API_KEY}"))] - #[configurable(metadata(docs::examples = "some-api-key"))] - api_key: SensitiveString, - - /// The dataset to which logs are sent. - #[configurable(metadata(docs::examples = "my-honeycomb-dataset"))] - // TODO: we probably want to make this a template - // but this limits us in how we can do our healthcheck. - dataset: String, - - #[configurable(derived)] - #[serde(default)] - batch: BatchConfig, - - #[configurable(derived)] - #[serde(default)] - request: TowerRequestConfig, - - #[configurable(derived)] - #[serde( - default, - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - encoding: Transformer, - - #[configurable(derived)] - #[serde( - default, - deserialize_with = "crate::serde::bool_or_struct", - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - acknowledgements: AcknowledgementsConfig, -} - -fn default_endpoint() -> String { - "https://api.honeycomb.io/1/batch".to_string() -} - -#[derive(Clone, Copy, Debug, Default)] -struct HoneycombDefaultBatchSettings; - -impl SinkBatchSettings for HoneycombDefaultBatchSettings { - const MAX_EVENTS: Option = None; - const MAX_BYTES: Option = Some(100_000); - const TIMEOUT_SECS: f64 = 1.0; -} - -impl GenerateConfig for HoneycombConfig { - fn generate_config() -> toml::Value { - toml::from_str( - r#"api_key = "${HONEYCOMB_API_KEY}" - dataset = "my-honeycomb-dataset""#, - ) - .unwrap() - } -} - -#[async_trait::async_trait] -#[typetag::serde(name = "honeycomb")] -impl SinkConfig for HoneycombConfig { - async fn build( - &self, - cx: SinkContext, - ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); - let batch_settings = self.batch.into_batch_settings()?; - - let buffer = JsonArrayBuffer::new(batch_settings.size); - - let client = HttpClient::new(None, cx.proxy())?; - - let sink = BatchedHttpSink::new( - self.clone(), - buffer, - request_settings, - batch_settings.timeout, - client.clone(), - ) - .sink_map_err(|error| error!(message = "Fatal honeycomb sink error.", %error)); - - let healthcheck = healthcheck(self.clone(), client).boxed(); - - #[allow(deprecated)] - Ok((super::VectorSink::from_event_sink(sink), healthcheck)) - } - - fn input(&self) -> Input { - let requirement = - schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp()); - - Input::log().with_schema_requirement(requirement) - } - - fn acknowledgements(&self) -> &AcknowledgementsConfig { - &self.acknowledgements - } -} - -pub struct HoneycombEventEncoder { - transformer: Transformer, -} - -impl HttpEventEncoder for HoneycombEventEncoder { - fn encode_event(&mut self, mut event: Event) -> Option { - self.transformer.transform(&mut event); - let mut log = event.into_log(); - - let timestamp = if let Some(Value::Timestamp(ts)) = log.remove_timestamp() { - ts - } else { - chrono::Utc::now() - }; - - let data = json!({ - "time": timestamp.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true), - "data": log.convert_to_fields(), - }); - - Some(data) - } -} - -#[async_trait::async_trait] -impl HttpSink for HoneycombConfig 
{ - type Input = serde_json::Value; - type Output = Vec; - type Encoder = HoneycombEventEncoder; - - fn build_encoder(&self) -> Self::Encoder { - HoneycombEventEncoder { - transformer: self.encoding.clone(), - } - } - - async fn build_request(&self, events: Self::Output) -> crate::Result> { - let uri = self.build_uri(); - let request = Request::post(uri).header("X-Honeycomb-Team", self.api_key.inner()); - let body = crate::serde::json::to_bytes(&events).unwrap().freeze(); - - request.body(body).map_err(Into::into) - } -} - -impl HoneycombConfig { - fn build_uri(&self) -> Uri { - let uri = format!("{}/{}", self.endpoint, self.dataset); - - uri.parse::().expect("This should be a valid uri") - } -} - -async fn healthcheck(config: HoneycombConfig, client: HttpClient) -> crate::Result<()> { - let req = config - .build_request(Vec::new()) - .await? - .map(hyper::Body::from); - - let res = client.send(req).await?; - - let status = res.status(); - let body = hyper::body::to_bytes(res.into_body()).await?; - - if status == StatusCode::BAD_REQUEST { - Ok(()) - } else if status == StatusCode::UNAUTHORIZED { - let json: serde_json::Value = serde_json::from_slice(&body[..])?; - - let message = if let Some(s) = json - .as_object() - .and_then(|o| o.get("error")) - .and_then(|s| s.as_str()) - { - s.to_string() - } else { - "Token is not valid, 401 returned.".to_string() - }; - - Err(message.into()) - } else { - let body = String::from_utf8_lossy(&body[..]); - - Err(format!( - "Server returned unexpected error status: {} body: {}", - status, body - ) - .into()) - } -} -#[cfg(test)] -mod test { - use futures::{future::ready, stream}; - use serde::Deserialize; - use vector_core::event::{Event, LogEvent}; - - use crate::{ - config::{GenerateConfig, SinkConfig, SinkContext}, - test_util::{ - components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, - http::{always_200_response, spawn_blackhole_http_server}, - }, - }; - - use super::HoneycombConfig; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::(); - } - - #[tokio::test] - async fn component_spec_compliance() { - let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; - - let config = HoneycombConfig::generate_config().to_string(); - let mut config = HoneycombConfig::deserialize(toml::de::ValueDeserializer::new(&config)) - .expect("config should be valid"); - config.endpoint = mock_endpoint.to_string(); - - let context = SinkContext::default(); - let (sink, _healthcheck) = config.build(context).await.unwrap(); - - let event = Event::Log(LogEvent::from("simple message")); - run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await; - } -} diff --git a/src/sinks/honeycomb/config.rs b/src/sinks/honeycomb/config.rs new file mode 100644 index 0000000000000..0a53bf16369cc --- /dev/null +++ b/src/sinks/honeycomb/config.rs @@ -0,0 +1,185 @@ +//! Configuration for the `honeycomb` sink. + +use bytes::Bytes; +use futures::FutureExt; +use http::{Request, StatusCode, Uri}; +use vector_common::sensitive_string::SensitiveString; +use vrl::value::Kind; + +use crate::{ + http::HttpClient, + sinks::{ + prelude::*, + util::{ + http::{http_response_retry_logic, HttpService}, + BatchConfig, BoxedRawValue, + }, + }, +}; + +use super::{ + encoder::HoneycombEncoder, request_builder::HoneycombRequestBuilder, + service::HoneycombSvcRequestBuilder, sink::HoneycombSink, +}; + +pub(super) const HTTP_HEADER_HONEYCOMB: &str = "X-Honeycomb-Team"; + +/// Configuration for the `honeycomb` sink. 
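A minimal configuration sketch for this sink, mirroring the `generate_config()` implementation below: only the API key and dataset are required, and the endpoint is fixed (it exists only for testing).

```rust
use indoc::indoc;

// Parse the smallest valid honeycomb sink configuration.
let config: HoneycombConfig = toml::from_str(indoc! {r#"
    api_key = "${HONEYCOMB_API_KEY}"
    dataset = "my-honeycomb-dataset"
"#})
.expect("example config should parse");
```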
+#[configurable_component(sink("honeycomb", "Deliver log events to Honeycomb."))] +#[derive(Clone, Debug)] +pub struct HoneycombConfig { + // This endpoint is not user-configurable and only exists for testing purposes + #[serde(skip, default = "default_endpoint")] + pub(super) endpoint: String, + + /// The API key that is used to authenticate against Honeycomb. + #[configurable(metadata(docs::examples = "${HONEYCOMB_API_KEY}"))] + #[configurable(metadata(docs::examples = "some-api-key"))] + api_key: SensitiveString, + + /// The dataset to which logs are sent. + #[configurable(metadata(docs::examples = "my-honeycomb-dataset"))] + // TODO: we probably want to make this a template + // but this limits us in how we can do our healthcheck. + dataset: String, + + #[configurable(derived)] + #[serde(default)] + batch: BatchConfig, + + #[configurable(derived)] + #[serde(default)] + request: TowerRequestConfig, + + #[configurable(derived)] + #[serde( + default, + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + encoding: Transformer, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + acknowledgements: AcknowledgementsConfig, +} + +fn default_endpoint() -> String { + "https://api.honeycomb.io/1/batch".to_string() +} + +#[derive(Clone, Copy, Debug, Default)] +struct HoneycombDefaultBatchSettings; + +impl SinkBatchSettings for HoneycombDefaultBatchSettings { + const MAX_EVENTS: Option = None; + const MAX_BYTES: Option = Some(100_000); + const TIMEOUT_SECS: f64 = 1.0; +} + +impl GenerateConfig for HoneycombConfig { + fn generate_config() -> toml::Value { + toml::from_str( + r#"api_key = "${HONEYCOMB_API_KEY}" + dataset = "my-honeycomb-dataset""#, + ) + .unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "honeycomb")] +impl SinkConfig for HoneycombConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let batch_settings = self.batch.validate()?.into_batcher_settings()?; + + let request_builder = HoneycombRequestBuilder { + encoder: HoneycombEncoder { + transformer: self.encoding.clone(), + }, + }; + + let uri = self.build_uri()?; + + let honeycomb_service_request_builder = HoneycombSvcRequestBuilder { + uri: uri.clone(), + api_key: self.api_key.clone(), + }; + + let client = HttpClient::new(None, cx.proxy())?; + + let service = HttpService::new(client.clone(), honeycomb_service_request_builder); + + let request_limits = self.request.unwrap_with(&TowerRequestConfig::default()); + + let service = ServiceBuilder::new() + .settings(request_limits, http_response_retry_logic()) + .service(service); + + let sink = HoneycombSink::new(service, batch_settings, request_builder); + + let healthcheck = healthcheck(uri, self.api_key.clone(), client).boxed(); + + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + let requirement = Requirement::empty().optional_meaning("timestamp", Kind::timestamp()); + + Input::log().with_schema_requirement(requirement) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +impl HoneycombConfig { + fn build_uri(&self) -> crate::Result { + let uri = format!("{}/{}", self.endpoint, self.dataset); + uri.parse::().map_err(Into::into) + } +} + +async fn healthcheck(uri: Uri, api_key: SensitiveString, client: HttpClient) -> crate::Result<()> { + let request = 
Request::post(uri).header(HTTP_HEADER_HONEYCOMB, api_key.inner()); + let body = crate::serde::json::to_bytes(&Vec::::new()) + .unwrap() + .freeze(); + let req: Request = request.body(body)?; + let req = req.map(hyper::Body::from); + + let res = client.send(req).await?; + + let status = res.status(); + let body = hyper::body::to_bytes(res.into_body()).await?; + + if status == StatusCode::BAD_REQUEST { + Ok(()) + } else if status == StatusCode::UNAUTHORIZED { + let json: serde_json::Value = serde_json::from_slice(&body[..])?; + + let message = if let Some(s) = json + .as_object() + .and_then(|o| o.get("error")) + .and_then(|s| s.as_str()) + { + s.to_string() + } else { + "Token is not valid, 401 returned.".to_string() + }; + + Err(message.into()) + } else { + let body = String::from_utf8_lossy(&body[..]); + + Err(format!( + "Server returned unexpected error status: {} body: {}", + status, body + ) + .into()) + } +} diff --git a/src/sinks/honeycomb/encoder.rs b/src/sinks/honeycomb/encoder.rs new file mode 100644 index 0000000000000..0b9ccd429063d --- /dev/null +++ b/src/sinks/honeycomb/encoder.rs @@ -0,0 +1,52 @@ +//! Encoding for the `honeycomb` sink. + +use bytes::BytesMut; +use chrono::{SecondsFormat, Utc}; +use serde_json::{json, to_vec}; +use std::io; + +use crate::sinks::{ + prelude::*, + util::encoding::{write_all, Encoder as SinkEncoder}, +}; + +pub(super) struct HoneycombEncoder { + pub(super) transformer: Transformer, +} + +impl SinkEncoder> for HoneycombEncoder { + fn encode_input( + &self, + events: Vec, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + let mut byte_size = telemetry().create_request_count_byte_size(); + let mut body = BytesMut::new(); + let n_events = events.len(); + + for mut event in events { + self.transformer.transform(&mut event); + + byte_size.add_event(&event, event.estimated_json_encoded_size_of()); + + let log = event.as_mut_log(); + + let timestamp = if let Some(Value::Timestamp(ts)) = log.remove_timestamp() { + ts + } else { + Utc::now() + }; + + let data = to_vec(&json!({ + "time": timestamp.to_rfc3339_opts(SecondsFormat::Nanos, true), + "data": log.convert_to_fields(), + }))?; + + body.extend(&data); + } + + let body = body.freeze(); + + write_all(writer, n_events, body.as_ref()).map(|()| (body.len(), byte_size)) + } +} diff --git a/src/sinks/honeycomb/mod.rs b/src/sinks/honeycomb/mod.rs new file mode 100644 index 0000000000000..e93745194e92d --- /dev/null +++ b/src/sinks/honeycomb/mod.rs @@ -0,0 +1,13 @@ +//! The Honeycomb [`vector_core::sink::VectorSink`]. +//! +//! This module contains the [`vector_core::sink::VectorSink`] instance that is responsible for +//! taking a stream of [`vector_core::event::Event`]s and forwarding them to the Honeycomb service. + +mod config; +mod encoder; +mod request_builder; +mod service; +mod sink; + +#[cfg(test)] +mod tests; diff --git a/src/sinks/honeycomb/request_builder.rs b/src/sinks/honeycomb/request_builder.rs new file mode 100644 index 0000000000000..a84c9dec2ba4b --- /dev/null +++ b/src/sinks/honeycomb/request_builder.rs @@ -0,0 +1,47 @@ +//! `RequestBuilder` implementation for the `honeycomb` sink. 
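For reference, the illustrative shape of one encoded Honeycomb batch entry (hypothetical values): the encoder above emits an RFC 3339 timestamp with nanosecond precision under `time`, plus the flattened log fields under `data`.

```rust
use serde_json::json;

// One entry in the JSON array POSTed to the Honeycomb batch endpoint.
let entry = json!({
    "time": "2020-01-01T12:30:00.000000000Z",
    "data": { "message": "hello world" }
});
```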
+ +use bytes::Bytes; +use std::io; + +use crate::sinks::{prelude::*, util::http::HttpRequest}; + +use super::encoder::HoneycombEncoder; + +pub(super) struct HoneycombRequestBuilder { + pub(super) encoder: HoneycombEncoder, +} + +impl RequestBuilder> for HoneycombRequestBuilder { + type Metadata = EventFinalizers; + type Events = Vec; + type Encoder = HoneycombEncoder; + type Payload = Bytes; + type Request = HttpRequest; + type Error = io::Error; + + fn compression(&self) -> Compression { + Compression::None + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + mut events: Vec, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let finalizers = events.take_finalizers(); + let builder = RequestMetadataBuilder::from_events(&events); + (finalizers, builder, events) + } + + fn build_request( + &self, + metadata: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + HttpRequest::new(payload.into_payload(), metadata, request_metadata) + } +} diff --git a/src/sinks/honeycomb/service.rs b/src/sinks/honeycomb/service.rs new file mode 100644 index 0000000000000..57eda27501558 --- /dev/null +++ b/src/sinks/honeycomb/service.rs @@ -0,0 +1,25 @@ +//! Service implementation for the `honeycomb` sink. + +use bytes::Bytes; +use http::{Request, Uri}; +use vector_common::sensitive_string::SensitiveString; + +use crate::sinks::util::http::HttpServiceRequestBuilder; + +use super::config::HTTP_HEADER_HONEYCOMB; + +#[derive(Debug, Clone)] +pub(super) struct HoneycombSvcRequestBuilder { + pub(super) uri: Uri, + pub(super) api_key: SensitiveString, +} + +impl HttpServiceRequestBuilder for HoneycombSvcRequestBuilder { + fn build(&self, body: Bytes) -> Request { + let request = Request::post(&self.uri).header(HTTP_HEADER_HONEYCOMB, self.api_key.inner()); + + request + .body(body) + .expect("Failed to assign body to request- builder has errors") + } +} diff --git a/src/sinks/honeycomb/sink.rs b/src/sinks/honeycomb/sink.rs new file mode 100644 index 0000000000000..9575577b91e11 --- /dev/null +++ b/src/sinks/honeycomb/sink.rs @@ -0,0 +1,77 @@ +//! Implementation of the `honeycomb` sink. + +use crate::sinks::{ + prelude::*, + util::http::{HttpJsonBatchSizer, HttpRequest}, +}; + +use super::request_builder::HoneycombRequestBuilder; + +pub(super) struct HoneycombSink { + service: S, + batch_settings: BatcherSettings, + request_builder: HoneycombRequestBuilder, +} + +impl HoneycombSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + /// Creates a new `HoneycombSink`. + pub(super) const fn new( + service: S, + batch_settings: BatcherSettings, + request_builder: HoneycombRequestBuilder, + ) -> Self { + Self { + service, + batch_settings, + request_builder, + } + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + // Batch the input stream with size calculation based on the estimated encoded json size + .batched( + self.batch_settings + .into_item_size_config(HttpJsonBatchSizer), + ) + // Build requests with no concurrency limit. + .request_builder(None, self.request_builder) + // Filter out any errors that occurred in the request building. 
+ .filter_map(|request| async move { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + // Generate the driver that will send requests and handle retries, + // event finalization, and logging/internal metric reporting. + .into_driver(self.service) + .run() + .await + } +} + +#[async_trait::async_trait] +impl<S> StreamSink<Event> for HoneycombSink<S> +where + S: Service<HttpRequest> + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into<crate::Error> + Send, +{ + async fn run( + self: Box<Self>, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} diff --git a/src/sinks/honeycomb/tests.rs b/src/sinks/honeycomb/tests.rs new file mode 100644 index 0000000000000..29186f47f8012 --- /dev/null +++ b/src/sinks/honeycomb/tests.rs @@ -0,0 +1,35 @@ +//! Unit tests for the `honeycomb` sink. + +use futures::{future::ready, stream}; +use serde::Deserialize; + +use crate::{ + sinks::prelude::*, + test_util::{ + components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, + http::{always_200_response, spawn_blackhole_http_server}, + }, +}; + +use super::config::HoneycombConfig; + +#[test] +fn generate_config() { + crate::test_util::test_generate_config::<HoneycombConfig>(); +} + +#[tokio::test] +async fn component_spec_compliance() { + let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; + + let config = HoneycombConfig::generate_config().to_string(); + let mut config = HoneycombConfig::deserialize(toml::de::ValueDeserializer::new(&config)) + .expect("config should be valid"); + config.endpoint = mock_endpoint.to_string(); + + let context = SinkContext::default(); + let (sink, _healthcheck) = config.build(context).await.unwrap(); + + let event = Event::Log(LogEvent::from("simple message")); + run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await; +} diff --git a/src/sinks/http/config.rs b/src/sinks/http/config.rs index 5c933c1265eae..c5782cae0cabc 100644 --- a/src/sinks/http/config.rs +++ b/src/sinks/http/config.rs @@ -14,7 +14,7 @@ use crate::{ sinks::{ prelude::*, util::{ - http::{HttpResponse, HttpService, HttpStatusRetryLogic, RequestConfig}, + http::{http_response_retry_logic, HttpService, RequestConfig}, RealtimeSizeBasedDefaultBatchSettings, UriSerde, }, }, @@ -288,11 +288,8 @@ impl SinkConfig for HttpSinkConfig { let request_limits = self.request.tower.unwrap_with(&Default::default()); - let retry_logic = - HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status()); - let service = ServiceBuilder::new() - .settings(request_limits, retry_logic) + .settings(request_limits, http_response_retry_logic()) .service(service); let sink = HttpSink::new(service, batch_settings, request_builder); diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index 4dbfcab0ab4f8..93a56f5bf6097 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -20,7 +20,9 @@ use snafu::{ResultExt, Snafu}; use tower::{Service, ServiceBuilder}; use tower_http::decompression::DecompressionLayer; use vector_config::configurable_component; -use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; +use vector_core::{ + stream::batcher::limiter::ItemBatchSize, ByteSizeOf, EstimatedJsonEncodedSizeOf, +}; + use super::{ retries::{RetryAction, RetryLogic}, @@ -678,6 +680,24 @@ impl DriverResponse for HttpResponse { } } +/// Creates a `RetryLogic` for use with `HttpResponse`.
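+///
+/// Retry decisions are keyed on the HTTP status code extracted from the
+/// completed response by the provided closure.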
+pub fn http_response_retry_logic() -> HttpStatusRetryLogic< + impl Fn(&HttpResponse) -> StatusCode + Clone + Send + Sync + 'static, + HttpResponse, +> { + HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status()) +} + +/// Uses the estimated JSON-encoded size to determine batch sizing. +#[derive(Default)] +pub struct HttpJsonBatchSizer; + +impl ItemBatchSize<Event> for HttpJsonBatchSizer { + fn size(&self, item: &Event) -> usize { + item.estimated_json_encoded_size_of().get() + } +} + /// HTTP request builder for HTTP stream sinks using the generic `HttpService` pub trait HttpServiceRequestBuilder { fn build(&self, body: Bytes) -> Request<Bytes>; } diff --git a/src/sinks/websocket/config.rs b/src/sinks/websocket/config.rs index 6027273ee4275..76a3391ff0eac 100644 --- a/src/sinks/websocket/config.rs +++ b/src/sinks/websocket/config.rs @@ -94,7 +94,7 @@ impl SinkConfig for WebSocketSinkConfig { } fn input(&self) -> Input { - Input::log() + Input::new(self.encoding.config().input_type()) } fn acknowledgements(&self) -> &AcknowledgementsConfig { diff --git a/src/transforms/mod.rs b/src/transforms/mod.rs index b78dda290b9aa..361e5d68db413 100644 --- a/src/transforms/mod.rs +++ b/src/transforms/mod.rs @@ -12,6 +12,7 @@ pub mod aws_ec2_metadata; pub mod dedupe; #[cfg(feature = "transforms-filter")] pub mod filter; +#[cfg(feature = "transforms-log_to_metric")] pub mod log_to_metric; #[cfg(feature = "transforms-lua")] pub mod lua; diff --git a/vdev/Cargo.toml b/vdev/Cargo.toml index 306ac05146688..fabdfbe82bdca 100644 --- a/vdev/Cargo.toml +++ b/vdev/Cargo.toml @@ -12,7 +12,7 @@ anyhow = "1.0.75" atty = "0.2.14" cached = "0.44.0" chrono = { version = "0.4.22", default-features = false, features = ["serde", "clock"] } -clap = { version = "4.3.22", features = ["derive"] } +clap = { version = "4.3.24", features = ["derive"] } clap-verbosity-flag = "2.0.1" clap_complete = "4.3.2" confy = "0.5.1" diff --git a/website/content/en/docs/about/under-the-hood/architecture/pipeline-model.md b/website/content/en/docs/about/under-the-hood/architecture/pipeline-model.md index ee799c687e47d..bcd2144ab2aeb 100644 --- a/website/content/en/docs/about/under-the-hood/architecture/pipeline-model.md +++ b/website/content/en/docs/about/under-the-hood/architecture/pipeline-model.md @@ -10,7 +10,7 @@ Vector's pipeline model is based on a [directed acyclic graph][dag] of [componen ## Defining pipelines -A Vector pipeline is defined through a TOML, YAML, or JSON [configuration] file. For maintainability, many Vector users use configuration and data templating languages like [Jsonnet] or [CUE]. +A Vector pipeline is defined through a YAML, TOML, or JSON [configuration] file. For maintainability, many Vector users use configuration and data templating languages like [Jsonnet] or [CUE]. Configuration is checked at pipeline compile time (when Vector boots). This prevents simple mistakes and enforces DAG properties. diff --git a/website/content/en/docs/administration/tuning.md b/website/content/en/docs/administration/tuning/_index.md similarity index 93% rename from website/content/en/docs/administration/tuning.md rename to website/content/en/docs/administration/tuning/_index.md index fe92dae9c11db..836557b142088 100644 --- a/website/content/en/docs/administration/tuning.md +++ b/website/content/en/docs/administration/tuning/_index.md @@ -7,3 +7,7 @@ tags: ["tuning", "rust", "performance"] Vector is written in [Rust] and therefore doesn't include a runtime or a virtual machine.
There are no special service-level steps you need to undertake to improve performance as Vector takes full advantage of all system resources by default and without any adjustments. [rust]: https://rust-lang.org + +## Pages + +{{< pages >}} diff --git a/website/content/en/docs/administration/tuning/pgo.md b/website/content/en/docs/administration/tuning/pgo.md new file mode 100644 index 0000000000000..6018ea0e3b7e6 --- /dev/null +++ b/website/content/en/docs/administration/tuning/pgo.md @@ -0,0 +1,31 @@ +--- +title: Profile-Guided Optimization +description: How to optimize Vector performance with Profile-Guided Optimization +short: PGO +weight: 3 +tags: ["pgo", "tuning", "rust", "performance"] +--- + +Profile-Guided Optimization (PGO) is a compiler optimization technique in which a program is optimized based on its runtime profile. + +According to the [tests], PGO can yield up to 15% more processed log events per second on some Vector workloads. The actual benefit depends on your typical workload; your results may be better or worse. + +You can read more about PGO in Vector in the corresponding GitHub [issue]. + +### How to build Vector with PGO + +There are two major kinds of PGO: Instrumentation and Sampling (also known as AutoFDO). This guide describes the Instrumentation approach and uses [cargo-pgo] to build Vector with PGO. + +* Install [cargo-pgo]. +* Check out the Vector repository. +* Go to the Vector source directory and run `cargo pgo build` to build an instrumented version of Vector. +* Run the instrumented Vector on your test load, for example `cargo pgo run -- -- -c vector.toml`, and wait long enough to collect profile information from your workload. Several minutes is usually enough, but your case may differ. +* Stop the Vector instance. The profile data is written to the `target/pgo-profiles` directory. +* Run `cargo pgo optimize` to build Vector with the collected profiles applied. + +A more detailed guide on how to apply PGO is in the Rust [documentation]. The full command sequence is also consolidated in the sketch below. + +[tests]: https://github.com/vectordotdev/vector/issues/15631#issue-1502073978 +[issue]: https://github.com/vectordotdev/vector/issues/15631 +[documentation]: https://doc.rust-lang.org/rustc/profile-guided-optimization.html +[cargo-pgo]: https://github.com/Kobzol/cargo-pgo diff --git a/website/content/en/docs/reference/configuration/_index.md b/website/content/en/docs/reference/configuration/_index.md index c9e47556afe25..699fc2a1c6a2a 100644 --- a/website/content/en/docs/reference/configuration/_index.md +++ b/website/content/en/docs/reference/configuration/_index.md @@ -270,7 +270,7 @@ environment variable example. ### Formats -Vector supports [TOML], [YAML], and [JSON] to ensure that Vector fits into your +Vector supports [YAML], [TOML], and [JSON] to ensure that Vector fits into your workflow. A side benefit of supporting YAML and JSON is that they enable you to use data templating languages such as [ytt], [Jsonnet] and [Cue]. diff --git a/website/content/en/docs/setup/installation/operating-systems/macos.md b/website/content/en/docs/setup/installation/operating-systems/macos.md index f83293bdf4a98..bdd227255e3f5 100644 --- a/website/content/en/docs/setup/installation/operating-systems/macos.md +++ b/website/content/en/docs/setup/installation/operating-systems/macos.md @@ -1,6 +1,7 @@ --- title: macOS weight: 5 +supported_installers: ["Homebrew", "Vector installer", "Docker"] --- [macOS] is the primary operating system for Apple's Mac computers.
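Pulled together, the PGO guide above amounts to a short command sequence. The following is a minimal sketch, assuming `cargo-pgo` installed from crates.io, the `llvm-tools-preview` rustup component available for profile processing, and `vector.toml` standing in for your real configuration:

```shell
# One-time setup: cargo-pgo drives the instrument/optimize cycle and uses
# the LLVM tools (llvm-profdata) shipped with the Rust toolchain.
cargo install cargo-pgo
rustup component add llvm-tools-preview

git clone https://github.com/vectordotdev/vector.git
cd vector

# Build an instrumented Vector binary.
cargo pgo build

# Run it against a representative workload for several minutes, then stop it.
# Profile data accumulates under target/pgo-profiles/.
cargo pgo run -- -- -c vector.toml

# Rebuild Vector with the collected profiles applied.
cargo pgo optimize
```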
It's a certified Unix system based on Apple's Darwin operating system. This page covers installing and managing Vector on the macOS operating system. diff --git a/website/cue/reference.cue b/website/cue/reference.cue index 17f587f64f3c4..8ad1381afdce8 100644 --- a/website/cue/reference.cue +++ b/website/cue/reference.cue @@ -455,8 +455,8 @@ _values: { Config format | Example :-------------|:------- - [TOML](\(urls.toml)) | `condition = ".status == 200"` [YAML](\(urls.yaml)) | `condition: .status == 200` + [TOML](\(urls.toml)) | `condition = ".status == 200"` [JSON](\(urls.json)) | `"condition": ".status == 200"` """ } diff --git a/website/cue/reference/administration/management.cue b/website/cue/reference/administration/management.cue index 38128dae5d264..d717b2b3ed66d 100644 --- a/website/cue/reference/administration/management.cue +++ b/website/cue/reference/administration/management.cue @@ -16,7 +16,7 @@ administration: management: { variables: { variants?: [string, ...string] - config_formats: ["toml", "yaml", "json"] + config_formats: ["yaml", "toml", "json"] } manage?: { diff --git a/website/cue/reference/cli.cue b/website/cue/reference/cli.cue index ec32672cf73df..eb9072083449d 100644 --- a/website/cue/reference/cli.cue +++ b/website/cue/reference/cli.cue @@ -136,6 +136,11 @@ cli: { type: "string" env_var: "VECTOR_CONFIG_DIR" } + "config-yaml": { + description: env_vars.VECTOR_CONFIG_YAML.description + type: "string" + env_var: "VECTOR_CONFIG_YAML" + } "config-toml": { description: env_vars.VECTOR_CONFIG_TOML.description type: "string" @@ -146,11 +151,6 @@ cli: { type: "string" env_var: "VECTOR_CONFIG_JSON" } - "config-yaml": { - description: env_vars.VECTOR_CONFIG_YAML.description - type: "string" - env_var: "VECTOR_CONFIG_YAML" - } "graceful-shutdown-limit-secs": { description: env_vars.VECTOR_GRACEFUL_SHUTDOWN_LIMIT_SECS.description default: env_vars.VECTOR_GRACEFUL_SHUTDOWN_LIMIT_SECS.type.uint.default @@ -200,7 +200,7 @@ cli: { You can also visualize the output online at [webgraphviz.com](http://www.webgraphviz.com/). """ - example: "vector graph --config /etc/vector/vector.toml | dot -Tsvg > graph.svg" + example: "vector graph --config /etc/vector/vector.yaml | dot -Tsvg > graph.svg" options: _core_options } @@ -333,8 +333,8 @@ cli: { type: "enum" default: "json" enum: { - json: "Output events as JSON" yaml: "Output events as YAML" + json: "Output events as JSON" logfmt: "Output events as logfmt" } } @@ -416,24 +416,24 @@ cli: { } options: { - "config-toml": { + "config-yaml": { description: """ Any number of Vector config files to validate. - TOML file format is assumed. + YAML file format is assumed. """ type: "string" } - "config-json": { + "config-toml": { description: """ Any number of Vector config files to validate. - JSON file format is assumed. + TOML file format is assumed. """ type: "string" } - "config-yaml": { + "config-json": { description: """ Any number of Vector config files to validate. - YAML file format is assumed. + JSON file format is assumed. """ type: "string" } @@ -537,7 +537,7 @@ cli: { Read configuration from one or more files. Wildcard paths are supported. If no files are specified the default config path `/etc/vector/vector.toml` is targeted. TOML, YAML and JSON file formats are supported. The format to interpret the file with is determined from - the file extension (`.toml`, `.yaml`, `.json`). Vector falls back to TOML if it can't + the file extension (`.yaml`, `.toml`, `.json`). Vector falls back to YAML if it can't detect a supported format. 
""" type: string: { diff --git a/website/cue/reference/components/sinks/websocket.cue b/website/cue/reference/components/sinks/websocket.cue index f38a55091b4e9..d46777404de88 100644 --- a/website/cue/reference/components/sinks/websocket.cue +++ b/website/cue/reference/components/sinks/websocket.cue @@ -67,9 +67,16 @@ components: sinks: websocket: { } input: { - logs: true - metrics: null - traces: false + logs: true + metrics: { + counter: true + distribution: true + gauge: true + histogram: true + summary: true + set: true + } + traces: true } telemetry: metrics: { diff --git a/website/cue/reference/configuration.cue b/website/cue/reference/configuration.cue index bf1a7e8f98453..cb501633edb63 100644 --- a/website/cue/reference/configuration.cue +++ b/website/cue/reference/configuration.cue @@ -701,7 +701,7 @@ configuration: { formats: { title: "Formats" body: """ - Vector supports [TOML](\(urls.toml)), [YAML](\(urls.yaml)), and [JSON](\(urls.json)) to + Vector supports [YAML](\(urls.yaml)), [TOML](\(urls.toml)), and [JSON](\(urls.json)) to ensure Vector fits into your workflow. A side benefit of supporting YAML and JSON is that they enable you to use data templating languages such as [ytt](\(urls.ytt)), [Jsonnet](\(urls.jsonnet)) and [Cue](\(urls.cue)). diff --git a/website/cue/reference/remap/expressions/comparison.cue b/website/cue/reference/remap/expressions/comparison.cue index f2c83581eb050..53fbef91d1c42 100644 --- a/website/cue/reference/remap/expressions/comparison.cue +++ b/website/cue/reference/remap/expressions/comparison.cue @@ -1,10 +1,10 @@ package metadata remap: expressions: comparison: { - title: "Comparison" + title: "Comparison" description: """ A _comparison_ expression compares two expressions (operands) and produces a Boolean as defined by the - operator. + operator. Please refer to the [match function](\(urls.vrl_match_function)) for matching a string against a regex. """ return: """ Returns a Boolean as defined by the operator. 
@@ -39,12 +39,19 @@ remap: expressions: comparison: { examples: [ { - title: "Equal" + title: "Equal integers" source: #""" 1 == 1 """# return: true }, + { + title: "Equal integer and float" + source: #""" + 1 == 1.0 + """# + return: true + }, { title: "Not equal" source: #""" @@ -52,6 +59,21 @@ remap: expressions: comparison: { """# return: true }, + { + title: "Equal strings" + source: #""" + x = "foo" + x == "foo" + """# + return: true + }, + { + title: "Not equal strings" + source: #""" + "foo" != "bar" + """# + return: true + }, { title: "Greater than or equal" source: #""" diff --git a/website/cue/reference/remap/functions/match.cue b/website/cue/reference/remap/functions/match.cue index 44d095bb07e8d..4ee82bdb5a574 100644 --- a/website/cue/reference/remap/functions/match.cue +++ b/website/cue/reference/remap/functions/match.cue @@ -31,5 +31,12 @@ remap: functions: match: { """ return: true }, + { + title: "String does not match the regular expression" + source: """ + match("I'm a little teapot", r'.*balloon') + """ + return: false + }, ] } diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index 73ff61e5b3ec8..b8aad39e8175d 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -635,6 +635,7 @@ urls: { vrl_features: "\(vrl_reference)/#features" vrl_functions: "\(vrl_reference)/functions" vrl_literals: "\(vrl_expressions)/#literal-expressions" + vrl_match_function: "\(vrl_functions)/#match" vrl_parsing_functions: "\(vrl_functions)/#parse-functions" vrl_path_expressions: "\(vrl_expressions)#path" vrl_performance: "\(vrl_reference)#performance" diff --git a/website/layouts/partials/blog/authors.html b/website/layouts/partials/blog/authors.html index 952f5335b1c18..47e822ed6754a 100644 --- a/website/layouts/partials/blog/authors.html +++ b/website/layouts/partials/blog/authors.html @@ -19,4 +19,4 @@ {{ end }} - \ No newline at end of file + diff --git a/website/layouts/partials/components/example-configs.html b/website/layouts/partials/components/example-configs.html index 17fe753299052..79a75c81bbe48 100644 --- a/website/layouts/partials/components/example-configs.html +++ b/website/layouts/partials/components/example-configs.html @@ -1,6 +1,6 @@ {{ $examples := .examples }} {{ $levels := slice "common" "advanced" }} -{{ $formats := slice "toml" "yaml" "json" }} +{{ $formats := slice "yaml" "toml" "json" }}
{{ partial "heading.html" (dict "text" "Example configurations" "level" 3) }} @@ -34,4 +34,4 @@ {{ end }}
{{ end }} - \ No newline at end of file + diff --git a/website/layouts/partials/data.html b/website/layouts/partials/data.html index 55f14ab1c44a5..a4d3fa1e3dc6e 100644 --- a/website/layouts/partials/data.html +++ b/website/layouts/partials/data.html @@ -318,7 +318,7 @@ {{/* Source/transform/sink examples */}} {{ with .component_examples }} - {{ $formats := slice "toml" "yaml" "json" }} + {{ $formats := slice "yaml" "toml" "json" }} {{ $defaultFormat := index $formats 0 }}
{{ range . }} @@ -2193,7 +2193,7 @@

{{ partial "heading.html" (dict "text" .title "level" 4) }} - {{ $formats := slice "toml" "yaml" "json" }} + {{ $formats := slice "yaml" "toml" "json" }}
{{ range $formats }}