Skip to content

Commit

Permalink
Added kafka config in the CR
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierCazade committed Apr 7, 2022
1 parent bd6de30 commit eeae529
Show file tree
Hide file tree
Showing 11 changed files with 301 additions and 48 deletions.
17 changes: 17 additions & 0 deletions api/v1alpha1/flowcollector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ type FlowCollectorIPFIX struct {
Sampling int32 `json:"sampling,omitempty" mapstructure:"sampling,omitempty"`
}

// FlowCollectorKafka defines the desired Kafka config of FlowCollector
type FlowCollectorKafka struct {
// Important: Run "make generate" to regenerate code after modifying this file

//+kubebuilder:default:=""
// Address of the kafka server
Address string `json:"address"`

//+kubebuilder:default:=""
// Address of the kafka topic to use
Topic string `json:"topic"`
}

// FlowCollectorFLP defines the desired flowlogs-pipeline state of FlowCollector
type FlowCollectorFLP struct {
// Important: Run "make generate" to regenerate code after modifying this file
Expand Down Expand Up @@ -128,6 +141,10 @@ type FlowCollectorFLP struct {
//+kubebuilder:default:=false
// PrintOutput is a debug flag to print flows exported in flowlogs-pipeline stdout
PrintOutput bool `json:"printOutput,omitempty"`

// Kafka configurations, if empty the operator will deploy a all-in-one FLP
// +optional
Kafka *FlowCollectorKafka `json:"kafka,omitempty"`
}

type FlowCollectorHPA struct {
Expand Down
20 changes: 20 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions config/crd/bases/flows.netobserv.io_flowcollectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,22 @@ spec:
- Always
- Never
type: string
kafka:
description: Kafka configurations, if empty the operator will
deploy a all-in-one FLP
properties:
address:
default: ""
description: Address of the kafka server
type: string
topic:
default: ""
description: Address of the kafka topic to use
type: string
required:
- address
- topic
type: object
kind:
default: DaemonSet
description: Kind is the workload kind, either DaemonSet or Deployment
Expand Down
2 changes: 1 addition & 1 deletion controllers/flowcollector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

// OVS config map for CNO
if err := ovsConfigController.Reconcile(ctx, desired); err != nil {
if err := ovsConfigController.Reconcile(ctx, desired, gfReconciler.GetServiceName(&desired.Spec.FlowlogsPipeline)); err != nil {
log.Error(err, "Failed to reconcile ovs-flows-config ConfigMap")
}

Expand Down
81 changes: 81 additions & 0 deletions controllers/flowcollector_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ var _ = Describe("FlowCollector Controller", func() {
Name: constants.FLPName,
Namespace: otherNamespace,
}
gfKeyKafkaIngestor := types.NamespacedName{
Name: constants.FLPName + flowlogspipeline.FlpConfSuffix[flowlogspipeline.ConfKafkaIngestor],
Namespace: operatorNamespace,
}
gfKeyKafkaTransformer := types.NamespacedName{
Name: constants.FLPName + flowlogspipeline.FlpConfSuffix[flowlogspipeline.ConfKafkaTransformer],
Namespace: operatorNamespace,
}
cpKey1 := types.NamespacedName{
Name: "network-observability-plugin",
Namespace: operatorNamespace,
Expand Down Expand Up @@ -437,6 +445,79 @@ var _ = Describe("FlowCollector Controller", func() {
})
})

Context("Changing kafka config", func() {
It("Should update kafka config successfully", func() {
Eventually(func() error {
fc := flowsv1alpha1.FlowCollector{}
if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
return err
}
fc.Spec.FlowlogsPipeline.Kafka = &flowsv1alpha1.FlowCollectorKafka{Address: "loaclhost:9092", Topic: "FLP"}
return k8sClient.Update(ctx, &fc)
}).Should(Succeed())
})

It("Should deploy kafka ingestor and transformer", func() {
By("Expecting ingestor daemonset to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaIngestor, &appsv1.DaemonSet{})
}, timeout, interval).Should(Succeed())

By("Expecting transformer deployment to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaTransformer, &appsv1.Deployment{})
}, timeout, interval).Should(Succeed())

By("Not Expecting transformer service to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaTransformer, &v1.Service{})
}, timeout, interval).Should(MatchError(`services "flowlogs-pipeline-ktransform" not found`))
})

It("Should delete previous flp deployment", func() {
By("Expecting deployment to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey1, &appsv1.DaemonSet{})
}, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline" not found`))

By("Expecting service to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey1, &v1.Service{})
}, timeout, interval).Should(MatchError(`services "flowlogs-pipeline" not found`))
})

It("Should remove kafka config successfully", func() {
Eventually(func() error {
fc := flowsv1alpha1.FlowCollector{}
if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
return err
}
fc.Spec.FlowlogsPipeline.Kafka = nil
return k8sClient.Update(ctx, &fc)
}).Should(Succeed())
})

It("Should deploy single flp again", func() {
By("Expecting daemonset to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey1, &appsv1.DaemonSet{})
}, timeout, interval).Should(Succeed())
})

It("Should delete kafka ingestor and transformer", func() {
By("Expecting ingestor daemonset to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaIngestor, &appsv1.DaemonSet{})
}, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline-kingestor" not found`))

By("Expecting transformer deployment to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaTransformer, &appsv1.Deployment{})
}, timeout, interval).Should(MatchError(`deployments.apps "flowlogs-pipeline-ktransform" not found`))
})

})

Context("Changing namespace", func() {
It("Should update namespace successfully", func() {
Eventually(func() error {
Expand Down
12 changes: 6 additions & 6 deletions controllers/flowlogspipeline/flp_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ const (
ConfKafkaTransformer = "kafkaTransformer"
)

var flpConfSuffix = map[string]string{
var FlpConfSuffix = map[string]string{
ConfSingle: "",
ConfKafkaIngestor: "-kingestor",
ConfKafkaTransformer: "-ktransformer",
ConfKafkaTransformer: "-ktransform",
}

// PodConfigurationDigest is an annotation name to facilitate pod restart after
Expand All @@ -61,15 +61,15 @@ func newBuilder(ns string, desired *flowsv1alpha1.FlowCollectorFLP, desiredLoki
return builder{
namespace: ns,
labels: map[string]string{
"app": constants.FLPName + flpConfSuffix[confKind],
"app": constants.FLPName + FlpConfSuffix[confKind],
"version": version,
},
selector: map[string]string{
"app": constants.FLPName + flpConfSuffix[confKind],
"app": constants.FLPName + FlpConfSuffix[confKind],
},
desired: desired,
desiredLoki: desiredLoki,
confKindSuffix: flpConfSuffix[confKind],
confKindSuffix: FlpConfSuffix[confKind],
}
}

Expand Down Expand Up @@ -349,7 +349,7 @@ func (b *builder) autoScaler() *ascv2.HorizontalPodAutoscaler {

func buildAppLabel(confKind string) map[string]string {
return map[string]string{
"app": constants.FLPName + flpConfSuffix[confKind],
"app": constants.FLPName + FlpConfSuffix[confKind],
}
}

Expand Down
Loading

0 comments on commit eeae529

Please sign in to comment.