Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: improve region manifest service #1268

Merged
merged 19 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,19 @@ type = "File"
data_dir = "/tmp/greptimedb/data/"

# Compaction options, see `standalone.example.toml`.
[compaction]
[storage.compaction]
max_inflight_tasks = 4
max_files_in_level0 = 8
max_purge_tasks = 32

# Storage manifest options
[storage.manifest]
# Region checkpoint actions margin.
# Create a checkpoint every <checkpoint_margin> actions.
checkpoint_margin = 10
# Region manifest logs and checkpoints gc execution duration
gc_duration = '30s'
killme2008 marked this conversation as resolved.
Show resolved Hide resolved

# Procedure storage options, see `standalone.example.toml`.
# [procedure.store]
# type = "File"
Expand Down
11 changes: 10 additions & 1 deletion config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,23 @@ type = "File"
data_dir = "/tmp/greptimedb/data/"

# Compaction options.
[compaction]
[storage.compaction]
# Max task number that can concurrently run.
max_inflight_tasks = 4
# Max files in level 0 to trigger compaction.
max_files_in_level0 = 8
# Max task number for SST purge task after compaction.
max_purge_tasks = 32

# Storage manifest options
[storage.manifest]
# Region checkpoint actions margin.
# Create a checkpoint every <checkpoint_margin> actions.
checkpoint_margin = 10
# Region manifest logs and checkpoints gc execution duration
gc_duration = '30s'


# Procedure storage options.
# Uncomment to enable.
# [procedure.store]
Expand Down
33 changes: 22 additions & 11 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl TryFrom<StartCommand> for DatanodeOptions {
}

if let Some(data_dir) = cmd.data_dir {
opts.storage = ObjectStoreConfig::File(FileConfig { data_dir });
opts.storage.store = ObjectStoreConfig::File(FileConfig { data_dir });
}

if let Some(wal_dir) = cmd.wal_dir {
Expand All @@ -167,7 +167,7 @@ mod tests {
use std::time::Duration;

use common_test_util::temp_dir::create_named_temp_file;
use datanode::datanode::{CompactionConfig, ObjectStoreConfig};
use datanode::datanode::{CompactionConfig, ObjectStoreConfig, RegionManifestConfig};
use servers::Mode;

use super::*;
Expand Down Expand Up @@ -203,10 +203,14 @@ mod tests {
type = "File"
data_dir = "/tmp/greptimedb/data/"

[compaction]
max_inflight_tasks = 4
max_files_in_level0 = 8
[storage.compaction]
max_inflight_tasks = 3
max_files_in_level0 = 7
max_purge_tasks = 32

[storage.manifest]
checkpoint_margin = 9
gc_duration = '7s'
"#;
write!(file, "{}", toml_str).unwrap();

Expand Down Expand Up @@ -237,21 +241,28 @@ mod tests {
assert_eq!(3000, timeout_millis);
assert!(tcp_nodelay);

match options.storage {
ObjectStoreConfig::File(FileConfig { data_dir }) => {
assert_eq!("/tmp/greptimedb/data/".to_string(), data_dir)
match &options.storage.store {
ObjectStoreConfig::File(FileConfig { data_dir, .. }) => {
assert_eq!("/tmp/greptimedb/data/", data_dir)
}
ObjectStoreConfig::S3 { .. } => unreachable!(),
ObjectStoreConfig::Oss { .. } => unreachable!(),
};

assert_eq!(
CompactionConfig {
max_inflight_tasks: 4,
max_files_in_level0: 8,
max_inflight_tasks: 3,
max_files_in_level0: 7,
max_purge_tasks: 32,
},
options.compaction
options.storage.compaction,
);
assert_eq!(
RegionManifestConfig {
checkpoint_margin: Some(9),
gc_duration: Some(Duration::from_secs(7)),
},
options.storage.manifest,
);
}

Expand Down
7 changes: 3 additions & 4 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use clap::Parser;
use common_base::Plugins;
use common_telemetry::info;
use datanode::datanode::{
CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig,
CompactionConfig, Datanode, DatanodeOptions, ProcedureConfig, StorageConfig, WalConfig,
};
use datanode::instance::InstanceRef;
use frontend::frontend::FrontendOptions;
Expand Down Expand Up @@ -82,7 +82,7 @@ pub struct StandaloneOptions {
pub prometheus_options: Option<PrometheusOptions>,
pub prom_options: Option<PromOptions>,
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
pub storage: StorageConfig,
pub compaction: CompactionConfig,
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
pub procedure: Option<ProcedureConfig>,
}
Expand All @@ -101,7 +101,7 @@ impl Default for StandaloneOptions {
prometheus_options: Some(PrometheusOptions::default()),
prom_options: Some(PromOptions::default()),
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
storage: StorageConfig::default(),
compaction: CompactionConfig::default(),
procedure: None,
}
Expand Down Expand Up @@ -129,7 +129,6 @@ impl StandaloneOptions {
enable_memory_catalog: self.enable_memory_catalog,
wal: self.wal,
storage: self.storage,
compaction: self.compaction,
procedure: self.procedure,
..Default::default()
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ edition.workspace = true
license.workspace = true

[dependencies]
async-trait.workspace = true
common-error = { path = "../error" }
common-telemetry = { path = "../telemetry" }
metrics = "0.20"
once_cell = "1.12"
paste.workspace = true
snafu.workspace = true
tokio.workspace = true
tokio-util.workspace = true

[dev-dependencies]
tokio-test = "0.4"
14 changes: 14 additions & 0 deletions src/common/runtime/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::any::Any;

use common_error::prelude::*;
use tokio::task::JoinError;

pub type Result<T> = std::result::Result<T, Error>;

Expand All @@ -26,6 +27,19 @@ pub enum Error {
source: std::io::Error,
backtrace: Backtrace,
},
#[snafu(display("Repeated task {} not started yet", name))]
IllegalState { name: String, backtrace: Backtrace },

#[snafu(display(
"Failed to wait for repeated task {} to stop, source: {}",
name,
source
))]
WaitGcTaskStop {
name: String,
source: JoinError,
backtrace: Backtrace,
},
}

impl ErrorExt for Error {
Expand Down
2 changes: 2 additions & 0 deletions src/common/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub mod error;
mod global;
pub mod metric;
mod repeated_task;
pub mod runtime;

pub use global::{
Expand All @@ -23,4 +24,5 @@ pub use global::{
spawn_read, spawn_write, write_runtime,
};

pub use crate::repeated_task::{RepeatedTask, TaskFunction, TaskFunctionRef};
pub use crate::runtime::{Builder, JoinError, JoinHandle, Runtime};
Loading