Skip to content

Commit

Permalink
chore(buffers): remove LevelDB-based disk_v1 buffer impl (#15928)
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz authored Jan 12, 2023
1 parent f0d3f1b commit 26e7fe1
Show file tree
Hide file tree
Showing 33 changed files with 41 additions and 2,523 deletions.
2 changes: 1 addition & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ rustflags = [
"-Dclippy::dbg_macro",
]

# We need to bring in `libstdc++` for things that build against C++ (LevelDB, librdkafka, etc) which comes along in the
# We need to bring in `libstdc++` for things that build against C++ (librdkafka, etc) which comes along in the
# `cross` base image but _isn't_ in a path searched by the linker normally. Additionally, our custom Docker image that
# we base on the `cross` image moves `libstdc++` into this custom-looking directory to avoid some _other_ libraries
# included in the `cross` base image from having a higher precedence than some of the "self-contained" libraries that
Expand Down
31 changes: 0 additions & 31 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,6 @@ wiremock = "0.5.16"
zstd = { version = "0.12.2", default-features = false }

[patch.crates-io]
# A patch for lib/vector-core/buffers, addresses Issue 7514
leveldb-sys = { git = "https://github.com/vectordotdev/leveldb-sys.git", branch = "leveldb_mmap_limit" }
# Removes dependency on `time` v0.1
# https://github.com/chronotope/chrono/pull/578
chrono = { git = "https://github.com/vectordotdev/chrono.git", branch = "no-default-time-v0.4.22-1" }
Expand Down
3 changes: 0 additions & 3 deletions lib/vector-buffers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@ bytes = { version = "1.3.0", default-features = false }
crc32fast = { version = "1.3.2", default-features = false }
crossbeam-queue = { version = "0.3.8", default-features = false, features = ["std"] }
crossbeam-utils = { version = "0.8.14", default-features = false }
db-key = { version = "0.0.5", default-features = false }
fslock = { version = "0.2.1", default-features = false, features = ["std"] }
futures = { version = "0.3.25", default-features = false, features = ["std"] }
leveldb = { version = "0.8.6", default-features = false }
memmap2 = { version = "0.5.8", default-features = false }
metrics = "0.20.1"
num-traits = { version = "0.2.15", default-features = false }
parking_lot = { version = "0.12.1", default-features = false }
pin-project = { version = "1.0.12", default-features = false }
rkyv = { version = "0.7.39", default-features = false, features = ["size_32", "std", "strict", "validation"] }
serde = { version = "1.0.152", default-features = false, features = ["derive"] }
Expand Down
25 changes: 0 additions & 25 deletions lib/vector-buffers/benches/sized_records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ impl Drop for PathGuard {
}
}

fn create_disk_v1_variant(_max_events: usize, max_size: u64) -> BufferType {
BufferType::DiskV1 {
max_size: NonZeroU64::new(max_size).unwrap(),
when_full: WhenFull::DropNewest,
}
}

