Skip to content

Commit

Permalink
VAULT-15395: Support mocking time functions in the activity log (#20720)
Browse files Browse the repository at this point in the history
* mock time in the activity log

* cleanup

* fix comment

* pr fixes

* update comment to explain why new timer is needed
  • Loading branch information
miagilepner authored May 23, 2023
1 parent 476bec1 commit 018ea84
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 52 deletions.
25 changes: 5 additions & 20 deletions helper/metricsutil/gauge_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,9 @@ import (

"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/timeutil"
)

// clock is the time source used by the gauge collection process. Hiding the
// time package behind an interface allows unit tests to substitute in a
// simulated clock.
type clock interface {
	// Now reports the current time according to this clock.
	Now() time.Time
	// NewTicker creates a ticker for the given interval.
	NewTicker(time.Duration) *time.Ticker
}

// defaultClock is the production clock; every method forwards straight to
// the standard time package. It carries no state, so the zero value is
// ready to use.
type defaultClock struct{}

// Now returns the current wall-clock time from time.Now.
func (defaultClock) Now() time.Time {
	return time.Now()
}

// NewTicker returns a real ticker from time.NewTicker. The caller owns the
// ticker and is responsible for stopping it.
func (defaultClock) NewTicker(d time.Duration) *time.Ticker {
	return time.NewTicker(d)
}

// GaugeLabelValues is one gauge in a set sharing a single key, that
// are measured in a batch.
type GaugeLabelValues struct {
Expand Down Expand Up @@ -76,7 +61,7 @@ type GaugeCollectionProcess struct {
maxGaugeCardinality int

// time source
clock clock
clock timeutil.Clock
}

// NewGaugeCollectionProcess creates a new collection process for the callback
Expand All @@ -101,7 +86,7 @@ func NewGaugeCollectionProcess(
gaugeInterval,
maxGaugeCardinality,
logger,
defaultClock{},
timeutil.DefaultClock{},
)
}

Expand All @@ -124,7 +109,7 @@ func (m *ClusterMetricSink) NewGaugeCollectionProcess(
m.GaugeInterval,
m.MaxGaugeCardinality,
logger,
defaultClock{},
timeutil.DefaultClock{},
)
}

