Skip to content

Commit

Permalink
Log CAgg refresh details in BGW
Browse files Browse the repository at this point in the history
Currently, when a CAgg is refreshed using a background worker, only the
refresh invocation is logged. However, the details of the refresh, such
as the number of individual ranges that are refreshed, are not logged.
This PR changes the log level for these details in background workers to
INFO, to ensure the information is captured.
  • Loading branch information
jnidzwetzki committed Mar 13, 2024
1 parent 06f08c5 commit 8e6994b
Show file tree
Hide file tree
Showing 11 changed files with 328 additions and 87 deletions.
7 changes: 7 additions & 0 deletions tsl/src/continuous_aggs/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ typedef struct CAggTimebucketInfo
TimestampTz origin;
} CAggTimebucketInfo;

typedef enum CaggRefreshCallContext
{
CAGG_REFRESH_CREATION,
CAGG_REFRESH_WINDOW,
CAGG_REFRESH_POLICY,
} CaggRefreshCallContext;

#define CAGG_MAKEQUERY(selquery, srcquery) \
do \
{ \
Expand Down
6 changes: 4 additions & 2 deletions tsl/src/continuous_aggs/invalidation.c
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,8 @@ InvalidationStore *
invalidation_process_cagg_log(int32 mat_hypertable_id, int32 raw_hypertable_id,
const InternalTimeRange *refresh_window,
const CaggsInfo *all_caggs_info, const long max_materializations,
bool *do_merged_refresh, InternalTimeRange *ret_merged_refresh_window)
bool *do_merged_refresh, InternalTimeRange *ret_merged_refresh_window,
const CaggRefreshCallContext callctx)
{
CaggInvalidationState state;
InvalidationStore *store = NULL;
Expand Down Expand Up @@ -1057,7 +1058,8 @@ invalidation_process_cagg_log(int32 mat_hypertable_id, int32 raw_hypertable_id,
store,
state.bucket_width,
state.bucket_function,
&merged_refresh_window);
&merged_refresh_window,
callctx);
*do_merged_refresh = true;
*ret_merged_refresh_window = merged_refresh_window;
invalidation_store_free(store);
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/continuous_aggs/invalidation.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ extern void invalidation_process_hypertable_log(int32 mat_hypertable_id, int32 r
extern InvalidationStore *invalidation_process_cagg_log(
int32 mat_hypertable_id, int32 raw_hypertable_id, const InternalTimeRange *refresh_window,
const CaggsInfo *all_caggs_info, const long max_materializations, bool *do_merged_refresh,
InternalTimeRange *ret_merged_refresh_window);
InternalTimeRange *ret_merged_refresh_window, const CaggRefreshCallContext callctx);

extern void invalidation_store_free(InvalidationStore *store);
51 changes: 30 additions & 21 deletions tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "invalidation_threshold.h"
#include "guc.h"

#define CAGG_REFRESH_LOG_LEVEL (callctx == CAGG_REFRESH_POLICY ? LOG : DEBUG1)

typedef struct CaggRefreshState
{
ContinuousAgg cagg;
Expand All @@ -56,17 +58,19 @@ static void continuous_agg_refresh_execute(const CaggRefreshState *refresh,
static void log_refresh_window(int elevel, const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window, const char *msg);
static void continuous_agg_refresh_execute_wrapper(const InternalTimeRange *bucketed_refresh_window,
const CaggRefreshCallContext callctx,
const long iteration, void *arg1_refresh,
void *arg2_chunk_id);
static void update_merged_refresh_window(const InternalTimeRange *bucketed_refresh_window,
const long iteration, void *arg1_merged_refresh_window,
void *arg2);
const CaggRefreshCallContext callctx, const long iteration,
void *arg1_merged_refresh_window, void *arg2);
static void continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const InvalidationStore *invalidations,
const int64 bucket_width, int32 chunk_id,
const bool do_merged_refresh,
const InternalTimeRange merged_refresh_window);
const InternalTimeRange merged_refresh_window,
const CaggRefreshCallContext callctx);
static ContinuousAgg *get_cagg_by_relid(const Oid cagg_relid);
static void emit_up_to_date_notice(const ContinuousAgg *cagg, const CaggRefreshCallContext callctx);
static bool process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
Expand Down Expand Up @@ -336,24 +340,29 @@ log_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRang
}

typedef void (*scan_refresh_ranges_funct_t)(const InternalTimeRange *bucketed_refresh_window,
const CaggRefreshCallContext callctx,
const long iteration, /* 0 is first range */
void *arg1, void *arg2);

static void
continuous_agg_refresh_execute_wrapper(const InternalTimeRange *bucketed_refresh_window,
const long iteration, void *arg1_refresh,
void *arg2_chunk_id)
const CaggRefreshCallContext callctx, const long iteration,
void *arg1_refresh, void *arg2_chunk_id)
{
const CaggRefreshState *refresh = (const CaggRefreshState *) arg1_refresh;
const int32 chunk_id = *(const int32 *) arg2_chunk_id;
(void) iteration;

log_refresh_window(DEBUG1, &refresh->cagg, bucketed_refresh_window, "invalidation refresh on");
log_refresh_window(CAGG_REFRESH_LOG_LEVEL,
&refresh->cagg,
bucketed_refresh_window,
"continuous aggregate refresh (individual invalidation) on");
continuous_agg_refresh_execute(refresh, bucketed_refresh_window, chunk_id);
}

