Skip to content

Commit

Permalink
[*] move DbStorageSchemaType related code to PostgresWriter (#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
pashagolub authored Jan 11, 2024
1 parent 51b2fab commit 7df6d62
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 63 deletions.
3 changes: 1 addition & 2 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,6 @@ var undersizedDBsLock = sync.RWMutex{}
var recoveryIgnoredDBs = make(map[string]bool) // DBs in recovery state and OnlyIfMaster specified in config
var recoveryIgnoredDBsLock = sync.RWMutex{}

var MetricSchema metrics.MetricSchemaType

var logger log.LoggerHookerIface

// VersionToInt parses a given version and returns an integer or
Expand Down Expand Up @@ -1287,6 +1285,7 @@ func IsInDisabledTimeDayRange(localTime time.Time, metricAttrsDisabledDays strin
func UpdateMetricDefinitions(newMetrics map[string]map[uint]metrics.MetricProperties, _ map[string]string) {
metricDefMapLock.Lock()
metricDefinitionMap = newMetrics

metricDefMapLock.Unlock()
logger.Debug("metrics definitions refreshed - nr. found:", len(newMetrics))
}
Expand Down
17 changes: 0 additions & 17 deletions src/metrics/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,6 @@ import (
"gopkg.in/yaml.v2"
)

type MetricSchemaType int

const (
MetricSchemaPostgres MetricSchemaType = iota
MetricSchemaTimescale
)

func GetMetricSchemaType(ctx context.Context, conn db.PgxIface) (metricSchema MetricSchemaType, err error) {
var isTs bool
metricSchema = MetricSchemaPostgres
sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
if err = conn.QueryRow(ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
metricSchema = MetricSchemaTimescale
}
return
}

func versionToInt(v string) uint {
if len(v) == 0 {
return 0
Expand Down
89 changes: 53 additions & 36 deletions src/metrics/sinks/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (

func NewPostgresWriter(ctx context.Context, connstr string, opts *config.Options, metricDefs metrics.MetricVersionDefs) (pgw *PostgresWriter, err error) {
pgw = &PostgresWriter{
ctx: ctx,
Ctx: ctx,
MetricDefs: metricDefs,
opts: opts,
}
if pgw.SinkDb, err = db.InitAndTestMetricStoreConnection(ctx, connstr); err != nil {
return
}
if pgw.MetricSchema, err = metrics.GetMetricSchemaType(ctx, pgw.SinkDb); err != nil {
if err = pgw.ReadMetricSchemaType(); err != nil {
pgw.SinkDb.Close()
return
}
Expand All @@ -39,9 +39,9 @@ func NewPostgresWriter(ctx context.Context, connstr string, opts *config.Options
}

type PostgresWriter struct {
ctx context.Context
Ctx context.Context
SinkDb db.PgxPoolIface
MetricSchema metrics.MetricSchemaType
MetricSchema DbStorageSchemaType
MetricDefs metrics.MetricVersionDefs
opts *config.Options
}
Expand All @@ -51,6 +51,23 @@ type ExistingPartitionInfo struct {
EndTime time.Time
}

type DbStorageSchemaType int

const (
DbStorageSchemaPostgres DbStorageSchemaType = iota
DbStorageSchemaTimescale
)

func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
var isTs bool
pgw.MetricSchema = DbStorageSchemaPostgres
sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
if err = pgw.SinkDb.QueryRow(pgw.Ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
pgw.MetricSchema = DbStorageSchemaTimescale
}
return
}

const (
epochColumnName string = "epoch_ns" // this column (epoch in nanoseconds) is expected in every metric query
tagPrefix string = "tag_"
Expand Down Expand Up @@ -84,15 +101,15 @@ func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
}

func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
_, err = pgw.SinkDb.Exec(pgw.ctx, "select admin.ensure_dummy_metrics_table($1)", metric)
_, err = pgw.SinkDb.Exec(pgw.Ctx, "select admin.ensure_dummy_metrics_table($1)", metric)
return
}

func (pgw *PostgresWriter) Write(msgs []metrics.MetricStoreMessage) error {
if len(msgs) == 0 {
return nil
}
logger := log.GetLogger(pgw.ctx)
logger := log.GetLogger(pgw.Ctx)
tsWarningPrinted := false
metricsToStorePerMetric := make(map[string][]metrics.MetricStoreMessagePostgres)
rowsBatched := 0
Expand Down Expand Up @@ -161,7 +178,7 @@ func (pgw *PostgresWriter) Write(msgs []metrics.MetricStoreMessage) error {

rowsBatched++

if pgw.MetricSchema == metrics.MetricSchemaTimescale {
if pgw.MetricSchema == DbStorageSchemaTimescale {
// set min/max timestamps to check/create partitions
bounds, ok := pgPartBounds[msg.MetricName]
if !ok || (ok && epochTime.Before(bounds.StartTime)) {
Expand All @@ -172,7 +189,7 @@ func (pgw *PostgresWriter) Write(msgs []metrics.MetricStoreMessage) error {
bounds.EndTime = epochTime
pgPartBounds[msg.MetricName] = bounds
}
} else if pgw.MetricSchema == metrics.MetricSchemaPostgres {
} else if pgw.MetricSchema == DbStorageSchemaPostgres {
_, ok := pgPartBoundsDbName[msg.MetricName]
if !ok {
pgPartBoundsDbName[msg.MetricName] = make(map[string]ExistingPartitionInfo)
Expand All @@ -190,9 +207,9 @@ func (pgw *PostgresWriter) Write(msgs []metrics.MetricStoreMessage) error {
}
}

if pgw.MetricSchema == metrics.MetricSchemaPostgres {
if pgw.MetricSchema == DbStorageSchemaPostgres {
err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePGMetricPartitions)
} else if pgw.MetricSchema == metrics.MetricSchemaTimescale {
} else if pgw.MetricSchema == DbStorageSchemaTimescale {
err = pgw.EnsureMetricTimescale(pgPartBounds, forceRecreatePGMetricPartitions)
} else {
logger.Fatal("should never happen...")
Expand Down Expand Up @@ -271,11 +288,11 @@ func (pgw *PostgresWriter) Write(msgs []metrics.MetricStoreMessage) error {
}

func (pgw *PostgresWriter) EnsureMetric(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error) {
logger := log.GetLogger(pgw.ctx)
logger := log.GetLogger(pgw.Ctx)
sqlEnsure := `select * from admin.ensure_partition_metric($1)`
for metric := range pgPartBounds {
if _, ok := partitionMapMetric[metric]; !ok || force {
if _, err = pgw.SinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
if _, err = pgw.SinkDb.Exec(pgw.Ctx, sqlEnsure, metric); err != nil {
logger.Errorf("Failed to create partition on metric '%s': %w", metric, err)
return err
}
Expand All @@ -287,7 +304,7 @@ func (pgw *PostgresWriter) EnsureMetric(pgPartBounds map[string]ExistingPartitio

// EnsureMetricTime creates special partitions if Timescale used for realtime metrics
func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error {
logger := log.GetLogger(pgw.ctx)
logger := log.GetLogger(pgw.Ctx)
sqlEnsure := `select * from admin.ensure_partition_metric_time($1, $2)`
for metric, pb := range pgPartBounds {
if !strings.HasSuffix(metric, "_realtime") {
Expand All @@ -299,15 +316,15 @@ func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPart

partInfo, ok := partitionMapMetric[metric]
if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force {
err := pgw.SinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.StartTime).Scan(&partInfo)
err := pgw.SinkDb.QueryRow(pgw.Ctx, sqlEnsure, metric, pb.StartTime).Scan(&partInfo)
if err != nil {
logger.Error("Failed to create partition on 'metrics':", err)
return err
}
partitionMapMetric[metric] = partInfo
}
if pb.EndTime.After(partInfo.EndTime) || force {
err := pgw.SinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.EndTime).Scan(&partInfo.EndTime)
err := pgw.SinkDb.QueryRow(pgw.Ctx, sqlEnsure, metric, pb.EndTime).Scan(&partInfo.EndTime)
if err != nil {
logger.Error("Failed to create partition on 'metrics':", err)
return err
Expand All @@ -319,14 +336,14 @@ func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPart
}

func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error) {
logger := log.GetLogger(pgw.ctx)
logger := log.GetLogger(pgw.Ctx)
sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
for metric := range pgPartBounds {
if strings.HasSuffix(metric, "_realtime") {
continue
}
if _, ok := partitionMapMetric[metric]; !ok {
if _, err = pgw.SinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
if _, err = pgw.SinkDb.Exec(pgw.Ctx, sqlEnsure, metric); err != nil {
logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
return err
}
Expand All @@ -351,7 +368,7 @@ func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[str
}
partInfo, ok := partitionMapMetricDbname[metric][dbname]
if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force {
if rows, err = pgw.SinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
if rows, err = pgw.SinkDb.Query(pgw.Ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
return
}
if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
Expand All @@ -360,7 +377,7 @@ func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[str
partitionMapMetricDbname[metric][dbname] = partInfo
}
if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || force {
if rows, err = pgw.SinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
if rows, err = pgw.SinkDb.Query(pgw.Ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
return
}
if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
Expand All @@ -378,23 +395,23 @@ func (pgw *PostgresWriter) OldPostgresMetricsDeleter() {
if metricAgeDaysThreshold <= 0 {
return
}
logger := log.GetLogger(pgw.ctx)
logger := log.GetLogger(pgw.Ctx)
select {
case <-pgw.ctx.Done():
case <-pgw.Ctx.Done():
return
case <-time.After(time.Hour):
// to reduce distracting log messages at startup
}

for {
if pgw.MetricSchema == metrics.MetricSchemaTimescale {
if pgw.MetricSchema == DbStorageSchemaTimescale {
partsDropped, err := pgw.DropOldTimePartitions(metricAgeDaysThreshold)
if err != nil {
logger.Errorf("Failed to drop old partitions (>%d days) from Postgres: %v", metricAgeDaysThreshold, err)
continue
}
logger.Infof("Dropped %d old metric partitions...", partsDropped)
} else if pgw.MetricSchema == metrics.MetricSchemaPostgres {
} else if pgw.MetricSchema == DbStorageSchemaPostgres {
partsToDrop, err := pgw.GetOldTimePartitions(metricAgeDaysThreshold)
if err != nil {
logger.Errorf("Failed to get a listing of old (>%d days) time partitions from Postgres metrics DB - check that the admin.get_old_time_partitions() function is rolled out: %v", metricAgeDaysThreshold, err)
Expand All @@ -407,7 +424,7 @@ func (pgw *PostgresWriter) OldPostgresMetricsDeleter() {
sqlDropTable := `DROP TABLE IF EXISTS ` + pgx.Identifier{toDrop}.Sanitize()
logger.Debugf("Dropping old metric data partition: %s", toDrop)

if _, err := pgw.SinkDb.Exec(pgw.ctx, sqlDropTable); err != nil {
if _, err := pgw.SinkDb.Exec(pgw.Ctx, sqlDropTable); err != nil {
logger.Errorf("Failed to drop old partition %s from Postgres metrics DB: %w", toDrop, err)
time.Sleep(time.Second * 300)
} else {
Expand All @@ -419,15 +436,15 @@ func (pgw *PostgresWriter) OldPostgresMetricsDeleter() {
}
}
select {
case <-pgw.ctx.Done():
case <-pgw.Ctx.Done():
return
case <-time.After(time.Hour * 12):
}
}
}

func (pgw *PostgresWriter) UniqueDbnamesListingMaintainer() {
logger := log.GetLogger(pgw.ctx)
logger := log.GetLogger(pgw.Ctx)
// due to metrics deletion the listing can go out of sync (a trigger not really wanted)
sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
Expand All @@ -446,13 +463,13 @@ func (pgw *PostgresWriter) UniqueDbnamesListingMaintainer() {

for {
select {
case <-pgw.ctx.Done():
case <-pgw.Ctx.Done():
return
case <-time.After(time.Hour * 24):
}
var lock bool
logger.Infof("Trying to get metricsDb listing maintaner advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
if err := pgw.SinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
if err := pgw.SinkDb.QueryRow(pgw.Ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
logger.Error("Getting metricsDb listing maintaner advisory lock failed:", err)
continue
}
Expand All @@ -462,7 +479,7 @@ func (pgw *PostgresWriter) UniqueDbnamesListingMaintainer() {
}

logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
rows, _ := pgw.SinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
rows, _ := pgw.SinkDb.Query(pgw.Ctx, sqlTopLevelMetrics)
allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
if err != nil {
logger.Error(err)
Expand All @@ -475,7 +492,7 @@ func (pgw *PostgresWriter) UniqueDbnamesListingMaintainer() {
metricName := strings.Replace(tableName, "public.", "", 1)

logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
rows, _ := pgw.SinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
rows, _ := pgw.SinkDb.Query(pgw.Ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
// ret, err := DBExecRead(mainContext, metricDb, fmt.Sprintf(sqlDistinct, tableName, tableName))
if err != nil {
Expand All @@ -493,19 +510,19 @@ func (pgw *PostgresWriter) UniqueDbnamesListingMaintainer() {
if len(foundDbnamesArr) == 0 { // delete all entries for given metric
logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)

_, err = pgw.SinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
_, err = pgw.SinkDb.Exec(pgw.Ctx, sqlDeleteAll, metricName)
if err != nil {
logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
}
continue
}
cmdTag, err := pgw.SinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
cmdTag, err := pgw.SinkDb.Exec(pgw.Ctx, sqlDelete, foundDbnamesArr, metricName)
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
} else if cmdTag.RowsAffected() > 0 {
logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
}
cmdTag, err = pgw.SinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
cmdTag, err = pgw.SinkDb.Exec(pgw.Ctx, sqlAdd, foundDbnamesArr, metricName)
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
} else if cmdTag.RowsAffected() > 0 {
Expand All @@ -519,13 +536,13 @@ func (pgw *PostgresWriter) UniqueDbnamesListingMaintainer() {

func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error) {
sqlOldPart := `select admin.drop_old_time_partitions($1, $2)`
err = pgw.SinkDb.QueryRow(pgw.ctx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res)
err = pgw.SinkDb.QueryRow(pgw.Ctx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res)
return
}

func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error) {
sqlGetOldParts := `select admin.get_old_time_partitions($1)`
rows, err := pgw.SinkDb.Query(pgw.ctx, sqlGetOldParts, metricAgeDaysThreshold)
rows, err := pgw.SinkDb.Query(pgw.Ctx, sqlGetOldParts, metricAgeDaysThreshold)
if err == nil {
return pgx.CollectRows(rows, pgx.RowTo[string])
}
Expand All @@ -538,6 +555,6 @@ func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric stri
where not exists (
select * from admin.all_distinct_dbname_metrics where dbname = $1 and metric = $2
)`
_, err := pgw.SinkDb.Exec(pgw.ctx, sql, dbUnique, metric)
_, err := pgw.SinkDb.Exec(pgw.Ctx, sql, dbUnique, metric)
return err
}
19 changes: 11 additions & 8 deletions src/metrics/io_test.go → src/metrics/sinks/postgres_test.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
package metrics_test
package sinks_test

import (
"context"
"errors"
"testing"

"github.com/cybertec-postgresql/pgwatch3/metrics"
"github.com/cybertec-postgresql/pgwatch3/metrics/sinks"
"github.com/pashagolub/pgxmock/v3"
"github.com/stretchr/testify/assert"
)

var ctx = context.Background()

func TestGetMetricSchemaType(t *testing.T) {
func TestReadMetricSchemaType(t *testing.T) {
conn, err := pgxmock.NewPool()
assert.NoError(t, err)

pgw := sinks.PostgresWriter{
Ctx: ctx,
SinkDb: conn,
}

conn.ExpectQuery("SELECT schema_type").
WillReturnError(errors.New("expected"))
_, err = metrics.GetMetricSchemaType(ctx, conn)
assert.Error(t, err)
assert.Error(t, pgw.ReadMetricSchemaType())

conn.ExpectQuery("SELECT schema_type").
WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
schemaType, err := metrics.GetMetricSchemaType(context.Background(), conn)
assert.NoError(t, err)
assert.Equal(t, metrics.MetricSchemaTimescale, schemaType)
assert.NoError(t, pgw.ReadMetricSchemaType())
assert.Equal(t, sinks.DbStorageSchemaTimescale, pgw.MetricSchema)
}

0 comments on commit 7df6d62

Please sign in to comment.