Skip to content

Commit

Permalink
NETOBSERV-627: direct-flp
Browse files Browse the repository at this point in the history
- move all related code in file in_process_reconciler.go
- reconcile Loki roles, prometheus service/sm, cert watching etc.
  • Loading branch information
jotak committed Feb 16, 2024
1 parent 3531e28 commit 4fa2940
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 80 deletions.
57 changes: 25 additions & 32 deletions controllers/ebpf/agent_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"

flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2"
metricslatest "github.com/netobserv/network-observability-operator/apis/flowmetrics/v1alpha1"
"github.com/netobserv/network-observability-operator/controllers/constants"
"github.com/netobserv/network-observability-operator/controllers/ebpf/internal/permissions"
"github.com/netobserv/network-observability-operator/controllers/flp"
Expand All @@ -24,7 +23,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand Down Expand Up @@ -127,7 +125,17 @@ func (c *AgentController) Reconcile(ctx context.Context, target *flowslatest.Flo
if err := c.permissions.Reconcile(ctx, &target.Spec.Agent.EBPF); err != nil {
return fmt.Errorf("reconciling permissions: %w", err)
}
desired, err := c.desired(ctx, target, rlog)

var inprocFLPInfo *flp.InProcessInfo
if helper.UseMergedAgentFLP(&target.Spec) {
// Direct-FLP mode
inprocFLPInfo, err = flp.ReconcileInProcess(ctx, c.Instance, target)
if err != nil {
return fmt.Errorf("reconciling in-process FLP: %w", err)
}
}

desired, err := c.desired(ctx, target, inprocFLPInfo, rlog)
if err != nil {
return err
}
Expand Down Expand Up @@ -174,28 +182,30 @@ func newMountPropagationMode(m corev1.MountPropagationMode) *corev1.MountPropaga
return mode
}

