Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] master from real-logic:master #2210

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.CommonContext;
import io.aeron.driver.MediaDriver.Context;
import io.aeron.driver.buffer.LogFactory;
import io.aeron.driver.buffer.RawLog;
Expand Down Expand Up @@ -282,8 +281,11 @@ void onCreatePublicationImage(
Configuration.validateMtuLength(senderMtuLength);

final UdpChannel subscriptionChannel = channelEndpoint.subscriptionUdpChannel();
Configuration.validateInitialWindowLength(
subscriptionChannel.receiverWindowLengthOrDefault(ctx.initialWindowLength()), senderMtuLength);

final SubscriptionParams subscriptionParams =
SubscriptionParams.getSubscriptionParams(subscriptionChannel.channelUri(), ctx, termBufferLength);

Configuration.validateInitialWindowLength(subscriptionParams.receiverWindowLength, senderMtuLength);

final long joinPosition = computePosition(
activeTermId, initialTermOffset, LogBufferDescriptor.positionBitsToShift(termBufferLength), initialTermId);
Expand Down Expand Up @@ -1044,7 +1046,7 @@ void onAddNetworkSubscription(
validateTimestampConfiguration(udpChannel);

final SubscriptionParams params =
SubscriptionParams.getSubscriptionParams(udpChannel.channelUri(), ctx);
SubscriptionParams.getSubscriptionParams(udpChannel.channelUri(), ctx, 0);
checkForClashingSubscription(params, udpChannel, streamId);

final ReceiveChannelEndpoint channelEndpoint = getOrCreateReceiveChannelEndpoint(
Expand Down Expand Up @@ -1091,7 +1093,7 @@ private void addNetworkSubscriptionToReceiver(final NetworkSubscriptionLink subs

void onAddIpcSubscription(final String channel, final int streamId, final long registrationId, final long clientId)
{
final SubscriptionParams params = SubscriptionParams.getSubscriptionParams(parseUri(channel), ctx);
final SubscriptionParams params = SubscriptionParams.getSubscriptionParams(parseUri(channel), ctx, 0);
final IpcSubscriptionLink subscriptionLink = new IpcSubscriptionLink(
registrationId, streamId, channel, getOrAddClient(clientId), params);

Expand All @@ -1110,7 +1112,7 @@ void onAddIpcSubscription(final String channel, final int streamId, final long r
registrationId,
linkIpcSubscription(publication, subscriptionLink).id(),
publication.rawLog().fileName(),
CommonContext.IPC_CHANNEL);
IPC_CHANNEL);
}
}
}
Expand All @@ -1124,7 +1126,7 @@ void onAddSpySubscription(final String channel, final int streamId, final long r
{
final UdpChannel udpChannel = asyncResult.get();
final SubscriptionParams params =
SubscriptionParams.getSubscriptionParams(udpChannel.channelUri(), ctx);
SubscriptionParams.getSubscriptionParams(udpChannel.channelUri(), ctx, 0);
final SpySubscriptionLink subscriptionLink = new SpySubscriptionLink(
registrationId, udpChannel, streamId, getOrAddClient(clientId), params);

Expand All @@ -1143,7 +1145,7 @@ void onAddSpySubscription(final String channel, final int streamId, final long r
registrationId,
linkSpy(publication, subscriptionLink).id(),
publication.rawLog().fileName(),
CommonContext.IPC_CHANNEL);
IPC_CHANNEL);
}
}
});
Expand Down Expand Up @@ -1295,7 +1297,7 @@ else if (destinationChannel.startsWith(SPY_QUALIFIER))
void onAddRcvIpcDestination(final long registrationId, final String destinationChannel, final long correlationId)
{
final SubscriptionParams params =
SubscriptionParams.getSubscriptionParams(parseUri(destinationChannel), ctx);
SubscriptionParams.getSubscriptionParams(parseUri(destinationChannel), ctx, 0);
final SubscriptionLink mdsSubscriptionLink = findMdsSubscriptionLink(subscriptionLinks, registrationId);

if (null == mdsSubscriptionLink)
Expand Down Expand Up @@ -1325,7 +1327,7 @@ void onAddRcvIpcDestination(final long registrationId, final String destinationC
registrationId,
linkIpcSubscription(publication, subscriptionLink).id(),
publication.rawLog().fileName(),
CommonContext.IPC_CHANNEL);
IPC_CHANNEL);
}
}
}
Expand All @@ -1339,7 +1341,7 @@ void onAddRcvSpyDestination(final long registrationId, final String destinationC
{
final UdpChannel udpChannel = asyncResult.get();
final SubscriptionParams params =
SubscriptionParams.getSubscriptionParams(udpChannel.channelUri(), ctx);
SubscriptionParams.getSubscriptionParams(udpChannel.channelUri(), ctx, 0);
final SubscriptionLink mdsSubscriptionLink = findMdsSubscriptionLink(subscriptionLinks, registrationId);

if (null == mdsSubscriptionLink)
Expand Down Expand Up @@ -1370,7 +1372,7 @@ void onAddRcvSpyDestination(final long registrationId, final String destinationC
registrationId,
linkSpy(publication, subscriptionLink).id(),
publication.rawLog().fileName(),
CommonContext.IPC_CHANNEL);
IPC_CHANNEL);
}
}
});
Expand Down Expand Up @@ -1983,7 +1985,7 @@ private SendChannelEndpoint findExistingSendChannelEndpoint(final UdpChannel udp
}

