Decouple batch array from DecompressChunk
Refactor DecompressChunk to make it more modular. This is the first
step of a bigger refactor that aims to create better separation of
concerns across the code that implements transparent decompression.

Currently, the code is only semi-modular, and all state is kept
directly in the DecompressChunk scan node. As a result, this object
has to be passed into all the related modules, which makes it harder
to call that code from other places without a reference to the "big"
scan node.

The goal of this change (and upcoming changes) is to make each module
hold its own state relevant to that module. DecompressChunk can then
be composed of these distinct modules instead of being a monolith.
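
As a sketch of where this is headed, the decoupled module can be driven without any reference to the scan node, using only the functions introduced in the diff below. This is an illustrative fragment, not code from the commit: the batch and column counts are arbitrary, and ALLOCSET_DEFAULT_INITSIZE is simply PostgreSQL's stock memory-context block size.

static void
batch_array_usage_sketch(void)
{
	BatchArray array;

	/* Room for 16 batches, each tracking 4 compressed columns. */
	batch_array_init(&array, 16, 4, ALLOCSET_DEFAULT_INITSIZE);

	/* Take a slot; the array doubles its capacity when all slots are in use. */
	int batch_index = batch_array_get_unused_slot(&array);
	DecompressBatchState *batch_state = batch_array_get_at(&array, batch_index);
	(void) batch_state; /* ... decompress a compressed tuple into it ... */

	batch_array_clear_at(&array, batch_index); /* hand the slot back */
	batch_array_destroy(&array);
}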
erimatnor committed Nov 28, 2023
1 parent 645727b commit 4e8581f
Showing 8 changed files with 164 additions and 157 deletions.
94 changes: 48 additions & 46 deletions tsl/src/nodes/decompress_chunk/batch_array.c
@@ -4,38 +4,37 @@
  * LICENSE-TIMESCALE for a copy of the license.
  */
 
 #include <postgres.h>
 #include <nodes/bitmapset.h>
 
 #include "compression/compression.h"
 #include "nodes/decompress_chunk/batch_array.h"
-#include "nodes/decompress_chunk/exec.h"
+#include "nodes/decompress_chunk/compressed_batch.h"
 
 /*
  * Create states to hold information for up to n batches.
  */
 void
-batch_array_create(DecompressChunkState *chunk_state, int nbatches)
+batch_array_init(BatchArray *array, int nbatches, int ncolumns_per_batch,
+				 Size memory_context_block_size_bytes)
 {
 	Assert(nbatches >= 0);
 
-	chunk_state->n_batch_states = nbatches;
-	chunk_state->batch_states = palloc0(chunk_state->n_batch_state_bytes * nbatches);
-
-	chunk_state->unused_batch_states = bms_add_range(NULL, 0, nbatches - 1);
-
-	Assert(bms_num_members(chunk_state->unused_batch_states) == chunk_state->n_batch_states);
+	array->n_batch_states = nbatches;
+	array->n_columns_per_batch = ncolumns_per_batch;
+	array->unused_batch_states = bms_add_range(NULL, 0, nbatches - 1);
+	array->batch_memory_context_bytes = memory_context_block_size_bytes;
+	array->n_batch_state_bytes =
+		sizeof(DecompressBatchState) + sizeof(CompressedColumnValues) * ncolumns_per_batch;
+	array->batch_states = palloc0(array->n_batch_state_bytes * nbatches);
+	Assert(bms_num_members(array->unused_batch_states) == array->n_batch_states);
 }
 
 /*
  * Destroy batch states.
  */
 void
-batch_array_destroy(DecompressChunkState *chunk_state)
+batch_array_destroy(BatchArray *array)
 {
-	for (int i = 0; i < chunk_state->n_batch_states; i++)
+	for (int i = 0; i < array->n_batch_states; i++)
 	{
-		DecompressBatchState *batch_state = batch_array_get_at(chunk_state, i);
+		DecompressBatchState *batch_state = batch_array_get_at(array, i);
 		Assert(batch_state != NULL);
 
 		if (batch_state->compressed_slot != NULL)
@@ -45,50 +44,46 @@ batch_array_destroy(DecompressChunkState *chunk_state)
 			ExecDropSingleTupleTableSlot(batch_state->decompressed_scan_slot);
 	}
 
-	pfree(chunk_state->batch_states);
-	chunk_state->batch_states = NULL;
+	pfree(array->batch_states);
+	array->batch_states = NULL;
 }
 
 /*
  * Enhance the capacity of existing batch states.
  */
 static void
-batch_array_enlarge(DecompressChunkState *chunk_state, int new_number)
+batch_array_enlarge(BatchArray *array, int new_number)
 {
-	Assert(new_number > chunk_state->n_batch_states);
+	Assert(new_number > array->n_batch_states);
 
 	/* Request additional memory */
-	chunk_state->batch_states =
-		repalloc(chunk_state->batch_states, chunk_state->n_batch_state_bytes * new_number);
+	array->batch_states = repalloc(array->batch_states, array->n_batch_state_bytes * new_number);
 
 	/* Zero out the tail. The batch states are initialized on first use. */
-	memset(((char *) chunk_state->batch_states) +
-			   chunk_state->n_batch_state_bytes * chunk_state->n_batch_states,
+	memset(((char *) array->batch_states) + array->n_batch_state_bytes * array->n_batch_states,
 		   0x0,
-		   chunk_state->n_batch_state_bytes * (new_number - chunk_state->n_batch_states));
+		   array->n_batch_state_bytes * (new_number - array->n_batch_states));
 
 	/* Register the new states as unused */
-	chunk_state->unused_batch_states = bms_add_range(chunk_state->unused_batch_states,
-													 chunk_state->n_batch_states,
-													 new_number - 1);
+	array->unused_batch_states =
+		bms_add_range(array->unused_batch_states, array->n_batch_states, new_number - 1);
 
-	Assert(bms_num_members(chunk_state->unused_batch_states) ==
-		   new_number - chunk_state->n_batch_states);
+	Assert(bms_num_members(array->unused_batch_states) == new_number - array->n_batch_states);
 
 	/* Update number of available batch states */
-	chunk_state->n_batch_states = new_number;
+	array->n_batch_states = new_number;
 }
 
 /*
  * Mark a DecompressBatchState as unused
  */
 void
-batch_array_free_at(DecompressChunkState *chunk_state, int batch_index)
+batch_array_clear_at(BatchArray *array, int batch_index)
 {
 	Assert(batch_index >= 0);
-	Assert(batch_index < chunk_state->n_batch_states);
+	Assert(batch_index < array->n_batch_states);
 
-	DecompressBatchState *batch_state = batch_array_get_at(chunk_state, batch_index);
+	DecompressBatchState *batch_state = batch_array_get_at(array, batch_index);
 
 	/* Reset batch state */
 	batch_state->total_batch_rows = 0;
@@ -102,29 +97,36 @@ batch_array_free_at(DecompressChunkState *chunk_state, int batch_index)
 		MemoryContextReset(batch_state->per_batch_context);
 	}
 
-	chunk_state->unused_batch_states =
-		bms_add_member(chunk_state->unused_batch_states, batch_index);
+	array->unused_batch_states = bms_add_member(array->unused_batch_states, batch_index);
 }
 
+void
+batch_array_clear_all(BatchArray *array)
+{
+	for (int i = 0; i < array->n_batch_states; i++)
+		batch_array_clear_at(array, i);
+
+	Assert(bms_num_members(array->unused_batch_states) == array->n_batch_states);
+}
+
 /*
  * Get the next free and unused batch state and mark as used
  */
 int
-batch_array_get_free_slot(DecompressChunkState *chunk_state)
+batch_array_get_unused_slot(BatchArray *array)
 {
-	if (bms_is_empty(chunk_state->unused_batch_states))
-		batch_array_enlarge(chunk_state, chunk_state->n_batch_states * 2);
+	if (bms_is_empty(array->unused_batch_states))
+		batch_array_enlarge(array, array->n_batch_states * 2);
 
-	Assert(!bms_is_empty(chunk_state->unused_batch_states));
+	Assert(!bms_is_empty(array->unused_batch_states));
 
-	int next_free_batch = bms_next_member(chunk_state->unused_batch_states, -1);
+	int next_unused_batch = bms_next_member(array->unused_batch_states, -1);
 
-	Assert(next_free_batch >= 0);
-	Assert(next_free_batch < chunk_state->n_batch_states);
-	Assert(TupIsNull(batch_array_get_at(chunk_state, next_free_batch)->decompressed_scan_slot));
+	Assert(next_unused_batch >= 0);
+	Assert(next_unused_batch < array->n_batch_states);
+	Assert(TupIsNull(batch_array_get_at(array, next_unused_batch)->decompressed_scan_slot));
 
-	chunk_state->unused_batch_states =
-		bms_del_member(chunk_state->unused_batch_states, next_free_batch);
+	array->unused_batch_states = bms_del_member(array->unused_batch_states, next_unused_batch);
 
-	return next_free_batch;
+	return next_unused_batch;
 }
46 changes: 34 additions & 12 deletions tsl/src/nodes/decompress_chunk/batch_array.h
@@ -3,35 +3,57 @@
  * Please see the included NOTICE for copyright information and
  * LICENSE-TIMESCALE for a copy of the license.
  */
-
 #pragma once
 
-#include "compression/compression.h"
-#include "nodes/decompress_chunk/compressed_batch.h"
-#include "nodes/decompress_chunk/exec.h"
+#include <postgres.h>
+#include <nodes/bitmapset.h>
+#include <stdbool.h>
 
 /* The value for an invalid batch id */
 #define INVALID_BATCH_ID -1
 
+typedef struct BatchArray
+{
+	/* Batch states */
+	int n_batch_states; /* Number of batch states */
+	/*
+	 * The batch states. It's void* because they have a variable length
+	 * column array, so normal indexing can't be used. Use the batch_array_get_at
+	 * accessor instead.
+	 */
+	void *batch_states;
+	int n_batch_state_bytes;
+	int n_columns_per_batch;
+	Bitmapset *unused_batch_states; /* The unused batch states */
+	int batch_memory_context_bytes;
+} BatchArray;
 /*
  * Create states to hold information for up to n batches
  */
-void batch_array_create(DecompressChunkState *chunk_state, int nbatches);
+void batch_array_init(BatchArray *array, int nbatches, int ncolumns_per_batch,
+					  Size memory_context_block_size_bytes);
 
-void batch_array_destroy(DecompressChunkState *chunk_state);
+void batch_array_destroy(BatchArray *array);
 
-extern int batch_array_get_free_slot(DecompressChunkState *chunk_state);
+extern int batch_array_get_unused_slot(BatchArray *array);
 
-inline static DecompressBatchState *
-batch_array_get_at(DecompressChunkState *chunk_state, int batch_index)
+inline static struct DecompressBatchState *
+batch_array_get_at(const BatchArray *array, int batch_index)
 {
 	/*
 	 * Since we're accessing batch states through a "char" pointer, use
 	 * "restrict" to tell the compiler that it doesn't alias with anything.
 	 * Might be important in hot loops.
	 */
-	return (DecompressBatchState *) ((char *restrict) chunk_state->batch_states +
-									 chunk_state->n_batch_state_bytes * batch_index);
+	return (struct DecompressBatchState *) ((char *restrict) array->batch_states +
+											array->n_batch_state_bytes * batch_index);
 }
 
-extern void batch_array_free_at(DecompressChunkState *chunk_state, int batch_index);
+extern void batch_array_clear_at(BatchArray *array, int batch_index);
+extern void batch_array_clear_all(BatchArray *array);
+
+inline static bool
+batch_array_has_active_batches(const BatchArray *array)
+{
+	return bms_num_members(array->unused_batch_states) != array->n_batch_states;
+}
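
The new batch_array_has_active_batches() test pairs naturally with the unused-slot bitmapset: a slot is active exactly when it is absent from unused_batch_states. A hypothetical caller (not part of this commit) that visits every active batch could look like this:

static void
visit_active_batches(const BatchArray *array)
{
	if (!batch_array_has_active_batches(array))
		return;

	for (int i = 0; i < array->n_batch_states; i++)
	{
		if (bms_is_member(i, array->unused_batch_states))
			continue; /* slot is free */

		DecompressBatchState *batch_state = batch_array_get_at(array, i);
		/* ... e.g. inspect batch_state->decompressed_scan_slot ... */
		(void) batch_state;
	}
}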
20 changes: 12 additions & 8 deletions tsl/src/nodes/decompress_chunk/batch_queue_fifo.h
@@ -14,25 +14,28 @@
 inline static void
 batch_queue_fifo_create(DecompressChunkState *chunk_state)
 {
-	batch_array_create(chunk_state, 1);
+	batch_array_init(&chunk_state->batch_array,
+					 1,
+					 chunk_state->num_compressed_columns,
+					 chunk_state->batch_memory_context_bytes);
 }
 
 inline static void
 batch_queue_fifo_free(DecompressChunkState *chunk_state)
 {
-	batch_array_destroy(chunk_state);
+	batch_array_destroy(&chunk_state->batch_array);
 }
 
 inline static bool
 batch_queue_fifo_needs_next_batch(DecompressChunkState *chunk_state)
 {
-	return TupIsNull(batch_array_get_at(chunk_state, 0)->decompressed_scan_slot);
+	return TupIsNull(batch_array_get_at(&chunk_state->batch_array, 0)->decompressed_scan_slot);
 }
 
 inline static void
 batch_queue_fifo_pop(DecompressChunkState *chunk_state)
 {
-	DecompressBatchState *batch_state = batch_array_get_at(chunk_state, 0);
+	DecompressBatchState *batch_state = batch_array_get_at(&chunk_state->batch_array, 0);
 	if (TupIsNull(batch_state->decompressed_scan_slot))
 	{
 		/* Allow this function to be called on the initial empty queue. */
@@ -45,20 +48,21 @@ batch_queue_fifo_pop(DecompressChunkState *chunk_state)
 inline static void
 batch_queue_fifo_push_batch(DecompressChunkState *chunk_state, TupleTableSlot *compressed_slot)
 {
-	DecompressBatchState *batch_state = batch_array_get_at(chunk_state, 0);
-	Assert(TupIsNull(batch_array_get_at(chunk_state, 0)->decompressed_scan_slot));
+	BatchArray *batch_array = &chunk_state->batch_array;
+	DecompressBatchState *batch_state = batch_array_get_at(batch_array, 0);
+	Assert(TupIsNull(batch_array_get_at(batch_array, 0)->decompressed_scan_slot));
 	compressed_batch_set_compressed_tuple(chunk_state, batch_state, compressed_slot);
 	compressed_batch_advance(chunk_state, batch_state);
 }
 
 inline static void
 batch_queue_fifo_reset(DecompressChunkState *chunk_state)
 {
-	batch_array_free_at(chunk_state, 0);
+	batch_array_clear_at(&chunk_state->batch_array, 0);
 }
 
 inline static TupleTableSlot *
 batch_queue_fifo_top_tuple(DecompressChunkState *chunk_state)
 {
-	return batch_array_get_at(chunk_state, 0)->decompressed_scan_slot;
+	return batch_array_get_at(&chunk_state->batch_array, 0)->decompressed_scan_slot;
 }