Skip to content

Commit

Permalink
[C] Make broadcast_receiver scratch buffer expandable to accommodate …
Browse files Browse the repository at this point in the history
…for large responses from the media driver.
  • Loading branch information
vyazelenko committed Dec 12, 2024
1 parent 16f8f9a commit 2dfe144
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 19 deletions.
2 changes: 2 additions & 0 deletions aeron-client/src/main/c/aeron_client_conductor.c
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,8 @@ void aeron_client_conductor_on_close(aeron_client_conductor_t *conductor)
aeron_client_conductor_delete_lingering_resource(&conductor->lingering_resources.array[i]);
}

aeron_broadcast_receiver_close(&conductor->to_client_buffer);

aeron_int64_to_ptr_hash_map_delete(&conductor->log_buffer_by_id_map);
aeron_int64_to_ptr_hash_map_delete(&conductor->resource_by_id_map);
aeron_array_to_ptr_hash_map_delete(&conductor->image_by_key_map);
Expand Down
61 changes: 45 additions & 16 deletions aeron-client/src/main/c/concurrent/aeron_broadcast_receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,28 @@
#include <string.h>
#include <errno.h>
#include <inttypes.h>
#include "aeron_alloc.h"
#include "concurrent/aeron_broadcast_receiver.h"
#include "util/aeron_error.h"

