Skip to content

Commit

Permalink
[Java] Support constructing a ChannelUriStringBuilder from an existin…
Browse files Browse the repository at this point in the history
…g URI (String or ChannelUri). Use Aeron::nextCorrelationId throughout tests to show most appropriate usage. Encapsulate handling of tagged session ids in the ChannelUriStringBuilder. (#1186)
  • Loading branch information
mikeb01 authored May 31, 2021
1 parent a7955a9 commit d2e2670
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 65 deletions.
24 changes: 2 additions & 22 deletions aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1444,7 +1444,7 @@ private int runTasks(final ArrayDeque<Runnable> taskQueue)

private static ChannelUriStringBuilder strippedChannelBuilder(final ChannelUri channelUri)
{
final ChannelUriStringBuilder builder = new ChannelUriStringBuilder()
return new ChannelUriStringBuilder()
.media(channelUri.media())
.endpoint(channelUri)
.networkInterface(channelUri)
Expand All @@ -1460,28 +1460,8 @@ private static ChannelUriStringBuilder strippedChannelBuilder(final ChannelUri c
.socketRcvbufLength(channelUri)
.socketSndbufLength(channelUri)
.receiverWindowLength(channelUri)
.sessionId(channelUri)
.alias(channelUri);

final String sessionIdStr = channelUri.get(CommonContext.SESSION_ID_PARAM_NAME);
if (null != sessionIdStr)
{
if (ChannelUri.isTagged(sessionIdStr))
{
final long tag = ChannelUri.getTag(sessionIdStr);
if (tag < Integer.MIN_VALUE || tag > Integer.MAX_VALUE)
{
throw new IllegalArgumentException("invalid session id tag value: " + tag);
}

builder.isSessionIdTagged(true).sessionId((int)tag);
}
else
{
builder.sessionId(Integer.valueOf(sessionIdStr));
}
}

return builder;
}

private static String makeKey(final int streamId, final ChannelUri channelUri)
Expand Down
37 changes: 37 additions & 0 deletions aeron-client/src/main/java/io/aeron/ChannelUri.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ public boolean equals(final Object o)

final ChannelUri that = (ChannelUri)o;

final HashSet<String> missedkeys = new HashSet<>(params.keySet());
final boolean b = missedkeys.removeAll(that.params.keySet());

return Objects.equals(prefix, that.prefix) &&
Objects.equals(media, that.media) &&
Objects.equals(params, that.params) &&
Expand Down Expand Up @@ -585,4 +588,38 @@ private static int countTags(final String tags)

return count;
}

Map<String, String> diff(final ChannelUri that)
{
final HashMap<String, String> differingValues = new HashMap<>();

if (!Objects.equals(prefix, that.prefix))
{
differingValues.put("prefix", prefix + " != " + that.prefix);
}

if (!Objects.equals(media, that.media))
{
differingValues.put("media", media + " != " + that.media);
}

if (!Objects.equals(params, that.params))
{
params.forEach((key, value) ->
{
final String thatValue = that.params.get(key);
if (!Objects.equals(value, thatValue))
{
differingValues.put(key, value + " != " + thatValue);
}
});
}

if (!Arrays.equals(tags, that.tags))
{
differingValues.put(TAGS_PARAM_NAME, Arrays.toString(tags) + " != " + Arrays.toString(that.tags));
}

return differingValues;
}
}
148 changes: 135 additions & 13 deletions aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public final class ChannelUriStringBuilder
private Integer initialTermId;
private Integer termId;
private Integer termOffset;
private Integer sessionId;
private Long sessionId;
private Long groupTag;
private Long linger;
private Boolean sparse;
Expand All @@ -72,6 +72,64 @@ public final class ChannelUriStringBuilder
private Integer socketRcvbufLength;
private Integer receiverWindowLength;

/**
* Default constructor
*/
public ChannelUriStringBuilder()
{
}

/**
* Constructs the ChannelUriStringBuilder with the initial values derived from the supplied URI. Will parse the
* incoming URI during this process, so could through an exception at this point of the URI is badly formed.
*
* @param initialUri initial values for the builder.
*/
public ChannelUriStringBuilder(final String initialUri)
{
this(ChannelUri.parse(initialUri));
}

