Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite delta to use only a single goroutine #373

Merged
merged 2 commits into from
Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/jessevdk/go-flags v1.5.0
github.com/jmoiron/sqlx v1.3.4
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.19.1
golang.org/x/exp v0.0.0-20210514180818-737f94c0881e
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand Down
148 changes: 65 additions & 83 deletions pkg/icingadb/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ package icingadb

import (
"context"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/utils"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"sync"
"time"
)

Expand All @@ -22,7 +19,8 @@ type Delta struct {
logger *zap.SugaredLogger
}

// NewDelta creates a new Delta and starts calculating it.
// NewDelta creates a new Delta and starts calculating it. The caller must ensure
// that no duplicate entities are sent to the same stream.
func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *zap.SugaredLogger) *Delta {
delta := &Delta{
Subject: subject,
Expand All @@ -43,98 +41,82 @@ func (delta *Delta) Wait() error {
func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) {
defer close(delta.done)

start := time.Now()
var endActual, endDesired time.Time
var numActual, numDesired uint64

actual := EntitiesById{} // only read from actualCh (so far)
desired := EntitiesById{} // only read from desiredCh (so far)

var update EntitiesById
if delta.Subject.WithChecksum() {
update = EntitiesById{}
update = EntitiesById{} // read from actualCh and desiredCh with mismatching checksums
}
actual := EntitiesById{}
desired := EntitiesById{}
var mtx, updateMtx sync.Mutex
g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
var cnt com.Counter
defer utils.Timed(time.Now(), func(elapsed time.Duration) {
delta.logger.Debugf(
"Synced %d actual elements of type %s in %s", cnt.Val(), utils.Name(delta.Subject.Entity()), elapsed)
})
for {
select {
case a, ok := <-actualCh:
if !ok {
return nil
}

id := a.ID().String()
mtx.Lock()

if d, ok := desired[id]; ok {
delete(desired, id)
mtx.Unlock()

if delta.Subject.WithChecksum() && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) {
updateMtx.Lock()
update[id] = d
updateMtx.Unlock()
}
} else {
actual[id] = a
mtx.Unlock()
}

cnt.Inc()
case <-ctx.Done():
return ctx.Err()
for actualCh != nil || desiredCh != nil {
select {
case actualValue, ok := <-actualCh:
if !ok {
endActual = time.Now()
actualCh = nil // Done reading all actual entities, disable this case.
break
}
}
})

g.Go(func() error {
var cnt com.Counter
defer utils.Timed(time.Now(), func(elapsed time.Duration) {
delta.logger.Debugf(
"Synced %d desired elements of type %s in %s", cnt.Val(), utils.Name(delta.Subject.Entity()), elapsed)
})
for {
select {
case d, ok := <-desiredCh:
if !ok {
return nil
}
numActual++

id := d.ID().String()
mtx.Lock()

if a, ok := actual[id]; ok {
delete(actual, id)
mtx.Unlock()

if delta.Subject.WithChecksum() && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) {
updateMtx.Lock()
update[id] = d
updateMtx.Unlock()
}
} else {
desired[id] = d
mtx.Unlock()
id := actualValue.ID().String()
if desiredValue, ok := desired[id]; ok {
delete(desired, id)
if update != nil && !checksumsMatch(actualValue, desiredValue) {
update[id] = desiredValue
}
} else {
actual[id] = actualValue
}

cnt.Inc()
case <-ctx.Done():
return ctx.Err()
case desiredValue, ok := <-desiredCh:
if !ok {
endDesired = time.Now()
desiredCh = nil // Done reading all desired entities, disable this case.
break
}
}
})
numDesired++

if err := g.Wait(); err != nil {
delta.done <- err
id := desiredValue.ID().String()
if actualValue, ok := actual[id]; ok {
delete(actual, id)
if update != nil && !checksumsMatch(actualValue, desiredValue) {
update[id] = desiredValue
}
} else {
desired[id] = desiredValue
}

return
case <-ctx.Done():
delta.done <- ctx.Err()
return
}
}

delta.Create = desired
delta.Update = update
delta.Delete = actual
if delta.Subject.WithChecksum() {
delta.Update = update
}

delta.logger.Debugw("Delta finished",
zap.String("subject", utils.Name(delta.Subject.Entity())),
zap.Duration("time_total", time.Since(start)),
zap.Duration("time_actual", endActual.Sub(start)),
zap.Duration("time_desired", endDesired.Sub(start)),
zap.Uint64("num_actual", numActual),
zap.Uint64("num_desired", numDesired),
zap.Int("create", len(delta.Create)),
zap.Int("update", len(delta.Update)),
zap.Int("delete", len(delta.Delete)))
}

// checksumsMatch returns whether the checksums of two entities are the same.
// Both entities must implement contracts.Checksumer.
func checksumsMatch(a, b contracts.Entity) bool {
c1 := a.(contracts.Checksumer).Checksum()
c2 := b.(contracts.Checksumer).Checksum()
return c1.Equal(c2)
}
Loading