fn create_disk_v2_variant(_max_events: usize, max_size: u64) -> BufferType {
BufferType::DiskV2 {
max_size: NonZeroU64::new(max_size).unwrap(),
Expand Down Expand Up @@ -144,15 +137,6 @@ macro_rules! experiment {

/// Writes all messages into the buffer, and then reads them all out.
fn write_then_read(c: &mut Criterion) {
experiment!(
c,
[32, 64, 128, 256, 512, 1024],
"buffer-disk-v1",
"write-then-read",
wtr_measurement,
create_disk_v1_variant
);

experiment!(
c,
[32, 64, 128, 256, 512, 1024],
Expand All @@ -174,15 +158,6 @@ fn write_then_read(c: &mut Criterion) {

/// Writes a message, and then reads a message, until all messages are gone.
fn write_and_read(c: &mut Criterion) {
experiment!(
c,
[32, 64, 128, 256, 512, 1024],
"buffer-disk-v1",
"write-and-read",
war_measurement,
create_disk_v1_variant
);

experiment!(
c,
[32, 64, 128, 256, 512, 1024],
Expand Down
10 changes: 0 additions & 10 deletions lib/vector-buffers/examples/buffer_perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,16 +265,6 @@ where
when_full,
}
}
"disk-v1" => {
info!(
"[buffer-perf] creating disk v1 buffer with max_size={}, in blocking mode",
max_size_bytes
);
BufferType::DiskV1 {
max_size: max_size_bytes,
when_full,
}
}
"disk-v2" => {
info!(
"[buffer-perf] creating disk v2 buffer with max_size={}, in blocking mode",
Expand Down
60 changes: 1 addition & 59 deletions lib/vector-buffers/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
builder::{TopologyBuilder, TopologyError},
channel::{BufferReceiver, BufferSender},
},
variants::{DiskV1Buffer, DiskV2Buffer, MemoryBuffer},
variants::{DiskV2Buffer, MemoryBuffer},
Bufferable, WhenFull,
};

Expand All @@ -34,8 +34,6 @@ pub enum BufferBuildError {
enum BufferTypeKind {
#[serde(rename = "memory")]
Memory,
#[serde(rename = "disk_v1")]
DiskV1,
#[serde(rename = "disk")]
DiskV2,
}
Expand Down Expand Up @@ -99,18 +97,6 @@ impl BufferTypeVisitor {
when_full,
})
}
BufferTypeKind::DiskV1 => {
if max_events.is_some() {
return Err(de::Error::unknown_field(
"max_events",
&["type", "max_size", "when_full"],
));
}
Ok(BufferType::DiskV1 {
max_size: max_size.ok_or_else(|| de::Error::missing_field("max_size"))?,
when_full,
})
}
BufferTypeKind::DiskV2 => {
if max_events.is_some() {
return Err(de::Error::unknown_field(
Expand Down Expand Up @@ -211,24 +197,6 @@ pub enum BufferType {
when_full: WhenFull,
},

/// A buffer stage backed by an on-disk database, powered by LevelDB.
///
/// This is less performant, but more durable. Data that has been synchronized to disk will not
/// be lost if Vector is restarted forcefully or crashes.
#[configurable(deprecated)]
#[configurable(title = "Events are buffered on disk. (version 1)")]
#[serde(rename = "disk_v1")]
DiskV1 {
/// The maximum size of the buffer on disk.
///
/// Must be at least ~256 megabytes (268435488 bytes).
max_size: NonZeroU64,

#[configurable(derived)]
#[serde(default)]
when_full: WhenFull,
},

/// A buffer stage backed by disk.
///
/// This is less performant, but more durable. Data that has been synchronized to disk will not
Expand Down Expand Up @@ -277,14 +245,6 @@ impl BufferType {
None => None,
Some(global_data_dir) => match self {
Self::Memory { .. } => None,
Self::DiskV1 { max_size, .. } => {
let data_dir = crate::variants::disk_v1::get_new_style_buffer_dir_path(
&global_data_dir,
id.id(),
);

Some(DiskUsage::new(id.clone(), data_dir, *max_size))
}
Self::DiskV2 { max_size, .. } => {
let data_dir = crate::variants::disk_v2::get_disk_v2_data_dir_path(
&global_data_dir,
Expand Down Expand Up @@ -319,13 +279,6 @@ impl BufferType {
} => {
builder.stage(MemoryBuffer::new(max_events), when_full);
}
BufferType::DiskV1 {
when_full,
max_size,
} => {
let data_dir = data_dir.ok_or(BufferBuildError::RequiresDataDir)?;
builder.stage(DiskV1Buffer::new(id, data_dir, max_size), when_full);
}
BufferType::DiskV2 {
when_full,
max_size,
Expand Down Expand Up @@ -514,17 +467,6 @@ max_events: 42

#[test]
fn ensure_field_defaults_for_all_types() {
check_single_stage(
r#"
type: disk_v1
max_size: 1024
"#,
BufferType::DiskV1 {
max_size: NonZeroU64::new(1024).unwrap(),
when_full: WhenFull::Block,
},
);

check_single_stage(
r#"
type: memory
Expand Down
4 changes: 0 additions & 4 deletions lib/vector-buffers/src/test/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,6 @@ impl FixedEncodable for MultiEventRecord {
message_wrapper!(PoisonPillMultiEventRecord: u32, |m: &Self| m.0);

impl PoisonPillMultiEventRecord {
pub fn poisoned() -> Self {
Self::new(42)
}

pub fn encoded_size(&self) -> usize {
usize::try_from(self.0).unwrap_or(usize::MAX) + std::mem::size_of::<u32>()
}
Expand Down
57 changes: 8 additions & 49 deletions lib/vector-buffers/src/test/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
builder::TopologyBuilder,
channel::{BufferReceiver, BufferSender},
},
variants::{DiskV1Buffer, DiskV2Buffer, MemoryBuffer},
variants::{DiskV2Buffer, MemoryBuffer},
Bufferable, WhenFull,
};

Expand All @@ -34,12 +34,6 @@ pub enum Variant {
max_events: NonZeroUsize,
when_full: WhenFull,
},
DiskV1 {
max_size: NonZeroU64,
when_full: WhenFull,
data_dir: PathBuf,
id: String,
},
DiskV2 {
max_size: NonZeroU64,
when_full: WhenFull,
Expand All @@ -62,17 +56,6 @@ impl Variant {
} => {
builder.stage(MemoryBuffer::new(*max_events), *when_full);
}
Variant::DiskV1 {
max_size,
when_full,
data_dir,
id,
} => {
builder.stage(
DiskV1Buffer::new(id.clone(), data_dir.clone(), *max_size),
*when_full,
);
}
Variant::DiskV2 {
max_size,
when_full,
Expand Down Expand Up @@ -117,7 +100,7 @@ impl Arbitrary for Id {
#[cfg(test)]
impl Arbitrary for Variant {
fn arbitrary(g: &mut Gen) -> Self {
let idx = usize::arbitrary(g) % 3;
let use_memory_buffer = bool::arbitrary(g);

// Using a u16 ensures we avoid any allocation errors for our holding buffers, etc.
let max_events = NonZeroU16::arbitrary(g)
Expand All @@ -129,24 +112,18 @@ impl Arbitrary for Variant {

let when_full = WhenFull::arbitrary(g);

match idx {
0 => Variant::Memory {
if use_memory_buffer {
Variant::Memory {
max_events,
when_full,
},
1 => Variant::DiskV1 {
max_size,
when_full,
id: Id::arbitrary(g).inner,
data_dir: PathBuf::arbitrary(g),
},
2 => Variant::DiskV2 {
}
} else {
Variant::DiskV2 {
max_size,
when_full,
id: Id::arbitrary(g).inner,
data_dir: PathBuf::arbitrary(g),
},
_ => unreachable!("idx divisor should be 3"),
}
}
}

Expand All @@ -163,24 +140,6 @@ impl Arbitrary for Variant {
when_full,
}))
}
Variant::DiskV1 {
max_size,
when_full,
id,
data_dir,
..
} => {
let max_size = *max_size;
let when_full = *when_full;
let id = id.clone();
let data_dir = data_dir.clone();
Box::new(max_size.shrink().map(move |ms| Variant::DiskV1 {
max_size: ms,
when_full,
id: id.clone(),
data_dir: data_dir.clone(),
}))
}
Variant::DiskV2 {
max_size,
when_full,
Expand Down
Loading

0 comments on commit 26e7fe1

Please sign in to comment.