Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] master from real-logic:master #1790

Merged
merged 22 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
96ce23f
[Java] Verify publications/subscriptions are connected in shouldAllow…
mikeb01 Sep 19, 2023
89c6d76
[Java] Prevent directory resolution failures with parameterised tests.
mikeb01 Sep 19, 2023
6a2c159
[Java] Touch ups for RetransmitHandler.
mjpt777 Sep 20, 2023
b87b64a
[Java] Remove ea from Java 21 in CI.
mjpt777 Sep 20, 2023
1ea6a3b
[CI] Use JDK 21 GA and drop JDK 11.
vyazelenko Sep 20, 2023
8359107
[C] Put enum events in order.
mjpt777 Sep 20, 2023
48cafa2
Merge remote-tracking branch 'origin/master'
mjpt777 Sep 20, 2023
2f6663f
[Java] Fix javadoc link for JDK 21 GA.
vyazelenko Sep 20, 2023
7fac7bb
[C] Remove reliance of max event num in enum for driver agent.
mjpt777 Sep 20, 2023
89f430a
[C] Fix signed comparison.
mjpt777 Sep 20, 2023
1ccda35
[C] Fix signed comparison.
mjpt777 Sep 20, 2023
b10448a
[C] Declare void param.
mjpt777 Sep 20, 2023
bfe832c
[C] Keep loop types consistent.
mjpt777 Sep 20, 2023
029a5ed
[C] Keep loop types consistent.
mjpt777 Sep 20, 2023
80f4376
[C] Reduce memcpy to buffer size.
mjpt777 Sep 20, 2023
3390452
[Java]: add a clamp on retransmit length.
tmontgomery Sep 20, 2023
c3d4e7f
Merge remote-tracking branch 'origin/master'
tmontgomery Sep 20, 2023
cd907aa
[Java] Naming.
mjpt777 Sep 26, 2023
2b03173
[Java] Fix Aeron.getSubscription(long) JavaDoc. (#1505)
wojciech-adaptive Sep 26, 2023
6828c59
[Java] Encapsulate retransmit multiples constants.
mjpt777 Sep 26, 2023
69d55b1
[Java] Naming.
mjpt777 Sep 26, 2023
110b9e2
[Java] Warnings cleanup.
mjpt777 Sep 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci-low-cadence.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
strategy:
fail-fast: false
matrix:
java: [ '8', '11', '17', '21-ea' ]
java: [ '8', '17', '21' ]
os: [ 'ubuntu-22.04', 'windows-latest' ]
steps:
- name: Checkout code
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
strategy:
fail-fast: false
matrix:
java: [ '8', '11', '17', '21-ea' ]
java: [ '8', '17', '21' ]
os: [ 'ubuntu-22.04', 'windows-latest' ]
steps:
- name: Checkout code
Expand Down
4 changes: 2 additions & 2 deletions aeron-archive/src/main/java/io/aeron/archive/Archive.java
Original file line number Diff line number Diff line change
Expand Up @@ -3000,7 +3000,7 @@ public boolean ownsAeronClient()
* @param catalog {@link Catalog} describing the contents of the Archive.
* @return this for a fluent API.
*/
public Context catalog(final Catalog catalog)
Context catalog(final Catalog catalog)
{
this.catalog = catalog;
return this;
Expand All @@ -3011,7 +3011,7 @@ public Context catalog(final Catalog catalog)
*
* @return the {@link Catalog} describing the contents of the Archive.
*/
public Catalog catalog()
Catalog catalog()
{
return catalog;
}
Expand Down
9 changes: 5 additions & 4 deletions aeron-client/src/main/java/io/aeron/Aeron.java
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,10 @@ public long asyncAddSubscription(final String channel, final int streamId)
*
* @param registrationId returned from
* {@link #asyncAddSubscription(String, int, AvailableImageHandler, UnavailableImageHandler)}
* or {@link #asyncAddSubscription(String, int)}
* @return a new {@link ConcurrentPublication} when available otherwise null.
* @see #asyncAddPublication(String, int)
* or {@link #asyncAddSubscription(String, int)}
* @return a new {@link Subscription} when available otherwise null.
* @see #asyncAddSubscription(String, int)
* @see #asyncAddSubscription(String, int, AvailableImageHandler, UnavailableImageHandler)
*/
public Subscription getSubscription(final long registrationId)
{
Expand Down Expand Up @@ -1199,7 +1200,7 @@ Context logBuffersFactory(final LogBuffersFactory logBuffersFactory)
*
* @return the factory for making log buffers.
*/
public LogBuffersFactory logBuffersFactory()
LogBuffersFactory logBuffersFactory()
{
return logBuffersFactory;
}
Expand Down
2 changes: 1 addition & 1 deletion aeron-client/src/main/java/io/aeron/ChannelUri.java
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public String toString()

sb.append(AERON_PREFIX).append(media);

if (params.size() > 0)
if (!params.isEmpty())
{
sb.append('?');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ public void closePublication(final ErrorHandler errorHandler)
*/
public static ClusterMember[] parse(final String value)
{
if (null == value || value.length() == 0)
if (null == value || value.isEmpty())
{
return ClusterMember.EMPTY_MEMBERS;
}
Expand Down Expand Up @@ -653,7 +653,7 @@ public static String encodeAsString(final ClusterMember[] clusterMembers)
*/
public static String encodeAsString(final List<ClusterMember> clusterMembers)
{
if (0 == clusterMembers.size())
if (clusterMembers.isEmpty())
{
return "";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
* The majority of cluster members determine consensus. Clusters should typically be 3 or 5 in population size.
* However, 2 node clusters are supported whereby both members must agree the log and in the event of failure the
* remaining member must be manually reconfigured as a single node cluster to progress.
*
* <h2>Protocol</h2>
* Messages are specified using <a href="https://github.com/real-logic/simple-binary-encoding" target="_blank">SBE</a>
* in this schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@
import static io.aeron.cluster.service.ClusteredServiceContainer.Configuration.MARK_FILE_DIR_PROP_NAME;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.*;

class ConsensusModuleContextTest
Expand Down
27 changes: 14 additions & 13 deletions aeron-driver/src/main/c/agent/aeron_driver_agent.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static aeron_thread_t log_reader_thread;
static aeron_driver_agent_dynamic_dissector_entry_t *dynamic_dissector_entries = NULL;
static size_t num_dynamic_dissector_entries = 0;
static int64_t dynamic_dissector_index = 0;
static struct aeron_driver_agent_log_event_stct log_events[AERON_DRIVER_EVENT_NUM_ELEMENTS] =
static aeron_driver_agent_log_event_t log_events[] =
{
{ AERON_DRIVER_AGENT_EVENT_UNKNOWN_NAME, AERON_DRIVER_AGENT_EVENT_TYPE_UNKNOWN, false },
{ "FRAME_IN", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false },
Expand Down Expand Up @@ -127,16 +127,17 @@ static struct aeron_driver_agent_log_event_stct log_events[AERON_DRIVER_EVENT_NU
{ "NAME_RESOLUTION_RESOLVE", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false },
{ "GENERIC_MESSAGE", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false },
{ "NAME_RESOLUTION_LOOKUP", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false },
{ "NAME_RESOLUTION_HOST_NAME", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false },
{ "NAME_RESOLUTION_HOST_NAME", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false },
{ "ADD_DYNAMIC_DISSECTOR", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false },
{ "DYNAMIC_DISSECTOR_EVENT", AERON_DRIVER_AGENT_EVENT_TYPE_OTHER, false },
};

typedef struct aeron_driver_agent_name_resolver_state_stct
#define AERON_DRIVER_EVENT_NUM_ELEMENTS (sizeof(log_events) / sizeof(aeron_driver_agent_log_event_t))

size_t aeron_driver_agent_max_event_count(void)
{
aeron_name_resolver_resolve_func_t delegate_resolve_func;
return AERON_DRIVER_EVENT_NUM_ELEMENTS;
}
aeron_driver_agent_name_resolver_state_t;

aeron_mpsc_rb_t *aeron_driver_agent_mpsc_rb(void)
{
Expand Down Expand Up @@ -189,7 +190,7 @@ void aeron_driver_agent_logging_ring_buffer_init(void)

if (aeron_mpsc_rb_init(&logging_mpsc_rb, rb_buffer, rb_length) < 0)
{
fprintf(stderr, "could not init logging mpwc_rb. exiting.\n");
fprintf(stderr, "could not init logging mpsc_rb. exiting.\n");
exit(EXIT_FAILURE);
}
}
Expand All @@ -216,7 +217,7 @@ static aeron_driver_agent_event_t aeron_driver_agent_event_name_to_id(const char
return AERON_DRIVER_EVENT_UNKNOWN_EVENT;
}

for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++)
for (size_t i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++)
{
const char *name = log_events[i].name;
if (0 == strncmp(name, event_name, strlen(name) + 1))
Expand All @@ -230,7 +231,7 @@ static aeron_driver_agent_event_t aeron_driver_agent_event_name_to_id(const char

static inline bool is_valid_event_id(const int id)
{
return id >= 0 && id < AERON_DRIVER_EVENT_NUM_ELEMENTS;
return id >= 0 && id < (int)AERON_DRIVER_EVENT_NUM_ELEMENTS;
}

const char *aeron_driver_agent_event_name(const aeron_driver_agent_event_t id)
Expand All @@ -250,7 +251,7 @@ bool aeron_driver_agent_is_event_enabled(const aeron_driver_agent_event_t id)

static void aeron_driver_agent_set_enabled_all_events(const bool is_enabled)
{
for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++)
for (size_t i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++)
{
const char *event_name = log_events[i].name;
if (!aeron_driver_agent_is_unknown_event(event_name))
Expand All @@ -262,7 +263,7 @@ static void aeron_driver_agent_set_enabled_all_events(const bool is_enabled)

static void aeron_driver_agent_set_enabled_admin_events(const bool is_enabled)
{
for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++)
for (size_t i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++)
{
if (AERON_DRIVER_EVENT_FRAME_IN != i &&
AERON_DRIVER_EVENT_FRAME_OUT != i &&
Expand All @@ -280,7 +281,7 @@ static void aeron_driver_agent_set_enabled_admin_events(const bool is_enabled)

static void aeron_driver_agent_set_enabled_specific_events(const uint8_t type, const bool is_enabled)
{
for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++)
for (size_t i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++)
{
if (type == log_events[i].type)
{
Expand All @@ -291,7 +292,7 @@ static void aeron_driver_agent_set_enabled_specific_events(const uint8_t type, c

static bool any_event_enabled(const uint8_t type)
{
for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++)
for (size_t i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++)
{
if (type == log_events[i].type && log_events[i].enabled)
{
Expand Down Expand Up @@ -474,7 +475,7 @@ bool aeron_driver_agent_logging_events_init(const char *event_log, const char *e

void aeron_driver_agent_logging_events_free(void)
{
for (int i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++)
for (size_t i = 0; i < AERON_DRIVER_EVENT_NUM_ELEMENTS; i++)
{
log_events[i].enabled = false;
}
Expand Down
10 changes: 5 additions & 5 deletions aeron-driver/src/main/c/agent/aeron_driver_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

typedef enum aeron_driver_agent_event_enum
{
AERON_DRIVER_EVENT_UNKNOWN_EVENT = -1,
AERON_DRIVER_EVENT_FRAME_IN = 1,
AERON_DRIVER_EVENT_FRAME_OUT = 2,
AERON_DRIVER_EVENT_CMD_IN_ADD_PUBLICATION = 3,
Expand Down Expand Up @@ -77,11 +78,8 @@ typedef enum aeron_driver_agent_event_enum
AERON_DRIVER_EVENT_NAME_RESOLUTION_HOST_NAME = 53,

// C-specific events. Note: event IDs are dynamic to avoid gaps in the sparse arrays.
AERON_DRIVER_EVENT_ADD_DYNAMIC_DISSECTOR,
AERON_DRIVER_EVENT_DYNAMIC_DISSECTOR_EVENT,

AERON_DRIVER_EVENT_NUM_ELEMENTS, // number of elements in this enum (including gaps)
AERON_DRIVER_EVENT_UNKNOWN_EVENT = -1
AERON_DRIVER_EVENT_ADD_DYNAMIC_DISSECTOR = 54,
AERON_DRIVER_EVENT_DYNAMIC_DISSECTOR_EVENT = 55
}
aeron_driver_agent_event_t;

Expand Down Expand Up @@ -209,6 +207,8 @@ typedef int (*aeron_driver_context_init_t)(aeron_driver_context_t **);

int aeron_driver_agent_context_init(aeron_driver_context_t *context);

size_t aeron_driver_agent_max_event_count(void);

const char *aeron_driver_agent_dissect_log_header(
int64_t time_ns,
aeron_driver_agent_event_t event_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ public abstract class AbstractMinMulticastFlowControl
extends AbstractMinMulticastFlowControlRhsPadding
implements FlowControl
{
static final Receiver[] EMPTY_RECEIVERS = new Receiver[0];
/**
* Multiple of receiver window to allow for a retransmit action.
*/
private static final int RETRANSMIT_RECEIVER_WINDOW_MULTIPLE = 16;
private static final Receiver[] EMPTY_RECEIVERS = new Receiver[0];

private final boolean isGroupTagAware;
private volatile boolean hasRequiredReceivers;
Expand Down Expand Up @@ -192,6 +196,21 @@ public boolean hasRequiredReceivers()
return hasRequiredReceivers;
}

/**
* {@inheritDoc}
*/
public int maxRetransmissionLength(
final long resendPosition,
final int resendLength,
final int termBufferLength,
final int mtuLength)
{
final int estimatedWindowLength = Configuration.receiverWindowLength(
termBufferLength, Configuration.INITIAL_WINDOW_LENGTH_DEFAULT);

return Math.min(RETRANSMIT_RECEIVER_WINDOW_MULTIPLE * estimatedWindowLength, resendLength);
}

/**
* Process a received status message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ void onCreatePublicationImage(
final ArrayList<SubscriberPosition> subscriberPositions = createSubscriberPositions(
sessionId, streamId, channelEndpoint, joinPosition);

if (subscriberPositions.size() > 0)
if (!subscriberPositions.isEmpty())
{
RawLog rawLog = null;
CongestionControl congestionControl = null;
Expand Down
15 changes: 15 additions & 0 deletions aeron-driver/src/main/java/io/aeron/driver/FlowControl.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,21 @@ void initialize(
*/
boolean hasRequiredReceivers();

/**
* The maximum window length allowed to retransmit per NAK.
*
* @param resendPosition of the NAK.
* @param resendLength of the NAK.
* @param termBufferLength of the publication.
* @param mtuLength of the publication.
* @return the maximum window length allowed to retransmit per NAK.
*/
int maxRetransmissionLength(
long resendPosition,
int resendLength,
int termBufferLength,
int mtuLength);

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,14 +520,14 @@ private void cleanBufferTo(final long position)
final long cleanPosition = this.cleanPosition;
if (position > cleanPosition)
{
final UnsafeBuffer dirtyTerm = termBuffers[indexByPosition(cleanPosition, positionBitsToShift)];
final UnsafeBuffer dirtyTermBuffer = termBuffers[indexByPosition(cleanPosition, positionBitsToShift)];
final int bytesForCleaning = (int)(position - cleanPosition);
final int bufferCapacity = termBufferLength;
final int termOffset = (int)cleanPosition & (bufferCapacity - 1);
final int length = Math.min(bytesForCleaning, bufferCapacity - termOffset);

dirtyTerm.setMemory(termOffset + SIZE_OF_LONG, length - SIZE_OF_LONG, (byte)0);
dirtyTerm.putLongOrdered(termOffset, 0);
dirtyTermBuffer.setMemory(termOffset + SIZE_OF_LONG, length - SIZE_OF_LONG, (byte)0);
dirtyTermBuffer.putLongOrdered(termOffset, 0);
this.cleanPosition = cleanPosition + length;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public class MaxMulticastFlowControl implements FlowControl
*/
public static final MaxMulticastFlowControl INSTANCE = new MaxMulticastFlowControl();

/**
* Multiple of receiver window to allow for a retransmit action.
*/
private static final int RETRANSMIT_RECEIVER_WINDOW_MULTIPLE = 4;

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -113,4 +118,19 @@ public boolean hasRequiredReceivers()
{
return true;
}

/**
* {@inheritDoc}
*/
public int maxRetransmissionLength(
final long resendPosition,
final int resendLength,
final int termBufferLength,
final int mtuLength)
{
final int estimatedWindowLength = Configuration.receiverWindowLength(
termBufferLength, Configuration.INITIAL_WINDOW_LENGTH_DEFAULT);

return Math.min(RETRANSMIT_RECEIVER_WINDOW_MULTIPLE * estimatedWindowLength, resendLength);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ public void resend(final int termId, final int termOffset, final int length)
final UnsafeBuffer termBuffer = termBuffers[activeIndex];
final ByteBuffer sendBuffer = sendBuffers[activeIndex];

int remainingBytes = length;
int remainingBytes = flowControl.maxRetransmissionLength(
resendPosition, length, termBufferLength, mtuLength);
int bytesSent = 0;
int offset = termOffset;
do
Expand Down Expand Up @@ -777,13 +778,13 @@ private void cleanBufferTo(final long position)
final long cleanPosition = this.cleanPosition;
if (position > cleanPosition)
{
final UnsafeBuffer dirtyTerm = termBuffers[indexByPosition(cleanPosition, positionBitsToShift)];
final UnsafeBuffer dirtyTermBuffer = termBuffers[indexByPosition(cleanPosition, positionBitsToShift)];
final int bytesForCleaning = (int)(position - cleanPosition);
final int termOffset = (int)cleanPosition & termLengthMask;
final int length = Math.min(bytesForCleaning, termBufferLength - termOffset);

dirtyTerm.setMemory(termOffset + SIZE_OF_LONG, length - SIZE_OF_LONG, (byte)0);
dirtyTerm.putLongOrdered(termOffset, 0);
dirtyTermBuffer.setMemory(termOffset + SIZE_OF_LONG, length - SIZE_OF_LONG, (byte)0);
dirtyTermBuffer.putLongOrdered(termOffset, 0);
this.cleanPosition = cleanPosition + length;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -903,12 +903,12 @@ private void cleanBufferTo(final long position)
if (position > cleanPosition)
{
final int bytesForCleaning = (int)(position - cleanPosition);
final UnsafeBuffer dirtyTerm = termBuffers[indexByPosition(cleanPosition, positionBitsToShift)];
final UnsafeBuffer dirtyTermBuffer = termBuffers[indexByPosition(cleanPosition, positionBitsToShift)];
final int termOffset = (int)cleanPosition & termLengthMask;
final int length = Math.min(bytesForCleaning, dirtyTerm.capacity() - termOffset);
final int length = Math.min(bytesForCleaning, dirtyTermBuffer.capacity() - termOffset);

dirtyTerm.setMemory(termOffset, length - SIZE_OF_LONG, (byte)0);
dirtyTerm.putLongOrdered(termOffset + (length - SIZE_OF_LONG), 0);
dirtyTermBuffer.setMemory(termOffset, length - SIZE_OF_LONG, (byte)0);
dirtyTermBuffer.putLongOrdered(termOffset + (length - SIZE_OF_LONG), 0);
this.cleanPosition = cleanPosition + length;
}
}
Expand Down
Loading