func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCollector, rlog logr.Logger) (*v1.DaemonSet, error) {
func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCollector, inprocFLPInfo *flp.InProcessInfo, rlog logr.Logger) (*v1.DaemonSet, error) {
if coll == nil || !helper.UseEBPF(&coll.Spec) {
return nil, nil
}
version := helper.ExtractVersion(c.Image)
annotations := make(map[string]string)

fm := metricslatest.FlowMetricList{}
if !helper.UseKafka(&coll.Spec) {
// Direct-FLP mode => list custom metrics
if err := c.List(ctx, &fm, &client.ListOptions{Namespace: coll.Spec.Namespace}); err != nil {
return nil, c.Status.Error("CantListFlowMetrics", err)
}
}

env, err := c.envConfig(ctx, coll, annotations, &fm)
env, err := c.envConfig(ctx, coll, annotations, inprocFLPInfo)
if err != nil {
return nil, err
}
volumeMounts := c.volumes.GetMounts()
volumes := c.volumes.GetVolumes()

if inprocFLPInfo != nil {
// Merge annotations
for k, v := range inprocFLPInfo.Annotations {
annotations[k] = v
}
// Add volumes
volumes = inprocFLPInfo.Volumes.AppendVolumes(volumes)
volumeMounts = inprocFLPInfo.Volumes.AppendMounts(volumeMounts)
}

if helper.IsPrivileged(&coll.Spec.Agent.EBPF) {
volume := corev1.Volume{
Name: bpfNetNSMountName,
Expand Down Expand Up @@ -278,7 +288,7 @@ func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCol
}, nil
}

func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowCollector, annots map[string]string, metrics *metricslatest.FlowMetricList) ([]corev1.EnvVar, error) {
func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowCollector, annots map[string]string, inprocFLPInfo *flp.InProcessInfo) ([]corev1.EnvVar, error) {
config := c.setEnvConfig(coll)

if helper.UseKafka(&coll.Spec) {
Expand Down Expand Up @@ -333,10 +343,6 @@ func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowC
)
}
} else {
flpConfig, err := c.buildFLPConfig(&coll.Spec, metrics)
if err != nil {
return nil, err
}
debugConfig := helper.GetAdvancedProcessorConfig(coll.Spec.Processor.Advanced)
config = append(config,
corev1.EnvVar{
Expand All @@ -345,7 +351,7 @@ func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowC
},
corev1.EnvVar{
Name: envFLPConfig,
Value: flpConfig,
Value: inprocFLPInfo.JSONConfig,
},
corev1.EnvVar{
Name: envFlowsTargetHost,
Expand All @@ -365,19 +371,6 @@ func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowC
return config, nil
}

func (c *AgentController) buildFLPConfig(desired *flowslatest.FlowCollectorSpec, metrics *metricslatest.FlowMetricList) (string, error) {
flpBuilder, err := flp.NewBuilder(c.NewInstance(c.Image, c.Status), desired, metrics, flp.ConfMonolith)
if err != nil {
return "", err
}
pipeline := flpBuilder.NewInProcessPipeline()
err = pipeline.AddProcessorStages()
if err != nil {
return "", err
}
return flpBuilder.GetJSONConfig()
}

func requiredAction(current, desired *v1.DaemonSet) reconcileAction {
if desired == nil {
return actionNone
Expand Down
28 changes: 0 additions & 28 deletions controllers/ebpf/internal/permissions/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ import (

flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2"
"github.com/netobserv/network-observability-operator/controllers/constants"
"github.com/netobserv/network-observability-operator/controllers/flp"
"github.com/netobserv/network-observability-operator/controllers/reconcilers"
"github.com/netobserv/network-observability-operator/pkg/helper"

osv1 "github.com/openshift/api/security/v1"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -47,9 +45,6 @@ func (c *Reconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowCol
if err := c.reconcileVendorPermissions(ctx, desired); err != nil {
return fmt.Errorf("reconciling vendor permissions: %w", err)
}
if err := c.reconcileRoles(ctx); err != nil {
return fmt.Errorf("reconciling roles: %w", err)
}
return nil
}

Expand Down Expand Up @@ -227,26 +222,3 @@ func (c *Reconciler) cleanupPreviousNamespace(ctx context.Context) error {
}
return nil
}

func (c *Reconciler) reconcileRoles(ctx context.Context) error {
cr := flp.BuildClusterRoleTransformer()
if err := c.ReconcileClusterRole(ctx, cr); err != nil {
return err
}
crb := &rbacv1.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: cr.Name + "-agent",
},
RoleRef: rbacv1.RoleRef{
APIGroup: "rbac.authorization.k8s.io",
Kind: "ClusterRole",
Name: cr.Name,
},
Subjects: []rbacv1.Subject{{
Kind: "ServiceAccount",
Name: constants.EBPFServiceAccount,
Namespace: c.PrivilegedNamespace(),
}},
}
return c.ReconcileClusterRoleBinding(ctx, crb)
}
18 changes: 12 additions & 6 deletions controllers/flp/flp_common_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
livenessPeriodSeconds = 10
startupFailureThreshold = 5
startupPeriodSeconds = 10
appLabel = "app"
)

type ConfKind string
Expand Down Expand Up @@ -87,11 +88,11 @@ func NewBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCollectorSp
return builder{
info: info,
labels: map[string]string{
"app": name,
appLabel: name,
"version": helper.MaxLabelLength(version),
},
selector: map[string]string{
"app": name,
appLabel: name,
},
desired: desired,
flowMetrics: flowMetrics,
Expand Down Expand Up @@ -155,6 +156,11 @@ func (b *builder) initPipeline(ingest config.PipelineBuilderStage) PipelineBuild
return pipeline
}

func (b *builder) overrideApp(app string) {
b.labels[appLabel] = app
b.selector[appLabel] = app
}

func (b *builder) portProtocol() corev1.Protocol {
if helper.UseEBPF(b.desired) {
return corev1.ProtocolTCP
Expand Down Expand Up @@ -375,7 +381,7 @@ func (b *builder) serviceAccount() *corev1.ServiceAccount {
Name: b.name(),
Namespace: b.info.Namespace,
Labels: map[string]string{
"app": b.name(),
appLabel: b.name(),
},
},
}
Expand All @@ -392,7 +398,7 @@ func (b *builder) clusterRoleBinding(ck ConfKind, mono bool) *rbacv1.ClusterRole
ObjectMeta: metav1.ObjectMeta{
Name: rbName,
Labels: map[string]string{
"app": b.name(),
appLabel: b.name(),
},
},
RoleRef: rbacv1.RoleRef{
Expand Down Expand Up @@ -487,7 +493,7 @@ func (b *builder) prometheusRule() *monitoringv1.PrometheusRule {
For: &d,
Labels: map[string]string{
"severity": "warning",
"app": "netobserv",
appLabel: "netobserv",
},
})
}
Expand All @@ -504,7 +510,7 @@ func (b *builder) prometheusRule() *monitoringv1.PrometheusRule {
For: &d,
Labels: map[string]string{
"severity": "warning",
"app": "netobserv",
appLabel: "netobserv",
},
})
}
Expand Down
6 changes: 3 additions & 3 deletions controllers/flp/flp_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,16 +237,16 @@ func reconcileMonitoringCerts(ctx context.Context, info *reconcilers.Common, tls
return nil
}

