Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow DELETE on compressed chunks without decompression #6882

Merged
merged 1 commit into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .unreleased/pr_6882
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #6882 Allow DELETE on compressed chunks without decompression
12 changes: 12 additions & 0 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ bool ts_guc_enable_foreign_key_propagation = true;
TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify = true;
TSDLLEXPORT int ts_guc_cagg_max_individual_materializations = 10;
bool ts_guc_enable_osm_reads = true;
TSDLLEXPORT bool ts_guc_enable_compressed_direct_batch_delete = true;
TSDLLEXPORT bool ts_guc_enable_dml_decompression = true;
TSDLLEXPORT bool ts_guc_enable_dml_decompression_tuple_filtering = true;
TSDLLEXPORT int ts_guc_max_tuples_decompressed_per_dml = 100000;
Expand Down Expand Up @@ -463,6 +464,17 @@ _guc_init(void)
NULL,
NULL);

DefineCustomBoolVariable(MAKE_EXTOPTION("enable_compressed_direct_batch_delete"),
"Enable direct deletion of compressed batches",
"Enable direct batch deletion in compressed chunks",
&ts_guc_enable_compressed_direct_batch_delete,
true,
PGC_USERSET,
0,
NULL,
NULL,
NULL);

DefineCustomIntVariable(MAKE_EXTOPTION("max_tuples_decompressed_per_dml_transaction"),
"The max number of tuples that can be decompressed during an "
"INSERT, UPDATE, or DELETE.",
Expand Down
1 change: 1 addition & 0 deletions src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ extern TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify;
extern bool ts_guc_enable_osm_reads;
extern TSDLLEXPORT bool ts_guc_enable_dml_decompression;
extern TSDLLEXPORT bool ts_guc_enable_dml_decompression_tuple_filtering;
extern TSDLLEXPORT bool ts_guc_enable_compressed_direct_batch_delete;
extern TSDLLEXPORT int ts_guc_max_tuples_decompressed_per_dml;
extern TSDLLEXPORT bool ts_guc_enable_transparent_decompression;
extern TSDLLEXPORT bool ts_guc_enable_compression_wal_markers;
Expand Down
1 change: 1 addition & 0 deletions src/nodes/chunk_dispatch/chunk_dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ typedef struct ChunkDispatchState
ResultRelInfo *rri;
/* flag to represent dropped attributes */
bool is_dropped_attr_exists;
int64 batches_deleted;
int64 batches_filtered;
int64 batches_decompressed;
int64 tuples_decompressed;
Expand Down
3 changes: 3 additions & 0 deletions src/nodes/hypertable_modify.c
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ hypertable_modify_explain(CustomScanState *node, List *ancestors, ExplainState *
foreach (lc, chunk_dispatch_states)
{
ChunkDispatchState *cds = (ChunkDispatchState *) lfirst(lc);
state->batches_deleted += cds->batches_deleted;
state->batches_filtered += cds->batches_filtered;
state->batches_decompressed += cds->batches_decompressed;
state->tuples_decompressed += cds->tuples_decompressed;
Expand All @@ -251,6 +252,8 @@ hypertable_modify_explain(CustomScanState *node, List *ancestors, ExplainState *
ExplainPropertyInteger("Batches decompressed", NULL, state->batches_decompressed, es);
if (state->tuples_decompressed > 0)
ExplainPropertyInteger("Tuples decompressed", NULL, state->tuples_decompressed, es);
if (state->batches_deleted > 0)
ExplainPropertyInteger("Batches deleted", NULL, state->batches_deleted, es);
}

static CustomExecMethods hypertable_modify_state_methods = {
Expand Down
1 change: 1 addition & 0 deletions src/nodes/hypertable_modify.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ typedef struct HypertableModifyState
int64 tuples_decompressed;
int64 batches_decompressed;
int64 batches_filtered;
int64 batches_deleted;
} HypertableModifyState;

extern void ts_hypertable_modify_fixup_tlist(Plan *plan);
Expand Down
4 changes: 4 additions & 0 deletions tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ typedef struct RowDecompressor
CommandId mycid;
BulkInsertState bistate;

bool delete_only;

Datum *compressed_datums;
bool *compressed_is_nulls;

Expand All @@ -147,6 +149,7 @@ typedef struct RowDecompressor
MemoryContext per_compressed_row_ctx;
int64 batches_decompressed;
int64 tuples_decompressed;
int64 batches_deleted;

TupleTableSlot **decompressed_slots;
int unprocessed_tuples;
Expand Down Expand Up @@ -412,6 +415,7 @@ const CompressionAlgorithmDefinition *algorithm_definition(CompressionAlgorithm

struct decompress_batches_stats
{
int64 batches_deleted;
int64 batches_filtered;
int64 batches_decompressed;
int64 tuples_decompressed;
Expand Down
122 changes: 107 additions & 15 deletions tsl/src/compression/compression_dml.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
ScanKeyData *heap_scankeys, int num_heap_scankeys,
ScanKeyData *mem_scankeys, int num_mem_scankeys,
tuple_filtering_constraints *constraints, bool *skip_current_tuple,
Bitmapset *null_columns, List *is_nulls);
static struct decompress_batches_stats
decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
ScanKeyData *scankeys, int num_scankeys, ScanKeyData *mem_scankeys,
int num_mem_scankeys, tuple_filtering_constraints *constraints,
bool *skip_current_tuple, Bitmapset *null_columns, List *is_nulls);
bool delete_only, Bitmapset *null_columns, List *is_nulls);
static struct decompress_batches_stats decompress_batches_seqscan(
Relation in_rel, Relation out_rel, Snapshot snapshot, ScanKeyData *scankeys, int num_scankeys,
ScanKeyData *mem_scankeys, int num_mem_scankeys, tuple_filtering_constraints *constraints,
bool *skip_current_tuple, bool delete_only, Bitmapset *null_columns, List *is_nulls);

static bool batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys,
tuple_filtering_constraints *constraints, bool *skip_current_tuple);
Expand All @@ -60,6 +59,9 @@ static void report_error(TM_Result result);

static bool key_column_is_null(tuple_filtering_constraints *constraints, Relation chunk_rel,
Oid ht_relid, TupleTableSlot *slot);
static bool can_delete_without_decompression(HypertableModifyState *ht_state,
CompressionSettings *settings, Chunk *chunk,
List *predicates);

void
decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
Expand Down Expand Up @@ -167,6 +169,7 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
num_mem_scankeys,
constraints,
&skip_current_tuple,
false,
NULL, /* no null column check for non-segmentby
columns */
NIL);
Expand All @@ -193,6 +196,7 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
num_mem_scankeys,
constraints,
&skip_current_tuple,
false,
null_columns,
NIL);
}
Expand All @@ -203,6 +207,7 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
cis->cds->skip_current_tuple = true;
}

