Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[*] move psutil and logparser to sources package #363

Merged
merged 3 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions src/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"github.com/cybertec-postgresql/pgwatch3/db"
"github.com/cybertec-postgresql/pgwatch3/metrics"
"github.com/cybertec-postgresql/pgwatch3/psutil"
"github.com/cybertec-postgresql/pgwatch3/metrics/psutil"
"github.com/cybertec-postgresql/pgwatch3/sources"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -81,30 +81,30 @@ func DBExecRead(ctx context.Context, conn db.PgxIface, sql string, args ...any)
return nil, err
}

// GetConnByUniqueName returns the cached connection for the monitored
// database identified by dbUnique, or nil when no entry is cached.
// The cache itself is guarded by monitoredDbConnCacheLock; the returned
// pgx pool is safe for concurrent use by callers.
func GetConnByUniqueName(dbUnique string) db.PgxIface {
	monitoredDbConnCacheLock.RLock()
	defer monitoredDbConnCacheLock.RUnlock()
	return monitoredDbConnCache[dbUnique]
}

func DBExecReadByDbUniqueName(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error) {
var conn db.PgxIface
var md sources.MonitoredDatabase
var data metrics.Measurements
var err error
var tx pgx.Tx
var exists bool
if strings.TrimSpace(sql) == "" {
return nil, errors.New("empty SQL")
}
md, err = GetMonitoredDatabaseByUniqueName(dbUnique)
if err != nil {
if md, err = GetMonitoredDatabaseByUniqueName(dbUnique); err != nil {
return nil, err
}
monitoredDbConnCacheLock.RLock()
// sqlx.DB itself is parallel safe
conn, exists = monitoredDbConnCache[dbUnique]
monitoredDbConnCacheLock.RUnlock()
if !exists || conn == nil {
if conn = GetConnByUniqueName(dbUnique); conn == nil {
logger.Errorf("SQL connection for dbUnique %s not found or nil", dbUnique) // Should always be initialized in the main loop DB discovery code ...
return nil, errors.New("SQL connection not found or nil")
}
tx, err = conn.Begin(ctx)
if err != nil {
if tx, err = conn.Begin(ctx); err != nil {
return nil, err
}
defer func() { _ = tx.Commit(ctx) }()
Expand Down Expand Up @@ -224,7 +224,7 @@ func DBGetPGVersion(ctx context.Context, dbUnique string, srcType sources.Kind,
}
getVerLock.Lock() // limit to 1 concurrent version info fetch per DB
defer getVerLock.Unlock()
logger.WithField("database", dbUnique).
logger.WithField("source", dbUnique).
WithField("type", srcType).Debug("determining DB version and recovery status...")

if verNew.Extensions == nil {
Expand Down
2 changes: 1 addition & 1 deletion src/log/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func newFormatter(noColors bool) *Formatter {
return &Formatter{
HideKeys: false,
FieldsOrder: []string{"database", "metric", "sql", "params"},
FieldsOrder: []string{"source", "metric", "sql", "params"},
TimestampFormat: "2006-01-02 15:04:05.000",
ShowFullLevel: true,
NoColors: noColors,
Expand Down
52 changes: 31 additions & 21 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,14 @@ import (
"github.com/cybertec-postgresql/pgwatch3/config"
"github.com/cybertec-postgresql/pgwatch3/log"
"github.com/cybertec-postgresql/pgwatch3/metrics"
"github.com/cybertec-postgresql/pgwatch3/psutil"
"github.com/cybertec-postgresql/pgwatch3/metrics/psutil"
"github.com/cybertec-postgresql/pgwatch3/sinks"
"github.com/cybertec-postgresql/pgwatch3/sources"
"github.com/cybertec-postgresql/pgwatch3/webserver"
"github.com/shopspring/decimal"
"github.com/sirupsen/logrus"
)

type ControlMessage struct {
Action string // START, STOP, PAUSE
Config map[string]float64
}

type MetricFetchMessage struct {
DBUniqueName string
DBUniqueNameOrig string
Expand Down Expand Up @@ -540,7 +535,7 @@ retry_with_superuser_sql: // if 1st fetch with normal SQL fails, try with SU SQL
goto retry_with_superuser_sql
}
if firstErr != nil {
logger.WithField("database", msg.DBUniqueName).WithField("metric", msg.MetricName).Error(err)
logger.WithField("source", msg.DBUniqueName).WithField("metric", msg.MetricName).Error(err)
return nil, firstErr // returning the initial error
}
logger.Infof("[%s:%s] failed to fetch metrics: %s", msg.DBUniqueName, msg.MetricName, err)
Expand All @@ -553,7 +548,7 @@ retry_with_superuser_sql: // if 1st fetch with normal SQL fails, try with SU SQL
return nil, err
}

logger.WithFields(map[string]any{"database": msg.DBUniqueName, "metric": msg.MetricName, "rows": len(data)}).Info("measurements fetched")
logger.WithFields(map[string]any{"source": msg.DBUniqueName, "metric": msg.MetricName, "rows": len(data)}).Info("measurements fetched")
if regexIsPgbouncerMetrics.MatchString(msg.MetricName) { // clean unwanted pgbouncer pool stats here as not possible in SQL
data = FilterPgbouncerData(data, md.GetDatabaseName(), vme)
}
Expand Down Expand Up @@ -717,8 +712,15 @@ func deepCopyMetricDefinitionMap(mdm map[string]map[uint]metrics.MetricPropertie
return newMdm
}

// ControlMessage notifies of shutdown + interval change
func MetricGathererLoop(ctx context.Context, dbUniqueName, dbUniqueNameOrig string, srcType sources.Kind, metricName string, configMap map[string]float64, controlCh <-chan ControlMessage, storeCh chan<- []metrics.MeasurementMessage) {
// metrics.ControlMessage notifies of shutdown + interval change
func MetricGathererLoop(ctx context.Context,
dbUniqueName, dbUniqueNameOrig string,
srcType sources.Kind,
metricName string,
configMap map[string]float64,
controlCh <-chan metrics.ControlMessage,
storeCh chan<- []metrics.MeasurementMessage) {

config := configMap
interval := config[metricName]
hostState := make(map[string]map[string]string)
Expand All @@ -732,9 +734,17 @@ func MetricGathererLoop(ctx context.Context, dbUniqueName, dbUniqueNameOrig stri
lastDBVersionFetchTime := time.Unix(0, 0) // check DB ver. ev. 5 min
var stmtTimeoutOverride int64

l := logger.WithField("database", dbUniqueName).WithField("metric", metricName)
l := logger.WithField("source", dbUniqueName).WithField("metric", metricName)
if metricName == specialMetricServerLogEventCounts {
logparseLoop(dbUniqueName, metricName, configMap, controlCh, storeCh) // no return
mdb, err := GetMonitoredDatabaseByUniqueName(dbUniqueName)
if err != nil {
return
}
dbPgVersionMapLock.RLock()
realDbname := dbPgVersionMap[dbUniqueName].RealDbname // to manage 2 sets of event counts - monitored DB + global
dbPgVersionMapLock.RUnlock()
conn := GetConnByUniqueName(dbUniqueName)
metrics.ParseLogs(ctx, conn, mdb, realDbname, metricName, configMap, controlCh, storeCh) // no return
return
}

Expand Down Expand Up @@ -1234,7 +1244,7 @@ func shouldDbBeMonitoredBasedOnCurrentState(md sources.MonitoredDatabase) bool {
return !IsDBDormant(md.DBUniqueName)
}

func ControlChannelsMapToList(controlChannels map[string]chan ControlMessage) []string {
func ControlChannelsMapToList(controlChannels map[string]chan metrics.ControlMessage) []string {
controlChannelList := make([]string, len(controlChannels))
i := 0
for key := range controlChannels {
Expand Down Expand Up @@ -1464,7 +1474,7 @@ func main() {
go StatsSummarizer(mainContext)
}

controlChannels := make(map[string](chan ControlMessage)) // [db1+metric1]=chan
controlChannels := make(map[string](chan metrics.ControlMessage)) // [db1+metric1]=chan
measurementCh := make(chan []metrics.MeasurementMessage, 10000)

var monitoredDbs sources.MonitoredDatabases
Expand Down Expand Up @@ -1517,7 +1527,7 @@ func main() {
}

logger.
WithField("databases", len(monitoredDbs)).
WithField("sources", len(monitoredDbs)).
WithField("metrics", len(metricDefinitionMap)).
Log(func() logrus.Level {
if firstLoop && len(monitoredDbs)*len(metricDefinitionMap) == 0 {
Expand All @@ -1529,7 +1539,7 @@ func main() {
firstLoop = false // only used for failing when 1st config reading fails

for _, host := range monitoredDbs {
logger.WithField("database", host.DBUniqueName).
logger.WithField("source", host.DBUniqueName).
WithField("metric", host.Metrics).
WithField("tags", host.CustomTags).
WithField("config", host.HostConfig).Debug()
Expand Down Expand Up @@ -1674,8 +1684,8 @@ func main() {
if metricDefOk && !chOk { // initialize a new per db/per metric control channel
if interval > 0 {
hostMetricIntervalMap[dbMetric] = interval
logger.WithField("database", dbUnique).WithField("metric", metric).WithField("interval", interval).Info("starting gatherer")
controlChannels[dbMetric] = make(chan ControlMessage, 1)
logger.WithField("source", dbUnique).WithField("metric", metric).WithField("interval", interval).Info("starting gatherer")
controlChannels[dbMetric] = make(chan metrics.ControlMessage, 1)

metricNameForStorage := metricName
if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric {
Expand All @@ -1701,7 +1711,7 @@ func main() {
} else if (!metricDefOk && chOk) || interval <= 0 {
// metric definition files were recently removed or interval set to zero
logger.Warning("shutting down metric", metric, "for", host.DBUniqueName)
controlChannels[dbMetric] <- ControlMessage{Action: gathererStatusStop}
controlChannels[dbMetric] <- metrics.ControlMessage{Action: gathererStatusStop}
delete(controlChannels, dbMetric)
} else if !metricDefOk {
epoch, ok := lastSQLFetchError.Load(metric)
Expand All @@ -1713,7 +1723,7 @@ func main() {
// check if interval has changed
if hostMetricIntervalMap[dbMetric] != interval {
logger.Warning("sending interval update for", dbUnique, metric)
controlChannels[dbMetric] <- ControlMessage{Action: gathererStatusStart, Config: metricConfig}
controlChannels[dbMetric] <- metrics.ControlMessage{Action: gathererStatusStart, Config: metricConfig}
hostMetricIntervalMap[dbMetric] = interval
}
}
Expand Down Expand Up @@ -1782,7 +1792,7 @@ func main() {

if mainContext.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
logger.Infof("shutting down gatherer for [%s:%s] ...", db, metric)
controlChannels[dbMetric] <- ControlMessage{Action: gathererStatusStop}
controlChannels[dbMetric] <- metrics.ControlMessage{Action: gathererStatusStop}
delete(controlChannels, dbMetric)
logger.Debugf("control channel for [%s:%s] deleted", db, metric)
gatherersShutDown++
Expand Down
Loading
Loading