Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
akiradeveloper committed Jun 9, 2024
1 parent 37e6440 commit f2ac53b
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 14 deletions.
1 change: 1 addition & 0 deletions lolraft/src/generated/lolraft.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
/// Update request to the `RaftApp`.
/// This type of request is serialized in the log and processed sequentially.
/// `request_id` is unique identifier of the request to avoid executing duplicating requests.
Expand Down
Binary file modified lolraft/src/generated/lolraft_descriptor.bin
Binary file not shown.
2 changes: 1 addition & 1 deletion lolraft/src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ mod command_log;
use command_log::CommandLog;
mod voter;
use voter::Voter;
mod query_queue;
mod app;
mod query_queue;
use app::App;

mod command;
Expand Down
14 changes: 7 additions & 7 deletions lolraft/src/process/query_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ impl Processor {
/// Register a query to be executed when the read index reaches `read_index`.
/// `read_index` is the index of the commit pointer of when the query is submitted.
pub fn process(&self, index: Index) -> usize {
let qs = self.inner.try_iter().take_while(|(read_index, _)| *read_index <= index);
let qs = self
.inner
.try_iter()
.take_while(|(read_index, _)| *read_index <= index);

let mut n = 0;
for (_, q) in qs {
Expand All @@ -38,7 +41,7 @@ impl Processor {
// which just results in failing on the client side.
if let Ok(resp) = app.process_read(&q.message).await {
q.user_completion.complete_with(resp).ok();
}
}
};
tokio::spawn(fut);
}
Expand All @@ -49,10 +52,7 @@ impl Processor {

pub fn new(app: Ref<App>) -> (Producer, Processor) {
let (tx, rx) = flume::unbounded();
let processor = Processor {
inner: rx,
app,
};
let processor = Processor { inner: rx, app };
let producer = Producer { inner: tx };
(producer, processor)
}
}
7 changes: 6 additions & 1 deletion lolraft/src/process/thread/advance_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ impl Thread {
}
}

pub fn new(command_log: CommandLog, app: App, consumer: EventConsumer<KernEvent>, producer: EventProducer<ApplicationEvent>) -> ThreadHandle {
pub fn new(
command_log: CommandLog,
app: App,
consumer: EventConsumer<KernEvent>,
producer: EventProducer<ApplicationEvent>,
) -> ThreadHandle {
Thread {
command_log,
app,
Expand Down
2 changes: 1 addition & 1 deletion lolraft/src/process/thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,4 @@ pub struct CommitEvent;
pub struct KernEvent;

#[derive(Clone)]
pub struct ApplicationEvent;
pub struct ApplicationEvent;
10 changes: 8 additions & 2 deletions lolraft/src/process/thread/query_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ impl Thread {
fn do_loop(self) -> ThreadHandle {
let fut = async move {
loop {
self.consumer.consume_events(Duration::from_millis(100)).await;
self.consumer
.consume_events(Duration::from_millis(100))
.await;
while self.advance_once().await {
tokio::task::yield_now().await;
}
Expand All @@ -27,7 +29,11 @@ impl Thread {
}
}

pub fn new(query_queue: query_queue::Processor, command_log: Ref<CommandLog>, consumer: EventConsumer<ApplicationEvent>) -> ThreadHandle {
pub fn new(
query_queue: query_queue::Processor,
command_log: Ref<CommandLog>,
consumer: EventConsumer<ApplicationEvent>,
) -> ThreadHandle {
Thread {
query_queue,
command_log,
Expand Down
4 changes: 2 additions & 2 deletions tests/lol-tests/benches/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn do_bench(m: u16, b: &mut test::Bencher) {

rt.block_on(async {
drop(cluster);
});
});
}

#[bench]
Expand All @@ -49,4 +49,4 @@ fn query_100(b: &mut test::Bencher) {
#[bench]
fn query_1000(b: &mut test::Bencher) {
do_bench(1000, b);
}
}

0 comments on commit f2ac53b

Please sign in to comment.