Skip to content

Commit

Permalink
feature: backend service watching if k8s services exist
Browse files Browse the repository at this point in the history
  • Loading branch information
aureamunoz committed Nov 11, 2022
1 parent bdab1e0 commit e2fa441
Show file tree
Hide file tree
Showing 5 changed files with 334 additions and 2 deletions.
10 changes: 8 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>2.13.2.Final</quarkus.platform.version>
<quarkus.platform.group-id>io.quarkus</quarkus.platform.group-id>
<quarkus.platform.version>999-SNAPSHOT</quarkus.platform.version>
<htmlunit.version>2.66.0</htmlunit.version>
<skipITs>true</skipITs>
<surefire-plugin.version>3.0.0-M7</surefire-plugin.version>
<assertj.version>3.23.1</assertj.version>
</properties>
<dependencyManagement>
<dependencies>
Expand All @@ -56,6 +57,11 @@
<version>${htmlunit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
30 changes: 30 additions & 0 deletions servicebox-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,32 @@
<artifactId>quarkus-webjars-locator</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-stork</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.stork</groupId>
<artifactId>stork-service-discovery-kubernetes</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.stork</groupId>
<artifactId>stork-load-balancer-random</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-test-kubernetes-client</artifactId>
<scope>test</scope>
</dependency>

<!-- Webjars -->
<dependency>
<groupId>org.webjars</groupId>
Expand Down Expand Up @@ -70,6 +96,10 @@
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
26 changes: 26 additions & 0 deletions servicebox-app/src/main/java/io/halkyon/model/ProtocolAndPort.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.halkyon.model;

/**
* Structure representing a protocol:port endpoint.
*/
public class ProtocolAndPort {
/**
* The protocol.
*/
public final String protocol;
/**
* The port.
*/
public final int port;

/**
* Creates a new HostAndPort
*
* @param protocol the host
* @param port the port
*/
public ProtocolAndPort(String protocol, int port) {
this.protocol = protocol;
this.port = port;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package io.halkyon.services;

import io.halkyon.model.Claim;
import io.halkyon.model.ProtocolAndPort;
import io.halkyon.model.Service;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.mutiny.Uni;
import io.smallrye.stork.Stork;
import io.smallrye.stork.api.Metadata;
import io.smallrye.stork.api.MetadataKey;
import io.smallrye.stork.api.ServiceDefinition;
import io.smallrye.stork.api.ServiceInstance;
import io.smallrye.stork.servicediscovery.kubernetes.KubernetesConfiguration;
import io.smallrye.stork.servicediscovery.kubernetes.KubernetesMetadataKey;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import javax.enterprise.context.ApplicationScoped;
import javax.transaction.Transactional;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static com.fasterxml.jackson.databind.type.LogicalType.Collection;
import static io.smallrye.mutiny.operators.uni.UniBlockingAwait.await;
import static io.smallrye.stork.servicediscovery.kubernetes.KubernetesMetadataKey.META_K8S_SERVICE_ID;
import static java.util.Optional.ofNullable;

/**
* The claiming service will poll new or pending claims and try to find an available service.
* A claim can request a service in the form of `<service name>:<service version>`, for example: "mysql:3.6".
* If there is an available service that matches the criteria of service name, plus service version, this service will
* be linked to the claim and the claim status will change to "claimed". Otherwise, the status will be "pending".
* After a number of attempts have been made to find a suitable service, the claim status will change to "error".
*/
@ApplicationScoped
public class ServiceDiscoveryJob {


/**
* This method will be executed at every `${servicebox.claiming-service.poll-every}`.
* First, it will collect the list of all available services, and then will loop over the new or pending claims to link
* the service if the criteria matches.
*/
@Transactional
@Scheduled(every="${servicebox.claiming-service.poll-every}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
public void execute() {
List<Service> services = Service.listAll();
Stork stork = Stork.getInstance();
for (Service service : services) {
KubernetesConfiguration kubernetesConfiguration = new KubernetesConfiguration().withApplication(service.name);
stork.defineIfAbsent(service.name, ServiceDefinition.of(kubernetesConfiguration));
io.smallrye.stork.api.Service storkService = stork.getService(service.name);

List<ServiceInstance> instances = storkService.getServiceDiscovery().getServiceInstances().await().indefinitely();
ProtocolAndPort protocolAndPort = parseToProtocolAndPort(service.endpoint);
ServiceInstance matching = findMatching(instances, protocolAndPort);
if (matching == null) {
Service.deleteById(service.id);
}
}
}

/**
* Extracts the protocol and port values from an endpoint string in form of protocol:port.
*
* @param endpoint endpoint as protocol:port
* @return {@link ProtocolAndPort}
*/
public static ProtocolAndPort parseToProtocolAndPort(String endpoint) {
String[] endpointParts = endpoint.split("\\:");
ProtocolAndPort protocolAndPort = new ProtocolAndPort(endpointParts[0].toUpperCase(), Integer.valueOf(endpointParts[1]));
return protocolAndPort;

}

/**
* Finds a matching instance for a given port and protocol
*
* @param serviceInstances the list of instances
* @param protocolAndPort the structure representing the protocol and port for an endpoint
* @return the found instance or {@code null} if none matches
*/
public static ServiceInstance findMatching (List<ServiceInstance> serviceInstances, ProtocolAndPort protocolAndPort) {
if (protocolAndPort.protocol == null) {
throw new NullPointerException("Protocol cannot be null");
}
for (ServiceInstance instance : serviceInstances) {
Metadata<KubernetesMetadataKey> k8sMetadata = (Metadata<KubernetesMetadataKey>) instance.getMetadata();
String svcProtocol="";
if(k8sMetadata.getMetadata().get(KubernetesMetadataKey.META_K8S_PORT_PROTOCOL)!=null){
svcProtocol= (String) k8sMetadata.getMetadata().get(KubernetesMetadataKey.META_K8S_PORT_PROTOCOL);
}
if (protocolAndPort.protocol.equals(svcProtocol) && protocolAndPort.port == instance.getPort()) {
return instance;
}
}
return null;
}



}


161 changes: 161 additions & 0 deletions servicebox-app/src/test/java/io/halkyon/ServiceDiscoveryJobTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package io.halkyon;

import io.fabric8.kubernetes.api.model.EndpointAddress;
import io.fabric8.kubernetes.api.model.EndpointAddressBuilder;
import io.fabric8.kubernetes.api.model.EndpointPortBuilder;
import io.fabric8.kubernetes.api.model.EndpointSubsetBuilder;
import io.fabric8.kubernetes.api.model.Endpoints;
import io.fabric8.kubernetes.api.model.EndpointsBuilder;
import io.fabric8.kubernetes.api.model.ObjectReference;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
import io.halkyon.model.Claim;
import io.halkyon.model.Service;
import io.halkyon.services.ClaimStatus;
import io.halkyon.services.ClaimingJobService;
import io.halkyon.services.ServiceDiscoveryJob;
import io.quarkus.scheduler.Scheduler;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.kubernetes.client.KubernetesTestServer;
import io.quarkus.test.kubernetes.client.WithKubernetesTestServer;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import javax.inject.Inject;
import javax.ws.rs.core.MediaType;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.core.IsNot.not;

@WithKubernetesTestServer
@QuarkusTest
public class ServiceDiscoveryJobTest {

KubernetesClient client;

@KubernetesTestServer
KubernetesServer mockServer;

@Inject
ServiceDiscoveryJob job;

@Inject
Scheduler scheduler;



@Test
public void shouldFindTheKubernetesServiceCorrespondingThePrimazaResgisteredService(){
pauseScheduler();

//Given a Postgresql service running in a Kubernetes cluster
registerPostgresqlServiceinKubernetes();

//Given 2 services registered in the DB, one of them the Postgresql service (representing the instance running in k8s)
createPostgresqlService();
createRabitMQService();

given()
.when().get("/services")
.then()
.statusCode(200)
.body(containsString("RabbitMQ"),
containsString("PostgreSQL"));

//When the job runs
job.execute();


//Then the rabbitMQ is deleted from DB because it is not running in the cluster
given()
.when().get("/services/name/RabbitMQ")
.then()
.statusCode(204)
.body(not(containsString("RabbitMQ")));


given()
.when().get("/services/name/PostgreSQL")
.then()
.statusCode(200)
.body(containsString("PostgreSQL"));


}

private void registerPostgresqlServiceinKubernetes() {

Map<String, String> labels = new HashMap<>();
labels.put("type","color");
labels.put("app.kubernetes.io/version","1.0");

final Pod pod = new PodBuilder().withNewSpec().endSpec().withNewMetadata().withName("PostgreSQL").withLabels(labels).and().build();

//Since we are using a mockServer, we are not able to make any real application running in there, so we run it locally and configure the k8s endpoint to return `localhost`
// as IP. This way, we will send the request to localhost where the RedService is actually running.
String[] ips = { "localhost"};
List<EndpointAddress> endpointAddresses = Arrays.stream(ips)
.map(ipAddress -> {
String uid = UUID.randomUUID().toString();
ObjectReference targetRef = new ObjectReference(null, null, "Pod",
"PostgreSQL", "development", null, uid);
EndpointAddress endpointAddress = new EndpointAddressBuilder().withIp(ipAddress).withTargetRef(targetRef)
.build();
return endpointAddress;
}).collect(Collectors.toList());

Endpoints endpoint = new EndpointsBuilder()
.withNewMetadata().withName("PostgreSQL").endMetadata()
.addToSubsets(new EndpointSubsetBuilder().withAddresses(endpointAddresses)
.addToPorts(new EndpointPortBuilder().withPort(5432).withProtocol("TCP").build())
.build())
.build();

// Set up Kubernetes so that our "pretend" pods and endpoints are created
mockServer.getClient().endpoints().inNamespace("test").resource(endpoint).create();
mockServer.getClient().pods().inNamespace("test").resource(pod).create();
}

private void pauseScheduler() {
scheduler.pause();
await().atMost(30, SECONDS).until(() -> !scheduler.isRunning());
}

private void createPostgresqlService() {
given()
.contentType(MediaType.APPLICATION_JSON)
.accept("application/json")
.body("{\"name\": \"PostgreSQL\", \"version\": \"8\", \"endpoint\": \"tcp:5432\", \"deployed\": \"true\" }")
.when().post("/services")
.then()
.statusCode(201)
.extract()
.as(Service.class);
}

private void createRabitMQService() {
given()
.contentType(MediaType.APPLICATION_JSON)
.accept("application/json")
.body("{\"name\": \"RabbitMQ\", \"version\": \"3.11.2\", \"endpoint\": \"tcp:5672\", \"deployed\": \"false\" }")
.when().post("/services")
.then()
.statusCode(201);
}
}

0 comments on commit e2fa441

Please sign in to comment.