Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): Support access Redis data from dictionaries via the dict_get function. #16389

Merged
merged 48 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
cba752d
feat: dict_get to redis.
Winnie-Hong0927 Aug 27, 2024
a6f56e5
fix
Winnie-Hong0927 Aug 27, 2024
159a0b8
fix: resolve_dict_get
Winnie-Hong0927 Aug 27, 2024
a40e7dd
fix
Winnie-Hong0927 Aug 27, 2024
2b87a8e
fix
Winnie-Hong0927 Aug 28, 2024
42420df
fix
Winnie-Hong0927 Aug 28, 2024
12517c4
fix and fmt
Winnie-Hong0927 Aug 28, 2024
8b34bca
feat: add transform_dict_get
Winnie-Hong0927 Aug 28, 2024
d1ab1e6
feat: add lazy_static
Winnie-Hong0927 Aug 28, 2024
caa6f36
feat: add key's type Number and prepare to update transform more.
Winnie-Hong0927 Aug 29, 2024
d52168f
update: bind_create_dictionary
Winnie-Hong0927 Aug 29, 2024
7eb5c2e
update: transform default expr
Winnie-Hong0927 Aug 29, 2024
c7d0919
fix: transform and binder and type_check
Winnie-Hong0927 Aug 29, 2024
8e35973
update: operator & cache.
Winnie-Hong0927 Aug 30, 2024
0659312
fix
Winnie-Hong0927 Aug 30, 2024
e75068f
fix: operator
Winnie-Hong0927 Aug 30, 2024
0f4c6da
fix: transform--operators
Winnie-Hong0927 Sep 1, 2024
55bc12d
update: transform&resolve&argument
Winnie-Hong0927 Sep 2, 2024
77a7211
fix
Winnie-Hong0927 Sep 2, 2024
a9135cd
update: binder & test.
Winnie-Hong0927 Sep 2, 2024
28fe78c
feat: redis-server & test.
Winnie-Hong0927 Sep 3, 2024
9bda033
fmt
Winnie-Hong0927 Sep 3, 2024
4a92b4c
Merge branch 'main' into feat-dict-6
Winnie-Hong0927 Sep 3, 2024
772d96e
Merge branch 'datafuselabs:main' into feat-dict-6
Winnie-Hong0927 Sep 3, 2024
901fa53
update: test.
Winnie-Hong0927 Sep 4, 2024
58db937
fix: binder & test.
Winnie-Hong0927 Sep 4, 2024
4fc490c
fix: binder & test.
Winnie-Hong0927 Sep 4, 2024
3596a8f
update: errorcode & test
Winnie-Hong0927 Sep 4, 2024
987570b
fix
Winnie-Hong0927 Sep 4, 2024
2129321
fix
Winnie-Hong0927 Sep 4, 2024
cbc7798
fix
Winnie-Hong0927 Sep 4, 2024
bad22df
fix
Winnie-Hong0927 Sep 4, 2024
8325d2f
update: binder.
Winnie-Hong0927 Sep 5, 2024
dbe25f0
fix
Winnie-Hong0927 Sep 5, 2024
4fd90cf
fix: schema & transform
Winnie-Hong0927 Sep 6, 2024
23fe8ed
Merge branch 'main' into feat-dict-6
Winnie-Hong0927 Sep 6, 2024
a400102
merge
Winnie-Hong0927 Sep 6, 2024
ae9d623
update:test
Winnie-Hong0927 Sep 6, 2024
482284d
fix
Winnie-Hong0927 Sep 6, 2024
a4d53bc
fix
Winnie-Hong0927 Sep 6, 2024
88d6ef6
fix.
Winnie-Hong0927 Sep 6, 2024
8f9aaac
update: binder & transform.
Winnie-Hong0927 Sep 6, 2024
5ca18e2
fix: dict_get test
Winnie-Hong0927 Sep 7, 2024
449bdd1
update.
Winnie-Hong0927 Sep 8, 2024
b7cd407
Merge branch 'datafuselabs:main' into feat-dict-6
Winnie-Hong0927 Sep 8, 2024
cddb365
update
Winnie-Hong0927 Sep 9, 2024
f526945
Merge branch 'feat-dict-6' of https://github.com/Winnie-Hong0927/data…
Winnie-Hong0927 Sep 9, 2024
c2bbc10
Merge branch 'datafuselabs:main' into feat-dict-6
Winnie-Hong0927 Sep 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 219 additions & 25 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ opendal = { version = "0.49.0", features = [
"services-moka",
"services-webhdfs",
"services-huggingface",
"services-redis",
] }
openraft = { git = "https://github.com/drmingdrmer/openraft", tag = "v0.10.0-alpha.6", features = [
"serde",
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/deploy/databend-query-standalone.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ echo 'Start databend-query...'
nohup target/${BUILD_PROFILE}/databend-query -c scripts/ci/deploy/config/databend-query-node-1.toml --internal-enable-sandbox-tenant &

echo "Waiting on databend-query 10 seconds..."
python3 scripts/ci/wait_tcp.py --timeout 30 --port 8000
python3 scripts/ci/wait_tcp.py --timeout 30 --port 8000
6 changes: 0 additions & 6 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,6 @@ build_exceptions! {
// dictionary
DictionaryAlreadyExists(3113),
UnknownDictionary(3114),
UnknownDictionaryId(3115),
UnsupportedDictionaryOption(3116),
UnsupportedDictionarySource(3117),
MissingDictionaryOption(3118),
WrongDictionaryFieldExpr(3119),

// Procedure
UnknownProcedure(3130),
ProcedureAlreadyExists(3131),
Expand Down
1 change: 1 addition & 0 deletions src/common/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub use config::ShareTableConfig;
pub use config::StorageConfig;

mod operator;
pub use operator::build_operator;
pub use operator::init_operator;
pub use operator::DataOperator;

Expand Down
43 changes: 43 additions & 0 deletions src/meta/app/src/schema/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use std::sync::Arc;

use chrono::DateTime;
use chrono::Utc;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::TableSchema;

use super::dictionary_name_ident::DictionaryNameIdent;
Expand Down Expand Up @@ -77,6 +79,47 @@ impl Default for DictionaryMeta {
}
}

impl DictionaryMeta {
    /// Returns the value stored under `name` in this dictionary's options.
    ///
    /// # Errors
    ///
    /// Returns `ErrorCode::BadArguments` with the message
    /// "Miss option `<name>`" when the option is absent.
    fn required_option(&self, name: &str) -> Result<&String> {
        match self.options.get(name) {
            Some(value) => Ok(value),
            None => Err(ErrorCode::BadArguments(format!(
                "Miss option `{}`",
                name
            ))),
        }
    }

    /// Assembles a `mysql://username:password@host:port/db` connection URL
    /// from the dictionary options.
    ///
    /// # Errors
    ///
    /// Fails with `ErrorCode::BadArguments` on the first missing option,
    /// checked in the order `username`, `password`, `host`, `port`, `db`.
    pub fn build_sql_connection_url(&self) -> Result<String> {
        // The lookups stay in this order so the reported missing option does
        // not change when several are absent.
        let username = self.required_option("username")?;
        let password = self.required_option("password")?;
        let host = self.required_option("host")?;
        let port = self.required_option("port")?;
        let db = self.required_option("db")?;

        // NOTE(review): values are interpolated verbatim; presumably callers
        // guarantee they contain no URL-reserved characters — confirm.
        let url = format!("mysql://{username}:{password}@{host}:{port}/{db}");
        Ok(url)
    }

    /// Assembles a `tcp://host:port` endpoint for a Redis source from the
    /// dictionary options.
    ///
    /// # Errors
    ///
    /// Fails with `ErrorCode::BadArguments` when `host` or `port` is missing
    /// (`host` is checked first).
    pub fn build_redis_connection_url(&self) -> Result<String> {
        let host = self.required_option("host")?;
        let port = self.required_option("port")?;

        let url = format!("tcp://{host}:{port}");
        Ok(url)
    }
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CreateDictionaryReq {
pub dictionary_ident: DictionaryNameIdent,
Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn is_builtin_function(name: &str) -> bool {
#[ctor]
pub static BUILTIN_FUNCTIONS: FunctionRegistry = builtin_functions();

pub const ASYNC_FUNCTIONS: [&str; 1] = ["nextval"];
pub const ASYNC_FUNCTIONS: [&str; 2] = ["nextval", "dict_get"];

pub const GENERAL_WINDOW_FUNCTIONS: [&str; 13] = [
"row_number",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ impl PipelineBuilder {
pub(crate) fn build_async_function(&mut self, async_function: &AsyncFunction) -> Result<()> {
self.build_pipeline(&async_function.input)?;

let operators = TransformAsyncFunction::init_operators(&async_function.async_func_descs)?;
self.main_pipeline.add_async_transformer(|| {
TransformAsyncFunction::new(self.ctx.clone(), async_function.async_func_descs.clone())
TransformAsyncFunction::new(
self.ctx.clone(),
async_function.async_func_descs.clone(),
operators.clone(),
)
});

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod transform_async_function;
mod transform_cache_scan;
mod transform_cast_schema;
mod transform_create_sets;
mod transform_dictionary;
mod transform_expression_scan;
mod transform_filter;
mod transform_limit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use databend_common_exception::Result;
Expand All @@ -25,21 +26,29 @@ use databend_common_meta_app::schema::GetSequenceNextValueReq;
use databend_common_meta_app::schema::SequenceIdent;
use databend_common_pipeline_transforms::processors::AsyncTransform;
use databend_common_storages_fuse::TableContext;
use opendal::Operator;

use crate::sessions::QueryContext;
use crate::sql::executor::physical_plans::AsyncFunctionDesc;
use crate::sql::plans::AsyncFunctionArgument;

/// Pipeline transform state for evaluating async functions over data blocks.
pub struct TransformAsyncFunction {
// Query context this transform was created with.
ctx: Arc<QueryContext>,
// key is the index of async_func_desc
// Only descriptors that need an external operator have an entry here.
pub(crate) operators: BTreeMap<usize, Arc<Operator>>,
// Descriptors of the async functions handled by this transform; an entry's
// position in this vector is the key used in `operators`.
async_func_descs: Vec<AsyncFunctionDesc>,
}

impl TransformAsyncFunction {
pub fn new(ctx: Arc<QueryContext>, async_func_descs: Vec<AsyncFunctionDesc>) -> Self {
pub fn new(
ctx: Arc<QueryContext>,
async_func_descs: Vec<AsyncFunctionDesc>,
operators: BTreeMap<usize, Arc<Operator>>,
) -> Self {
Self {
ctx,
async_func_descs,
operators,
}
}

Expand Down Expand Up @@ -80,7 +89,7 @@ impl AsyncTransform for TransformAsyncFunction {

#[async_backtrace::framed]
async fn transform(&mut self, mut data_block: DataBlock) -> Result<DataBlock> {
for async_func_desc in &self.async_func_descs {
for (i, async_func_desc) in self.async_func_descs.iter().enumerate() {
match &async_func_desc.func_arg {
AsyncFunctionArgument::SequenceFunction(sequence_name) => {
self.transform_sequence(
Expand All @@ -90,9 +99,18 @@ impl AsyncTransform for TransformAsyncFunction {
)
.await?;
}
AsyncFunctionArgument::DictGetFunction(dict_arg) => {
self.transform_dict_get(
i,
&mut data_block,
dict_arg,
&async_func_desc.arg_indices,
&async_func_desc.data_type,
)
.await?;
}
}
}

Ok(data_block)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::DataType;
use databend_common_expression::BlockEntry;
use databend_common_expression::ColumnBuilder;
use databend_common_expression::DataBlock;
use databend_common_expression::Scalar;
use databend_common_expression::ScalarRef;
use databend_common_expression::Value;
use databend_common_storage::build_operator;
use opendal::services::Redis;
use opendal::Operator;

use crate::pipelines::processors::transforms::TransformAsyncFunction;
use crate::sql::executor::physical_plans::AsyncFunctionDesc;
use crate::sql::plans::AsyncFunctionArgument;
use crate::sql::plans::DictGetFunctionArgument;
use crate::sql::plans::DictionarySource;
use crate::sql::IndexType;

impl TransformAsyncFunction {
pub fn init_operators(
async_func_descs: &[AsyncFunctionDesc],
) -> Result<BTreeMap<usize, Arc<Operator>>> {
let mut operators = BTreeMap::new();
for (i, async_func_desc) in async_func_descs.iter().enumerate() {
if let AsyncFunctionArgument::DictGetFunction(dict_arg) = &async_func_desc.func_arg {
match &dict_arg.dict_source {
DictionarySource::Redis(redis_source) => {
let mut builder = Redis::default().endpoint(&redis_source.connection_url);
if let Some(ref username) = redis_source.username {
builder = builder.username(username);
}
if let Some(ref password) = redis_source.password {
builder = builder.password(password);
}
if let Some(db_index) = redis_source.db_index {
builder = builder.db(db_index);
}
let op = build_operator(builder)?;
operators.insert(i, Arc::new(op));
}
DictionarySource::Mysql(_) => {
return Err(ErrorCode::Unimplemented("Mysql source is unsupported"));
}
}
}
}
Ok(operators)
}

// transform add dict get column.
pub(crate) async fn transform_dict_get(
&self,
i: usize,
data_block: &mut DataBlock,
dict_arg: &DictGetFunctionArgument,
arg_indices: &[IndexType],
data_type: &DataType,
) -> Result<()> {
let op = self.operators.get(&i).unwrap().clone();

// only support one key field.
let arg_index = arg_indices[0];
let entry = data_block.get_by_offset(arg_index);
let value = match &entry.value {
Value::Scalar(scalar) => {
if let Scalar::String(key) = scalar {
let buffer = op.read(key).await;
match buffer {
Ok(res) => {
let value =
unsafe { String::from_utf8_unchecked(res.current().to_vec()) };
Value::Scalar(Scalar::String(value))
}
Err(_) => Value::Scalar(dict_arg.default_value.clone()),
}
} else {
Value::Scalar(dict_arg.default_value.clone())
}
}
Value::Column(column) => {
let mut builder = ColumnBuilder::with_capacity(data_type, column.len());
for scalar in column.iter() {
if let ScalarRef::String(key) = scalar {
let buffer = op.read(key).await;
match buffer {
Ok(res) => {
let value =
unsafe { String::from_utf8_unchecked(res.current().to_vec()) };
builder.push(ScalarRef::String(value.as_str()));
}
Err(_) => {
sundy-li marked this conversation as resolved.
Show resolved Hide resolved
builder.push(dict_arg.default_value.as_ref());
}
};
} else {
builder.push(dict_arg.default_value.as_ref());
}
}
Value::Column(builder.build())
}
};
let entry = BlockEntry {
data_type: data_type.clone(),
value,
};
data_block.add_column(entry);

Ok(())
}
}
Loading
Loading