Skip to content

Commit

Permalink
chore(core): Refactor vector-core::stream into its own package (#18900
Browse files Browse the repository at this point in the history
)

* chore(deps): Refactor `vector-core::stream` into its own package

* Fixes
  • Loading branch information
bruceg authored Oct 23, 2023
1 parent 8b56a93 commit 96f4d73
Show file tree
Hide file tree
Showing 47 changed files with 166 additions and 140 deletions.
24 changes: 21 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ members = [
"lib/vector-config-macros",
"lib/vector-core",
"lib/vector-lookup",
"lib/vector-stream",
"lib/vector-vrl/cli",
"lib/vector-vrl/functions",
"lib/vector-vrl/tests",
Expand Down Expand Up @@ -144,6 +145,7 @@ vector-config = { path = "lib/vector-config" }
vector-config-common = { path = "lib/vector-config-common" }
vector-config-macros = { path = "lib/vector-config-macros" }
vector-core = { path = "lib/vector-core", default-features = false, features = ["vrl"] }
vector-stream = { path = "lib/vector-stream" }
vector-vrl-functions = { path = "lib/vector-vrl/functions" }
loki-logproto = { path = "lib/loki-logproto", optional = true }

Expand Down
3 changes: 0 additions & 3 deletions lib/vector-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ publish = false

[dependencies]
async-graphql = { version = "5.0.10", default-features = false, features = ["playground" ], optional = true }
async-stream = { version = "0.3.5", default-features = false }
async-trait = { version = "0.1", default-features = false }
bitmask-enum = { version = "2.2.2", default-features = false }
bytes = { version = "1.5.0", default-features = false, features = ["serde"] }
Expand Down Expand Up @@ -53,13 +52,11 @@ tokio-stream = { version = "0.1", default-features = false, features = ["time"],
tokio-util = { version = "0.7.0", default-features = false, features = ["time"] }
toml = { version = "0.8.3", default-features = false }
tonic = { version = "0.10", default-features = false, features = ["transport"] }
tower = { version = "0.4", default-features = false, features = ["util"] }
tracing = { version = "0.1.34", default-features = false }
tracing-core = { version = "0.1.26", default-features = false }
tracing-log = { version = "0.1.3", default-features = false }
tracing-subscriber = { version = "0.3.17", default-features = false, features = ["std"] }
typetag = { version = "0.2.13", default-features = false }
twox-hash = { version = "1.6.3", default-features = false }
url = { version = "2", default-features = false }
vector-buffers = { path = "../vector-buffers", default-features = false }
vector-common = { path = "../vector-common" }
Expand Down
1 change: 0 additions & 1 deletion lib/vector-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub mod schema;
pub mod serde;
pub mod sink;
pub mod source;
pub mod stream;
pub mod tcp;
#[cfg(test)]
mod test_util;
Expand Down
11 changes: 0 additions & 11 deletions lib/vector-core/src/stream/mod.rs

This file was deleted.

4 changes: 2 additions & 2 deletions lib/vector-core/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::task::{Context, Poll};
/// this trait represents the minimum functionality required to describe management of keyed timers
/// for the types implemented in this crate that require such behavior.
///
/// Users can look at `vector_core::stream::batcher::ExpirationQueue` for a concrete implementation.
/// Users can look at `vector_stream::batcher::ExpirationQueue` for a concrete implementation.
pub trait KeyedTimer<K> {
/// Clear the timer.
///
Expand All @@ -32,6 +32,6 @@ pub trait KeyedTimer<K> {
/// Unlike a typical stream, returning `None` only indicates that the queue
/// is empty, not that the queue will never return anything else in the future.
///
/// Used primarily for property testing vis-á-vis `vector_core::stream::batcher::Batcher`.
/// Used primarily for property testing vis-á-vis `vector_stream::batcher::Batcher`.
fn poll_expired(&mut self, cx: &mut Context) -> Poll<Option<K>>;
}
24 changes: 24 additions & 0 deletions lib/vector-stream/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "vector-stream"
version = "0.1.0"
authors = ["Vector Contributors <vector@datadoghq.com>"]
edition = "2021"
publish = false

[dependencies]
async-stream = { version = "0.3.5", default-features = false }
futures = { version = "0.3.28", default-features = false, features = ["std"] }
futures-util = { version = "0.3.28", default-features = false, features = ["std"] }
pin-project.workspace = true
tokio = { version = "1.33.0", default-features = false, features = ["net"] }
tokio-util = { version = "0.7.0", default-features = false, features = ["time"] }
tower = { version = "0.4", default-features = false, features = ["util"] }
tracing = { version = "0.1.34", default-features = false }
twox-hash = { version = "1.6.3", default-features = false }
vector-common = { path = "../vector-common" }
vector-core = { path = "../vector-core" }

[dev-dependencies]
proptest = "1.3"
rand = "0.8.5"
rand_distr = "0.4.3"
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{stream::batcher::data::BatchData, ByteSizeOf};
use vector_core::ByteSizeOf;

use crate::batcher::data::BatchData;

pub trait BatchLimiter<T, B> {
type ItemMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ mod test {
use futures::stream;

use super::*;
use crate::stream::BatcherSettings;
use crate::BatcherSettings;

#[tokio::test]
async fn item_limit() {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use vector_common::internal_event::{
RegisteredEventCache, SharedString, TaggedEventsSent,
};
use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive};

use super::FuturesUnorderedCount;
use crate::{
use vector_core::{
event::{EventFinalizers, EventStatus, Finalizable},
internal_event::emit,
};

use super::FuturesUnorderedCount;

pub trait DriverResponse {
fn event_status(&self) -> EventStatus;
fn events_sent(&self) -> &GroupedCountByteSize;
Expand Down Expand Up @@ -293,7 +293,7 @@ mod tests {
}

impl Finalizable for DelayRequest {
fn take_finalizers(&mut self) -> crate::event::EventFinalizers {
fn take_finalizers(&mut self) -> vector_core::event::EventFinalizers {
std::mem::take(&mut self.1)
}
}
Expand Down Expand Up @@ -365,6 +365,7 @@ mod tests {
let upper = self.upper_bound_us;

// Generate a value between 10ms and 500ms, with a long tail shape to the distribution.
#[allow(clippy::cast_sign_loss)] // Value will be positive anyways
self.jitter
.sample_iter(&mut self.jitter_gen)
.map(|n| n * lower as f64)
Expand Down
File renamed without changes.
26 changes: 26 additions & 0 deletions lib/vector-stream/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#![deny(warnings)]
#![deny(clippy::all)]
#![deny(clippy::pedantic)]
#![deny(unreachable_pub)]
#![deny(unused_allocation)]
#![deny(unused_extern_crates)]
#![deny(unused_assignments)]
#![deny(unused_comparisons)]
#![allow(clippy::module_name_repetitions)]
#![allow(clippy::must_use_candidate)]
#![allow(clippy::type_complexity)]

pub mod batcher;
mod concurrent_map;
mod driver;
pub mod expiration_map;
mod futures_unordered_count;
mod partitioned_batcher;

pub use concurrent_map::ConcurrentMap;
pub use driver::{Driver, DriverResponse};
use futures_unordered_count::FuturesUnorderedCount;
pub use partitioned_batcher::{BatcherSettings, ExpirationQueue, PartitionedBatcher};

#[macro_use]
extern crate tracing;
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,15 @@ use futures::stream::{Fuse, Stream, StreamExt};
use pin_project::pin_project;
use tokio_util::time::{delay_queue::Key, DelayQueue};
use twox_hash::XxHash64;
use vector_core::{partition::Partitioner, time::KeyedTimer, ByteSizeOf};

use crate::{
partition::Partitioner,
stream::batcher::{
config::BatchConfigParts,
data::BatchReduce,
limiter::{ByteSizeOfItemSize, ItemBatchSize, SizeLimit},
},
time::KeyedTimer,
ByteSizeOf,
use crate::batcher::{
config::BatchConfigParts,
data::BatchReduce,
limiter::{ByteSizeOfItemSize, ItemBatchSize, SizeLimit},
BatchConfig,
};

use super::batcher::BatchConfig;

/// A `KeyedTimer` based on `DelayQueue`.
pub struct ExpirationQueue<K> {
/// The timeout to give each new key entry
Expand Down Expand Up @@ -351,6 +346,7 @@ where
}
}

#[allow(clippy::cast_sign_loss)]
#[cfg(test)]
mod test {
use std::{
Expand All @@ -365,14 +361,11 @@ mod test {
use pin_project::pin_project;
use proptest::prelude::*;
use tokio::{pin, time::advance};
use vector_core::{partition::Partitioner, time::KeyedTimer};

use crate::{
partition::Partitioner,
stream::{
partitioned_batcher::{ExpirationQueue, PartitionedBatcher},
BatcherSettings,
},
time::KeyedTimer,
partitioned_batcher::{ExpirationQueue, PartitionedBatcher},
BatcherSettings,
};

#[derive(Debug)]
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/appsignal/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use vector_common::{
finalization::EventStatus, request_metadata::GroupedCountByteSize,
request_metadata::MetaDescriptive, sensitive_string::SensitiveString,
};
use vector_core::stream::DriverResponse;
use vector_stream::DriverResponse;

use crate::{
http::HttpClient,
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_cloudwatch_logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use vector_common::{
finalization::EventStatus,
request_metadata::{GroupedCountByteSize, MetaDescriptive},
};
use vector_core::stream::DriverResponse;
use vector_stream::DriverResponse;

use crate::sinks::{
aws_cloudwatch_logs::{
Expand Down
7 changes: 2 additions & 5 deletions src/sinks/aws_cloudwatch_logs/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@ use chrono::{Duration, Utc};
use futures::{future, stream::BoxStream, StreamExt};
use tower::Service;
use vector_common::request_metadata::{MetaDescriptive, RequestMetadata};
use vector_core::{
partition::Partitioner,
sink::StreamSink,
stream::{BatcherSettings, DriverResponse},
};
use vector_core::{partition::Partitioner, sink::StreamSink};
use vector_stream::{BatcherSettings, DriverResponse};

use crate::{
event::{Event, EventFinalizers, Finalizable},
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_kinesis/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use lookup::lookup_v2::ConfigValuePath;
use std::marker::PhantomData;

use vector_core::stream::BatcherSettings;
use vector_stream::BatcherSettings;

use crate::{
aws::{AwsAuthentication, RegionOrEndpoint},
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/aws_s_s/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use aws_sdk_sqs::types::SdkError;
use futures::future::BoxFuture;
use tower::Service;
use vector_common::request_metadata::GroupedCountByteSize;
use vector_core::{event::EventStatus, stream::DriverResponse, ByteSizeOf};
use vector_core::{event::EventStatus, ByteSizeOf};
use vector_stream::DriverResponse;

use super::{client::Client, request_builder::SendMessageEntry};

Expand Down
2 changes: 1 addition & 1 deletion src/sinks/azure_common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use vector_common::{
json_size::JsonSize,
request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
};
use vector_core::stream::DriverResponse;
use vector_stream::DriverResponse;

use crate::{
event::{EventFinalizers, EventStatus, Finalizable},
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/databend/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use snafu::Snafu;
use tower::Service;
use vector_common::finalization::{EventFinalizers, EventStatus, Finalizable};
use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
use vector_core::stream::DriverResponse;
use vector_stream::DriverResponse;

use crate::{internal_events::EndpointBytesSent, sinks::util::retries::RetryLogic};

Expand Down
2 changes: 1 addition & 1 deletion src/sinks/datadog/events/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use http::Request;
use hyper::Body;
use tower::{Service, ServiceExt};
use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive};
use vector_core::stream::DriverResponse;
use vector_stream::DriverResponse;

use crate::{
event::EventStatus,
Expand Down
6 changes: 2 additions & 4 deletions src/sinks/datadog/logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ use indexmap::IndexMap;
use tower::Service;
use tracing::Instrument;
use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
use vector_core::{
event::{EventFinalizers, EventStatus, Finalizable},
stream::DriverResponse,
};
use vector_core::event::{EventFinalizers, EventStatus, Finalizable};
use vector_stream::DriverResponse;

use crate::{
http::HttpClient,
Expand Down
6 changes: 2 additions & 4 deletions src/sinks/datadog/metrics/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ use hyper::Body;
use snafu::ResultExt;
use tower::Service;
use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
use vector_core::{
event::{EventFinalizers, EventStatus, Finalizable},
stream::DriverResponse,
};
use vector_core::event::{EventFinalizers, EventStatus, Finalizable};
use vector_stream::DriverResponse;

use crate::{
http::{BuildRequestSnafu, CallRequestSnafu, HttpClient},
Expand Down
Loading

0 comments on commit 96f4d73

Please sign in to comment.