diff --git a/.github/workflows/ci-low-cadence.yml b/.github/workflows/ci-low-cadence.yml index fa251d1461..7bff3cf3c5 100644 --- a/.github/workflows/ci-low-cadence.yml +++ b/.github/workflows/ci-low-cadence.yml @@ -25,7 +25,7 @@ jobs: strategy: fail-fast: false matrix: - java: [ '8', '11', '17', '21-ea' ] + java: [ '8', '17', '21' ] os: [ 'ubuntu-22.04', 'windows-latest' ] steps: - name: Checkout code diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8fe87b4d26..01108d2745 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,7 +33,7 @@ jobs: strategy: fail-fast: false matrix: - java: [ '8', '11', '17', '21-ea' ] + java: [ '8', '17', '21' ] os: [ 'ubuntu-22.04', 'windows-latest' ] steps: - name: Checkout code diff --git a/aeron-archive/src/main/java/io/aeron/archive/Archive.java b/aeron-archive/src/main/java/io/aeron/archive/Archive.java index d63799c4fe..62bf9195f5 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/Archive.java +++ b/aeron-archive/src/main/java/io/aeron/archive/Archive.java @@ -3000,7 +3000,7 @@ public boolean ownsAeronClient() * @param catalog {@link Catalog} describing the contents of the Archive. * @return this for a fluent API. */ - public Context catalog(final Catalog catalog) + Context catalog(final Catalog catalog) { this.catalog = catalog; return this; @@ -3011,7 +3011,7 @@ public Context catalog(final Catalog catalog) * * @return the {@link Catalog} describing the contents of the Archive. */ - public Catalog catalog() + Catalog catalog() { return catalog; } diff --git a/aeron-client/src/main/java/io/aeron/Aeron.java b/aeron-client/src/main/java/io/aeron/Aeron.java index a610905b6e..5f59b7ff10 100644 --- a/aeron-client/src/main/java/io/aeron/Aeron.java +++ b/aeron-client/src/main/java/io/aeron/Aeron.java @@ -433,9 +433,10 @@ public long asyncAddSubscription(final String channel, final int streamId) * * @param registrationId returned from * {@link #asyncAddSubscription(String, int, AvailableImageHandler, UnavailableImageHandler)} - * or {@link #asyncAddSubscription(String, int)} - * @return a new {@link ConcurrentPublication} when available otherwise null. - * @see #asyncAddPublication(String, int) + * or {@link #asyncAddSubscription(String, int)} + * @return a new {@link Subscription} when available otherwise null. + * @see #asyncAddSubscription(String, int) + * @see #asyncAddSubscription(String, int, AvailableImageHandler, UnavailableImageHandler) */ public Subscription getSubscription(final long registrationId) { @@ -1199,7 +1200,7 @@ Context logBuffersFactory(final LogBuffersFactory logBuffersFactory) * * @return the factory for making log buffers. */ - public LogBuffersFactory logBuffersFactory() + LogBuffersFactory logBuffersFactory() { return logBuffersFactory; } diff --git a/aeron-client/src/main/java/io/aeron/ChannelUri.java b/aeron-client/src/main/java/io/aeron/ChannelUri.java index e20ea9241a..b19a4182fb 100644 --- a/aeron-client/src/main/java/io/aeron/ChannelUri.java +++ b/aeron-client/src/main/java/io/aeron/ChannelUri.java @@ -313,7 +313,7 @@ public String toString() sb.append(AERON_PREFIX).append(media); - if (params.size() > 0) + if (!params.isEmpty()) { sb.append('?'); diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/ClusterMember.java b/aeron-cluster/src/main/java/io/aeron/cluster/ClusterMember.java index 8fd77dfd95..f6a61c8426 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/ClusterMember.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/ClusterMember.java @@ -536,7 +536,7 @@ public void closePublication(final ErrorHandler errorHandler) */ public static ClusterMember[] parse(final String value) { - if (null == value || value.length() == 0) + if (null == value || value.isEmpty()) { return ClusterMember.EMPTY_MEMBERS; } @@ -653,7 +653,7 @@ public static String encodeAsString(final ClusterMember[] clusterMembers) */ public static String encodeAsString(final List clusterMembers) { - if (0 == clusterMembers.size()) + if (clusterMembers.isEmpty()) { return ""; } diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/package-info.java b/aeron-cluster/src/main/java/io/aeron/cluster/package-info.java index 691bb46103..17173dd1d3 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/package-info.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/package-info.java @@ -56,7 +56,6 @@ * The majority of cluster members determine consensus. Clusters should typically be 3 or 5 in population size. * However, 2 node clusters are supported whereby both members must agree the log and in the event of failure the * remaining member must be manually reconfigured as a single node cluster to progress. - * *

Protocol

* Messages are specified using SBE * in this schema diff --git a/aeron-cluster/src/test/java/io/aeron/cluster/ConsensusModuleContextTest.java b/aeron-cluster/src/test/java/io/aeron/cluster/ConsensusModuleContextTest.java index a6a73be554..31f4341bc1 100644 --- a/aeron-cluster/src/test/java/io/aeron/cluster/ConsensusModuleContextTest.java +++ b/aeron-cluster/src/test/java/io/aeron/cluster/ConsensusModuleContextTest.java @@ -62,8 +62,6 @@ import static io.aeron.cluster.service.ClusteredServiceContainer.Configuration.MARK_FILE_DIR_PROP_NAME; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.*; class ConsensusModuleContextTest diff --git a/aeron-driver/src/main/c/agent/aeron_driver_agent.c b/aeron-driver/src/main/c/agent/aeron_driver_agent.c index 1568360071..c86725d6fc 100644 --- a/aeron-driver/src/main/c/agent/aeron_driver_agent.c +++ b/aeron-driver/src/main/c/agent/aeron_driver_agent.c @@ -72,7 +72,7 @@ static aeron_thread_t log_reader_thread; static aeron_driver_agent_dynamic_dissector_entry_t *dynamic_dissector_entries = NULL; static size_t num_dynamic_dissector_entries = 0; static int64_t dynamic_dissector_index = 0; -static struct aeron_driver_agent_log_event_stct log_events[AERON_DRIVER_EVENT_NUM_ELEMENTS] = +static aeron_driver_agent_log_event_t log_events[] = { { AERON_DRIVER_AGENT_EVENT_UNKNOWN_NAME, AERON_DRIVER_AGENT_EVENT_TYPE_UNKNOWN, false }, { "FRAME_IN", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false }, @@ -127,16 +127,17 @@ static struct aeron_driver_agent_log_event_stct log_events[AERON_DRIVER_EVENT_NU { "NAME_RESOLUTION_RESOLVE", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false }, { "GENERIC_MESSAGE", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false }, { "NAME_RESOLUTION_LOOKUP", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false }, - { "NAME_RESOLUTION_HOST_NAME", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false }, + { "NAME_RESOLUTION_HOST_NAME", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false }, { "ADD_DYNAMIC_DISSECTOR", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false }, { "DYNAMIC_DISSECTOR_EVENT", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false }, }; -typedef struct aeron_driver_agent_name_resolver_state_stct +#define AERON_DRIVER_EVENT_NUM_ELEMENTS (sizeof(log_events) / sizeof(aeron_driver_agent_log_event_t)) + +size_t aeron_driver_agent_max_event_count(void) { - aeron_name_resolver_resolve_func_t delegate_resolve_func; + return AERON_DRIVER_EVENT_NUM_ELEMENTS; } -aeron_driver_agent_name_resolver_state_t; aeron_mpsc_rb_t *aeron_driver_agent_mpsc_rb(void) { @@ -189,7 +190,7 @@ void aeron_driver_agent_logging_ring_buffer_init(void) if (aeron_mpsc_rb_init(&logging_mpsc_rb, rb_buffer, rb_length) < 0) { - fprintf(stderr, "could not init logging mpwc_rb. exiting.\n"); + fprintf(stderr, "could not init logging mpsc_rb. exiting.\n"); exit(EXIT_FAILURE); } } @@ -216,7 +217,7 @@ static aeron_driver_agent_event_t aeron_driver_agent_event_name_to_id(const char return AERON_DRIVER_EVENT_UNKNOWN_EVENT; } - for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) + for (size_t i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) { const char *name = log_events[i].name; if (0 == strncmp(name, event_name, strlen(name) + 1)) @@ -230,7 +231,7 @@ static aeron_driver_agent_event_t aeron_driver_agent_event_name_to_id(const char static inline bool is_valid_event_id(const int id) { - return id >= 0 && id < AERON_DRIVER_EVENT_NUM_ELEMENTS; + return id >= 0 && id < (int)AERON_DRIVER_EVENT_NUM_ELEMENTS; } const char *aeron_driver_agent_event_name(const aeron_driver_agent_event_t id) @@ -250,7 +251,7 @@ bool aeron_driver_agent_is_event_enabled(const aeron_driver_agent_event_t id) static void aeron_driver_agent_set_enabled_all_events(const bool is_enabled) { - for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) + for (size_t i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) { const char *event_name = log_events[i].name; if (!aeron_driver_agent_is_unknown_event(event_name)) @@ -262,7 +263,7 @@ static void aeron_driver_agent_set_enabled_all_events(const bool is_enabled) static void aeron_driver_agent_set_enabled_admin_events(const bool is_enabled) { - for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) + for (size_t i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) { if (AERON_DRIVER_EVENT_FRAME_IN != i && AERON_DRIVER_EVENT_FRAME_OUT != i && @@ -280,7 +281,7 @@ static void aeron_driver_agent_set_enabled_admin_events(const bool is_enabled) static void aeron_driver_agent_set_enabled_specific_events(const uint8_t type, const bool is_enabled) { - for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) + for (size_t i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) { if (type == log_events[i].type) { @@ -291,7 +292,7 @@ static void aeron_driver_agent_set_enabled_specific_events(const uint8_t type, c static bool any_event_enabled(const uint8_t type) { - for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) + for (size_t i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) { if (type == log_events[i].type && log_events[i].enabled) { @@ -474,7 +475,7 @@ bool aeron_driver_agent_logging_events_init(const char *event_log, const char *e void aeron_driver_agent_logging_events_free(void) { - for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) + for (size_t i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) { log_events[i].enabled = false; } diff --git a/aeron-driver/src/main/c/agent/aeron_driver_agent.h b/aeron-driver/src/main/c/agent/aeron_driver_agent.h index 5ea01479be..ca8c93e002 100644 --- a/aeron-driver/src/main/c/agent/aeron_driver_agent.h +++ b/aeron-driver/src/main/c/agent/aeron_driver_agent.h @@ -33,6 +33,7 @@ typedef enum aeron_driver_agent_event_enum { + AERON_DRIVER_EVENT_UNKNOWN_EVENT = -1, AERON_DRIVER_EVENT_FRAME_IN = 1, AERON_DRIVER_EVENT_FRAME_OUT = 2, AERON_DRIVER_EVENT_CMD_IN_ADD_PUBLICATION = 3, @@ -77,11 +78,8 @@ typedef enum aeron_driver_agent_event_enum AERON_DRIVER_EVENT_NAME_RESOLUTION_HOST_NAME = 53, // C-specific events. Note: event IDs are dynamic to avoid gaps in the sparse arrays. - AERON_DRIVER_EVENT_ADD_DYNAMIC_DISSECTOR, - AERON_DRIVER_EVENT_DYNAMIC_DISSECTOR_EVENT, - - AERON_DRIVER_EVENT_NUM_ELEMENTS, // number of elements in this enum (including gaps) - AERON_DRIVER_EVENT_UNKNOWN_EVENT = -1 + AERON_DRIVER_EVENT_ADD_DYNAMIC_DISSECTOR = 54, + AERON_DRIVER_EVENT_DYNAMIC_DISSECTOR_EVENT = 55 } aeron_driver_agent_event_t; @@ -209,6 +207,8 @@ typedef int (*aeron_driver_context_init_t)(aeron_driver_context_t **); int aeron_driver_agent_context_init(aeron_driver_context_t *context); +size_t aeron_driver_agent_max_event_count(void); + const char *aeron_driver_agent_dissect_log_header( int64_t time_ns, aeron_driver_agent_event_t event_id, diff --git a/aeron-driver/src/main/java/io/aeron/driver/AbstractMinMulticastFlowControl.java b/aeron-driver/src/main/java/io/aeron/driver/AbstractMinMulticastFlowControl.java index 05d471edda..6bec2c4859 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/AbstractMinMulticastFlowControl.java +++ b/aeron-driver/src/main/java/io/aeron/driver/AbstractMinMulticastFlowControl.java @@ -67,7 +67,11 @@ public abstract class AbstractMinMulticastFlowControl extends AbstractMinMulticastFlowControlRhsPadding implements FlowControl { - static final Receiver[] EMPTY_RECEIVERS = new Receiver[0]; + /** + * Multiple of receiver window to allow for a retransmit action. + */ + private static final int RETRANSMIT_RECEIVER_WINDOW_MULTIPLE = 16; + private static final Receiver[] EMPTY_RECEIVERS = new Receiver[0]; private final boolean isGroupTagAware; private volatile boolean hasRequiredReceivers; @@ -192,6 +196,21 @@ public boolean hasRequiredReceivers() return hasRequiredReceivers; } + /** + * {@inheritDoc} + */ + public int maxRetransmissionLength( + final long resendPosition, + final int resendLength, + final int termBufferLength, + final int mtuLength) + { + final int estimatedWindowLength = Configuration.receiverWindowLength( + termBufferLength, Configuration.INITIAL_WINDOW_LENGTH_DEFAULT); + + return Math.min(RETRANSMIT_RECEIVER_WINDOW_MULTIPLE * estimatedWindowLength, resendLength); + } + /** * Process a received status message. * diff --git a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java index 092df69c94..52d45ad808 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java +++ b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java @@ -254,7 +254,7 @@ void onCreatePublicationImage( final ArrayList subscriberPositions = createSubscriberPositions( sessionId, streamId, channelEndpoint, joinPosition); - if (subscriberPositions.size() > 0) + if (!subscriberPositions.isEmpty()) { RawLog rawLog = null; CongestionControl congestionControl = null; diff --git a/aeron-driver/src/main/java/io/aeron/driver/FlowControl.java b/aeron-driver/src/main/java/io/aeron/driver/FlowControl.java index 134698637e..350a84063d 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/FlowControl.java +++ b/aeron-driver/src/main/java/io/aeron/driver/FlowControl.java @@ -104,6 +104,21 @@ void initialize( */ boolean hasRequiredReceivers(); + /** + * The maximum window length allowed to retransmit per NAK. + * + * @param resendPosition of the NAK. + * @param resendLength of the NAK. + * @param termBufferLength of the publication. + * @param mtuLength of the publication. + * @return the maximum window length allowed to retransmit per NAK. + */ + int maxRetransmissionLength( + long resendPosition, + int resendLength, + int termBufferLength, + int mtuLength); + /** * {@inheritDoc} */ diff --git a/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java b/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java index 28473acb55..3c3201be11 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java +++ b/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java @@ -520,14 +520,14 @@ private void cleanBufferTo(final long position) final long cleanPosition = this.cleanPosition; if (position > cleanPosition) { - final UnsafeBuffer dirtyTerm = termBuffers[indexByPosition(cleanPosition, positionBitsToShift)]; + final UnsafeBuffer dirtyTermBuffer = termBuffers[indexByPosition(cleanPosition, positionBitsToShift)]; final int bytesForCleaning = (int)(position - cleanPosition); final int bufferCapacity = termBufferLength; final int termOffset = (int)cleanPosition & (bufferCapacity - 1); final int length = Math.min(bytesForCleaning, bufferCapacity - termOffset); - dirtyTerm.setMemory(termOffset + SIZE_OF_LONG, length - SIZE_OF_LONG, (byte)0); - dirtyTerm.putLongOrdered(termOffset, 0); + dirtyTermBuffer.setMemory(termOffset + SIZE_OF_LONG, length - SIZE_OF_LONG, (byte)0); + dirtyTermBuffer.putLongOrdered(termOffset, 0); this.cleanPosition = cleanPosition + length; } } diff --git a/aeron-driver/src/main/java/io/aeron/driver/MaxMulticastFlowControl.java b/aeron-driver/src/main/java/io/aeron/driver/MaxMulticastFlowControl.java index 0d11e2b2a9..91aa87ece7 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/MaxMulticastFlowControl.java +++ b/aeron-driver/src/main/java/io/aeron/driver/MaxMulticastFlowControl.java @@ -42,6 +42,11 @@ public class MaxMulticastFlowControl implements FlowControl */ public static final MaxMulticastFlowControl INSTANCE = new MaxMulticastFlowControl(); + /** + * Multiple of receiver window to allow for a retransmit action. + */ + private static final int RETRANSMIT_RECEIVER_WINDOW_MULTIPLE = 4; + /** * {@inheritDoc} */ @@ -113,4 +118,19 @@ public boolean hasRequiredReceivers() { return true; } + + /** + * {@inheritDoc} + */ + public int maxRetransmissionLength( + final long resendPosition, + final int resendLength, + final int termBufferLength, + final int mtuLength) + { + final int estimatedWindowLength = Configuration.receiverWindowLength( + termBufferLength, Configuration.INITIAL_WINDOW_LENGTH_DEFAULT); + + return Math.min(RETRANSMIT_RECEIVER_WINDOW_MULTIPLE * estimatedWindowLength, resendLength); + } } diff --git a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java index 121f5fe798..66d1c159ea 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java +++ b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java @@ -485,7 +485,8 @@ public void resend(final int termId, final int termOffset, final int length) final UnsafeBuffer termBuffer = termBuffers[activeIndex]; final ByteBuffer sendBuffer = sendBuffers[activeIndex]; - int remainingBytes = length; + int remainingBytes = flowControl.maxRetransmissionLength( + resendPosition, length, termBufferLength, mtuLength); int bytesSent = 0; int offset = termOffset; do @@ -777,13 +778,13 @@ private void cleanBufferTo(final long position) final long cleanPosition = this.cleanPosition; if (position > cleanPosition) { - final UnsafeBuffer dirtyTerm = termBuffers[indexByPosition(cleanPosition, positionBitsToShift)]; + final UnsafeBuffer dirtyTermBuffer = termBuffers[indexByPosition(cleanPosition, positionBitsToShift)]; final int bytesForCleaning = (int)(position - cleanPosition); final int termOffset = (int)cleanPosition & termLengthMask; final int length = Math.min(bytesForCleaning, termBufferLength - termOffset); - dirtyTerm.setMemory(termOffset + SIZE_OF_LONG, length - SIZE_OF_LONG, (byte)0); - dirtyTerm.putLongOrdered(termOffset, 0); + dirtyTermBuffer.setMemory(termOffset + SIZE_OF_LONG, length - SIZE_OF_LONG, (byte)0); + dirtyTermBuffer.putLongOrdered(termOffset, 0); this.cleanPosition = cleanPosition + length; } } diff --git a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java index 23602994fb..980d4b601c 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java +++ b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java @@ -903,12 +903,12 @@ private void cleanBufferTo(final long position) if (position > cleanPosition) { final int bytesForCleaning = (int)(position - cleanPosition); - final UnsafeBuffer dirtyTerm = termBuffers[indexByPosition(cleanPosition, positionBitsToShift)]; + final UnsafeBuffer dirtyTermBuffer = termBuffers[indexByPosition(cleanPosition, positionBitsToShift)]; final int termOffset = (int)cleanPosition & termLengthMask; - final int length = Math.min(bytesForCleaning, dirtyTerm.capacity() - termOffset); + final int length = Math.min(bytesForCleaning, dirtyTermBuffer.capacity() - termOffset); - dirtyTerm.setMemory(termOffset, length - SIZE_OF_LONG, (byte)0); - dirtyTerm.putLongOrdered(termOffset + (length - SIZE_OF_LONG), 0); + dirtyTermBuffer.setMemory(termOffset, length - SIZE_OF_LONG, (byte)0); + dirtyTermBuffer.putLongOrdered(termOffset + (length - SIZE_OF_LONG), 0); this.cleanPosition = cleanPosition + length; } } diff --git a/aeron-driver/src/main/java/io/aeron/driver/RetransmitHandler.java b/aeron-driver/src/main/java/io/aeron/driver/RetransmitHandler.java index 4895fb0321..21f2a2035f 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/RetransmitHandler.java +++ b/aeron-driver/src/main/java/io/aeron/driver/RetransmitHandler.java @@ -32,7 +32,7 @@ */ public final class RetransmitHandler { - private final BiInt2ObjectMap activeRetransmitsMap = new BiInt2ObjectMap<>(); + private final BiInt2ObjectMap activeRetransmitByTermIdAndTermOffsetMap = new BiInt2ObjectMap<>(); private final RetransmitAction[] retransmitActionPool = new RetransmitAction[MAX_RETRANSMITS_DEFAULT]; private final NanoClock nanoClock; private final FeedbackDelayGenerator delayGenerator; @@ -42,10 +42,10 @@ public final class RetransmitHandler /** * Create a handler for the dealing with the reception of frame request a frame to be retransmitted. * - * @param nanoClock used to determine time - * @param invalidPackets for recording invalid packets - * @param delayGenerator to use for delay determination - * @param lingerTimeoutGenerator to use for linger timeout + * @param nanoClock used to determine time. + * @param invalidPackets for recording invalid packets. + * @param delayGenerator to use for delay determination. + * @param lingerTimeoutGenerator to use for linger timeout. */ public RetransmitHandler( final NanoClock nanoClock, @@ -67,11 +67,11 @@ public RetransmitHandler( /** * Called on reception of a NAK to start retransmits handling. * - * @param termId from the NAK and the term id of the buffer to retransmit from - * @param termOffset from the NAK and the offset of the data to retransmit - * @param length of the missing data + * @param termId from the NAK and the term id of the buffer to retransmit from. + * @param termOffset from the NAK and the offset of the data to retransmit. + * @param length of the missing data. * @param termLength of the term buffer. - * @param retransmitSender to call if an immediate retransmit is required + * @param retransmitSender to call if an immediate retransmit is required. */ public void onNak( final int termId, @@ -82,8 +82,8 @@ public void onNak( { if (!isInvalid(termOffset, termLength)) { - if (null == activeRetransmitsMap.get(termId, termOffset) && - activeRetransmitsMap.size() < MAX_RETRANSMITS_DEFAULT) + if (null == activeRetransmitByTermIdAndTermOffsetMap.get(termId, termOffset) && + activeRetransmitByTermIdAndTermOffsetMap.size() < MAX_RETRANSMITS_DEFAULT) { final RetransmitAction action = assignRetransmitAction(); action.termId = termId; @@ -101,7 +101,7 @@ public void onNak( action.delay(delay, nanoClock.nanoTime()); } - activeRetransmitsMap.put(termId, termOffset, action); + activeRetransmitByTermIdAndTermOffsetMap.put(termId, termOffset, action); } } } @@ -111,30 +111,29 @@ public void onNak( *

* NOTE: Currently only called from unit tests. Would be used for retransmitting from receivers for NAK suppression. * - * @param termId of the data - * @param termOffset of the data + * @param termId of the data. + * @param termOffset of the data. */ public void onRetransmitReceived(final int termId, final int termOffset) { - final RetransmitAction action = activeRetransmitsMap.get(termId, termOffset); + final RetransmitAction action = activeRetransmitByTermIdAndTermOffsetMap.get(termId, termOffset); if (null != action && DELAYED == action.state) { - activeRetransmitsMap.remove(termId, termOffset); - action.cancel(); - // do not go into linger + activeRetransmitByTermIdAndTermOffsetMap.remove(termId, termOffset); + action.cancel(); // do not go into linger } } /** * Called to process any outstanding timeouts. * - * @param nowNs time in nanoseconds - * @param retransmitSender to call on retransmissions + * @param nowNs time in nanoseconds. + * @param retransmitSender to call on retransmissions. */ public void processTimeouts(final long nowNs, final RetransmitSender retransmitSender) { - if (activeRetransmitsMap.size() > 0) + if (!activeRetransmitByTermIdAndTermOffsetMap.isEmpty()) { for (final RetransmitAction action : retransmitActionPool) { @@ -146,7 +145,7 @@ public void processTimeouts(final long nowNs, final RetransmitSender retransmitS else if (LINGERING == action.state && (action.expireNs - nowNs < 0)) { action.cancel(); - activeRetransmitsMap.remove(action.termId, action.termOffset); + activeRetransmitByTermIdAndTermOffsetMap.remove(action.termId, action.termOffset); } } } diff --git a/aeron-driver/src/main/java/io/aeron/driver/UnicastFlowControl.java b/aeron-driver/src/main/java/io/aeron/driver/UnicastFlowControl.java index 5096bc59b6..0c1449bfe5 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/UnicastFlowControl.java +++ b/aeron-driver/src/main/java/io/aeron/driver/UnicastFlowControl.java @@ -37,6 +37,11 @@ public class UnicastFlowControl implements FlowControl */ public static final UnicastFlowControl INSTANCE = new UnicastFlowControl(); + /** + * Multiple of receiver window to allow for a retransmit action. + */ + private static final int RETRANSMIT_RECEIVER_WINDOW_MULTIPLE = 16; + /** * {@inheritDoc} */ @@ -107,4 +112,19 @@ public boolean hasRequiredReceivers() { return true; } + + /** + * {@inheritDoc} + */ + public int maxRetransmissionLength( + final long resendPosition, + final int resendLength, + final int termBufferLength, + final int mtuLength) + { + final int estimatedWindowLength = Configuration.receiverWindowLength( + termBufferLength, Configuration.INITIAL_WINDOW_LENGTH_DEFAULT); + + return Math.min(RETRANSMIT_RECEIVER_WINDOW_MULTIPLE * estimatedWindowLength, resendLength); + } } diff --git a/aeron-driver/src/main/java/io/aeron/driver/media/WildcardPortManager.java b/aeron-driver/src/main/java/io/aeron/driver/media/WildcardPortManager.java index 6239fc1653..1ce1eaab2b 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/media/WildcardPortManager.java +++ b/aeron-driver/src/main/java/io/aeron/driver/media/WildcardPortManager.java @@ -96,7 +96,7 @@ public void freeManagedPort(final InetSocketAddress bindAddress) */ public static int[] parsePortRange(final String value) { - if (null == value || value.length() == 0) + if (null == value || value.isEmpty()) { return EMPTY_PORT_RANGE; } diff --git a/aeron-driver/src/test/c/agent/aeron_driver_agent_test.cpp b/aeron-driver/src/test/c/agent/aeron_driver_agent_test.cpp index bdf313acae..1d8507d046 100644 --- a/aeron-driver/src/test/c/agent/aeron_driver_agent_test.cpp +++ b/aeron-driver/src/test/c/agent/aeron_driver_agent_test.cpp @@ -52,7 +52,7 @@ class DriverAgentTest : public testing::Test static void assert_all_events_disabled() { - for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) + for (size_t i = 0; i < aeron_driver_agent_max_event_count(); i++) { auto event_id = static_cast(i); EXPECT_FALSE(aeron_driver_agent_is_event_enabled(event_id)); @@ -61,7 +61,7 @@ class DriverAgentTest : public testing::Test static void assert_all_events_enabled() { - for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) + for (size_t i = 0; i < aeron_driver_agent_max_event_count(); i++) { auto event_id = static_cast(i); auto event_name = aeron_driver_agent_event_name(event_id); @@ -70,13 +70,12 @@ class DriverAgentTest : public testing::Test EXPECT_EQ(expected, aeron_driver_agent_is_event_enabled(event_id)) << event_name; } - EXPECT_FALSE(aeron_driver_agent_is_event_enabled(AERON_DRIVER_EVENT_NUM_ELEMENTS)); EXPECT_FALSE(aeron_driver_agent_is_event_enabled(AERON_DRIVER_EVENT_UNKNOWN_EVENT)); } static void assert_admin_events_enabled(const bool is_enabled) { - for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) + for (size_t i = 0; i < aeron_driver_agent_max_event_count(); i++) { auto event_id = static_cast(i); if (AERON_DRIVER_EVENT_FRAME_IN != event_id && @@ -134,7 +133,7 @@ TEST_F(DriverAgentTest, allLoggingEventsShouldHaveUniqueNames) std::set names; std::string unknown_name = std::string(AERON_DRIVER_AGENT_EVENT_UNKNOWN_NAME); - for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) + for (size_t i = 0; i < aeron_driver_agent_max_event_count(); i++) { auto event_id = static_cast(i); auto event_name = std::string(aeron_driver_agent_event_name(event_id)); @@ -150,7 +149,6 @@ TEST_F(DriverAgentTest, allLoggingEventsShouldHaveUniqueNames) } } - EXPECT_EQ(unknown_name, std::string(aeron_driver_agent_event_name(AERON_DRIVER_EVENT_NUM_ELEMENTS))); EXPECT_EQ(unknown_name, std::string(aeron_driver_agent_event_name(AERON_DRIVER_EVENT_UNKNOWN_EVENT))); } @@ -179,7 +177,6 @@ TEST_F(DriverAgentTest, shouldEnabledAdminLoggingEvents) EXPECT_FALSE(aeron_driver_agent_is_event_enabled(static_cast(0))); EXPECT_FALSE(aeron_driver_agent_is_event_enabled(static_cast(9))); EXPECT_FALSE(aeron_driver_agent_is_event_enabled(static_cast(27))); - EXPECT_FALSE(aeron_driver_agent_is_event_enabled(AERON_DRIVER_EVENT_NUM_ELEMENTS)); } TEST_F(DriverAgentTest, shouldEnableEventByName) @@ -244,7 +241,7 @@ TEST_F(DriverAgentTest, shouldDisableMultipleEventsSplitByComma) EXPECT_TRUE(aeron_driver_agent_logging_events_init( "all", "CMD_IN_REMOVE_COUNTER,33,NAME_RESOLUTION_NEIGHBOR_ADDED,CMD_OUT_ERROR,FRAME_OUT,")); - for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++) + for (size_t i = 0; i < aeron_driver_agent_max_event_count(); i++) { auto event_id = static_cast(i); bool expected = @@ -492,7 +489,7 @@ TEST_F(DriverAgentTest, shouldLogConductorToDriverCommand) command->correlated.client_id = 42; command->stream_id = 7; command->channel_length = 4; - memcpy(buffer + sizeof(aeron_publication_command_t), "test", 4); + memcpy(buffer + sizeof(aeron_publication_command_t), "test", sizeof("test")); aeron_driver_agent_conductor_to_driver_interceptor(AERON_COMMAND_ADD_SUBSCRIPTION, command, length, nullptr); @@ -589,7 +586,7 @@ TEST_F(DriverAgentTest, shouldLogConductorToClientCommand) command->correlated.client_id = 42; command->stream_id = 7; command->channel_length = 4; - memcpy(buffer + sizeof(aeron_publication_command_t), "test", 4); + memcpy(buffer + sizeof(aeron_publication_command_t), "test", strlen("test")); aeron_driver_agent_conductor_to_client_interceptor(nullptr, AERON_RESPONSE_ON_OPERATION_SUCCESS, command, length); @@ -678,8 +675,8 @@ TEST_F(DriverAgentTest, shouldLogSmallAgentLogFrames) aeron_driver_agent_logging_ring_buffer_init(); struct sockaddr_storage addr {}; - struct msghdr message; - struct iovec iov; + struct msghdr message {}; + struct iovec iov {}; const int message_length = 100; uint8_t buffer[message_length]; @@ -726,8 +723,8 @@ TEST_F(DriverAgentTest, shouldLogAgentLogFramesAndCopyUpToMaxFrameLengthMessage) aeron_driver_agent_logging_ring_buffer_init(); struct sockaddr_storage addr {}; - struct msghdr message; - struct iovec iov; + struct msghdr message {}; + struct iovec iov {}; const int message_length = AERON_MAX_FRAME_LENGTH * 5; uint8_t buffer[message_length]; @@ -1119,7 +1116,7 @@ TEST_F(DriverAgentTest, shouldLogSendChannelClose) auto message_handler = [](int32_t msg_type_id, const void *msg, size_t length, void *clientd) { - size_t *count = (size_t *)clientd; + auto *count = (size_t *)clientd; (*count)++; EXPECT_EQ(msg_type_id, AERON_DRIVER_EVENT_SEND_CHANNEL_CLOSE); @@ -1249,7 +1246,6 @@ TEST_F(DriverAgentTest, shouldNotAddDynamicDissectorIfDynamicDissectorEventIsDis aeron_driver_agent_generic_dissector_func_t dynamic_dissector = [](FILE *fpout, const char *log_header_str, const void *message, size_t len) { - }; EXPECT_EQ(-1, aeron_driver_agent_add_dynamic_dissector(dynamic_dissector)); @@ -1277,7 +1273,6 @@ TEST_F(DriverAgentTest, shouldAddDynamicDissectorIfDynamicDissectorEventIsEnable aeron_driver_agent_generic_dissector_func_t dynamic_dissector = [](FILE *fpout, const char *log_header_str, const void *message, size_t len) { - }; EXPECT_EQ(0, aeron_driver_agent_add_dynamic_dissector(dynamic_dissector)); @@ -1559,7 +1554,8 @@ TEST_F(DriverAgentTest, shouldInitializeOnNameLookupFunction) TEST_F(DriverAgentTest, shouldInitializeNameResolverFunctions) { - EXPECT_TRUE(aeron_driver_agent_logging_events_init("NAME_RESOLUTION_LOOKUP,NAME_RESOLUTION_RESOLVE,NAME_RESOLUTION_HOST_NAME", nullptr)); + EXPECT_TRUE(aeron_driver_agent_logging_events_init( + "NAME_RESOLUTION_LOOKUP,NAME_RESOLUTION_RESOLVE,NAME_RESOLUTION_HOST_NAME", nullptr)); aeron_driver_agent_init_logging_events_interceptors(m_context); EXPECT_NE(nullptr, m_context->on_name_resolve_func); diff --git a/aeron-driver/src/test/java/io/aeron/driver/media/WildcardPortManagerTest.java b/aeron-driver/src/test/java/io/aeron/driver/media/WildcardPortManagerTest.java index a78110e3d7..e134007de4 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/media/WildcardPortManagerTest.java +++ b/aeron-driver/src/test/java/io/aeron/driver/media/WildcardPortManagerTest.java @@ -24,11 +24,11 @@ import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertThrows; -public class WildcardPortManagerTest +class WildcardPortManagerTest { - public int[] portRange = new int[]{ 20000, 20003}; - public UdpChannel udpChannelPort0 = UdpChannel.parse("aeron:udp?endpoint=localhost:0"); - public UdpChannel udpChannelPubControl = UdpChannel.parse("aeron:udp?control=localhost:0"); + final int[] portRange = new int[]{ 20000, 20003}; + final UdpChannel udpChannelPort0 = UdpChannel.parse("aeron:udp?endpoint=localhost:0"); + final UdpChannel udpChannelPubControl = UdpChannel.parse("aeron:udp?control=localhost:0"); @Test void shouldAllocateConsecutivePortsInRange() throws BindException diff --git a/aeron-system-tests/src/test/java/io/aeron/PubAndSubTest.java b/aeron-system-tests/src/test/java/io/aeron/PubAndSubTest.java index 8fe1a8a068..9d58818d1b 100644 --- a/aeron-system-tests/src/test/java/io/aeron/PubAndSubTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/PubAndSubTest.java @@ -672,19 +672,21 @@ void shouldNoticeDroppedSubscriber(final String channel) } } - @Test void shouldAllowSubscriptionsIfUsingTagsAndParametersAndAllMatch() { launch("aeron:ipc"); + final String pubChannel = "aeron:udp?control-mode=dynamic|control=127.0.0.1:9999"; final String channel = "aeron:udp?endpoint=127.0.0.1:0|control=127.0.0.1:9999|tags=1001"; try ( - Subscription ignore1 = subscribingClient.addSubscription(channel, 1000); - Subscription ignore2 = subscribingClient.addSubscription(channel, 1000)) + Publication pub = subscribingClient.addPublication(pubChannel, 1000); + Subscription sub1 = subscribingClient.addSubscription(channel, 1000); + Subscription sub2 = subscribingClient.addSubscription(channel, 1000)) { - Objects.requireNonNull(ignore1); - Objects.requireNonNull(ignore2); + Tests.awaitConnected(sub1); + Tests.awaitConnected(sub2); + Tests.awaitConnected(pub); } } diff --git a/aeron-test-support/src/main/java/io/aeron/test/SystemTestWatcher.java b/aeron-test-support/src/main/java/io/aeron/test/SystemTestWatcher.java index 64bdfd9095..5bdc321519 100644 --- a/aeron-test-support/src/main/java/io/aeron/test/SystemTestWatcher.java +++ b/aeron-test-support/src/main/java/io/aeron/test/SystemTestWatcher.java @@ -206,7 +206,7 @@ else if (0 != count.get()) } else { - directoryName = testMethod + "_" + System.nanoTime() + context.getUniqueId(); + directoryName = testMethod + "_" + System.nanoTime(); } } else diff --git a/build.gradle b/build.gradle index bf35ccf8b7..df408a2ffd 100644 --- a/build.gradle +++ b/build.gradle @@ -302,7 +302,7 @@ subprojects { options.charSet = 'UTF-8' options.links("https://www.javadoc.io/doc/org.agrona/agrona/${agronaVersion}/") - if (buildJavaVersion >= 21) { // early access JavaDoc location is different + if (buildJavaVersion > 21) { // early access JavaDoc location is different options.links("https://download.java.net/java/early_access/jdk${buildJavaVersion}/docs/api/") } else if (buildJavaVersion >= 11) { diff --git a/buildSrc/src/main/java/io/aeron/build/AsciidoctorPreprocessTask.java b/buildSrc/src/main/java/io/aeron/build/AsciidoctorPreprocessTask.java index 4c68e9bdf9..61493b6386 100644 --- a/buildSrc/src/main/java/io/aeron/build/AsciidoctorPreprocessTask.java +++ b/buildSrc/src/main/java/io/aeron/build/AsciidoctorPreprocessTask.java @@ -199,7 +199,7 @@ public void process(final Document document, final PreprocessorReader reader) errors.forEach((key, value) -> System.out.println("file: " + key + ", error count: " + value)); - if (0 < errors.size()) + if (!errors.isEmpty()) { throw new Exception("failed due to errors in parsing"); }