Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

queue: support priority queue #72

Merged
merged 10 commits into from
Dec 29, 2022
Merged

queue: support priority queue #72

merged 10 commits into from
Dec 29, 2022

Conversation

glorv
Copy link
Contributor

@glorv glorv commented Dec 2, 2022

this PR introduces a new type of queue: priority queue. The priority queue sorts task by their priority value that can allow inserting tasks in the front or middle, so we can support a more flexible schedule policy.

Why Priority

The default multi-level queue is suitable for most FIFO scenarios. But in cases that need more fine-grained control of the task schedule order, FIFO is not good enough. For example, in a shared environment for multiple users, the scheduler should schedule tasks from different users according to their share. In this case, a priority-based scheduler based on the total execution time by the user may be a workable solution.

Drawbacks

The priority queue is not perfect for all cases. The scheduling overhead is a little bigger than the multi-level queue. And there is no default implementation that is suitable for common cases. The schedule also doesn't implement the multi-level strategy that automatically degrades slow tasks.

Implement detail

The priority queue is backed by crossbeam-skiplist, the key is the priority value and the value is the task. The implementation attaches the key with an extra auto-incremented sequence number to ensure the key never duplicate with each other.

The user should provide an implementation of the trait TaskPriorityPriovider to generate the priority value of each task.

Benchmark Result

I test the priority queue performance by comparing the overall performance of tidb cluster with sysbench oltp_read_only and select_random_ranges workload.
In this brief benchmark, the priority scheduler implements the mclock algorithm to support fair scheduling among multi resource groups. Becuase by default there is only one resource group, so it can be treat as a FIFO queue. The priority test code is: https://github.com/glorv/tikv/tree/qos-http and the nightly code is commit 2704588c6aaa1a269bb91499229e358d23cc636b.

The benchmark is running on 3 * 8core tikv and 6 * 8 core tidb so tikv should be the performance bottleneck.
Result:

图片
(the first is priority queue and second is yatp multi-level queue)

The benchmark shows that the overhead of priority-queue is bigger than multi-level queue. The performance drops about 5% in oltp_read_only and 1% in select_random_ranges.

Signed-off-by: glorv <glorvs@163.com>
@glorv
Copy link
Contributor Author

glorv commented Dec 2, 2022

@Connor1996 @sticnarf @BusyJay PTAL

Signed-off-by: glorv <glorvs@163.com>
@BusyJay
Copy link
Member

BusyJay commented Dec 6, 2022

Can you highlight the cases that priority queue solves better than multi level queue? And what's the drawbacks of priority queue?

@glorv
Copy link
Contributor Author

glorv commented Dec 7, 2022

Can you highlight the cases that priority queue solves better than multi level queue? And what's the drawbacks of priority queue?

@BusyJay I have changed the description, PTAL again, thanks

Signed-off-by: glorv <glorvs@163.com>
@glorv
Copy link
Contributor Author

glorv commented Dec 13, 2022

@Connor1996 @sticnarf PTAL, thanks

Cargo.toml Outdated
@@ -10,6 +10,7 @@ repository = "https://github.com/tikv/yatp/"

[dependencies]
crossbeam-deque = "0.8"
crossbeam-skiplist = { git = "https://github.com/crossbeam-rs/crossbeam" }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

crossbeam-skiplist 0.1 recently releases. We can use the crates.io version now.

}
}

/// A holder to store task. The slot can be concurrently visit by multiple thread in the skip-list.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only see push and pop access the skiplist and they don't seem to visit a slot concurrently. Where does concurrent visit happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in our case it's safe to take the value. But the pop() api only provide readonly access to the key and value. So here we wrap it in a atomic ptr to workaround this restriction. Do you have any idea to avoid this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. The skiplist cannot guarantee that the entry is not borrowed by others when popped, so we only have read only access to the value.

Instead of an AtomicPtr wrapping a Box, do you think a crossbeam AtomicCell is simpler and cheaper? There will be no extra allocation. The future TaskCell is just a pointer so it should also be lock free.

Copy link
Contributor Author

@glorv glorv Dec 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems https://github.com/crossbeam-rs/crossbeam/blob/0de660ec88481ff7787374efee0c2e21d4d403b2/crossbeam-utils/src/atomic/atomic_cell.rs#L1020 AtomicCell is also not very cheap because it uses a global lock for large value (> 8 bytes).

Copy link
Contributor Author

@glorv glorv Dec 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I found we can simply replace the AtomicPtr with RefCell<Option<T>>. Since only the thread pops the entry can visit the value, it's safe to impl Sync for the Slot type. @sticnarf PTAL again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The future taskcell is only 8 bytes so atomic cell is cheap. For other types, yes, a Mutex is better than the implementation of AtomicCell.

While I don't think RefCell + unsafe impl Sync is a good combination.

If you care about safety, Mutex<Option<>> is your choice. And it's fast in practice when there is not any race (only need a very few atomic operations).
If you are confident enough, just use UnsafeCell. RefCell does not provide you with extra safety when you mark it Sync because its borrow checks will become unreliable when multiple threads access it concurrently.

Copy link
Contributor Author

@glorv glorv Dec 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Change to using AtomicCell. Though UnsafeCell is also an available choice, since this isn't a performance bottleneck, using AtomicCell can avoid unsafe code.

