Skip to content
This repository has been archived by the owner on Aug 30, 2023. It is now read-only.

feature: Make deployment replicas configurable on resource level #178

Merged
merged 23 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a180a10
feat(deployment-replica-runtime-config): look for replica node in dep…
ChrisRousey Mar 2, 2023
7b778b5
feat(deployment-replica-runtime-config): add test for deployment relp…
ChrisRousey Mar 3, 2023
5834bce
feat(deployment-replicas-config): updated health and metrics endpoints
ChrisRousey Mar 9, 2023
9019396
feat(deployment-replica-config): added default value
ChrisRousey Mar 9, 2023
65fe384
feat(deployment-replica-config): fix problem when only 1 replica
ChrisRousey Mar 9, 2023
f6e524a
Merge branch 'main' into feat/rework-deployment-configuration
ChrisRousey Mar 9, 2023
db6cd96
feat(deployment-replica-config): added validation check for searched …
ChrisRousey Mar 9, 2023
1420ffb
feat(deployment-replica-config): removed the validation
ChrisRousey Mar 9, 2023
ce1a4e8
feat(deployment-replica-config): updated exception message
ChrisRousey Mar 9, 2023
2aecb90
feat(deployment-replica-config): change sorting criteria
ChrisRousey Mar 9, 2023
6390437
feat(deployment-replica-config): remove unused resources
ChrisRousey Mar 9, 2023
6a048fb
feat(deployment-replica-config): fix error message and replica retrieval
ChrisRousey Mar 10, 2023
1d13cdd
feat(deployment-replica-config): override toString for exceptions
ChrisRousey Mar 10, 2023
039d38b
feat(deployment-replica-config): fix bug when no configs exist
ChrisRousey Mar 13, 2023
b7f45b8
feat(deployment-replica-runtime-config): updated naming
ChrisRousey Mar 14, 2023
49edb14
feat(deployment-replica-config): add check for replica smaller than zero
ChrisRousey Mar 14, 2023
21ec22e
feat(deployment-replica-config): fix log retrieval per replica
ChrisRousey Mar 20, 2023
c7d5d58
feat(deployment-replica-config): remove now unneeded service
ChrisRousey Mar 20, 2023
a714af3
feat(deployment-replica-config): removed unneeded references to services
ChrisRousey Mar 22, 2023
10af415
feat(deployment-replica-config): removed untestable tests
ChrisRousey Mar 22, 2023
49cba5f
Merge branch 'main' into feat/rework-deployment-configuration
ChrisRousey Mar 22, 2023
de867fb
feat(deployment-replica-config): change images to latest stable relea…
ChrisRousey Mar 22, 2023
2d916c7
Merge branch 'feat/rework-deployment-configuration' of https://github…
ChrisRousey Mar 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ private ConfigUtilities() {}

