Skip to content
This repository has been archived by the owner on Oct 14, 2024. It is now read-only.

Commit

Permalink
refactor: reconcile event
Browse files Browse the repository at this point in the history
* extend `ReconcileEvent` interface with `Hash` function returning
  `string` type which does implement the `comparable` interface required
  by `map` for types used as keys.
* change `common.Queue` to rely on the `ReconcileEvent` type instead of
  `comparable` to provide more flexibility on how  the items pushed to
  the queue are compared to eachother avoiding duplicated items on the
  queue.
* add logging of items being pushed to the queue or popped
  from the queue for reconciling.
  • Loading branch information
chrisgacsal committed Jun 13, 2023
1 parent a54ca71 commit 1e2482a
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 45 deletions.
12 changes: 9 additions & 3 deletions runtime_scan/pkg/orchestrator/common/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@

package common

import log "github.com/sirupsen/logrus"
import (
"fmt"

"github.com/sirupsen/logrus"
)

type ReconcileEvent interface {
ToFields() log.Fields
String() string
fmt.Stringer

Hash() string
ToFields() logrus.Fields
}
5 changes: 3 additions & 2 deletions runtime_scan/pkg/orchestrator/common/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/openclarity/vmclarity/shared/pkg/log"
)

type Poller[T comparable] struct {
type Poller[T ReconcileEvent] struct {
// How often to re-poll the API for new items and try to publish them
// on the event channel. If the current items aren't handled they will
// be dropped and new items fetched when the PollPeriod is up.
Expand Down Expand Up @@ -54,8 +54,9 @@ func (p *Poller[T]) pollThenWait(ctx context.Context) {
if err != nil {
logger.Errorf("Failed to get items to reconcile: %v", err)
} else {
logger.Infof("Found %d items to reconcile, adding them to the queue", len(items))
logger.Debugf("Found %d items to reconcile, adding them to the queue", len(items))
for _, item := range items {
logger.WithFields(item.ToFields()).Debugf("Adding item to the queue")
p.Queue.Enqueue(item)
}
}
Expand Down
56 changes: 30 additions & 26 deletions runtime_scan/pkg/orchestrator/common/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ import (
"time"
)

type Enqueuer[T comparable] interface {
type Enqueuer[T ReconcileEvent] interface {
Enqueue(item T)
EnqueueAfter(item T, d time.Duration)
}

type Dequeuer[T comparable] interface {
type Dequeuer[T ReconcileEvent] interface {
Dequeue(ctx context.Context) (T, error)
Done(item T)
RequeueAfter(item T, d time.Duration)
}

type Queue[T comparable] struct {
type Queue[T ReconcileEvent] struct {
// Channel used internally to block Dequeue when the queue is empty,
// and notify Dequeue when a new item is added through Enqueue.
itemAdded chan struct{}
Expand All @@ -45,7 +45,7 @@ type Queue[T comparable] struct {
// A map used as a set of unique items which are in the queue. This is
// used by Enqueue and Has to provide a quick reference to whats in the
// queue without needing to loop through the queue slice.
inqueue map[T]struct{}
inqueue map[string]T

// A map used as a set of unique items which are processing. This keeps
// track of items which have been Dequeued but are still being
Expand All @@ -55,25 +55,25 @@ type Queue[T comparable] struct {
// We use a separate map for this instead of reusing inqueue to prevent
// calls to Done() removing items from inqueue when they are actually
// in the queue slice.
processing map[T]struct{}
processing map[string]T

// A map to track items which have been scheduled to be queued at a
// later date. We keep track of these items to prevent them being
// enqueued earlier than their scheduled time through Enqueue.
waitingForEnqueue map[T]struct{}
waitingForEnqueue map[string]T

// A mutex lock which protects the queue from simultaneous reads and
// writes ensuring the queue can be used by multiple go routines safely.
l sync.Mutex
}

func NewQueue[T comparable]() *Queue[T] {
func NewQueue[T ReconcileEvent]() *Queue[T] {
return &Queue[T]{
itemAdded: make(chan struct{}),
queue: make([]T, 0),
inqueue: map[T]struct{}{},
processing: map[T]struct{}{},
waitingForEnqueue: map[T]struct{}{},
inqueue: make(map[string]T),
processing: make(map[string]T),
waitingForEnqueue: make(map[string]T),
}
}

Expand Down Expand Up @@ -106,8 +106,9 @@ func (q *Queue[T]) Dequeue(ctx context.Context) (T, error) {

item := q.queue[0]
q.queue = q.queue[1:]
delete(q.inqueue, item)
q.processing[item] = struct{}{}
itemKey := item.Hash()
delete(q.inqueue, itemKey)
q.processing[itemKey] = item

return item, nil
}
Expand Down Expand Up @@ -135,33 +136,35 @@ func (q *Queue[T]) EnqueueAfter(item T, d time.Duration) {
// EnqueueAfter and public RequeueAfter functions, these should not be called
// without obtaining a lock on Queue first.
func (q *Queue[T]) enqueueAfter(item T, d time.Duration) {
_, inQueue := q.inqueue[item]
_, isProcessing := q.processing[item]
_, isWaitingForEnqueue := q.waitingForEnqueue[item]
itemKey := item.Hash()
_, inQueue := q.inqueue[itemKey]
_, isProcessing := q.processing[itemKey]
_, isWaitingForEnqueue := q.waitingForEnqueue[itemKey]
if inQueue || isProcessing || isWaitingForEnqueue {
// item is already known by the queue so there is nothing to do
return
}

q.waitingForEnqueue[item] = struct{}{}
q.waitingForEnqueue[itemKey] = item
go func() {
<-time.After(d)
q.l.Lock()
defer q.l.Unlock()
delete(q.waitingForEnqueue, item)
delete(q.waitingForEnqueue, itemKey)
q.enqueue(item)
}()
}

// Internal enqueue function that it can be reused by public functions Enqueue
// and EnqueueAfter.
func (q *Queue[T]) enqueue(item T) {
_, inQueue := q.inqueue[item]
_, isProcessing := q.processing[item]
_, isWaitingForEnqueue := q.waitingForEnqueue[item]
itemKey := item.Hash()
_, inQueue := q.inqueue[itemKey]
_, isProcessing := q.processing[itemKey]
_, isWaitingForEnqueue := q.waitingForEnqueue[itemKey]
if !inQueue && !isProcessing && !isWaitingForEnqueue {
q.queue = append(q.queue, item)
q.inqueue[item] = struct{}{}
q.inqueue[itemKey] = item

select {
case q.itemAdded <- struct{}{}:
Expand All @@ -187,9 +190,10 @@ func (q *Queue[T]) Has(item T) bool {
q.l.Lock()
defer q.l.Unlock()

_, inQueue := q.inqueue[item]
_, isProcessing := q.processing[item]
_, isWaitingForEnqueue := q.waitingForEnqueue[item]
itemKey := item.Hash()
_, inQueue := q.inqueue[itemKey]
_, isProcessing := q.processing[itemKey]
_, isWaitingForEnqueue := q.waitingForEnqueue[itemKey]
return inQueue || isProcessing || isWaitingForEnqueue
}

Expand All @@ -198,7 +202,7 @@ func (q *Queue[T]) Has(item T) bool {
func (q *Queue[T]) Done(item T) {
q.l.Lock()
defer q.l.Unlock()
delete(q.processing, item)
delete(q.processing, item.Hash())
}

// RequeueAfter will mark a processing item as Done and then schedule it to be
Expand All @@ -209,7 +213,7 @@ func (q *Queue[T]) RequeueAfter(item T, d time.Duration) {
q.l.Lock()
defer q.l.Unlock()

delete(q.processing, item)
delete(q.processing, item.Hash())
q.enqueueAfter(item, d)
}

Expand Down
15 changes: 15 additions & 0 deletions runtime_scan/pkg/orchestrator/common/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/sirupsen/logrus"
)

type AddAction struct {
Expand All @@ -43,6 +44,20 @@ type TestObject struct {
ID string
}

func (o TestObject) ToFields() logrus.Fields {
return logrus.Fields{
"ID": o.ID,
}
}

func (o TestObject) String() string {
return o.ID
}

func (o TestObject) Hash() string {
return o.ID
}

// nolint:cyclop
func TestQueue(t *testing.T) {
tests := []struct {
Expand Down
10 changes: 6 additions & 4 deletions runtime_scan/pkg/orchestrator/common/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewRequeueAfterError(d time.Duration, msg string) error {
return RequeueAfterError{d, msg}
}

type Reconciler[T comparable] struct {
type Reconciler[T ReconcileEvent] struct {
// Reconcile function which will be called whenever there is an event on EventChan
ReconcileFunction func(context.Context, T) error

Expand All @@ -63,11 +63,11 @@ func (r *Reconciler[T]) Start(ctx context.Context) {
if err != nil {
logger.Errorf("Failed to get item from queue: %v", err)
} else {
// NOTE: shadowing logger variable is intentional
logger := logger.WithFields(item.ToFields())
logger.Infof("Reconciling item")
timeoutCtx, cancel := context.WithTimeout(ctx, r.ReconcileTimeout)
err := r.ReconcileFunction(timeoutCtx, item)
if err != nil {
logger.Errorf("Failed to reconcile item: %v", err)
}

// Make sure timeout context is canceled to
// prevent orphaned resources
Expand All @@ -79,8 +79,10 @@ func (r *Reconciler[T]) Start(ctx context.Context) {
// item as Done.
var requeueAfterError RequeueAfterError
if errors.As(err, &requeueAfterError) {
logger.Infof("Requeue item: %v", err)
r.Queue.RequeueAfter(item, requeueAfterError.d)
} else {
logger.Errorf("Failed to reconcile item: %v", err)
r.Queue.Done(item)
}
}
Expand Down
6 changes: 5 additions & 1 deletion runtime_scan/pkg/orchestrator/scanconfigwatcher/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ScanConfigReconcileEvent struct {
ScanConfigID models.ScanConfigID
}

func (e *ScanConfigReconcileEvent) ToFields() log.Fields {
func (e ScanConfigReconcileEvent) ToFields() log.Fields {
return log.Fields{
"ScanConfigID": e.ScanConfigID,
}
Expand All @@ -36,3 +36,7 @@ func (e *ScanConfigReconcileEvent) ToFields() log.Fields {
func (e ScanConfigReconcileEvent) String() string {
return fmt.Sprintf("ScanConfigID=%s", e.ScanConfigID)
}

func (e ScanConfigReconcileEvent) Hash() string {
return e.ScanConfigID
}
2 changes: 0 additions & 2 deletions runtime_scan/pkg/orchestrator/scanconfigwatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ func (w *Watcher) Reconcile(ctx context.Context, event ScanConfigReconcileEvent)
logger := log.GetLoggerFromContextOrDiscard(ctx).WithFields(event.ToFields())
ctx = log.SetLoggerForContext(ctx, logger)

logger.Infof("Reconciling ScanConfig event")

scanConfig, err := w.backend.GetScanConfig(ctx, event.ScanConfigID, models.GetScanConfigsScanConfigIDParams{})
if err != nil || scanConfig == nil {
return fmt.Errorf("failed to fetch ScanConfig. Event=%s: %w", event, err)
Expand Down
18 changes: 17 additions & 1 deletion runtime_scan/pkg/orchestrator/scanresultprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"time"

"github.com/sirupsen/logrus"

"github.com/openclarity/vmclarity/api/models"
"github.com/openclarity/vmclarity/runtime_scan/pkg/orchestrator/common"
"github.com/openclarity/vmclarity/shared/pkg/backendclient"
Expand Down Expand Up @@ -119,7 +121,21 @@ func (srp *ScanResultProcessor) Reconcile(ctx context.Context, event ScanResultR
}

type ScanResultReconcileEvent struct {
ScanResultID string
ScanResultID models.ScanResultID
}

func (e ScanResultReconcileEvent) ToFields() logrus.Fields {
return logrus.Fields{
"ScanResultID": e.ScanResultID,
}
}

func (e ScanResultReconcileEvent) String() string {
return fmt.Sprintf("ScanResultID=%s", e.ScanResultID)
}

func (e ScanResultReconcileEvent) Hash() string {
return e.ScanResultID
}

func (srp *ScanResultProcessor) GetItems(ctx context.Context) ([]ScanResultReconcileEvent, error) {
Expand Down
6 changes: 5 additions & 1 deletion runtime_scan/pkg/orchestrator/scanresultwatcher/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type ScanResultReconcileEvent struct {
TargetID models.TargetID
}

func (e *ScanResultReconcileEvent) ToFields() log.Fields {
func (e ScanResultReconcileEvent) ToFields() log.Fields {
return log.Fields{
"ScanResultID": e.ScanResultID,
"ScanID": e.ScanID,
Expand All @@ -40,3 +40,7 @@ func (e *ScanResultReconcileEvent) ToFields() log.Fields {
func (e ScanResultReconcileEvent) String() string {
return fmt.Sprintf("ScanResultID=%s ScanID=%s TargetID=%s", e.ScanResultID, e.ScanID, e.TargetID)
}

func (e ScanResultReconcileEvent) Hash() string {
return e.ScanResultID
}
2 changes: 0 additions & 2 deletions runtime_scan/pkg/orchestrator/scanresultwatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ func (w *Watcher) Reconcile(ctx context.Context, event ScanResultReconcileEvent)
logger := log.GetLoggerFromContextOrDiscard(ctx).WithFields(event.ToFields())
ctx = log.SetLoggerForContext(ctx, logger)

logger.Infof("Reconciling ScanResult event")

scanResult, err := w.backend.GetScanResult(ctx, event.ScanResultID, models.GetScanResultsScanResultIDParams{
Expand: utils.PointerTo("scan,target"),
})
Expand Down
6 changes: 5 additions & 1 deletion runtime_scan/pkg/orchestrator/scanwatcher/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ScanReconcileEvent struct {
ScanID models.ScanID
}

func (e *ScanReconcileEvent) ToFields() log.Fields {
func (e ScanReconcileEvent) ToFields() log.Fields {
return log.Fields{
"ScanID": e.ScanID,
}
Expand All @@ -36,3 +36,7 @@ func (e *ScanReconcileEvent) ToFields() log.Fields {
func (e ScanReconcileEvent) String() string {
return fmt.Sprintf("ScanID=%s", e.ScanID)
}

func (e ScanReconcileEvent) Hash() string {
return e.ScanID
}
2 changes: 0 additions & 2 deletions runtime_scan/pkg/orchestrator/scanwatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ func (w *Watcher) Reconcile(ctx context.Context, event ScanReconcileEvent) error
logger := log.GetLoggerFromContextOrDiscard(ctx).WithFields(event.ToFields())
ctx = log.SetLoggerForContext(ctx, logger)

logger.Infof("Reconciling Scan event")

params := models.GetScansScanIDParams{
Expand: utils.PointerTo("scanConfig"),
}
Expand Down

0 comments on commit 1e2482a

Please sign in to comment.