Skip to content

Commit

Permalink
Use patch instead of update for remote crs in cr-syncer / use differe…
Browse files Browse the repository at this point in the history
…nt workqueue in cr-syncer
  • Loading branch information
oliver-goetz committed Jan 28, 2021
1 parent 6990b46 commit 965fd98
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 30 deletions.
13 changes: 2 additions & 11 deletions src/Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/go/cmd/cr-syncer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"@io_k8s_apimachinery//pkg/apis/meta/v1/unstructured:go_default_library",
"@io_k8s_apimachinery//pkg/runtime:go_default_library",
"@io_k8s_apimachinery//pkg/runtime/schema:go_default_library",
"@io_k8s_apimachinery//pkg/types:go_default_library",
"@io_k8s_apimachinery//pkg/watch:go_default_library",
"@io_k8s_client_go//dynamic:go_default_library",
"@io_k8s_client_go//rest:go_default_library",
Expand Down
75 changes: 56 additions & 19 deletions src/go/cmd/cr-syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package main

import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"strconv"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
Expand All @@ -30,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -171,8 +174,8 @@ func newCRSyncer(
subtree: annotations[annotationStatusSubtree],
upstream: remote.Resource(gvr).Namespace(ns),
downstream: local.Resource(gvr).Namespace(ns),
upstreamQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "upstream"),
downstreamQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "downstream"),
upstreamQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(time.Millisecond*500, time.Second*5, 5), "upstream"),
downstreamQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(time.Millisecond*500, time.Second*5, 5), "downstream"),
done: make(chan struct{}),
}
switch src := annotations[annotationSpecSource]; src {
Expand Down Expand Up @@ -366,45 +369,79 @@ func (s *crSyncer) syncDownstream(key string) error {
}
return nil
}
dst := dstObj.(*unstructured.Unstructured).DeepCopy()

// Copy full status or subtree from src to dst.
// Copy full status or subtree from src status.
var status interface{}
if s.subtree == "" {
dst.Object["status"] = src.Object["status"]
status = src.Object["status"]
} else if src.Object["status"] != nil {
srcStatus, ok := src.Object["status"].(map[string]interface{})
if !ok {
return fmt.Errorf("Expected status of %s in downstream cluster to be a dict", src.GetName())
}
if dst.Object["status"] == nil {
dst.Object["status"] = make(map[string]interface{})
}
dstStatus, ok := dst.Object["status"].(map[string]interface{})
if !ok {
return fmt.Errorf("Expected status of %s in upstream cluster to be a dict", src.GetName())
}
if srcStatus[s.subtree] != nil {
dstStatus[s.subtree] = srcStatus[s.subtree]
} else {
delete(dstStatus, s.subtree)
status = srcStatus[s.subtree]
}
}

dst := &unstructured.Unstructured{map[string]interface{}{}}
setAnnotation(dst, annotationResourceVersion, src.GetResourceVersion())

patchData := make([]map[string]interface{}, 0, 2)
patchData = append(patchData, map[string]interface{}{
"op": "add",
"path": "/metadata/annotations",
"value": dst.GetAnnotations(),
})

// We need to make a dedicated UpdateStatus call if the status is defined
// as an explicit subresource of the CRD.
if statusIsSubresource {
// Status must not be null/nil.
if dst.Object["status"] == nil {
dst.Object["status"] = struct{}{}
if status == nil {
status = struct{}{}
}
updated, err := s.upstream.UpdateStatus(dst, metav1.UpdateOptions{})
if s.subtree == "" || status == struct{}{} {
patchData = append(patchData, map[string]interface{}{
"op": "add",
"path": "/status",
"value": status,
})
} else {
patchData = append(patchData, map[string]interface{}{
"op": "add",
"path": fmt.Sprintf("/status/%s", s.subtree),
"value": status,
})
}
patchDataJSON, err := json.Marshal(patchData)
if err != nil {
return newAPIErrorf(dst, "Marshalling JSON failed: %s", err)
}
updated, err := s.upstream.Patch(dstObj.(*unstructured.Unstructured).GetName(), types.JSONPatchType, patchDataJSON, metav1.PatchOptions{}, "status")
if err != nil {
return newAPIErrorf(dst, "update status failed: %s", err)
}
dst = updated
} else {
updated, err := s.upstream.Update(dst, metav1.UpdateOptions{})
if s.subtree == "" || status == nil {
patchData = append(patchData, map[string]interface{}{
"op": "add",
"path": "/status",
"value": status,
})
} else {
patchData = append(patchData, map[string]interface{}{
"op": "add",
"path": fmt.Sprintf("/status/%s", s.subtree),
"value": status,
})
}
patchDataJSON, err := json.Marshal(patchData)
if err != nil {
return newAPIErrorf(dst, "Marshalling JSON failed: %s", err)
}
updated, err := s.upstream.Patch(dstObj.(*unstructured.Unstructured).GetName(), types.JSONPatchType, patchDataJSON, metav1.PatchOptions{})
if err != nil {
return newAPIErrorf(dst, "update failed: %s", err)
}
Expand Down

0 comments on commit 965fd98

Please sign in to comment.