Skip to content

Commit

Permalink
[*] move psutil and logparser to sources package (#363)
Browse files Browse the repository at this point in the history
  • Loading branch information
pashagolub authored Feb 13, 2024
1 parent fa5abe3 commit 397d2b2
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 113 deletions.
24 changes: 12 additions & 12 deletions src/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"github.com/cybertec-postgresql/pgwatch3/db"
"github.com/cybertec-postgresql/pgwatch3/metrics"
"github.com/cybertec-postgresql/pgwatch3/psutil"
"github.com/cybertec-postgresql/pgwatch3/metrics/psutil"
"github.com/cybertec-postgresql/pgwatch3/sources"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -81,30 +81,30 @@ func DBExecRead(ctx context.Context, conn db.PgxIface, sql string, args ...any)
return nil, err
}

func GetConnByUniqueName(dbUnique string) db.PgxIface {
monitoredDbConnCacheLock.RLock()
conn := monitoredDbConnCache[dbUnique]
monitoredDbConnCacheLock.RUnlock()
return conn
}

func DBExecReadByDbUniqueName(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error) {
var conn db.PgxIface
var md sources.MonitoredDatabase
var data metrics.Measurements
var err error
var tx pgx.Tx
var exists bool
if strings.TrimSpace(sql) == "" {
return nil, errors.New("empty SQL")
}
md, err = GetMonitoredDatabaseByUniqueName(dbUnique)
if err != nil {
if md, err = GetMonitoredDatabaseByUniqueName(dbUnique); err != nil {
return nil, err
}
monitoredDbConnCacheLock.RLock()
// sqlx.DB itself is parallel safe
conn, exists = monitoredDbConnCache[dbUnique]
monitoredDbConnCacheLock.RUnlock()
if !exists || conn == nil {
if conn = GetConnByUniqueName(dbUnique); conn == nil {
logger.Errorf("SQL connection for dbUnique %s not found or nil", dbUnique) // Should always be initialized in the main loop DB discovery code ...
return nil, errors.New("SQL connection not found or nil")
}
tx, err = conn.Begin(ctx)
if err != nil {
if tx, err = conn.Begin(ctx); err != nil {
return nil, err
}
defer func() { _ = tx.Commit(ctx) }()
Expand Down Expand Up @@ -224,7 +224,7 @@ func DBGetPGVersion(ctx context.Context, dbUnique string, srcType sources.Kind,
}
getVerLock.Lock() // limit to 1 concurrent version info fetch per DB
defer getVerLock.Unlock()
logger.WithField("database", dbUnique).
logger.WithField("source", dbUnique).
WithField("type", srcType).Debug("determining DB version and recovery status...")

if verNew.Extensions == nil {
Expand Down
2 changes: 1 addition & 1 deletion src/log/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func newFormatter(noColors bool) *Formatter {
return &Formatter{
HideKeys: false,
FieldsOrder: []string{"database", "metric", "sql", "params"},
FieldsOrder: []string{"source", "metric", "sql", "params"},
TimestampFormat: "2006-01-02 15:04:05.000",
ShowFullLevel: true,
NoColors: noColors,
Expand Down
52 changes: 31 additions & 21 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,14 @@ import (
"github.com/cybertec-postgresql/pgwatch3/config"
"github.com/cybertec-postgresql/pgwatch3/log"
"github.com/cybertec-postgresql/pgwatch3/metrics"
"github.com/cybertec-postgresql/pgwatch3/psutil"
"github.com/cybertec-postgresql/pgwatch3/metrics/psutil"
"github.com/cybertec-postgresql/pgwatch3/sinks"
"github.com/cybertec-postgresql/pgwatch3/sources"
"github.com/cybertec-postgresql/pgwatch3/webserver"
"github.com/shopspring/decimal"
"github.com/sirupsen/logrus"
)

type ControlMessage struct {
Action string // START, STOP, PAUSE
Config map[string]float64
}

type MetricFetchMessage struct {
DBUniqueName string
DBUniqueNameOrig string
Expand Down Expand Up @@ -540,7 +535,7 @@ retry_with_superuser_sql: // if 1st fetch with normal SQL fails, try with SU SQL
goto retry_with_superuser_sql
}
if firstErr != nil {
logger.WithField("database", msg.DBUniqueName).WithField("metric", msg.MetricName).Error(err)
logger.WithField("source", msg.DBUniqueName).WithField("metric", msg.MetricName).Error(err)
return nil, firstErr // returning the initial error
}
logger.Infof("[%s:%s] failed to fetch metrics: %s", msg.DBUniqueName, msg.MetricName, err)
Expand All @@ -553,7 +548,7 @@ retry_with_superuser_sql: // if 1st fetch with normal SQL fails, try with SU SQL
return nil, err
}

logger.WithFields(map[string]any{"database": msg.DBUniqueName, "metric": msg.MetricName, "rows": len(data)}).Info("measurements fetched")
logger.WithFields(map[string]any{"source": msg.DBUniqueName, "metric": msg.MetricName, "rows": len(data)}).Info("measurements fetched")
if regexIsPgbouncerMetrics.MatchString(msg.MetricName) { // clean unwanted pgbouncer pool stats here as not possible in SQL
data = FilterPgbouncerData(data, md.GetDatabaseName(), vme)
}
Expand Down Expand Up @@ -717,8 +712,15 @@ func deepCopyMetricDefinitionMap(mdm map[string]map[uint]metrics.MetricPropertie
return newMdm
}

// ControlMessage notifies of shutdown + interval change
func MetricGathererLoop(ctx context.Context, dbUniqueName, dbUniqueNameOrig string, srcType sources.Kind, metricName string, configMap map[string]float64, controlCh <-chan ControlMessage, storeCh chan<- []metrics.MeasurementMessage) {
// metrics.ControlMessage notifies of shutdown + interval change
func MetricGathererLoop(ctx context.Context,
dbUniqueName, dbUniqueNameOrig string,
srcType sources.Kind,
metricName string,
configMap map[string]float64,
controlCh <-chan metrics.ControlMessage,
storeCh chan<- []metrics.MeasurementMessage) {

config := configMap
interval := config[metricName]
hostState := make(map[string]map[string]string)
Expand All @@ -732,9 +734,17 @@ func MetricGathererLoop(ctx context.Context, dbUniqueName, dbUniqueNameOrig stri
lastDBVersionFetchTime := time.Unix(0, 0) // check DB ver. ev. 5 min
var stmtTimeoutOverride int64

l := logger.WithField("database", dbUniqueName).WithField("metric", metricName)
l := logger.WithField("source", dbUniqueName).WithField("metric", metricName)
if metricName == specialMetricServerLogEventCounts {
logparseLoop(dbUniqueName, metricName, configMap, controlCh, storeCh) // no return
mdb, err := GetMonitoredDatabaseByUniqueName(dbUniqueName)
if err != nil {
return
}
dbPgVersionMapLock.RLock()
realDbname := dbPgVersionMap[dbUniqueName].RealDbname // to manage 2 sets of event counts - monitored DB + global
dbPgVersionMapLock.RUnlock()
conn := GetConnByUniqueName(dbUniqueName)
metrics.ParseLogs(ctx, conn, mdb, realDbname, metricName, configMap, controlCh, storeCh) // no return
return
}

Expand Down Expand Up @@ -1234,7 +1244,7 @@ func shouldDbBeMonitoredBasedOnCurrentState(md sources.MonitoredDatabase) bool {
return !IsDBDormant(md.DBUniqueName)
}

func ControlChannelsMapToList(controlChannels map[string]chan ControlMessage) []string {
func ControlChannelsMapToList(controlChannels map[string]chan metrics.ControlMessage) []string {
controlChannelList := make([]string, len(controlChannels))
i := 0
for key := range controlChannels {
Expand Down Expand Up @@ -1464,7 +1474,7 @@ func main() {
go StatsSummarizer(mainContext)
}

controlChannels := make(map[string](chan ControlMessage)) // [db1+metric1]=chan
controlChannels := make(map[string](chan metrics.ControlMessage)) // [db1+metric1]=chan
measurementCh := make(chan []metrics.MeasurementMessage, 10000)

var monitoredDbs sources.MonitoredDatabases
Expand Down Expand Up @@ -1517,7 +1527,7 @@ func main() {
}

logger.
WithField("databases", len(monitoredDbs)).
WithField("sources", len(monitoredDbs)).
WithField("metrics", len(metricDefinitionMap)).
Log(func() logrus.Level {
if firstLoop && len(monitoredDbs)*len(metricDefinitionMap) == 0 {
Expand All @@ -1529,7 +1539,7 @@ func main() {
firstLoop = false // only used for failing when 1st config reading fails

for _, host := range monitoredDbs {
logger.WithField("database", host.DBUniqueName).
logger.WithField("source", host.DBUniqueName).
WithField("metric", host.Metrics).
WithField("tags", host.CustomTags).
WithField("config", host.HostConfig).Debug()
Expand Down Expand Up @@ -1674,8 +1684,8 @@ func main() {
if metricDefOk && !chOk { // initialize a new per db/per metric control channel
if interval > 0 {
hostMetricIntervalMap[dbMetric] = interval
logger.WithField("database", dbUnique).WithField("metric", metric).WithField("interval", interval).Info("starting gatherer")
controlChannels[dbMetric] = make(chan ControlMessage, 1)
logger.WithField("source", dbUnique).WithField("metric", metric).WithField("interval", interval).Info("starting gatherer")
controlChannels[dbMetric] = make(chan metrics.ControlMessage, 1)

metricNameForStorage := metricName
if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric {
Expand All @@ -1701,7 +1711,7 @@ func main() {
} else if (!metricDefOk && chOk) || interval <= 0 {
// metric definition files were recently removed or interval set to zero
logger.Warning("shutting down metric", metric, "for", host.DBUniqueName)
controlChannels[dbMetric] <- ControlMessage{Action: gathererStatusStop}
controlChannels[dbMetric] <- metrics.ControlMessage{Action: gathererStatusStop}
delete(controlChannels, dbMetric)
} else if !metricDefOk {
epoch, ok := lastSQLFetchError.Load(metric)
Expand All @@ -1713,7 +1723,7 @@ func main() {
// check if interval has changed
if hostMetricIntervalMap[dbMetric] != interval {
logger.Warning("sending interval update for", dbUnique, metric)
controlChannels[dbMetric] <- ControlMessage{Action: gathererStatusStart, Config: metricConfig}
controlChannels[dbMetric] <- metrics.ControlMessage{Action: gathererStatusStart, Config: metricConfig}
hostMetricIntervalMap[dbMetric] = interval
}
}
Expand Down Expand Up @@ -1782,7 +1792,7 @@ func main() {

if mainContext.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
logger.Infof("shutting down gatherer for [%s:%s] ...", db, metric)
controlChannels[dbMetric] <- ControlMessage{Action: gathererStatusStop}
controlChannels[dbMetric] <- metrics.ControlMessage{Action: gathererStatusStop}
delete(controlChannels, dbMetric)
logger.Debugf("control channel for [%s:%s] deleted", db, metric)
gatherersShutDown++
Expand Down
Loading

0 comments on commit 397d2b2

Please sign in to comment.