cis->cds->batches_deleted += stats.batches_deleted;
svenklemm marked this conversation as resolved.
Show resolved Hide resolved
cis->cds->batches_filtered += stats.batches_filtered;
cis->cds->batches_decompressed += stats.batches_decompressed;
cis->cds->tuples_decompressed += stats.tuples_decompressed;
Expand All @@ -223,7 +228,7 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
*/
static bool
decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chunk,
List *predicates, EState *estate)
List *predicates, EState *estate, bool has_joins)
{
/* process each chunk with its corresponding predicates */

Expand All @@ -248,6 +253,8 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu

comp_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true);
CompressionSettings *settings = ts_compression_settings_get(comp_chunk->table_id);
bool delete_only = ht_state->mt->operation == CMD_DELETE && !has_joins &&
can_delete_without_decompression(ht_state, settings, chunk, predicates);

process_predicates(chunk,
settings,
Expand Down Expand Up @@ -289,6 +296,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
num_mem_scankeys,
NULL,
NULL,
delete_only,
null_columns,
is_null);
/* close the selected index */
Expand All @@ -305,6 +313,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
num_mem_scankeys,
NULL,
NULL,
delete_only,
null_columns,
is_null);
}
Expand All @@ -329,6 +338,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
filter = lfirst(lc);
pfree(filter);
}
ht_state->batches_deleted += stats.batches_deleted;
ht_state->batches_filtered += stats.batches_filtered;
ht_state->batches_decompressed += stats.batches_decompressed;
ht_state->tuples_decompressed += stats.tuples_decompressed;
Expand All @@ -351,7 +361,7 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
ScanKeyData *heap_scankeys, int num_heap_scankeys,
ScanKeyData *mem_scankeys, int num_mem_scankeys,
tuple_filtering_constraints *constraints, bool *skip_current_tuple,
Bitmapset *null_columns, List *is_nulls)
bool delete_only, Bitmapset *null_columns, List *is_nulls)
{
HeapTuple compressed_tuple;
RowDecompressor decompressor;
Expand Down Expand Up @@ -403,7 +413,7 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
valid = true;
for (; attrno >= 0; attrno = bms_next_member(null_columns, attrno))
{
is_null_condition = list_nth_int(is_nulls, pos);
is_null_condition = is_nulls && list_nth_int(is_nulls, pos);
seg_col_is_null = slot_attisnull(slot, attrno);
if ((seg_col_is_null && !is_null_condition) || (!seg_col_is_null && is_null_condition))
{
Expand All @@ -426,6 +436,8 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
if (!decompressor_initialized)
{
decompressor = build_decompressor(in_rel, out_rel);
decompressor.delete_only = delete_only;

decompressor_initialized = true;
}

Expand Down Expand Up @@ -475,8 +487,15 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
report_error(result);
return stats;
}
stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor);
stats.batches_decompressed++;
if (decompressor.delete_only)
{
stats.batches_deleted++;
}
else
{
stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor);
stats.batches_decompressed++;
}
write_logical_replication_msg_decompression_end();
}

