Skip to content

Commit

Permalink
[yugabyte#2040] YSQL: Framework for expression pushdown part 2
Browse files Browse the repository at this point in the history
Summary:
Further extension of work done in 05a5386, which allows to execute Postgres code within TServer process.
We use this ability to execute Postgres expression generated by the planner where data are and hence reduce amount data sent across network.
In part 2 expression pushdown support for UPDATE statement was reworked, and support for sequential scan conditions was added.

Summary of changes:

 # Refined communication protocol to send Postgres expressions down to DocDB. Now protobuf can carry three separate lists:
 ## Conditional expressions (row filter). Evaluated against current row, if any evaluation result is false, row is ignored;
 ## Target expression. Evaluated against current row to calculate values to insert into the table or return back to Postgres, or both;
 ## List of columns referenced by A and/or B. Column reference consists of DocDB identifier to locate value in the current row, Postgres identifier, by which the expressions refer it and Postgres type info needed to convert DocDB value to Datum/isnull pair expected by Postgres code.

 # Created new class DocPgExprExecutor to encapsulate Postgres expression evaluation. An instance of that class is intended to live during the protobuf request's handling and optimized to evaluate same set of expressions against multiple rows having same schema (rows of the same table). Lifecycle of a DocPgExprExecutor consists of two phases: initialization and evaluation.
  During initialization phase caller is supposed to provide table schema as well as conditions, targets and column references extracted from the protocol message. Those elements are processed and stored  in internal data structures in a form optimized for fast evaluation. Elements provided during initialization phase can not be modified after evaluation phase begins.
  During evaluation phase caller provides the row and the storage for target values. The executor extracts the values and convert them to Postgres Datum/isnull pairs according to the column references. Then executor evaluates the conditions, and if any result is false the execution stops and result returned, target storage remains unchanged. If all they true, the executor evaluates target expressions and stores results.

 # Planner now has single function YbCanPushdownExpr to determine if DocDB can evaluate the expression. Same function collects column references along the way. A GUC variable yb_enable_expression_pushdown was introduced. If set to false, it makes YbCanPushdownExpr to always return false and effectively disable pushdown. The default is false for compatibility, if cluster undergoing rolling upgrade, plans without pushdown are compatible with older DocDB.
  Same logic that used to determine expression pushability was used for loosely related tasks, like analyze returning clause expression, determine if the statement is a "single row" a.k.a. "fast path". These logics are corrected.

 # Effectively reverts yugabyte#5257, see the comments on the issue for details.

Features not implemented, but potentially can make part 3:
- Push down conditions for UPDATE, DELETE statements, non-index conditions for index scans;
- Push down target expressions for scans, if that reduces returned row width;
- Parallel execution of expressions with pushed down conditions. They are expected to return much less bytes per second of results and Postgres instance collecting results less likely to become bottleneck;
- Eliminate other DocDB expression types used by Postgres. They are simple ones (columns, constants) and can be replaced by equivalent serialized Postgres expressions.

Test Plan:
New tests were added where pushdown is enabled for the session:

`ybd --java-test 'org.yb.pgsql.TestPgRegressPushdown'`

Tests updated to reflect changes:

`ybd --java-test 'org.yb.pgsql.TestPgRegressDml'`
`ybd --java-test 'org.yb.pgsql.TestPgRegressPlanner'`

For QA testing if upgrade from previous versions is not involved it is recommended to enable pushdown cluster-wide:

`yb-ctl create --ysql_pg_conf_csv='yb_enable_expression_pushdown=true'`

There might be differences in plans, but the query results are expected to be the same.

Reviewers: neil, dsrinivasan, myang, yguan, jason, dmitry, mihnea

Reviewed By: jason, mihnea

Subscribers: mbautin, smishra, zyu, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D13038
  • Loading branch information
andrei-mart committed Feb 16, 2022
1 parent 60b4711 commit 3200786
Show file tree
Hide file tree
Showing 69 changed files with 6,866 additions and 1,394 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
package org.yb.pgsql;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.yb.util.YBTestRunnerNonTsanOnly;

/**
* Runs the pg_regress test suite on YB code.
*/
@RunWith(value=YBTestRunnerNonTsanOnly.class)
public class TestPgRegressPushdown extends BasePgSQLTest {
@Override
public int getTestMethodTimeoutSec() {
return 1800;
}

@Test
public void testPgRegressPushdown() throws Exception {
runPgRegressTest("yb_pg_pushdown_serial_schedule");
}
}
64 changes: 28 additions & 36 deletions src/postgres/src/backend/catalog/yb_catalog/yb_catalog_version.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ bool YbIncrementMasterCatalogVersionTableEntry(bool is_breaking_change)
return false;

YBCPgStatement update_stmt = NULL;
YBCPgTypeAttrs type_attrs = { 0 };
YBCPgExpr yb_expr;
HeapTuple tuple = NULL;
Relation rel = RelationIdGetRelation(YBCatalogVersionRelationId);

Expand All @@ -107,8 +109,8 @@ bool YbIncrementMasterCatalogVersionTableEntry(bool is_breaking_change)
tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);

Datum ybctid = YBCGetYBTupleIdFromTuple(rel,
tuple,
RelationGetDescr(rel));
tuple,
RelationGetDescr(rel));

