Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for RETURNING clause for MERGE #7275

Merged
merged 1 commit into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .unreleased/pr_7275
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7275: Add support for RETURNING clause for MERGE
169 changes: 145 additions & 24 deletions src/import/ht_hypertable_modify.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <postgres.h>
#include <catalog/pg_type.h>
#include <executor/nodeModifyTable.h>
#include <executor/tuptable.h>
#include <nodes/nodes.h>
#include <utils/snapmgr.h>

#include "ht_hypertable_modify.h"
Expand Down Expand Up @@ -337,6 +339,44 @@
ExecARDeleteTriggersCompat(estate, resultRelInfo, tupleid, oldtuple, ar_delete_trig_tcs, false);
}

#if PG17_GE
/*
* ExecProcessReturning --- evaluate a RETURNING list
*
* resultRelInfo: current result rel
* tupleSlot: slot holding tuple actually inserted/updated/deleted
* planSlot: slot holding tuple returned by top subplan node
*
* Note: If tupleSlot is NULL, the FDW should have already provided econtext's
* scan tuple.
*
* Returns a slot holding the result tuple
*
* copied verbatim from executor/nodeModifyTable.c
*/
static TupleTableSlot *
ExecProcessReturning(ResultRelInfo *resultRelInfo, TupleTableSlot *tupleSlot,
TupleTableSlot *planSlot)
{
ProjectionInfo *projectReturning = resultRelInfo->ri_projectReturning;
ExprContext *econtext = projectReturning->pi_exprContext;

/* Make tuple and any needed join variables available to ExecProject */
if (tupleSlot)
econtext->ecxt_scantuple = tupleSlot;
econtext->ecxt_outertuple = planSlot;

/*
* RETURNING expressions might reference the tableoid column, so
* reinitialize tts_tableOid before evaluating them.
*/
econtext->ecxt_scantuple->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);

/* Compute the RETURNING expressions */
return ExecProject(projectReturning);
}
#endif

#if PG15_GE

TupleTableSlot *ExecInsert(ModifyTableContext * context, ResultRelInfo * resultRelInfo,
Expand Down Expand Up @@ -367,10 +407,11 @@
* source tuple.
*/

bool
TupleTableSlot*
ht_ExecMergeMatched(ModifyTableContext * context, ResultRelInfo * resultRelInfo, ItemPointer tupleid,
bool canSetTag)
HeapTuple oldtuple, bool canSetTag, bool *matched)
{

ModifyTableState *mtstate = context->mtstate;
TupleTableSlot *newslot;
EState *estate = context->estate;
Expand All @@ -379,15 +420,22 @@
EPQState *epqstate = &mtstate->mt_epqstate;
ListCell *l;

TupleTableSlot *rslot = NULL;


Assert(*matched == true);

/*
* If there are no WHEN MATCHED actions, we are done.
*/
#if PG17_GE
if (resultRelInfo->ri_MergeActions[MERGE_WHEN_MATCHED] == NIL)
return true;
return NULL;
#else
if (resultRelInfo->ri_matchedMergeAction == NIL)
return true;
if (resultRelInfo->ri_matchedMergeAction == NIL){
*matched = true;
return NULL;
}
#endif
/*
* Make tuple and any needed join variables available to ExecQual and
Expand Down Expand Up @@ -469,7 +517,11 @@
*/
newslot = ExecProject(relaction->mas_proj);

#if PG17_GE
mtstate->mt_merge_action = relaction;
#else
context->relaction = relaction;
#endif
context->GetUpdateNewTuple = mergeGetUpdateNewTuple;
context->cpUpdateRetrySlot = NULL;

Expand All @@ -479,10 +531,10 @@
result = TM_Ok;
#else
if (result == TM_Ok)
return true; /* "do nothing" */
return NULL;
#endif

/* if not TM_OK, it is concurrent update/delete */
#endif
break;
}
ht_ExecUpdatePrepareSlot(resultRelInfo, newslot, context->estate);
Expand All @@ -507,14 +559,18 @@
break;

case CMD_DELETE:
#if PG17_GE
mtstate->mt_merge_action = relaction;
#else
context->relaction = relaction;
#endif
if (!ht_ExecDeletePrologue(context, resultRelInfo, tupleid, NULL, NULL, &result))
{
#if PG16_LT
result = TM_Ok;
#else
if (result == TM_Ok)
return true; /* "do nothing" */
return NULL; /* "do nothing" */

/* if not TM_OK, it is concurrent update/delete */
#endif
Expand Down Expand Up @@ -545,10 +601,12 @@
break;

case TM_SelfModified:
if (context->tmfd.cmax != estate->es_output_cid)
ereport(ERROR,
(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
errmsg("tuple to be updated or deleted was already modified by an operation triggered by the current command"),
errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));

/*
* The SQL standard disallows this for MERGE.
*/
if (TransactionIdIsCurrentTransactionId(context->tmfd.xmax))
ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION),
Expand All @@ -570,7 +628,9 @@
* If the tuple was already deleted, return to let
* caller handle it under NOT MATCHED clauses.
*/
return false;
*matched = false;
return NULL;

