Skip to content

Commit

Permalink
Add tests and benchmarks for delta computation
Browse files Browse the repository at this point in the history
  • Loading branch information
julianbrost committed Sep 23, 2021
1 parent 4457f9f commit cfd3fe3
Show file tree
Hide file tree
Showing 2 changed files with 267 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/jessevdk/go-flags v1.5.0
github.com/jmoiron/sqlx v1.3.4
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.19.0
golang.org/x/exp v0.0.0-20210514180818-737f94c0881e
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand Down
266 changes: 266 additions & 0 deletions pkg/icingadb/delta_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
package icingadb

import (
"context"
"encoding/binary"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
"strconv"
"sync"
"testing"
)

func TestDelta(t *testing.T) {
type TestData struct {
Name string // name for the sub-test
Actual, Desired uint64 // checksum to send to actual/desired
Create, Update, Delete uint64 // checksum that must be in the corresponding map (if != 0)
}

tests := []TestData{{
Name: "Empty",
}, {
Name: "Create",
Desired: 0x1111111111111111,
Create: 0x1111111111111111,
}, {
Name: "Update",
Actual: 0x1111111111111111,
Desired: 0x2222222222222222,
Update: 0x2222222222222222,
}, {
Name: "Delete",
Actual: 0x1111111111111111,
Delete: 0x1111111111111111,
}, {
Name: "Keep",
Actual: 0x1111111111111111,
Desired: 0x1111111111111111,
}}

makeEndpoint := func(id, checksum uint64) *v1.Endpoint {
e := new(v1.Endpoint)
e.Id = testDeltaMakeIdOrChecksum(id)
e.PropertiesChecksum = testDeltaMakeIdOrChecksum(checksum)
return e
}

// Send the entities to the actual and desired channels in different ordering to catch bugs in the implementation
// that only show depending on the order in which actual and desired values are processed for an ID.
type SendOrder struct {
Name string
Send func(id uint64, test TestData, chActual, chDesired chan<- contracts.Entity)
}
sendOrders := []SendOrder{{
Name: "ActualFirst",
Send: func(id uint64, test TestData, chActual, chDesired chan<- contracts.Entity) {
if test.Actual != 0 {
chActual <- makeEndpoint(id, test.Actual)
}
if test.Desired != 0 {
chDesired <- makeEndpoint(id, test.Desired)
}
},
}, {
Name: "DesiredFirst",
Send: func(id uint64, test TestData, chActual, chDesired chan<- contracts.Entity) {
if test.Desired != 0 {
chDesired <- makeEndpoint(id, test.Desired)
}
if test.Actual != 0 {
chActual <- makeEndpoint(id, test.Actual)
}
},
}}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
for _, sendOrder := range sendOrders {
t.Run(sendOrder.Name, func(t *testing.T) {
id := uint64(0x42)
chActual := make(chan contracts.Entity)
chDesired := make(chan contracts.Entity)
subject := common.NewSyncSubject(v1.NewEndpoint)
logger := zaptest.NewLogger(t).Sugar()

go func() {
sendOrder.Send(id, test, chActual, chDesired)
close(chActual)
close(chDesired)
}()

delta := NewDelta(context.Background(), chActual, chDesired, subject, logger)
err := delta.Wait()
require.NoError(t, err, "delta should finish without error")

_, ok := <-chActual
require.False(t, ok, "chActual should have been closed")
_, ok = <-chDesired
require.False(t, ok, "chDesired should have been closed")

testDeltaVerifyResult(t, "Create", testDeltaMakeExpectedMap(id, test.Create), delta.Create)
testDeltaVerifyResult(t, "Update", testDeltaMakeExpectedMap(id, test.Update), delta.Update)
testDeltaVerifyResult(t, "Delete", testDeltaMakeExpectedMap(id, test.Delete), delta.Delete)
})
}
})
}

