Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat(pisa-proxy, sharding): add database_table sharding rewrite #380

Merged
merged 16 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
c5e7390
fix(pisa-proxy, sharding): fix don't merge when contains `count` func
xuanyuan300 Nov 14, 2022
25602ef
fix(pisa-proxy, sharding): fix count,sum field idx
xuanyuan300 Nov 14, 2022
4041f85
fix(pisa-proxy, sharding): fix columns incorrect when contains `avg`
xuanyuan300 Nov 14, 2022
197e2f4
fix(pisa-proxy, sharding): fix columns when contains avg func.
xuanyuan300 Nov 14, 2022
df221e1
fix(pisa-proxy, sharding): fix columns when contains avg func.
xuanyuan300 Nov 14, 2022
909018b
fix(pisa-proxy, sharding): fix columns when contains avg func in prep…
xuanyuan300 Nov 15, 2022
13f2229
fix(pisa-proxy, sharding): fix columns when contains avg func in prep…
xuanyuan300 Nov 15, 2022
425bd7a
fix(pisa-proxy, sharding): add get_meta_detail macro to simplify the …
xuanyuan300 Nov 16, 2022
4e0f5b5
refactor(pisa-proxy, sharding): WIP refactor sharding rewrite
xuanyuan300 Nov 16, 2022
183c3f7
refactor(pisa-proxy, sharding): WIP refactor sharding rewrite
xuanyuan300 Nov 17, 2022
4620da1
refactor(pisa-proxy, sharding): WIP refactor sharding rewrite
xuanyuan300 Nov 17, 2022
2a3f34b
refactor(pisa-proxy, sharding): WIP refactor sharding rewrite
xuanyuan300 Nov 18, 2022
eae41ce
refactor(pisa-proxy, sharding): WIP refactor sharding rewrite
xuanyuan300 Nov 18, 2022
1bd6f1a
refactor(pisa-proxy, sharding): WIP refactor sharding rewrite
xuanyuan300 Nov 19, 2022
e10cba7
refactor(pisa-proxy, sharding): WIP refactor sharding rewrite
xuanyuan300 Nov 20, 2022
4ec5dd3
chore(pisa-proxy, runtime): remove comments
xuanyuan300 Nov 21, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pisa-proxy/protocol/mysql/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use bytes::{Buf, BufMut, BytesMut};

