feat!: Divide flush and compaction job pool #4871

Merged · 4 commits · Oct 25, 2024
12 changes: 8 additions & 4 deletions config/config.md
@@ -83,7 +83,7 @@
| `wal.backoff_max` | String | `10s` | The maximum backoff delay.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system <br/>can still successfully replay memtable data without throwing an <br/>out-of-range error. <br/>However, enabling this option might lead to unexpected data loss, <br/>as the system will skip over missing entries instead of treating <br/>them as critical errors. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `metadata_store` | -- | -- | Metadata storage options. |
| `metadata_store.file_size` | String | `256MB` | Kv file size in bytes. |
| `metadata_store.purge_threshold` | String | `4GB` | Kv purge threshold. |
@@ -116,7 +116,9 @@
| `region_engine.mito.worker_request_batch_size` | Integer | `64` | Max batch size for a worker to handle requests. |
| `region_engine.mito.manifest_checkpoint_distance` | Integer | `10` | Number of meta action updated to trigger a new checkpoint for the manifest. |
| `region_engine.mito.compress_manifest` | Bool | `false` | Whether to compress manifest and checkpoint file by gzip (default false). |
| `region_engine.mito.max_background_jobs` | Integer | `4` | Max number of running background jobs |
| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size`. |
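The three `region_engine.mito.max_background_*` options added above default to `Auto`, i.e. they scale with the host's core count (1/2, 1/4, and 1× of the cores respectively). A standalone sketch of that arithmetic, assuming round-up division and the standard-library core probe (the engine's own rounding and CPU detection may differ):

```rust
// Illustration of the `Auto` defaults described in the table above; not GreptimeDB code.
fn main() {
    // Assumption: probe cores via the standard library; the engine may detect them differently.
    let cpus = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    let max_background_flushes = cpus.div_ceil(2); // 1/2 of CPU cores
    let max_background_compactions = cpus.div_ceil(4); // 1/4 of CPU cores
    let max_background_purges = cpus; // one per core
    println!(
        "flushes={max_background_flushes}, compactions={max_background_compactions}, purges={max_background_purges}"
    );
}
```

On an 8-core machine this yields 4 flush, 2 compaction, and 8 purge slots, matching the commented-out defaults in the example TOML files below.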
@@ -410,7 +412,7 @@
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system <br/>can still successfully replay memtable data without throwing an <br/>out-of-range error. <br/>However, enabling this option might lead to unexpected data loss, <br/>as the system will skip over missing entries instead of treating <br/>them as critical errors. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
@@ -437,7 +439,9 @@
| `region_engine.mito.worker_request_batch_size` | Integer | `64` | Max batch size for a worker to handle requests. |
| `region_engine.mito.manifest_checkpoint_distance` | Integer | `10` | Number of meta action updated to trigger a new checkpoint for the manifest. |
| `region_engine.mito.compress_manifest` | Bool | `false` | Whether to compress manifest and checkpoint file by gzip (default false). |
| `region_engine.mito.max_background_jobs` | Integer | `4` | Max number of running background jobs |
| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size` |
25 changes: 17 additions & 8 deletions config/datanode.example.toml
@@ -215,12 +215,12 @@ dump_index_interval = "60s"

## Ignore missing entries during read WAL.
## **It's only used when the provider is `kafka`**.
##
## This option ensures that when Kafka messages are deleted, the system
## can still successfully replay memtable data without throwing an
## out-of-range error.
## However, enabling this option might lead to unexpected data loss,
## as the system will skip over missing entries instead of treating
##
## This option ensures that when Kafka messages are deleted, the system
## can still successfully replay memtable data without throwing an
## out-of-range error.
## However, enabling this option might lead to unexpected data loss,
## as the system will skip over missing entries instead of treating
## them as critical errors.
overwrite_entry_start_id = false

@@ -416,8 +416,17 @@ manifest_checkpoint_distance = 10
## Whether to compress manifest and checkpoint file by gzip (default false).
compress_manifest = false

## Max number of running background jobs
max_background_jobs = 4
## Max number of running background flush jobs (default: 1/2 of cpu cores).
## @toml2docs:none-default="Auto"
#+ max_background_flushes = 4

## Max number of running background compaction jobs (default: 1/4 of cpu cores).
## @toml2docs:none-default="Auto"
#+ max_background_compactions = 2

## Max number of running background purge jobs (default: number of cpu cores).
## @toml2docs:none-default="Auto"
#+ max_background_purges = 8

## Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h"
25 changes: 17 additions & 8 deletions config/standalone.example.toml
@@ -239,12 +239,12 @@ backoff_deadline = "5mins"

## Ignore missing entries during read WAL.
## **It's only used when the provider is `kafka`**.
##
## This option ensures that when Kafka messages are deleted, the system
## can still successfully replay memtable data without throwing an
## out-of-range error.
## However, enabling this option might lead to unexpected data loss,
## as the system will skip over missing entries instead of treating
##
## This option ensures that when Kafka messages are deleted, the system
## can still successfully replay memtable data without throwing an
## out-of-range error.
## However, enabling this option might lead to unexpected data loss,
## as the system will skip over missing entries instead of treating
## them as critical errors.
overwrite_entry_start_id = false

@@ -454,8 +454,17 @@ manifest_checkpoint_distance = 10
## Whether to compress manifest and checkpoint file by gzip (default false).
compress_manifest = false

## Max number of running background jobs
max_background_jobs = 4
## Max number of running background flush jobs (default: 1/2 of cpu cores).
## @toml2docs:none-default="Auto"
#+ max_background_flushes = 4

## Max number of running background compaction jobs (default: 1/4 of cpu cores).
## @toml2docs:none-default="Auto"
#+ max_background_compactions = 2

## Max number of running background purge jobs (default: number of cpu cores).
## @toml2docs:none-default="Auto"
#+ max_background_purges = 8

## Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h"
2 changes: 1 addition & 1 deletion src/mito2/src/compaction/compactor.rs
@@ -134,7 +134,7 @@ pub async fn open_compaction_region(
));

let file_purger = {
let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_jobs));
let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
Arc::new(LocalFilePurger::new(
purge_scheduler.clone(),
access_layer.clone(),
38 changes: 29 additions & 9 deletions src/mito2/src/config.rs
@@ -28,9 +28,6 @@ use crate::error::Result;
use crate::memtable::MemtableConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

/// Default max running background job.
const DEFAULT_MAX_BG_JOB: usize = 4;

const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
@@ -69,8 +66,12 @@ pub struct MitoConfig {
pub compress_manifest: bool,

// Background job configs:
/// Max number of running background jobs (default 4).
pub max_background_jobs: usize,
/// Max number of running background flush jobs (default: 1/2 of cpu cores).
pub max_background_flushes: usize,
/// Max number of running background compaction jobs (default: 1/4 of cpu cores).
pub max_background_compactions: usize,
/// Max number of running background purge jobs (default: number of cpu cores).
pub max_background_purges: usize,

// Flush configs:
/// Interval to auto flush a region if it has not flushed yet (default 30 min).
@@ -137,7 +138,9 @@ impl Default for MitoConfig {
worker_request_batch_size: 64,
manifest_checkpoint_distance: 10,
compress_manifest: false,
max_background_jobs: DEFAULT_MAX_BG_JOB,
max_background_flushes: divide_num_cpus(2),
max_background_compactions: divide_num_cpus(4),
max_background_purges: common_config::utils::get_cpus(),
auto_flush_interval: Duration::from_secs(30 * 60),
global_write_buffer_size: ReadableSize::gb(1),
global_write_buffer_reject_size: ReadableSize::gb(2),
@@ -185,9 +188,26 @@ impl MitoConfig {
self.worker_channel_size = 1;
}

if self.max_background_jobs == 0 {
warn!("Sanitize max background jobs 0 to {}", DEFAULT_MAX_BG_JOB);
self.max_background_jobs = DEFAULT_MAX_BG_JOB;
if self.max_background_flushes == 0 {
warn!(
"Sanitize max background flushes 0 to {}",
divide_num_cpus(2)
);
self.max_background_flushes = divide_num_cpus(2);
}
if self.max_background_compactions == 0 {
warn!(
"Sanitize max background compactions 0 to {}",
divide_num_cpus(4)
);
self.max_background_compactions = divide_num_cpus(4);
}
if self.max_background_purges == 0 {
warn!(
"Sanitize max background purges 0 to {}",
common_config::utils::get_cpus()
);
self.max_background_purges = common_config::utils::get_cpus();
}

if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
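Both the `Default` impl and the sanitize checks above rely on `divide_num_cpus`, which is not part of this diff. A minimal sketch of what such a helper could look like, assuming round-up division over `common_config::utils::get_cpus()` from the same workspace crate used above (the actual body in the repository may differ):

```rust
/// Hypothetical sketch of the helper called above: divide the detected core
/// count by `divisor`, rounding up so the result is never zero.
fn divide_num_cpus(divisor: usize) -> usize {
    let cores = common_config::utils::get_cpus();
    debug_assert!(divisor > 0 && cores > 0);
    (cores + divisor - 1) / divisor
}
```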
4 changes: 2 additions & 2 deletions src/mito2/src/engine/compaction_test.rs
@@ -272,7 +272,7 @@ async fn test_readonly_during_compaction() {
.create_engine_with(
MitoConfig {
// Ensure there is only one background worker for purge task.
max_background_jobs: 1,
max_background_purges: 1,
..Default::default()
},
None,
@@ -310,7 +310,7 @@ async fn test_readonly_during_compaction() {
listener.wake();

let notify = Arc::new(Notify::new());
// We already sets max background jobs to 1, so we can submit a task to the
// We already sets max background purges to 1, so we can submit a task to the
// purge scheduler to ensure all purge tasks are finished.
let job_notify = notify.clone();
engine
41 changes: 26 additions & 15 deletions src/mito2/src/worker.rs
@@ -114,8 +114,10 @@ pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
pub(crate) struct WorkerGroup {
/// Workers of the group.
workers: Vec<RegionWorker>,
/// Global background job scheduelr.
scheduler: SchedulerRef,
/// Flush background job pool.
flush_job_pool: SchedulerRef,
/// Compaction background job pool.
compact_job_pool: SchedulerRef,
/// Scheduler for file purgers.
purge_scheduler: SchedulerRef,
/// Cache.
@@ -146,10 +148,10 @@ impl WorkerGroup {
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
.await?
.with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
// We use another scheduler to avoid purge jobs blocking other jobs.
// A purge job is cheaper than other background jobs so they share the same job limit.
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
@@ -178,7 +180,8 @@
log_store: log_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
flush_job_pool: flush_job_pool.clone(),
compact_job_pool: compact_job_pool.clone(),
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::default(),
cache_manager: cache_manager.clone(),
@@ -195,7 +198,8 @@

Ok(WorkerGroup {
workers,
scheduler,
flush_job_pool,
compact_job_pool,
purge_scheduler,
cache_manager,
})
@@ -205,8 +209,11 @@
pub(crate) async fn stop(&self) -> Result<()> {
info!("Stop region worker group");

// TODO(yingwen): Do we need to stop gracefully?
// Stops the scheduler gracefully.
self.scheduler.stop(true).await?;
self.compact_job_pool.stop(true).await?;
// Stops the scheduler gracefully.
self.flush_job_pool.stop(true).await?;
// Stops the purge scheduler gracefully.
self.purge_scheduler.stop(true).await?;

@@ -275,8 +282,9 @@
.with_notifier(flush_sender.clone()),
)
});
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_flushes));
let puffin_manager_factory = PuffinManagerFactory::new(
&config.index.aux_path,
config.index.staging_size.as_bytes(),
@@ -310,7 +318,8 @@
log_store: log_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
flush_job_pool: flush_job_pool.clone(),
compact_job_pool: compact_job_pool.clone(),
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::new(listener.clone()),
cache_manager: cache_manager.clone(),
@@ -327,7 +336,8 @@

Ok(WorkerGroup {
workers,
scheduler,
flush_job_pool,
compact_job_pool,
purge_scheduler,
cache_manager,
})
@@ -382,7 +392,8 @@ struct WorkerStarter<S> {
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: WriteBufferManagerRef,
scheduler: SchedulerRef,
compact_job_pool: SchedulerRef,
flush_job_pool: SchedulerRef,
purge_scheduler: SchedulerRef,
listener: WorkerListener,
cache_manager: CacheManagerRef,
@@ -423,9 +434,9 @@ impl<S: LogStore> WorkerStarter<S> {
),
purge_scheduler: self.purge_scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
flush_scheduler: FlushScheduler::new(self.scheduler.clone()),
flush_scheduler: FlushScheduler::new(self.flush_job_pool),
compaction_scheduler: CompactionScheduler::new(
self.scheduler,
self.compact_job_pool,
sender.clone(),
self.cache_manager.clone(),
self.config,
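The changes above replace the single shared `scheduler` with dedicated `flush_job_pool` and `compact_job_pool` schedulers (alongside the existing `purge_scheduler`), so each job type gets its own concurrency limit. The sketch below is illustrative only — it models the idea with Tokio semaphores rather than GreptimeDB's `LocalScheduler`, whose API is not shown in this diff — and shows why separately bounded pools keep a burst of compactions from delaying flushes:

```rust
use tokio::sync::Semaphore;

/// Two independently bounded job pools: compaction permits can be exhausted
/// without touching the permits reserved for flush jobs.
struct JobPools {
    flush: Semaphore,
    compact: Semaphore,
}

impl JobPools {
    fn new(max_flushes: usize, max_compactions: usize) -> Self {
        Self {
            flush: Semaphore::new(max_flushes),
            compact: Semaphore::new(max_compactions),
        }
    }

    async fn run_flush(&self, job: impl std::future::Future<Output = ()>) {
        // Waits only on other flush jobs, never on compactions.
        let _permit = self.flush.acquire().await.expect("pool closed");
        job.await;
    }

    async fn run_compaction(&self, job: impl std::future::Future<Output = ()>) {
        let _permit = self.compact.acquire().await.expect("pool closed");
        job.await;
    }
}

#[tokio::main]
async fn main() {
    let pools = JobPools::new(4, 2); // e.g. 8-core defaults: 4 flushes, 2 compactions
    pools.run_flush(async { /* write a memtable to an SST */ }).await;
    pools.run_compaction(async { /* merge SST files */ }).await;
}
```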
4 changes: 3 additions & 1 deletion tests-integration/tests/http.rs
@@ -866,7 +866,6 @@ worker_channel_size = 128
worker_request_batch_size = 64
manifest_checkpoint_distance = 10
compress_manifest = false
max_background_jobs = 4
auto_flush_interval = "30m"
enable_experimental_write_cache = false
experimental_write_cache_path = ""
@@ -939,6 +938,9 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
"content_cache_size =",
"name =",
"recovery_parallelism =",
"max_background_flushes =",
"max_background_compactions =",
"max_background_purges =",
];

input
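The new `max_background_*` keys are added to the list of config lines whose values depend on the machine running the test, so they are stripped before the dumped HTTP config is compared. The helper's body is truncated in this view; an assumed shape consistent with the prefix list above (not taken verbatim from the PR) would be:

```rust
// Assumed shape of the helper shown above: drop any dumped config line whose
// key has a machine- or run-dependent value so the comparison stays stable.
fn drop_lines_with_inconsistent_results(input: String) -> String {
    let inconsistent_keys = [
        "max_background_flushes =",
        "max_background_compactions =",
        "max_background_purges =",
        // ...plus the other keys listed in the diff above...
    ];
    input
        .lines()
        .filter(|line| !inconsistent_keys.iter().any(|key| line.trim().starts_with(key)))
        .collect::<Vec<_>>()
        .join("\n")
}
```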