From 8132908c97ae469ace776b6cf55bbe3dd8bf8971 Mon Sep 17 00:00:00 2001 From: Sven Klemm Date: Sun, 5 Feb 2023 13:13:08 +0100 Subject: [PATCH] Refactor chunk decompression functions Restructure the code inside decompress_chunk slightly to make core loop reusable by other functions. --- tsl/src/compression/compression.c | 84 ++++++++++++++++--------------- 1 file changed, 44 insertions(+), 40 deletions(-) diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index 18c3db273e2..9ea6e617ad3 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -1507,9 +1507,13 @@ typedef struct RowDecompressor CommandId mycid; BulkInsertState bistate; - /* cache memory used to store the decompressed datums/is_null for form_tuple */ + Datum *compressed_datums; + bool *compressed_is_nulls; + Datum *decompressed_datums; bool *decompressed_is_nulls; + + MemoryContext per_compressed_row_ctx; } RowDecompressor; static PerCompressedColumn *create_per_compressed_column(TupleDesc in_desc, TupleDesc out_desc, @@ -1545,9 +1549,16 @@ build_decompressor(Relation in_rel, Relation out_rel) .mycid = GetCurrentCommandId(true), .bistate = GetBulkInsertState(), + .compressed_datums = palloc(sizeof(Datum) * in_desc->natts), + .compressed_is_nulls = palloc(sizeof(bool) * in_desc->natts), + /* cache memory used to store the decompressed datums/is_null for form_tuple */ .decompressed_datums = palloc(sizeof(Datum) * out_desc->natts), .decompressed_is_nulls = palloc(sizeof(bool) * out_desc->natts), + + .per_compressed_row_ctx = AllocSetContextCreate(CurrentMemoryContext, + "decompress chunk per-compressed row", + ALLOCSET_DEFAULT_SIZES), }; /* @@ -1564,59 +1575,42 @@ build_decompressor(Relation in_rel, Relation out_rel) void decompress_chunk(Oid in_table, Oid out_table) { - /* these locks are taken in the order uncompressed table then compressed table - * for consistency with compress_chunk - */ - /* we are _just_ INSERTing into the out_table so in principle we could take + /* + * Locks are taken in the order uncompressed table then compressed table + * for consistency with compress_chunk. + * We are _just_ INSERTing into the out_table so in principle we could take * a RowExclusive lock, and let other operations read and write this table * as we work. However, we currently compress each table as a oneshot, so * we're taking the stricter lock to prevent accidents. - */ - Relation out_rel = table_open(out_table, ExclusiveLock); - /*We want to prevent other decompressors from decompressing this table, + * We want to prevent other decompressors from decompressing this table, * and we want to prevent INSERTs or UPDATEs which could mess up our decompression. * We may as well allow readers to keep reading the compressed data while * we are compressing, so we only take an ExclusiveLock instead of AccessExclusive. */ + Relation out_rel = table_open(out_table, ExclusiveLock); Relation in_rel = relation_open(in_table, ExclusiveLock); RowDecompressor decompressor = build_decompressor(in_rel, out_rel); - Datum *compressed_datums = palloc(sizeof(*compressed_datums) * decompressor.in_desc->natts); - bool *compressed_is_nulls = palloc(sizeof(*compressed_is_nulls) * decompressor.in_desc->natts); - HeapTuple compressed_tuple; TableScanDesc heapScan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL); - MemoryContext per_compressed_row_ctx = - AllocSetContextCreate(CurrentMemoryContext, - "decompress chunk per-compressed row", - ALLOCSET_DEFAULT_SIZES); for (compressed_tuple = heap_getnext(heapScan, ForwardScanDirection); compressed_tuple != NULL; compressed_tuple = heap_getnext(heapScan, ForwardScanDirection)) { - MemoryContext old_ctx; - Assert(HeapTupleIsValid(compressed_tuple)); - - old_ctx = MemoryContextSwitchTo(per_compressed_row_ctx); - heap_deform_tuple(compressed_tuple, decompressor.in_desc, - compressed_datums, - compressed_is_nulls); - populate_per_compressed_columns_from_data(decompressor.per_compressed_cols, - decompressor.in_desc->natts, - compressed_datums, - compressed_is_nulls); + decompressor.compressed_datums, + decompressor.compressed_is_nulls); row_decompressor_decompress_row(&decompressor); - MemoryContextSwitchTo(old_ctx); - MemoryContextReset(per_compressed_row_ctx); } heap_endscan(heapScan); + FreeBulkInsertState(decompressor.bistate); + MemoryContextDelete(decompressor.per_compressed_row_ctx); /* Recreate all indexes on out rel, we already have an exclusive lock on it, * so the strong locks taken by reindex_relation shouldn't matter. */ @@ -1732,23 +1726,30 @@ populate_per_compressed_columns_from_data(PerCompressedColumn *per_compressed_co } static void -row_decompressor_decompress_row(RowDecompressor *row_decompressor) +row_decompressor_decompress_row(RowDecompressor *decompressor) { /* each compressed row decompresses to at least one row, * even if all the data is NULL */ bool wrote_data = false; bool is_done = false; + + MemoryContext old_ctx = MemoryContextSwitchTo(decompressor->per_compressed_row_ctx); + + populate_per_compressed_columns_from_data(decompressor->per_compressed_cols, + decompressor->in_desc->natts, + decompressor->compressed_datums, + decompressor->compressed_is_nulls); + do { /* we're done if all the decompressors return NULL */ is_done = true; - for (int16 col = 0; col < row_decompressor->num_compressed_columns; col++) + for (int16 col = 0; col < decompressor->num_compressed_columns; col++) { - bool col_is_done = - per_compressed_col_get_data(&row_decompressor->per_compressed_cols[col], - row_decompressor->decompressed_datums, - row_decompressor->decompressed_is_nulls); + bool col_is_done = per_compressed_col_get_data(&decompressor->per_compressed_cols[col], + decompressor->decompressed_datums, + decompressor->decompressed_is_nulls); is_done &= col_is_done; } @@ -1757,19 +1758,22 @@ row_decompressor_decompress_row(RowDecompressor *row_decompressor) */ if (!is_done || !wrote_data) { - HeapTuple decompressed_tuple = heap_form_tuple(row_decompressor->out_desc, - row_decompressor->decompressed_datums, - row_decompressor->decompressed_is_nulls); - heap_insert(row_decompressor->out_rel, + HeapTuple decompressed_tuple = heap_form_tuple(decompressor->out_desc, + decompressor->decompressed_datums, + decompressor->decompressed_is_nulls); + heap_insert(decompressor->out_rel, decompressed_tuple, - row_decompressor->mycid, + decompressor->mycid, 0 /*=options*/, - row_decompressor->bistate); + decompressor->bistate); heap_freetuple(decompressed_tuple); wrote_data = true; } } while (!is_done); + + MemoryContextSwitchTo(old_ctx); + MemoryContextReset(decompressor->per_compressed_row_ctx); } /* populate the relevent index in an array from a per_compressed_col.