Check warning on line 632 in src/import/ht_hypertable_modify.c

View check run for this annotation

Codecov / codecov/patch

src/import/ht_hypertable_modify.c#L631-L632

Added lines #L631 - L632 were not covered by tests


case TM_Updated:
{
Expand Down Expand Up @@ -633,6 +693,7 @@
&context->tmfd);
switch (result) {
case TM_Ok:
// TODO: update this to match PG17
epqslot = EvalPlanQual(epqstate,
resultRelationDesc,
resultRelInfo->ri_RangeTableIndex,
Expand Down Expand Up @@ -690,7 +751,8 @@
* tuple already deleted; tell caller
* to run NOT MATCHED actions
*/
return false;
*matched = false;
return NULL;

Check warning on line 755 in src/import/ht_hypertable_modify.c

View check run for this annotation

Codecov / codecov/patch

src/import/ht_hypertable_modify.c#L754-L755

Added lines #L754 - L755 were not covered by tests

case TM_SelfModified:

Expand All @@ -715,15 +777,25 @@
"by an operation triggered by the current command"),
errhint("Consider using an AFTER trigger instead of a BEFORE "
"trigger to propagate changes to other rows.")));
return false;
if (TransactionIdIsCurrentTransactionId(context->tmfd.xmax))
ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION),
/* translator: %s is a SQL command name */
errmsg("%s command cannot affect row a second time",
"MERGE"),
errhint("Ensure that not more than one source row matches any one target row.")));

/* This shouldn't happen */
elog(ERROR, "attempted to update or delete invisible tuple");
return NULL;

default:
/*
* see table_tuple_lock call in
* ht_ExecDelete()
*/
elog(ERROR, "unexpected table_tuple_lock status: %u", result);
return false;
return NULL;
}
}

Expand All @@ -735,6 +807,33 @@
break;
}

#if PG17_GE
/* Process RETURNING if present */
if (resultRelInfo->ri_projectReturning)
{
switch (commandType)
{
case CMD_UPDATE:
rslot = ExecProcessReturning(resultRelInfo, newslot,

Check warning on line 817 in src/import/ht_hypertable_modify.c

View check run for this annotation

Codecov / codecov/patch

src/import/ht_hypertable_modify.c#L816-L817

Added lines #L816 - L817 were not covered by tests
context->planSlot);
break;

Check warning on line 819 in src/import/ht_hypertable_modify.c

View check run for this annotation

Codecov / codecov/patch

src/import/ht_hypertable_modify.c#L819

Added line #L819 was not covered by tests

case CMD_DELETE:
rslot = ExecProcessReturning(resultRelInfo,
resultRelInfo->ri_oldTupleSlot,
context->planSlot);
break;

case CMD_NOTHING:
break;

Check warning on line 828 in src/import/ht_hypertable_modify.c

View check run for this annotation

Codecov / codecov/patch

src/import/ht_hypertable_modify.c#L827-L828

Added lines #L827 - L828 were not covered by tests

default:

Check warning on line 830 in src/import/ht_hypertable_modify.c

View check run for this annotation

Codecov / codecov/patch

src/import/ht_hypertable_modify.c#L830

Added line #L830 was not covered by tests
elog(ERROR, "unrecognized commandType: %d",
(int) commandType);
}
}
#endif

/*
* We've activated one of the WHEN clauses, so we don't
* search further. This is required behaviour, not an
Expand All @@ -743,23 +842,25 @@
break;
}


/*
* Successfully executed an action or no qualifying action was found.
*/
return true;
return rslot;
}

