Skip to content

Commit

Permalink
Add jq set returning function
Browse files Browse the repository at this point in the history
  • Loading branch information
maxjustus committed Aug 20, 2024
1 parent 449d66e commit 3721e11
Show file tree
Hide file tree
Showing 4 changed files with 399 additions and 20 deletions.
71 changes: 71 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/query/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ geos = { workspace = true }
geozero = { workspace = true }
h3o = "0.4.0"
hex = "0.4.3"
hifijson = "0.2.2"
itertools = { workspace = true }
jaq-core = "1.5.1"
jaq-interpret = { version = "1.5.0", features = ["default"] }
jaq-parse = "1.0.3"
jaq-std = "1.6.0"
jsonb = { workspace = true }
lexical-core = "0.8.5"
libm = "0.2.6"
Expand Down
166 changes: 166 additions & 0 deletions src/query/functions/src/srfs/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ use databend_common_expression::Scalar;
use databend_common_expression::ScalarRef;
use databend_common_expression::Value;
use databend_common_expression::ValueRef;
use jaq_core;
use jaq_interpret::Ctx;
use jaq_interpret::FilterT;
use jaq_interpret::ParseCtx;
use jaq_interpret::RcIter;
use jaq_interpret::Val;
use jaq_parse;
use jaq_std;
use jsonb::array_length;
use jsonb::array_values;
use jsonb::as_str;
Expand All @@ -47,6 +55,7 @@ use jsonb::jsonpath::Mode as SelectorMode;
use jsonb::jsonpath::Selector;
use jsonb::object_each;
use jsonb::object_keys;
use jsonb::to_serde_json;

pub fn register(registry: &mut FunctionRegistry) {
registry.properties.insert(
Expand Down Expand Up @@ -77,6 +86,7 @@ pub fn register(registry: &mut FunctionRegistry) {
let val_arg = args[0].clone().to_owned();
let path_arg = args[1].clone().to_owned();
let mut results = Vec::with_capacity(ctx.num_rows);

match path_arg {
Value::Scalar(Scalar::String(path)) => {
match parse_json_path(path.as_bytes()) {
Expand Down Expand Up @@ -460,6 +470,162 @@ pub fn register(registry: &mut FunctionRegistry) {
},
}))
});

// Register `jq(filter, variant)` as a set-returning function (SRF): each input
// row may expand into zero or more output rows, one per value the jq filter
// produces for that row's variant.
registry.properties.insert(
    "jq".to_string(),
    FunctionProperty::default().kind(FunctionKind::SRF),
);
registry.register_function_factory("jq", |_, args_type| {
    // Signature check: exactly two arguments. The first must be a (possibly
    // nullable) String, the second a (possibly nullable) Variant or the bare
    // Null type. Anything else is rejected by returning `None`.
    if args_type.len() != 2 {
        return None;
    }
    if args_type[0].remove_nullable() != DataType::String {
        return None;
    }
    if args_type[1].remove_nullable() != DataType::Variant && args_type[1] != DataType::Null {
        return None;
    }

    Some(Arc::new(Function {
        signature: FunctionSignature {
            name: "jq".to_string(),
            args_type: args_type.to_vec(),
            // A one-field tuple holding a nullable Variant.
            return_type: DataType::Tuple(vec![DataType::Nullable(Box::new(DataType::Variant))]),
        },
        eval: FunctionEval::SRF {
            eval: Box::new(|args, ctx, max_nums_per_row| {
                // The filter text is read from row 0 only; it is compiled once
                // and then applied to every row of the variant argument.
                let jq_filter_col = args[0].clone().to_owned();
                let jq_filter = match jq_filter_col.index(0) {
                    Some(ScalarRef::String(s)) => s,
                    _ => {
                        ctx.set_error(0, "jq filter must be a scalar string");
                        return vec![];
                    }
                };

                // Build the definition context from jaq's core natives and its
                // standard library, then parse the user-supplied filter.
                let mut defs = ParseCtx::new(vec![]);
                defs.insert_natives(jaq_core::core());
                defs.insert_defs(jaq_std::std());
                // Loading the bundled definitions is expected to be error-free.
                assert!(defs.errs.is_empty());
                let (filter, errs) = jaq_parse::parse(jq_filter, jaq_parse::main());
                if !errs.is_empty() {
                    // Only the first parse error is reported.
                    ctx.set_error(0, errs[0].to_string());
                    return vec![];
                }

                // Compilation errors are accumulated on `defs.errs`; report all
                // of them in a single message, one per line.
                let filter = defs.compile(filter.unwrap());
                if !defs.errs.is_empty() {
                    let err_str = defs
                        .errs
                        .iter()
                        .map(|e| format!("err: {} location: {:?}", e.0, e.1))
                        .collect::<Vec<_>>()
                        .join("\n");
                    ctx.set_error(0, err_str);
                    return vec![];
                }

                let jaq_args = vec![];
                // You can pass additional scalar inputs as args to the jq filter.
                // This could be a useful enhancement, but leaving it out for now.
                let inputs = RcIter::new(core::iter::empty());
                let jaq_ctx = Ctx::new(jaq_args, &inputs);

                let json_arg = args[1].clone().to_owned();
                (0..ctx.num_rows)
                    .map(|row| {
                        // With an SRF each row returns a Value::Column or Value::Scalar
                        // of results, so the overall output is an array of arrays: every
                        // input row yields a column holding that row's results.
                        // A row with nothing to return yields a null tuple in a
                        // Value::Scalar with a result count of 0.
                        let mut res_builder = BinaryColumnBuilder::with_capacity(0, 1);
                        let null_result = (Value::Scalar(Scalar::Tuple(vec![Scalar::Null])), 0);

                        match json_arg.index(row) {
                            Some(ScalarRef::Null) => {
                                return null_result;
                            }
                            Some(ScalarRef::Variant(v)) => {
                                let s = to_serde_json(v);
                                match s {
                                    Err(e) => {
                                        ctx.set_error(row, e.to_string());
                                        return null_result;
                                    }
                                    Ok(s) => {
                                        let jaq_val = Val::from(s);
                                        let jaq_out = filter.run((jaq_ctx.clone(), jaq_val));

                                        // Each successful output becomes one entry in
                                        // this row's result column. The first failure
                                        // records an error for the row and discards
                                        // whatever was collected so far.
                                        for res in jaq_out {
                                            match res {
                                                Err(err) => {
                                                    ctx.set_error(row, err.to_string());
                                                    return null_result;
                                                }
                                                Ok(res) => {
                                                    let res_json_str = jaq_val_to_json(&res);
                                                    res_builder.put_str(&res_json_str);
                                                    res_builder.commit_row();
                                                }
                                            };
                                        }
                                    }
                                }
                            }
                            None => {
                                return null_result;
                            }
                            // The factory only admits Variant/Null for this argument.
                            _ => unreachable!(),
                        }

                        // Only rows that reach this point update `max_nums_per_row`;
                        // the early returns above leave the entry untouched.
                        let res_col = Column::Variant(res_builder.build()).wrap_nullable(None);
                        let res_len = res_col.len();
                        max_nums_per_row[row] = std::cmp::max(max_nums_per_row[row], res_len);
                        (Value::Column(Column::Tuple(vec![res_col])), res_len)
                    })
                    .collect()
            }),
        },
    }))
});
}

