Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
ActivePeter committed Oct 23, 2024
1 parent 1f4ba19 commit 3e8ddf7
Show file tree
Hide file tree
Showing 14 changed files with 627 additions and 426 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
ratelimit = "0.9"
dashmap = "5.4"
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
Expand Down Expand Up @@ -141,6 +140,7 @@ opentelemetry-proto = { version = "0.5", features = [
"with-serde",
"logs",
] }
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
Expand All @@ -149,6 +149,7 @@ promql-parser = { version = "0.4.1" }
prost = "0.12"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.8"
ratelimit = "0.9"
regex = "1.8"
regex-automata = { version = "0.4" }
reqwest = { version = "0.12", default-features = false, features = [
Expand All @@ -157,7 +158,6 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
parking_lot = "0.12"
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "75535b5ad9bae4a5dbb582c82e44dfd81ec10105", features = [
"transport-tls",
] }
Expand Down Expand Up @@ -250,7 +250,6 @@ store-api = { path = "src/store-api" }
substrait = { path = "src/common/substrait" }
table = { path = "src/table" }


[patch.crates-io]
# change all rustls dependencies to use our fork to default to `ring` to make it "just work"
hyper-rustls = { git = "https://github.com/GreptimeTeam/hyper-rustls" }
Expand Down
3 changes: 0 additions & 3 deletions src/common/runtime/.gitignore

This file was deleted.

21 changes: 15 additions & 6 deletions src/common/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,40 @@ version.workspace = true
edition.workspace = true
license.workspace = true

[lib]
path = "src/lib.rs"

[[bin]]
name = "common-runtime-bin"
path = "src/bin.rs"

[lints]
workspace = true

[dependencies]
async-trait.workspace = true
clap.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
futures.workspace = true
lazy_static.workspace = true
num_cpus.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
paste.workspace = true
pin-project.workspace = true
prometheus.workspace = true
rand.workspace = true
ratelimit.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-metrics = "0.3"
tokio-metrics-collector = { git = "https://github.com/MichaelScofield/tokio-metrics-collector.git", rev = "89d692d5753d28564a7aac73c6ac5aba22243ba0" }
tokio-util.workspace = true
serde_json.workspace = true
parking_lot.workspace = true
ratelimit.workspace = true
rand.workspace = true
pin-project.workspace = true
futures.workspace = true

[dev-dependencies]
tokio-test = "0.4"
61 changes: 58 additions & 3 deletions src/common/runtime/README
Original file line number Diff line number Diff line change
@@ -1,5 +1,60 @@
# Run performance test for different priority & workload type
# Greptime Runtime

## Run performance test for different priority & workload type

```
# workspace is at this subcrate
cargo run --release -- --loop-cnt 500
```
cargo test --package common-runtime --lib -- test_metrics::test_all_cpu_usage --nocapture
```

## Related PRs & ISSUEs

- Preliminary support cpu limitation

ISSUE: https://github.com/GreptimeTeam/greptimedb/issues/3685

PR: https://github.com/GreptimeTeam/greptimedb/pull/4782

## CPU resource constraints (ThrottleableRuntime)


To achieve CPU resource constraints, we adopt the concept of rate limiting. When creating a future, we first wrap it with another layer of future to intercept the poll operation during runtime. By using the ratelimit library, we can simply implement a mechanism that allows only a limited number of polls for a batch of tasks under a certain priority within a specific time frame (the current token generation interval is set to 10ms).

The default used runtime can be switched by
``` rust
pub type Runtime = DefaultRuntime;
```
in `runtime.rs`.

We tested four type of workload with 5 priorities, whose setup are as follows:

``` rust
impl Priority {
fn ratelimiter_count(&self) -> Result<Option<Ratelimiter>> {
let max = 8000;
let gen_per_10ms = match self {
Priority::VeryLow => Some(2000),
Priority::Low => Some(4000),
Priority::Middle => Some(6000),
Priority::High => Some(8000),
Priority::VeryHigh => None,
};
if let Some(gen_per_10ms) = gen_per_10ms {
Ratelimiter::builder(gen_per_10ms, Duration::from_millis(10)) // generate poll count per 10ms
.max_tokens(max) // reserved token for batch request
.build()
.context(BuildRuntimeRateLimiterSnafu)
.map(Some)
} else {
Ok(None)
}
}
}
```

This is the preliminary experimental effect so far:

![](resources/rdme-exp.png)

## TODO
- Introduce PID to achieve more accurate limitation.
Binary file added src/common/runtime/resources/rdme-exp.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
206 changes: 206 additions & 0 deletions src/common/runtime/src/bin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use clap::Parser;

#[derive(Debug, Default, Parser)]
pub struct Command {
#[clap(long)]
loop_cnt: usize,
}

fn main() {
common_telemetry::init_default_ut_logging();
let cmd = Command::parse();

test_diff_priority_cpu::test_diff_workload_priority(cmd.loop_cnt);
}

mod test_diff_priority_cpu {
use std::path::PathBuf;

use common_runtime::runtime::{BuilderBuild, Priority, RuntimeTrait};
use common_runtime::{Builder, Runtime};
use common_telemetry::debug;
use tempfile::TempDir;

fn compute_pi_str(precision: usize) -> String {
let mut pi = 0.0;
let mut sign = 1.0;

for i in 0..precision {
pi += sign / (2 * i + 1) as f64;
sign *= -1.0;
}

pi *= 4.0;
format!("{:.prec$}", pi, prec = precision)
}

macro_rules! def_workload_enum {
($($variant:ident),+) => {
#[derive(Debug)]
enum WorkloadType {
$($variant),+
}

/// array of workloads for iteration
const WORKLOADS: &'static [WorkloadType] = &[
$( WorkloadType::$variant ),+
];
};
}

def_workload_enum!(
ComputeHeavily,
ComputeHeavily2,
WriteFile,
SpawnBlockingWriteFile
);

async fn workload_compute_heavily() {
let prefix = 10;

for _ in 0..3000 {
let _ = compute_pi_str(prefix);
tokio::task::yield_now().await;
}
}
async fn workload_compute_heavily2() {
let prefix = 30;
for _ in 0..2000 {
let _ = compute_pi_str(prefix);
tokio::task::yield_now().await;
std::thread::yield_now();
}
}
async fn workload_write_file(_idx: u64, tempdir: PathBuf) {
use tokio::io::AsyncWriteExt;
let prefix = 50;

let mut file = tokio::fs::OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(tempdir.join(format!("pi_{}", prefix)))
.await
.unwrap();
for i in 0..200 {
let pi = compute_pi_str(prefix);

if i % 2 == 0 {
file.write_all(pi.as_bytes()).await.unwrap();
}
}
}
async fn workload_spawn_blocking_write_file(tempdir: PathBuf) {
use std::io::Write;
let prefix = 100;
let mut file = Some(
std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(tempdir.join(format!("pi_{}", prefix)))
.unwrap(),
);
for i in 0..100 {
let pi = compute_pi_str(prefix);
if i % 2 == 0 {
let mut file1 = file.take().unwrap();
file = Some(
tokio::task::spawn_blocking(move || {
file1.write_all(pi.as_bytes()).unwrap();
file1
})
.await
.unwrap(),
);
}
}
}

pub fn test_diff_workload_priority(loop_cnt: usize) {
let tempdir = tempfile::tempdir().unwrap();
let priorities = [
Priority::VeryLow,
Priority::Low,
Priority::Middle,
Priority::High,
Priority::VeryHigh,
];
for wl in WORKLOADS {
for p in priorities.iter() {
let runtime: Runtime = Builder::default()
.runtime_name("test")
.thread_name("test")
.worker_threads(8)
.priority(*p)
.build()
.expect("Fail to create runtime");
let runtime2 = runtime.clone();
runtime.block_on(test_spec_priority_and_workload(
*p, runtime2, wl, &tempdir, loop_cnt,
));
}
}
}

async fn test_spec_priority_and_workload(
priority: Priority,
runtime: Runtime,
workload_id: &WorkloadType,
tempdir: &TempDir,
loop_cnt: usize,
) {
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
debug!(
"testing cpu usage for priority {:?} workload_id {:?}",
priority, workload_id,
);
// start monitor thread
let mut tasks = vec![];
let start = std::time::Instant::now();
for i in 0..loop_cnt {
// persist cpu usage in json: {priority}.{workload_id}
match *workload_id {
WorkloadType::ComputeHeavily => {
tasks.push(runtime.spawn(workload_compute_heavily()));
}
WorkloadType::ComputeHeavily2 => {
tasks.push(runtime.spawn(workload_compute_heavily2()));
}
WorkloadType::SpawnBlockingWriteFile => {
tasks.push(runtime.spawn(workload_spawn_blocking_write_file(
tempdir.path().to_path_buf(),
)));
}
WorkloadType::WriteFile => {
tasks.push(
runtime.spawn(workload_write_file(i as u64, tempdir.path().to_path_buf())),
);
}
}
}
for task in tasks {
task.await.unwrap();
}
let elapsed = start.elapsed();
debug!(
"test cpu usage for priority {:?} workload_id {:?} elapsed {}ms",
priority,
workload_id,
elapsed.as_millis()
);
}
}
8 changes: 8 additions & 0 deletions src/common/runtime/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to build runtime rate limiter"))]
BuildRuntimeRateLimiter {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: ratelimit::Error,
},

#[snafu(display("Repeated task {} is already started", name))]
IllegalState {
name: String,
Expand Down
Loading

0 comments on commit 3e8ddf7

Please sign in to comment.