Skip to content

Commit

Permalink
Fix a bug where stale endpoints in KubernetesEndpointGroup aren't u…
Browse files Browse the repository at this point in the history
…pdated (#6012)

Motivation:

When a Kubernetes service is updated, a new Pod watcher is created and receives events. The old cache should have been cleared at this point, but it was not removed, so stale endpoints were exposed.

Modifications:

- Clear `podToNode` map when a new Pod watcher is created

Result:

Fixed a bug where stale endpoints remained when a Kubernetes service was updated.
  • Loading branch information
ikhoon authored Dec 3, 2024
1 parent db28939 commit 2134872
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ public void eventReceived(Action action, Service service0) {
if (podWatch0 != null) {
podWatch0.close();
}
// Clear the podToNode map before starting a new pod watch.
podToNode.clear();
podWatch0 = watchPod(service0.getSpec().getSelector());
if (closed) {
podWatch0.close();
Expand Down Expand Up @@ -384,6 +386,8 @@ public void eventReceived(Action action, Node node) {
nodeToIp.remove(nodeName);
break;
}
// TODO(ikhoon): Reschedule the update after a certain delay since multiple websocket events
// are updated in a same task.
maybeUpdateEndpoints();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,63 @@ void createEndpointsWithNodeIpAndPort() throws InterruptedException {
});
}

@Test
void clearOldEndpointsWhenServiceIsUpdated() throws InterruptedException {
// Prepare Kubernetes resources
final List<Node> nodes = ImmutableList.of(newNode("1.1.1.1"), newNode("2.2.2.2"), newNode("3.3.3.3"));
final Deployment deployment = newDeployment();
final int nodePort = 30000;
final Service service = newService(nodePort);
final List<Pod> pods = nodes.stream()
.map(node -> node.getMetadata().getName())
.map(nodeName -> newPod(deployment.getSpec().getTemplate(), nodeName))
.collect(toImmutableList());

// Create Kubernetes resources
for (Node node : nodes) {
client.nodes().resource(node).create();
}
client.pods().resource(pods.get(0)).create();
client.pods().resource(pods.get(1)).create();
client.apps().deployments().resource(deployment).create();
client.services().resource(service).create();

final KubernetesEndpointGroup endpointGroup = KubernetesEndpointGroup.of(client, "test",
"nginx-service");
endpointGroup.whenReady().join();

// Initial state
await().untilAsserted(() -> {
final List<Endpoint> endpoints = endpointGroup.endpoints();
// Wait until all endpoints are ready
assertThat(endpoints).containsExactlyInAnyOrder(
Endpoint.of("1.1.1.1", nodePort),
Endpoint.of("2.2.2.2", nodePort)
);
});

// Update service and deployment with new selector
final int newNodePort = 30001;
final String newSelectorName = "nginx-updated";
final Service updatedService = newService(newNodePort, newSelectorName);
client.services().resource(updatedService).update();
final Deployment updatedDeployment = newDeployment(newSelectorName);
client.apps().deployments().resource(updatedDeployment).update();

final List<Pod> updatedPods =
nodes.stream()
.map(node -> node.getMetadata().getName())
.map(nodeName -> newPod(updatedDeployment.getSpec().getTemplate(), nodeName))
.collect(toImmutableList());
client.pods().resource(updatedPods.get(2)).create();
await().untilAsserted(() -> {
final List<Endpoint> endpoints = endpointGroup.endpoints();
assertThat(endpoints).containsExactlyInAnyOrder(
Endpoint.of("3.3.3.3", newNodePort)
);
});
}

@Test
void shouldUsePortNameToGetNodePort() {
final List<Node> nodes = ImmutableList.of(newNode("1.1.1.1"), newNode("2.2.2.2"), newNode("3.3.3.3"));
Expand Down Expand Up @@ -292,6 +349,10 @@ private static Node newNode(String ip) {
}

static Service newService(@Nullable Integer nodePort) {
return newService(nodePort, "nginx");
}

static Service newService(@Nullable Integer nodePort, String selectorName) {
final ObjectMeta metadata = new ObjectMetaBuilder()
.withName("nginx-service")
.build();
Expand All @@ -301,7 +362,7 @@ static Service newService(@Nullable Integer nodePort) {
.build();
final ServiceSpec serviceSpec = new ServiceSpecBuilder()
.withPorts(servicePort)
.withSelector(ImmutableMap.of("app", "nginx"))
.withSelector(ImmutableMap.of("app", selectorName))
.withType("NodePort")
.build();
return new ServiceBuilder()
Expand All @@ -310,27 +371,31 @@ static Service newService(@Nullable Integer nodePort) {
.build();
}

static Deployment newDeployment() {
static Deployment newDeployment(String selectorName) {
final ObjectMeta metadata = new ObjectMetaBuilder()
.withName("nginx-deployment")
.build();
final LabelSelector selector = new LabelSelectorBuilder()
.withMatchLabels(ImmutableMap.of("app", "nginx"))
.withMatchLabels(ImmutableMap.of("app", selectorName))
.build();
final DeploymentSpec deploymentSpec = new DeploymentSpecBuilder()
.withReplicas(4)
.withSelector(selector)
.withTemplate(newPodTemplate())
.withTemplate(newPodTemplate(selectorName))
.build();
return new DeploymentBuilder()
.withMetadata(metadata)
.withSpec(deploymentSpec)
.build();
}

private static PodTemplateSpec newPodTemplate() {
static Deployment newDeployment() {
return newDeployment("nginx");
}

private static PodTemplateSpec newPodTemplate(String selectorName) {
final ObjectMeta metadata = new ObjectMetaBuilder()
.withLabels(ImmutableMap.of("app", "nginx"))
.withLabels(ImmutableMap.of("app", selectorName))
.build();
final Container container = new ContainerBuilder()
.withName("nginx")
Expand Down

0 comments on commit 2134872

Please sign in to comment.