Skip to content

Commit

Permalink
Implement support for RETURNING clause for MERGE
Browse files Browse the repository at this point in the history
This adds support for the RETURNING clause with MERGE
statements, which was added in PG17.
  • Loading branch information
kpan2034 committed Sep 17, 2024
1 parent 2c3ff7a commit edf049a
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 49 deletions.
169 changes: 145 additions & 24 deletions src/import/ht_hypertable_modify.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <postgres.h>
#include <catalog/pg_type.h>
#include <executor/nodeModifyTable.h>
#include <executor/tuptable.h>
#include <nodes/nodes.h>
#include <utils/snapmgr.h>

#include "ht_hypertable_modify.h"
Expand Down Expand Up @@ -337,6 +339,44 @@ ht_ExecDeleteEpilogue(ModifyTableContext * context, ResultRelInfo * resultRelInf
ExecARDeleteTriggersCompat(estate, resultRelInfo, tupleid, oldtuple, ar_delete_trig_tcs, false);
}

#if PG17_GE
/*
* ExecProcessReturning --- evaluate a RETURNING list
*
* resultRelInfo: current result rel
* tupleSlot: slot holding tuple actually inserted/updated/deleted
* planSlot: slot holding tuple returned by top subplan node
*
* Note: If tupleSlot is NULL, the FDW should have already provided econtext's
* scan tuple.
*
* Returns a slot holding the result tuple
*
* copied verbatim from executor/nodeModifyTable.c
*/
static TupleTableSlot *
ExecProcessReturning(ResultRelInfo *resultRelInfo, TupleTableSlot *tupleSlot,
TupleTableSlot *planSlot)
{
ProjectionInfo *projectReturning = resultRelInfo->ri_projectReturning;
ExprContext *econtext = projectReturning->pi_exprContext;

/* Make tuple and any needed join variables available to ExecProject */
if (tupleSlot)
econtext->ecxt_scantuple = tupleSlot;
econtext->ecxt_outertuple = planSlot;

/*
* RETURNING expressions might reference the tableoid column, so
* reinitialize tts_tableOid before evaluating them.
*/
econtext->ecxt_scantuple->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);

/* Compute the RETURNING expressions */
return ExecProject(projectReturning);
}
#endif

#if PG15_GE

