Skip to content

Commit

Permalink
Support cagg invalidation trigger for inserts into compressed chunks
Browse files Browse the repository at this point in the history
After row triggers do not work when we insert into a compressed chunk.
This causes a problem for caggs as invalidations are not recorded.
Explicitly call the function to record invalidations when we
insert into a compressed chunk (if the hypertable has caggs
defined on it)

Fixes #3410.
  • Loading branch information
gayyappan committed Oct 21, 2021
1 parent c2e4f31 commit b7520b8
Show file tree
Hide file tree
Showing 11 changed files with 287 additions and 25 deletions.
10 changes: 10 additions & 0 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,15 @@ continuous_agg_invalidate_all_default(const Hypertable *ht, int64 start, int64 e
pg_unreachable();
}

static void
continuous_agg_call_invalidation_trigger_default(int32 hypertable_id, Relation chunk_rel,
HeapTuple chunk_tuple, HeapTuple chunk_newtuple,
bool update)
{
error_no_default_fn_community();
pg_unreachable();
}

static Datum
empty_fn(PG_FUNCTION_ARGS)
{
Expand Down Expand Up @@ -347,6 +356,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.continuous_agg_refresh_chunk = error_no_default_fn_pg_community,
.continuous_agg_invalidate = continuous_agg_invalidate_all_default,
.continuous_agg_update_options = continuous_agg_update_options_default,
.continuous_agg_call_invalidation_trigger = continuous_agg_call_invalidation_trigger_default,

/* compression */
.compressed_data_send = error_no_default_fn_pg_community,
Expand Down
3 changes: 3 additions & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ typedef struct CrossModuleFunctions
void (*continuous_agg_invalidate)(const Hypertable *ht, int64 start, int64 end);
void (*continuous_agg_update_options)(ContinuousAgg *cagg,
WithClauseResult *with_clause_options);
void (*continuous_agg_call_invalidation_trigger)(int32 hypertable_id, Relation chunk_rel,
HeapTuple chunk_tuple,
HeapTuple chunk_newtuple, bool update);

PGFunction compressed_data_send;
PGFunction compressed_data_recv;
Expand Down
20 changes: 20 additions & 0 deletions src/nodes/chunk_dispatch_state.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,27 @@ chunk_dispatch_exec(CustomScanState *node)
estate->es_result_relation_info = cis->result_relation_info;
#endif
Assert(ts_cm_functions->compress_row_exec != NULL);
TupleTableSlot *orig_slot = slot;
slot = ts_cm_functions->compress_row_exec(cis->compress_state, slot);
/* If we have cagg defined on the hypertable, we have to execute
* the function that records invalidations directly as AFTER ROW
* triggers do not work with compressed chunks.
*/
if (ts_continuous_aggs_find_by_raw_table_id(ht->fd.id))
{
Assert(ts_cm_functions->continuous_agg_call_invalidation_trigger);
HeapTupleTableSlot *hslot = (HeapTupleTableSlot *) orig_slot;
if (!hslot->tuple)
hslot->tuple = heap_form_tuple(orig_slot->tts_tupleDescriptor,
orig_slot->tts_values,
orig_slot->tts_isnull);

ts_cm_functions->continuous_agg_call_invalidation_trigger(ht->fd.id,
cis->rel,
hslot->tuple,
NULL /*chunk_newtuple */,
false /*update*/);
}
}
return slot;
}
Expand Down
31 changes: 22 additions & 9 deletions src/nodes/chunk_insert_state.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "chunk_index.h"
#include "indexing.h"

#define CAGG_INVALIDATION_TRIGGER_NAME "ts_cagg_invalidation_trigger"

/* Just like ExecPrepareExpr except that it doesn't switch to the query memory context */
static inline ExprState *
prepare_constr_expr(Expr *node)
Expand Down Expand Up @@ -658,22 +660,33 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
if (tg->trig_insert_after_statement || tg->trig_insert_before_statement)
elog(ERROR, "statement trigger on chunk table not supported");

