Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[!] add support for application commands #504

Merged
merged 8 commits into from
Aug 19, 2024
3 changes: 2 additions & 1 deletion src/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ issues:
# it can be disabled by `exclude-use-default: false`. To list all
# excluded by default patterns execute `golangci-lint run --help`
exclude:
- CPUTimesStat # SA1019: CPUTimesStat is deprecated
- CPUTimesStat # SA1019: CPUTimesStat is deprecated
- SA5008 # duplicate struct tag "choice" (staticcheck)
81 changes: 81 additions & 0 deletions src/config/cmdmetric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package config

import (
"context"
"fmt"
"math"
"slices"

"golang.org/x/exp/maps"
)

type MetricCommand struct {
owner *Options
PrintInit MetricPrintInitCommand `command:"print-init" description:"Get and print init SQL for a given metric or preset"`
PrintSQL MetricPrintSQLCommand `command:"print-sql" description:"Get and print SQL for a given metric"`
}

func NewMetricCommand(owner *Options) *MetricCommand {
return &MetricCommand{
owner: owner,
PrintInit: MetricPrintInitCommand{owner: owner},
PrintSQL: MetricPrintSQLCommand{owner: owner},
}
}

type MetricPrintInitCommand struct {
owner *Options
}

func (cmd *MetricPrintInitCommand) Execute(args []string) error {
err := cmd.owner.InitMetricReader(context.Background())
if err != nil {
return err
}
metrics, err := cmd.owner.MetricsReaderWriter.GetMetrics()
if err != nil {
return err
}
for _, name := range args {
if preset, ok := metrics.PresetDefs[name]; ok {
args = append(args, maps.Keys(preset.Metrics)...)
}
}
slices.Sort(args)
args = slices.Compact(args)
for _, mname := range args {
if m, ok := metrics.MetricDefs[mname]; ok && m.InitSQL != "" {
fmt.Println("-- ", mname)
fmt.Println(m.InitSQL)
}
}
cmd.owner.CompleteCommand(ExitCodeOK)
return nil
}

type MetricPrintSQLCommand struct {
owner *Options
Version int `short:"v" long:"version" description:"PostgreSQL version to get SQL for"`
}

func (cmd *MetricPrintSQLCommand) Execute(args []string) error {
err := cmd.owner.InitMetricReader(context.Background())
if err != nil {
return err
}
metrics, err := cmd.owner.MetricsReaderWriter.GetMetrics()
if err != nil {
return err
}
if cmd.Version == 0 {
cmd.Version = math.MaxInt32
}
for _, name := range args {
if m, ok := metrics.MetricDefs[name]; ok {
fmt.Println("-- ", name)
fmt.Println(m.GetSQL(cmd.Version))
}
}
cmd.owner.CompleteCommand(ExitCodeOK)
return nil
}
189 changes: 128 additions & 61 deletions src/config/cmdoptions.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
package config

import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
"time"

"github.com/cybertec-postgresql/pgwatch3/db"
"github.com/cybertec-postgresql/pgwatch3/log"
"github.com/cybertec-postgresql/pgwatch3/metrics"
"github.com/cybertec-postgresql/pgwatch3/sinks"
"github.com/cybertec-postgresql/pgwatch3/sources"
"github.com/cybertec-postgresql/pgwatch3/webserver"
"github.com/jackc/pgx/v5"
flags "github.com/jessevdk/go-flags"
)

const (
ExitCodeOK int32 = iota
ExitCodeConfigError
ExitCodeCmdError
ExitCodeWebUIError
ExitCodeUpgradeError
ExitCodeUserCancel
ExitCodeShutdownCommand
ExitCodeFatalError
)

type Kind int