///
#[derive(Clone)]
struct PriorityTaskManager {
level_manager: Arc<TaskLevelManager>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With a priority queue, why is there still a level manager?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While using the priority queue, we still need to auto demote slow tasks. So the priority manager can use the level information to set different priority value. Or we have to implement similar mechanism to do so.

Signed-off-by: glorv <glorvs@163.com>
Signed-off-by: glorv <glorvs@163.com>
Signed-off-by: glorv <glorvs@163.com>
sticnarf
sticnarf previously approved these changes Dec 20, 2022
Copy link
Contributor

@sticnarf sticnarf left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.
@BusyJay Do you want to take another look?

@@ -290,6 +290,17 @@ impl Builder {
self.build_with_queue_and_runner(QueueType::Multilevel(queue_builder), runner_builder)
}

///
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment

@@ -28,6 +28,8 @@ pub struct Extras {
pub(crate) fixed_level: Option<u8>,
/// Number of execute times
pub(crate) exec_times: u32,
/// The task group id. Used in priority queue.
pub(crate) group_id: u64,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not group name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. But I think it a bit weird here because the group_id/name is only fits in some specified cases(e.g. in tikv), but may not fit for other cases. I'm not sure if it is better to be defined as extra_data: Vec<u8>, so the user can put arbitary data in this field as they need.

Signed-off-by: glorv <glorvs@163.com>
}

#[inline]
fn gen_key(&self, weight: u64) -> MapKey {
Copy link

@nolouch nolouch Dec 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may s/weight/prioriy/ more unify.

}

impl Config {
/// Sets the name of the multilevel task queue. Metrics of multilevel
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Sets the name of the multilevel task queue. Metrics of multilevel
/// Sets the name of the priority task queue.

}

impl Builder {
/// Creates a multilevel task queue builder with specified config and [`TaskPriorityProvider`].
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Creates a multilevel task queue builder with specified config and [`TaskPriorityProvider`].
/// Creates a prioriy task queue builder with specified config and [`TaskPriorityProvider`].

/// The configurations of priority task queues.
pub struct Config {
name: Option<String>,
level_time_threshold: [Duration; LEVEL_NUM - 1],
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't configure this field?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the same as multi-level, not configurable currently

@@ -18,7 +19,7 @@ pub use self::extras::Extras;
use std::time::Instant;

/// A cell containing a task and needed extra information.
pub trait TaskCell {
pub trait TaskCell: 'static {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The skiplist's value must be 'static. I think it's fair that a task spawned in the thread pool should live long enough.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why skiplist's value should be 'static. Technically, as long as the value outlive skiplist, it should be OK. If that's an API shortcoming, then the constraint should be put into priority queue instead of here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like an API shortcoming. I tried making the constraint priority-queue-only, but because we use enum to dispatch instead of trait, the constraint propagates up.

@@ -28,6 +28,8 @@ pub struct Extras {
pub(crate) fixed_level: Option<u8>,
/// Number of execute times
pub(crate) exec_times: u32,
/// The task group name. Used in priority queue.
pub(crate) group_name: String,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see it's set or get in any place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the multi-tenant implementation, we need to attach the task with the resource group name. I change this field to metadata: Vec<u8> to allow priority queue user to store arbitrary data here.

///
pub fn build_priority_future_pool(
&self,
priority_priovider: Arc<dyn priority::TaskPriorityProvider>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

priovider -> provider

if let Some(ref running_time) = total_running_time {
running_time.inc_by(elapsed);
}
if level == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't always be 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. The priority runner also implements the level degradation logic like the multilevel queue, so the user may assign a different priority for higher level tasks.

fn get_priority(&self, extras: &Extras) -> u64;
}

///
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment

Signed-off-by: glorv <glorvs@163.com>
@glorv
Copy link
Contributor Author

glorv commented Dec 21, 2022

@BusyJay @Connor1996 @nolouch PTAL again, thank you very much.

&mut self.metadata
}

/// Set the group name of this task.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

outdated comment

Signed-off-by: glorv <glorvs@163.com>
}
}

/// The local queue is just a proxy of the global queue.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then why not just use the same struct? You can define different alias.

Copy link
Member

@BusyJay BusyJay left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about updating benches to include microbench of the new queue?

Rest LGTM

pub trait TaskPriorityProvider: Send + Sync + 'static {
/// Return a priority value of this task, all tasks in the priority
/// queue is ordered by this value.
fn get_priority(&self, extras: &Extras) -> u64;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn get_priority(&self, extras: &Extras) -> u64;
fn priority_of(&self, extras: &Extras) -> u64;

Copy link

@nolouch nolouch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

BornChanger
BornChanger previously approved these changes Dec 26, 2022
Connor1996
Connor1996 previously approved these changes Dec 26, 2022
Copy link
Member

@Connor1996 Connor1996 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Signed-off-by: glorv <glorvs@163.com>
@BornChanger
Copy link

/test-address-sanitizer-ubuntu-latest-nightly

@glorv
Copy link
Contributor Author

glorv commented Dec 28, 2022

@BusyJay PTAL again

Copy link

@BornChanger BornChanger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@glorv
Copy link
Contributor Author

glorv commented Dec 29, 2022

The failed case is caused by the extra check by Miri, and is fixed by this PR crossbeam-rs/crossbeam#940. Since it doesn't affect correctness, we can fix in the next release version of crossbeam-skiplist.

@glorv
Copy link
Contributor Author

glorv commented Dec 29, 2022

The failed case is caused by the extra check by Miri, and is fixed by this PR crossbeam-rs/crossbeam#940. Since it doesn't affect correctness, we can fix in the next release version of crossbeam-skiplist.

crossbeam-skip already released v0.1.1 which included the fixed PR, so we can just rerun the failed case.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants