Skip to content

Commit

Permalink
Support HashJoin with default implementation of memory hash table
Browse files Browse the repository at this point in the history
Signed-off-by: Lantao Jin <ltjin@amazon.com>
  • Loading branch information
LantaoJin committed Sep 3, 2024
1 parent 8066f16 commit 29947d3
Show file tree
Hide file tree
Showing 9 changed files with 1,340 additions and 240 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ public void push() {
environment = new TypeEnvironment(environment);
}

public void cleanFields() {
environment.clearAllFields();
}

/**
* Return current environment.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@

package org.opensearch.sql.planner;

import static org.opensearch.sql.planner.physical.join.JoinOperator.BuildSide.BuildLeft;
import static org.opensearch.sql.planner.physical.join.JoinOperator.BuildSide.BuildRight;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.executor.pagination.PlanSerializer;
Expand Down Expand Up @@ -54,6 +58,7 @@
import org.opensearch.sql.planner.physical.ValuesOperator;
import org.opensearch.sql.planner.physical.WindowOperator;
import org.opensearch.sql.planner.physical.join.HashJoinOperator;
import org.opensearch.sql.planner.physical.join.JoinOperator;
import org.opensearch.sql.planner.physical.join.JoinPredicatesHelper;
import org.opensearch.sql.planner.physical.join.NestedLoopJoinOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;
Expand Down Expand Up @@ -205,7 +210,8 @@ && canEvaluate((ReferenceExpression) pair.getRight(), join.getRight())) {
}
}

// 1. Determining Join with Hint. TODO
// 1. Determining Join with Hint and build side.
JoinOperator.BuildSide buildSide = determineBuildSide(join.getType());
// 2. Pick hash join if it is an equi-join and hash join supported
if (!equiJoinKeys.isEmpty()) {
Pair<List<Expression>, List<Expression>> unzipped = JoinPredicatesHelper.unzip(equiJoinKeys);
Expand All @@ -217,6 +223,7 @@ && canEvaluate((ReferenceExpression) pair.getRight(), join.getRight())) {
leftKeys,
rightKeys,
join.getType(),
buildSide,
visitRelation((LogicalRelation) join.getLeft(), ctx),
visitRelation((LogicalRelation) join.getRight(), ctx),
Optional.empty());
Expand All @@ -227,10 +234,22 @@ && canEvaluate((ReferenceExpression) pair.getRight(), join.getRight())) {
visitRelation((LogicalRelation) join.getLeft(), ctx),
visitRelation((LogicalRelation) join.getRight(), ctx),
join.getType(),
buildSide,
join.getCondition());
}
}

/**
* Build side is right by default (except RightOuter). TODO set the smaller side as the build side
* TODO set build side from hint if provided
*
* @param joinType Join type
* @return Build side
*/
private JoinOperator.BuildSide determineBuildSide(Join.JoinType joinType) {
return joinType == Join.JoinType.RIGHT ? BuildLeft : BuildRight;
}

/** Return true if the reference can be evaluated in relation */
private boolean canEvaluate(ReferenceExpression expr, LogicalPlan plan) {
if (plan instanceof LogicalRelation relation) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.opensearch.sql.planner.physical.join;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.opensearch.sql.data.model.ExprValue;

public class DefaultHashedRelation implements HashedRelation, Serializable {

private final Map<ExprValue, List<ExprValue>> map = new HashMap<>();
private int numKeys;
private int numValues;

@Override
public List<ExprValue> get(ExprValue key) {
return map.get(key);
}

@Override
public ExprValue getValue(ExprValue key) {
List<ExprValue> values = map.get(key);
return values != null && !values.isEmpty() ? values.getFirst() : null;
}

@Override
public boolean containsKey(ExprValue key) {
return map.containsKey(key);
}

@Override
public Iterator<ExprValue> keyIterator() {
return map.keySet().iterator();
}

@Override
public boolean isUniqueKey() {
return numKeys == numValues;
}

@Override
public void close() {
map.clear();
}

@Override
public void put(ExprValue key, ExprValue value) {
map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
numKeys++;
numValues++;
}
}
Loading

0 comments on commit 29947d3

Please sign in to comment.