Skip to content

Commit

Permalink
feat: block rules support query (#1420)
Browse files Browse the repository at this point in the history
## Rationale
Queries over a long time range usually consume too many resources, which
affects the stability of the whole cluster.

## Detailed Changes
- Support blocking queries by their query time range


## Test Plan
Manually
```bash

curl 0:5000/admin/block -H 'content-type: application/json' -d '
{
  "operation": "Set",
  "write_block_list": [],
  "read_block_list": [],
  "block_rules": [
    {"type": "QueryRange", "content": "24h"}
  ]
}'
```
  • Loading branch information
jiacai2050 authored Jan 4, 2024
1 parent 61b123a commit 904e2d5
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 4 deletions.
44 changes: 41 additions & 3 deletions proxy/src/limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashSet, sync::RwLock};
use std::{collections::HashSet, str::FromStr, sync::RwLock};

use datafusion::logical_expr::logical_plan::LogicalPlan;
use macros::define_result;
use query_frontend::plan::Plan;
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use time_ext::ReadableDuration;

use crate::metrics::BLOCKED_REQUEST_COUNTER_VEC_GLOBAL;

#[derive(Snafu, Debug)]
#[snafu(visibility(pub))]
Expand All @@ -33,12 +36,26 @@ pub enum Error {
define_result!(Error);

/// A rule describing which kinds of plans should be blocked.
///
/// With serde the variant name is carried under the `type` key and the
/// payload (if any) under the `content` key, e.g.
/// `{"type": "QueryRange", "content": "24h"}`.
#[derive(Clone, Copy, Deserialize, Debug, PartialEq, Eq, Hash, Serialize, PartialOrd, Ord)]
#[serde(tag = "type", content = "content")]
pub enum BlockRule {
/// Matches plans for which `is_query_without_predicate` returns true.
QueryWithoutPredicate,
/// Max time range a query can scan.
///
/// A query matches when its `query_range()` is strictly greater than the
/// stored value. The value is deserialized from a human-readable duration
/// string and stored as milliseconds.
///
/// NOTE(review): `Serialize` is derived with no matching `serialize_with`,
/// so this variant presumably serializes as a raw integer, which the
/// string-based deserializer would not accept — confirm whether
/// round-tripping is required.
#[serde(deserialize_with = "deserialize_readable_duration")]
QueryRange(i64),
/// Matches every `Plan::Query`.
AnyQuery,
/// Matches every `Plan::Insert`.
AnyInsert,
}

fn deserialize_readable_duration<'de, D>(deserializer: D) -> std::result::Result<i64, D::Error>
where
D: serde::Deserializer<'de>,
{
let s: &str = Deserialize::deserialize(deserializer)?;
ReadableDuration::from_str(s)
.map(|d| d.0.as_millis() as i64)
.map_err(serde::de::Error::custom)
}

#[derive(Default, Clone, Deserialize, Debug, Serialize)]
#[serde(default)]
pub struct LimiterConfig {
Expand All @@ -52,6 +69,17 @@ impl BlockRule {
match self {
BlockRule::QueryWithoutPredicate => self.is_query_without_predicate(plan),
BlockRule::AnyQuery => matches!(plan, Plan::Query(_)),
BlockRule::QueryRange(threshold) => {
if let Plan::Query(plan) = plan {
if let Some(range) = plan.query_range() {
if range > *threshold {
return true;
}
}
}

false
}
BlockRule::AnyInsert => matches!(plan, Plan::Insert(_)),
}
}
Expand Down Expand Up @@ -159,8 +187,18 @@ impl Limiter {
///
/// Returns an error if the plan is forbidden to execute.
pub fn try_limit(&self, plan: &Plan) -> Result<()> {
    // Run the block-list check first and the rule check only if it passes.
    //
    // The two checks are chained with `and_then` instead of using `?` inside
    // a block expression: `?` would return from `try_limit` itself, so a
    // rejection coming from the block list would skip the metric update
    // below and never be counted.
    let result = self
        .try_limit_by_block_list(plan)
        .and_then(|_| self.try_limit_by_rules(plan));

    // Count every rejected request, labelled by the plan type.
    if result.is_err() {
        BLOCKED_REQUEST_COUNTER_VEC_GLOBAL
            .with_label_values(&[plan.plan_type()])
            .inc();
    }

    result
}

pub fn add_write_block_list(&self, block_list: Vec<String>) {
Expand Down
6 changes: 6 additions & 0 deletions proxy/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ lazy_static! {
/// Counter vector registered under the name `http_handler_counter`,
/// partitioned by a single `type` label.
///
/// The `unwrap` panics if registering the metric fails.
pub static ref HTTP_HANDLER_COUNTER_VEC_GLOBAL: IntCounterVec =
register_int_counter_vec!("http_handler_counter", "Http handler counter", &["type"])
.unwrap();
/// Counter vector registered under the name `blocked_request_counter`,
/// partitioned by a single `type` label.
///
/// The limiter increments it with the plan type as the label value whenever
/// `try_limit` ends up with an error. The `unwrap` panics if registering the
/// metric fails.
pub static ref BLOCKED_REQUEST_COUNTER_VEC_GLOBAL: IntCounterVec = register_int_counter_vec!(
"blocked_request_counter",
"Blocked request counter",
&["type"]
)
.unwrap();
}

lazy_static! {
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl Proxy {
.try_limit(&plan)
.box_err()
.context(Internal {
msg: "Request is blocked",
msg: format!("Request is blocked, table_name:{table_name:?}"),
})?;
}

Expand Down
27 changes: 27 additions & 0 deletions query_frontend/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ pub enum Plan {
Exists(ExistsTablePlan),
}

impl Plan {
    /// Returns a coarse, fixed label for the kind of plan.
    ///
    /// The label is `"query"` for queries, `"insert"` for inserts and
    /// `"other"` for every remaining variant (create, drop, describe, alter
    /// table, show, exists).
    ///
    /// Every label is a string literal, so the returned reference is
    /// `'static` and may outlive the plan it was obtained from.
    pub fn plan_type(&self) -> &'static str {
        // Keep the match exhaustive (no `_` arm) so that adding a new variant
        // forces a deliberate choice of label here.
        match self {
            Self::Query(_) => "query",
            Self::Insert(_) => "insert",
            Self::Create(_)
            | Self::Drop(_)
            | Self::Describe(_)
            | Self::AlterTable(_)
            | Self::Show(_)
            | Self::Exists(_) => "other",
        }
    }
}

pub struct PriorityContext {
pub time_range_threshold: u64,
}
Expand Down Expand Up @@ -201,6 +216,18 @@ impl QueryPlan {

Some(priority)
}

/// Returns the width of the time range covered by this query, computed as
/// `exclusive_end - inclusive_start`.
///
/// Yields `None` whenever `extract_time_range` does, e.g. for a query with
/// an invalid time range such as `[200, 100]`. If the subtraction overflows
/// an `i64`, the width is reported as `i64::MAX`.
pub fn query_range(&self) -> Option<i64> {
    let time_range = self.extract_time_range()?;

    let end = time_range.exclusive_end().as_i64();
    let start = time_range.inclusive_start().as_i64();

    Some(end.checked_sub(start).unwrap_or(i64::MAX))
}
}

impl Debug for QueryPlan {
Expand Down

0 comments on commit 904e2d5

Please sign in to comment.