Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

diff the dispatcher.Spec and update as necessary #913

Merged
merged 1 commit into from
Feb 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions kafka/channel/pkg/reconciler/controller/kafkachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ const (
finalizerName = controllerAgentName

// Name of the corev1.Events emitted from the reconciliation process.
channelReconciled = "ChannelReconciled"
channelReconcileFailed = "ChannelReconcileFailed"
channelUpdateStatusFailed = "ChannelUpdateStatusFailed"
dispatcherDeploymentCreated = "DispatcherDeploymentCreated"
dispatcherDeploymentFailed = "DispatcherDeploymentFailed"
dispatcherServiceCreated = "DispatcherServiceCreated"
dispatcherServiceFailed = "DispatcherServiceFailed"
channelReconciled = "ChannelReconciled"
channelReconcileFailed = "ChannelReconcileFailed"
channelUpdateStatusFailed = "ChannelUpdateStatusFailed"
dispatcherDeploymentCreated = "DispatcherDeploymentCreated"
dispatcherDeploymentUpdated = "DispatcherDeploymentUpdated"
dispatcherDeploymentFailed = "DispatcherDeploymentFailed"
dispatcherDeploymentUpdateFailed = "DispatcherDeploymentUpdateFailed"
dispatcherServiceCreated = "DispatcherServiceCreated"
dispatcherServiceFailed = "DispatcherServiceFailed"

dispatcherDeploymentName = "kafka-ch-dispatcher"
dispatcherServiceName = "kafka-ch-dispatcher"
Expand Down Expand Up @@ -376,29 +378,41 @@ func (r *Reconciler) reconcile(ctx context.Context, kc *v1alpha1.KafkaChannel) e
}

func (r *Reconciler) reconcileDispatcher(ctx context.Context, dispatcherNamespace string, kc *v1alpha1.KafkaChannel) (*appsv1.Deployment, error) {
args := resources.DispatcherArgs{
DispatcherNamespace: dispatcherNamespace,
Image: r.dispatcherImage,
}
expected := resources.MakeDispatcher(args)
d, err := r.deploymentLister.Deployments(dispatcherNamespace).Get(dispatcherDeploymentName)
if err != nil {
if apierrs.IsNotFound(err) {
args := resources.DispatcherArgs{
DispatcherNamespace: dispatcherNamespace,
Image: r.dispatcherImage,
}
expected := resources.MakeDispatcher(args)
d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Create(expected)
if err == nil {
r.Recorder.Event(kc, corev1.EventTypeNormal, dispatcherDeploymentCreated, "Dispatcher deployment created")
kc.Status.PropagateDispatcherStatus(&d.Status)
} else {
logging.FromContext(ctx).Error("Unable to create the dispatcher deployment", zap.Error(err))
r.Recorder.Eventf(kc, corev1.EventTypeWarning, dispatcherDeploymentFailed, "Failed to create the dispatcher deployment: %v", err)
kc.Status.MarkServiceFailed("DispatcherDeploymentFailed", "Failed to create the dispatcher deployment: %v", err)
kc.Status.MarkServiceFailed(dispatcherDeploymentFailed, "Failed to create the dispatcher deployment: %v", err)
}
return d, err
}

logging.FromContext(ctx).Error("Unable to get the dispatcher deployment", zap.Error(err))
kc.Status.MarkServiceUnknown("DispatcherDeploymentFailed", "Failed to get dispatcher deployment: %v", err)
return nil, err
} else if !reflect.DeepEqual(expected.Spec, d.Spec) {
logging.FromContext(ctx).Info("Deployment is not what we expect it to be, updating Deployment")
d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(expected)
if err == nil {
r.Recorder.Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated")
kc.Status.PropagateDispatcherStatus(&d.Status)
} else {
logging.FromContext(ctx).Error("Unable to update the dispatcher deployment", zap.Error(err))
r.Recorder.Eventf(kc, corev1.EventTypeWarning, dispatcherDeploymentUpdateFailed, "Failed to update the dispatcher deployment: %v", err)
kc.Status.MarkServiceFailed("DispatcherDeploymentUpdateFailed", "Failed to update the dispatcher deployment: %v", err)
}
return d, err
}

kc.Status.PropagateDispatcherStatus(&d.Status)
Expand Down
79 changes: 77 additions & 2 deletions kafka/channel/pkg/reconciler/controller/kafkachannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,77 @@ func TestTopicExists(t *testing.T) {
dispatcherNamespace: testNS,
dispatcherDeploymentName: testDispatcherDeploymentName,
dispatcherServiceName: testDispatcherServiceName,
dispatcherImage: testDispatcherImage,
kafkaConfig: &KafkaConfig{
Brokers: []string{brokerName},
},
kafkachannelLister: listers.GetKafkaChannelLister(),
// TODO fix
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wanna do an issue for that ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol :) Copy and paste from an existing test, so not sure what the actual TODO fix is :)
But, sure I'll create an issue :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kafkachannelInformer: nil,
deploymentLister: listers.GetDeploymentLister(),
serviceLister: listers.GetServiceLister(),
endpointsLister: listers.GetEndpointsLister(),
kafkaClusterAdmin: &mockClusterAdmin{
mockCreateTopicFunc: func(topic string, detail *sarama.TopicDetail, validateOnly bool) error {
errMsg := sarama.ErrTopicAlreadyExists.Error()
return &sarama.TopicError{
Err: sarama.ErrTopicAlreadyExists,
ErrMsg: &errMsg,
}
},
},
kafkaClientSet: fakekafkaclient.Get(ctx),
}
}, zap.L()))
}