static void
update_merged_refresh_window(const InternalTimeRange *bucketed_refresh_window, const long iteration,
update_merged_refresh_window(const InternalTimeRange *bucketed_refresh_window,
const CaggRefreshCallContext callctx, const long iteration,
void *arg1_merged_refresh_window, void *arg2)
{
InternalTimeRange *merged_refresh_window = (InternalTimeRange *) arg1_merged_refresh_window;
Expand All @@ -376,6 +385,7 @@ continuous_agg_scan_refresh_window_ranges(const InternalTimeRange *refresh_windo
const InvalidationStore *invalidations,
const int64 bucket_width,
const ContinuousAggsBucketFunction *bucket_function,
const CaggRefreshCallContext callctx,
scan_refresh_ranges_funct_t exec_func, void *func_arg1,
void *func_arg2)
{
Expand Down Expand Up @@ -411,7 +421,7 @@ continuous_agg_scan_refresh_window_ranges(const InternalTimeRange *refresh_windo
bucket_width,
bucket_function);

(*exec_func)(&bucketed_refresh_window, count, func_arg1, func_arg2);
(*exec_func)(&bucketed_refresh_window, callctx, count, func_arg1, func_arg2);

count++;
}
Expand Down Expand Up @@ -452,7 +462,8 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const InvalidationStore *invalidations, const int64 bucket_width,
int32 chunk_id, const bool do_merged_refresh,
const InternalTimeRange merged_refresh_window)
const InternalTimeRange merged_refresh_window,
const CaggRefreshCallContext callctx)
{
CaggRefreshState refresh;

Expand All @@ -478,10 +489,10 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
Assert((bucket_width == BUCKET_WIDTH_VARIABLE) ||
(merged_refresh_window.end - bucket_width <= refresh_window->end));

log_refresh_window(DEBUG1,
log_refresh_window(CAGG_REFRESH_LOG_LEVEL,
cagg,
&merged_refresh_window,
"merged invalidations for refresh on");
"continuous aggregate refresh (merged invalidation) on");
continuous_agg_refresh_execute(&refresh, &merged_refresh_window, chunk_id);
}
else
Expand All @@ -491,6 +502,7 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
invalidations,
bucket_width,
cagg->bucket_function,
callctx,
continuous_agg_refresh_execute_wrapper,
(void *) &refresh /* arg1 */,
(void *) &chunk_id /* arg2 */);
Expand Down Expand Up @@ -574,7 +586,6 @@ emit_up_to_date_notice(const ContinuousAgg *cagg, const CaggRefreshCallContext c
{
switch (callctx)
{
case CAGG_REFRESH_CHUNK:
case CAGG_REFRESH_WINDOW:
case CAGG_REFRESH_CREATION:
elog(NOTICE,
Expand All @@ -591,13 +602,15 @@ continuous_agg_calculate_merged_refresh_window(const InternalTimeRange *refresh_
const InvalidationStore *invalidations,
const int64 bucket_width,
const ContinuousAggsBucketFunction *bucket_function,
InternalTimeRange *merged_refresh_window)
InternalTimeRange *merged_refresh_window,
const CaggRefreshCallContext callctx)
{
long count pg_attribute_unused();
count = continuous_agg_scan_refresh_window_ranges(refresh_window,
invalidations,
bucket_width,
bucket_function,
callctx,
update_merged_refresh_window,
(void *) merged_refresh_window,
NULL /* arg2 */);
Expand Down Expand Up @@ -631,7 +644,8 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
&all_caggs_info,
ts_guc_cagg_max_individual_materializations,
&do_merged_refresh,
&merged_refresh_window);
&merged_refresh_window,
callctx);

if (invalidations != NULL || do_merged_refresh)
{
Expand All @@ -654,7 +668,8 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
bucket_width,
chunk_id,
do_merged_refresh,
merged_refresh_window);
merged_refresh_window,
callctx);
if (invalidations)
invalidation_store_free(invalidations);
return true;
Expand Down Expand Up @@ -725,12 +740,6 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
errdetail("The refresh window must cover at least one bucket of data."),
errhint("Align the refresh window with the bucket"
" time zone or use at least two buckets.")));

log_refresh_window(callctx == CAGG_REFRESH_POLICY ? LOG : DEBUG1,
cagg,
&refresh_window,
"refreshing continuous aggregate");

/*
* Perform the refresh across two transactions.
*
Expand Down
10 changes: 1 addition & 9 deletions tsl/src/continuous_aggs/refresh.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,11 @@
#include "materialize.h"
#include "invalidation.h"

typedef enum CaggRefreshCallContext
{
CAGG_REFRESH_CREATION,
CAGG_REFRESH_WINDOW,
CAGG_REFRESH_CHUNK,
CAGG_REFRESH_POLICY,
} CaggRefreshCallContext;

extern Datum continuous_agg_refresh(PG_FUNCTION_ARGS);
extern void continuous_agg_calculate_merged_refresh_window(
const InternalTimeRange *refresh_window, const InvalidationStore *invalidations,
const int64 bucket_width, const ContinuousAggsBucketFunction *bucket_function,
InternalTimeRange *merged_refresh_window);
InternalTimeRange *merged_refresh_window, const CaggRefreshCallContext callctx);
extern void continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx,
Expand Down
Loading

0 comments on commit 8e6994b

Please sign in to comment.