Skip to content

Commit

Permalink
Improve applier and status synchronization
Browse files Browse the repository at this point in the history
- Use locks to protect the MultiError, Syncing, and Apply methods
  independently. This is a hack to improve thread-safety without
  changing the Applier interface. In the future, the interface
  should probably be rewritten to use channels instead.
- Centralize aggregation of sync errors in `updater.status()`
- Fix Syncing condition errors to include errors from all reconciler
  pipeline phases: rendering, source, & sync.
- Move the periodic sync status update from the reconciler into the
  parser. This removes the need for some synchronization that wasn't
  previously thread-safe.
- Remove several parser methods which are obsolete with the above
  changes.
- Rename applier.Interface to KptApplier to make way for the
  KptDestroyer interface later.
- Route the retry delay and status update delay up through the
  parser and reconciler. These are now hard-coded with the rest
  of the constants. They could be made into options later, but I
  didn't want to expose new options until we need them exposed.
- Rename -Frequency and -Period options to -Delay which more
  accurately reflects how they are used. Period is how long
  something takes. Frequency is how often something occurs.
  Delay is the time between occurances.
- Set ErrorSummary fields to nil when empty (no errors)

Change-Id: I1c3373f98bb5d842ce113ba61dc0c87e1119c665
  • Loading branch information
