Skip to content

Commit

Permalink
Refactor chunk decompression functions
Browse files Browse the repository at this point in the history
Restructure the code inside decompress_chunk slightly to make core
loop reusable by other functions.
  • Loading branch information
svenklemm committed Feb 6, 2023
1 parent fb3ad7d commit 8132908
Showing 1 changed file with 44 additions and 40 deletions.
84 changes: 44 additions & 40 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -1507,9 +1507,13 @@ typedef struct RowDecompressor
CommandId mycid;
BulkInsertState bistate;

/* cache memory used to store the decompressed datums/is_null for form_tuple */
Datum *compressed_datums;
bool *compressed_is_nulls;

Datum *decompressed_datums;
bool *decompressed_is_nulls;

MemoryContext per_compressed_row_ctx;
} RowDecompressor;

static PerCompressedColumn *create_per_compressed_column(TupleDesc in_desc, TupleDesc out_desc,
Expand Down Expand Up @@ -1545,9 +1549,16 @@ build_decompressor(Relation in_rel, Relation out_rel)
.mycid = GetCurrentCommandId(true),
.bistate = GetBulkInsertState(),

.compressed_datums = palloc(sizeof(Datum) * in_desc->natts),
.compressed_is_nulls = palloc(sizeof(bool) * in_desc->natts),

/* cache memory used to store the decompressed datums/is_null for form_tuple */
.decompressed_datums = palloc(sizeof(Datum) * out_desc->natts),
.decompressed_is_nulls = palloc(sizeof(bool) * out_desc->natts),

.per_compressed_row_ctx = AllocSetContextCreate(CurrentMemoryContext,
"decompress chunk per-compressed row",
ALLOCSET_DEFAULT_SIZES),
};

/*
Expand All @@ -1564,59 +1575,42 @@ build_decompressor(Relation in_rel, Relation out_rel)
void
decompress_chunk(Oid in_table, Oid out_table)
{
/* these locks are taken in the order uncompressed table then compressed table
* for consistency with compress_chunk
*/
/* we are _just_ INSERTing into the out_table so in principle we could take
/*
* Locks are taken in the order uncompressed table then compressed table
* for consistency with compress_chunk.
* We are _just_ INSERTing into the out_table so in principle we could take
* a RowExclusive lock, and let other operations read and write this table
* as we work. However, we currently compress each table as a oneshot, so
* we're taking the stricter lock to prevent accidents.
*/
Relation out_rel = table_open(out_table, ExclusiveLock);
/*We want to prevent other decompressors from decompressing this table,
* We want to prevent other decompressors from decompressing this table,
* and we want to prevent INSERTs or UPDATEs which could mess up our decompression.
* We may as well allow readers to keep reading the compressed data while
* we are compressing, so we only take an ExclusiveLock instead of AccessExclusive.
*/
Relation out_rel = table_open(out_table, ExclusiveLock);
Relation in_rel = relation_open(in_table, ExclusiveLock);

RowDecompressor decompressor = build_decompressor(in_rel, out_rel);

Datum *compressed_datums = palloc(sizeof(*compressed_datums) * decompressor.in_desc->natts);
bool *compressed_is_nulls = palloc(sizeof(*compressed_is_nulls) * decompressor.in_desc->natts);

HeapTuple compressed_tuple;
TableScanDesc heapScan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL);
MemoryContext per_compressed_row_ctx =
AllocSetContextCreate(CurrentMemoryContext,
"decompress chunk per-compressed row",
ALLOCSET_DEFAULT_SIZES);

for (compressed_tuple = heap_getnext(heapScan, ForwardScanDirection); compressed_tuple != NULL;
compressed_tuple = heap_getnext(heapScan, ForwardScanDirection))
{
MemoryContext old_ctx;

Assert(HeapTupleIsValid(compressed_tuple));

old_ctx = MemoryContextSwitchTo(per_compressed_row_ctx);

heap_deform_tuple(compressed_tuple,
decompressor.in_desc,
compressed_datums,
compressed_is_nulls);
populate_per_compressed_columns_from_data(decompressor.per_compressed_cols,
decompressor.in_desc->natts,
compressed_datums,
compressed_is_nulls);
decompressor.compressed_datums,
decompressor.compressed_is_nulls);

row_decompressor_decompress_row(&decompressor);
MemoryContextSwitchTo(old_ctx);
MemoryContextReset(per_compressed_row_ctx);
}

heap_endscan(heapScan);

FreeBulkInsertState(decompressor.bistate);
MemoryContextDelete(decompressor.per_compressed_row_ctx);

/* Recreate all indexes on out rel, we already have an exclusive lock on it,
* so the strong locks taken by reindex_relation shouldn't matter. */
Expand Down Expand Up @@ -1732,23 +1726,30 @@ populate_per_compressed_columns_from_data(PerCompressedColumn *per_compressed_co
}

static void
row_decompressor_decompress_row(RowDecompressor *row_decompressor)
row_decompressor_decompress_row(RowDecompressor *decompressor)
{
/* each compressed row decompresses to at least one row,
* even if all the data is NULL
*/
bool wrote_data = false;
bool is_done = false;

MemoryContext old_ctx = MemoryContextSwitchTo(decompressor->per_compressed_row_ctx);

populate_per_compressed_columns_from_data(decompressor->per_compressed_cols,
decompressor->in_desc->natts,
decompressor->compressed_datums,
decompressor->compressed_is_nulls);

do
{
/* we're done if all the decompressors return NULL */
is_done = true;
for (int16 col = 0; col < row_decompressor->num_compressed_columns; col++)
for (int16 col = 0; col < decompressor->num_compressed_columns; col++)
{
bool col_is_done =
per_compressed_col_get_data(&row_decompressor->per_compressed_cols[col],
row_decompressor->decompressed_datums,
row_decompressor->decompressed_is_nulls);
bool col_is_done = per_compressed_col_get_data(&decompressor->per_compressed_cols[col],
decompressor->decompressed_datums,
decompressor->decompressed_is_nulls);
is_done &= col_is_done;
}

Expand All @@ -1757,19 +1758,22 @@ row_decompressor_decompress_row(RowDecompressor *row_decompressor)
*/
if (!is_done || !wrote_data)
{
HeapTuple decompressed_tuple = heap_form_tuple(row_decompressor->out_desc,
row_decompressor->decompressed_datums,
row_decompressor->decompressed_is_nulls);
heap_insert(row_decompressor->out_rel,
HeapTuple decompressed_tuple = heap_form_tuple(decompressor->out_desc,
decompressor->decompressed_datums,
decompressor->decompressed_is_nulls);
heap_insert(decompressor->out_rel,
decompressed_tuple,
row_decompressor->mycid,
decompressor->mycid,
0 /*=options*/,
row_decompressor->bistate);
decompressor->bistate);

heap_freetuple(decompressed_tuple);
wrote_data = true;
}
} while (!is_done);

MemoryContextSwitchTo(old_ctx);
MemoryContextReset(decompressor->per_compressed_row_ctx);
}

/* populate the relevent index in an array from a per_compressed_col.
Expand Down

0 comments on commit 8132908

Please sign in to comment.