This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-167] Hashmap build opt for semi/anti/exists join #197

Merged
@@ -92,6 +92,22 @@ case class ColumnarShuffledHashJoinExec(
     }
   }
 
+  val builder_type = {
+    if (condition.isDefined) 1
+    else {
+      joinType match {
+        case LeftSemi =>
+          3
+        case LeftAnti =>
+          3
+        case j: ExistenceJoin =>
+          3
+        case other =>
+          1
+      }
+    }
+  }
+
   def buildCheck(): Unit = {
     // build check for condition
     val conditionExpr: Expression = condition.orNull
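The numbering follows prepareHashBuildFunction's builder_type parameter: 1 selects the plain unsafe hash map, and 3 selects the same map built with skip-duplication. Semi, anti, and existence joins qualify because their probe side only asks whether a build-side key exists, never which rows carry it, so duplicate build-side keys add no information; when an extra join condition is present the matched rows themselves must be evaluated, so the plain builder (1) is kept. A minimal, self-contained C++ sketch of that observation (ProbeSemiJoin and all names here are illustrative, not part of the PR):

#include <cstdint>
#include <unordered_set>
#include <vector>

// For a semi/anti/exists probe, key membership is the whole answer, so a
// set holding one entry per distinct build-side key serves as well as a
// multimap holding every duplicate row.
std::vector<bool> ProbeSemiJoin(const std::vector<int32_t>& build_keys,
                                const std::vector<int32_t>& probe_keys) {
  std::unordered_set<int32_t> build_side(build_keys.begin(), build_keys.end());
  std::vector<bool> matched;
  matched.reserve(probe_keys.size());
  for (int32_t key : probe_keys) {
    matched.push_back(build_side.count(key) > 0);
  }
  return matched;
}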
@@ -180,7 +196,8 @@ case class ColumnarShuffledHashJoinExec(
       ColumnarCodegenContext(
         inputSchema,
         null,
-        ColumnarConditionedProbeJoin.prepareHashBuildFunction(buildKeyExprs, buildPlan.output, 1))
+        ColumnarConditionedProbeJoin
+          .prepareHashBuildFunction(buildKeyExprs, buildPlan.output, builder_type))
     }
 
   override def supportColumnarCodegen: Boolean = true
@@ -256,7 +273,7 @@ case class ColumnarShuffledHashJoinExec(
     val hashRelationBatchHolder: ListBuffer[ColumnarBatch] = ListBuffer()
     val hash_relation_function =
       ColumnarConditionedProbeJoin
-        .prepareHashBuildFunction(buildKeyExprs, buildPlan.output, 1)
+        .prepareHashBuildFunction(buildKeyExprs, buildPlan.output, builder_type)
     val hash_relation_schema = ConverterUtils.toArrowSchema(buildPlan.output)
     val hash_relation_expr =
       TreeBuilder.makeExpression(
@@ -55,7 +55,7 @@ object ColumnarConditionedProbeJoin extends Logging {
   def prepareHashBuildFunction(
       buildKeys: Seq[Expression],
       buildInputAttributes: Seq[Attribute],
-      builder_type: Int = 0,
+      builder_type: Int = 1,
       is_broadcast: Boolean = false): TreeNode = {
     val buildInputFieldList: List[Field] = buildInputAttributes.toList.map(attr => {
       Field
@@ -75,6 +75,11 @@ class HashRelationKernel::Impl {
         std::dynamic_pointer_cast<gandiva::LiteralNode>(parameter_nodes[0])->holder());
     builder_type_ = std::stoi(builder_type_str);
   }
+  if (builder_type_ == 3) {
+    // Use the unsafe hash map together with the skip-duplication strategy.
+    semi_ = true;
+    builder_type_ = 1;
+  }
   if (builder_type_ == 0) {
     // builder_type_ == 0 will be abandoned in the near future; it won't support
     // decimal here.
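Builder type 3 is therefore not a third hash-map implementation: the kernel decodes it into the type-1 unsafe hash map plus a semi_ flag that later routes inserts through the skip-duplication path. A hedged sketch of that decoding (BuildConfig and DecodeBuilderType are hypothetical names, assuming the numbering used in this PR: 0 = legacy builder, 1 = unsafe hash map, 3 = unsafe hash map keeping only the first row per key):

struct BuildConfig {
  int builder_type = 1;
  bool skip_duplicates = false;
};

BuildConfig DecodeBuilderType(int raw) {
  BuildConfig config;
  if (raw == 3) {
    config.builder_type = 1;        // reuse the unsafe hash-map builder
    config.skip_duplicates = true;  // but drop duplicate build-side keys
  } else {
    config.builder_type = raw;
  }
  return config;
}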
@@ -227,11 +232,11 @@ class HashRelationKernel::Impl {
       PROCESS(arrow::Decimal128Type)
     if (project_outputs.size() == 1) {
       switch (project_outputs[0]->type_id()) {
-#define PROCESS(InType)                                                       \
-  case TypeTraits<InType>::type_id: {                                         \
-    using ArrayType = precompile::TypeTraits<InType>::ArrayType;              \
-    auto typed_key_arr = std::make_shared<ArrayType>(project_outputs[0]);     \
-    RETURN_NOT_OK(hash_relation_->AppendKeyColumn(key_array, typed_key_arr)); \
+#define PROCESS(InType)                                                              \
+  case TypeTraits<InType>::type_id: {                                                \
+    using ArrayType = precompile::TypeTraits<InType>::ArrayType;                     \
+    auto typed_key_arr = std::make_shared<ArrayType>(project_outputs[0]);            \
+    RETURN_NOT_OK(hash_relation_->AppendKeyColumn(key_array, typed_key_arr, semi_)); \
   } break;
       PROCESS_SUPPORTED_TYPES(PROCESS)
 #undef PROCESS
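The PROCESS macro above is an X-macro: PROCESS_SUPPORTED_TYPES expands one switch case per supported Arrow type, so the semi_ argument only has to be added once. A self-contained toy version of the pattern (toy type list and names; the real list covers Arrow types):

#include <cstdio>

#define TOY_SUPPORTED_TYPES(PROCESS) \
  PROCESS(int)                       \
  PROCESS(double)

enum class TypeId { kInt, kDouble };

template <typename T> struct TypeTraitsToy;
template <> struct TypeTraitsToy<int> { static constexpr TypeId id = TypeId::kInt; };
template <> struct TypeTraitsToy<double> { static constexpr TypeId id = TypeId::kDouble; };

void Dispatch(TypeId id) {
  switch (id) {
// Each expansion produces one case label handling one concrete type.
#define PROCESS(InType)                    \
  case TypeTraitsToy<InType>::id:          \
    std::printf("handling %s\n", #InType); \
    break;
    TOY_SUPPORTED_TYPES(PROCESS)
#undef PROCESS
  }
}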
@@ -252,7 +257,7 @@ class HashRelationKernel::Impl {
         RETURN_NOT_OK(MakeUnsafeArray(arr->type(), i++, arr, &payload));
         payloads.push_back(payload);
       }
-      RETURN_NOT_OK(hash_relation_->AppendKeyColumn(key_array, payloads));
+      RETURN_NOT_OK(hash_relation_->AppendKeyColumn(key_array, payloads, semi_));
     }
   }
 }
@@ -281,6 +286,7 @@ class HashRelationKernel::Impl {
   std::vector<std::shared_ptr<arrow::Array>> key_hash_cached_;
   uint64_t num_total_cached_ = 0;
   int builder_type_ = 0;
+  bool semi_ = false;
   int key_size_ = -1;  // If key_size_ != 0, key will be stored directly in key_map
 
   class HashRelationResultIterator : public ResultIterator<HashRelation> {
native-sql-engine/cpp/src/codegen/common/hash_relation.h (58 additions, 10 deletions)
@@ -167,9 +167,9 @@ class HashRelation {
     return arrow::Status::Invalid("Error minimizing hash table");
   }
 
-  arrow::Status AppendKeyColumn(
-      std::shared_ptr<arrow::Array> in,
-      const std::vector<std::shared_ptr<UnsafeArray>>& payloads) {
+  arrow::Status AppendKeyColumn(std::shared_ptr<arrow::Array> in,
+                                const std::vector<std::shared_ptr<UnsafeArray>>& payloads,
+                                bool semi = false) {
     if (hash_table_ == nullptr) {
       throw std::runtime_error("HashRelation Get failed, hash_table is null.");
     }
@@ -184,7 +184,11 @@
       // chendi: Since Spark won't join rows containing null, we skip null
       // rows.
       if (payload->isNullExists()) continue;
-      RETURN_NOT_OK(Insert(typed_array->GetView(i), payload, num_arrays_, i));
+      if (!semi) {
+        RETURN_NOT_OK(Insert(typed_array->GetView(i), payload, num_arrays_, i));
+      } else {
+        RETURN_NOT_OK(InsertSkipDup(typed_array->GetView(i), payload, num_arrays_, i));
+      }
     }
 
     num_arrays_++;
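The only per-row change is the branch between Insert and InsertSkipDup. A self-contained toy model of the two paths (std::unordered_map stands in for the real unsafe hash map; ToyHashMap and these free functions are illustrative only): Insert appends every row index under its key, while InsertSkipDup records a key only on first sight.

#include <cstdint>
#include <unordered_map>
#include <vector>

struct ToyIndex {
  uint32_t array_id;
  uint32_t row_id;
};

using ToyHashMap = std::unordered_map<int32_t, std::vector<ToyIndex>>;

// Inner/outer joins need every matching row, so duplicates accumulate.
void ToyInsert(ToyHashMap& map, int32_t key, ToyIndex index) {
  map[key].push_back(index);
}

// Semi/anti/exists joins only need existence, so an already-present key
// is left untouched (emplace is a no-op when the key exists).
void ToyInsertSkipDup(ToyHashMap& map, int32_t key, ToyIndex index) {
  map.emplace(key, std::vector<ToyIndex>{index});
}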
@@ -196,7 +200,8 @@
             typename std::enable_if_t<!std::is_same<KeyArrayType, StringArray>::value>* =
                 nullptr>
   arrow::Status AppendKeyColumn(std::shared_ptr<arrow::Array> in,
-                                std::shared_ptr<KeyArrayType> original_key) {
+                                std::shared_ptr<KeyArrayType> original_key,
+                                bool semi = false) {
     if (hash_table_ == nullptr) {
       throw std::runtime_error("HashRelation Get failed, hash_table is null.");
     }
@@ -212,8 +217,13 @@
       if (original_key->IsNull(i)) {
         RETURN_NOT_OK(InsertNull(num_arrays_, i));
       } else {
-        RETURN_NOT_OK(
-            Insert(typed_array->GetView(i), original_key->GetView(i), num_arrays_, i));
+        if (!semi) {
+          RETURN_NOT_OK(Insert(typed_array->GetView(i), original_key->GetView(i),
+                               num_arrays_, i));
+        } else {
+          RETURN_NOT_OK(InsertSkipDup(typed_array->GetView(i), original_key->GetView(i),
+                                      num_arrays_, i));
+        }
       }
     }
@@ -224,7 +234,8 @@
   }
 
   arrow::Status AppendKeyColumn(std::shared_ptr<arrow::Array> in,
-                                std::shared_ptr<StringArray> original_key) {
+                                std::shared_ptr<StringArray> original_key,
+                                bool semi = false) {
     if (hash_table_ == nullptr) {
       throw std::runtime_error("HashRelation Get failed, hash_table is null.");
     }
@@ -242,8 +253,13 @@
         RETURN_NOT_OK(InsertNull(num_arrays_, i));
       } else {
         auto str = original_key->GetString(i);
-        RETURN_NOT_OK(
-            Insert(typed_array->GetView(i), str.data(), str.size(), num_arrays_, i));
+        if (!semi) {
+          RETURN_NOT_OK(
+              Insert(typed_array->GetView(i), str.data(), str.size(), num_arrays_, i));
+        } else {
+          RETURN_NOT_OK(InsertSkipDup(typed_array->GetView(i), str.data(), str.size(),
+                                      num_arrays_, i));
+        }
       }
     }
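For string keys the hash passed as the key (typed_array->GetView(i)) is only 32 bits, so equal hashes alone cannot prove a duplicate; presumably the map must also compare the stored payload (str.data()/str.size()) before skipping. A toy illustration of that duplicate check under a narrow hash (an assumption about the real map's behavior, with illustrative names):

#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

using Buckets = std::unordered_map<int32_t, std::vector<std::string>>;

// Returns true if the key was newly inserted, false if it was a duplicate.
bool InsertIfAbsent(Buckets& buckets, int32_t hash, const std::string& key) {
  auto& bucket = buckets[hash];
  for (const auto& existing : bucket) {
    if (existing == key) return false;  // same hash and same payload: duplicate
  }
  bucket.push_back(key);  // hash collision or brand-new hash: keep it
  return true;
}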
@@ -462,6 +478,38 @@
     return arrow::Status::OK();
   }
 
+  arrow::Status InsertSkipDup(int32_t v, std::shared_ptr<UnsafeRow> payload,
+                              uint32_t array_id, uint32_t id) {
+    assert(hash_table_ != nullptr);
+    auto index = ArrayItemIndex(array_id, id);
+    if (!appendNewKey(hash_table_, payload.get(), v, (char*)&index,
+                      sizeof(ArrayItemIndex))) {
+      return arrow::Status::CapacityError("Insert to HashMap failed.");
+    }
+    return arrow::Status::OK();
+  }
+
+  template <typename CType>
+  arrow::Status InsertSkipDup(int32_t v, CType payload, uint32_t array_id, uint32_t id) {
+    assert(hash_table_ != nullptr);
+    auto index = ArrayItemIndex(array_id, id);
+    if (!appendNewKey(hash_table_, payload, v, (char*)&index, sizeof(ArrayItemIndex))) {
+      return arrow::Status::CapacityError("Insert to HashMap failed.");
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Status InsertSkipDup(int32_t v, const char* payload, size_t payload_len,
+                              uint32_t array_id, uint32_t id) {
+    assert(hash_table_ != nullptr);
+    auto index = ArrayItemIndex(array_id, id);
+    if (!appendNewKey(hash_table_, payload, payload_len, v, (char*)&index,
+                      sizeof(ArrayItemIndex))) {
+      return arrow::Status::CapacityError("Insert to HashMap failed.");
+    }
+    return arrow::Status::OK();
+  }
+
   arrow::Status InsertNull(uint32_t array_id, uint32_t id) {
     // since vanilla Spark doesn't support matching null in joins,
     // we can directly return to optimize
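All three InsertSkipDup overloads mirror their Insert counterparts but funnel into appendNewKey, which, as used here, leaves the map untouched when an equal key is already present and reports failure only when the map runs out of capacity (a false return becomes CapacityError, and nothing else). A self-contained sketch of that contract (ToyMap and appendNewKeyToy are illustrative stand-ins, not the real unsafe hash-map API):

#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>

struct ToyMap {
  std::unordered_map<int32_t, std::string> slots;
  std::size_t capacity = 1024;
};

// Returns false only on capacity failure; a duplicate key is skipped
// silently and still counts as success.
bool appendNewKeyToy(ToyMap* map, int32_t hash, const char* payload,
                     std::size_t payload_len) {
  if (map->slots.count(hash) > 0) return true;            // duplicate: skip, succeed
  if (map->slots.size() >= map->capacity) return false;   // out of capacity
  map->slots.emplace(hash, std::string(payload, payload_len));
  return true;
}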