Skip to content

Commit

Permalink
make the missing entry lookup synchronous during tests to hopefully a…
Browse files Browse the repository at this point in the history
…void races around the entry order and test verifications
  • Loading branch information
slim-bean committed Aug 13, 2019
1 parent 600b6cf commit 9563033
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 8 deletions.
2 changes: 1 addition & 1 deletion cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func main() {

w := writer.NewWriter(os.Stdout, sentChan, *interval, *size)
r := reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal)
c := comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, r)
c := comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, r, true)

http.Handle("/metrics", promhttp.Handler())
go func() {
Expand Down
13 changes: 11 additions & 2 deletions pkg/canary/comparator/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Comparator struct {
ackdEntries []*time.Time
maxWait time.Duration
pruneInterval time.Duration
confirmAsync bool
sent chan time.Time
recv chan time.Time
rdr reader.LokiReader
Expand All @@ -69,12 +70,13 @@ type Comparator struct {
}

func NewComparator(writer io.Writer, maxWait time.Duration, pruneInterval time.Duration,
buckets int, sentChan chan time.Time, receivedChan chan time.Time, reader reader.LokiReader) *Comparator {
buckets int, sentChan chan time.Time, receivedChan chan time.Time, reader reader.LokiReader, confirmAsync bool) *Comparator {
c := &Comparator{
w: writer,
entries: []*time.Time{},
maxWait: maxWait,
pruneInterval: pruneInterval,
confirmAsync: confirmAsync,
sent: sentChan,
recv: receivedChan,
rdr: reader,
Expand Down Expand Up @@ -160,6 +162,8 @@ func (c *Comparator) entryReceived(ts time.Time) {
}

func (c *Comparator) Size() int {
c.entMtx.Lock()
defer c.entMtx.Unlock()
return len(c.entries)
}

Expand Down Expand Up @@ -209,7 +213,12 @@ func (c *Comparator) pruneEntries() {
}
c.entries = c.entries[:k]
if len(missing) > 0 {
go c.confirmMissing(missing)
if c.confirmAsync {
go c.confirmMissing(missing)
} else {
c.confirmMissing(missing)
}

}

// Prune the acknowledged list, remove anything older than our maxwait
Expand Down
10 changes: 5 additions & 5 deletions pkg/canary/comparator/comparator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestComparatorEntryReceivedOutOfOrder(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -60,7 +60,7 @@ func TestComparatorEntryReceivedNotExpected(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestComparatorEntryReceivedDuplicate(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestEntryNeverReceived(t *testing.T) {

mr := &mockReader{found}
maxWait := 50 * time.Millisecond
c := NewComparator(actual, maxWait, 2*time.Millisecond, 1, make(chan time.Time), make(chan time.Time), mr)
c := NewComparator(actual, maxWait, 2*time.Millisecond, 1, make(chan time.Time), make(chan time.Time), mr, false)

c.entrySent(t1)
c.entrySent(t2)
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestEntryNeverReceived(t *testing.T) {
func TestPruneAckdEntires(t *testing.T) {
actual := &bytes.Buffer{}
maxWait := 30 * time.Millisecond
c := NewComparator(actual, maxWait, 10*time.Millisecond, 1, make(chan time.Time), make(chan time.Time), nil)
c := NewComparator(actual, maxWait, 10*time.Millisecond, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Millisecond)
Expand Down

0 comments on commit 9563033

Please sign in to comment.