Skip to content

Commit

Permalink
Merge pull request #3134 from embik/cleanup-informers-hook-context
Browse files Browse the repository at this point in the history
🐛 Inject `kcp-start-informers` logger into correct hook context
  • Loading branch information
kcp-ci-bot committed Jun 4, 2024
2 parents 3fb4c44 + a87cc78 commit 5299c30
Showing 1 changed file with 26 additions and 26 deletions.
52 changes: 26 additions & 26 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,32 +346,32 @@ func (s *Server) Run(ctx context.Context) error {
hookName := "kcp-start-informers"
if err := s.AddPostStartHook(hookName, func(hookContext genericapiserver.PostStartHookContext) error {
logger = logger.WithValues("postStartHook", hookName)
ctx = klog.NewContext(ctx, logger)
hookCtx := klog.NewContext(goContext(hookContext), logger)

logger.Info("starting kube informers")
s.KubeSharedInformerFactory.Start(hookContext.StopCh)
s.ApiExtensionsSharedInformerFactory.Start(hookContext.StopCh)
s.CacheKubeSharedInformerFactory.Start(hookContext.StopCh)
s.KubeSharedInformerFactory.Start(hookCtx.Done())
s.ApiExtensionsSharedInformerFactory.Start(hookCtx.Done())
s.CacheKubeSharedInformerFactory.Start(hookCtx.Done())

s.KubeSharedInformerFactory.WaitForCacheSync(hookContext.StopCh)
s.ApiExtensionsSharedInformerFactory.WaitForCacheSync(hookContext.StopCh)
s.CacheKubeSharedInformerFactory.WaitForCacheSync(hookContext.StopCh)
s.KubeSharedInformerFactory.WaitForCacheSync(hookCtx.Done())
s.ApiExtensionsSharedInformerFactory.WaitForCacheSync(hookCtx.Done())
s.CacheKubeSharedInformerFactory.WaitForCacheSync(hookCtx.Done())

select {
case <-hookContext.StopCh:
case <-hookCtx.Done():
return nil // context closed, avoid reporting success below
default:
}

logger.Info("finished starting kube informers")

logger.Info("bootstrapping system CRDs")
if err := wait.PollUntilContextCancel(goContext(hookContext), time.Second, true, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextCancel(hookCtx, time.Second, true, func(ctx context.Context) (bool, error) {
if err := systemcrds.Bootstrap(ctx,
s.ApiExtensionsClusterClient.Cluster(SystemCRDClusterName.Path()),
s.ApiExtensionsClusterClient.Cluster(SystemCRDClusterName.Path()).Discovery(),
s.DynamicClusterClient.Cluster(SystemCRDClusterName.Path()),
sets.New[string](s.Options.Extra.BatteriesIncluded...),
sets.New(s.Options.Extra.BatteriesIncluded...),
); err != nil {
logger.Error(err, "failed to bootstrap system CRDs, retrying")
return false, nil // keep trying
Expand All @@ -384,11 +384,11 @@ func (s *Server) Run(ctx context.Context) error {
logger.Info("finished bootstrapping system CRDs")

logger.Info("bootstrapping the shard workspace")
if err := wait.PollUntilContextCancel(goContext(hookContext), time.Second, true, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextCancel(hookCtx, time.Second, true, func(ctx context.Context) (bool, error) {
if err := configshard.Bootstrap(ctx,
s.ApiExtensionsClusterClient.Cluster(configshard.SystemShardCluster.Path()).Discovery(),
s.DynamicClusterClient.Cluster(configshard.SystemShardCluster.Path()),
sets.New[string](s.Options.Extra.BatteriesIncluded...),
sets.New(s.Options.Extra.BatteriesIncluded...),
s.KcpClusterClient.Cluster(configshard.SystemShardCluster.Path())); err != nil {
logger.Error(err, "failed to bootstrap the shard workspace")
return false, nil // keep trying
Expand All @@ -405,7 +405,7 @@ func (s *Server) Run(ctx context.Context) error {
go s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters().Informer().Run(hookContext.StopCh)

logger.Info("starting APIExport, APIBinding and LogicalCluster informers")
if err := wait.PollUntilContextCancel(goContext(hookContext), time.Millisecond*100, true, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextCancel(hookCtx, time.Millisecond*100, true, func(ctx context.Context) (bool, error) {
exportsSynced := s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().HasSynced()
cacheExportsSynced := s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().HasSynced()
logicalClusterSynced := s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters().Informer().HasSynced()
Expand All @@ -421,19 +421,19 @@ func (s *Server) Run(ctx context.Context) error {
s.RootShardKcpClusterClient = s.KcpClusterClient

// bootstrap root workspace phase 0 only if we are on the root shard, no APIBinding resources yet
if err := configrootphase0.Bootstrap(goContext(hookContext),
if err := configrootphase0.Bootstrap(hookCtx,
s.KcpClusterClient.Cluster(core.RootCluster.Path()),
s.ApiExtensionsClusterClient.Cluster(core.RootCluster.Path()).Discovery(),
s.DynamicClusterClient.Cluster(core.RootCluster.Path()),
sets.New[string](s.Options.Extra.BatteriesIncluded...),
sets.New(s.Options.Extra.BatteriesIncluded...),
); err != nil {
logger.Error(err, "failed to bootstrap root workspace phase 0")
return nil // don't klog.Fatal. This only happens when context is cancelled.
}
logger.Info("bootstrapped root workspace phase 0")

logger.Info("getting kcp APIExport identities")
if err := wait.PollUntilContextCancel(goContext(hookContext), time.Millisecond*500, true, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextCancel(hookCtx, time.Millisecond*500, true, func(ctx context.Context) (bool, error) {
if err := s.resolveIdentities(ctx); err != nil {
logger.V(3).Info("failed to resolve identities, keeping trying", "err", err)
return false, nil
Expand All @@ -459,11 +459,11 @@ func (s *Server) Run(ctx context.Context) error {
logger.Info("finished getting kcp APIExport identities for the root shard")
}

s.KcpSharedInformerFactory.Start(hookContext.StopCh)
s.CacheKcpSharedInformerFactory.Start(hookContext.StopCh)
s.KcpSharedInformerFactory.Start(hookCtx.Done())
s.CacheKcpSharedInformerFactory.Start(hookCtx.Done())

s.KcpSharedInformerFactory.WaitForCacheSync(hookContext.StopCh)
s.CacheKcpSharedInformerFactory.WaitForCacheSync(hookContext.StopCh)
s.KcpSharedInformerFactory.WaitForCacheSync(hookCtx.Done())
s.CacheKcpSharedInformerFactory.WaitForCacheSync(hookCtx.Done())

// create or update shard
shard := &corev1alpha1.Shard{
Expand All @@ -481,7 +481,7 @@ func (s *Server) Run(ctx context.Context) error {
},
}
logger.Info("Creating or updating Shard", "shard", s.Options.Extra.ShardName)
if err := wait.PollUntilContextCancel(goContext(hookContext), time.Second, true, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextCancel(hookCtx, time.Second, true, func(ctx context.Context) (bool, error) {
existingShard, err := s.RootShardKcpClusterClient.Cluster(core.RootCluster.Path()).CoreV1alpha1().Shards().Get(ctx, shard.Name, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
logger.Error(err, "failed getting Shard from the root workspace")
Expand All @@ -497,7 +497,7 @@ func (s *Server) Run(ctx context.Context) error {
existingShard.Spec.BaseURL = shard.Spec.BaseURL
existingShard.Spec.ExternalURL = shard.Spec.ExternalURL
existingShard.Spec.VirtualWorkspaceURL = shard.Spec.VirtualWorkspaceURL
if _, err := s.RootShardKcpClusterClient.Cluster(core.RootCluster.Path()).CoreV1alpha1().Shards().Update(ctx, existingShard, metav1.UpdateOptions{}); err != nil {
if _, err := s.RootShardKcpClusterClient.Cluster(core.RootCluster.Path()).CoreV1alpha1().Shards().Update(hookCtx, existingShard, metav1.UpdateOptions{}); err != nil {
logger.Error(err, "failed updating Shard in the root workspace")
return false, nil
}
Expand All @@ -509,15 +509,15 @@ func (s *Server) Run(ctx context.Context) error {
}

select {
case <-hookContext.StopCh:
case <-hookCtx.Done():
return nil // context closed, avoid reporting success below
default:
}

logger.Info("finished starting (remaining) kcp informers")

logger.Info("starting dynamic metadata informer worker")
go s.DiscoveringDynamicSharedInformerFactory.StartWorker(goContext(hookContext))
go s.DiscoveringDynamicSharedInformerFactory.StartWorker(hookCtx)

logger.Info("synced all informers, ready to start controllers")
close(s.syncedCh)
Expand All @@ -526,11 +526,11 @@ func (s *Server) Run(ctx context.Context) error {
// the root ws is only present on the root shard
logger.Info("starting bootstrapping root workspace phase 1")
if err := configroot.Bootstrap(
goContext(hookContext),
hookCtx,
s.BootstrapApiExtensionsClusterClient.Cluster(core.RootCluster.Path()).Discovery(),
s.BootstrapDynamicClusterClient.Cluster(core.RootCluster.Path()),
s.Options.HomeWorkspaces.HomeCreatorGroups,
sets.New[string](s.Options.Extra.BatteriesIncluded...),
sets.New(s.Options.Extra.BatteriesIncluded...),
); err != nil {
logger.Error(err, "failed to bootstrap root workspace phase 1")
return nil // don't klog.Fatal. This only happens when context is cancelled.
Expand Down

0 comments on commit 5299c30

Please sign in to comment.