Refresh cagg uses min value for dimension when start_time is NULL #7546

Draft: wants to merge 3 commits into base: main
2 changes: 1 addition & 1 deletion src/guc.h
@@ -44,7 +44,7 @@ extern bool ts_guc_enable_cagg_reorder_groupby;
extern TSDLLEXPORT int ts_guc_cagg_max_individual_materializations;
extern bool ts_guc_enable_now_constify;
extern bool ts_guc_enable_foreign_key_propagation;
extern bool ts_guc_enable_osm_reads;
extern TSDLLEXPORT bool ts_guc_enable_osm_reads;
#if PG16_GE
extern TSDLLEXPORT bool ts_guc_enable_cagg_sort_pushdown;
#endif
61 changes: 61 additions & 0 deletions src/hypertable.c
@@ -2402,6 +2402,67 @@ ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index,
return max_value;
}

/*
* Get the min value of an open dimension.
*/
int64
ts_hypertable_get_open_dim_min_value(const Hypertable *ht, int dimension_index, bool *isnull)
{
StringInfo command;
const Dimension *dim;
int res;
bool min_isnull;
Datum mindat;
Oid timetype;

dim = hyperspace_get_open_dimension(ht->space, dimension_index);

if (NULL == dim)
elog(ERROR, "invalid open dimension index %d", dimension_index);

timetype = ts_dimension_get_partition_type(dim);

/*
* Query for the minimum time value in the hypertable's open dimension.
* Is there any way to optimize this better?
*/
command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.min(%s) FROM %s.%s",
quote_identifier(NameStr(dim->fd.column_name)),
quote_identifier(NameStr(ht->fd.schema_name)),
quote_identifier(NameStr(ht->fd.table_name)));

if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI");

res = SPI_execute(command->data, true /* read_only */, 0 /*count*/);

if (res < 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
(errmsg("could not find the minimum time value for hypertable \"%s\"",
get_rel_name(ht->main_table_relid)))));

Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == timetype,
"partition types for result (%d) and dimension (%d) do not match",
SPI_gettypeid(SPI_tuptable->tupdesc, 1),
ts_dimension_get_partition_type(dim));
mindat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &min_isnull);

if (isnull)
*isnull = min_isnull;

int64 min_value =
min_isnull ? ts_time_get_min(timetype) : ts_time_value_to_internal(mindat, timetype);

res = SPI_finish();
if (res != SPI_OK_FINISH)
elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(res));

return min_value;
}

bool
ts_hypertable_has_compression_table(const Hypertable *ht)
{
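The new helper assembles its statement from the dimension and hypertable metadata at run time. For the ht_try hypertable used in the cagg_osm test below (time column timec in schema public), the SPI query would look roughly like this sketch:

-- Rough sketch of the statement built by ts_hypertable_get_open_dim_min_value()
-- for the test hypertable public.ht_try with open dimension column timec.
SELECT pg_catalog.min(timec) FROM public.ht_try;
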
2 changes: 2 additions & 0 deletions src/hypertable.h
@@ -146,6 +146,8 @@ extern TSDLLEXPORT bool ts_hypertable_set_compress_interval(Hypertable *ht,
int64 compress_interval);
extern TSDLLEXPORT int64 ts_hypertable_get_open_dim_max_value(const Hypertable *ht,
int dimension_index, bool *isnull);
extern TSDLLEXPORT int64 ts_hypertable_get_open_dim_min_value(const Hypertable *ht,
int dimension_index, bool *isnull);

extern TSDLLEXPORT bool ts_hypertable_has_compression_table(const Hypertable *ht);
extern TSDLLEXPORT void ts_hypertable_formdata_fill(FormData_hypertable *fd, const TupleInfo *ti);
16 changes: 16 additions & 0 deletions src/ts_catalog/continuous_agg.c
@@ -1587,6 +1587,22 @@ ts_compute_circumscribed_bucketed_refresh_window_variable(int64 *start, int64 *e
*end = ts_time_value_to_internal(end_new, TIMESTAMPOID);
}

int64
ts_compute_circumscribed_bucketed_refresh_window_start_variable(
int64 start, const ContinuousAggsBucketFunction *bf)
{
Datum start_old, start_new;

/*
* It's OK to use TIMESTAMPOID here.
* See the comment in ts_compute_inscribed_bucketed_refresh_window_variable()
*/
start_old = ts_internal_to_time_value(start, TIMESTAMPOID);
start_new = generic_time_bucket(bf, start_old);

return ts_time_value_to_internal(start_new, TIMESTAMPOID);
}

/*
* Calculates the beginning of the next bucket.
*
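The new start helper above applies the cagg's bucket function to the window start, so for a variable-width bucket the start can only move back to the beginning of the bucket that contains it. A minimal SQL sketch of that effect (assuming a recent TimescaleDB with calendar buckets and the default origin):

-- Sketch: a monthly (variable-width) bucket pulls the window start back to the
-- start of the containing bucket; 2024-03-17 falls in the March 2024 bucket.
SELECT time_bucket('1 month'::interval, date '2024-03-17');
-- expected: 2024-03-01
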
3 changes: 3 additions & 0 deletions src/ts_catalog/continuous_agg.h
@@ -215,3 +215,6 @@ extern TSDLLEXPORT Query *ts_continuous_agg_get_query(ContinuousAgg *cagg);

extern TSDLLEXPORT int64
ts_continuous_agg_fixed_bucket_width(const ContinuousAggsBucketFunction *bucket_function);

extern TSDLLEXPORT int64 ts_compute_circumscribed_bucketed_refresh_window_start_variable(
int64 start, const ContinuousAggsBucketFunction *bf);
103 changes: 99 additions & 4 deletions tsl/src/continuous_aggs/refresh.c
@@ -372,6 +372,56 @@ compute_circumscribed_bucketed_refresh_window(const ContinuousAgg *cagg,
return result;
}

static int64
compute_circumscribed_refresh_window_start(const ContinuousAgg *cagg, const InternalTimeRange *const refresh_window,
const ContinuousAggsBucketFunction *bucket_function)
{
Assert(refresh_window != NULL);
Assert(bucket_function != NULL);

if (bucket_function->bucket_fixed_interval == false)
{
InternalTimeRange result = *refresh_window;
result.start =
ts_compute_circumscribed_bucketed_refresh_window_start_variable(refresh_window->start,
bucket_function);
return result.start;
}

/* Interval is fixed */
int64 bucket_width = ts_continuous_agg_fixed_bucket_width(bucket_function);
Assert(bucket_width > 0);

InternalTimeRange result = *refresh_window;
InternalTimeRange largest_bucketed_window =
get_largest_bucketed_window(refresh_window->type, bucket_width);

/* Get offset and origin for bucket function */
NullableDatum offset = INIT_NULL_DATUM;
NullableDatum origin = INIT_NULL_DATUM;
fill_bucket_offset_origin(cagg, refresh_window, &offset, &origin);

/* Defined offset and origin in one function is not supported */
Assert(offset.isnull == true || origin.isnull == true);

if (refresh_window->start <= largest_bucketed_window.start)
{
result.start = largest_bucketed_window.start;
}
else
{
/* To align with the bucket that includes the start of the refresh window, we just
* need the start of that bucket. */
result.start = ts_time_bucket_by_type_extended(bucket_width,
refresh_window->start,
refresh_window->type,
offset,
origin);
}

return result.start;
}
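For the fixed-width path this is the usual bucket alignment; the cagg_osm test below uses a '7 days' bucket, and a rough SQL equivalent of snapping a start value (with the regression tests' default PST8PDT session timezone) is:

-- Sketch: snapping a refresh window start to its 7-day bucket. With the default
-- origin and a PST8PDT session timezone this is the 'Sun May 01 17:00:00 2022 PDT'
-- bucket that also shows up in cagg_osm.out.
SELECT time_bucket('7 days'::interval, timestamptz '2022-05-05 01:00:00');
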

/*
* Initialize the refresh state for a continuous aggregate.
*
@@ -790,26 +840,66 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
* still take a long time and it is probably best for consistency to always
* prevent transaction blocks. */
PreventInTransactionBlock(true, REFRESH_FUNCTION_NAME);
elog(NOTICE,
"HERE refrech window INPUT IS (%ld %ld) (start_is_null %d)",
refresh_window.start,
refresh_window.end,
start_isnull);

/* No bucketing when open ended */
if (!(start_isnull && end_isnull))
if (!(start_isnull && end_isnull) || (start_isnull && !ts_guc_enable_osm_reads))
{
int64 mints = 0, bucket_mints = 0;
bool min_isnull = true;
if (start_isnull && ts_guc_enable_osm_reads == false)
{
/* Set the refresh window start to min(ts) of the raw hypertable, since tiered
* data is not visible */
InternalTimeRange tw = *refresh_window_arg;
const Hypertable *raw_ht = cagg_get_hypertable_or_fail(cagg->data.raw_hypertable_id);
mints = ts_hypertable_get_open_dim_min_value(raw_ht, 0, &min_isnull);
tw.start = mints;
bucket_mints = compute_circumscribed_refresh_window_start(cagg, &tw, cagg->bucket_function);
elog(NOTICE, "got mints=%ld bucketmints=%ld ", mints, bucket_mints);
}
if (cagg->bucket_function->bucket_fixed_interval == false)
{
refresh_window = *refresh_window_arg;
elog(NOTICE,
"NOT FIXED INTERVAL original (%ld %ld)",
refresh_window.start,
refresh_window.end);
if (!min_isnull)
refresh_window.start = bucket_mints;

ts_compute_inscribed_bucketed_refresh_window_variable(&refresh_window.start,
&refresh_window.end,
cagg->bucket_function);
elog(NOTICE,
"NOT FIXED INTERVAL adjusted (%ld %ld)",
refresh_window.start,
refresh_window.end);
}
else
{
InternalTimeRange refresh_window_arg_copy = *refresh_window_arg;
if (!min_isnull)
refresh_window_arg_copy.start = bucket_mints;
int64 bucket_width = ts_continuous_agg_fixed_bucket_width(cagg->bucket_function);
Assert(bucket_width > 0);
refresh_window =
compute_inscribed_bucketed_refresh_window(cagg, refresh_window_arg, bucket_width);
compute_inscribed_bucketed_refresh_window(cagg,
(const InternalTimeRange
*) (&refresh_window_arg_copy),
bucket_width);
elog(NOTICE,
"FIXED INTERVAL orig(%ld %ld) adjusted (%ld %ld)",
refresh_window_arg_copy.start,
refresh_window_arg_copy.end,
refresh_window.start,
refresh_window.end);
}
}

elog(NOTICE, "HERE refrech window (%ld %ld)", refresh_window.start, refresh_window.end);
if (refresh_window.start >= refresh_window.end)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -859,6 +949,11 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
(IS_TIMESTAMP_TYPE(refresh_window.type) &&
invalidation_threshold == ts_time_get_min(refresh_window.type)))
{
elog(NOTICE,
"upto date (start=%ld end=%ld) threshold=%ld",
refresh_window.start,
refresh_window.end,
invalidation_threshold);
emit_up_to_date_notice(cagg, callctx);

/* Restore search_path */
99 changes: 99 additions & 0 deletions tsl/test/expected/cagg_osm.out
@@ -0,0 +1,99 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
-- These tests work for PG14 or greater
-- Remember to coordinate any changes to functionality with the Cloud
-- Native Storage team. Tests for the following API:
-- * cagg refresh with GUC enable_tiered_reads
\set EXPLAIN 'EXPLAIN (COSTS OFF)'
--SETUP for OSM chunk --
--need superuser access to create foreign data server
\c :TEST_DBNAME :ROLE_SUPERUSER
CREATE DATABASE postgres_fdw_db;
GRANT ALL PRIVILEGES ON DATABASE postgres_fdw_db TO :ROLE_4;
\c postgres_fdw_db :ROLE_4
CREATE TABLE fdw_table( timec timestamptz NOT NULL , acq_id bigint, value bigint);
INSERT INTO fdw_table VALUES( '2020-01-01 01:00', 100, 1000);
--create foreign server and user mappings as superuser
\c :TEST_DBNAME :ROLE_SUPERUSER
SELECT current_setting('port') as "PORTNO" \gset
CREATE EXTENSION postgres_fdw;
CREATE SERVER s3_server FOREIGN DATA WRAPPER postgres_fdw
OPTIONS ( host 'localhost', dbname 'postgres_fdw_db', port :'PORTNO');
GRANT USAGE ON FOREIGN SERVER s3_server TO :ROLE_4;
CREATE USER MAPPING FOR :ROLE_4 SERVER s3_server
OPTIONS ( user :'ROLE_4' , password :'ROLE_4_PASS');
ALTER USER MAPPING FOR :ROLE_4 SERVER s3_server
OPTIONS (ADD password_required 'false');
\c :TEST_DBNAME :ROLE_4;
-- this is a stand-in for the OSM table
CREATE FOREIGN TABLE child_fdw_table
(timec timestamptz NOT NULL, acq_id bigint, value bigint)
SERVER s3_server OPTIONS ( schema_name 'public', table_name 'fdw_table');
--now attach foreign table as a chunk of the hypertable.
CREATE TABLE ht_try(timec timestamptz NOT NULL, acq_id bigint, value bigint);
SELECT create_hypertable('ht_try', 'timec', chunk_time_interval => interval '1 day');
create_hypertable
---------------------
(1,public,ht_try,t)
(1 row)

INSERT INTO ht_try VALUES ('2022-05-05 01:00', 222, 222);
SELECT * FROM child_fdw_table;
timec | acq_id | value
------------------------------+--------+-------
Wed Jan 01 01:00:00 2020 PST | 100 | 1000
(1 row)

SELECT _timescaledb_functions.attach_osm_table_chunk('ht_try', 'child_fdw_table');
attach_osm_table_chunk
------------------------
t
(1 row)

-- must also update the range since the created chunk contains data
SELECT _timescaledb_functions.hypertable_osm_range_update('ht_try', '2020-01-01'::timestamptz, '2020-01-02');
hypertable_osm_range_update
-----------------------------
f
(1 row)

set timescaledb.enable_tiered_reads = true;
SELECT * from ht_try ORDER BY 1;
timec | acq_id | value
------------------------------+--------+-------
Wed Jan 01 01:00:00 2020 PST | 100 | 1000
Thu May 05 01:00:00 2022 PDT | 222 | 222
(2 rows)

--disable tiered reads --
--set timescaledb.enable_tiered_reads = false;
SELECT * from ht_try ORDER BY 1;
timec | acq_id | value
------------------------------+--------+-------
Wed Jan 01 01:00:00 2020 PST | 100 | 1000
Thu May 05 01:00:00 2022 PDT | 222 | 222
(2 rows)

--TEST cagg creation
CREATE MATERIALIZED VIEW cagg_ht_osm WITH (timescaledb.continuous, timescaledb.materialized_only = true)
AS
SELECT time_bucket('7 days'::interval, timec), count(*)
FROM ht_try
GROUP BY 1;
NOTICE: HERE refresh window INPUT IS (-210866803200000000 9223372036854775807) (start_is_null 1)
NOTICE: HERE refresh window (-210866803200000000 9223372036854775807)
NOTICE: refreshing continuous aggregate "cagg_ht_osm"
SELECT * FROM cagg_ht_osm ORDER BY 1;
time_bucket | count
------------------------------+-------
Sun Dec 29 16:00:00 2019 PST | 1
Sun May 01 17:00:00 2022 PDT | 1
(2 rows)

SELECT * FROM
_timescaledb_catalog.continuous_aggs_hypertable_invalidation_log ORDER BY 1;
hypertable_id | lowest_modified_value | greatest_modified_value
---------------+-----------------------+-------------------------
(0 rows)
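
The committed test creates the cagg with tiered reads enabled. The case this patch targets, an open-ended start while timescaledb.enable_tiered_reads is off, could be exercised along these lines (a sketch only, not part of this test file; the end timestamp is arbitrary):

-- Sketch: with tiered reads disabled, a NULL window start should now be clamped
-- to the bucket containing min(timec) of the raw hypertable instead of the
-- minimum value of the time type.
SET timescaledb.enable_tiered_reads = false;
CALL refresh_continuous_aggregate('cagg_ht_osm', NULL, '2023-01-01'::timestamptz);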

1 change: 1 addition & 0 deletions tsl/test/sql/CMakeLists.txt
@@ -8,6 +8,7 @@ set(TEST_FILES
bgw_policy.sql
bgw_security.sql
cagg_deprecated_bucket_ng.sql
cagg_osm.sql
cagg_errors.sql
cagg_invalidation.sql
cagg_policy.sql