Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VAULT-15395: Support mocking time functions in the activity log #20720

Merged
merged 5 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 5 additions & 20 deletions helper/metricsutil/gauge_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,9 @@ import (

"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/timeutil"
)

// This interface allows unit tests to substitute in a simulated clock.
type clock interface {
Now() time.Time
NewTicker(time.Duration) *time.Ticker
}

type defaultClock struct{}

func (_ defaultClock) Now() time.Time {
return time.Now()
}

func (_ defaultClock) NewTicker(d time.Duration) *time.Ticker {
return time.NewTicker(d)
}

// GaugeLabelValues is one gauge in a set sharing a single key, that
// are measured in a batch.
type GaugeLabelValues struct {
Expand Down Expand Up @@ -76,7 +61,7 @@ type GaugeCollectionProcess struct {
maxGaugeCardinality int

// time source
clock clock
clock timeutil.Clock
}

// NewGaugeCollectionProcess creates a new collection process for the callback
Expand All @@ -101,7 +86,7 @@ func NewGaugeCollectionProcess(
gaugeInterval,
maxGaugeCardinality,
logger,
defaultClock{},
timeutil.DefaultClock{},
)
}

Expand All @@ -124,7 +109,7 @@ func (m *ClusterMetricSink) NewGaugeCollectionProcess(
m.GaugeInterval,
m.MaxGaugeCardinality,
logger,
defaultClock{},
timeutil.DefaultClock{},
)
}

