Skip to content

Commit

Permalink
Improve applier and status synchronization
Browse files Browse the repository at this point in the history
- Use locks to protect the MultiError, Syncing, and Apply methods
  independently. This is a hack to improve thread-safety without
  changing the Applier interface. In the future, the interface
  should probably be rewritten to use channels instead.
- Centralize aggregation of sync errors in `updater.status()`
- Fix Syncing condition errors to include errors from all reconciler
  pipeline phases: rendering, source, & sync.
- Move the periodic sync status update from the reconciler into the
  parser. This removes the need for some synchronization that wasn't
  previously thread-safe.
- Remove several parser methods which are obsolete with the above
  changes.
- Rename applier.Interface to KptApplier to make way for the
  KptDestroyer interface later.
- Route the retry delay and status update delay up through the
  parser and reconciler. These are now hard-coded with the rest
  of the constants. They could be made into options later, but I
  didn't want to expose new options until we need them exposed.
- Rename -Frequency and -Period options to -Delay, which more
  accurately reflects how they are used. Period is how long
  something takes. Frequency is how often something occurs.
  Delay is the time between occurrences.
- Set ErrorSummary fields to nil when empty (no errors)

Change-Id: I1c3373f98bb5d842ce113ba61dc0c87e1119c665
  • Loading branch information
