Support transparent decompression on individual chunks
This patch adds support for transparent decompression in queries
on individual chunks.
This is required for distributed hypertables with compression
when enable_per_data_node_queries is set to false. Without
this functionality, queries on distributed hypertables with
compression would not return data for compressed chunks, as
the generated FDW queries would target individual chunks.

Fixes #3714
svenklemm committed Oct 20, 2021
1 parent 8862081 commit acc6abe
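
The behavior this enables can be sketched with the updated dist_compression
regression test below (the hypertable test_recomp_int and the GUC come
straight from that test; this is an illustration, not part of the patch):

    -- With per-data-node queries disabled, the access node generates FDW
    -- queries that target individual chunks; these now transparently
    -- decompress compressed chunks instead of returning no data for them.
    SET timescaledb.enable_per_data_node_queries TO false;
    SELECT count(*) FROM test_recomp_int;
    RESET timescaledb.enable_per_data_node_queries;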
Showing 16 changed files with 3,825 additions and 71 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/linux-32bit-build-and-test.yaml
@@ -23,13 +23,13 @@ jobs:
build_type: [ Debug ]
include:
- pg: "12.8"
ignores: append-12 debug_notice transparent_decompression-12 plan_skip_scan-12 pg_dump
ignores: append-12 debug_notice transparent_decompression-12 transparent_decompress_chunk-12 plan_skip_scan-12 pg_dump
pg_major: 12
- pg: "13.4"
ignores: append-13 debug_notice transparent_decompression-13 pg_dump
ignores: append-13 debug_notice transparent_decompression-13 transparent_decompress_chunk-13 pg_dump
pg_major: 13
- pg: "14.0"
ignores: append-14 debug_notice transparent_decompression-14 pg_dump
ignores: append-14 debug_notice transparent_decompression-14 transparent_decompress_chunk-14 pg_dump
pg_major: 14

steps:
28 changes: 22 additions & 6 deletions src/planner.c
@@ -94,8 +94,8 @@ rte_mark_for_expansion(RangeTblEntry *rte)
rte->inh = false;
}

static bool
rte_is_marked_for_expansion(const RangeTblEntry *rte)
bool
ts_rte_is_marked_for_expansion(const RangeTblEntry *rte)
{
if (NULL == rte->ctename)
return false;
@@ -240,7 +240,7 @@ preprocess_query(Node *node, Query *rootquery)
/* This lookup will warm the cache with all hypertables in the query */
ht = ts_hypertable_cache_get_entry(hcache, rte->relid, CACHE_FLAG_MISSING_OK);

if (NULL != ht)
if (ht)
{
/* Mark hypertable RTEs we'd like to expand ourselves */
if (ts_guc_enable_optimizations && ts_guc_enable_constraint_exclusion &&
@@ -258,6 +258,22 @@
Assert(ht != NULL);
}
}
else
{
/* To properly keep track of SELECT FROM ONLY <chunk> we
* have to mark the rte here because postgres will set
* rte->inh to false (when it detects the chunk has no
* children, which is true for all our chunks) before it
* reaches the set_rel_pathlist hook. Chunks from queries
* like SELECT .. FROM ONLY <chunk> have rte->inh set to
* false while other chunks have rte->inh set to true.
* We distinguish between the two cases here by marking
* the chunk only when rte->inh is true.
*/
Chunk *chunk = ts_chunk_get_by_relid(rte->relid, false);
if (chunk && rte->inh)
rte_mark_for_expansion(rte);
}
break;
default:
break;
@@ -563,7 +579,7 @@ rte_should_expand(const RangeTblEntry *rte)
{
bool is_hypertable = ts_rte_is_hypertable(rte, NULL);

return is_hypertable && !rte->inh && rte_is_marked_for_expansion(rte);
return is_hypertable && !rte->inh && ts_rte_is_marked_for_expansion(rte);
}

static void
@@ -770,7 +786,7 @@ timescaledb_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, Rang
reltype = classify_relation(root, rel, &ht);

/* Check for unexpanded hypertable */
if (!rte->inh && rte_is_marked_for_expansion(rte))
if (!rte->inh && ts_rte_is_marked_for_expansion(rte))
reenable_inheritance(root, rel, rti, rte);

/* Call other extensions. Do it after table expansion. */
@@ -910,7 +926,7 @@ join_involves_hypertable(const PlannerInfo *root, const RelOptInfo *rel)
/* This might give a false positive for chunks in case of PostgreSQL
* expansion since the ctename is copied from the parent hypertable
* to the chunk */
return rte_is_marked_for_expansion(rte);
return ts_rte_is_marked_for_expansion(rte);
}
return false;
}
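
To illustrate the marking logic above (chunk name taken from the isolation
test further down; a sketch, not part of the patch): a plain query on a
compressed chunk is expanded and transparently decompressed, while ONLY
suppresses expansion, which keeps tools like pg_dump working:

    -- Expanded: returns compressed and uncompressed rows alike
    SELECT count(*) FROM _timescaledb_internal._hyper_1_1_chunk;
    -- Not expanded: returns only rows physically stored uncompressed
    SELECT count(*) FROM ONLY _timescaledb_internal._hyper_1_1_chunk;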
1 change: 1 addition & 0 deletions src/planner.h
@@ -28,6 +28,7 @@ typedef struct TimescaleDBPrivate
} TimescaleDBPrivate;

extern TSDLLEXPORT bool ts_rte_is_hypertable(const RangeTblEntry *rte, bool *isdistributed);
extern TSDLLEXPORT bool ts_rte_is_marked_for_expansion(const RangeTblEntry *rte);

static inline TimescaleDBPrivate *
ts_create_private_reloptinfo(RelOptInfo *rel)
87 changes: 39 additions & 48 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
@@ -19,6 +19,8 @@
#include <utils/lsyscache.h>
#include <utils/typcache.h>

#include <planner.h>

#include "hypertable_compression.h"
#include "import/planner.h"
#include "compression/create.h"
@@ -279,8 +281,18 @@ build_compressioninfo(PlannerInfo *root, Hypertable *ht, RelOptInfo *chunk_rel)
info->chunk_rel = chunk_rel;
info->chunk_rte = planner_rt_fetch(chunk_rel->relid, root);

appinfo = ts_get_appendrelinfo(root, chunk_rel->relid, false);
info->ht_rte = planner_rt_fetch(appinfo->parent_relid, root);
if (chunk_rel->reloptkind == RELOPT_OTHER_MEMBER_REL)
{
appinfo = ts_get_appendrelinfo(root, chunk_rel->relid, false);
info->ht_rte = planner_rt_fetch(appinfo->parent_relid, root);
}
else
{
Assert(chunk_rel->reloptkind == RELOPT_BASEREL);
info->single_chunk = true;
info->ht_rte = info->chunk_rte;
}

info->hypertable_id = ht->fd.id;

info->hypertable_compression_info = ts_hypertable_compression_get(ht->fd.id);
@@ -325,27 +337,25 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
Chunk *chunk)
{
RelOptInfo *compressed_rel;
RelOptInfo *hypertable_rel;
ListCell *lc;
double new_row_estimate;
Index ht_relid = 0;

CompressionInfo *info = build_compressioninfo(root, ht, chunk_rel);
Index ht_index;

/* double check we don't end up here on single chunk queries with ONLY */
Assert(info->chunk_rel->reloptkind == RELOPT_OTHER_MEMBER_REL ||
(info->chunk_rel->reloptkind == RELOPT_BASEREL &&
ts_rte_is_marked_for_expansion(info->chunk_rte)));

/*
* since we rely on parallel coordination from the scan below
* this node it is probably not beneficial to have more
* than a single worker per chunk
*/
int parallel_workers = 1;
AppendRelInfo *chunk_info = ts_get_appendrelinfo(root, chunk_rel->relid, false);
SortInfo sort_info = build_sortinfo(chunk, chunk_rel, info, root->query_pathkeys);

Assert(chunk_info != NULL);
Assert(chunk_info->parent_reloid == ht->main_table_relid);
ht_index = chunk_info->parent_relid;
hypertable_rel = root->simple_rel_array[ht_index];

Assert(chunk->fd.compressed_chunk_id > 0);

chunk_rel->pathlist = NIL;
@@ -360,8 +370,17 @@
pushdown_quals(root, chunk_rel, compressed_rel, info->hypertable_compression_info);
set_baserel_size_estimates(root, compressed_rel);
new_row_estimate = compressed_rel->rows * DECOMPRESS_CHUNK_BATCH_SIZE;
/* adjust the parent's estimate by the diff of new and old estimate */
hypertable_rel->rows += (new_row_estimate - chunk_rel->rows);

if (!info->single_chunk)
{
/* adjust the parent's estimate by the diff of new and old estimate */
AppendRelInfo *chunk_info = ts_get_appendrelinfo(root, chunk_rel->relid, false);
Assert(chunk_info->parent_reloid == ht->main_table_relid);
ht_relid = chunk_info->parent_relid;
RelOptInfo *hypertable_rel = root->simple_rel_array[ht_relid];
hypertable_rel->rows += (new_row_estimate - chunk_rel->rows);
}

chunk_rel->rows = new_row_estimate;
create_compressed_scan_paths(root,
compressed_rel,
Expand All @@ -370,7 +389,10 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
&sort_info);

/* compute parent relids of the chunk and use it to filter paths*/
Relids parent_relids = find_childrel_parents(root, chunk_rel);
Relids parent_relids = NULL;
if (!info->single_chunk)
parent_relids = find_childrel_parents(root, chunk_rel);

/* create non-parallel paths */
foreach (lc, compressed_rel->pathlist)
{
@@ -455,7 +477,8 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
DecompressChunkPath *path;
if (child_path->param_info != NULL &&
(bms_is_member(chunk_rel->relid, child_path->param_info->ppi_req_outer) ||
bms_is_member(ht_index, child_path->param_info->ppi_req_outer)))
(!info->single_chunk &&
bms_is_member(ht_relid, child_path->param_info->ppi_req_outer))))
continue;
path = decompress_chunk_path_create(root, info, parallel_workers, child_path);
add_partial_path(chunk_rel, &path->cpath.path);
@@ -920,20 +943,7 @@ decompress_chunk_add_plannerinfo(PlannerInfo *root, CompressionInfo *info, Chunk
Oid compressed_relid = compressed_chunk->table_id;
RelOptInfo *compressed_rel;

/* repalloc() does not work with NULL argument */
Assert(root->simple_rel_array);
Assert(root->simple_rte_array);
Assert(root->append_rel_array);

root->simple_rel_array_size++;
root->simple_rel_array =
repalloc(root->simple_rel_array, root->simple_rel_array_size * sizeof(RelOptInfo *));
root->simple_rte_array =
repalloc(root->simple_rte_array, root->simple_rel_array_size * sizeof(RangeTblEntry *));
root->append_rel_array =
repalloc(root->append_rel_array, root->simple_rel_array_size * sizeof(AppendRelInfo *));
root->append_rel_array[compressed_index] = NULL;

expand_planner_arrays(root, 1);
info->compressed_rte = decompress_chunk_make_rte(compressed_relid, AccessShareLock);
root->simple_rte_array[compressed_index] = info->compressed_rte;

@@ -948,7 +958,7 @@
* in generate_join_implied_equalities (called by
* get_baserel_parampathinfo <- create_index_paths)
*/
Assert(chunk_rel->top_parent_relids != NULL);
Assert(info->single_chunk || chunk_rel->top_parent_relids != NULL);
compressed_rel->top_parent_relids = bms_copy(chunk_rel->top_parent_relids);

root->simple_rel_array[compressed_index] = compressed_rel;
@@ -1132,25 +1142,6 @@ get_column_compressioninfo(List *hypertable_compression_info, char *column_name)
pg_unreachable();
}

/*
* find matching column attno for compressed chunk based on hypertable attno
*
* since we dont want aliasing to interfere we lookup directly in catalog
* instead of using RangeTblEntry
*/
AttrNumber
get_compressed_attno(CompressionInfo *info, AttrNumber ht_attno)
{
AttrNumber compressed_attno;
char *chunk_col = get_attname(info->ht_rte->relid, ht_attno, false);
compressed_attno = get_attnum(info->compressed_rte->relid, chunk_col);

if (compressed_attno == InvalidAttrNumber)
elog(ERROR, "No matching column in compressed chunk found.");

return compressed_attno;
}

/*
* Find toplevel equality constraints of segmentby columns in baserestrictinfo
*
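
A note on the decompress_chunk_add_plannerinfo hunk above:
expand_planner_arrays() is the PostgreSQL planner helper that grows
simple_rel_array, simple_rte_array, and append_rel_array in one call and
zero-initializes the new slots, which is why the manual repalloc calls and
the explicit append_rel_array[compressed_index] = NULL assignment could be
dropped.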
3 changes: 2 additions & 1 deletion tsl/src/nodes/decompress_chunk/decompress_chunk.h
@@ -34,6 +34,8 @@ typedef struct CompressionInfo
/* compressed chunk attribute numbers for columns that are compressed */
Bitmapset *compressed_chunk_compressed_attnos;

bool single_chunk; /* query on explicit chunk */

} CompressionInfo;

typedef struct DecompressChunkPath
@@ -57,6 +59,5 @@

FormData_hypertable_compression *get_column_compressioninfo(List *hypertable_compression_info,
char *column_name);
AttrNumber get_compressed_attno(CompressionInfo *info, AttrNumber chunk_attno);

#endif /* TIMESCALEDB_DECOMPRESS_CHUNK_H */
20 changes: 20 additions & 0 deletions tsl/src/nodes/decompress_chunk/planner.c
@@ -70,6 +70,26 @@ make_compressed_scan_meta_targetentry(DecompressChunkPath *path, char *column_na
return makeTargetEntry((Expr *) scan_var, tle_index, NULL, false);
}

/*
* Find matching column attno for compressed chunk based on hypertable attno.
*
* Since we don't want aliasing to interfere, we look up the column
* directly in the catalog instead of using the RangeTblEntry.
*/
static AttrNumber
get_compressed_attno(CompressionInfo *info, AttrNumber ht_attno)
{
AttrNumber compressed_attno;
Assert(info->ht_rte);
char *chunk_col = get_attname(info->ht_rte->relid, ht_attno, false);
compressed_attno = get_attnum(info->compressed_rte->relid, chunk_col);

if (compressed_attno == InvalidAttrNumber)
elog(ERROR, "no matching column in compressed chunk found");

return compressed_attno;
}

static TargetEntry *
make_compressed_scan_targetentry(DecompressChunkPath *path, AttrNumber ht_attno, int tle_index)
{
16 changes: 13 additions & 3 deletions tsl/src/planner.c
@@ -93,9 +93,19 @@ void
tsl_set_rel_pathlist_query(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte,
Hypertable *ht)
{
if (ts_guc_enable_transparent_decompression && ht != NULL &&
rel->reloptkind == RELOPT_OTHER_MEMBER_REL && TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht) &&
rel->fdw_private != NULL && ((TimescaleDBPrivate *) rel->fdw_private)->compressed)
/* We can get here via a query on a hypertable, in which case reloptkind
* will be RELOPT_OTHER_MEMBER_REL, or via a direct query on a chunk, in
* which case reloptkind will be RELOPT_BASEREL.
* If we get here via SELECT * FROM <chunk>, we decompress the chunk,
* unless the query was SELECT * FROM ONLY <chunk>.
* We detect the ONLY case by calling ts_rte_is_marked_for_expansion.
* Respecting ONLY here is important to avoid breaking postgres tools like pg_dump.
*/
if (ts_guc_enable_transparent_decompression && ht &&
(rel->reloptkind == RELOPT_OTHER_MEMBER_REL ||
(rel->reloptkind == RELOPT_BASEREL && ts_rte_is_marked_for_expansion(rte))) &&
TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht) && rel->fdw_private != NULL &&
((TimescaleDBPrivate *) rel->fdw_private)->compressed)
{
Chunk *chunk = ts_chunk_get_by_relid(rte->relid, true);

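
As a sketch of how this surfaces in query plans (chunk names are
hypothetical, following TimescaleDB's usual naming scheme), a direct query
on a compressed chunk should now go through a DecompressChunk custom scan
over the compressed data:

    EXPLAIN (COSTS OFF)
    SELECT * FROM _timescaledb_internal._hyper_1_1_chunk;
    --  Custom Scan (DecompressChunk) on _hyper_1_1_chunk
    --    ->  Seq Scan on compress_hyper_2_2_chunk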
26 changes: 24 additions & 2 deletions tsl/test/expected/dist_compression.out
@@ -878,6 +878,15 @@ SELECT count(*) from test_recomp_int;
9
(1 row)

-- check with per datanode queries disabled
SET timescaledb.enable_per_data_node_queries TO false;
SELECT count(*) from test_recomp_int;
count
-------
9
(1 row)

RESET timescaledb.enable_per_data_node_queries;
SELECT * from test_recomp_int_chunk_status ORDER BY 1;
chunk_name | chunk_status
------------------------+--------------
@@ -936,16 +945,29 @@ SELECT * from test_recomp_int_chunk_status ORDER BY 1;
_dist_hyper_4_16_chunk | 3
(3 rows)

SELECT time_bucket(20, time ), count(*)
SELECT time_bucket(20, time), count(*)
FROM test_recomp_int
GROUP BY time_bucket( 20, time) ORDER BY 1;
GROUP BY time_bucket(20, time) ORDER BY 1;
time_bucket | count
-------------+-------
0 | 14
60 | 3
100 | 5
(3 rows)

-- check with per datanode queries disabled
SET timescaledb.enable_per_data_node_queries TO false;
SELECT time_bucket(20, time), count(*)
FROM test_recomp_int
GROUP BY time_bucket(20, time) ORDER BY 1;
time_bucket | count
-------------+-------
0 | 14
60 | 3
100 | 5
(3 rows)

RESET timescaledb.enable_per_data_node_queries;
--check compression_status afterwards--
SELECT recompress_chunk(chunk, true) FROM
( SELECT chunk FROM show_chunks('test_recomp_int') AS chunk ORDER BY chunk LIMIT 2)q;
9 changes: 7 additions & 2 deletions tsl/test/isolation/expected/compression_ddl.out
@@ -35,10 +35,15 @@ _timescaledb_internal._hyper_1_1_chunk
(1 row)

step Cc: COMMIT;
step SC1: SELECT count(*) from _timescaledb_internal._hyper_1_1_chunk;
step SC1: SELECT count(*) AS only FROM ONLY _timescaledb_internal._hyper_1_1_chunk; SELECT count(*) FROM _timescaledb_internal._hyper_1_1_chunk;
only
----
0
(1 row)

count
-----
0
11
(1 row)

step S1: SELECT count(*) from ts_device_table;