From f47611a28fc8e72fc19e1c3671f4c172b54c272d Mon Sep 17 00:00:00 2001 From: Jorge Hermo Date: Mon, 9 Sep 2024 01:37:56 +0200 Subject: [PATCH 01/23] feat(sinks): Initial postgres log sink implementation --- Cargo.lock | 206 +++++++++++++++++++++++- Cargo.toml | 5 + scripts/integration/postgres/test.yaml | 2 + src/sinks/mod.rs | 2 + src/sinks/postgres/config.rs | 113 +++++++++++++ src/sinks/postgres/integration_tests.rs | 106 ++++++++++++ src/sinks/postgres/mod.rs | 8 + src/sinks/postgres/service.rs | 143 ++++++++++++++++ src/sinks/postgres/sink.rs | 41 +++++ 9 files changed, 622 insertions(+), 4 deletions(-) create mode 100644 src/sinks/postgres/config.rs create mode 100644 src/sinks/postgres/integration_tests.rs create mode 100644 src/sinks/postgres/mod.rs create mode 100644 src/sinks/postgres/service.rs create mode 100644 src/sinks/postgres/sink.rs diff --git a/Cargo.lock b/Cargo.lock index 1ecadfde7a62c..c588ab78edb95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -734,6 +734,15 @@ dependencies = [ "syn 2.0.75", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1882,12 +1891,13 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.83" +version = "1.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +checksum = "e9d013ecb737093c0e86b151a7b837993cf9ec6c502946cfb44bedc392421e0b" dependencies = [ "jobserver", "libc", + "shlex", ] [[package]] @@ -3034,6 +3044,12 @@ dependencies = [ "tracing 0.1.40", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "duct" version = "0.13.6" @@ -3101,6 +3117,9 @@ name = "either" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +dependencies = [ + "serde", +] [[package]] name = "elliptic-curve" @@ -3305,6 +3324,17 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "281e452d3bad4005426416cdba5ccfd4f5c1280e10099e21db27f7c1c28347fc" +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + [[package]] name = "event-listener" version = "2.5.3" @@ -3614,6 +3644,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot", +] + [[package]] name = "futures-io" version = "0.3.30" @@ -3975,6 +4016,15 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "hdrhistogram" version = "7.5.4" @@ -4849,9 +4899,9 @@ checksum = 
"8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" [[package]] name = "jobserver" -version = "0.1.27" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c37f63953c4c63420ed5fd3d6d398c719489b9f872b9fa683262f8edd363c7d" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" dependencies = [ "libc", ] @@ -8701,6 +8751,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook" version = "0.3.17" @@ -8953,6 +9009,141 @@ dependencies = [ "der", ] +[[package]] +name = "sqlformat" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bba3a93db0cc4f7bdece8bb09e77e2e785c20bfebf79eb8340ed80708048790" +dependencies = [ + "nom", + "unicode_categories", +] + +[[package]] +name = "sqlx" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93334716a037193fac19df402f8571269c84a00852f6a7066b5d2616dcd64d3e" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-postgres", +] + +[[package]] +name = "sqlx-core" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4d8060b456358185f7d50c55d9b5066ad956956fddec42ee2e8567134a8936e" +dependencies = [ + "atoi", + "byteorder", + "bytes 1.7.1", + "crc", + "crossbeam-queue", + "either", + "event-listener 5.3.1", + "futures-channel", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashbrown 0.14.5", + "hashlink", + "hex", + "indexmap 2.5.0", + "log", + "memchr", + "once_cell", + "paste", + "percent-encoding", + "serde", + "serde_json", + "sha2", + "smallvec", + "sqlformat", + "thiserror", + "tokio", + "tokio-stream", + "tracing 0.1.40", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cac0692bcc9de3b073e8d747391827297e075c7710ff6276d9f7a1f3d58c6657" +dependencies = [ + "proc-macro2 1.0.86", + "quote 1.0.37", + "sqlx-core", + "sqlx-macros-core", + "syn 2.0.75", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1804e8a7c7865599c9c79be146dc8a9fd8cc86935fa641d3ea58e5f0688abaa5" +dependencies = [ + "dotenvy", + "either", + "heck 0.5.0", + "hex", + "once_cell", + "proc-macro2 1.0.86", + "quote 1.0.37", + "serde", + "serde_json", + "sha2", + "sqlx-core", + "sqlx-postgres", + "syn 2.0.75", + "tempfile", + "tokio", + "url", +] + +[[package]] +name = "sqlx-postgres" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fa91a732d854c5d7726349bb4bb879bb9478993ceb764247660aee25f67c2f8" +dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.4.1", + "byteorder", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "hex", + "hkdf", + "hmac", + "home", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "rand 0.8.5", + "serde", + "serde_json", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror", + "tracing 0.1.40", + "whoami", +] + [[package]] name = "stability" version = "0.2.0" @@ -10209,6 +10400,12 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "universal-hash" version = "0.5.1" @@ -10498,6 +10695,7 @@ dependencies = [ "snafu 0.7.5", "snap", "socket2 0.5.7", + "sqlx", "stream-cancel", "strip-ansi-escapes", "syslog", diff --git a/Cargo.toml b/Cargo.toml index 74efd11539857..9ff51601023b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -342,6 +342,7 @@ smallvec = { version = "1", default-features = false, features = ["union", "serd snafu = { version = "0.7.5", default-features = false, features = ["futures", "std"] } snap = { version = "1.1.1", default-features = false } socket2 = { version = "0.5.7", default-features = false } +sqlx = { version = "0.8.2", default-features= false, features = ["derive", "postgres", "runtime-tokio"], optional=true } stream-cancel = { version = "0.8.2", default-features = false } strip-ansi-escapes = { version = "0.2.0", default-features = false } syslog = { version = "6.1.1", default-features = false, optional = true } @@ -699,6 +700,7 @@ sinks-logs = [ "sinks-new_relic_logs", "sinks-new_relic", "sinks-papertrail", + "sinks-postgres", "sinks-pulsar", "sinks-redis", "sinks-sematext", @@ -765,6 +767,7 @@ sinks-new_relic_logs = ["sinks-http"] sinks-new_relic = [] sinks-papertrail = ["dep:syslog"] sinks-prometheus = ["dep:base64", "vector-lib/prometheus"] +sinks-postgres = ["dep:sqlx"] sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"] sinks-redis = ["dep:redis"] sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"] @@ -812,6 +815,7 @@ all-integration-tests = [ "nginx-integration-tests", "opentelemetry-integration-tests", "postgresql_metrics-integration-tests", + "postgres-integration-tests", "prometheus-integration-tests", "pulsar-integration-tests", "redis-integration-tests", @@ -877,6 +881,7 @@ nats-integration-tests = ["sinks-nats", "sources-nats"] nginx-integration-tests = ["sources-nginx_metrics"] opentelemetry-integration-tests = ["sources-opentelemetry"] postgresql_metrics-integration-tests = ["sources-postgresql_metrics"] +postgres-integration-tests = ["sinks-postgres"] prometheus-integration-tests = ["sinks-prometheus", "sources-prometheus", "sinks-influxdb"] pulsar-integration-tests = ["sinks-pulsar", "sources-pulsar"] redis-integration-tests = ["sinks-redis", "sources-redis"] diff --git a/scripts/integration/postgres/test.yaml b/scripts/integration/postgres/test.yaml index 67aa2ddc10b50..66562f32b65fc 100644 --- a/scripts/integration/postgres/test.yaml +++ b/scripts/integration/postgres/test.yaml @@ -1,5 +1,6 @@ features: - postgresql_metrics-integration-tests +- postgres-integration-tests test_filter: ::postgres @@ -18,6 +19,7 @@ matrix: # expressions are evaluated using https://github.com/micromatch/picomatch paths: - "src/internal_events/postgresql_metrics.rs" +- "src/sinks/postgres/**" - "src/sources/postgresql_metrics.rs" - "src/sources/util/**" - "scripts/integration/postgres/**" diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index bbd43af693b8c..e7ae005d1292f 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -81,6 +81,8 @@ pub mod new_relic; pub mod opendal_common; #[cfg(feature = "sinks-papertrail")] pub mod papertrail; +#[cfg(feature = "sinks-postgres")] +pub mod postgres; #[cfg(feature = "sinks-prometheus")] pub mod prometheus; #[cfg(feature = "sinks-pulsar")] diff --git 
a/src/sinks/postgres/config.rs b/src/sinks/postgres/config.rs
new file mode 100644
index 0000000000000..015187113a1be
--- /dev/null
+++ b/src/sinks/postgres/config.rs
@@ -0,0 +1,113 @@
+use futures::FutureExt;
+use tower::ServiceBuilder;
+use vector_lib::{
+    config::AcknowledgementsConfig,
+    configurable::{component::GenerateConfig, configurable_component},
+    sink::VectorSink,
+};
+
+use super::{
+    service::{PostgresRetryLogic, PostgresService},
+    sink::PostgresSink,
+};
+use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
+
+use crate::{
+    config::{Input, SinkConfig, SinkContext},
+    sinks::{
+        util::{
+            BatchConfig, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt,
+            TowerRequestConfig, UriSerde,
+        },
+        Healthcheck,
+    },
+};
+
+/// Configuration for the `postgres` sink.
+#[configurable_component(sink("postgres", "Deliver log data to a PostgreSQL database."))]
+#[derive(Clone, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct PostgresConfig {
+    /// TODO
+    /// TODO: if I used UriSerde instead of String, I couldn't get a url string to use
+    /// in the connection pool, as the password would be redacted with UriSerde::to_string
+    pub endpoint: String,
+
+    /// TODO
+    pub table: String,
+
+    #[configurable(derived)]
+    #[serde(default)]
+    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
+
+    #[configurable(derived)]
+    #[serde(default)]
+    pub request: TowerRequestConfig,
+
+    #[configurable(derived)]
+    #[serde(
+        default,
+        deserialize_with = "crate::serde::bool_or_struct",
+        skip_serializing_if = "crate::serde::is_default"
+    )]
+    pub acknowledgements: AcknowledgementsConfig,
+}
+
+impl GenerateConfig for PostgresConfig {
+    fn generate_config() -> toml::Value {
+        toml::from_str(
+            r#"endpoint = "postgres://user:password@localhost/default"
+            table = "default"
+        "#,
+        )
+        .unwrap()
+    }
+}
+
+#[async_trait::async_trait]
+#[typetag::serde(name = "postgres")]
+impl SinkConfig for PostgresConfig {
+    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
+        // TODO: make connection pool configurable. Or should we just have one connection per sink?
+        // TODO: it seems that the number of connections in the pool does not affect the throughput of the sink
+        // does the sink execute batches in parallel?
+        let connection_pool = PgPoolOptions::new()
+            .max_connections(5)
+            .connect(&self.endpoint)
+            .await?;
+
+        let healthcheck = healthcheck(connection_pool.clone()).boxed();
+
+        let batch_settings = self.batch.into_batcher_settings()?;
+        let request_settings = self.request.into_settings();
+
+        let endpoint_uri: UriSerde = self.endpoint.parse()?;
+        let service = PostgresService::new(
+            connection_pool,
+            self.table.clone(),
+            // TODO: this endpoint is used for metrics' tags. It could contain passwords,
+            // will it be redacted there?
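+            // One possible mitigation (a sketch, assuming the `url` crate already in
+            // Vector's dependency tree; not part of this patch): strip the userinfo
+            // before using the endpoint as a tag,
+            //     let mut redacted = url::Url::parse(&self.endpoint)?;
+            //     let _ = redacted.set_username("");
+            //     let _ = redacted.set_password(None);
+            // and pass `redacted.to_string()` here so tags never carry credentials.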
+            endpoint_uri.to_string(),
+        );
+        let service = ServiceBuilder::new()
+            .settings(request_settings, PostgresRetryLogic)
+            .service(service);
+
+        let sink = PostgresSink::new(service, batch_settings);
+
+        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
+    }
+
+    fn input(&self) -> Input {
+        Input::log()
+    }
+
+    fn acknowledgements(&self) -> &AcknowledgementsConfig {
+        &self.acknowledgements
+    }
+}
+
+async fn healthcheck(connection_pool: Pool<Postgres>) -> crate::Result<()> {
+    sqlx::query("SELECT 1").execute(&connection_pool).await?;
+    Ok(())
+}
diff --git a/src/sinks/postgres/integration_tests.rs b/src/sinks/postgres/integration_tests.rs
new file mode 100644
index 0000000000000..5db12843fe3c7
--- /dev/null
+++ b/src/sinks/postgres/integration_tests.rs
@@ -0,0 +1,106 @@
+use crate::{
+    config::{SinkConfig, SinkContext},
+    sinks::{
+        postgres::PostgresConfig,
+        util::{test::load_sink, UriSerde},
+    },
+    test_util::{components::run_and_assert_sink_compliance, random_string, trace_init},
+};
+use futures::stream;
+use serde::{Deserialize, Serialize};
+use sqlx::{Connection, PgConnection};
+use std::future::ready;
+use vector_lib::event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, LogEvent};
+
+fn pg_host() -> String {
+    std::env::var("PG_HOST").unwrap_or_else(|_| "localhost".into())
+}
+
+fn pg_url() -> String {
+    std::env::var("PG_URL")
+        .unwrap_or_else(|_| format!("postgres://vector:vector@{}/postgres", pg_host()))
+}
+
+fn gen_table() -> String {
+    format!("test_{}", random_string(10).to_lowercase())
+}
+
+fn make_event() -> (Event, BatchStatusReceiver) {
+    let (batch, receiver) = BatchNotifier::new_with_receiver();
+    let mut event = LogEvent::from("raw log line").with_batch_notifier(&batch);
+    event.insert("host", "example.com");
+    (event.into(), receiver)
+}
+
+#[derive(Serialize, Deserialize, sqlx::FromRow)]
+struct TestEvent {
+    host: String,
+    timestamp: String,
+    message: String,
+}
+
+async fn prepare_config() -> (String, String, PgConnection) {
+    trace_init();
+
+    let table = gen_table();
+    let endpoint = pg_url();
+    let _endpoint: UriSerde = endpoint.parse().unwrap();
+
+    let cfg = format!(
+        r#"
+            endpoint = "{endpoint}"
+            table = "{table}"
+            batch.max_events = 1
+        "#,
+    );
+
+    let connection = PgConnection::connect(&endpoint)
+        .await
+        .expect("Failed to connect to Postgres");
+
+    (cfg, table, connection)
+}
+
+async fn insert_event_with_cfg(cfg: String, table: String, mut connection: PgConnection) {
+    let create_table_sql =
+        format!("CREATE TABLE IF NOT EXISTS {table} (host text, timestamp text, message text)",);
+    sqlx::query(&create_table_sql)
+        .execute(&mut connection)
+        .await
+        .unwrap();
+
+    let (config, _) = load_sink::<PostgresConfig>(&cfg).unwrap();
+    let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
+
+    let (input_event, mut receiver) = make_event();
+    run_and_assert_sink_compliance(
+        sink,
+        stream::once(ready(input_event.clone())),
+        &["endpoint", "protocol"],
+    )
+    .await;
+
+    let select_all_sql = format!("SELECT * FROM {table}");
+    let events: Vec<TestEvent> = sqlx::query_as(&select_all_sql)
+        .fetch_all(&mut connection)
+        .await
+        .unwrap();
+
+    assert_eq!(1, events.len());
+
+    // drop input_event after comparing with response
+    {
+        let log_event = input_event.into_log();
+        let expected = serde_json::to_value(&log_event).unwrap();
+        let actual = serde_json::to_value(&events[0]).unwrap();
+        assert_eq!(expected, actual);
+    }
+
+    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+}
+
+#[tokio::test]
+async fn test_postgres_sink() {
+    let (cfg, table, connection) = prepare_config().await;
+    insert_event_with_cfg(cfg, table, connection).await;
+}
diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs
new file mode 100644
index 0000000000000..c8cad35966eff
--- /dev/null
+++ b/src/sinks/postgres/mod.rs
@@ -0,0 +1,8 @@
+mod config;
+mod service;
+mod sink;
+// #[cfg(all(test, feature = "postgres-integration-tests"))]
+#[cfg(test)]
+mod integration_tests;
+
+pub use self::config::PostgresConfig;
diff --git a/src/sinks/postgres/service.rs b/src/sinks/postgres/service.rs
new file mode 100644
index 0000000000000..8542f1292f5fe
--- /dev/null
+++ b/src/sinks/postgres/service.rs
@@ -0,0 +1,143 @@
+use std::num::NonZeroUsize;
+use std::task::{Context, Poll};
+
+use crate::internal_events::EndpointBytesSent;
+use crate::sinks::prelude::{RequestMetadataBuilder, RetryLogic};
+use futures::future::BoxFuture;
+use sqlx::types::Json;
+use sqlx::{Error as PostgresError, Pool, Postgres};
+use tower::Service;
+use vector_lib::event::{EventFinalizers, EventStatus, Finalizable, LogEvent};
+use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
+use vector_lib::stream::DriverResponse;
+use vector_lib::EstimatedJsonEncodedSizeOf;
+
+const POSTGRES_PROTOCOL: &str = "postgres";
+
+#[derive(Clone)]
+pub struct PostgresRetryLogic;
+
+impl RetryLogic for PostgresRetryLogic {
+    type Error = PostgresError;
+    type Response = PostgresResponse;
+
+    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
+        // TODO: Implement this
+        false
+    }
+}
+
+// TODO: make this a cheap clone wrapping all the fields behind Arc/Rc
+#[derive(Clone)]
+pub struct PostgresService {
+    // TODO: change type to pool?
+    connection_pool: Pool<Postgres>,
+    table: String,
+    endpoint: String,
+}
+
+impl PostgresService {
+    pub fn new(connection_pool: Pool<Postgres>, table: String, endpoint: String) -> Self {
+        Self {
+            connection_pool,
+            table,
+            endpoint,
+        }
+    }
+}
+
+// TODO: do we need this clone?
+#[derive(Clone)]
+pub struct PostgresRequest {
+    pub events: Vec<LogEvent>,
+    pub finalizers: EventFinalizers,
+    pub metadata: RequestMetadata,
+}
+
+impl From<Vec<LogEvent>> for PostgresRequest {
+    fn from(mut events: Vec<LogEvent>) -> Self {
+        let finalizers = events.take_finalizers();
+        let metadata_builder = RequestMetadataBuilder::from_events(&events);
+        let events_size = NonZeroUsize::new(events.estimated_json_encoded_size_of().get())
+            .expect("payload should never be zero length");
+        // TODO: is this metadata calculation ok?
+        let metadata = metadata_builder.with_request_size(events_size);
+        PostgresRequest {
+            events,
+            finalizers,
+            metadata,
+        }
+    }
+}
+
+impl Finalizable for PostgresRequest {
+    fn take_finalizers(&mut self) -> EventFinalizers {
+        self.finalizers.take_finalizers()
+    }
+}
+
+impl MetaDescriptive for PostgresRequest {
+    fn get_metadata(&self) -> &RequestMetadata {
+        &self.metadata
+    }
+
+    fn metadata_mut(&mut self) -> &mut RequestMetadata {
+        &mut self.metadata
+    }
+}
+
+pub struct PostgresResponse {
+    metadata: RequestMetadata,
+}
+
+impl DriverResponse for PostgresResponse {
+    fn event_status(&self) -> EventStatus {
+        EventStatus::Delivered
+    }
+
+    fn events_sent(&self) -> &GroupedCountByteSize {
+        // self.metadata.events_estimated_json_encoded_byte_size()
+        // TODO: implement this
+        self.metadata.events_estimated_json_encoded_byte_size()
+    }
+
+    fn bytes_sent(&self) -> Option<usize> {
+        Some(self.metadata.request_encoded_size())
+    }
+}
+
+impl Service<PostgresRequest> for PostgresService {
+    type Response = PostgresResponse;
+    type Error = PostgresError;
+    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
+
+    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, request: PostgresRequest) -> Self::Future {
+        let service = self.clone();
+        let future = async move {
+            let table = service.table;
+            let metadata = request.metadata;
+            // TODO: If a single item of the batch fails, the whole batch will fail its insert.
+            // Is this intended behaviour?
+            sqlx::query(&format!(
+                "INSERT INTO {table} SELECT * FROM jsonb_populate_recordset(NULL::{table}, $1)"
+            ))
+            .bind(Json(request.events))
+            .execute(&service.connection_pool)
+            .await?;
+
+            emit!(EndpointBytesSent {
+                byte_size: metadata.request_encoded_size(),
+                protocol: POSTGRES_PROTOCOL,
+                endpoint: &service.endpoint,
+            });
+
+            Ok(PostgresResponse { metadata })
+        };
+
+        Box::pin(future)
+    }
+}
diff --git a/src/sinks/postgres/sink.rs b/src/sinks/postgres/sink.rs
new file mode 100644
index 0000000000000..9439c2fc6db7c
--- /dev/null
+++ b/src/sinks/postgres/sink.rs
@@ -0,0 +1,41 @@
+use super::service::{PostgresRequest, PostgresRetryLogic, PostgresService};
+use crate::sinks::prelude::*;
+
+pub struct PostgresSink {
+    // TODO: move those properties to the service
+    service: Svc<PostgresService, PostgresRetryLogic>,
+
+    batch_settings: BatcherSettings,
+}
+
+impl PostgresSink {
+    pub fn new(
+        service: Svc<PostgresService, PostgresRetryLogic>,
+        batch_settings: BatcherSettings,
+    ) -> Self {
+        Self {
+            service,
+            batch_settings,
+        }
+    }
+
+    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
+        input
+            .map(Event::into_log)
+            .batched(self.batch_settings.as_byte_size_config())
+            // .
+            // TODO: use request builder?
+            // .
+            .map(PostgresRequest::from)
+            .into_driver(self.service)
+            .run()
+            .await
+    }
+}
+
+#[async_trait::async_trait]
+impl StreamSink<Event> for PostgresSink {
+    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
+        self.run_inner(input).await
+    }
+}

From 0a0576b39c62f3faac1a38004988d928b1cbafe8 Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Mon, 9 Sep 2024 01:40:01 +0200
Subject: [PATCH 02/23] feat(sinks): Initial postgres log sink implementation

---
 src/sinks/postgres/mod.rs | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs
index c8cad35966eff..c62ce0b342814 100644
--- a/src/sinks/postgres/mod.rs
+++ b/src/sinks/postgres/mod.rs
@@ -1,8 +1,7 @@
 mod config;
+#[cfg(all(test, feature = "postgres-integration-tests"))]
+mod integration_tests;
 mod service;
 mod sink;
-// #[cfg(all(test, feature = "postgres-integration-tests"))]
-#[cfg(test)]
-mod integration_tests;
 
 pub use self::config::PostgresConfig;

From bf2b7af0fe516d3a982157b233040d9810d98ac7 Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Tue, 10 Sep 2024 00:17:42 +0200
Subject: [PATCH 03/23] test(sinks): Add integration test for postgres sink

---
 src/sinks/postgres/config.rs            | 18 +++++++++++++-----
 src/sinks/postgres/integration_tests.rs | 13 +++++++++----
 src/sinks/postgres/mod.rs               |  3 ++-
 src/sinks/postgres/service.rs           |  7 ++-----
 src/sinks/postgres/sink.rs              |  5 +----
 5 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/src/sinks/postgres/config.rs b/src/sinks/postgres/config.rs
index 015187113a1be..ca8f133014daa 100644
--- a/src/sinks/postgres/config.rs
+++ b/src/sinks/postgres/config.rs
@@ -23,19 +23,27 @@ use crate::{
     },
 };
 
+const fn default_pool_size() -> u32 {
+    5
+}
+
 /// Configuration for the `postgres` sink.
 #[configurable_component(sink("postgres", "Deliver log data to a PostgreSQL database."))]
 #[derive(Clone, Debug)]
 #[serde(deny_unknown_fields)]
 pub struct PostgresConfig {
-    /// TODO
-    /// TODO: if I used UriSerde instead of String, I couldn't get a url string to use
-    /// in the connection pool, as the password would be redacted with UriSerde::to_string
+    // TODO: if I used UriSerde instead of String, I couldn't get a url string to use
+    // in the connection pool, as the password would be redacted with UriSerde::to_string
+    /// The connection string for the PostgreSQL server. It can contain the username and password.
     pub endpoint: String,
 
-    /// TODO
+    /// The table that data is inserted into.
     pub table: String,
 
+    /// The postgres connection pool size.
+    #[serde(default = "default_pool_size")]
+    pub pool_size: u32,
+
     #[configurable(derived)]
     #[serde(default)]
     pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
@@ -72,7 +80,7 @@ impl SinkConfig for PostgresConfig {
         // TODO: make connection pool configurable. Or should we just have one connection per sink?
         // TODO: it seems that the number of connections in the pool does not affect the throughput of the sink
         // does the sink execute batches in parallel?
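+        // A plausible answer (an assumption, not measured in this patch): the stream driver
+        // dispatches requests with the concurrency configured in `TowerRequestConfig`, so
+        // several batches can be in flight at once, and `pool_size` then caps how many
+        // INSERT statements actually run against postgres concurrently.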
         let connection_pool = PgPoolOptions::new()
-            .max_connections(5)
+            .max_connections(self.pool_size)
             .connect(&self.endpoint)
             .await?;
diff --git a/src/sinks/postgres/integration_tests.rs b/src/sinks/postgres/integration_tests.rs
index 5db12843fe3c7..30f44d5ee76ef 100644
--- a/src/sinks/postgres/integration_tests.rs
+++ b/src/sinks/postgres/integration_tests.rs
@@ -8,7 +8,7 @@ use crate::{
 };
 use futures::stream;
 use serde::{Deserialize, Serialize};
-use sqlx::{Connection, PgConnection};
+use sqlx::{Connection, FromRow, PgConnection};
 use std::future::ready;
 use vector_lib::event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, LogEvent};
 
@@ -29,14 +29,17 @@ fn make_event() -> (Event, BatchStatusReceiver) {
     let (batch, receiver) = BatchNotifier::new_with_receiver();
     let mut event = LogEvent::from("raw log line").with_batch_notifier(&batch);
     event.insert("host", "example.com");
+    let event_payload = event.clone().into_parts().0;
+    event.insert("payload", event_payload);
     (event.into(), receiver)
 }
 
-#[derive(Serialize, Deserialize, sqlx::FromRow)]
+#[derive(Debug, Serialize, Deserialize, FromRow)]
 struct TestEvent {
     host: String,
     timestamp: String,
     message: String,
+    payload: serde_json::Value,
 }
 
 async fn prepare_config() -> (String, String, PgConnection) {
@@ -62,8 +65,10 @@ async fn prepare_config() -> (String, String, PgConnection) {
 }
 
 async fn insert_event_with_cfg(cfg: String, table: String, mut connection: PgConnection) {
+    // We store the timestamp as text and not as `timestamp with timezone` postgres type due to
+    // postgres not supporting nanosecond-resolution (it does support microsecond-resolution).
     let create_table_sql =
-        format!("CREATE TABLE IF NOT EXISTS {table} (host text, timestamp text, message text)",);
+        format!("CREATE TABLE IF NOT EXISTS {table} (host text, timestamp text, message text, payload jsonb)",);
     sqlx::query(&create_table_sql)
         .execute(&mut connection)
         .await
         .unwrap();
@@ -85,7 +90,7 @@ async fn insert_event_with_cfg(cfg: String, table: String, mut connection: PgCon
         .fetch_all(&mut connection)
         .await
         .unwrap();
-
+    dbg!(&events);
     assert_eq!(1, events.len());
 
     // drop input_event after comparing with response
diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs
index c62ce0b342814..5bea153d1bc9c 100644
--- a/src/sinks/postgres/mod.rs
+++ b/src/sinks/postgres/mod.rs
@@ -1,5 +1,6 @@
 mod config;
-#[cfg(all(test, feature = "postgres-integration-tests"))]
+// #[cfg(all(test, feature = "postgres-integration-tests"))]
+#[cfg(test)]
 mod integration_tests;
 mod service;
 mod sink;
diff --git a/src/sinks/postgres/service.rs b/src/sinks/postgres/service.rs
index 8542f1292f5fe..50cac001b921e 100644
--- a/src/sinks/postgres/service.rs
+++ b/src/sinks/postgres/service.rs
@@ -27,10 +27,8 @@ impl RetryLogic for PostgresRetryLogic {
     }
 }
 
-// TODO: make this a cheap clone wrapping all the fields behind Arc/Rc
 #[derive(Clone)]
 pub struct PostgresService {
-    // TODO: change type to pool?
     connection_pool: Pool<Postgres>,
     table: String,
     endpoint: String,
 }
 
 impl PostgresService {
@@ -60,7 +58,7 @@ impl From<Vec<LogEvent>> for PostgresRequest {
         let metadata_builder = RequestMetadataBuilder::from_events(&events);
         let events_size = NonZeroUsize::new(events.estimated_json_encoded_size_of().get())
             .expect("payload should never be zero length");
-        // TODO: is this metadata calculation ok?
+        // TODO: is this metadata creation correct?
         let metadata = metadata_builder.with_request_size(events_size);
         PostgresRequest {
             events,
@@ -96,8 +94,7 @@ impl DriverResponse for PostgresResponse {
     }
 
     fn events_sent(&self) -> &GroupedCountByteSize {
-        // self.metadata.events_estimated_json_encoded_byte_size()
-        // TODO: implement this
+        // TODO: Is this correct?
         self.metadata.events_estimated_json_encoded_byte_size()
     }
 
diff --git a/src/sinks/postgres/sink.rs b/src/sinks/postgres/sink.rs
index 9439c2fc6db7c..8863bea22ab48 100644
--- a/src/sinks/postgres/sink.rs
+++ b/src/sinks/postgres/sink.rs
@@ -2,9 +2,7 @@ use super::service::{PostgresRequest, PostgresRetryLogic, PostgresService};
 use crate::sinks::prelude::*;
 
 pub struct PostgresSink {
-    // TODO: move those properties to the service
     service: Svc<PostgresService, PostgresRetryLogic>,
-
     batch_settings: BatcherSettings,
 }
 
@@ -22,10 +20,9 @@ impl PostgresSink {
     async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
         input
             .map(Event::into_log)
+            // TODO: is this batch setting ok?
             .batched(self.batch_settings.as_byte_size_config())
-            // .
             // TODO: use request builder?
-            // .
             .map(PostgresRequest::from)
             .into_driver(self.service)
             .run()
             .await

From 2ecbd56311c7cb658dbb74b106f750624cca04ac Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Tue, 10 Sep 2024 00:21:11 +0200
Subject: [PATCH 04/23] feat(sinks): postgres sink config unit tests

---
 src/sinks/postgres/config.rs | 26 ++++++++++++++++++++++++++
 src/sinks/postgres/mod.rs    |  3 +--
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/src/sinks/postgres/config.rs b/src/sinks/postgres/config.rs
index ca8f133014daa..ae32a63401146 100644
--- a/src/sinks/postgres/config.rs
+++ b/src/sinks/postgres/config.rs
@@ -119,3 +119,29 @@ async fn healthcheck(connection_pool: Pool<Postgres>) -> crate::Result<()> {
     sqlx::query("SELECT 1").execute(&connection_pool).await?;
     Ok(())
 }
+
+#[cfg(test)]
+mod tests{
+    use super::*;
+
+    #[test]
+    fn generate_config() {
+        crate::test_util::test_generate_config::<PostgresConfig>();
+    }
+
+    #[test]
+    fn parse_config() {
+        let cfg = toml::from_str::<PostgresConfig>(
+            r#"
+            endpoint = "postgres://user:password@localhost/default"
+            table = "mytable"
+        "#,
+        )
+        .unwrap();
+        assert_eq!(
+            cfg.endpoint,
+            "postgres://user:password@localhost/default"
+        );
+        assert_eq!(cfg.table, "mytable");
+    }
+}
diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs
index 5bea153d1bc9c..c62ce0b342814 100644
--- a/src/sinks/postgres/mod.rs
+++ b/src/sinks/postgres/mod.rs
@@ -1,6 +1,5 @@
 mod config;
-// #[cfg(all(test, feature = "postgres-integration-tests"))]
-#[cfg(test)]
+#[cfg(all(test, feature = "postgres-integration-tests"))]
 mod integration_tests;
 mod service;
 mod sink;

From cee4b39b9e2abfd57f1b07f78c4c469695cbc41b Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Tue, 10 Sep 2024 00:38:42 +0200
Subject: [PATCH 05/23] docs(website): update components cue

---
 .../components/sinks/base/postgres.cue        | 263 ++++++++++++++++++
 1 file changed, 263 insertions(+)
 create mode 100644 website/cue/reference/components/sinks/base/postgres.cue

diff --git a/website/cue/reference/components/sinks/base/postgres.cue b/website/cue/reference/components/sinks/base/postgres.cue
new file mode 100644
index 0000000000000..1d3557c8225d5
--- /dev/null
+++ b/website/cue/reference/components/sinks/base/postgres.cue
@@ -0,0 +1,263 @@
+package metadata
+
+base: components: sinks: postgres: configuration: {
+	acknowledgements: {
+		description: """
+			Controls how acknowledgements are handled for this sink.
+
+			See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.
+
+			[e2e_acks]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/
+			"""
+		required: false
+		type: object: options: enabled: {
+			description: """
+				Whether or not end-to-end acknowledgements are enabled.
+
+				When enabled for a sink, any source connected to that sink, where the source supports
+				end-to-end acknowledgements as well, waits for events to be acknowledged by **all
+				connected** sinks before acknowledging them at the source.
+
+				Enabling or disabling acknowledgements at the sink level takes precedence over any global
+				[`acknowledgements`][global_acks] configuration.
+
+				[global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
+				"""
+			required: false
+			type: bool: {}
+		}
+	}
+	batch: {
+		description: "Event batching behavior."
+		required:    false
+		type: object: options: {
+			max_bytes: {
+				description: """
+					The maximum size of a batch that is processed by a sink.
+
+					This is based on the uncompressed size of the batched events, before they are
+					serialized/compressed.
+					"""
+				required: false
+				type: uint: {
+					default: 10000000
+					unit:    "bytes"
+				}
+			}
+			max_events: {
+				description: "The maximum size of a batch before it is flushed."
+				required:    false
+				type: uint: unit: "events"
+			}
+			timeout_secs: {
+				description: "The maximum age of a batch before it is flushed."
+				required:    false
+				type: float: {
+					default: 1.0
+					unit:    "seconds"
+				}
+			}
+		}
+	}
+	endpoint: {
+		description: "The connection string for the PostgreSQL server. It can contain the username and password."
+		required:    true
+		type: string: {}
+	}
+	pool_size: {
+		description: "The postgres connection pool size."
+		required:    false
+		type: uint: default: 5
+	}
+	request: {
+		description: """
+			Middleware settings for outbound requests.
+
+			Various settings can be configured, such as concurrency and rate limits, timeouts, retry behavior, etc.
+
+			Note that the retry backoff policy follows the Fibonacci sequence.
+			"""
+		required: false
+		type: object: options: {
+			adaptive_concurrency: {
+				description: """
+					Configuration of adaptive concurrency parameters.
+
+					These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or
+					unstable performance and sink behavior. Proceed with caution.
+					"""
+				required: false
+				type: object: options: {
+					decrease_ratio: {
+						description: """
+							The fraction of the current value to set the new concurrency limit when decreasing the limit.
+
+							Valid values are greater than `0` and less than `1`. Smaller values cause the algorithm to scale back rapidly
+							when latency increases.
+
+							Note that the new limit is rounded down after applying this ratio.
+							"""
+						required: false
+						type: float: default: 0.9
+					}
+					ewma_alpha: {
+						description: """
+							The weighting of new measurements compared to older measurements.
+
+							Valid values are greater than `0` and less than `1`.
+
+							ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with
+							the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has
+							unusually high response variability.
+							"""
+						required: false
+						type: float: default: 0.4
+					}
+					initial_concurrency: {
+						description: """
+							The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
+
+							It is recommended to set this value to your service's average limit if you're seeing that it takes a
+							long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
+							`adaptive_concurrency_limit` metric.
+							"""
+						required: false
+						type: uint: default: 1
+					}
+					max_concurrency_limit: {
+						description: """
+							The maximum concurrency limit.
+
+							The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard.
+							"""
+						required: false
+						type: uint: default: 200
+					}
+					rtt_deviation_scale: {
+						description: """
+							Scale of RTT deviations which are not considered anomalous.
+
+							Valid values are greater than or equal to `0`, and we expect reasonable values to range from `1.0` to `3.0`.
+
+							When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable
+							those values are. We use that deviation when comparing the past RTT average to the current measurements, so we
+							can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to
+							an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT.
+							"""
+						required: false
+						type: float: default: 2.5
+					}
+				}
+			}
+			concurrency: {
+				description: """
+					Configuration for outbound request concurrency.
+
+					This can be set either to one of the below enum values or to a positive integer, which denotes
+					a fixed concurrency limit.
+					"""
+				required: false
+				type: {
+					string: {
+						default: "adaptive"
+						enum: {
+							adaptive: """
+								Concurrency will be managed by Vector's [Adaptive Request Concurrency][arc] feature.
+
+								[arc]: https://vector.dev/docs/about/under-the-hood/networking/arc/
+								"""
+							none: """
+								A fixed concurrency of 1.
+
+								Only one request can be outstanding at any given time.
+								"""
+						}
+					}
+					uint: {}
+				}
+			}
+			rate_limit_duration_secs: {
+				description: "The time window used for the `rate_limit_num` option."
+				required:    false
+				type: uint: {
+					default: 1
+					unit:    "seconds"
+				}
+			}
+			rate_limit_num: {
+				description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window."
+				required:    false
+				type: uint: {
+					default: 9223372036854775807
+					unit:    "requests"
+				}
+			}
+			retry_attempts: {
+				description: "The maximum number of retries to make for failed requests."
+				required:    false
+				type: uint: {
+					default: 9223372036854775807
+					unit:    "retries"
+				}
+			}
+			retry_initial_backoff_secs: {
+				description: """
+					The amount of time to wait before attempting the first retry for a failed request.
+
+					After the first retry has failed, the fibonacci sequence is used to select future backoffs.
+					"""
+				required: false
+				type: uint: {
+					default: 1
+					unit:    "seconds"
+				}
+			}
+			retry_jitter_mode: {
+				description: "The jitter mode to use for retry backoff behavior."
+				required:    false
+				type: string: {
+					default: "Full"
+					enum: {
+						Full: """
+							Full jitter.
+
+							The random delay is anywhere from 0 up to the maximum current delay calculated by the backoff
+							strategy.
+
+							Incorporating full jitter into your backoff strategy can greatly reduce the likelihood
+							of creating accidental denial of service (DoS) conditions against your own systems when
+							many clients are recovering from a failure state.
+							"""
+						None: "No jitter."
+					}
+				}
+			}
+			retry_max_duration_secs: {
+				description: "The maximum amount of time to wait between retries."
+				required: false
+				type: uint: {
+					default: 30
+					unit:    "seconds"
+				}
+			}
+			timeout_secs: {
+				description: """
+					The time a request can take before being aborted.
+
+					Datadog highly recommends that you do not lower this value below the service's internal timeout, as this could
+					create orphaned requests, pile on retries, and result in duplicate data downstream.
+					"""
+				required: false
+				type: uint: {
+					default: 60
+					unit:    "seconds"
+				}
+			}
+		}
+	}
+	table: {
+		description: "The table that data is inserted into."
+		required:    true
+		type: string: {}
+	}
+}

From 548e996fec15eb6d743dfe4cbd15d67d4cd87d09 Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Tue, 10 Sep 2024 00:41:21 +0200
Subject: [PATCH 06/23] chore: update spelling checklist

---
 .github/actions/spelling/expect.txt | 1 +
 Cargo.lock                          | 4 ++--
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt
index f239a69d464e9..5cfd900366012 100644
--- a/.github/actions/spelling/expect.txt
+++ b/.github/actions/spelling/expect.txt
@@ -1019,6 +1019,7 @@ splitn
 SPOF
 spog
 springframework
+sqlx
 srcport
 SREs
 sret
diff --git a/Cargo.lock b/Cargo.lock
index dfef60c9f3842..74bcf6ac515e1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9079,7 +9079,7 @@ dependencies = [
 "quote 1.0.37",
 "sqlx-core",
 "sqlx-macros-core",
- "syn 2.0.75",
+ "syn 2.0.77",
 ]
 
 [[package]]
@@ -9100,7 +9100,7 @@ dependencies = [
 "sha2",
 "sqlx-core",
 "sqlx-postgres",
- "syn 2.0.75",
+ "syn 2.0.77",
 "tempfile",
 "tokio",
 "url",

From ea82ad445140638fbe7b78b5fcfd22128140e218 Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Tue, 10 Sep 2024 00:42:12 +0200
Subject: [PATCH 07/23] style: cargo fmt

---
 src/sinks/postgres/config.rs | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/src/sinks/postgres/config.rs b/src/sinks/postgres/config.rs
index ae32a63401146..b30557eff3a66 100644
--- a/src/sinks/postgres/config.rs
+++ b/src/sinks/postgres/config.rs
@@ -121,7 +121,7 @@ async fn healthcheck(connection_pool: Pool<Postgres>) -> crate::Result<()> {
     sqlx::query("SELECT 1").execute(&connection_pool).await?;
     Ok(())
 }
 
 #[cfg(test)]
-mod tests{
+mod tests {
     use super::*;
 
     #[test]
@@ -138,10 +138,7 @@ mod tests {
         "#,
         )
         .unwrap();
-        assert_eq!(
-            cfg.endpoint,
-            "postgres://user:password@localhost/default"
-        );
+        assert_eq!(cfg.endpoint, "postgres://user:password@localhost/default");
         assert_eq!(cfg.table, "mytable");
     }
 }

From ae68fc42abf84c19eb7e40888134b780a6de1f94 Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Tue, 10 Sep 2024 00:42:34 +0200
Subject: [PATCH 08/23] fix: clippy lint

---
 src/sinks/postgres/service.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/sinks/postgres/service.rs b/src/sinks/postgres/service.rs
index 50cac001b921e..99e22adc1d80f 100644
--- a/src/sinks/postgres/service.rs
+++ b/src/sinks/postgres/service.rs
@@ -35,7 +35,7 @@ pub struct PostgresService {
 }
 
 impl PostgresService {
-    pub fn new(connection_pool: Pool<Postgres>, table: String, endpoint: String) -> Self {
+    pub const fn new(connection_pool: Pool<Postgres>, table: String, endpoint: String) -> Self {
         Self {
             connection_pool,
             table,

From 89e63e336dcc2c05e385a279d1506cbd6a49b5b2 Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Tue, 10 Sep 2024 00:44:56 +0200
Subject: [PATCH 09/23] fix: clippy lint

---
 src/sinks/postgres/sink.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/sinks/postgres/sink.rs b/src/sinks/postgres/sink.rs
index 8863bea22ab48..6416ea9f27ded 100644
--- a/src/sinks/postgres/sink.rs
+++ b/src/sinks/postgres/sink.rs
@@ -7,7 +7,7 @@ pub struct PostgresSink {
 }
 
 impl PostgresSink {
-    pub fn new(
+    pub const fn new(
         service: Svc<PostgresService, PostgresRetryLogic>,
         batch_settings: BatcherSettings,
     ) -> Self {

From f68f04b80262d7076124857162943f42f465a8f7 Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Tue, 10 Sep 2024 00:45:04 +0200
Subject: [PATCH 10/23] docs: add changelog

---
 changelog.d/15765_postgres_sink.feature.md | 3 +++
 1 file changed, 3 insertions(+)
 create mode 100644 changelog.d/15765_postgres_sink.feature.md

diff --git a/changelog.d/15765_postgres_sink.feature.md b/changelog.d/15765_postgres_sink.feature.md
new file mode 100644
index 0000000000000..c37cf472dfb6f
--- /dev/null
+++ b/changelog.d/15765_postgres_sink.feature.md
@@ -0,0 +1,3 @@
+Add a new postgres sink which allows sending log events to a postgres database.
+
+authors:jorgehermo9

From f4cd4c766798cc787ccb8a5a9b153db57fec0be7 Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Tue, 10 Sep 2024 00:49:25 +0200
Subject: [PATCH 11/23] docs: add changelog

---
 changelog.d/15765_postgres_sink.feature.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/changelog.d/15765_postgres_sink.feature.md b/changelog.d/15765_postgres_sink.feature.md
index c37cf472dfb6f..294b5f3bc3f3f 100644
--- a/changelog.d/15765_postgres_sink.feature.md
+++ b/changelog.d/15765_postgres_sink.feature.md
@@ -1,3 +1,3 @@
 Add a new postgres sink which allows sending log events to a postgres database.
 
-authors:jorgehermo9
+authors: jorgehermo9

From 984750d631ea94b1f66013112ae140f5edb1287a Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Fri, 22 Nov 2024 17:45:43 +0100
Subject: [PATCH 12/23] chore: rename postgres integration tests feature flag

---
 scripts/integration/postgres/test.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/integration/postgres/test.yaml b/scripts/integration/postgres/test.yaml
index 66562f32b65fc..a89049f545948 100644
--- a/scripts/integration/postgres/test.yaml
+++ b/scripts/integration/postgres/test.yaml
@@ -1,6 +1,6 @@
 features:
 - postgresql_metrics-integration-tests
-- postgres-integration-tests
+- postgres_sink-integration-tests
 
 test_filter: ::postgres
 
From 5b1ca45a99afd35e53a4f317bbb1cbffde9803be Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Sat, 23 Nov 2024 23:31:47 +0100
Subject: [PATCH 13/23] chore: rename postgres integration tests feature flag

---
 Cargo.toml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index e56a26e71dc5a..225965934b64e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -838,7 +838,7 @@ all-integration-tests = [
   "nginx-integration-tests",
   "opentelemetry-integration-tests",
   "postgresql_metrics-integration-tests",
-  "postgres-integration-tests",
+  "postgres_sink-integration-tests",
   "prometheus-integration-tests",
   "pulsar-integration-tests",
   "redis-integration-tests",
@@ -903,7 +903,7 @@ nats-integration-tests = ["sinks-nats", "sources-nats"]
 nginx-integration-tests = ["sources-nginx_metrics"]
 opentelemetry-integration-tests = ["sources-opentelemetry", "dep:prost"]
 postgresql_metrics-integration-tests = ["sources-postgresql_metrics"]
-postgres-integration-tests = ["sinks-postgres"]
+postgres_sink-integration-tests = ["sinks-postgres"]
 prometheus-integration-tests = ["sinks-prometheus", "sources-prometheus", "sinks-influxdb"]
 pulsar-integration-tests = ["sinks-pulsar", "sources-pulsar"]
 redis-integration-tests = ["sinks-redis", "sources-redis"]

From 21f3ad3a56090b94cb8f58ec7bf3bfbf4fb51439 Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Sun, 24 Nov 2024 14:06:59 +0100
Subject: [PATCH 14/23] docs: include
connection pool reference --- src/sinks/postgres/config.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/sinks/postgres/config.rs b/src/sinks/postgres/config.rs index b30557eff3a66..b5b325a51c1d0 100644 --- a/src/sinks/postgres/config.rs +++ b/src/sinks/postgres/config.rs @@ -40,7 +40,8 @@ pub struct PostgresConfig { /// The table that data is inserted into. pub table: String, - /// The postgres connection pool size. + /// The postgres connection pool size. See [this](https://docs.rs/sqlx/latest/sqlx/struct.Pool.html#why-use-a-pool) for more + /// information about why a connection pool should be used. #[serde(default = "default_pool_size")] pub pool_size: u32, @@ -106,6 +107,7 @@ impl SinkConfig for PostgresConfig { Ok((VectorSink::from_event_streamsink(sink), healthcheck)) } + // TODO: allow for Input::all() fn input(&self) -> Input { Input::log() } From 0bea2f71eef375387a703bf7c9e3098e863f086d Mon Sep 17 00:00:00 2001 From: Jorge Hermo Date: Fri, 6 Dec 2024 18:40:11 +0100 Subject: [PATCH 15/23] fix: fix compilation issue in benches --- benches/metrics_snapshot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benches/metrics_snapshot.rs b/benches/metrics_snapshot.rs index bb59ccf65433c..1611fe624cd98 100644 --- a/benches/metrics_snapshot.rs +++ b/benches/metrics_snapshot.rs @@ -23,7 +23,7 @@ fn prepare_metrics(cardinality: usize) -> &'static vector::metrics::Controller { controller.reset(); for idx in 0..cardinality { - metrics::counter!("test", 1, "idx" => idx.to_string()); + metrics::counter!("test", "idx" => idx.to_string()); } controller From e45055d348c74e31bb7d1e9cccaaf469bb6cb2c0 Mon Sep 17 00:00:00 2001 From: Jorge Hermo Date: Fri, 6 Dec 2024 18:40:39 +0100 Subject: [PATCH 16/23] chore: rename feature flag --- src/sinks/postgres/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs index c62ce0b342814..ffbe6d9a4b071 100644 --- a/src/sinks/postgres/mod.rs +++ b/src/sinks/postgres/mod.rs @@ -1,5 +1,5 @@ mod config; -#[cfg(all(test, feature = "postgres-integration-tests"))] +#[cfg(all(test, feature = "postgres_sink-integration-tests"))] mod integration_tests; mod service; mod sink; From 6805961e981eac67d388bbcf84f103d9d2a9ed10 Mon Sep 17 00:00:00 2001 From: Jorge Hermo Date: Fri, 6 Dec 2024 18:47:22 +0100 Subject: [PATCH 17/23] feat: update --- Cargo.lock | 200 +++++++++++++++++++++++- scripts/integration/postgres/test.yaml | 4 +- src/sinks/databend/integration_tests.rs | 8 +- src/sinks/postgres/integration_tests.rs | 23 +-- src/sources/postgresql_metrics.rs | 29 +--- src/test_util/mod.rs | 4 + 6 files changed, 216 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 19a9f2eed20d4..40651c009440f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -750,6 +750,15 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -3134,6 +3143,12 @@ dependencies = [ "tracing 0.1.41", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "duct" version = "0.13.6" @@ -3201,6 +3216,9 @@ name = "either" version = "1.9.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +dependencies = [ + "serde", +] [[package]] name = "elliptic-curve" @@ -3405,6 +3423,17 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "281e452d3bad4005426416cdba5ccfd4f5c1280e10099e21db27f7c1c28347fc" +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + [[package]] name = "event-listener" version = "2.5.3" @@ -3732,6 +3761,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -4091,6 +4131,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ "ahash 0.8.11", + "allocator-api2", ] [[package]] @@ -4104,6 +4145,15 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "hdrhistogram" version = "7.5.4" @@ -4490,7 +4540,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.8", "tokio", "tower-service", "tracing 0.1.41", @@ -7458,7 +7508,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes 1.9.0", "heck 0.5.0", - "itertools 0.11.0", + "itertools 0.12.1", "log", "multimap", "once_cell", @@ -7491,7 +7541,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.12.1", "proc-macro2 1.0.92", "quote 1.0.37", "syn 2.0.90", @@ -7504,7 +7554,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.13.0", "proc-macro2 1.0.92", "quote 1.0.37", "syn 2.0.90", @@ -9258,6 +9308,141 @@ dependencies = [ "der", ] +[[package]] +name = "sqlformat" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bba3a93db0cc4f7bdece8bb09e77e2e785c20bfebf79eb8340ed80708048790" +dependencies = [ + "nom", + "unicode_categories", +] + +[[package]] +name = "sqlx" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93334716a037193fac19df402f8571269c84a00852f6a7066b5d2616dcd64d3e" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-postgres", +] + +[[package]] +name = "sqlx-core" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4d8060b456358185f7d50c55d9b5066ad956956fddec42ee2e8567134a8936e" +dependencies = [ + "atoi", + "byteorder", + "bytes 1.9.0", + "crc", + "crossbeam-queue", + "either", + "event-listener 5.3.1", + "futures-channel", + 
"futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashbrown 0.14.5", + "hashlink", + "hex", + "indexmap 2.7.0", + "log", + "memchr", + "once_cell", + "paste", + "percent-encoding", + "serde", + "serde_json", + "sha2", + "smallvec", + "sqlformat", + "thiserror 1.0.68", + "tokio", + "tokio-stream", + "tracing 0.1.41", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cac0692bcc9de3b073e8d747391827297e075c7710ff6276d9f7a1f3d58c6657" +dependencies = [ + "proc-macro2 1.0.92", + "quote 1.0.37", + "sqlx-core", + "sqlx-macros-core", + "syn 2.0.90", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1804e8a7c7865599c9c79be146dc8a9fd8cc86935fa641d3ea58e5f0688abaa5" +dependencies = [ + "dotenvy", + "either", + "heck 0.5.0", + "hex", + "once_cell", + "proc-macro2 1.0.92", + "quote 1.0.37", + "serde", + "serde_json", + "sha2", + "sqlx-core", + "sqlx-postgres", + "syn 2.0.90", + "tempfile", + "tokio", + "url", +] + +[[package]] +name = "sqlx-postgres" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fa91a732d854c5d7726349bb4bb879bb9478993ceb764247660aee25f67c2f8" +dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.4.1", + "byteorder", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "hex", + "hkdf", + "hmac", + "home", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "rand 0.8.5", + "serde", + "serde_json", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 1.0.68", + "tracing 0.1.41", + "whoami", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -10622,6 +10807,12 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "universal-hash" version = "0.5.1" @@ -10920,6 +11111,7 @@ dependencies = [ "snafu 0.7.5", "snap", "socket2 0.5.8", + "sqlx", "stream-cancel", "strip-ansi-escapes", "sysinfo", diff --git a/scripts/integration/postgres/test.yaml b/scripts/integration/postgres/test.yaml index a89049f545948..a627448e6c382 100644 --- a/scripts/integration/postgres/test.yaml +++ b/scripts/integration/postgres/test.yaml @@ -5,8 +5,8 @@ features: test_filter: ::postgres env: - PG_HOST: postgres - PG_SOCKET: /pg_socket + PG_URL: postgresql://vector:vector@postgres/postgres + PG_SOCKET_URL: postgresql:///postgres?host=/pg_socket&user=vector&password=vector runner: volumes: diff --git a/src/sinks/databend/integration_tests.rs b/src/sinks/databend/integration_tests.rs index 9c050da165600..7a2e32ac464e4 100644 --- a/src/sinks/databend/integration_tests.rs +++ b/src/sinks/databend/integration_tests.rs @@ -13,7 +13,7 @@ use crate::{ sinks::util::UriSerde, test_util::{ components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, - random_string, trace_init, + temp_table, trace_init, }, }; @@ -24,10 +24,6 @@ fn databend_endpoint() -> String { .unwrap_or_else(|_| "databend://vector:vector@databend:8000?sslmode=disable".into()) } -fn gen_table() -> String { - format!("test_{}", random_string(10).to_lowercase()) -} - fn 
make_event() -> (Event, BatchStatusReceiver) { let (batch, receiver) = BatchNotifier::new_with_receiver(); let mut event = LogEvent::from("raw log line").with_batch_notifier(&batch); @@ -38,7 +34,7 @@ fn make_event() -> (Event, BatchStatusReceiver) { async fn prepare_config(codec: &str, compression: &str) -> (String, String, DatabendAPIClient) { trace_init(); - let table = gen_table(); + let table = temp_table(); let endpoint = databend_endpoint(); let _endpoint: UriSerde = endpoint.parse().unwrap(); diff --git a/src/sinks/postgres/integration_tests.rs b/src/sinks/postgres/integration_tests.rs index 30f44d5ee76ef..add12435a110b 100644 --- a/src/sinks/postgres/integration_tests.rs +++ b/src/sinks/postgres/integration_tests.rs @@ -4,7 +4,7 @@ use crate::{ postgres::PostgresConfig, util::{test::load_sink, UriSerde}, }, - test_util::{components::run_and_assert_sink_compliance, random_string, trace_init}, + test_util::{components::run_and_assert_sink_compliance, temp_table, trace_init}, }; use futures::stream; use serde::{Deserialize, Serialize}; @@ -12,17 +12,8 @@ use sqlx::{Connection, FromRow, PgConnection}; use std::future::ready; use vector_lib::event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, LogEvent}; -fn pg_host() -> String { - std::env::var("PG_HOST").unwrap_or_else(|_| "localhost".into()) -} - fn pg_url() -> String { - std::env::var("PG_URL") - .unwrap_or_else(|_| format!("postgres://vector:vector@{}/postgres", pg_host())) -} - -fn gen_table() -> String { - format!("test_{}", random_string(10).to_lowercase()) + std::env::var("PG_URL").expect("PG_URL must be set") } fn make_event() -> (Event, BatchStatusReceiver) { @@ -45,9 +36,8 @@ struct TestEvent { async fn prepare_config() -> (String, String, PgConnection) { trace_init(); - let table = gen_table(); - let endpoint = pg_url(); - let _endpoint: UriSerde = endpoint.parse().unwrap(); + let table = temp_table(); + let endpoint: UriSerde = pg_url().parse().unwrap(); let cfg = format!( r#" @@ -57,12 +47,14 @@ async fn prepare_config() -> (String, String, PgConnection) { "#, ); - let connection = PgConnection::connect(&endpoint) + let connection = PgConnection::connect(endpoint.to_string().as_str()) .await .expect("Failed to connect to Postgres"); (cfg, table, connection) } +// TODO: create table that has an `insertion_date` that defaults to NOW in postgres, so we can order +// by it and get the event insertion order to check with the expected order. 
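+// A sketch of that idea (hypothetical; this test does not create such a table yet):
+//   CREATE TABLE t (host text, timestamp text, message text, payload jsonb,
+//                   insertion_date timestamptz NOT NULL DEFAULT now());
+//   SELECT * FROM t ORDER BY insertion_date;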
async fn insert_event_with_cfg(cfg: String, table: String, mut connection: PgConnection) { // We store the timestamp as text and not as `timestamp with timezone` postgres type due to @@ -90,7 +82,6 @@ async fn insert_event_with_cfg(cfg: String, table: String, mut connection: PgCon .fetch_all(&mut connection) .await .unwrap(); - dbg!(&events); assert_eq!(1, events.len()); // drop input_event after comparing with response diff --git a/src/sources/postgresql_metrics.rs b/src/sources/postgresql_metrics.rs index f4a6f3f4e6418..1b354f7c5e5f1 100644 --- a/src/sources/postgresql_metrics.rs +++ b/src/sources/postgresql_metrics.rs @@ -995,8 +995,6 @@ mod tests { #[cfg(all(test, feature = "postgresql_metrics-integration-tests"))] mod integration_tests { - use std::path::PathBuf; - use super::*; use crate::{ event::Event, @@ -1004,25 +1002,12 @@ mod integration_tests { tls, SourceSender, }; - fn pg_host() -> String { - std::env::var("PG_HOST").unwrap_or_else(|_| "localhost".into()) - } - - fn pg_socket() -> PathBuf { - std::env::var("PG_SOCKET") - .map(PathBuf::from) - .unwrap_or_else(|_| { - let current_dir = std::env::current_dir().unwrap(); - current_dir - .join("tests") - .join("data") - .join("postgresql-local-socket") - }) + fn pg_url() -> String { + std::env::var("PG_URL").expect("PG_URL must be set") } - fn pg_url() -> String { - std::env::var("PG_URL") - .unwrap_or_else(|_| format!("postgres://vector:vector@{}/postgres", pg_host())) + fn pg_socket_url() -> String { + std::env::var("PG_SOCKET_URL").expect("PG_SOCKET_URL must be set") } async fn test_postgresql_metrics( @@ -1117,11 +1102,7 @@ mod integration_tests { #[tokio::test] async fn test_local() { - let endpoint = format!( - "postgresql:///postgres?host={}&user=vector&password=vector", - pg_socket().to_str().unwrap() - ); - test_postgresql_metrics(endpoint, None, None, None).await; + test_postgresql_metrics(pg_socket_url(), None, None, None).await; } #[tokio::test] diff --git a/src/test_util/mod.rs b/src/test_util/mod.rs index 51e2a75ee894c..b5e7cd6457272 100644 --- a/src/test_util/mod.rs +++ b/src/test_util/mod.rs @@ -231,6 +231,10 @@ pub fn temp_dir() -> PathBuf { path.join(dir_name) } +pub fn temp_table() -> String { + format!("test_{}", random_string(10).to_lowercase()) +} + pub fn map_event_batch_stream( stream: impl Stream, batch: Option, From c88bccc598630a58572eca318a720cc5f4325860 Mon Sep 17 00:00:00 2001 From: Jorge Hermo Date: Fri, 6 Dec 2024 20:06:29 +0100 Subject: [PATCH 18/23] test: refactored test --- src/sinks/clickhouse/integration_tests.rs | 20 +++---- src/sinks/postgres/config.rs | 2 +- src/sinks/postgres/integration_tests.rs | 65 +++++++++++------------ 3 files changed, 40 insertions(+), 47 deletions(-) diff --git a/src/sinks/clickhouse/integration_tests.rs b/src/sinks/clickhouse/integration_tests.rs index 703eb5fdc5b00..87793007fd0b9 100644 --- a/src/sinks/clickhouse/integration_tests.rs +++ b/src/sinks/clickhouse/integration_tests.rs @@ -26,7 +26,7 @@ use crate::{ sinks::util::{BatchConfig, Compression, TowerRequestConfig}, test_util::{ components::{run_and_assert_sink_compliance, SINK_TAGS}, - random_string, trace_init, + temp_table, trace_init, }, }; @@ -38,7 +38,7 @@ fn clickhouse_address() -> String { async fn insert_events() { trace_init(); - let table = gen_table(); + let table = temp_table(); let host = clickhouse_address(); let mut batch = BatchConfig::default(); @@ -87,7 +87,7 @@ async fn insert_events() { async fn skip_unknown_fields() { trace_init(); - let table = gen_table(); + let table = temp_table(); 
let host = clickhouse_address(); let mut batch = BatchConfig::default(); @@ -133,7 +133,7 @@ async fn skip_unknown_fields() { async fn insert_events_unix_timestamps() { trace_init(); - let table = gen_table(); + let table = temp_table(); let host = clickhouse_address(); let mut batch = BatchConfig::default(); @@ -192,7 +192,7 @@ async fn insert_events_unix_timestamps() { async fn insert_events_unix_timestamps_toml_config() { trace_init(); - let table = gen_table(); + let table = temp_table(); let host = clickhouse_address(); let config: ClickhouseConfig = toml::from_str(&format!( @@ -250,7 +250,7 @@ timestamp_format = "unix""#, async fn no_retry_on_incorrect_data() { trace_init(); - let table = gen_table(); + let table = temp_table(); let host = clickhouse_address(); let mut batch = BatchConfig::default(); @@ -309,7 +309,7 @@ async fn no_retry_on_incorrect_data_warp() { let config = ClickhouseConfig { endpoint: host.parse().unwrap(), - table: gen_table().try_into().unwrap(), + table: temp_table().try_into().unwrap(), batch, ..Default::default() }; @@ -334,7 +334,7 @@ async fn templated_table() { let n_tables = 2; let table_events: Vec<(String, Event, BatchStatusReceiver)> = (0..n_tables) .map(|_| { - let table = gen_table(); + let table = temp_table(); let (mut event, receiver) = make_event(); event.as_mut_log().insert("table", table.as_str()); (table, event, receiver) @@ -468,7 +468,3 @@ struct Stats { elapsed: f64, rows_read: usize, } - -fn gen_table() -> String { - format!("test_{}", random_string(10).to_lowercase()) -} diff --git a/src/sinks/postgres/config.rs b/src/sinks/postgres/config.rs index b5b325a51c1d0..a3746fce42a83 100644 --- a/src/sinks/postgres/config.rs +++ b/src/sinks/postgres/config.rs @@ -29,7 +29,7 @@ const fn default_pool_size() -> u32 { /// Configuration for the `postgres` sink. 
#[configurable_component(sink("postgres", "Deliver log data to a PostgreSQL database."))]
-#[derive(Clone, Debug)]
+#[derive(Clone, Default, Debug)]
 #[serde(deny_unknown_fields)]
 pub struct PostgresConfig {
     // TODO: if I used UriSerde instead of String, I couldn't get a url string to use
diff --git a/src/sinks/postgres/integration_tests.rs b/src/sinks/postgres/integration_tests.rs
index add12435a110b..56ca2adcc14a7 100644
--- a/src/sinks/postgres/integration_tests.rs
+++ b/src/sinks/postgres/integration_tests.rs
@@ -1,9 +1,6 @@
 use crate::{
     config::{SinkConfig, SinkContext},
-    sinks::{
-        postgres::PostgresConfig,
-        util::{test::load_sink, UriSerde},
-    },
+    sinks::{postgres::PostgresConfig, util::test::load_sink},
     test_util::{components::run_and_assert_sink_compliance, temp_table, trace_init},
 };
 use futures::stream;
 use serde::{Deserialize, Serialize};
@@ -16,60 +13,67 @@ fn pg_url() -> String {
     std::env::var("PG_URL").expect("PG_URL must be set")
 }
 
-fn make_event() -> (Event, BatchStatusReceiver) {
-    let (batch, receiver) = BatchNotifier::new_with_receiver();
-    let mut event = LogEvent::from("raw log line").with_batch_notifier(&batch);
+fn create_event(id: i64) -> Event {
+    let mut event = LogEvent::from("raw log line");
+    event.insert("id", id);
     event.insert("host", "example.com");
     let event_payload = event.clone().into_parts().0;
     event.insert("payload", event_payload);
-    (event.into(), receiver)
+    event.into()
+}
+
+fn create_event_with_notifier(id: i64) -> (Event, BatchStatusReceiver) {
+    let (batch, receiver) = BatchNotifier::new_with_receiver();
+    let event = create_event(id).with_batch_notifier(&batch);
+    (event, receiver)
 }
 
 #[derive(Debug, Serialize, Deserialize, FromRow)]
 struct TestEvent {
+    id: i64,
     host: String,
     timestamp: String,
     message: String,
     payload: serde_json::Value,
 }
 
-async fn prepare_config() -> (String, String, PgConnection) {
+async fn prepare_config() -> (PostgresConfig, String, PgConnection) {
     trace_init();
 
     let table = temp_table();
-    let endpoint: UriSerde = pg_url().parse().unwrap();
+    let endpoint = pg_url();
 
-    let cfg = format!(
+    let config_str = format!(
         r#"
         endpoint = "{endpoint}"
         table = "{table}"
-        batch.max_events = 1
         "#,
     );
+    let (config, _) = load_sink::<PostgresConfig>(&config_str).unwrap();
 
-    let connection = PgConnection::connect(endpoint.to_string().as_str())
+    let connection = PgConnection::connect(endpoint.as_str())
         .await
         .expect("Failed to connect to Postgres");
 
-    (cfg, table, connection)
+    (config, table, connection)
 }
 
+
 // TODO: create a table that has an `insertion_date` column that defaults to NOW() in postgres, so we
 // can order by it and check the event insertion order against the expected order.
-async fn insert_event_with_cfg(cfg: String, table: String, mut connection: PgConnection) {
+#[tokio::test]
+async fn insert_single_event() {
+    let (config, table, mut connection) = prepare_config().await;
+    let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
     // We store the timestamp as text and not as `timestamp with timezone` postgres type due to
     // postgres not supporting nanosecond-resolution (it does support microsecond-resolution).
     let create_table_sql =
let create_table_sql = - format!("CREATE TABLE IF NOT EXISTS {table} (host text, timestamp text, message text, payload jsonb)",); + format!("CREATE TABLE IF NOT EXISTS {table} (id bigint, host text, timestamp text, message text, payload jsonb)"); sqlx::query(&create_table_sql) .execute(&mut connection) .await .unwrap(); - let (config, _) = load_sink::(&cfg).unwrap(); - let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); - - let (input_event, mut receiver) = make_event(); + let (input_event, mut receiver) = create_event_with_notifier(0); run_and_assert_sink_compliance( sink, stream::once(ready(input_event.clone())), @@ -78,25 +82,18 @@ async fn insert_event_with_cfg(cfg: String, table: String, mut connection: PgCon .await; let select_all_sql = format!("SELECT * FROM {table}"); - let events: Vec = sqlx::query_as(&select_all_sql) - .fetch_all(&mut connection) + let actual_event: TestEvent = sqlx::query_as(&select_all_sql) + .fetch_one(&mut connection) .await .unwrap(); - assert_eq!(1, events.len()); // drop input_event after comparing with response { - let log_event = input_event.into_log(); - let expected = serde_json::to_value(&log_event).unwrap(); - let actual = serde_json::to_value(&events[0]).unwrap(); - assert_eq!(expected, actual); + let input_log_event = input_event.into_log(); + let expected_value = serde_json::to_value(&input_log_event).unwrap(); + let actual_value = serde_json::to_value(actual_event).unwrap(); + assert_eq!(expected_value, actual_value); } assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); } - -#[tokio::test] -async fn test_postgres_sink() { - let (cfg, table, connection) = prepare_config().await; - insert_event_with_cfg(cfg, table, connection).await; -} From f58a51b194d342eaab75b86e64f1535b111a78a3 Mon Sep 17 00:00:00 2001 From: Jorge Hermo Date: Sun, 15 Dec 2024 18:16:25 +0100 Subject: [PATCH 19/23] test: add new test & store timestamp instead of text --- Cargo.lock | 102 ++++++++++++++++++++- Cargo.toml | 2 +- src/sinks/postgres/integration_tests.rs | 114 ++++++++++++++++++++---- 3 files changed, 196 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 40651c009440f..1c25f354f0e86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1649,6 +1649,9 @@ name = "bitflags" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" +dependencies = [ + "serde", +] [[package]] name = "bitmask-enum" @@ -1974,12 +1977,13 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.83" +version = "1.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +checksum = "9157bbaa6b165880c27a4293a474c91cdcf265cc68cc829bf10be0964a391caf" dependencies = [ "jobserver", "libc", + "shlex", ] [[package]] @@ -5173,9 +5177,9 @@ checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" [[package]] name = "jobserver" -version = "0.1.27" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c37f63953c4c63420ed5fd3d6d398c719489b9f872b9fa683262f8edd363c7d" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" dependencies = [ "libc", ] @@ -5503,6 +5507,17 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libsqlite3-sys" +version = 
"0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "libz-sys" version = "1.1.12" @@ -9056,6 +9071,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook" version = "0.3.17" @@ -9326,7 +9347,9 @@ checksum = "93334716a037193fac19df402f8571269c84a00852f6a7066b5d2616dcd64d3e" dependencies = [ "sqlx-core", "sqlx-macros", + "sqlx-mysql", "sqlx-postgres", + "sqlx-sqlite", ] [[package]] @@ -9338,6 +9361,7 @@ dependencies = [ "atoi", "byteorder", "bytes 1.9.0", + "chrono", "crc", "crossbeam-queue", "either", @@ -9398,13 +9422,58 @@ dependencies = [ "serde_json", "sha2", "sqlx-core", + "sqlx-mysql", "sqlx-postgres", + "sqlx-sqlite", "syn 2.0.90", "tempfile", "tokio", "url", ] +[[package]] +name = "sqlx-mysql" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64bb4714269afa44aef2755150a0fc19d756fb580a67db8885608cf02f47d06a" +dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.4.1", + "byteorder", + "bytes 1.9.0", + "chrono", + "crc", + "digest", + "dotenvy", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "generic-array", + "hex", + "hkdf", + "hmac", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "percent-encoding", + "rand 0.8.5", + "rsa", + "serde", + "sha1", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 1.0.68", + "tracing 0.1.41", + "whoami", +] + [[package]] name = "sqlx-postgres" version = "0.8.2" @@ -9415,6 +9484,7 @@ dependencies = [ "base64 0.22.1", "bitflags 2.4.1", "byteorder", + "chrono", "crc", "dotenvy", "etcetera", @@ -9443,6 +9513,30 @@ dependencies = [ "whoami", ] +[[package]] +name = "sqlx-sqlite" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5b2cf34a45953bfd3daaf3db0f7a7878ab9b7a6b91b422d24a7a9e4c857b680" +dependencies = [ + "atoi", + "chrono", + "flume 0.11.0", + "futures-channel", + "futures-core", + "futures-executor", + "futures-intrusive", + "futures-util", + "libsqlite3-sys", + "log", + "percent-encoding", + "serde", + "serde_urlencoded", + "sqlx-core", + "tracing 0.1.41", + "url", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index a4f86714f4deb..4aa40a2eb415b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -360,7 +360,7 @@ semver = { version = "1.0.23", default-features = false, features = ["serde", "s smallvec = { version = "1", default-features = false, features = ["union", "serde"] } snap = { version = "1.1.1", default-features = false } socket2 = { version = "0.5.8", default-features = false } -sqlx = { version = "0.8.2", default-features= false, features = ["derive", "postgres", "runtime-tokio"], optional=true } +sqlx = { version = "0.8.2", default-features= false, features = ["derive", "postgres", "chrono", "runtime-tokio"], optional=true } stream-cancel = { version = "0.8.2", default-features = false } strip-ansi-escapes = { version = "0.2.0", default-features = false } syslog = { version = "6.1.1", default-features = false, optional = true } diff --git a/src/sinks/postgres/integration_tests.rs b/src/sinks/postgres/integration_tests.rs index 
56ca2adcc14a7..674ec4030cc4e 100644
--- a/src/sinks/postgres/integration_tests.rs
+++ b/src/sinks/postgres/integration_tests.rs
@@ -3,12 +3,15 @@ use crate::{
     sinks::{postgres::PostgresConfig, util::test::load_sink},
     test_util::{components::run_and_assert_sink_compliance, temp_table, trace_init},
 };
+use chrono::{DateTime, Utc};
 use futures::stream;
 use serde::{Deserialize, Serialize};
 use sqlx::{Connection, FromRow, PgConnection};
 use std::future::ready;
 use vector_lib::event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, LogEvent};
 
+const POSTGRES_SINK_TAGS: [&str; 2] = ["endpoint", "protocol"];
+
 fn pg_url() -> String {
     std::env::var("PG_URL").expect("PG_URL must be set")
 }
@@ -19,6 +22,11 @@ fn create_event(id: i64) -> Event {
     event.insert("host", "example.com");
     let event_payload = event.clone().into_parts().0;
     event.insert("payload", event_payload);
+    let timestamp = Utc::now();
+    // Postgres does not support nanosecond resolution, so we truncate the timestamp to microsecond resolution.
+    let timestamp_microsecond_resolution =
+        DateTime::from_timestamp_micros(timestamp.timestamp_micros());
+    event.insert("timestamp", timestamp_microsecond_resolution);
     event.into()
 }
 
@@ -28,11 +36,17 @@ fn create_event_with_notifier(id: i64) -> (Event, BatchStatusReceiver) {
     (event, receiver)
 }
 
+fn create_events(count: usize) -> (Vec<Event>, BatchStatusReceiver) {
+    let mut events = (0..count as i64).map(create_event).collect::<Vec<_>>();
+    let receiver = BatchNotifier::apply_to(&mut events);
+    return (events, receiver);
+}
+
 #[derive(Debug, Serialize, Deserialize, FromRow)]
 struct TestEvent {
     id: i64,
     host: String,
-    timestamp: String,
+    timestamp: DateTime<Utc>,
     message: String,
     payload: serde_json::Value,
 }
@@ -58,16 +72,88 @@ async fn prepare_config() -> (PostgresConfig, String, PgConnection) {
     (config, table, connection)
 }
 
-// TODO: create a table that has an `insertion_date` column that defaults to NOW() in postgres, so we
-// can order by it and check the event insertion order against the expected order.
 #[tokio::test]
 async fn insert_single_event() {
+    let (config, table, mut connection) = prepare_config().await;
+    let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
+    let create_table_sql =
+        format!("CREATE TABLE IF NOT EXISTS {table} (id BIGINT, host TEXT, timestamp TIMESTAMPTZ, message TEXT, payload JSONB)");
+    sqlx::query(&create_table_sql)
+        .execute(&mut connection)
+        .await
+        .unwrap();
+
+    let (input_event, mut receiver) = create_event_with_notifier(0);
+    let input_log_event = input_event.clone().into_log();
+    let expected_value = serde_json::to_value(&input_log_event).unwrap();
+
+    run_and_assert_sink_compliance(sink, stream::once(ready(input_event)), &POSTGRES_SINK_TAGS)
+        .await;
+    // We drop the event to notify the receiver that the batch was delivered.
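+    // (The clone taken above keeps a reference to the `BatchNotifier` alive; the batch status
+    // is only sent once every reference has been dropped, hence the explicit drop before the
+    // assertion below.)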
+    std::mem::drop(input_log_event);
+    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
+    let select_all_sql = format!("SELECT * FROM {table}");
+    let actual_event: TestEvent = sqlx::query_as(&select_all_sql)
+        .fetch_one(&mut connection)
+        .await
+        .unwrap();
+    let actual_value = serde_json::to_value(actual_event).unwrap();
+    assert_eq!(expected_value, actual_value);
+}
+
+#[tokio::test]
+async fn insert_multiple_events() {
+    let (config, table, mut connection) = prepare_config().await;
+    let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
+    let create_table_sql = format!(
+        "CREATE TABLE IF NOT EXISTS {table} (id BIGINT, host TEXT, timestamp TIMESTAMPTZ, message TEXT, payload JSONB)"
+    );
+    sqlx::query(&create_table_sql)
+        .execute(&mut connection)
+        .await
+        .unwrap();
+
+    let (input_events, mut receiver) = create_events(100);
+    let input_log_events = input_events
+        .clone()
+        .into_iter()
+        .map(Event::into_log)
+        .collect::<Vec<_>>();
+    let expected_values = input_log_events
+        .iter()
+        .map(|event| serde_json::to_value(event).unwrap())
+        .collect::<Vec<_>>();
+    run_and_assert_sink_compliance(sink, stream::iter(input_events), &POSTGRES_SINK_TAGS).await;
+    // We drop the events to notify the receiver that the batch was delivered.
+    std::mem::drop(input_log_events);
+    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
+    let select_all_sql = format!("SELECT * FROM {table} ORDER BY id");
+    let actual_events: Vec<TestEvent> = sqlx::query_as(&select_all_sql)
+        .fetch_all(&mut connection)
+        .await
+        .unwrap();
+    let actual_values = actual_events
+        .iter()
+        .map(|event| serde_json::to_value(event).unwrap())
+        .collect::<Vec<_>>();
+    assert_eq!(expected_values, actual_values);
+}
+
+// Using null::{table} with jsonb_populate_recordset does not work with default values
+// https://dba.stackexchange.com/questions/308114/use-default-value-instead-of-inserted-null
+// https://stackoverflow.com/questions/49992531/postgresql-insert-a-null-convert-to-default
+// TODO: this cannot be fixed without a workaround involving a trigger creation, which is beyond
+// Vector's job in the DB. We should document this limitation alongside this test.
+#[tokio::test]
+async fn default_columns_are_not_populated() {
     let (config, table, mut connection) = prepare_config().await;
     let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
     // We store the timestamp as text and not as `timestamp with timezone` postgres type due to
     // postgres not supporting nanosecond-resolution (it does support microsecond-resolution).
     let create_table_sql =
-        format!("CREATE TABLE IF NOT EXISTS {table} (id bigint, host text, timestamp text, message text, payload jsonb)");
+        format!("CREATE TABLE IF NOT EXISTS {table} (id BIGINT, not_existing_column TEXT DEFAULT 'default_value')");
     sqlx::query(&create_table_sql)
         .execute(&mut connection)
         .await
         .unwrap();
@@ -77,23 +163,17 @@ async fn insert_single_event() {
     run_and_assert_sink_compliance(
         sink,
         stream::once(ready(input_event.clone())),
-        &["endpoint", "protocol"],
+        &POSTGRES_SINK_TAGS,
     )
     .await;
+    // We drop the event to notify the receiver that the batch was delivered.
+    std::mem::drop(input_event);
+    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
 
-    let select_all_sql = format!("SELECT * FROM {table}");
-    let actual_event: TestEvent = sqlx::query_as(&select_all_sql)
+    let select_all_sql = format!("SELECT not_existing_column FROM {table}");
+    let inserted_not_existing_column: (Option<String>,) = sqlx::query_as(&select_all_sql)
         .fetch_one(&mut connection)
         .await
         .unwrap();
-
-    // drop input_event after comparing with response
-    {
-        let input_log_event = input_event.into_log();
-        let expected_value = serde_json::to_value(&input_log_event).unwrap();
-        let actual_value = serde_json::to_value(actual_event).unwrap();
-        assert_eq!(expected_value, actual_value);
-    }
-
-    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+    assert_eq!(inserted_not_existing_column.0, None);
 }

From 95768bbd97a9fb2fe0983fbbda7630301e6c4ead Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Sun, 15 Dec 2024 18:17:06 +0100
Subject: [PATCH 20/23] test: add new test & store timestamp instead of text

---
 src/sinks/postgres/integration_tests.rs | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/sinks/postgres/integration_tests.rs b/src/sinks/postgres/integration_tests.rs
index 674ec4030cc4e..1edee7dba95ad 100644
--- a/src/sinks/postgres/integration_tests.rs
+++ b/src/sinks/postgres/integration_tests.rs
@@ -150,8 +150,6 @@ async fn default_columns_are_not_populated() {
     let (config, table, mut connection) = prepare_config().await;
     let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
-    // We store the timestamp as text and not as `timestamp with timezone` postgres type due to
-    // postgres not supporting nanosecond-resolution (it does support microsecond-resolution).
     let create_table_sql =
         format!("CREATE TABLE IF NOT EXISTS {table} (id BIGINT, not_existing_column TEXT DEFAULT 'default_value')");
     sqlx::query(&create_table_sql)

From 270d32c0a3533725ffc16cbd2a444f309b25d821 Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Sun, 15 Dec 2024 18:19:33 +0100
Subject: [PATCH 21/23] test: add new test & store timestamp instead of text

---
 src/sinks/postgres/integration_tests.rs | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/src/sinks/postgres/integration_tests.rs b/src/sinks/postgres/integration_tests.rs
index 1edee7dba95ad..9860abd5bd653 100644
--- a/src/sinks/postgres/integration_tests.rs
+++ b/src/sinks/postgres/integration_tests.rs
@@ -77,7 +77,7 @@ async fn insert_single_event() {
     let (config, table, mut connection) = prepare_config().await;
     let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
     let create_table_sql =
-        format!("CREATE TABLE IF NOT EXISTS {table} (id BIGINT, host TEXT, timestamp TIMESTAMPTZ, message TEXT, payload JSONB)");
+        format!("CREATE TABLE {table} (id BIGINT, host TEXT, timestamp TIMESTAMPTZ, message TEXT, payload JSONB)");
     sqlx::query(&create_table_sql)
         .execute(&mut connection)
         .await
@@ -107,7 +107,7 @@ async fn insert_multiple_events() {
     let (config, table, mut connection) = prepare_config().await;
     let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
     let create_table_sql = format!(
-        "CREATE TABLE IF NOT EXISTS {table} (id BIGINT, host TEXT, timestamp TIMESTAMPTZ, message TEXT, payload JSONB)"
+        "CREATE TABLE {table} (id BIGINT, host TEXT, timestamp TIMESTAMPTZ, message TEXT, payload JSONB)"
     );
     sqlx::query(&create_table_sql)
         .execute(&mut connection)
         .await
         .unwrap();
@@ -141,7 +141,9 @@ async fn insert_multiple_events() {
     assert_eq!(expected_values, actual_values);
 }
 
-// Using null::{table} with jsonb_populate_recordset does not work with default values
+// Using null::{table} with jsonb_populate_recordset does not work with default values.
+// It is like inserting null values explicitly; it does not use the table's default values.
+//
 // https://dba.stackexchange.com/questions/308114/use-default-value-instead-of-inserted-null
 // https://stackoverflow.com/questions/49992531/postgresql-insert-a-null-convert-to-default
 // TODO: this cannot be fixed without a workaround involving a trigger creation, which is beyond
 // Vector's job in the DB. We should document this limitation alongside this test.
@@ -150,8 +152,9 @@ async fn default_columns_are_not_populated() {
     let (config, table, mut connection) = prepare_config().await;
     let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
-    let create_table_sql =
-        format!("CREATE TABLE IF NOT EXISTS {table} (id BIGINT, not_existing_column TEXT DEFAULT 'default_value')");
+    let create_table_sql = format!(
+        "CREATE TABLE {table} (id BIGINT, not_existing_column TEXT DEFAULT 'default_value')"
+    );
     sqlx::query(&create_table_sql)
         .execute(&mut connection)
         .await

From 0f07f64c61a5021e62648363531e33b9d78e2fd7 Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Sun, 15 Dec 2024 18:22:52 +0100
Subject: [PATCH 22/23] test: add reference link

---
 src/sinks/postgres/integration_tests.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/sinks/postgres/integration_tests.rs b/src/sinks/postgres/integration_tests.rs
index 9860abd5bd653..ff00ff5a8fb4a 100644
--- a/src/sinks/postgres/integration_tests.rs
+++ b/src/sinks/postgres/integration_tests.rs
@@ -24,6 +24,7 @@ fn create_event(id: i64) -> Event {
     event.insert("payload", event_payload);
     let timestamp = Utc::now();
     // Postgres does not support nanosecond resolution, so we truncate the timestamp to microsecond resolution.
+    // https://www.postgresql.org/docs/current/datatype-datetime.html
     let timestamp_microsecond_resolution =
         DateTime::from_timestamp_micros(timestamp.timestamp_micros());
     event.insert("timestamp", timestamp_microsecond_resolution);
@@ -39,7 +40,7 @@ fn create_event_with_notifier(id: i64) -> (Event, BatchStatusReceiver) {
 
 fn create_events(count: usize) -> (Vec<Event>, BatchStatusReceiver) {
     let mut events = (0..count as i64).map(create_event).collect::<Vec<_>>();
     let receiver = BatchNotifier::apply_to(&mut events);
-    return (events, receiver);
+    (events, receiver)
 }
 
 #[derive(Debug, Serialize, Deserialize, FromRow)]
@@ -143,7 +144,6 @@
 // Using null::{table} with jsonb_populate_recordset does not work with default values.
 // It is like inserting null values explicitly; it does not use the table's default values.
-//
 // https://dba.stackexchange.com/questions/308114/use-default-value-instead-of-inserted-null
 // https://stackoverflow.com/questions/49992531/postgresql-insert-a-null-convert-to-default
 // TODO: this cannot be fixed without a workaround involving a trigger creation, which is beyond

From 603fc5ca2cd8355402e0dc078ec23c884076d5a4 Mon Sep 17 00:00:00 2001
From: Jorge Hermo
Date: Sun, 22 Dec 2024 11:14:18 +0100
Subject: [PATCH 23/23] test: add more tests

---
 src/sinks/postgres/integration_tests.rs | 141 ++++++++++++++++++++++--
 1 file changed, 133 insertions(+), 8 deletions(-)

diff --git a/src/sinks/postgres/integration_tests.rs b/src/sinks/postgres/integration_tests.rs
index ff00ff5a8fb4a..6700e36d5ca35 100644
--- a/src/sinks/postgres/integration_tests.rs
+++ b/src/sinks/postgres/integration_tests.rs
@@ -1,7 +1,12 @@
 use crate::{
     config::{SinkConfig, SinkContext},
     sinks::{postgres::PostgresConfig, util::test::load_sink},
-    test_util::{components::run_and_assert_sink_compliance, temp_table, trace_init},
+    test_util::{
+        components::{
+            run_and_assert_sink_compliance, run_and_assert_sink_error, COMPONENT_ERROR_TAGS,
+        },
+        temp_table, trace_init,
+    },
 };
 use chrono::{DateTime, Utc};
 use futures::stream;
 use serde::{Deserialize, Serialize};
 use sqlx::{Connection, FromRow, PgConnection};
 use std::future::ready;
 use vector_lib::event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, LogEvent};
@@ -53,15 +58,13 @@ struct TestEvent {
 }
 
 async fn prepare_config() -> (PostgresConfig, String, PgConnection) {
-    trace_init();
-
     let table = temp_table();
     let endpoint = pg_url();
-
     let config_str = format!(
         r#"
         endpoint = "{endpoint}"
         table = "{table}"
+        batch.max_events = 1
         "#,
     );
     let (config, _) = load_sink::<PostgresConfig>(&config_str).unwrap();
@@ -76,8 +79,37 @@ async fn prepare_config() -> (PostgresConfig, String, PgConnection) {
     (config, table, connection)
 }
 
+#[tokio::test]
+async fn healthcheck_passes() {
+    trace_init();
+    let (config, _table, _connection) = prepare_config().await;
+    let (_sink, healthcheck) = config.build(SinkContext::default()).await.unwrap();
+    healthcheck.await.unwrap();
+}
+
+// This test does not actually fail in the healthcheck query itself, but during connection pool
+// creation in `PostgresConfig::build`.
+#[tokio::test]
+async fn healthcheck_fails() {
+    trace_init();
+
+    let table = temp_table();
+    let endpoint = "postgres://user:pass?host=/unknown_socket_path".to_string();
+    let config_str = format!(
+        r#"
+        endpoint = "{endpoint}"
+        table = "{table}"
+        "#,
+    );
+    let (config, _) = load_sink::<PostgresConfig>(&config_str).unwrap();
+
+    assert!(config.build(SinkContext::default()).await.is_err());
+}
+
 #[tokio::test]
 async fn insert_single_event() {
+    trace_init();
+
     let (config, table, mut connection) = prepare_config().await;
     let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
     let create_table_sql =
@@ -105,6 +137,8 @@
 #[tokio::test]
 async fn insert_multiple_events() {
+    trace_init();
+
     let (config, table, mut connection) = prepare_config().await;
     let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
     let create_table_sql = format!(
@@ -150,10 +184,12 @@
 // Vector's job in the DB. We should document this limitation alongside this test.
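 //
 // A short illustration of that limitation (a hedged sketch only; the table name `t` is
 // hypothetical and not part of this patch):
 //
 //   CREATE TABLE t (id BIGINT, col TEXT DEFAULT 'default_value');
 //   INSERT INTO t SELECT * FROM jsonb_populate_recordset(NULL::t, '[{"id": 1}]');
 //   SELECT col FROM t; -- yields NULL, not 'default_value'
 //
 // jsonb_populate_recordset expands the missing "col" key to an explicit NULL, and an
 // explicit NULL bypasses the column DEFAULT.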
 #[tokio::test]
 async fn default_columns_are_not_populated() {
+    trace_init();
+
     let (config, table, mut connection) = prepare_config().await;
     let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
     let create_table_sql = format!(
-        "CREATE TABLE {table} (id BIGINT, not_existing_column TEXT DEFAULT 'default_value')"
+        "CREATE TABLE {table} (id BIGINT, not_existing_field TEXT DEFAULT 'default_value')"
     );
     sqlx::query(&create_table_sql)
         .execute(&mut connection)
         .await
         .unwrap();
@@ -171,10 +207,99 @@ async fn default_columns_are_not_populated() {
     std::mem::drop(input_event);
     assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
 
-    let select_all_sql = format!("SELECT not_existing_column FROM {table}");
-    let inserted_not_existing_column: (Option<String>,) = sqlx::query_as(&select_all_sql)
+    let select_all_sql = format!("SELECT not_existing_field FROM {table}");
+    let inserted_not_existing_field: (Option<String>,) = sqlx::query_as(&select_all_sql)
         .fetch_one(&mut connection)
         .await
         .unwrap();
-    assert_eq!(inserted_not_existing_column.0, None);
+    assert_eq!(inserted_not_existing_field.0, None);
+}
+
+#[tokio::test]
+async fn extra_fields_are_ignored() {
+    trace_init();
+
+    let (config, table, mut connection) = prepare_config().await;
+    let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
+    let create_table_sql = format!("CREATE TABLE {table} (message TEXT)");
+    sqlx::query(&create_table_sql)
+        .execute(&mut connection)
+        .await
+        .unwrap();
+
+    let (input_event, mut receiver) = create_event_with_notifier(0);
+    let input_log_event = input_event.clone().into_log();
+    let expected_value = input_log_event
+        .get_message()
+        .unwrap()
+        .as_str()
+        .unwrap()
+        .into_owned();
+
+    run_and_assert_sink_compliance(sink, stream::once(ready(input_event)), &POSTGRES_SINK_TAGS)
+        .await;
+    // We drop the event to notify the receiver that the batch was delivered.
+    std::mem::drop(input_log_event);
+    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
+    let select_all_sql = format!("SELECT * FROM {table}");
+    let actual_value: (String,) = sqlx::query_as(&select_all_sql)
+        .fetch_one(&mut connection)
+        .await
+        .unwrap();
+    assert_eq!(expected_value, actual_value.0);
+}
+
+#[tokio::test]
+async fn insertion_fails_required_field_is_not_present() {
+    trace_init();
+
+    let (config, table, mut connection) = prepare_config().await;
+    let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
+    let create_table_sql =
+        format!("CREATE TABLE {table} (message TEXT, not_existing_field TEXT NOT NULL)");
+    sqlx::query(&create_table_sql)
+        .execute(&mut connection)
+        .await
+        .unwrap();
+
+    let (input_event, mut receiver) = create_event_with_notifier(0);
+
+    run_and_assert_sink_error(
+        sink,
+        stream::once(ready(input_event.clone())),
+        &COMPONENT_ERROR_TAGS,
+    )
+    .await;
+    // We drop the event to notify the receiver of the final batch status (Rejected here).
+    std::mem::drop(input_event);
+    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
+
+    // We ensure that the event was not inserted.
+    let select_all_sql = format!("SELECT * FROM {table}");
+    let first_row: Option<(String, String)> = sqlx::query_as(&select_all_sql)
+        .fetch_optional(&mut connection)
+        .await
+        .unwrap();
+    assert_eq!(first_row, None);
+}
+
+#[tokio::test]
+async fn insertion_fails_missing_table() {
+    trace_init();
+
+    let table = "missing_table".to_string();
+    let (mut config, _, _) = prepare_config().await;
+    config.table = table.clone();
+
+    let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
+    let (input_event, mut receiver) = create_event_with_notifier(0);
+
+    run_and_assert_sink_error(
+        sink,
+        stream::once(ready(input_event)),
+        &COMPONENT_ERROR_TAGS,
+    )
+    .await;
+    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
+}
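For reference, a minimal sketch of how the integration tests above drive the sink end to end
(the endpoint and table values here are placeholders rather than values from this patch
series, and the snippet assumes it runs inside an async test harness with the integration
test setup available):

    use crate::{
        config::{SinkConfig, SinkContext},
        sinks::{postgres::PostgresConfig, util::test::load_sink},
    };

    // Hypothetical values; the real tests read the endpoint from the PG_URL env var.
    let config_str = r#"
        endpoint = "postgresql://vector:vector@localhost/postgres"
        table = "test_table"
        batch.max_events = 1
    "#;
    let (config, _) = load_sink::<PostgresConfig>(config_str).unwrap();

    // Building the config yields the sink plus a healthcheck future, exactly as
    // `healthcheck_passes` above exercises it.
    let (_sink, healthcheck) = config.build(SinkContext::default()).await.unwrap();
    healthcheck.await.unwrap();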