Skip to content

Commit

Permalink
[Java] AeronArchive.Context.controlResponseChannel. Derive controlRes…
Browse files Browse the repository at this point in the history
…ponseChannel within Archive for the client AeronArchive.
  • Loading branch information
mikeb01 committed Sep 11, 2022
1 parent 1e33f68 commit 03a43f1
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 14 deletions.
23 changes: 23 additions & 0 deletions aeron-archive/src/main/java/io/aeron/archive/Archive.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;

import static io.aeron.CommonContext.ENDPOINT_PARAM_NAME;
import static io.aeron.archive.ArchiveThreadingMode.DEDICATED;
import static io.aeron.logbuffer.LogBufferDescriptor.TERM_MAX_LENGTH;
import static io.aeron.logbuffer.LogBufferDescriptor.TERM_MIN_LENGTH;
Expand Down Expand Up @@ -1185,6 +1186,28 @@ else if (segmentFileLength < TERM_MIN_LENGTH || segmentFileLength > TERM_MAX_LEN
archiveClientContext = new AeronArchive.Context();
}

if (null == archiveClientContext.controlResponseChannel())
{
final ChannelUri controlChannelUri = ChannelUri.parse(controlChannel);
final String endpoint = controlChannelUri.get(ENDPOINT_PARAM_NAME);
int separatorIndex = -1;
if (null == endpoint || -1 == (separatorIndex = endpoint.lastIndexOf(':')))
{
throw new ConfigurationException(
"Unable to derive Archive.Context.archiveClientContext.controlResponseChannel as " +
"Archive.Context.controlChannel.endpoint=" + endpoint +
" and is not in the <host>:<port> format");

}
final String responseEndpoint = endpoint.substring(0, separatorIndex) + ":0";
final String responseChannel = new ChannelUriStringBuilder()
.media("udp")
.endpoint(responseEndpoint)
.build();

archiveClientContext.controlResponseChannel(responseChannel);
}

archiveClientContext.aeron(aeron).lock(NoOpLock.INSTANCE).errorHandler(errorHandler);

if (null == controlSessionsCounter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2580,11 +2580,6 @@ public static final class Configuration
*/
public static final String CONTROL_RESPONSE_CHANNEL_PROP_NAME = "aeron.archive.control.response.channel";

/**
* Default channel for receiving control response messages from an archive.
*/
public static final String CONTROL_RESPONSE_CHANNEL_DEFAULT = "aeron:udp?endpoint=localhost:0";

/**
* Stream id within a channel for receiving control messages from an archive.
*/
Expand Down Expand Up @@ -2757,15 +2752,13 @@ public static int localControlStreamId()
}

/**
* The value {@link #CONTROL_RESPONSE_CHANNEL_DEFAULT} or system property
* {@link #CONTROL_RESPONSE_CHANNEL_PROP_NAME} if set.
* The value of system property {@link #CONTROL_RESPONSE_CHANNEL_PROP_NAME} if set, null otherwise.
*
* @return {@link #CONTROL_RESPONSE_CHANNEL_DEFAULT} or system property
* {@link #CONTROL_RESPONSE_CHANNEL_PROP_NAME} if set.
* @return of system property {@link #CONTROL_RESPONSE_CHANNEL_PROP_NAME} if set.
*/
public static String controlResponseChannel()
{
return System.getProperty(CONTROL_RESPONSE_CHANNEL_PROP_NAME, CONTROL_RESPONSE_CHANNEL_DEFAULT);
return System.getProperty(CONTROL_RESPONSE_CHANNEL_PROP_NAME);
}

/**
Expand Down Expand Up @@ -2884,6 +2877,11 @@ public void conclude()
throw new ConfigurationException("AeronArchive.Context.controlRequestChannel must be set");
}

if (null == controlResponseChannel)
{
throw new ConfigurationException("AeronArchive.Context.controlResponseChannel must be set");
}

if (null == aeron)
{
aeron = Aeron.connect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class ArchiveContextTests
class ArchiveContextTest
{
private final Archive.Context context = TestContexts.localhostArchive();

Expand Down Expand Up @@ -137,6 +137,28 @@ void shouldThrowIfReplicationChannelIsNotSet()
assertThrows(ConfigurationException.class, context::conclude);
}

@Test
void shouldDeriveArchiveClientContextResponseChannelFromArchiveControlChannel()
{
context.controlChannel("aeron:udp?endpoint=127.0.0.2:23005");
context.conclude();
assertEquals("aeron:udp?endpoint=127.0.0.2:0", context.archiveClientContext().controlResponseChannel());
}

@Test
void shouldThrowConfigurationExceptionIfUnableToDeriveArchiveClientContextResponseChannelDueToEndpointFormat()
{
context.controlChannel("aeron:udp?endpoint=some_logical_name");
assertThrows(ConfigurationException.class, context::conclude);
}

@Test
void shouldThrowConfigurationExceptionIfUnableToDeriveArchiveClientContextResponseChannelDueToEndpointNull()
{
context.controlChannel("aeron:udp?control-mode=dynamic|control=192.168.0.1:12345");
assertThrows(ConfigurationException.class, context::conclude);
}

public static class TestAuthorisationSupplier implements AuthorisationServiceSupplier
{
public AuthorisationService get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@
public class TestContexts
{
public static final String LOCALHOST_REPLICATION_CHANNEL = "aeron:udp?endpoint=localhost:0";
public static final String LOCALHOST_CONTROL_CHANNEL = "aeron:udp?endpoint=localhost:8010";
public static final String LOCALHOST_CONTROL_REQUEST_CHANNEL = "aeron:udp?endpoint=localhost:8010";
public static final String LOCALHOST_CONTROL_RESPONSE_CHANNEL = "aeron:udp?endpoint=localhost:0";

public static Archive.Context localhostArchive()
{
return new Archive.Context()
.controlChannel(LOCALHOST_CONTROL_CHANNEL)
.controlChannel(LOCALHOST_CONTROL_REQUEST_CHANNEL)
.replicationChannel(LOCALHOST_REPLICATION_CHANNEL);
}

public static AeronArchive.Context localhostAeronArchive()
{
return new AeronArchive.Context()
.controlRequestChannel(LOCALHOST_CONTROL_CHANNEL);
.controlRequestChannel(LOCALHOST_CONTROL_REQUEST_CHANNEL)
.controlResponseChannel(LOCALHOST_CONTROL_RESPONSE_CHANNEL);
}

}

0 comments on commit 03a43f1

Please sign in to comment.