Expand All @@ -137,7 +122,7 @@ func newGaugeCollectionProcessWithClock(
gaugeInterval time.Duration,
maxGaugeCardinality int,
logger log.Logger,
clock clock,
clock timeutil.Clock,
) (*GaugeCollectionProcess, error) {
process := &GaugeCollectionProcess{
stop: make(chan struct{}, 1),
Expand Down
6 changes: 4 additions & 2 deletions helper/metricsutil/gauge_process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/timeutil"
)

// SimulatedTime maintains a virtual clock so the test isn't
Expand All @@ -24,9 +25,10 @@ import (
type SimulatedTime struct {
now time.Time
tickerBarrier chan *SimulatedTicker
timeutil.DefaultClock
}

var _ clock = &SimulatedTime{}
var _ timeutil.Clock = &SimulatedTime{}

type SimulatedTicker struct {
ticker *time.Ticker
Expand Down Expand Up @@ -121,7 +123,7 @@ func TestGauge_Creation(t *testing.T) {
t.Fatalf("Error creating collection process: %v", err)
}

if _, ok := p.clock.(defaultClock); !ok {
if _, ok := p.clock.(timeutil.DefaultClock); !ok {
t.Error("Default clock not installed.")
}

Expand Down
23 changes: 23 additions & 0 deletions helper/timeutil/timeutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,26 @@ func SkipAtEndOfMonth(t *testing.T) {
t.Skip("too close to end of month")
}
}

// This interface allows unit tests to substitute in a simulated Clock.
type Clock interface {
Now() time.Time
NewTicker(time.Duration) *time.Ticker
NewTimer(time.Duration) *time.Timer
}

type DefaultClock struct{}
miagilepner marked this conversation as resolved.
Show resolved Hide resolved

var _ Clock = (*DefaultClock)(nil)

func (_ DefaultClock) Now() time.Time {
return time.Now()
}

func (_ DefaultClock) NewTicker(d time.Duration) *time.Ticker {
return time.NewTicker(d)
}

func (_ DefaultClock) NewTimer(d time.Duration) *time.Timer {
return time.NewTimer(d)
}
67 changes: 43 additions & 24 deletions vault/activity_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ type ActivityLog struct {
// CensusReportInterval is the testing configuration for time between
// Write() calls initiated in CensusReport.
CensusReportInterval time.Duration

// clock is used to support manipulating time in unit and integration tests
clock timeutil.Clock
// precomputedQueryWritten receives an element whenever a precomputed query
// is written. It's used for unit testing
precomputedQueryWritten chan struct{}
}

// These non-persistent configuration options allow us to disable
Expand All @@ -205,6 +211,10 @@ type ActivityLogCoreConfig struct {

// MinimumRetentionMonths defines the minimum value for retention
MinimumRetentionMonths int

// Clock holds a custom clock to modify time.Now, time.Ticker, time.Timer.
// If nil, the default functions from the time package are used
Clock timeutil.Clock
}

// NewActivityLog creates an activity log.
Expand All @@ -214,6 +224,10 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
return nil, err
}

clock := core.activityLogConfig.Clock
if clock == nil {
clock = timeutil.DefaultClock{}
}
a := &ActivityLog{
core: core,
configOverrides: &core.activityLogConfig,
Expand All @@ -227,7 +241,7 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
doneCh: make(chan struct{}, 1),
partialMonthClientTracker: make(map[string]*activity.EntityRecord),
CensusReportInterval: time.Hour * 1,

clock: clock,
currentSegment: segmentInfo{
startTimestamp: 0,
currentClients: &activity.EntityActivityLog{
Expand All @@ -243,6 +257,7 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
},
standbyFragmentsReceived: make([]*activity.LogFragment, 0),
inprocessExport: atomic.NewBool(false),
precomputedQueryWritten: make(chan struct{}),
}

config, err := a.loadConfigOrDefault(core.activeContext)
Expand Down Expand Up @@ -274,7 +289,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorage(ctx context.Context, force boo
// :force: forces a save of tokens/entities even if the in-memory log is empty
func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, force bool) error {
defer a.metrics.MeasureSinceWithLabels([]string{"core", "activity", "segment_write"},
time.Now(), []metricsutil.Label{})
a.clock.Now(), []metricsutil.Label{})

// Swap out the pending fragments
a.fragmentLock.Lock()
Expand Down Expand Up @@ -433,7 +448,7 @@ func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool
case err != nil:
a.logger.Error(fmt.Sprintf("unable to retrieve oldest version timestamp: %s", err.Error()))
case len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 &&
(oldestUpgradeTime.Add(time.Duration(trackedTWESegmentPeriod * time.Hour)).Before(time.Now())):
(oldestUpgradeTime.Add(time.Duration(trackedTWESegmentPeriod * time.Hour)).Before(a.clock.Now())):
a.logger.Error(fmt.Sprintf("storing nonzero token count over a month after vault was upgraded to %s", oldestVersion))
default:
if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 {
Expand Down Expand Up @@ -1005,7 +1020,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {

forceSave := false
if a.enabled && a.currentSegment.startTimestamp == 0 {
a.startNewCurrentLogLocked(time.Now().UTC())
a.startNewCurrentLogLocked(a.clock.Now().UTC())
// Force a save so we can distinguish between
//
// Month N-1: present
Expand All @@ -1031,7 +1046,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {
}

// check for segments out of retention period, if it has changed
go a.retentionWorker(ctx, time.Now(), a.retentionMonths)
go a.retentionWorker(ctx, a.clock.Now(), a.retentionMonths)
}

// update the enable flag and reset the current log
Expand Down Expand Up @@ -1097,7 +1112,7 @@ func (c *Core) setupActivityLogLocked(ctx context.Context, wg *sync.WaitGroup) e
c.activityLog = manager

// load activity log for "this month" into memory
err = manager.refreshFromStoredLog(manager.core.activeContext, wg, time.Now().UTC())
err = manager.refreshFromStoredLog(manager.core.activeContext, wg, manager.clock.Now().UTC())
if err != nil {
return err
}
Expand All @@ -1121,7 +1136,7 @@ func (c *Core) setupActivityLogLocked(ctx context.Context, wg *sync.WaitGroup) e
// Signal when this is done so that unit tests can proceed.
manager.retentionDone = make(chan struct{})
go func(months int) {
manager.retentionWorker(ctx, time.Now(), months)
manager.retentionWorker(ctx, manager.clock.Now(), months)
close(manager.retentionDone)
}(manager.retentionMonths)

Expand Down Expand Up @@ -1158,7 +1173,7 @@ func (a *ActivityLog) StartOfNextMonth() time.Time {
defer a.l.RUnlock()
var segmentStart time.Time
if a.currentSegment.startTimestamp == 0 {
segmentStart = time.Now().UTC()
segmentStart = a.clock.Now().UTC()
} else {
segmentStart = time.Unix(a.currentSegment.startTimestamp, 0).UTC()
}
Expand All @@ -1170,12 +1185,12 @@ func (a *ActivityLog) StartOfNextMonth() time.Time {
// perfStandbyFragmentWorker handles scheduling fragments
// to send via RPC; it runs on perf standby nodes only.
func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) {
timer := time.NewTimer(time.Duration(0))
timer := a.clock.NewTimer(time.Duration(0))
fragmentWaiting := false
// Eat first event, so timer is stopped
<-timer.C

endOfMonth := time.NewTimer(a.StartOfNextMonth().Sub(time.Now()))
endOfMonth := a.clock.NewTimer(a.StartOfNextMonth().Sub(a.clock.Now()))
if a.configOverrides.DisableTimers {
endOfMonth.Stop()
}
Expand Down Expand Up @@ -1247,18 +1262,18 @@ func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) {
// Set timer for next month.
// The current segment *probably* hasn't been set yet (via invalidation),
// so don't rely on it.
target := timeutil.StartOfNextMonth(time.Now().UTC())
endOfMonth.Reset(target.Sub(time.Now()))
target := timeutil.StartOfNextMonth(a.clock.Now().UTC())
endOfMonth.Reset(target.Sub(a.clock.Now()))
}
}
}

// activeFragmentWorker handles scheduling the write of the next
// segment. It runs on active nodes only.
func (a *ActivityLog) activeFragmentWorker(ctx context.Context) {
ticker := time.NewTicker(activitySegmentInterval)
ticker := a.clock.NewTicker(activitySegmentInterval)

endOfMonth := time.NewTimer(a.StartOfNextMonth().Sub(time.Now()))
endOfMonth := a.clock.NewTimer(a.StartOfNextMonth().Sub(a.clock.Now()))
if a.configOverrides.DisableTimers {
endOfMonth.Stop()
}
Expand Down Expand Up @@ -1308,7 +1323,7 @@ func (a *ActivityLog) activeFragmentWorker(ctx context.Context) {

// Reset the schedule to wait 10 minutes from this forced write.
ticker.Stop()
ticker = time.NewTicker(activitySegmentInterval)
ticker = a.clock.NewTicker(activitySegmentInterval)

// Simpler, but ticker.Reset was introduced in go 1.15:
// ticker.Reset(activitySegmentInterval)
Expand All @@ -1324,7 +1339,7 @@ func (a *ActivityLog) activeFragmentWorker(ctx context.Context) {
go a.retentionWorker(ctx, currentTime.UTC(), a.retentionMonths)
a.l.RUnlock()

delta := a.StartOfNextMonth().Sub(time.Now())
delta := a.StartOfNextMonth().Sub(a.clock.Now())
if delta < 20*time.Minute {
delta = 20 * time.Minute
}
Expand Down Expand Up @@ -1513,7 +1528,7 @@ func (a *ActivityLog) createCurrentFragment() {
Clients: make([]*activity.EntityRecord, 0, 120),
NonEntityTokens: make(map[string]uint64),
}
a.fragmentCreation = time.Now().UTC()
a.fragmentCreation = a.clock.Now().UTC()

// Signal that a new segment is available, start
// the timer to send it.
Expand Down Expand Up @@ -1613,13 +1628,13 @@ func (a *ActivityLog) handleQuery(ctx context.Context, startTime, endTime time.T
// with the endTime equal to the end of the last month, and add in the current month
// data.
precomputedQueryEndTime := endTime
if timeutil.IsCurrentMonth(endTime, time.Now().UTC()) {
if timeutil.IsCurrentMonth(endTime, a.clock.Now().UTC()) {
precomputedQueryEndTime = timeutil.EndOfMonth(timeutil.MonthsPreviousTo(1, timeutil.StartOfMonth(endTime)))
computePartial = true
}

pq := &activity.PrecomputedQuery{}
if startTime.After(precomputedQueryEndTime) && timeutil.IsCurrentMonth(startTime, time.Now().UTC()) {
if startTime.After(precomputedQueryEndTime) && timeutil.IsCurrentMonth(startTime, a.clock.Now().UTC()) {
// We're only calculating the partial month client count. Skip the precomputation
// get call.
pq = &activity.PrecomputedQuery{
Expand Down Expand Up @@ -1794,21 +1809,21 @@ func (a *ActivityLog) handleQuery(ctx context.Context, startTime, endTime time.T
a.sortActivityLogMonthsResponse(months)

// Modify the final month output to make response more consumable based on API request
months = modifyResponseMonths(months, startTime, endTime)
months = a.modifyResponseMonths(months, startTime, endTime)
responseData["months"] = months

return responseData, nil
}

// modifyResponseMonths fills out various parts of the query structure to help
// activity log clients parse the returned query.
func modifyResponseMonths(months []*ResponseMonth, start time.Time, end time.Time) []*ResponseMonth {
func (a *ActivityLog) modifyResponseMonths(months []*ResponseMonth, start time.Time, end time.Time) []*ResponseMonth {
if len(months) == 0 {
return months
}
start = timeutil.StartOfMonth(start)
end = timeutil.EndOfMonth(end)
if timeutil.IsCurrentMonth(end, time.Now().UTC()) {
if timeutil.IsCurrentMonth(end, a.clock.Now().UTC()) {
end = timeutil.EndOfMonth(timeutil.StartOfMonth(end).AddDate(0, -1, 0))
}
modifiedResponseMonths := make([]*ResponseMonth, 0)
Expand Down Expand Up @@ -2328,7 +2343,7 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
// If there's an intent log, finish it even if the feature is currently disabled.
a.l.RLock()
currentMonth := a.currentSegment.startTimestamp
// Base retention period on the month we are generating (even in the past)--- time.Now()
// Base retention period on the month we are generating (even in the past)--- a.clock.Now()
// would work but this will be easier to control in tests.
retentionWindow := timeutil.MonthsPreviousTo(a.retentionMonths, time.Unix(intent.NextMonth, 0).UTC())
a.l.RUnlock()
Expand Down Expand Up @@ -2396,6 +2411,10 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {

a.logger.Info("finished computing queries", "month", endTime)

select {
case a.precomputedQueryWritten <- struct{}{}:
default:
}
hghaf099 marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down Expand Up @@ -2489,7 +2508,7 @@ func (a *ActivityLog) populateNamespaceAndMonthlyBreakdowns() (map[int64]*proces
byNamespace := make(map[string]*processByNamespace)
byMonth := make(map[int64]*processMonth)
for _, e := range a.partialMonthClientTracker {
processClientRecord(e, byNamespace, byMonth, time.Now())
processClientRecord(e, byNamespace, byMonth, a.clock.Now())
}
return byMonth, byNamespace
}
Expand Down
Loading