Skip to content

Commit

Permalink
[columnar] PostgreSQL 16 compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
JerrySievert authored and mkaruza committed Nov 14, 2023
1 parent 62bef0b commit 9ad6160
Show file tree
Hide file tree
Showing 27 changed files with 1,952 additions and 182 deletions.
2 changes: 1 addition & 1 deletion columnar/configure
Original file line number Diff line number Diff line change
Expand Up @@ -2802,7 +2802,7 @@ if test -z "$version_num"; then
as_fn_error $? "Could not detect PostgreSQL version from pg_config." "$LINENO" 5
fi

if test "$version_num" != '13' -a "$version_num" != '14' -a "$version_num" != '15'; then
if test "$version_num" != '13' -a "$version_num" != '14' -a "$version_num" != '15' -a "$version_num" != '16'; then
as_fn_error $? "Citus is not compatible with the detected PostgreSQL version ${version_num}." "$LINENO" 5
else
{ printf "%s\n" "$as_me:${as_lineno-$LINENO}: building against PostgreSQL $version_num" >&5
Expand Down
2 changes: 1 addition & 1 deletion columnar/configure.in
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ if test -z "$version_num"; then
AC_MSG_ERROR([Could not detect PostgreSQL version from pg_config.])
fi

if test "$version_num" != '13' -a "$version_num" != '14' -a "$version_num" != '15'; then
if test "$version_num" != '13' -a "$version_num" != '14' -a "$version_num" != '15' -a "$version_num" != '16'; then
AC_MSG_ERROR([Citus is not compatible with the detected PostgreSQL version ${version_num}.])
else
AC_MSG_NOTICE([building against PostgreSQL $version_num])
Expand Down
8 changes: 4 additions & 4 deletions columnar/src/backend/columnar/columnar.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ bool columnar_enable_parallel_execution = true;
int columnar_min_parallel_processes = 8;
bool columnar_enable_vectorization = true;
bool columnar_enable_dml = true;
bool columnar_enable_page_cache = true;
bool columnar_enable_page_cache = false;
int columnar_page_cache_size = 200U;

static const struct config_enum_entry columnar_compression_options[] =
Expand Down Expand Up @@ -132,7 +132,7 @@ columnar_guc_init()
&columnar_enable_parallel_execution,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL,
NULL,
NULL);
Expand All @@ -156,7 +156,7 @@ columnar_guc_init()
&columnar_enable_vectorization,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL,
NULL,
NULL);
Expand All @@ -167,7 +167,7 @@ columnar_guc_init()
&columnar_enable_dml,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL,
NULL,
NULL);
Expand Down
5 changes: 5 additions & 0 deletions columnar/src/backend/columnar/columnar_compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include "postgres.h"

#include "citus_version.h"
#include "pg_version_compat.h"

#include "common/pg_lzcompress.h"
#include "lib/stringinfo.h"

Expand All @@ -27,6 +29,9 @@
#include <zstd.h>
#endif

#if PG_VERSION_NUM >= PG_VERSION_16
#include "varatt.h"
#endif
/*
* The information at the start of the compressed data. This decription is taken
* from pg_lzcompress in pre-9.5 version of PostgreSQL.
Expand Down
125 changes: 117 additions & 8 deletions columnar/src/backend/columnar/columnar_customscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/

#include "citus_version.h"
#include "pg_version_compat.h"

#include "postgres.h"

Expand All @@ -39,8 +40,13 @@
#include "optimizer/plancat.h"
#include "optimizer/planner.h"
#include "optimizer/restrictinfo.h"
#if PG_VERSION_NUM >= PG_VERSION_16
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
#endif
#include "storage/smgr.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
#include "utils/ruleutils.h"
Expand All @@ -56,6 +62,10 @@
#include "columnar/vectorization/columnar_vector_execution.h"
#include "columnar/vectorization/columnar_vector_types.h"

#ifndef Abs
#define Abs(x) ((x) >= 0 ? (x) : -(x))
#endif

/*
* ColumnarScanState represents the state for a columnar scan. It's a
* CustomScanState with additional fields specific to columnar scans.
Expand Down Expand Up @@ -188,7 +198,7 @@ static planner_hook_type PreviousPlannerHook = NULL;

static bool EnableColumnarCustomScan = true;
static bool EnableColumnarQualPushdown = true;
static double ColumnarQualPushdownCorrelationThreshold = 0.9;
static double ColumnarQualPushdownCorrelationThreshold = 0.4;
static int ColumnarMaxCustomScanPaths = 64;
static int ColumnarPlannerDebugLevel = DEBUG3;

Expand Down Expand Up @@ -267,7 +277,7 @@ columnar_customscan_init()
&EnableColumnarCustomScan,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"columnar.enable_qual_pushdown",
Expand All @@ -277,7 +287,7 @@ columnar_customscan_init()
&EnableColumnarQualPushdown,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);
DefineCustomRealVariable(
"columnar.qual_pushdown_correlation_threshold",
Expand All @@ -291,7 +301,7 @@ columnar_customscan_init()
0.0,
1.0,
PGC_USERSET,
GUC_NO_SHOW_ALL,
GUC_NO_SHOW_ALL |GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);
DefineCustomIntVariable(
"columnar.max_custom_scan_paths",
Expand All @@ -303,7 +313,7 @@ columnar_customscan_init()
1,
1024,
PGC_USERSET,
GUC_NO_SHOW_ALL,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"columnar.planner_debug_level",
Expand Down Expand Up @@ -1454,6 +1464,66 @@ ContainsExecParams(Node *node, void *notUsed)
return expression_tree_walker(node, ContainsExecParams, NULL);
}

#if PG_VERSION_NUM >= PG_VERSION_16

/*
* fixup_inherited_columns
*
* Exact function Copied from PG16 as it's static.
*
* When user is querying on a table with children, it implicitly accesses
* child tables also. So, we also need to check security label of child
* tables and columns, but there is no guarantee attribute numbers are
* same between the parent and children.
* It returns a bitmapset which contains attribute number of the child
* table based on the given bitmapset of the parent.
*/
static Bitmapset *
fixup_inherited_columns(Oid parentId, Oid childId, Bitmapset *columns)
{
Bitmapset *result = NULL;

/*
* obviously, no need to do anything here
*/
if (parentId == childId)
{
return columns;
}

int index = -1;
while ((index = bms_next_member(columns, index)) >= 0)
{
/* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
AttrNumber attno = index + FirstLowInvalidHeapAttributeNumber;

/*
* whole-row-reference shall be fixed-up later
*/
if (attno == InvalidAttrNumber)
{
result = bms_add_member(result, index);
continue;
}

char *attname = get_attname(parentId, attno, false);
attno = get_attnum(childId, attname);
if (attno == InvalidAttrNumber)
{
elog(ERROR, "cache lookup failed for attribute %s of relation %u",
attname, childId);
}

result = bms_add_member(result,
attno - FirstLowInvalidHeapAttributeNumber);

pfree(attname);
}

return result;
}
#endif