/*
* Execute the first qualifying NOT MATCHED action.
*/
void
TupleTableSlot*
ht_ExecMergeNotMatched(ModifyTableContext * context, ResultRelInfo * resultRelInfo,
ChunkDispatchState * cds, bool canSetTag)
{
ModifyTableState *mtstate = context->mtstate;
ExprContext *econtext = mtstate->ps.ps_ExprContext;
List *actionStates = NIL;
ListCell *l;
TupleTableSlot *rslot = NULL;

/*
* For INSERT actions, the root relation's merge action is OK since
Expand Down Expand Up @@ -814,7 +915,11 @@
* tuple here.
*/
newslot = ExecProject(action->mas_proj);
#if PG17_GE
mtstate->mt_merge_action = action;
#else
context->relaction = action;
#endif
if (cds->is_dropped_attr_exists)
{
AttrMap *map;
Expand All @@ -835,15 +940,15 @@
newslot,
MakeSingleTupleTableSlot(chunktupdesc,
&TTSOpsVirtual));
(void) ExecInsert(context,
rslot = ExecInsert(context,
cds->rri,
(chunk_slot ? chunk_slot : newslot),
canSetTag);
if (chunk_slot)
ExecDropSingleTupleTableSlot(chunk_slot);
}
else
(void) ExecInsert(context, cds->rri, newslot, canSetTag);
rslot = ExecInsert(context, cds->rri, newslot, canSetTag);
mtstate->mt_merge_inserted = 1;
break;
case CMD_NOTHING:
Expand All @@ -860,16 +965,19 @@
*/
break;
}

return rslot;
}

/*
* Perform MERGE.
*/
TupleTableSlot *
ht_ExecMerge(ModifyTableContext * context, ResultRelInfo * resultRelInfo, ChunkDispatchState * cds,
ItemPointer tupleid, bool canSetTag)
ItemPointer tupleid, HeapTuple oldtuple, bool canSetTag)
{
bool matched;
TupleTableSlot* rslot = NULL;

/*-----
* If we are dealing with a WHEN MATCHED case (tupleid is valid), we
Expand Down Expand Up @@ -914,20 +1022,33 @@
* from ht_ExecMergeNotMatched to ht_ExecMergeMatched, there is no risk of a
* livelock.
*/
#if PG17_GE
matched = tupleid != NULL || oldtuple != NULL;
#else
matched = tupleid != NULL;
#endif
if (matched)
matched = ht_ExecMergeMatched(context, resultRelInfo, tupleid, canSetTag);
rslot = ht_ExecMergeMatched(context, resultRelInfo, tupleid, oldtuple, canSetTag, &matched);


/*
* Either we were dealing with a NOT MATCHED tuple or
* ht_ExecMergeMatched() returned "false", indicating the previously
* MATCHED tuple no longer matches.
*/
if (!matched)
ht_ExecMergeNotMatched(context, resultRelInfo, cds, canSetTag);
{
#if PG17_GE
if(rslot == NULL)
rslot = ht_ExecMergeNotMatched(context, resultRelInfo, cds, canSetTag);
else
context->mtstate->mt_merge_pending_not_matched = context->planSlot;

Check warning on line 1045 in src/import/ht_hypertable_modify.c

View check run for this annotation

Codecov / codecov/patch

src/import/ht_hypertable_modify.c#L1045

Added line #L1045 was not covered by tests
#else
(void) ht_ExecMergeNotMatched(context, resultRelInfo, cds, canSetTag);
#endif
}

/* No RETURNING support yet */
return NULL;
return rslot;
}

/*
Expand Down
13 changes: 7 additions & 6 deletions src/import/ht_hypertable_modify.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "compat/compat.h"
#include "nodes/chunk_dispatch/chunk_dispatch.h"
#include <executor/tuptable.h>

/* clang-format off */
/*
Expand Down Expand Up @@ -103,11 +104,11 @@ void ht_ExecDeleteEpilogue(ModifyTableContext * context, ResultRelInfo * result
ItemPointer tupleid, HeapTuple oldtuple);

#if PG15_GE
/* MERGE specific */
TupleTableSlot *ht_ExecMerge(ModifyTableContext * context, ResultRelInfo * resultRelInfo,
ChunkDispatchState * cds, ItemPointer tupleid, bool canSetTag);
bool ht_ExecMergeMatched(ModifyTableContext * context, ResultRelInfo * resultRelInfo,
ItemPointer tupleid, bool canSetTag);
void ht_ExecMergeNotMatched(ModifyTableContext * context, ResultRelInfo * resultRelInfo,
TupleTableSlot *
ht_ExecMerge(ModifyTableContext * context, ResultRelInfo * resultRelInfo, ChunkDispatchState * cds,
ItemPointer tupleid, HeapTuple oldtuple, bool canSetTag);
TupleTableSlot* ht_ExecMergeMatched(ModifyTableContext * context, ResultRelInfo * resultRelInfo,
ItemPointer tupleid, HeapTuple oldtuple, bool canSetTag, bool *matched);
TupleTableSlot* ht_ExecMergeNotMatched(ModifyTableContext * context, ResultRelInfo * resultRelInfo,
ChunkDispatchState * cds, bool canSetTag);
#endif
Loading
Loading