Skip to content

Commit

Permalink
Allow DELETE on compressed chunks without decompression
Browse files Browse the repository at this point in the history
When the constraints of a DELETE on a compressed chunks fully cover the
batches we can optimize the DELETE to work directly on the compressed
batches and skip the expensive decompression part.
  • Loading branch information
svenklemm committed Jul 28, 2024
1 parent 377cc15 commit 69e55d1
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 12 deletions.
1 change: 1 addition & 0 deletions src/nodes/chunk_dispatch/chunk_dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ typedef struct ChunkDispatchState
ResultRelInfo *rri;
/* flag to represent dropped attributes */
bool is_dropped_attr_exists;
int64 batches_deleted;
int64 batches_filtered;
int64 batches_decompressed;
int64 tuples_decompressed;
Expand Down
3 changes: 3 additions & 0 deletions src/nodes/hypertable_modify.c
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ hypertable_modify_explain(CustomScanState *node, List *ancestors, ExplainState *
foreach (lc, chunk_dispatch_states)
{
ChunkDispatchState *cds = (ChunkDispatchState *) lfirst(lc);
state->batches_deleted += cds->batches_deleted;
state->batches_filtered += cds->batches_filtered;
state->batches_decompressed += cds->batches_decompressed;
state->tuples_decompressed += cds->tuples_decompressed;
Expand All @@ -251,6 +252,8 @@ hypertable_modify_explain(CustomScanState *node, List *ancestors, ExplainState *
ExplainPropertyInteger("Batches decompressed", NULL, state->batches_decompressed, es);
if (state->tuples_decompressed > 0)
ExplainPropertyInteger("Tuples decompressed", NULL, state->tuples_decompressed, es);
if (state->batches_deleted > 0)
ExplainPropertyInteger("Batches deleted", NULL, state->batches_deleted, es);
}

static CustomExecMethods hypertable_modify_state_methods = {
Expand Down
1 change: 1 addition & 0 deletions src/nodes/hypertable_modify.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ typedef struct HypertableModifyState
int64 tuples_decompressed;
int64 batches_decompressed;
int64 batches_filtered;
int64 batches_deleted;
} HypertableModifyState;

extern void ts_hypertable_modify_fixup_tlist(Plan *plan);
Expand Down
4 changes: 4 additions & 0 deletions tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ typedef struct RowDecompressor
CommandId mycid;
BulkInsertState bistate;

bool delete_only;

Datum *compressed_datums;
bool *compressed_is_nulls;

Expand All @@ -147,6 +149,7 @@ typedef struct RowDecompressor
MemoryContext per_compressed_row_ctx;
int64 batches_decompressed;
int64 tuples_decompressed;
int64 batches_deleted;

TupleTableSlot **decompressed_slots;
int unprocessed_tuples;
Expand Down Expand Up @@ -410,6 +413,7 @@ const CompressionAlgorithmDefinition *algorithm_definition(CompressionAlgorithm

struct decompress_batches_stats
{
int64 batches_deleted;
int64 batches_filtered;
int64 batches_decompressed;
int64 tuples_decompressed;
Expand Down
101 changes: 89 additions & 12 deletions tsl/src/compression/compression_dml.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
ScanKeyData *heap_scankeys, int num_heap_scankeys,
ScanKeyData *mem_scankeys, int num_mem_scankeys,
tuple_filtering_constraints *constraints, bool *skip_current_tuple,
Bitmapset *null_columns, List *is_nulls);
static struct decompress_batches_stats
decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
ScanKeyData *scankeys, int num_scankeys, ScanKeyData *mem_scankeys,
int num_mem_scankeys, tuple_filtering_constraints *constraints,
bool *skip_current_tuple, Bitmapset *null_columns, List *is_nulls);
bool delete_only, Bitmapset *null_columns, List *is_nulls);
static struct decompress_batches_stats decompress_batches_seqscan(
Relation in_rel, Relation out_rel, Snapshot snapshot, ScanKeyData *scankeys, int num_scankeys,
ScanKeyData *mem_scankeys, int num_mem_scankeys, tuple_filtering_constraints *constraints,
bool *skip_current_tuple, bool delete_only, Bitmapset *null_columns, List *is_nulls);

static bool batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys,
tuple_filtering_constraints *constraints, bool *skip_current_tuple);
Expand All @@ -60,6 +59,9 @@ static void report_error(TM_Result result);

static bool key_column_is_null(tuple_filtering_constraints *constraints, Relation chunk_rel,
Oid ht_relid, TupleTableSlot *slot);
static bool can_delete_without_decompression(HypertableModifyState *ht_state,
CompressionSettings *settings, Chunk *chunk,
List *predicates);

void
decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
Expand Down Expand Up @@ -167,6 +169,7 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
num_mem_scankeys,
constraints,
&skip_current_tuple,
false,
NULL, /* no null column check for non-segmentby
columns */
NIL);
Expand All @@ -193,6 +196,7 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
num_mem_scankeys,
constraints,
&skip_current_tuple,
false,
null_columns,
NIL);
}
Expand All @@ -203,6 +207,7 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
cis->cds->skip_current_tuple = true;
}