/* Bind ybctid to identify the current row. */
YBCPgExpr ybctid_expr = YBCNewConstant(update_stmt, BYTEAOID, InvalidOid, ybctid,
Expand All @@ -118,54 +120,44 @@ bool YbIncrementMasterCatalogVersionTableEntry(bool is_breaking_change)
/* Set expression c = c + 1 for current version attribute. */
AttrNumber attnum = Anum_pg_yb_catalog_version_current_version;
Var *arg1 = makeVar(1,
attnum,
INT8OID,
0,
InvalidOid,
0);
attnum,
INT8OID,
0,
InvalidOid,
0);

Const *arg2 = makeConst(INT8OID,
0,
InvalidOid,
sizeof(int64),
(Datum) 1,
false,
true);
0,
InvalidOid,
sizeof(int64),
(Datum) 1,
false,
true);

List *args = list_make2(arg1, arg2);

FuncExpr *expr = makeFuncExpr(F_INT8PL,
INT8OID,
args,
InvalidOid,
InvalidOid,
COERCE_EXPLICIT_CALL);
INT8OID,
args,
InvalidOid,
InvalidOid,
COERCE_EXPLICIT_CALL);

/* INT8 OID. */
YBCPgExpr ybc_expr = YBCNewEvalSingleParamExprCall(update_stmt,
(Expr *) expr,
attnum,
INT8OID,
0,
InvalidOid);
YBCPgExpr ybc_expr = YBCNewEvalExprCall(update_stmt, (Expr *) expr);

HandleYBStatus(YBCPgDmlAssignColumn(update_stmt, attnum, ybc_expr));
yb_expr = YBCNewColumnRef(update_stmt,
attnum,
INT8OID,
InvalidOid,
&type_attrs);
HandleYBStatus(YbPgDmlAppendColumnRef(update_stmt, yb_expr));

/* If breaking change set the latest breaking version to the same expression. */
if (is_breaking_change)
{
YBExprParamDesc params[2];
params[0].attno = attnum + 1;
params[0].typid = INT8OID;
params[0].typmod = 0;
params[0].collid = InvalidOid;

params[1].attno = attnum;
params[1].typid = INT8OID;
params[1].typmod = 0;
params[1].collid = InvalidOid;

YBCPgExpr ybc_expr = YBCNewEvalExprCall(update_stmt, (Expr *) expr, params, 2);
ybc_expr = YBCNewEvalExprCall(update_stmt, (Expr *) expr);
HandleYBStatus(YBCPgDmlAssignColumn(update_stmt, attnum + 1, ybc_expr));
}

Expand Down
4 changes: 0 additions & 4 deletions src/postgres/src/backend/catalog/yb_catalog/yb_type.c
Original file line number Diff line number Diff line change
Expand Up @@ -1250,10 +1250,6 @@ static const YBCPgTypeEntity YbTypeEntityTable[] = {
(YBCPgDatumToData)YbDatumToCStr,
(YBCPgDatumFromData)YbCStrToDatum },

{ ANYOID, YB_YQL_DATA_TYPE_INT32, true, sizeof(int32),
(YBCPgDatumToData)YbDatumToInt32,
(YBCPgDatumFromData)YbInt32ToDatum },

{ ANYARRAYOID, YB_YQL_DATA_TYPE_BINARY, false, -1,
(YBCPgDatumToData)YbDatumToBinary,
(YBCPgDatumFromData)YbBinaryToDatum },
Expand Down
2 changes: 2 additions & 0 deletions src/postgres/src/backend/commands/explain.c
Original file line number Diff line number Diff line change
Expand Up @@ -1702,6 +1702,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
if (plan->qual)
show_instrumentation_count("Rows Removed by Filter", 1,
planstate, es);
show_scan_qual(((ForeignScan *) plan)->fdw_recheck_quals,
"Remote Filter", planstate, ancestors, es);
show_foreignscan_info((ForeignScanState *) planstate, es);
break;
case T_CustomScan:
Expand Down
1 change: 0 additions & 1 deletion src/postgres/src/backend/executor/execUtils.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ CreateExecutorState(void)
estate->yb_exec_params.limit_offset = 0;
estate->yb_exec_params.limit_use_default = true;
estate->yb_exec_params.rowmark = -1;
estate->yb_can_batch_updates = false;
estate->yb_exec_params.is_index_backfill = false;
/*
* Pointer to the query read hybrid time. This pointer is passed
Expand Down
6 changes: 0 additions & 6 deletions src/postgres/src/backend/executor/functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -836,12 +836,6 @@ postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
else
eflags = 0; /* default run-to-completion flags */
ExecutorStart(es->qd, eflags);

/*
* Since PGSQL functions don't require the row count from updates, we
* can allow for batched updates.
*/
es->qd->estate->yb_can_batch_updates = true;
}

es->status = F_EXEC_RUN;
Expand Down
9 changes: 7 additions & 2 deletions src/postgres/src/backend/executor/nodeModifyTable.c
Original file line number Diff line number Diff line change
Expand Up @@ -1453,8 +1453,13 @@ ExecUpdate(ModifyTableState *mtstate,
}
else
{
row_found = YBCExecuteUpdate(
resultRelationDesc, planSlot, tuple, estate, mtstate, actualUpdatedCols);
row_found = YBCExecuteUpdate(resultRelationDesc,
planSlot,
tuple,
estate,
mtstate,
actualUpdatedCols,
canSetTag);
}

bms_free(extraUpdatedCols);
Expand Down
Loading

0 comments on commit 3200786

Please sign in to comment.