func reconcileLokiRoles(ctx context.Context, r *reconcilers.Common, b *builder) error {
roles := loki.ClusterRoles(b.desired.Loki.Mode)
func ReconcileLokiRoles(ctx context.Context, r *reconcilers.Common, spec *flowslatest.FlowCollectorSpec, appName, saName, saNamespace string) error {
roles := loki.ClusterRoles(spec.Loki.Mode)
if len(roles) > 0 {
for i := range roles {
if err := r.ReconcileClusterRole(ctx, &roles[i]); err != nil {
return err
}
}
// Binding
crb := loki.ClusterRoleBinding(b.name(), b.name(), b.info.Namespace)
crb := loki.ClusterRoleBinding(appName, saName, saNamespace)
if err := r.ReconcileClusterRoleBinding(ctx, crb); err != nil {
return err
}
Expand Down
30 changes: 20 additions & 10 deletions controllers/flp/flp_monolith_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,26 @@ func (r *monolithReconciler) reconcile(ctx context.Context, desired *flowslatest
return err
}

err = r.reconcileCertificates(ctx, desired, annotations)
if err != nil {
return err
}

return r.reconcileDaemonSet(ctx, builder.daemonSet(annotations))
}

func (r *monolithReconciler) reconcileCertificates(ctx context.Context, desired *flowslatest.FlowCollector, annotations map[string]string) error {
// Watch for Loki certificate if necessary; we'll ignore in that case the returned digest, as we don't need to restart pods on cert rotation
// because certificate is always reloaded from file
if _, err = r.Watcher.ProcessCACert(ctx, r.Client, &r.Loki.TLS, r.Namespace); err != nil {
if _, err := r.Watcher.ProcessCACert(ctx, r.Client, &r.Loki.TLS, r.Namespace); err != nil {
return err
}

// Watch for Kafka exporter certificate if necessary; need to restart pods in case of cert rotation
if err = annotateKafkaExporterCerts(ctx, r.Common, desired.Spec.Exporters, annotations); err != nil {
if err := annotateKafkaExporterCerts(ctx, r.Common, desired.Spec.Exporters, annotations); err != nil {
return err
}

// Watch for monitoring caCert
if err = reconcileMonitoringCerts(ctx, r.Common, &desired.Spec.Processor.Metrics.Server.TLS, r.Namespace); err != nil {
return err
}

return r.reconcileDaemonSet(ctx, builder.daemonSet(annotations))
return reconcileMonitoringCerts(ctx, r.Common, &desired.Spec.Processor.Metrics.Server.TLS, r.Namespace)
}

func (r *monolithReconciler) reconcilePrometheusService(ctx context.Context, builder *monolithBuilder) error {
Expand Down Expand Up @@ -185,5 +188,12 @@ func (r *monolithReconciler) reconcilePermissions(ctx context.Context, builder *
}
}

return reconcileLokiRoles(ctx, r.Common, &builder.generic)
return ReconcileLokiRoles(
ctx,
r.Common,
builder.generic.desired,
builder.generic.name(),
builder.generic.name(),
r.Common.Namespace,
)
}
9 changes: 8 additions & 1 deletion controllers/flp/flp_transfo_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,5 +202,12 @@ func (r *transformerReconciler) reconcilePermissions(ctx context.Context, builde
return err
}

return reconcileLokiRoles(ctx, r.Common, &builder.generic)
return ReconcileLokiRoles(
ctx,
r.Common,
builder.generic.desired,
builder.generic.name(),
builder.generic.name(),
r.Common.Namespace,
)
}
Loading

0 comments on commit 4fa2940

Please sign in to comment.