/**
* Constructs the ChannelUriStringBuilder with the initial values derived from the supplied ChannelUri.
*
* @param channelUri initial values for the builder.
*/
public ChannelUriStringBuilder(final ChannelUri channelUri)
{
isSessionIdTagged = false;

prefix(channelUri);
media(channelUri);
endpoint(channelUri);
networkInterface(channelUri);
controlEndpoint(channelUri);
controlMode(channelUri);
tags(channelUri);
alias(channelUri);
congestionControl(channelUri);
flowControl(channelUri);
reliable(channelUri);
ttl(channelUri);
mtu(channelUri);
termLength(channelUri);
initialTermId(channelUri);
termId(channelUri);
termOffset(channelUri);
sessionId(channelUri);
group(channelUri);
linger(channelUri);
sparse(channelUri);
eos(channelUri);
tether(channelUri);
groupTag(channelUri);
rejoin(channelUri);
spiesSimulateConnection(channelUri);
socketRcvbufLength(channelUri);
socketSndbufLength(channelUri);
receiverWindowLength(channelUri);
}

/**
* Clear out all the values thus setting back to the initial state.
*
Expand Down Expand Up @@ -777,40 +835,79 @@ public Integer termOffset()
*/
public ChannelUriStringBuilder sessionId(final Integer sessionId)
{
this.sessionId = sessionId;
this.sessionId = null != sessionId ? sessionId.longValue() : null;
return this;
}

/**
* Set the sessionId value to be what is in the {@link ChannelUri} which may be null.
* Set the session id for a publication or restricted subscription from a formatted string. Supports a format of
* either a string encoded signed 32 bit number or 'tag:' followed by a signed 64 bit value.
*
* @param channelUri to read the value from.
* @param sessionIdStr for the publication or a restricted subscription.
* @return this for a fluent API.
* @see CommonContext#SESSION_ID_PARAM_NAME
*/
public ChannelUriStringBuilder sessionId(final ChannelUri channelUri)
public ChannelUriStringBuilder sessionId(final String sessionIdStr)
{
final String sessionIdValue = channelUri.get(SESSION_ID_PARAM_NAME);
if (null == sessionIdValue)
if (null != sessionIdStr)
{
sessionId = null;
return this;
if (ChannelUri.isTagged(sessionIdStr))
{
taggedSessionId(ChannelUri.getTag(sessionIdStr));
}
else
{
isSessionIdTagged(false);
sessionId(Integer.valueOf(sessionIdStr));
}
}
else
{
return sessionId(Integer.valueOf(sessionIdValue));
sessionId((Integer)null);
}

return this;
}

/**
* Set the session id for a publication or restricted subscription as a tag referenced value.
*
* @param sessionId for the publication or a restricted subscription.
* @return this for a fluent API.
* @see CommonContext#SESSION_ID_PARAM_NAME
*/
public ChannelUriStringBuilder taggedSessionId(final Long sessionId)
{
isSessionIdTagged(true);
this.sessionId = sessionId;
return this;
}

/**
* Set the sessionId value to be what is in the {@link ChannelUri} which may be null.
*
* @param channelUri to read the value from.
* @return this for a fluent API.
* @see CommonContext#SESSION_ID_PARAM_NAME
*/
public ChannelUriStringBuilder sessionId(final ChannelUri channelUri)
{
return sessionId(channelUri.get(SESSION_ID_PARAM_NAME));
}

/**
* Get the session id for a publication or restricted subscription.
*
* @return the session id for a publication or restricted subscription.
* @see CommonContext#SESSION_ID_PARAM_NAME
* @deprecated this method will not correctly handle tagged sessionId values that are outside the range of
* a signed 32 bit number. If this is called and a tagged value outside this range is currently held in this
* object, then the result will be the same as {@link Long#intValue()}.
*/
@Deprecated
public Integer sessionId()
{
return sessionId;
return null != sessionId ? sessionId.intValue() : null;
}

/**
Expand Down Expand Up @@ -1077,6 +1174,31 @@ public ChannelUriStringBuilder tags(final ChannelUri channelUri)
return tags(channelUri.get(TAGS_PARAM_NAME));
}

/**
* Set the tags to the specified channel and publication/subscription tag {@link ChannelUri}. The
* publication/subscription may be null. If channel tag is null, then the pubSubTag must be null.
*
* @param channelTag optional value for the channel tag.
* @param pubSubTag option value for the publication/subscription tag.
* @return this for a fluent API.
* @see CommonContext#TAGS_PARAM_NAME
* @throws IllegalArgumentException if channelTag is null and pubSubTag is not.
*/
public ChannelUriStringBuilder tags(final Long channelTag, final Long pubSubTag)
{
if (null == channelTag && null != pubSubTag)
{
throw new IllegalArgumentException("null == channelTag && null != pubSubTag");
}

if (null == channelTag)
{
return tags((String)null);
}

return tags(channelTag + (null != pubSubTag ? "," + pubSubTag : ""));
}

