Skip to content

Commit

Permalink
NETOBSERV-765 Add console metrics service
Browse files Browse the repository at this point in the history
Some refactoring to normalize services reconciliation
  • Loading branch information
jotak committed Mar 29, 2023
1 parent 51493cf commit fd54c56
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 148 deletions.
2 changes: 1 addition & 1 deletion api/v1beta1/flowcollector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ type FlowCollectorConsolePlugin struct {
//+kubebuilder:validation:Minimum=1
//+kubebuilder:validation:Maximum=65535
//+kubebuilder:default:=9001
// port is the plugin service port
// port is the plugin service port. Do not use 9002, which is reserved for metrics.
Port int32 `json:"port,omitempty"`

//+kubebuilder:validation:Enum=IfNotPresent;Always;Never
Expand Down
74 changes: 47 additions & 27 deletions controllers/consoleplugin/consoleplugin_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1"
"github.com/netobserv/network-observability-operator/controllers/constants"
Expand All @@ -32,6 +33,9 @@ const configVolume = "config-volume"
const configPath = "/opt/app-root/"
const lokiCerts = "loki-certs"
const tokensPath = "/var/run/secrets/tokens/"
const metricsSvcName = constants.PluginName + "-metrics"
const metricsPort = 9002
const metricsPortName = "metrics"

type builder struct {
namespace string
Expand Down Expand Up @@ -96,7 +100,7 @@ func (b *builder) serviceMonitor() *monitoringv1.ServiceMonitor {
Spec: monitoringv1.ServiceMonitorSpec{
Endpoints: []monitoringv1.Endpoint{
{
Port: "main",
Port: metricsPortName,
Interval: "15s",
Scheme: "https",
TLSConfig: &monitoringv1.TLSConfig{
Expand Down Expand Up @@ -289,34 +293,50 @@ func (b *builder) autoScaler() *ascv2.HorizontalPodAutoscaler {
}
}

func (b *builder) service(old *corev1.Service) *corev1.Service {
if old == nil {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: constants.PluginName,
Namespace: b.namespace,
Labels: b.labels,
Annotations: map[string]string{
constants.OpenShiftCertificateAnnotation: "console-serving-cert",
},
},
Spec: corev1.ServiceSpec{
Selector: b.selector,
Ports: []corev1.ServicePort{{
Port: b.desired.ConsolePlugin.Port,
Protocol: "TCP",
Name: "main",
}},
func (b *builder) mainService() *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: constants.PluginName,
Namespace: b.namespace,
Labels: b.labels,
Annotations: map[string]string{
constants.OpenShiftCertificateAnnotation: "console-serving-cert",
},
}
},
Spec: corev1.ServiceSpec{
Selector: b.selector,
Ports: []corev1.ServicePort{{
Port: b.desired.ConsolePlugin.Port,
Protocol: corev1.ProtocolTCP,
// Some Kubernetes versions might automatically set TargetPort to Port. We need to
// explicitly set it here so the reconcile loop verifies that the owned service
// is equal as the desired service
TargetPort: intstr.FromInt(int(b.desired.ConsolePlugin.Port)),
}},
},
}
}

func (b *builder) metricsService() *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: metricsSvcName,
Namespace: b.namespace,
Labels: b.labels,
},
Spec: corev1.ServiceSpec{
Selector: b.selector,
Ports: []corev1.ServicePort{{
Port: metricsPort,
Protocol: corev1.ProtocolTCP,
Name: metricsPortName,
// Some Kubernetes versions might automatically set TargetPort to Port. We need to
// explicitly set it here so the reconcile loop verifies that the owned service
// is equal as the desired service
TargetPort: intstr.FromInt(metricsPort),
}},
},
}
// In case we're updating an existing service, we need to build from the old one to keep immutable fields such as clusterIP
newService := old.DeepCopy()
newService.Spec.Ports = []corev1.ServicePort{{
Port: b.desired.ConsolePlugin.Port,
Protocol: corev1.ProtocolUDP,
}}
return newService
}

// returns a configmap with a digest of its configuration contents, which will be used to
Expand Down
34 changes: 11 additions & 23 deletions controllers/consoleplugin/consoleplugin_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type CPReconciler struct {
type ownedObjects struct {
deployment *appsv1.Deployment
service *corev1.Service
metricsService *corev1.Service
hpa *ascv2.HorizontalPodAutoscaler
serviceAccount *corev1.ServiceAccount
configMap *corev1.ConfigMap
Expand All @@ -46,6 +47,7 @@ func NewReconciler(cl reconcilers.ClientHelper, ns, prevNS, imageName string, av
owned := ownedObjects{
deployment: &appsv1.Deployment{},
service: &corev1.Service{},
metricsService: &corev1.Service{},
hpa: &ascv2.HorizontalPodAutoscaler{},
serviceAccount: &corev1.ServiceAccount{},
configMap: &corev1.ConfigMap{},
Expand All @@ -54,6 +56,7 @@ func NewReconciler(cl reconcilers.ClientHelper, ns, prevNS, imageName string, av
nobjMngr := reconcilers.NewNamespacedObjectManager(cl, ns, prevNS)
nobjMngr.AddManagedObject(constants.PluginName, owned.deployment)
nobjMngr.AddManagedObject(constants.PluginName, owned.service)
nobjMngr.AddManagedObject(metricsSvcName, owned.metricsService)
nobjMngr.AddManagedObject(constants.PluginName, owned.hpa)
nobjMngr.AddManagedObject(constants.PluginName, owned.serviceAccount)
nobjMngr.AddManagedObject(configMapName, owned.configMap)
Expand Down Expand Up @@ -102,7 +105,7 @@ func (r *CPReconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowC
return err
}

if err = r.reconcileService(ctx, builder, &desired.Spec); err != nil {
if err = r.reconcileServices(ctx, builder, &desired.Spec); err != nil {
return err
}

Expand Down Expand Up @@ -214,20 +217,15 @@ func (r *CPReconciler) reconcileDeployment(ctx context.Context, builder builder,
return nil
}

func (r *CPReconciler) reconcileService(ctx context.Context, builder builder, desired *flowslatest.FlowCollectorSpec) error {
report := helper.NewChangeReport("Console service")
func (r *CPReconciler) reconcileServices(ctx context.Context, builder builder, desired *flowslatest.FlowCollectorSpec) error {
report := helper.NewChangeReport("Console services")
defer report.LogIfNeeded(ctx)

if !r.nobjMngr.Exists(r.owned.service) {
newSVC := builder.service(nil)
if err := r.CreateOwned(ctx, newSVC); err != nil {
return err
}
} else if serviceNeedsUpdate(r.owned.service, &desired.ConsolePlugin, &report) {
newSVC := builder.service(r.owned.service)
if err := r.UpdateOwned(ctx, r.owned.service, newSVC); err != nil {
return err
}
if err := reconcilers.ReconcileService(ctx, r.nobjMngr, &r.ClientHelper, r.owned.service, builder.mainService(), &report); err != nil {
return err
}
if err := reconcilers.ReconcileService(ctx, r.nobjMngr, &r.ClientHelper, r.owned.metricsService, builder.metricsService(), &report); err != nil {
return err
}
if r.availableAPIs.HasSvcMonitor() {
serviceMonitor := builder.serviceMonitor()
Expand Down Expand Up @@ -277,13 +275,3 @@ func statusURL(loki *flowslatest.FlowCollectorLoki) string {
}
return querierURL(loki)
}

func serviceNeedsUpdate(svc *corev1.Service, desired *pluginSpec, report *helper.ChangeReport) bool {
for _, port := range svc.Spec.Ports {
if port.Port == desired.Port && port.Protocol == "TCP" {
return false
}
}
report.Add("Port changed")
return true
}
28 changes: 15 additions & 13 deletions controllers/consoleplugin/consoleplugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func getPluginConfig() flowslatest.FlowCollectorConsolePlugin {
}
}

func getServiceSpecs() (corev1.Service, flowslatest.FlowCollectorConsolePlugin) {
func getServiceSpecs() corev1.Service {
var service = corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Expand All @@ -69,7 +69,7 @@ func getServiceSpecs() (corev1.Service, flowslatest.FlowCollectorConsolePlugin)
},
}

return service, getPluginConfig()
return service
}

var minReplicas = int32(1)
Expand Down Expand Up @@ -192,26 +192,27 @@ func TestContainerUpdateCheck(t *testing.T) {

func TestServiceUpdateCheck(t *testing.T) {
assert := assert.New(t)
old := getServiceSpecs()

//equals specs
serviceSpec, containerConfig := getServiceSpecs()
serviceSpec := getServiceSpecs()
report := helper.NewChangeReport("")
assert.Equal(serviceNeedsUpdate(&serviceSpec, &containerConfig, &report), false)
assert.Equal(helper.ServiceChanged(&old, &serviceSpec, &report), false)
assert.Contains(report.String(), "no change")

//wrong port protocol
serviceSpec, containerConfig = getServiceSpecs()
serviceSpec = getServiceSpecs()
serviceSpec.Spec.Ports[0].Protocol = "UDP"
report = helper.NewChangeReport("")
assert.Equal(serviceNeedsUpdate(&serviceSpec, &containerConfig, &report), true)
assert.Contains(report.String(), "Port change")
assert.Equal(helper.ServiceChanged(&old, &serviceSpec, &report), true)
assert.Contains(report.String(), "Service spec changed")

//wrong port number
serviceSpec, containerConfig = getServiceSpecs()
serviceSpec = getServiceSpecs()
serviceSpec.Spec.Ports[0].Port = 8080
report = helper.NewChangeReport("")
assert.Equal(serviceNeedsUpdate(&serviceSpec, &containerConfig, &report), true)
assert.Contains(report.String(), "Port change")
assert.Equal(helper.ServiceChanged(&old, &serviceSpec, &report), true)
assert.Contains(report.String(), "Service spec changed")
}

func TestBuiltService(t *testing.T) {
Expand All @@ -222,9 +223,10 @@ func TestBuiltService(t *testing.T) {
loki := flowslatest.FlowCollectorLoki{URL: "http://foo:1234"}
spec := flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki}
builder := newBuilder(testNamespace, testImage, &spec, &certWatcher)
newService := builder.service(nil)
old := builder.mainService()
new := builder.mainService()
report := helper.NewChangeReport("")
assert.Equal(serviceNeedsUpdate(newService, &plugin, &report), false)
assert.Equal(helper.ServiceChanged(old, new, &report), false)
assert.Contains(report.String(), "no change")
}

Expand All @@ -244,7 +246,7 @@ func TestLabels(t *testing.T) {
assert.Equal("dev", depl.Spec.Template.Labels["version"])

// Service
svc := builder.service(nil)
svc := builder.mainService()
assert.Equal("netobserv-plugin", svc.Labels["app"])
assert.Equal("netobserv-plugin", svc.Spec.Selector["app"])
assert.Equal("dev", svc.Labels["version"])
Expand Down
45 changes: 17 additions & 28 deletions controllers/flowlogspipeline/flp_common_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,43 +615,32 @@ func (b *builder) configMap(stages []config.Stage, parameters []config.StagePara
return &configMap, digest, nil
}

func (b *builder) newPromService() *corev1.Service {
service := corev1.Service{
func (b *builder) promService() *corev1.Service {
svc := corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: b.promServiceName(),
Namespace: b.namespace,
Labels: b.labels,
},
Spec: corev1.ServiceSpec{Selector: b.selector},
Spec: corev1.ServiceSpec{
Selector: b.selector,
Ports: []corev1.ServicePort{{
Name: prometheusServiceName,
Port: b.desired.Processor.Metrics.Server.Port,
Protocol: corev1.ProtocolTCP,
// Some Kubernetes versions might automatically set TargetPort to Port. We need to
// explicitly set it here so the reconcile loop verifies that the owned service
// is equal as the desired service
TargetPort: intstr.FromInt(int(b.desired.Processor.Metrics.Server.Port)),
}},
},
}
b.fillPromService(&service)
return &service
}

func (b *builder) fromPromService(old *corev1.Service) *corev1.Service {
svc := old.DeepCopy()
b.fillPromService(svc)
return svc
}

func (b *builder) fillPromService(svc *corev1.Service) {
svc.Spec.Ports = []corev1.ServicePort{{
Name: prometheusServiceName,
Port: b.desired.Processor.Metrics.Server.Port,
Protocol: corev1.ProtocolTCP,
// Some Kubernetes versions might automatically set TargetPort to Port. We need to
// explicitly set it here so the reconcile loop verifies that the owned service
// is equal as the desired service
TargetPort: intstr.FromInt(int(b.desired.Processor.Metrics.Server.Port)),
}}
if b.desired.Processor.Metrics.Server.TLS.Type == flowslatest.ServerTLSAuto {
if svc.ObjectMeta.Annotations == nil {
svc.ObjectMeta.Annotations = map[string]string{}
svc.ObjectMeta.Annotations = map[string]string{
constants.OpenShiftCertificateAnnotation: b.promServiceName(),
}
svc.ObjectMeta.Annotations[constants.OpenShiftCertificateAnnotation] = b.promServiceName()
} else if svc.ObjectMeta.Annotations != nil {
delete(svc.ObjectMeta.Annotations, constants.OpenShiftCertificateAnnotation)
}
return &svc
}

func (b *builder) serviceAccount() *corev1.ServiceAccount {
Expand Down
8 changes: 2 additions & 6 deletions controllers/flowlogspipeline/flp_ingest_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,8 @@ func (b *ingestBuilder) buildPipelineConfig() ([]config.Stage, []config.StagePar
return pipeline.GetStages(), pipeline.GetStageParams(), nil
}

func (b *ingestBuilder) newPromService() *corev1.Service {
return b.generic.newPromService()
}

func (b *ingestBuilder) fromPromService(old *corev1.Service) *corev1.Service {
return b.generic.fromPromService(old)
func (b *ingestBuilder) promService() *corev1.Service {
return b.generic.promService()
}

func buildClusterRoleIngester(useOpenShiftSCC bool) *rbacv1.ClusterRole {
Expand Down
13 changes: 2 additions & 11 deletions controllers/flowlogspipeline/flp_ingest_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,8 @@ func (r *flpIngesterReconciler) reconcilePrometheusService(ctx context.Context,
report := helper.NewChangeReport("FLP prometheus service")
defer report.LogIfNeeded(ctx)

if !r.nobjMngr.Exists(r.owned.promService) {
if err := r.CreateOwned(ctx, builder.newPromService()); err != nil {
return err
}
} else {
newSVC := builder.fromPromService(r.owned.promService)
if helper.ServiceChanged(r.owned.promService, newSVC, &report) {
if err := r.UpdateOwned(ctx, r.owned.promService, newSVC); err != nil {
return err
}
}
if err := reconcilers.ReconcileService(ctx, r.nobjMngr, &r.ClientHelper, r.owned.promService, builder.promService(), &report); err != nil {
return err
}
if r.availableAPIs.HasSvcMonitor() {
serviceMonitor := builder.generic.serviceMonitor()
Expand Down
8 changes: 2 additions & 6 deletions controllers/flowlogspipeline/flp_monolith_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,8 @@ func (b *monolithBuilder) buildPipelineConfig() ([]config.Stage, []config.StageP
return pipeline.GetStages(), pipeline.GetStageParams(), dashboardConfigMap, nil
}

func (b *monolithBuilder) newPromService() *corev1.Service {
return b.generic.newPromService()
}

func (b *monolithBuilder) fromPromService(old *corev1.Service) *corev1.Service {
return b.generic.fromPromService(old)
func (b *monolithBuilder) promService() *corev1.Service {
return b.generic.promService()
}

func (b *monolithBuilder) serviceAccount() *corev1.ServiceAccount {
Expand Down
13 changes: 2 additions & 11 deletions controllers/flowlogspipeline/flp_monolith_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,8 @@ func (r *flpMonolithReconciler) reconcilePrometheusService(ctx context.Context,
report := helper.NewChangeReport("FLP prometheus service")
defer report.LogIfNeeded(ctx)

if !r.nobjMngr.Exists(r.owned.promService) {
if err := r.CreateOwned(ctx, builder.newPromService()); err != nil {
return err
}
} else {
newSVC := builder.fromPromService(r.owned.promService)
if helper.ServiceChanged(r.owned.promService, newSVC, &report) {
if err := r.UpdateOwned(ctx, r.owned.promService, newSVC); err != nil {
return err
}
}
if err := reconcilers.ReconcileService(ctx, r.nobjMngr, &r.ClientHelper, r.owned.promService, builder.promService(), &report); err != nil {
return err
}
if r.availableAPIs.HasSvcMonitor() {
serviceMonitor := builder.generic.serviceMonitor()
Expand Down
Loading

0 comments on commit fd54c56

Please sign in to comment.