Skip to content

Commit

Permalink
Rewrite delta to use only a single goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
julianbrost committed Sep 24, 2021
1 parent 1e9a88b commit 66d9b0e
Showing 1 changed file with 65 additions and 83 deletions.
148 changes: 65 additions & 83 deletions pkg/icingadb/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ package icingadb

import (
"context"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/utils"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"sync"
"time"
)

Expand All @@ -22,7 +19,8 @@ type Delta struct {
logger *zap.SugaredLogger
}

// NewDelta creates a new Delta and starts calculating it.
// NewDelta creates a new Delta and starts calculating it. The caller must ensure
// that no duplicate entities are sent to the same stream.
func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *zap.SugaredLogger) *Delta {
delta := &Delta{
Subject: subject,
Expand All @@ -43,98 +41,82 @@ func (delta *Delta) Wait() error {
func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) {
defer close(delta.done)

start := time.Now()
var endActual, endDesired time.Time
var numActual, numDesired uint64

actual := EntitiesById{} // only read from actualCh (so far)
desired := EntitiesById{} // only read from desiredCh (so far)

var update EntitiesById
if delta.Subject.WithChecksum() {
update = EntitiesById{}
update = EntitiesById{} // read from actualCh and desiredCh with mismatching checksums
}
actual := EntitiesById{}
desired := EntitiesById{}
var mtx, updateMtx sync.Mutex
g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
var cnt com.Counter
defer utils.Timed(time.Now(), func(elapsed time.Duration) {
delta.logger.Debugf(
"Synced %d actual elements of type %s in %s", cnt.Val(), utils.Name(delta.Subject.Entity()), elapsed)
})
for {
select {
case a, ok := <-actualCh:
if !ok {
return nil
}

id := a.ID().String()
mtx.Lock()

if d, ok := desired[id]; ok {
delete(desired, id)
mtx.Unlock()

if delta.Subject.WithChecksum() && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) {
updateMtx.Lock()
update[id] = d
updateMtx.Unlock()
}
} else {
actual[id] = a
mtx.Unlock()
}

cnt.Inc()
case <-ctx.Done():
return ctx.Err()
for actualCh != nil || desiredCh != nil {
select {
case actualValue, ok := <-actualCh:
if !ok {
endActual = time.Now()
actualCh = nil // Done reading all actual entities, disable this case.
break
}
}
})

g.Go(func() error {
var cnt com.Counter
defer utils.Timed(time.Now(), func(elapsed time.Duration) {
delta.logger.Debugf(
"Synced %d desired elements of type %s in %s", cnt.Val(), utils.Name(delta.Subject.Entity()), elapsed)
})
for {
select {
case d, ok := <-desiredCh:
if !ok {
return nil
}
numActual++

id := d.ID().String()
mtx.Lock()

if a, ok := actual[id]; ok {
delete(actual, id)
mtx.Unlock()

if delta.Subject.WithChecksum() && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) {
updateMtx.Lock()
update[id] = d
updateMtx.Unlock()
}
} else {
desired[id] = d
mtx.Unlock()
id := actualValue.ID().String()
if desiredValue, ok := desired[id]; ok {
delete(desired, id)
if update != nil && !checksumsMatch(actualValue, desiredValue) {
update[id] = desiredValue
}
} else {
actual[id] = actualValue
}

cnt.Inc()
case <-ctx.Done():
return ctx.Err()
case desiredValue, ok := <-desiredCh:
if !ok {
endDesired = time.Now()
desiredCh = nil // Done reading all desired entities, disable this case.
break
}
}
})
numDesired++

if err := g.Wait(); err != nil {
delta.done <- err
id := desiredValue.ID().String()
if actualValue, ok := actual[id]; ok {
delete(actual, id)
if update != nil && !checksumsMatch(actualValue, desiredValue) {
update[id] = desiredValue
}
} else {
desired[id] = desiredValue
}

return
case <-ctx.Done():
delta.done <- ctx.Err()
return
}
}

delta.Create = desired
delta.Update = update
delta.Delete = actual
if delta.Subject.WithChecksum() {
delta.Update = update
}

delta.logger.Debugw("Delta finished",
zap.String("subject", utils.Name(delta.Subject.Entity())),
zap.Duration("time_total", time.Since(start)),
zap.Duration("time_actual", endActual.Sub(start)),
zap.Duration("time_desired", endDesired.Sub(start)),
zap.Uint64("num_actual", numActual),
zap.Uint64("num_desired", numDesired),
zap.Int("create", len(delta.Create)),
zap.Int("update", len(delta.Update)),
zap.Int("delete", len(delta.Delete)))
}

// checksumsMatch returns whether the checksums of two entities are the same.
// Both entities must implement contracts.Checksumer.
func checksumsMatch(a, b contracts.Entity) bool {
c1 := a.(contracts.Checksumer).Checksum()
c2 := b.(contracts.Checksumer).Checksum()
return c1.Equal(c2)
}

0 comments on commit 66d9b0e

Please sign in to comment.