diff --git a/src/.golangci.yml b/src/.golangci.yml index 4a662c78d1..20308d99fc 100644 --- a/src/.golangci.yml +++ b/src/.golangci.yml @@ -17,4 +17,5 @@ issues: # it can be disabled by `exclude-use-default: false`. To list all # excluded by default patterns execute `golangci-lint run --help` exclude: - - CPUTimesStat # SA1019: CPUTimesStat is deprecated \ No newline at end of file + - CPUTimesStat # SA1019: CPUTimesStat is deprecated + - SA5008 # duplicate struct tag "choice" (staticcheck) \ No newline at end of file diff --git a/src/config/cmdmetric.go b/src/config/cmdmetric.go new file mode 100644 index 0000000000..607be55d80 --- /dev/null +++ b/src/config/cmdmetric.go @@ -0,0 +1,81 @@ +package config + +import ( + "context" + "fmt" + "math" + "slices" + + "golang.org/x/exp/maps" +) + +type MetricCommand struct { + owner *Options + PrintInit MetricPrintInitCommand `command:"print-init" description:"Get and print init SQL for a given metric or preset"` + PrintSQL MetricPrintSQLCommand `command:"print-sql" description:"Get and print SQL for a given metric"` +} + +func NewMetricCommand(owner *Options) *MetricCommand { + return &MetricCommand{ + owner: owner, + PrintInit: MetricPrintInitCommand{owner: owner}, + PrintSQL: MetricPrintSQLCommand{owner: owner}, + } +} + +type MetricPrintInitCommand struct { + owner *Options +} + +func (cmd *MetricPrintInitCommand) Execute(args []string) error { + err := cmd.owner.InitMetricReader(context.Background()) + if err != nil { + return err + } + metrics, err := cmd.owner.MetricsReaderWriter.GetMetrics() + if err != nil { + return err + } + for _, name := range args { + if preset, ok := metrics.PresetDefs[name]; ok { + args = append(args, maps.Keys(preset.Metrics)...) 
+ } + } + slices.Sort(args) + args = slices.Compact(args) + for _, mname := range args { + if m, ok := metrics.MetricDefs[mname]; ok && m.InitSQL != "" { + fmt.Println("-- ", mname) + fmt.Println(m.InitSQL) + } + } + cmd.owner.CompleteCommand(ExitCodeOK) + return nil +} + +type MetricPrintSQLCommand struct { + owner *Options + Version int `short:"v" long:"version" description:"PostgreSQL version to get SQL for"` +} + +func (cmd *MetricPrintSQLCommand) Execute(args []string) error { + err := cmd.owner.InitMetricReader(context.Background()) + if err != nil { + return err + } + metrics, err := cmd.owner.MetricsReaderWriter.GetMetrics() + if err != nil { + return err + } + if cmd.Version == 0 { + cmd.Version = math.MaxInt32 + } + for _, name := range args { + if m, ok := metrics.MetricDefs[name]; ok { + fmt.Println("-- ", name) + fmt.Println(m.GetSQL(cmd.Version)) + } + } + cmd.owner.CompleteCommand(ExitCodeOK) + return nil +} diff --git a/src/config/cmdoptions.go b/src/config/cmdoptions.go index 8e0b068641..39ad094016 100644 --- a/src/config/cmdoptions.go +++ b/src/config/cmdoptions.go @@ -1,15 +1,35 @@ package config import ( + "context" "errors" + "fmt" "io" + "io/fs" "os" "time" + "github.com/cybertec-postgresql/pgwatch3/db" + "github.com/cybertec-postgresql/pgwatch3/log" + "github.com/cybertec-postgresql/pgwatch3/metrics" + "github.com/cybertec-postgresql/pgwatch3/sinks" + "github.com/cybertec-postgresql/pgwatch3/sources" + "github.com/cybertec-postgresql/pgwatch3/webserver" "github.com/jackc/pgx/v5" flags "github.com/jessevdk/go-flags" ) +const ( + ExitCodeOK int32 = iota + ExitCodeConfigError + ExitCodeCmdError + ExitCodeWebUIError + ExitCodeUpgradeError + ExitCodeUserCancel + ExitCodeShutdownCommand + ExitCodeFatalError +) + type Kind int const ( @@ -19,76 +39,53 @@ const ( ConfigError ) -// SourceOpts specifies the sources retrieval options -type SourceOpts struct { - Config string `short:"c" long:"config" mapstructure:"config" description:"Postgres URI, file 
or folder of YAML files containing info on which DBs to monitor and where to store metrics" env:"PW3_CONFIG"` - Refresh int `long:"refresh" mapstructure:"refresh" description:"How frequently to resync sources and metrics" env:"PW3_REFRESH" default:"120"` - Groups []string `short:"g" long:"group" mapstructure:"group" description:"Groups for filtering which databases to monitor. By default all are monitored" env:"PW3_GROUP"` - MinDbSizeMB int64 `long:"min-db-size-mb" mapstructure:"min-db-size-mb" description:"Smaller size DBs will be ignored and not monitored until they reach the threshold." env:"PW3_MIN_DB_SIZE_MB" default:"0"` - MaxParallelConnectionsPerDb int `long:"max-parallel-connections-per-db" mapstructure:"max-parallel-connections-per-db" description:"Max parallel metric fetches per DB. Note the multiplication effect on multi-DB instances" env:"PW3_MAX_PARALLEL_CONNECTIONS_PER_DB" default:"2"` - TryCreateListedExtsIfMissing string `long:"try-create-listed-exts-if-missing" mapstructure:"try-create-listed-exts-if-missing" description:"Try creating the listed extensions (comma sep.) on first connect for all monitored DBs when missing. 
Main usage - pg_stat_statements" env:"PW3_TRY_CREATE_LISTED_EXTS_IF_MISSING" default:""` -} - -// MetricOpts specifies metric definitions -type MetricOpts struct { - Metrics string `short:"m" long:"metrics" mapstructure:"metrics" description:"File or folder of YAML files with metrics definitions" env:"PW3_METRICS"` - NoHelperFunctions bool `long:"no-helper-functions" mapstructure:"no-helper-functions" description:"Ignore metric definitions using helper functions (in form get_smth()) and don't also roll out any helpers automatically" env:"PW3_NO_HELPER_FUNCTIONS"` - DirectOSStats bool `long:"direct-os-stats" mapstructure:"direct-os-stats" description:"Extract OS related psutil statistics not via PL/Python wrappers but directly on host" env:"PW3_DIRECT_OS_STATS"` - InstanceLevelCacheMaxSeconds int64 `long:"instance-level-cache-max-seconds" mapstructure:"instance-level-cache-max-seconds" description:"Max allowed staleness for instance level metric data shared between DBs of an instance. Affects 'continuous' host types only. Set to 0 to disable" env:"PW3_INSTANCE_LEVEL_CACHE_MAX_SECONDS" default:"30"` - EmergencyPauseTriggerfile string `long:"emergency-pause-triggerfile" mapstructure:"emergency-pause-triggerfile" description:"When the file exists no metrics will be temporarily fetched / scraped" env:"PW3_EMERGENCY_PAUSE_TRIGGERFILE" default:"/tmp/pgwatch3-emergency-pause"` -} - -// MeasurementOpts specifies the storage configuration to store metrics measurements -type MeasurementOpts struct { - Sinks []string `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored" env:"PW3_SINK"` - BatchingDelay time.Duration `long:"batching-delay" mapstructure:"batching-delay" description:"Max milliseconds to wait for a batched metrics flush. 
[Default: 250ms]" default:"250ms" env:"PW3_BATCHING_MAX_DELAY"` - Retention int `long:"retention" mapstructure:"retention" description:"If set, metrics older than that will be deleted" default:"14" env:"PW3_RETENTION"` - RealDbnameField string `long:"real-dbname-field" mapstructure:"real-dbname-field" description:"Tag key for real DB name if --add-real-dbname enabled" env:"PW3_REAL_DBNAME_FIELD" default:"real_dbname"` - SystemIdentifierField string `long:"system-identifier-field" mapstructure:"system-identifier-field" description:"Tag key for system identifier value if --add-system-identifier" env:"PW3_SYSTEM_IDENTIFIER_FIELD" default:"sys_id"` -} - -// LoggingOpts specifies the logging configuration -type LoggingOpts struct { - LogLevel string `short:"v" long:"log-level" mapstructure:"log-level" description:"Verbosity level for stdout and log file" choice:"debug" choice:"info" choice:"error" default:"info"` - LogFile string `long:"log-file" mapstructure:"log-file" description:"File name to store logs"` - LogFileFormat string `long:"log-file-format" mapstructure:"log-file-format" description:"Format of file logs" choice:"json" choice:"text" default:"json"` - LogFileRotate bool `long:"log-file-rotate" mapstructure:"log-file-rotate" description:"Rotate log files"` - LogFileSize int `long:"log-file-size" mapstructure:"log-file-size" description:"Maximum size in MB of the log file before it gets rotated" default:"100"` - LogFileAge int `long:"log-file-age" mapstructure:"log-file-age" description:"Number of days to retain old log files, 0 means forever" default:"0"` - LogFileNumber int `long:"log-file-number" mapstructure:"log-file-number" description:"Maximum number of old log files to retain, 0 to retain all" default:"0"` +type Options struct { + Sources sources.SourceCmdOpts `group:"Sources"` + Metrics metrics.MetricCmdOpts `group:"Metrics"` + Sinks sinks.SinkCmdOpts `group:"Sinks"` + Logging log.LoggingCmdOpts `group:"Logging"` + WebUI webserver.WebUICmdOpts 
`group:"WebUI"` + Init bool `long:"init" description:"Initialize configurations schemas to the latest version and exit. Can be used with --upgrade"` + Upgrade bool `long:"upgrade" description:"Upgrade configurations to the latest version"` + Help bool + + // sourcesReaderWriter reads/writes the monitored sources (databases, patroni clusters, pgpools, etc.) information + SourcesReaderWriter sources.ReaderWriter + // metricsReaderWriter reads/writes the metric and preset definitions + MetricsReaderWriter metrics.ReaderWriter + ExitCode int32 + CommandCompleted bool } -// WebUIOpts specifies the internal web UI server options -type WebUIOpts struct { - WebAddr string `long:"web-addr" mapstructure:"web-addr" description:"TCP address in the form 'host:port' to listen on" default:":8080" env:"PW3_WEBADDR"` - WebUser string `long:"web-user" mapstructure:"web-user" description:"Admin login" env:"PW3_WEBUSER"` - WebPassword string `long:"web-password" mapstructure:"web-password" description:"Admin password" env:"PW3_WEBPASSWORD"` -} - -type Options struct { - Sources SourceOpts `group:"Sources"` - Metrics MetricOpts `group:"Metrics"` - Measurements MeasurementOpts `group:"Measurements"` - Logging LoggingOpts `group:"Logging"` - WebUI WebUIOpts `group:"WebUI"` - Init bool `long:"init" description:"Initialize configurations schemas to the latest version and exit. 
Can be used with --upgrade"` - Upgrade bool `long:"upgrade" description:"Upgrade configurations to the latest version"` - Ping bool `long:"ping" mapstructure:"ping" description:"Try to connect to all configured DB-s, report errors and then exit" env:"PW3_PING"` - Help bool +func addCommands(parser *flags.Parser, opts *Options) { + _, _ = parser.AddCommand("metric", + "Manage metrics", + "Commands to manage metrics", + NewMetricCommand(opts)) + _, _ = parser.AddCommand("source", + "Manage sources", + "Commands to manage sources", + NewSourceCommand(opts)) } // New returns a new instance of CmdOptions func New(writer io.Writer) (*Options, error) { cmdOpts := new(Options) parser := flags.NewParser(cmdOpts, flags.HelpFlag) - var err error - if _, err = parser.Parse(); err != nil { + parser.SubcommandsOptional = true // if not command specified, start monitoring + addCommands(parser, cmdOpts) + nonParsedArgs, err := parser.Parse() + if err != nil { //subcommands executed as part of parsing if flagsErr, ok := err.(*flags.Error); ok && flagsErr.Type == flags.ErrHelp { cmdOpts.Help = true } if !flags.WroteHelp(err) { parser.WriteHelp(writer) } + } else { + if !cmdOpts.CommandCompleted && len(nonParsedArgs) > 0 { + err = fmt.Errorf("unknown argument(s): %v", nonParsedArgs) + } } if err == nil { err = validateConfig(cmdOpts) @@ -96,17 +93,25 @@ func New(writer io.Writer) (*Options, error) { return cmdOpts, err } +func (c *Options) CompleteCommand(code int32) { + c.CommandCompleted = true + c.ExitCode = code +} + // Verbose returns true if the debug log is enabled -func (c Options) Verbose() bool { +func (c *Options) Verbose() bool { return c.Logging.LogLevel == "debug" } -func (c Options) GetConfigKind() (_ Kind, err error) { - if _, err := pgx.ParseConfig(c.Sources.Config); err == nil { +func (c *Options) GetConfigKind(arg string) (_ Kind, err error) { + if arg == "" { + return Kind(ConfigError), errors.New("no configuration provided") + } + if _, err := 
pgx.ParseConfig(arg); err == nil { return Kind(ConfigPgURL), nil } var fi os.FileInfo - if fi, err = os.Stat(c.Sources.Config); err == nil { + if fi, err = os.Stat(arg); err == nil { if fi.IsDir() { return Kind(ConfigFolder), nil } @@ -115,7 +120,69 @@ func (c Options) GetConfigKind() (_ Kind, err error) { return Kind(ConfigError), err } +// InitMetricReader creates a new metric reader based on the configuration kind from the options. +func (c *Options) InitMetricReader(ctx context.Context) (err error) { + var configKind Kind + if c.Metrics.Metrics == "" { //if config database is configured, use it for metrics as well + if k, err := c.GetConfigKind(c.Sources.Sources); err == nil && k == ConfigPgURL { + c.Metrics.Metrics = c.Sources.Sources + } else { // otherwise use built-in metrics + c.MetricsReaderWriter, err = metrics.NewYAMLMetricReaderWriter(ctx, "") + return err + } + } + if configKind, err = c.GetConfigKind(c.Metrics.Metrics); err != nil { + return + } + switch configKind { + case ConfigPgURL: + var configDb db.PgxPoolIface + if configDb, err = db.New(ctx, c.Sources.Sources); err != nil { + return err + } + c.MetricsReaderWriter, err = metrics.NewPostgresMetricReaderWriter(ctx, configDb) + default: + c.MetricsReaderWriter, err = metrics.NewYAMLMetricReaderWriter(ctx, c.Metrics.Metrics) + } + return err +} + +// InitSourceReader creates a new source reader based on the configuration kind from the options. 
+func (c *Options) InitSourceReader(ctx context.Context) (err error) { + var configKind Kind + if configKind, err = c.GetConfigKind(c.Sources.Sources); err != nil { + return + } + switch configKind { + case ConfigPgURL: + var configDb db.PgxPoolIface + if configDb, err = db.New(ctx, c.Sources.Sources); err != nil { + return err + } + c.SourcesReaderWriter, err = sources.NewPostgresSourcesReaderWriter(ctx, configDb) + default: + c.SourcesReaderWriter, err = sources.NewYAMLSourcesReaderWriter(ctx, c.Sources.Sources) + } + return err +} + +// InitConfigReaders creates the configuration readers based on the configuration kind from the options. +func (c *Options) InitConfigReaders(ctx context.Context) error { + return errors.Join(c.InitMetricReader(ctx), c.InitSourceReader(ctx)) +} + +// InitWebUI initializes the web UI server +func (c *Options) InitWebUI(fs fs.FS, logger log.LoggerIface) error { + if webserver.Init(c.WebUI, fs, c.MetricsReaderWriter, c.SourcesReaderWriter, logger) == nil { + return errors.New("failed to initialize web UI") + } + return nil +} + func validateConfig(c *Options) error { + if len(c.Sources.Sources)+len(c.Metrics.Metrics) == 0 { + return errors.New("both --sources and --metrics are empty") + } if c.Sources.Refresh <= 1 { return errors.New("--servers-refresh-loop-seconds must be greater than 1") } @@ -124,7 +191,7 @@ func validateConfig(c *Options) error { } // validate that input is boolean is set - if c.Measurements.BatchingDelay <= 0 || c.Measurements.BatchingDelay > time.Hour { + if c.Sinks.BatchingDelay <= 0 || c.Sinks.BatchingDelay > time.Hour { return errors.New("--batching-delay-ms must be between 0 and 3600000") } diff --git a/src/config/cmdoptions_test.go b/src/config/cmdoptions_test.go index c158f72992..e7c4816240 100644 --- a/src/config/cmdoptions_test.go +++ b/src/config/cmdoptions_test.go @@ -4,6 +4,7 @@ import ( "os" "testing" + "github.com/cybertec-postgresql/pgwatch3/log" flags "github.com/jessevdk/go-flags" 
"github.com/stretchr/testify/assert" ) @@ -40,9 +41,9 @@ func TestParseSuccess(t *testing.T) { } func TestLogLevel(t *testing.T) { - c := &Options{Logging: LoggingOpts{LogLevel: "debug"}} + c := &Options{Logging: log.LoggingCmdOpts{LogLevel: "debug"}} assert.True(t, c.Verbose()) - c = &Options{Logging: LoggingOpts{LogLevel: "info"}} + c = &Options{Logging: log.LoggingCmdOpts{LogLevel: "info"}} assert.False(t, c.Verbose()) } @@ -52,7 +53,7 @@ func TestNewCmdOptions(t *testing.T) { } func TestConfig(t *testing.T) { - os.Args = []string{0: "config_test", "--config=sample.config.yaml"} + os.Args = []string{0: "config_test", "--sources=sample.config.yaml"} _, err := New(nil) assert.NoError(t, err) @@ -61,7 +62,7 @@ func TestConfig(t *testing.T) { assert.Error(t, err) os.Args = []string{0: "config_test"} // clientname arg is missing, but set PW3_CONFIG - assert.NoError(t, os.Setenv("PW3_CONFIG", "postgresql://foo:baz@bar/test")) + t.Setenv("PW3_SOURCES", "postgresql://foo:baz@bar/test") _, err = New(nil) assert.NoError(t, err) } diff --git a/src/config/cmdsource.go b/src/config/cmdsource.go new file mode 100644 index 0000000000..fa355d653a --- /dev/null +++ b/src/config/cmdsource.go @@ -0,0 +1,72 @@ +package config + +import ( + "context" + "errors" + "fmt" + + "github.com/cybertec-postgresql/pgwatch3/sources" +) + +type SourceCommand struct { + owner *Options + Ping SourcePingCommand `command:"ping" description:"Try to connect to configured sources, report errors if any and then exit"` + // PrintSQL SourcePrintCommand `command:"print" description:"Get and print SQL for a given Source"` +} + +func NewSourceCommand(owner *Options) *SourceCommand { + return &SourceCommand{ + owner: owner, + Ping: SourcePingCommand{owner: owner}, + } +} + +type SourcePingCommand struct { + owner *Options +} + +func (cmd *SourcePingCommand) Execute(args []string) error { + err := cmd.owner.InitSourceReader(context.Background()) + if err != nil { + return err + } + srcs, err := 
cmd.owner.SourcesReaderWriter.GetSources() + if err != nil { + return err + } + var foundSources sources.Sources + if len(args) == 0 { + foundSources = srcs + } else { + for _, name := range args { + for _, s := range srcs { + if s.Name == name { + foundSources = append(foundSources, s) + } + } + } + } + var e error + for _, s := range foundSources { + switch s.Kind { + case sources.SourcePatroni, sources.SourcePatroniContinuous, sources.SourcePatroniNamespace: + _, e = sources.ResolveDatabasesFromPatroni(s) + case sources.SourcePostgresContinuous: + _, e = sources.ResolveDatabasesFromPostgres(s) + default: + mdb := &sources.MonitoredDatabase{Source: s} + e = mdb.Ping(context.Background()) + } + if e != nil { + fmt.Printf("FAIL:\t%s (%s)\n", s.Name, e) + } else { + fmt.Printf("OK:\t%s\n", s.Name) + } + err = errors.Join(err, e) + } + // err here specifies execution error, not configuration error + // so we indicate it with a special exit code + // but we still return nil to indicate that the command was executed + cmd.owner.CompleteCommand(map[bool]int32{true: ExitCodeCmdError, false: ExitCodeOK}[err != nil]) + return nil +} diff --git a/src/config/cmdsource_test.go b/src/config/cmdsource_test.go new file mode 100644 index 0000000000..3835aef77b --- /dev/null +++ b/src/config/cmdsource_test.go @@ -0,0 +1,31 @@ +package config + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSourcePingCommand_Execute(t *testing.T) { + + f, err := os.CreateTemp(t.TempDir(), "sample.config.yaml") + require.NoError(t, err) + defer f.Close() + + _, err = f.WriteString(` +- name: test1 + kind: postgres + conn_str: postgresql://foo:bar/baz`) + + require.NoError(t, err) + + os.Args = []string{0: "config_test", "--sources=" + f.Name(), "source", "ping"} + _, err = New(nil) + assert.NoError(t, err) + + os.Args = []string{0: "config_test", "--sources=" + f.Name(), "source", "ping", "test1"} + _, err = New(nil) + 
assert.NoError(t, err) +} diff --git a/src/embed.go b/src/embed.go index 4d43705d96..535d40ae5a 100644 --- a/src/embed.go +++ b/src/embed.go @@ -1,6 +1,15 @@ package main -import "embed" +import ( + "embed" + "io/fs" +) //go:embed webui/build -var webuifs embed.FS +var efs embed.FS + +var webuifs fs.FS + +func init() { + webuifs, _ = fs.Sub(efs, "webui/build") +} diff --git a/src/log/cmdopts.go b/src/log/cmdopts.go new file mode 100644 index 0000000000..12510f4ae9 --- /dev/null +++ b/src/log/cmdopts.go @@ -0,0 +1,12 @@ +package log + +// LoggingCmdOpts specifies the logging command-line options +type LoggingCmdOpts struct { + LogLevel string `short:"v" long:"log-level" mapstructure:"log-level" description:"Verbosity level for stdout and log file" choice:"debug" choice:"info" choice:"error" default:"info"` + LogFile string `long:"log-file" mapstructure:"log-file" description:"File name to store logs"` + LogFileFormat string `long:"log-file-format" mapstructure:"log-file-format" description:"Format of file logs" choice:"json" choice:"text" default:"json"` + LogFileRotate bool `long:"log-file-rotate" mapstructure:"log-file-rotate" description:"Rotate log files"` + LogFileSize int `long:"log-file-size" mapstructure:"log-file-size" description:"Maximum size in MB of the log file before it gets rotated" default:"100"` + LogFileAge int `long:"log-file-age" mapstructure:"log-file-age" description:"Number of days to retain old log files, 0 means forever" default:"0"` + LogFileNumber int `long:"log-file-number" mapstructure:"log-file-number" description:"Maximum number of old log files to retain, 0 to retain all" default:"0"` +} diff --git a/src/log/log.go b/src/log/log.go index 161d4a6478..5537541e9e 100644 --- a/src/log/log.go +++ b/src/log/log.go @@ -4,7 +4,6 @@ import ( "context" "os" - "github.com/cybertec-postgresql/pgwatch3/config" "github.com/jackc/pgx/v5/tracelog" "github.com/rifflock/lfshook" "github.com/sirupsen/logrus" @@ -30,7 +29,7 @@ type logger struct { 
*BrokerHook } -func getLogFileWriter(opts config.LoggingOpts) any { +func getLogFileWriter(opts LoggingCmdOpts) any { if opts.LogFileRotate { return &lumberjack.Logger{ Filename: opts.LogFile, @@ -47,7 +46,7 @@ const ( enableColors = false ) -func getLogFileFormatter(opts config.LoggingOpts) logrus.Formatter { +func getLogFileFormatter(opts LoggingCmdOpts) logrus.Formatter { if opts.LogFileFormat == "text" { return newFormatter(disableColors) } @@ -55,7 +54,7 @@ func getLogFileFormatter(opts config.LoggingOpts) logrus.Formatter { } // Init creates logging facilities for the application -func Init(opts config.LoggingOpts) LoggerHookerIface { +func Init(opts LoggingCmdOpts) LoggerHookerIface { var err error l := logger{logrus.New(), NewBrokerHook(context.Background(), opts.LogLevel)} l.AddHook(l.BrokerHook) diff --git a/src/log/log_file_test.go b/src/log/log_file_test.go index 4e2be697af..60fb403eb1 100644 --- a/src/log/log_file_test.go +++ b/src/log/log_file_test.go @@ -3,19 +3,18 @@ package log import ( "testing" - "github.com/cybertec-postgresql/pgwatch3/config" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "gopkg.in/natefinch/lumberjack.v2" ) func TestGetLogFileWriter(t *testing.T) { - assert.IsType(t, getLogFileWriter(config.LoggingOpts{LogFileRotate: true}), &lumberjack.Logger{}) - assert.IsType(t, getLogFileWriter(config.LoggingOpts{LogFileRotate: false}), "string") + assert.IsType(t, getLogFileWriter(LoggingCmdOpts{LogFileRotate: true}), &lumberjack.Logger{}) + assert.IsType(t, getLogFileWriter(LoggingCmdOpts{LogFileRotate: false}), "string") } func TestGetLogFileFormatter(t *testing.T) { - assert.IsType(t, getLogFileFormatter(config.LoggingOpts{LogFileFormat: "json"}), &logrus.JSONFormatter{}) - assert.IsType(t, getLogFileFormatter(config.LoggingOpts{LogFileFormat: "blah"}), &logrus.JSONFormatter{}) - assert.IsType(t, getLogFileFormatter(config.LoggingOpts{LogFileFormat: "text"}), &Formatter{}) + assert.IsType(t, 
getLogFileFormatter(LoggingCmdOpts{LogFileFormat: "json"}), &logrus.JSONFormatter{}) + assert.IsType(t, getLogFileFormatter(LoggingCmdOpts{LogFileFormat: "blah"}), &logrus.JSONFormatter{}) + assert.IsType(t, getLogFileFormatter(LoggingCmdOpts{LogFileFormat: "text"}), &Formatter{}) } diff --git a/src/log/log_test.go b/src/log/log_test.go index aaa6308c4b..3a291bcc64 100644 --- a/src/log/log_test.go +++ b/src/log/log_test.go @@ -5,15 +5,14 @@ import ( "os" "testing" - "github.com/cybertec-postgresql/pgwatch3/config" "github.com/cybertec-postgresql/pgwatch3/log" "github.com/jackc/pgx/v5/tracelog" "github.com/stretchr/testify/assert" ) func TestInit(t *testing.T) { - assert.NotNil(t, log.Init(config.LoggingOpts{LogLevel: "debug"})) - l := log.Init(config.LoggingOpts{LogLevel: "foobar"}) + assert.NotNil(t, log.Init(log.LoggingCmdOpts{LogLevel: "debug"})) + l := log.Init(log.LoggingCmdOpts{LogLevel: "foobar"}) pgxl := log.NewPgxLogger(l) assert.NotNil(t, pgxl) ctx := log.WithLogger(context.Background(), l) @@ -22,14 +21,14 @@ func TestInit(t *testing.T) { } func TestFileLogger(t *testing.T) { - l := log.Init(config.LoggingOpts{LogLevel: "debug", LogFile: "test.log", LogFileFormat: "text"}) + l := log.Init(log.LoggingCmdOpts{LogLevel: "debug", LogFile: "test.log", LogFileFormat: "text"}) l.Info("test") assert.FileExists(t, "test.log", "Log file should be created") _ = os.Remove("test.log") } func TestPgxLog(_ *testing.T) { - pgxl := log.NewPgxLogger(log.Init(config.LoggingOpts{LogLevel: "trace"})) + pgxl := log.NewPgxLogger(log.Init(log.LoggingCmdOpts{LogLevel: "trace"})) var level tracelog.LogLevel for level = tracelog.LogLevelNone; level <= tracelog.LogLevelTrace; level++ { pgxl.Log(context.Background(), level, "foo", map[string]interface{}{"func": "TestPgxLog"}) diff --git a/src/main.go b/src/main.go index 142ab488df..cb6ed044cf 100644 --- a/src/main.go +++ b/src/main.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io/fs" "os" "os/signal" "runtime/debug" @@ 
-12,12 +11,9 @@ import ( "syscall" "github.com/cybertec-postgresql/pgwatch3/config" - "github.com/cybertec-postgresql/pgwatch3/db" "github.com/cybertec-postgresql/pgwatch3/log" "github.com/cybertec-postgresql/pgwatch3/metrics" "github.com/cybertec-postgresql/pgwatch3/reaper" - "github.com/cybertec-postgresql/pgwatch3/sources" - "github.com/cybertec-postgresql/pgwatch3/webserver" ) // version output variables @@ -29,11 +25,13 @@ var ( ) func printVersion() { - fmt.Printf(`pgwatch3: + fmt.Printf(` +Version info: Version: %s DB Schema: %s Git Commit: %s Built: %s + `, version, dbapi, commit, date) } @@ -45,57 +43,19 @@ func SetupCloseHandler(cancel context.CancelFunc) { signal.Notify(c, os.Interrupt, syscall.SIGTERM) go func() { <-c - logger.Debug("SetupCloseHandler received an interrupt from OS. Closing session...") + log.GetLogger(mainCtx).Debug("SetupCloseHandler received an interrupt from OS. Closing session...") cancel() - exitCode.Store(ExitCodeUserCancel) + exitCode.Store(config.ExitCodeUserCancel) }() } -const ( - ExitCodeOK int32 = iota - ExitCodeConfigError - ExitCodeInitError - ExitCodeWebUIError - ExitCodeUpgradeError - ExitCodeUserCancel - ExitCodeShutdownCommand - ExitCodeFatalError -) - var ( - exitCode atomic.Int32 // Exit code to be returned to the OS - mainContext context.Context // Main context for the application - logger log.LoggerHookerIface // Logger for the application - opts *config.Options // Command line options for the application + exitCode atomic.Int32 // Exit code to be returned to the OS + mainCtx context.Context // Main context for the application + logger log.LoggerHookerIface // Logger for the application + opts *config.Options // Command line options for the application ) -// NewConfigurationReaders creates the configuration readers based on the configuration kind from the options. 
-func NewConfigurationReaders(opts *config.Options) (sourcesReader sources.ReaderWriter, metricsReader metrics.ReaderWriter, err error) { - var configKind config.Kind - configKind, err = opts.GetConfigKind() - switch { - case err != nil: - return - case configKind != config.ConfigPgURL: - ctx := log.WithLogger(mainContext, logger.WithField("config", "files")) - if sourcesReader, err = sources.NewYAMLSourcesReaderWriter(ctx, opts.Sources.Config); err != nil { - return - } - metricsReader, err = metrics.NewYAMLMetricReaderWriter(ctx, opts.Metrics.Metrics) - default: - var configDb db.PgxPoolIface - ctx := log.WithLogger(mainContext, logger.WithField("config", "postgres")) - if configDb, err = db.New(ctx, opts.Sources.Config); err != nil { - return - } - if metricsReader, err = metrics.NewPostgresMetricReaderWriter(ctx, configDb); err != nil { - return - } - sourcesReader, err = sources.NewPostgresSourcesReaderWriter(ctx, configDb) - } - return -} - // UpgradeConfiguration upgrades the configuration if needed. 
func UpgradeConfiguration(m metrics.Migrator) (err error) { if opts.Upgrade { @@ -115,16 +75,16 @@ func main() { err error cancel context.CancelFunc ) - exitCode.Store(ExitCodeOK) + exitCode.Store(config.ExitCodeOK) defer func() { if err := recover(); err != nil { - exitCode.Store(ExitCodeFatalError) - log.GetLogger(mainContext).WithField("callstack", string(debug.Stack())).Error(err) + exitCode.Store(config.ExitCodeFatalError) + log.GetLogger(mainCtx).WithField("callstack", string(debug.Stack())).Error(err) } os.Exit(int(exitCode.Load())) }() - mainContext, cancel = context.WithCancel(context.Background()) + mainCtx, cancel = context.WithCancel(context.Background()) SetupCloseHandler(cancel) defer cancel() @@ -132,20 +92,24 @@ func main() { printVersion() fmt.Println(err) if !opts.Help { - exitCode.Store(ExitCodeConfigError) + exitCode.Store(config.ExitCodeConfigError) } return } + + // check if some sub-command was executed and exit + if opts.CommandCompleted { + exitCode.Store(opts.ExitCode) + return + } + logger = log.Init(opts.Logging) - mainContext = log.WithLogger(mainContext, logger) + mainCtx = log.WithLogger(mainCtx, logger) logger.Debugf("opts: %+v", opts) - // sourcesReaderWriter reads/writes the monitored sources (databases, patroni clusters, pgpools, etc.) 
information - // metricsReaderWriter reads/writes the metric and preset definitions - sourcesReaderWriter, metricsReaderWriter, err := NewConfigurationReaders(opts) - if err != nil { - exitCode.Store(ExitCodeInitError) + if err := opts.InitConfigReaders(mainCtx); err != nil { + exitCode.Store(config.ExitCodeCmdError) logger.Error(err) return } @@ -153,9 +117,9 @@ func main() { //check if we want to upgrade the configuration, which can be one of the following: // - PostgreSQL database schema // - YAML file schema - if m, ok := metricsReaderWriter.(metrics.Migrator); ok { + if m, ok := opts.MetricsReaderWriter.(metrics.Migrator); ok { if err = UpgradeConfiguration(m); err != nil { - exitCode.Store(ExitCodeUpgradeError) + exitCode.Store(config.ExitCodeUpgradeError) logger.Error(err) return } @@ -167,17 +131,15 @@ func main() { return } - if !opts.Ping { - uifs, _ := fs.Sub(webuifs, "webui/build") - ui := webserver.Init(opts.WebUI, uifs, metricsReaderWriter, sourcesReaderWriter, logger) - if ui == nil { - os.Exit(int(ExitCodeWebUIError)) - } + if err = opts.InitWebUI(webuifs, logger); err != nil { + exitCode.Store(config.ExitCodeWebUIError) + logger.Error(err) + return } - reaper := reaper.NewReaper(opts, sourcesReaderWriter, metricsReaderWriter) - if err = reaper.Reap(mainContext); err != nil { + reaper := reaper.NewReaper(opts, opts.SourcesReaderWriter, opts.MetricsReaderWriter) + if err = reaper.Reap(mainCtx); err != nil { logger.Error(err) - exitCode.Store(ExitCodeFatalError) + exitCode.Store(config.ExitCodeFatalError) } } diff --git a/src/metrics/cmdopts.go b/src/metrics/cmdopts.go new file mode 100644 index 0000000000..51da2ee3e3 --- /dev/null +++ b/src/metrics/cmdopts.go @@ -0,0 +1,10 @@ +package metrics + +// MetricCmdOpts specifies metric command-line options +type MetricCmdOpts struct { + Metrics string `short:"m" long:"metrics" mapstructure:"metrics" description:"File or folder of YAML files with metrics definitions" env:"PW3_METRICS"` + NoHelperFunctions 
bool `long:"no-helper-functions" mapstructure:"no-helper-functions" description:"Ignore metric definitions using helper functions (in form get_smth()) and don't also roll out any helpers automatically" env:"PW3_NO_HELPER_FUNCTIONS"` + DirectOSStats bool `long:"direct-os-stats" mapstructure:"direct-os-stats" description:"Extract OS related psutil statistics not via PL/Python wrappers but directly on host" env:"PW3_DIRECT_OS_STATS"` + InstanceLevelCacheMaxSeconds int64 `long:"instance-level-cache-max-seconds" mapstructure:"instance-level-cache-max-seconds" description:"Max allowed staleness for instance level metric data shared between DBs of an instance. Affects 'continuous' host types only. Set to 0 to disable" env:"PW3_INSTANCE_LEVEL_CACHE_MAX_SECONDS" default:"30"` + EmergencyPauseTriggerfile string `long:"emergency-pause-triggerfile" mapstructure:"emergency-pause-triggerfile" description:"When the file exists no metrics will be temporarily fetched / scraped" env:"PW3_EMERGENCY_PAUSE_TRIGGERFILE" default:"/tmp/pgwatch3-emergency-pause"` +} diff --git a/src/metrics/logparse.go b/src/metrics/logparse.go index 16595d8fa5..f8f491c37e 100644 --- a/src/metrics/logparse.go +++ b/src/metrics/logparse.go @@ -103,7 +103,7 @@ func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]i } allSeverityCounts["epoch_ns"] = time.Now().UnixNano() return []MeasurementMessage{{ - DBName: mdb.DBUniqueName, + DBName: mdb.Name, SourceType: string(mdb.Kind), MetricName: specialMetricServerLogEventCounts, Data: Measurements{allSeverityCounts}, diff --git a/src/metrics/metrics.yaml b/src/metrics/metrics.yaml index a54d6f561f..10d2044232 100644 --- a/src/metrics/metrics.yaml +++ b/src/metrics/metrics.yaml @@ -323,21 +323,16 @@ metrics: get_load_average(); -- needs the plpythonu proc from "metric_fetching_helpers" folder init_sql: |- BEGIN; - CREATE EXTENSION IF NOT EXISTS plpython3u; - CREATE OR REPLACE FUNCTION get_load_average(OUT load_1min float, OUT load_5min 
float, OUT load_15min float) AS $$ - from os import getloadavg - la = getloadavg() - return [la[0], la[1], la[2]] + from os import getloadavg + la = getloadavg() + return [la[0], la[1], la[2]] $$ LANGUAGE plpython3u VOLATILE; - GRANT EXECUTE ON FUNCTION get_load_average() TO pgwatch3; - COMMENT ON FUNCTION get_load_average() is 'created for pgwatch3'; - - COMMIT + COMMIT; gauges: - '*' is_instance_level: true diff --git a/src/metrics/postgres.go b/src/metrics/postgres.go index 8514e1bd80..2bafe8b632 100644 --- a/src/metrics/postgres.go +++ b/src/metrics/postgres.go @@ -5,7 +5,6 @@ import ( _ "embed" "github.com/cybertec-postgresql/pgwatch3/db" - "github.com/cybertec-postgresql/pgwatch3/log" ) func NewPostgresMetricReaderWriter(ctx context.Context, conn db.PgxPoolIface) (ReaderWriter, error) { @@ -50,8 +49,6 @@ var _ Migrator = (*dbMetricReaderWriter)(nil) // writeMetricsToPostgres writes the metrics and presets definitions to the // pgwatch3.metric and pgwatch3.preset tables in the ConfigDB. 
func writeMetricsToPostgres(ctx context.Context, conn db.PgxIface, metricDefs *Metrics) error { - logger := log.GetLogger(ctx) - logger.Info("writing metrics definitions to configuration database...") tx, err := conn.Begin(ctx) if err != nil { return err @@ -82,8 +79,6 @@ func writeMetricsToPostgres(ctx context.Context, conn db.PgxIface, metricDefs *M func (dmrw *dbMetricReaderWriter) GetMetrics() (metricDefMapNew *Metrics, err error) { ctx := dmrw.ctx conn := dmrw.configDb - logger := log.GetLogger(ctx) - logger.Info("reading metrics definitions from configuration database...") metricDefMapNew = &Metrics{MetricDefs{}, PresetDefs{}} rows, err := conn.Query(ctx, `SELECT name, sqls, init_sql, description, node_status, gauges, is_instance_level, storage_name FROM pgwatch3.metric`) if err != nil { diff --git a/src/reaper/cache.go b/src/reaper/cache.go index 13ef11eeed..242a74a86f 100644 --- a/src/reaper/cache.go +++ b/src/reaper/cache.go @@ -37,8 +37,8 @@ var lastSQLFetchError sync.Map func InitPGVersionInfoFetchingLockIfNil(md *sources.MonitoredDatabase) { MonitoredDatabasesSettingsLock.Lock() - if _, ok := MonitoredDatabasesSettingsGetLock[md.DBUniqueName]; !ok { - MonitoredDatabasesSettingsGetLock[md.DBUniqueName] = &sync.RWMutex{} + if _, ok := MonitoredDatabasesSettingsGetLock[md.Name]; !ok { + MonitoredDatabasesSettingsGetLock[md.Name] = &sync.RWMutex{} } MonitoredDatabasesSettingsLock.Unlock() } @@ -46,7 +46,7 @@ func InitPGVersionInfoFetchingLockIfNil(md *sources.MonitoredDatabase) { func UpdateMonitoredDBCache(data sources.MonitoredDatabases) { monitoredDbCacheNew := make(map[string]*sources.MonitoredDatabase) for _, row := range data { - monitoredDbCacheNew[row.DBUniqueName] = row + monitoredDbCacheNew[row.Name] = row } monitoredDbCacheLock.Lock() monitoredDbCache = monitoredDbCacheNew diff --git a/src/reaper/database.go b/src/reaper/database.go index 25954a8ec1..f25a858d77 100644 --- a/src/reaper/database.go +++ b/src/reaper/database.go @@ -912,9 +912,9 @@ 
func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.MonitoredD _, err = c.Exec(ctx, Metric.InitSQL) if err != nil { - log.GetLogger(ctx).Warningf("Failed to create a metric fetching helper for %s in %s: %v", md.DBUniqueName, metricName, err) + log.GetLogger(ctx).Warningf("Failed to create a metric fetching helper for %s in %s: %v", md.Name, metricName, err) } else { - log.GetLogger(ctx).Info("Successfully created metric fetching helper for", md.DBUniqueName, metricName) + log.GetLogger(ctx).Info("Successfully created metric fetching helper for", md.Name, metricName) } } return nil @@ -940,13 +940,13 @@ func CloseResourcesForRemovedMonitoredDBs(metricsWriter *sinks.MultiWriter, curr var curDBsMap = make(map[string]bool) for _, curDB := range currentDBs { - curDBsMap[curDB.DBUniqueName] = true + curDBsMap[curDB.Name] = true } for _, prevDB := range prevLoopDBs { - if _, ok := curDBsMap[prevDB.DBUniqueName]; !ok { // removed from config + if _, ok := curDBsMap[prevDB.Name]; !ok { // removed from config prevDB.Conn.Close() - _ = metricsWriter.SyncMetrics(prevDB.DBUniqueName, "", "remove") + _ = metricsWriter.SyncMetrics(prevDB.Name, "", "remove") } } diff --git a/src/reaper/reaper.go b/src/reaper/reaper.go index a376c4dfee..58ac7fe7ed 100644 --- a/src/reaper/reaper.go +++ b/src/reaper/reaper.go @@ -3,7 +3,6 @@ package reaper import ( "context" "fmt" - "os" "slices" "strings" "sync" @@ -57,13 +56,11 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) { } go SyncMetricDefs(mainContext, metricsReaderWriter) - if measurementsWriter, err = sinks.NewMultiWriter(mainContext, opts, metricDefinitionMap); err != nil { + if measurementsWriter, err = sinks.NewMultiWriter(mainContext, &opts.Sinks, metricDefinitionMap); err != nil { logger.Fatal(err) } r.measurementCh = make(chan []metrics.MeasurementMessage, 10000) - if !opts.Ping { - go measurementsWriter.WriteMeasurements(mainContext, r.measurementCh) - } + go 
measurementsWriter.WriteMeasurements(mainContext, r.measurementCh) for { //main loop hostsToShutDownDueToRoleChange := make(map[string]bool) // hosts went from master to standby and have "only if master" set @@ -73,7 +70,7 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) { if firstLoop { logger.Fatal("could not fetch active hosts - check config!", err) } else { - logger.Error("could not fetch active hosts, using last valid config data. err:", err) + logger.Error("could not fetch active hosts, using last valid config data:", err) time.Sleep(time.Second * time.Duration(opts.Sources.Refresh)) continue } @@ -104,12 +101,12 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) { firstLoop = false // only used for failing when 1st config reading fails for _, monitoredDB := range monitoredDbs { - logger.WithField("source", monitoredDB.DBUniqueName). + logger.WithField("source", monitoredDB.Name). WithField("metric", monitoredDB.Metrics). WithField("tags", monitoredDB.CustomTags). WithField("config", monitoredDB.HostConfig).Debug() - dbUnique := monitoredDB.DBUniqueName + dbUnique := monitoredDB.Name dbUniqueOrig := monitoredDB.GetDatabaseName() srcType := monitoredDB.Kind @@ -124,12 +121,12 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) { ver, err = GetMonitoredDatabaseSettings(mainContext, dbUnique, srcType, true) if err != nil { - logger.Errorf("could not start metric gathering for DB \"%s\" due to connection problem: %s", dbUnique, err) + logger.Errorf("could not start metric gathering due to connection problem: %s", err) continue } - logger.Infof("Connect OK. [%s] is on version %s (in recovery: %v)", dbUnique, ver.VersionStr, ver.IsInRecovery) + logger.Infof("Connect OK. 
Version: %s (in recovery: %v)", ver.VersionStr, ver.IsInRecovery) if ver.IsInRecovery && monitoredDB.OnlyIfMaster { - logger.Infof("[%s] not added to monitoring due to 'master only' property", dbUnique) + logger.Infof("not added to monitoring due to 'master only' property") continue } metricConfig = func() map[string]float64 { @@ -154,7 +151,7 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) { }() } - if !opts.Ping && monitoredDB.IsPostgresSource() && !ver.IsInRecovery { + if monitoredDB.IsPostgresSource() && !ver.IsInRecovery { if opts.Metrics.NoHelperFunctions { logger.Infof("[%s] skipping rollout out helper functions due to the --no-helper-functions flag ...", dbUnique) } else { @@ -209,11 +206,6 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) { } } - if opts.Ping { - logger.Infof("All configured %d DB hosts were reachable", len(monitoredDbs)) - os.Exit(0) - } - for metricName, interval := range metricConfig { metric := metricName metricDefOk := false @@ -272,7 +264,7 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) { if cancelFunc, isOk := cancelFuncs[dbMetric]; isOk { cancelFunc() } - logger.Warning("shutting down metric", metric, "for", monitoredDB.DBUniqueName) + logger.Warning("shutting down metric", metric, "for", monitoredDB.Name) delete(cancelFuncs, dbMetric) } else if !metricDefOk { epoch, ok := lastSQLFetchError.Load(metric) @@ -523,7 +515,7 @@ func SyncMonitoredDBsToDatastore(ctx context.Context, monitoredDbs []*sources.Mo db[tagPrefix+k] = v } msms = append(msms, metrics.MeasurementMessage{ - DBName: mdb.DBUniqueName, + DBName: mdb.Name, MetricName: monitoredDbsDatastoreSyncMetricName, Data: metrics.Measurements{db}, }) @@ -540,16 +532,16 @@ func SyncMonitoredDBsToDatastore(ctx context.Context, monitoredDbs []*sources.Mo func AddDbnameSysinfoIfNotExistsToQueryResultData(data metrics.Measurements, ver MonitoredDatabaseSettings, opts *config.Options) metrics.Measurements { enrichedData := 
make(metrics.Measurements, 0) for _, dr := range data { - if opts.Measurements.RealDbnameField > "" && ver.RealDbname > "" { - old, ok := dr[opts.Measurements.RealDbnameField] + if opts.Sinks.RealDbnameField > "" && ver.RealDbname > "" { + old, ok := dr[opts.Sinks.RealDbnameField] if !ok || old == "" { - dr[opts.Measurements.RealDbnameField] = ver.RealDbname + dr[opts.Sinks.RealDbnameField] = ver.RealDbname } } - if opts.Measurements.SystemIdentifierField > "" && ver.SystemIdentifier > "" { - old, ok := dr[opts.Measurements.SystemIdentifierField] + if opts.Sinks.SystemIdentifierField > "" && ver.SystemIdentifier > "" { + old, ok := dr[opts.Sinks.SystemIdentifierField] if !ok || old == "" { - dr[opts.Measurements.SystemIdentifierField] = ver.SystemIdentifier + dr[opts.Sinks.SystemIdentifierField] = ver.SystemIdentifier } } enrichedData = append(enrichedData, dr) @@ -731,7 +723,7 @@ func FetchMetrics(ctx context.Context, send_to_storageChannel: - if (opts.Measurements.RealDbnameField > "" || opts.Measurements.SystemIdentifierField > "") && msg.Source == sources.SourcePostgres { + if (opts.Sinks.RealDbnameField > "" || opts.Sinks.SystemIdentifierField > "") && msg.Source == sources.SourcePostgres { MonitoredDatabasesSettingsLock.RLock() ver := MonitoredDatabasesSettings[msg.DBUniqueName] MonitoredDatabasesSettingsLock.RUnlock() diff --git a/src/sinks/cmdopts.go b/src/sinks/cmdopts.go new file mode 100644 index 0000000000..4a88724c81 --- /dev/null +++ b/src/sinks/cmdopts.go @@ -0,0 +1,12 @@ +package sinks + +import "time" + +// SinkCmdOpts specifies the storage configuration to store metrics measurements +type SinkCmdOpts struct { + Sinks []string `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored" env:"PW3_SINK"` + BatchingDelay time.Duration `long:"batching-delay" mapstructure:"batching-delay" description:"Max milliseconds to wait for a batched metrics flush. 
[Default: 250ms]" default:"250ms" env:"PW3_BATCHING_MAX_DELAY"` + Retention int `long:"retention" mapstructure:"retention" description:"If set, metrics older than that will be deleted" default:"14" env:"PW3_RETENTION"` + RealDbnameField string `long:"real-dbname-field" mapstructure:"real-dbname-field" description:"Tag key for real DB name if --add-real-dbname enabled" env:"PW3_REAL_DBNAME_FIELD" default:"real_dbname"` + SystemIdentifierField string `long:"system-identifier-field" mapstructure:"system-identifier-field" description:"Tag key for system identifier value if --add-system-identifier" env:"PW3_SYSTEM_IDENTIFIER_FIELD" default:"sys_id"` +} \ No newline at end of file diff --git a/src/sinks/multiwriter.go b/src/sinks/multiwriter.go index 86aad1d8bc..699a1fe265 100644 --- a/src/sinks/multiwriter.go +++ b/src/sinks/multiwriter.go @@ -7,7 +7,6 @@ import ( "strings" "sync" - "github.com/cybertec-postgresql/pgwatch3/config" "github.com/cybertec-postgresql/pgwatch3/log" "github.com/cybertec-postgresql/pgwatch3/metrics" ) @@ -25,11 +24,11 @@ type MultiWriter struct { } // NewMultiWriter creates and returns new instance of MultiWriter struct. 
-func NewMultiWriter(ctx context.Context, opts *config.Options, metricDefs *metrics.Metrics) (mw *MultiWriter, err error) { +func NewMultiWriter(ctx context.Context, opts *SinkCmdOpts, metricDefs *metrics.Metrics) (mw *MultiWriter, err error) { var w Writer logger := log.GetLogger(ctx) mw = &MultiWriter{} - for _, s := range opts.Measurements.Sinks { + for _, s := range opts.Sinks { l := logger.WithField("sink", s) ctx = log.WithLogger(ctx, l) scheme, path, found := strings.Cut(s, "://") @@ -40,7 +39,7 @@ func NewMultiWriter(ctx context.Context, opts *config.Options, metricDefs *metri case "jsonfile": w, err = NewJSONWriter(ctx, path) case "postgres", "postgresql": - w, err = NewPostgresWriter(ctx, s, &opts.Measurements, metricDefs) + w, err = NewPostgresWriter(ctx, s, opts, metricDefs) case "prometheus": w, err = NewPrometheusWriter(ctx, path) case "rpc": diff --git a/src/sinks/multiwriter_test.go b/src/sinks/multiwriter_test.go index 47e3488bc7..5a665d5e00 100644 --- a/src/sinks/multiwriter_test.go +++ b/src/sinks/multiwriter_test.go @@ -4,7 +4,6 @@ import ( "context" "testing" - "github.com/cybertec-postgresql/pgwatch3/config" "github.com/cybertec-postgresql/pgwatch3/metrics" "github.com/stretchr/testify/assert" ) @@ -21,20 +20,16 @@ func (mw *MockWriter) Write(_ []metrics.MeasurementMessage) error { func TestNewMultiWriter(t *testing.T) { input := []struct { - opts *config.Options + opts *SinkCmdOpts mw bool // MultiWriter returned err bool // error returned }{ - {&config.Options{}, false, true}, - {&config.Options{ - Measurements: config.MeasurementOpts{ - Sinks: []string{"foo"}, - }, + {&SinkCmdOpts{}, false, true}, + {&SinkCmdOpts{ + Sinks: []string{"foo"}, }, false, true}, - {&config.Options{ - Measurements: config.MeasurementOpts{ - Sinks: []string{"jsonfile://test.json"}, - }, + {&SinkCmdOpts{ + Sinks: []string{"jsonfile://test.json"}, }, true, false}, } diff --git a/src/sinks/postgres.go b/src/sinks/postgres.go index 5106b52948..65c9c2721c 100644 --- 
a/src/sinks/postgres.go +++ b/src/sinks/postgres.go @@ -10,7 +10,6 @@ import ( "strings" "time" - "github.com/cybertec-postgresql/pgwatch3/config" "github.com/cybertec-postgresql/pgwatch3/db" "github.com/cybertec-postgresql/pgwatch3/log" "github.com/cybertec-postgresql/pgwatch3/metrics" @@ -23,7 +22,7 @@ var ( deleterDelay = time.Hour ) -func NewPostgresWriter(ctx context.Context, connstr string, opts *config.MeasurementOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error) { +func NewPostgresWriter(ctx context.Context, connstr string, opts *SinkCmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error) { var conn db.PgxPoolIface if conn, err = db.New(ctx, connstr); err != nil { return @@ -31,7 +30,7 @@ func NewPostgresWriter(ctx context.Context, connstr string, opts *config.Measure return NewWriterFromPostgresConn(ctx, conn, opts, metricDefs) } -func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *config.MeasurementOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error) { +func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *SinkCmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error) { pgw = &PostgresWriter{ сtx: ctx, metricDefs: metricDefs, @@ -105,7 +104,7 @@ type PostgresWriter struct { sinkDb db.PgxPoolIface metricSchema DbStorageSchemaType metricDefs *metrics.Metrics - opts *config.MeasurementOpts + opts *SinkCmdOpts input chan []metrics.MeasurementMessage lastError chan error } diff --git a/src/sinks/postgres_test.go b/src/sinks/postgres_test.go index e0c6c92495..1e2a0a622a 100644 --- a/src/sinks/postgres_test.go +++ b/src/sinks/postgres_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/cybertec-postgresql/pgwatch3/config" "github.com/cybertec-postgresql/pgwatch3/metrics" "github.com/pashagolub/pgxmock/v4" "github.com/stretchr/testify/assert" @@ -44,7 +43,7 @@ func TestNewWriterFromPostgresConn(t *testing.T) { conn.ExpectExec("select 
admin.ensure_dummy_metrics_table").WithArgs(m).WillReturnResult(pgxmock.NewResult("EXECUTE", 1)) } - opts := &config.MeasurementOpts{BatchingDelay: time.Hour, Retention: 356} + opts := &SinkCmdOpts{BatchingDelay: time.Hour, Retention: 356} pgw, err := NewWriterFromPostgresConn(ctx, conn, opts, metrics.GetDefaultMetrics()) assert.NoError(t, err) assert.NotNil(t, pgw) diff --git a/src/sources/cmdopts.go b/src/sources/cmdopts.go new file mode 100644 index 0000000000..ccc004e841 --- /dev/null +++ b/src/sources/cmdopts.go @@ -0,0 +1,11 @@ +package sources + +// SourceOpts specifies the sources related command-line options +type SourceCmdOpts struct { + Sources string `short:"s" long:"sources" mapstructure:"config" description:"Postgres URI, file or folder of YAML files containing info on which DBs to monitor" env:"PW3_SOURCES"` + Refresh int `long:"refresh" mapstructure:"refresh" description:"How frequently to resync sources and metrics" env:"PW3_REFRESH" default:"120"` + Groups []string `short:"g" long:"group" mapstructure:"group" description:"Groups for filtering which databases to monitor. By default all are monitored" env:"PW3_GROUP"` + MinDbSizeMB int64 `long:"min-db-size-mb" mapstructure:"min-db-size-mb" description:"Smaller size DBs will be ignored and not monitored until they reach the threshold." env:"PW3_MIN_DB_SIZE_MB" default:"0"` + MaxParallelConnectionsPerDb int `long:"max-parallel-connections-per-db" mapstructure:"max-parallel-connections-per-db" description:"Max parallel metric fetches per DB. Note the multiplication effect on multi-DB instances" env:"PW3_MAX_PARALLEL_CONNECTIONS_PER_DB" default:"2"` + TryCreateListedExtsIfMissing string `long:"try-create-listed-exts-if-missing" mapstructure:"try-create-listed-exts-if-missing" description:"Try creating the listed extensions (comma sep.) on first connect for all monitored DBs when missing. 
Main usage - pg_stat_statements" env:"PW3_TRY_CREATE_LISTED_EXTS_IF_MISSING" default:""` +} diff --git a/src/sources/postgres.go b/src/sources/postgres.go index e41b36b270..fb931da03c 100644 --- a/src/sources/postgres.go +++ b/src/sources/postgres.go @@ -74,7 +74,7 @@ on conflict (name) do update set host_config = $13, only_if_master = $14` _, err = conn.Exec(context.Background(), sql, - md.DBUniqueName, md.Group, md.Kind, + md.Name, md.Group, md.Kind, md.ConnStr, m(md.Metrics), m(md.MetricsStandby), md.PresetMetrics, md.PresetMetricsStandby, md.IsSuperuser, md.IncludePattern, md.ExcludePattern, m(md.CustomTags), m(md.HostConfig), md.OnlyIfMaster) diff --git a/src/sources/postgres_test.go b/src/sources/postgres_test.go index a00cbc7bc1..cfec6b5d01 100644 --- a/src/sources/postgres_test.go +++ b/src/sources/postgres_test.go @@ -70,7 +70,7 @@ func TestSyncFromReader(t *testing.T) { a.NoError(err) md := &sources.MonitoredDatabase{} - md.DBUniqueName = "db1" + md.Name = "db1" dbs := sources.MonitoredDatabases{md} dbs, err = dbs.SyncFromReader(pgrw) a.NoError(err) @@ -98,7 +98,7 @@ func TestUpdateDatabase(t *testing.T) { a.NoError(err) md := sources.Source{ - DBUniqueName: "db1", + Name: "db1", Group: "group1", Kind: sources.Kind("postgres"), ConnStr: "postgres://user:pass@localhost:5432/db1", @@ -111,7 +111,7 @@ func TestUpdateDatabase(t *testing.T) { } conn.ExpectPing() conn.ExpectExec(`insert into pgwatch3\.source`).WithArgs( - md.DBUniqueName, md.Group, md.Kind, + md.Name, md.Group, md.Kind, md.ConnStr, `{"metric":60}`, `{"standby_metric":60}`, md.PresetMetrics, md.PresetMetricsStandby, md.IsSuperuser, md.IncludePattern, md.ExcludePattern, `{"tag":"value"}`, @@ -134,7 +134,7 @@ func TestWriteMonitoredDatabases(t *testing.T) { conn, err := pgxmock.NewPool() a.NoError(err) md := sources.Source{ - DBUniqueName: "db1", + Name: "db1", Group: "group1", Kind: sources.Kind("postgres"), ConnStr: "postgres://user:pass@localhost:5432/db1", @@ -152,7 +152,7 @@ func 
TestWriteMonitoredDatabases(t *testing.T) { conn.ExpectBegin() conn.ExpectExec(`truncate pgwatch3\.source`).WillReturnResult(pgxmock.NewResult("TRUNCATE", 1)) conn.ExpectExec(`insert into pgwatch3\.source`).WithArgs( - md.DBUniqueName, md.Group, md.Kind, + md.Name, md.Group, md.Kind, md.ConnStr, `{"metric":60}`, `{"standby_metric":60}`, md.PresetMetrics, md.PresetMetricsStandby, md.IsSuperuser, md.IncludePattern, md.ExcludePattern, `{"tag":"value"}`, nil, md.OnlyIfMaster, @@ -188,7 +188,7 @@ func TestWriteMonitoredDatabases(t *testing.T) { conn.ExpectBegin() conn.ExpectExec(`truncate pgwatch3\.source`).WillReturnResult(pgxmock.NewResult("TRUNCATE", 1)) conn.ExpectExec(`insert into pgwatch3\.source`).WithArgs( - md.DBUniqueName, md.Group, md.Kind, + md.Name, md.Group, md.Kind, md.ConnStr, `{"metric":60}`, `{"standby_metric":60}`, md.PresetMetrics, md.PresetMetricsStandby, md.IsSuperuser, md.IncludePattern, md.ExcludePattern, `{"tag":"value"}`, nil, md.OnlyIfMaster, diff --git a/src/sources/resolver.go b/src/sources/resolver.go index 3906e1a7eb..c8e8b894d8 100644 --- a/src/sources/resolver.go +++ b/src/sources/resolver.go @@ -5,6 +5,7 @@ package sources // Postgres resolver will return the list of databases from the given Postgres instance. import ( + "cmp" "context" "crypto/tls" "crypto/x509" @@ -23,23 +24,18 @@ import ( pgx "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" client "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" ) // ResolveDatabases() updates list of monitored objects from continuous monitoring sources, e.g. 
patroni -func (srcs Sources) ResolveDatabases() (MonitoredDatabases, error) { +func (srcs Sources) ResolveDatabases() (_ MonitoredDatabases, err error) { resolvedDbs := make(MonitoredDatabases, 0, len(srcs)) for _, s := range srcs { if !s.IsEnabled { continue } - dbs, err := s.ResolveDatabases() - if err != nil { - return nil, err - } - if len(dbs) == 0 { - resolvedDbs = append(resolvedDbs, &MonitoredDatabase{Source: *(&s).Clone()}) - continue - } + dbs, e := s.ResolveDatabases() + err = errors.Join(err, e) resolvedDbs = append(resolvedDbs, dbs...) } return resolvedDbs, nil @@ -53,7 +49,7 @@ func (s Source) ResolveDatabases() (MonitoredDatabases, error) { case SourcePostgresContinuous: return ResolveDatabasesFromPostgres(s) } - return nil, nil + return MonitoredDatabases{&MonitoredDatabase{Source: *(&s).Clone()}}, nil } type PatroniClusterMember struct { @@ -157,58 +153,63 @@ func getEtcdClusterMembers(s Source) ([]PatroniClusterMember, error) { DialKeepAliveTimeout: time.Second, Username: s.HostConfig.Username, Password: s.HostConfig.Password, + DialTimeout: 5 * time.Second, + Logger: zap.NewNop(), } c, err := client.New(cfg) if err != nil { - logger.Errorf("[%s ]Could not connect to ETCD: %v", s.DBUniqueName, err) return ret, err } + + ctx, cancel := context.WithTimeoutCause(context.Background(), 5*time.Second, errors.New("etcd client timeout")) + defer cancel() kapi := c.KV if s.Kind == SourcePatroniNamespace { // all scopes, all DBs (regex filtering applies if defined) if len(s.GetDatabaseName()) > 0 { - return ret, fmt.Errorf("Skipping Patroni entry %s - cannot specify a DB name when monitoring all scopes (regex patterns are supported though)", s.DBUniqueName) + return ret, fmt.Errorf("Skipping Patroni entry %s - cannot specify a DB name when monitoring all scopes (regex patterns are supported though)", s.Name) } if s.HostConfig.Namespace == "" { - return ret, fmt.Errorf("Skipping Patroni entry %s - search 'namespace' not specified", s.DBUniqueName) + return 
ret, fmt.Errorf("Skipping Patroni entry %s - search 'namespace' not specified", s.Name) } - resp, err := kapi.Get(context.Background(), s.HostConfig.Namespace) + resp, err := kapi.Get(ctx, s.HostConfig.Namespace) if err != nil { - return ret, err + + return ret, cmp.Or(context.Cause(ctx), err) } for _, node := range resp.Kvs { scope := path.Base(string(node.Key)) // Key="/service/batman" - scopeMembers, err := extractEtcdScopeMembers(s, scope, kapi, true) + scopeMembers, err := extractEtcdScopeMembers(ctx, s, scope, kapi, true) if err != nil { continue } ret = append(ret, scopeMembers...) } } else { - ret, err = extractEtcdScopeMembers(s, s.HostConfig.Scope, kapi, false) + ret, err = extractEtcdScopeMembers(ctx, s, s.HostConfig.Scope, kapi, false) if err != nil { - return ret, err + return ret, cmp.Or(context.Cause(ctx), err) } } - lastFoundClusterMembers[s.DBUniqueName] = ret + lastFoundClusterMembers[s.Name] = ret return ret, nil } -func extractEtcdScopeMembers(s Source, scope string, kapi client.KV, addScopeToName bool) ([]PatroniClusterMember, error) { +func extractEtcdScopeMembers(ctx context.Context, s Source, scope string, kapi client.KV, addScopeToName bool) ([]PatroniClusterMember, error) { var ret = make([]PatroniClusterMember, 0) var name string membersPath := path.Join(s.HostConfig.Namespace, scope, "members") - resp, err := kapi.Get(context.Background(), membersPath) + resp, err := kapi.Get(ctx, membersPath) if err != nil { return nil, err } - logger.Debugf("ETCD response for %s scope %s: %+v", s.DBUniqueName, scope, resp) + logger.Debugf("ETCD response for %s scope %s: %+v", s.Name, scope, resp) for _, node := range resp.Kvs { - logger.Debugf("Found a cluster member from etcd [%s:%s]: %+v", s.DBUniqueName, scope, node.Value) + logger.Debugf("Found a cluster member from etcd [%s:%s]: %+v", s.Name, scope, node.Value) nodeData, err := jsonTextToStringMap(string(node.Value)) if err != nil { logger.Errorf("Could not parse ETCD node data for node \"%s\": 
%s", node, err) @@ -235,58 +236,55 @@ const ( func ResolveDatabasesFromPatroni(ce Source) ([]*MonitoredDatabase, error) { var mds []*MonitoredDatabase - var cm []PatroniClusterMember + var clusterMembers []PatroniClusterMember var err error var ok bool var dbUnique string switch ce.HostConfig.DcsType { case dcsTypeEtcd: - cm, err = getEtcdClusterMembers(ce) + clusterMembers, err = getEtcdClusterMembers(ce) case dcsTypeZookeeper: - cm, err = getZookeeperClusterMembers(ce) + clusterMembers, err = getZookeeperClusterMembers(ce) case dcsTypeConsul: - cm, err = getConsulClusterMembers(ce) + clusterMembers, err = getConsulClusterMembers(ce) default: return nil, errors.New("unknown DCS") } if err != nil { - logger.Warningf("Failed to get info from DCS for %s, using previous member info if any", ce.DBUniqueName) - cm, ok = lastFoundClusterMembers[ce.DBUniqueName] - if ok { // mask error from main loop not to remove monitored DBs due to "jitter" + logger.WithField("source", ce.Name).Debug("Failed to get info from DCS, using previous member info if any") + if clusterMembers, ok = lastFoundClusterMembers[ce.Name]; ok { // mask error from main loop not to remove monitored DBs due to "jitter" err = nil } } else { - lastFoundClusterMembers[ce.DBUniqueName] = cm + lastFoundClusterMembers[ce.Name] = clusterMembers } - if len(cm) == 0 { - logger.Warningf("No Patroni cluster members found for cluster [%s:%s]", ce.DBUniqueName, ce.HostConfig.Scope) - return mds, nil + if len(clusterMembers) == 0 { + return mds, err } - logger.Infof("Found %d Patroni members for entry %s", len(cm), ce.DBUniqueName) - for _, m := range cm { - logger.Infof("Processing Patroni cluster member [%s:%s]", ce.DBUniqueName, m.Name) + for _, m := range clusterMembers { + logger.Infof("Processing Patroni cluster member [%s:%s]", ce.Name, m.Name) if ce.OnlyIfMaster && m.Role != "master" { - logger.Infof("Skipping over Patroni cluster member [%s:%s] as not a master", ce.DBUniqueName, m.Name) + 
logger.Infof("Skipping over Patroni cluster member [%s:%s] as not a master", ce.Name, m.Name) continue } host, port, err := parseHostAndPortFromJdbcConnStr(m.ConnURL) if err != nil { - logger.Errorf("Could not parse Patroni conn str \"%s\" [%s:%s]: %v", m.ConnURL, ce.DBUniqueName, m.Scope, err) + logger.Errorf("Could not parse Patroni conn str \"%s\" [%s:%s]: %v", m.ConnURL, ce.Name, m.Scope, err) continue } if ce.OnlyIfMaster { - dbUnique = ce.DBUniqueName + dbUnique = ce.Name if ce.Kind == SourcePatroniNamespace { - dbUnique = ce.DBUniqueName + "_" + m.Scope + dbUnique = ce.Name + "_" + m.Scope } } else { - dbUnique = ce.DBUniqueName + "_" + m.Name + dbUnique = ce.Name + "_" + m.Name } if ce.GetDatabaseName() != "" { c := &MonitoredDatabase{Source: *ce.Clone()} - c.DBUniqueName = dbUnique + c.Name = dbUnique mds = append(mds, c) continue } @@ -300,7 +298,7 @@ func ResolveDatabasesFromPatroni(ce Source) ([]*MonitoredDatabase, error) { return err }) if err != nil { - logger.Errorf("Could not contact Patroni member [%s:%s]: %v", ce.DBUniqueName, m.Scope, err) + logger.Errorf("Could not contact Patroni member [%s:%s]: %v", ce.Name, m.Scope, err) continue } defer c.Close() @@ -319,7 +317,7 @@ func ResolveDatabasesFromPatroni(ce Source) ([]*MonitoredDatabase, error) { } data, err := pgx.CollectRows(rows, pgx.RowToMap) if err != nil { - logger.Errorf("Could not get DB name listing from Patroni member [%s:%s]: %v", ce.DBUniqueName, m.Scope, err) + logger.Errorf("Could not get DB name listing from Patroni member [%s:%s]: %v", ce.Name, m.Scope, err) continue } @@ -331,7 +329,7 @@ func ResolveDatabasesFromPatroni(ce Source) ([]*MonitoredDatabase, error) { connURL.Host = host + ":" + port connURL.Path = d["datname"].(string) c := ce.Clone() - c.DBUniqueName = dbUnique + "_" + d["datname_escaped"].(string) + c.Name = dbUnique + "_" + d["datname_escaped"].(string) c.ConnStr = connURL.String() mds = append(mds, &MonitoredDatabase{Source: *c}) } @@ -371,7 +369,7 @@ func 
ResolveDatabasesFromPostgres(s Source) (resolvedDbs MonitoredDatabases, err return nil, err } rdb := &MonitoredDatabase{Source: *s.Clone()} - rdb.DBUniqueName += "_" + dbname + rdb.Name += "_" + dbname rdb.SetDatabaseName(dbname) resolvedDbs = append(resolvedDbs, rdb) } diff --git a/src/sources/resolver_test.go b/src/sources/resolver_test.go index 2ab95a2af6..ac255c6b55 100644 --- a/src/sources/resolver_test.go +++ b/src/sources/resolver_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/cybertec-postgresql/pgwatch3/sources" testcontainers "github.com/testcontainers/testcontainers-go" @@ -21,12 +22,12 @@ func TestMonitoredDatabase_ResolveDatabasesFromPostgres(t *testing.T) { WithOccurrence(2). WithStartupTimeout(5*time.Second)), ) - assert.NoError(t, err) + require.NoError(t, err) defer func() { assert.NoError(t, pgContainer.Terminate(ctx)) }() // Create a new MonitoredDatabase instance md := sources.Source{} - md.DBUniqueName = "continuous" + md.Name = "continuous" md.Kind = sources.SourcePostgresContinuous md.ConnStr, err = pgContainer.ConnectionString(ctx) assert.NoError(t, err) @@ -37,11 +38,11 @@ func TestMonitoredDatabase_ResolveDatabasesFromPostgres(t *testing.T) { assert.True(t, len(dbs) == 2) //postgres and mydatabase // check the "continuous_mydatabase" - db := dbs.GetMonitoredDatabase(md.DBUniqueName + "_mydatabase") + db := dbs.GetMonitoredDatabase(md.Name + "_mydatabase") assert.NotNil(t, db) assert.Equal(t, "mydatabase", db.GetDatabaseName()) //check unexpected database - db = dbs.GetMonitoredDatabase(md.DBUniqueName + "_unexpected") + db = dbs.GetMonitoredDatabase(md.Name + "_unexpected") assert.Nil(t, db) } diff --git a/src/sources/sample.sources.yaml b/src/sources/sample.sources.yaml index 05663281bd..4f976d2fee 100644 --- a/src/sources/sample.sources.yaml +++ b/src/sources/sample.sources.yaml @@ -1,4 +1,4 @@ -- unique_name: test1 # an arbitrary name for the monitored DB. 
functions also as prefix for found DBs if using continuous discovery "dbtype"-s +- name: test1 # an arbitrary name for the monitored DB. functions also as prefix for found DBs if using continuous discovery "dbtype"-s # Should be chosen carefully - cannot be (easily) changed for the already stored metric data! kind: postgres # postgres|postgres-continuous-discovery|pgbouncer|pgpool|patroni|patroni-continuous-discovery|patroni-namespace-discovery # defaults to postgres if not specified @@ -18,14 +18,8 @@ sslcert: '' sslkey: '' -- unique_name: test2 +- name: test2 kind: patroni-continuous-discovery - host: localhost - port: 5432 - dbname: '' - user: postgres - password: - sslmode: disable host_config: # used for storing patroni connect info (dbtype=patroni|patroni-continuous-discovery) or log parsing settings dcs_type: etcd dcs_endpoints: ["http://localhost:2379"] @@ -48,14 +42,8 @@ is_enabled: false only_if_master: true # don't gather metrics from standby servers -- unique_name: pgbouncer - dbtype: pgbouncer - host: 127.0.0.1 - port: 6432 - dbname: postgres - user: pgwatch2 - sslmode: disable - is_superuser: false +- name: pgbouncer + conn_str: postgresql://pgwatch3:pgwatch3admin@localhost/pgwatch3 custom_metrics: pgbouncer_stats: 10 pgbouncer_clients: 10 @@ -64,7 +52,7 @@ sslcert: '' sslkey: '' -- unique_name: $name +- name: envvar kind: $kind is_enabled: true include_pattern: $include_pattern diff --git a/src/sources/types.go b/src/sources/types.go index 87803792a0..624f02738a 100644 --- a/src/sources/types.go +++ b/src/sources/types.go @@ -43,7 +43,7 @@ type ( // through a connection pooler, which supports its own additional metrics. If one is not interested in // those additional metrics, it is ok to specify the connection details as a regular postgres source. 
Source struct { - DBUniqueName string `yaml:"unique_name" db:"name"` + Name string `yaml:"name" db:"name"` Group string `yaml:"group" db:"group"` ConnStr string `yaml:"conn_str" db:"connstr"` Metrics map[string]float64 `yaml:"custom_metrics" db:"config"` @@ -92,14 +92,28 @@ type ( MonitoredDatabases []*MonitoredDatabase ) +// Ping will try to establish a brand new connection to the server and return any error +func (md *MonitoredDatabase) Ping(ctx context.Context) error { + c, err := pgx.Connect(ctx, md.ConnStr) + if err != nil { + return err + } + defer func() { _ = c.Close(ctx) }() + return c.Ping(ctx) +} + +// Connect will establish a connection to the database if it's not already connected. +// If the connection is already established, it pings the server to ensure it's still alive. func (md *MonitoredDatabase) Connect(ctx context.Context) (err error) { - if md.Conn != nil { - return md.Conn.Ping(ctx) + if md.Conn == nil { + if md.Conn, err = db.New(ctx, md.ConnStr); err != nil { + return err + } } - md.Conn, err = db.New(ctx, md.ConnStr) - return + return md.Conn.Ping(ctx) } +// GetDatabaseName returns the database name from the connection string func (md *MonitoredDatabase) GetDatabaseName() string { var err error if md.ConnConfig == nil { @@ -110,6 +124,8 @@ func (md *MonitoredDatabase) GetDatabaseName() string { return md.ConnConfig.Database } +// SetDatabaseName sets the database name in the connection config but +// does not update the connection string func (md *MonitoredDatabase) SetDatabaseName(name string) { var err error if md.ConnConfig == nil { @@ -126,28 +142,29 @@ func (md *MonitoredDatabase) IsPostgresSource() bool { func (mds MonitoredDatabases) GetMonitoredDatabase(DBUniqueName string) *MonitoredDatabase { for _, md := range mds { - if md.DBUniqueName == DBUniqueName { + if md.Name == DBUniqueName { return md } } return nil } +// SyncFromReader will update the monitored databases with the latest configuration from the reader. 
+// Any resolution errors will be returned, e.g. etcd unavailability. +// It's up to the caller to proceed with the databases available or stop the execution due to errors. func (mds MonitoredDatabases) SyncFromReader(r Reader) (newmds MonitoredDatabases, err error) { srcs, err := r.GetSources() if err != nil { return nil, err } - if newmds, err = srcs.ResolveDatabases(); err != nil { - return nil, err - } + newmds, err = srcs.ResolveDatabases() for _, newMD := range newmds { - if md := mds.GetMonitoredDatabase(newMD.DBUniqueName); md != nil { + if md := mds.GetMonitoredDatabase(newMD.Name); md != nil { newMD.Conn = md.Conn newMD.ConnConfig = md.ConnConfig } } - return newmds, nil + return newmds, err } type HostConfigAttrs struct { diff --git a/src/sources/yaml.go b/src/sources/yaml.go index 7bb1e6f482..d7440232c3 100644 --- a/src/sources/yaml.go +++ b/src/sources/yaml.go @@ -36,7 +36,7 @@ func (fcr *fileSourcesReaderWriter) UpdateSource(md Source) error { return err } for i, db := range dbs { - if db.DBUniqueName == md.DBUniqueName { + if db.Name == md.Name { dbs[i] = md return fcr.WriteSources(dbs) } @@ -50,7 +50,7 @@ func (fcr *fileSourcesReaderWriter) DeleteSource(name string) error { if err != nil { return err } - dbs = slices.DeleteFunc(dbs, func(md Source) bool { return md.DBUniqueName == name }) + dbs = slices.DeleteFunc(dbs, func(md Source) bool { return md.Name == name }) return fcr.WriteSources(dbs) } @@ -106,8 +106,8 @@ func (fcr *fileSourcesReaderWriter) expandEnvVars(md Source) Source { if strings.HasPrefix(string(md.Kind), "$") { md.Kind = Kind(os.ExpandEnv(string(md.Kind))) } - if strings.HasPrefix(md.DBUniqueName, "$") { - md.DBUniqueName = os.ExpandEnv(md.DBUniqueName) + if strings.HasPrefix(md.Name, "$") { + md.Name = os.ExpandEnv(md.Name) } if strings.HasPrefix(md.IncludePattern, "$") { md.IncludePattern = os.ExpandEnv(md.IncludePattern) diff --git a/src/sources/yaml_test.go b/src/sources/yaml_test.go index c8c8a54f5b..a32f0cb3eb 100644 --- 
a/src/sources/yaml_test.go +++ b/src/sources/yaml_test.go @@ -120,14 +120,14 @@ func TestYAMLUpdateDatabase(t *testing.T) { // change the connection string of the first database md := sources.Source{} - md.DBUniqueName = "test1" + md.Name = "test1" md.ConnStr = "postgresql://localhost/test1" err = yamlrw.UpdateSource(md) a.NoError(err) // add a new database md = sources.Source{} - md.DBUniqueName = "test5" + md.Name = "test5" md.ConnStr = "postgresql://localhost/test5" err = yamlrw.UpdateSource(md) a.NoError(err) diff --git a/src/webserver/cmdopts.go b/src/webserver/cmdopts.go new file mode 100644 index 0000000000..2c7f068062 --- /dev/null +++ b/src/webserver/cmdopts.go @@ -0,0 +1,8 @@ +package webserver + +// WebUICmdOpts specifies the internal web UI server command-line options +type WebUICmdOpts struct { + WebAddr string `long:"web-addr" mapstructure:"web-addr" description:"TCP address in the form 'host:port' to listen on" default:":8080" env:"PW3_WEBADDR"` + WebUser string `long:"web-user" mapstructure:"web-user" description:"Admin login" env:"PW3_WEBUSER"` + WebPassword string `long:"web-password" mapstructure:"web-password" description:"Admin password" env:"PW3_WEBPASSWORD"` +} diff --git a/src/webserver/server_test.go b/src/webserver/server_test.go index 3f15ddfed6..24e5a4704c 100644 --- a/src/webserver/server_test.go +++ b/src/webserver/server_test.go @@ -10,7 +10,6 @@ import ( "strings" "testing" - "github.com/cybertec-postgresql/pgwatch3/config" "github.com/cybertec-postgresql/pgwatch3/log" "github.com/cybertec-postgresql/pgwatch3/webserver" "github.com/stretchr/testify/assert" @@ -22,7 +21,7 @@ type Credentials struct { } func TestStatus(t *testing.T) { - restsrv := webserver.Init(config.WebUIOpts{WebAddr: "127.0.0.1:8080"}, os.DirFS("../webui/build"), nil, nil, log.FallbackLogger) + restsrv := webserver.Init(webserver.WebUICmdOpts{WebAddr: "127.0.0.1:8080"}, os.DirFS("../webui/build"), nil, nil, log.FallbackLogger) assert.NotNil(t, restsrv) // r, err := 
http.Get("http://localhost:8080/") // assert.NoError(t, err) @@ -34,7 +33,7 @@ func TestStatus(t *testing.T) { func TestServerNoAuth(t *testing.T) { host := "http://localhost:8081" - restsrv := webserver.Init(config.WebUIOpts{WebAddr: "localhost:8081"}, os.DirFS("../webui/build"), nil, nil, log.FallbackLogger) + restsrv := webserver.Init(webserver.WebUICmdOpts{WebAddr: "localhost:8081"}, os.DirFS("../webui/build"), nil, nil, log.FallbackLogger) assert.NotNil(t, restsrv) rr := httptest.NewRecorder() // test request metrics @@ -65,7 +64,7 @@ func TestServerNoAuth(t *testing.T) { func TestGetToken(t *testing.T) { host := "http://localhost:8082" - restsrv := webserver.Init(config.WebUIOpts{WebAddr: "localhost:8082"}, os.DirFS("../webui/build"), nil, nil, log.FallbackLogger) + restsrv := webserver.Init(webserver.WebUICmdOpts{WebAddr: "localhost:8082"}, os.DirFS("../webui/build"), nil, nil, log.FallbackLogger) rr := httptest.NewRecorder() credentials := Credentials{ diff --git a/src/webserver/webserver.go b/src/webserver/webserver.go index 63e3fefc63..8ecfc0b69e 100644 --- a/src/webserver/webserver.go +++ b/src/webserver/webserver.go @@ -12,7 +12,6 @@ import ( "strings" "time" - "github.com/cybertec-postgresql/pgwatch3/config" "github.com/cybertec-postgresql/pgwatch3/log" "github.com/cybertec-postgresql/pgwatch3/metrics" "github.com/cybertec-postgresql/pgwatch3/sources" @@ -21,13 +20,13 @@ import ( type WebUIServer struct { l log.LoggerIface http.Server - config.WebUIOpts + WebUICmdOpts uiFS fs.FS metricsReaderWriter metrics.ReaderWriter sourcesReaderWriter sources.ReaderWriter } -func Init(opts config.WebUIOpts, webuifs fs.FS, mrw metrics.ReaderWriter, srw sources.ReaderWriter, logger log.LoggerIface) *WebUIServer { +func Init(opts WebUICmdOpts, webuifs fs.FS, mrw metrics.ReaderWriter, srw sources.ReaderWriter, logger log.LoggerIface) *WebUIServer { mux := http.NewServeMux() s := &WebUIServer{ logger,