karlkfi committed Nov 9, 2022
1 parent 8d9c21b commit 3c50011
Show file tree
Hide file tree
Showing 25 changed files with 1,126 additions and 763 deletions.
37 changes: 18 additions & 19 deletions cmd/hydration-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ import (
"flag"
"os"
"strings"
"time"

"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
"kpt.dev/configsync/pkg/api/configsync"
"kpt.dev/configsync/pkg/api/configsync/v1beta1"
"kpt.dev/configsync/pkg/hydrate"
"kpt.dev/configsync/pkg/importer/filesystem/cmpath"
"kpt.dev/configsync/pkg/kmetrics"
"kpt.dev/configsync/pkg/profiler"
"kpt.dev/configsync/pkg/reconcilermanager"
"kpt.dev/configsync/pkg/reconcilermanager/controllers"
"kpt.dev/configsync/pkg/util/log"
ctrl "sigs.k8s.io/controller-runtime"
)
Expand All @@ -55,15 +56,18 @@ var (
syncDir = flag.String("sync-dir", os.Getenv(reconcilermanager.SyncDirKey),
"Relative path of the root directory within the repo.")

hydrationPollingPeriodStr = flag.String("polling-period", os.Getenv(reconcilermanager.HydrationPollingPeriod),
// TODO: rename to `polling-delay` to match internal name.
pollingDelay = flag.Duration("polling-period",
controllers.PollingPeriod(reconcilermanager.HydrationPollingPeriod, configsync.DefaultHydrationPollingDelay),
"Period of time between checking the filesystem for rendering the DRY configs.")

// rehydratePeriod sets the hydration-controller to re-run the hydration process
// rehydrateDelay sets the hydration-controller to re-run the hydration process
// periodically when errors happen. It retries on both transient errors and permanent errors.
// Other ways to trigger the hydration process are:
// - push a new commit
// - delete the done file from the hydration-controller.
rehydratePeriod = flag.Duration("rehydrate-period", 30*time.Minute,
// TODO: rename to `rehydrate-delay` to match internal name.
rehydrateDelay = flag.Duration("rehydrate-period", configsync.DefaultHydrationRetryDelay,
"Period of time between rehydrating on errors.")

reconcilerName = flag.String("reconciler-name", os.Getenv(reconcilermanager.ReconcilerNameKey),
Expand Down Expand Up @@ -107,22 +111,17 @@ func main() {
dir := strings.TrimPrefix(*syncDir, "/")
relSyncDir := cmpath.RelativeOS(dir)

hydrationPollingPeriod, err := time.ParseDuration(*hydrationPollingPeriodStr)
if err != nil {
klog.Fatalf("Failed to get hydration polling period: %v", err)
}

hydrator := &hydrate.Hydrator{
DonePath: absDonePath,
SourceType: v1beta1.SourceType(*sourceType),
SourceRoot: absSourceRootDir,
HydratedRoot: absHydratedRootDir,
SourceLink: *sourceLinkDir,
HydratedLink: *hydratedLinkDir,
SyncDir: relSyncDir,
PollingFrequency: hydrationPollingPeriod,
RehydrateFrequency: *rehydratePeriod,
ReconcilerName: *reconcilerName,
DonePath: absDonePath,
SourceType: v1beta1.SourceType(*sourceType),
SourceRoot: absSourceRootDir,
HydratedRoot: absHydratedRootDir,
SourceLink: *sourceLinkDir,
HydratedLink: *hydratedLinkDir,
SyncDir: relSyncDir,
PollingDelay: *pollingDelay,
RehydrateDelay: *rehydrateDelay,
ReconcilerName: *reconcilerName,
}

hydrator.Run(context.Background())
Expand Down
10 changes: 6 additions & 4 deletions cmd/reconciler-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ var (
clusterName = flag.String("cluster-name", os.Getenv(reconcilermanager.ClusterNameKey),
"Cluster name to use for Cluster selection")

reconcilerPollingPeriod = flag.Duration("reconciler-polling-period", controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingPeriod),
// TODO: rename to `reconciler-polling-delay` to match internal name.
reconcilerPollingDelay = flag.Duration("reconciler-polling-period", controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingDelay),
"How often the reconciler should poll the filesystem for updates to the source or rendered configs.")

hydrationPollingPeriod = flag.Duration("hydration-polling-period", controllers.PollingPeriod(reconcilermanager.HydrationPollingPeriod, configsync.DefaultHydrationPollingPeriod),
// TODO: rename to `hydration-polling-delay` to match internal name.
hydrationPollingDelay = flag.Duration("hydration-polling-period", controllers.PollingPeriod(reconcilermanager.HydrationPollingPeriod, configsync.DefaultHydrationPollingDelay),
"How often the hydration-controller should poll the filesystem for rendering the DRY configs.")

setupLog = ctrl.Log.WithName("setup")
Expand Down Expand Up @@ -78,15 +80,15 @@ func main() {

watchFleetMembership := fleetMembershipCRDExists(mgr.GetConfig(), mgr.GetRESTMapper())

repoSync := controllers.NewRepoSyncReconciler(*clusterName, *reconcilerPollingPeriod, *hydrationPollingPeriod, mgr.GetClient(),
repoSync := controllers.NewRepoSyncReconciler(*clusterName, *reconcilerPollingDelay, *hydrationPollingDelay, mgr.GetClient(),
ctrl.Log.WithName("controllers").WithName(configsync.RepoSyncKind),
mgr.GetScheme())
if err := repoSync.SetupWithManager(mgr, watchFleetMembership); err != nil {
setupLog.Error(err, "unable to create controller", "controller", configsync.RepoSyncKind)
os.Exit(1)
}

rootSync := controllers.NewRootSyncReconciler(*clusterName, *reconcilerPollingPeriod, *hydrationPollingPeriod, mgr.GetClient(),
rootSync := controllers.NewRootSyncReconciler(*clusterName, *reconcilerPollingDelay, *hydrationPollingDelay, mgr.GetClient(),
ctrl.Log.WithName("controllers").WithName(configsync.RootSyncKind),
mgr.GetScheme())
if err := rootSync.SetupWithManager(mgr, watchFleetMembership); err != nil {
Expand Down
47 changes: 26 additions & 21 deletions cmd/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,14 @@ var (
fightDetectionThreshold = flag.Float64(
"fight-detection-threshold", 5.0,
"The rate of updates per minute to an API Resource at which the Syncer logs warnings about too many updates to the resource.")
// TODO: rename to `resync-delay` to match internal name
resyncPeriod = flag.Duration("resync-period", time.Hour,
"Period of time between forced re-syncs from source (even without a new commit).")
workers = flag.Int("workers", 1,
"Number of concurrent remediator workers to run at once.")
filesystemPollingPeriod = flag.Duration("filesystem-polling-period", controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingPeriod),
// TODO: rename to `filesystem-polling-delay` to match internal name.
pollingDelay = flag.Duration("filesystem-polling-period",
controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingDelay),
"Period of time between checking the filesystem for updates to the source or rendered configs.")

// Root-Repo-only flags. If set for a Namespace-scoped Reconciler, causes the Reconciler to fail immediately.
Expand Down Expand Up @@ -161,26 +164,28 @@ func main() {
}

opts := reconciler.Options{
ClusterName: *clusterName,
FightDetectionThreshold: *fightDetectionThreshold,
NumWorkers: *workers,
ReconcilerScope: declared.Scope(*scope),
ResyncPeriod: *resyncPeriod,
FilesystemPollingFrequency: *filesystemPollingPeriod,
SourceRoot: absSourceDir,
RepoRoot: absRepoRoot,
HydratedRoot: *hydratedRootDir,
HydratedLink: *hydratedLinkDir,
SourceRev: *sourceRev,
SourceBranch: *sourceBranch,
SourceType: v1beta1.SourceType(*sourceType),
SourceRepo: *sourceRepo,
SyncDir: relSyncDir,
SyncName: *syncName,
ReconcilerName: *reconcilerName,
StatusMode: *statusMode,
ReconcileTimeout: *reconcileTimeout,
APIServerTimeout: *apiServerTimeout,
ClusterName: *clusterName,
FightDetectionThreshold: *fightDetectionThreshold,
NumWorkers: *workers,
ReconcilerScope: declared.Scope(*scope),
PollingDelay: *pollingDelay,
ResyncDelay: *resyncPeriod,
RetryDelay: configsync.DefaultReconcilerRetryDelay,
StatusUpdateDelay: configsync.DefaultSyncStatusUpdateDelay,
SourceRoot: absSourceDir,
RepoRoot: absRepoRoot,
HydratedRoot: *hydratedRootDir,
HydratedLink: *hydratedLinkDir,
SourceRev: *sourceRev,
SourceBranch: *sourceBranch,
SourceType: v1beta1.SourceType(*sourceType),
SourceRepo: *sourceRepo,
SyncDir: relSyncDir,
SyncName: *syncName,
ReconcilerName: *reconcilerName,
StatusMode: *statusMode,
ReconcileTimeout: *reconcileTimeout,
APIServerTimeout: *apiServerTimeout,
}

if declared.Scope(*scope) == declared.RootReconciler {
Expand Down
5 changes: 1 addition & 4 deletions e2e/nomostest/wait_for_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,17 +244,14 @@ func (nt *NT) WaitForRootSyncRenderingError(rsName, code string, message string,
}

// WaitForRootSyncSyncError waits until the given error (code and message) is present on the RootSync resource
func (nt *NT) WaitForRootSyncSyncError(rsName, code string, message string, ignoreSyncingCondition bool, opts ...WaitOption) {
func (nt *NT) WaitForRootSyncSyncError(rsName, code string, message string, opts ...WaitOption) {
Wait(nt.T, fmt.Sprintf("RootSync %s rendering error code %s", rsName, code), nt.DefaultWaitTimeout,
func() error {
rs := fake.RootSyncObjectV1Beta1(rsName)
err := nt.Get(rs.GetName(), rs.GetNamespace(), rs)
if err != nil {
return err
}
if ignoreSyncingCondition {
return validateError(rs.Status.Sync.Errors, code, message)
}
syncingCondition := rootsync.GetCondition(rs.Status.Conditions, v1beta1.RootSyncSyncing)
return validateRootSyncError(rs.Status.Sync.Errors, syncingCondition, code, message, []v1beta1.ErrorSource{v1beta1.SyncError})
},
Expand Down
Loading

0 comments on commit 3c50011

Please sign in to comment.