const (
Expand All @@ -19,94 +39,79 @@ const (
ConfigError
)

// SourceOpts specifies the sources retrieval options
type SourceOpts struct {
Config string `short:"c" long:"config" mapstructure:"config" description:"Postgres URI, file or folder of YAML files containing info on which DBs to monitor and where to store metrics" env:"PW3_CONFIG"`
Refresh int `long:"refresh" mapstructure:"refresh" description:"How frequently to resync sources and metrics" env:"PW3_REFRESH" default:"120"`
Groups []string `short:"g" long:"group" mapstructure:"group" description:"Groups for filtering which databases to monitor. By default all are monitored" env:"PW3_GROUP"`
MinDbSizeMB int64 `long:"min-db-size-mb" mapstructure:"min-db-size-mb" description:"Smaller size DBs will be ignored and not monitored until they reach the threshold." env:"PW3_MIN_DB_SIZE_MB" default:"0"`
MaxParallelConnectionsPerDb int `long:"max-parallel-connections-per-db" mapstructure:"max-parallel-connections-per-db" description:"Max parallel metric fetches per DB. Note the multiplication effect on multi-DB instances" env:"PW3_MAX_PARALLEL_CONNECTIONS_PER_DB" default:"2"`
TryCreateListedExtsIfMissing string `long:"try-create-listed-exts-if-missing" mapstructure:"try-create-listed-exts-if-missing" description:"Try creating the listed extensions (comma sep.) on first connect for all monitored DBs when missing. Main usage - pg_stat_statements" env:"PW3_TRY_CREATE_LISTED_EXTS_IF_MISSING" default:""`
}

// MetricOpts specifies metric definitions
type MetricOpts struct {
Metrics string `short:"m" long:"metrics" mapstructure:"metrics" description:"File or folder of YAML files with metrics definitions" env:"PW3_METRICS"`
NoHelperFunctions bool `long:"no-helper-functions" mapstructure:"no-helper-functions" description:"Ignore metric definitions using helper functions (in form get_smth()) and don't also roll out any helpers automatically" env:"PW3_NO_HELPER_FUNCTIONS"`
DirectOSStats bool `long:"direct-os-stats" mapstructure:"direct-os-stats" description:"Extract OS related psutil statistics not via PL/Python wrappers but directly on host" env:"PW3_DIRECT_OS_STATS"`
InstanceLevelCacheMaxSeconds int64 `long:"instance-level-cache-max-seconds" mapstructure:"instance-level-cache-max-seconds" description:"Max allowed staleness for instance level metric data shared between DBs of an instance. Affects 'continuous' host types only. Set to 0 to disable" env:"PW3_INSTANCE_LEVEL_CACHE_MAX_SECONDS" default:"30"`
EmergencyPauseTriggerfile string `long:"emergency-pause-triggerfile" mapstructure:"emergency-pause-triggerfile" description:"When the file exists no metrics will be temporarily fetched / scraped" env:"PW3_EMERGENCY_PAUSE_TRIGGERFILE" default:"/tmp/pgwatch3-emergency-pause"`
}

// MeasurementOpts specifies the storage configuration to store metrics measurements
type MeasurementOpts struct {
Sinks []string `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored" env:"PW3_SINK"`
BatchingDelay time.Duration `long:"batching-delay" mapstructure:"batching-delay" description:"Max milliseconds to wait for a batched metrics flush. [Default: 250ms]" default:"250ms" env:"PW3_BATCHING_MAX_DELAY"`
Retention int `long:"retention" mapstructure:"retention" description:"If set, metrics older than that will be deleted" default:"14" env:"PW3_RETENTION"`
RealDbnameField string `long:"real-dbname-field" mapstructure:"real-dbname-field" description:"Tag key for real DB name if --add-real-dbname enabled" env:"PW3_REAL_DBNAME_FIELD" default:"real_dbname"`
SystemIdentifierField string `long:"system-identifier-field" mapstructure:"system-identifier-field" description:"Tag key for system identifier value if --add-system-identifier" env:"PW3_SYSTEM_IDENTIFIER_FIELD" default:"sys_id"`
}

// LoggingOpts specifies the logging configuration
type LoggingOpts struct {
LogLevel string `short:"v" long:"log-level" mapstructure:"log-level" description:"Verbosity level for stdout and log file" choice:"debug" choice:"info" choice:"error" default:"info"`
LogFile string `long:"log-file" mapstructure:"log-file" description:"File name to store logs"`
LogFileFormat string `long:"log-file-format" mapstructure:"log-file-format" description:"Format of file logs" choice:"json" choice:"text" default:"json"`
LogFileRotate bool `long:"log-file-rotate" mapstructure:"log-file-rotate" description:"Rotate log files"`
LogFileSize int `long:"log-file-size" mapstructure:"log-file-size" description:"Maximum size in MB of the log file before it gets rotated" default:"100"`
LogFileAge int `long:"log-file-age" mapstructure:"log-file-age" description:"Number of days to retain old log files, 0 means forever" default:"0"`
LogFileNumber int `long:"log-file-number" mapstructure:"log-file-number" description:"Maximum number of old log files to retain, 0 to retain all" default:"0"`
type Options struct {
Sources sources.SourceCmdOpts `group:"Sources"`
Metrics metrics.MetricCmdOpts `group:"Metrics"`
Sinks sinks.SinkCmdOpts `group:"Sinks"`
Logging log.LoggingCmdOpts `group:"Logging"`
WebUI webserver.WebUICmdOpts `group:"WebUI"`
Init bool `long:"init" description:"Initialize configurations schemas to the latest version and exit. Can be used with --upgrade"`
Upgrade bool `long:"upgrade" description:"Upgrade configurations to the latest version"`
Help bool

// sourcesReaderWriter reads/writes the monitored sources (databases, patroni clusters, pgpools, etc.) information
SourcesReaderWriter sources.ReaderWriter
// metricsReaderWriter reads/writes the metric and preset definitions
MetricsReaderWriter metrics.ReaderWriter
ExitCode int32
CommandCompleted bool
}

// WebUIOpts specifies the internal web UI server options
type WebUIOpts struct {
WebAddr string `long:"web-addr" mapstructure:"web-addr" description:"TCP address in the form 'host:port' to listen on" default:":8080" env:"PW3_WEBADDR"`
WebUser string `long:"web-user" mapstructure:"web-user" description:"Admin login" env:"PW3_WEBUSER"`
WebPassword string `long:"web-password" mapstructure:"web-password" description:"Admin password" env:"PW3_WEBPASSWORD"`
}

type Options struct {
Sources SourceOpts `group:"Sources"`
Metrics MetricOpts `group:"Metrics"`
Measurements MeasurementOpts `group:"Measurements"`
Logging LoggingOpts `group:"Logging"`
WebUI WebUIOpts `group:"WebUI"`
Init bool `long:"init" description:"Initialize configurations schemas to the latest version and exit. Can be used with --upgrade"`
Upgrade bool `long:"upgrade" description:"Upgrade configurations to the latest version"`
Ping bool `long:"ping" mapstructure:"ping" description:"Try to connect to all configured DB-s, report errors and then exit" env:"PW3_PING"`
Help bool
func addCommands(parser *flags.Parser, opts *Options) {
_, _ = parser.AddCommand("metric",
"Manage metrics",
"Commands to manage metrics",
NewMetricCommand(opts))
_, _ = parser.AddCommand("source",
"Manage sources",
"Commands to manage sources",
NewSourceCommand(opts))
}

// New returns a new instance of CmdOptions
func New(writer io.Writer) (*Options, error) {
cmdOpts := new(Options)
parser := flags.NewParser(cmdOpts, flags.HelpFlag)
var err error
if _, err = parser.Parse(); err != nil {
parser.SubcommandsOptional = true // if not command specified, start monitoring
addCommands(parser, cmdOpts)
nonParsedArgs, err := parser.Parse()
if err != nil { //subcommands executed as part of parsing
if flagsErr, ok := err.(*flags.Error); ok && flagsErr.Type == flags.ErrHelp {
cmdOpts.Help = true
}
if !flags.WroteHelp(err) {
parser.WriteHelp(writer)
}
} else {
if !cmdOpts.CommandCompleted && len(nonParsedArgs) > 0 {
err = fmt.Errorf("unknown argument(s): %v", nonParsedArgs)
}
}
if err == nil {
err = validateConfig(cmdOpts)
}
return cmdOpts, err
}

func (c *Options) CompleteCommand(code int32) {
c.CommandCompleted = true
c.ExitCode = code
}

// Verbose returns true if the debug log is enabled
func (c Options) Verbose() bool {
func (c *Options) Verbose() bool {
return c.Logging.LogLevel == "debug"
}

func (c Options) GetConfigKind() (_ Kind, err error) {
if _, err := pgx.ParseConfig(c.Sources.Config); err == nil {
func (c *Options) GetConfigKind(arg string) (_ Kind, err error) {
if arg == "" {
return Kind(ConfigError), errors.New("no configuration provided")
}
if _, err := pgx.ParseConfig(arg); err == nil {
return Kind(ConfigPgURL), nil
}
var fi os.FileInfo
if fi, err = os.Stat(c.Sources.Config); err == nil {
if fi, err = os.Stat(arg); err == nil {
if fi.IsDir() {
return Kind(ConfigFolder), nil
}
Expand All @@ -115,7 +120,69 @@ func (c Options) GetConfigKind() (_ Kind, err error) {
return Kind(ConfigError), err
}

// InitMetricReader creates a new source reader based on the configuration kind from the options.
func (c *Options) InitMetricReader(ctx context.Context) (err error) {
var configKind Kind
if c.Metrics.Metrics == "" { //if config database is configured, use it for metrics as well
if k, err := c.GetConfigKind(c.Sources.Sources); err == nil && k == ConfigPgURL {
c.Metrics.Metrics = c.Sources.Sources
} else { // otherwise use built-in metrics
c.MetricsReaderWriter, err = metrics.NewYAMLMetricReaderWriter(ctx, "")
return err
}
}
if configKind, err = c.GetConfigKind(c.Metrics.Metrics); err != nil {
return
}
switch configKind {
case ConfigPgURL:
var configDb db.PgxPoolIface
if configDb, err = db.New(ctx, c.Sources.Sources); err != nil {
return err
}
c.MetricsReaderWriter, err = metrics.NewPostgresMetricReaderWriter(ctx, configDb)
default:
c.MetricsReaderWriter, err = metrics.NewYAMLMetricReaderWriter(ctx, c.Metrics.Metrics)
}
return err
}

// InitSourceReader creates a new source reader based on the configuration kind from the options.
func (c *Options) InitSourceReader(ctx context.Context) (err error) {
var configKind Kind
if configKind, err = c.GetConfigKind(c.Sources.Sources); err != nil {
return
}
switch configKind {
case ConfigPgURL:
var configDb db.PgxPoolIface
if configDb, err = db.New(ctx, c.Sources.Sources); err != nil {
return err
}
c.SourcesReaderWriter, err = sources.NewPostgresSourcesReaderWriter(ctx, configDb)
default:
c.SourcesReaderWriter, err = sources.NewYAMLSourcesReaderWriter(ctx, c.Sources.Sources)
}
return err
}

// InitConfigReaders creates the configuration readers based on the configuration kind from the options.
func (c *Options) InitConfigReaders(ctx context.Context) error {
return errors.Join(c.InitMetricReader(ctx), c.InitSourceReader(ctx))
}

// InitWebUI initializes the web UI server
func (c *Options) InitWebUI(fs fs.FS, logger log.LoggerIface) error {
if webserver.Init(c.WebUI, fs, c.MetricsReaderWriter, c.SourcesReaderWriter, logger) == nil {
return errors.New("failed to initialize web UI")
}
return nil
}

func validateConfig(c *Options) error {
if len(c.Sources.Sources)+len(c.Metrics.Metrics) == 0 {
return errors.New("both --sources and --metrics are empty")
}
if c.Sources.Refresh <= 1 {
return errors.New("--servers-refresh-loop-seconds must be greater than 1")
}
Expand All @@ -124,7 +191,7 @@ func validateConfig(c *Options) error {
}

// validate that input is boolean is set
if c.Measurements.BatchingDelay <= 0 || c.Measurements.BatchingDelay > time.Hour {
if c.Sinks.BatchingDelay <= 0 || c.Sinks.BatchingDelay > time.Hour {
return errors.New("--batching-delay-ms must be between 0 and 3600000")
}

Expand Down
9 changes: 5 additions & 4 deletions src/config/cmdoptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"
"testing"

"github.com/cybertec-postgresql/pgwatch3/log"
flags "github.com/jessevdk/go-flags"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -40,9 +41,9 @@ func TestParseSuccess(t *testing.T) {
}

func TestLogLevel(t *testing.T) {
c := &Options{Logging: LoggingOpts{LogLevel: "debug"}}
c := &Options{Logging: log.LoggingCmdOpts{LogLevel: "debug"}}
assert.True(t, c.Verbose())
c = &Options{Logging: LoggingOpts{LogLevel: "info"}}
c = &Options{Logging: log.LoggingCmdOpts{LogLevel: "info"}}
assert.False(t, c.Verbose())
}

Expand All @@ -52,7 +53,7 @@ func TestNewCmdOptions(t *testing.T) {
}

func TestConfig(t *testing.T) {
os.Args = []string{0: "config_test", "--config=sample.config.yaml"}
os.Args = []string{0: "config_test", "--sources=sample.config.yaml"}
_, err := New(nil)
assert.NoError(t, err)

Expand All @@ -61,7 +62,7 @@ func TestConfig(t *testing.T) {
assert.Error(t, err)

os.Args = []string{0: "config_test"} // clientname arg is missing, but set PW3_CONFIG
assert.NoError(t, os.Setenv("PW3_CONFIG", "postgresql://foo:baz@bar/test"))
t.Setenv("PW3_SOURCES", "postgresql://foo:baz@bar/test")
_, err = New(nil)
assert.NoError(t, err)
}
Loading
Loading