Fix compress_policy for multi txn handling #3667

Merged 2 commits on Oct 13, 2021. The diff below shows changes from all commits.
166 changes: 162 additions & 4 deletions sql/policy_internal.sql
@@ -10,14 +10,172 @@ CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_reorder(job_id INTEGER,
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_proc'
LANGUAGE C;

CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_compression_proc'
LANGUAGE C;

CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_recompression(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_recompression_proc'
LANGUAGE C;

CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_refresh_continuous_aggregate(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_proc'
LANGUAGE C;

CREATE OR REPLACE PROCEDURE
_timescaledb_internal.policy_compression_interval( job_id INTEGER,
htid INTEGER,
lag INTERVAL,
maxchunks INTEGER,
verbose_log BOOLEAN,
recompress_enabled BOOLEAN)
AS $$
DECLARE
htoid regclass;
chunk_rec record;
numchunks integer := 1;
BEGIN

SELECT format('%I.%I',schema_name, table_name) INTO htoid
FROM _timescaledb_catalog.hypertable
WHERE id = htid;

FOR chunk_rec IN
SELECT show.oid, ch.schema_name, ch.table_name, ch.status
FROM show_chunks( htoid, older_than => lag) as show(oid)
INNER JOIN pg_class pgc ON pgc.oid = show.oid
INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname and ch.schema_name = pgns.nspname and ch.hypertable_id = htid
WHERE ch.dropped is false and (ch.status = 0 OR ch.status = 3)
LOOP
IF chunk_rec.status = 0 THEN
PERFORM compress_chunk( chunk_rec.oid );
ELSIF chunk_rec.status = 3 AND recompress_enabled = 'true' THEN
PERFORM recompress_chunk( chunk_rec.oid );
END IF;
COMMIT;
IF verbose_log THEN
RAISE LOG 'job % completed processing chunk %.%', job_id, chunk_rec.schema_name, chunk_rec.table_name;
END IF;
numchunks := numchunks + 1;
IF maxchunks > 0 AND numchunks >= maxchunks THEN
EXIT;
END IF;
Comment on lines +56 to +59
Contributor:

You can add a LIMIT maxchunks to the SELECT for the loop and avoid this code as well as improve performance.

Contributor Author (@gayyappan, Oct 12, 2021):

I would have to do this dynamically based on the maxchunks value, i.e., handle NULL or not NULL.
The default is to not set maxchunks. Policies are created with NULL maxchunks.
So I don't see a point in optimizing for this special case.

Contributor:

@gayyappan IMHO it's simple to deal with it using dynamic SQL:

statement := format('
    SELECT show.oid, ch.schema_name, ch.table_name, ch.status
    FROM show_chunks(%L, older_than => %L) AS show(oid)
      JOIN pg_class pgc ON pgc.oid = show.oid
      JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
      JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = %L
    WHERE ch.dropped IS FALSE AND ch.status IN (0, 3)', htoid, lag, htid);
...

FOR chunk_rec IN EXECUTE format('%s %s', statement, COALESCE('LIMIT '||maxchunks, ''))
LOOP
...

Contributor Author:

The point is that I do not want to use dynamic SQL to optimize for a special case.

END LOOP;
END;
$$ LANGUAGE PLPGSQL;
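
This procedure is what a time-based compression policy runs under the job scheduler, and the per-chunk COMMIT above is the "multi txn handling" of the PR title: each chunk is compressed in its own transaction, so a failure on one chunk no longer rolls back work already done on the others. As a minimal sketch only, with made-up job and hypertable ids (in normal operation the policy_compression dispatcher further down passes these values from the job's JSONB config), the procedure could be exercised directly like this:

CALL _timescaledb_internal.policy_compression_interval(
    job_id             => 1000,               -- illustrative job id
    htid               => 2,                  -- id from _timescaledb_catalog.hypertable
    lag                => INTERVAL '7 days',  -- compress chunks older than this
    maxchunks          => 0,                  -- 0 means no limit, per the loop above
    verbose_log        => true,
    recompress_enabled => true);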

CREATE OR REPLACE PROCEDURE
_timescaledb_internal.policy_compression_integer( job_id INTEGER,
htid INTEGER,
lag BIGINT,
maxchunks INTEGER,
verbose_log BOOLEAN,
recompress_enabled BOOLEAN)
AS $$
DECLARE
htoid regclass;
chunk_rec record;
numchunks integer := 0;
lag_integer BIGINT;
BEGIN

SELECT format('%I.%I',schema_name, table_name) INTO htoid
FROM _timescaledb_catalog.hypertable
WHERE id = htid;

-- for the integer case, we have to compute the lag w.r.t.
-- the integer_now function and then pass it on to show_chunks
lag_integer := _timescaledb_internal.subtract_integer_from_now( htoid, lag);

FOR chunk_rec IN
SELECT show.oid, ch.schema_name, ch.table_name, ch.status
FROM show_chunks( htoid, older_than => lag_integer) SHOW (oid)
INNER JOIN pg_class pgc ON pgc.oid = show.oid
INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname and ch.schema_name = pgns.nspname and ch.hypertable_id = htid
WHERE ch.dropped is false and (ch.status = 0 OR ch.status = 3)
LOOP
IF chunk_rec.status = 0 THEN
PERFORM compress_chunk( chunk_rec.oid );
ELSIF chunk_rec.status = 3 AND recompress_enabled = 'true' THEN
PERFORM recompress_chunk( chunk_rec.oid );
END IF;
COMMIT;
IF verbose_log THEN
RAISE LOG 'job % completed processing chunk %.%', job_id, chunk_rec.schema_name, chunk_rec.table_name;
END IF;

numchunks := numchunks + 1;
IF maxchunks > 0 AND numchunks >= maxchunks THEN
EXIT;
END IF;
END LOOP;
Comment on lines +105 to +109
Contributor:

Same here, use a LIMIT on the select instead.

END;
$$ LANGUAGE PLPGSQL;
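
The integer variant mirrors the interval one, except that "older than lag" has no meaning until the lag is converted into an absolute integer timestamp via _timescaledb_internal.subtract_integer_from_now (added in this PR, see sql/updates/latest-dev.sql below). That in turn requires the hypertable to have an integer_now function. A rough setup sketch using TimescaleDB's public API, with made-up table and function names:

CREATE TABLE metrics_int(time bigint NOT NULL, device int, value double precision);
SELECT create_hypertable('metrics_int', 'time', chunk_time_interval => 1000);

-- integer "now" expressed in the same units as the time column
CREATE FUNCTION unix_now() RETURNS bigint
LANGUAGE SQL STABLE AS $$ SELECT extract(epoch FROM now())::bigint $$;

SELECT set_integer_now_func('metrics_int', 'unix_now');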

CREATE OR REPLACE PROCEDURE
_timescaledb_internal.policy_compression( job_id INTEGER, config JSONB)
AS $$
DECLARE
dimtype regtype;
compress_after text;
lag_interval interval;
lag_integer bigint;
htid integer;
htoid regclass;
chunk_rec record;
verbose_log bool;
maxchunks integer := 0;
numchunks integer := 1;
recompress_enabled bool;
BEGIN
IF config IS NULL THEN
RAISE EXCEPTION 'job % has null config', job_id;
END IF;

htid := jsonb_object_field_text (config, 'hypertable_id')::integer;
IF htid is NULL THEN
RAISE EXCEPTION 'job % config must have hypertable_id', job_id;
END IF;

verbose_log := jsonb_object_field_text (config, 'verbose_log')::boolean;
IF verbose_log is NULL THEN
verbose_log = 'false';
END IF;

maxchunks := jsonb_object_field_text (config, 'maxchunks_to_compress')::integer;
IF maxchunks IS NULL THEN
maxchunks = 0;
END IF;

recompress_enabled := jsonb_object_field_text (config, 'recompress')::boolean;
IF recompress_enabled IS NULL THEN
recompress_enabled = 'true';
END IF;

compress_after := jsonb_object_field_text(config, 'compress_after');
IF compress_after IS NULL THEN
RAISE EXCEPTION 'job % config must have compress_after', job_id;
END IF;

-- find primary dimension type --
SELECT column_type INTO STRICT dimtype
FROM ( SELECT ht.schema_name, ht.table_name, dim.column_name, dim.column_type,
row_number() over(partition by ht.id order by dim.id) as rn
FROM _timescaledb_catalog.hypertable ht ,
_timescaledb_catalog.dimension dim
WHERE ht.id = dim.hypertable_id and ht.id = htid ) q
WHERE rn = 1;

CASE WHEN (dimtype = 'TIMESTAMP'::regtype
OR dimtype = 'TIMESTAMPTZ'::regtype
OR dimtype = 'DATE'::regtype) THEN
lag_interval := jsonb_object_field_text(config, 'compress_after')::interval ;
CALL _timescaledb_internal.policy_compression_interval(
Contributor:

Maybe we can simplify it a bit: instead of having two policy_compression_?? functions we could have just one policy_compression_execute and pass down the dimtype to deal with interval and integer in one place.

job_id, htid, lag_interval,
maxchunks, verbose_log, recompress_enabled);
ELSE
lag_integer := jsonb_object_field_text(config, 'compress_after')::bigint;
CALL _timescaledb_internal.policy_compression_integer(
job_id, htid, lag_integer,
maxchunks, verbose_log, recompress_enabled );
END CASE;
END;
$$ LANGUAGE PLPGSQL;
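
Only hypertable_id and compress_after are required by the checks above; verbose_log, maxchunks_to_compress, and recompress are optional with defaults of false, 0 (unlimited), and true respectively. A hand-assembled invocation of the dispatcher, with illustrative job and hypertable ids, would look roughly like this:

CALL _timescaledb_internal.policy_compression(
    1000,
    '{"hypertable_id": 2,
      "compress_after": "7 days",
      "maxchunks_to_compress": 5,
      "recompress": true,
      "verbose_log": true}'::jsonb);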
3 changes: 3 additions & 0 deletions sql/updates/latest-dev.sql
@@ -1,2 +1,5 @@
DROP FUNCTION IF EXISTS _timescaledb_internal.time_col_name_for_chunk(name,name);
DROP FUNCTION IF EXISTS _timescaledb_internal.time_col_type_for_chunk(name,name);

CREATE OR REPLACE FUNCTION _timescaledb_internal.subtract_integer_from_now( hypertable_relid REGCLASS, lag INT8 )
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_subtract_integer_from_now' LANGUAGE C STABLE STRICT;
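
For illustration, the new helper simply returns integer_now() minus the lag for the given hypertable, which is the "older_than" threshold that policy_compression_integer feeds to show_chunks(). Assuming the hypothetical metrics_int hypertable from the earlier sketch:

-- returns unix_now() - 1000 for metrics_int
SELECT _timescaledb_internal.subtract_integer_from_now('metrics_int', 1000);
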
7 changes: 7 additions & 0 deletions sql/updates/reverse-dev.sql
@@ -1,2 +1,9 @@
DROP FUNCTION IF EXISTS timescaledb_experimental.time_bucket_ng(bucket_width INTERVAL, ts TIMESTAMPTZ, timezone TEXT);
DROP FUNCTION IF EXISTS timescaledb_experimental.time_bucket_ng(bucket_width INTERVAL, ts TIMESTAMPTZ, origin TIMESTAMPTZ, timezone TEXT);

DROP FUNCTION IF EXISTS _timescaledb_internal.subtract_integer_from_now;

--changes for compression policy ---
DROP PROCEDURE IF EXISTS _timescaledb_internal.policy_compression;
DROP PROCEDURE IF EXISTS _timescaledb_internal.policy_compression_interval;
DROP PROCEDURE IF EXISTS _timescaledb_internal.policy_compression_integer;
3 changes: 3 additions & 0 deletions sql/util_time.sql
@@ -71,3 +71,6 @@ RETURNS BIGINT AS '@MODULE_PATHNAME@', 'ts_time_to_internal' LANGUAGE C VOLATILE

CREATE OR REPLACE FUNCTION _timescaledb_internal.cagg_watermark(hypertable_id INTEGER)
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark' LANGUAGE C STABLE STRICT;

CREATE OR REPLACE FUNCTION _timescaledb_internal.subtract_integer_from_now( hypertable_relid REGCLASS, lag INT8 )
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_subtract_integer_from_now' LANGUAGE C STABLE STRICT;
2 changes: 0 additions & 2 deletions src/cross_module_fn.c
@@ -22,7 +22,6 @@

/* bgw policy functions */
CROSSMODULE_WRAPPER(policy_compression_add);
CROSSMODULE_WRAPPER(policy_compression_proc);
CROSSMODULE_WRAPPER(policy_compression_remove);
CROSSMODULE_WRAPPER(policy_recompression_proc);
CROSSMODULE_WRAPPER(policy_refresh_cagg_add);
@@ -315,7 +314,6 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {

/* bgw policies */
.policy_compression_add = error_no_default_fn_pg_community,
.policy_compression_proc = error_no_default_fn_pg_community,
.policy_compression_remove = error_no_default_fn_pg_community,
.policy_recompression_proc = error_no_default_fn_pg_community,
.policy_refresh_cagg_add = error_no_default_fn_pg_community,
1 change: 0 additions & 1 deletion src/cross_module_fn.h
@@ -42,7 +42,6 @@ typedef struct CrossModuleFunctions
void (*add_tsl_telemetry_info)(JsonbParseState **parse_state);

PGFunction policy_compression_add;
PGFunction policy_compression_proc;
PGFunction policy_compression_remove;
PGFunction policy_recompression_proc;
PGFunction policy_refresh_cagg_add;
72 changes: 71 additions & 1 deletion src/utils.c
@@ -33,9 +33,10 @@

#include "compat/compat.h"
#include "chunk.h"
#include "guc.h"
#include "hypertable_cache.h"
#include "utils.h"
#include "time_utils.h"
#include "guc.h"

TS_FUNCTION_INFO_V1(ts_pg_timestamp_to_unix_microseconds);

@@ -823,3 +824,72 @@ ts_get_integer_now_func(const Dimension *open_dim)

return now_func;
}

/* Subtract the passed-in integer interval from "now".
* Arguments:
* interval : integer lag value
* time_dim_type : integer type of the time dimension (INT2/INT4/INT8)
* now_func : function used to compute "now"
* Returns:
* now_func() - interval
*/
int64
ts_sub_integer_from_now(int64 interval, Oid time_dim_type, Oid now_func)
{
Datum now;
int64 res;

AssertArg(IS_INTEGER_TYPE(time_dim_type));

now = OidFunctionCall0(now_func);
switch (time_dim_type)
{
case INT2OID:
res = DatumGetInt16(now) - interval;
if (res < PG_INT16_MIN || res > PG_INT16_MAX)
ereport(ERROR,
(errcode(ERRCODE_INTERVAL_FIELD_OVERFLOW),
errmsg("integer time overflow")));
return res;
case INT4OID:
res = DatumGetInt32(now) - interval;
if (res < PG_INT32_MIN || res > PG_INT32_MAX)
ereport(ERROR,
(errcode(ERRCODE_INTERVAL_FIELD_OVERFLOW),
errmsg("integer time overflow")));
return res;
case INT8OID:
{
bool overflow = pg_sub_s64_overflow(DatumGetInt64(now), interval, &res);
if (overflow)
{
ereport(ERROR,
(errcode(ERRCODE_INTERVAL_FIELD_OVERFLOW),
errmsg("integer time overflow")));
}
return res;
}
default:
pg_unreachable();
}
}

TS_FUNCTION_INFO_V1(ts_subtract_integer_from_now);
Datum
ts_subtract_integer_from_now(PG_FUNCTION_ARGS)
{
Oid ht_relid = PG_GETARG_OID(0);
Datum lag = PG_GETARG_INT64(1);
Cache *hcache;
Hypertable *hypertable =
ts_hypertable_cache_get_cache_and_entry(ht_relid, CACHE_FLAG_NONE, &hcache);

const Dimension *dim = hyperspace_get_open_dimension(hypertable->space, 0);
Oid partitioning_type = ts_dimension_get_partition_type(dim);
Oid now_func = ts_get_integer_now_func(dim);
if (now_func == InvalidOid)
elog(ERROR, "could not find valid integer_now function for hypertable");
Assert(IS_INTEGER_TYPE(partitioning_type));
int64 res = ts_sub_integer_from_now(lag, partitioning_type, now_func);
ts_cache_release(hcache);
return Int64GetDatum(res);
}
1 change: 1 addition & 0 deletions src/utils.h
@@ -81,6 +81,7 @@ extern TSDLLEXPORT Oid ts_get_cast_func(Oid source, Oid target);
typedef struct Dimension Dimension;

extern TSDLLEXPORT Oid ts_get_integer_now_func(const Dimension *open_dim);
extern TSDLLEXPORT int64 ts_sub_integer_from_now(int64 interval, Oid time_dim_type, Oid now_func);

extern TSDLLEXPORT void *ts_create_struct_from_slot(TupleTableSlot *slot, MemoryContext mctx,
size_t alloc_size, size_t copy_size);
13 changes: 0 additions & 13 deletions tsl/src/bgw_policy/compression_api.c
@@ -138,19 +138,6 @@ policy_recompression_get_recompress_after_interval(const Jsonb *config)
return interval;
}

Datum
policy_compression_proc(PG_FUNCTION_ARGS)
{
if (PG_NARGS() != 2 || PG_ARGISNULL(0) || PG_ARGISNULL(1))
PG_RETURN_VOID();

TS_PREVENT_FUNC_IF_READ_ONLY();

policy_compression_execute(PG_GETARG_INT32(0), PG_GETARG_JSONB_P(1));

PG_RETURN_VOID();
}

Datum
policy_recompression_proc(PG_FUNCTION_ARGS)
{