karlkfi committed Nov 5, 2022
1 parent 8d9c21b commit dac0ce9
Show file tree
Hide file tree
Showing 24 changed files with 775 additions and 647 deletions.
37 changes: 18 additions & 19 deletions cmd/hydration-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ import (
"flag"
"os"
"strings"
"time"

"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
"kpt.dev/configsync/pkg/api/configsync"
"kpt.dev/configsync/pkg/api/configsync/v1beta1"
"kpt.dev/configsync/pkg/hydrate"
"kpt.dev/configsync/pkg/importer/filesystem/cmpath"
"kpt.dev/configsync/pkg/kmetrics"
"kpt.dev/configsync/pkg/profiler"
"kpt.dev/configsync/pkg/reconcilermanager"
"kpt.dev/configsync/pkg/reconcilermanager/controllers"
"kpt.dev/configsync/pkg/util/log"
ctrl "sigs.k8s.io/controller-runtime"
)
Expand All @@ -55,15 +56,18 @@ var (
syncDir = flag.String("sync-dir", os.Getenv(reconcilermanager.SyncDirKey),
"Relative path of the root directory within the repo.")

hydrationPollingPeriodStr = flag.String("polling-period", os.Getenv(reconcilermanager.HydrationPollingPeriod),
// TODO: rename to `polling-delay` to match internal name.
pollingDelay = flag.Duration("polling-period",
controllers.PollingPeriod(reconcilermanager.HydrationPollingPeriod, configsync.DefaultHydrationPollingDelay),
"Period of time between checking the filesystem for rendering the DRY configs.")

// rehydratePeriod sets the hydration-controller to re-run the hydration process
// rehydrateDelay sets the hydration-controller to re-run the hydration process
// periodically when errors happen. It retries on both transient errors and permanent errors.
// Other ways to trigger the hydration process are:
// - push a new commit
// - delete the done file from the hydration-controller.
rehydratePeriod = flag.Duration("rehydrate-period", 30*time.Minute,
// TODO: rename to `rehydrate-delay` to match internal name.
rehydrateDelay = flag.Duration("rehydrate-period", configsync.DefaultHydrationRetryDelay,
"Period of time between rehydrating on errors.")

reconcilerName = flag.String("reconciler-name", os.Getenv(reconcilermanager.ReconcilerNameKey),
Expand Down Expand Up @@ -107,22 +111,17 @@ func main() {
dir := strings.TrimPrefix(*syncDir, "/")
relSyncDir := cmpath.RelativeOS(dir)

hydrationPollingPeriod, err := time.ParseDuration(*hydrationPollingPeriodStr)
if err != nil {
klog.Fatalf("Failed to get hydration polling period: %v", err)
}

hydrator := &hydrate.Hydrator{
DonePath: absDonePath,
SourceType: v1beta1.SourceType(*sourceType),
SourceRoot: absSourceRootDir,
HydratedRoot: absHydratedRootDir,
SourceLink: *sourceLinkDir,
HydratedLink: *hydratedLinkDir,
SyncDir: relSyncDir,
PollingFrequency: hydrationPollingPeriod,
RehydrateFrequency: *rehydratePeriod,
ReconcilerName: *reconcilerName,
DonePath: absDonePath,
SourceType: v1beta1.SourceType(*sourceType),
SourceRoot: absSourceRootDir,
HydratedRoot: absHydratedRootDir,
SourceLink: *sourceLinkDir,
HydratedLink: *hydratedLinkDir,
SyncDir: relSyncDir,
PollingDelay: *pollingDelay,
RehydrateDelay: *rehydrateDelay,
ReconcilerName: *reconcilerName,
}

hydrator.Run(context.Background())
Expand Down
10 changes: 6 additions & 4 deletions cmd/reconciler-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ var (
clusterName = flag.String("cluster-name", os.Getenv(reconcilermanager.ClusterNameKey),
"Cluster name to use for Cluster selection")

reconcilerPollingPeriod = flag.Duration("reconciler-polling-period", controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingPeriod),
// TODO: rename to `reconciler-polling-delay` to match internal name.
reconcilerPollingDelay = flag.Duration("reconciler-polling-period", controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingDelay),
"How often the reconciler should poll the filesystem for updates to the source or rendered configs.")

hydrationPollingPeriod = flag.Duration("hydration-polling-period", controllers.PollingPeriod(reconcilermanager.HydrationPollingPeriod, configsync.DefaultHydrationPollingPeriod),
// TODO: rename to `hydration-polling-delay` to match internal name.
hydrationPollingDelay = flag.Duration("hydration-polling-period", controllers.PollingPeriod(reconcilermanager.HydrationPollingPeriod, configsync.DefaultHydrationPollingDelay),
"How often the hydration-controller should poll the filesystem for rendering the DRY configs.")

setupLog = ctrl.Log.WithName("setup")
Expand Down Expand Up @@ -78,15 +80,15 @@ func main() {

watchFleetMembership := fleetMembershipCRDExists(mgr.GetConfig(), mgr.GetRESTMapper())

repoSync := controllers.NewRepoSyncReconciler(*clusterName, *reconcilerPollingPeriod, *hydrationPollingPeriod, mgr.GetClient(),
repoSync := controllers.NewRepoSyncReconciler(*clusterName, *reconcilerPollingDelay, *hydrationPollingDelay, mgr.GetClient(),
ctrl.Log.WithName("controllers").WithName(configsync.RepoSyncKind),
mgr.GetScheme())
if err := repoSync.SetupWithManager(mgr, watchFleetMembership); err != nil {
setupLog.Error(err, "unable to create controller", "controller", configsync.RepoSyncKind)
os.Exit(1)
}

rootSync := controllers.NewRootSyncReconciler(*clusterName, *reconcilerPollingPeriod, *hydrationPollingPeriod, mgr.GetClient(),
rootSync := controllers.NewRootSyncReconciler(*clusterName, *reconcilerPollingDelay, *hydrationPollingDelay, mgr.GetClient(),
ctrl.Log.WithName("controllers").WithName(configsync.RootSyncKind),
mgr.GetScheme())
if err := rootSync.SetupWithManager(mgr, watchFleetMembership); err != nil {
Expand Down
47 changes: 26 additions & 21 deletions cmd/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,14 @@ var (
fightDetectionThreshold = flag.Float64(
"fight-detection-threshold", 5.0,
"The rate of updates per minute to an API Resource at which the Syncer logs warnings about too many updates to the resource.")
// TODO: rename to `resync-delay` to match internal name
resyncPeriod = flag.Duration("resync-period", time.Hour,
"Period of time between forced re-syncs from source (even without a new commit).")
workers = flag.Int("workers", 1,
"Number of concurrent remediator workers to run at once.")
filesystemPollingPeriod = flag.Duration("filesystem-polling-period", controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingPeriod),
// TODO: rename to `filesystem-polling-delay` to match internal name.
pollingDelay = flag.Duration("filesystem-polling-period",
controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingDelay),
"Period of time between checking the filesystem for updates to the source or rendered configs.")

// Root-Repo-only flags. If set for a Namespace-scoped Reconciler, causes the Reconciler to fail immediately.
Expand Down Expand Up @@ -161,26 +164,28 @@ func main() {
}

opts := reconciler.Options{
ClusterName: *clusterName,
FightDetectionThreshold: *fightDetectionThreshold,
NumWorkers: *workers,
ReconcilerScope: declared.Scope(*scope),
ResyncPeriod: *resyncPeriod,
FilesystemPollingFrequency: *filesystemPollingPeriod,
SourceRoot: absSourceDir,
RepoRoot: absRepoRoot,
HydratedRoot: *hydratedRootDir,
HydratedLink: *hydratedLinkDir,
SourceRev: *sourceRev,
SourceBranch: *sourceBranch,
SourceType: v1beta1.SourceType(*sourceType),
SourceRepo: *sourceRepo,
SyncDir: relSyncDir,
SyncName: *syncName,
ReconcilerName: *reconcilerName,
StatusMode: *statusMode,
ReconcileTimeout: *reconcileTimeout,
APIServerTimeout: *apiServerTimeout,
ClusterName: *clusterName,
FightDetectionThreshold: *fightDetectionThreshold,
NumWorkers: *workers,
ReconcilerScope: declared.Scope(*scope),
PollingDelay: *pollingDelay,
ResyncDelay: *resyncPeriod,
RetryDelay: configsync.DefaultReconcilerRetryDelay,
StatusUpdateDelay: configsync.DefaultSyncStatusUpdateDelay,
SourceRoot: absSourceDir,
RepoRoot: absRepoRoot,
HydratedRoot: *hydratedRootDir,
HydratedLink: *hydratedLinkDir,
SourceRev: *sourceRev,
SourceBranch: *sourceBranch,
SourceType: v1beta1.SourceType(*sourceType),
SourceRepo: *sourceRepo,
SyncDir: relSyncDir,
SyncName: *syncName,
ReconcilerName: *reconcilerName,
StatusMode: *statusMode,
ReconcileTimeout: *reconcileTimeout,
APIServerTimeout: *apiServerTimeout,
}

if declared.Scope(*scope) == declared.RootReconciler {
Expand Down
5 changes: 1 addition & 4 deletions e2e/nomostest/wait_for_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,17 +244,14 @@ func (nt *NT) WaitForRootSyncRenderingError(rsName, code string, message string,
}

// WaitForRootSyncSyncError waits until the given error (code and message) is present on the RootSync resource
func (nt *NT) WaitForRootSyncSyncError(rsName, code string, message string, ignoreSyncingCondition bool, opts ...WaitOption) {
func (nt *NT) WaitForRootSyncSyncError(rsName, code string, message string, opts ...WaitOption) {
Wait(nt.T, fmt.Sprintf("RootSync %s rendering error code %s", rsName, code), nt.DefaultWaitTimeout,
func() error {
rs := fake.RootSyncObjectV1Beta1(rsName)
err := nt.Get(rs.GetName(), rs.GetNamespace(), rs)
if err != nil {
return err
}
if ignoreSyncingCondition {
return validateError(rs.Status.Sync.Errors, code, message)
}
syncingCondition := rootsync.GetCondition(rs.Status.Conditions, v1beta1.RootSyncSyncing)
return validateRootSyncError(rs.Status.Sync.Errors, syncingCondition, code, message, []v1beta1.ErrorSource{v1beta1.SyncError})
},
Expand Down
26 changes: 19 additions & 7 deletions e2e/testcases/multi_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,10 +399,21 @@ func TestConflictingDefinitions_RootToRoot(t *testing.T) {
nt.RootRepos[rootSync2].CommitAndPush("add conflicting pod owner role")

nt.T.Logf("The admission webhook should deny the update request in Root %s", rootSync2)
nt.WaitForRootSyncSyncError(rootSync2, applier.ApplierErrorCode, "denied the request", false)
nt.T.Logf("Root %s should also get surfaced with the conflict error", configsync.RootSyncName)
// Ignore the Syncing condition because the sync status is updated by another reconciler, which doesn't update the SyncingCondition.
nt.WaitForRootSyncSyncError(configsync.RootSyncName, status.ManagementConflictErrorCode, "declared in another repository", true)
nt.WaitForRootSyncSyncError(rootSync2, applier.ApplierErrorCode, "denied the request")

// The first RootSync to apply the object will apply its manager annotation,
// which causes the webhook to reject attempts to update the object by other
// RootSyncs. The manager annotation won't change, so the first RootSync
// won't get any management conflict errors.
// The first RootSync would only get an error if the webhook was disabled.

// TODO: Should adoption fights happen with the webhook enabled?
// The remediator doesn't return a KptManagementConflictError, so the
// following error validation will never succeed.

// nt.T.Logf("Root %s should also get surfaced with the conflict error", configsync.RootSyncName)
// nt.WaitForRootSyncSyncError(configsync.RootSyncName, status.ManagementConflictErrorCode, "declared in another repository")

nt.T.Logf("The Role resource version should not be changed")
if err := nt.Validate("pods", testNs, &rbacv1.Role{},
nomostest.ResourceVersionEquals(nt, roleResourceVersion)); err != nil {
Expand All @@ -411,17 +422,18 @@ func TestConflictingDefinitions_RootToRoot(t *testing.T) {

nt.T.Logf("Stop the admission webhook, the remediator should report the conflicts")
nomostest.StopWebhook(nt)

nt.T.Logf("The Role resource version should be changed because two reconcilers are fighting with each other")
if _, err := nomostest.Retry(90*time.Second, func() error {
return nt.Validate("pods", testNs, &rbacv1.Role{},
nomostest.ResourceVersionNotEquals(nt, roleResourceVersion))
}); err != nil {
nt.T.Fatal(err)
}

nt.T.Logf("Both of the two RootSyncs still report problems because the remediators detect the conflicts")
// Ignore the Syncing condition because the sync status might be updated by another reconciler, which doesn't update the SyncingCondition.
nt.WaitForRootSyncSyncError(configsync.RootSyncName, status.ManagementConflictErrorCode, "declared in another repository", true)
nt.WaitForRootSyncSyncError(rootSync2, status.ManagementConflictErrorCode, "declared in another repository", true)
nt.WaitForRootSyncSyncError(configsync.RootSyncName, status.ManagementConflictErrorCode, "declared in another repository")
nt.WaitForRootSyncSyncError(rootSync2, status.ManagementConflictErrorCode, "declared in another repository")

nt.T.Logf("Remove the declaration from one Root repo %s", configsync.RootSyncName)
nt.RootRepos[configsync.RootSyncName].Remove(podRoleFilePath)
Expand Down
16 changes: 8 additions & 8 deletions e2e/testcases/sync_ordering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestMultiDependencies(t *testing.T) {
nt.RootRepos[configsync.RootSyncName].Add("acme/cm0.yaml", fake.ConfigMapObject(core.Name(cm0Name), core.Namespace(namespaceName),
core.Annotation(dependson.Annotation, "/namespaces/bookstore/ConfigMap/cm2")))
nt.RootRepos[configsync.RootSyncName].CommitAndPush("Create a cyclic dependency between cm0, cm1, and cm2")
nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "cyclic dependency", false)
nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "cyclic dependency")

nt.T.Log("Verify that cm0 does not have the dependsOn annotation")
if err := nt.Validate(cm0Name, namespaceName, &corev1.ConfigMap{}, nomostest.MissingAnnotation(dependson.Annotation)); err != nil {
Expand Down Expand Up @@ -331,7 +331,7 @@ func TestExternalDependencyError(t *testing.T) {
nt.WaitForRepoSyncs()
nt.RootRepos[configsync.RootSyncName].Remove("acme/cm0.yaml")
nt.RootRepos[configsync.RootSyncName].CommitAndPush("Removing cm0 from the git repo")
nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "dependency", false)
nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "dependency")

// TestCase: cm1 depends on cm0; both are managed by ConfigSync.
// Both exist in the repo and in the cluster.
Expand All @@ -345,7 +345,7 @@ func TestExternalDependencyError(t *testing.T) {
nt.RootRepos[configsync.RootSyncName].Add("acme/cm0.yaml", fake.ConfigMapObject(core.Name(cm0Name), core.Namespace(namespaceName),
core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementDisabled)))
nt.RootRepos[configsync.RootSyncName].CommitAndPush("Disabling management for cm0 in the git repo")
nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency", false)
nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency")

// TestCase: cm1 depends on cm0; cm0 is disabled.
// Neither exists in the cluster.
Expand All @@ -364,7 +364,7 @@ func TestExternalDependencyError(t *testing.T) {
nt.RootRepos[configsync.RootSyncName].Add("acme/cm1.yaml", fake.ConfigMapObject(core.Name(cm1Name), core.Namespace(namespaceName),
core.Annotation(dependson.Annotation, "/namespaces/bookstore/ConfigMap/cm0")))
nt.RootRepos[configsync.RootSyncName].CommitAndPush("Adding cm1 and cm0: cm1 depends on cm0, cm0 is disabled")
nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency", false)
nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency")

// TestCase: cm1 depends on object that is not in the repo or cluster.
// Expected an ExternalDependencyError
Expand All @@ -376,7 +376,7 @@ func TestExternalDependencyError(t *testing.T) {
nt.RootRepos[configsync.RootSyncName].Add("acme/cm1.yaml", fake.ConfigMapObject(core.Name(cm1Name), core.Namespace(namespaceName),
core.Annotation(dependson.Annotation, "/namespaces/bookstore/ConfigMap/cm-not-exist")))
nt.RootRepos[configsync.RootSyncName].CommitAndPush("Adding cm1: cm1 depends on a resource that doesn't exist in either the repo or in cluster")
nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency", false)
nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency")

// TestCase: cm1 depends on an object that is not in the repo, but in the cluster
// Expected an ExternalDependencyError
Expand All @@ -391,7 +391,7 @@ func TestExternalDependencyError(t *testing.T) {
nt.RootRepos[configsync.RootSyncName].Add("acme/cm1.yaml", fake.ConfigMapObject(core.Name(cm1Name), core.Namespace(namespaceName),
core.Annotation(dependson.Annotation, "/namespaces/bookstore/ConfigMap/cm4")))
nt.RootRepos[configsync.RootSyncName].CommitAndPush("Adding cm1: cm1 depends on a resource that only exists in the cluster")
nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency", false)
nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency")
nt.T.Log("Cleaning up")
nt.RootRepos[configsync.RootSyncName].Remove("acme/cm1.yaml")
nt.RootRepos[configsync.RootSyncName].CommitAndPush("remove cm1")
Expand Down Expand Up @@ -492,7 +492,7 @@ func TestDependencyWithReconciliation(t *testing.T) {
core.Annotation(dependson.Annotation, "/namespaces/bookstore/Pod/pod3")))
nt.RootRepos[configsync.RootSyncName].CommitAndPush("Add pod3 and pod4 (pod4 depends on pod3 and pod3 won't be reconciled)")
nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode,
"skipped apply of Pod, bookstore/pod4: dependency apply reconcile timeout: bookstore_pod3__Pod", false)
"skipped apply of Pod, bookstore/pod4: dependency apply reconcile timeout: bookstore_pod3__Pod")

_, err = nomostest.Retry(20, func() error {
pod3 := &corev1.Pod{}
Expand Down Expand Up @@ -546,7 +546,7 @@ func TestDependencyWithReconciliation(t *testing.T) {
nt.T.Logf("Remove pod5 from the repo")
nt.RootRepos[configsync.RootSyncName].Remove("acme/pod5.yaml")
nt.RootRepos[configsync.RootSyncName].CommitAndPush("Remove pod5")
nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "dependency", false)
nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "dependency")

_, err = nomostest.Retry(20, func() error {
pod5 := &corev1.Pod{}
Expand Down
28 changes: 22 additions & 6 deletions pkg/api/configsync/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,29 @@ const (
// DefaultPeriodSecs is the default value in seconds between consecutive syncs.
DefaultPeriodSecs = 15

// DefaultReconcilerPollingPeriod defines how often the reconciler should poll
// the filesystem for updates to the source or rendered configs.
DefaultReconcilerPollingPeriod = 5 * time.Second
// DefaultReconcilerPollingDelay is the time delay between polling the
// filesystem for config updates to sync.
DefaultReconcilerPollingDelay = 5 * time.Second

// DefaultHydrationPollingPeriod defines how often the hydration controller
// should poll the filesystem for rendering the DRY configs.
DefaultHydrationPollingPeriod = 5 * time.Second
// DefaultHydrationPollingDelay is the time delay between polling the
// filesystem for config updates to render.
DefaultHydrationPollingDelay = 5 * time.Second

// DefaultHydrationRetryDelay is the time delay between attempts to
// re-render config after an error.
// TODO: replace with retry-backoff strategy
DefaultHydrationRetryDelay = 30 * time.Minute

// DefaultReconcilerRetryDelay is the time delay between parser runs when
// the last run errored or remediator watches need updating or there are
// management conflicts.
// TODO: replace with retry-backoff strategy
DefaultReconcilerRetryDelay = time.Second

// DefaultSyncStatusUpdateDelay is the time delay between status updates
// when the parser is not running. These updates report new management
// conflict errors from the remediator, if there are any.
DefaultSyncStatusUpdateDelay = 5 * time.Second

// DefaultReconcileTimeout defines the timeout of kpt applier reconcile/prune task
DefaultReconcileTimeout = 5 * time.Minute
Expand Down
Loading

0 comments on commit dac0ce9

Please sign in to comment.