Skip to content

Commit

Permalink
[#23192] YSQL: Simplify/cleanup code in PgDml/PgSelect/PgSelectIndex etc
Browse files Browse the repository at this point in the history
Summary:
The diff's major changes are:

- Replace the `PgPrepareParameters` structure with `PgDml::PrepareParameters`, which contains only the required fields (avoids having multiple sources of the same data)
- Move `LoadTable` into `PgDml` class to avoid copy-paste implementations in subclasses
- Remove redundant destructors
- Change interface of several functions/methods from accepting output params to using Result (i.e. `Result<bool> SampleNextBlock()` instead of `Status SampleNextBlock(bool *has_more)`)
- Add missing includes
- Fix `*` and `&` alignment (i.e. use `auto&` vs `auto &`)
- Make several methods `private` instead of `public`

**Note:** This diff doesn't change any logic. No new unit tests are required.
Jira: DB-12133

Test Plan: Jenkins

Reviewers: amartsinchyk, telgersma

Reviewed By: amartsinchyk

Subscribers: yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D36545
  • Loading branch information
d-uspenskiy committed Jul 16, 2024
1 parent db445ce commit 47da28c
Show file tree
Hide file tree
Showing 18 changed files with 508 additions and 697 deletions.
129 changes: 48 additions & 81 deletions src/yb/yql/pggate/pg_dml.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,54 +27,27 @@
#include "yb/yql/pggate/util/pg_doc_data.h"
#include "yb/yql/pggate/ybc_pggate.h"

using std::vector;
namespace yb::pggate {

namespace yb {
namespace pggate {

//--------------------------------------------------------------------------------------------------
// PgDml
//--------------------------------------------------------------------------------------------------

// Constructs a PgDml for the table identified by `table_id`.
// The session is moved into the PgStatement base; the table id and the region-local flag are
// stored in table_id_ and is_region_local_.
PgDml::PgDml(PgSession::ScopedRefPtr pg_session,
const PgObjectId& table_id,
bool is_region_local)
: PgStatement(std::move(pg_session)), table_id_(table_id), is_region_local_(is_region_local) {
}

// Constructs a PgDml by delegating to the (session, table_id, is_region_local) constructor and
// then applying the optional prepare parameters.
// `prepare_params` may be null; in that case nothing is assigned to prepare_params_ or index_id_
// here. `index_id` is recorded only when the copied parameters have use_secondary_index set.
PgDml::PgDml(PgSession::ScopedRefPtr pg_session,
const PgObjectId& table_id,
const PgObjectId& index_id,
const PgPrepareParameters *prepare_params,
bool is_region_local)
: PgDml(pg_session, table_id, is_region_local) {

if (prepare_params) {
// Keep a copy of the caller's parameters.
prepare_params_ = *prepare_params;
// Primary index does not have its own data table.
if (prepare_params_.use_secondary_index) {
index_id_ = index_id;
}
}
}
// Constructs a PgDml from a session, a table id, the region-local flag, the prepare parameters
// and a secondary index id.
// The session is moved into the PgStatement base; every other argument is copied into the
// corresponding member (table_id_, secondary_index_id_, prepare_params_, is_region_local_).
PgDml::PgDml(
PgSession::ScopedRefPtr pg_session, const PgObjectId& table_id, bool is_region_local,
const PrepareParameters& prepare_params, const PgObjectId& secondary_index_id)
: PgStatement(std::move(pg_session)),
table_id_(table_id), secondary_index_id_(secondary_index_id),
prepare_params_(prepare_params), is_region_local_(is_region_local) {}

PgDml::~PgDml() = default;

//--------------------------------------------------------------------------------------------------

Status PgDml::AppendTarget(PgExpr *target) {
Status PgDml::AppendTarget(PgExpr* target) {
// Except for base_ctid, all targets should be appended to this DML.
if (target_ && (prepare_params_.index_only_scan || !target->is_ybbasetid())) {
RETURN_NOT_OK(AppendTargetPB(target));
} else {
// Append base_ctid to the index_query.
RETURN_NOT_OK(secondary_index_query_->AppendTargetPB(target));
}

return Status::OK();
// base_ctid goes to the index_query.
return target_ && (prepare_params_.index_only_scan || !target->is_ybbasetid())
? AppendTargetPB(target) : secondary_index_query_->AppendTargetPB(target);
}

Status PgDml::AppendTargetPB(PgExpr *target) {
Status PgDml::AppendTargetPB(PgExpr* target) {
// Append to targets_.
bool is_aggregate = target->is_aggregate();
if (targets_.empty()) {
Expand All @@ -101,7 +74,7 @@ Status PgDml::AppendTargetPB(PgExpr *target) {
return target->PrepareForRead(this, AllocTargetPB());
}

Status PgDml::AppendQual(PgExpr *qual, bool is_primary) {
Status PgDml::AppendQual(PgExpr* qual, bool is_primary) {
if (!is_primary) {
DCHECK(secondary_index_query_) << "The secondary index query is expected";
return secondary_index_query_->AppendQual(qual, true);
Expand All @@ -127,7 +100,7 @@ Status PgDml::AppendColumnRef(PgColumnRef* colref, bool is_primary) {
// Postgres attribute number, this is column id to refer the column from Postgres code
int attr_num = colref->attr_num();
// Retrieve column metadata from the target relation metadata
PgColumn& col = VERIFY_RESULT(target_.ColumnForAttr(attr_num));
auto& col = VERIFY_RESULT_REF(target_.ColumnForAttr(attr_num));
if (!col.is_virtual_column()) {
// Do not overwrite Postgres
if (!col.has_pg_type_info()) {
Expand All @@ -148,9 +121,9 @@ Status PgDml::AppendColumnRef(PgColumnRef* colref, bool is_primary) {
return Status::OK();
}

Result<const PgColumn&> PgDml::PrepareColumnForRead(int attr_num, LWPgsqlExpressionPB *target_pb) {
Result<const PgColumn&> PgDml::PrepareColumnForRead(int attr_num, LWPgsqlExpressionPB* target_pb) {
// Find column from targeted table.
PgColumn& col = VERIFY_RESULT(target_.ColumnForAttr(attr_num));
auto& col = VERIFY_RESULT_REF(target_.ColumnForAttr(attr_num));

// Prepare protobuf to send to DocDB.
if (target_pb) {
Expand All @@ -165,9 +138,9 @@ Result<const PgColumn&> PgDml::PrepareColumnForRead(int attr_num, LWPgsqlExpress
return const_cast<const PgColumn&>(col);
}

Result<const PgColumn&> PgDml::PrepareColumnForRead(int attr_num, LWQLExpressionPB *target_pb) {
Result<const PgColumn&> PgDml::PrepareColumnForRead(int attr_num, LWQLExpressionPB* target_pb) {
// Find column from targeted table.
PgColumn& col = VERIFY_RESULT(target_.ColumnForAttr(attr_num));
auto& col = VERIFY_RESULT_REF(target_.ColumnForAttr(attr_num));

// Prepare protobuf to send to DocDB.
if (target_pb) {
Expand All @@ -182,7 +155,7 @@ Result<const PgColumn&> PgDml::PrepareColumnForRead(int attr_num, LWQLExpression
return const_cast<const PgColumn&>(col);
}

Status PgDml::PrepareColumnForWrite(PgColumn *pg_col, LWPgsqlExpressionPB *assign_pb) {
Status PgDml::PrepareColumnForWrite(PgColumn* pg_col, LWPgsqlExpressionPB* assign_pb) {
// Prepare protobuf to send to DocDB.
assign_pb->set_column_id(pg_col->id());

Expand All @@ -194,7 +167,7 @@ Status PgDml::PrepareColumnForWrite(PgColumn *pg_col, LWPgsqlExpressionPB *assig
return Status::OK();
}

void PgDml::ColumnRefsToPB(LWPgsqlColumnRefsPB *column_refs) {
void PgDml::ColumnRefsToPB(LWPgsqlColumnRefsPB* column_refs) {
column_refs->Clear();
for (const PgColumn& col : target_.columns()) {
if (col.read_requested() || col.write_requested()) {
Expand All @@ -206,7 +179,7 @@ void PgDml::ColumnRefsToPB(LWPgsqlColumnRefsPB *column_refs) {
void PgDml::ColRefsToPB() {
// Remove previously set column references in case if the statement is being reexecuted
ClearColRefPBs();
for (const PgColumn& col : target_.columns()) {
for (const auto& col : target_.columns()) {
// Only used columns are added to the request
if (col.read_requested() || col.write_requested()) {
// Allocate a protobuf entry
Expand All @@ -227,14 +200,14 @@ void PgDml::ColRefsToPB() {

//--------------------------------------------------------------------------------------------------

Status PgDml::BindColumn(int attr_num, PgExpr *attr_value) {
Status PgDml::BindColumn(int attr_num, PgExpr* attr_value) {
if (secondary_index_query_) {
// Bind by secondary key.
return secondary_index_query_->BindColumn(attr_num, attr_value);
}

// Find column to bind.
PgColumn& column = VERIFY_RESULT(bind_.ColumnForAttr(attr_num));
auto& column = VERIFY_RESULT_REF(bind_.ColumnForAttr(attr_num));

// Check datatype.
const auto attr_internal_type = attr_value->internal_type();
Expand Down Expand Up @@ -277,9 +250,9 @@ Status PgDml::BindTable() {

//--------------------------------------------------------------------------------------------------

Status PgDml::AssignColumn(int attr_num, PgExpr *attr_value) {
Status PgDml::AssignColumn(int attr_num, PgExpr* attr_value) {
// Find column from targeted table.
PgColumn& column = VERIFY_RESULT(target_.ColumnForAttr(attr_num));
auto& column = VERIFY_RESULT_REF(target_.ColumnForAttr(attr_num));

// Check datatype.
SCHECK_EQ(column.internal_type(), attr_value->internal_type(), Corruption,
Expand Down Expand Up @@ -326,18 +299,13 @@ Status PgDml::UpdateAssignPBs() {

//--------------------------------------------------------------------------------------------------

Result<bool> PgDml::ProcessSecondaryIndexRequest(const PgExecParameters *exec_params) {
Result<bool> PgDml::ProcessSecondaryIndexRequest(const PgExecParameters* exec_params) {
if (!secondary_index_query_) {
// Secondary INDEX is not used in this request.
return false;
}

// Execute query in PgGate.
// If index query is not yet executed, run it.
if (!secondary_index_query_->is_executed()) {
secondary_index_query_->set_is_executed(true);
RETURN_NOT_OK(secondary_index_query_->Exec(exec_params));
}
RETURN_NOT_OK(ExecSecondaryIndexOnce());

// Not processing index request if it does not require its own doc operator.
//
Expand All @@ -350,7 +318,8 @@ Result<bool> PgDml::ProcessSecondaryIndexRequest(const PgExecParameters *exec_pa

// When INDEX has its own doc_op, execute it to fetch next batch of ybctids which is then used
// to read data from the main table.
if (!VERIFY_RESULT(secondary_index_query_->FetchYbctidBatch(&retrieved_ybctids_))) {
retrieved_ybctids_ = VERIFY_RESULT(secondary_index_query_->FetchYbctidBatch());
if (!retrieved_ybctids_) {
// No more rows of ybctids.
return false;
}
Expand All @@ -359,8 +328,8 @@ Result<bool> PgDml::ProcessSecondaryIndexRequest(const PgExecParameters *exec_pa
return true;

// Update request with the new batch of ybctids to fetch the next batch of rows.
RETURN_NOT_OK(UpdateRequestWithYbctids(*retrieved_ybctids_,
KeepOrder(secondary_index_query_->KeepOrder())));
RETURN_NOT_OK(UpdateRequestWithYbctids(
*retrieved_ybctids_, KeepOrder(secondary_index_query_->KeepOrder())));

AtomicFlagSleepMs(&FLAGS_TEST_inject_delay_between_prepare_ybctid_execute_batch_ybctid_ms);
return true;
Expand All @@ -373,11 +342,8 @@ Status PgDml::UpdateRequestWithYbctids(const std::vector<Slice>& ybctids, KeepOr
}), ybctids.size()}, keep_order);
}

Status PgDml::Fetch(int32_t natts,
uint64_t *values,
bool *isnulls,
PgSysColumns *syscols,
bool *has_data) {
Status PgDml::Fetch(
int32_t natts, uint64_t* values, bool* isnulls, PgSysColumns* syscols, bool* has_data) {
// Each isnulls and values correspond (in order) to columns from the table schema.
// Initialize to nulls for any columns not present in result.
if (isnulls) {
Expand Down Expand Up @@ -433,7 +399,7 @@ Result<bool> PgDml::FetchDataFromServer() {
return true;
}

Result<bool> PgDml::GetNextRow(PgTuple *pg_tuple) {
Result<bool> PgDml::GetNextRow(PgTuple* pg_tuple) {
for (;;) {
for (auto rowset_iter = rowsets_.begin(); rowset_iter != rowsets_.end();) {
// Check if the rowset has any data.
Expand Down Expand Up @@ -478,24 +444,25 @@ Result<bool> PgDml::GetNextRow(PgTuple *pg_tuple) {
return false;
}

// Accessor: returns the current value of has_aggregate_targets_.
bool PgDml::has_aggregate_targets() const {
return has_aggregate_targets_;
}

// Accessor: returns the current value of has_system_targets_.
bool PgDml::has_system_targets() const {
return has_system_targets_;
}

// Returns true only when a secondary index query is attached to this statement and that query
// reports has_doc_op(); returns false when there is no secondary index query.
bool PgDml::has_secondary_index_with_doc_op() const {
return secondary_index_query_ && secondary_index_query_->has_doc_op();
}

Result<YBCPgColumnInfo> PgDml::GetColumnInfo(int attr_num) const {
if (secondary_index_query_) {
return secondary_index_query_->GetColumnInfo(attr_num);
return secondary_index_query_ ?
secondary_index_query_->GetColumnInfo(attr_num) : bind_->GetColumnInfo(attr_num);
}

Status PgDml::ExecSecondaryIndexOnce() {
if (is_secondary_index_executed_) {
return Status::OK();
}
return bind_->GetColumnInfo(attr_num);
is_secondary_index_executed_ = true;
return secondary_index_query_->Exec(pg_exec_params_);
}

// Asks the session to load the descriptor of the table identified by table_id_ and returns the
// session's result (descriptor or error) unchanged.
Result<PgTableDescPtr> PgDml::LoadTable() {
return pg_session_->LoadTable(table_id_);
}

} // namespace pggate
} // namespace yb
} // namespace yb::pggate
Loading

0 comments on commit 47da28c

Please sign in to comment.