From 62817654ee8cdd12ac9fa682b9e007b97d3ebe80 Mon Sep 17 00:00:00 2001 From: Micah Nagel Date: Mon, 22 Apr 2024 12:28:04 -0600 Subject: [PATCH 1/3] chore: add debug logging to endpointslice watch --- .../controllers/network/generators/kubeAPI.ts | 59 +++++++++++++------ src/pepr/operator/index.ts | 2 +- 2 files changed, 41 insertions(+), 20 deletions(-) diff --git a/src/pepr/operator/controllers/network/generators/kubeAPI.ts b/src/pepr/operator/controllers/network/generators/kubeAPI.ts index 7bb8d17c4..fdc48b508 100644 --- a/src/pepr/operator/controllers/network/generators/kubeAPI.ts +++ b/src/pepr/operator/controllers/network/generators/kubeAPI.ts @@ -1,5 +1,5 @@ import { V1NetworkPolicyPeer } from "@kubernetes/client-node"; -import { K8s, Log, R, kind } from "pepr"; +import { K8s, kind, Log, R } from "pepr"; import { RemoteGenerated } from "../../../crd"; import { anywhere } from "./anywhere"; @@ -36,8 +36,16 @@ export function kubeAPI() { * @param slice The EndpointSlice for the API server */ export async function updateAPIServerCIDRFromEndpointSlice(slice: kind.EndpointSlice) { - const svc = await K8s(kind.Service).InNamespace("default").Get("kubernetes"); - await updateAPIServerCIDR(slice, svc); + try { + Log.debug( + "Processing watch for endpointslices, getting k8s service for updating API server CIDR", + ); + const svc = await K8s(kind.Service).InNamespace("default").Get("kubernetes"); + await updateAPIServerCIDR(slice, svc); + } catch (err) { + const msg = "Failed to update network policies from endpoint slice watch"; + Log.error({ err }, msg); + } } /** @@ -45,8 +53,16 @@ export async function updateAPIServerCIDRFromEndpointSlice(slice: kind.EndpointS * @param svc The Service for the API server */ export async function updateAPIServerCIDRFromService(svc: kind.Service) { - const slice = await K8s(kind.EndpointSlice).InNamespace("default").Get("kubernetes"); - await updateAPIServerCIDR(slice, svc); + try { + Log.debug( + "Processing watch for api service, getting endpoint slices for updating API server CIDR", + ); + const slice = await K8s(kind.EndpointSlice).InNamespace("default").Get("kubernetes"); + await updateAPIServerCIDR(slice, svc); + } catch (err) { + const msg = "Failed to update network policies from api service watch"; + Log.error({ err }, msg); + } } /** @@ -74,24 +90,29 @@ export async function updateAPIServerCIDR(slice: kind.EndpointSlice, svc: kind.S }, })); - // Get all the KubeAPI NetworkPolicies - const netPols = await K8s(kind.NetworkPolicy) - .WithLabel("uds.dev/generated", RemoteGenerated.KubeAPI) - .Get(); + try { + // Get all the KubeAPI NetworkPolicies + const netPols = await K8s(kind.NetworkPolicy) + .WithLabel("uds.dev/generated", RemoteGenerated.KubeAPI) + .Get(); - for (const netPol of netPols.items) { - // Get the old peers - const oldPeers = netPol.spec?.egress?.[0].to; + for (const netPol of netPols.items) { + // Get the old peers + const oldPeers = netPol.spec?.egress?.[0].to; - // Update the NetworkPolicy if the peers have changed - if (!R.equals(oldPeers, apiServerPeers)) { - // Note using the apiServerPeers variable here instead of the oldPeers variable - // in case another EndpointSlice is updated before this one - netPol.spec!.egress![0].to = apiServerPeers; + // Update the NetworkPolicy if the peers have changed + if (!R.equals(oldPeers, apiServerPeers)) { + // Note using the apiServerPeers variable here instead of the oldPeers variable + // in case another EndpointSlice is updated before this one + netPol.spec!.egress![0].to = apiServerPeers; - Log.debug(`Updating ${netPol.metadata!.namespace}/${netPol.metadata!.name}`); - await K8s(kind.NetworkPolicy).Apply(netPol); + Log.debug(`Updating ${netPol.metadata!.namespace}/${netPol.metadata!.name}`); + await K8s(kind.NetworkPolicy).Apply(netPol); + } } + } catch (err) { + const msg = "Failed to update network policies with new API CIDRs"; + Log.error({ err }, msg); } } } diff --git a/src/pepr/operator/index.ts b/src/pepr/operator/index.ts index c7b3e6938..6ad88fb9c 100644 --- a/src/pepr/operator/index.ts +++ b/src/pepr/operator/index.ts @@ -33,7 +33,7 @@ When(a.EndpointSlice) .IsCreatedOrUpdated() .InNamespace("default") .WithName("kubernetes") - .Watch(updateAPIServerCIDRFromEndpointSlice); + .Reconcile(updateAPIServerCIDRFromEndpointSlice); // Watch for changes to the API server Service and update the API server CIDR When(a.Service) From 7204345c8415f036ba259f9cfb89f9d58f33e5fd Mon Sep 17 00:00:00 2001 From: Micah Nagel Date: Mon, 22 Apr 2024 12:49:32 -0600 Subject: [PATCH 2/3] fix: redundant catch --- .../controllers/network/generators/kubeAPI.ts | 60 +++++++++++-------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/src/pepr/operator/controllers/network/generators/kubeAPI.ts b/src/pepr/operator/controllers/network/generators/kubeAPI.ts index fdc48b508..7cc794d18 100644 --- a/src/pepr/operator/controllers/network/generators/kubeAPI.ts +++ b/src/pepr/operator/controllers/network/generators/kubeAPI.ts @@ -11,7 +11,9 @@ let apiServerPeers: V1NetworkPolicyPeer[]; * Initialize the API server CIDR by getting the EndpointSlice and Service for the API server */ export async function initAPIServerCIDR() { - const slice = await K8s(kind.EndpointSlice).InNamespace("default").Get("kubernetes"); + const slice = await K8s(kind.EndpointSlice).InNamespace("default").Get( + "kubernetes", + ); const svc = await K8s(kind.Service).InNamespace("default").Get("kubernetes"); await updateAPIServerCIDR(slice, svc); } @@ -35,12 +37,16 @@ export function kubeAPI() { * When the kubernetes EndpointSlice is created or updated, update the API server CIDR * @param slice The EndpointSlice for the API server */ -export async function updateAPIServerCIDRFromEndpointSlice(slice: kind.EndpointSlice) { +export async function updateAPIServerCIDRFromEndpointSlice( + slice: kind.EndpointSlice, +) { try { Log.debug( "Processing watch for endpointslices, getting k8s service for updating API server CIDR", ); - const svc = await K8s(kind.Service).InNamespace("default").Get("kubernetes"); + const svc = await K8s(kind.Service).InNamespace("default").Get( + "kubernetes", + ); await updateAPIServerCIDR(slice, svc); } catch (err) { const msg = "Failed to update network policies from endpoint slice watch"; @@ -57,7 +63,9 @@ export async function updateAPIServerCIDRFromService(svc: kind.Service) { Log.debug( "Processing watch for api service, getting endpoint slices for updating API server CIDR", ); - const slice = await K8s(kind.EndpointSlice).InNamespace("default").Get("kubernetes"); + const slice = await K8s(kind.EndpointSlice).InNamespace("default").Get( + "kubernetes", + ); await updateAPIServerCIDR(slice, svc); } catch (err) { const msg = "Failed to update network policies from api service watch"; @@ -71,12 +79,15 @@ export async function updateAPIServerCIDRFromService(svc: kind.Service) { * @param slice The EndpointSlice for the API server * @param svc The Service for the API server */ -export async function updateAPIServerCIDR(slice: kind.EndpointSlice, svc: kind.Service) { +export async function updateAPIServerCIDR( + slice: kind.EndpointSlice, + svc: kind.Service, +) { const { endpoints } = slice; const k8sApiIP = svc.spec?.clusterIP; // Flatten the endpoints into a list of IPs - const peers = endpoints?.flatMap(e => e.addresses); + const peers = endpoints?.flatMap((e) => e.addresses); if (k8sApiIP) { peers?.push(k8sApiIP); @@ -84,35 +95,32 @@ export async function updateAPIServerCIDR(slice: kind.EndpointSlice, svc: kind.S // If the peers are found, cache and process them if (peers?.length) { - apiServerPeers = peers.flatMap(ip => ({ + apiServerPeers = peers.flatMap((ip) => ({ ipBlock: { cidr: `${ip}/32`, }, })); - try { - // Get all the KubeAPI NetworkPolicies - const netPols = await K8s(kind.NetworkPolicy) - .WithLabel("uds.dev/generated", RemoteGenerated.KubeAPI) - .Get(); + // Get all the KubeAPI NetworkPolicies + const netPols = await K8s(kind.NetworkPolicy) + .WithLabel("uds.dev/generated", RemoteGenerated.KubeAPI) + .Get(); - for (const netPol of netPols.items) { - // Get the old peers - const oldPeers = netPol.spec?.egress?.[0].to; + for (const netPol of netPols.items) { + // Get the old peers + const oldPeers = netPol.spec?.egress?.[0].to; - // Update the NetworkPolicy if the peers have changed - if (!R.equals(oldPeers, apiServerPeers)) { - // Note using the apiServerPeers variable here instead of the oldPeers variable - // in case another EndpointSlice is updated before this one - netPol.spec!.egress![0].to = apiServerPeers; + // Update the NetworkPolicy if the peers have changed + if (!R.equals(oldPeers, apiServerPeers)) { + // Note using the apiServerPeers variable here instead of the oldPeers variable + // in case another EndpointSlice is updated before this one + netPol.spec!.egress![0].to = apiServerPeers; - Log.debug(`Updating ${netPol.metadata!.namespace}/${netPol.metadata!.name}`); - await K8s(kind.NetworkPolicy).Apply(netPol); - } + Log.debug( + `Updating ${netPol.metadata!.namespace}/${netPol.metadata!.name}`, + ); + await K8s(kind.NetworkPolicy).Apply(netPol); } - } catch (err) { - const msg = "Failed to update network policies with new API CIDRs"; - Log.error({ err }, msg); } } } From e5b45602fc0078a8de53c63ccba118d5430b456c Mon Sep 17 00:00:00 2001 From: Micah Nagel Date: Mon, 22 Apr 2024 14:05:08 -0600 Subject: [PATCH 3/3] chore: format --- .../controllers/network/generators/kubeAPI.ts | 29 +++++-------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/src/pepr/operator/controllers/network/generators/kubeAPI.ts b/src/pepr/operator/controllers/network/generators/kubeAPI.ts index 7cc794d18..8451ffa2c 100644 --- a/src/pepr/operator/controllers/network/generators/kubeAPI.ts +++ b/src/pepr/operator/controllers/network/generators/kubeAPI.ts @@ -11,9 +11,7 @@ let apiServerPeers: V1NetworkPolicyPeer[]; * Initialize the API server CIDR by getting the EndpointSlice and Service for the API server */ export async function initAPIServerCIDR() { - const slice = await K8s(kind.EndpointSlice).InNamespace("default").Get( - "kubernetes", - ); + const slice = await K8s(kind.EndpointSlice).InNamespace("default").Get("kubernetes"); const svc = await K8s(kind.Service).InNamespace("default").Get("kubernetes"); await updateAPIServerCIDR(slice, svc); } @@ -37,16 +35,12 @@ export function kubeAPI() { * When the kubernetes EndpointSlice is created or updated, update the API server CIDR * @param slice The EndpointSlice for the API server */ -export async function updateAPIServerCIDRFromEndpointSlice( - slice: kind.EndpointSlice, -) { +export async function updateAPIServerCIDRFromEndpointSlice(slice: kind.EndpointSlice) { try { Log.debug( "Processing watch for endpointslices, getting k8s service for updating API server CIDR", ); - const svc = await K8s(kind.Service).InNamespace("default").Get( - "kubernetes", - ); + const svc = await K8s(kind.Service).InNamespace("default").Get("kubernetes"); await updateAPIServerCIDR(slice, svc); } catch (err) { const msg = "Failed to update network policies from endpoint slice watch"; @@ -63,9 +57,7 @@ export async function updateAPIServerCIDRFromService(svc: kind.Service) { Log.debug( "Processing watch for api service, getting endpoint slices for updating API server CIDR", ); - const slice = await K8s(kind.EndpointSlice).InNamespace("default").Get( - "kubernetes", - ); + const slice = await K8s(kind.EndpointSlice).InNamespace("default").Get("kubernetes"); await updateAPIServerCIDR(slice, svc); } catch (err) { const msg = "Failed to update network policies from api service watch"; @@ -79,15 +71,12 @@ export async function updateAPIServerCIDRFromService(svc: kind.Service) { * @param slice The EndpointSlice for the API server * @param svc The Service for the API server */ -export async function updateAPIServerCIDR( - slice: kind.EndpointSlice, - svc: kind.Service, -) { +export async function updateAPIServerCIDR(slice: kind.EndpointSlice, svc: kind.Service) { const { endpoints } = slice; const k8sApiIP = svc.spec?.clusterIP; // Flatten the endpoints into a list of IPs - const peers = endpoints?.flatMap((e) => e.addresses); + const peers = endpoints?.flatMap(e => e.addresses); if (k8sApiIP) { peers?.push(k8sApiIP); @@ -95,7 +84,7 @@ export async function updateAPIServerCIDR( // If the peers are found, cache and process them if (peers?.length) { - apiServerPeers = peers.flatMap((ip) => ({ + apiServerPeers = peers.flatMap(ip => ({ ipBlock: { cidr: `${ip}/32`, }, @@ -116,9 +105,7 @@ export async function updateAPIServerCIDR( // in case another EndpointSlice is updated before this one netPol.spec!.egress![0].to = apiServerPeers; - Log.debug( - `Updating ${netPol.metadata!.namespace}/${netPol.metadata!.name}`, - ); + Log.debug(`Updating ${netPol.metadata!.namespace}/${netPol.metadata!.name}`); await K8s(kind.NetworkPolicy).Apply(netPol); } }