TupleTableSlot *ExecInsert(ModifyTableContext * context, ResultRelInfo * resultRelInfo,
Expand Down Expand Up @@ -367,10 +407,11 @@ static TupleTableSlot * mergeGetUpdateNewTuple(ResultRelInfo * relinfo, TupleTab
* source tuple.
*/

bool
TupleTableSlot*
ht_ExecMergeMatched(ModifyTableContext * context, ResultRelInfo * resultRelInfo, ItemPointer tupleid,
bool canSetTag)
HeapTuple oldtuple, bool canSetTag, bool *matched)
{

ModifyTableState *mtstate = context->mtstate;
TupleTableSlot *newslot;
EState *estate = context->estate;
Expand All @@ -379,15 +420,22 @@ ht_ExecMergeMatched(ModifyTableContext * context, ResultRelInfo * resultRelInfo,
EPQState *epqstate = &mtstate->mt_epqstate;
ListCell *l;

TupleTableSlot *rslot = NULL;


Assert(*matched == true);

/*
* If there are no WHEN MATCHED actions, we are done.
*/
#if PG17_GE
if (resultRelInfo->ri_MergeActions[MERGE_WHEN_MATCHED] == NIL)
return true;
return NULL;
#else
if (resultRelInfo->ri_matchedMergeAction == NIL)
return true;
if (resultRelInfo->ri_matchedMergeAction == NIL){
*matched = true;
return NULL;
}
#endif
/*
* Make tuple and any needed join variables available to ExecQual and
Expand Down Expand Up @@ -469,7 +517,11 @@ lmerge_matched:;
*/
newslot = ExecProject(relaction->mas_proj);

#if PG17_GE
mtstate->mt_merge_action = relaction;
#else
context->relaction = relaction;
#endif
context->GetUpdateNewTuple = mergeGetUpdateNewTuple;
context->cpUpdateRetrySlot = NULL;

Expand All @@ -479,10 +531,10 @@ lmerge_matched:;
result = TM_Ok;
#else
if (result == TM_Ok)
return true; /* "do nothing" */
return NULL;
#endif

/* if not TM_OK, it is concurrent update/delete */
#endif
break;
}
ht_ExecUpdatePrepareSlot(resultRelInfo, newslot, context->estate);
Expand All @@ -507,14 +559,18 @@ lmerge_matched:;
break;

case CMD_DELETE:
#if PG17_GE
mtstate->mt_merge_action = relaction;
#else
context->relaction = relaction;
#endif
if (!ht_ExecDeletePrologue(context, resultRelInfo, tupleid, NULL, NULL, &result))
{
#if PG16_LT
result = TM_Ok;
#else
if (result == TM_Ok)
return true; /* "do nothing" */
return NULL; /* "do nothing" */

/* if not TM_OK, it is concurrent update/delete */
#endif
Expand Down Expand Up @@ -545,10 +601,12 @@ lmerge_matched:;
break;

case TM_SelfModified:
if (context->tmfd.cmax != estate->es_output_cid)
ereport(ERROR,
(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
errmsg("tuple to be updated or deleted was already modified by an operation triggered by the current command"),
errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));

/*
* The SQL standard disallows this for MERGE.
*/
if (TransactionIdIsCurrentTransactionId(context->tmfd.xmax))
ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION),
Expand All @@ -570,7 +628,9 @@ lmerge_matched:;
* If the tuple was already deleted, return to let
* caller handle it under NOT MATCHED clauses.
*/
return false;
*matched = false;
return NULL;

Check warning on line 632 in src/import/ht_hypertable_modify.c

View check run for this annotation

Codecov / codecov/patch

src/import/ht_hypertable_modify.c#L631-L632

Added lines #L631 - L632 were not covered by tests


case TM_Updated:
{
Expand Down Expand Up @@ -633,6 +693,7 @@ lmerge_matched:;
&context->tmfd);
switch (result) {
case TM_Ok:
// TODO: update this to match PG17
epqslot = EvalPlanQual(epqstate,
resultRelationDesc,
resultRelInfo->ri_RangeTableIndex,
Expand Down Expand Up @@ -690,7 +751,8 @@ lmerge_matched:;
* tuple already deleted; tell caller
* to run NOT MATCHED actions
*/
return false;
*matched = false;
return NULL;

Check warning on line 755 in src/import/ht_hypertable_modify.c

View check run for this annotation

Codecov / codecov/patch

src/import/ht_hypertable_modify.c#L754-L755

Added lines #L754 - L755 were not covered by tests

case TM_SelfModified:

Expand All @@ -715,15 +777,25 @@ lmerge_matched:;
"by an operation triggered by the current command"),
errhint("Consider using an AFTER trigger instead of a BEFORE "
"trigger to propagate changes to other rows.")));
return false;
if (TransactionIdIsCurrentTransactionId(context->tmfd.xmax))
ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION),
/* translator: %s is a SQL command name */
errmsg("%s command cannot affect row a second time",
"MERGE"),
errhint("Ensure that not more than one source row matches any one target row.")));

/* This shouldn't happen */
elog(ERROR, "attempted to update or delete invisible tuple");
return NULL;

default:
/*
* see table_tuple_lock call in
* ht_ExecDelete()
*/
elog(ERROR, "unexpected table_tuple_lock status: %u", result);
return false;
return NULL;
}
}

Expand All @@ -735,6 +807,33 @@ lmerge_matched:;
break;
}

#if PG17_GE
/* Process RETURNING if present */
if (resultRelInfo->ri_projectReturning)
{
switch (commandType)
{
case CMD_UPDATE:
rslot = ExecProcessReturning(resultRelInfo, newslot,
context->planSlot);
break;

case CMD_DELETE:
rslot = ExecProcessReturning(resultRelInfo,
resultRelInfo->ri_oldTupleSlot,
context->planSlot);
break;

case CMD_NOTHING:
break;

default:
elog(ERROR, "unrecognized commandType: %d",
(int) commandType);
}
}
#endif

/*
* We've activated one of the WHEN clauses, so we don't
* search further. This is required behaviour, not an
Expand All @@ -743,23 +842,25 @@ lmerge_matched:;
break;
}


/*
* Successfully executed an action or no qualifying action was found.
*/
return true;
return rslot;
}

