Skip to content

Commit

Permalink
fix: Cr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed Feb 9, 2023
1 parent 2b2b893 commit 25b6661
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
5 changes: 3 additions & 2 deletions src/storage/src/compaction/dedup_deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::fmt::{Debug, Formatter};
use std::hash::Hash;
Expand All @@ -29,8 +30,8 @@ impl<K: Eq + Hash + Clone, V> DedupDeque<K, V> {
/// returns false.
pub fn push_back(&mut self, key: K, value: V) -> bool {
debug_assert_eq!(self.deque.len(), self.existing.len());
if !self.existing.contains_key(&key) {
self.existing.insert(key.clone(), value);
if let Entry::Vacant(entry) = self.existing.entry(key.clone()) {
entry.insert(value);
self.deque.push_back(key);
return true;
}
Expand Down
20 changes: 10 additions & 10 deletions src/storage/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
// limitations under the License.

use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};

use async_trait::async_trait;
use common_telemetry::{debug, info};
use snafu::ResultExt;
use table::metadata::TableId;
use tokio::sync::{Notify, RwLock};
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -93,7 +93,7 @@ where
request.table_id(),
self.remaining_requests().await
);
let mut queue = self.request_queue.write().await;
let mut queue = self.request_queue.write().unwrap();
let res = queue.push_back(request.table_id(), request);
self.task_notifier.notify_one();
Ok(res)
Expand Down Expand Up @@ -147,7 +147,7 @@ where
}

async fn remaining_requests(&self) -> usize {
self.request_queue.read().await.len()
self.request_queue.read().unwrap().len()
}
}

Expand All @@ -172,15 +172,15 @@ impl<R, T: CompactionTask, P: Picker<R, T>> CompactionHandler<R, T, P> {
_ = task_notifier.notified() => {
// poll requests as many as possible until rate limited, and then wait for
// notification (some task's finished).
debug!("Notified, queue size: {:?}", self.req_queue.read().await.len());
debug!("Notified, queue size: {:?}", self.req_queue.read().unwrap().len());
while let Some((table_id, req)) = self.poll_task().await {
if let Ok(token) = limiter.acquire_token(&req) {
debug!("Executing compaction request: {}", table_id);
self.handle_compaction_request(req, token).await;
} else {
// compaction rate limited, put back to req queue to wait for next
// schedule
debug!("Put back request {}, queue size: {}", table_id, self.req_queue.read().await.len());
debug!("Put back request {}, queue size: {}", table_id, self.req_queue.read().unwrap().len());
self.put_back_req(table_id, req).await;
break;
}
Expand All @@ -196,14 +196,14 @@ impl<R, T: CompactionTask, P: Picker<R, T>> CompactionHandler<R, T, P> {

#[inline]
async fn poll_task(&self) -> Option<(TableId, R)> {
let mut queue = self.req_queue.write().await;
let mut queue = self.req_queue.write().unwrap();
queue.pop_front()
}

/// Puts request back to the front of request queue.
#[inline]
async fn put_back_req(&self, table_id: TableId, req: R) {
let mut queue = self.req_queue.write().await;
let mut queue = self.req_queue.write().unwrap();
queue.push_front(table_id, req);
}

Expand Down Expand Up @@ -305,12 +305,12 @@ mod tests {

queue
.write()
.await
.unwrap()
.push_back(1, CompactionRequestImpl::default());
handler.task_notifier.notify_one();
queue
.write()
.await
.unwrap()
.push_back(2, CompactionRequestImpl::default());
handler.task_notifier.notify_one();

Expand Down

0 comments on commit 25b6661

Please sign in to comment.