/* AFTER ROW triggers do not work since we redirect the insert
* to the compressed chunk. We still want cagg triggers to fire.
* We'll call them directly. But raise an error if there are
* other triggers
*/
if (has_compressed_chunk && tg->trig_insert_after_row)
{
StringInfo trigger_list = makeStringInfo();
int i = 0;
Assert(tg->numtriggers > 0);
while (i < tg->numtriggers)
for (int i = 0; i < tg->numtriggers; i++)
{
appendStringInfoString(trigger_list, tg->triggers[i].tgname);
if (++i < tg->numtriggers)
if (strncmp(tg->triggers[i].tgname,
CAGG_INVALIDATION_TRIGGER_NAME,
strlen(CAGG_INVALIDATION_TRIGGER_NAME)) == 0)
continue;
if (i > 0)
appendStringInfoString(trigger_list, ", ");
appendStringInfoString(trigger_list, tg->triggers[i].tgname);
}
if (trigger_list->len != 0)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("after insert row trigger on compressed chunk not supported"),
errdetail("Triggers: %s", trigger_list->data),
errhint("Decompress the chunk first before inserting into it.")));
}
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("after insert row trigger on compressed chunk not supported"),
errdetail("Triggers: %s", trigger_list->data),
errhint("Decompress the chunk first before inserting into it.")));
}
}

Expand Down
49 changes: 33 additions & 16 deletions tsl/src/continuous_aggs/insert.c
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,6 @@ continuous_agg_trigfn(PG_FUNCTION_ARGS)
TriggerData *trigdata = (TriggerData *) fcinfo->context;
char *hypertable_id_str;
int32 hypertable_id;
ContinuousAggsCacheInvalEntry *cache_entry;
bool found;
int64 timeval;
if (trigdata->tg_trigger->tgnargs < 0)
elog(ERROR, "must supply hypertable id");

Expand All @@ -210,7 +207,31 @@ continuous_agg_trigfn(PG_FUNCTION_ARGS)
elog(ERROR, "continuous agg trigger function must be called by trigger manager");
if (!TRIGGER_FIRED_AFTER(trigdata->tg_event) || !TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
elog(ERROR, "continuous agg trigger function must be called in per row after trigger");
execute_cagg_trigger(hypertable_id,
trigdata->tg_relation,
trigdata->tg_trigtuple,
trigdata->tg_newtuple,
TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event));
if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
return PointerGetDatum(trigdata->tg_trigtuple);
else
return PointerGetDatum(trigdata->tg_newtuple);
}

/*
* chunk_tuple is the tuple from trigdata->tg_trigtuple
* i.e. the one being/inserts/deleted/updated.
* (for updates: this is the row before modification)
* chunk_newtuple is the tuple from trigdata->tg_newtuple.
*/
void
execute_cagg_trigger(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tuple,
HeapTuple chunk_newtuple, bool update)
{
ContinuousAggsCacheInvalEntry *cache_entry;
bool found;
int64 timeval;
Oid chunk_relid = chunk_rel->rd_id;
/* On first call, init the mctx and hash table*/
if (!continuous_aggs_cache_inval_htab)
cache_inval_init();
Expand All @@ -222,31 +243,27 @@ continuous_agg_trigfn(PG_FUNCTION_ARGS)
cache_inval_entry_init(cache_entry, hypertable_id);

/* handle the case where we need to repopulate the cached chunk data */
if (cache_entry->previous_chunk_relid != trigdata->tg_relation->rd_id)
cache_entry_switch_to_chunk(cache_entry,
trigdata->tg_relation->rd_id,
trigdata->tg_relation);
if (cache_entry->previous_chunk_relid != chunk_relid)
cache_entry_switch_to_chunk(cache_entry, chunk_relid, chunk_rel);

timeval = tuple_get_time(&cache_entry->hypertable_open_dimension,
trigdata->tg_trigtuple,
chunk_tuple,
cache_entry->previous_chunk_open_dimension,
RelationGetDescr(trigdata->tg_relation));
RelationGetDescr(chunk_rel));

update_cache_entry(cache_entry, timeval);

if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
return PointerGetDatum(trigdata->tg_trigtuple);
if (!update)
return;

/* on update we need to invalidate the new time value as well as the old one*/
timeval = tuple_get_time(&cache_entry->hypertable_open_dimension,
trigdata->tg_newtuple,
chunk_newtuple,
cache_entry->previous_chunk_open_dimension,
RelationGetDescr(trigdata->tg_relation));
RelationGetDescr(chunk_rel));

update_cache_entry(cache_entry, timeval);

return PointerGetDatum(trigdata->tg_newtuple);
};
}

static void
cache_inval_entry_write(ContinuousAggsCacheInvalEntry *entry)
Expand Down
2 changes: 2 additions & 0 deletions tsl/src/continuous_aggs/insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@ extern Datum continuous_agg_trigfn(PG_FUNCTION_ARGS);

