Skip to content

Commit

Permalink
Bug fixes
Browse files Browse the repository at this point in the history
merge to 2d4cc9e1358c980b4f726e17d036639bc31127aa (#188)
contains:
    fix first_value rewrite error when the window has a PRECEDING left bound and a non-PRECEDING right bound, and fix count(*) materializing the wrong SlotDescriptor
    when referring to slots of the current query and a subquery simultaneously.

    fix join and count(*) materialize SlotDescriptor error.

    fix materialize scannode's conjuncts bug.

    remove unused materialization work.

    the ORDER BY has to be evaluated in subqueries because we limit the number of rows returned by a subquery.

    the method of checking whether a LIMIT is present was wrong.

    user info was missing when retrying the load-check call.

    It's wrong to pass an aggregate function when its parameter is not materialized.

    InsertStmt did not pass the session parameters to the observer.
  • Loading branch information
morningman authored Apr 11, 2018
1 parent 5de798f commit 6cf2fb4
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 43 deletions.
2 changes: 2 additions & 0 deletions be/src/http/action/mini_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,8 @@ Status MiniLoadAction::check_auth(HttpRequest* http_req) {
return status;
}
client->loadCheck(res, req);
_user.assign(req.user);
_cluster.assign(cluster);
}
} catch (apache::thrift::TException& e) {
// failed when retry.
Expand Down
8 changes: 4 additions & 4 deletions fe/src/com/baidu/palo/analysis/AggregateInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ private void createDistinctAggInfo(

public ArrayList<FunctionCallExpr> getMaterializedAggregateExprs() {
ArrayList<FunctionCallExpr> result = Lists.newArrayList();
for (Integer i : materializedAggregateSlots_) {
result.add(aggregateExprs_.get(i));
for (Integer i: materializedSlots_) {
result.add(aggregateExprs_.get(i));
}
return result;
}
Expand Down Expand Up @@ -725,8 +725,8 @@ public void materializeRequiredSlots(Analyzer analyzer, ExprSubstitutionMap smap
// over query statements, if aggregate functions contain count(*), now
// materialize all slots this SelectStmt maps.
// chenhao added.
if (hasCountStar) {
resolvedExprs = smap.getRhs();
if (hasCountStar && smap != null && smap.size() > 0) {
resolvedExprs.addAll(smap.getRhs());
}
analyzer.materializeSlots(resolvedExprs);

Expand Down
2 changes: 1 addition & 1 deletion fe/src/com/baidu/palo/analysis/AnalyticExpr.java
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ private void standardize(Analyzer analyzer) throws AnalysisException {
// -1 indicates that no NULL values are inserted even though we set the end
// bound to the start bound (which is PRECEDING) below; this is different from
// the default behavior of windows with an end bound PRECEDING.
paramExprs.add(new DecimalLiteral(BigDecimal.valueOf(-1)));
paramExprs.add(new IntLiteral(-1, Type.BIGINT));
}

window = new AnalyticWindow(window.getType(),
Expand Down
4 changes: 4 additions & 0 deletions fe/src/com/baidu/palo/analysis/LimitElement.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public long getOffset() {
return offset;
}

public boolean hasOffset() {
return offset != 0;
}

public String toSql() {
if (limit == -1) {
return "";
Expand Down
68 changes: 35 additions & 33 deletions fe/src/com/baidu/palo/analysis/QueryStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,20 +156,20 @@ public List<TupleId> getCorrelatedTupleIds(Analyzer analyzer)
collectTableRefs(tblRefs);
for (TableRef tblRef: tblRefs) {
if (absoluteRef == null && !tblRef.isRelative()) absoluteRef = tblRef;
if (tblRef.isCorrelated()) {
/*
// Check if the correlated table ref is rooted at a tuple descriptor from within
// this query stmt. If so, the correlation is contained within this stmt
// and the table ref does not conflict with absolute refs.
CollectionTableRef t = (CollectionTableRef) tblRef;
Preconditions.checkState(t.getResolvedPath().isRootedAtTuple());
// This check relies on tblRefs being in depth-first order.
if (!tblRefIds.contains(t.getResolvedPath().getRootDesc().getId())) {
if (correlatedRef == null) correlatedRef = tblRef;
correlatedTupleIds.add(t.getResolvedPath().getRootDesc().getId());
}
*/
}
/*if (tblRef.isCorrelated()) {
*
* // Check if the correlated table ref is rooted at a tuple descriptor from within
* // this query stmt. If so, the correlation is contained within this stmt
* // and the table ref does not conflict with absolute refs.
* CollectionTableRef t = (CollectionTableRef) tblRef;
* Preconditions.checkState(t.getResolvedPath().isRootedAtTuple());
* // This check relies on tblRefs being in depth-first order.
* if (!tblRefIds.contains(t.getResolvedPath().getRootDesc().getId())) {
* if (correlatedRef == null) correlatedRef = tblRef;
* correlatedTupleIds.add(t.getResolvedPath().getRootDesc().getId());
* }
*
}*/
if (correlatedRef != null && absoluteRef != null) {
throw new AnalysisException(String.format(
"Nested query is illegal because it contains a table reference '%s' " +
Expand Down Expand Up @@ -234,23 +234,25 @@ protected void createSortInfo(Analyzer analyzer) throws AnalysisException {
sortInfo = new SortInfo(orderingExprs, isAscOrder, nullsFirstParams);
// order by w/o limit and offset in inline views, union operands and insert statements
// are ignored.
if (!hasLimit() && !hasOffset() && !analyzer.isRootAnalyzer()) {
evaluateOrderBy = false;
// Return a warning that the order by was ignored.
StringBuilder strBuilder = new StringBuilder();
strBuilder.append("Ignoring ORDER BY clause without LIMIT or OFFSET: ");
strBuilder.append("ORDER BY ");
strBuilder.append(orderByElements.get(0).toSql());
for (int i = 1; i < orderByElements.size(); ++i) {
strBuilder.append(", ").append(orderByElements.get(i).toSql());
}
strBuilder.append(".\nAn ORDER BY appearing in a view, subquery, union operand, ");
strBuilder.append("or an insert/ctas statement has no effect on the query result ");
strBuilder.append("unless a LIMIT and/or OFFSET is used in conjunction ");
strBuilder.append("with the ORDER BY.");
} else {
evaluateOrderBy = true;
}
// TODO chenhao, open this when we don't limit rows subquery returns by SortNode.
/*if (!hasLimit() && !hasOffset() && !analyzer.isRootAnalyzer()) {
* evaluateOrderBy = false;
* // Return a warning that the order by was ignored.
* StringBuilder strBuilder = new StringBuilder();
* strBuilder.append("Ignoring ORDER BY clause without LIMIT or OFFSET: ");
* strBuilder.append("ORDER BY ");
* strBuilder.append(orderByElements.get(0).toSql());
* for (int i = 1; i < orderByElements.size(); ++i) {
* strBuilder.append(", ").append(orderByElements.get(i).toSql());
* }
* strBuilder.append(".\nAn ORDER BY appearing in a view, subquery, union operand, ");
* strBuilder.append("or an insert/ctas statement has no effect on the query result ");
* strBuilder.append("unless a LIMIT and/or OFFSET is used in conjunction ");
* strBuilder.append("with the ORDER BY.");
* } else {
*/
evaluateOrderBy = true;
//}
}

/**
Expand Down Expand Up @@ -400,8 +402,8 @@ public void removeOrderByElements() {
public boolean hasOrderByClause() {
return orderByElements != null;
}
public boolean hasLimit() { return limitElement != null; }
public boolean hasOffset() { return limitElement != null && limitElement.getOffset() != 0; }
public boolean hasLimit() { return limitElement != null && limitElement.hasLimit(); }
public boolean hasOffset() { return limitElement != null && limitElement.hasOffset(); }

public long getLimit() {
return limitElement.getLimit();
Expand Down
2 changes: 1 addition & 1 deletion fe/src/com/baidu/palo/analysis/SelectStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ public void materializeRequiredSlots(Analyzer analyzer) throws AnalysisException
analyzer.getUnassignedConjuncts(getTableRefIds(), true);
List<Expr> unassignedJoinConjuncts = Lists.newArrayList();
for (Expr e: unassigned) {
if (analyzer.evalByJoin(e)) {
if (analyzer.evalAfterJoin(e)) {
unassignedJoinConjuncts.add(e);
}
}
Expand Down
5 changes: 2 additions & 3 deletions fe/src/com/baidu/palo/planner/AggregationNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,7 @@ protected void toThrift(TPlanNode msg) {

List<TExpr> aggregateFunctions = Lists.newArrayList();
// only serialize agg exprs that are being materialized
//for (FunctionCallExpr e: aggInfo.getMaterializedAggregateExprs()) {
for (FunctionCallExpr e : aggInfo.getAggregateExprs()) {
for (FunctionCallExpr e: aggInfo.getMaterializedAggregateExprs()) {
aggregateFunctions.add(e.treeToThrift());
}
msg.agg_node =
Expand All @@ -254,7 +253,7 @@ protected void toThrift(TPlanNode msg) {
@Override
protected String getNodeExplainString(String detailPrefix, TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
if (aggInfo.getAggregateExprs() != null && aggInfo.getAggregateExprs().size() > 0) {
if (aggInfo.getAggregateExprs() != null && aggInfo.getMaterializedAggregateExprs().size() > 0) {
output.append(detailPrefix + "output: ").append(
getExplainString(aggInfo.getAggregateExprs()) + "\n");
}
Expand Down
3 changes: 2 additions & 1 deletion fe/src/com/baidu/palo/planner/Planner.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ public void createPlanFragments(StatementBase statment, Analyzer analyzer, TQuer
}
}

// TODO chenhao16 , no used materialization work
// compute referenced slots before calling computeMemLayout()
analyzer.markRefdSlots(analyzer, singleNodePlan, resultExprs, null);
//analyzer.markRefdSlots(analyzer, singleNodePlan, resultExprs, null);

setResultExprScale(analyzer, queryStmt.getResultExprs());

Expand Down
3 changes: 3 additions & 0 deletions fe/src/com/baidu/palo/planner/SingleNodePlanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,9 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef)
}
// assignConjuncts(scanNode, analyzer);
scanNode.init(analyzer);
// TODO chenhao16 add
// materialize conjuncts in where
analyzer.materializeSlots(scanNode.getConjuncts());
scanNodes.add(scanNode);
return scanNode;
}
Expand Down
7 changes: 7 additions & 0 deletions fe/src/com/baidu/palo/qe/ConnectProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,13 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) {
if (request.isSetResourceInfo()) {
ctx.getSessionVariable().setResourceGroup(request.getResourceInfo().getGroup());
}
if (request.isSetExecMemLimit()) {
ctx.getSessionVariable().setMaxExecMemByte(request.getExecMemLimit());
}
if (request.isSetQueryTimeout()) {
ctx.getSessionVariable().setQueryTimeoutS(request.getQueryTimeout());
}

ctx.setThreadLocalInfo();

StmtExecutor executor = null;
Expand Down
2 changes: 2 additions & 0 deletions fe/src/com/baidu/palo/qe/MasterOpExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ private void forward() throws Exception {
params.setUser(ctx.getUser());
params.setDb(ctx.getDatabase());
params.setResourceInfo(ctx.toResourceCtx());
params.setExecMemLimit(ctx.getSessionVariable().getMaxExecMemByte());
params.setQueryTimeout(ctx.getSessionVariable().getQueryTimeoutS());

LOG.info("Forward statement {} to Master {}", originStmt, thriftAddress);

Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ struct TMasterOpRequest {
3: required string sql
4: optional Types.TResourceInfo resourceInfo
5: optional string cluster
6: optional i64 execMemLimit
7: optional i32 queryTimeout
}

struct TColumnDefinition {
Expand Down

0 comments on commit 6cf2fb4

Please sign in to comment.