Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc Remote Dev enhancements #2052

Merged
merged 4 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ Usage:
* Fix #2003: check local port available on start remote-dev
* Fix #2004: AnsiOutputStream exceptions don't prevent logging or program execution
* Fix #2008: resources validated after their generation by `k8s:resource`
* Fix #2052: Remote Dev discovers remote ports for local services exposed in the cluster
* Fix #2052: Remote Dev includes a SOCKS 5 proxy

### 1.10.1 (2022-11-16)
* Fix #1915: Maven 3.6.3 is still supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package org.eclipse.jkube.kit.common;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -55,9 +56,11 @@ private static Callable<HttpServer> startServer(File staticDirectory) {
final int port = getFreeRandomPort();
final HttpServer ret = HttpServer.create(new InetSocketAddress(port), 0);
ret.createContext("/", exchange -> {
final File file = new File(staticDirectory, exchange.getRequestURI().getPath()).getCanonicalFile();
if (file.isFile()) {

final String path = exchange.getRequestURI().getPath();
final File file = new File(staticDirectory, path).getCanonicalFile();
if (path.equals("/health")) {
reply(exchange, 200, "READY");
} else if (file.isFile()) {
try (
OutputStream outputStream = exchange.getResponseBody();
FileInputStream fis = new FileInputStream(file)
Expand All @@ -68,11 +71,7 @@ private static Callable<HttpServer> startServer(File staticDirectory) {
IOUtils.copy(fis, outputStream);
}
} else {
String response = "404 (Not Found)\n";
exchange.sendResponseHeaders(404 , response.length());
try (OutputStream os = exchange.getResponseBody()) {
os.write(response.getBytes());
}
reply(exchange, 404, "404 (Not Found)\n");
}
});
log.info("Starting server");
Expand All @@ -81,6 +80,13 @@ private static Callable<HttpServer> startServer(File staticDirectory) {
};
}

private static void reply(HttpExchange exchange, int code, String body) throws IOException {
exchange.sendResponseHeaders(code , body.length());
try (OutputStream os = exchange.getResponseBody()) {
os.write(body.getBytes());
}
}

public int getPort() {
return getServer().getAddress().getPort();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public class LocalService {

private String serviceName;
private String type;
/**
* Local port where the service is exposed
*/
private int port;

public Service toKubernetesService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.eclipse.jkube.kit.common.KitLogger;

import java.util.Collections;
import java.util.List;


class LocalServiceManager {
Expand All @@ -29,17 +28,17 @@ class LocalServiceManager {

private final KitLogger logger;
private final KubernetesClient kubernetesClient;
private final List<LocalService> localServices;
private final RemoteDevelopmentContext context;

public LocalServiceManager(RemoteDevelopmentContext context) {
this.context = context;
logger = context.getLogger();
kubernetesClient = context.getKubernetesClient();
localServices = context.getRemoteDevelopmentConfig().getLocalServices();
}

public void createOrReplaceServices() {
logger.debug("Creating or replacing Kubernetes services for exposed ports from local environment");
for (LocalService localService : localServices) {
for (LocalService localService : context.getRemoteDevelopmentConfig().getLocalServices()) {
final Service existingService = kubernetesClient.services().withName(localService.getServiceName()).get();
final Service newService;
if (existingService == null) {
Expand All @@ -49,36 +48,43 @@ public void createOrReplaceServices() {
if (existingService.getMetadata().getAnnotations().get(PREVIOUS_SERVICE_ANNOTATION) != null) {
previousServiceAnnotation = existingService.getMetadata().getAnnotations().get(PREVIOUS_SERVICE_ANNOTATION);
} else {
final Service cleanExistingService = new ServiceBuilder(existingService)
final Service sanitizedExistingService = new ServiceBuilder(existingService)
.withStatus(null)
.editSpec()
.withClusterIP(null)
.withClusterIPs(Collections.emptyList())
.withExternalIPs(Collections.emptyList())
.endSpec()
.build();
previousServiceAnnotation = Serialization.asJson(cleanExistingService);
previousServiceAnnotation = Serialization.asJson(sanitizedExistingService);
}
newService = new ServiceBuilder(localService.toKubernetesService())
final ServiceBuilder newServiceBuilder = new ServiceBuilder(localService.toKubernetesService())
.editOrNewMetadata()
.addToAnnotations(PREVIOUS_SERVICE_ANNOTATION, previousServiceAnnotation)
.endMetadata()
.build();
.endMetadata();
// Prefer existing service ports
if (existingService.getSpec().getPorts() != null && existingService.getSpec().getPorts().size() == 1) {
newServiceBuilder.editOrNewSpec()
.withPorts(existingService.getSpec().getPorts())
.endSpec();
}
newService = newServiceBuilder.build();
}
kubernetesClient.services().resource(newService).createOrReplace();
context.getManagedServices().put(localService, newService);
}
}

public void tearDownServices() {
logger.debug("Tearing down Kubernetes services for exposed ports from local environment");
for (LocalService localService : localServices) {
final Service service = kubernetesClient.services().withName(localService.getServiceName()).get();
for (Service managedService : context.getManagedServices().values()) {
final Service service = kubernetesClient.services().resource(managedService).fromServer().get();
if (service != null && service.getMetadata().getAnnotations().get(PREVIOUS_SERVICE_ANNOTATION) != null) {
final Service previousService = Serialization.unmarshal(
service.getMetadata().getAnnotations().get(PREVIOUS_SERVICE_ANNOTATION), Service.class);
kubernetesClient.services().resource(previousService).createOrReplace();
} else if (service != null) {
kubernetesClient.services().withName(localService.getServiceName()).delete();
kubernetesClient.services().resource(service).delete();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
*/
package org.eclipse.jkube.kit.remotedev;

import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServicePort;
import io.fabric8.kubernetes.api.model.ServiceSpec;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.auth.pubkey.UserAuthPublicKeyFactory;
import org.apache.sshd.client.config.hosts.HostConfigEntryResolver;
Expand All @@ -35,12 +39,16 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class PortForwarder implements Callable<Void> {

private static final String LOCALHOST = "localhost";

private final RemoteDevelopmentContext context;
private final KitLogger logger;
private final AtomicBoolean stop;
Expand All @@ -56,11 +64,13 @@ public Void call() throws InterruptedException {
logger.debug("Starting port forwarder...");
while (true) {
waitForUser();
waitForServices();
final SshClient sshClient = startSshClient();
try (ClientSession session = createSession(sshClient)) {
session.auth().verify(10, TimeUnit.SECONDS);
forwardRemotePorts(session);
forwardLocalPorts(session);
socksProxy(session);
session.waitFor(
Arrays.asList(ClientSession.ClientSessionEvent.CLOSED, ClientSession.ClientSessionEvent.TIMEOUT),
Duration.ofHours(1));
Expand Down Expand Up @@ -100,7 +110,7 @@ private SshClient startSshClient() {

private ClientSession createSession(SshClient sshClient) throws IOException {
return sshClient
.connect(context.getUser(), "localhost", context.getSshPort())
.connect(context.getUser(), LOCALHOST, context.getSshPort())
.verify(10, TimeUnit.SECONDS)
.getSession();
}
Expand All @@ -112,6 +122,13 @@ private void waitForUser() throws InterruptedException {
}
}

private void waitForServices() throws InterruptedException {
logger.debug("Waiting for remote services to be created");
while (!context.getManagedServices().keySet().containsAll(context.getRemoteDevelopmentConfig().getLocalServices())) {
TimeUnit.SECONDS.sleep(1);
}
}

private void forwardRemotePorts(ClientSession session) throws IOException {
for (RemoteService remoteService : context.getRemoteDevelopmentConfig().getRemoteServices()) {
session.startLocalPortForwarding(
Expand All @@ -122,13 +139,28 @@ private void forwardRemotePorts(ClientSession session) throws IOException {
}

private void forwardLocalPorts(ClientSession session) throws IOException {
for (LocalService localService : context.getRemoteDevelopmentConfig().getLocalServices()) {
for (Map.Entry<LocalService, Service> managedService : context.getManagedServices().entrySet()) {
final int localPort = managedService.getKey().getPort();
final int remotePort = Optional.ofNullable(managedService.getValue().getSpec())
.map(ServiceSpec::getPorts).map(p -> p.iterator().next())
.map(ServicePort::getTargetPort).map(IntOrString::getIntVal)
.orElse(localPort);
session.startRemotePortForwarding(
new SshdSocketAddress("", localService.getPort()),
new SshdSocketAddress("localhost", localService.getPort()) // Extremely important for quarkus:dev
new SshdSocketAddress("", remotePort),
new SshdSocketAddress(LOCALHOST, managedService.getKey().getPort()) // Extremely important for quarkus:dev
);
logger.info("Local port '%s' is now available as a Kubernetes Service at %s:%s",
localService.getPort(), localService.getServiceName(), localService.getPort());
localPort, managedService.getKey().getServiceName(), remotePort);
}
}

private void socksProxy(ClientSession session) throws IOException {
final int socksPort = context.getRemoteDevelopmentConfig().getSocksPort();
if (socksPort > 0 && socksPort <= 65535) {
session.startDynamicPortForwarding(new SshdSocketAddress(LOCALHOST, socksPort));
logger.info("SOCKS 5 proxy is now available at 'localhost:%s'", socksPort);
} else {
logger.debug("SOCKS 5 proxy is disabled");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
@EqualsAndHashCode
public class RemoteDevelopmentConfig {

private int socksPort;
@Singular
private List<RemoteService> remoteServices;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package org.eclipse.jkube.kit.remotedev;

import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.client.KubernetesClient;
import lombok.Getter;
import org.eclipse.jkube.kit.common.KitLogger;
Expand All @@ -26,8 +27,10 @@
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.security.interfaces.RSAPublicKey;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -46,6 +49,8 @@ public class RemoteDevelopmentContext {
@Getter
private final String sshRsaPublicKey;
private final Properties properties;
@Getter
private final Map<LocalService, Service> managedServices;

public RemoteDevelopmentContext(
KitLogger kitLogger, KubernetesClient kubernetesClient, RemoteDevelopmentConfig remoteDevelopmentConfig) {
Expand All @@ -58,6 +63,7 @@ public RemoteDevelopmentContext(
clientKeys = initClientKeys();
sshRsaPublicKey = initSShRsaPublicKey(clientKeys);
properties = new Properties();
managedServices = new ConcurrentHashMap<>();
try {
properties.load(RemoteDevelopmentContext.class.getResourceAsStream(REMOTE_DEV_PROPERTIES_FILE));
} catch(IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package org.eclipse.jkube.kit.remotedev;

import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceBuilder;
Expand Down Expand Up @@ -69,7 +70,7 @@ void createOrReplaceServices_createsNewService() {
void createOrReplaceServices_withExistingService_replacesService() {
// Given
kubernetesClient.services().resource(new ServiceBuilder()
.withNewMetadata().withName("service").endMetadata()
.withNewMetadata().withName("service").addToAnnotations("k8s", "io").endMetadata()
.withNewSpec()
.addNewPort().withPort(31337).endPort()
.endSpec().build()).create();
Expand All @@ -83,7 +84,8 @@ void createOrReplaceServices_withExistingService_replacesService() {
.hasFieldOrPropertyWithValue("metadata.name", "service")
.extracting(s -> s.getMetadata().getAnnotations().get("jkube/previous-service"))
.extracting(Serialization::unmarshal)
.isInstanceOf(Service.class)
.asInstanceOf(InstanceOfAssertFactories.type(Service.class))
.satisfies(s -> assertThat(s.getMetadata().getAnnotations()).containsEntry("k8s", "io"))
.extracting("spec.ports").asList()
.extracting("port")
.containsExactly(31337);
Expand Down Expand Up @@ -121,18 +123,43 @@ void createOrReplaceServices_withExistingServiceWithAnnotation_replacesServiceWi
}

@Test
void tearDownServices_deletesNewService() {
void createOrReplaceServices_withExistingService_reusesPort() {
// Given
kubernetesClient.services().resource(new ServiceBuilder()
.withNewMetadata().withName("service").endMetadata()
.withNewSpec()
.addNewPort().withPort(31337).endPort()
.addNewPort().withPort(31337).withNewTargetPort().withValue(42).endTargetPort().endPort()
.endSpec().build()).create();
final RemoteDevelopmentConfig config = RemoteDevelopmentConfig.builder()
.localService(LocalService.builder().serviceName("service").port(1337).build())
.build();
// When
new LocalServiceManager(new RemoteDevelopmentContext(logger, kubernetesClient, config)).tearDownServices();
new LocalServiceManager(new RemoteDevelopmentContext(logger, kubernetesClient, config)).createOrReplaceServices();
// Then
assertThat(kubernetesClient.services().withName("service").get())
.hasFieldOrPropertyWithValue("metadata.name", "service")
.extracting("spec.ports").asList()
.extracting("targetPort")
.containsExactly(new IntOrString(42));
}

@Test
void tearDownServices_deletesNewService() {
// Given
final Service service = new ServiceBuilder()
.withNewMetadata().withName("service").endMetadata()
.withNewSpec()
.addNewPort().withPort(31337).endPort()
.endSpec().build();
kubernetesClient.services().resource(service).create();
final LocalService localService = LocalService.builder().serviceName("service").port(1337).build();
final RemoteDevelopmentConfig config = RemoteDevelopmentConfig.builder()
.localService(localService)
.build();
final RemoteDevelopmentContext context = new RemoteDevelopmentContext(logger, kubernetesClient, config);
context.getManagedServices().put(localService, service);
// When
new LocalServiceManager(context).tearDownServices();
// Then
assertThat(kubernetesClient.services().withName("service")
.waitUntilCondition(Objects::isNull, 1, TimeUnit.SECONDS))
Expand All @@ -142,7 +169,7 @@ void tearDownServices_deletesNewService() {
@Test
void tearDownServices_restoresOldService() {
// Given
kubernetesClient.services().resource(new ServiceBuilder()
final Service service = new ServiceBuilder()
.withNewMetadata().withName("service")
.addToAnnotations("jkube/previous-service", Serialization.asJson(new ServiceBuilder()
.withNewMetadata().withName("service").endMetadata()
Expand All @@ -151,12 +178,16 @@ void tearDownServices_restoresOldService() {
.endMetadata()
.withNewSpec()
.addNewPort().withPort(1337).endPort()
.endSpec().build()).create();
.endSpec().build();
kubernetesClient.services().resource(service).create();
final LocalService localService = LocalService.builder().serviceName("service").port(1337).build();
final RemoteDevelopmentConfig config = RemoteDevelopmentConfig.builder()
.localService(LocalService.builder().serviceName("service").port(1337).build())
.localService(localService)
.build();
final RemoteDevelopmentContext context = new RemoteDevelopmentContext(logger, kubernetesClient, config);
context.getManagedServices().put(localService, service);
// When
new LocalServiceManager(new RemoteDevelopmentContext(logger, kubernetesClient, config)).tearDownServices();
new LocalServiceManager(context).tearDownServices();
// Then
assertThat(kubernetesClient.services().withName("service").get())
.extracting("spec.selector")
Expand Down
Loading