Skip to content

Commit

Permalink
support structured logging for pkg/controller
Browse files Browse the repository at this point in the history
Signed-off-by: jairui <jairuigou@gmail.com>
  • Loading branch information
jairuigou committed Jun 14, 2024
1 parent 0313790 commit f123e71
Show file tree
Hide file tree
Showing 76 changed files with 651 additions and 682 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (r *ReconcileAdvancedCronJob) reconcileBroadcastJob(ctx context.Context, re

var childJobs appsv1alpha1.BroadcastJobList
if err := r.List(ctx, &childJobs, client.InNamespace(advancedCronJob.Namespace), client.MatchingFields{jobOwnerKey: advancedCronJob.Name}); err != nil {
klog.Error(err, "unable to list child Jobs", req.NamespacedName)
klog.ErrorS(err, "Unable to list child Jobs", "advancedCronJob", req)
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -98,7 +98,7 @@ func (r *ReconcileAdvancedCronJob) reconcileBroadcastJob(ctx context.Context, re
// the active jobs themselves.
scheduledTimeForJob, err := getScheduledTimeForJob(&job)
if err != nil {
klog.Error(err, "unable to parse schedule time for child broadcastjob ", job.Name, req.NamespacedName)
klog.ErrorS(err, "Unable to parse schedule time for child BroadcastJob", "broadcastJob", klog.KObj(&job), "advancedCronJob", req)
continue
}
if scheduledTimeForJob != nil {
Expand All @@ -120,15 +120,15 @@ func (r *ReconcileAdvancedCronJob) reconcileBroadcastJob(ctx context.Context, re
for _, activeJob := range activeJobs {
jobRef, err := ref.GetReference(r.scheme, activeJob)
if err != nil {
klog.Error(err, "unable to make reference to active broadcastjob ", " job ", activeJob, req.NamespacedName)
klog.ErrorS(err, "Unable to make reference to active BroadcastJob", "broadcastJob", klog.KObj(activeJob), "advancedCronJob", req)
continue
}
advancedCronJob.Status.Active = append(advancedCronJob.Status.Active, *jobRef)
}

klog.V(1).Info("advancedCronJob count ", " active advancedCronJob ", len(activeJobs), " successful advancedCronJob ", len(successfulJobs), " failed advancedCronJob ", len(failedJobs), req.NamespacedName)
klog.V(1).InfoS("AdvancedCronJob count", "activeJobCount", len(activeJobs), "successfulJobCount", len(successfulJobs), "failedJobCount", len(failedJobs), "advancedCronJob", req)
if err := r.updateAdvancedJobStatus(req, &advancedCronJob); err != nil {
klog.Error(err, "unable to update AdvancedCronJob status", req.NamespacedName)
klog.ErrorS(err, "Unable to update AdvancedCronJob status", "advancedCronJob", req)
return ctrl.Result{}, err
}

Expand All @@ -154,9 +154,9 @@ func (r *ReconcileAdvancedCronJob) reconcileBroadcastJob(ctx context.Context, re
break
}
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
klog.Error(err, "unable to delete old failed broadcastjob", "job", job, req.NamespacedName)
klog.ErrorS(err, "Unable to delete old failed BroadcastJob", "oldFailedBroadcastJob", klog.KObj(job), "advancedCronJob", req)
} else {
klog.V(0).Info("deleted old failed broadcastjob", "job", job, req.NamespacedName)
klog.InfoS("Deleted old failed BroadcastJob", "oldFailedBroadcastJob", klog.KObj(job), "advancedCronJob", req)
}
}
}
Expand All @@ -173,9 +173,9 @@ func (r *ReconcileAdvancedCronJob) reconcileBroadcastJob(ctx context.Context, re
break
}
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); (err) != nil {
klog.Error(err, "unable to delete old successful broadcastjob ", job.Name, req.NamespacedName)
klog.ErrorS(err, "Unable to delete old successful BroadcastJob", "oldSuccessfulBroadcastJob", klog.KObj(job), "advancedCronJob", req)
} else {
klog.V(0).Info("deleted old successful broadcastjob ", job.Name, req.NamespacedName)
klog.InfoS("Deleted old successful BroadcastJob", "oldSuccessfulBroadcastJob", klog.KObj(job), "advancedCronJob", req)
}
}
}
Expand All @@ -187,7 +187,7 @@ func (r *ReconcileAdvancedCronJob) reconcileBroadcastJob(ctx context.Context, re
*/

if advancedCronJob.Spec.Paused != nil && *advancedCronJob.Spec.Paused {
klog.V(1).Info("advancedCronJob paused, skipping", req.NamespacedName)
klog.V(1).InfoS("AdvancedCronJob paused, skipping", "advancedCronJob", req)
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -265,7 +265,7 @@ func (r *ReconcileAdvancedCronJob) reconcileBroadcastJob(ctx context.Context, re
now := realClock{}.Now()
missedRun, nextRun, err := getNextSchedule(&advancedCronJob, now)
if err != nil {
klog.Error(err, "unable to figure out CronJob schedule", req.NamespacedName)
klog.ErrorS(err, "Unable to figure out CronJob schedule", "advancedCronJob", req)
// we don't really care about requeuing until we get an update that
// fixes the schedule, so don't return an error
return ctrl.Result{}, nil
Expand All @@ -282,7 +282,7 @@ func (r *ReconcileAdvancedCronJob) reconcileBroadcastJob(ctx context.Context, re
If we've missed a run, and we're still within the deadline to start it, we'll need to run a job.
*/
if missedRun.IsZero() {
klog.V(1).Info("no upcoming scheduled times, sleeping until next now ", now, " and next run ", nextRun, req.NamespacedName)
klog.V(1).InfoS("No upcoming scheduled times, sleeping until next run", "now", now, "nextRun", nextRun, "advancedCronJob", req)
return scheduledResult, nil
}

Expand All @@ -292,7 +292,7 @@ func (r *ReconcileAdvancedCronJob) reconcileBroadcastJob(ctx context.Context, re
tooLate = missedRun.Add(time.Duration(*advancedCronJob.Spec.StartingDeadlineSeconds) * time.Second).Before(now)
}
if tooLate {
klog.V(1).Info("missed starting deadline for last run, sleeping till next", "current run", missedRun, req.NamespacedName)
klog.V(1).InfoS("Missed starting deadline for last run, sleeping till next run", "missedRun", missedRun, "advancedCronJob", req)
return scheduledResult, nil
}

Expand All @@ -304,7 +304,7 @@ func (r *ReconcileAdvancedCronJob) reconcileBroadcastJob(ctx context.Context, re
// figure out how to run this job -- concurrency policy might forbid us from running
// multiple at the same time...
if advancedCronJob.Spec.ConcurrencyPolicy == appsv1alpha1.ForbidConcurrent && len(activeJobs) > 0 {
klog.V(1).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(activeJobs), req.NamespacedName)
klog.V(1).InfoS("Concurrency policy blocks concurrent runs, skipping", "activeBroadcastJobCount", len(activeJobs), "advancedCronJob", req)
return scheduledResult, nil
}

Expand All @@ -313,7 +313,7 @@ func (r *ReconcileAdvancedCronJob) reconcileBroadcastJob(ctx context.Context, re
for _, activeJob := range activeJobs {
// we don't care if the job was already deleted
if err := r.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
klog.Error(err, "unable to delete active broadcastjob", "job", activeJob, req.NamespacedName)
klog.ErrorS(err, "Unable to delete active broadcastjob", "broadcastJob", klog.KObj(activeJob), "advancedCronJob", req)
return ctrl.Result{}, err
}
}
Expand Down Expand Up @@ -360,18 +360,18 @@ func (r *ReconcileAdvancedCronJob) reconcileBroadcastJob(ctx context.Context, re
// actually make the job...
job, err := constructBrJobForCronJob(&advancedCronJob, missedRun)
if err != nil {
klog.Error(err, "unable to construct broadcastjob from template", req.NamespacedName)
klog.ErrorS(err, "Unable to construct broadcastjob from template", "advancedCronJob", req)
// don't bother requeuing until we get a change to the spec
return scheduledResult, nil
}

// ...and create it on the cluster
if err := r.Create(ctx, job); err != nil {
klog.Error(err, "unable to create BroadcastJob for CronJob", "job", job, req.NamespacedName)
klog.ErrorS(err, "Unable to create BroadcastJob for CronJob", "broadcastJob", klog.KObj(job), "advancedCronJob", req)
return ctrl.Result{}, err
}

klog.V(1).Info("created BroadcastJob for CronJob run", "job", job, req.NamespacedName)
klog.V(1).InfoS("Created BroadcastJob for CronJob run", "broadcastJob", klog.KObj(job), "advancedCronJob", req)

/*
### 7: Requeue when we either see a running job or it's time for the next scheduled run
Expand Down
18 changes: 9 additions & 9 deletions pkg/controller/advancedcronjob/advancedcronjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,28 +76,28 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
klog.Info("Starting AdvancedCronJob Controller")
klog.InfoS("Starting AdvancedCronJob Controller")
c, err := controller.New("advancedcronjob-controller", mgr, controller.Options{Reconciler: r,
MaxConcurrentReconciles: concurrentReconciles, CacheSyncTimeout: util.GetControllerCacheSyncTimeout()})
if err != nil {
klog.Error(err)
klog.ErrorS(err, "Failed to create AdvancedCronJob controller")
return err
}

// Watch for changes to AdvancedCronJob
src := source.Kind(mgr.GetCache(), &appsv1alpha1.AdvancedCronJob{})
if err = c.Watch(src, &handler.EnqueueRequestForObject{}); err != nil {
klog.Error(err)
klog.ErrorS(err, "Failed to watch AdvancedCronJob")
return err
}

if err = watchJob(mgr, c); err != nil {
klog.Error(err)
klog.ErrorS(err, "Failed to watch Job")
return err
}

if err = watchBroadcastJob(mgr, c); err != nil {
klog.Error(err)
klog.ErrorS(err, "Failed to watch BroadcastJob")
return err
}
return nil
Expand Down Expand Up @@ -133,7 +133,7 @@ type ReconcileAdvancedCronJob struct {

func (r *ReconcileAdvancedCronJob) Reconcile(_ context.Context, req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
klog.Info("Running AdvancedCronJob job ", req.NamespacedName)
klog.InfoS("Running AdvancedCronJob job", "advancedCronJob", req)

namespacedName := types.NamespacedName{
Namespace: req.Namespace,
Expand All @@ -143,7 +143,7 @@ func (r *ReconcileAdvancedCronJob) Reconcile(_ context.Context, req ctrl.Request
var advancedCronJob appsv1alpha1.AdvancedCronJob

if err := r.Get(ctx, namespacedName, &advancedCronJob); err != nil {
klog.Error(err, "unable to fetch CronJob", req.NamespacedName)
klog.ErrorS(err, "Unable to fetch CronJob", "advancedCronJob", req)
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
Expand All @@ -156,7 +156,7 @@ func (r *ReconcileAdvancedCronJob) Reconcile(_ context.Context, req ctrl.Request
case appsv1alpha1.BroadcastJobTemplate:
return r.reconcileBroadcastJob(ctx, req, advancedCronJob)
default:
klog.Info("No template found", req.NamespacedName)
klog.InfoS("No template found", "advancedCronJob", req)
}

return ctrl.Result{}, nil
Expand All @@ -169,7 +169,7 @@ func (r *ReconcileAdvancedCronJob) SetupWithManager(mgr ctrl.Manager) error {
}

func (r *ReconcileAdvancedCronJob) updateAdvancedJobStatus(request reconcile.Request, advancedCronJob *appsv1alpha1.AdvancedCronJob) error {
klog.V(1).Info(fmt.Sprintf("Updating job %s status %#v", advancedCronJob.Name, advancedCronJob.Status))
klog.V(1).InfoS("Updating job status", "advancedCronJob", klog.KObj(advancedCronJob), "status", advancedCronJob.Status)
advancedCronJobCopy := advancedCronJob.DeepCopy()
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
err := r.Status().Update(context.TODO(), advancedCronJobCopy)
Expand Down
36 changes: 18 additions & 18 deletions pkg/controller/advancedcronjob/advancedcronjob_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (r *ReconcileAdvancedCronJob) reconcileJob(ctx context.Context, req ctrl.Re
advancedCronJob.Status.Type = appsv1alpha1.JobTemplate
var childJobs batchv1.JobList
if err := r.List(ctx, &childJobs, client.InNamespace(advancedCronJob.Namespace), client.MatchingFields{jobOwnerKey: advancedCronJob.Name}); err != nil {
klog.Error(err, "unable to list child Jobs", req.NamespacedName)
klog.ErrorS(err, "Unable to list child Jobs", "advancedCronJob", req)
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -101,7 +101,7 @@ func (r *ReconcileAdvancedCronJob) reconcileJob(ctx context.Context, req ctrl.Re
// the active jobs themselves.
scheduledTimeForJob, err := getScheduledTimeForJob(&job)
if err != nil {
klog.Error(err, "unable to parse schedule time for child job", "job", &job, req.NamespacedName)
klog.ErrorS(err, "Unable to parse schedule time for child job", "job", klog.KObj(&job), "advancedCronJob", req)
continue
}
if scheduledTimeForJob != nil {
Expand All @@ -123,15 +123,15 @@ func (r *ReconcileAdvancedCronJob) reconcileJob(ctx context.Context, req ctrl.Re
for _, activeJob := range activeJobs {
jobRef, err := ref.GetReference(r.scheme, activeJob)
if err != nil {
klog.Error(err, "unable to make reference to active job", "job", activeJob, req.NamespacedName)
klog.ErrorS(err, "Unable to make reference to active job", "job", klog.KObj(activeJob), "advancedCronJob", req)
continue
}
advancedCronJob.Status.Active = append(advancedCronJob.Status.Active, *jobRef)
}

klog.V(1).Info("job count ", " active jobs ", len(activeJobs), " successful jobs ", len(successfulJobs), " failed jobs ", len(failedJobs), req.NamespacedName)
klog.V(1).InfoS("Job count", "activeJobCount", len(activeJobs), "successfulJobCount", len(successfulJobs), "failedJobCount", len(failedJobs), "advancedCronJob", req)
if err := r.updateAdvancedJobStatus(req, &advancedCronJob); err != nil {
klog.Error(err, "unable to update AdvancedCronJob status", req.NamespacedName)
klog.ErrorS(err, "Unable to update AdvancedCronJob status", "advancedCronJob", req)
return ctrl.Result{}, err
}

Expand All @@ -157,9 +157,9 @@ func (r *ReconcileAdvancedCronJob) reconcileJob(ctx context.Context, req ctrl.Re
break
}
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
klog.Error(err, "unable to delete old failed job ", job.Name, req.NamespacedName)
klog.ErrorS(err, "Unable to delete old failed job", "job", klog.KObj(job), "advancedCronJob", req)
} else {
klog.V(0).Info("deleted old failed job ", job.Name, req.NamespacedName)
klog.InfoS("Deleted old failed job", "job", klog.KObj(job), "advancedCronJob", req)
}
}
}
Expand All @@ -176,9 +176,9 @@ func (r *ReconcileAdvancedCronJob) reconcileJob(ctx context.Context, req ctrl.Re
break
}
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); (err) != nil {
klog.Error(err, "unable to delete old successful job ", job.Name, req.NamespacedName)
klog.ErrorS(err, "Unable to delete old successful job", "job", klog.KObj(job), "advancedCronJob", req)
} else {
klog.V(0).Info("deleted old successful job ", job.Name, req.NamespacedName)
klog.InfoS("Deleted old successful job", "job", klog.KObj(job), "advancedCronJob", req)
}
}
}
Expand All @@ -190,7 +190,7 @@ func (r *ReconcileAdvancedCronJob) reconcileJob(ctx context.Context, req ctrl.Re
*/

if advancedCronJob.Spec.Paused != nil && *advancedCronJob.Spec.Paused {
klog.V(1).Info("cronjob paused, skipping", req.NamespacedName)
klog.V(1).InfoS("CronJob paused, skipping", "advancedCronJob", req)
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -268,7 +268,7 @@ func (r *ReconcileAdvancedCronJob) reconcileJob(ctx context.Context, req ctrl.Re
now := realClock{}.Now()
missedRun, nextRun, err := getNextSchedule(&advancedCronJob, now)
if err != nil {
klog.Error(err, "unable to figure out CronJob schedule", req.NamespacedName)
klog.ErrorS(err, "Unable to figure out CronJob schedule", "advancedCronJob", req)
// we don't really care about requeuing until we get an update that
// fixes the schedule, so don't return an error
return ctrl.Result{}, nil
Expand All @@ -285,7 +285,7 @@ func (r *ReconcileAdvancedCronJob) reconcileJob(ctx context.Context, req ctrl.Re
If we've missed a run, and we're still within the deadline to start it, we'll need to run a job.
*/
if missedRun.IsZero() {
klog.V(1).Info("no upcoming scheduled times, sleeping until next now ", now, " and next run ", nextRun, req.NamespacedName)
klog.V(1).InfoS("No upcoming scheduled times, sleeping until next run", "now", now, "nextRun", nextRun, "advancedCronJob", req)
return scheduledResult, nil
}

Expand All @@ -295,7 +295,7 @@ func (r *ReconcileAdvancedCronJob) reconcileJob(ctx context.Context, req ctrl.Re
tooLate = missedRun.Add(time.Duration(*advancedCronJob.Spec.StartingDeadlineSeconds) * time.Second).Before(now)
}
if tooLate {
klog.V(1).Info("missed starting deadline for last run, sleeping till next", "current run", missedRun, req.NamespacedName)
klog.V(1).InfoS("Missed starting deadline for last run, sleeping till next run", "missedRun", missedRun, "advancedCronJob", req)
return scheduledResult, nil
}

Expand All @@ -307,7 +307,7 @@ func (r *ReconcileAdvancedCronJob) reconcileJob(ctx context.Context, req ctrl.Re
// figure out how to run this job -- concurrency policy might forbid us from running
// multiple at the same time...
if advancedCronJob.Spec.ConcurrencyPolicy == appsv1alpha1.ForbidConcurrent && len(activeJobs) > 0 {
klog.V(1).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(activeJobs), req.NamespacedName)
klog.V(1).InfoS("Concurrency policy blocks concurrent runs, skipping", "activeJobCount", len(activeJobs), "advancedCronJob", req)
return scheduledResult, nil
}

Expand All @@ -316,7 +316,7 @@ func (r *ReconcileAdvancedCronJob) reconcileJob(ctx context.Context, req ctrl.Re
for _, activeJob := range activeJobs {
// we don't care if the job was already deleted
if err := r.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
klog.Error(err, "unable to delete active job", "job", activeJob, req.NamespacedName)
klog.ErrorS(err, "Unable to delete active job", "job", klog.KObj(activeJob), "advancedCronJob", req)
return ctrl.Result{}, err
}
}
Expand Down Expand Up @@ -363,18 +363,18 @@ func (r *ReconcileAdvancedCronJob) reconcileJob(ctx context.Context, req ctrl.Re
// actually make the job...
job, err := constructJobForCronJob(&advancedCronJob, missedRun)
if err != nil {
klog.Error(err, "unable to construct job from template", req.NamespacedName)
klog.ErrorS(err, "Unable to construct job from template", "advancedCronJob", req)
// don't bother requeuing until we get a change to the spec
return scheduledResult, nil
}

// ...and create it on the cluster
if err := r.Create(ctx, job); err != nil {
klog.Error(err, "unable to create Job for AdvancedCronJob", "job", job, req.NamespacedName)
klog.ErrorS(err, "Unable to create Job for AdvancedCronJob", "job", klog.KObj(job), "advancedCronJob", req)
return ctrl.Result{}, err
}

klog.V(1).Info("created Job for AdvancedCronJob run", "job", job, req.NamespacedName)
klog.V(1).InfoS("Created Job for AdvancedCronJob run", "job", klog.KObj(job), "advancedCronJob", req)

/*
### 7: Requeue when we either see a running job or it's time for the next scheduled run
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/advancedcronjob/advancedcronjob_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func formatSchedule(acj *appsv1alpha1.AdvancedCronJob) string {
}
if acj.Spec.TimeZone != nil {
if _, err := time.LoadLocation(*acj.Spec.TimeZone); err != nil {
klog.Errorf("Failed to load location %s for %s/%s: %v", *acj.Spec.TimeZone, acj.Namespace, acj.Name, err)
klog.ErrorS(err, "Failed to load location for advancedCronJob", "location", *acj.Spec.TimeZone, "advancedCronJob", klog.KObj(acj))
return acj.Spec.Schedule
}
return fmt.Sprintf("TZ=%s %s", *acj.Spec.TimeZone, acj.Spec.Schedule)
Expand Down
Loading

0 comments on commit f123e71

Please sign in to comment.