Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support json_extract(column, column, ...) #8555

Merged
merged 9 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 171 additions & 140 deletions dbms/src/Functions/FunctionsJson.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,27 @@ class FunctionJsonExtract : public IFunction
String getName() const override { return name; }

size_t getNumberOfArguments() const override { return 0; }
bool isVariadic() const override { return true; }

bool useDefaultImplementationForNulls() const override { return false; }
bool isVariadic() const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }

DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if unlikely (arguments.size() < 2)
{
throw Exception(
fmt::format("Illegal arguments count {} of function {}", arguments.size(), getName()),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
for (const auto & arg : arguments)
{
if (!arg->onlyNull())
{
const auto * nested_arg_type = removeNullable(arg).get();
if unlikely (!nested_arg_type->isStringOrFixedString())
throw Exception(
"Illegal type " + arg->getName() + " of argument of function " + getName(),
fmt::format("Illegal type {} of argument of function {}", arg->getName(), getName()),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
}
Expand All @@ -105,197 +112,221 @@ class FunctionJsonExtract : public IFunction
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) const override
{
size_t rows = block.rows();
auto & res_col = block.getByPosition(result).column;
/// First check if JsonObject is only null, if so, result is null, no need to parse path exprs even if path is
const auto * json_column = block.getByPosition(arguments[0]).column.get();
bool all_null = false;
bool const_json = false;
const ColumnString * source_data_column_ptr;
const ColumnNullable * source_nullable_column_ptr = nullptr;
if unlikely (json_column->onlyNull())
{
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(rows, Null());
res_col = block.getByPosition(result).type->createColumnConst(rows, Null());
return;
}
else if (const auto * const_nullable_col = checkAndGetColumnConst<ColumnNullable>(json_column))
{
const_json = true;
json_column = const_nullable_col->getDataColumnPtr().get();
}
else if (const auto * const_col = checkAndGetColumnConst<ColumnString>(json_column))
if (json_column->isColumnNullable())
{
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
const_json = true;
json_column = const_col->getDataColumnPtr().get();
}

if (const auto * nullable_col = checkAndGetColumn<ColumnNullable>(json_column))
{
source_nullable_column_ptr = nullable_col;
all_null = isColumnOnlyNull(nullable_col); /// changes
source_data_column_ptr = checkAndGetColumn<ColumnString>(nullable_col->getNestedColumnPtr().get());
if unlikely (!source_data_column_ptr)
throw Exception(
fmt::format("Illegal column {} of argument of function {}", json_column->getName(), getName()),
ErrorCodes::ILLEGAL_COLUMN);
}
else if (const auto * string_col = checkAndGetColumn<ColumnString>(json_column))
{
source_data_column_ptr = string_col;
}
else
{
throw Exception(
fmt::format("Illegal column {} of argument of function {}", json_column->getName(), getName()),
ErrorCodes::ILLEGAL_COLUMN);
const auto & column_nullable = static_cast<const ColumnNullable &>(*json_column);
const auto & null_map = column_nullable.getNullMapData();
auto null_count = countBytesInFilter(null_map.data(), null_map.size());
if (null_count == rows)
{
res_col = block.getByPosition(result).type->createColumnConst(rows, Null());
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
return;
}
}
SeaRise marked this conversation as resolved.
Show resolved Hide resolved

if unlikely (all_null)
{
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(rows, Null());
return;
}
auto nested_block = createBlockWithNestedColumns(block, arguments);
auto json_source = createDynamicStringSource(*nested_block.getByPosition(arguments[0]).column);

bool all_path_arguments_constants = true;
size_t arguments_size = arguments.size();
RUNTIME_CHECK(arguments_size > 1);
for (size_t i = 0; i < arguments_size; ++i)
{
const auto & elem = block.getByPosition(arguments[i]);
if (i > 0)
all_path_arguments_constants &= elem.column->isColumnConst();
}

if unlikely (!all_path_arguments_constants)
throw Exception(
"None const type of json path argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

/// Parses all paths
std::vector<JsonPathExprRefContainerPtr> path_expr_container_vec;
path_expr_container_vec.reserve(arguments_size - 1);
StringSources path_sources;
path_sources.reserve(arguments.size() - 1);
std::vector<const NullMap *> path_null_maps;
path_null_maps.reserve(arguments.size() - 1);
for (size_t i = 1; i < arguments_size; ++i)
{
const ColumnPtr column = block.getByPosition(arguments[i]).column;
if (column->onlyNull())
const auto & path_col = block.getByPosition(arguments[i]).column;
if (path_col->onlyNull())
{
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(rows, Null());
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
return;
}

const auto * nested_column = static_cast<const ColumnConst *>(column.get())->getDataColumnPtr().get();
StringRef path_str;
if (const auto * nullable_string_path_col = checkAndGetColumn<ColumnNullable>(nested_column))
{
if unlikely (nullable_string_path_col->isNullAt(0))
{
block.getByPosition(result).column
= block.getByPosition(result).type->createColumnConst(rows, Null());
return;
}
nested_column = nullable_string_path_col->getNestedColumnPtr().get();
}

if (const auto * col = checkAndGetColumn<ColumnString>(nested_column))
all_path_arguments_constants &= path_col->isColumnConst();
path_sources.push_back(createDynamicStringSource(*nested_block.getByPosition(arguments[i]).column));
if (path_col->isColumnNullable())
{
path_str = col->getDataAt(0);
const auto & path_column_nullable = static_cast<const ColumnNullable &>(*path_col);
path_null_maps.push_back(&path_column_nullable.getNullMapData());
}
else if (const auto * fixed_string_col = checkAndGetColumn<ColumnFixedString>(nested_column))
else
{
path_str = fixed_string_col->getDataAt(0);
path_null_maps.push_back(nullptr);
}
else
throw Exception(
fmt::format("Illegal column {} of argument of function {}", column->getName(), getName()),
ErrorCodes::ILLEGAL_COLUMN);

auto path_expr = JsonPathExpr::parseJsonPathExpr(path_str);
/// If any path_expr failed to parse, throw exception
if (!path_expr)
throw Exception(
fmt::format(
"Illegal json path expression {} of argument of function {}",
column->getName(),
getName()),
ErrorCodes::ILLEGAL_COLUMN);
path_expr_container_vec.push_back(std::make_unique<JsonPathExprRefContainer>(path_expr));
}

const ColumnPtr column = block.getByPosition(arguments[0]).column;
if (const_json)
if (json_column->isColumnNullable())
{
auto nullable_col
= calculateResultCol(source_data_column_ptr, source_nullable_column_ptr, path_expr_container_vec);
block.getByPosition(result).column = ColumnConst::create(std::move(nullable_col), rows);
const auto & column_nullable = static_cast<const ColumnNullable &>(*json_column);
const auto & null_map = column_nullable.getNullMapData();
if (all_path_arguments_constants)
{
res_col = doExecuteForConstPaths<true>(json_source, null_map, path_sources, rows);
}
else
{
res_col = doExecuteCommon<true>(json_source, null_map, path_sources, path_null_maps, rows);
}
}
else
{
block.getByPosition(result).column
= calculateResultCol(source_data_column_ptr, source_nullable_column_ptr, path_expr_container_vec);
if (all_path_arguments_constants)
{
res_col = doExecuteForConstPaths<false>(json_source, {}, path_sources, rows);
}
else
{
res_col = doExecuteCommon<false>(json_source, {}, path_sources, path_null_maps, rows);
}
}
}

private:
static MutableColumnPtr calculateResultCol(
const ColumnString * source_col,
const ColumnNullable * source_nullable_col,
std::vector<JsonPathExprRefContainerPtr> & path_expr_container_vec)
template <bool is_json_nullable>
MutableColumnPtr doExecuteForConstPaths(
const std::unique_ptr<IStringSource> & json_source,
const NullMap & null_map_json,
const StringSources & path_sources,
size_t rows) const
{
size_t rows = source_col->size();
const ColumnString::Chars_t & data_from = source_col->getChars();
const IColumn::Offsets & offsets_from = source_col->getOffsets();
#define FINISH_PER_ROW \
writeChar(0, write_buffer); \
offsets_to[row] = write_buffer.count(); \
json_source->next();

// build path expressions for const paths.
auto path_expr_container_vec = buildJsonPathExprContainer(path_sources);

auto col_to = ColumnString::create();
ColumnString::Chars_t & data_to = col_to->getChars();
ColumnString::Offsets & offsets_to = col_to->getOffsets();
offsets_to.resize(rows);
ColumnUInt8::MutablePtr col_null_map = ColumnUInt8::create(rows, 0);
ColumnUInt8::Container & vec_null_map = col_null_map->getData();
JsonBinary::JsonBinaryWriteBuffer write_buffer(data_to);
size_t current_offset = 0;
for (size_t i = 0; i < rows; ++i)
ColumnUInt8::Container & null_map_to = col_null_map->getData();
JsonBinary::JsonBinaryWriteBuffer write_buffer(data_to, rows);

for (size_t row = 0; row < rows; ++row)
{
size_t next_offset = offsets_from[i];
size_t data_length = next_offset - current_offset - 1;
bool found;
if unlikely (isNullJsonBinary(data_length))
{
found = false;
}
else
if constexpr (is_json_nullable)
{
JsonBinary json_binary(
data_from[current_offset],
StringRef(&data_from[current_offset + 1], data_length - 1));
found = json_binary.extract(path_expr_container_vec, write_buffer);
if (null_map_json[row])
{
FINISH_PER_ROW
null_map_to[row] = 1;
continue;
}
}
if (!found)
vec_null_map[i] = 1;
writeChar(0, write_buffer);
offsets_to[i] = write_buffer.count();
current_offset = next_offset;

const auto & json_val = json_source->getWhole();
assert(json_val.size > 0);
JsonBinary json_binary(json_val.data[0], StringRef(&json_val.data[1], json_val.size - 1));
if (!json_binary.extract(path_expr_container_vec, write_buffer))
null_map_to[row] = 1;
FINISH_PER_ROW
}
data_to.resize(write_buffer.count());
return ColumnNullable::create(std::move(col_to), std::move(col_null_map));

#undef FINISH_PER_ROW
}

template <bool is_json_nullable>
MutableColumnPtr doExecuteCommon(
const std::unique_ptr<IStringSource> & json_source,
const NullMap & null_map_json,
const StringSources & path_sources,
const std::vector<const NullMap *> & path_null_maps,
size_t rows) const
{
#define FINISH_PER_ROW \
writeChar(0, write_buffer); \
offsets_to[row] = write_buffer.count(); \
for (const auto & path_source : path_sources) \
{ \
assert(path_source); \
path_source->next(); \
} \
json_source->next();

if (source_nullable_col)
auto col_to = ColumnString::create();
ColumnString::Chars_t & data_to = col_to->getChars();
ColumnString::Offsets & offsets_to = col_to->getOffsets();
offsets_to.resize(rows);
ColumnUInt8::MutablePtr col_null_map = ColumnUInt8::create(rows, 0);
ColumnUInt8::Container & null_map_to = col_null_map->getData();
JsonBinary::JsonBinaryWriteBuffer write_buffer(data_to, rows);

for (size_t row = 0; row < rows; ++row)
{
const auto & source_null_map = source_nullable_col->getNullMapColumn().getData();
for (size_t i = 0, size = vec_null_map.size(); i < size; ++i)
if (source_null_map[i])
vec_null_map[i] = 1;
if constexpr (is_json_nullable)
{
if (null_map_json[row])
{
FINISH_PER_ROW
null_map_to[row] = 1;
continue;
}
}

bool has_null_path = false;
for (const auto * path_null_map : path_null_maps)
{
if (path_null_map && (*path_null_map)[row])
{
has_null_path = true;
break;
}
}
if (has_null_path)
{
FINISH_PER_ROW
null_map_to[row] = 1;
continue;
}

const auto & json_val = json_source->getWhole();
assert(json_val.size > 0);
JsonBinary json_binary(json_val.data[0], StringRef(&json_val.data[1], json_val.size - 1));
auto path_expr_container_vec = buildJsonPathExprContainer(path_sources);
if (!json_binary.extract(path_expr_container_vec, write_buffer))
null_map_to[row] = 1;
FINISH_PER_ROW
}
return ColumnNullable::create(std::move(col_to), std::move(col_null_map));

#undef FINISH_PER_ROW
}

inline static bool isColumnOnlyNull(const ColumnNullable * column)
std::vector<JsonPathExprRefContainerPtr> buildJsonPathExprContainer(const StringSources & path_sources) const
{
if unlikely (column->empty())
return false;
std::vector<JsonPathExprRefContainerPtr> path_expr_container_vec;
path_expr_container_vec.reserve(path_sources.size());
for (const auto & path_source : path_sources)
{
assert(path_source);
const auto & path_val = path_source->getWhole();
const auto & path_str_ref = StringRef{path_val.data, path_val.size};
auto path_expr = JsonPathExpr::parseJsonPathExpr(path_str_ref);
/// If path_expr failed to parse, throw exception
if unlikely (!path_expr)
throw Exception(
fmt::format(
"Illegal json path expression `{}` of function {}",
path_str_ref.toStringView(),
getName()),
ErrorCodes::ILLEGAL_COLUMN);

bool ret = true;
const auto & data = column->getNullMapColumn().getData();
auto size = data.size();
for (size_t i = 0; i < size; ++i)
ret &= (data[i] != 0);
return ret;
path_expr_container_vec.push_back(std::make_unique<JsonPathExprRefContainer>(path_expr));
}
return path_expr_container_vec;
}
};

Expand Down
Loading