NETOBSERV-1248: PoC k8s cache #684

Draft · wants to merge 1 commit into base: main

131 changes: 131 additions & 0 deletions controllers/flp/flp_cache_objects.go
@@ -0,0 +1,131 @@
package flp

import (
	"fmt"
	"hash/fnv"
	"strconv"
	"strings"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/network-observability-operator/controllers/constants"
	"github.com/netobserv/network-observability-operator/pkg/helper"
	"gopkg.in/yaml.v2"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
	flpCacheName      = constants.FLPName + "-cache"
	flpCacheConfigMap = "flp-cache-config"
	flpCacheTopic     = "informers"
)

func (b *builder) cachePodTemplate(annotations map[string]string) corev1.PodTemplateSpec {
	advancedConfig := helper.GetAdvancedProcessorConfig(b.desired.Processor.Advanced)
	var ports []corev1.ContainerPort

	if advancedConfig.ProfilePort != nil {
		ports = append(ports, corev1.ContainerPort{
			Name:          profilePortName,
			ContainerPort: *advancedConfig.ProfilePort,
			Protocol:      corev1.ProtocolTCP,
		})
	}

	volumeMounts := b.volumes.AppendMounts([]corev1.VolumeMount{{
		MountPath: configPath + "-cache",
		Name:      configVolume + "-cache",
	}})
	volumes := b.volumes.AppendVolumes([]corev1.Volume{{
		Name: configVolume + "-cache",
		VolumeSource: corev1.VolumeSource{
			ConfigMap: &corev1.ConfigMapVolumeSource{
				LocalObjectReference: corev1.LocalObjectReference{
					Name: flpCacheConfigMap,
				},
			},
		},
	}})

	var envs []corev1.EnvVar
	envs = append(envs, constants.EnvNoHTTP2)
	imageName := strings.Replace(b.info.Image, "flowlogs-pipeline", "flowlogs-pipeline-cache", 1)

	container := corev1.Container{
		Name:            flpCacheName,
		Image:           imageName,
		ImagePullPolicy: corev1.PullPolicy(b.desired.Processor.ImagePullPolicy),
		Args:            []string{fmt.Sprintf(`--config=%s/%s`, configPath+"-cache", configFile)},
		Resources:       *b.desired.Processor.Resources.DeepCopy(),
		VolumeMounts:    volumeMounts,
		Ports:           ports,
		Env:             envs,
		SecurityContext: helper.ContainerDefaultSecurityContext(),
	}
	return corev1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Labels:      b.cacheLabels,
			Annotations: annotations,
		},
		Spec: corev1.PodSpec{
			Volumes:            volumes,
			Containers:         []corev1.Container{container},
			ServiceAccountName: b.name(),
			NodeSelector:       advancedConfig.Scheduling.NodeSelector,
			Tolerations:        advancedConfig.Scheduling.Tolerations,
			Affinity:           advancedConfig.Scheduling.Affinity,
			PriorityClassName:  advancedConfig.Scheduling.PriorityClassName,
		},
	}
}

type CacheConfig struct {
	KubeConfigPath string          `yaml:"kubeConfigPath"`
	KafkaConfig    api.EncodeKafka `yaml:"kafkaConfig"`
	PProfPort      int32           `yaml:"pprofPort"`
	LogLevel       string          `yaml:"logLevel"`
}

// returns a configmap with a digest of its configuration contents, which will be used to
// detect any configuration change
func (b *builder) cacheConfigMap() (*corev1.ConfigMap, string, error) {
	// Re-use the initial stage (which should be Kafka ingester), with a different topic
	// TODO: that's ugly and deserves more refactoring
	params := b.pipeline.GetStageParams()[0]

	kafkaSpec := b.desired.Kafka
	cc := CacheConfig{
		LogLevel: b.desired.Processor.LogLevel,
		KafkaConfig: api.EncodeKafka{
			Address: kafkaSpec.Address,
			Topic:   flpCacheTopic,
			TLS:     params.Ingest.Kafka.TLS,
			SASL:    params.Ingest.Kafka.SASL,
		},
	}
	advancedConfig := helper.GetAdvancedProcessorConfig(b.desired.Processor.Advanced)
	if advancedConfig.ProfilePort != nil {
		cc.PProfPort = *advancedConfig.ProfilePort
	}

	bs, err := yaml.Marshal(cc)
	if err != nil {
		return nil, "", err
	}

	configMap := corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      flpCacheConfigMap,
			Namespace: b.info.Namespace,
			Labels:    b.cacheLabels,
		},
		Data: map[string]string{
			configFile: string(bs),
		},
	}
	hasher := fnv.New64a()
	_, _ = hasher.Write(bs)
	digest := strconv.FormatUint(hasher.Sum64(), 36)
	return &configMap, digest, nil
}
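
For illustration, here is a minimal, self-contained sketch of how cacheConfigMap() above derives both the ConfigMap content and the digest used as a pod annotation: the config is marshalled to YAML, the bytes are hashed with FNV-1a (64-bit), and the sum is rendered in base 36. The structs, the yaml field names on the Kafka part, and the example values below are assumptions for this example, not taken from the actual api package.

package main

import (
	"fmt"
	"hash/fnv"
	"strconv"

	"gopkg.in/yaml.v2"
)

// kafkaSketch stands in for api.EncodeKafka; its yaml tags are assumed for this example.
type kafkaSketch struct {
	Address string `yaml:"address"`
	Topic   string `yaml:"topic"`
}

// cacheConfigSketch mirrors a subset of the CacheConfig struct above.
type cacheConfigSketch struct {
	KafkaConfig kafkaSketch `yaml:"kafkaConfig"`
	PProfPort   int32       `yaml:"pprofPort"`
	LogLevel    string      `yaml:"logLevel"`
}

func main() {
	cc := cacheConfigSketch{
		KafkaConfig: kafkaSketch{Address: "kafka-bootstrap:9092", Topic: "informers"},
		LogLevel:    "info",
	}
	bs, err := yaml.Marshal(cc)
	if err != nil {
		panic(err)
	}
	// The marshalled YAML is what ends up under the config file key of flp-cache-config.
	fmt.Print(string(bs))

	// Hash the same bytes and print the base-36 digest stored as a pod template annotation.
	hasher := fnv.New64a()
	_, _ = hasher.Write(bs)
	fmt.Println(strconv.FormatUint(hasher.Sum64(), 36))
}

Because the digest is derived from the marshalled bytes and injected as a pod template annotation, any configuration change yields a new digest and therefore rolls the cache pods.
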
10 changes: 9 additions & 1 deletion controllers/flp/flp_common_objects.go
@@ -53,6 +53,8 @@ type Builder struct {
info *reconcilers.Instance
labels map[string]string
selector map[string]string
cacheLabels map[string]string
cacheSelector map[string]string
desired *flowslatest.FlowCollectorSpec
flowMetrics *metricslatest.FlowMetricList
detectedSubnets []flowslatest.SubnetLabel
@@ -95,6 +97,13 @@ func NewBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCollectorSp
selector: map[string]string{
"app": name,
},
cacheLabels: map[string]string{
"app": flpCacheName,
"version": helper.MaxLabelLength(version),
},
cacheSelector: map[string]string{
"app": flpCacheName,
},
desired: desired,
flowMetrics: flowMetrics,
detectedSubnets: detectedSubnets,
@@ -192,7 +201,6 @@ func (b *builder) podTemplate(hasHostPort, hostNetwork bool, annotations map[str
}})

var envs []corev1.EnvVar
advancedConfig = helper.GetAdvancedProcessorConfig(b.desired.Processor.Advanced)
// we need to sort env map to keep idempotency,
// as equal maps could be iterated in different order
for _, pair := range helper.KeySorted(advancedConfig.Env) {
15 changes: 14 additions & 1 deletion controllers/flp/flp_pipeline_builder.go
@@ -126,6 +126,18 @@ func (b *PipelineBuilder) AddProcessorStages() error {
}...)
}

// enrichment using Kafka cache
var kafkaCache *api.IngestKafka
if helper.UseKafka(b.desired) {
// Re-use the initial stage (which should be Kafka ingester), with a different topic
// TODO: that's ugly and deserves more refactoring
params := b.GetStageParams()[0]
kc := *params.Ingest.Kafka
kc.Topic = flpCacheTopic
kc.GroupID = ""
kafkaCache = &kc
}

// enrich stage (transform) configuration
enrichedStage := lastStage.TransformNetwork("enrich", api.TransformNetwork{
Rules: rules,
@@ -135,7 +147,8 @@
DstHostField: "DstK8S_HostIP",
FlowDirectionField: "FlowDirection",
},
SubnetLabels: flpLabels,
SubnetLabels: flpLabels,
KafkaCacheConfig: kafkaCache,
})

// loki stage (write) configuration
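
The copy-then-override above relies on Go value semantics: dereferencing params.Ingest.Kafka copies the struct, so setting the cache topic and clearing the group ID on the copy cannot alter the shared ingest stage parameters. A tiny standalone sketch of that pattern, with a simplified struct and illustrative values only:

package main

import "fmt"

type kafkaIngest struct {
	Address string
	Topic   string
	GroupID string
}

func main() {
	ingest := &kafkaIngest{Address: "kafka-bootstrap:9092", Topic: "network-flows", GroupID: "flp"}

	kc := *ingest          // value copy: the original ingest config is left untouched
	kc.Topic = "informers" // cache topic (flpCacheTopic)
	kc.GroupID = ""

	fmt.Println(ingest.Topic, kc.Topic) // prints: network-flows informers
}
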
19 changes: 19 additions & 0 deletions controllers/flp/flp_transfo_objects.go
@@ -6,6 +6,7 @@ import (
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"

flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2"
metricslatest "github.com/netobserv/network-observability-operator/apis/flowmetrics/v1alpha1"
@@ -41,6 +42,24 @@ func (b *transfoBuilder) deployment(annotations map[string]string) *appsv1.Deplo
}
}

func (b *transfoBuilder) cacheDeployment(annotations map[string]string) *appsv1.Deployment {
pod := b.generic.cachePodTemplate(annotations)
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: flpCacheName,
Namespace: b.generic.info.Namespace,
Labels: b.generic.cacheLabels,
},
Spec: appsv1.DeploymentSpec{
Replicas: ptr.To(int32(1)),
Selector: &metav1.LabelSelector{
MatchLabels: b.generic.cacheSelector,
},
Template: pod,
},
}
}

func (b *transfoBuilder) staticConfigMap() (*corev1.ConfigMap, string, error) {
pipeline := b.generic.NewKafkaPipeline()
err := pipeline.AddProcessorStages()
39 changes: 32 additions & 7 deletions controllers/flp/flp_transfo_reconciler.go
@@ -22,11 +22,13 @@
type transformerReconciler struct {
*reconcilers.Instance
deployment *appsv1.Deployment
cacheDeployment *appsv1.Deployment
promService *corev1.Service
hpa *ascv2.HorizontalPodAutoscaler
serviceAccount *corev1.ServiceAccount
staticConfigMap *corev1.ConfigMap
dynamicConfigMap *corev1.ConfigMap
cacheConfigMap *corev1.ConfigMap
roleBinding *rbacv1.ClusterRoleBinding
serviceMonitor *monitoringv1.ServiceMonitor
prometheusRule *monitoringv1.PrometheusRule
@@ -37,11 +39,13 @@
rec := transformerReconciler{
Instance: cmn,
deployment: cmn.Managed.NewDeployment(name),
cacheDeployment: cmn.Managed.NewDeployment(flpCacheName),
promService: cmn.Managed.NewService(promServiceName(ConfKafkaTransformer)),
hpa: cmn.Managed.NewHPA(name),
serviceAccount: cmn.Managed.NewServiceAccount(name),
staticConfigMap: cmn.Managed.NewConfigMap(staticConfigMapName(ConfKafkaTransformer)),
dynamicConfigMap: cmn.Managed.NewConfigMap(dynamicConfigMapName(ConfKafkaTransformer)),
cacheConfigMap: cmn.Managed.NewConfigMap(flpCacheConfigMap),
roleBinding: cmn.Managed.NewCRB(RoleBindingName(ConfKafkaTransformer)),
}
if cmn.AvailableAPIs.HasSvcMonitor() {
@@ -67,7 +71,7 @@
return &r.Status
}

func (r *transformerReconciler) reconcile(ctx context.Context, desired *flowslatest.FlowCollector, flowMetrics *metricslatest.FlowMetricList, detectedSubnets []flowslatest.SubnetLabel) error {

Check failure on line 74 in controllers/flp/flp_transfo_reconciler.go, GitHub Actions / Build, lint, test (1.22): calculated cyclomatic complexity for function reconcile is 23, max is 20 (cyclop)
// Retrieve current owned objects
err := r.Managed.FetchAll(ctx)
if err != nil {
@@ -86,13 +90,13 @@
if err != nil {
return err
}

// Main, static config map
newSCM, configDigest, err := builder.staticConfigMap()
if err != nil {
return err
}
annotations := map[string]string{
constants.PodConfigurationDigest: configDigest,
}
annotations := map[string]string{constants.PodConfigurationDigest: configDigest}
if !r.Managed.Exists(r.staticConfigMap) {
if err := r.CreateOwned(ctx, newSCM); err != nil {
return err
@@ -103,6 +107,23 @@
}
}

// Cache config map
// TODO: factorize with main static CM code
newCCM, configDigest, err := builder.generic.cacheConfigMap()
if err != nil {
return err
}
cacheAnnotations := map[string]string{constants.PodConfigurationDigest: configDigest}
if !r.Managed.Exists(r.cacheConfigMap) {
if err := r.CreateOwned(ctx, newCCM); err != nil {
return err
}
} else if !equality.Semantic.DeepDerivative(newCCM.Data, r.cacheConfigMap.Data) {
if err := r.UpdateIfOwned(ctx, r.cacheConfigMap, newCCM); err != nil {
return err
}
}

if err := r.reconcileDynamicConfigMap(ctx, &builder); err != nil {
return err
}
@@ -125,6 +146,7 @@
}

// Watch for Kafka certificate if necessary; need to restart pods in case of cert rotation
// TODO: cacheAnnotations
if err = annotateKafkaCerts(ctx, r.Common, &desired.Spec.Kafka, "kafka", annotations); err != nil {
return err
}
@@ -137,7 +159,7 @@
return err
}

if err = r.reconcileDeployment(ctx, &desired.Spec.Processor, &builder, annotations); err != nil {
if err = r.reconcileDeployment(ctx, &desired.Spec.Processor, &builder, annotations, cacheAnnotations); err != nil {
return err
}

@@ -161,11 +183,11 @@
return nil
}

func (r *transformerReconciler) reconcileDeployment(ctx context.Context, desiredFLP *flowslatest.FlowCollectorFLP, builder *transfoBuilder, annotations map[string]string) error {
func (r *transformerReconciler) reconcileDeployment(ctx context.Context, desiredFLP *flowslatest.FlowCollectorFLP, builder *transfoBuilder, annotations, cacheAnnots map[string]string) error {
report := helper.NewChangeReport("FLP Deployment")
defer report.LogIfNeeded(ctx)

return reconcilers.ReconcileDeployment(
if err := reconcilers.ReconcileDeployment(
ctx,
r.Instance,
r.deployment,
@@ -174,7 +196,10 @@
helper.PtrInt32(desiredFLP.KafkaConsumerReplicas),
&desiredFLP.KafkaConsumerAutoscaler,
&report,
)
); err != nil {
return err
}
return reconcilers.ReconcileDeployment(ctx, r.Instance, r.cacheDeployment, builder.cacheDeployment(cacheAnnots), flpCacheName, 1, nil, &report)
}

func (r *transformerReconciler) reconcileHPA(ctx context.Context, desiredFLP *flowslatest.FlowCollectorFLP, builder *transfoBuilder) error {
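
As the "TODO: factorize with main static CM code" above notes, the static and the cache ConfigMaps are reconciled with the same create-or-update logic. A rough sketch of what a shared helper could look like, using a hypothetical owner interface rather than the operator's real reconcilers API:

package flpsketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
)

// owner is a hypothetical stand-in for the subset of reconcilers.Instance used here.
type owner interface {
	CreateOwned(ctx context.Context, obj *corev1.ConfigMap) error
	UpdateIfOwned(ctx context.Context, current, desired *corev1.ConfigMap) error
}

// reconcileConfigMap creates the ConfigMap when it does not exist yet, and otherwise
// updates it only when the desired data differs from what is currently deployed.
func reconcileConfigMap(ctx context.Context, r owner, exists bool, current, desired *corev1.ConfigMap) error {
	if !exists {
		return r.CreateOwned(ctx, desired)
	}
	if !equality.Semantic.DeepDerivative(desired.Data, current.Data) {
		return r.UpdateIfOwned(ctx, current, desired)
	}
	return nil // already up to date
}

Both call sites in reconcile() would then shrink to a single call each, passing the fetched and the freshly built ConfigMaps.
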
8 changes: 5 additions & 3 deletions go.mod
@@ -57,7 +57,7 @@ require (
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
@@ -66,15 +66,15 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.17.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/term v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.21.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/protobuf v1.33.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
@@ -84,3 +84,5 @@
)

replace github.com/prometheus/common v0.48.0 => github.com/netobserv/prometheus-common v0.48.0-netobserv

replace github.com/netobserv/flowlogs-pipeline => ../flowlogs-pipeline