/**
* Get the tags for a channel used by a publication or subscription. Tags can be used to identify or tag a
* channel so that a configuration can be referenced and reused.
Expand Down Expand Up @@ -1105,7 +1227,7 @@ public ChannelUriStringBuilder isSessionIdTagged(final boolean isSessionIdTagged
}

/**
* Is the value for {@link #sessionId()} a tagged.
* Is the value for {@link #sessionId()} a tag.
*
* @return whether the value for {@link #sessionId()} a tag reference or not.
* @see CommonContext#TAGS_PARAM_NAME
Expand Down Expand Up @@ -1770,7 +1892,7 @@ public String toString()
return build();
}

private static String prefixTag(final boolean isTagged, final Integer value)
private static String prefixTag(final boolean isTagged, final Long value)
{
return isTagged ? TAG_PREFIX + value : value.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import org.junit.jupiter.api.Test;

import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

Expand Down Expand Up @@ -127,4 +129,30 @@ public void shouldGenerateChannelWithReceiverWindow()
"aeron:udp?endpoint=address:9999|rcv-wnd=8192",
builder.build());
}

@Test
void shouldBuildChannelBuilderUsingExistingStringWithAllTheFields()
{
final String uri = "aeron-spy:aeron:udp?endpoint=127.0.0.1:0|interface=127.0.0.1|control=127.0.0.2:0|" +
"control-mode=manual|tags=2,4|alias=foo|cc=cubic|fc=min|reliable=false|ttl=16|mtu=8992|" +
"term-length=1048576|init-term-id=5|term-offset=64|term-id=4353|session-id=2314234|gtag=3|linger=0|" +
"sparse=true|eos=true|tether=false|group=false|ssc=true|so-sndbuf=8388608|so-rcvbuf=2097152|" +
"rcv-wnd=1048576";

final ChannelUri fromString = ChannelUri.parse(uri);
final ChannelUri fromBuilder = ChannelUri.parse(new ChannelUriStringBuilder(uri).build());

assertEquals(Collections.emptyMap(), fromString.diff(fromBuilder));
}

@Test
void shouldBuildChannelBuilderUsingExistingStringWithTaggedSessionIdAndIpc()
{
final String uri = "aeron:ipc?session-id=tag:123456";

final ChannelUri fromString = ChannelUri.parse(uri);
final ChannelUri fromBuilder = ChannelUri.parse(new ChannelUriStringBuilder(uri).build());

assertEquals(Collections.emptyMap(), fromString.diff(fromBuilder));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class MultiDestinationCastTest
private static final String SUB2_MDC_DYNAMIC_URI = "aeron:udp?control=localhost:24325|group=true";
private static final String SUB3_MDC_DYNAMIC_URI = CommonContext.SPY_PREFIX + PUB_MDC_DYNAMIC_URI;

private static final String PUB_MDC_MANUAL_URI = "aeron:udp?control-mode=manual|tags=3,4";
private static final String PUB_MDC_MANUAL_URI = "aeron:udp?control-mode=manual";
private static final String SUB1_MDC_MANUAL_URI = "aeron:udp?endpoint=localhost:24326|group=true";
private static final String SUB2_MDC_MANUAL_URI = "aeron:udp?endpoint=localhost:24327|group=true";
private static final String SUB3_MDC_MANUAL_URI = CommonContext.SPY_PREFIX + PUB_MDC_MANUAL_URI;
Expand Down Expand Up @@ -140,11 +140,16 @@ public void shouldSpinUpAndShutdownWithManual()
{
launch(Tests::onError);

final String taggedMdcUri = new ChannelUriStringBuilder(PUB_MDC_MANUAL_URI).tags(
clientA.nextCorrelationId(),
clientA.nextCorrelationId()).build();
final String spySubscriptionUri = new ChannelUriStringBuilder(taggedMdcUri).prefix("aeron-spy").build();

subscriptionA = clientA.addSubscription(SUB1_MDC_MANUAL_URI, STREAM_ID);
subscriptionB = clientB.addSubscription(SUB2_MDC_MANUAL_URI, STREAM_ID);
subscriptionC = clientA.addSubscription(SUB3_MDC_MANUAL_URI, STREAM_ID);
subscriptionC = clientA.addSubscription(spySubscriptionUri, STREAM_ID);

publication = clientA.addPublication(PUB_MDC_MANUAL_URI, STREAM_ID);
publication = clientA.addPublication(taggedMdcUri, STREAM_ID);
publication.addDestination(SUB1_MDC_MANUAL_URI);
final long correlationId = publication.asyncAddDestination(SUB2_MDC_MANUAL_URI);

Expand Down
Loading

0 comments on commit d2e2670

Please sign in to comment.