Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: no full vacuum for various reports tables #5120

Merged
merged 9 commits into from
Sep 27, 2024
18 changes: 17 additions & 1 deletion enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand Down Expand Up @@ -158,12 +159,12 @@
if !edr.config.GetBool("Reporting.errorReporting.syncer.enabled", true) {
return func() {}
}
if _, err := dbHandle.ExecContext(
context.Background(),
fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(ErrorDetailReportsTable)),
); err != nil {
edr.log.Errorn("error full vacuuming", logger.NewStringField("table", ErrorDetailReportsTable), obskit.Error(err))
panic(err)

Check warning on line 167 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L162-L167

Added lines #L162 - L167 were not covered by tests
}

return func() {
Expand Down Expand Up @@ -362,21 +363,26 @@
// Aggregate
// Send in a separate go-routine
// Delete in a separate go-routine
var (
deletedRows int
vacuumDeletedRowThreshold = edr.config.GetReloadableIntVar(
100000, 1,
"Reporting.errorReporting.vacuumThresholdDeletedRows",
"Reporting.vacuumThresholdDeletedRows",
)
lastVacuum time.Time
vacuumInterval = edr.config.GetReloadableDurationVar(
15,
time.Minute,
"Reporting.errorReporting.vacuumInterval",
"Reporting.vacuumInterval",
)
vacuumThresholdBytes = config.GetReloadableInt64Var(
10*bytesize.GB, 1,
"Reporting.errorReporting.vacuumThresholdBytes",
"Reporting.vacuumThresholdBytes",
)
)

Check warning on line 385 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L366-L385

Added lines #L366 - L385 were not covered by tests
for {
if ctx.Err() != nil {
edr.log.Infof("stopping mainLoop for syncer %s : %s", c.Label, ctx.Err())
Expand Down Expand Up @@ -449,15 +455,25 @@
errorDetailReportsDeleteQueryTimer.Since(deleteReportsStart)
if err != nil {
edr.log.Errorf("[ Error Detail Reporting ]: Error deleting local reports from %s: %v", ErrorDetailReportsTable, err)
} else {
deletedRows += len(reports)

Check warning on line 459 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L458-L459

Added lines #L458 - L459 were not covered by tests
}
// vacuum error_reports_details table
var sizeEstimate int64
if err := dbHandle.QueryRowContext(
ctx,
`SELECT pg_table_size(oid) from pg_class where relname = $1`, ErrorDetailReportsTable,
fracasula marked this conversation as resolved.
Show resolved Hide resolved
).Scan(&sizeEstimate); err != nil {
edr.log.Errorn(
fmt.Sprintf(`Error getting %s table size estimate`, ErrorDetailReportsTable),
logger.NewErrorField(err),
)

Check warning on line 470 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L462-L470

Added lines #L462 - L470 were not covered by tests
}
if deletedRows >= vacuumDeletedRowThreshold.Load() ||
time.Since(lastVacuum) >= vacuumInterval.Load() {
(sizeEstimate >= vacuumThresholdBytes.Load() && time.Since(lastVacuum) >= vacuumInterval.Load()) {
if err := edr.vacuum(ctx, dbHandle, tags); err == nil {
deletedRows = 0
lastVacuum = time.Now()

Check warning on line 476 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L472-L476

Added lines #L472 - L476 were not covered by tests
}
}
}
Expand All @@ -476,28 +492,28 @@
}
}

func (edr *ErrorDetailReporter) vacuum(ctx context.Context, dbHandle *sql.DB, tags stats.Tags) error {
defer edr.stats.NewTaggedStat(StatReportingVacuumDuration, stats.TimerType, tags).RecordDuration()()
var query string
var full bool
if edr.vacuumFull.Load() {
query = fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(ErrorDetailReportsTable))
full = true
} else {
query = fmt.Sprintf("vacuum analyze %s", pq.QuoteIdentifier(ErrorDetailReportsTable))

Check warning on line 503 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L495-L503

Added lines #L495 - L503 were not covered by tests
}
_, err := dbHandle.ExecContext(ctx, query)
if err != nil {

Check warning on line 506 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L505-L506

Added lines #L505 - L506 were not covered by tests
edr.log.Errorn(
"error vacuuming",
logger.NewStringField("table", ErrorDetailReportsTable),
obskit.Error(err),
logger.NewBoolField("full", full),

Check warning on line 511 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L508-L511

Added lines #L508 - L511 were not covered by tests
)
return err

Check warning on line 513 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L513

Added line #L513 was not covered by tests
}

return nil

Check warning on line 516 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L516

Added line #L516 was not covered by tests
}

