Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Jun 14, 2024
1 parent 08a339f commit ff87c92
Show file tree
Hide file tree
Showing 5 changed files with 371 additions and 31 deletions.
5 changes: 4 additions & 1 deletion services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,10 @@ type IngestionQ interface {
TruncateIngestStateTables(context.Context) error
DeleteRangeAll(ctx context.Context, start, end int64) (int64, error)
DeleteTransactionsFilteredTmpOlderThan(ctx context.Context, howOldInSeconds uint64) (int64, error)
TryStateVerificationLock(ctx context.Context) (bool, error)
GetNextLedgerSequence(context.Context, uint32) (uint32, bool, error)
TryStateVerificationLock(context.Context) (bool, error)
TryReaperLock(context.Context) (bool, error)
ElderLedger(context.Context, interface{}) error
}

// QAccounts defines account related queries.
Expand Down
4 changes: 1 addition & 3 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,9 +710,7 @@ func (s *system) maybeReapHistory(lastIngestedLedger uint32) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
if err := s.reaper.DeleteUnretainedHistory(s.ctx); err != nil {
log.WithError(err).Warn("could not reap history")
}
s.reaper.DeleteUnretainedHistory(s.ctx)
}()
}

Expand Down
15 changes: 15 additions & 0 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,21 @@ func (m *mockDBQ) TryStateVerificationLock(ctx context.Context) (bool, error) {
return args.Get(0).(bool), args.Error(1)
}

func (m *mockDBQ) TryReaperLock(ctx context.Context) (bool, error) {
args := m.Called(ctx)
return args.Get(0).(bool), args.Error(1)
}

func (m *mockDBQ) GetNextLedgerSequence(ctx context.Context, start uint32) (uint32, bool, error) {
args := m.Called(ctx, start)
return args.Get(0).(uint32), args.Get(1).(bool), args.Error(2)
}

func (m *mockDBQ) ElderLedger(ctx context.Context, dest interface{}) error {
args := m.Called(ctx, dest)
return args.Error(0)
}