int aeron_broadcast_receiver_init(aeron_broadcast_receiver_t *receiver, void *buffer, size_t length)
{
const size_t capacity = length - AERON_BROADCAST_BUFFER_TRAILER_LENGTH;
int result = -1;
receiver->scratch_buffer = NULL;
receiver->scratch_buffer_capacity = 0;

if (AERON_BROADCAST_IS_CAPACITY_VALID(capacity))
{
size_t scratch_buffer_capacity = AERON_BROADCAST_SCRATCH_BUFFER_LENGTH_DEFAULT;
uint8_t *scratch_buffer;
if (aeron_alloc((void**)&scratch_buffer, scratch_buffer_capacity) < 0)
{
AERON_APPEND_ERR("failed to allocate scratch buffer of capacity: %" PRIu64, scratch_buffer_capacity);
return -1;
}

receiver->scratch_buffer = scratch_buffer;
receiver->scratch_buffer_capacity = scratch_buffer_capacity;
receiver->buffer = buffer;
receiver->capacity = capacity;
receiver->mask = capacity - 1u;
Expand All @@ -40,21 +52,22 @@ int aeron_broadcast_receiver_init(aeron_broadcast_receiver_t *receiver, void *bu
receiver->record_offset = (size_t)latest & receiver->mask;
receiver->lapped_count = 0;

result = 0;
return 0;
}
else
{
AERON_SET_ERR(EINVAL, "Capacity: %" PRIu64 " invalid, must be power of two", (uint64_t)capacity);
return -1;
}

return result;
}

extern bool aeron_broadcast_receiver_validate(aeron_broadcast_receiver_t *receiver);

extern bool aeron_broadcast_receiver_validate_at(aeron_broadcast_receiver_t *receiver, int64_t cursor);

extern bool aeron_broadcast_receiver_receive_next(aeron_broadcast_receiver_t *receiver);
int aeron_broadcast_receiver_close(aeron_broadcast_receiver_t *receiver)
{
aeron_free(receiver->scratch_buffer);
receiver->scratch_buffer = NULL;
receiver->scratch_buffer_capacity = 0;
return 0;
}

int aeron_broadcast_receiver_receive(
aeron_broadcast_receiver_t *receiver, aeron_broadcast_receiver_handler_t handler, void *clientd)
Expand All @@ -75,14 +88,24 @@ int aeron_broadcast_receiver_receive(

const size_t length = (size_t)record->length - AERON_BROADCAST_RECORD_HEADER_LENGTH;

if (length > sizeof(receiver->scratch_buffer))
if (length > receiver->scratch_buffer_capacity)
{
AERON_SET_ERR(
EINVAL,
"scratch buffer too small, required: %" PRIu64 ", found: %" PRIu64,
(uint64_t) length,
(uint64_t) sizeof(receiver->scratch_buffer));
return -1;
size_t new_scratch_buffer_capacity = receiver->scratch_buffer_capacity;
while (new_scratch_buffer_capacity < length)
{
new_scratch_buffer_capacity += (new_scratch_buffer_capacity >> 1);
}

uint8_t *new_scratch_buffer;
if (aeron_alloc((void**)&new_scratch_buffer, new_scratch_buffer_capacity) < 0)
{
AERON_APPEND_ERR("failed to allocate scratch buffer of capacity: %" PRIu64, new_scratch_buffer_capacity);
return -1;
}

aeron_free(receiver->scratch_buffer);
receiver->scratch_buffer = new_scratch_buffer;
receiver->scratch_buffer_capacity = new_scratch_buffer_capacity;
}

const int32_t type_id = record->msg_type_id;
Expand All @@ -104,3 +127,9 @@ int aeron_broadcast_receiver_receive(

return messages_received;
}

extern bool aeron_broadcast_receiver_validate(aeron_broadcast_receiver_t *receiver);

extern bool aeron_broadcast_receiver_validate_at(aeron_broadcast_receiver_t *receiver, int64_t cursor);

extern bool aeron_broadcast_receiver_receive_next(aeron_broadcast_receiver_t *receiver);
6 changes: 4 additions & 2 deletions aeron-client/src/main/c/concurrent/aeron_broadcast_receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
#include "aeron_atomic.h"
#include "aeron_broadcast_descriptor.h"

#define AERON_BROADCAST_SCRATCH_BUFFER_LENGTH (4096u)
#define AERON_BROADCAST_SCRATCH_BUFFER_LENGTH_DEFAULT (4096u)

typedef struct aeron_broadcast_receiver_stct
{
uint8_t scratch_buffer[AERON_BROADCAST_SCRATCH_BUFFER_LENGTH];
uint8_t *scratch_buffer;
uint8_t *buffer;
aeron_broadcast_descriptor_t *descriptor;
size_t capacity;
size_t mask;
size_t scratch_buffer_capacity;

size_t record_offset;
int64_t cursor;
Expand All @@ -41,6 +42,7 @@ aeron_broadcast_receiver_t;
typedef void (*aeron_broadcast_receiver_handler_t)(int32_t type_id, uint8_t *buffer, size_t length, void *clientd);

int aeron_broadcast_receiver_init(aeron_broadcast_receiver_t *receiver, void *buffer, size_t length);
int aeron_broadcast_receiver_close(aeron_broadcast_receiver_t *receiver);

inline bool aeron_broadcast_receiver_validate_at(aeron_broadcast_receiver_t *receiver, int64_t cursor)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,29 @@ TEST_F(BroadcastReceiverTest, shouldCalculateCapacityForBuffer)

ASSERT_EQ(aeron_broadcast_receiver_init(&receiver, m_buffer.data(), m_buffer.size()), 0);
EXPECT_EQ(receiver.capacity, BUFFER_SZ - AERON_BROADCAST_BUFFER_TRAILER_LENGTH);
EXPECT_EQ(receiver.scratch_buffer_capacity, AERON_BROADCAST_SCRATCH_BUFFER_LENGTH_DEFAULT);

EXPECT_EQ(0, aeron_broadcast_receiver_close(&receiver));
}

TEST_F(BroadcastReceiverTest, shouldFreeScratchBuffer)
{
aeron_broadcast_receiver_t receiver;

ASSERT_EQ(aeron_broadcast_receiver_init(&receiver, m_buffer.data(), m_buffer.size()), 0);
ASSERT_EQ(aeron_broadcast_receiver_close(&receiver), 0);
EXPECT_EQ(receiver.scratch_buffer, nullptr);
EXPECT_EQ(receiver.scratch_buffer_capacity, 0);
}

TEST_F(BroadcastReceiverTest, shouldErrorForCapacityNotPowerOfTwo)
{
aeron_broadcast_receiver_t receiver;

ASSERT_EQ(aeron_broadcast_receiver_init(&receiver, m_buffer.data(), m_buffer.size() - 1), -1);

EXPECT_EQ(receiver.scratch_buffer, nullptr);
EXPECT_EQ(receiver.scratch_buffer_capacity, 0);
}

TEST_F(BroadcastReceiverTest, shouldNotBeLappedBeforeReception)
Expand All @@ -67,6 +83,8 @@ TEST_F(BroadcastReceiverTest, shouldNotBeLappedBeforeReception)
ASSERT_EQ(aeron_broadcast_receiver_init(&receiver, m_buffer.data(), m_buffer.size()), 0);

EXPECT_EQ(receiver.lapped_count, 0);

EXPECT_EQ(0, aeron_broadcast_receiver_close(&receiver));
}

TEST_F(BroadcastReceiverTest, shouldNotReceiveFromEmptyBuffer)
Expand All @@ -78,6 +96,8 @@ TEST_F(BroadcastReceiverTest, shouldNotReceiveFromEmptyBuffer)
receiver.descriptor->tail_counter = 0;

EXPECT_FALSE(aeron_broadcast_receiver_receive_next(&receiver));

EXPECT_EQ(0, aeron_broadcast_receiver_close(&receiver));
}

