Skip to content

Commit

Permalink
feat(canary): Add test to check query results with and without cache. (
Browse files Browse the repository at this point in the history
…#13104)

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
  • Loading branch information
kavirajk authored Jun 3, 2024
1 parent 8084259 commit 71507a2
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 24 deletions.
1 change: 1 addition & 0 deletions clients/cmd/promtail/promtail-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ scrape_configs:
labels:
job: varlogs
__path__: /var/log/*log
stream: stdout
6 changes: 5 additions & 1 deletion cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func main() {
metricTestQueryRange := flag.Duration("metric-test-range", 24*time.Hour, "The range value [24h] used in the metric test instant-query."+
" Note: this value is truncated to the running time of the canary until this value is reached")

cacheTestInterval := flag.Duration("cache-test-interval", 15*time.Minute, "The interval the cache test query should be run")
cacheTestQueryRange := flag.Duration("cache-test-range", 24*time.Hour, "The range value [24h] used in the cache test instant-query.")
cacheTestQueryNow := flag.Duration("cache-test-now", 1*time.Hour, "duration how far back from current time the execution time (--now) should be set for running this query in the cache test instant-query.")

spotCheckInterval := flag.Duration("spot-check-interval", 15*time.Minute, "Interval that a single result will be kept from sent entries and spot-checked against Loki, "+
"e.g. 15min default one entry every 15 min will be saved and then queried again every 15min until spot-check-max is reached")
spotCheckMax := flag.Duration("spot-check-max", 4*time.Hour, "How far back to check a spot check entry before dropping it")
Expand Down Expand Up @@ -189,7 +193,7 @@ func main() {
_, _ = fmt.Fprintf(os.Stderr, "Unable to create reader for Loki querier, check config: %s", err)
os.Exit(1)
}
c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *spotCheckWait, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *spotCheckWait, *metricTestInterval, *metricTestQueryRange, *cacheTestInterval, *cacheTestQueryRange, *cacheTestQueryNow, *interval, *buckets, sentChan, receivedChan, c.reader, true)
}

startCanary()
Expand Down
99 changes: 90 additions & 9 deletions pkg/canary/comparator/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package comparator
import (
"fmt"
"io"
"math"
"math/rand"
"sync"
"time"
Expand All @@ -24,6 +25,8 @@ const (
DebugWebsocketMissingEntry = "websocket missing entry: %v\n"
DebugQueryResult = "confirmation query result: %v\n"
DebugEntryFound = "missing websocket entry %v was found %v seconds after it was originally sent\n"

floatDiffTolerance = 1e-6
)

var (
Expand Down Expand Up @@ -90,6 +93,16 @@ var (
Help: "how long the spot check test query execution took in seconds.",
Buckets: instrument.DefBuckets,
})
queryResultsDiff = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki_canary",
Name: "cache_test_query_results_diff_total",
Help: "counts number of times the query results was different with and without cache ",
})
queryResultsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki_canary",
Name: "cache_test_query_results_total",
Help: "counts number of times the query results test requests are done ",
}, []string{"status"}) // status=success/failure
)

type Comparator struct {
Expand All @@ -98,6 +111,7 @@ type Comparator struct {
spotEntMtx sync.Mutex // Locks access to []spotCheck
spotMtx sync.Mutex // Locks spotcheckRunning for single threaded but async spotCheck()
metTestMtx sync.Mutex // Locks metricTestRunning for single threaded but async metricTest()
cacheTestMtx sync.Mutex // Locks cacheTestRunning for single threaded but async cacheTest()
pruneMtx sync.Mutex // Locks pruneEntriesRunning for single threaded but async pruneEntries()
w io.Writer
entries []*time.Time
Expand All @@ -116,14 +130,19 @@ type Comparator struct {
metricTestInterval time.Duration
metricTestRange time.Duration
metricTestRunning bool
writeInterval time.Duration
confirmAsync bool
startTime time.Time
sent chan time.Time
recv chan time.Time
rdr reader.LokiReader
quit chan struct{}
done chan struct{}
cacheTestInterval time.Duration
cacheTestRange time.Duration
// how far back from current time the execution time (--now) should be set for running this query.
cacheTestNow time.Duration
cacheTestRunning bool
writeInterval time.Duration
confirmAsync bool
startTime time.Time
sent chan time.Time
recv chan time.Time
rdr reader.LokiReader
quit chan struct{}
done chan struct{}
}

func NewComparator(writer io.Writer,
Expand All @@ -133,6 +152,9 @@ func NewComparator(writer io.Writer,
spotCheckInterval, spotCheckMax, spotCheckQueryRate, spotCheckWait time.Duration,
metricTestInterval time.Duration,
metricTestRange time.Duration,
cacheTestInterval time.Duration,
cacheTestRange time.Duration,
cacheTestNow time.Duration,
writeInterval time.Duration,
buckets int,
sentChan chan time.Time,
Expand All @@ -155,6 +177,10 @@ func NewComparator(writer io.Writer,
metricTestInterval: metricTestInterval,
metricTestRange: metricTestRange,
metricTestRunning: false,
cacheTestInterval: cacheTestInterval,
cacheTestRange: cacheTestRange,
cacheTestNow: cacheTestNow,
cacheTestRunning: false,
writeInterval: writeInterval,
confirmAsync: confirmAsync,
startTime: time.Now(),
Expand Down Expand Up @@ -252,10 +278,12 @@ func (c *Comparator) run() {
randomGenerator := rand.New(rand.NewSource(time.Now().UnixNano()))
mt := time.NewTicker(time.Duration(randomGenerator.Int63n(c.metricTestInterval.Nanoseconds())))
sc := time.NewTicker(c.spotCheckQueryRate)
ct := time.NewTicker(c.cacheTestInterval)
defer func() {
t.Stop()
mt.Stop()
sc.Stop()
ct.Stop()
close(c.done)
}()

Expand Down Expand Up @@ -294,12 +322,65 @@ func (c *Comparator) run() {
firstMt = false
mt.Reset(c.metricTestInterval)
}
case <-ct.C:
// Only run one instance of cache tests at a time.
c.cacheTestMtx.Lock()
if !c.cacheTestRunning {
c.cacheTestRunning = true
go c.cacheTest(time.Now())
}
c.cacheTestMtx.Unlock()

case <-c.quit:
return
}
}
}

func (c *Comparator) cacheTest(currTime time.Time) {
defer func() {
c.cacheTestMtx.Lock()
c.cacheTestRunning = false
c.cacheTestMtx.Unlock()
}()

// cacheTest is currently run using `reader.CountOverTime()` which is an instant query.
// We make the query with and without cache over the data that is not changing (e.g: --now="1hr ago") instead of on latest data that is a moving target.
queryStartTime := currTime.Add(-c.cacheTestNow)

// We cannot query for range before the pod even started.
if queryStartTime.Before(c.startTime) {
// we wait.
fmt.Fprintf(c.w, "cacheTest not run. still waiting for query start range(%s) to past the process start time(%s).\n", queryStartTime, c.startTime)
return
}

rangeDuration := c.cacheTestRange
rng := fmt.Sprintf("%.0fs", rangeDuration.Seconds())

// with cache
countCache, err := c.rdr.QueryCountOverTime(rng, queryStartTime, true)
if err != nil {
fmt.Fprintf(c.w, "error running cache query test with cache: %s\n", err.Error())
queryResultsTotal.WithLabelValues("failure").Inc()
return
}

// without cache
countNocache, err := c.rdr.QueryCountOverTime(rng, queryStartTime, false)
if err != nil {
fmt.Fprintf(c.w, "error running cache query test without cache: %s\n", err.Error())
queryResultsTotal.WithLabelValues("failure").Inc()
return
}

queryResultsTotal.WithLabelValues("success").Inc()
if math.Abs(countNocache-countCache) > floatDiffTolerance {
queryResultsDiff.Inc()
fmt.Fprintf(c.w, "found a diff in instant query results time: %s, result_with_cache: %v, result_without_cache: %v\n", queryStartTime, countCache, countNocache)
}
}

// check that the expected # of log lines have been written to Loki
func (c *Comparator) metricTest(currTime time.Time) {
// Always make sure to set the running state back to false
Expand All @@ -317,7 +398,7 @@ func (c *Comparator) metricTest(currTime time.Time) {
adjustedRange = currTime.Sub(c.startTime)
}
begin := time.Now()
actualCount, err := c.rdr.QueryCountOverTime(fmt.Sprintf("%.0fs", adjustedRange.Seconds()))
actualCount, err := c.rdr.QueryCountOverTime(fmt.Sprintf("%.0fs", adjustedRange.Seconds()), begin, true)
metricTestLatency.Observe(time.Since(begin).Seconds())
if err != nil {
fmt.Fprintf(c.w, "error running metric query test: %s\n", err.Error())
Expand Down
99 changes: 89 additions & 10 deletions pkg/canary/comparator/comparator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestComparatorEntryReceivedOutOfOrder(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -60,7 +60,7 @@ func TestComparatorEntryReceivedNotExpected(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestComparatorEntryReceivedDuplicate(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Unix(0, 0)
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestEntryNeverReceived(t *testing.T) {
wait := 60 * time.Second
maxWait := 300 * time.Second
//We set the prune interval timer to a huge value here so that it never runs, instead we call pruneEntries manually below
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)

c.entrySent(t1)
c.entrySent(t2)
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestConcurrentConfirmMissing(t *testing.T) {
wait := 30 * time.Millisecond
maxWait := 30 * time.Millisecond

c := NewComparator(output, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)
c := NewComparator(output, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)

for _, t := range found {
tCopy := t
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestPruneAckdEntires(t *testing.T) {
wait := 30 * time.Millisecond
maxWait := 30 * time.Millisecond
//We set the prune interval timer to a huge value here so that it never runs, instead we call pruneEntries manually below
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Unix(0, 0)
t2 := t1.Add(1 * time.Millisecond)
Expand Down Expand Up @@ -320,7 +320,7 @@ func TestSpotCheck(t *testing.T) {
spotCheck := 10 * time.Millisecond
spotCheckMax := 20 * time.Millisecond
//We set the prune interval timer to a huge value here so that it never runs, instead we call spotCheckEntries manually below
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, spotCheck, spotCheckMax, 4*time.Hour, 3*time.Millisecond, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, spotCheck, spotCheckMax, 4*time.Hour, 3*time.Millisecond, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)

// Send all the entries
for i := range entries {
Expand Down Expand Up @@ -360,6 +360,42 @@ func TestSpotCheck(t *testing.T) {
prometheus.Unregister(responseLatency)
}

func TestCacheTest(t *testing.T) {
actual := &bytes.Buffer{}
mr := &mockReader{}
now := time.Now()
cacheTestInterval := 500 * time.Millisecond
cacheTestRange := 30 * time.Second
cacheTestNow := 2 * time.Second

c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, 0, 0, 4*time.Hour, 0, 10*time.Minute, 0, cacheTestInterval, cacheTestRange, cacheTestNow, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), mr, false)
// Force the start time to a known value
c.startTime = time.Unix(10, 0)

queryResultsDiff = &mockCounter{}
mr.countOverTime = 2.3
mr.noCacheCountOvertime = mr.countOverTime // same value for both with and without cache
c.cacheTest(now)
assert.Equal(t, 0, queryResultsDiff.(*mockCounter).count)

queryResultsDiff = &mockCounter{} // reset counter
mr.countOverTime = 2.3 // value not important
mr.noCacheCountOvertime = 2.5 // different than `countOverTime` value.
c.cacheTest(now)
assert.Equal(t, 1, queryResultsDiff.(*mockCounter).count)

queryResultsDiff = &mockCounter{} // reset counter
mr.countOverTime = 2.3 // value not important
mr.noCacheCountOvertime = 2.30000005 // different than `countOverTime` value but withing tolerance
c.cacheTest(now)
assert.Equal(t, 0, queryResultsDiff.(*mockCounter).count)

// This avoids a panic on subsequent test execution,
// seems ugly but was easy, and multiple instantiations
// of the comparator should be an error
prometheus.Unregister(responseLatency)
}

func TestMetricTest(t *testing.T) {
metricTestActual = &mockGauge{}
metricTestExpected = &mockGauge{}
Expand All @@ -371,7 +407,7 @@ func TestMetricTest(t *testing.T) {
mr := &mockReader{}
metricTestRange := 30 * time.Second
//We set the prune interval timer to a huge value here so that it never runs, instead we call spotCheckEntries manually below
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, 0, 0, 4*time.Hour, 0, 10*time.Minute, metricTestRange, writeInterval, 1, make(chan time.Time), make(chan time.Time), mr, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, 0, 0, 4*time.Hour, 0, 10*time.Minute, metricTestRange, 1*time.Hour, 3*time.Hour, 30*time.Minute, writeInterval, 1, make(chan time.Time), make(chan time.Time), mr, false)
// Force the start time to a known value
c.startTime = time.Unix(10, 0)

Expand Down Expand Up @@ -456,6 +492,42 @@ func (m *mockCounter) Inc() {
m.count++
}

type mockCounterVec struct {
mockCounter
labels []string
}

func (m *mockCounterVec) WithLabelValues(lvs ...string) prometheus.Counter {
m.labels = lvs
return &m.mockCounter
}

func (m *mockCounterVec) Desc() *prometheus.Desc {
panic("implement me")
}

func (m *mockCounterVec) Write(*io_prometheus_client.Metric) error {
panic("implement me")
}

func (m *mockCounterVec) Describe(chan<- *prometheus.Desc) {
panic("implement me")
}

func (m *mockCounterVec) Collect(chan<- prometheus.Metric) {
panic("implement me")
}

func (m *mockCounterVec) Add(float64) {
panic("implement me")
}

func (m *mockCounterVec) Inc() {
m.cLck.Lock()
defer m.cLck.Unlock()
m.count++
}

type mockGauge struct {
cLck sync.Mutex
val float64
Expand Down Expand Up @@ -507,13 +579,20 @@ type mockReader struct {
resp []time.Time
countOverTime float64
queryRange string

// return this value if called without cache.
noCacheCountOvertime float64
}

func (r *mockReader) Query(_ time.Time, _ time.Time) ([]time.Time, error) {
return r.resp, nil
}

func (r *mockReader) QueryCountOverTime(queryRange string) (float64, error) {
func (r *mockReader) QueryCountOverTime(queryRange string, _ time.Time, cache bool) (float64, error) {
r.queryRange = queryRange
return r.countOverTime, nil
res := r.countOverTime
if !cache {
res = r.noCacheCountOvertime
}
return res, nil
}
Loading

0 comments on commit 71507a2

Please sign in to comment.