Allow DELETE on compressed chunks without decompression
When the constraints of a DELETE on a compressed chunk fully cover the
batches, we can optimize the DELETE to work directly on the compressed
batches and skip the expensive decompression step.
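
For illustration, a sketch of the kind of statement that hits this fast path (table and column names are hypothetical, assuming a hypertable compressed with device_id as its SEGMENT BY column):

-- Every row in a compressed batch shares the same device_id value, so a
-- predicate on the segmentby column matches or rejects whole batches and
-- no decompression is needed.
DELETE FROM metrics WHERE device_id = 42;

-- EXPLAIN (ANALYZE) of such a DELETE would report the "Batches deleted"
-- counter added below instead of "Batches decompressed"/"Tuples decompressed".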
svenklemm committed May 3, 2024
1 parent 67bd5a8 commit c22c271
Showing 4 changed files with 121 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/nodes/hypertable_modify.c
@@ -257,6 +257,8 @@ hypertable_modify_explain(CustomScanState *node, List *ancestors, ExplainState *
state->tuples_decompressed += cds->tuples_decompressed;
}
}
if (state->batches_deleted > 0)
ExplainPropertyInteger("Batches deleted", NULL, state->batches_deleted, es);
if (state->batches_decompressed > 0)
ExplainPropertyInteger("Batches decompressed", NULL, state->batches_decompressed, es);
if (state->tuples_decompressed > 0)
1 change: 1 addition & 0 deletions src/nodes/hypertable_modify.h
@@ -29,6 +29,7 @@ typedef struct HypertableModifyState
Snapshot snapshot;
int64 tuples_decompressed;
int64 batches_decompressed;
int64 batches_deleted;
} HypertableModifyState;

extern void ts_hypertable_modify_fixup_tlist(Plan *plan);
119 changes: 115 additions & 4 deletions tsl/src/compression/compression.c
@@ -2865,8 +2865,15 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num
table_endscan(scan);
report_error(result);
}
row_decompressor_decompress_row_to_table(decompressor);
*chunk_status_changed = true;
if (decompressor->delete_only)
{
decompressor->batches_deleted++;
}
else
{
row_decompressor_decompress_row_to_table(decompressor);
*chunk_status_changed = true;
}
}
if (scankeys)
pfree(scankeys);
@@ -3043,8 +3050,15 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel
index_close(index_rel, AccessShareLock);
report_error(result);
}
row_decompressor_decompress_row_to_table(decompressor);
*chunk_status_changed = true;
if (decompressor->delete_only)
{
decompressor->batches_deleted++;
}
else
{
row_decompressor_decompress_row_to_table(decompressor);
*chunk_status_changed = true;
}
}

if (ts_guc_debug_compression_path_info)
@@ -3062,6 +3076,96 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel
return true;
}

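/*
 * Extract the Var and Const operands from a simple predicate. Only plain
 * OpExpr (column <op> constant) and ScalarArrayOpExpr (column <op> ANY(...))
 * nodes are handled; RelabelType wrappers are stripped, and the operands may
 * appear in either order. Returns false for any other node shape.
 */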
static bool
extract_operands(Node *node, Var **var, Const **arg_value)
{
switch (nodeTag(node))
{
case T_OpExpr:
{
OpExpr *opexpr = (OpExpr *) node;

Expr *leftop = linitial(opexpr->args);
Expr *rightop = lsecond(opexpr->args);

if (IsA(leftop, RelabelType))
leftop = ((RelabelType *) leftop)->arg;
if (IsA(rightop, RelabelType))
rightop = ((RelabelType *) rightop)->arg;

if (IsA(leftop, Var) && IsA(rightop, Const))
{
*var = (Var *) leftop;
*arg_value = (Const *) rightop;
return true;
}
else if (IsA(rightop, Var) && IsA(leftop, Const))
{
*var = (Var *) rightop;
*arg_value = (Const *) leftop;
return true;
}
break;
}
case T_ScalarArrayOpExpr:
{
ScalarArrayOpExpr *opexpr = (ScalarArrayOpExpr *) node;

Expr *leftop = linitial(opexpr->args);
Expr *rightop = lsecond(opexpr->args);

if (IsA(leftop, RelabelType))
leftop = ((RelabelType *) leftop)->arg;
if (IsA(rightop, RelabelType))
rightop = ((RelabelType *) rightop)->arg;

if (IsA(leftop, Var) && IsA(rightop, Const))
{
*var = (Var *) leftop;
*arg_value = (Const *) rightop;
return true;
}
else if (IsA(rightop, Var) && IsA(leftop, Const))
{
*var = (Var *) rightop;
*arg_value = (Const *) leftop;
return true;
}
break;
}
default:
elog(NOTICE, "Unsupported node type %d", nodeTag(node));
break;
}

return false;
}

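/*
 * Check whether a DELETE can be executed directly on the compressed batches.
 * This is the case when every predicate is a simple condition on a SEGMENT BY
 * column: such conditions hold for either all or none of the rows in a batch,
 * so a matching batch can be deleted without decompressing it.
 */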
static bool
can_delete_without_decompression(CompressionSettings *settings, Chunk *chunk, List *predicates)
{
ListCell *lc;

foreach (lc, predicates)
{
Node *node = lfirst(lc);
Var *var;
Const *arg_value;

if (extract_operands(node, &var, &arg_value))
{
char *column_name = get_attname(chunk->table_id, var->varattno, false);
if (ts_array_is_member(settings->fd.segmentby, column_name))
{
continue;
}
}
return false;
}

return true;
}

/*
* This method will:
* 1. Evaluate WHERE clauses and check if SEGMENT BY columns
@@ -3103,6 +3207,12 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
comp_chunk_rel = table_open(comp_chunk->table_id, RowExclusiveLock);
decompressor = build_decompressor(comp_chunk_rel, chunk_rel);

if (ht_state->mt->operation == CMD_DELETE &&
can_delete_without_decompression(settings, chunk, predicates))
{
decompressor.delete_only = true;
}

if (index_filters)
{
matching_index_rel = find_matching_index(comp_chunk_rel, &index_filters, &heap_filters);
@@ -3163,6 +3273,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
filter = lfirst(lc);
pfree(filter);
}
ht_state->batches_deleted += decompressor.batches_deleted;
ht_state->batches_decompressed += decompressor.batches_decompressed;
ht_state->tuples_decompressed += decompressor.tuples_decompressed;
}
3 changes: 3 additions & 0 deletions tsl/src/compression/compression.h
@@ -138,6 +138,8 @@ typedef struct RowDecompressor
CommandId mycid;
BulkInsertState bistate;

bool delete_only;

Datum *compressed_datums;
bool *compressed_is_nulls;

@@ -147,6 +149,7 @@ typedef struct RowDecompressor
MemoryContext per_compressed_row_ctx;
int64 batches_decompressed;
int64 tuples_decompressed;
int64 batches_deleted;

TupleTableSlot **decompressed_slots;