t.Run("Combined", func(t *testing.T) {
chActual := make(chan contracts.Entity)
chDesired := make(chan contracts.Entity)
subject := common.NewSyncSubject(v1.NewEndpoint)
logger := zaptest.NewLogger(t).Sugar()

expectedCreate := make(map[uint64]uint64)
expectedUpdate := make(map[uint64]uint64)
expectedDelete := make(map[uint64]uint64)

nextId := uint64(1)
var wg sync.WaitGroup
for _, test := range tests {
test := test
for _, sendOrder := range sendOrders {
sendOrder := sendOrder
id := nextId
nextId++
// Log ID mapping to allow easier debugging in case of failures.
t.Logf("ID=%d(%s) Test=%s SendOrder=%s",
id, testDeltaMakeIdOrChecksum(id).String(), test.Name, sendOrder.Name)
wg.Add(1)
go func() {
defer wg.Done()
sendOrder.Send(id, test, chActual, chDesired)
}()

if test.Create != 0 {
expectedCreate[id] = test.Create
}
if test.Update != 0 {
expectedUpdate[id] = test.Update
}
if test.Delete != 0 {
expectedDelete[id] = test.Delete
}
}
}
go func() {
wg.Wait()
close(chActual)
close(chDesired)
}()

delta := NewDelta(context.Background(), chActual, chDesired, subject, logger)
err := delta.Wait()
require.NoError(t, err, "delta should finish without error")

_, ok := <-chActual
require.False(t, ok, "chActual should have been closed")
_, ok = <-chDesired
require.False(t, ok, "chDesired should have been closed")

testDeltaVerifyResult(t, "Create", expectedCreate, delta.Create)
testDeltaVerifyResult(t, "Update", expectedUpdate, delta.Update)
testDeltaVerifyResult(t, "Delete", expectedDelete, delta.Delete)
})
}

func testDeltaMakeIdOrChecksum(i uint64) types.Binary {
b := make([]byte, 20)
binary.BigEndian.PutUint64(b, i)
return b
}

func testDeltaMakeExpectedMap(id uint64, checksum uint64) map[uint64]uint64 {
if checksum == 0 {
return nil
} else {
return map[uint64]uint64{
id: checksum,
}
}
}

func testDeltaVerifyResult(t *testing.T, name string, expected map[uint64]uint64, got EntitiesById) {
for id, checksum := range expected {
idKey := testDeltaMakeIdOrChecksum(id).String()
if assert.Containsf(t, got, idKey, "%s: should contain %s", name, idKey) {
expectedChecksum := testDeltaMakeIdOrChecksum(checksum).String()
gotChecksum := got[idKey].(contracts.Checksumer).Checksum().String()
assert.Equalf(t, expectedChecksum, gotChecksum, "%s: %s should match checksum", name, idKey)
delete(got, idKey)
}
}

for id := range got {
assert.Failf(t, "unexpected element", "%s: should not contain %s", name, id)
}
}

func BenchmarkDelta(b *testing.B) {
for n := 1 << 10; n <= 1<<20; n <<= 1 {
b.Run(strconv.Itoa(n), func(b *testing.B) {
benchmarkDelta(b, n)
})
}
}

func benchmarkDelta(b *testing.B, numEntities int) {
chActual := make([]chan contracts.Entity, b.N)
chDesired := make([]chan contracts.Entity, b.N)
for i := 0; i < b.N; i++ {
chActual[i] = make(chan contracts.Entity, numEntities)
chDesired[i] = make(chan contracts.Entity, numEntities)
}
makeEndpoint := func(id1, id2, checksum uint64) *v1.Endpoint {
e := new(v1.Endpoint)
e.Id = make([]byte, 20)
binary.BigEndian.PutUint64(e.Id[0:], id1)
binary.BigEndian.PutUint64(e.Id[8:], id2)
e.PropertiesChecksum = make([]byte, 20)
binary.BigEndian.PutUint64(e.PropertiesChecksum, checksum)
return e
}
for i := 0; i < numEntities; i++ {
// each iteration writes exactly one entity to each channel
var eActual, eDesired contracts.Entity
switch i % 3 {
case 0: // distinct IDs
eActual = makeEndpoint(1, uint64(i), uint64(i))
eDesired = makeEndpoint(2, uint64(i), uint64(i))
case 1: // same ID, same checksum
e := makeEndpoint(3, uint64(i), uint64(i))
eActual = e
eDesired = e
case 2: // same ID, different checksum
eActual = makeEndpoint(4, uint64(i), uint64(i))
eDesired = makeEndpoint(4, uint64(i), uint64(i+1))
}
for _, ch := range chActual {
ch <- eActual
}
for _, ch := range chDesired {
ch <- eDesired
}
}
for i := 0; i < b.N; i++ {
close(chActual[i])
close(chDesired[i])
}
subject := common.NewSyncSubject(v1.NewEndpoint)
// logger := zaptest.NewLogger(b).Sugar()
logger := zap.New(zapcore.NewTee()).Sugar()
b.ResetTimer()
for i := 0; i < b.N; i++ {
d := NewDelta(context.Background(), chActual[i], chDesired[i], subject, logger)
err := d.Wait()
assert.NoError(b, err, "delta should not fail")
}
}

0 comments on commit cfd3fe3

Please sign in to comment.