cis->cds->batches_deleted += stats.batches_deleted;
cis->cds->batches_filtered += stats.batches_filtered;
cis->cds->batches_decompressed += stats.batches_decompressed;
cis->cds->tuples_decompressed += stats.tuples_decompressed;
Expand Down Expand Up @@ -248,6 +253,8 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu

comp_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true);
CompressionSettings *settings = ts_compression_settings_get(comp_chunk->table_id);
bool delete_only = ht_state->mt->operation == CMD_DELETE &&
can_delete_without_decompression(ht_state, settings, chunk, predicates);

process_predicates(chunk,
settings,
Expand Down Expand Up @@ -289,6 +296,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
num_mem_scankeys,
NULL,
NULL,
delete_only,
null_columns,
is_null);
/* close the selected index */
Expand All @@ -305,6 +313,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
num_mem_scankeys,
NULL,
NULL,
delete_only,
null_columns,
is_null);
}
Expand All @@ -329,6 +338,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
filter = lfirst(lc);
pfree(filter);
}
ht_state->batches_deleted += stats.batches_deleted;
ht_state->batches_filtered += stats.batches_filtered;
ht_state->batches_decompressed += stats.batches_decompressed;
ht_state->tuples_decompressed += stats.tuples_decompressed;
Expand All @@ -351,7 +361,7 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
ScanKeyData *heap_scankeys, int num_heap_scankeys,
ScanKeyData *mem_scankeys, int num_mem_scankeys,
tuple_filtering_constraints *constraints, bool *skip_current_tuple,
Bitmapset *null_columns, List *is_nulls)
bool delete_only, Bitmapset *null_columns, List *is_nulls)
{
HeapTuple compressed_tuple;
RowDecompressor decompressor;
Expand Down Expand Up @@ -426,6 +436,8 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
if (!decompressor_initialized)
{
decompressor = build_decompressor(in_rel, out_rel);
decompressor.delete_only = delete_only;

decompressor_initialized = true;
}

Expand Down Expand Up @@ -475,8 +487,15 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
report_error(result);
return stats;
}
stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor);
stats.batches_decompressed++;
if (decompressor.delete_only)
{
stats.batches_deleted++;
}
else
{
stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor);
stats.batches_decompressed++;
}
write_logical_replication_msg_decompression_end();
}

Expand Down Expand Up @@ -515,7 +534,8 @@ static struct decompress_batches_stats
decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
ScanKeyData *scankeys, int num_scankeys, ScanKeyData *mem_scankeys,
int num_mem_scankeys, tuple_filtering_constraints *constraints,
bool *skip_current_tuple, Bitmapset *null_columns, List *is_nulls)
bool *skip_current_tuple, bool delete_only, Bitmapset *null_columns,
List *is_nulls)
{
RowDecompressor decompressor;
bool decompressor_initialized = false;
Expand Down Expand Up @@ -568,6 +588,7 @@ decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
if (!decompressor_initialized)
{
decompressor = build_decompressor(in_rel, out_rel);
decompressor.delete_only = delete_only;
decompressor_initialized = true;
}

Expand Down Expand Up @@ -612,8 +633,15 @@ decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
table_endscan(scan);
report_error(result);
}
stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor);
stats.batches_decompressed++;
if (decompressor.delete_only)
{
stats.batches_deleted++;
}
else
{
stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor);
stats.batches_decompressed++;
}
write_logical_replication_msg_decompression_end();
}
if (scankeys)
Expand Down Expand Up @@ -1415,3 +1443,52 @@ key_column_is_null(tuple_filtering_constraints *constraints, Relation chunk_rel,

return false;
}

static bool
can_delete_without_decompression(HypertableModifyState *ht_state, CompressionSettings *settings,
Chunk *chunk, List *predicates)
{
ListCell *lc;

/*
* If there are any DELETE row triggers on the hypertable we skip the optimization
* to delete compressed batches directly.
*/
ModifyTableState *ps =
linitial_node(ModifyTableState, castNode(CustomScanState, ht_state)->custom_ps);
if (ps->rootResultRelInfo->ri_TrigDesc)
{
TriggerDesc *trigdesc = ps->rootResultRelInfo->ri_TrigDesc;
if (trigdesc->trig_delete_before_row || trigdesc->trig_delete_after_row ||
trigdesc->trig_delete_instead_row)
{
return false;
}
}

foreach (lc, predicates)
{
Node *node = lfirst(lc);
Var *var;
Expr *arg_value;
Oid opno;

if (ts_extract_expr_args((Expr *) node, &var, &arg_value, &opno, NULL))

{
if (!IsA(arg_value, Const))

{
return false;
}
char *column_name = get_attname(chunk->table_id, var->varattno, false);
if (ts_array_is_member(settings->fd.segmentby, column_name))

{
continue;
}
}
return false;
}
return true;
}

0 comments on commit 69e55d1

Please sign in to comment.