/*
* Execute the first qualifying NOT MATCHED action.
*/
void
TupleTableSlot*
ht_ExecMergeNotMatched(ModifyTableContext * context, ResultRelInfo * resultRelInfo,
ChunkDispatchState * cds, bool canSetTag)
{
ModifyTableState *mtstate = context->mtstate;
ExprContext *econtext = mtstate->ps.ps_ExprContext;
List *actionStates = NIL;
ListCell *l;
TupleTableSlot *rslot = NULL;

/*
* For INSERT actions, the root relation's merge action is OK since
Expand Down Expand Up @@ -814,7 +915,11 @@ ht_ExecMergeNotMatched(ModifyTableContext * context, ResultRelInfo * resultRelIn
* tuple here.
*/
newslot = ExecProject(action->mas_proj);
#if PG17_GE
mtstate->mt_merge_action = action;
#else
context->relaction = action;
#endif
if (cds->is_dropped_attr_exists)
{
AttrMap *map;
Expand All @@ -835,15 +940,15 @@ ht_ExecMergeNotMatched(ModifyTableContext * context, ResultRelInfo * resultRelIn
newslot,
MakeSingleTupleTableSlot(chunktupdesc,
&TTSOpsVirtual));
(void) ExecInsert(context,
rslot = ExecInsert(context,
cds->rri,
(chunk_slot ? chunk_slot : newslot),
canSetTag);
if (chunk_slot)
ExecDropSingleTupleTableSlot(chunk_slot);
}
else
(void) ExecInsert(context, cds->rri, newslot, canSetTag);
rslot = ExecInsert(context, cds->rri, newslot, canSetTag);
mtstate->mt_merge_inserted = 1;
break;
case CMD_NOTHING:
Expand All @@ -860,16 +965,19 @@ ht_ExecMergeNotMatched(ModifyTableContext * context, ResultRelInfo * resultRelIn
*/
break;
}

return rslot;
}

/*
* Perform MERGE.
*/
TupleTableSlot *
ht_ExecMerge(ModifyTableContext * context, ResultRelInfo * resultRelInfo, ChunkDispatchState * cds,
ItemPointer tupleid, bool canSetTag)
ItemPointer tupleid, HeapTuple oldtuple, bool canSetTag)
{
bool matched;
TupleTableSlot* rslot = NULL;

/*-----
* If we are dealing with a WHEN MATCHED case (tupleid is valid), we
Expand Down Expand Up @@ -914,20 +1022,33 @@ ht_ExecMerge(ModifyTableContext * context, ResultRelInfo * resultRelInfo, ChunkD
* from ht_ExecMergeNotMatched to ht_ExecMergeMatched, there is no risk of a
* livelock.
*/
#if PG17_GE
matched = tupleid != NULL || oldtuple != NULL;
#else
matched = tupleid != NULL;
#endif
if (matched)
matched = ht_ExecMergeMatched(context, resultRelInfo, tupleid, canSetTag);
rslot = ht_ExecMergeMatched(context, resultRelInfo, tupleid, oldtuple, canSetTag, &matched);


/*
* Either we were dealing with a NOT MATCHED tuple or
* ht_ExecMergeMatched() returned "false", indicating the previously
* MATCHED tuple no longer matches.
*/
if (!matched)
ht_ExecMergeNotMatched(context, resultRelInfo, cds, canSetTag);
{
#if PG17_GE
if(rslot == NULL)
rslot = ht_ExecMergeNotMatched(context, resultRelInfo, cds, canSetTag);
else
context->mtstate->mt_merge_pending_not_matched = context->planSlot;
#else
(void) ht_ExecMergeNotMatched(context, resultRelInfo, cds, canSetTag);
#endif
}

/* No RETURNING support yet */
return NULL;
return rslot;
}

/*
Expand Down
13 changes: 7 additions & 6 deletions src/import/ht_hypertable_modify.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "compat/compat.h"
#include "nodes/chunk_dispatch/chunk_dispatch.h"
#include <executor/tuptable.h>

/* clang-format off */
/*
Expand Down Expand Up @@ -103,11 +104,11 @@ void ht_ExecDeleteEpilogue(ModifyTableContext * context, ResultRelInfo * result
ItemPointer tupleid, HeapTuple oldtuple);

#if PG15_GE
/* MERGE specific */
TupleTableSlot *ht_ExecMerge(ModifyTableContext * context, ResultRelInfo * resultRelInfo,
ChunkDispatchState * cds, ItemPointer tupleid, bool canSetTag);
bool ht_ExecMergeMatched(ModifyTableContext * context, ResultRelInfo * resultRelInfo,
ItemPointer tupleid, bool canSetTag);
void ht_ExecMergeNotMatched(ModifyTableContext * context, ResultRelInfo * resultRelInfo,
TupleTableSlot *
ht_ExecMerge(ModifyTableContext * context, ResultRelInfo * resultRelInfo, ChunkDispatchState * cds,
ItemPointer tupleid, HeapTuple oldtuple, bool canSetTag);
TupleTableSlot* ht_ExecMergeMatched(ModifyTableContext * context, ResultRelInfo * resultRelInfo,
ItemPointer tupleid, HeapTuple oldtuple, bool canSetTag, bool *matched);
TupleTableSlot* ht_ExecMergeNotMatched(ModifyTableContext * context, ResultRelInfo * resultRelInfo,
ChunkDispatchState * cds, bool canSetTag);
#endif
Loading

0 comments on commit edf049a

Please sign in to comment.