use crate::{mysql_const::ColumnType, util::{ BufExt, BufMutExt, get_length }};

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ColumnInfo {
pub schema: Option<String>,
pub table_name: Option<String>,
Expand Down
41 changes: 31 additions & 10 deletions pisa-proxy/protocol/mysql/src/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::{sync::Arc, str::FromStr};

use crate::{
column::ColumnInfo,
Expand All @@ -37,8 +37,8 @@ pub enum RowDataTyp<T: AsRef<[u8]>> {
pub struct RowPartData {
pub data: Box<[u8]>,
pub start_idx: usize,
pub start_part_idx: usize,
pub end_part_idx: usize,
pub part_encode_length: usize,
pub part_data_length: usize,
}

crate::gen_row_data!(RowDataTyp, Text(RowDataText), Binary(RowDataBinary));
Expand Down Expand Up @@ -83,8 +83,7 @@ impl<T: AsRef<[u8]>> RowData<T> for RowDataText<T> {
fn decode_with_name<V: Value>(&mut self, name: &str) -> value::Result<V> {
let row_data = self.get_row_data_with_name(name)?;
match row_data {
Some(data) => Value::from(&data.data[data.start_part_idx..data.end_part_idx]),

Some(data) => Value::from(&data.data),
_ => Ok(None),
}
}
Expand All @@ -104,10 +103,10 @@ impl<T: AsRef<[u8]>> RowData<T> for RowDataText<T> {

return Ok(Some(
RowPartData {
data: self.buf.as_ref()[idx..idx + (pos + length) as usize].into(),
data: self.buf.as_ref()[idx + pos as usize .. idx + (pos + length) as usize].into(),
start_idx: idx,
start_part_idx: pos as usize,
end_part_idx: (pos + length) as usize,
part_encode_length: pos as usize,
part_data_length: length as usize,
}
));
}
Expand Down Expand Up @@ -239,12 +238,13 @@ impl<T: AsRef<[u8]>> RowData<T> for RowDataBinary<T> {

// Need to add packet header and null_map to returnd data
let raw_data = &self.buf.as_ref()[start_pos + pos as usize..(start_pos + pos as usize + length as usize)];
println!("eeeeeeeeeeeee {:?}", &raw_data[..]);
wbtlb marked this conversation as resolved.
Show resolved Hide resolved
return Ok(Some(
RowPartData {
data: raw_data.into(),
start_idx: start_pos,
start_part_idx: pos as usize,
end_part_idx: (pos + length) as usize,
part_encode_length: pos as usize,
part_data_length: length as usize,
}
))
}
Expand All @@ -254,6 +254,27 @@ impl<T: AsRef<[u8]>> RowData<T> for RowDataBinary<T> {
}
}


// Box has default 'static bound, use `'e` lifetime relax bound.
pub fn decode_with_name<'e, T: AsRef<[u8]>, V: Value + std::str::FromStr>(row_data: &mut RowDataTyp<T>, name: &str, is_binary: bool) -> Result<Option<V>, Box<dyn std::error::Error + Send + Sync + 'e> >
where
T: AsRef<[u8]>,
V: Value + std::str::FromStr,
<V as FromStr>::Err: std::error::Error + Sync + Send + 'e
{
if is_binary {
row_data.decode_with_name::<V>(name)
} else {
let new_value = row_data.decode_with_name::<String>(name)?;
if let Some(new_value) = new_value {
let new_value = new_value.parse::<V>()?;
Ok(Some(new_value))
} else {
Ok(None)
}
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;
Expand Down
3 changes: 2 additions & 1 deletion pisa-proxy/protocol/mysql/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use chrono::{Duration, NaiveDateTime, NaiveDate, NaiveTime};

use crate::err::DecodeRowError;

pub type Result<T> = std::result::Result<Option<T>, Box<dyn std::error::Error + Send + Sync>>;
type BoxError = Box<dyn std::error::Error + Send + Sync>;
pub type Result<T> = std::result::Result<Option<T>, BoxError>;

pub trait Value: Sized {
type Item: Convert<Self>;
Expand Down
1 change: 1 addition & 0 deletions pisa-proxy/proxy/strategy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ aho-corasick = "0.7.19"
itertools = "0.10.4"
thiserror = "1.0"
crc32fast = "1.3.2"
paste = "1.0.9"
135 changes: 96 additions & 39 deletions pisa-proxy/proxy/strategy/src/sharding_rewrite/generic_meta.rs
Original file line number Diff line number Diff line change
@@ -1,128 +1,185 @@
// Copyright 2022 SphereEx Authors
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::config::{StrategyType, Sharding, ShardingAlgorithmName};
use endpoint::endpoint::Endpoint;

use crate::config::{Sharding, ShardingAlgorithmName, StrategyType};

#[derive(Debug)]
pub(crate) struct ShardingMetaBaseInfo<'a> {
pub column: (Option<&'a str>, Option<&'a str>),
pub count: (Option<u32>, Option<u32>),
pub algo: (Option<&'a ShardingAlgorithmName>, Option<&'a ShardingAlgorithmName>),
}

pub trait ShardingMeta {
fn get_sharding_column(&self) -> (Option<&str>, Option<&str>);
fn get_algo(&self) -> (Option<&ShardingAlgorithmName>, Option<&ShardingAlgorithmName>);
fn get_sharding_count(&self) -> (Option<u64>, Option<u64>);
fn get_sharding_count(&self) -> (Option<u32>, Option<u32>);
fn get_actual_schema<'a>(
&'a self,
endpoints: &'a [Endpoint],
idx: Option<usize>,
) -> Option<&'a str>;
fn get_endpoint<'a>(
&'a self,
endpoints: &'a [Endpoint],
idx: Option<usize>,
) -> Option<Endpoint>;
fn get_strategy_typ(&self) -> super::StrategyTyp;
}

/// Todo: use macro generate
impl ShardingMeta for Sharding {
fn get_sharding_column(&self) -> (Option<&str>, Option<&str>) {
if let Some(strategy) = &self.database_strategy {
return strategy.get_sharding_column()
return strategy.get_sharding_column();
}

if let Some(strategy) = &self.table_strategy {
return strategy.get_sharding_column()
return strategy.get_sharding_column();
}

if let Some(strategy) = &self.database_table_strategy {
return strategy.get_sharding_column()
return strategy.get_sharding_column();
}

(None, None)
}

fn get_algo(&self) -> (Option<&ShardingAlgorithmName>, Option<&ShardingAlgorithmName>) {
if let Some(strategy) = &self.database_strategy {
return strategy.get_algo()
return strategy.get_algo();
}

if let Some(strategy) = &self.table_strategy {
return strategy.get_algo()
return strategy.get_algo();
}

if let Some(strategy) = &self.database_table_strategy {
return strategy.get_algo()
return strategy.get_algo();
}

(None, None)
}

fn get_sharding_count(&self) -> (Option<u64>, Option<u64>) {
fn get_sharding_count(&self) -> (Option<u32>, Option<u32>) {
if let Some(_) = &self.database_strategy {
return (Some(self.actual_datanodes.len() as u64), None)
return (Some(self.actual_datanodes.len() as u32), None);
}

if let Some(strategy) = &self.table_strategy {
return (None, strategy.get_sharding_count().1)
return (None, strategy.get_sharding_count().1);
}

if let Some(strategy) = &self.database_table_strategy {
return (Some(self.actual_datanodes.len() as u64), strategy.get_sharding_count().1)
return (Some(self.actual_datanodes.len() as u32), strategy.get_sharding_count().1);
}

(None, None)
}

fn get_actual_schema<'a>(
&self,
endpoints: &'a [Endpoint],
idx: Option<usize>,
) -> Option<&'a str> {
if self.database_strategy.is_some() || self.database_table_strategy.is_some() {
let ep = endpoints.iter().find(|ep| ep.name == self.actual_datanodes[idx.unwrap()]);
return ep.map(|x| x.db.as_str());
}

None
}

fn get_endpoint(&self, endpoints: &[Endpoint], idx: Option<usize>) -> Option<Endpoint> {
let idx = if self.table_strategy.is_some() { 0 } else { idx.unwrap() };
endpoints.iter().find(|ep| ep.name == self.actual_datanodes[idx]).map(|x| x.clone())
}

fn get_strategy_typ(&self) -> super::StrategyTyp {
if self.database_strategy.is_some() {
super::StrategyTyp::Database
} else if self.table_strategy.is_some() {
super::StrategyTyp::Table
} else {
super::StrategyTyp::DatabaseTable
}
}
}

impl ShardingMeta for StrategyType {
fn get_sharding_column(&self) -> (Option<&str>, Option<&str>) {
match self {
Self::DatabaseStrategyConfig(config) => {
(Some(&config.database_sharding_column), None)
},
Self::DatabaseStrategyConfig(config) => (Some(&config.database_sharding_column), None),

Self::DatabaseTableStrategyConfig(config) => {
(Some(&config.database_sharding_column), Some(&config.table_sharding_column))
},
}

Self::TableStrategyConfig(config) => {
(None, Some(&config.table_sharding_column))
},
Self::TableStrategyConfig(config) => (None, Some(&config.table_sharding_column)),

_ => (None, None)
_ => (None, None),
}
}

fn get_algo(&self) -> (Option<&ShardingAlgorithmName>, Option<&ShardingAlgorithmName>) {
match self {
Self::DatabaseStrategyConfig(config) => {
(Some(&config.database_sharding_algorithm_name), None)
},
}

Self::DatabaseTableStrategyConfig(config) => {
(Some(&config.database_sharding_algorithm_name), Some(&config.table_sharding_algorithm_name))
},
Self::DatabaseTableStrategyConfig(config) => (
Some(&config.database_sharding_algorithm_name),
Some(&config.table_sharding_algorithm_name),
),

Self::TableStrategyConfig(config) => {
(None, Some(&config.table_sharding_algorithm_name))
},
}

_ => (None, None)
_ => (None, None),
}
}

