From 76b6a0eff9345ff18f34ba3b2c44847c317293fb Mon Sep 17 00:00:00 2001 From: Yuan Tang Date: Fri, 26 Mar 2021 15:18:50 -0400 Subject: [PATCH] feat(controller): Add last hit timestamp to memoization caches (#5487) Signed-off-by: terrytangyuan --- workflow/common/common.go | 3 +++ workflow/controller/cache/cache.go | 1 + workflow/controller/cache/configmap_cache.go | 24 ++++++++++++++++++-- workflow/controller/cache_test.go | 23 +++++++++++++++++++ 4 files changed, 49 insertions(+), 2 deletions(-) diff --git a/workflow/common/common.go b/workflow/common/common.go index b2f5a2fff46f..d35119182e53 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -84,6 +84,9 @@ const ( // LabelKeyOnExit is a label applied to Pods that are run from onExit nodes, so that they are not shut down when stopping a Workflow LabelKeyOnExit = workflow.WorkflowFullName + "/on-exit" + // LabelKeyCacheLastHitTimestamp is the timestamp when the memoization cache is last hit. + LabelKeyCacheLastHitTimestamp = "last-hit-timestamp" + // ExecutorArtifactBaseDir is the base directory in the init container in which artifacts will be copied to. // Each artifact will be named according to its input name (e.g: /argo/inputs/artifacts/CODE) ExecutorArtifactBaseDir = "/argo/inputs/artifacts" diff --git a/workflow/controller/cache/cache.go b/workflow/controller/cache/cache.go index da44145f9c17..910c7b556b94 100644 --- a/workflow/controller/cache/cache.go +++ b/workflow/controller/cache/cache.go @@ -22,6 +22,7 @@ type Entry struct { NodeID string `json:"nodeID"` Outputs *wfv1.Outputs `json:"outputs"` CreationTimestamp metav1.Time `json:"creationTimestamp"` + LastHitTimestamp metav1.Time `json:"lastHitTimestamp"` } func (e *Entry) Hit() bool { diff --git a/workflow/controller/cache/configmap_cache.go b/workflow/controller/cache/configmap_cache.go index 7420a8beea35..421470bac826 100644 --- a/workflow/controller/cache/configmap_cache.go +++ b/workflow/controller/cache/configmap_cache.go @@ -15,6 +15,7 @@ import ( "k8s.io/client-go/kubernetes" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/workflow/common" ) type configMapCache struct { @@ -60,7 +61,8 @@ func (c *configMapCache) Load(ctx context.Context, key string) (*Entry, error) { } c.logInfo(log.Fields{}, "config map cache loaded") - + hitTime := time.Now() + cm.SetLabels(map[string]string{common.LabelKeyCacheLastHitTimestamp: hitTime.Format(time.RFC3339)}) rawEntry, ok := cm.Data[key] if !ok || rawEntry == "" { c.logInfo(log.Fields{}, "config map cache miss: entry does not exist") @@ -73,6 +75,20 @@ func (c *configMapCache) Load(ctx context.Context, key string) (*Entry, error) { return nil, fmt.Errorf("malformed cache entry: could not unmarshal JSON; unable to parse: %w", err) } + entry.LastHitTimestamp = metav1.Time{Time: hitTime} + entryJSON, err := json.Marshal(entry) + if err != nil { + c.logError(err, log.Fields{"key": key}, "Unable to marshal cache entry with last hit timestamp") + return nil, fmt.Errorf("unable to marshal cache entry with last hit timestamp: %w", err) + } + cm.Data[key] = string(entryJSON) + + _, err = c.kubeClient.CoreV1().ConfigMaps(c.namespace).Update(ctx, cm, metav1.UpdateOptions{}) + if err != nil { + c.logError(err, log.Fields{}, "Error updating last hit timestamp on cache") + return nil, fmt.Errorf("error updating last hit timestamp on cache: %w", err) + } + return &entry, nil } @@ -102,10 +118,14 @@ func (c *configMapCache) Save(ctx context.Context, key string, nodeId string, va } } + creationTime := time.Now() + cache.SetLabels(map[string]string{common.LabelKeyCacheLastHitTimestamp: creationTime.Format(time.RFC3339)}) + newEntry := Entry{ NodeID: nodeId, Outputs: value, - CreationTimestamp: metav1.Time{Time: time.Now()}, + CreationTimestamp: metav1.Time{Time: creationTime}, + LastHitTimestamp: metav1.Time{Time: creationTime}, } entryJSON, err := json.Marshal(newEntry) diff --git a/workflow/controller/cache_test.go b/workflow/controller/cache_test.go index 6cd461dc4b67..bbd3e51db6fb 100644 --- a/workflow/controller/cache_test.go +++ b/workflow/controller/cache_test.go @@ -2,13 +2,16 @@ package controller import ( "context" + "encoding/json" "testing" + "time" "github.com/stretchr/testify/assert" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/controller/cache" ) @@ -45,8 +48,21 @@ func TestConfigMapCacheLoadHit(t *testing.T) { _, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &sampleConfigMapCacheEntry, metav1.CreateOptions{}) assert.NoError(t, err) c := cache.NewConfigMapCache("default", controller.kubeclientset, "whalesay-cache") + + cm, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Get(ctx, sampleConfigMapCacheEntry.Name, metav1.GetOptions{}) + assert.NoError(t, err) + assert.Nil(t, cm.Labels) + entry, err := c.Load(ctx, "hi-there-world") assert.NoError(t, err) + + cm, err = controller.kubeclientset.CoreV1().ConfigMaps("default").Get(ctx, sampleConfigMapCacheEntry.Name, metav1.GetOptions{}) + assert.NoError(t, err) + lastHitTimestampLabel, err := time.Parse(time.RFC3339, cm.Labels[common.LabelKeyCacheLastHitTimestamp]) + assert.NoError(t, err) + assert.True(t, lastHitTimestampLabel.After(entry.CreationTimestamp.Time)) + assert.Equal(t, lastHitTimestampLabel.Format(time.RFC3339), entry.LastHitTimestamp.Time.Format(time.RFC3339)) + outputs := entry.Outputs assert.NoError(t, err) if assert.Len(t, outputs.Parameters, 1) { @@ -87,4 +103,11 @@ func TestConfigMapCacheSave(t *testing.T) { cm, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Get(ctx, "whalesay-cache", metav1.GetOptions{}) assert.NoError(t, err) assert.NotNil(t, cm) + lastHitTimestampLabel, err := time.Parse(time.RFC3339, cm.Labels[common.LabelKeyCacheLastHitTimestamp]) + assert.NoError(t, err) + var entry cache.Entry + err = json.Unmarshal([]byte(cm.Data["hi-there-world"]), &entry) + assert.NoError(t, err) + assert.Equal(t, lastHitTimestampLabel.Format(time.RFC3339), entry.LastHitTimestamp.Time.Format(time.RFC3339)) + assert.Equal(t, lastHitTimestampLabel.Format(time.RFC3339), entry.CreationTimestamp.Time.Format(time.RFC3339)) }