chore(vector sink): rewrite vector sink (v2) in the new style (#10148)
closes #9445

The v1 version is already in the new style, so this commit only modifies the v2 version.

As part of this work, the batcher was made even more generic. You can now choose the output type (not necessarily a Vec), and you have full control over how the output is built. Instead of only appending to a Vec, you can build the batch incrementally in a lambda, "reducer" style. The batch "data" and "limiter" were also split into their own traits, so it is easy to override one or the other. You can see an example of the "reducer" style in the sink.rs of the vector sink.
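
To make the "reducer" style concrete, here is a minimal sketch. It is not the code from sink.rs: the `BatchData` trait below is a stand-in whose shape is inferred from how `BatchConfigParts` calls it, and `ReducerBatch` and `sum_example` are hypothetical names used only for illustration.

```rust
// Hypothetical stand-in for the `BatchData` trait. Its methods are inferred
// from the calls made by `BatchConfigParts`, not copied from the commit.
trait BatchData<T> {
    type Batch;
    fn len(&self) -> usize;
    fn take_batch(&mut self) -> Self::Batch;
    fn push_item(&mut self, item: T);
}

// Builds the batch by folding each item into an accumulator of type `S`,
// so the output does not have to be a `Vec`.
struct ReducerBatch<F, S> {
    reducer: F,
    state: S,
    count: usize,
}

impl<F, S: Default> ReducerBatch<F, S> {
    fn new(reducer: F) -> Self {
        Self { reducer, state: S::default(), count: 0 }
    }
}

impl<T, F, S> BatchData<T> for ReducerBatch<F, S>
where
    F: FnMut(&mut S, T),
    S: Default,
{
    type Batch = S;

    fn len(&self) -> usize {
        self.count
    }

    // Hands back the accumulated state and resets to an empty accumulator.
    fn take_batch(&mut self) -> S {
        self.count = 0;
        std::mem::take(&mut self.state)
    }

    fn push_item(&mut self, item: T) {
        (self.reducer)(&mut self.state, item);
        self.count += 1;
    }
}

// Sums item sizes instead of collecting the items themselves.
fn sum_example() -> (u64, usize, usize) {
    let mut batch: ReducerBatch<_, u64> =
        ReducerBatch::new(|total: &mut u64, size: u64| *total += size);
    batch.push_item(3);
    batch.push_item(4);
    let len_before = batch.len();
    let total = batch.take_batch();
    (total, len_before, batch.len())
}
```

Because the accumulator type is a type parameter, the same structure could presumably build an encoded request body or a map instead of a sum; only the closure changes.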

Signed-off-by: Nathan Fox <fuchsnj@gmail.com>
Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
fuchsnj authored Nov 24, 2021
1 parent 5f9b930 commit 00777ee
Showing 20 changed files with 1,104 additions and 807 deletions.
266 changes: 0 additions & 266 deletions lib/vector-core/src/stream/batcher.rs

This file was deleted.

81 changes: 81 additions & 0 deletions lib/vector-core/src/stream/batcher/config.rs
@@ -0,0 +1,81 @@
use super::data;
use super::limiter;

use std::time::Duration;

use data::BatchData;
use limiter::BatchLimiter;

pub struct BatchConfigParts<L, D> {
    pub batch_limiter: L,
    pub batch_data: D,
    pub timeout: Duration,
}

pub trait BatchConfig<T> {
    type ItemMetadata;
    type Batch;

    /// Returns the number of elements in the batch
    fn len(&self) -> usize;

    /// Determines whether the batch is empty or not
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the current batch, and resets any internal state
    fn take_batch(&mut self) -> Self::Batch;

    /// Adds a single item to the batch, with the given metadata that was calculated by `item_fits_in_batch`
    fn push(&mut self, item: T, metadata: Self::ItemMetadata);

    /// Returns true if it is not possible for another item to fit in the batch
    fn is_batch_full(&self) -> bool;

    /// It is safe to assume that `is_batch_full` would return `false` before this is called.
    /// You can return arbitrary metadata for an item that will be given back when the item
    /// is actually pushed onto the batch. This is useful if there is an expensive calculation
    /// to determine the "size" of the item.
    fn item_fits_in_batch(&self, item: &T) -> (bool, Self::ItemMetadata);

    /// Returns the maximum amount of time to wait for inputs to a single batch.
    /// The timer starts when the first item is received for a batch.
    fn timeout(&self) -> Duration;
}

impl<T, L, B> BatchConfig<T> for BatchConfigParts<L, B>
where
    L: BatchLimiter<T, B>,
    B: BatchData<T>,
{
    type ItemMetadata = L::ItemMetadata;
    type Batch = B::Batch;

    fn len(&self) -> usize {
        self.batch_data.len()
    }

    fn take_batch(&mut self) -> Self::Batch {
        self.batch_limiter.reset();
        self.batch_data.take_batch()
    }

    fn push(&mut self, item: T, metadata: Self::ItemMetadata) {
        self.batch_data.push_item(item);
        self.batch_limiter.push_item(metadata);
    }

    fn is_batch_full(&self) -> bool {
        self.batch_limiter.is_batch_full(&self.batch_data)
    }

    fn item_fits_in_batch(&self, item: &T) -> (bool, Self::ItemMetadata) {
        self.batch_limiter
            .item_fits_in_batch(item, &self.batch_data)
    }

    fn timeout(&self) -> Duration {
        self.timeout
    }
}
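
The doc comments above imply a calling order: ask `item_fits_in_batch`, flush if the item does not fit, `push` with the returned metadata, then check `is_batch_full`. The sketch below illustrates that order under simplifying assumptions. It is synchronous and ignores `timeout`, so it is not the batcher stream from this commit; `ByteCapped` and `batch_all` are hypothetical names, and the trait is repeated only so the block compiles on its own.

```rust
use std::time::Duration;

// Same shape as the `BatchConfig` trait above, repeated so this compiles alone.
trait BatchConfig<T> {
    type ItemMetadata;
    type Batch;
    fn len(&self) -> usize;
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
    fn take_batch(&mut self) -> Self::Batch;
    fn push(&mut self, item: T, metadata: Self::ItemMetadata);
    fn is_batch_full(&self) -> bool;
    fn item_fits_in_batch(&self, item: &T) -> (bool, Self::ItemMetadata);
    fn timeout(&self) -> Duration;
}

// Hypothetical config: caps each batch by the total byte length of its strings.
struct ByteCapped {
    items: Vec<String>,
    bytes: usize,
    max_bytes: usize,
}

impl BatchConfig<String> for ByteCapped {
    // The item's size, computed once in `item_fits_in_batch` and reused in `push`.
    type ItemMetadata = usize;
    type Batch = Vec<String>;

    fn len(&self) -> usize {
        self.items.len()
    }
    fn take_batch(&mut self) -> Vec<String> {
        self.bytes = 0;
        std::mem::take(&mut self.items)
    }
    fn push(&mut self, item: String, size: usize) {
        self.bytes += size;
        self.items.push(item);
    }
    fn is_batch_full(&self) -> bool {
        self.bytes >= self.max_bytes
    }
    fn item_fits_in_batch(&self, item: &String) -> (bool, usize) {
        let size = item.len();
        (self.bytes + size <= self.max_bytes, size)
    }
    fn timeout(&self) -> Duration {
        Duration::from_secs(1) // unused by the synchronous driver below
    }
}

// Simplified synchronous driver (an assumption about usage, not the real batcher).
fn batch_all<T, C: BatchConfig<T>>(
    mut config: C,
    input: impl IntoIterator<Item = T>,
) -> Vec<C::Batch> {
    let mut out = Vec::new();
    for item in input {
        let (fits, metadata) = config.item_fits_in_batch(&item);
        if !fits && !config.is_empty() {
            out.push(config.take_batch());
        }
        // An item larger than the cap still goes into a batch of its own.
        config.push(item, metadata);
        if config.is_batch_full() {
            out.push(config.take_batch());
        }
    }
    if !config.is_empty() {
        out.push(config.take_batch());
    }
    out
}
```

Calling `batch_all` with a `ByteCapped` whose `max_bytes` is 6 and the inputs "aaaa", "bb", "cc", "dddddd" would group them as ["aaaa", "bb"], ["cc"], ["dddddd"]. Returning the metadata from `item_fits_in_batch` means the size is computed once per item even when the item triggers a flush first.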