/*
* Create and add a path with the given parameterization paramRelids.
Expand Down Expand Up @@ -1536,7 +1606,42 @@ AddColumnarScanPath(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte,
cpath->custom_private = list_make2(NIL, NIL);
}

int numberOfColumnsRead = bms_num_members(rte->selectedCols);
int numberOfColumnsRead = 0;
#if PG_VERSION_NUM >= PG_VERSION_16
if (rte->perminfoindex > 0)
{
/*
* If perminfoindex > 0, that means that this relation's permission info
* is directly found in the list of rteperminfos of the Query(root->parse)
* So, all we have to do here is retrieve that info.
*/
RTEPermissionInfo *perminfo = getRTEPermissionInfo(root->parse->rteperminfos,
rte);
numberOfColumnsRead = bms_num_members(perminfo->selectedCols);
}
else
{
/*
* If perminfoindex = 0, that means we are skipping the check for permission info
* for this relation, which means that it's either a partition or an inheritance child.
* In these cases, we need to access the permission info of the top parent of this relation.
* After thorough checking, we found that the index of the top parent pointing to the correct
* range table entry in Query's range tables (root->parse->rtable) is found under
* RelOptInfo rel->top_parent->relid.
* For reference, check expand_partitioned_rtentry and expand_inherited_rtentry PG functions
*/
Assert(rel->top_parent);
RangeTblEntry *parent_rte = rt_fetch(rel->top_parent->relid, root->parse->rtable);
RTEPermissionInfo *perminfo = getRTEPermissionInfo(root->parse->rteperminfos,
parent_rte);
numberOfColumnsRead = bms_num_members(fixup_inherited_columns(perminfo->relid,
rte->relid,
perminfo->
selectedCols));
}
#else
numberOfColumnsRead = bms_num_members(rte->selectedCols);
#endif
int numberOfClausesPushed = list_length(allClauses);

/* Queries that contain only aggregate with STAR doesn't have any
Expand Down Expand Up @@ -1631,7 +1736,7 @@ static Cost
ColumnarPerStripeScanCost(RelOptInfo *rel, Oid relationId, int numberOfColumnsRead)
{
Relation relation = RelationIdGetRelation(relationId);
List *stripeList = StripesForRelfilenode(relation->rd_node, ForwardScanDirection);
List *stripeList = StripesForRelfilenode(RelationPhysicalIdentifier_compat(relation), ForwardScanDirection);
RelationClose(relation);

uint32 maxColumnCount = 0;
Expand Down Expand Up @@ -1683,7 +1788,7 @@ static uint64
ColumnarTableStripeCount(Oid relationId)
{
Relation relation = RelationIdGetRelation(relationId);
List *stripeList = StripesForRelfilenode(relation->rd_node, ForwardScanDirection);
List *stripeList = StripesForRelfilenode(RelationPhysicalIdentifier_compat(relation), ForwardScanDirection);
int stripeCount = list_length(stripeList);
RelationClose(relation);

Expand Down Expand Up @@ -1966,7 +2071,11 @@ ColumnarScan_BeginCustomScan(CustomScanState *cscanstate, EState *estate, int ef
*/
columnarScanState->snapshot = estate->es_snapshot;
columnarScanState->snapshotRegisteredByUs = false;
#if PG_VERSION_NUM >= PG_VERSION_16
Oid relationOid = cscanstate->ss.ss_currentRelation->rd_locator.relNumber;
#else
Oid relationOid = cscanstate->ss.ss_currentRelation->rd_node.relNode;
#endif

if(!IsInParallelMode())
{
Expand Down
Loading

0 comments on commit 9ad6160

Please sign in to comment.