Expand Down Expand Up @@ -515,7 +534,8 @@ static struct decompress_batches_stats
decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
ScanKeyData *scankeys, int num_scankeys, ScanKeyData *mem_scankeys,
int num_mem_scankeys, tuple_filtering_constraints *constraints,
bool *skip_current_tuple, Bitmapset *null_columns, List *is_nulls)
bool *skip_current_tuple, bool delete_only, Bitmapset *null_columns,
List *is_nulls)
{
RowDecompressor decompressor;
bool decompressor_initialized = false;
Expand Down Expand Up @@ -568,6 +588,7 @@ decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
if (!decompressor_initialized)
{
decompressor = build_decompressor(in_rel, out_rel);
decompressor.delete_only = delete_only;
decompressor_initialized = true;
}

Expand Down Expand Up @@ -612,8 +633,15 @@ decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
table_endscan(scan);
report_error(result);
}
stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor);
stats.batches_decompressed++;
if (decompressor.delete_only)
{
stats.batches_deleted++;
}
else
{
stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor);
stats.batches_decompressed++;
}
write_logical_replication_msg_decompression_end();
}
if (scankeys)
Expand Down Expand Up @@ -690,6 +718,7 @@ struct decompress_chunk_context
HypertableModifyState *ht_state;
/* indicates decompression actually occurred */
bool batches_decompressed;
bool has_joins;
};

static bool decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx);
Expand Down Expand Up @@ -751,6 +780,13 @@ decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx)
needs_decompression = true;
break;
}
case T_NestLoopState:
case T_MergeJoinState:
case T_HashJoinState:
{
ctx->has_joins = true;
break;
}
default:
break;
}
Expand All @@ -777,7 +813,8 @@ decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx)
batches_decompressed = decompress_batches_for_update_delete(ctx->ht_state,
current_chunk,
predicates,
ps->state);
ps->state,
ctx->has_joins);
ctx->batches_decompressed |= batches_decompressed;

/* This is a workaround specifically for bitmap heap scans:
Expand Down Expand Up @@ -1415,3 +1452,58 @@ key_column_is_null(tuple_filtering_constraints *constraints, Relation chunk_rel,

return false;
}

static bool
can_delete_without_decompression(HypertableModifyState *ht_state, CompressionSettings *settings,
Chunk *chunk, List *predicates)
{
ListCell *lc;

if (!ts_guc_enable_compressed_direct_batch_delete)
return false;

/*
* If there is a RETURNING clause we skip the optimization to delete compressed batches directly
*/
if (ht_state->mt->returningLists)
return false;

/*
* If there are any DELETE row triggers on the hypertable we skip the optimization
* to delete compressed batches directly.
*/
ModifyTableState *ps =
linitial_node(ModifyTableState, castNode(CustomScanState, ht_state)->custom_ps);
if (ps->rootResultRelInfo->ri_TrigDesc)
{
TriggerDesc *trigdesc = ps->rootResultRelInfo->ri_TrigDesc;
if (trigdesc->trig_delete_before_row || trigdesc->trig_delete_after_row ||
trigdesc->trig_delete_instead_row)
{
return false;
}
}

foreach (lc, predicates)
{
Node *node = lfirst(lc);
Var *var;
Expr *arg_value;
Oid opno;

if (ts_extract_expr_args((Expr *) node, &var, &arg_value, &opno, NULL))
{
if (!IsA(arg_value, Const))
{
return false;
}
char *column_name = get_attname(chunk->table_id, var->varattno, false);
if (ts_array_is_member(settings->fd.segmentby, column_name))
{
continue;
}
}
return false;
}
return true;
}
Loading
Loading