func (edr *ErrorDetailReporter) getReports(ctx context.Context, currentMs int64, syncerKey string) ([]*types.EDReportsDB, int64) {
Expand Down
13 changes: 12 additions & 1 deletion enterprise/reporting/flusher/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"github.com/cenkalti/backoff"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand Down Expand Up @@ -43,6 +44,7 @@
aggressiveFlushEnabled config.ValueLoader[bool]
lagThresholdForAggresiveFlushInMins config.ValueLoader[time.Duration]
vacuumThresholdDeletedRows config.ValueLoader[int]
vacuumThresholdBytes config.ValueLoader[int64]
deletedRows int
lastVacuum time.Time
vacuumFull config.ValueLoader[bool]
Expand Down Expand Up @@ -81,6 +83,7 @@
lagThresholdForAggresiveFlushInMins := conf.GetReloadableDurationVar(5, time.Minute, "Reporting.flusher.lagThresholdForAggresiveFlushInMins")
vacuumThresholdDeletedRows := conf.GetReloadableIntVar(100000, 1, "Reporting.flusher.vacuumThresholdDeletedRows")
vacuumInterval := conf.GetReloadableDurationVar(15, time.Minute, "Reporting.flusher.vacuumInterval", "Reporting.vacuumInterval")
vacuumThresholdBytes := conf.GetReloadableInt64Var(10*bytesize.GB, 1, "Reporting.flusher.vacuumThresholdBytes", "Reporting.vacuumThresholdBytes")

tr := &http.Transport{}
client := &http.Client{Transport: tr, Timeout: config.GetDuration("HttpClient.reporting.timeout", 60, time.Second)}
Expand All @@ -100,6 +103,7 @@
vacuumThresholdDeletedRows: vacuumThresholdDeletedRows,
vacuumFull: conf.GetReloadableBoolVar(false, "Reporting.flusher.vacuumFull", "Reporting.vacuumFull"),
vacuumInterval: vacuumInterval,
vacuumThresholdBytes: vacuumThresholdBytes,

table: table,
aggregator: aggregator,
Expand All @@ -116,8 +120,8 @@
if _, err := db.Exec(
fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(table)),
); err != nil {
log.Errorn("error full vacuuming", logger.NewStringField("table", table), obskit.Error(err))
return nil, fmt.Errorf("error full vacuuming %s table %w", table, err)

Check warning on line 124 in enterprise/reporting/flusher/flusher.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/flusher.go#L123-L124

Added lines #L123 - L124 were not covered by tests
}
return &f, nil
}
Expand Down Expand Up @@ -336,26 +340,33 @@
func (f *Flusher) vacuum(ctx context.Context) error {
var query string
var full bool
var sizeEstimate int64
if err := f.db.QueryRowContext(
ctx, `SELECT pg_table_size(oid) from pg_class where relname = $1`, f.table,
).Scan(&sizeEstimate); err != nil {
return fmt.Errorf("error getting table size %w", err)
}
if f.deletedRows >= f.vacuumThresholdDeletedRows.Load() ||
time.Since(f.lastVacuum) >= f.vacuumInterval.Load() {
(sizeEstimate >= f.vacuumThresholdBytes.Load() && time.Since(f.lastVacuum) >= f.vacuumInterval.Load()) {
vacuumStart := time.Now()
defer f.vacuumReportsTimer.Since(vacuumStart)
if f.vacuumFull.Load() {
full = true
query = fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(f.table))
} else {
query = fmt.Sprintf("vacuum analyze %s", pq.QuoteIdentifier(f.table))

Check warning on line 357 in enterprise/reporting/flusher/flusher.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/flusher.go#L352-L357

Added lines #L352 - L357 were not covered by tests
}
if _, err := f.db.ExecContext(ctx, query); err != nil {
f.log.Errorn(
"error vacuuming",
logger.NewStringField("table", f.table),
obskit.Error(err),
logger.NewBoolField("full", full),
)

Check warning on line 365 in enterprise/reporting/flusher/flusher.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/flusher.go#L359-L365

Added lines #L359 - L365 were not covered by tests
return fmt.Errorf("error vacuuming table %w", err)
}
f.lastVacuum = time.Now()
f.deletedRows = 0

Check warning on line 369 in enterprise/reporting/flusher/flusher.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/flusher.go#L368-L369

Added lines #L368 - L369 were not covered by tests
}
return nil
}
Expand Down
18 changes: 15 additions & 3 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"github.com/cenkalti/backoff/v4"
"github.com/lib/pq"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand Down Expand Up @@ -161,8 +162,8 @@
return func() {}
}
if _, err := dbHandle.ExecContext(context.Background(), `vacuum full analyze reports;`); err != nil {
r.log.Errorn(`[ Reporting ]: Error full vacuuming reports table`, logger.NewErrorField(err))
panic(err)

Check warning on line 166 in enterprise/reporting/reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/reporting.go#L165-L166

Added lines #L165 - L166 were not covered by tests
}
return func() {
r.g.Go(func() error {
Expand Down Expand Up @@ -385,8 +386,9 @@
var (
deletedRows int
vacuumDeletedRowsThreshold = config.GetReloadableIntVar(100000, 1, "Reporting.vacuumThresholdDeletedRows")
lastVaccum time.Time
lastVacuum time.Time
vacuumInterval = config.GetReloadableDurationVar(15, time.Minute, "Reporting.vacuumInterval")
vacuumThresholdBytes = config.GetReloadableInt64Var(10*bytesize.GB, 1, "Reporting.vacuumThresholdBytes")
)
for {
if ctx.Err() != nil {
Expand Down Expand Up @@ -464,11 +466,21 @@
deletedRows += len(reports)
}

var sizeEstimate int64
if err := dbHandle.QueryRowContext(
ctx,
fmt.Sprintf(`SELECT pg_table_size(oid) from pg_class where relname='%s';`, ReportsTable),
).Scan(&sizeEstimate); err != nil {
r.log.Errorn(
`[ Reporting ]: Error getting table size estimate`,
logger.NewErrorField(err),
)
}
if deletedRows >= vacuumDeletedRowsThreshold.Load() ||
time.Since(lastVaccum) > vacuumInterval.Load() {
(sizeEstimate >= vacuumThresholdBytes.Load() && time.Since(lastVacuum) > vacuumInterval.Load()) {
if err := r.vacuum(ctx, dbHandle, tags); err == nil {
deletedRows = 0
lastVaccum = time.Now()
lastVacuum = time.Now()

Check warning on line 483 in enterprise/reporting/reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/reporting.go#L481-L483

Added lines #L481 - L483 were not covered by tests
}
}
}
Expand All @@ -488,27 +500,27 @@
}
}

func (r *DefaultReporter) vacuum(ctx context.Context, db *sql.DB, tags stats.Tags) error {
defer r.stats.NewTaggedStat(StatReportingVacuumDuration, stats.TimerType, tags).RecordDuration()()
var (
query string
full bool
)
if r.vacuumFull.Load() {
full = true
query = `vacuum full analyze reports;`
} else {
query = `vacuum analyze reports;`

Check warning on line 513 in enterprise/reporting/reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/reporting.go#L503-L513

Added lines #L503 - L513 were not covered by tests
}
_, err := db.ExecContext(ctx, query)
if err != nil {
r.log.Errorn(
`[ Reporting ]: Error vacuuming reports table`,
obskit.Error(err),
logger.NewBoolField("full", full),
)

Check warning on line 521 in enterprise/reporting/reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/reporting.go#L515-L521

Added lines #L515 - L521 were not covered by tests
}
return nil

Check warning on line 523 in enterprise/reporting/reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/reporting.go#L523

Added line #L523 was not covered by tests
}

func (r *DefaultReporter) sendMetric(ctx context.Context, netClient *http.Client, label string, metric *types.Metric) error {
Expand Down
Loading