/// Serializes a jaq [`Val`] into JSON text.
///
/// Adapted from the value printer in the jaq CLI:
/// https://github.com/01mf02/jaq/blob/426fdab46c95e7ed0dadc5c049b3d83388271b1a/jaq/src/main.rs#L511
///
/// String values and object keys are emitted as quoted JSON strings with `"`,
/// `\` and control characters escaped, so the output stays valid JSON for any
/// string content. Non-finite floats are emitted as `null`. Array elements and
/// object members are separated by `", "`, and keys from values by `": "`.
///
/// There may be an opportunity to format directly into jsonb data so the
/// result doesn't need to be re-parsed.
fn jaq_val_to_json(val: &Val) -> String {
    /// Appends `s` to `out` as a quoted JSON string, escaping the characters
    /// that JSON does not allow to appear raw inside a string literal.
    fn push_json_string(out: &mut String, s: &str) {
        out.push('"');
        for c in s.chars() {
            match c {
                '"' => out.push_str("\\\""),
                '\\' => out.push_str("\\\\"),
                '\n' => out.push_str("\\n"),
                '\r' => out.push_str("\\r"),
                '\t' => out.push_str("\\t"),
                '\u{08}' => out.push_str("\\b"),
                '\u{0C}' => out.push_str("\\f"),
                // Remaining control characters have no short escape.
                c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
                c => out.push(c),
            }
        }
        out.push('"');
    }

    /// Appends the JSON text for `val` to `out`, recursing into arrays and
    /// objects so the whole value is built in a single buffer.
    fn push_val(out: &mut String, val: &Val) {
        match val {
            Val::Null => out.push_str("null"),
            Val::Bool(b) => out.push_str(if *b { "true" } else { "false" }),
            Val::Num(n) => out.push_str(&n.to_string()),
            Val::Float(f) if f.is_finite() => out.push_str(&f.to_string()),
            // NaN and infinities have no JSON representation.
            Val::Float(_) => out.push_str("null"),
            Val::Int(i) => out.push_str(&i.to_string()),
            Val::Str(s) => push_json_string(out, &s.to_string()),
            Val::Arr(a) => {
                out.push('[');
                for (i, v) in a.iter().enumerate() {
                    if i > 0 {
                        out.push_str(", ");
                    }
                    push_val(out, v);
                }
                out.push(']');
            }
            Val::Obj(o) => {
                out.push('{');
                for (i, (k, v)) in o.iter().enumerate() {
                    if i > 0 {
                        out.push_str(", ");
                    }
                    push_json_string(out, &k.to_string());
                    out.push_str(": ");
                    push_val(out, v);
                }
                out.push('}');
            }
        }
    }

    let mut out = String::new();
    push_val(&mut out, val);
    out
}

pub(crate) fn unnest_variant_array(
Expand Down
Loading

0 comments on commit 3721e11

Please sign in to comment.