func TestDeploymentUpdatedOnImageChange(t *testing.T) {
kcKey := testNS + "/" + kcName
row := TableRow{
Name: "Works, topic already exists",
Key: kcKey,
Objects: []runtime.Object{
makeDeploymentWithImage("differentimage"),
makeService(),
makeReadyEndpoints(),
reconcilekafkatesting.NewKafkaChannel(kcName, testNS,
reconcilekafkatesting.WithKafkaFinalizer(finalizerName)),
},
WantErr: false,
WantCreates: []runtime.Object{
makeChannelService(reconcilekafkatesting.NewKafkaChannel(kcName, testNS)),
},
WantUpdates: []clientgotesting.UpdateActionImpl{{
Object: makeDeployment(),
}},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconcilekafkatesting.NewKafkaChannel(kcName, testNS,
reconcilekafkatesting.WithInitKafkaChannelConditions,
reconcilekafkatesting.WithKafkaFinalizer(finalizerName),
reconcilekafkatesting.WithKafkaChannelConfigReady(),
reconcilekafkatesting.WithKafkaChannelTopicReady(),
// reconcilekafkatesting.WithKafkaChannelDeploymentReady(),
reconcilekafkatesting.WithKafkaChannelServiceReady(),
reconcilekafkatesting.WithKafkaChannelEndpointsReady(),
reconcilekafkatesting.WithKafkaChannelChannelServiceReady(),
reconcilekafkatesting.WithKafkaChannelAddress(channelServiceAddress),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated"),
Eventf(corev1.EventTypeNormal, channelReconciled, "KafkaChannel reconciled"),
},
}
defer logtesting.ClearAll()

row.Test(t, reconcilertesting.MakeFactory(func(ctx context.Context, listers *reconcilekafkatesting.Listers, cmw configmap.Watcher) controller.Reconciler {

return &Reconciler{
Base: reconciler.NewBase(ctx, controllerAgentName, cmw),
dispatcherNamespace: testNS,
dispatcherDeploymentName: testDispatcherDeploymentName,
dispatcherServiceName: testDispatcherServiceName,
dispatcherImage: testDispatcherImage,
kafkaConfig: &KafkaConfig{
Brokers: []string{brokerName},
},
Expand Down Expand Up @@ -493,13 +564,17 @@ func (ca *mockClusterAdmin) DeleteConsumerGroup(group string) error {
return nil
}

func makeDeployment() *appsv1.Deployment {
func makeDeploymentWithImage(image string) *appsv1.Deployment {
return resources.MakeDispatcher(resources.DispatcherArgs{
DispatcherNamespace: testNS,
Image: testDispatcherImage,
Image: image,
})
}

func makeDeployment() *appsv1.Deployment {
return makeDeploymentWithImage(testDispatcherImage)
}

func makeReadyDeployment() *appsv1.Deployment {
d := makeDeployment()
d.Status.Conditions = []appsv1.DeploymentCondition{{Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue}}
Expand Down