From cfd3fe3a9a47a61a7418ea248e0312620f06101a Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Wed, 1 Sep 2021 16:33:44 +0200 Subject: [PATCH] Add tests and benchmarks for delta computation --- go.mod | 1 + pkg/icingadb/delta_test.go | 266 +++++++++++++++++++++++++++++++++++++ 2 files changed, 267 insertions(+) create mode 100644 pkg/icingadb/delta_test.go diff --git a/go.mod b/go.mod index 7be6444ed..efa965905 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/jessevdk/go-flags v1.5.0 github.com/jmoiron/sqlx v1.3.4 github.com/pkg/errors v0.9.1 + github.com/stretchr/testify v1.7.0 go.uber.org/zap v1.19.0 golang.org/x/exp v0.0.0-20210514180818-737f94c0881e golang.org/x/sync v0.0.0-20210220032951-036812b2e83c diff --git a/pkg/icingadb/delta_test.go b/pkg/icingadb/delta_test.go new file mode 100644 index 000000000..9f1cdf881 --- /dev/null +++ b/pkg/icingadb/delta_test.go @@ -0,0 +1,266 @@ +package icingadb + +import ( + "context" + "encoding/binary" + "github.com/icinga/icingadb/pkg/common" + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" + "strconv" + "sync" + "testing" +) + +func TestDelta(t *testing.T) { + type TestData struct { + Name string // name for the sub-test + Actual, Desired uint64 // checksum to send to actual/desired + Create, Update, Delete uint64 // checksum that must be in the corresponding map (if != 0) + } + + tests := []TestData{{ + Name: "Empty", + }, { + Name: "Create", + Desired: 0x1111111111111111, + Create: 0x1111111111111111, + }, { + Name: "Update", + Actual: 0x1111111111111111, + Desired: 0x2222222222222222, + Update: 0x2222222222222222, + }, { + Name: "Delete", + Actual: 0x1111111111111111, + Delete: 0x1111111111111111, + }, { + Name: "Keep", + Actual: 0x1111111111111111, + Desired: 0x1111111111111111, + }} + + makeEndpoint := func(id, checksum uint64) *v1.Endpoint { + e := new(v1.Endpoint) + e.Id = testDeltaMakeIdOrChecksum(id) + e.PropertiesChecksum = testDeltaMakeIdOrChecksum(checksum) + return e + } + + // Send the entities to the actual and desired channels in different ordering to catch bugs in the implementation + // that only show depending on the order in which actual and desired values are processed for an ID. + type SendOrder struct { + Name string + Send func(id uint64, test TestData, chActual, chDesired chan<- contracts.Entity) + } + sendOrders := []SendOrder{{ + Name: "ActualFirst", + Send: func(id uint64, test TestData, chActual, chDesired chan<- contracts.Entity) { + if test.Actual != 0 { + chActual <- makeEndpoint(id, test.Actual) + } + if test.Desired != 0 { + chDesired <- makeEndpoint(id, test.Desired) + } + }, + }, { + Name: "DesiredFirst", + Send: func(id uint64, test TestData, chActual, chDesired chan<- contracts.Entity) { + if test.Desired != 0 { + chDesired <- makeEndpoint(id, test.Desired) + } + if test.Actual != 0 { + chActual <- makeEndpoint(id, test.Actual) + } + }, + }} + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + for _, sendOrder := range sendOrders { + t.Run(sendOrder.Name, func(t *testing.T) { + id := uint64(0x42) + chActual := make(chan contracts.Entity) + chDesired := make(chan contracts.Entity) + subject := common.NewSyncSubject(v1.NewEndpoint) + logger := zaptest.NewLogger(t).Sugar() + + go func() { + sendOrder.Send(id, test, chActual, chDesired) + close(chActual) + close(chDesired) + }() + + delta := NewDelta(context.Background(), chActual, chDesired, subject, logger) + err := delta.Wait() + require.NoError(t, err, "delta should finish without error") + + _, ok := <-chActual + require.False(t, ok, "chActual should have been closed") + _, ok = <-chDesired + require.False(t, ok, "chDesired should have been closed") + + testDeltaVerifyResult(t, "Create", testDeltaMakeExpectedMap(id, test.Create), delta.Create) + testDeltaVerifyResult(t, "Update", testDeltaMakeExpectedMap(id, test.Update), delta.Update) + testDeltaVerifyResult(t, "Delete", testDeltaMakeExpectedMap(id, test.Delete), delta.Delete) + }) + } + }) + } + + t.Run("Combined", func(t *testing.T) { + chActual := make(chan contracts.Entity) + chDesired := make(chan contracts.Entity) + subject := common.NewSyncSubject(v1.NewEndpoint) + logger := zaptest.NewLogger(t).Sugar() + + expectedCreate := make(map[uint64]uint64) + expectedUpdate := make(map[uint64]uint64) + expectedDelete := make(map[uint64]uint64) + + nextId := uint64(1) + var wg sync.WaitGroup + for _, test := range tests { + test := test + for _, sendOrder := range sendOrders { + sendOrder := sendOrder + id := nextId + nextId++ + // Log ID mapping to allow easier debugging in case of failures. + t.Logf("ID=%d(%s) Test=%s SendOrder=%s", + id, testDeltaMakeIdOrChecksum(id).String(), test.Name, sendOrder.Name) + wg.Add(1) + go func() { + defer wg.Done() + sendOrder.Send(id, test, chActual, chDesired) + }() + + if test.Create != 0 { + expectedCreate[id] = test.Create + } + if test.Update != 0 { + expectedUpdate[id] = test.Update + } + if test.Delete != 0 { + expectedDelete[id] = test.Delete + } + } + } + go func() { + wg.Wait() + close(chActual) + close(chDesired) + }() + + delta := NewDelta(context.Background(), chActual, chDesired, subject, logger) + err := delta.Wait() + require.NoError(t, err, "delta should finish without error") + + _, ok := <-chActual + require.False(t, ok, "chActual should have been closed") + _, ok = <-chDesired + require.False(t, ok, "chDesired should have been closed") + + testDeltaVerifyResult(t, "Create", expectedCreate, delta.Create) + testDeltaVerifyResult(t, "Update", expectedUpdate, delta.Update) + testDeltaVerifyResult(t, "Delete", expectedDelete, delta.Delete) + }) +} + +func testDeltaMakeIdOrChecksum(i uint64) types.Binary { + b := make([]byte, 20) + binary.BigEndian.PutUint64(b, i) + return b +} + +func testDeltaMakeExpectedMap(id uint64, checksum uint64) map[uint64]uint64 { + if checksum == 0 { + return nil + } else { + return map[uint64]uint64{ + id: checksum, + } + } +} + +func testDeltaVerifyResult(t *testing.T, name string, expected map[uint64]uint64, got EntitiesById) { + for id, checksum := range expected { + idKey := testDeltaMakeIdOrChecksum(id).String() + if assert.Containsf(t, got, idKey, "%s: should contain %s", name, idKey) { + expectedChecksum := testDeltaMakeIdOrChecksum(checksum).String() + gotChecksum := got[idKey].(contracts.Checksumer).Checksum().String() + assert.Equalf(t, expectedChecksum, gotChecksum, "%s: %s should match checksum", name, idKey) + delete(got, idKey) + } + } + + for id := range got { + assert.Failf(t, "unexpected element", "%s: should not contain %s", name, id) + } +} + +func BenchmarkDelta(b *testing.B) { + for n := 1 << 10; n <= 1<<20; n <<= 1 { + b.Run(strconv.Itoa(n), func(b *testing.B) { + benchmarkDelta(b, n) + }) + } +} + +func benchmarkDelta(b *testing.B, numEntities int) { + chActual := make([]chan contracts.Entity, b.N) + chDesired := make([]chan contracts.Entity, b.N) + for i := 0; i < b.N; i++ { + chActual[i] = make(chan contracts.Entity, numEntities) + chDesired[i] = make(chan contracts.Entity, numEntities) + } + makeEndpoint := func(id1, id2, checksum uint64) *v1.Endpoint { + e := new(v1.Endpoint) + e.Id = make([]byte, 20) + binary.BigEndian.PutUint64(e.Id[0:], id1) + binary.BigEndian.PutUint64(e.Id[8:], id2) + e.PropertiesChecksum = make([]byte, 20) + binary.BigEndian.PutUint64(e.PropertiesChecksum, checksum) + return e + } + for i := 0; i < numEntities; i++ { + // each iteration writes exactly one entity to each channel + var eActual, eDesired contracts.Entity + switch i % 3 { + case 0: // distinct IDs + eActual = makeEndpoint(1, uint64(i), uint64(i)) + eDesired = makeEndpoint(2, uint64(i), uint64(i)) + case 1: // same ID, same checksum + e := makeEndpoint(3, uint64(i), uint64(i)) + eActual = e + eDesired = e + case 2: // same ID, different checksum + eActual = makeEndpoint(4, uint64(i), uint64(i)) + eDesired = makeEndpoint(4, uint64(i), uint64(i+1)) + } + for _, ch := range chActual { + ch <- eActual + } + for _, ch := range chDesired { + ch <- eDesired + } + } + for i := 0; i < b.N; i++ { + close(chActual[i]) + close(chDesired[i]) + } + subject := common.NewSyncSubject(v1.NewEndpoint) + // logger := zaptest.NewLogger(b).Sugar() + logger := zap.New(zapcore.NewTee()).Sugar() + b.ResetTimer() + for i := 0; i < b.N; i++ { + d := NewDelta(context.Background(), chActual[i], chDesired[i], subject, logger) + err := d.Wait() + assert.NoError(b, err, "delta should not fail") + } +}