Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kafka sink finalizer #177

Merged
14 changes: 6 additions & 8 deletions control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ package v1alpha1
import "knative.dev/pkg/apis"

const (
ConditionAddressable apis.ConditionType = "Addressable"
ConditionTopicReady apis.ConditionType = "TopicReady"
ConditionConfigMapUpdated apis.ConditionType = "ConfigMapUpdated"
ConditionAddressable apis.ConditionType = "Addressable"
)

var conditionSet = apis.NewLivingConditionSet(
ConditionAddressable,
ConditionTopicReady,
ConditionConfigMapUpdated,
)
var conditionSet apis.ConditionSet

func RegisterConditionSet(cs apis.ConditionSet) {
conditionSet = cs
}

func (ks *KafkaSink) GetConditionSet() apis.ConditionSet {
return conditionSet
Expand Down
3 changes: 2 additions & 1 deletion control-plane/pkg/core/config/egress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package config

import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
"testing"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"

"k8s.io/apimachinery/pkg/types"
)

Expand Down
16 changes: 16 additions & 0 deletions control-plane/pkg/core/config/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,19 @@ func AddOrUpdateResourceConfig(contract *contract.Contract, resource *contract.R
logger.Debug("Resource doesn't exist")
}
}

// DeleteResource deletes the resource at the given index from Resources.
func DeleteResource(ct *contract.Contract, index int) {

if len(ct.Resources) == 1 {
*ct = contract.Contract{
Generation: ct.Generation,
}
return
}

// replace the resource to be deleted with the last one.
ct.Resources[index] = ct.Resources[len(ct.Resources)-1]
// truncate the array.
ct.Resources = ct.Resources[:len(ct.Resources)-1]
}
115 changes: 114 additions & 1 deletion control-plane/pkg/core/config/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package config

import (
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/testing/protocmp"
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
"testing"

"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
Expand Down Expand Up @@ -279,3 +281,114 @@ func TestAddOrUpdateResourcesConfig(t *testing.T) {
})
}
}

func TestDeleteResource(t *testing.T) {

tests := []struct {
name string
ct *contract.Contract
index int
want contract.Contract
}{
{
name: "1 resource",
ct: &contract.Contract{
Resources: []*contract.Resource{
{
Id: "1",
},
},
Generation: 200,
},
index: 0,
want: contract.Contract{
Generation: 200,
},
},
{
name: "2 resource",
ct: &contract.Contract{
Resources: []*contract.Resource{
{
Id: "1",
},
{
Id: "2",
},
},
},
index: 0,
want: contract.Contract{
Resources: []*contract.Resource{
{
Id: "2",
},
},
},
},
{
name: "3 resource - delete last",
ct: &contract.Contract{
Resources: []*contract.Resource{
{
Id: "1",
},
{
Id: "2",
},
{
Id: "3",
},
},
},
index: 2,
want: contract.Contract{
Resources: []*contract.Resource{
{
Id: "1",
},
{
Id: "2",
},
},
},
},
{
name: "3 broker - middle",
ct: &contract.Contract{
Resources: []*contract.Resource{
{
Id: "1",
},
{
Id: "2",
},
{
Id: "3",
},
},
Generation: 200,
},
index: 1,
want: contract.Contract{
Resources: []*contract.Resource{
{
Id: "1",
},
{
Id: "3",
},
},
Generation: 200,
},
},
}
for i := range tests {
t.Run(tests[i].name, func(t *testing.T) {

DeleteResource(tests[i].ct, tests[i].index)

assert.Equal(t, tests[i].ct, &tests[i].want)
})
}
}
1 change: 1 addition & 0 deletions control-plane/pkg/core/config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package config

import (
"fmt"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"

eventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
Expand Down
3 changes: 2 additions & 1 deletion control-plane/pkg/core/config/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package config

import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
"testing"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"

eventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
)

Expand Down
38 changes: 37 additions & 1 deletion control-plane/pkg/reconciler/base/receiver_condition_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@ var ConditionSet = apis.NewLivingConditionSet(
)

const (
TopicOwnerAnnotation = "eventing.knative.dev/topic.owner"

ReasonDataPlaneNotAvailable = "Data plane not available"
MessageDataPlaneNotAvailable = "Did you install the data plane for this component?"

ReasonTopicNotPresent = "Topic is not present"
)