extern void _continuous_aggs_cache_inval_init();
extern void _continuous_aggs_cache_inval_fini();
extern void execute_cagg_trigger(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tuple,
HeapTuple chunk_newtuple, bool update);

#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_INSERT_H */
1 change: 1 addition & 0 deletions tsl/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ CrossModuleFunctions tsl_cm_functions = {
.finalize_agg_ffunc = tsl_finalize_agg_ffunc,
.process_cagg_viewstmt = tsl_process_continuous_agg_viewstmt,
.continuous_agg_invalidation_trigger = continuous_agg_trigfn,
.continuous_agg_call_invalidation_trigger = execute_cagg_trigger,
.continuous_agg_update_options = continuous_agg_update_options,
.continuous_agg_refresh = continuous_agg_refresh,
.continuous_agg_refresh_chunk = continuous_agg_refresh_chunk,
Expand Down
54 changes: 54 additions & 0 deletions tsl/test/expected/compression_insert-12.out
Original file line number Diff line number Diff line change
Expand Up @@ -856,3 +856,57 @@ NOTICE: chunk "_hyper_13_20_chunk" is already compressed
-> Seq Scan on compress_hyper_14_23_chunk
(10 rows)

-- TEST cagg triggers with insert into compressed chunk
CREATE TABLE conditions (
timec TIMESTAMPTZ NOT NULL,
temperature DOUBLE PRECISION NULL,
humidity DOUBLE PRECISION NULL
);
SELECT table_name from create_hypertable( 'conditions', 'timec');
table_name
------------
conditions
(1 row)

INSERT INTO conditions
SELECT generate_series('2010-01-01 09:00:00-08'::timestamptz, '2010-01-03 09:00:00-08'::timestamptz, '1 day'), 55 , 45;
CREATE MATERIALIZED VIEW cagg_conditions WITH (timescaledb.continuous,
timescaledb.materialized_only = true)
AS
SELECT time_bucket( '7 days', timec) bkt, count(*) cnt, sum(temperature) sumb
FROM conditions
GROUP BY time_bucket('7 days', timec);
NOTICE: refreshing continuous aggregate "cagg_conditions"
SELECT * FROM cagg_conditions ORDER BY 1;
bkt | cnt | sumb
------------------------------+-----+------
Sun Dec 27 16:00:00 2009 PST | 3 | 165
(1 row)

ALTER TABLE conditions SET (timescaledb.compress);
SELECT compress_chunk(ch) FROM show_chunks('conditions') ch;
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_15_24_chunk
(1 row)


SELECT chunk_name, range_start, range_end, is_compressed
FROM timescaledb_information.chunks
WHERE hypertable_name = 'conditions';
chunk_name | range_start | range_end | is_compressed
--------------------+------------------------------+------------------------------+---------------
_hyper_15_24_chunk | Wed Dec 30 16:00:00 2009 PST | Wed Jan 06 16:00:00 2010 PST | t
(1 row)

--now insert into compressed chunk
INSERT INTO conditions VALUES('2010-01-01 12:00:00-08', 10, 20);
INSERT INTO conditions VALUES('2010-01-01 12:00:00-08', 10, 20);
--refresh cagg, should have updated info
CALL refresh_continuous_aggregate('cagg_conditions', NULL, '2011-01-01 12:00:00-08' );
SELECT * FROM cagg_conditions ORDER BY 1;
bkt | cnt | sumb
------------------------------+-----+------
Sun Dec 27 16:00:00 2009 PST | 5 | 185
(1 row)

54 changes: 54 additions & 0 deletions tsl/test/expected/compression_insert-13.out
Original file line number Diff line number Diff line change
Expand Up @@ -856,3 +856,57 @@ NOTICE: chunk "_hyper_13_20_chunk" is already compressed
-> Seq Scan on compress_hyper_14_23_chunk
(10 rows)

-- TEST cagg triggers with insert into compressed chunk
CREATE TABLE conditions (
timec TIMESTAMPTZ NOT NULL,
temperature DOUBLE PRECISION NULL,
humidity DOUBLE PRECISION NULL
);
SELECT table_name from create_hypertable( 'conditions', 'timec');
table_name
------------
conditions
(1 row)

INSERT INTO conditions
SELECT generate_series('2010-01-01 09:00:00-08'::timestamptz, '2010-01-03 09:00:00-08'::timestamptz, '1 day'), 55 , 45;
CREATE MATERIALIZED VIEW cagg_conditions WITH (timescaledb.continuous,
timescaledb.materialized_only = true)
AS
SELECT time_bucket( '7 days', timec) bkt, count(*) cnt, sum(temperature) sumb
FROM conditions
GROUP BY time_bucket('7 days', timec);
NOTICE: refreshing continuous aggregate "cagg_conditions"
SELECT * FROM cagg_conditions ORDER BY 1;
bkt | cnt | sumb
------------------------------+-----+------
Sun Dec 27 16:00:00 2009 PST | 3 | 165
(1 row)

ALTER TABLE conditions SET (timescaledb.compress);
SELECT compress_chunk(ch) FROM show_chunks('conditions') ch;
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_15_24_chunk
(1 row)


SELECT chunk_name, range_start, range_end, is_compressed
FROM timescaledb_information.chunks
WHERE hypertable_name = 'conditions';
chunk_name | range_start | range_end | is_compressed
--------------------+------------------------------+------------------------------+---------------
_hyper_15_24_chunk | Wed Dec 30 16:00:00 2009 PST | Wed Jan 06 16:00:00 2010 PST | t
(1 row)

--now insert into compressed chunk
INSERT INTO conditions VALUES('2010-01-01 12:00:00-08', 10, 20);
INSERT INTO conditions VALUES('2010-01-01 12:00:00-08', 10, 20);
--refresh cagg, should have updated info
CALL refresh_continuous_aggregate('cagg_conditions', NULL, '2011-01-01 12:00:00-08' );
SELECT * FROM cagg_conditions ORDER BY 1;
bkt | cnt | sumb
------------------------------+-----+------
Sun Dec 27 16:00:00 2009 PST | 5 | 185
(1 row)

54 changes: 54 additions & 0 deletions tsl/test/expected/compression_insert-14.out
Original file line number Diff line number Diff line change
Expand Up @@ -856,3 +856,57 @@ NOTICE: chunk "_hyper_13_20_chunk" is already compressed
-> Seq Scan on compress_hyper_14_23_chunk
(10 rows)

-- TEST cagg triggers with insert into compressed chunk
CREATE TABLE conditions (
timec TIMESTAMPTZ NOT NULL,
temperature DOUBLE PRECISION NULL,
humidity DOUBLE PRECISION NULL
);
SELECT table_name from create_hypertable( 'conditions', 'timec');
table_name
------------
conditions
(1 row)

INSERT INTO conditions
SELECT generate_series('2010-01-01 09:00:00-08'::timestamptz, '2010-01-03 09:00:00-08'::timestamptz, '1 day'), 55 , 45;
CREATE MATERIALIZED VIEW cagg_conditions WITH (timescaledb.continuous,
timescaledb.materialized_only = true)
AS
SELECT time_bucket( '7 days', timec) bkt, count(*) cnt, sum(temperature) sumb
FROM conditions
GROUP BY time_bucket('7 days', timec);
NOTICE: refreshing continuous aggregate "cagg_conditions"
SELECT * FROM cagg_conditions ORDER BY 1;
bkt | cnt | sumb
------------------------------+-----+------
Sun Dec 27 16:00:00 2009 PST | 3 | 165
(1 row)

ALTER TABLE conditions SET (timescaledb.compress);
SELECT compress_chunk(ch) FROM show_chunks('conditions') ch;
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_15_24_chunk
(1 row)


SELECT chunk_name, range_start, range_end, is_compressed
FROM timescaledb_information.chunks
WHERE hypertable_name = 'conditions';
chunk_name | range_start | range_end | is_compressed
--------------------+------------------------------+------------------------------+---------------
_hyper_15_24_chunk | Wed Dec 30 16:00:00 2009 PST | Wed Jan 06 16:00:00 2010 PST | t
(1 row)

--now insert into compressed chunk
INSERT INTO conditions VALUES('2010-01-01 12:00:00-08', 10, 20);
INSERT INTO conditions VALUES('2010-01-01 12:00:00-08', 10, 20);
--refresh cagg, should have updated info
CALL refresh_continuous_aggregate('cagg_conditions', NULL, '2011-01-01 12:00:00-08' );
SELECT * FROM cagg_conditions ORDER BY 1;
bkt | cnt | sumb
------------------------------+-----+------
Sun Dec 27 16:00:00 2009 PST | 5 | 185
(1 row)

Loading

0 comments on commit b7520b8

Please sign in to comment.