Skip to content

Commit

Permalink
Refactor compression filter handling
Browse files Browse the repository at this point in the history
Change the code to have less direct references to FormData_hypertable_compression.
The patch also renames SegmentFilter to BatchFilter to make the purpose clearer.
  • Loading branch information
svenklemm committed Nov 20, 2023
1 parent 3c860cc commit 747f4e2
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 93 deletions.
94 changes: 45 additions & 49 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -934,8 +934,9 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_

if (compression_info->orderby_column_index > 0)
{
char *segment_min_col_name = compression_column_segment_min_name(compression_info);
char *segment_max_col_name = compression_column_segment_max_name(compression_info);
int16 index = compression_info->orderby_column_index;
char *segment_min_col_name = column_segment_min_name(index);
char *segment_max_col_name = column_segment_max_name(index);
AttrNumber segment_min_attr_number =
get_attnum(compressed_table->rd_id, segment_min_col_name);
AttrNumber segment_max_attr_number =
Expand Down Expand Up @@ -1970,22 +1971,23 @@ build_scankeys(int32 hypertable_id, Oid hypertable_relid, RowDecompressor *decom
}
if (COMPRESSIONCOL_IS_ORDER_BY(fd))
{
int16 index = fd->orderby_column_index;
/* Cannot optimize orderby columns with NULL values since those
* are not visible in metadata
*/
if (isnull)
continue;

key_index = create_segment_filter_scankey(decompressor,
compression_column_segment_min_name(fd),
column_segment_min_name(index),
BTLessEqualStrategyNumber,
scankeys,
key_index,
null_columns,
value,
false); /* is_null_check */
key_index = create_segment_filter_scankey(decompressor,
compression_column_segment_max_name(fd),
column_segment_max_name(index),
BTGreaterEqualStrategyNumber,
scankeys,
key_index,
Expand Down Expand Up @@ -2543,13 +2545,12 @@ ts_fuzz_compression(PG_FUNCTION_ARGS)
#endif

#if PG14_GE
static SegmentFilter *
add_filter_column_strategy(char *column_name, StrategyNumber strategy, Const *value,
bool is_null_check)
static BatchFilter *
make_batchfilter(char *column_name, StrategyNumber strategy, Const *value, bool is_null_check)
{
SegmentFilter *segment_filter = palloc0(sizeof(*segment_filter));
BatchFilter *segment_filter = palloc0(sizeof(*segment_filter));

*segment_filter = (SegmentFilter){
*segment_filter = (BatchFilter){
.strategy = strategy,
.value = value,
.is_null_check = is_null_check,
Expand Down Expand Up @@ -2643,7 +2644,7 @@ fix_and_reorder_index_filters(Relation comp_chunk_rel, Relation index_rel,
forboth (lp, segmentby_predicates, lf, index_filters)
{
Node *node = lfirst(lp);
SegmentFilter *sf = lfirst(lf);
BatchFilter *sf = lfirst(lf);

if (node == NULL)
continue;
Expand Down Expand Up @@ -2788,7 +2789,7 @@ find_matching_index(Relation comp_chunk_rel, List *index_filters)
{
AttrNumber attnum = index_rel->rd_index->indkey.values[i];
char *attname = get_attname(RelationGetRelid(comp_chunk_rel), attnum, false);
SegmentFilter *sf = lfirst(li);
BatchFilter *sf = lfirst(li);
/* ensure column exists in index relation */
if (!strcmp(attname, sf->column_name.data))
{
Expand Down Expand Up @@ -2877,59 +2878,55 @@ fill_predicate_context(Chunk *ch, List *predicates, List **filters, List **index
{
/* save segment by column name and its corresponding value specified in
* WHERE */
*index_filters =
lappend(*index_filters,
add_filter_column_strategy(column_name,
op_strategy,
arg_value,
false)); /* is_null_check */
*index_filters = lappend(*index_filters,
make_batchfilter(column_name,
op_strategy,
arg_value,
false)); /* is_null_check */
*segmentby_predicates = lappend(*segmentby_predicates, node);
}
}
}
else if (COMPRESSIONCOL_IS_ORDER_BY(fd))
{
int16 index = fd->orderby_column_index;
switch (op_strategy)
{
case BTEqualStrategyNumber:
{
/* orderby col = value implies min <= value and max >= value */
*filters = lappend(
*filters,
add_filter_column_strategy(compression_column_segment_min_name(fd),
BTLessEqualStrategyNumber,
arg_value,
false)); /* is_null_check */
*filters = lappend(
*filters,
add_filter_column_strategy(compression_column_segment_max_name(fd),
BTGreaterEqualStrategyNumber,
arg_value,
false)); /* is_null_check */
*filters = lappend(*filters,
make_batchfilter(column_segment_min_name(index),
BTLessEqualStrategyNumber,
arg_value,
false)); /* is_null_check */
*filters = lappend(*filters,
make_batchfilter(column_segment_max_name(index),
BTGreaterEqualStrategyNumber,
arg_value,
false)); /* is_null_check */
}
break;
case BTLessStrategyNumber:
case BTLessEqualStrategyNumber:
{
/* orderby col <[=] value implies min <[=] value */
*filters = lappend(
*filters,
add_filter_column_strategy(compression_column_segment_min_name(fd),
op_strategy,
arg_value,
false)); /* is_null_check */
*filters = lappend(*filters,
make_batchfilter(column_segment_min_name(index),
op_strategy,
arg_value,
false)); /* is_null_check */
}
break;
case BTGreaterStrategyNumber:
case BTGreaterEqualStrategyNumber:
{
/* orderby col >[=] value implies max >[=] value */
*filters = lappend(
*filters,
add_filter_column_strategy(compression_column_segment_max_name(fd),
op_strategy,
arg_value,
false)); /* is_null_check */
*filters = lappend(*filters,
make_batchfilter(column_segment_max_name(index),
op_strategy,
arg_value,
false)); /* is_null_check */
}
}
}
Expand All @@ -2949,12 +2946,11 @@ fill_predicate_context(Chunk *ch, List *predicates, List **filters, List **index
ts_hypertable_compression_get_by_pkey(ch->fd.hypertable_id, column_name);
if (COMPRESSIONCOL_IS_SEGMENT_BY(fd))
{
*index_filters =
lappend(*index_filters,
add_filter_column_strategy(column_name,
InvalidStrategy,
NULL,
true)); /* is_null_check */
*index_filters = lappend(*index_filters,
make_batchfilter(column_name,
InvalidStrategy,
NULL,
true)); /* is_null_check */
*segmentby_predicates = lappend(*segmentby_predicates, node);
if (ntest->nulltesttype == IS_NULL)
*is_null = lappend_int(*is_null, 1);
Expand Down Expand Up @@ -2986,7 +2982,7 @@ build_update_delete_scankeys(RowDecompressor *decompressor, List *filters, int *
Bitmapset **null_columns)
{
ListCell *lc;
SegmentFilter *filter;
BatchFilter *filter;
int key_index = 0;

ScanKeyData *scankeys = palloc0(filters->length * sizeof(ScanKeyData));
Expand Down Expand Up @@ -3288,7 +3284,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
Relation comp_chunk_rel;
Chunk *comp_chunk;
RowDecompressor decompressor;
SegmentFilter *filter;
BatchFilter *filter;

bool chunk_status_changed = false;
ScanKeyData *scankeys = NULL;
Expand Down
10 changes: 7 additions & 3 deletions tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,12 @@ typedef struct RowCompressor
int insert_options;
} RowCompressor;

/* SegmentFilter is used for filtering segments based on qualifiers */
typedef struct SegmentFilter
/*
* BatchFilter is used for filtering batches before decompressing.
* The columns will either be segmentby columns or the corresponding
* metadata columns of orderby columns.
*/
typedef struct BatchFilter
{
/* Column which we use for filtering */
NameData column_name;
Expand All @@ -279,7 +283,7 @@ typedef struct SegmentFilter
Const *value;
/* IS NULL or IS NOT NULL */
bool is_null_check;
} SegmentFilter;
} BatchFilter;

extern Datum tsl_compressed_data_decompress_forward(PG_FUNCTION_ARGS);
extern Datum tsl_compressed_data_decompress_reverse(PG_FUNCTION_ARGS);
Expand Down
35 changes: 11 additions & 24 deletions tsl/src/compression/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,6 @@ column_segment_max_name(int16 column_index)
COMPRESSION_COLUMN_METADATA_MAX_COLUMN_NAME);
}

char *
compression_column_segment_min_name(const FormData_hypertable_compression *fd)
{
return column_segment_min_name(fd->orderby_column_index);
}

char *
compression_column_segment_max_name(const FormData_hypertable_compression *fd)
{
return column_segment_max_name(fd->orderby_column_index);
}

static void
compresscolinfo_add_metadata_columns(CompressColInfo *cc, Relation uncompressed_rel)
{
Expand Down Expand Up @@ -178,6 +166,7 @@ compresscolinfo_add_metadata_columns(CompressColInfo *cc, Relation uncompressed_
{
if (cc->col_meta[colno].orderby_column_index > 0)
{
int16 index = cc->col_meta[colno].orderby_column_index;
FormData_hypertable_compression fd = cc->col_meta[colno];
AttrNumber col_attno = get_attnum(uncompressed_rel->rd_id, NameStr(fd.attname));
Form_pg_attribute attr = TupleDescAttr(RelationGetDescr(uncompressed_rel),
Expand All @@ -191,18 +180,16 @@ compresscolinfo_add_metadata_columns(CompressColInfo *cc, Relation uncompressed_
errdetail("Could not identify a less-than operator for the type.")));

/* segment_meta min and max columns */
cc->coldeflist =
lappend(cc->coldeflist,
makeColumnDef(compression_column_segment_min_name(&cc->col_meta[colno]),
attr->atttypid,
-1 /* typemod */,
0 /*collation*/));
cc->coldeflist =
lappend(cc->coldeflist,
makeColumnDef(compression_column_segment_max_name(&cc->col_meta[colno]),
attr->atttypid,
-1 /* typemod */,
0 /*collation*/));
cc->coldeflist = lappend(cc->coldeflist,
makeColumnDef(column_segment_min_name(index),
attr->atttypid,
-1 /* typemod */,
0 /*collation*/));
cc->coldeflist = lappend(cc->coldeflist,
makeColumnDef(column_segment_max_name(index),
attr->atttypid,
-1 /* typemod */,
0 /*collation*/));
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions tsl/src/compression/create.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ void tsl_process_compress_table_drop_column(Hypertable *ht, char *name);
void tsl_process_compress_table_rename_column(Hypertable *ht, const RenameStmt *stmt);
Chunk *create_compress_chunk(Hypertable *compress_ht, Chunk *src_chunk, Oid table_id);

char *compression_column_segment_min_name(const FormData_hypertable_compression *fd);
char *compression_column_segment_max_name(const FormData_hypertable_compression *fd);

char *column_segment_min_name(int16 column_index);
char *column_segment_max_name(int16 column_index);

Expand Down
7 changes: 3 additions & 4 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1099,15 +1099,14 @@ compressed_rel_setup_reltarget(RelOptInfo *compressed_rel, CompressionInfo *info
/* if the column is an orderby, add it's metadata columns too */
if (column_info->orderby_column_index > 0)
{
int16 index = column_info->orderby_column_index;
compressed_reltarget_add_var_for_column(compressed_rel,
compressed_relid,
compression_column_segment_min_name(
column_info),
column_segment_min_name(index),
&attrs_used);
compressed_reltarget_add_var_for_column(compressed_rel,
compressed_relid,
compression_column_segment_max_name(
column_info),
column_segment_max_name(index),
&attrs_used);
}
}
Expand Down
20 changes: 10 additions & 10 deletions tsl/src/nodes/decompress_chunk/qual_pushdown.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,9 @@ make_segment_meta_opexpr(QualPushdownContext *context, Oid opno, AttrNumber meta
}

static AttrNumber
get_segment_meta_min_attr_number(FormData_hypertable_compression *compression_info,
Oid compressed_relid)
get_segment_meta_min_attr_number(int16 orderby_column_index, Oid compressed_relid)
{
char *meta_col_name = compression_column_segment_min_name(compression_info);
char *meta_col_name = column_segment_min_name(orderby_column_index);

if (meta_col_name == NULL)
elog(ERROR, "could not find meta column");
Expand All @@ -140,10 +139,9 @@ get_segment_meta_min_attr_number(FormData_hypertable_compression *compression_in
}

static AttrNumber
get_segment_meta_max_attr_number(FormData_hypertable_compression *compression_info,
Oid compressed_relid)
get_segment_meta_max_attr_number(int16 orderby_column_index, Oid compressed_relid)
{
char *meta_col_name = compression_column_segment_max_name(compression_info);
char *meta_col_name = column_segment_max_name(orderby_column_index);

if (meta_col_name == NULL)
elog(ERROR, "could not find meta column");
Expand Down Expand Up @@ -224,6 +222,8 @@ pushdown_op_to_segment_meta_min_max(QualPushdownContext *context, List *expr_arg
else
return NULL;

int16 index = compression_info->orderby_column_index;

/* May be able to allow non-strict operations as well.
* Next steps: Think through edge cases, either allow and write tests or figure out why we must
* block strict operations
Expand Down Expand Up @@ -267,15 +267,15 @@ pushdown_op_to_segment_meta_min_max(QualPushdownContext *context, List *expr_arg
return make_andclause(list_make2(
make_segment_meta_opexpr(context,
opno_le,
get_segment_meta_min_attr_number(compression_info,
get_segment_meta_min_attr_number(index,
context->compressed_rte
->relid),
var_with_segment_meta,
expr,
BTLessEqualStrategyNumber),
make_segment_meta_opexpr(context,
opno_ge,
get_segment_meta_max_attr_number(compression_info,
get_segment_meta_max_attr_number(index,
context->compressed_rte
->relid),
var_with_segment_meta,
Expand All @@ -295,7 +295,7 @@ pushdown_op_to_segment_meta_min_max(QualPushdownContext *context, List *expr_arg
return (Expr *)
make_segment_meta_opexpr(context,
opno,
get_segment_meta_min_attr_number(compression_info,
get_segment_meta_min_attr_number(index,
context
->compressed_rte
->relid),
Expand All @@ -317,7 +317,7 @@ pushdown_op_to_segment_meta_min_max(QualPushdownContext *context, List *expr_arg
return (Expr *)
make_segment_meta_opexpr(context,
opno,
get_segment_meta_max_attr_number(compression_info,
get_segment_meta_max_attr_number(index,
context
->compressed_rte
->relid),
Expand Down

0 comments on commit 747f4e2

Please sign in to comment.