func (m *mockDBQ) GetTx() *sqlx.Tx {
args := m.Called()
if args.Get(0) == nil {
Expand Down
90 changes: 63 additions & 27 deletions services/horizon/internal/ingest/reap.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (

// Reaper represents the history reaping subsystem of horizon.
type Reaper struct {
historyQ *history.Q
reapLockQ *history.Q
historyQ history.IngestionQ
reapLockQ history.IngestionQ
config ReapConfig
logger *logpkg.Entry

totalDuration *prometheus.SummaryVec
totalDeleted *prometheus.SummaryVec
deleteBatchDuration prometheus.Summary
rowsDeleted prometheus.Summary
rowsInBatchDeleted prometheus.Summary

lock sync.Mutex
}
Expand All @@ -36,24 +38,36 @@ type ReapConfig struct {

// NewReaper creates a new Reaper instance
func NewReaper(config ReapConfig, dbSession db.SessionInterface) *Reaper {
r := &Reaper{
historyQ: &history.Q{dbSession.Clone()},
reapLockQ: &history.Q{dbSession.Clone()},
return newReaper(config, &history.Q{dbSession.Clone()}, &history.Q{dbSession.Clone()})
}

func newReaper(config ReapConfig, historyQ, reapLockQ history.IngestionQ) *Reaper {
return &Reaper{
historyQ: historyQ,
reapLockQ: reapLockQ,
config: config,
deleteBatchDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "reap", Name: "delete_batch_duration",
Namespace: "horizon", Subsystem: "reap", Name: "batch_duration",
Help: "reap batch duration in seconds, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}),
rowsDeleted: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "reap", Name: "rows_deleted",
rowsInBatchDeleted: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "reap", Name: "batch_rows_deleted",
Help: "rows deleted during reap batch , sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}),
totalDuration: prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "reap", Name: "duration",
Help: "reap invocation duration in seconds, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, []string{"complete"}),
totalDeleted: prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "reap", Name: "rows_deleted",
Help: "rows deleted during reap invocation, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, []string{"complete"}),
logger: log.WithField("subservice", "reaper"),
}

return r
}

// DeleteUnretainedHistory removes all data associated with unretained ledgers.
Expand All @@ -70,7 +84,7 @@ func (r *Reaper) DeleteUnretainedHistory(ctx context.Context) error {
defer r.lock.Unlock()

if err := r.reapLockQ.Begin(ctx); err != nil {
return errors.Wrap(err, "error while acquiring reaper lock transaction")
return errors.Wrap(err, "error while starting reaper lock transaction")
}
defer func() {
if err := r.reapLockQ.Rollback(); err != nil {
Expand Down Expand Up @@ -104,20 +118,40 @@ func (r *Reaper) DeleteUnretainedHistory(ctx context.Context) error {
return nil
}

if err = r.clearBefore(ctx, oldest, targetElder); err != nil {
return err
}
startTime := time.Now()
var totalDeleted int64
var complete bool
totalDeleted, err = r.clearBefore(ctx, oldest, targetElder)
elapsedSeconds := time.Since(startTime).Seconds()
logger := r.logger.
WithField("duration", elapsedSeconds).
WithField("rows_deleted", totalDeleted)

r.logger.
WithField("new_elder", targetElder).
Info("reaper succeeded")
if err != nil {
logger.WithError(err).Warn("reaper failed")
} else {
complete = true
logger.
WithField("new_elder", targetElder).
Info("reaper succeeded")
}

return nil
labels := prometheus.Labels{
"complete": strconv.FormatBool(complete),
}
r.totalDeleted.With(labels).Observe(float64(totalDeleted))
r.totalDuration.With(labels).Observe(elapsedSeconds)
return err
}

// RegisterMetrics registers the prometheus metrics
func (s *Reaper) RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(s.deleteBatchDuration, s.rowsDeleted)
registry.MustRegister(
s.deleteBatchDuration,
s.rowsInBatchDeleted,
s.totalDuration,
s.totalDeleted,
)
}

// Work backwards in 50k (by default, otherwise configurable via the CLI) ledger
Expand All @@ -131,10 +165,11 @@ func (s *Reaper) RegisterMetrics(registry *prometheus.Registry) {
// hour, and slowing it down enough to leave some CPU for other processes.
var sleep = 1 * time.Second

func (r *Reaper) clearBefore(ctx context.Context, startSeq, endSeq uint32) error {
func (r *Reaper) clearBefore(ctx context.Context, startSeq, endSeq uint32) (int64, error) {
batchSize := r.config.ReapBatchSize
var sum int64
if batchSize <= 0 {
return fmt.Errorf("invalid batch size for reaping (%d)", batchSize)
return sum, fmt.Errorf("invalid batch size for reaping (%d)", batchSize)
}

r.logger.WithField("start_ledger", startSeq).
Expand All @@ -150,24 +185,25 @@ func (r *Reaper) clearBefore(ctx context.Context, startSeq, endSeq uint32) error

count, err := r.deleteBatch(ctx, batchStartSeq, batchEndSeq)
if err != nil {
return err
return sum, err
}
sum += count
if count == 0 {
next, ok, err := r.historyQ.GetNextLedgerSequence(ctx, batchStartSeq)
if err != nil {
return errors.Wrapf(err, "could not find next ledger sequence after %d", batchStartSeq)
return sum, errors.Wrapf(err, "could not find next ledger sequence after %d", batchStartSeq)
}
if !ok {
break
}
batchStartSeq = next
} else {
batchStartSeq += batchSize
batchStartSeq += batchSize + 1
}
time.Sleep(sleep)
}

return nil
return sum, nil
}

func (r *Reaper) deleteBatch(ctx context.Context, batchStartSeq, batchEndSeq uint32) (int64, error) {
Expand Down Expand Up @@ -200,7 +236,7 @@ func (r *Reaper) deleteBatch(ctx context.Context, batchStartSeq, batchEndSeq uin
WithField("duration", elapsedSeconds).
Info("successfully deleted batch")

r.rowsDeleted.Observe(float64(count))
r.rowsInBatchDeleted.Observe(float64(count))
r.deleteBatchDuration.Observe(elapsedSeconds)
return count, nil
}
Loading

0 comments on commit ff87c92

Please sign in to comment.