if (!udpChannel.hasExplicitControl() && !udpChannel.isManualControlMode() &&
!udpChannel.channelUri().containsKey(CommonContext.ENDPOINT_PARAM_NAME))
!udpChannel.channelUri().containsKey(ENDPOINT_PARAM_NAME))
{
throw new InvalidChannelException(
"URI must have explicit control, endpoint, or be manual control-mode when original: channel=" +
Expand Down Expand Up @@ -2097,7 +2099,7 @@ private void linkIpcSubscriptions(final IpcPublication publication)
subscription.registrationId,
linkIpcSubscription(publication, subscription).id(),
publication.rawLog().fileName(),
CommonContext.IPC_CHANNEL);
IPC_CHANNEL);
}
}
}
Expand Down Expand Up @@ -2451,7 +2453,7 @@ private void linkSpies(final ArrayList<SubscriptionLink> links, final NetworkPub
subscription.registrationId(),
linkSpy(publication, subscription).id(),
publication.rawLog().fileName(),
CommonContext.IPC_CHANNEL);
IPC_CHANNEL);
}
}
}
Expand Down
26 changes: 14 additions & 12 deletions aeron-driver/src/main/java/io/aeron/driver/SubscriptionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ final class SubscriptionParams
boolean isTether = true;
boolean isResponse = false;
InferableBoolean group = InferableBoolean.INFER;
int initialWindowLength;
int receiverWindowLength;

static SubscriptionParams getSubscriptionParams(final ChannelUri channelUri, final MediaDriver.Context context)
static SubscriptionParams getSubscriptionParams(
final ChannelUri channelUri, final MediaDriver.Context context, final int publisherTermBufferLength)
{
final SubscriptionParams params = new SubscriptionParams();

Expand Down Expand Up @@ -114,8 +115,11 @@ static SubscriptionParams getSubscriptionParams(final ChannelUri channelUri, fin
final String groupStr = channelUri.get(GROUP_PARAM_NAME);
params.group = null != groupStr ? InferableBoolean.parse(groupStr) : context.receiverGroupConsideration();

final int initialWindowLength = UdpChannel.parseBufferLength(channelUri, RECEIVER_WINDOW_LENGTH_PARAM_NAME);
params.initialWindowLength = 0 != initialWindowLength ? initialWindowLength : context.initialWindowLength();
final int rcvWndLength = UdpChannel.parseBufferLength(channelUri, RECEIVER_WINDOW_LENGTH_PARAM_NAME);
params.receiverWindowLength = Configuration.receiverWindowLength(
0 != publisherTermBufferLength ? publisherTermBufferLength :
(channelUri.isIpc() ? context.ipcTermBufferLength() : context.publicationTermBufferLength()),
0 != rcvWndLength ? rcvWndLength : context.initialWindowLength());

params.isResponse = CONTROL_MODE_RESPONSE.equals(channelUri.get(MDC_CONTROL_MODE_PARAM_NAME));

Expand All @@ -129,21 +133,19 @@ static void validateInitialWindowForRcvBuf(
final MediaDriver.Context ctx,
final String existingChannel)
{
if (0 != channelSocketRcvbufLength && params.initialWindowLength > channelSocketRcvbufLength)
if (0 != channelSocketRcvbufLength && params.receiverWindowLength > channelSocketRcvbufLength)
{
throw new IllegalStateException(
"Initial window greater than SO_RCVBUF for channel: rcv-wnd=" + params.initialWindowLength +
"Initial window greater than SO_RCVBUF for channel: rcv-wnd=" + params.receiverWindowLength +
" so-rcvbuf=" + channelSocketRcvbufLength +
(null == existingChannel ? "" : (" existingChannel=" + existingChannel)) +
" channel=" + channel);
(null == existingChannel ? "" : (" existingChannel=" + existingChannel)) + " channel=" + channel);
}
else if (0 == channelSocketRcvbufLength && params.initialWindowLength > ctx.osDefaultSocketRcvbufLength())
else if (0 == channelSocketRcvbufLength && params.receiverWindowLength > ctx.osDefaultSocketRcvbufLength())
{
throw new IllegalStateException(
"Initial window greater than SO_RCVBUF for channel: rcv-wnd=" + params.initialWindowLength +
"Initial window greater than SO_RCVBUF for channel: rcv-wnd=" + params.receiverWindowLength +
" so-rcvbuf=" + ctx.osDefaultSocketRcvbufLength() + " (OS default)" +
(null == existingChannel ? "" : (" existingChannel=" + existingChannel)) +
" channel=" + channel);
(null == existingChannel ? "" : (" existingChannel=" + existingChannel)) + " channel=" + channel);
}
}

Expand Down
Loading