Skip to content

Commit

Permalink
Merge pull request #67 from topolvm/fix-map-growing
Browse files Browse the repository at this point in the history
Fix map growing
  • Loading branch information
cupnes committed May 18, 2023
2 parents 75a4967 + 96b253f commit ed3563e
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 11 deletions.
2 changes: 1 addition & 1 deletion charts/pie/templates/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ rules:
- update
- watch
- apiGroups:
- batch/v1
- batch
resources:
- jobs
verbs:
Expand Down
2 changes: 1 addition & 1 deletion config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ rules:
- update
- watch
- apiGroups:
- batch/v1
- batch
resources:
- jobs
verbs:
Expand Down
1 change: 1 addition & 0 deletions constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package constants
const (
ProbeNamePrefix = "pie-probe"
ProbeContainerName = "probe"
PodFinalizerName = "pie.topolvm.io/pod"
NodeFinalizerName = "pie.topolvm.io/node"
StorageClassFinalizerName = "pie.topolvm.io/storage-class"

Expand Down
12 changes: 12 additions & 0 deletions controllers/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ func makeCronSchedule(storageClass, nodeName string, period int) string {
return fmt.Sprintf("%d-59/%d * * * *", h.Sum32()%uint32(period), period)
}

func addPodFinalizer(spec *corev1.PodTemplateSpec) {
finalizers := spec.GetFinalizers()
for _, finalizer := range finalizers {
if finalizer == constants.PodFinalizerName {
return
}
}
spec.SetFinalizers(append(finalizers, constants.PodFinalizerName))
}

func (r *NodeReconciler) createOrUpdateJob(ctx context.Context, storageClass, nodeName string) error {
nodeCtrlLogger.Info("createOrUpdateJob")
defer nodeCtrlLogger.Info("createOrUpdateJob Finished")
Expand All @@ -212,6 +222,8 @@ func (r *NodeReconciler) createOrUpdateJob(ctx context.Context, storageClass, no

cronjob.Spec.JobTemplate.Spec.Template.SetLabels(label)

addPodFinalizer(&cronjob.Spec.JobTemplate.Spec.Template)

if len(cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers) != 1 {
cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers = []corev1.Container{{}}
}
Expand Down
15 changes: 14 additions & 1 deletion controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand Down Expand Up @@ -68,12 +69,15 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
err := r.client.Get(ctx, client.ObjectKey{Namespace: req.Namespace, Name: req.Name}, &pod)
if err != nil {
if apierrors.IsNotFound(err) {
r.po.deleteEventTime(pod.Name)
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

if !controllerutil.ContainsFinalizer(&pod, constants.PodFinalizerName) {
return ctrl.Result{}, nil
}

r.po.setPodRegisteredTime(pod.Name, pod.CreationTimestamp.Time)

for _, status := range pod.Status.ContainerStatuses {
Expand All @@ -87,6 +91,15 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
}
}

if !pod.DeletionTimestamp.IsZero() {
r.po.deleteEventTime(pod.Name)
controllerutil.RemoveFinalizer(&pod, constants.PodFinalizerName)
err := r.client.Update(ctx, &pod)
if err != nil {
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
}

Expand Down
22 changes: 19 additions & 3 deletions controllers/provision_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ type provisionObserver struct {
podRegisteredTime map[string]time.Time
podStartedTime map[string]time.Time
countedFlag map[string]struct{}
mu sync.Mutex
// mu protects above maps
mu sync.Mutex
makeCheckCh chan struct{}
checkDoneCh chan struct{}
}

func newProvisionObserver(
Expand All @@ -45,6 +48,8 @@ func newProvisionObserver(
podRegisteredTime: make(map[string]time.Time),
podStartedTime: make(map[string]time.Time),
countedFlag: make(map[string]struct{}),
makeCheckCh: make(chan struct{}),
checkDoneCh: make(chan struct{}),
}
}

Expand All @@ -63,6 +68,9 @@ func (p *provisionObserver) setPodStartedTime(podName string, eventTime time.Tim
}

func (p *provisionObserver) deleteEventTime(podName string) {
p.makeCheckCh <- struct{}{}
<-p.checkDoneCh

p.mu.Lock()
defer p.mu.Unlock()

Expand Down Expand Up @@ -108,7 +116,8 @@ func (p *provisionObserver) deleteOwnerJobOfPod(ctx context.Context, podName str
return err
}

err = p.client.Delete(ctx, &job)
policy := metav1.DeletePropagationBackground
err = p.client.Delete(ctx, &job, &client.DeleteOptions{PropagationPolicy: &policy})
if err != nil {
if apierrors.IsNotFound(err) {
continue
Expand Down Expand Up @@ -162,19 +171,26 @@ func (p *provisionObserver) check(ctx context.Context) {
}
}

//+kubebuilder:rbac:namespace=default,groups=batch/v1,resources=jobs,verbs=get;list;watch;delete
//+kubebuilder:rbac:namespace=default,groups=batch,resources=jobs,verbs=get;list;watch;delete

func (p *provisionObserver) Start(ctx context.Context) error {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
needNotify := false
select {
case <-p.makeCheckCh:
needNotify = true
case <-ticker.C:
case <-ctx.Done():
return nil
}

p.check(ctx)

if needNotify {
p.checkDoneCh <- struct{}{}
}
}
}
12 changes: 7 additions & 5 deletions e2e/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,17 @@ var _ = Describe("pie", func() {
}

By("checking pie_create_probe_total have on_time=true for standard SC or on_time=false for dummy SC")
Expect("pie_create_probe_total").Should(BeKeyOf(metricFamilies))
for _, metric := range metricFamilies["pie_create_probe_total"].Metric {
Expect(metric.Label).Should(Or(ContainElement(&standardSCLabelPair), ContainElement(&dummySCLabelPair)))
g.Expect("pie_create_probe_total").Should(BeKeyOf(metricFamilies))
metrics := metricFamilies["pie_create_probe_total"].Metric
g.Expect(metrics).Should(ContainElement(HaveField("Label", ContainElement(&standardSCLabelPair))))
g.Expect(metrics).Should(ContainElement(HaveField("Label", ContainElement(&dummySCLabelPair))))
for _, metric := range metrics {
for _, label := range metric.Label {
switch {
case reflect.DeepEqual(label, &standardSCLabelPair):
Expect(metric.Label).Should(ContainElement(&onTimeTrueLabelPair))
g.Expect(metric.Label).Should(ContainElement(&onTimeTrueLabelPair))
case reflect.DeepEqual(label, &dummySCLabelPair):
Expect(metric.Label).Should(ContainElement(&onTimeFalseLabelPair))
g.Expect(metric.Label).Should(ContainElement(&onTimeFalseLabelPair))
}
}
}
Expand Down

0 comments on commit ed3563e

Please sign in to comment.