Expand All @@ -137,7 +122,7 @@ func newGaugeCollectionProcessWithClock(
gaugeInterval time.Duration,
maxGaugeCardinality int,
logger log.Logger,
clock clock,
clock timeutil.Clock,
) (*GaugeCollectionProcess, error) {
process := &GaugeCollectionProcess{
stop: make(chan struct{}, 1),
Expand Down
6 changes: 4 additions & 2 deletions helper/metricsutil/gauge_process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/timeutil"
)

// SimulatedTime maintains a virtual clock so the test isn't
Expand All @@ -24,9 +25,10 @@ import (
type SimulatedTime struct {
// now is presumably the current virtual time reported to the code under
// test — TODO confirm against the Now method, which is not shown here.
now time.Time
// tickerBarrier carries *SimulatedTicker values.
// NOTE(review): looks like a hand-off point between the test and tickers
// created through this clock — confirm against NewTicker.
tickerBarrier chan *SimulatedTicker
// Embedding timeutil.DefaultClock promotes its methods, so any
// timeutil.Clock method that SimulatedTime does not define itself is
// served by the real time package.
timeutil.DefaultClock
}

var _ clock = &SimulatedTime{}
var _ timeutil.Clock = &SimulatedTime{}

type SimulatedTicker struct {
ticker *time.Ticker
Expand Down Expand Up @@ -121,7 +123,7 @@ func TestGauge_Creation(t *testing.T) {
t.Fatalf("Error creating collection process: %v", err)
}

if _, ok := p.clock.(defaultClock); !ok {
if _, ok := p.clock.(timeutil.DefaultClock); !ok {
t.Error("Default clock not installed.")
}

Expand Down
23 changes: 23 additions & 0 deletions helper/timeutil/timeutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,26 @@ func SkipAtEndOfMonth(t *testing.T) {
t.Skip("too close to end of month")
}
}

// Clock abstracts the time-package entry points that callers depend on, so
// that unit tests can substitute a simulated clock for the real one.
type Clock interface {
	// Now reports the current time according to this clock.
	Now() time.Time
	// NewTicker creates a ticker for the given interval.
	NewTicker(time.Duration) *time.Ticker
	// NewTimer creates a timer for the given duration.
	NewTimer(time.Duration) *time.Timer
}

// DefaultClock is the Clock backed directly by the standard time package.
// It carries no state, so the zero value is ready to use.
type DefaultClock struct{}

// Compile-time check that DefaultClock satisfies Clock when used by value,
// which is how callers install it (DefaultClock{}). Because the methods have
// value receivers, *DefaultClock satisfies Clock as well.
var _ Clock = DefaultClock{}

// Now returns the current wall-clock time from time.Now.
func (DefaultClock) Now() time.Time {
	return time.Now()
}

// NewTicker returns a real ticker from time.NewTicker. The caller owns the
// ticker and is responsible for stopping it.
func (DefaultClock) NewTicker(d time.Duration) *time.Ticker {
	return time.NewTicker(d)
}

// NewTimer returns a real timer from time.NewTimer. The caller owns the
// timer and is responsible for stopping it.
func (DefaultClock) NewTimer(d time.Duration) *time.Timer {
	return time.NewTimer(d)
}
67 changes: 43 additions & 24 deletions vault/activity_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ type ActivityLog struct {
// CensusReportInterval is the testing configuration for time between
// Write() calls initiated in CensusReport.
CensusReportInterval time.Duration

// clock is used to support manipulating time in unit and integration tests
clock timeutil.Clock
// precomputedQueryWritten receives an element whenever a precomputed query
// is written. It's used for unit testing
precomputedQueryWritten chan struct{}
}

// These non-persistent configuration options allow us to disable
Expand All @@ -205,6 +211,10 @@ type ActivityLogCoreConfig struct {

// MinimumRetentionMonths defines the minimum value for retention
MinimumRetentionMonths int

// Clock holds a custom clock that stands in for time.Now, time.NewTicker, and
// time.NewTimer. If nil, the default functions from the time package are used.
Clock timeutil.Clock
}

// NewActivityLog creates an activity log.
Expand All @@ -214,6 +224,10 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
return nil, err
}

clock := core.activityLogConfig.Clock
if clock == nil {
clock = timeutil.DefaultClock{}
}
a := &ActivityLog{
core: core,
configOverrides: &core.activityLogConfig,
Expand All @@ -227,7 +241,7 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
doneCh: make(chan struct{}, 1),
partialMonthClientTracker: make(map[string]*activity.EntityRecord),
CensusReportInterval: time.Hour * 1,

clock: clock,
currentSegment: segmentInfo{
startTimestamp: 0,
currentClients: &activity.EntityActivityLog{
Expand All @@ -243,6 +257,7 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
},
standbyFragmentsReceived: make([]*activity.LogFragment, 0),
inprocessExport: atomic.NewBool(false),
precomputedQueryWritten: make(chan struct{}),
}

config, err := a.loadConfigOrDefault(core.activeContext)
Expand Down Expand Up @@ -274,7 +289,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorage(ctx context.Context, force boo
// :force: forces a save of tokens/entities even if the in-memory log is empty
func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, force bool) error {
defer a.metrics.MeasureSinceWithLabels([]string{"core", "activity", "segment_write"},
time.Now(), []metricsutil.Label{})
a.clock.Now(), []metricsutil.Label{})

// Swap out the pending fragments
a.fragmentLock.Lock()
Expand Down Expand Up @@ -433,7 +448,7 @@ func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool
case err != nil:
a.logger.Error(fmt.Sprintf("unable to retrieve oldest version timestamp: %s", err.Error()))
case len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 &&
(oldestUpgradeTime.Add(time.Duration(trackedTWESegmentPeriod * time.Hour)).Before(time.Now())):
(oldestUpgradeTime.Add(time.Duration(trackedTWESegmentPeriod * time.Hour)).Before(a.clock.Now())):
a.logger.Error(fmt.Sprintf("storing nonzero token count over a month after vault was upgraded to %s", oldestVersion))
default:
if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 {
Expand Down Expand Up @@ -1005,7 +1020,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {

forceSave := false
if a.enabled && a.currentSegment.startTimestamp == 0 {
a.startNewCurrentLogLocked(time.Now().UTC())
a.startNewCurrentLogLocked(a.clock.Now().UTC())
// Force a save so we can distinguish between
//
// Month N-1: present
Expand All @@ -1031,7 +1046,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {
}

// check for segments out of retention period, if it has changed
go a.retentionWorker(ctx, time.Now(), a.retentionMonths)
go a.retentionWorker(ctx, a.clock.Now(), a.retentionMonths)
}

// update the enable flag and reset the current log
Expand Down Expand Up @@ -1097,7 +1112,7 @@ func (c *Core) setupActivityLogLocked(ctx context.Context, wg *sync.WaitGroup) e
c.activityLog = manager

// load activity log for "this month" into memory
err = manager.refreshFromStoredLog(manager.core.activeContext, wg, time.Now().UTC())
err = manager.refreshFromStoredLog(manager.core.activeContext, wg, manager.clock.Now().UTC())
if err != nil {
return err
}
Expand All @@ -1121,7 +1136,7 @@ func (c *Core) setupActivityLogLocked(ctx context.Context, wg *sync.WaitGroup) e
// Signal when this is done so that unit tests can proceed.
manager.retentionDone = make(chan struct{})
go func(months int) {
manager.retentionWorker(ctx, time.Now(), months)
manager.retentionWorker(ctx, manager.clock.Now(), months)
close(manager.retentionDone)
}(manager.retentionMonths)

Expand Down Expand Up @@ -1158,7 +1173,7 @@ func (a *ActivityLog) StartOfNextMonth() time.Time {
defer a.l.RUnlock()
var segmentStart time.Time
if a.currentSegment.startTimestamp == 0 {
segmentStart = time.Now().UTC()
segmentStart = a.clock.Now().UTC()
} else {
segmentStart = time.Unix(a.currentSegment.startTimestamp, 0).UTC()
}
Expand All @@ -1170,12 +1185,12 @@ func (a *ActivityLog) StartOfNextMonth() time.Time {
// perfStandbyFragmentWorker handles scheduling fragments
// to send via RPC; it runs on perf standby nodes only.
func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) {
timer := time.NewTimer(time.Duration(0))
timer := a.clock.NewTimer(time.Duration(0))
fragmentWaiting := false
// Eat first event, so timer is stopped
<-timer.C

endOfMonth := time.NewTimer(a.StartOfNextMonth().Sub(time.Now()))
endOfMonth := a.clock.NewTimer(a.StartOfNextMonth().Sub(a.clock.Now()))
if a.configOverrides.DisableTimers {
endOfMonth.Stop()
}
Expand Down Expand Up @@ -1247,18 +1262,18 @@ func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) {
// Set timer for next month.
// The current segment *probably* hasn't been set yet (via invalidation),
// so don't rely on it.
target := timeutil.StartOfNextMonth(time.Now().UTC())
endOfMonth.Reset(target.Sub(time.Now()))
target := timeutil.StartOfNextMonth(a.clock.Now().UTC())
endOfMonth.Reset(target.Sub(a.clock.Now()))
}
}
}

// activeFragmentWorker handles scheduling the write of the next
// segment. It runs on active nodes only.
func (a *ActivityLog) activeFragmentWorker(ctx context.Context) {
ticker := time.NewTicker(activitySegmentInterval)
ticker := a.clock.NewTicker(activitySegmentInterval)

endOfMonth := time.NewTimer(a.StartOfNextMonth().Sub(time.Now()))
endOfMonth := a.clock.NewTimer(a.StartOfNextMonth().Sub(a.clock.Now()))
if a.configOverrides.DisableTimers {
endOfMonth.Stop()
}
Expand Down Expand Up @@ -1308,7 +1323,7 @@ func (a *ActivityLog) activeFragmentWorker(ctx context.Context) {

// Reset the schedule to wait 10 minutes from this forced write.
ticker.Stop()
ticker = time.NewTicker(activitySegmentInterval)
ticker = a.clock.NewTicker(activitySegmentInterval)

// Simpler, but ticker.Reset was introduced in go 1.15:
// ticker.Reset(activitySegmentInterval)
Expand All @@ -1324,7 +1339,7 @@ func (a *ActivityLog) activeFragmentWorker(ctx context.Context) {
go a.retentionWorker(ctx, currentTime.UTC(), a.retentionMonths)
a.l.RUnlock()

delta := a.StartOfNextMonth().Sub(time.Now())
delta := a.StartOfNextMonth().Sub(a.clock.Now())
if delta < 20*time.Minute {
delta = 20 * time.Minute
}
Expand Down Expand Up @@ -1513,7 +1528,7 @@ func (a *ActivityLog) createCurrentFragment() {
Clients: make([]*activity.EntityRecord, 0, 120),
NonEntityTokens: make(map[string]uint64),
}
a.fragmentCreation = time.Now().UTC()
a.fragmentCreation = a.clock.Now().UTC()

// Signal that a new segment is available, start
// the timer to send it.
Expand Down Expand Up @@ -1613,13 +1628,13 @@ func (a *ActivityLog) handleQuery(ctx context.Context, startTime, endTime time.T
// with the endTime equal to the end of the last month, and add in the current month
// data.
precomputedQueryEndTime := endTime
if timeutil.IsCurrentMonth(endTime, time.Now().UTC()) {
if timeutil.IsCurrentMonth(endTime, a.clock.Now().UTC()) {
precomputedQueryEndTime = timeutil.EndOfMonth(timeutil.MonthsPreviousTo(1, timeutil.StartOfMonth(endTime)))
computePartial = true
}

pq := &activity.PrecomputedQuery{}
if startTime.After(precomputedQueryEndTime) && timeutil.IsCurrentMonth(startTime, time.Now().UTC()) {
if startTime.After(precomputedQueryEndTime) && timeutil.IsCurrentMonth(startTime, a.clock.Now().UTC()) {
// We're only calculating the partial month client count. Skip the precomputation
// get call.
pq = &activity.PrecomputedQuery{
Expand Down Expand Up @@ -1794,21 +1809,21 @@ func (a *ActivityLog) handleQuery(ctx context.Context, startTime, endTime time.T
a.sortActivityLogMonthsResponse(months)

// Modify the final month output to make response more consumable based on API request
months = modifyResponseMonths(months, startTime, endTime)
months = a.modifyResponseMonths(months, startTime, endTime)
responseData["months"] = months

return responseData, nil
}

// modifyResponseMonths fills out various parts of the query structure to help
// activity log clients parse the returned query.
func modifyResponseMonths(months []*ResponseMonth, start time.Time, end time.Time) []*ResponseMonth {
func (a *ActivityLog) modifyResponseMonths(months []*ResponseMonth, start time.Time, end time.Time) []*ResponseMonth {
if len(months) == 0 {
return months
}
start = timeutil.StartOfMonth(start)
end = timeutil.EndOfMonth(end)
if timeutil.IsCurrentMonth(end, time.Now().UTC()) {
if timeutil.IsCurrentMonth(end, a.clock.Now().UTC()) {
end = timeutil.EndOfMonth(timeutil.StartOfMonth(end).AddDate(0, -1, 0))
}
modifiedResponseMonths := make([]*ResponseMonth, 0)
Expand Down Expand Up @@ -2328,7 +2343,7 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
// If there's an intent log, finish it even if the feature is currently disabled.
a.l.RLock()
currentMonth := a.currentSegment.startTimestamp
// Base retention period on the month we are generating (even in the past)--- time.Now()
// Base retention period on the month we are generating (even in the past)--- a.clock.Now()
// would work but this will be easier to control in tests.
retentionWindow := timeutil.MonthsPreviousTo(a.retentionMonths, time.Unix(intent.NextMonth, 0).UTC())
a.l.RUnlock()
Expand Down Expand Up @@ -2396,6 +2411,10 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {

a.logger.Info("finished computing queries", "month", endTime)

select {
case a.precomputedQueryWritten <- struct{}{}:
default:
}
return nil
}

Expand Down Expand Up @@ -2489,7 +2508,7 @@ func (a *ActivityLog) populateNamespaceAndMonthlyBreakdowns() (map[int64]*proces
byNamespace := make(map[string]*processByNamespace)
byMonth := make(map[int64]*processMonth)
for _, e := range a.partialMonthClientTracker {
processClientRecord(e, byNamespace, byMonth, time.Now())
processClientRecord(e, byNamespace, byMonth, a.clock.Now())
}
return byMonth, byNamespace
}
Expand Down
Loading

0 comments on commit 018ea84

Please sign in to comment.