From d2e26702fc9e5008f016711d58ebeaabff4fa396 Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Mon, 31 May 2021 18:14:20 +1200 Subject: [PATCH] [Java] Support constructing a ChannelUriStringBuilder from an existing URI (String or ChannelUri). Use Aeron::nextCorrelationId throughout tests to show most appropriate usage. Encapsulate handling of tagged session ids in the ChannelUriStringBuilder. (#1186) --- .../io/aeron/archive/ArchiveConductor.java | 24 +-- .../src/main/java/io/aeron/ChannelUri.java | 37 +++++ .../io/aeron/ChannelUriStringBuilder.java | 148 ++++++++++++++++-- .../io/aeron/ChannelUriStringBuilderTest.java | 28 ++++ .../io/aeron/MultiDestinationCastTest.java | 11 +- .../MultiDestinationSubscriptionTest.java | 49 ++++-- .../io/aeron/ResolvedEndpointSystemTest.java | 29 +++- .../java/io/aeron/SpySubscriptionTest.java | 28 +++- .../aeron/archive/ReplicateRecordingTest.java | 4 +- 9 files changed, 293 insertions(+), 65 deletions(-) diff --git a/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java b/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java index 17ae014687..6680e9fa04 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java @@ -1444,7 +1444,7 @@ private int runTasks(final ArrayDeque taskQueue) private static ChannelUriStringBuilder strippedChannelBuilder(final ChannelUri channelUri) { - final ChannelUriStringBuilder builder = new ChannelUriStringBuilder() + return new ChannelUriStringBuilder() .media(channelUri.media()) .endpoint(channelUri) .networkInterface(channelUri) @@ -1460,28 +1460,8 @@ private static ChannelUriStringBuilder strippedChannelBuilder(final ChannelUri c .socketRcvbufLength(channelUri) .socketSndbufLength(channelUri) .receiverWindowLength(channelUri) + .sessionId(channelUri) .alias(channelUri); - - final String sessionIdStr = channelUri.get(CommonContext.SESSION_ID_PARAM_NAME); - if (null != sessionIdStr) - { - if (ChannelUri.isTagged(sessionIdStr)) - { - final long tag = ChannelUri.getTag(sessionIdStr); - if (tag < Integer.MIN_VALUE || tag > Integer.MAX_VALUE) - { - throw new IllegalArgumentException("invalid session id tag value: " + tag); - } - - builder.isSessionIdTagged(true).sessionId((int)tag); - } - else - { - builder.sessionId(Integer.valueOf(sessionIdStr)); - } - } - - return builder; } private static String makeKey(final int streamId, final ChannelUri channelUri) diff --git a/aeron-client/src/main/java/io/aeron/ChannelUri.java b/aeron-client/src/main/java/io/aeron/ChannelUri.java index a7a2a292a7..0f675c8565 100644 --- a/aeron-client/src/main/java/io/aeron/ChannelUri.java +++ b/aeron-client/src/main/java/io/aeron/ChannelUri.java @@ -264,6 +264,9 @@ public boolean equals(final Object o) final ChannelUri that = (ChannelUri)o; + final HashSet missedkeys = new HashSet<>(params.keySet()); + final boolean b = missedkeys.removeAll(that.params.keySet()); + return Objects.equals(prefix, that.prefix) && Objects.equals(media, that.media) && Objects.equals(params, that.params) && @@ -585,4 +588,38 @@ private static int countTags(final String tags) return count; } + + Map diff(final ChannelUri that) + { + final HashMap differingValues = new HashMap<>(); + + if (!Objects.equals(prefix, that.prefix)) + { + differingValues.put("prefix", prefix + " != " + that.prefix); + } + + if (!Objects.equals(media, that.media)) + { + differingValues.put("media", media + " != " + that.media); + } + + if (!Objects.equals(params, that.params)) + { + params.forEach((key, value) -> + { + final String thatValue = that.params.get(key); + if (!Objects.equals(value, thatValue)) + { + differingValues.put(key, value + " != " + thatValue); + } + }); + } + + if (!Arrays.equals(tags, that.tags)) + { + differingValues.put(TAGS_PARAM_NAME, Arrays.toString(tags) + " != " + Arrays.toString(that.tags)); + } + + return differingValues; + } } diff --git a/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java b/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java index b2121e607c..b063de3429 100644 --- a/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java +++ b/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java @@ -58,7 +58,7 @@ public final class ChannelUriStringBuilder private Integer initialTermId; private Integer termId; private Integer termOffset; - private Integer sessionId; + private Long sessionId; private Long groupTag; private Long linger; private Boolean sparse; @@ -72,6 +72,64 @@ public final class ChannelUriStringBuilder private Integer socketRcvbufLength; private Integer receiverWindowLength; + /** + * Default constructor + */ + public ChannelUriStringBuilder() + { + } + + /** + * Constructs the ChannelUriStringBuilder with the initial values derived from the supplied URI. Will parse the + * incoming URI during this process, so could through an exception at this point of the URI is badly formed. + * + * @param initialUri initial values for the builder. + */ + public ChannelUriStringBuilder(final String initialUri) + { + this(ChannelUri.parse(initialUri)); + } + + /** + * Constructs the ChannelUriStringBuilder with the initial values derived from the supplied ChannelUri. + * + * @param channelUri initial values for the builder. + */ + public ChannelUriStringBuilder(final ChannelUri channelUri) + { + isSessionIdTagged = false; + + prefix(channelUri); + media(channelUri); + endpoint(channelUri); + networkInterface(channelUri); + controlEndpoint(channelUri); + controlMode(channelUri); + tags(channelUri); + alias(channelUri); + congestionControl(channelUri); + flowControl(channelUri); + reliable(channelUri); + ttl(channelUri); + mtu(channelUri); + termLength(channelUri); + initialTermId(channelUri); + termId(channelUri); + termOffset(channelUri); + sessionId(channelUri); + group(channelUri); + linger(channelUri); + sparse(channelUri); + eos(channelUri); + tether(channelUri); + groupTag(channelUri); + rejoin(channelUri); + spiesSimulateConnection(channelUri); + socketRcvbufLength(channelUri); + socketSndbufLength(channelUri); + receiverWindowLength(channelUri); + } + /** * Clear out all the values thus setting back to the initial state. * @@ -777,29 +835,64 @@ public Integer termOffset() */ public ChannelUriStringBuilder sessionId(final Integer sessionId) { - this.sessionId = sessionId; + this.sessionId = null != sessionId ? sessionId.longValue() : null; return this; } /** - * Set the sessionId value to be what is in the {@link ChannelUri} which may be null. + * Set the session id for a publication or restricted subscription from a formatted string. Supports a format of + * either a string encoded signed 32 bit number or 'tag:' followed by a signed 64 bit value. * - * @param channelUri to read the value from. + * @param sessionIdStr for the publication or a restricted subscription. * @return this for a fluent API. * @see CommonContext#SESSION_ID_PARAM_NAME */ - public ChannelUriStringBuilder sessionId(final ChannelUri channelUri) + public ChannelUriStringBuilder sessionId(final String sessionIdStr) { - final String sessionIdValue = channelUri.get(SESSION_ID_PARAM_NAME); - if (null == sessionIdValue) + if (null != sessionIdStr) { - sessionId = null; - return this; + if (ChannelUri.isTagged(sessionIdStr)) + { + taggedSessionId(ChannelUri.getTag(sessionIdStr)); + } + else + { + isSessionIdTagged(false); + sessionId(Integer.valueOf(sessionIdStr)); + } } else { - return sessionId(Integer.valueOf(sessionIdValue)); + sessionId((Integer)null); } + + return this; + } + + /** + * Set the session id for a publication or restricted subscription as a tag referenced value. + * + * @param sessionId for the publication or a restricted subscription. + * @return this for a fluent API. + * @see CommonContext#SESSION_ID_PARAM_NAME + */ + public ChannelUriStringBuilder taggedSessionId(final Long sessionId) + { + isSessionIdTagged(true); + this.sessionId = sessionId; + return this; + } + + /** + * Set the sessionId value to be what is in the {@link ChannelUri} which may be null. + * + * @param channelUri to read the value from. + * @return this for a fluent API. + * @see CommonContext#SESSION_ID_PARAM_NAME + */ + public ChannelUriStringBuilder sessionId(final ChannelUri channelUri) + { + return sessionId(channelUri.get(SESSION_ID_PARAM_NAME)); } /** @@ -807,10 +900,14 @@ public ChannelUriStringBuilder sessionId(final ChannelUri channelUri) * * @return the session id for a publication or restricted subscription. * @see CommonContext#SESSION_ID_PARAM_NAME + * @deprecated this method will not correctly handle tagged sessionId values that are outside the range of + * a signed 32 bit number. If this is called and a tagged value outside this range is currently held in this + * object, then the result will be the same as {@link Long#intValue()}. */ + @Deprecated public Integer sessionId() { - return sessionId; + return null != sessionId ? sessionId.intValue() : null; } /** @@ -1077,6 +1174,31 @@ public ChannelUriStringBuilder tags(final ChannelUri channelUri) return tags(channelUri.get(TAGS_PARAM_NAME)); } + /** + * Set the tags to the specified channel and publication/subscription tag {@link ChannelUri}. The + * publication/subscription may be null. If channel tag is null, then the pubSubTag must be null. + * + * @param channelTag optional value for the channel tag. + * @param pubSubTag option value for the publication/subscription tag. + * @return this for a fluent API. + * @see CommonContext#TAGS_PARAM_NAME + * @throws IllegalArgumentException if channelTag is null and pubSubTag is not. + */ + public ChannelUriStringBuilder tags(final Long channelTag, final Long pubSubTag) + { + if (null == channelTag && null != pubSubTag) + { + throw new IllegalArgumentException("null == channelTag && null != pubSubTag"); + } + + if (null == channelTag) + { + return tags((String)null); + } + + return tags(channelTag + (null != pubSubTag ? "," + pubSubTag : "")); + } + /** * Get the tags for a channel used by a publication or subscription. Tags can be used to identify or tag a * channel so that a configuration can be referenced and reused. @@ -1105,7 +1227,7 @@ public ChannelUriStringBuilder isSessionIdTagged(final boolean isSessionIdTagged } /** - * Is the value for {@link #sessionId()} a tagged. + * Is the value for {@link #sessionId()} a tag. * * @return whether the value for {@link #sessionId()} a tag reference or not. * @see CommonContext#TAGS_PARAM_NAME @@ -1770,7 +1892,7 @@ public String toString() return build(); } - private static String prefixTag(final boolean isTagged, final Integer value) + private static String prefixTag(final boolean isTagged, final Long value) { return isTagged ? TAG_PREFIX + value : value.toString(); } diff --git a/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java b/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java index a951849132..f071f683a9 100644 --- a/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java +++ b/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java @@ -17,6 +17,8 @@ import org.junit.jupiter.api.Test; +import java.util.Collections; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -127,4 +129,30 @@ public void shouldGenerateChannelWithReceiverWindow() "aeron:udp?endpoint=address:9999|rcv-wnd=8192", builder.build()); } + + @Test + void shouldBuildChannelBuilderUsingExistingStringWithAllTheFields() + { + final String uri = "aeron-spy:aeron:udp?endpoint=127.0.0.1:0|interface=127.0.0.1|control=127.0.0.2:0|" + + "control-mode=manual|tags=2,4|alias=foo|cc=cubic|fc=min|reliable=false|ttl=16|mtu=8992|" + + "term-length=1048576|init-term-id=5|term-offset=64|term-id=4353|session-id=2314234|gtag=3|linger=0|" + + "sparse=true|eos=true|tether=false|group=false|ssc=true|so-sndbuf=8388608|so-rcvbuf=2097152|" + + "rcv-wnd=1048576"; + + final ChannelUri fromString = ChannelUri.parse(uri); + final ChannelUri fromBuilder = ChannelUri.parse(new ChannelUriStringBuilder(uri).build()); + + assertEquals(Collections.emptyMap(), fromString.diff(fromBuilder)); + } + + @Test + void shouldBuildChannelBuilderUsingExistingStringWithTaggedSessionIdAndIpc() + { + final String uri = "aeron:ipc?session-id=tag:123456"; + + final ChannelUri fromString = ChannelUri.parse(uri); + final ChannelUri fromBuilder = ChannelUri.parse(new ChannelUriStringBuilder(uri).build()); + + assertEquals(Collections.emptyMap(), fromString.diff(fromBuilder)); + } } diff --git a/aeron-system-tests/src/test/java/io/aeron/MultiDestinationCastTest.java b/aeron-system-tests/src/test/java/io/aeron/MultiDestinationCastTest.java index 4cfcac47d3..609c58a165 100644 --- a/aeron-system-tests/src/test/java/io/aeron/MultiDestinationCastTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/MultiDestinationCastTest.java @@ -53,7 +53,7 @@ public class MultiDestinationCastTest private static final String SUB2_MDC_DYNAMIC_URI = "aeron:udp?control=localhost:24325|group=true"; private static final String SUB3_MDC_DYNAMIC_URI = CommonContext.SPY_PREFIX + PUB_MDC_DYNAMIC_URI; - private static final String PUB_MDC_MANUAL_URI = "aeron:udp?control-mode=manual|tags=3,4"; + private static final String PUB_MDC_MANUAL_URI = "aeron:udp?control-mode=manual"; private static final String SUB1_MDC_MANUAL_URI = "aeron:udp?endpoint=localhost:24326|group=true"; private static final String SUB2_MDC_MANUAL_URI = "aeron:udp?endpoint=localhost:24327|group=true"; private static final String SUB3_MDC_MANUAL_URI = CommonContext.SPY_PREFIX + PUB_MDC_MANUAL_URI; @@ -140,11 +140,16 @@ public void shouldSpinUpAndShutdownWithManual() { launch(Tests::onError); + final String taggedMdcUri = new ChannelUriStringBuilder(PUB_MDC_MANUAL_URI).tags( + clientA.nextCorrelationId(), + clientA.nextCorrelationId()).build(); + final String spySubscriptionUri = new ChannelUriStringBuilder(taggedMdcUri).prefix("aeron-spy").build(); + subscriptionA = clientA.addSubscription(SUB1_MDC_MANUAL_URI, STREAM_ID); subscriptionB = clientB.addSubscription(SUB2_MDC_MANUAL_URI, STREAM_ID); - subscriptionC = clientA.addSubscription(SUB3_MDC_MANUAL_URI, STREAM_ID); + subscriptionC = clientA.addSubscription(spySubscriptionUri, STREAM_ID); - publication = clientA.addPublication(PUB_MDC_MANUAL_URI, STREAM_ID); + publication = clientA.addPublication(taggedMdcUri, STREAM_ID); publication.addDestination(SUB1_MDC_MANUAL_URI); final long correlationId = publication.asyncAddDestination(SUB2_MDC_MANUAL_URI); diff --git a/aeron-system-tests/src/test/java/io/aeron/MultiDestinationSubscriptionTest.java b/aeron-system-tests/src/test/java/io/aeron/MultiDestinationSubscriptionTest.java index 38fd8dfdfa..4252832597 100644 --- a/aeron-system-tests/src/test/java/io/aeron/MultiDestinationSubscriptionTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/MultiDestinationSubscriptionTest.java @@ -301,11 +301,18 @@ public void shouldFindMdsSubscriptionWithTags() { launch(Tests::onError); - subscription = clientA.addSubscription(SUB_URI + "|tags=1001", STREAM_ID); + final long tagA = clientA.nextCorrelationId(); + final long tagIgnored = clientA.nextCorrelationId(); + + final String taggedSubUri = new ChannelUriStringBuilder(SUB_URI).tags(tagA, null).build(); + final String taggedSubUriIgnored = new ChannelUriStringBuilder(SUB_URI).tags(tagIgnored, null).build(); + final String referringSubUri = new ChannelUriStringBuilder().media("udp").tags(tagA, null).build(); + + subscription = clientA.addSubscription(taggedSubUri, STREAM_ID); subscription.addDestination(PUB_UNICAST_URI); - try (Subscription ignore = clientA.addSubscription(SUB_URI + "|tags=1002", STREAM_ID); - Subscription subscriptionA1 = clientA.addSubscription("aeron:udp?tags=1001", STREAM_ID)) + try (Subscription ignore = clientA.addSubscription(taggedSubUriIgnored, STREAM_ID); + Subscription subscriptionA1 = clientA.addSubscription(referringSubUri, STREAM_ID)) { publicationA = clientA.addPublication(PUB_UNICAST_URI, STREAM_ID); @@ -330,12 +337,20 @@ public void shouldAllowMultipleMdsSubscriptionsWithTags() launch(Tests::onError); - subscription = clientA.addSubscription(SUB_URI + "|tags=1001", STREAM_ID); + final long tagA = clientA.nextCorrelationId(); + final long tagB = clientA.nextCorrelationId(); + + final String uriA = new ChannelUriStringBuilder(SUB_URI).tags(tagA, null).build(); + final String referringUriA = new ChannelUriStringBuilder().media("udp").tags(tagA, null).build(); + final String uriB = new ChannelUriStringBuilder(SUB_URI).tags(tagB, null).build(); + final String referringUriB = new ChannelUriStringBuilder().media("udp").tags(tagB, null).build(); + + subscription = clientA.addSubscription(uriA, STREAM_ID); subscription.addDestination(PUB_UNICAST_URI); - try (Subscription subscriptionB = clientA.addSubscription(SUB_URI + "|tags=1002", STREAM_ID); - Subscription subscriptionA1 = clientA.addSubscription("aeron:udp?tags=1001", STREAM_ID); - Subscription subscriptionB1 = clientA.addSubscription("aeron:udp?tags=1002", STREAM_ID)) + try (Subscription subscriptionB = clientA.addSubscription(uriB, STREAM_ID); + Subscription subscriptionA1 = clientA.addSubscription(referringUriA, STREAM_ID); + Subscription subscriptionB1 = clientA.addSubscription(referringUriB, STREAM_ID)) { subscriptionB.addDestination(unicastUri2); @@ -379,18 +394,22 @@ public void shouldAllowMultipleMdsSubscriptionsWithTags() public void shouldSendToSingleDestinationMultipleSubscriptionsWithUnicast() { final int numMessagesToSend = NUM_MESSAGES_PER_TERM * 3; - final String tags = "1,2"; launch(Tests::onError); + final long channelTag = clientA.nextCorrelationId(); + final long subTag = clientA.nextCorrelationId(); + final String subscriptionChannel = new ChannelUriStringBuilder() .media(CommonContext.UDP_MEDIA) - .tags(tags) + .tags(channelTag, subTag) .controlMode(CommonContext.MDC_CONTROL_MODE_MANUAL) .build(); + final String copyChannel = new ChannelUriStringBuilder().media("udp").tags(channelTag, subTag).build(); + subscription = clientA.addSubscription(subscriptionChannel, STREAM_ID); - final Subscription copySubscription = clientA.addSubscription(subscriptionChannel, STREAM_ID); + final Subscription copySubscription = clientA.addSubscription(copyChannel, STREAM_ID); subscription.addDestination(PUB_UNICAST_URI); publicationA = clientA.addPublication(PUB_UNICAST_URI, STREAM_ID); @@ -475,13 +494,14 @@ public void shouldSendToMultipleDestinationSubscriptionWithSameStream() final int numMessagesToSend = NUM_MESSAGES_PER_TERM * 3; final int numMessagesToSendForA = numMessagesToSend / 2; final int numMessagesToSendForB = numMessagesToSend / 2; - final String tags = "1,2"; - final int pubTag = 2; launch(Tests::onError); + final long channelTag = clientA.nextCorrelationId(); + final long pubTag = clientA.nextCorrelationId(); + final String publicationChannelA = new ChannelUriStringBuilder() - .tags(tags) + .tags(channelTag, pubTag) .media(CommonContext.UDP_MEDIA) .endpoint(UNICAST_ENDPOINT_A) .build(); @@ -511,8 +531,7 @@ public void shouldSendToMultipleDestinationSubscriptionWithSameStream() final String publicationChannelB = new ChannelUriStringBuilder() .media(CommonContext.UDP_MEDIA) - .isSessionIdTagged(true) - .sessionId(pubTag) + .taggedSessionId(pubTag) .initialTermId(initialTermId) .termId(termId) .termOffset(termOffset) diff --git a/aeron-system-tests/src/test/java/io/aeron/ResolvedEndpointSystemTest.java b/aeron-system-tests/src/test/java/io/aeron/ResolvedEndpointSystemTest.java index 9388d76450..fad85b07c3 100644 --- a/aeron-system-tests/src/test/java/io/aeron/ResolvedEndpointSystemTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/ResolvedEndpointSystemTest.java @@ -109,9 +109,20 @@ void shouldSubscribeWithSystemAssignedPort() @Timeout(5) void shouldSubscribeToSystemAssignedPorts() { - final String systemAssignedPortUri1 = "aeron:udp?endpoint=127.0.0.1:0|tags=1002"; - final String systemAssignedPortUri2 = "aeron:udp?endpoint=127.0.0.1:0|tags=1003"; - final String tagged1 = "aeron:udp?tags=1002"; + final long tag1 = client.nextCorrelationId(); + final long tag2 = client.nextCorrelationId(); + + final String systemAssignedPortUri1 = new ChannelUriStringBuilder() + .media("udp") + .endpoint("127.0.0.1:0") + .tags(tag1, null) + .build(); + final String systemAssignedPortUri2 = new ChannelUriStringBuilder() + .media("udp") + .endpoint("127.0.0.1:0") + .tags(tag2, null) + .build(); + final String tagged1 = new ChannelUriStringBuilder().media("udp").tags(tag1, null).build(); try (Subscription sub1 = client.addSubscription(systemAssignedPortUri1, STREAM_ID); Subscription sub2 = client.addSubscription(systemAssignedPortUri2, STREAM_ID); @@ -161,12 +172,18 @@ void shouldSubscribeToSystemAssignedPorts() void shouldSubscribeToSystemAssignedPortsUsingIPv6() { assumeFalse("true".equals(System.getProperty("java.net.preferIPv4Stack"))); + final long channelTag = client.nextCorrelationId(); + + final String systemAssignedPortUri = new ChannelUriStringBuilder() + .media("udp") + .endpoint("[::1]:0") + .tags(channelTag, null) + .build(); + final String taggedUri = new ChannelUriStringBuilder().media("udp").tags(channelTag, null).build(); - final String systemAssignedPortUri = "aeron:udp?endpoint=[::1]:0|tags=1001"; - final String tagged2 = "aeron:udp?tags=1001"; try (Subscription sub1 = client.addSubscription(systemAssignedPortUri, STREAM_ID); - Subscription sub2 = client.addSubscription(tagged2, STREAM_ID + 1)) + Subscription sub2 = client.addSubscription(taggedUri, STREAM_ID + 1)) { List bindAddressAndPort1; while ((bindAddressAndPort1 = sub1.localSocketAddresses()).isEmpty()) diff --git a/aeron-system-tests/src/test/java/io/aeron/SpySubscriptionTest.java b/aeron-system-tests/src/test/java/io/aeron/SpySubscriptionTest.java index e296150113..214712f6ce 100644 --- a/aeron-system-tests/src/test/java/io/aeron/SpySubscriptionTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/SpySubscriptionTest.java @@ -122,19 +122,39 @@ public void shouldReceivePublishedMessage(final String channel) @Timeout(10) public void shouldConnectToRecreatedChannelByTag() { - final String channelOne = "aeron:udp?tags=1|endpoint=localhost:24325"; + final long tag1 = aeron.nextCorrelationId(); + final String channelOne = new ChannelUriStringBuilder() + .media("udp") + .tags(tag1, null) + .endpoint("localhost:24325") + .build(); + final ChannelUriStringBuilder spyChannelOneBuilder = new ChannelUriStringBuilder() + .prefix("aeron-spy") + .media("udp") + .tags(tag1, null); + try (Publication publication = aeron.addExclusivePublication(channelOne, STREAM_ID); Subscription spy = aeron.addSubscription( - SPY_PREFIX + "aeron:udp?tags=1|session-id=" + publication.sessionId(), STREAM_ID)) + spyChannelOneBuilder.sessionId(publication.sessionId()).build(), STREAM_ID)) { Tests.await(spy::isConnected); assertNotNull(spy.imageBySessionId(publication.sessionId())); } - final String channelTwo = "aeron:udp?tags=2|endpoint=localhost:24325"; + final long tag2 = aeron.nextCorrelationId(); + final String channelTwo = new ChannelUriStringBuilder() + .media("udp") + .tags(tag2, null) + .endpoint("localhost:24325") + .build(); + final ChannelUriStringBuilder spyChannelTwoBuilder = new ChannelUriStringBuilder() + .prefix("aeron-spy") + .media("udp") + .tags(tag2, null); + try (Publication publication = aeron.addExclusivePublication(channelTwo, STREAM_ID); Subscription spy = aeron.addSubscription( - SPY_PREFIX + "aeron:udp?tags=2|session-id=" + publication.sessionId(), STREAM_ID)) + spyChannelTwoBuilder.sessionId(publication.sessionId()).build(), STREAM_ID)) { Tests.await(spy::isConnected); assertNotNull(spy.imageBySessionId(publication.sessionId())); diff --git a/aeron-system-tests/src/test/java/io/aeron/archive/ReplicateRecordingTest.java b/aeron-system-tests/src/test/java/io/aeron/archive/ReplicateRecordingTest.java index 400996d0b3..58c65e9a04 100644 --- a/aeron-system-tests/src/test/java/io/aeron/archive/ReplicateRecordingTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/archive/ReplicateRecordingTest.java @@ -554,8 +554,8 @@ public void shouldReplicateLiveRecordingAndMergeWhileFollowingWithTaggedSubscrip final String messagePrefix = "Message-Prefix-"; final int messageCount = 10; final long srcRecordingId; - final long channelTagId = 333; - final long subscriptionTagId = 777; + final long channelTagId = dstAeron.nextCorrelationId(); + final long subscriptionTagId = dstAeron.nextCorrelationId(); final String taggedChannel = "aeron:udp?control-mode=manual|rejoin=false|tags=" + channelTagId + "," + subscriptionTagId;