From d33435fa849590e88ddb27259283bb4db2d76a63 Mon Sep 17 00:00:00 2001 From: niebayes Date: Wed, 3 Apr 2024 11:16:05 +0800 Subject: [PATCH] feat: introduce wal benchmarker (#3446) * feat: introduce wal benchmarker * chore: add log store metrics * chore: add some comments to wal benchmarker * fix: ci * chore: add more metrics for kafka logstore * chore: add more timers for kafka logstore * chore: add more configs * chore: move humantime to common dependencies * refactor: refactor wal benchmarker * fix: apply suggestions from code review * doc: add a simple README for wal benchmarker * fix: Cargo.toml * fix: clippy * chore: rename wal.rs to wal_bench.rs * fix: compile --- Cargo.lock | 22 ++ Cargo.toml | 2 + benchmarks/Cargo.toml | 20 ++ benchmarks/README.md | 11 + benchmarks/config/wal_bench.example.toml | 21 ++ benchmarks/src/bin/wal_bench.rs | 326 +++++++++++++++++++ benchmarks/src/lib.rs | 16 + benchmarks/src/metrics.rs | 39 +++ benchmarks/src/wal_bench.rs | 361 +++++++++++++++++++++ src/log-store/Cargo.toml | 2 + src/log-store/src/kafka.rs | 29 ++ src/log-store/src/kafka/log_store.rs | 18 +- src/log-store/src/kafka/util/record.rs | 8 + src/log-store/src/lib.rs | 1 + src/log-store/src/metrics.rs | 107 ++++++ src/log-store/src/noop.rs | 4 + src/log-store/src/raft_engine.rs | 22 ++ src/log-store/src/raft_engine/log_store.rs | 15 +- src/meta-srv/Cargo.toml | 2 +- src/query/Cargo.toml | 2 +- src/store-api/src/logstore/entry.rs | 3 + src/store-api/src/logstore/entry_stream.rs | 5 + src/table/Cargo.toml | 2 +- tests-fuzz/Cargo.toml | 1 + tests-integration/Cargo.toml | 2 +- 25 files changed, 1035 insertions(+), 6 deletions(-) create mode 100644 benchmarks/README.md create mode 100644 benchmarks/config/wal_bench.example.toml create mode 100644 benchmarks/src/bin/wal_bench.rs create mode 100644 benchmarks/src/lib.rs create mode 100644 benchmarks/src/metrics.rs create mode 100644 benchmarks/src/wal_bench.rs create mode 100644 src/log-store/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 150b72157651..397459043901 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -876,14 +876,34 @@ dependencies = [ name = "benchmarks" version = "0.7.1" dependencies = [ + "api", "arrow", "chrono", "clap 4.4.11", "client", + "common-base", + "common-telemetry", + "common-wal", + "dotenv", + "futures", "futures-util", + "humantime", + "humantime-serde", "indicatif", + "itertools 0.10.5", + "lazy_static", + "log-store", + "mito2", + "num_cpus", "parquet", + "prometheus", + "rand", + "rskafka", + "serde", + "store-api", "tokio", + "toml 0.8.8", + "uuid", ] [[package]] @@ -4845,6 +4865,8 @@ dependencies = [ "futures", "futures-util", "itertools 0.10.5", + "lazy_static", + "prometheus", "protobuf", "protobuf-build", "raft-engine", diff --git a/Cargo.toml b/Cargo.toml index 5140c5729d71..0bf04f17b82a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,11 +99,13 @@ datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.g datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } derive_builder = "0.12" +dotenv = "0.15" etcd-client = "0.12" fst = "0.4.7" futures = "0.3" futures-util = "0.3" greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "06f6297ff3cab578a1589741b504342fbad70453" } +humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff 
--git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 862579b26dfb..18b44e944858 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -8,11 +8,31 @@ license.workspace = true workspace = true [dependencies] +api.workspace = true arrow.workspace = true chrono.workspace = true clap.workspace = true client.workspace = true +common-base.workspace = true +common-telemetry.workspace = true +common-wal.workspace = true +dotenv.workspace = true +futures.workspace = true futures-util.workspace = true +humantime.workspace = true +humantime-serde.workspace = true indicatif = "0.17.1" +itertools.workspace = true +lazy_static.workspace = true +log-store.workspace = true +mito2.workspace = true +num_cpus.workspace = true parquet.workspace = true +prometheus.workspace = true +rand.workspace = true +rskafka.workspace = true +serde.workspace = true +store-api.workspace = true tokio.workspace = true +toml.workspace = true +uuid.workspace = true diff --git a/benchmarks/README.md b/benchmarks/README.md new file mode 100644 index 000000000000..c281af38293e --- /dev/null +++ b/benchmarks/README.md @@ -0,0 +1,11 @@ +Benchmarkers for GreptimeDB +-------------------------------- + +## Wal Benchmarker +The WAL benchmarker evaluates the performance of GreptimeDB's Write-Ahead Log (WAL) component. It measures the read and write performance of the WAL under the various workloads generated by the benchmarker. + + +### How to use +To compile the benchmarker, navigate to the `greptimedb/benchmarks` directory and run `cargo build --release`. The compiled binary is located at `greptimedb/target/release/wal_bench`. + +Run `./wal_bench -h` to list the arguments the binary accepts. A notable one is the `cfg-file` argument: by providing a configuration file in TOML format, you avoid repeatedly specifying a long list of arguments. An example invocation is sketched in the comments at the top of `benchmarks/src/bin/wal_bench.rs`. diff --git a/benchmarks/config/wal_bench.example.toml b/benchmarks/config/wal_bench.example.toml new file mode 100644 index 000000000000..72faed0d7410 --- /dev/null +++ b/benchmarks/config/wal_bench.example.toml @@ -0,0 +1,21 @@ +# Refer to the documentation of `Args` in `benchmarks/src/wal_bench.rs`. +wal_provider = "kafka" +bootstrap_brokers = ["localhost:9092"] +num_workers = 10 +num_topics = 32 +num_regions = 1000 +num_scrapes = 1000 +num_rows = 5 +col_types = "ifs" +max_batch_size = "512KB" +linger = "1ms" +backoff_init = "10ms" +backoff_max = "1s" +backoff_base = 2 +backoff_deadline = "3s" +compression = "zstd" +rng_seed = 42 +skip_read = false +skip_write = false +random_topics = true +report_metrics = false diff --git a/benchmarks/src/bin/wal_bench.rs b/benchmarks/src/bin/wal_bench.rs new file mode 100644 index 000000000000..6caa7b699871 --- /dev/null +++ b/benchmarks/src/bin/wal_bench.rs @@ -0,0 +1,326 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
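+
+// Example invocation (the paths and broker address below are illustrative; see benchmarks/README.md
+// and the `Args` docs in benchmarks/src/wal_bench.rs for the full set of flags):
+//
+//   cargo build --release
+//   ./target/release/wal_bench --cfg-file benchmarks/config/wal_bench.example.toml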
+ +#![feature(int_roundings)] + +use std::fs; +use std::sync::Arc; +use std::time::Instant; + +use api::v1::{ColumnDataType, ColumnSchema, SemanticType}; +use benchmarks::metrics; +use benchmarks::wal_bench::{Args, Config, Region, WalProvider}; +use clap::Parser; +use common_telemetry::info; +use common_wal::config::kafka::common::BackoffConfig; +use common_wal::config::kafka::DatanodeKafkaConfig as KafkaConfig; +use common_wal::config::raft_engine::RaftEngineConfig; +use common_wal::options::{KafkaWalOptions, WalOptions}; +use itertools::Itertools; +use log_store::kafka::log_store::KafkaLogStore; +use log_store::raft_engine::log_store::RaftEngineLogStore; +use mito2::wal::Wal; +use prometheus::{Encoder, TextEncoder}; +use rand::distributions::{Alphanumeric, DistString}; +use rand::rngs::SmallRng; +use rand::SeedableRng; +use rskafka::client::partition::Compression; +use rskafka::client::ClientBuilder; +use store_api::logstore::LogStore; +use store_api::storage::RegionId; + +async fn run_benchmarker(cfg: &Config, topics: &[String], wal: Arc>) { + let chunk_size = cfg.num_regions.div_ceil(cfg.num_workers); + let region_chunks = (0..cfg.num_regions) + .map(|id| { + build_region( + id as u64, + topics, + &mut SmallRng::seed_from_u64(cfg.rng_seed), + cfg, + ) + }) + .chunks(chunk_size as usize) + .into_iter() + .map(|chunk| Arc::new(chunk.collect::>())) + .collect::>(); + + let mut write_elapsed = 0; + let mut read_elapsed = 0; + + if !cfg.skip_write { + info!("Benchmarking write ..."); + + let num_scrapes = cfg.num_scrapes; + let timer = Instant::now(); + futures::future::join_all((0..cfg.num_workers).map(|i| { + let wal = wal.clone(); + let regions = region_chunks[i as usize].clone(); + tokio::spawn(async move { + for _ in 0..num_scrapes { + let mut wal_writer = wal.writer(); + regions + .iter() + .for_each(|region| region.add_wal_entry(&mut wal_writer)); + wal_writer.write_to_wal().await.unwrap(); + } + }) + })) + .await; + write_elapsed += timer.elapsed().as_millis(); + } + + if !cfg.skip_read { + info!("Benchmarking read ..."); + + let timer = Instant::now(); + futures::future::join_all((0..cfg.num_workers).map(|i| { + let wal = wal.clone(); + let regions = region_chunks[i as usize].clone(); + tokio::spawn(async move { + for region in regions.iter() { + region.replay(&wal).await; + } + }) + })) + .await; + read_elapsed = timer.elapsed().as_millis(); + } + + dump_report(cfg, write_elapsed, read_elapsed); +} + +fn build_region(id: u64, topics: &[String], rng: &mut SmallRng, cfg: &Config) -> Region { + let wal_options = match cfg.wal_provider { + WalProvider::Kafka => { + assert!(!topics.is_empty()); + WalOptions::Kafka(KafkaWalOptions { + topic: topics.get(id as usize % topics.len()).cloned().unwrap(), + }) + } + WalProvider::RaftEngine => WalOptions::RaftEngine, + }; + Region::new( + RegionId::from_u64(id), + build_schema(&parse_col_types(&cfg.col_types), rng), + wal_options, + cfg.num_rows, + cfg.rng_seed, + ) +} + +fn build_schema(col_types: &[ColumnDataType], mut rng: &mut SmallRng) -> Vec { + col_types + .iter() + .map(|col_type| ColumnSchema { + column_name: Alphanumeric.sample_string(&mut rng, 5), + datatype: *col_type as i32, + semantic_type: SemanticType::Field as i32, + datatype_extension: None, + }) + .chain(vec![ColumnSchema { + column_name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Tag as i32, + datatype_extension: None, + }]) + .collect() +} + +fn dump_report(cfg: &Config, write_elapsed: u128, read_elapsed: u128) 
{ + let cost_report = format!( + "write costs: {} ms, read costs: {} ms", + write_elapsed, read_elapsed, + ); + + let total_written_bytes = metrics::METRIC_WAL_WRITE_BYTES_TOTAL.get() as u128; + let write_throughput = if write_elapsed > 0 { + (total_written_bytes * 1000).div_floor(write_elapsed) + } else { + 0 + }; + let total_read_bytes = metrics::METRIC_WAL_READ_BYTES_TOTAL.get() as u128; + let read_throughput = if read_elapsed > 0 { + (total_read_bytes * 1000).div_floor(read_elapsed) + } else { + 0 + }; + + let throughput_report = format!( + "total written bytes: {} bytes, total read bytes: {} bytes, write throughput: {} bytes/s ({} mb/s), read throughput: {} bytes/s ({} mb/s)", + total_written_bytes, + total_read_bytes, + write_throughput, + write_throughput.div_floor(1 << 20), + read_throughput, + read_throughput.div_floor(1 << 20), + ); + + let metrics_report = if cfg.report_metrics { + let mut buffer = Vec::new(); + let encoder = TextEncoder::new(); + let metrics = prometheus::gather(); + encoder.encode(&metrics, &mut buffer).unwrap(); + String::from_utf8(buffer).unwrap() + } else { + String::new() + }; + + info!( + r#" +Benchmark config: +{cfg:?} + +Benchmark report: +{cost_report} +{throughput_report} +{metrics_report}"# + ); +} + +async fn create_topics(cfg: &Config) -> Vec<String> { + // Creates topics. + let client = ClientBuilder::new(cfg.bootstrap_brokers.clone()) + .build() + .await + .unwrap(); + let ctrl_client = client.controller_client().unwrap(); + let (topics, tasks): (Vec<_>, Vec<_>) = (0..cfg.num_topics) + .map(|i| { + let topic = if cfg.random_topics { + format!( + "greptime_wal_bench_topic_{}_{}", + uuid::Uuid::new_v4().as_u128(), + i + ) + } else { + format!("greptime_wal_bench_topic_{}", i) + }; + let task = ctrl_client.create_topic( + topic.clone(), + 1, + cfg.bootstrap_brokers.len() as i16, + 2000, + ); + (topic, task) + }) + .unzip(); + // Ignores errors since the topics may have already been created by a previous run. + let _ = futures::future::try_join_all(tasks).await; + + topics +} + +fn parse_compression(comp: &str) -> Compression { + match comp { + "no" => Compression::NoCompression, + "gzip" => Compression::Gzip, + "lz4" => Compression::Lz4, + "snappy" => Compression::Snappy, + "zstd" => Compression::Zstd, + other => unreachable!("Unrecognized compression {other}"), + } +} + +fn parse_col_types(col_types: &str) -> Vec<ColumnDataType> { + let parts = col_types.split('x').collect::<Vec<_>>(); + assert!(parts.len() <= 2); + + let pattern = parts[0]; + let repeat = parts + .get(1) + .map(|r| r.parse::<usize>().unwrap()) + .unwrap_or(1); + + pattern + .chars() + .map(|c| match c { + 'i' | 'I' => ColumnDataType::Int64, + 'f' | 'F' => ColumnDataType::Float64, + 's' | 'S' => ColumnDataType::String, + other => unreachable!("Cannot parse {other} as a column data type"), + }) + .cycle() + .take(pattern.len() * repeat) + .collect() +} + +fn main() { + // Sets the global logging level to INFO and suppresses rskafka logs below the ERROR level. + std::env::set_var("UNITTEST_LOG_LEVEL", "info,rskafka=error"); + common_telemetry::init_default_ut_logging(); + + let args = Args::parse(); + let cfg = if !args.cfg_file.is_empty() { + toml::from_str(&fs::read_to_string(&args.cfg_file).unwrap()).unwrap() + } else { + Config::from(args) + }; + + // Validates arguments.
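+ // Regions are split into one chunk per worker in `run_benchmarker`, so there must be at least
+ // one region per worker, and none of the counts or sizes below may be zero.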
+ if cfg.num_regions < cfg.num_workers { + panic!("num_regions must be greater than or equal to num_workers"); + } + if cfg + .num_workers + .min(cfg.num_topics) + .min(cfg.num_regions) + .min(cfg.num_scrapes) + .min(cfg.max_batch_size.as_bytes() as u32) + .min(cfg.bootstrap_brokers.len() as u32) + == 0 + { + panic!("Invalid arguments"); + } + + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + match cfg.wal_provider { + WalProvider::Kafka => { + let topics = create_topics(&cfg).await; + let kafka_cfg = KafkaConfig { + broker_endpoints: cfg.bootstrap_brokers.clone(), + max_batch_size: cfg.max_batch_size, + linger: cfg.linger, + backoff: BackoffConfig { + init: cfg.backoff_init, + max: cfg.backoff_max, + base: cfg.backoff_base, + deadline: Some(cfg.backoff_deadline), + }, + compression: parse_compression(&cfg.compression), + ..Default::default() + }; + let store = Arc::new(KafkaLogStore::try_new(&kafka_cfg).await.unwrap()); + let wal = Arc::new(Wal::new(store)); + run_benchmarker(&cfg, &topics, wal).await; + } + WalProvider::RaftEngine => { + // The benchmarker assumes the raft engine directory exists. + let store = RaftEngineLogStore::try_new( + "/tmp/greptimedb/raft-engine-wal".to_string(), + RaftEngineConfig::default(), + ) + .await + .map(Arc::new) + .unwrap(); + let wal = Arc::new(Wal::new(store)); + run_benchmarker(&cfg, &[], wal).await; + } + } + }); +} diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs new file mode 100644 index 000000000000..bab08887f765 --- /dev/null +++ b/benchmarks/src/lib.rs @@ -0,0 +1,16 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod metrics; +pub mod wal_bench; diff --git a/benchmarks/src/metrics.rs b/benchmarks/src/metrics.rs new file mode 100644 index 000000000000..d65fd1eb9aa0 --- /dev/null +++ b/benchmarks/src/metrics.rs @@ -0,0 +1,39 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use lazy_static::lazy_static; +use prometheus::*; + +/// Logstore label. +pub const LOGSTORE_LABEL: &str = "logstore"; +/// Operation type label. +pub const OPTYPE_LABEL: &str = "optype"; + +lazy_static! { + /// Counters of bytes of each operation on a logstore. + pub static ref METRIC_WAL_OP_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!( + "greptime_bench_wal_op_bytes_total", + "wal operation bytes total", + &[OPTYPE_LABEL], + ) + .unwrap(); + /// Counter of bytes of the append_batch operation. 
+ pub static ref METRIC_WAL_WRITE_BYTES_TOTAL: IntCounter = METRIC_WAL_OP_BYTES_TOTAL.with_label_values( + &["write"], + ); + /// Counter of bytes of the read operation. + pub static ref METRIC_WAL_READ_BYTES_TOTAL: IntCounter = METRIC_WAL_OP_BYTES_TOTAL.with_label_values( + &["read"], + ); +} diff --git a/benchmarks/src/wal_bench.rs b/benchmarks/src/wal_bench.rs new file mode 100644 index 000000000000..10e88f99f37c --- /dev/null +++ b/benchmarks/src/wal_bench.rs @@ -0,0 +1,361 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::mem::size_of; +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use api::v1::value::ValueData; +use api::v1::{ColumnDataType, ColumnSchema, Mutation, OpType, Row, Rows, Value, WalEntry}; +use clap::{Parser, ValueEnum}; +use common_base::readable_size::ReadableSize; +use common_wal::options::WalOptions; +use futures::StreamExt; +use mito2::wal::{Wal, WalWriter}; +use rand::distributions::{Alphanumeric, DistString, Uniform}; +use rand::rngs::SmallRng; +use rand::{Rng, SeedableRng}; +use serde::{Deserialize, Serialize}; +use store_api::logstore::LogStore; +use store_api::storage::RegionId; + +use crate::metrics; + +/// The wal provider. +#[derive(Clone, ValueEnum, Default, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WalProvider { + #[default] + RaftEngine, + Kafka, +} + +#[derive(Parser)] +pub struct Args { + /// The provided configuration file. + /// The example configuration file can be found at `greptimedb/benchmarks/config/wal_bench.example.toml`. + #[clap(long, short = 'c')] + pub cfg_file: String, + + /// The wal provider. + #[clap(long, value_enum, default_value_t = WalProvider::default())] + pub wal_provider: WalProvider, + + /// The advertised addresses of the kafka brokers. + /// If there are multiple bootstrap brokers, separate their addresses with commas, e.g. "localhost:9092,localhost:9093". + #[clap(long, short = 'b', default_value = "localhost:9092")] + pub bootstrap_brokers: String, + + /// The number of workers, each running in a dedicated thread. + #[clap(long, default_value_t = num_cpus::get() as u32)] + pub num_workers: u32, + + /// The number of kafka topics to be created. + #[clap(long, default_value_t = 32)] + pub num_topics: u32, + + /// The number of regions. + #[clap(long, default_value_t = 1000)] + pub num_regions: u32, + + /// The number of times each region is scraped. + #[clap(long, default_value_t = 1000)] + pub num_scrapes: u32, + + /// The number of rows in each wal entry. + /// Each time a region is scraped, a wal entry containing this number of rows is produced. + #[clap(long, default_value_t = 5)] + pub num_rows: u32, + + /// The column types of the schema for each region.
+ /// Currently, three column types are supported: + /// - i = ColumnDataType::Int64 + /// - f = ColumnDataType::Float64 + /// - s = ColumnDataType::String + /// For example, "ifs" is parsed as three columns: i64, f64, and string. + /// + /// Additionally, an "x" sign can be appended to repeat the column types a given number of times. + /// For example, "iix2" is parsed as four columns: i64, i64, i64, and i64. + /// This feature is useful if you want to specify many columns. + #[clap(long, default_value = "ifs")] + pub col_types: String, + + /// The maximum size of a batch of kafka records. + /// The default value is 512KB. + #[clap(long, default_value = "512KB")] + pub max_batch_size: ReadableSize, + + /// The maximum duration the kafka client waits to accumulate a batch of kafka records before issuing it. + /// However, a batch of kafka records is issued immediately if a new record cannot fit into the batch. + #[clap(long, default_value = "1ms")] + pub linger: String, + + /// The initial backoff delay of the kafka consumer. + #[clap(long, default_value = "10ms")] + pub backoff_init: String, + + /// The maximum backoff delay of the kafka consumer. + #[clap(long, default_value = "1s")] + pub backoff_max: String, + + /// The exponential backoff rate of the kafka consumer. The next backoff = base * the current backoff. + #[clap(long, default_value_t = 2)] + pub backoff_base: u32, + + /// The deadline of backoff. The backoff ends if the total backoff delay reaches the deadline. + #[clap(long, default_value = "3s")] + pub backoff_deadline: String, + + /// The client-side compression algorithm for kafka records. + #[clap(long, default_value = "zstd")] + pub compression: String, + + /// The seed of random number generators. + #[clap(long, default_value_t = 42)] + pub rng_seed: u64, + + /// Skips the read phase, i.e. region replay, if set to true. + #[clap(long, default_value_t = false)] + pub skip_read: bool, + + /// Skips the write phase if set to true. + #[clap(long, default_value_t = false)] + pub skip_write: bool, + + /// Randomly generates topic names if set to true. + /// Useful when you want to run the benchmarker without reusing topics created by previous runs. + #[clap(long, default_value_t = false)] + pub random_topics: bool, + + /// Logs the gathered prometheus metrics when the benchmarker ends. + #[clap(long, default_value_t = false)] + pub report_metrics: bool, +} + +/// Benchmarker config.
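+/// A `Config` is either converted from the command-line `Args` or deserialized from a TOML file
+/// such as `benchmarks/config/wal_bench.example.toml`.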
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Config { + pub wal_provider: WalProvider, + pub bootstrap_brokers: Vec, + pub num_workers: u32, + pub num_topics: u32, + pub num_regions: u32, + pub num_scrapes: u32, + pub num_rows: u32, + pub col_types: String, + pub max_batch_size: ReadableSize, + #[serde(with = "humantime_serde")] + pub linger: Duration, + #[serde(with = "humantime_serde")] + pub backoff_init: Duration, + #[serde(with = "humantime_serde")] + pub backoff_max: Duration, + pub backoff_base: u32, + #[serde(with = "humantime_serde")] + pub backoff_deadline: Duration, + pub compression: String, + pub rng_seed: u64, + pub skip_read: bool, + pub skip_write: bool, + pub random_topics: bool, + pub report_metrics: bool, +} + +impl From for Config { + fn from(args: Args) -> Self { + let cfg = Self { + wal_provider: args.wal_provider, + bootstrap_brokers: args + .bootstrap_brokers + .split(',') + .map(ToString::to_string) + .collect::>(), + num_workers: args.num_workers.min(num_cpus::get() as u32), + num_topics: args.num_topics, + num_regions: args.num_regions, + num_scrapes: args.num_scrapes, + num_rows: args.num_rows, + col_types: args.col_types, + max_batch_size: args.max_batch_size, + linger: humantime::parse_duration(&args.linger).unwrap(), + backoff_init: humantime::parse_duration(&args.backoff_init).unwrap(), + backoff_max: humantime::parse_duration(&args.backoff_max).unwrap(), + backoff_base: args.backoff_base, + backoff_deadline: humantime::parse_duration(&args.backoff_deadline).unwrap(), + compression: args.compression, + rng_seed: args.rng_seed, + skip_read: args.skip_read, + skip_write: args.skip_write, + random_topics: args.random_topics, + report_metrics: args.report_metrics, + }; + + cfg + } +} + +/// The region used for wal benchmarker. +pub struct Region { + id: RegionId, + schema: Vec, + wal_options: WalOptions, + next_sequence: AtomicU64, + next_entry_id: AtomicU64, + next_timestamp: AtomicI64, + rng: Mutex>, + num_rows: u32, +} + +impl Region { + /// Creates a new region. + pub fn new( + id: RegionId, + schema: Vec, + wal_options: WalOptions, + num_rows: u32, + rng_seed: u64, + ) -> Self { + Self { + id, + schema, + wal_options, + next_sequence: AtomicU64::new(1), + next_entry_id: AtomicU64::new(1), + next_timestamp: AtomicI64::new(1655276557000), + rng: Mutex::new(Some(SmallRng::seed_from_u64(rng_seed))), + num_rows, + } + } + + /// Scrapes the region and adds the generated entry to wal. + pub fn add_wal_entry(&self, wal_writer: &mut WalWriter) { + let mutation = Mutation { + op_type: OpType::Put as i32, + sequence: self + .next_sequence + .fetch_add(self.num_rows as u64, Ordering::Relaxed), + rows: Some(self.build_rows()), + }; + let entry = WalEntry { + mutations: vec![mutation], + }; + metrics::METRIC_WAL_WRITE_BYTES_TOTAL.inc_by(Self::entry_estimated_size(&entry) as u64); + + wal_writer + .add_entry( + self.id, + self.next_entry_id.fetch_add(1, Ordering::Relaxed), + &entry, + &self.wal_options, + ) + .unwrap(); + } + + /// Replays the region. + pub async fn replay(&self, wal: &Arc>) { + let mut wal_stream = wal.scan(self.id, 0, &self.wal_options).unwrap(); + while let Some(res) = wal_stream.next().await { + let (_, entry) = res.unwrap(); + metrics::METRIC_WAL_READ_BYTES_TOTAL.inc_by(Self::entry_estimated_size(&entry) as u64); + } + } + + /// Computes the estimated size in bytes of the entry. 
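+ /// The estimate covers only the in-memory footprint of the mutation, its schema, and its row
+ /// values; it feeds the byte counters used to compute the reported read/write throughput.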
+ pub fn entry_estimated_size(entry: &WalEntry) -> usize { + let wrapper_size = size_of::() + + entry.mutations.capacity() * size_of::() + + size_of::(); + + let rows = entry.mutations[0].rows.as_ref().unwrap(); + + let schema_size = rows.schema.capacity() * size_of::() + + rows + .schema + .iter() + .map(|s| s.column_name.capacity()) + .sum::(); + let values_size = (rows.rows.capacity() * size_of::()) + + rows + .rows + .iter() + .map(|r| r.values.capacity() * size_of::()) + .sum::(); + + wrapper_size + schema_size + values_size + } + + fn build_rows(&self) -> Rows { + let cols = self + .schema + .iter() + .map(|col_schema| { + let col_data_type = ColumnDataType::try_from(col_schema.datatype).unwrap(); + self.build_col(&col_data_type, self.num_rows) + }) + .collect::>(); + + let rows = (0..self.num_rows) + .map(|i| { + let values = cols.iter().map(|col| col[i as usize].clone()).collect(); + Row { values } + }) + .collect(); + + Rows { + schema: self.schema.clone(), + rows, + } + } + + fn build_col(&self, col_data_type: &ColumnDataType, num_rows: u32) -> Vec { + let mut rng_guard = self.rng.lock().unwrap(); + let rng = rng_guard.as_mut().unwrap(); + match col_data_type { + ColumnDataType::TimestampMillisecond => (0..num_rows) + .map(|_| { + let ts = self.next_timestamp.fetch_add(1000, Ordering::Relaxed); + Value { + value_data: Some(ValueData::TimestampMillisecondValue(ts)), + } + }) + .collect(), + ColumnDataType::Int64 => (0..num_rows) + .map(|_| { + let v = rng.sample(Uniform::new(0, 10_000)); + Value { + value_data: Some(ValueData::I64Value(v)), + } + }) + .collect(), + ColumnDataType::Float64 => (0..num_rows) + .map(|_| { + let v = rng.sample(Uniform::new(0.0, 5000.0)); + Value { + value_data: Some(ValueData::F64Value(v)), + } + }) + .collect(), + ColumnDataType::String => (0..num_rows) + .map(|_| { + let v = Alphanumeric.sample_string(rng, 10); + Value { + value_data: Some(ValueData::StringValue(v)), + } + }) + .collect(), + _ => unreachable!(), + } + } +} diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index fa98d65daebc..851c3f94f0b9 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -26,6 +26,8 @@ common-time.workspace = true common-wal.workspace = true futures.workspace = true futures-util.workspace = true +lazy_static.workspace = true +prometheus.workspace = true protobuf = { version = "2", features = ["bytes"] } raft-engine.workspace = true rskafka.workspace = true diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index ef84b9a68acf..d80a19d5c38f 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::mem::size_of; pub(crate) mod client_manager; pub mod log_store; pub(crate) mod util; @@ -69,6 +70,10 @@ impl Entry for EntryImpl { fn namespace(&self) -> Self::Namespace { self.ns.clone() } + + fn estimated_size(&self) -> usize { + size_of::() + self.data.capacity() * size_of::() + self.ns.topic.capacity() + } } impl Display for EntryImpl { @@ -82,3 +87,27 @@ impl Display for EntryImpl { ) } } + +#[cfg(test)] +mod tests { + use std::mem::size_of; + + use store_api::logstore::entry::Entry; + + use crate::kafka::{EntryImpl, NamespaceImpl}; + + #[test] + fn test_estimated_size() { + let entry = EntryImpl { + data: Vec::with_capacity(100), + id: 0, + ns: NamespaceImpl { + region_id: 0, + topic: String::with_capacity(10), + }, + }; + let expected = size_of::() + 100 * size_of::() + 10; + let got = entry.estimated_size(); + assert_eq!(expected, got); + } +} diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 49b8deb27906..14f7ba8df1bb 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -22,7 +22,7 @@ use futures_util::StreamExt; use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder}; use rskafka::client::partition::OffsetAt; use snafu::ResultExt; -use store_api::logstore::entry::Id as EntryId; +use store_api::logstore::entry::{Entry as EntryTrait, Id as EntryId}; use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::Id as NamespaceId; use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; @@ -32,6 +32,7 @@ use crate::kafka::client_manager::{ClientManager, ClientManagerRef}; use crate::kafka::util::offset::Offset; use crate::kafka::util::record::{maybe_emit_entry, Record, RecordProducer}; use crate::kafka::{EntryImpl, NamespaceImpl}; +use crate::metrics; /// A log store backed by Kafka. #[derive(Debug)] @@ -86,6 +87,15 @@ impl LogStore for KafkaLogStore { /// Appends a batch of entries and returns a response containing a map where the key is a region id /// while the value is the id of the last successfully written entry of the region. async fn append_batch(&self, entries: Vec) -> Result { + metrics::METRIC_KAFKA_APPEND_BATCH_CALLS_TOTAL.inc(); + metrics::METRIC_KAFKA_APPEND_BATCH_BYTES_TOTAL.inc_by( + entries + .iter() + .map(EntryTrait::estimated_size) + .sum::() as u64, + ); + let _timer = metrics::METRIC_KAFKA_APPEND_BATCH_ELAPSED.start_timer(); + if entries.is_empty() { return Ok(AppendBatchResponse::default()); } @@ -124,6 +134,9 @@ impl LogStore for KafkaLogStore { ns: &Self::Namespace, entry_id: EntryId, ) -> Result> { + metrics::METRIC_KAFKA_READ_CALLS_TOTAL.inc(); + let _timer = metrics::METRIC_KAFKA_READ_ELAPSED.start_timer(); + // Gets the client associated with the topic. 
let client = self .client_manager @@ -183,6 +196,9 @@ impl LogStore for KafkaLogStore { })?; let (kafka_record, offset) = (record_and_offset.record, record_and_offset.offset); + metrics::METRIC_KAFKA_READ_RECORD_BYTES_TOTAL + .inc_by(kafka_record.approximate_size() as u64); + debug!( "Read a record at offset {} for ns {}, high watermark: {}", offset, ns_clone, high_watermark diff --git a/src/log-store/src/kafka/util/record.rs b/src/log-store/src/kafka/util/record.rs index 9bc97557ad39..e2035318c4c7 100644 --- a/src/log-store/src/kafka/util/record.rs +++ b/src/log-store/src/kafka/util/record.rs @@ -25,6 +25,7 @@ use crate::error::{ use crate::kafka::client_manager::ClientManagerRef; use crate::kafka::util::offset::Offset; use crate::kafka::{EntryId, EntryImpl, NamespaceImpl}; +use crate::metrics; /// The current version of Record. pub(crate) const VERSION: u32 = 0; @@ -97,6 +98,7 @@ impl TryFrom for KafkaRecord { } } +// TODO(niebayes): improve the performance of decoding kafka record. impl TryFrom for Record { type Error = crate::error::Error; @@ -150,6 +152,7 @@ impl RecordProducer { /// Produces the buffered entries to Kafka sever. Those entries may span several Kafka records. /// Returns the offset of the last successfully produced record. + // TODO(niebayes): maybe requires more fine-grained metrics to measure stages of writing to kafka. pub(crate) async fn produce(self, client_manager: &ClientManagerRef) -> Result { ensure!(!self.entries.is_empty(), EmptyEntriesSnafu); @@ -173,6 +176,11 @@ impl RecordProducer { for entry in self.entries { for record in build_records(entry, max_record_size) { let kafka_record = KafkaRecord::try_from(record)?; + + metrics::METRIC_KAFKA_PRODUCE_RECORD_COUNTS.inc(); + metrics::METRIC_KAFKA_PRODUCE_RECORD_BYTES_TOTAL + .inc_by(kafka_record.approximate_size() as u64); + // Records of a certain region cannot be produced in parallel since their order must be static. let offset = producer .produce(kafka_record.clone()) diff --git a/src/log-store/src/lib.rs b/src/log-store/src/lib.rs index a57e76850e2d..c035e5fcff80 100644 --- a/src/log-store/src/lib.rs +++ b/src/log-store/src/lib.rs @@ -17,6 +17,7 @@ pub mod error; pub mod kafka; +pub mod metrics; mod noop; pub mod raft_engine; pub mod test_util; diff --git a/src/log-store/src/metrics.rs b/src/log-store/src/metrics.rs new file mode 100644 index 000000000000..bdd03b34c44f --- /dev/null +++ b/src/log-store/src/metrics.rs @@ -0,0 +1,107 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use lazy_static::lazy_static; +use prometheus::*; + +/// Logstore label. +pub const LOGSTORE_LABEL: &str = "logstore"; +/// Operation type label. +pub const OPTYPE_LABEL: &str = "optype"; + +lazy_static! { + /// Counters of bytes of each operation on a logstore. 
+ pub static ref METRIC_LOGSTORE_OP_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!( + "greptime_logstore_op_bytes_total", + "logstore operation bytes total", + &[LOGSTORE_LABEL, OPTYPE_LABEL], + ) + .unwrap(); + /// Counter of bytes of the append_batch operation on the kafka logstore. + pub static ref METRIC_KAFKA_APPEND_BATCH_BYTES_TOTAL: IntCounter = METRIC_LOGSTORE_OP_BYTES_TOTAL.with_label_values( + &["kafka", "append_batch"], + ); + /// Counter of bytes of the read operation on the kafka logstore. + pub static ref METRIC_KAFKA_READ_BYTES_TOTAL: IntCounter = METRIC_LOGSTORE_OP_BYTES_TOTAL.with_label_values( + &["kafka", "read"], + ); + /// Counter of bytes of the append_batch operation on the raft-engine logstore. + pub static ref METRIC_RAFT_ENGINE_APPEND_BATCH_BYTES_TOTAL: IntCounter = METRIC_LOGSTORE_OP_BYTES_TOTAL.with_label_values( + &["raft-engine", "append_batch"], + ); + /// Counter of bytes of the read operation on the raft-engine logstore. + pub static ref METRIC_RAFT_ENGINE_READ_BYTES_TOTAL: IntCounter = METRIC_LOGSTORE_OP_BYTES_TOTAL.with_label_values( + &["raft-engine", "read"], + ); + + /// Counter of bytes of the records read by the kafka logstore. + pub static ref METRIC_KAFKA_READ_RECORD_BYTES_TOTAL: IntCounter = register_int_counter!( + "greptime_kafka_read_record_bytes_total", + "kafka read record bytes total" + ).unwrap(); + + /// Counter of the numbers of the records produced by the kafka logstore. + pub static ref METRIC_KAFKA_PRODUCE_RECORD_COUNTS: IntCounter = register_int_counter!( + "greptime_kafka_produce_record_counts", + "kafka produce record counts", + ).unwrap(); + + /// Counter of bytes of the records produced by the kafka logstore. + pub static ref METRIC_KAFKA_PRODUCE_RECORD_BYTES_TOTAL: IntCounter = register_int_counter!( + "greptime_kafka_produce_record_bytes_total", + "kafka produce record bytes total" + ).unwrap(); + + /// Counters of calls of each operation on a logstore. + pub static ref METRIC_LOGSTORE_OP_CALLS_TOTAL: IntCounterVec = register_int_counter_vec!( + "greptime_logstore_op_calls_total", + "logstore operation calls total", + &[LOGSTORE_LABEL, OPTYPE_LABEL], + ) + .unwrap(); + /// Counter of calls of the append_batch operation on the kafka logstore. + pub static ref METRIC_KAFKA_APPEND_BATCH_CALLS_TOTAL: IntCounter = METRIC_LOGSTORE_OP_CALLS_TOTAL.with_label_values( + &["kafka", "append_batch"], + ); + /// Counter of calls of the read operation on the kafka logstore. + pub static ref METRIC_KAFKA_READ_CALLS_TOTAL: IntCounter = METRIC_LOGSTORE_OP_CALLS_TOTAL.with_label_values( + &["kafka", "read"], + ); + /// Counter of calls of the append_batch operation on the raft-engine logstore. + pub static ref METRIC_RAFT_ENGINE_APPEND_BATCH_CALLS_TOTAL: IntCounter = METRIC_LOGSTORE_OP_CALLS_TOTAL.with_label_values( + &["raft-engine", "append_batch"], + ); + /// Counter of calls of the read operation on the raft-engine logstore. + pub static ref METRIC_RAFT_ENGINE_READ_CALLS_TOTAL: IntCounter = METRIC_LOGSTORE_OP_CALLS_TOTAL.with_label_values( + &["raft-engine", "read"], + ); + + /// Timer of operations on a logstore. + pub static ref METRIC_LOGSTORE_OP_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_logstore_op_elapsed", + "logstore operation elapsed", + &[LOGSTORE_LABEL, OPTYPE_LABEL], + ) + .unwrap(); + /// Timer of the append_batch operation on the kafka logstore. 
+ pub static ref METRIC_KAFKA_APPEND_BATCH_ELAPSED: Histogram = METRIC_LOGSTORE_OP_ELAPSED.with_label_values(&["kafka", "append_batch"]); + /// Timer of the read operation on the kafka logstore. + /// This timer only measures the duration of a single read call, not the total duration of replay. + pub static ref METRIC_KAFKA_READ_ELAPSED: Histogram = METRIC_LOGSTORE_OP_ELAPSED.with_label_values(&["kafka", "read"]); + /// Timer of the append_batch operation on the raft-engine logstore. + pub static ref METRIC_RAFT_ENGINE_APPEND_BATCH_ELAPSED: Histogram = METRIC_LOGSTORE_OP_ELAPSED.with_label_values(&["raft-engine", "append_batch"]); + /// Timer of the read operation on the raft-engine logstore. + /// This timer only measures the duration of a single read call, not the total duration of replay. + pub static ref METRIC_RAFT_ENGINE_READ_ELAPSED: Histogram = METRIC_LOGSTORE_OP_ELAPSED.with_label_values(&["raft-engine", "read"]); +} diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs index 1ffada095e80..48668c39ec04 100644 --- a/src/log-store/src/noop.rs +++ b/src/log-store/src/noop.rs @@ -50,6 +50,10 @@ impl Entry for EntryImpl { fn namespace(&self) -> Self::Namespace { Default::default() } + + fn estimated_size(&self) -> usize { + 0 + } } #[async_trait::async_trait] diff --git a/src/log-store/src/raft_engine.rs b/src/log-store/src/raft_engine.rs index fe761388f6dd..49082acab041 100644 --- a/src/log-store/src/raft_engine.rs +++ b/src/log-store/src/raft_engine.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::hash::{Hash, Hasher}; +use std::mem::size_of; use store_api::logstore::entry::{Entry, Id as EntryId}; use store_api::logstore::namespace::{Id as NamespaceId, Namespace}; @@ -83,4 +84,25 @@ impl Entry for EntryImpl { ..Default::default() } } + + fn estimated_size(&self) -> usize { + size_of::<Self>() + self.data.capacity() * size_of::<u8>() + } +} + +#[cfg(test)] +mod tests { + use std::mem::size_of; + + use store_api::logstore::entry::Entry; + + use crate::raft_engine::protos::logstore::EntryImpl; + + #[test] + fn test_estimated_size() { + let entry = EntryImpl::create(1, 1, Vec::with_capacity(100)); + let expected = size_of::<EntryImpl>() + 100; + let got = entry.estimated_size(); + assert_eq!(expected, got); + } } diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 13a32e6fe7af..f5fab5f97eed 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -25,7 +25,7 @@ use common_wal::config::raft_engine::RaftEngineConfig; use common_wal::options::WalOptions; use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode}; use snafu::{ensure, ResultExt}; -use store_api::logstore::entry::Id as EntryId; +use store_api::logstore::entry::{Entry as EntryTrait, Id as EntryId}; use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::{Id as NamespaceId, Namespace as NamespaceTrait}; use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; @@ -35,6 +35,7 @@ use crate::error::{ IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu, RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu, }; +use crate::metrics; use crate::raft_engine::backend::SYSTEM_NAMESPACE; use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace}; @@ -248,6 +249,15 @@ impl LogStore for RaftEngineLogStore { /// Appends a batch of entries to logstore.
`RaftEngineLogStore` assures the atomicity of /// batch append. async fn append_batch(&self, entries: Vec) -> Result { + metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_CALLS_TOTAL.inc(); + metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_BYTES_TOTAL.inc_by( + entries + .iter() + .map(EntryTrait::estimated_size) + .sum::() as u64, + ); + let _timer = metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_ELAPSED.start_timer(); + ensure!(self.started(), IllegalStateSnafu); if entries.is_empty() { return Ok(AppendBatchResponse::default()); @@ -280,6 +290,9 @@ impl LogStore for RaftEngineLogStore { ns: &Self::Namespace, entry_id: EntryId, ) -> Result> { + metrics::METRIC_RAFT_ENGINE_READ_CALLS_TOTAL.inc(); + let _timer = metrics::METRIC_RAFT_ENGINE_READ_ELAPSED.start_timer(); + ensure!(self.started(), IllegalStateSnafu); let engine = self.engine.clone(); diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 13e27b4efd75..137e45f57a6a 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -33,7 +33,7 @@ etcd-client.workspace = true futures.workspace = true h2 = "0.3" http-body = "0.4" -humantime = "2.1" +humantime.workspace = true humantime-serde.workspace = true itertools.workspace = true lazy_static.workspace = true diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index eb680b1a9bd8..78e435861470 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -40,7 +40,7 @@ datatypes.workspace = true futures = "0.3" futures-util.workspace = true greptime-proto.workspace = true -humantime = "2.1" +humantime.workspace = true lazy_static.workspace = true meter-core.workspace = true meter-macros.workspace = true diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs index 1748ff5621be..671f55ac35a2 100644 --- a/src/store-api/src/logstore/entry.rs +++ b/src/store-api/src/logstore/entry.rs @@ -35,4 +35,7 @@ pub trait Entry: Send + Sync { /// Returns the namespace of the entry. fn namespace(&self) -> Self::Namespace; + + /// Computes the estimated size in bytes of the entry. 
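+ /// The estimate does not need to be exact; log store implementations use it to account for
+ /// the bytes handled by operations such as `append_batch`.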
+ fn estimated_size(&self) -> usize; } diff --git a/src/store-api/src/logstore/entry_stream.rs b/src/store-api/src/logstore/entry_stream.rs index 89a8bd192e42..23d131e451aa 100644 --- a/src/store-api/src/logstore/entry_stream.rs +++ b/src/store-api/src/logstore/entry_stream.rs @@ -31,6 +31,7 @@ pub type SendableEntryStream<'a, I, E> = Pin #[cfg(test)] mod tests { use std::any::Any; + use std::mem::size_of; use std::task::{Context, Poll}; use common_error::ext::StackError; @@ -87,6 +88,10 @@ mod tests { fn namespace(&self) -> Self::Namespace { Namespace {} } + + fn estimated_size(&self) -> usize { + self.data.capacity() * size_of::() + } } impl SimpleEntry { diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index f3b8530441c7..b285be08b907 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -29,7 +29,7 @@ datafusion-physical-expr.workspace = true datatypes.workspace = true derive_builder.workspace = true futures.workspace = true -humantime = "2.1" +humantime.workspace = true humantime-serde.workspace = true paste = "1.0" serde.workspace = true diff --git a/tests-fuzz/Cargo.toml b/tests-fuzz/Cargo.toml index ce216de41970..abe17f5e978d 100644 --- a/tests-fuzz/Cargo.toml +++ b/tests-fuzz/Cargo.toml @@ -40,6 +40,7 @@ sqlx = { version = "0.6", features = [ ] } [dev-dependencies] +dotenv.workspace = true tokio = { workspace = true } [[bin]] diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 1e549c6f9337..aa40cae92ac0 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -36,7 +36,7 @@ common-test-util.workspace = true common-wal.workspace = true datanode = { workspace = true } datatypes.workspace = true -dotenv = "0.15" +dotenv.workspace = true frontend = { workspace = true, features = ["testing"] } futures.workspace = true meta-client.workspace = true