From a1b32ba9be5af0594cdbe5cdc3726394fc2b76f5 Mon Sep 17 00:00:00 2001 From: Jason Kim Date: Fri, 26 Jun 2020 20:35:34 -0700 Subject: [PATCH] ysql: implement backfill for index (#2301) Summary: Implement core functionality for the backfill part of YSQL multi-stage create index. Do the following checked items: - [x] Add `BACKFILL INDEX` grammar for postgres - [x] Establish basic communication from tserver to postgres - [x] Use ancient write time for inserting rows for backfill - [x] Use supplied read time for selecting rows to backfill - [ ] Establish connection when `yugabyte` role is password protected - [ ] Handle errors anywhere in the schema migration process - [ ] Handle multiple indexes backfilling at same time (issue #4785) - [ ] Have postgres respect master to tserver RPC deadline - [ ] Support create unique index (issue #4899) - [ ] Support nested DDL create index (issue #4786) - [ ] Work on multi-stage drop index Implement it as follows: 1. Pass database name from master to tserver on `BackfillIndex` request 1. Link libpq to tablet in order to send libpq request from tserver 1. Add `BACKFILL INDEX READ TIME PARTITION [ FROM [ TO ] ]` grammar 1. Wire it down a similar path as `index_build`, but pass down read time and partition key (don't handle row keys yet) through exec params 1. Pass down hard-coded ancient write time 1. Read from indexed table tablet with specified partition key with specified read time 1. Non-transactionally write to index table with specified write time For now, explicitly error on unique index creation and nested DDL index creation because they are unstable. They can later be enabled and wired to use the fast path (no multi-stage). Eventually, after some work, we want to enable them with backfill (multi-stage). Also, remove support for collecting `reltuples` stats on indexes when using backfill. We don't really use this stat, and we don't even collect it for non-index tables, so it shouldn't be a big deal for now. 
This is part 4 of the effort of bringing index backfill to YSQL. Keep #2301 open. Depends on D8368 Depends on D8578 Test Plan: `./yb_build.sh --cxx-test pgwrapper_pg_libpq-test --gtest_filter 'PgLibPqTest.Backfill*'` Reviewers: amitanand, neil, mihnea Reviewed By: mihnea Subscribers: yql, bogdan Differential Revision: https://phabricator.dev.yugabyte.com/D8487 --- CMakeLists.txt | 2 + src/postgres/CMakeLists.txt | 3 +- src/postgres/src/backend/access/ybc/ybcin.c | 50 ++- src/postgres/src/backend/catalog/index.c | 180 +++++++- src/postgres/src/backend/commands/indexcmds.c | 92 +++- .../src/backend/executor/ybcModifyTable.c | 23 +- src/postgres/src/backend/parser/gram.y | 96 ++++- src/postgres/src/backend/tcop/utility.c | 15 + .../src/backend/utils/misc/pg_yb_utils.c | 6 + src/postgres/src/include/access/amapi.h | 12 + src/postgres/src/include/access/ybcin.h | 5 + src/postgres/src/include/catalog/index.h | 14 + src/postgres/src/include/commands/defrem.h | 1 + .../src/include/executor/ybcModifyTable.h | 7 +- src/postgres/src/include/nodes/nodes.h | 2 + src/postgres/src/include/nodes/parsenodes.h | 25 ++ src/postgres/src/include/parser/kwlist.h | 1 + src/postgres/src/include/pg_yb_utils.h | 1 + src/postgres/src/tools/pgindent/typedefs.list | 2 + src/yb/client/async_rpc.cc | 3 + src/yb/client/batcher.h | 2 +- src/yb/client/client.cc | 24 ++ src/yb/client/client.h | 6 + src/yb/client/session.cc | 2 +- src/yb/client/session.h | 2 +- src/yb/client/yb_op.cc | 6 + src/yb/client/yb_op.h | 9 + src/yb/master/backfill_index.cc | 25 +- src/yb/master/backfill_index.h | 8 +- src/yb/tablet/CMakeLists.txt | 15 +- src/yb/tablet/tablet.cc | 95 ++++- src/yb/tablet/tablet.h | 7 + src/yb/tserver/tablet_server.cc | 3 + src/yb/tserver/tablet_server.h | 5 + src/yb/tserver/tablet_service.cc | 34 +- src/yb/tserver/tserver_admin.proto | 4 + src/yb/util/CMakeLists.txt | 1 + src/yb/util/pg_connstr.cc | 41 ++ src/yb/util/pg_connstr.h | 26 ++ src/yb/yql/pggate/pg_ddl.cc | 2 + 
src/yb/yql/pggate/pg_dml_write.cc | 6 + src/yb/yql/pggate/pg_dml_write.h | 2 + src/yb/yql/pggate/pg_doc_op.cc | 32 ++ src/yb/yql/pggate/pg_doc_op.h | 16 + src/yb/yql/pggate/pggate.cc | 8 + src/yb/yql/pggate/pggate.h | 2 + src/yb/yql/pggate/ybc_pg_typedefs.h | 4 + src/yb/yql/pggate/ybc_pggate.cc | 10 + src/yb/yql/pggate/ybc_pggate.h | 2 + src/yb/yql/pgwrapper/libpq_utils.cc | 24 ++ src/yb/yql/pgwrapper/libpq_utils.h | 3 + src/yb/yql/pgwrapper/pg_libpq-test.cc | 403 ++++++++++++++++++ 52 files changed, 1312 insertions(+), 57 deletions(-) create mode 100644 src/yb/util/pg_connstr.cc create mode 100644 src/yb/util/pg_connstr.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 1e1c308f7e51..29fa16631d8b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1685,6 +1685,8 @@ endif() message("Linker flags: ${CMAKE_EXE_LINKER_FLAGS}") +set(PQ_SHARED_LIB + "${YB_BUILD_ROOT}/postgres/lib/libpq${YB_SHARED_LIBRARY_SUFFIX}") set(YB_PGBACKEND_SHARED_LIB "${YB_BUILD_ROOT}/postgres/lib/libyb_pgbackend${YB_SHARED_LIBRARY_SUFFIX}") diff --git a/src/postgres/CMakeLists.txt b/src/postgres/CMakeLists.txt index e195a49f3262..5c75b7864d8d 100644 --- a/src/postgres/CMakeLists.txt +++ b/src/postgres/CMakeLists.txt @@ -44,7 +44,8 @@ set(build_postgres_args # its own flags. 
add_custom_target(configure_postgres ALL COMMAND ${build_postgres_args} --step configure) add_custom_target(postgres ALL COMMAND ${build_postgres_args} --step make - BYPRODUCTS "${YB_PGBACKEND_SHARED_LIB}") + BYPRODUCTS "${YB_PGBACKEND_SHARED_LIB}" + "${PQ_SHARED_LIB}") add_dependencies(postgres configure_postgres) # ------------------------------------------------------------------------------------------------ diff --git a/src/postgres/src/backend/access/ybc/ybcin.c b/src/postgres/src/backend/access/ybc/ybcin.c index 412ad07ca4b2..66f7c032a5f9 100644 --- a/src/postgres/src/backend/access/ybc/ybcin.c +++ b/src/postgres/src/backend/access/ybc/ybcin.c @@ -41,8 +41,9 @@ /* Working state for ybcinbuild and its callback */ typedef struct { - bool isprimary; - double index_tuples; + bool isprimary; /* are we building a primary index? */ + double index_tuples; /* # of tuples inserted into index */ + bool is_backfill; /* are we concurrently backfilling an index? */ } YBCBuildState; /* @@ -93,6 +94,7 @@ ybcinhandler(PG_FUNCTION_ARGS) amroutine->amparallelrescan = NULL; amroutine->yb_aminsert = ybcininsert; amroutine->yb_amdelete = ybcindelete; + amroutine->yb_ambackfill = ybcinbackfill; PG_RETURN_POINTER(amroutine); } @@ -104,7 +106,11 @@ ybcinbuildCallback(Relation index, HeapTuple heapTuple, Datum *values, bool *isn YBCBuildState *buildstate = (YBCBuildState *)state; if (!buildstate->isprimary) - YBCExecuteInsertIndex(index, values, isnull, heapTuple->t_ybctid); + YBCExecuteInsertIndex(index, + values, + isnull, + heapTuple->t_ybctid, + buildstate->is_backfill); buildstate->index_tuples += 1; } @@ -118,6 +124,7 @@ ybcinbuild(Relation heap, Relation index, struct IndexInfo *indexInfo) /* Do the heap scan */ buildstate.isprimary = index->rd_index->indisprimary; buildstate.index_tuples = 0; + buildstate.is_backfill = false; heap_tuples = IndexBuildHeapScan(heap, index, indexInfo, true, ybcinbuildCallback, &buildstate, NULL); @@ -130,6 +137,37 @@ ybcinbuild(Relation heap, 
Relation index, struct IndexInfo *indexInfo) return result; } +IndexBuildResult * +ybcinbackfill(Relation heap, + Relation index, + struct IndexInfo *indexInfo, + uint64_t *read_time, + RowBounds *row_bounds) +{ + YBCBuildState buildstate; + double heap_tuples = 0; + + /* Do the heap scan */ + buildstate.isprimary = index->rd_index->indisprimary; + buildstate.index_tuples = 0; + buildstate.is_backfill = true; + heap_tuples = IndexBackfillHeapRangeScan(heap, + index, + indexInfo, + ybcinbuildCallback, + &buildstate, + read_time, + row_bounds); + + /* + * Return statistics + */ + IndexBuildResult *result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult)); + result->heap_tuples = heap_tuples; + result->index_tuples = buildstate.index_tuples; + return result; +} + void ybcinbuildempty(Relation index) { @@ -141,7 +179,11 @@ ybcininsert(Relation index, Datum *values, bool *isnull, Datum ybctid, Relation IndexUniqueCheck checkUnique, struct IndexInfo *indexInfo) { if (!index->rd_index->indisprimary) - YBCExecuteInsertIndex(index, values, isnull, ybctid); + YBCExecuteInsertIndex(index, + values, + isnull, + ybctid, + false /* is_backfill */); return index->rd_index->indisunique ? 
true : false; } diff --git a/src/postgres/src/backend/catalog/index.c b/src/postgres/src/backend/catalog/index.c index 6d434ce7d6e9..280dc25fb256 100644 --- a/src/postgres/src/backend/catalog/index.c +++ b/src/postgres/src/backend/catalog/index.c @@ -131,6 +131,18 @@ static void UpdateIndexRelation(Oid indexoid, Oid heapoid, static void index_update_stats(Relation rel, bool hasindex, double reltuples); +static double IndexBuildHeapRangeScanInternal(Relation heapRelation, + Relation indexRelation, + IndexInfo *indexInfo, + bool allow_sync, + bool anyvisible, + BlockNumber start_blockno, + BlockNumber numblocks, + IndexBuildCallback callback, + void *callback_state, + HeapScanDesc scan, + uint64_t *read_time, + RowBounds *row_bounds); static void IndexCheckExclusion(Relation heapRelation, Relation indexRelation, IndexInfo *indexInfo); @@ -2453,6 +2465,88 @@ index_build(Relation heapRelation, SetUserIdAndSecContext(save_userid, save_sec_context); } +/* + * index_backfill - invoke access-method-specific index backfill procedure + * + * This is mainly a copy of index_build. index_build is used for + * non-multi-stage index creation; index_backfill is used for multi-stage index + * creation. + */ +void +index_backfill(Relation heapRelation, + Relation indexRelation, + IndexInfo *indexInfo, + bool isprimary, + uint64_t *read_time, + RowBounds *row_bounds) +{ + IndexBuildResult *stats; + Oid save_userid; + int save_sec_context; + int save_nestlevel; + + /* + * sanity checks + */ + Assert(RelationIsValid(indexRelation)); + Assert(PointerIsValid(indexRelation->rd_amroutine)); + Assert(PointerIsValid(indexRelation->rd_amroutine->yb_ambackfill)); + + ereport(DEBUG1, + (errmsg("backfilling index \"%s\" on table \"%s\"", + RelationGetRelationName(indexRelation), + RelationGetRelationName(heapRelation)))); + + /* + * Switch to the table owner's userid, so that any index functions are run + * as that user. 
Also lock down security-restricted operations and + * arrange to make GUC variable changes local to this command. + */ + GetUserIdAndSecContext(&save_userid, &save_sec_context); + SetUserIdAndSecContext(heapRelation->rd_rel->relowner, + save_sec_context | SECURITY_RESTRICTED_OPERATION); + save_nestlevel = NewGUCNestLevel(); + + /* + * Call the access method's build procedure + */ + stats = indexRelation->rd_amroutine->yb_ambackfill(heapRelation, + indexRelation, + indexInfo, + read_time, + row_bounds); + Assert(PointerIsValid(stats)); + + /* + * I don't think we should be backfilling unlogged indexes. + */ + Assert(indexRelation->rd_rel->relpersistence != RELPERSISTENCE_UNLOGGED); + + /* + * Update heap and index pg_class rows + * TODO(jason): properly update reltuples. They can't be set here because + * this backfill func is called for each backfill chunk request from + * master, and we need some way to sum up the tuple numbers. We also don't + * even collect stats properly for heapRelation anyway, at the moment. + */ + index_update_stats(heapRelation, + true, + -1); + + index_update_stats(indexRelation, + false, + -1); + + /* Make the updated catalog row versions visible */ + CommandCounterIncrement(); + + /* Roll back any GUC changes executed by index functions */ + AtEOXact_GUC(false, save_nestlevel); + + /* Restore userid and security context */ + SetUserIdAndSecContext(save_userid, save_sec_context); +} + /* * IndexBuildHeapScan - scan the heap relation to find tuples to be indexed @@ -2493,16 +2587,6 @@ IndexBuildHeapScan(Relation heapRelation, callback, callback_state, scan); } -/* - * As above, except that instead of scanning the complete heap, only the given - * number of blocks are scanned. Scan to end-of-rel can be signalled by - * passing InvalidBlockNumber as numblocks. Note that restricting the range - * to scan cannot be done when requesting syncscan. 
- * - * When "anyvisible" mode is requested, all tuples visible to any transaction - * are indexed and counted as live, including those inserted or deleted by - * transactions that are still in progress. - */ double IndexBuildHeapRangeScan(Relation heapRelation, Relation indexRelation, @@ -2514,6 +2598,67 @@ IndexBuildHeapRangeScan(Relation heapRelation, IndexBuildCallback callback, void *callback_state, HeapScanDesc scan) +{ + return IndexBuildHeapRangeScanInternal(heapRelation, + indexRelation, + indexInfo, + allow_sync, + anyvisible, + start_blockno, + numblocks, + callback, + callback_state, + scan, + NULL /* read_time */, + NULL /* row_bounds */); +} + +double +IndexBackfillHeapRangeScan(Relation heapRelation, + Relation indexRelation, + IndexInfo *indexInfo, + IndexBuildCallback callback, + void *callback_state, + uint64_t *read_time, + RowBounds *row_bounds) +{ + return IndexBuildHeapRangeScanInternal(heapRelation, + indexRelation, + indexInfo, + true /* allow_sync */, + false /* any_visible */, + 0 /* start_blockno */, + InvalidBlockNumber /* num_blocks */, + callback, + callback_state, + NULL /* scan */, + read_time, + row_bounds); +} + +/* + * As above, except that instead of scanning the complete heap, only the given + * number of blocks are scanned. Scan to end-of-rel can be signalled by + * passing InvalidBlockNumber as numblocks. Note that restricting the range + * to scan cannot be done when requesting syncscan. + * + * When "anyvisible" mode is requested, all tuples visible to any transaction + * are indexed and counted as live, including those inserted or deleted by + * transactions that are still in progress. 
+ */ +static double +IndexBuildHeapRangeScanInternal(Relation heapRelation, + Relation indexRelation, + IndexInfo *indexInfo, + bool allow_sync, + bool anyvisible, + BlockNumber start_blockno, + BlockNumber numblocks, + IndexBuildCallback callback, + void *callback_state, + HeapScanDesc scan, + uint64_t *read_time, + RowBounds *row_bounds) { bool is_system_catalog; bool checking_uniqueness; @@ -2557,6 +2702,15 @@ IndexBuildHeapRangeScan(Relation heapRelation, econtext = GetPerTupleExprContext(estate); slot = MakeSingleTupleTableSlot(RelationGetDescr(heapRelation)); + /* + * Set some exec params. + */ + YBCPgExecParameters *exec_params = &estate->yb_exec_params; + if (read_time) + exec_params->read_time = *read_time; + if (row_bounds) + exec_params->partition_key = pstrdup(row_bounds->partition_key); + /* Arrange for econtext's scan tuple to be the tuple under test */ econtext->ecxt_scantuple = slot; @@ -2598,6 +2752,8 @@ IndexBuildHeapRangeScan(Relation heapRelation, NULL, /* scan key */ true, /* buffer access strategy OK */ allow_sync); /* syncscan OK? */ + if (IsYBRelation(heapRelation)) + scan->ybscan->exec_params = exec_params; } else { @@ -2616,8 +2772,8 @@ IndexBuildHeapRangeScan(Relation heapRelation, /* * Must call GetOldestXmin() with SnapshotAny. Should never call * GetOldestXmin() with MVCC snapshot. (It's especially worth checking - * this for parallel builds, since ambuild routines that support parallel - * builds must work these details out for themselves.) + * this for parallel builds, since yb_ambackfill routines that support + * parallel builds must work these details out for themselves.) */ Assert(snapshot == SnapshotAny || IsMVCCSnapshot(snapshot)); Assert(snapshot == SnapshotAny ? 
TransactionIdIsValid(OldestXmin) : diff --git a/src/postgres/src/backend/commands/indexcmds.c b/src/postgres/src/backend/commands/indexcmds.c index 440a2d3932e0..1ad4ff138da1 100644 --- a/src/postgres/src/backend/commands/indexcmds.c +++ b/src/postgres/src/backend/commands/indexcmds.c @@ -394,22 +394,38 @@ DefineIndex(Oid relationId, INDEX_MAX_KEYS))); /* - * An index build should be concurent when + * An index build should be concurrent when all of the following hold: * - index backfill is enabled * - the index is secondary * - the indexed table is not temporary * Otherwise, it should not be concurrent. This logic works because - * - primary keys can't be altered after `CREATE TABLE`, so there is no - * need for index backfill. + * - primary key indexes are on the main table, and index backfill doesn't + * apply to them. * - temporary tables cannot have concurrency issues when building indexes. + * Concurrent index build is currently disabled for + * - indexes in nested DDL + * - unique indexes */ stmt->concurrent = (!YBCGetDisableIndexBackfill() && !stmt->primary && IsYBRelationById(relationId)); - if (stmt->concurrent) - ereport(LOG, - (errmsg("creating index concurrently for table with oid %d", - relationId))); + { + int ddl_nesting_level = YBGetDdlNestingLevel(); + if (stmt->concurrent && ddl_nesting_level != 1) + ereport(ERROR, + (errmsg("backfill for secondary indexes is currently only" + " supported for standalone CREATE INDEX" + " statements"), + errhint("See https://github.com/YugaByte/yugabyte-db/issues/%d. " + "Click '+' on the description to raise its priority", + 4786))); + } + if (stmt->concurrent && stmt->unique) + ereport(ERROR, + (errmsg("backfill for unique indexes is not yet supported"), + errhint("See https://github.com/YugaByte/yugabyte-db/issues/%d. " + "Click '+' on the description to raise its priority", + 4899))); /* * Only SELECT ... 
FOR UPDATE/SHARE are allowed while doing a standard @@ -1214,6 +1230,8 @@ DefineIndex(Oid relationId, YBIncrementDdlNestingLevel(); StartTransactionCommand(); + /* TODO(jason): handle exclusion constraints, possibly not here. */ + /* * TODO(jason): retry backfill or revert schema changes instead of failing * through HandleYBStatus. @@ -1229,9 +1247,6 @@ DefineIndex(Oid relationId, * YB_INDEX_PERM_WRITE_AND_DELETE_WHILE_REMOVING. */ - /* We should now definitely not be advertising any xmin. */ - Assert(MyPgXact->xmin == InvalidTransactionId); - /* * Index can now be marked valid -- update its pg_index entry */ @@ -2649,3 +2664,60 @@ IndexSetParentIndex(Relation partitionIdx, Oid parentOid) CommandCounterIncrement(); } } + +void +BackfillIndex(BackfillIndexStmt *stmt) +{ + IndexInfo *indexInfo; + ListCell *cell; + Oid heapId; + Oid indexId; + Relation heapRel; + Relation indexRel; + + if (YBCGetDisableIndexBackfill()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("backfill is not enabled"))); + + /* + * Examine oid list. Currently, we only allow it to be a single oid, but + * later it should handle multiple oids of indexes on the same indexed + * table. + * TODO(jason): fix from here downwards for issue #4785. + */ + if (list_length(stmt->oid_list) != 1) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("only a single oid is allowed in BACKFILL INDEX (see" + " issue #4785)"))); + + foreach(cell, stmt->oid_list) + { + indexId = lfirst_oid(cell); + } + + heapId = IndexGetRelation(indexId, false); + // TODO(jason): why ShareLock instead of ShareUpdateExclusiveLock? + heapRel = heap_open(heapId, ShareLock); + indexRel = index_open(indexId, ShareLock); + + indexInfo = BuildIndexInfo(indexRel); + /* + * The index should be ready for writes because it should be on the + * BACKFILLING permission. 
+ */ + Assert(indexInfo->ii_ReadyForInserts); + indexInfo->ii_Concurrent = true; + indexInfo->ii_BrokenHotChain = false; + + index_backfill(heapRel, + indexRel, + indexInfo, + false, + &stmt->read_time, + stmt->row_bounds); + + index_close(indexRel, ShareLock); + heap_close(heapRel, ShareLock); +} diff --git a/src/postgres/src/backend/executor/ybcModifyTable.c b/src/postgres/src/backend/executor/ybcModifyTable.c index 58dcdafbdf32..2fd1d31b3ae4 100644 --- a/src/postgres/src/backend/executor/ybcModifyTable.c +++ b/src/postgres/src/backend/executor/ybcModifyTable.c @@ -506,7 +506,11 @@ Oid YBCHeapInsert(TupleTableSlot *slot, } } -void YBCExecuteInsertIndex(Relation index, Datum *values, bool *isnull, Datum ybctid) +void YBCExecuteInsertIndex(Relation index, + Datum *values, + bool *isnull, + Datum ybctid, + bool is_backfill) { Assert(index->rd_rel->relkind == RELKIND_INDEX); Assert(ybctid != 0); @@ -516,10 +520,14 @@ void YBCExecuteInsertIndex(Relation index, Datum *values, bool *isnull, Datum yb YBCPgStatement insert_stmt = NULL; /* Create the INSERT request and add the values from the tuple. */ + /* + * TODO(jason): rename `is_single_row_txn` to something like + * `non_distributed_txn` when closing issue #4906. + */ HandleYBStatus(YBCPgNewInsert(dboid, - relid, - false /* is_single_row_txn */, - &insert_stmt)); + relid, + is_backfill /* is_single_row_txn */, + &insert_stmt)); PrepareIndexWriteStmt(insert_stmt, index, values, isnull, RelationGetNumberOfAttributes(index), @@ -533,6 +541,13 @@ void YBCExecuteInsertIndex(Relation index, Datum *values, bool *isnull, Datum yb HandleYBStatus(YBCPgInsertStmtSetUpsertMode(insert_stmt)); } + /* For index backfill, set write hybrid time to a time in the past. This + * is to guarantee that backfilled writes are temporally before any online + * writes. */ + /* TODO(jason): don't hard-code 50. */ + if (is_backfill) + HandleYBStatus(YBCPgInsertStmtSetWriteTime(insert_stmt, 50)); + /* Execute the insert and clean up. 
*/ YBCExecWriteStmt(insert_stmt, index, NULL /* rows_affected_count */); } diff --git a/src/postgres/src/backend/parser/gram.y b/src/postgres/src/backend/parser/gram.y index bc9f50ed3d37..f051d20e96dd 100644 --- a/src/postgres/src/backend/parser/gram.y +++ b/src/postgres/src/backend/parser/gram.y @@ -276,6 +276,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); PartitionBoundSpec *partboundspec; RoleSpec *rolespec; OptSplit *splitopt; + RowBounds *rowbounds; } %type stmt schema_stmt @@ -317,6 +318,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); CreateMatViewStmt RefreshMatViewStmt CreateAmStmt CreatePublicationStmt AlterPublicationStmt CreateSubscriptionStmt AlterSubscriptionStmt DropSubscriptionStmt + BackfillIndexStmt %type select_no_parens select_with_parens select_clause simple_select values_clause @@ -557,6 +559,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type opt_varying opt_timezone opt_no_inherit %type Iconst SignedIconst +%type Oid +%type oid_list %type Sconst comment_text notify_payload %type RoleId opt_boolean_or_string %type var_list @@ -623,6 +627,11 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type hash_partbound partbound_datum_list range_datum_list %type hash_partbound_elem +%type RowBounds +%type partition_key +%type row_key row_key_end row_key_start +%type read_time + /* * Non-keyword token types. These are hard-wired into the "flex" lexer. 
* They must be listed first so that their numeric codes do not depend on @@ -649,7 +658,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); AGGREGATE ALL ALSO ALTER ALWAYS ANALYSE ANALYZE AND ANY ARRAY AS ASC ASSERTION ASSIGNMENT ASYMMETRIC AT ATTACH ATTRIBUTE AUTHORIZATION - BACKWARD BEFORE BEGIN_P BETWEEN BIGINT BINARY BIT + BACKFILL BACKWARD BEFORE BEGIN_P BETWEEN BIGINT BINARY BIT BOOLEAN_P BOTH BY CACHE CALL CALLED CASCADE CASCADED CASE CAST CATALOG_P CHAIN CHAR_P @@ -925,6 +934,7 @@ stmt : | AlterExtensionContentsStmt { parser_ybc_beta_feature(@1, "extension"); } | AlterExtensionStmt { parser_ybc_beta_feature(@1, "extension"); } | AnalyzeStmt { parser_ybc_beta_feature(@1, "analyze"); } + | BackfillIndexStmt { parser_ybc_beta_feature(@1, "backfill index"); } | CreateFunctionStmt { parser_ybc_beta_feature(@1, "function"); } | CreateOpClassStmt { parser_ybc_beta_feature(@1, "opclass"); } | CreatePolicyStmt { parser_ybc_beta_feature(@1, "roles"); } @@ -8060,6 +8070,87 @@ opt_nulls_order: NULLS_LA FIRST_P { $$ = SORTBY_NULLS_FIRST; } | /*EMPTY*/ { $$ = SORTBY_NULLS_DEFAULT; } ; +BackfillIndexStmt: + BACKFILL INDEX oid_list + READ TIME read_time + RowBounds + { + BackfillIndexStmt *n = makeNode(BackfillIndexStmt); + n->oid_list = $3; + { + char *nptr = $6; + char *end; + errno = 0; + n->read_time = strtoul(nptr, &end, 10); + if (!(*nptr != '\0' && *end == '\0') + || errno == ERANGE) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("read time must be uint64"), + parser_errposition(@6))); + } + n->row_bounds = $7; + $$ = (Node *)n; + } + ; + +oid_list: Oid + { + $$ = list_make1_oid($1); + } + | oid_list ',' Oid + { + $$ = lappend_oid($1, $3); + } + ; + +read_time: I_or_F_const + { + A_Const *con = (A_Const *)$1; + if (con->val.type == T_Integer) + $$ = psprintf("%d", con->val.val.ival); + else + $$ = con->val.val.str; + } + ; + +RowBounds: PARTITION partition_key + { + $$ = makeNode(RowBounds); + /* Strip the leading 'x' 
*/ + $$->partition_key = $2 + 1; + $$->row_key_start = NULL; + $$->row_key_end = NULL; + } + | PARTITION partition_key FROM row_key_start + { + $$ = makeNode(RowBounds); + /* Strip the leading 'x' */ + $$->partition_key = $2 + 1; + $$->row_key_start = $4 + 1; + $$->row_key_end = NULL; + } + | PARTITION partition_key FROM row_key_start TO row_key_end + { + $$ = makeNode(RowBounds); + /* Strip the leading 'x' */ + $$->partition_key = $2 + 1; + $$->row_key_start = $4 + 1; + $$->row_key_end = $6 + 1; + } + ; + +partition_key: + XCONST { $$ = $1; }; + +row_key_start: + row_key { $$ = $1; }; + +row_key_end: + row_key { $$ = $1; }; + +row_key: XCONST { $$ = $1; }; + /***************************************************************************** * @@ -15619,6 +15710,8 @@ SignedIconst: Iconst { $$ = $1; } | '-' Iconst { $$ = - $2; } ; +Oid: ICONST { $$ = $1; }; + /* Role specifications */ RoleId: RoleSpec { @@ -15772,6 +15865,7 @@ unreserved_keyword: | AT | ATTACH | ATTRIBUTE + | BACKFILL | BACKWARD | BEFORE | BEGIN_P diff --git a/src/postgres/src/backend/tcop/utility.c b/src/postgres/src/backend/tcop/utility.c index afc719b13b9b..502084207e7d 100644 --- a/src/postgres/src/backend/tcop/utility.c +++ b/src/postgres/src/backend/tcop/utility.c @@ -828,6 +828,13 @@ standard_ProcessUtility(PlannedStmt *pstmt, } break; + case T_BackfillIndexStmt: + { + BackfillIndexStmt *stmt = (BackfillIndexStmt *) parsetree; + BackfillIndex(stmt); + } + break; + /* * The following statements are supported by Event Triggers only * in some cases, so we "fast path" them in the other cases. @@ -2730,6 +2737,10 @@ CreateCommandTag(Node *parsetree) tag = "REINDEX"; break; + case T_BackfillIndexStmt: + tag = "BACKFILL INDEX"; + break; + case T_CreateConversionStmt: tag = "CREATE CONVERSION"; break; @@ -3330,6 +3341,10 @@ GetCommandLogLevel(Node *parsetree) lev = LOGSTMT_ALL; /* should this be DDL? */ break; + case T_BackfillIndexStmt: + lev = LOGSTMT_ALL; /* should this be DDL? 
*/ + break; + case T_CreateConversionStmt: lev = LOGSTMT_DDL; break; diff --git a/src/postgres/src/backend/utils/misc/pg_yb_utils.c b/src/postgres/src/backend/utils/misc/pg_yb_utils.c index 67ef39bee709..c69158b0590c 100644 --- a/src/postgres/src/backend/utils/misc/pg_yb_utils.c +++ b/src/postgres/src/backend/utils/misc/pg_yb_utils.c @@ -726,6 +726,12 @@ YBIsInitDbAlreadyDone() static ProcessUtility_hook_type prev_ProcessUtility = NULL; static int ddl_nesting_level = 0; +int +YBGetDdlNestingLevel() +{ + return ddl_nesting_level; +} + void YBIncrementDdlNestingLevel() { diff --git a/src/postgres/src/include/access/amapi.h b/src/postgres/src/include/access/amapi.h index 11cc8b51afda..e9f8f8625f9d 100644 --- a/src/postgres/src/include/access/amapi.h +++ b/src/postgres/src/include/access/amapi.h @@ -25,6 +25,9 @@ struct IndexPath; /* Likewise, this file shouldn't depend on execnodes.h. */ struct IndexInfo; +/* Likewise, this file shouldn't depend on parsenodes.h */ +struct RowBounds; + /* * Properties for amproperty API. 
This list covers properties known to the @@ -93,6 +96,13 @@ typedef void (*yb_amdelete_function) (Relation indexRelation, Relation heapRelation, struct IndexInfo *indexInfo); +/* backfill this Yugabyte-based index */ +typedef IndexBuildResult *(*yb_ambackfill_function) (Relation heapRelation, + Relation indexRelation, + struct IndexInfo *indexInfo, + uint64_t *read_time, + struct RowBounds *row_bounds); + /* bulk delete */ typedef IndexBulkDeleteResult *(*ambulkdelete_function) (IndexVacuumInfo *info, IndexBulkDeleteResult *stats, @@ -244,8 +254,10 @@ typedef struct IndexAmRoutine aminitparallelscan_function aminitparallelscan; /* can be NULL */ amparallelrescan_function amparallelrescan; /* can be NULL */ + /* interface functions to support Yugabyte indexes */ yb_aminsert_function yb_aminsert; yb_amdelete_function yb_amdelete; + yb_ambackfill_function yb_ambackfill; } IndexAmRoutine; diff --git a/src/postgres/src/include/access/ybcin.h b/src/postgres/src/include/access/ybcin.h index 2bc2c933fb15..5fea73f9930c 100644 --- a/src/postgres/src/include/access/ybcin.h +++ b/src/postgres/src/include/access/ybcin.h @@ -34,6 +34,11 @@ extern bool ybcininsert(Relation rel, Datum *values, bool *isnull, Datum ybctid, IndexUniqueCheck checkUnique, struct IndexInfo *indexInfo); extern void ybcindelete(Relation rel, Datum *values, bool *isnull, Datum ybctid, Relation heapRel, struct IndexInfo *indexInfo); +extern IndexBuildResult *ybcinbackfill(Relation heap, + Relation index, + struct IndexInfo *indexInfo, + uint64_t *read_time, + RowBounds *row_bounds); extern IndexBulkDeleteResult *ybcinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, IndexBulkDeleteCallback callback, diff --git a/src/postgres/src/include/catalog/index.h b/src/postgres/src/include/catalog/index.h index 0871814b0a00..be18a810f9bb 100644 --- a/src/postgres/src/include/catalog/index.h +++ b/src/postgres/src/include/catalog/index.h @@ -113,6 +113,13 @@ extern void index_build(Relation heapRelation, 
bool isreindex, bool parallel); +extern void index_backfill(Relation heapRelation, + Relation indexRelation, + IndexInfo *indexInfo, + bool isprimary, + uint64_t *read_time, + RowBounds *row_bounds); + extern double IndexBuildHeapScan(Relation heapRelation, Relation indexRelation, IndexInfo *indexInfo, @@ -130,6 +137,13 @@ extern double IndexBuildHeapRangeScan(Relation heapRelation, IndexBuildCallback callback, void *callback_state, HeapScanDesc scan); +extern double IndexBackfillHeapRangeScan(Relation heapRelation, + Relation indexRelation, + IndexInfo *indexInfo, + IndexBuildCallback callback, + void *callback_state, + uint64_t *read_time, + RowBounds *row_bounds); extern void validate_index(Oid heapId, Oid indexId, Snapshot snapshot); diff --git a/src/postgres/src/include/commands/defrem.h b/src/postgres/src/include/commands/defrem.h index 1d05a4bcdc88..62f91514004e 100644 --- a/src/postgres/src/include/commands/defrem.h +++ b/src/postgres/src/include/commands/defrem.h @@ -50,6 +50,7 @@ extern bool CheckIndexCompatible(Oid oldId, extern Oid GetDefaultOpClass(Oid type_id, Oid am_id); extern Oid ResolveOpClass(List *opclass, Oid attrType, const char *accessMethodName, Oid accessMethodId); +extern void BackfillIndex(BackfillIndexStmt *stmt); /* commands/functioncmds.c */ extern ObjectAddress CreateFunction(ParseState *pstate, CreateFunctionStmt *stmt); diff --git a/src/postgres/src/include/executor/ybcModifyTable.h b/src/postgres/src/include/executor/ybcModifyTable.h index 52088bb92a83..0afbbc646c47 100644 --- a/src/postgres/src/include/executor/ybcModifyTable.h +++ b/src/postgres/src/include/executor/ybcModifyTable.h @@ -57,9 +57,10 @@ extern Oid YBCExecuteNonTxnInsert(Relation rel, * Insert a tuple into the an index's backing YugaByte index table. 
*/ extern void YBCExecuteInsertIndex(Relation rel, - Datum *values, - bool *isnull, - Datum ybctid); + Datum *values, + bool *isnull, + Datum ybctid, + bool is_backfill); /* * Delete a tuple (identified by ybctid) from a YugaByte table. diff --git a/src/postgres/src/include/nodes/nodes.h b/src/postgres/src/include/nodes/nodes.h index 9677cebb8b72..e2e15d554f53 100644 --- a/src/postgres/src/include/nodes/nodes.h +++ b/src/postgres/src/include/nodes/nodes.h @@ -359,6 +359,7 @@ typedef enum NodeTag T_LockStmt, T_ConstraintsSetStmt, T_ReindexStmt, + T_BackfillIndexStmt, T_CheckPointStmt, T_CreateSchemaStmt, T_AlterDatabaseStmt, @@ -476,6 +477,7 @@ typedef enum NodeTag T_PartitionCmd, T_VacuumRelation, T_OptSplit, + T_RowBounds, /* * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h) diff --git a/src/postgres/src/include/nodes/parsenodes.h b/src/postgres/src/include/nodes/parsenodes.h index da0f62363df2..614ee17c6c5d 100644 --- a/src/postgres/src/include/nodes/parsenodes.h +++ b/src/postgres/src/include/nodes/parsenodes.h @@ -868,6 +868,18 @@ typedef struct PartitionCmd PartitionBoundSpec *bound; /* FOR VALUES, if attaching */ } PartitionCmd; +/* + * RowBounds - row bounds for BACKFILL INDEX statement + */ +typedef struct RowBounds +{ + NodeTag type; + const char *partition_key; /* Partition key of tablet containing bound + */ + const char *row_key_start; /* Starting row of bound (inclusive) */ + const char *row_key_end; /* Ending row of bound (exclusive) */ +} RowBounds; + /**************************************************************************** * Nodes for a Query tree ****************************************************************************/ @@ -3340,6 +3352,19 @@ typedef struct ReindexStmt int options; /* Reindex options flags */ } ReindexStmt; +/* ---------------------- + * BACKFILL INDEX Statement + * ---------------------- + */ + +typedef struct BackfillIndexStmt +{ + NodeTag type; + List *oid_list; /* Oids of indexes to backfill */ + uint64_t 
read_time; /* Read time for backfill */ + RowBounds *row_bounds; /* Rows to backfill */ +} BackfillIndexStmt; + /* ---------------------- * CREATE CONVERSION Statement * ---------------------- diff --git a/src/postgres/src/include/parser/kwlist.h b/src/postgres/src/include/parser/kwlist.h index 5e350e4bfafd..ce7bbcfb735c 100644 --- a/src/postgres/src/include/parser/kwlist.h +++ b/src/postgres/src/include/parser/kwlist.h @@ -52,6 +52,7 @@ PG_KEYWORD("at", AT, UNRESERVED_KEYWORD) PG_KEYWORD("attach", ATTACH, UNRESERVED_KEYWORD) PG_KEYWORD("attribute", ATTRIBUTE, UNRESERVED_KEYWORD) PG_KEYWORD("authorization", AUTHORIZATION, TYPE_FUNC_NAME_KEYWORD) +PG_KEYWORD("backfill", BACKFILL, UNRESERVED_KEYWORD) PG_KEYWORD("backward", BACKWARD, UNRESERVED_KEYWORD) PG_KEYWORD("before", BEFORE, UNRESERVED_KEYWORD) PG_KEYWORD("begin", BEGIN_P, UNRESERVED_KEYWORD) diff --git a/src/postgres/src/include/pg_yb_utils.h b/src/postgres/src/include/pg_yb_utils.h index 9bb8b95643c8..b1c1ad928721 100644 --- a/src/postgres/src/include/pg_yb_utils.h +++ b/src/postgres/src/include/pg_yb_utils.h @@ -294,6 +294,7 @@ extern const char* YBHeapTupleToString(HeapTuple tuple, TupleDesc tupleDesc); */ bool YBIsInitDbAlreadyDone(); +int YBGetDdlNestingLevel(); void YBIncrementDdlNestingLevel(); void YBDecrementDdlNestingLevel(bool success); diff --git a/src/postgres/src/tools/pgindent/typedefs.list b/src/postgres/src/tools/pgindent/typedefs.list index ed68cc4085e0..fd25c59cd15e 100644 --- a/src/postgres/src/tools/pgindent/typedefs.list +++ b/src/postgres/src/tools/pgindent/typedefs.list @@ -148,6 +148,7 @@ AutoVacuumShmemStruct AutoVacuumWorkItem AutoVacuumWorkItemType AuxProcType +BackfillIndexStmt BF_ctx BF_key BF_word @@ -2001,6 +2002,7 @@ RoleSpec RoleSpecType RoleStmtType RollupData +RowBounds RowCompareExpr RowCompareType RowExpr diff --git a/src/yb/client/async_rpc.cc b/src/yb/client/async_rpc.cc index c8e6196a047a..3498d732de6b 100644 --- a/src/yb/client/async_rpc.cc +++ 
b/src/yb/client/async_rpc.cc @@ -381,6 +381,9 @@ WriteRpc::WriteRpc(AsyncRpcData* data, MonoDelta timeout) CHECK_EQ(table()->table_type(), YBTableType::PGSQL_TABLE_TYPE); auto* pgsql_op = down_cast(op->yb_op.get()); req_.add_pgsql_write_batch()->Swap(pgsql_op->mutable_request()); + if (pgsql_op->write_time()) { + req_.set_external_hybrid_time(pgsql_op->write_time().ToUint64()); + } break; } case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED; diff --git a/src/yb/client/batcher.h b/src/yb/client/batcher.h index e5d3c1031bea..2235dc57e9d5 100644 --- a/src/yb/client/batcher.h +++ b/src/yb/client/batcher.h @@ -168,7 +168,7 @@ class Batcher : public RefCountedThreadSafe { force_consistent_read_ = value; } - void SetHybridTimeForWrite(HybridTime ht) { + void SetHybridTimeForWrite(const HybridTime ht) { hybrid_time_for_write_ = ht; } diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index 8c331dd8d190..bc04beb84603 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -580,6 +580,30 @@ Status YBClient::GetTableSchemaById(const TableId& table_id, std::shared_ptrGetTableSchemaById(this, table_id, deadline, info, callback); } +Result YBClient::GetIndexPermissions( + const TableId& table_id, + const TableId& index_id) { + auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout(); + return data_->GetIndexPermissions( + this, + table_id, + index_id, + deadline); +} + +Result YBClient::GetIndexPermissions( + const YBTableName& table_name, + const YBTableName& index_name) { + auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout(); + YBTableInfo table_info = VERIFY_RESULT(GetYBTableInfo(table_name)); + YBTableInfo index_info = VERIFY_RESULT(GetYBTableInfo(index_name)); + return data_->GetIndexPermissions( + this, + table_info.table_id, + index_info.table_id, + deadline); +} + Result YBClient::WaitUntilIndexPermissionsAtLeast( const TableId& table_id, const TableId& index_id, diff --git a/src/yb/client/client.h 
b/src/yb/client/client.h index 0108fbf2f1f2..c474e52608b4 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -322,6 +322,12 @@ class YBClient { CHECKED_STATUS GetTableSchemaById(const TableId& table_id, std::shared_ptr info, StatusCallback callback); + Result GetIndexPermissions( + const TableId& table_id, + const TableId& index_id); + Result GetIndexPermissions( + const YBTableName& table_name, + const YBTableName& index_name); Result WaitUntilIndexPermissionsAtLeast( const TableId& table_id, const TableId& index_id, diff --git a/src/yb/client/session.cc b/src/yb/client/session.cc index 5d882262de5d..b4992a9bf4fa 100644 --- a/src/yb/client/session.cc +++ b/src/yb/client/session.cc @@ -194,7 +194,7 @@ ConsistentReadPoint* YBSession::read_point() { return transaction_ ? &transaction_->read_point() : read_point_.get(); } -void YBSession::SetHybridTimeForWrite(HybridTime ht) { +void YBSession::SetHybridTimeForWrite(const HybridTime ht) { hybrid_time_for_write_ = ht; if (batcher_) { batcher_->SetHybridTimeForWrite(hybrid_time_for_write_); diff --git a/src/yb/client/session.h b/src/yb/client/session.h index 901741a8a602..40e7c5cdafe9 100644 --- a/src/yb/client/session.h +++ b/src/yb/client/session.h @@ -109,7 +109,7 @@ class YBSession : public std::enable_shared_from_this { void DeferReadPoint(); // Used for backfilling the index, where we may want to write with a historic timestamp. - void SetHybridTimeForWrite(HybridTime ht); + void SetHybridTimeForWrite(const HybridTime ht); // Changed transaction used by this session. void SetTransaction(YBTransactionPtr transaction); diff --git a/src/yb/client/yb_op.cc b/src/yb/client/yb_op.cc index 03b6cbac9a7a..d98e9cdc96c7 100644 --- a/src/yb/client/yb_op.cc +++ b/src/yb/client/yb_op.cc @@ -717,6 +717,12 @@ void YBPgsqlReadOp::SetHashCode(const uint16_t hash_code) { } Status YBPgsqlReadOp::GetPartitionKey(string* partition_key) const { + // If partition key is explicitly specified, use that. 
+ if (partition_key_) { + *partition_key = partition_key_.get(); + return Status::OK(); + } + const Schema schema = table_->InternalSchema(); if (!read_request_->partition_column_values().empty()) { // If hashed columns are set, use them to compute the exact key and set the bounds diff --git a/src/yb/client/yb_op.h b/src/yb/client/yb_op.h index 0cf3413cd427..5a80c815c039 100644 --- a/src/yb/client/yb_op.h +++ b/src/yb/client/yb_op.h @@ -35,6 +35,8 @@ #include #include +#include + #include "yb/client/client_fwd.h" #include "yb/common/partial_row.h" @@ -471,6 +473,9 @@ class YBPgsqlWriteOp : public YBPgsqlOp { is_single_row_txn_ = is_single_row_txn; } + const HybridTime& write_time() const { return write_time_; } + void SetWriteTime(const HybridTime& value) { write_time_ = value; } + protected: virtual Type type() const override { return PGSQL_WRITE; } @@ -485,6 +490,7 @@ class YBPgsqlWriteOp : public YBPgsqlOp { // Whether this operation should be run as a single row txn. // Else could be distributed transaction (or non-transactional) depending on target table type. 
bool is_single_row_txn_ = false; + HybridTime write_time_; }; class YBPgsqlReadOp : public YBPgsqlOp { @@ -533,6 +539,8 @@ class YBPgsqlReadOp : public YBPgsqlOp { bool should_add_intents(IsolationLevel isolation_level) override; + void SetPartitionKey(const std::string& partition_key) { partition_key_ = partition_key; } + protected: virtual Type type() const override { return PGSQL_READ; } @@ -542,6 +550,7 @@ class YBPgsqlReadOp : public YBPgsqlOp { std::unique_ptr read_request_; YBConsistencyLevel yb_consistency_level_; ReadHybridTime read_time_; + boost::optional partition_key_ = boost::none; }; // This class is not thread-safe, though different YBNoOp objects on diff --git a/src/yb/master/backfill_index.cc b/src/yb/master/backfill_index.cc index cddcd1441c3e..a03a04b2c21e 100644 --- a/src/yb/master/backfill_index.cc +++ b/src/yb/master/backfill_index.cc @@ -277,9 +277,18 @@ Status MultiStageAlterTable::StartBackfillingData( l->Commit(); } indexed_table->SetIsBackfilling(true); + + scoped_refptr ns_info; + { + NamespaceIdentifierPB ns_identifier; + ns_identifier.set_id(indexed_table->namespace_id()); + RETURN_NOT_OK_PREPEND( + catalog_manager->FindNamespace(ns_identifier, &ns_info), + "Getting namespace info for backfill"); + } auto backfill_table = std::make_shared( catalog_manager->master_, catalog_manager->AsyncTaskPool(), - indexed_table, std::vector{index_pb}); + indexed_table, std::vector{index_pb}, ns_info); backfill_table->Launch(); return Status::OK(); } @@ -441,9 +450,10 @@ void BackfillTableJob::SetState(MonitoredTaskState new_state) { // ----------------------------------------------------------------------------------------------- BackfillTable::BackfillTable(Master *master, ThreadPool *callback_pool, const scoped_refptr &indexed_table, - std::vector indexes) + std::vector indexes, + const scoped_refptr &ns_info) : master_(master), callback_pool_(callback_pool), - indexed_table_(indexed_table), indexes_to_build_(indexes) { + 
indexed_table_(indexed_table), indexes_to_build_(indexes), ns_info_(ns_info) { LOG_IF(DFATAL, indexes_to_build_.size() != 1) << "As of Dec 2019, we only support " << "building one index at a time. indexes_to_build_.size() = " @@ -519,6 +529,10 @@ std::string BackfillTable::description() const { : Format("Waiting to GetSafeTime from $0/$1 tablets", num_pending, num_tablets))); } +const std::string BackfillTable::GetNamespaceName() const { + return ns_info_->name(); +} + Status BackfillTable::UpdateSafeTime(const Status& s, HybridTime ht) { if (!s.ok()) { // Move on to ABORTED permission. @@ -543,7 +557,7 @@ Status BackfillTable::UpdateSafeTime(const Status& s, HybridTime ht) { read_timestamp = read_time_for_backfill_; } - // If OK then move on to READ permissions. + // If OK then move on to doing backfill. if (!timestamp_chosen() && --tablets_pending_ == 0) { LOG_WITH_PREFIX(INFO) << "Completed fetching SafeTime for the table " << yb::ToString(indexed_table_) << " will be using " @@ -954,6 +968,9 @@ bool BackfillChunk::SendRequest(int attempt) { req.set_read_at_hybrid_time(backfill_tablet_->read_time_for_backfill().ToUint64()); req.set_schema_version(backfill_tablet_->schema_version()); req.set_start_key(start_key_); + if (backfill_tablet_->tablet()->table()->GetTableType() == TableType::PGSQL_TABLE_TYPE) { + req.set_namespace_name(backfill_tablet_->GetNamespaceName()); + } for (const IndexInfoPB& idx_info : backfill_tablet_->indexes()) { req.add_indexes()->CopyFrom(idx_info); } diff --git a/src/yb/master/backfill_index.h b/src/yb/master/backfill_index.h index 52823afe1f7d..b9350907cce0 100644 --- a/src/yb/master/backfill_index.h +++ b/src/yb/master/backfill_index.h @@ -85,7 +85,8 @@ class BackfillTable : public std::enable_shared_from_this { public: BackfillTable(Master *master, ThreadPool *callback_pool, const scoped_refptr &indexed_table, - std::vector indexes); + std::vector indexes, + const scoped_refptr &ns_info); void Launch(); @@ -126,6 +127,8 @@ class 
BackfillTable : public std::enable_shared_from_this { return leader_term_; } + const std::string GetNamespaceName() const; + private: void LaunchComputeSafeTimeForRead(); @@ -167,6 +170,7 @@ class BackfillTable : public std::enable_shared_from_this { std::shared_ptr backfill_job_; mutable simple_spinlock mutex_; HybridTime read_time_for_backfill_ GUARDED_BY(mutex_){HybridTime::kMin}; + const scoped_refptr ns_info_; }; class BackfillTableJob : public MonitoredTask { @@ -239,6 +243,8 @@ class BackfillTablet : public std::enable_shared_from_this { return done_.load(std::memory_order_acquire); } + const std::string GetNamespaceName() const { return backfill_table_->GetNamespaceName(); } + private: std::shared_ptr backfill_table_; const scoped_refptr tablet_; diff --git a/src/yb/tablet/CMakeLists.txt b/src/yb/tablet/CMakeLists.txt index ab812d3732ff..c92f0db85fd9 100644 --- a/src/yb/tablet/CMakeLists.txt +++ b/src/yb/tablet/CMakeLists.txt @@ -30,6 +30,13 @@ # under the License. # +include_directories(${YB_BUILD_ROOT}/postgres/include) +link_directories(${YB_BUILD_ROOT}/postgres/lib) + +add_library(pq SHARED IMPORTED) +set_target_properties(pq PROPERTIES IMPORTED_LOCATION "${PQ_SHARED_LIB}") +message("Added shared library dependency pq: ${PQ_SHARED_LIB}") + set(TABLET_SRCS abstract_tablet.cc cleanup_aborts_task.cc @@ -65,6 +72,9 @@ set(TABLET_SRCS preparer.cc ${TABLET_SRCS_EXTENSIONS}) +set(TABLET_DEPS + pq) + PROTOBUF_GENERATE_CPP( TABLET_PROTO_SRCS TABLET_PROTO_HDRS TABLET_PROTO_TGTS SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. 
@@ -82,7 +92,10 @@ ADD_YB_LIBRARY(tablet_proto DEPS ${TABLET_PROTO_LIBS} opid_proto NONLINK_DEPS ${TABLET_PROTO_TGTS}) -add_library(tablet ${TABLET_SRCS}) +ADD_YB_LIBRARY( + tablet + SRCS ${TABLET_SRCS} + DEPS ${TABLET_DEPS}) cotire(tablet) ADD_YB_LIBRARY( diff --git a/src/yb/tablet/tablet.cc b/src/yb/tablet/tablet.cc index 63e27dbf2750..f75a1100c704 100644 --- a/src/yb/tablet/tablet.cc +++ b/src/yb/tablet/tablet.cc @@ -32,6 +32,8 @@ #include "yb/tablet/tablet.h" +#include + #include #include #include @@ -123,6 +125,8 @@ #include "yb/util/locks.h" #include "yb/util/mem_tracker.h" #include "yb/util/metrics.h" +#include "yb/util/net/net_util.h" +#include "yb/util/pg_connstr.h" #include "yb/util/scope_exit.h" #include "yb/util/slice.h" #include "yb/util/stopwatch.h" @@ -383,7 +387,7 @@ Tablet::Tablet(const TabletInitData& data) txns_enabled_(data.txns_enabled), retention_policy_(std::make_shared(clock_, metadata_.get())) { CHECK(schema()->has_column_ids()); - LOG_WITH_PREFIX(INFO) << " Schema version for " << metadata_->table_name() << " is " + LOG_WITH_PREFIX(INFO) << "Schema version for " << metadata_->table_name() << " is " << metadata_->schema_version(); if (data.metric_registry) { @@ -1584,7 +1588,6 @@ Result Tablet::HasScanReachedMaxPartitionKey( } } else if (pgsql_read_request.has_max_partition_key() && !pgsql_read_request.max_partition_key().empty()) { - docdb::DocKey partition_doc_key(*metadata_->schema()); VERIFY_RESULT(partition_doc_key.DecodeFrom( partition_key, docdb::DocKeyPart::kWholeDocKey, docdb::AllowSpecial::kTrue)); @@ -1615,7 +1618,6 @@ CHECKED_STATUS Tablet::CreatePagingStateForRead(const PgsqlReadRequestPB& pgsql_ !response->has_paging_state() && (!pgsql_read_request.has_limit() || row_count < pgsql_read_request.limit() || pgsql_read_request.return_paging_state())) { - // For backward scans partition_key_start must be used as next_partition_key. // Client level logic will check it and route next request to the preceding tablet. 
const auto& next_partition_key = @@ -2009,17 +2011,88 @@ Status Tablet::AlterWalRetentionSecs(ChangeMetadataOperationState* operation_sta operation_state->ToString()); } +// Assume that we are already in the Backfilling mode. +Result Tablet::BackfillIndexesForYsql( + const std::vector& indexes, + const std::string& backfill_from, + const CoarseTimePoint deadline, + const HybridTime read_time, + const HostPort& pgsql_proxy_bind_address, + const std::string& database_name) { + if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_by_ms > 0)) { + TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_by_ms); + SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_by_ms)); + } + LOG(INFO) << "Begin " << __func__ + << " at " << read_time + << " for " << yb::ToString(indexes); + + if (!backfill_from.empty()) { + return STATUS( + InvalidArgument, + "YSQL index backfill does not support backfill_from, yet"); + } + + // Construct connection string. + // TODO(jason): handle "yugabyte" role being password protected + std::string conn_str = Format( + "dbname='$0' host=$1 port=$2 user=$3", + EscapePgConnStrValue(database_name), + pgsql_proxy_bind_address.host(), + pgsql_proxy_bind_address.port(), + "yugabyte"); + VLOG(1) << __func__ << ": libpq connection string: " << conn_str; + + // Construct query string. + std::string index_oids; + { + std::stringstream ss; + for (auto& index : indexes) { + Oid index_oid = VERIFY_RESULT(GetPgsqlTableOid(index.table_id())); + ss << index_oid << ","; + } + index_oids = ss.str(); + index_oids.pop_back(); + } + std::string partition_key = metadata_->partition()->partition_key_start(); + // Ignoring the current situation where users can run BACKFILL INDEX queries themselves, this + // should be safe from injection attacks because the parameters only consist of characters + // [,0-9a-f]. 
+ // TODO(jason): pass deadline + std::string query_str = Format( + "BACKFILL INDEX $0 READ TIME $1 PARTITION x'$2';", + index_oids, + read_time.ToUint64(), + b2a_hex(partition_key)); + VLOG(1) << __func__ << ": libpq query string: " << query_str; + + // Connect and execute. + auto conn = PQconnectdb(conn_str.c_str()); + auto res = PQexec(conn, query_str.c_str()); + auto status = PQresultStatus(res); + PQclear(res); + PQfinish(conn); + + // TODO(jason): more properly handle bad statuses + if (status == PGRES_FATAL_ERROR) { + return STATUS_FORMAT( + QLError, + "Got PQ status $0 with message \"$1\" when running \"$2\"", + status, + PQresultErrorMessage(res), + query_str); + } + // TODO(jason): handle partially finished backfills. How am I going to get that info? From + // response message by libpq or manual DocDB inspection? + return ""; +} + // Should backfill the index with the information contained in this tablet. // Assume that we are already in the Backfilling mode. Result Tablet::BackfillIndexes(const std::vector &indexes, const std::string& backfill_from, const CoarseTimePoint deadline, const HybridTime read_time) { - if (table_type_ == PGSQL_TABLE_TYPE) { - // TODO(jason): handle YSQL backfill. - // For now, mark the backfill as done. 
- return ""; - } if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_by_ms > 0)) { TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_by_ms); SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_by_ms)); @@ -2044,9 +2117,9 @@ Result Tablet::BackfillIndexes(const std::vector &indexe } } } - std::vector index_names; + std::vector index_ids; for (const IndexInfo& idx : indexes) { - index_names.push_back(idx.table_id()); + index_ids.push_back(idx.table_id()); for (const auto& idx_col : idx.columns()) { if (col_ids_set.find(idx_col.indexed_column_id) == col_ids_set.end()) { col_ids_set.insert(idx_col.indexed_column_id); @@ -2095,7 +2168,7 @@ Result Tablet::BackfillIndexes(const std::vector &indexe VLOG(1) << "Processed " << num_rows_processed << " rows"; RETURN_NOT_OK(FlushIndexBatchIfRequired(&index_requests, /* forced */ true)); LOG(INFO) << "Done BackfillIndexes at " << read_time << " for " - << yb::ToString(index_names) << " until " + << yb::ToString(index_ids) << " until " << (resume_from.empty() ? 
"" : b2a_hex(resume_from)); return resume_from; diff --git a/src/yb/tablet/tablet.h b/src/yb/tablet/tablet.h index d867dc2fe2a8..50a970c3311f 100644 --- a/src/yb/tablet/tablet.h +++ b/src/yb/tablet/tablet.h @@ -202,6 +202,13 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier { CHECKED_STATUS EnableCompactions(ScopedRWOperationPause* operation_pause); + Result BackfillIndexesForYsql( + const std::vector& indexes, + const std::string& backfill_from, + const CoarseTimePoint deadline, + const HybridTime read_time, + const HostPort& pgsql_proxy_bind_address, + const std::string& database_name); Result BackfillIndexes(const std::vector& indexes, const std::string& backfill_from, const CoarseTimePoint deadline, diff --git a/src/yb/tserver/tablet_server.cc b/src/yb/tserver/tablet_server.cc index 1fb613ad3f3f..50c69b409e6e 100644 --- a/src/yb/tserver/tablet_server.cc +++ b/src/yb/tserver/tablet_server.cc @@ -254,6 +254,9 @@ Status TabletServer::Init() { shared_object_->SetEndpoint(bound_addresses.front()); } + // 5433 is kDefaultPort in src/yb/yql/pgwrapper/pg_wrapper.h. + RETURN_NOT_OK(pgsql_proxy_bind_address_.ParseString(FLAGS_pgsql_proxy_bind_address, 5433)); + return Status::OK(); } diff --git a/src/yb/tserver/tablet_server.h b/src/yb/tserver/tablet_server.h index 232ee06fe9c0..b22c356f94e0 100644 --- a/src/yb/tserver/tablet_server.h +++ b/src/yb/tserver/tablet_server.h @@ -201,6 +201,8 @@ class TabletServer : public server::RpcAndWebServerBase, public TabletServerIf { return log_prefix_; } + const HostPort pgsql_proxy_bind_address() const { return pgsql_proxy_bind_address_; } + protected: virtual CHECKED_STATUS RegisterServices(); @@ -272,6 +274,9 @@ class TabletServer : public server::RpcAndWebServerBase, public TabletServerIf { std::string log_prefix_; + // Bind address of postgres proxy under this tserver. 
+ HostPort pgsql_proxy_bind_address_; + DISALLOW_COPY_AND_ASSIGN(TabletServer); }; diff --git a/src/yb/tserver/tablet_service.cc b/src/yb/tserver/tablet_service.cc index 7a4cf0d041f4..f8a5556475ae 100644 --- a/src/yb/tserver/tablet_service.cc +++ b/src/yb/tserver/tablet_service.cc @@ -733,8 +733,38 @@ void TabletServiceAdminImpl::BackfillIndex( return; } - Result resume_from = tablet.peer->tablet()->BackfillIndexes( - indexes_to_backfill, req->start_key(), deadline, read_at); + Result resume_from = STATUS(InternalError, "placeholder"); + if (tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE) { + if (!req->has_namespace_name()) { + SetupErrorAndRespond( + resp->mutable_error(), + STATUS( + InvalidArgument, + "Attempted backfill on YSQL table without supplying database name"), + TabletServerErrorPB::OPERATION_NOT_SUPPORTED, + &context); + return; + } + // TODO(jason): handle missing pgsql_proxy_bind_address (I think it is possible when disabling + // YSQL). + resume_from = tablet.peer->tablet()->BackfillIndexesForYsql( + indexes_to_backfill, + req->start_key(), + deadline, + read_at, + server_->pgsql_proxy_bind_address(), + req->namespace_name()); + } else if (tablet.peer->tablet()->table_type() == TableType::YQL_TABLE_TYPE) { + resume_from = tablet.peer->tablet()->BackfillIndexes( + indexes_to_backfill, req->start_key(), deadline, read_at); + } else { + SetupErrorAndRespond( + resp->mutable_error(), + STATUS(InvalidArgument, "Attempted backfill on tablet of invalid table type"), + TabletServerErrorPB::OPERATION_NOT_SUPPORTED, + &context); + return; + } DVLOG(1) << "Tablet " << tablet.peer->tablet_id() << ". 
Backfilled indexes for : " << yb::ToString(index_ids) << " got " << resume_from.ToString(); diff --git a/src/yb/tserver/tserver_admin.proto b/src/yb/tserver/tserver_admin.proto index 94699689c565..3959b86d4c18 100644 --- a/src/yb/tserver/tserver_admin.proto +++ b/src/yb/tserver/tserver_admin.proto @@ -108,6 +108,7 @@ message BackfillIndexRequestPB { required bytes tablet_id = 2; + // Indexes on the _same table_ to backfill. repeated IndexInfoPB indexes = 3; optional uint32 schema_version = 4; @@ -118,6 +119,9 @@ message BackfillIndexRequestPB { optional bytes end_key = 7; optional fixed64 propagated_hybrid_time = 8; + + // Currently only used for YSQL. + optional string namespace_name = 9; } message BackfillIndexResponsePB { diff --git a/src/yb/util/CMakeLists.txt b/src/yb/util/CMakeLists.txt index c870fae92663..73b40e94b49b 100644 --- a/src/yb/util/CMakeLists.txt +++ b/src/yb/util/CMakeLists.txt @@ -213,6 +213,7 @@ set(UTIL_SRCS path_util.cc pb_util-internal.cc pb_util.cc + pg_connstr.cc physical_time.cc port_picker.cc priority_thread_pool.cc diff --git a/src/yb/util/pg_connstr.cc b/src/yb/util/pg_connstr.cc new file mode 100644 index 000000000000..31aef90670b5 --- /dev/null +++ b/src/yb/util/pg_connstr.cc @@ -0,0 +1,41 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/util/pg_connstr.h" + +#include + +namespace yb { + +namespace { + +// Taken from by Gauthier Boaglio. 
+std::string ReplaceAll(std::string str, const std::string& from, const std::string& to) { + size_t start_pos = 0; + while ((start_pos = str.find(from, start_pos)) != std::string::npos) { + str.replace(start_pos, from.length(), to); + start_pos += to.length(); // Handles case where 'to' is a substring of 'from' + } + return str; +} + +} // anonymous namespace + +std::string EscapePgConnStrValue(const std::string input) { + std::string output = input; + output = ReplaceAll(output, "\\", "\\\\"); + output = ReplaceAll(output, "'", "\\'"); + return output; +} + +} // namespace yb diff --git a/src/yb/util/pg_connstr.h b/src/yb/util/pg_connstr.h new file mode 100644 index 000000000000..1baa076b000e --- /dev/null +++ b/src/yb/util/pg_connstr.h @@ -0,0 +1,26 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#ifndef YB_UTIL_PG_CONNSTR_H +#define YB_UTIL_PG_CONNSTR_H + +#include + +namespace yb { + +// TODO(jason): integrate better with Format (yb/util/format.h). +std::string EscapePgConnStrValue(const std::string input); + +} // namespace yb + +#endif // YB_UTIL_PG_CONNSTR_H diff --git a/src/yb/yql/pggate/pg_ddl.cc b/src/yb/yql/pggate/pg_ddl.cc index 865b75d55d5f..41236645ac15 100644 --- a/src/yb/yql/pggate/pg_ddl.cc +++ b/src/yb/yql/pggate/pg_ddl.cc @@ -283,6 +283,8 @@ Status PgCreateTable::Exec() { // For index, set indexed (base) table id. 
if (indexed_table_id()) { table_creator->indexed_table_id(indexed_table_id()->GetYBTableId()); + // TODO(jason): only skip waiting on concurrent index build. + table_creator->wait(false); } if (is_unique_index()) { table_creator->is_unique_index(true); diff --git a/src/yb/yql/pggate/pg_dml_write.cc b/src/yb/yql/pggate/pg_dml_write.cc index 333c72166a5c..83e26e2d5760 100644 --- a/src/yb/yql/pggate/pg_dml_write.cc +++ b/src/yb/yql/pggate/pg_dml_write.cc @@ -141,6 +141,12 @@ Status PgDmlWrite::Exec(bool force_non_bufferable) { return Status::OK(); } +Status PgDmlWrite::SetWriteTime(const HybridTime& write_time) { + SCHECK(doc_op_.get() != nullptr, RuntimeError, "expected doc_op_ to be initialized"); + down_cast(doc_op_.get())->SetWriteTime(write_time); + return Status::OK(); +} + void PgDmlWrite::AllocWriteRequest() { auto wop = AllocWriteOperation(); DCHECK(wop); diff --git a/src/yb/yql/pggate/pg_dml_write.h b/src/yb/yql/pggate/pg_dml_write.h index 84f75e1523f3..1c293b992e01 100644 --- a/src/yb/yql/pggate/pg_dml_write.h +++ b/src/yb/yql/pggate/pg_dml_write.h @@ -50,6 +50,8 @@ class PgDmlWrite : public PgDml { return rows_affected_count_; } + CHECKED_STATUS SetWriteTime(const HybridTime& write_time); + protected: // Constructor. 
PgDmlWrite(PgSession::ScopedRefPtr pg_session, diff --git a/src/yb/yql/pggate/pg_doc_op.cc b/src/yb/yql/pggate/pg_doc_op.cc index 3db6e25a62ee..ecee7675f229 100644 --- a/src/yb/yql/pggate/pg_doc_op.cc +++ b/src/yb/yql/pggate/pg_doc_op.cc @@ -295,6 +295,10 @@ Result> PgDocOp::ProcessResponseResult() { return result; } +void PgDocOp::SetReadTime() { + read_time_ = exec_params_.read_time; +} + //------------------------------------------------------------------------------------------------- PgDocReadOp::PgDocReadOp(const PgSession::ScopedRefPtr& pg_session, @@ -309,6 +313,8 @@ void PgDocReadOp::ExecuteInit(const PgExecParameters *exec_params) { template_op_->mutable_request()->set_return_paging_state(true); SetRequestPrefetchLimit(); SetRowMark(); + SetReadTime(); + SetPartitionKey(); } Result> PgDocReadOp::ProcessResponseImpl() { @@ -624,6 +630,13 @@ Status PgDocReadOp::ProcessResponsePagingState() { } } + // If partition key of tablet to scan is specified, then we should be done. This is because, + currently, only `BACKFILL INDEX ... PARTITION ...` statements set `partition_key`, and they scan + a single tablet. + if (partition_key_) { + has_more_data = false; + } + if (has_more_data || send_count < active_op_count_) { // Move inactive ops to the end of pgsql_ops_ to make room for new set of arguments. MoveInactiveOpsOutside(); @@ -672,6 +685,20 @@ void PgDocReadOp::SetRowMark() { } } +void PgDocReadOp::SetReadTime() { + PgDocOp::SetReadTime(); + if (read_time_) { + template_op_->SetReadTime(ReadHybridTime::FromUint64(read_time_)); + } +} + +void PgDocReadOp::SetPartitionKey() { + if (exec_params_.partition_key != NULL) { + partition_key_ = a2b_hex(exec_params_.partition_key); + template_op_->SetPartitionKey(partition_key_.get()); + } +} + Status PgDocReadOp::ResetInactivePgsqlOps() { // Clear the existing ybctids.
for (int op_index = active_op_count_; op_index < pgsql_ops_.size(); op_index++) { @@ -745,5 +772,10 @@ Status PgDocWriteOp::CreateRequests() { return Status::OK(); } +void PgDocWriteOp::SetWriteTime(const HybridTime& write_time) { + write_op_->SetWriteTime(write_time); +} + + } // namespace pggate } // namespace yb diff --git a/src/yb/yql/pggate/pg_doc_op.h b/src/yb/yql/pggate/pg_doc_op.h index 6e89f8031d9f..babeba668416 100644 --- a/src/yb/yql/pggate/pg_doc_op.h +++ b/src/yb/yql/pggate/pg_doc_op.h @@ -17,6 +17,8 @@ #include +#include + #include "yb/util/locks.h" #include "yb/client/yb_op.h" #include "yb/yql/pggate/pg_session.h" @@ -262,6 +264,8 @@ class PgDocOp : public std::enable_shared_from_this { // Process the result set in server response. Result> ProcessResponseResult(); + void SetReadTime(); + private: CHECKED_STATUS SendRequest(bool force_non_bufferable); @@ -421,6 +425,12 @@ class PgDocReadOp : public PgDocOp { // Set the row_mark_type field of our read request based on our exec control parameter. void SetRowMark(); + // Set the read_time for our read request based on our exec control parameter. + void SetReadTime(); + + // Set the partition key for our read request based on our exec control parameter. + void SetPartitionKey(); + // Clone the template into actual requests to be sent to server. std::unique_ptr CloneFromTemplate() override { return template_op_->DeepCopy(); @@ -459,6 +469,9 @@ class PgDocReadOp : public PgDocOp { // For a query clause "h1 = 1 AND h2 IN (2,3) AND h3 IN (4,5,6) AND h4 = 7", // this will be initialized to [[1], [2, 3], [4, 5, 6], [7]] std::vector> partition_exprs_; + + // The partition key identifying the sole tablet to read from. + boost::optional partition_key_; }; //-------------------------------------------------------------------------------------------------- @@ -478,6 +491,9 @@ class PgDocWriteOp : public PgDocOp { const PgObjectId& relation_id, std::unique_ptr write_op); + // Set write time. 
+ void SetWriteTime(const HybridTime& write_time); + private: // Process response implementation. Result> ProcessResponseImpl() override; diff --git a/src/yb/yql/pggate/pggate.cc b/src/yb/yql/pggate/pggate.cc index c3edfefd69f6..2d706a90433f 100644 --- a/src/yb/yql/pggate/pggate.cc +++ b/src/yb/yql/pggate/pggate.cc @@ -825,6 +825,14 @@ Status PgApiImpl::InsertStmtSetUpsertMode(PgStatement *handle) { return Status::OK(); } +Status PgApiImpl::InsertStmtSetWriteTime(PgStatement *handle, const HybridTime write_time) { + if (!PgStatement::IsValidStmt(handle, StmtOp::STMT_INSERT)) { + // Invalid handle. + return STATUS(InvalidArgument, "Invalid statement handle"); + } + RETURN_NOT_OK(down_cast(handle)->SetWriteTime(write_time)); + return Status::OK(); +} // Update ------------------------------------------------------------------------------------------ diff --git a/src/yb/yql/pggate/pggate.h b/src/yb/yql/pggate/pggate.h index 3b9c51834ac1..aa4543a8ef22 100644 --- a/src/yb/yql/pggate/pggate.h +++ b/src/yb/yql/pggate/pggate.h @@ -358,6 +358,8 @@ class PgApiImpl { CHECKED_STATUS InsertStmtSetUpsertMode(PgStatement *handle); + CHECKED_STATUS InsertStmtSetWriteTime(PgStatement *handle, const HybridTime write_time); + //------------------------------------------------------------------------------------------------ // Update. CHECKED_STATUS NewUpdate(const PgObjectId& table_id, diff --git a/src/yb/yql/pggate/ybc_pg_typedefs.h b/src/yb/yql/pggate/ybc_pg_typedefs.h index eefcda543ac9..af2c9e6edd3d 100644 --- a/src/yb/yql/pggate/ybc_pg_typedefs.h +++ b/src/yb/yql/pggate/ybc_pg_typedefs.h @@ -216,8 +216,12 @@ typedef struct PgExecParameters { // For now we only support one rowmark. 
#ifdef __cplusplus int rowmark = -1; + uint64_t read_time = 0; + char *partition_key = NULL; #else int rowmark; + uint64_t read_time; + char *partition_key; #endif } YBCPgExecParameters; diff --git a/src/yb/yql/pggate/ybc_pggate.cc b/src/yb/yql/pggate/ybc_pggate.cc index 3acd01c4dfe7..0892fd08c089 100644 --- a/src/yb/yql/pggate/ybc_pggate.cc +++ b/src/yb/yql/pggate/ybc_pggate.cc @@ -571,6 +571,16 @@ YBCStatus YBCPgInsertStmtSetUpsertMode(YBCPgStatement handle) { return ToYBCStatus(pgapi->InsertStmtSetUpsertMode(handle)); } +YBCStatus YBCPgInsertStmtSetWriteTime(YBCPgStatement handle, const uint64_t write_time) { + HybridTime write_hybrid_time; + YBCStatus status = ToYBCStatus(write_hybrid_time.FromUint64(write_time)); + if (status) { + return status; + } else { + return ToYBCStatus(pgapi->InsertStmtSetWriteTime(handle, write_hybrid_time)); + } +} + // UPDATE Operations ------------------------------------------------------------------------------- YBCStatus YBCPgNewUpdate(const YBCPgOid database_oid, const YBCPgOid table_oid, diff --git a/src/yb/yql/pggate/ybc_pggate.h b/src/yb/yql/pggate/ybc_pggate.h index 8dd17de2656a..935b50a24d86 100644 --- a/src/yb/yql/pggate/ybc_pggate.h +++ b/src/yb/yql/pggate/ybc_pggate.h @@ -346,6 +346,8 @@ YBCStatus YBCPgExecInsert(YBCPgStatement handle); YBCStatus YBCPgInsertStmtSetUpsertMode(YBCPgStatement handle); +YBCStatus YBCPgInsertStmtSetWriteTime(YBCPgStatement handle, const uint64_t write_time); + // UPDATE ------------------------------------------------------------------------------------------ YBCStatus YBCPgNewUpdate(YBCPgOid database_oid, YBCPgOid table_oid, diff --git a/src/yb/yql/pgwrapper/libpq_utils.cc b/src/yb/yql/pgwrapper/libpq_utils.cc index 01c50a87b6bf..3cafd2658124 100644 --- a/src/yb/yql/pgwrapper/libpq_utils.cc +++ b/src/yb/yql/pgwrapper/libpq_utils.cc @@ -235,6 +235,30 @@ CHECKED_STATUS PGConn::RollbackTransaction() { return Execute("ROLLBACK"); } +Result PGConn::HasIndexScan(const std::string& query) { + 
constexpr int kExpectedColumns = 1; + auto res = VERIFY_RESULT(FetchFormat("EXPLAIN $0", query)); + + { + int fetched_columns = PQnfields(res.get()); + if (fetched_columns != kExpectedColumns) { + return STATUS_FORMAT( + InternalError, "Fetched $0 columns, expected $1", fetched_columns, kExpectedColumns); + } + } + + for (int line = 0; line < PQntuples(res.get()); ++line) { + std::string value = VERIFY_RESULT(GetString(res.get(), line, 0)); + if (value.find("Index Scan") != std::string::npos) { + return true; + } else if (value.find("Index Only Scan") != std::string::npos) { + return true; + } + } + return false; +} + + Status PGConn::CopyBegin(const std::string& command) { auto result = VERIFY_RESULT(CheckResult( PGResultPtr( diff --git a/src/yb/yql/pgwrapper/libpq_utils.h b/src/yb/yql/pgwrapper/libpq_utils.h index 6ff17e87c34a..c43b2afa76ce 100644 --- a/src/yb/yql/pgwrapper/libpq_utils.h +++ b/src/yb/yql/pgwrapper/libpq_utils.h @@ -101,6 +101,9 @@ class PGConn { CHECKED_STATUS CommitTransaction(); CHECKED_STATUS RollbackTransaction(); + // Would this query use an index [only] scan? + Result HasIndexScan(const std::string& query); + CHECKED_STATUS CopyBegin(const std::string& command); Result CopyEnd(); diff --git a/src/yb/yql/pgwrapper/pg_libpq-test.cc b/src/yb/yql/pgwrapper/pg_libpq-test.cc index dc12dc59377d..ece8255fafa5 100644 --- a/src/yb/yql/pgwrapper/pg_libpq-test.cc +++ b/src/yb/yql/pgwrapper/pg_libpq-test.cc @@ -1333,5 +1333,408 @@ TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(LoadBalanceMultipleColocatedDB)) { ASSERT_EQ(ts_loads.size(), 3); } +// Override the base test to start a cluster with index backfill enabled. 
+class PgLibPqTestIndexBackfill : public PgLibPqTest { + public: + PgLibPqTestIndexBackfill() { + more_master_flags.push_back("--ysql_disable_index_backfill=false"); + more_tserver_flags.push_back("--ysql_disable_index_backfill=false"); + } + + void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { + options->extra_master_flags.insert( + std::end(options->extra_master_flags), + std::begin(more_master_flags), + std::end(more_master_flags)); + options->extra_tserver_flags.insert( + std::end(options->extra_tserver_flags), + std::begin(more_tserver_flags), + std::end(more_tserver_flags)); + } + + protected: + std::vector more_master_flags; + std::vector more_tserver_flags; +}; + +// Make sure that backfill works. +TEST_F_EX(PgLibPqTest, + YB_DISABLE_TEST_IN_TSAN(BackfillSimple), + PgLibPqTestIndexBackfill) { + const std::string kNamespaceName = "yugabyte"; + const std::string kTableName = "t"; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(ConnectToDB(kNamespaceName)); + + ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (c char, i int, p point)", kTableName)); + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ('a', 0, '(1, 2)')", kTableName)); + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ('y', -5, '(0, -2)')", kTableName)); + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ('b', 100, '(868, 9843)')", kTableName)); + ASSERT_OK(conn.ExecuteFormat("CREATE INDEX ON $0 (c ASC)", kTableName)); + + // Index scan to verify contents of index table. 
+ const std::string query = Format("SELECT * FROM $0 ORDER BY c", kTableName); + ASSERT_TRUE(ASSERT_RESULT(conn.HasIndexScan(query))); + auto res = ASSERT_RESULT(conn.Fetch(query)); + ASSERT_EQ(PQntuples(res.get()), 3); + ASSERT_EQ(PQnfields(res.get()), 3); + std::array values = { + ASSERT_RESULT(GetInt32(res.get(), 0, 1)), + ASSERT_RESULT(GetInt32(res.get(), 1, 1)), + ASSERT_RESULT(GetInt32(res.get(), 2, 1)), + }; + ASSERT_EQ(values[0], 0); + ASSERT_EQ(values[1], 100); + ASSERT_EQ(values[2], -5); +} + +// Make sure that partial indexes work for index backfill. +TEST_F_EX(PgLibPqTest, + YB_DISABLE_TEST_IN_TSAN(BackfillPartial), + PgLibPqTestIndexBackfill) { + constexpr int kNumRows = 7; + const std::string kNamespaceName = "yugabyte"; + const std::string kTableName = "t"; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(ConnectToDB(kNamespaceName)); + + ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (i int, j int)", kTableName)); + ASSERT_OK(conn.ExecuteFormat( + "INSERT INTO $0 VALUES (generate_series(1, $1), generate_series(-1, -$1, -1))", + kTableName, + kNumRows)); + ASSERT_OK(conn.ExecuteFormat("CREATE INDEX ON $0 (i ASC) WHERE j > -5", kTableName)); + + // Index scan to verify contents of index table. 
+ { + const std::string query = Format("SELECT j from $0 WHERE j > -3 ORDER BY i", kTableName); + ASSERT_TRUE(ASSERT_RESULT(conn.HasIndexScan(query))); + auto res = ASSERT_RESULT(conn.Fetch(query)); + ASSERT_EQ(PQntuples(res.get()), 2); + ASSERT_EQ(PQnfields(res.get()), 1); + std::array values = { + ASSERT_RESULT(GetInt32(res.get(), 0, 0)), + ASSERT_RESULT(GetInt32(res.get(), 1, 0)), + }; + ASSERT_EQ(values[0], -1); + ASSERT_EQ(values[1], -2); + } + { + const std::string query = Format( + "SELECT i from $0 WHERE j > -5 ORDER BY i DESC LIMIT 2", + kTableName); + ASSERT_TRUE(ASSERT_RESULT(conn.HasIndexScan(query))); + auto res = ASSERT_RESULT(conn.Fetch(query)); + ASSERT_EQ(PQntuples(res.get()), 2); + ASSERT_EQ(PQnfields(res.get()), 1); + std::array values = { + ASSERT_RESULT(GetInt32(res.get(), 0, 0)), + ASSERT_RESULT(GetInt32(res.get(), 1, 0)), + }; + ASSERT_EQ(values[0], 4); + ASSERT_EQ(values[1], 3); + } +} + +// Make sure that expression indexes work for index backfill. +TEST_F_EX(PgLibPqTest, + YB_DISABLE_TEST_IN_TSAN(BackfillExpression), + PgLibPqTestIndexBackfill) { + constexpr int kNumRows = 9; + const std::string kNamespaceName = "yugabyte"; + const std::string kTableName = "t"; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(ConnectToDB(kNamespaceName)); + + ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (i int, j int)", kTableName)); + ASSERT_OK(conn.ExecuteFormat( + "INSERT INTO $0 VALUES (generate_series(1, $1), generate_series(11, 10 + $1))", + kTableName, + kNumRows)); + ASSERT_OK(conn.ExecuteFormat("CREATE INDEX ON $0 ((j % i))", kTableName)); + + // Index scan to verify contents of index table. 
+ const std::string query = Format( + "SELECT j, i, j % i as mod from $0 WHERE j % i = 2 ORDER BY i", + kTableName); + ASSERT_TRUE(ASSERT_RESULT(conn.HasIndexScan(query))); + auto res = ASSERT_RESULT(conn.Fetch(query)); + ASSERT_EQ(PQntuples(res.get()), 2); + ASSERT_EQ(PQnfields(res.get()), 3); + std::array, 2> values = {{ + { + ASSERT_RESULT(GetInt32(res.get(), 0, 0)), + ASSERT_RESULT(GetInt32(res.get(), 0, 1)), + ASSERT_RESULT(GetInt32(res.get(), 0, 2)), + }, + { + ASSERT_RESULT(GetInt32(res.get(), 1, 0)), + ASSERT_RESULT(GetInt32(res.get(), 1, 1)), + ASSERT_RESULT(GetInt32(res.get(), 1, 2)), + }, + }}; + ASSERT_EQ(values[0][0], 14); + ASSERT_EQ(values[0][1], 4); + ASSERT_EQ(values[0][2], 2); + ASSERT_EQ(values[1][0], 18); + ASSERT_EQ(values[1][1], 8); + ASSERT_EQ(values[1][2], 2); +} + +// Override the index backfill test to have slower backfill-related operations +class PgLibPqTestIndexBackfillSlow : public PgLibPqTestIndexBackfill { + public: + PgLibPqTestIndexBackfillSlow() { + more_master_flags.push_back("--TEST_slowdown_backfill_alter_table_rpcs_ms=7000"); + more_tserver_flags.push_back("--TEST_slowdown_backfill_by_ms=7000"); + } + + void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { + options->extra_master_flags.insert( + std::end(options->extra_master_flags), + std::begin(more_master_flags), + std::end(more_master_flags)); + options->extra_tserver_flags.insert( + std::end(options->extra_tserver_flags), + std::begin(more_tserver_flags), + std::end(more_tserver_flags)); + } +}; + +// Make sure that read time (and write time) for backfill works. 
Simulate this situation: +// Session A Session B +// -------------------------- --------------------------------- +// CREATE INDEX +// - DELETE_ONLY perm +// - WRITE_DELETE perm +// - BACKFILL perm +// - get safe time for read +// UPDATE a row of the indexed table +// - do the actual backfill +// - READ_WRITE_DELETE perm +// The backfill should use the values before update when writing to the index. The update should +// write and delete to the index because of permissions. Since backfill writes with an ancient +// timestamp, the update should appear to have happened after the backfill. +TEST_F_EX(PgLibPqTest, + YB_DISABLE_TEST_IN_TSAN(BackfillReadTime), + PgLibPqTestIndexBackfillSlow) { + const std::string kIndexName = "rn_idx"; + const std::string kNamespaceName = "yugabyte"; + const std::string kTableName = "rn"; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(ConnectToDB(kNamespaceName)); + + ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (i int, j int, PRIMARY KEY (i ASC))", kTableName)); + for (auto pair = std::make_pair(0, 10); pair.first < 6; ++pair.first, ++pair.second) { + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1, $2)", + kTableName, + pair.first, + pair.second)); + } + + std::vector threads; + threads.emplace_back([&] { + auto conn = ASSERT_RESULT(ConnectToDB(kNamespaceName)); + ASSERT_OK(conn.ExecuteFormat("CREATE INDEX $0 ON $1 (j ASC)", kIndexName, kTableName)); + { + // Index scan to verify contents of index table. + const std::string query = Format("SELECT * FROM $0 WHERE j = 113", kTableName); + ASSERT_TRUE(ASSERT_RESULT(conn.HasIndexScan(query))); + auto res = ASSERT_RESULT(conn.Fetch(query)); + auto lines = PQntuples(res.get()); + ASSERT_EQ(1, lines); + auto columns = PQnfields(res.get()); + ASSERT_EQ(2, columns); + auto key = ASSERT_RESULT(GetInt32(res.get(), 0, 0)); + ASSERT_EQ(key, 3); + // Make sure that the update is visible. 
+ auto value = ASSERT_RESULT(GetInt32(res.get(), 0, 1)); + ASSERT_EQ(value, 113); + } + }); + threads.emplace_back([&] { + // Sleep to avoid querying for index too early. + std::this_thread::sleep_for(7s * 2); + + std::string table_id = + ASSERT_RESULT(GetTableIdByTableName(client.get(), kNamespaceName, kTableName)); + std::string index_id = + ASSERT_RESULT(GetTableIdByTableName(client.get(), kNamespaceName, kIndexName)); + + // Wait for backfill stage. + { + IndexPermissions actual_permissions = + ASSERT_RESULT(client->WaitUntilIndexPermissionsAtLeast( + table_id, + index_id, + IndexPermissions::INDEX_PERM_DO_BACKFILL)); + ASSERT_LE(actual_permissions, IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE) + << "index creation failed"; + ASSERT_NE(actual_permissions, IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE) + << "index finished backfilling too quickly"; + } + + // Give the backfill stage enough time to get a read time. + // TODO(jason): come up with some way to wait until the read time is chosen rather than relying + // on a brittle sleep. + std::this_thread::sleep_for(5s); + + auto conn = ASSERT_RESULT(ConnectToDB(kNamespaceName)); + ASSERT_OK(conn.ExecuteFormat("UPDATE $0 SET j = j + 100 WHERE i = 3", kTableName)); + + // It should still be in the backfill stage, hopefully before the actual backfill started. + { + IndexPermissions actual_permissions = + ASSERT_RESULT(client->GetIndexPermissions( + table_id, + index_id)); + ASSERT_EQ( + actual_permissions, + IndexPermissions::INDEX_PERM_DO_BACKFILL); + } + }); + + for (auto& thread : threads) { + thread.join(); + } +} + +// Make sure that updates at each stage of multi-stage index create work. 
Simulate this situation: +// Session A Session B +// -------------------------- --------------------------------- +// CREATE INDEX +// - DELETE_ONLY perm +// UPDATE a row of the indexed table +// - WRITE_DELETE perm +// UPDATE a row of the indexed table +// - BACKFILL perm +// UPDATE a row of the indexed table +// - READ_WRITE_DELETE perm +// UPDATE a row of the indexed table +// Updates should succeed and get written to the index. +TEST_F_EX(PgLibPqTest, + YB_DISABLE_TEST_IN_TSAN(BackfillPermissions), + PgLibPqTestIndexBackfillSlow) { + const auto kGetTableIdWaitTime = 10s; + const auto kThreadWaitTime = 60s; + const std::array, 4> permission_key_pairs = { + std::make_pair(IndexPermissions::INDEX_PERM_DELETE_ONLY, 2), + std::make_pair(IndexPermissions::INDEX_PERM_WRITE_AND_DELETE, 3), + std::make_pair(IndexPermissions::INDEX_PERM_DO_BACKFILL, 4), + std::make_pair(IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE, 5), + }; + const std::string kIndexName = "rn_idx"; + const std::string kNamespaceName = "yugabyte"; + const std::string kTableName = "rn"; + + auto client = ASSERT_RESULT(cluster_->CreateClient()); + auto conn = ASSERT_RESULT(ConnectToDB(kNamespaceName)); + + ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (i int, j int, PRIMARY KEY (i ASC))", kTableName)); + for (auto pair = std::make_pair(0, 10); pair.first < 6; ++pair.first, ++pair.second) { + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1, $2)", + kTableName, + pair.first, + pair.second)); + } + + auto wait_for_perm = + [&](const TableId& table_id, + const TableId &index_id, + const IndexPermissions& target_permission) -> Status { + IndexPermissions actual_permission = + VERIFY_RESULT(client->WaitUntilIndexPermissionsAtLeast( + table_id, + index_id, + target_permission)); + if (actual_permission > target_permission) { + return STATUS(RuntimeError, "Exceeded target permission"); + } else { + return Status::OK(); + } + }; + auto assert_perm = + [&](const TableId& table_id, + const TableId 
&index_id, + const IndexPermissions& target_permission) { + IndexPermissions actual_permission = + ASSERT_RESULT(client->GetIndexPermissions( + table_id, + index_id)); + ASSERT_EQ( + actual_permission, + target_permission); + }; + + std::atomic updates(0); + TestThreadHolder thread_holder; + thread_holder.AddThreadFunctor([&] { + auto conn = ASSERT_RESULT(ConnectToDB(kNamespaceName)); + ASSERT_OK(conn.ExecuteFormat("CREATE INDEX $0 ON $1 (j ASC)", kIndexName, kTableName)); + }); + thread_holder.AddThreadFunctor([&] { + // Wait to avoid querying for index too early. + ASSERT_OK(WaitFor( + [&]() -> Result { + if (GetTableIdByTableName(client.get(), kNamespaceName, kIndexName).ok()) { + return true; + } else { + return false; + } + }, + kGetTableIdWaitTime, + "Wait to get index table id by name")); + + std::string table_id = + ASSERT_RESULT(GetTableIdByTableName(client.get(), kNamespaceName, kTableName)); + std::string index_id = + ASSERT_RESULT(GetTableIdByTableName(client.get(), kNamespaceName, kIndexName)); + + for (auto pair : permission_key_pairs) { + IndexPermissions permission = pair.first; + int key = pair.second; + + ASSERT_OK(wait_for_perm(table_id, index_id, permission)); + + // Create a new connection every loop iteration to avoid stale table cache issues. + // TODO(jason): no longer create new connections after closing issue #4828 (move this outside + // the loop). + auto conn = ASSERT_RESULT(ConnectToDB(kNamespaceName)); + LOG(INFO) << "running UPDATE on i = " << key; + ASSERT_OK(conn.ExecuteFormat("UPDATE $0 SET j = j + 100 WHERE i = $1", kTableName, key)); + + assert_perm(table_id, index_id, permission); + updates++; + } + }); + + thread_holder.WaitAndStop(kThreadWaitTime); + + ASSERT_EQ(updates.load(std::memory_order_acquire), permission_key_pairs.size()); + + for (auto pair : permission_key_pairs) { + int expected_key = pair.second; + + // Verify contents of index table. 
+ const std::string query = Format( + "WITH j_idx AS (SELECT * FROM $0 ORDER BY j) SELECT j FROM j_idx WHERE i = $1", + kTableName, + expected_key); + ASSERT_TRUE(ASSERT_RESULT(conn.HasIndexScan(query))); + auto res = ASSERT_RESULT(conn.Fetch(query)); + int lines = PQntuples(res.get()); + ASSERT_EQ(1, lines); + int columns = PQnfields(res.get()); + ASSERT_EQ(1, columns); + // Make sure that the update is visible. + int value = ASSERT_RESULT(GetInt32(res.get(), 0, 0)); + ASSERT_EQ(value, expected_key + 110); + } +} + } // namespace pgwrapper } // namespace yb