TEST_F(BroadcastReceiverTest, shouldReceiveFirstMessageFromBuffer)
Expand Down Expand Up @@ -107,6 +127,8 @@ TEST_F(BroadcastReceiverTest, shouldReceiveFirstMessageFromBuffer)
EXPECT_EQ(record->msg_type_id, MSG_TYPE_ID);
EXPECT_EQ(receiver.record_offset, record_offset);
EXPECT_TRUE(aeron_broadcast_receiver_validate(&receiver));

EXPECT_EQ(0, aeron_broadcast_receiver_close(&receiver));
}

TEST_F(BroadcastReceiverTest, shouldReceiveTwoMessagesFromBuffer)
Expand Down Expand Up @@ -150,6 +172,8 @@ TEST_F(BroadcastReceiverTest, shouldReceiveTwoMessagesFromBuffer)
EXPECT_EQ(record->msg_type_id, MSG_TYPE_ID);
EXPECT_EQ(receiver.record_offset, record_offset_two);
EXPECT_TRUE(aeron_broadcast_receiver_validate(&receiver));

EXPECT_EQ(0, aeron_broadcast_receiver_close(&receiver));
}

TEST_F(BroadcastReceiverTest, shouldLateJoinTransmission)
Expand Down Expand Up @@ -181,6 +205,8 @@ TEST_F(BroadcastReceiverTest, shouldLateJoinTransmission)
EXPECT_EQ(receiver.record_offset, record_offset);
EXPECT_TRUE(aeron_broadcast_receiver_validate(&receiver));
EXPECT_GT(receiver.lapped_count, 0);

EXPECT_EQ(0, aeron_broadcast_receiver_close(&receiver));
}

TEST_F(BroadcastReceiverTest, shouldCopeWithPaddingRecordAndWrapOfBufferToNextRecord)
Expand Down Expand Up @@ -226,6 +252,8 @@ TEST_F(BroadcastReceiverTest, shouldCopeWithPaddingRecordAndWrapOfBufferToNextRe
EXPECT_EQ(record->msg_type_id, MSG_TYPE_ID);
EXPECT_EQ(receiver.record_offset, record_offset);
EXPECT_TRUE(aeron_broadcast_receiver_validate(&receiver));

EXPECT_EQ(0, aeron_broadcast_receiver_close(&receiver));
}

TEST_F(BroadcastReceiverTest, shouldDealWithRecordBecomingInvalidDueToOverwrite)
Expand Down Expand Up @@ -258,4 +286,6 @@ TEST_F(BroadcastReceiverTest, shouldDealWithRecordBecomingInvalidDueToOverwrite)
receiver.descriptor->tail_intent_counter = static_cast<int64_t>(tail + (CAPACITY - aligned_record_length));

EXPECT_FALSE(aeron_broadcast_receiver_validate(&receiver));

EXPECT_EQ(0, aeron_broadcast_receiver_close(&receiver));
}
16 changes: 16 additions & 0 deletions aeron-driver/src/test/c/aeron_driver_conductor_pub_sub_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,22 @@ INSTANTIATE_TEST_SUITE_P(
return std::string(info.param->m_name);
});

TEST_P(DriverConductorPubSubTest, shouldRejectAddPublicationIfChannelIsTooLong)
{
const auto channel = std::string(GetParam()->m_channel).append("|alias=").append(AERON_URI_MAX_LENGTH, 'x');
int64_t client_id = nextCorrelationId();
int64_t pub_id = nextCorrelationId();

ASSERT_EQ(addPublication(client_id, pub_id, channel.c_str(), STREAM_ID_1, false), 0);

doWorkUntilDone();

EXPECT_CALL(m_mockCallbacks, broadcastToClient(AERON_RESPONSE_ON_ERROR, _, _))
.With(IsError(pub_id));

readAllBroadcastsFromConductor(mock_broadcast_handler);
}

TEST_P(DriverConductorPubSubTest, shouldBeAbleToAddAndRemoveSingleNetworkPublication)
{
const char *channel = GetParam()->m_channel;
Expand Down
2 changes: 1 addition & 1 deletion aeron-driver/src/test/c/aeron_driver_conductor_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ class DriverConductorTest
}

protected:
uint8_t m_command_buffer[AERON_MAX_PATH] = {};
uint8_t m_command_buffer[128 * 1024] = {};
TestDriverContext m_context = {};
TestDriverConductor m_conductor;
aeron_broadcast_receiver_t m_broadcast_receiver = {};
Expand Down

0 comments on commit 2dfe144

Please sign in to comment.