public static Uni<List<ConfigEntity>> getMappedConfigs(
Map<String, String> configs, DataCaterSessionFactory dsf) {

if (configs == null) {
return Uni.createFrom().item(new ArrayList<>());
}

return dsf.withTransaction(
(session, transaction) ->
session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
import io.datacater.core.pipeline.PipelineEntity;
import io.datacater.core.stream.StreamEntity;
import io.datacater.core.utilities.StringUtilities;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.ContainerResource;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.quarkus.security.Authenticated;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
Expand Down Expand Up @@ -66,21 +65,24 @@ public Uni<DeploymentEntity> getDeployment(@PathParam("uuid") UUID deploymentId)
@GET
@Path("{uuid}/logs")
@Produces(MediaType.APPLICATION_JSON)
public Uni<List<String>> getLogs(@PathParam("uuid") UUID deploymentId) {
public Uni<List<String>> getLogs(
@PathParam("uuid") UUID deploymentId, @DefaultValue("1") @QueryParam("replica") int replica) {
return dsf.withTransaction(
((session, transaction) -> session.find(DeploymentEntity.class, deploymentId)))
.onItem()
.ifNull()
.failWith(new DeploymentNotFoundException(StaticConfig.LoggerMessages.DEPLOYMENT_NOT_FOUND))
.onItem()
.ifNotNull()
.transform(Unchecked.function(deployment -> getDeploymentLogsAsList(deployment.getId())));
.transform(
Unchecked.function(deployment -> getDeploymentLogsAsList(deployment.getId(), replica)));
}

@GET
@Path("{uuid}/health")
@Produces(MediaType.APPLICATION_JSON)
public Uni<Response> getHealth(@PathParam("uuid") UUID deploymentId) {
public Uni<Response> getHealth(
@PathParam("uuid") UUID deploymentId, @DefaultValue("1") @QueryParam("replica") int replica) {
return dsf.withTransaction(
((session, transaction) -> session.find(DeploymentEntity.class, deploymentId)))
.onItem()
Expand All @@ -94,7 +96,9 @@ public Uni<Response> getHealth(@PathParam("uuid") UUID deploymentId) {
HttpClient httpClient = HttpClient.newHttpClient();
HttpRequest req =
buildDeploymentServiceRequest(
deploymentId, StaticConfig.EnvironmentVariables.DEPLOYMENT_HEALTH_PATH);
deploymentId,
StaticConfig.EnvironmentVariables.DEPLOYMENT_HEALTH_PATH,
replica);
HttpResponse<String> response =
httpClient.send(req, HttpResponse.BodyHandlers.ofString());
return Response.ok().entity(response.body()).build();
Expand All @@ -104,7 +108,8 @@ public Uni<Response> getHealth(@PathParam("uuid") UUID deploymentId) {
@GET
@Path("{uuid}/metrics")
@Produces(MediaType.TEXT_PLAIN)
public Uni<Response> getMetrics(@PathParam("uuid") UUID deploymentId) {
public Uni<Response> getMetrics(
@PathParam("uuid") UUID deploymentId, @DefaultValue("1") @QueryParam("replica") int replica) {
return dsf.withTransaction(
((session, transaction) -> session.find(DeploymentEntity.class, deploymentId)))
.onItem()
Expand All @@ -118,7 +123,9 @@ public Uni<Response> getMetrics(@PathParam("uuid") UUID deploymentId) {
HttpClient httpClient = HttpClient.newHttpClient();
HttpRequest req =
buildDeploymentServiceRequest(
deploymentId, StaticConfig.EnvironmentVariables.DEPLOYMENT_METRICS_PATH);
deploymentId,
StaticConfig.EnvironmentVariables.DEPLOYMENT_METRICS_PATH,
replica);
HttpResponse<String> response =
httpClient.send(req, HttpResponse.BodyHandlers.ofString());
return Response.ok().entity(response.body()).build();
Expand All @@ -129,7 +136,10 @@ public Uni<Response> getMetrics(@PathParam("uuid") UUID deploymentId) {
@Path("{uuid}/watch-logs")
@Produces(MediaType.SERVER_SENT_EVENTS)
public Uni<Response> watchLogs(
@PathParam("uuid") UUID deploymentId, @Context Sse sse, @Context SseEventSink eventSink) {
@PathParam("uuid") UUID deploymentId,
@DefaultValue("1") @QueryParam("replica") int replica,
@Context Sse sse,
@Context SseEventSink eventSink) {
return dsf.withTransaction(
((session, transaction) -> session.find(DeploymentEntity.class, deploymentId)))
.onItem()
Expand All @@ -140,7 +150,7 @@ public Uni<Response> watchLogs(
.transform(
deployment -> {
try {
watchLogsRunner(deployment.getId(), sse, eventSink);
watchLogsRunner(deployment.getId(), replica, sse, eventSink);
} catch (IOException e) {
throw new DatacaterException(StringUtilities.wrapString(e.getMessage()));
}
Expand Down Expand Up @@ -336,21 +346,20 @@ private Uni<StreamEntity> getStream(
String.format(StaticConfig.LoggerMessages.STREAM_NOT_FOUND, key))));
}

private List<String> getDeploymentLogsAsList(UUID deploymentId) {
private List<String> getDeploymentLogsAsList(UUID deploymentId, int replica) {
K8Deployment k8Deployment = new K8Deployment(client);
return Arrays.asList(k8Deployment.getLogs(deploymentId).split("\n"));
return Arrays.asList(k8Deployment.getLogs(deploymentId, replica).split("\n"));
}

private HttpRequest buildDeploymentServiceRequest(UUID deploymentId, String path) {
private HttpRequest buildDeploymentServiceRequest(UUID deploymentId, String path, int replica) {
K8Deployment k8Deployment = new K8Deployment(client);
String clusterIp = k8Deployment.getClusterIp(deploymentId);
String ip = k8Deployment.getDeploymentReplicaIp(deploymentId, replica).replace(".", "-");
String namespace = StaticConfig.EnvironmentVariables.NAMESPACE;
int port = StaticConfig.EnvironmentVariables.DEPLOYMENT_CONTAINER_PORT;
String protocol = StaticConfig.EnvironmentVariables.DEPLOYMENT_CONTAINER_PROTOCOL;

String uriReady =
String.format(
"%s://%s:%d%s",
StaticConfig.EnvironmentVariables.DEPLOYMENT_CONTAINER_PROTOCOL,
clusterIp,
StaticConfig.EnvironmentVariables.DEPLOYMENT_CONTAINER_PORT,
path);
String.format("%s://%s.%s.pod.cluster.local:%d%s", protocol, ip, namespace, port, path);

return HttpRequest.newBuilder()
.GET()
Expand All @@ -359,9 +368,9 @@ private HttpRequest buildDeploymentServiceRequest(UUID deploymentId, String path
.build();
}

private RollableScalableResource<Deployment> watchDeploymentLogs(UUID deploymentId) {
private ContainerResource watchDeploymentLogs(UUID deploymentId, int replica) {
K8Deployment k8Deployment = new K8Deployment(client);
return k8Deployment.watchLogs(deploymentId);
return k8Deployment.watchLogs(deploymentId, replica);
}

private List<DeploymentEntity> getK8Deployments(List<DeploymentEntity> deployments) {
Expand Down Expand Up @@ -452,12 +461,10 @@ private UUID getPipelineUUIDFromMap(Map<String, Object> map) {
}
}

private void watchLogsRunner(UUID deploymentId, @Context Sse sse, @Context SseEventSink eventSink)
private void watchLogsRunner(
UUID deploymentId, int replica, @Context Sse sse, @Context SseEventSink eventSink)
throws IOException {
LogWatch lw =
watchDeploymentLogs(deploymentId)
.inContainer(StaticConfig.DEPLOYMENT_NAME_PREFIX + deploymentId)
.watchLog();
LogWatch lw = watchDeploymentLogs(deploymentId, replica).watchLog();
InputStream is = lw.getOutput();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
String line;
Expand Down
Loading