diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c
index 6e637265110..9fa2e3ed4be 100644
--- a/src/cross_module_fn.c
+++ b/src/cross_module_fn.c
@@ -222,6 +222,15 @@ continuous_agg_invalidate_all_default(const Hypertable *ht, int64 start, int64 e
 	pg_unreachable();
 }
 
+static void
+continuous_agg_call_invalidation_trigger_default(int32 hypertable_id, Relation chunk_rel,
+                                                 HeapTuple chunk_tuple, HeapTuple chunk_newtuple,
+                                                 bool update)
+{
+	error_no_default_fn_community();
+	pg_unreachable();
+}
+
 static Datum
 empty_fn(PG_FUNCTION_ARGS)
 {
@@ -347,6 +356,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
 	.continuous_agg_refresh_chunk = error_no_default_fn_pg_community,
 	.continuous_agg_invalidate = continuous_agg_invalidate_all_default,
 	.continuous_agg_update_options = continuous_agg_update_options_default,
+	.continuous_agg_call_invalidation_trigger = continuous_agg_call_invalidation_trigger_default,
 
 	/* compression */
 	.compressed_data_send = error_no_default_fn_pg_community,
diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h
index a892a8d754f..520ba036c43 100644
--- a/src/cross_module_fn.h
+++ b/src/cross_module_fn.h
@@ -95,6 +95,9 @@ typedef struct CrossModuleFunctions
 	void (*continuous_agg_invalidate)(const Hypertable *ht, int64 start, int64 end);
 	void (*continuous_agg_update_options)(ContinuousAgg *cagg,
 	                                      WithClauseResult *with_clause_options);
+	void (*continuous_agg_call_invalidation_trigger)(int32 hypertable_id, Relation chunk_rel,
+	                                                 HeapTuple chunk_tuple,
+	                                                 HeapTuple chunk_newtuple, bool update);
 
 	PGFunction compressed_data_send;
 	PGFunction compressed_data_recv;
diff --git a/src/nodes/chunk_dispatch_state.c b/src/nodes/chunk_dispatch_state.c
index 5187f086f63..16d797cb468 100644
--- a/src/nodes/chunk_dispatch_state.c
+++ b/src/nodes/chunk_dispatch_state.c
@@ -157,7 +157,27 @@ chunk_dispatch_exec(CustomScanState *node)
 		estate->es_result_relation_info = cis->result_relation_info;
 #endif
 		Assert(ts_cm_functions->compress_row_exec != NULL);
+		TupleTableSlot *orig_slot = slot;
 		slot = ts_cm_functions->compress_row_exec(cis->compress_state, slot);
+		/* If a cagg is defined on the hypertable, we have to call the
+		 * function that records invalidations directly, since AFTER ROW
+		 * triggers do not fire for inserts into compressed chunks.
+		 */
+		if (ts_continuous_aggs_find_by_raw_table_id(ht->fd.id))
+		{
+			Assert(ts_cm_functions->continuous_agg_call_invalidation_trigger);
+			HeapTupleTableSlot *hslot = (HeapTupleTableSlot *) orig_slot;
+			if (!hslot->tuple)
+				hslot->tuple = heap_form_tuple(orig_slot->tts_tupleDescriptor,
+				                               orig_slot->tts_values,
+				                               orig_slot->tts_isnull);
+
+			ts_cm_functions->continuous_agg_call_invalidation_trigger(ht->fd.id,
+			                                                          cis->rel,
+			                                                          hslot->tuple,
+			                                                          NULL /* chunk_newtuple */,
+			                                                          false /* update */);
+		}
 	}
 	return slot;
 }
diff --git a/src/nodes/chunk_insert_state.c b/src/nodes/chunk_insert_state.c
index 97dc5b8b89d..9f3cbd3a565 100644
--- a/src/nodes/chunk_insert_state.c
+++ b/src/nodes/chunk_insert_state.c
@@ -33,6 +33,8 @@
 #include "chunk_index.h"
 #include "indexing.h"
 
+#define CAGG_INVALIDATION_TRIGGER_NAME "ts_cagg_invalidation_trigger"
+
 /* Just like ExecPrepareExpr except that it doesn't switch to the query memory context */
 static inline ExprState *
 prepare_constr_expr(Expr *node)
@@ -658,22 +660,33 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
 		if (tg->trig_insert_after_statement || tg->trig_insert_before_statement)
 			elog(ERROR, "statement trigger on chunk table not supported");
 
+		/* AFTER ROW triggers do not fire since we redirect the insert
+		 * to the compressed chunk. We still want the cagg invalidation
+		 * trigger to fire, so we call it directly; raise an error if
+		 * there are any other AFTER ROW triggers.
+		 */
 		if (has_compressed_chunk && tg->trig_insert_after_row)
 		{
 			StringInfo trigger_list = makeStringInfo();
-			int i = 0;
 			Assert(tg->numtriggers > 0);
-			while (i < tg->numtriggers)
+			for (int i = 0; i < tg->numtriggers; i++)
 			{
-				appendStringInfoString(trigger_list, tg->triggers[i].tgname);
-				if (++i < tg->numtriggers)
+				if (strncmp(tg->triggers[i].tgname,
+				            CAGG_INVALIDATION_TRIGGER_NAME,
+				            strlen(CAGG_INVALIDATION_TRIGGER_NAME)) == 0)
+					continue;
+				if (trigger_list->len > 0)
 					appendStringInfoString(trigger_list, ", ");
+				appendStringInfoString(trigger_list, tg->triggers[i].tgname);
+			}
+			if (trigger_list->len != 0)
+			{
+				ereport(ERROR,
+				        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				         errmsg("after insert row trigger on compressed chunk not supported"),
+				         errdetail("Triggers: %s", trigger_list->data),
+				         errhint("Decompress the chunk first before inserting into it.")));
 			}
-			ereport(ERROR,
-			        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-			         errmsg("after insert row trigger on compressed chunk not supported"),
-			         errdetail("Triggers: %s", trigger_list->data),
-			         errhint("Decompress the chunk first before inserting into it.")));
 		}
 	}
diff --git a/tsl/src/continuous_aggs/insert.c b/tsl/src/continuous_aggs/insert.c
index a45e1dace26..2c1ddb5886f 100644
--- a/tsl/src/continuous_aggs/insert.c
+++ b/tsl/src/continuous_aggs/insert.c
@@ -197,9 +197,6 @@ continuous_agg_trigfn(PG_FUNCTION_ARGS)
 	TriggerData *trigdata = (TriggerData *) fcinfo->context;
 	char *hypertable_id_str;
 	int32 hypertable_id;
-	ContinuousAggsCacheInvalEntry *cache_entry;
-	bool found;
-	int64 timeval;
 
 	if (trigdata->tg_trigger->tgnargs < 0)
 		elog(ERROR, "must supply hypertable id");
@@ -210,7 +207,31 @@ continuous_agg_trigfn(PG_FUNCTION_ARGS)
 		elog(ERROR, "continuous agg trigger function must be called by trigger manager");
 	if (!TRIGGER_FIRED_AFTER(trigdata->tg_event) || !TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
 		elog(ERROR, "continuous agg trigger function must be called in per row after trigger");
+	execute_cagg_trigger(hypertable_id,
+	                     trigdata->tg_relation,
+	                     trigdata->tg_trigtuple,
+	                     trigdata->tg_newtuple,
+	                     TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event));
+	if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
+		return PointerGetDatum(trigdata->tg_trigtuple);
+	else
+		return PointerGetDatum(trigdata->tg_newtuple);
+}
+
+/*
+ * chunk_tuple is the tuple from trigdata->tg_trigtuple,
+ * i.e. the one being inserted/deleted/updated
+ * (for updates: this is the row before modification).
+ * chunk_newtuple is the tuple from trigdata->tg_newtuple.
+ */
+void
+execute_cagg_trigger(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tuple,
+                     HeapTuple chunk_newtuple, bool update)
+{
+	ContinuousAggsCacheInvalEntry *cache_entry;
+	bool found;
+	int64 timeval;
+	Oid chunk_relid = chunk_rel->rd_id;
+
 	/* On first call, init the mctx and hash table*/
 	if (!continuous_aggs_cache_inval_htab)
 		cache_inval_init();
@@ -222,31 +243,27 @@
 		cache_inval_entry_init(cache_entry, hypertable_id);
 
 	/* handle the case where we need to repopulate the cached chunk data */
-	if (cache_entry->previous_chunk_relid != trigdata->tg_relation->rd_id)
-		cache_entry_switch_to_chunk(cache_entry,
-		                            trigdata->tg_relation->rd_id,
-		                            trigdata->tg_relation);
+	if (cache_entry->previous_chunk_relid != chunk_relid)
+		cache_entry_switch_to_chunk(cache_entry, chunk_relid, chunk_rel);
 
 	timeval = tuple_get_time(&cache_entry->hypertable_open_dimension,
-	                         trigdata->tg_trigtuple,
+	                         chunk_tuple,
 	                         cache_entry->previous_chunk_open_dimension,
-	                         RelationGetDescr(trigdata->tg_relation));
+	                         RelationGetDescr(chunk_rel));
 
 	update_cache_entry(cache_entry, timeval);
 
-	if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
-		return PointerGetDatum(trigdata->tg_trigtuple);
+	if (!update)
+		return;
 
 	/* on update we need to invalidate the new time value as well as the old one*/
 	timeval = tuple_get_time(&cache_entry->hypertable_open_dimension,
-	                         trigdata->tg_newtuple,
+	                         chunk_newtuple,
 	                         cache_entry->previous_chunk_open_dimension,
-	                         RelationGetDescr(trigdata->tg_relation));
+	                         RelationGetDescr(chunk_rel));
 
 	update_cache_entry(cache_entry, timeval);
-
-	return PointerGetDatum(trigdata->tg_newtuple);
-};
+}
 
 static void
 cache_inval_entry_write(ContinuousAggsCacheInvalEntry *entry)
diff --git a/tsl/src/continuous_aggs/insert.h b/tsl/src/continuous_aggs/insert.h
index e4e53c6135d..ffcdf8c1ab8 100644
--- a/tsl/src/continuous_aggs/insert.h
+++ b/tsl/src/continuous_aggs/insert.h
@@ -12,5 +12,7 @@ extern Datum continuous_agg_trigfn(PG_FUNCTION_ARGS);
 extern void _continuous_aggs_cache_inval_init();
 extern void _continuous_aggs_cache_inval_fini();
 
+extern void execute_cagg_trigger(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tuple,
+                                 HeapTuple chunk_newtuple, bool update);
 #endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_INSERT_H */
diff --git a/tsl/src/init.c b/tsl/src/init.c
index 9ec48152e84..d993d6162c9 100644
--- a/tsl/src/init.c
+++ b/tsl/src/init.c
@@ -127,6 +127,7 @@ CrossModuleFunctions tsl_cm_functions = {
 	.finalize_agg_ffunc = tsl_finalize_agg_ffunc,
 	.process_cagg_viewstmt = tsl_process_continuous_agg_viewstmt,
 	.continuous_agg_invalidation_trigger = continuous_agg_trigfn,
+	.continuous_agg_call_invalidation_trigger = execute_cagg_trigger,
 	.continuous_agg_update_options = continuous_agg_update_options,
 	.continuous_agg_refresh = continuous_agg_refresh,
 	.continuous_agg_refresh_chunk = continuous_agg_refresh_chunk,
diff --git a/tsl/test/expected/compression_insert-12.out b/tsl/test/expected/compression_insert-12.out
index 0c28bbb5e3f..8554d89247a 100644
--- a/tsl/test/expected/compression_insert-12.out
+++ b/tsl/test/expected/compression_insert-12.out
@@ -856,3 +856,57 @@ NOTICE:  chunk "_hyper_13_20_chunk" is already compressed
"_hyper_13_20_chunk" is already compressed -> Seq Scan on compress_hyper_14_23_chunk (10 rows) +-- TEST cagg triggers with insert into compressed chunk +CREATE TABLE conditions ( + timec TIMESTAMPTZ NOT NULL, + temperature DOUBLE PRECISION NULL, + humidity DOUBLE PRECISION NULL + ); +SELECT table_name from create_hypertable( 'conditions', 'timec'); + table_name +------------ + conditions +(1 row) + +INSERT INTO conditions +SELECT generate_series('2010-01-01 09:00:00-08'::timestamptz, '2010-01-03 09:00:00-08'::timestamptz, '1 day'), 55 , 45; +CREATE MATERIALIZED VIEW cagg_conditions WITH (timescaledb.continuous, + timescaledb.materialized_only = true) +AS +SELECT time_bucket( '7 days', timec) bkt, count(*) cnt, sum(temperature) sumb +FROM conditions +GROUP BY time_bucket('7 days', timec); +NOTICE: refreshing continuous aggregate "cagg_conditions" +SELECT * FROM cagg_conditions ORDER BY 1; + bkt | cnt | sumb +------------------------------+-----+------ + Sun Dec 27 16:00:00 2009 PST | 3 | 165 +(1 row) + +ALTER TABLE conditions SET (timescaledb.compress); +SELECT compress_chunk(ch) FROM show_chunks('conditions') ch; + compress_chunk +------------------------------------------ + _timescaledb_internal._hyper_15_24_chunk +(1 row) + + +SELECT chunk_name, range_start, range_end, is_compressed +FROM timescaledb_information.chunks +WHERE hypertable_name = 'conditions'; + chunk_name | range_start | range_end | is_compressed +--------------------+------------------------------+------------------------------+--------------- + _hyper_15_24_chunk | Wed Dec 30 16:00:00 2009 PST | Wed Jan 06 16:00:00 2010 PST | t +(1 row) + +--now insert into compressed chunk +INSERT INTO conditions VALUES('2010-01-01 12:00:00-08', 10, 20); +INSERT INTO conditions VALUES('2010-01-01 12:00:00-08', 10, 20); +--refresh cagg, should have updated info +CALL refresh_continuous_aggregate('cagg_conditions', NULL, '2011-01-01 12:00:00-08' ); +SELECT * FROM cagg_conditions ORDER BY 1; + bkt | cnt | sumb +------------------------------+-----+------ + Sun Dec 27 16:00:00 2009 PST | 5 | 185 +(1 row) + diff --git a/tsl/test/expected/compression_insert-13.out b/tsl/test/expected/compression_insert-13.out index a92c2d200ae..060ce0c966a 100644 --- a/tsl/test/expected/compression_insert-13.out +++ b/tsl/test/expected/compression_insert-13.out @@ -856,3 +856,57 @@ NOTICE: chunk "_hyper_13_20_chunk" is already compressed -> Seq Scan on compress_hyper_14_23_chunk (10 rows) +-- TEST cagg triggers with insert into compressed chunk +CREATE TABLE conditions ( + timec TIMESTAMPTZ NOT NULL, + temperature DOUBLE PRECISION NULL, + humidity DOUBLE PRECISION NULL + ); +SELECT table_name from create_hypertable( 'conditions', 'timec'); + table_name +------------ + conditions +(1 row) + +INSERT INTO conditions +SELECT generate_series('2010-01-01 09:00:00-08'::timestamptz, '2010-01-03 09:00:00-08'::timestamptz, '1 day'), 55 , 45; +CREATE MATERIALIZED VIEW cagg_conditions WITH (timescaledb.continuous, + timescaledb.materialized_only = true) +AS +SELECT time_bucket( '7 days', timec) bkt, count(*) cnt, sum(temperature) sumb +FROM conditions +GROUP BY time_bucket('7 days', timec); +NOTICE: refreshing continuous aggregate "cagg_conditions" +SELECT * FROM cagg_conditions ORDER BY 1; + bkt | cnt | sumb +------------------------------+-----+------ + Sun Dec 27 16:00:00 2009 PST | 3 | 165 +(1 row) + +ALTER TABLE conditions SET (timescaledb.compress); +SELECT compress_chunk(ch) FROM show_chunks('conditions') ch; + compress_chunk 
+------------------------------------------ + _timescaledb_internal._hyper_15_24_chunk +(1 row) + + +SELECT chunk_name, range_start, range_end, is_compressed +FROM timescaledb_information.chunks +WHERE hypertable_name = 'conditions'; + chunk_name | range_start | range_end | is_compressed +--------------------+------------------------------+------------------------------+--------------- + _hyper_15_24_chunk | Wed Dec 30 16:00:00 2009 PST | Wed Jan 06 16:00:00 2010 PST | t +(1 row) + +--now insert into compressed chunk +INSERT INTO conditions VALUES('2010-01-01 12:00:00-08', 10, 20); +INSERT INTO conditions VALUES('2010-01-01 12:00:00-08', 10, 20); +--refresh cagg, should have updated info +CALL refresh_continuous_aggregate('cagg_conditions', NULL, '2011-01-01 12:00:00-08' ); +SELECT * FROM cagg_conditions ORDER BY 1; + bkt | cnt | sumb +------------------------------+-----+------ + Sun Dec 27 16:00:00 2009 PST | 5 | 185 +(1 row) + diff --git a/tsl/test/expected/compression_insert-14.out b/tsl/test/expected/compression_insert-14.out index a92c2d200ae..060ce0c966a 100644 --- a/tsl/test/expected/compression_insert-14.out +++ b/tsl/test/expected/compression_insert-14.out @@ -856,3 +856,57 @@ NOTICE: chunk "_hyper_13_20_chunk" is already compressed -> Seq Scan on compress_hyper_14_23_chunk (10 rows) +-- TEST cagg triggers with insert into compressed chunk +CREATE TABLE conditions ( + timec TIMESTAMPTZ NOT NULL, + temperature DOUBLE PRECISION NULL, + humidity DOUBLE PRECISION NULL + ); +SELECT table_name from create_hypertable( 'conditions', 'timec'); + table_name +------------ + conditions +(1 row) + +INSERT INTO conditions +SELECT generate_series('2010-01-01 09:00:00-08'::timestamptz, '2010-01-03 09:00:00-08'::timestamptz, '1 day'), 55 , 45; +CREATE MATERIALIZED VIEW cagg_conditions WITH (timescaledb.continuous, + timescaledb.materialized_only = true) +AS +SELECT time_bucket( '7 days', timec) bkt, count(*) cnt, sum(temperature) sumb +FROM conditions +GROUP BY time_bucket('7 days', timec); +NOTICE: refreshing continuous aggregate "cagg_conditions" +SELECT * FROM cagg_conditions ORDER BY 1; + bkt | cnt | sumb +------------------------------+-----+------ + Sun Dec 27 16:00:00 2009 PST | 3 | 165 +(1 row) + +ALTER TABLE conditions SET (timescaledb.compress); +SELECT compress_chunk(ch) FROM show_chunks('conditions') ch; + compress_chunk +------------------------------------------ + _timescaledb_internal._hyper_15_24_chunk +(1 row) + + +SELECT chunk_name, range_start, range_end, is_compressed +FROM timescaledb_information.chunks +WHERE hypertable_name = 'conditions'; + chunk_name | range_start | range_end | is_compressed +--------------------+------------------------------+------------------------------+--------------- + _hyper_15_24_chunk | Wed Dec 30 16:00:00 2009 PST | Wed Jan 06 16:00:00 2010 PST | t +(1 row) + +--now insert into compressed chunk +INSERT INTO conditions VALUES('2010-01-01 12:00:00-08', 10, 20); +INSERT INTO conditions VALUES('2010-01-01 12:00:00-08', 10, 20); +--refresh cagg, should have updated info +CALL refresh_continuous_aggregate('cagg_conditions', NULL, '2011-01-01 12:00:00-08' ); +SELECT * FROM cagg_conditions ORDER BY 1; + bkt | cnt | sumb +------------------------------+-----+------ + Sun Dec 27 16:00:00 2009 PST | 5 | 185 +(1 row) + diff --git a/tsl/test/sql/compression_insert.sql.in b/tsl/test/sql/compression_insert.sql.in index 64f7b007362..b45306b2e59 100644 --- a/tsl/test/sql/compression_insert.sql.in +++ b/tsl/test/sql/compression_insert.sql.in @@ -550,3 +550,37 @@ 
 
 -- should be ordered append
 :PREFIX SELECT * FROM test_ordering ORDER BY 1;
+
+-- TEST cagg triggers with insert into compressed chunk
+CREATE TABLE conditions (
+      timec TIMESTAMPTZ NOT NULL,
+      temperature DOUBLE PRECISION NULL,
+      humidity DOUBLE PRECISION NULL
+    );
+SELECT table_name from create_hypertable( 'conditions', 'timec');
+INSERT INTO conditions
+SELECT generate_series('2010-01-01 09:00:00-08'::timestamptz, '2010-01-03 09:00:00-08'::timestamptz, '1 day'), 55 , 45;
+
+CREATE MATERIALIZED VIEW cagg_conditions WITH (timescaledb.continuous,
+ timescaledb.materialized_only = true)
+AS
+SELECT time_bucket( '7 days', timec) bkt, count(*) cnt, sum(temperature) sumb
+FROM conditions
+GROUP BY time_bucket('7 days', timec);
+
+SELECT * FROM cagg_conditions ORDER BY 1;
+
+ALTER TABLE conditions SET (timescaledb.compress);
+SELECT compress_chunk(ch) FROM show_chunks('conditions') ch;
+
+SELECT chunk_name, range_start, range_end, is_compressed
+FROM timescaledb_information.chunks
+WHERE hypertable_name = 'conditions';
+
+--now insert into compressed chunk
+INSERT INTO conditions VALUES('2010-01-01 12:00:00-08', 10, 20);
+INSERT INTO conditions VALUES('2010-01-01 12:00:00-08', 10, 20);
+
+--refresh cagg, should have updated info
+CALL refresh_continuous_aggregate('cagg_conditions', NULL, '2011-01-01 12:00:00-08' );
+SELECT * FROM cagg_conditions ORDER BY 1;
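
Note on the trigger-list loop in chunk_insert_state.c: the separator is now appended based on the accumulated list length rather than the loop index, so that skipping the cagg invalidation trigger (which may sort first among the chunk's triggers) cannot produce a leading ", " in the error detail. Below is a minimal standalone sketch of that skip-and-join pattern, compilable as plain C outside PostgreSQL; build_trigger_list and the trigger names are hypothetical, only the filtering logic mirrors the patch.

#include <stdio.h>
#include <string.h>

#define CAGG_INVALIDATION_TRIGGER_NAME "ts_cagg_invalidation_trigger"

/* Join all trigger names except the cagg invalidation trigger into a
 * comma-separated list in buf. Returns the number of names written. */
static int
build_trigger_list(const char **names, int ntriggers, char *buf, size_t buflen)
{
	int written = 0;

	buf[0] = '\0';
	for (int i = 0; i < ntriggers; i++)
	{
		/* Skip the cagg invalidation trigger; it is called directly. */
		if (strncmp(names[i],
		            CAGG_INVALIDATION_TRIGGER_NAME,
		            strlen(CAGG_INVALIDATION_TRIGGER_NAME)) == 0)
			continue;
		/* The separator depends on what has already been emitted, not on
		 * the index i, so a skipped first entry leaves no leading ", ". */
		if (buf[0] != '\0')
			strncat(buf, ", ", buflen - strlen(buf) - 1);
		strncat(buf, names[i], buflen - strlen(buf) - 1);
		written++;
	}
	return written;
}

int
main(void)
{
	/* Hypothetical trigger names; the cagg trigger comes first on purpose. */
	const char *names[] = { CAGG_INVALIDATION_TRIGGER_NAME, "audit_ins", "notify_ins" };
	char buf[256];

	if (build_trigger_list(names, 3, buf, sizeof(buf)) > 0)
		printf("unsupported AFTER ROW triggers: %s\n", buf);
	/* prints: unsupported AFTER ROW triggers: audit_ins, notify_ins */
	return 0;
}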
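Note on the cross-module plumbing: continuous_agg_call_invalidation_trigger follows the existing CrossModuleFunctions pattern, where the community build installs a default implementation that errors out and loading the TSL module swaps in the real one (here, execute_cagg_trigger). A stripped-down sketch of that dispatch pattern follows; all names in it (CrossModuleFns, cm_fns, both implementations) are simplified stand-ins, not the real TimescaleDB definitions.

#include <stdio.h>
#include <stdlib.h>

/* Table of function pointers for features provided by the loadable module. */
typedef struct CrossModuleFns
{
	void (*call_invalidation_trigger)(int hypertable_id);
} CrossModuleFns;

static void
call_invalidation_trigger_default(int hypertable_id)
{
	/* Community build: the feature requires the TSL module. */
	fprintf(stderr, "function not available in this build\n");
	abort();
}

static CrossModuleFns fns_default = {
	.call_invalidation_trigger = call_invalidation_trigger_default,
};

static void
call_invalidation_trigger_tsl(int hypertable_id)
{
	printf("recording invalidation for hypertable %d\n", hypertable_id);
}

static CrossModuleFns fns_tsl = {
	.call_invalidation_trigger = call_invalidation_trigger_tsl,
};

/* Points at the default table until the TSL module is loaded. */
static CrossModuleFns *cm_fns = &fns_default;

int
main(void)
{
	cm_fns = &fns_tsl; /* what loading the TSL module effectively does */
	cm_fns->call_invalidation_trigger(15);
	return 0;
}

Core code such as chunk_dispatch_exec only ever calls through the pointer table, which is why the patch adds both the default that errors out (cross_module_fn.c) and the TSL assignment (init.c) alongside the new field in the struct (cross_module_fn.h).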