vectorized aggregation as separate plan node (#6784)
This PR is a little too big, but it proved difficult to split into parts
because they all depend on each other.

* Move vectorized aggregation into a separate plan node, which simplifies
working with the target list in the DecompressChunk node.

* Add a post-planning hook that replaces the normal partial aggregation
node with the vectorized aggregation node. The advantage over planning at
the Path stage is that after planning we already know which columns support
bulk decompression and which filters are vectorized (see the sketch after
this list).

* Use the compressed batch API in vectorized aggregation. This
simplifies the code.

* Support vectorized aggregation after vectorized filters.

* Add a simple generic interface for vectorized aggregate functions. For
now the only implemented function is still `sum(int4)` (see the interface
sketch after this list).

* Parallel plans are now chosen more often, probably because the old code
did not add any cost for the aggregation itself and just reused the costs
from DecompressChunk, so parallel and non-parallel plans differed less in
cost. The current code performs cost-based planning for the normal partial
aggregates and only replaces them with vectorized aggregation after
planning, so we now follow the same plan choice that Postgres makes for
regular aggregation.
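To illustrate the post-planning hook, the sketch below walks the finished plan tree and swaps qualifying partial Agg nodes for the custom VectorAgg node. This is only a rough outline of the approach, not the code added by this commit; try_insert_vector_agg_node() is a hypothetical helper that would check whether the Agg node sits on top of a DecompressChunk custom scan and, if so, build the replacement CustomScan plan.

#include <postgres.h>
#include <nodes/plannodes.h>

static Plan *try_insert_vector_agg_node(Plan *agg_plan); /* hypothetical helper */

static Plan *
postprocess_plan_mutator(Plan *plan)
{
	if (plan == NULL)
		return NULL;

	/* Recurse into the child plans first. */
	plan->lefttree = postprocess_plan_mutator(plan->lefttree);
	plan->righttree = postprocess_plan_mutator(plan->righttree);

	/* Try to replace a partial aggregation node with VectorAgg. */
	if (IsA(plan, Agg))
		return try_insert_vector_agg_node(plan);

	return plan;
}

void
tsl_postprocess_plan(PlannedStmt *stmt)
{
	stmt->planTree = postprocess_plan_mutator(stmt->planTree);
}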
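Similarly, a minimal version of a generic vectorized aggregate function interface could look like the struct below. The type name, fields, and include path are illustrative assumptions, not the definitions added by this commit: the idea is that each supported aggregate (currently only sum(int4)) supplies callbacks to initialize the per-group state, consume an entire decompressed Arrow vector at once while honoring the vectorized filter, and emit the partial result.

#include <postgres.h>
#include "compression/arrow_c_data_interface.h" /* assumed location of ArrowArray */

typedef struct VectorAggFunction
{
	/* OID of the partial aggregate function this entry implements. */
	Oid aggfnoid;

	/* Initialize (or reset) the aggregate state. */
	void (*agg_init)(void *agg_state);

	/* Consume one decompressed batch; filter is the vectorized qual bitmap. */
	void (*agg_vector)(void *agg_state, const ArrowArray *vector, const uint64 *filter);

	/* Emit the partial result for the finalize step. */
	void (*agg_emit)(void *agg_state, Datum *out_value, bool *out_isnull);
} VectorAggFunction;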
akuzm authored Apr 11, 2024
1 parent 610db31 commit 6e7b6e9
Showing 41 changed files with 2,709 additions and 1,759 deletions.
8 changes: 3 additions & 5 deletions src/cross_module_fn.c
@@ -124,11 +124,9 @@ job_execute_default_fn(BgwJob *job)
pg_unreachable();
}

-static bool
-push_down_aggregation(PlannerInfo *root, AggPath *aggregation_path, Path *subpath)
+static void
+tsl_postprocess_plan_stub(PlannedStmt *stmt)
{
-/* Don't skip adding the agg node on top of the path */
-return false;
}

static bool
@@ -322,7 +320,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.policies_alter = error_no_default_fn_pg_community,
.policies_show = error_no_default_fn_pg_community,

-.push_down_aggregation = push_down_aggregation,
+.tsl_postprocess_plan = tsl_postprocess_plan_stub,

.partialize_agg = error_no_default_fn_pg_community,
.finalize_agg_sfunc = error_no_default_fn_pg_community,
2 changes: 1 addition & 1 deletion src/cross_module_fn.h
@@ -90,7 +90,7 @@ typedef struct CrossModuleFunctions
PGFunction move_chunk;

/* Vectorized queries */
-bool (*push_down_aggregation)(PlannerInfo *root, AggPath *aggregation_path, Path *subpath);
+void (*tsl_postprocess_plan)(PlannedStmt *stmt);

/* Continuous Aggregates */
PGFunction partialize_agg;
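For context, the TSL library is expected to fill in this pointer in its own cross-module function table, while the Apache-licensed default above stays a no-op stub. A hypothetical excerpt (the table name and surrounding members are assumptions, not shown in this diff):

/* Hypothetical excerpt, assuming the TSL module defines its table like this. */
CrossModuleFunctions tsl_cm_functions = {
	/* ... other members elided ... */
	.tsl_postprocess_plan = tsl_postprocess_plan,
};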
1 change: 1 addition & 0 deletions src/import/CMakeLists.txt
@@ -1,6 +1,7 @@
set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/allpaths.c
${CMAKE_CURRENT_SOURCE_DIR}/ht_hypertable_modify.c
+${CMAKE_CURRENT_SOURCE_DIR}/list.c
${CMAKE_CURRENT_SOURCE_DIR}/planner.c
${CMAKE_CURRENT_SOURCE_DIR}/ts_explain.c)

88 changes: 88 additions & 0 deletions src/import/list.c
@@ -0,0 +1,88 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/

#include <postgres.h>

#include <nodes/pg_list.h>
#include <port/pg_bitutils.h>

#include "import/list.h"

/*
* This file contains source code that was copied and/or modified from
* the PostgreSQL database, which is licensed under the open-source
* PostgreSQL License. Please see the NOTICE at the top level
* directory for a copy of the PostgreSQL License.
*
* Copied from PostgreSQL 15.0 (2a7ce2e2ce474504a707ec03e128fde66cfb8b48)
* without modifications.
*/

/* Overhead for the fixed part of a List header, measured in ListCells */
#define LIST_HEADER_OVERHEAD ((int) ((offsetof(List, initial_elements) - 1) / sizeof(ListCell) + 1))

/*
* Return a freshly allocated List with room for at least min_size cells.
*
* Since empty non-NIL lists are invalid, new_list() sets the initial length
* to min_size, effectively marking that number of cells as valid; the caller
* is responsible for filling in their data.
*/
List *
ts_new_list(NodeTag type, int min_size)
{
List *newlist;
int max_size;

Assert(min_size > 0);

/*
* We allocate all the requested cells, and possibly some more, as part of
* the same palloc request as the List header. This is a big win for the
* typical case of short fixed-length lists. It can lose if we allocate a
* moderately long list and then it gets extended; we'll be wasting more
* initial_elements[] space than if we'd made the header small. However,
* rounding up the request as we do in the normal code path provides some
* defense against small extensions.
*/

#ifndef DEBUG_LIST_MEMORY_USAGE

/*
* Normally, we set up a list with some extra cells, to allow it to grow
* without a repalloc. Prefer cell counts chosen to make the total
* allocation a power-of-2, since palloc would round it up to that anyway.
* (That stops being true for very large allocations, but very long lists
* are infrequent, so it doesn't seem worth special logic for such cases.)
*
* The minimum allocation is 8 ListCell units, providing either 4 or 5
* available ListCells depending on the machine's word width. Counting
* palloc's overhead, this uses the same amount of space as a one-cell
* list did in the old implementation, and less space for any longer list.
*
* We needn't worry about integer overflow; no caller passes min_size
* that's more than twice the size of an existing list, so the size limits
* within palloc will ensure that we don't overflow here.
*/
max_size = pg_nextpower2_32(Max(8, min_size + LIST_HEADER_OVERHEAD));
max_size -= LIST_HEADER_OVERHEAD;
#else

/*
* For debugging, don't allow any extra space. This forces any cell
* addition to go through enlarge_list() and thus move the existing data.
*/
max_size = min_size;
#endif

newlist = (List *) palloc(offsetof(List, initial_elements) + max_size * sizeof(ListCell));
newlist->type = type;
newlist->length = min_size;
newlist->max_length = max_size;
newlist->elements = newlist->initial_elements;

return newlist;
}
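A usage note on ts_new_list(): the returned list already reports min_size valid cells, but their contents are uninitialized, so the caller must fill every cell before the list is read. A minimal, hypothetical example (the helper name and values are made up):

#include <postgres.h>
#include <nodes/pg_list.h>
#include <nodes/value.h>
#include "import/list.h"

static List *
make_two_element_list(void)
{
	/* Both cells already count as valid, so fill them before the list is used. */
	List *l = ts_new_list(T_List, 2);
	lfirst(list_nth_cell(l, 0)) = makeString(pstrdup("first"));
	lfirst(list_nth_cell(l, 1)) = makeString(pstrdup("second"));
	return l;
}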
17 changes: 17 additions & 0 deletions src/import/list.h
@@ -0,0 +1,17 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#pragma once

#include "export.h"

/*
* This file contains source code that was copied and/or modified from
* the PostgreSQL database, which is licensed under the open-source
* PostgreSQL License. Please see the NOTICE at the top level
* directory for a copy of the PostgreSQL License.
*/

extern TSDLLEXPORT List *ts_new_list(NodeTag type, int min_size);
33 changes: 29 additions & 4 deletions src/nodes/chunk_append/planner.c
@@ -23,6 +23,7 @@
#include "nodes/chunk_append/chunk_append.h"
#include "nodes/chunk_append/transform.h"
#include "nodes/hypertable_modify.h"
#include "nodes/vector_agg.h"
#include "import/planner.h"
#include "guc.h"

@@ -404,11 +405,35 @@ ts_chunk_append_get_scan_plan(Plan *plan)
return (Scan *) plan;
break;
case T_CustomScan:
-if (castNode(CustomScan, plan)->scan.scanrelid > 0)
+{
+CustomScan *custom = castNode(CustomScan, plan);
+if (custom->scan.scanrelid > 0)
+{
+/*
+* The custom plan node is a scan itself. This handles the
+* DecompressChunk node.
+*/
return (Scan *) plan;
-else
-return NULL;
-break;
+}
+
+if (strcmp(custom->methods->CustomName, VECTOR_AGG_NODE_NAME) == 0)
+{
+/*
+* This is a vectorized aggregation node, we have to recurse
+* into its child, similar to the normal aggregation node.
+*
+* Unfortunately we have to hardcode the node name here, because
+* we can't depend on the TSL library.
+*/
+return ts_chunk_append_get_scan_plan(linitial(custom->custom_plans));
+}
+
+/*
+* This is some other unknown custom scan node, we can't recurse
+* into it.
+*/
+return NULL;
+}
case T_Agg:
if (plan->lefttree != NULL)
{
13 changes: 13 additions & 0 deletions src/nodes/vector_agg.h
@@ -0,0 +1,13 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#pragma once

/*
* This file defines the node name of Vector Aggregation custom node, to be
* used in the Apache part of the Timescale extension. The node itself is in
* the TSL part.
*/
#define VECTOR_AGG_NODE_NAME "VectorAgg"
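This name matters because ts_chunk_append_get_scan_plan() above matches it with strcmp(). On the TSL side, the custom node is expected to register CustomScanMethods whose CustomName is exactly this string; a minimal sketch along those lines (the state-creation callback name is a placeholder, not the actual function from this commit):

#include <postgres.h>
#include <nodes/extensible.h>
#include "nodes/vector_agg.h"

static Node *vector_agg_state_create(CustomScan *cscan); /* placeholder callback */

static CustomScanMethods vector_agg_plan_methods = {
	.CustomName = VECTOR_AGG_NODE_NAME,
	.CreateCustomScanState = vector_agg_state_create,
};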
18 changes: 2 additions & 16 deletions src/planner/partialize.c
@@ -421,29 +421,15 @@ add_partially_aggregated_subpaths(PlannerInfo *root, Path *parent_path,
AggPath *agg_path =
create_sorted_partial_agg_path(root, subpath, chunktarget, d_num_groups, extra_data);

-if (ts_cm_functions->push_down_aggregation(root, agg_path, subpath))
-{
-*sorted_paths = lappend(*sorted_paths, subpath);
-}
-else
-{
-*sorted_paths = lappend(*sorted_paths, (Path *) agg_path);
-}
+*sorted_paths = lappend(*sorted_paths, (Path *) agg_path);
}

if (can_hash)
{
AggPath *agg_path =
create_hashed_partial_agg_path(root, subpath, chunktarget, d_num_groups, extra_data);

-if (ts_cm_functions->push_down_aggregation(root, agg_path, subpath))
-{
-*hashed_paths = lappend(*hashed_paths, subpath);
-}
-else
-{
-*hashed_paths = lappend(*hashed_paths, (Path *) agg_path);
-}
+*hashed_paths = lappend(*hashed_paths, (Path *) agg_path);
}
}

2 changes: 2 additions & 0 deletions src/planner/planner.c
@@ -580,6 +580,8 @@ timescaledb_planner(Query *parse, const char *query_string, int cursor_opts,
AGGSPLITOP_SERIALIZE | AGGSPLITOP_SKIPFINAL;
}
}
+
+ts_cm_functions->tsl_postprocess_plan(stmt);
}

if (reset_baserel_info)
1 change: 0 additions & 1 deletion tsl/src/CMakeLists.txt
@@ -2,7 +2,6 @@ set(SOURCES
chunk_api.c
chunk.c
init.c
-partialize_agg.c
partialize_finalize.c
planner.c
process_utility.c
25 changes: 13 additions & 12 deletions tsl/src/compression/array.c
@@ -571,7 +571,20 @@ text_array_decompress_all_serialized_no_header(StringInfo si, bool has_nulls,

const int validity_bitmap_bytes = sizeof(uint64) * (pad_to_multiple(64, n_total) / 64);
uint64 *restrict validity_bitmap = MemoryContextAlloc(dest_mctx, validity_bitmap_bytes);
+
+/*
+* First, mark all data as valid, we will fill the nulls later if needed.
+* Note that the validity bitmap size is a multiple of 64 bits. We have to
+* fill the tail bits with zeros, because the corresponding elements are not
+* valid.
+*
+*/
memset(validity_bitmap, 0xFF, validity_bitmap_bytes);
+if (n_total % 64)
+{
+const uint64 tail_mask = -1ULL >> (64 - n_total % 64);
+validity_bitmap[n_total / 64] &= tail_mask;
+}

if (has_nulls)
{
@@ -613,18 +626,6 @@

Assert(current_notnull_element == -1);
}
-else
-{
-/*
-* The validity bitmap size is a multiple of 64 bits. Fill the tail bits
-* with zeros, because the corresponding elements are not valid.
-*/
-if (n_total % 64)
-{
-const uint64 tail_mask = -1ULL >> (64 - n_total % 64);
-validity_bitmap[n_total / 64] &= tail_mask;
-}
-}

ArrowArray *result = MemoryContextAllocZero(dest_mctx, sizeof(ArrowArray) + sizeof(void *) * 3);
const void **buffers = (const void **) &result[1];
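To make the tail-mask arithmetic concrete: the validity bitmap always occupies a whole number of 64-bit words, so the padding bits past n_total have to be cleared, otherwise they would later be counted as valid rows (for example by arrow_num_valid() in the next file). A small worked example:

/*
 * Example: n_total = 70 rows occupy two 64-bit words (128 bits).
 * n_total % 64 = 6, so tail_mask = -1ULL >> (64 - 6) = 0x3F, i.e. only
 * the lowest 6 bits of the last word stay set, and
 * validity_bitmap[70 / 64] &= 0x3F clears the 58 padding bits.
 */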
19 changes: 19 additions & 0 deletions tsl/src/compression/arrow_c_data_interface.h
@@ -160,3 +160,22 @@
{
return ((source_value + pad_to - 1) / pad_to) * pad_to;
}
+
+static inline size_t
+arrow_num_valid(uint64 *bitmap, size_t total_rows)
+{
+uint64 num_valid = 0;
+#ifdef HAVE__BUILTIN_POPCOUNT
+const uint64 words = pad_to_multiple(64, total_rows) / 64;
+for (uint64 i = 0; i < words; i++)
+{
+num_valid += __builtin_popcountll(bitmap[i]);
+}
+#else
+for (size_t i = 0; i < total_rows; i++)
+{
+num_valid += arrow_row_is_valid(bitmap, i);
+}
+#endif
+return num_valid;
+}
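As a usage sketch (assuming an ArrowArray laid out per the Arrow C data interface, with buffers[0] holding the validity bitmap), counting the non-null rows of a decompressed column could look like this; count_valid_rows() is a hypothetical helper, not part of the commit:

/* Count the rows whose validity bit is set; a NULL validity buffer means no nulls. */
static size_t
count_valid_rows(const ArrowArray *arrow)
{
	const uint64 *validity = (const uint64 *) arrow->buffers[0];
	if (validity == NULL)
		return arrow->length;
	return arrow_num_valid((uint64 *) validity, arrow->length);
}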
33 changes: 12 additions & 21 deletions tsl/src/compression/deltadelta_impl.c
@@ -91,8 +91,19 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed, Memory
}
#undef INNER_LOOP_SIZE

-/* All data valid by default, we will fill in the nulls later. */
+/*
+* First, mark all data as valid, we will fill the nulls later if needed.
+* Note that the validity bitmap size is a multiple of 64 bits. We have to
+* fill the tail bits with zeros, because the corresponding elements are not
+* valid.
+*
+*/
memset(validity_bitmap, 0xFF, validity_bitmap_bytes);
+if (n_total % 64)
+{
+const uint64 tail_mask = -1ULL >> (64 - n_total % 64);
+validity_bitmap[n_total / 64] &= tail_mask;
+}

/* Now move the data to account for nulls, and fill the validity bitmap. */
if (has_nulls)
@@ -122,26 +133,6 @@

Assert(current_notnull_element == -1);
}
-else
-{
-/*
-* The validity bitmap size is a multiple of 64 bits. Fill the tail bits
-* with zeros, because the corresponding elements are not valid.
-*/
-if (n_total % 64)
-{
-const uint64 tail_mask = -1ULL >> (64 - n_total % 64);
-validity_bitmap[n_total / 64] &= tail_mask;
-
-#ifdef USE_ASSERT_CHECKING
-for (uint32 i = 0; i < 64; i++)
-{
-Assert(arrow_row_is_valid(validity_bitmap, (n_total / 64) * 64 + i) ==
-(i < n_total % 64));
-}
-#endif
-}
-}

/* Return the result. */
ArrowArray *result = MemoryContextAllocZero(dest_mctx, sizeof(ArrowArray) + sizeof(void *) * 2);