Skip to content

Commit

Permalink
feat: Add custom event aggregator function with annotations (#9247)
Browse files Browse the repository at this point in the history
* feat: Add custom event aggregator function with annotations

Signed-off-by: Yuan Tang <terrytangyuan@gmail.com>

* chore: test env var

Signed-off-by: Yuan Tang <terrytangyuan@gmail.com>

* chore: sort the vals

Signed-off-by: Yuan Tang <terrytangyuan@gmail.com>

* Update event_recorder_manager.go
  • Loading branch information
terrytangyuan authored Jul 29, 2022
1 parent 1899a7b commit fd6c7a7
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 2 deletions.
3 changes: 2 additions & 1 deletion docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ most users. Environment variables may be removed at any time.
| `CRON_SYNC_PERIOD` | `time.Duration` | `10s` | How often to sync cron workflows. |
| `DEFAULT_REQUEUE_TIME` | `time.Duration` | `10s` | The re-queue time for the rate limiter of the workflow queue. |
| `EXPRESSION_TEMPLATES` | `bool` | `true` | Escape hatch to disable expression templates. |
| `EVENT_AGGREGATION_WITH_ANNOTATIONS` | `bool` | `false` | Whether event annotations will be used when aggregating events. |
| `GRPC_MESSAGE_SIZE` | `string` | Use different GRPC Max message size for Argo server deployment (supporting huge workflows). |
| `GZIP_IMPLEMENTATION` | `string` | `PGZip` | The implementation of compression/decompression. Currently only "`PGZip`" and "`GZip`" are supported. |
| `INFORMER_WRITE_BACK` | `bool` | `true` | Whether to write back to informer instead of catching up. |
Expand All @@ -40,7 +41,7 @@ most users. Environment variables may be removed at any time.
| `LEADER_ELECTION_RETRY_PERIOD` | `time.Duration` | `5s` | The duration that the leader election clients should wait between tries of actions. |
| `MAX_OPERATION_TIME` | `time.Duration` | `30s` | The maximum time a workflow operation is allowed to run for before re-queuing the workflow onto the work queue. |
| `OFFLOAD_NODE_STATUS_TTL` | `time.Duration` | `5m` | The TTL to delete the offloaded node status. Currently only used for testing. |
| `POD_NAMES` | `string` | `v2` | Whether to have pod names contain the template name (v2) or be the node id (v1) - should be set the same for Argo Server. |
| `POD_NAMES` | `string` | `v2` | Whether to have pod names contain the template name (v2) or be the node id (v1) - should be set the same for Argo Server. |
| `RECENTLY_STARTED_POD_DURATION` | `time.Duration` | `10s` | The duration of a pod before the pod is considered to be recently started. |
| `RETRY_BACKOFF_DURATION` | `time.Duration` | `10ms` | The retry back-off duration when retrying API calls. |
| `RETRY_BACKOFF_FACTOR` | `float` | `2.0` | The retry back-off factor when retrying API calls. |
Expand Down
2 changes: 2 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
AnnotationKeyNodeName = workflow.WorkflowFullName + "/node-name"
// AnnotationKeyNodeName is the node's type
AnnotationKeyNodeType = workflow.WorkflowFullName + "/node-type"
// AnnotationKeyNodeStartTime is the node's start timestamp.
AnnotationKeyNodeStartTime = workflow.WorkflowFullName + "/node-start-time"

// AnnotationKeyRBACRule is a rule to match the claims
AnnotationKeyRBACRule = workflow.WorkflowFullName + "/rbac-rule"
Expand Down
4 changes: 4 additions & 0 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2283,6 +2283,10 @@ func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) {
annotations := map[string]string{
common.AnnotationKeyNodeType: string(node.Type),
common.AnnotationKeyNodeName: node.Name,
common.AnnotationKeyNodeID: node.ID,
// For retried/resubmitted workflows, the only main differentiation is the start time of nodes.
// We include this annotation here so that we could avoid combining events for those nodes.
common.AnnotationKeyNodeStartTime: strconv.FormatInt(node.StartedAt.UnixNano(), 10),
}
var involvedObject runtime.Object = woc.wf
if eventConfig.SendAsPod {
Expand Down
36 changes: 35 additions & 1 deletion workflow/events/event_recorder_manager.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package events

import (
"sort"
"strings"
"sync"

"github.com/argoproj/argo-workflows/v3/util/env"

log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
Expand All @@ -24,14 +28,44 @@ type eventRecorderManager struct {
eventRecorders map[string]record.EventRecorder
}

// customEventAggregatorFuncWithAnnotations enhances the default `EventAggregatorByReasonFunc` by
// including annotation values as part of the event aggregation key.
func customEventAggregatorFuncWithAnnotations(event *apiv1.Event) (string, string) {
var joinedAnnotationsStr string
includeAnnotations := env.LookupEnvStringOr("EVENT_AGGREGATION_WITH_ANNOTATIONS", "false")
if annotations := event.GetAnnotations(); includeAnnotations == "true" && annotations != nil {
annotationVals := make([]string, 0, len(annotations))
for _, v := range annotations {
annotationVals = append(annotationVals, v)
}
sort.Strings(annotationVals)
joinedAnnotationsStr = strings.Join(annotationVals, "")
}
return strings.Join([]string{
event.Source.Component,
event.Source.Host,
event.InvolvedObject.Kind,
event.InvolvedObject.Namespace,
event.InvolvedObject.Name,
string(event.InvolvedObject.UID),
event.InvolvedObject.APIVersion,
event.Type,
event.Reason,
event.ReportingController,
event.ReportingInstance,
joinedAnnotationsStr,
},
""), event.Message
}

func (m *eventRecorderManager) Get(namespace string) record.EventRecorder {
m.lock.Lock()
defer m.lock.Unlock()
eventRecorder, ok := m.eventRecorders[namespace]
if ok {
return eventRecorder
}
eventCorrelationOption := record.CorrelatorOptions{BurstSize: defaultSpamBurst}
eventCorrelationOption := record.CorrelatorOptions{BurstSize: defaultSpamBurst, KeyFunc: customEventAggregatorFuncWithAnnotations}
eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(eventCorrelationOption)
eventBroadcaster.StartLogging(log.Debugf)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: m.kubernetes.CoreV1().Events(namespace)})
Expand Down
47 changes: 47 additions & 0 deletions workflow/events/event_recorder_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package events

import (
"os"
"testing"

apiv1 "k8s.io/api/core/v1"

"github.com/stretchr/testify/assert"
)

const aggregationWithAnnotationsEnvKey = "EVENT_AGGREGATION_WITH_ANNOTATIONS"

func TestCustomEventAggregatorFuncWithAnnotations(t *testing.T) {
event := apiv1.Event{}
key, msg := customEventAggregatorFuncWithAnnotations(&event)
assert.Equal(t, "", key)
assert.Equal(t, "", msg)

event.Source = apiv1.EventSource{Component: "component1", Host: "host1"}
event.InvolvedObject.Name = "name1"
event.Message = "message1"

key, msg = customEventAggregatorFuncWithAnnotations(&event)
assert.Equal(t, "component1host1name1", key)
assert.Equal(t, "message1", msg)

// Test default behavior where annotations are not used for aggregation
event.ObjectMeta.Annotations = map[string]string{"key1": "val1", "key2": "val2"}
key, msg = customEventAggregatorFuncWithAnnotations(&event)
assert.Equal(t, "component1host1name1", key)
assert.Equal(t, "message1", msg)

_ = os.Setenv(aggregationWithAnnotationsEnvKey, "true")
key, msg = customEventAggregatorFuncWithAnnotations(&event)
assert.Equal(t, "component1host1name1val1val2", key)
assert.Equal(t, "message1", msg)

// Test annotations with values in different order
_ = os.Setenv(aggregationWithAnnotationsEnvKey, "true")
event.ObjectMeta.Annotations = map[string]string{"key2": "val2", "key1": "val1"}
key, msg = customEventAggregatorFuncWithAnnotations(&event)
assert.Equal(t, "component1host1name1val1val2", key)
assert.Equal(t, "message1", msg)

_ = os.Unsetenv(aggregationWithAnnotationsEnvKey)
}

0 comments on commit fd6c7a7

Please sign in to comment.