Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: query requests dedup #1100

Merged
merged 10 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions common_types/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub enum Error {

pub type Result<T> = std::result::Result<T, Error>;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct NullColumn(NullArray);

impl NullColumn {
Expand All @@ -109,7 +109,7 @@ impl NullColumn {
macro_rules! define_numeric_column {
($($Kind: ident), *) => {
$(paste! {
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct [<$Kind Column>]([<$Kind Array>]);

#[inline]
Expand All @@ -131,24 +131,24 @@ define_numeric_column!(
Float, Double, UInt64, UInt32, UInt16, UInt8, Int64, Int32, Int16, Int8, Boolean
);

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct TimestampColumn(TimestampMillisecondArray);

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct VarbinaryColumn(BinaryArray);

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct StringColumn(StringArray);

/// dictionary encode type is difference from other types, need implement
/// without macro
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct StringDictionaryColumn(DictionaryArray<Int32Type>);

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DateColumn(DateArray);

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct TimeColumn(TimeArray);

#[inline]
Expand Down Expand Up @@ -749,7 +749,7 @@ impl_column_block!(
macro_rules! define_column_block {
($($Kind: ident), *) => {
paste! {
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum ColumnBlock {
Null(NullColumn),
StringDictionary(StringDictionaryColumn),
Expand Down
8 changes: 8 additions & 0 deletions common_types/src/projected_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ impl ProjectedSchema {
self.0.is_all_projection()
}

pub fn projection(&self) -> Option<Vec<usize>> {
self.0.projection()
}

/// Returns the [RowProjector] to project the rows with source schema to
/// rows with [RecordSchemaWithKey].
///
Expand Down Expand Up @@ -260,6 +264,10 @@ impl ProjectedSchemaInner {
self.projection.is_none()
}

fn projection(&self) -> Option<Vec<usize>> {
self.projection.clone()
}

// TODO(yingwen): We can fill missing not null column with default value instead
// of returning error.
fn try_project_with_key(&self, source_schema: &Schema) -> Result<RowProjector> {
Expand Down
4 changes: 2 additions & 2 deletions common_types/src/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub enum Error {

pub type Result<T> = std::result::Result<T, Error>;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct RecordBatchData {
arrow_record_batch: ArrowRecordBatch,
column_blocks: Vec<ColumnBlock>,
Expand Down Expand Up @@ -192,7 +192,7 @@ impl TryFrom<ArrowRecordBatch> for RecordBatchData {

// TODO(yingwen): The schema in RecordBatch should be much simple because it may
// lack some information.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct RecordBatch {
schema: RecordSchema,
data: RecordBatchData,
Expand Down
4 changes: 4 additions & 0 deletions server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ pub struct ServerConfig {

/// Config of remote engine client
pub remote_client: remote_engine_client::Config,

/// Whether to deduplicate requests
pub enable_query_dedup: bool,
}

impl Default for ServerConfig {
Expand All @@ -140,6 +143,7 @@ impl Default for ServerConfig {
route_cache: router::RouteCacheConfig::default(),
hotspot: hotspot::Config::default(),
remote_client: remote_engine_client::Config::default(),
enable_query_dedup: false,
}
}
}
Expand Down
89 changes: 89 additions & 0 deletions server/src/dedup_requests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

use std::{collections::HashMap, hash::Hash, sync::RwLock};

use tokio::sync::mpsc::Sender;

type Notifier<T> = Sender<T>;

#[derive(Debug)]
struct Notifiers<T> {
notifiers: RwLock<Vec<Notifier<T>>>,
}

impl<T> Notifiers<T> {
pub fn new(notifier: Notifier<T>) -> Self {
let notifiers = vec![notifier];
Self {
notifiers: RwLock::new(notifiers),
}
}

pub fn add_notifier(&self, notifier: Notifier<T>) {
self.notifiers.write().unwrap().push(notifier);
}
}

#[derive(Debug)]
pub struct RequestNotifiers<K, T>
where
K: PartialEq + Eq + Hash,
{
notifiers_by_key: RwLock<HashMap<K, Notifiers<T>>>,
}

impl<K, T> Default for RequestNotifiers<K, T>
where
K: PartialEq + Eq + Hash,
{
fn default() -> Self {
Self {
notifiers_by_key: RwLock::new(HashMap::new()),
}
}
}

impl<K, T> RequestNotifiers<K, T>
where
K: PartialEq + Eq + Hash,
{
/// Insert a notifier for the given key.
pub fn insert_notifier(&self, key: K, notifier: Notifier<T>) -> RequestResult {
// First try to read the notifiers, if the key exists, add the notifier to the
// notifiers.
let notifiers_by_key = self.notifiers_by_key.read().unwrap();
if let Some(notifiers) = notifiers_by_key.get(&key) {
notifiers.add_notifier(notifier);
return RequestResult::Wait;
}
drop(notifiers_by_key);

// If the key does not exist, try to write the notifiers.
let mut notifiers_by_key = self.notifiers_by_key.write().unwrap();
// double check, if the key exists, add the notifier to the notifiers.
if let Some(notifiers) = notifiers_by_key.get(&key) {
notifiers.add_notifier(notifier);
return RequestResult::Wait;
}

//the key is not existed, insert the key and the notifier.
notifiers_by_key.insert(key, Notifiers::new(notifier));
RequestResult::First
}

/// Take the notifiers for the given key, and remove the key from the map.
pub fn take_notifiers(&self, key: &K) -> Option<Vec<Notifier<T>>> {
self.notifiers_by_key
.write()
.unwrap()
.remove(key)
.map(|notifiers| notifiers.notifiers.into_inner().unwrap())
}
}

pub enum RequestResult {
// The first request for this key, need to handle this request.
First,
// There are other requests for this key, just wait for the result.
Wait,
}
1 change: 1 addition & 0 deletions server/src/grpc/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ make_auto_flush_static_metric! {
write_succeeded_row,
write_failed_row,
query_succeeded_row,
dedupped_stream_query,
}

pub struct RemoteEngineGrpcHandlerCounterVec: LocalIntCounter {
Expand Down
22 changes: 18 additions & 4 deletions server/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use ceresdbproto::{
storage::storage_service_server::StorageServiceServer,
};
use cluster::ClusterRef;
use common_types::column_schema;
use common_types::{column_schema, record_batch::RecordBatch};
use futures::FutureExt;
use generic_error::GenericError;
use log::{info, warn};
Expand All @@ -34,9 +34,13 @@ use table_engine::engine::EngineRuntimes;
use tokio::sync::oneshot::{self, Sender};
use tonic::transport::Server;

use crate::grpc::{
meta_event_service::MetaServiceImpl, remote_engine_service::RemoteEngineServiceImpl,
storage_service::StorageServiceImpl,
use crate::{
dedup_requests::RequestNotifiers,
grpc::{
meta_event_service::MetaServiceImpl,
remote_engine_service::{error, RemoteEngineServiceImpl, StreamReadReqKey},
storage_service::StorageServiceImpl,
},
};

mod meta_event_service;
Expand Down Expand Up @@ -196,6 +200,7 @@ pub struct Builder<Q> {
cluster: Option<ClusterRef>,
opened_wals: Option<OpenedWals>,
proxy: Option<Arc<Proxy<Q>>>,
request_notifiers: Option<Arc<RequestNotifiers<StreamReadReqKey, error::Result<RecordBatch>>>>,
}

impl<Q> Builder<Q> {
Expand All @@ -208,6 +213,7 @@ impl<Q> Builder<Q> {
cluster: None,
opened_wals: None,
proxy: None,
request_notifiers: None,
}
}

Expand Down Expand Up @@ -246,6 +252,13 @@ impl<Q> Builder<Q> {
self.proxy = Some(proxy);
self
}

pub fn request_notifiers(mut self, enable_query_dedup: bool) -> Self {
if enable_query_dedup {
self.request_notifiers = Some(Arc::new(RequestNotifiers::default()));
}
self
}
}

impl<Q: QueryExecutor + 'static> Builder<Q> {
Expand All @@ -269,6 +282,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
let service = RemoteEngineServiceImpl {
instance,
runtimes: runtimes.clone(),
request_notifiers: self.request_notifiers,
};
RemoteEngineServiceServer::new(service)
};
Expand Down
Loading