type Object interface {
Expand Down Expand Up @@ -145,7 +149,17 @@ func (manager *StatusConditionManager) FailedToCreateTopic(topic string, err err
return fmt.Errorf("failed to create topic: %s: %w", topic, err)
}

func (manager *StatusConditionManager) TopicCreated(topic string) {
func (manager *StatusConditionManager) TopicReady(topic string) {

if owner, ok := manager.Object.GetStatus().Annotations[TopicOwnerAnnotation]; ok {
manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkTrueWithReason(
ConditionTopicReady,
fmt.Sprintf("Topic %s (owner %s)", topic, owner),
"",
)

return
}

manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkTrueWithReason(
ConditionTopicReady,
Expand Down Expand Up @@ -206,3 +220,25 @@ func (manager *StatusConditionManager) FailedToResolveConfig(err error) reconcil
func (manager *StatusConditionManager) ConfigResolved() {
manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkTrue(ConditionConfigParsed)
}

func (manager *StatusConditionManager) TopicNotPresentOrInvalidErr(err error) error {
manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse(
ConditionTopicReady,
ReasonTopicNotPresent,
err.Error(),
)

return fmt.Errorf("topic is not present: %w", err)
}

func (manager *StatusConditionManager) TopicNotPresentOrInvalid() error {

manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse(
ConditionTopicReady,
ReasonTopicNotPresent,
"Check topic configuration",
)

return fmt.Errorf("topic is not present: check topic configuration")

}
1 change: 1 addition & 0 deletions control-plane/pkg/reconciler/base/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package base
import (
"context"
"fmt"

"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
Expand Down
47 changes: 17 additions & 30 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ package broker
import (
"context"
"fmt"
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
"math"
"strings"
"sync"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"

"github.com/Shopify/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -106,7 +107,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
if err != nil {
return statusConditionManager.FailedToCreateTopic(topic, err)
}
statusConditionManager.TopicCreated(topic)
statusConditionManager.TopicReady(topic)

logger.Debug("Topic created", zap.Any("topic", topic))

Expand All @@ -119,14 +120,14 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
logger.Debug("Got contract config map")

// Get contract data.
contract, err := r.GetDataPlaneConfigMapData(logger, contractConfigMap)
if err != nil && contract == nil {
ct, err := r.GetDataPlaneConfigMapData(logger, contractConfigMap)
if err != nil && ct == nil {
return statusConditionManager.FailedToGetDataFromConfigMap(err)
}

logger.Debug(
"Got contract data from config map",
zap.Any(base.ContractLogKey, (*log.ContractMarshaller)(contract)),
zap.Any(base.ContractLogKey, (*log.ContractMarshaller)(ct)),
)

// Get resource configuration.
Expand All @@ -135,15 +136,15 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
return statusConditionManager.FailedToGetConfig(err)
}

brokerIndex := coreconfig.FindResource(contract, broker.UID)
brokerIndex := coreconfig.FindResource(ct, broker.UID)
// Update contract data with the new contract configuration
coreconfig.AddOrUpdateResourceConfig(contract, brokerResource, brokerIndex, logger)
coreconfig.AddOrUpdateResourceConfig(ct, brokerResource, brokerIndex, logger)

// Increment volumeGeneration
contract.Generation = incrementContractGeneration(contract.Generation)
ct.Generation = incrementContractGeneration(ct.Generation)

// Update the configuration map with the new contract data.
if err := r.UpdateDataPlaneConfigMap(ctx, contract, contractConfigMap); err != nil {
if err := r.UpdateDataPlaneConfigMap(ctx, ct, contractConfigMap); err != nil {
logger.Error("failed to update data plane config map", zap.Error(
statusConditionManager.FailedToUpdateConfigMap(err),
))
Expand All @@ -160,7 +161,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
// the update even if here eventually means seconds or minutes after the actual update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, contract.Generation); err != nil {
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
logger.Error("Failed to update receiver pod annotation", zap.Error(
statusConditionManager.FailedToUpdateReceiverPodsAnnotation(err),
))
Expand All @@ -170,7 +171,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
logger.Debug("Updated receiver pod annotation")

// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, contract.Generation); err != nil {
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
// Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds.
// Since the dispatcher side is the consumer side, we don't lose availability, and we can consider the Broker
// ready. So, log out the error and move on to the next step.
Expand Down Expand Up @@ -206,24 +207,24 @@ func (r *Reconciler) finalizeKind(ctx context.Context, broker *eventing.Broker)
logger.Debug("Got contract config map")

// Get contract data.
contract, err := r.GetDataPlaneConfigMapData(logger, contractConfigMap)
ct, err := r.GetDataPlaneConfigMapData(logger, contractConfigMap)
if err != nil {
return fmt.Errorf("failed to get contract: %w", err)
}

logger.Debug(
"Got contract data from config map",
zap.Any(base.ContractLogKey, (*log.ContractMarshaller)(contract)),
zap.Any(base.ContractLogKey, (*log.ContractMarshaller)(ct)),
)

brokerIndex := coreconfig.FindResource(contract, broker.UID)
brokerIndex := coreconfig.FindResource(ct, broker.UID)
if brokerIndex != coreconfig.NoResource {
deleteBroker(contract, brokerIndex)
coreconfig.DeleteResource(ct, brokerIndex)

logger.Debug("Broker deleted", zap.Int("index", brokerIndex))

// Update the configuration map with the new contract data.
if err := r.UpdateDataPlaneConfigMap(ctx, contract, contractConfigMap); err != nil {
if err := r.UpdateDataPlaneConfigMap(ctx, ct, contractConfigMap); err != nil {
return err
}

Expand Down Expand Up @@ -369,20 +370,6 @@ func (r *Reconciler) SetDefaultTopicDetails(topicDetail sarama.TopicDetail) {
r.KafkaDefaultTopicDetails = topicDetail
}

func deleteBroker(c *contract.Contract, index int) {
if len(c.Resources) == 1 {
*c = contract.Contract{
Generation: c.Generation,
}
return
}

// replace the resource to be deleted with the last one.
c.Resources[index] = c.Resources[len(c.Resources)-1]
// truncate the array.
c.Resources = c.Resources[:len(c.Resources)-1]
}

func (r *Reconciler) getDefaultBootstrapServersOrFail() ([]string, error) {
r.bootstrapServersLock.RLock()
defer r.bootstrapServersLock.RUnlock()
Expand Down
Loading