diff --git a/leshan-core/src/main/java/org/eclipse/leshan/core/endpoint/EndpointUriUtil.java b/leshan-core/src/main/java/org/eclipse/leshan/core/endpoint/EndpointUriUtil.java index 67908f3683..9fc3dc9eae 100644 --- a/leshan-core/src/main/java/org/eclipse/leshan/core/endpoint/EndpointUriUtil.java +++ b/leshan-core/src/main/java/org/eclipse/leshan/core/endpoint/EndpointUriUtil.java @@ -47,6 +47,15 @@ public static URI createUri(String uri) { } } + public static URI replaceAddress(URI originalUri, InetSocketAddress newAddress) { + try { + return new URI(originalUri.getScheme(), null, newAddress.getHostString(), newAddress.getPort(), null, null, + null); + } catch (URISyntaxException e) { + throw new IllegalStateException(e); + } + } + public static InetSocketAddress getSocketAddr(URI uri) { return new InetSocketAddress(uri.getHost(), uri.getPort()); } diff --git a/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/util/LeshanProxyBuilder.java b/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/util/LeshanProxyBuilder.java new file mode 100644 index 0000000000..7915f0f571 --- /dev/null +++ b/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/util/LeshanProxyBuilder.java @@ -0,0 +1,30 @@ +/******************************************************************************* + * Copyright (c) 2023 Sierra Wireless and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v20.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.html. + * + * Contributors: + * Sierra Wireless - initial API and implementation + *******************************************************************************/ +package org.eclipse.leshan.integration.tests.util; + +import java.net.InetSocketAddress; +import java.net.URI; + +import org.eclipse.leshan.core.endpoint.Protocol; + +public class LeshanProxyBuilder { + + public static ReverseProxy givenReverseProxyFor(LeshanTestServer server, Protocol protocol) { + URI serverEndpointUri = server.getEndpoint(protocol).getURI(); + return new ReverseProxy(new InetSocketAddress("localhost", 0), + new InetSocketAddress(serverEndpointUri.getHost(), serverEndpointUri.getPort())); + } +} diff --git a/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/util/LeshanTestClient.java b/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/util/LeshanTestClient.java index 0a60c1210c..1c67c2759d 100644 --- a/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/util/LeshanTestClient.java +++ b/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/util/LeshanTestClient.java @@ -25,6 +25,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; +import java.net.InetSocketAddress; +import java.net.URI; import java.security.cert.Certificate; import java.util.List; import java.util.Map; @@ -43,11 +45,13 @@ import org.eclipse.leshan.client.send.DataSender; import org.eclipse.leshan.client.servers.LwM2mServer; import org.eclipse.leshan.client.util.LinkFormatHelper; +import org.eclipse.leshan.core.endpoint.EndpointUriUtil; import org.eclipse.leshan.core.link.LinkSerializer; import org.eclipse.leshan.core.link.lwm2m.attributes.LwM2mAttributeParser; import org.eclipse.leshan.core.node.codec.LwM2mDecoder; import org.eclipse.leshan.core.node.codec.LwM2mEncoder; import org.eclipse.leshan.server.bootstrap.LeshanBootstrapServer; +import org.eclipse.leshan.server.endpoint.LwM2mServerEndpoint; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; @@ -57,13 +61,14 @@ public class LeshanTestClient extends LeshanClient { private final String endpointName; private final InOrder inOrder; + private final ReverseProxy proxy; public LeshanTestClient(String endpoint, List objectEnablers, List dataSenders, List trustStore, RegistrationEngineFactory engineFactory, BootstrapConsistencyChecker checker, Map additionalAttributes, Map bsAdditionalAttributes, LwM2mEncoder encoder, LwM2mDecoder decoder, ScheduledExecutorService sharedExecutor, LinkSerializer linkSerializer, LinkFormatHelper linkFormatHelper, - LwM2mAttributeParser attributeParser, LwM2mClientEndpointsProvider endpointsProvider) { + LwM2mAttributeParser attributeParser, LwM2mClientEndpointsProvider endpointsProvider, ReverseProxy proxy) { super(endpoint, objectEnablers, dataSenders, trustStore, engineFactory, checker, additionalAttributes, bsAdditionalAttributes, encoder, decoder, sharedExecutor, linkSerializer, linkFormatHelper, attributeParser, endpointsProvider); @@ -71,6 +76,8 @@ public LeshanTestClient(String endpoint, List obje // Store some internal attribute this.endpointName = endpoint; + this.proxy = proxy; + // Add Mock Listener clientObserver = mock(LwM2mClientObserver.class); addObserver(clientObserver); @@ -100,31 +107,20 @@ public void waitForRegistrationTo(LeshanTestServer server) { public void waitForRegistrationTo(LeshanTestServer server, long timeout, TimeUnit unit) { inOrder.verify(clientObserver, timeout(unit.toMillis(timeout)).times(1)).onRegistrationStarted(assertArg( // - s -> { - assertThat(server.getEndpoints()) // - .filteredOn(ep -> ep.getURI().toString().equals(s.getUri())) // - .hasSize(1); - - }), // + s -> assertThat(isServerIdentifiedByUri(server, s.getUri()))), // isNotNull()); inOrder.verify(clientObserver, timeout(unit.toMillis(timeout)).times(1)).onRegistrationSuccess(assertArg( // - s -> assertThat(server.getEndpoints()) // - .filteredOn(ep -> ep.getURI().toString().equals(s.getUri())) // - .hasSize(1)), // + s -> assertThat(isServerIdentifiedByUri(server, s.getUri()))), // isNotNull(), isNotNull()); inOrder.verifyNoMoreInteractions(); } public void waitForUpdateTo(LeshanTestServer server, long timeout, TimeUnit unit) { inOrder.verify(clientObserver, timeout(unit.toMillis(timeout)).times(1)).onUpdateStarted(assertArg( // - s -> assertThat(server.getEndpoints()) // - .filteredOn(ep -> ep.getURI().toString().equals(s.getUri())) // - .hasSize(1)), // + s -> assertThat(isServerIdentifiedByUri(server, s.getUri()))), // notNull()); inOrder.verify(clientObserver, timeout(unit.toMillis(timeout)).times(1)).onUpdateSuccess(assertArg( // - s -> assertThat(server.getEndpoints()) // - .filteredOn(ep -> ep.getURI().toString().equals(s.getUri())) // - .hasSize(1)), // + s -> assertThat(isServerIdentifiedByUri(server, s.getUri()))), // notNull()); inOrder.verifyNoMoreInteractions(); } @@ -135,17 +131,10 @@ public void waitForDeregistrationTo(LeshanTestServer server) { public void waitForDeregistrationTo(LeshanTestServer server, long timeout, TimeUnit unit) { inOrder.verify(clientObserver, timeout(unit.toMillis(timeout)).times(1)).onDeregistrationStarted(assertArg( // - s -> { - assertThat(server.getEndpoints()) // - .filteredOn(ep -> ep.getURI().toString().equals(s.getUri())) // - .hasSize(1); - - }), // + s -> assertThat(isServerIdentifiedByUri(server, s.getUri()))), // isNotNull()); inOrder.verify(clientObserver, timeout(unit.toMillis(timeout)).times(1)).onDeregistrationSuccess(assertArg( // - s -> assertThat(server.getEndpoints()) // - .filteredOn(ep -> ep.getURI().toString().equals(s.getUri())) // - .hasSize(1)), // + s -> assertThat(isServerIdentifiedByUri(server, s.getUri()))), // isNotNull()); inOrder.verifyNoMoreInteractions(); } @@ -156,15 +145,11 @@ public void waitForUpdateTimeoutTo(LeshanTestServer server) { public void waitForUpdateTimeoutTo(LeshanTestServer server, long timeout, TimeUnit unit) { inOrder.verify(clientObserver, timeout(unit.toMillis(timeout)).times(1)).onUpdateStarted(assertArg( // - s -> assertThat(server.getEndpoints()) // - .filteredOn(ep -> ep.getURI().toString().equals(s.getUri())) // - .hasSize(1)), // + s -> assertThat(isServerIdentifiedByUri(server, s.getUri()))), // notNull()); inOrder.verify(clientObserver, timeout(unit.toMillis(timeout)).times(1)).onUpdateTimeout(assertArg( // - s -> assertThat(server.getEndpoints()) // - .filteredOn(ep -> ep.getURI().toString().equals(s.getUri())) // - .hasSize(1)), // + s -> assertThat(isServerIdentifiedByUri(server, s.getUri()))), // notNull()); // if client update timeout, it will retry again then try a register so all events can not be consume by inOrder // ... @@ -176,14 +161,10 @@ public void waitForUpdateFailureTo(LeshanTestServer server) { public void waitForUpdateFailureTo(LeshanTestServer server, long timeout, TimeUnit unit) { inOrder.verify(clientObserver, timeout(unit.toMillis(timeout)).times(1)).onUpdateStarted(assertArg( // - s -> assertThat(server.getEndpoints()) // - .filteredOn(ep -> ep.getURI().toString().equals(s.getUri())) // - .hasSize(1)), // + s -> assertThat(isServerIdentifiedByUri(server, s.getUri()))), // notNull()); inOrder.verify(clientObserver, timeout(unit.toMillis(timeout)).times(1)).onUpdateFailure(assertArg( // - s -> assertThat(server.getEndpoints()) // - .filteredOn(ep -> ep.getURI().toString().equals(s.getUri())) // - .hasSize(1)), // + s -> assertThat(isServerIdentifiedByUri(server, s.getUri()))), // notNull(), // notNull(), // TODO we should be able to check response code any(), // @@ -225,4 +206,22 @@ public Exception waitForBootstrapFailure(LeshanBootstrapServer server, long time c.capture()); return c.getValue(); } + + private boolean isServerIdentifiedByUri(LeshanTestServer server, String expectedUri) { + for (LwM2mServerEndpoint endpoint : server.getEndpoints()) { + URI endpointURI = endpoint.getURI(); + InetSocketAddress endpointAddr = EndpointUriUtil.getSocketAddr(endpointURI); + if (proxy != null && endpointAddr.equals(proxy.getServerAddress())) { + URI proxyUri = EndpointUriUtil.replaceAddress(endpointURI, proxy.getClientSideProxyAddress()); + if (proxyUri.toString().equals(expectedUri)) { + return true; + } + } else { + if (endpointURI.toString().equals(expectedUri)) { + return true; + } + } + } + return false; + } } diff --git a/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/util/LeshanTestClientBuilder.java b/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/util/LeshanTestClientBuilder.java index 56137e2c85..aa04b0190b 100644 --- a/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/util/LeshanTestClientBuilder.java +++ b/leshan-integration-tests/src/test/java/org/eclipse/leshan/integration/tests/util/LeshanTestClientBuilder.java @@ -61,6 +61,7 @@ import org.eclipse.leshan.client.util.LinkFormatHelper; import org.eclipse.leshan.core.CertificateUsage; import org.eclipse.leshan.core.LwM2mId; +import org.eclipse.leshan.core.endpoint.EndpointUriUtil; import org.eclipse.leshan.core.endpoint.Protocol; import org.eclipse.leshan.core.link.LinkSerializer; import org.eclipse.leshan.core.link.lwm2m.attributes.LwM2mAttributeParser; @@ -88,6 +89,7 @@ public class LeshanTestClientBuilder extends LeshanClientBuilder { private Protocol protocolToUse; private LeshanServer server; private LeshanBootstrapServer bootstrapServer; + private ReverseProxy proxy; private Integer bootstrapServerId; @@ -128,8 +130,8 @@ public LeshanTestClient build() { try { // connect to LWM2M Server if (server != null) { - LwM2mServerEndpoint endpoint = server.getEndpoint(protocolToUse); - URI uri = endpoint.getURI(); + URI uri = getServerUri(); + int serverID = 12345; if (pskIdentity != null && pskKey != null) { @@ -234,7 +236,7 @@ protected LeshanTestClient createLeshanClient(String endpoint, List + * It supports only 1 client and server. + * + *
+ * ┌──────┐
+ * │client│
+ * └─┬─▲──┘
+ *   │ │  clientAddress
+ *   │ │
+ *   │ │  clientSideProxyAddress
+ * ┌─▼─┴──┐
+ * │Proxy │
+ * └─┬─▲──┘
+ *   │ │  serverSideProxyAddress
+ *   │ │
+ *   │ │  serverAddress
+ * ┌─▼─┴──┐
+ * │Server│
+ * └──────┘
+ * 
+ */ +public class ReverseProxy { + + private static final Logger LOGGER = LoggerFactory.getLogger(ReverseProxy.class); + + private static final int BUFFER_SIZE = 2048; + + private final InetSocketAddress clientSideProxyAddress; + private final InetSocketAddress serverAddress; + private InetSocketAddress clientAddress; + + private DatagramChannel clientToProxyChannel; + private DatagramChannel proxyToServerChannel; + private Selector selector; + + private final ExecutorService executor = Executors.newFixedThreadPool(1); + private volatile boolean running; + private volatile boolean changeServerSideProxyAddress = false; + + public ReverseProxy(InetSocketAddress clientSideProxyAddress, InetSocketAddress serverAddress) { + this.clientSideProxyAddress = clientSideProxyAddress; + this.serverAddress = serverAddress; + } + + public void start() { + executor.execute(() -> { + try { + LOGGER.trace("Starting Reverse Proxy"); + + selector = Selector.open(); + + clientToProxyChannel = DatagramChannel.open(); + clientToProxyChannel.bind(clientSideProxyAddress); + clientToProxyChannel.configureBlocking(false); + clientToProxyChannel.register(selector, SelectionKey.OP_READ); + + proxyToServerChannel = DatagramChannel.open(); + proxyToServerChannel.configureBlocking(false); + proxyToServerChannel.register(selector, SelectionKey.OP_READ); + + running = true; + + LOGGER.debug("Reverse Proxy Started"); + while (running) { + selector.select(); + // Handle events if any + Set selecteds = selector.selectedKeys(); + for (SelectionKey selected : selecteds) { + if (selected.channel() == clientToProxyChannel) { + handleClientPackets(); + } else if (selected.channel() == proxyToServerChannel) { + handlerServerPackets(); + } else { + logAndRaiseException("Unexpected selected channel"); + } + } + // Reassign address if needed + if (changeServerSideProxyAddress) { + reassignServerSideProxyAddress(); + } + } + // proxy is stopped + LOGGER.trace("Stopping Reverse Proxy"); + clientToProxyChannel.close(); + proxyToServerChannel.close(); + LOGGER.debug("Reverse Proxy stopped"); + + } catch (IOException e) { + logAndRaiseException("Unexpected IO Exception when running proxy", e); + } + }); + } + + private void handleClientPackets() throws IOException { + LOGGER.trace("Handling Packet received from Client"); + + // Get received Data + ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); + InetSocketAddress sourceAddress = (InetSocketAddress) clientToProxyChannel.receive(buffer); + if (sourceAddress == null) { + return; + } else { + // maybe better to store this on connect event ? + clientAddress = sourceAddress; + } + + // Transfer Data + buffer.flip(); + proxyToServerChannel.send(buffer, serverAddress); + buffer.clear(); + LOGGER.trace("{} bytes transfer to Server", buffer.remaining(), serverAddress); + } + + private void handlerServerPackets() throws IOException { + LOGGER.trace("Handling Packet received from Server"); + + // Get received Data + ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); + InetSocketAddress sourceAddress; + sourceAddress = (InetSocketAddress) proxyToServerChannel.receive(buffer); + if (sourceAddress == null) { + return; + } + if (!sourceAddress.equals(serverAddress)) { + logAndRaiseException(String.format("We should only receive data from server %s", serverAddress)); + } + if (clientAddress == null) { + logAndRaiseException("Client should send data first before sever send data"); + } + + // Transfer Data + buffer.flip(); + clientToProxyChannel.send(buffer, clientAddress); + buffer.clear(); + LOGGER.debug("{} bytes transfered to Client", buffer.remaining(), clientAddress); + } + + private void reassignServerSideProxyAddress() { + LOGGER.trace("Changing Server Side Proxy Address"); + DatagramChannel previousChannel = proxyToServerChannel; + try { + proxyToServerChannel = DatagramChannel.open(); + proxyToServerChannel.configureBlocking(false); + proxyToServerChannel.register(selector, SelectionKey.OP_READ); + proxyToServerChannel.connect(serverAddress); + selector.wakeup(); + LOGGER.debug("Server Side Proxy Address Changed from {} to {}", previousChannel.getLocalAddress(), + proxyToServerChannel.getLocalAddress()); + } catch (IOException e) { + logAndRaiseException("Unable to create new channel when trying to get change Server Side Proxy Address", e); + } finally { + changeServerSideProxyAddress = false; + try { + previousChannel.close(); + } catch (IOException e) { + logAndRaiseException( + "Unable to close previous channel when trying to get change Server Side Proxy Address", e); + } + } + } + + private void logAndRaiseException(String errorMessage) { + LOGGER.error(errorMessage); + throw new IllegalStateException(errorMessage); + } + + private void logAndRaiseException(String errorMessage, Exception exp) { + LOGGER.error(errorMessage, exp); + throw new IllegalStateException(errorMessage, exp); + } + + /** + * @return effective public address which should be exposed to client. + */ + public InetSocketAddress getClientSideProxyAddress() { + try { + return (InetSocketAddress) clientToProxyChannel.getLocalAddress(); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + /** + * @return address of server to proxified + */ + public InetSocketAddress getServerAddress() { + try { + return (InetSocketAddress) clientToProxyChannel.getLocalAddress(); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + /** + * @return will assign a new server side proxy address which can be used to simulate client address change. + */ + public void changeServerSideProxyAddress() { + changeServerSideProxyAddress = true; + selector.wakeup(); + // Wait address effectively changed (we wait 10x100 ms max) + for (int i = 0; i < 10 && changeServerSideProxyAddress; i++) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public void stop() { + running = false; + executor.shutdown(); + } +}