fn get_sharding_count(&self) -> (Option<u64>, Option<u64>) {
fn get_sharding_count(&self) -> (Option<u32>, Option<u32>) {
match self {
Self::DatabaseStrategyConfig(_) => {
unimplemented!()
},
}

Self::DatabaseTableStrategyConfig(config) => {
(None, Some(config.shading_count.into()))
},
Self::DatabaseTableStrategyConfig(config) => (None, Some(config.shading_count.into())),

Self::TableStrategyConfig(config) => {
(None, Some(config.sharding_count.into()))
},
Self::TableStrategyConfig(config) => (None, Some(config.sharding_count.into())),

_ => (None, None)
_ => (None, None),
}
}
}

fn get_actual_schema<'a>(
&'a self,
_endpoints: &'a [Endpoint],
_idx: Option<usize>,
) -> Option<&'a str> {
None
}

fn get_endpoint(&self, _endpoints: &[Endpoint], _idx: Option<usize>) -> Option<Endpoint> {
None
}

fn get_strategy_typ(&self) -> super::StrategyTyp {
unimplemented!()
}
}
23 changes: 23 additions & 0 deletions pisa-proxy/proxy/strategy/src/sharding_rewrite/macros.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2022 SphereEx Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#[macro_export]
macro_rules! get_meta_detail {
($meta:ident, $($meta_typ:ident),*) => {
paste! {
$(let $meta_typ = $meta.[<get_ $meta_typ>]();)*
}

}
}
Loading