
Commit

Don't copy compressed slot to compressed batch struct (#6806)
There is overhead associated with copying the heap tuple and (un)pinning
the respective heap buffers, which becomes apparent in vectorized
aggregation.

Instead, it is enough to copy only the by-reference segmentby values
into the per-batch memory context.

We also have to copy in the rare case where the compressed data is
inlined into the compressed row rather than toasted.
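To illustrate the new copying rule, here is a minimal standalone sketch, assuming stock PostgreSQL memory-context and detoasting APIs (PG_DETOAST_DATUM_COPY, MemoryContextAlloc) in place of the commit's own detoaster_detoast_attr_copy(); the helper name and signature below are hypothetical, not the committed code.

#include "postgres.h"
#include "fmgr.h"
#include "utils/palloc.h"

/*
 * Hypothetical sketch: copy one by-reference segmentby value into the
 * per-batch memory context, so the batch no longer has to keep the whole
 * compressed slot alive. By-value Datums need no copy at all.
 */
static Datum
copy_segmentby_datum(Datum value, bool by_value, int16 value_bytes,
                     MemoryContext per_batch_context)
{
    if (by_value || DatumGetPointer(value) == NULL)
        return value;

    if (value_bytes < 0)
    {
        /* Varlena type: detoast with an unconditional copy. */
        MemoryContext old = MemoryContextSwitchTo(per_batch_context);
        Datum copy = PointerGetDatum(PG_DETOAST_DATUM_COPY(value));
        MemoryContextSwitchTo(old);
        return copy;
    }

    /* Fixed-length by-reference type: plain memcpy of value_bytes. */
    void *tmp = MemoryContextAlloc(per_batch_context, value_bytes);
    memcpy(tmp, DatumGetPointer(value), value_bytes);
    return PointerGetDatum(tmp);
}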
akuzm authored Apr 11, 2024
1 parent 971e6c3 commit 610db31
Showing 7 changed files with 102 additions and 103 deletions.
8 changes: 4 additions & 4 deletions tsl/src/compression/compression.c
@@ -1652,10 +1652,10 @@ decompress_batch(RowDecompressor *decompressor)

/* Normal compressed column. */
Datum compressed_datum = PointerGetDatum(
-		detoaster_detoast_attr((struct varlena *) DatumGetPointer(
-								   decompressor->compressed_datums[input_column]),
-							   &decompressor->detoaster,
-							   CurrentMemoryContext));
+		detoaster_detoast_attr_copy((struct varlena *) DatumGetPointer(
+										decompressor->compressed_datums[input_column]),
+									&decompressor->detoaster,
+									CurrentMemoryContext));
CompressedDataHeader *header = get_compressed_data_header(compressed_datum);
column_info->iterator =
definitions[header->compression_algorithm]
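The hunk above swaps detoaster_detoast_attr() for detoaster_detoast_attr_copy(). The point of a _copy variant is that plain detoasting can return a pointer into the original buffer when the value is stored inline. A sketch of the idea, with a hypothetical name and only core varlena macros; the commit's real helper is detoaster_detoast_attr_copy():

/*
 * Hypothetical sketch of detoast-with-copy. When the attribute is external
 * or compressed, detoasting already produces a fresh copy; when it is
 * stored inline, we must copy explicitly so the result never points into
 * the source heap buffer.
 */
static struct varlena *
detoast_attr_copy_sketch(struct varlena *attr, MemoryContext dest_cxt)
{
    MemoryContext old = MemoryContextSwitchTo(dest_cxt);
    struct varlena *result;

    if (VARATT_IS_EXTENDED(attr))
    {
        /* Toasted or compressed: detoasting allocates a new copy. */
        result = (struct varlena *) PG_DETOAST_DATUM(PointerGetDatum(attr));
    }
    else
    {
        /* Rare case: the value is inlined in the row, so copy it explicitly. */
        Size len = VARSIZE(attr);
        result = (struct varlena *) palloc(len);
        memcpy(result, attr, len);
    }

    MemoryContextSwitchTo(old);
    return result;
}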
136 changes: 73 additions & 63 deletions tsl/src/nodes/decompress_chunk/compressed_batch.c
@@ -156,7 +156,8 @@ get_max_text_datum_size(ArrowArray *text_array)
}

static void
-decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state, int i)
+decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state,
+				  TupleTableSlot *compressed_slot, int i)
{
CompressionColumnDescription *column_description = &dcontext->template_columns[i];
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
@@ -168,9 +169,7 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state
Assert(value_bytes != 0);

bool isnull;
-	Datum value = slot_getattr(batch_state->compressed_slot,
-							   column_description->compressed_scan_attno,
-							   &isnull);
+	Datum value = slot_getattr(compressed_slot, column_description->compressed_scan_attno, &isnull);

if (isnull)
{
@@ -188,9 +187,9 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state
}

/* Detoast the compressed datum. */
-	value = PointerGetDatum(detoaster_detoast_attr((struct varlena *) DatumGetPointer(value),
-												   &dcontext->detoaster,
-												   batch_state->per_batch_context));
+	value = PointerGetDatum(detoaster_detoast_attr_copy((struct varlena *) DatumGetPointer(value),
+														&dcontext->detoaster,
+														batch_state->per_batch_context));

/* Decompress the entire batch if it is supported. */
CompressedDataHeader *header = (CompressedDataHeader *) value;
@@ -330,8 +329,8 @@ translate_bitmap_from_dictionary(const ArrowArray *arrow, const uint64 *dict_res
}

static void
-compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, Node *qual,
-				   uint64 *restrict result)
+compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_state,
+				   TupleTableSlot *compressed_slot, Node *qual, uint64 *restrict result)
{
/*
* Some predicates can be evaluated to a Const at run time.
@@ -423,7 +422,7 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchStat
* skip decompressing some columns if the entire batch doesn't pass
* the quals.
*/
-		decompress_column(dcontext, batch_state, column_index);
+		decompress_column(dcontext, batch_state, compressed_slot, column_index);
Assert(column_values->decompression_type != DT_Invalid);
}

@@ -566,16 +565,16 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchStat
}

static void compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state,
-							 Node *qual, uint64 *restrict result);
+							 TupleTableSlot *compressed_slot, Node *qual, uint64 *restrict result);

static void
compute_qual_conjunction(DecompressContext *dcontext, DecompressBatchState *batch_state,
-						 List *quals, uint64 *restrict result)
+						 TupleTableSlot *compressed_slot, List *quals, uint64 *restrict result)
{
ListCell *lc;
foreach (lc, quals)
{
-		compute_one_qual(dcontext, batch_state, lfirst(lc), result);
+		compute_one_qual(dcontext, batch_state, compressed_slot, lfirst(lc), result);
if (get_vector_qual_summary(result, batch_state->total_batch_rows) == NoRowsPass)
{
/*
@@ -589,7 +588,7 @@ compute_qual_conjunction(DecompressContext *dcontext, DecompressBatc

static void
compute_qual_disjunction(DecompressContext *dcontext, DecompressBatchState *batch_state,
-						 List *quals, uint64 *restrict result)
+						 TupleTableSlot *compressed_slot, List *quals, uint64 *restrict result)
{
const size_t n_rows = batch_state->total_batch_rows;
const size_t n_result_words = (n_rows + 63) / 64;
@@ -608,7 +607,7 @@ compute_qual_disjunction(DecompressContext *dcontext, DecompressBatc
{
one_qual_result[i] = (uint64) -1;
}
-		compute_one_qual(dcontext, batch_state, lfirst(lc), one_qual_result);
+		compute_one_qual(dcontext, batch_state, compressed_slot, lfirst(lc), one_qual_result);
for (size_t i = 0; i < n_result_words; i++)
{
or_result[i] |= one_qual_result[i];
@@ -631,19 +630,19 @@ compute_qual_disjunction(DecompressContext *dcontext, DecompressBatc
}

static void
-compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, Node *qual,
-				 uint64 *restrict result)
+compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state,
+				 TupleTableSlot *compressed_slot, Node *qual, uint64 *restrict result)
{
if (!IsA(qual, BoolExpr))
{
-		compute_plain_qual(dcontext, batch_state, qual, result);
+		compute_plain_qual(dcontext, batch_state, compressed_slot, qual, result);
return;
}

BoolExpr *boolexpr = castNode(BoolExpr, qual);
if (boolexpr->boolop == AND_EXPR)
{
-		compute_qual_conjunction(dcontext, batch_state, boolexpr->args, result);
+		compute_qual_conjunction(dcontext, batch_state, compressed_slot, boolexpr->args, result);
return;
}

@@ -652,7 +651,7 @@ compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state,
* NOT and consider it non-vectorizable at planning time. So only OR is left.
*/
Ensure(boolexpr->boolop == OR_EXPR, "expected OR");
-	compute_qual_disjunction(dcontext, batch_state, boolexpr->args, result);
+	compute_qual_disjunction(dcontext, batch_state, compressed_slot, boolexpr->args, result);
}

/*
@@ -661,7 +660,8 @@ compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state,
* optimizations.
*/
static VectorQualSummary
-compute_vector_quals(DecompressContext *dcontext, DecompressBatchState *batch_state)
+compute_vector_quals(DecompressContext *dcontext, DecompressBatchState *batch_state,
+					 TupleTableSlot *compressed_slot)
{
/*
* Allocate the bitmap that will hold the vectorized qual results. We will
@@ -688,6 +688,7 @@
*/
compute_qual_conjunction(dcontext,
batch_state,
+							 compressed_slot,
dcontext->vectorized_quals_constified,
batch_state->vector_qual_result);

@@ -709,7 +710,6 @@ compressed_batch_discard_tuples(DecompressBatchState *batch_state)

if (batch_state->per_batch_context != NULL)
{
-		ExecClearTuple(batch_state->compressed_slot);
ExecClearTuple(&batch_state->decompressed_scan_slot_data.base);
MemoryContextReset(batch_state->per_batch_context);
}
@@ -720,7 +720,6 @@ compressed_batch_discard_tuples(DecompressBatchState *batch_state)
*/
Assert(IsA(&batch_state->decompressed_scan_slot_data, Invalid));
Assert(batch_state->decompressed_scan_slot_data.base.tts_ops == NULL);
-		Assert(batch_state->compressed_slot == NULL);
}
}

@@ -730,24 +729,12 @@ compressed_batch_discard_tuples(DecompressBatchState *batch_state)
* relatively expensive.
*/
static void
-compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *batch_state,
-						   TupleTableSlot *compressed_slot)
+compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *batch_state)
{
/* Init memory context */
batch_state->per_batch_context = create_per_batch_mctx(dcontext);
Assert(batch_state->per_batch_context != NULL);

-	Assert(batch_state->compressed_slot == NULL);
-
-	/* Create a non ref-counted copy of the compressed tuple descriptor */
-	if (dcontext->compressed_slot_tdesc == NULL)
-		dcontext->compressed_slot_tdesc =
-			CreateTupleDescCopyConstr(compressed_slot->tts_tupleDescriptor);
-	Assert(dcontext->compressed_slot_tdesc->tdrefcount == -1);
-
-	batch_state->compressed_slot =
-		MakeSingleTupleTableSlot(dcontext->compressed_slot_tdesc, compressed_slot->tts_ops);

/* Get a reference to the output TupleTableSlot */
TupleTableSlot *decompressed_slot = dcontext->decompressed_slot;

@@ -771,11 +758,19 @@ compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *ba

slot->tts_mcxt = CurrentMemoryContext;
slot->tts_nvalid = 0;
-	slot->tts_values = palloc(MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(Datum)) +
-							  MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(bool)));
+	slot->tts_values = palloc0(MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(Datum)) +
+							   MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(bool)));
slot->tts_isnull = (bool *) ((char *) slot->tts_values) +
MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(Datum));

+	/*
+	 * Have to initially set nulls to true, because this is the uncompressed chunk
+	 * tuple, and some of its columns might be not even decompressed. The tuple
+	 * slot functions will get confused by them, because they expect a non-null
+	 * value for attributes not marked as null.
+	 */
+	memset(slot->tts_isnull, true, slot->tts_tupleDescriptor->natts * sizeof(bool));

/*
* DecompressChunk produces virtual tuple slots.
*/
@@ -788,7 +783,8 @@ compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *ba
*/
void
compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
-									  DecompressBatchState *batch_state, TupleTableSlot *subslot)
+									  DecompressBatchState *batch_state,
+									  TupleTableSlot *compressed_slot)
{
Assert(TupIsNull(compressed_batch_current_tuple(batch_state)));

@@ -798,23 +794,10 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
*/
if (batch_state->per_batch_context == NULL)
{
-		compressed_batch_lazy_init(dcontext, batch_state, subslot);
-	}
-	else
-	{
-		Assert(batch_state->compressed_slot != NULL);
+		compressed_batch_lazy_init(dcontext, batch_state);
}

-	/* Ensure that all fields are empty. Calling ExecClearTuple is not enough
-	 * because some attributes might not be populated (e.g., due to a dropped
-	 * column) and these attributes need to be set to null. */
TupleTableSlot *decompressed_tuple = compressed_batch_current_tuple(batch_state);
Assert(decompressed_tuple != NULL);
-	ExecStoreAllNullTuple(decompressed_tuple);
-	ExecClearTuple(decompressed_tuple);

-	ExecCopySlot(batch_state->compressed_slot, subslot);
-	Assert(!TupIsNull(batch_state->compressed_slot));

batch_state->total_batch_rows = 0;
batch_state->next_batch_row = 0;
@@ -849,15 +832,43 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
*/
AttrNumber attr = AttrNumberGetAttrOffset(column_description->output_attno);
decompressed_tuple->tts_values[attr] =
-				slot_getattr(batch_state->compressed_slot,
+				slot_getattr(compressed_slot,
column_description->compressed_scan_attno,
&decompressed_tuple->tts_isnull[attr]);

+			/*
+			 * Note that if it's not a by-value type, we should copy it into
+			 * the slot context.
+			 */
+			if (!column_description->by_value &&
+				DatumGetPointer(decompressed_tuple->tts_values[attr]) != NULL)
+			{
+				if (column_description->value_bytes < 0)
+				{
+					/* This is a varlena type. */
+					decompressed_tuple->tts_values[attr] = PointerGetDatum(
+						detoaster_detoast_attr_copy((struct varlena *)
+														decompressed_tuple->tts_values[attr],
+													&dcontext->detoaster,
+													batch_state->per_batch_context));
+				}
+				else
+				{
+					/* This is a fixed-length by-reference type. */
+					void *tmp = MemoryContextAlloc(batch_state->per_batch_context,
+												   column_description->value_bytes);
+					memcpy(tmp,
+						   DatumGetPointer(decompressed_tuple->tts_values[attr]),
+						   column_description->value_bytes);
+					decompressed_tuple->tts_values[attr] = PointerGetDatum(tmp);
+				}
+			}
break;
}
case COUNT_COLUMN:
{
bool isnull;
-				Datum value = slot_getattr(batch_state->compressed_slot,
+				Datum value = slot_getattr(compressed_slot,
column_description->compressed_scan_attno,
&isnull);
/* count column should never be NULL */
@@ -885,9 +896,10 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
}
}

-	VectorQualSummary vector_qual_summary = dcontext->vectorized_quals_constified != NIL ?
-												compute_vector_quals(dcontext, batch_state) :
-												AllRowsPass;
+	VectorQualSummary vector_qual_summary =
+		dcontext->vectorized_quals_constified != NIL ?
+			compute_vector_quals(dcontext, batch_state, compressed_slot) :
+			AllRowsPass;
if (vector_qual_summary == NoRowsPass && !dcontext->batch_sorted_merge)
{
/*
@@ -917,7 +929,7 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
if (column_values->decompression_type == DT_Invalid)
{
-			decompress_column(dcontext, batch_state, i);
+			decompress_column(dcontext, batch_state, compressed_slot, i);
Assert(column_values->decompression_type != DT_Invalid);
}
}
@@ -1225,16 +1237,14 @@ compressed_batch_destroy(DecompressBatchState *batch_state)
batch_state->per_batch_context = NULL;
}

-	if (batch_state->compressed_slot != NULL)
+	if (batch_state->decompressed_scan_slot_data.base.tts_values != NULL)
{
/*
* Can be separately NULL in the current simplified prototype for
* vectorized aggregation, but ideally it should change together with
* per-batch context.
*/
-		ExecDropSingleTupleTableSlot(batch_state->compressed_slot);
-		batch_state->compressed_slot = NULL;
-
pfree(batch_state->decompressed_scan_slot_data.base.tts_values);
batch_state->decompressed_scan_slot_data.base.tts_values = NULL;
}
}
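A note on the qual bitmaps threaded through the functions in this file's diff: the result is one bit per row, packed 64 rows to a uint64 word, so a conjunction ANDs whole words into the running result, while a disjunction ORs each qual's words into an accumulator and then ANDs that in. The following is a self-contained illustration in standalone C (stdint types instead of PostgreSQL's uint64), mirroring compute_qual_disjunction() above rather than reproducing it:

#include <stdint.h>
#include <stddef.h>

/*
 * Standalone illustration: OR the per-qual row bitmaps into an
 * accumulator, then AND the accumulator into the running result,
 * 64 rows per word.
 */
static void
combine_disjunction(uint64_t *restrict result,
                    const uint64_t *const *qual_results,
                    size_t n_quals, size_t n_rows)
{
    const size_t n_result_words = (n_rows + 63) / 64;
    for (size_t w = 0; w < n_result_words; w++)
    {
        uint64_t or_word = 0;
        for (size_t q = 0; q < n_quals; q++)
            or_word |= qual_results[q][w];
        result[w] &= or_word;
    }
}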
8 changes: 1 addition & 7 deletions tsl/src/nodes/decompress_chunk/compressed_batch.h
@@ -82,12 +82,6 @@ typedef struct DecompressBatchState
*/
VirtualTupleTableSlot decompressed_scan_slot_data;

-	/*
-	 * Compressed target slot. We have to keep a local copy when doing batch
-	 * sorted merge, because the segmentby column values might reference the
-	 * original tuple, and a batch outlives its source tuple.
-	 */
-	TupleTableSlot *compressed_slot;
uint16 total_batch_rows;
uint16 next_batch_row;
MemoryContext per_batch_context;
@@ -104,7 +98,7 @@ typedef struct DecompressBatchState

extern void compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
DecompressBatchState *batch_state,
-												  TupleTableSlot *subslot);
+												  TupleTableSlot *compressed_slot);

extern void compressed_batch_advance(DecompressContext *dcontext,
DecompressBatchState *batch_state);
18 changes: 2 additions & 16 deletions tsl/src/nodes/decompress_chunk/decompress_context.h
@@ -27,7 +27,8 @@ typedef struct CompressionColumnDescription
{
CompressionColumnType type;
Oid typid;
-	int value_bytes;
+	int16 value_bytes;
+	bool by_value;

/*
* Attno of the decompressed column in the output of DecompressChunk node.
@@ -63,21 +64,6 @@ typedef struct DecompressContext

TupleTableSlot *decompressed_slot;

-	/*
-	 * Make non-refcounted copies of the tupdesc for reuse across all batch states
-	 * and avoid spending CPU in ResourceOwner when creating a big number of table
-	 * slots. This happens because each new slot pins its tuple descriptor using
-	 * PinTupleDesc, and for reference-counting tuples this involves adding a new
-	 * reference to ResourceOwner, which is not very efficient for a large number of
-	 * references.
-	 *
-	 * We don't have to do this for the decompressed slot tuple descriptor,
-	 * because there we use custom tuple slot (de)initialization functions, which
-	 * don't use reference counting and just use a raw pointer to the tuple
-	 * descriptor.
-	 */
-	TupleDesc compressed_slot_tdesc;

PlanState *ps; /* Set for filtering and instrumentation */

Detoaster detoaster;
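The new by_value field and the narrowed int16 value_bytes mirror pg_type.typbyval and pg_type.typlen, which is why int16 is the natural width. A hedged sketch of how these fields could be filled using the core get_typlenbyval() lookup; the init helper itself is hypothetical:

#include "postgres.h"
#include "utils/lsyscache.h"

/* Hypothetical init helper: look up typlen/typbyval for the column type. */
static void
fill_column_type_info(CompressionColumnDescription *column)
{
    /* value_bytes becomes -1 for varlena types, -2 for cstring. */
    get_typlenbyval(column->typid, &column->value_bytes, &column->by_value);
}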
