From 03a43f1781f9c9dd143d4fe09dff01360b58a6aa Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Fri, 9 Sep 2022 11:00:41 +1200 Subject: [PATCH] [Java] AeronArchive.Context.controlResponseChannel. Derive controlResponseChannel within Archive for the client AeronArchive. --- .../main/java/io/aeron/archive/Archive.java | 23 ++++++++++++++++++ .../io/aeron/archive/client/AeronArchive.java | 18 +++++++------- ...textTests.java => ArchiveContextTest.java} | 24 ++++++++++++++++++- .../main/java/io/aeron/test/TestContexts.java | 8 ++++--- 4 files changed, 59 insertions(+), 14 deletions(-) rename aeron-archive/src/test/java/io/aeron/archive/{ArchiveContextTests.java => ArchiveContextTest.java} (85%) diff --git a/aeron-archive/src/main/java/io/aeron/archive/Archive.java b/aeron-archive/src/main/java/io/aeron/archive/Archive.java index def182d6ca7..f29ffd2cb41 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/Archive.java +++ b/aeron-archive/src/main/java/io/aeron/archive/Archive.java @@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Supplier; +import static io.aeron.CommonContext.ENDPOINT_PARAM_NAME; import static io.aeron.archive.ArchiveThreadingMode.DEDICATED; import static io.aeron.logbuffer.LogBufferDescriptor.TERM_MAX_LENGTH; import static io.aeron.logbuffer.LogBufferDescriptor.TERM_MIN_LENGTH; @@ -1185,6 +1186,28 @@ else if (segmentFileLength < TERM_MIN_LENGTH || segmentFileLength > TERM_MAX_LEN archiveClientContext = new AeronArchive.Context(); } + if (null == archiveClientContext.controlResponseChannel()) + { + final ChannelUri controlChannelUri = ChannelUri.parse(controlChannel); + final String endpoint = controlChannelUri.get(ENDPOINT_PARAM_NAME); + int separatorIndex = -1; + if (null == endpoint || -1 == (separatorIndex = endpoint.lastIndexOf(':'))) + { + throw new ConfigurationException( + "Unable to derive Archive.Context.archiveClientContext.controlResponseChannel as " + + "Archive.Context.controlChannel.endpoint=" + endpoint + + " and is not in the : format"); + + } + final String responseEndpoint = endpoint.substring(0, separatorIndex) + ":0"; + final String responseChannel = new ChannelUriStringBuilder() + .media("udp") + .endpoint(responseEndpoint) + .build(); + + archiveClientContext.controlResponseChannel(responseChannel); + } + archiveClientContext.aeron(aeron).lock(NoOpLock.INSTANCE).errorHandler(errorHandler); if (null == controlSessionsCounter) diff --git a/aeron-archive/src/main/java/io/aeron/archive/client/AeronArchive.java b/aeron-archive/src/main/java/io/aeron/archive/client/AeronArchive.java index 2798c213ca6..87ab22594b7 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/client/AeronArchive.java +++ b/aeron-archive/src/main/java/io/aeron/archive/client/AeronArchive.java @@ -2580,11 +2580,6 @@ public static final class Configuration */ public static final String CONTROL_RESPONSE_CHANNEL_PROP_NAME = "aeron.archive.control.response.channel"; - /** - * Default channel for receiving control response messages from an archive. - */ - public static final String CONTROL_RESPONSE_CHANNEL_DEFAULT = "aeron:udp?endpoint=localhost:0"; - /** * Stream id within a channel for receiving control messages from an archive. */ @@ -2757,15 +2752,13 @@ public static int localControlStreamId() } /** - * The value {@link #CONTROL_RESPONSE_CHANNEL_DEFAULT} or system property - * {@link #CONTROL_RESPONSE_CHANNEL_PROP_NAME} if set. + * The value of system property {@link #CONTROL_RESPONSE_CHANNEL_PROP_NAME} if set, null otherwise. * - * @return {@link #CONTROL_RESPONSE_CHANNEL_DEFAULT} or system property - * {@link #CONTROL_RESPONSE_CHANNEL_PROP_NAME} if set. + * @return of system property {@link #CONTROL_RESPONSE_CHANNEL_PROP_NAME} if set. */ public static String controlResponseChannel() { - return System.getProperty(CONTROL_RESPONSE_CHANNEL_PROP_NAME, CONTROL_RESPONSE_CHANNEL_DEFAULT); + return System.getProperty(CONTROL_RESPONSE_CHANNEL_PROP_NAME); } /** @@ -2884,6 +2877,11 @@ public void conclude() throw new ConfigurationException("AeronArchive.Context.controlRequestChannel must be set"); } + if (null == controlResponseChannel) + { + throw new ConfigurationException("AeronArchive.Context.controlResponseChannel must be set"); + } + if (null == aeron) { aeron = Aeron.connect( diff --git a/aeron-archive/src/test/java/io/aeron/archive/ArchiveContextTests.java b/aeron-archive/src/test/java/io/aeron/archive/ArchiveContextTest.java similarity index 85% rename from aeron-archive/src/test/java/io/aeron/archive/ArchiveContextTests.java rename to aeron-archive/src/test/java/io/aeron/archive/ArchiveContextTest.java index a1173c9485f..101b0cb8cb0 100644 --- a/aeron-archive/src/test/java/io/aeron/archive/ArchiveContextTests.java +++ b/aeron-archive/src/test/java/io/aeron/archive/ArchiveContextTest.java @@ -35,7 +35,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -class ArchiveContextTests +class ArchiveContextTest { private final Archive.Context context = TestContexts.localhostArchive(); @@ -137,6 +137,28 @@ void shouldThrowIfReplicationChannelIsNotSet() assertThrows(ConfigurationException.class, context::conclude); } + @Test + void shouldDeriveArchiveClientContextResponseChannelFromArchiveControlChannel() + { + context.controlChannel("aeron:udp?endpoint=127.0.0.2:23005"); + context.conclude(); + assertEquals("aeron:udp?endpoint=127.0.0.2:0", context.archiveClientContext().controlResponseChannel()); + } + + @Test + void shouldThrowConfigurationExceptionIfUnableToDeriveArchiveClientContextResponseChannelDueToEndpointFormat() + { + context.controlChannel("aeron:udp?endpoint=some_logical_name"); + assertThrows(ConfigurationException.class, context::conclude); + } + + @Test + void shouldThrowConfigurationExceptionIfUnableToDeriveArchiveClientContextResponseChannelDueToEndpointNull() + { + context.controlChannel("aeron:udp?control-mode=dynamic|control=192.168.0.1:12345"); + assertThrows(ConfigurationException.class, context::conclude); + } + public static class TestAuthorisationSupplier implements AuthorisationServiceSupplier { public AuthorisationService get() diff --git a/aeron-test-support/src/main/java/io/aeron/test/TestContexts.java b/aeron-test-support/src/main/java/io/aeron/test/TestContexts.java index 751dc9a21d3..5cef4d6c344 100644 --- a/aeron-test-support/src/main/java/io/aeron/test/TestContexts.java +++ b/aeron-test-support/src/main/java/io/aeron/test/TestContexts.java @@ -21,19 +21,21 @@ public class TestContexts { public static final String LOCALHOST_REPLICATION_CHANNEL = "aeron:udp?endpoint=localhost:0"; - public static final String LOCALHOST_CONTROL_CHANNEL = "aeron:udp?endpoint=localhost:8010"; + public static final String LOCALHOST_CONTROL_REQUEST_CHANNEL = "aeron:udp?endpoint=localhost:8010"; + public static final String LOCALHOST_CONTROL_RESPONSE_CHANNEL = "aeron:udp?endpoint=localhost:0"; public static Archive.Context localhostArchive() { return new Archive.Context() - .controlChannel(LOCALHOST_CONTROL_CHANNEL) + .controlChannel(LOCALHOST_CONTROL_REQUEST_CHANNEL) .replicationChannel(LOCALHOST_REPLICATION_CHANNEL); } public static AeronArchive.Context localhostAeronArchive() { return new AeronArchive.Context() - .controlRequestChannel(LOCALHOST_CONTROL_CHANNEL); + .controlRequestChannel(LOCALHOST_CONTROL_REQUEST_CHANNEL) + .controlResponseChannel(LOCALHOST_CONTROL_RESPONSE_CHANNEL); } }