Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Adds support for reloader in Promscale.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>
  • Loading branch information
Harkishen-Singh committed Jun 16, 2022
1 parent 6b402cb commit 8739804
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ We use the following categories for changes:
- Telemetry for recording rules and alerting [#1424]
- Set number of ingest copiers to the number of DB CPUs [#1387]
- Telemetry for helm chart installations [#1429]
- Ability to reload rules and alerting config [#1426]

### Fixed
- Trace query returns empty result when queried with
Expand Down
29 changes: 29 additions & 0 deletions pkg/api/reload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package api

import (
"fmt"
"net/http"

"github.com/timescale/promscale/pkg/log"
)

// Reload returns an HTTP handler that triggers a reload by invoking reload.
//
// Parameters:
//   - reload: the function performing the actual reload. It may be nil when the
//     caller has nothing to reload; the handler then answers with an error
//     instead of calling it.
//   - webAdmin: whether the admin API is enabled. When false, every request is
//     rejected without calling reload.
//
// Responses written by the handler:
//   - 401 when webAdmin is false.
//   - 500 when reload is nil or when reload returns an error.
//   - 200 with "Content-Length: 0" on success.
func Reload(reload func() error, webAdmin bool) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if !webAdmin {
			const msg = "reload received but web admin is disabled. To enable, start Promscale with '-web.enable-admin-api' flag"
			log.Error("msg", msg)
			http.Error(w, "failed to reload: "+msg, http.StatusUnauthorized)
			return
		}
		if reload == nil {
			// Calling a nil func panics; report the condition as a regular
			// reload failure so the client receives a well-formed response.
			const msg = "no reloader is configured"
			log.Error("msg", "failed to reload", "err", msg)
			http.Error(w, "failed to reload: "+msg, http.StatusInternalServerError)
			return
		}
		if err := reload(); err != nil {
			log.Error("msg", "failed to reload", "err", err.Error())
			http.Error(w, fmt.Sprintf("failed to reload: %s", err.Error()), http.StatusInternalServerError)
			return
		}
		// Success carries no body; the status defaults to 200 on first write.
		w.Header().Set("Content-Length", "0")
	}
}
5 changes: 4 additions & 1 deletion pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/timescale/promscale/pkg/telemetry"
)

func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.Client, query *jaegerQuery.Query) (*mux.Router, error) {
func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.Client, query *jaegerQuery.Query, reload func() error) (*mux.Router, error) {
var writePreprocessors []parser.Preprocessor
if apiConf.HighAvailability {
service := ha.NewService(haClient.NewLeaseClient(client.Connection))
Expand Down Expand Up @@ -101,6 +101,9 @@ func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.
router.Path("/healthz").Methods(http.MethodGet, http.MethodOptions, http.MethodHead).HandlerFunc(Health(healthChecker))
router.Path(apiConf.TelemetryPath).Methods(http.MethodGet).HandlerFunc(promhttp.Handler().ServeHTTP)

reloadHandler := timeHandler(metrics.HTTPRequestDuration, "/-/reload", Reload(reload, apiConf.AdminAPIEnabled))
router.Path("/-/reload").Methods(http.MethodPost).HandlerFunc(reloadHandler)

jaeger.ExtendQueryAPIs(router, client.Connection, query)

debugProf := router.PathPrefix("/debug/pprof").Subrouter()
Expand Down
20 changes: 16 additions & 4 deletions pkg/rules/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Manager struct {
postRulesProcessing prom_rules.RuleGroupPostProcessFunc
}

func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.Client, cfg *Config) (*Manager, error) {
func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.Client, cfg *Config) (*Manager, func() error, error) {
discoveryManagerNotify := discovery.NewManager(ctx, log.GetLogger(), discovery.Name("notify"))

notifierManager := notifier.NewManager(&notifier.Options{
Expand All @@ -44,7 +44,7 @@ func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.C
// For the moment, we do not have any external UI url, hence we provide an empty one.
parsedUrl, err := url.Parse("")
if err != nil {
return nil, fmt.Errorf("parsing UI-URL: %w", err)
return nil, nil, fmt.Errorf("parsing UI-URL: %w", err)
}

rulesManager := prom_rules.NewManager(&prom_rules.ManagerOptions{
Expand All @@ -60,12 +60,24 @@ func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.C
ForGracePeriod: cfg.ForGracePeriod,
ResendDelay: cfg.ResendDelay,
})
return &Manager{

manager := &Manager{
ctx: ctx,
rulesManager: rulesManager,
notifierManager: notifierManager,
discoveryManager: discoveryManagerNotify,
}, nil
}
return manager, manager.getReloader(cfg), nil
}

// getReloader builds the reload function for this Manager, bound to cfg.
//
// Each invocation of the returned function first re-validates cfg and then
// applies cfg.PrometheusConfig to the Manager. A validation failure is returned
// wrapped with "error validating rules-config"; a failure while applying is
// returned with the message "error applying config". It returns nil when both
// steps succeed.
func (m *Manager) getReloader(cfg *Config) func() error {
	reloader := func() error {
		// Validate refreshes the RulesCfg.PrometheusConfig entry in RulesCfg after
		// reading the PrometheusConfigAddress, so it must run before applying.
		if validateErr := Validate(cfg); validateErr != nil {
			return fmt.Errorf("error validating rules-config: %w", validateErr)
		}
		applyErr := m.ApplyConfig(cfg.PrometheusConfig)
		// errors.WithMessage yields nil for a nil error, so success passes through.
		return errors.WithMessage(applyErr, "error applying config")
	}
	return reloader
}

func (m *Manager) WithPostRulesProcess(f prom_rules.RuleGroupPostProcessFunc) {
Expand Down
102 changes: 58 additions & 44 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/http"
"os"
"os/signal"
"syscall"
"time"

_ "github.com/jackc/pgx/v4/stdlib"
Expand All @@ -25,7 +26,6 @@ import (

"github.com/google/uuid"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"

"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -137,9 +137,52 @@ func Run(cfg *Config) error {
}
}

var (
group run.Group
reloadRules func() error
)
pendingTelemetry := map[string]string{
"rules_enabled": "false", // Will be written in telemetry table as `promscale_rules_enabled: false`
"alerting_enabled": "false", // `promscale_alerting_enabled: false`
}
if cfg.RulesCfg.ContainsRules() {
pendingTelemetry["rules_enabled"] = "true"
if cfg.RulesCfg.ContainsAlertingConfig() {
pendingTelemetry["alerting_enabled"] = "true"
} else {
log.Debug("msg", "Alerting configuration not present in the given Prometheus configuration file. Alerting will not be initialized")
}

rulesCtx, stopRuler := context.WithCancel(context.Background())
defer stopRuler()

var manager *rules.Manager
manager, reloadRules, err = rules.NewManager(rulesCtx, prometheus.DefaultRegisterer, client, &cfg.RulesCfg)
if err != nil {
return fmt.Errorf("error creating rules manager: %w", err)
}
cfg.APICfg.Rules = manager

if err := reloadRules(); err != nil {
return err
}

group.Add(
func() error {
log.Info("msg", "Started Rule-Manager")
return manager.Run()
}, func(error) {
log.Info("msg", "Stopping Rule-Manager")
stopRuler()
},
)
} else {
log.Debug("msg", "Rules files not found in the given Prometheus configuration file. Both rule-manager and alerting will not be initialized")
}

jaegerQuery := query.New(client.QuerierConnection, &cfg.TracingCfg)

router, err := api.GenerateRouter(&cfg.APICfg, &cfg.PromQLCfg, client, jaegerQuery)
router, err := api.GenerateRouter(&cfg.APICfg, &cfg.PromQLCfg, client, jaegerQuery, reloadRules)
if err != nil {
log.Error("msg", "aborting startup due to error", "err", fmt.Sprintf("generate router: %s", err.Error()))
return fmt.Errorf("generate router: %w", err)
Expand All @@ -149,7 +192,6 @@ func Run(cfg *Config) error {
telemetryEngine.Start()
defer telemetryEngine.Stop()

var group run.Group
if len(cfg.ThanosStoreAPIListenAddr) > 0 {
srv := thanos.NewStorage(client.Queryable())
options := make([]grpc.ServerOption, 0)
Expand Down Expand Up @@ -222,44 +264,6 @@ func Run(cfg *Config) error {
},
)

pendingTelemetry := map[string]string{
"rules_enabled": "false", // Will be written in telemetry table as `promscale_rules_enabled: false`
"alerting_enabled": "false", // `promscale_alerting_enabled: false`
}
if cfg.RulesCfg.ContainsRules() {
pendingTelemetry["rules_enabled"] = "true"
if cfg.RulesCfg.ContainsAlertingConfig() {
pendingTelemetry["alerting_enabled"] = "true"
} else {
log.Debug("msg", "Alerting configuration not present in the given Prometheus configuration file. Alerting will not be initialized")
}

rulesCtx, stopRuler := context.WithCancel(context.Background())
defer stopRuler()

manager, err := rules.NewManager(rulesCtx, prometheus.DefaultRegisterer, client, &cfg.RulesCfg)
if err != nil {
return fmt.Errorf("error creating rules manager: %w", err)
}
cfg.APICfg.Rules = manager

group.Add(
func() error {
promCfg := cfg.RulesCfg.PrometheusConfig
if err = manager.ApplyConfig(promCfg); err != nil {
return fmt.Errorf("error applying Prometheus configuration to rules manager: %w", err)
}
log.Info("msg", "Started Rule-Manager")
return manager.Run()
}, func(err error) {
log.Info("msg", "Stopping Rule-Manager")
stopRuler()
},
)
} else {
log.Debug("msg", "Rules files not found in the given Prometheus configuration file. Both rule-manager and alerting will not be initialized")
}

// Asynchronously update the telemetry information.
go func() {
for k, v := range pendingTelemetry {
Expand Down Expand Up @@ -296,11 +300,21 @@ func Run(cfg *Config) error {

// Listen to OS interrupt signals.
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGINT, syscall.SIGHUP)
group.Add(
func() error {
<-c
return nil
for {
switch <-c {
case syscall.SIGINT:
return nil
case syscall.SIGHUP:
if err := reloadRules(); err != nil {
log.Error("msg", "error reloading rules", "err", err.Error())
continue
}
log.Debug("msg", "success reloading rules")
}
}
}, func(err error) {
close(c)
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func buildRouterWithAPIConfig(pool *pgxpool.Pool, cfg *api.Config) (*mux.Router,

jaegerQuery := jaegerquery.New(pgClient.QuerierConnection, &jaegerquery.DefaultConfig)

router, err := api.GenerateRouter(cfg, qryCfg, pgClient, jaegerQuery)
router, err := api.GenerateRouter(cfg, qryCfg, pgClient, jaegerQuery, nil)
if err != nil {
return nil, nil, fmt.Errorf("generate router: %w", err)
}
Expand Down
27 changes: 20 additions & 7 deletions pkg/tests/end_to_end_tests/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (
"github.com/timescale/promscale/pkg/tenancy"
)

const RecordingRulesEvalConfigPath = "../testdata/rules/config.recording_rules_eval.yaml"
const (
RecordingRulesEvalConfigPath = "../testdata/rules/config.recording_rules_eval.yaml"
EmptyRecordingRulesConfigPath = "../testdata/rules/config.empty_rules.yaml"
)

func TestRecordingRulesEval(t *testing.T) {
withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
Expand Down Expand Up @@ -56,22 +59,27 @@ func TestRecordingRulesEval(t *testing.T) {
OutageTolerance: rules.DefaultOutageTolerance,
ForGracePeriod: rules.DefaultForGracePeriod,
ResendDelay: rules.DefaultResendDelay,
PrometheusConfigAddress: RecordingRulesEvalConfigPath,
PrometheusConfigAddress: EmptyRecordingRulesConfigPath, // Start with empty rules.
}
require.NoError(t, rules.Validate(rulesCfg))
require.True(t, rulesCfg.ContainsRules())
require.False(t, rulesCfg.ContainsRules())

ruleCtx, stopRuler := context.WithCancel(context.Background())
defer stopRuler()

manager, err := rules.NewManager(ruleCtx, prometheus.NewRegistry(), pgClient, rulesCfg)
manager, reloadRules, err := rules.NewManager(ruleCtx, prometheus.NewRegistry(), pgClient, rulesCfg)
require.NoError(t, err)

require.NotNil(t, rulesCfg.PrometheusConfig)
require.NoError(t, reloadRules())
require.False(t, rulesCfg.ContainsRules())

ruleGroups := manager.RuleGroups()
require.Equal(t, 0, len(ruleGroups))

manager.WithPostRulesProcess(func(*prom_rules.Group, time.Time, log.Logger) error {
defer func() {
stopRuler() // Cancels the context so that the blocking manager.Run() is released when the test finishes.
stopRuler() // Shuts down the manager.Run() as soon as the test completes.
}()
// Check if recording rule as a metric exists in metric catalog table.
var exists bool
Expand All @@ -88,8 +96,13 @@ func TestRecordingRulesEval(t *testing.T) {

return nil
})
require.NoError(t, manager.ApplyConfig(rulesCfg.PrometheusConfig))
require.NoError(t, manager.Run(), "error running rules manager") // This is blocking. It will be released after stopRuler() in defer func.
// Reload with configuration file that contains some rules.
rulesCfg.PrometheusConfigAddress = RecordingRulesEvalConfigPath
require.NoError(t, reloadRules())
require.True(t, rulesCfg.ContainsRules())
require.Equal(t, 1, len(manager.RuleGroups()))

require.NoError(t, manager.Run(), "error running rules manager")
})
}

Expand Down
1 change: 1 addition & 0 deletions pkg/tests/testdata/rules/config.empty_rules.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Empty file indicates recording rules are not applied.

0 comments on commit 8739804

Please sign in to comment.