Skip to content

Commit

Permalink
[C] Handle failed log buffer delete in C Media Driver (#1073)
Browse files Browse the repository at this point in the history
* [C] Add free function to the managed resources.

* [C] Add aeron_map_raw_log_free function.

* [C] Implement retry when freeing log buffers.

* [C] Add tests that verify re-try on freeing the log buffers.

* [C] Continue rename of the log functions.

* [C] Pad millis with leading zeroes when less than three digits.

* [Java] Reduce timer interval to 15ms to force tests to fail.

* [Java] Ensure that driver output is always printed in the order they were created.

* [C] Do not perform action when in wrong state, i.e. prevent accessing freed buffers when close operation is in progress.
  • Loading branch information
vyazelenko authored Oct 13, 2020
1 parent d23c19f commit 2042835
Show file tree
Hide file tree
Showing 21 changed files with 471 additions and 48 deletions.
30 changes: 19 additions & 11 deletions aeron-client/src/main/c/util/aeron_fileutil.c
Original file line number Diff line number Diff line change
Expand Up @@ -518,25 +518,33 @@ int aeron_raw_log_map_existing(aeron_mapped_raw_log_t *mapped_raw_log, const cha

int aeron_raw_log_close(aeron_mapped_raw_log_t *mapped_raw_log, const char *filename)
{
int result = 0;

if (mapped_raw_log->mapped_file.addr != NULL)
if (!aeron_raw_log_free(mapped_raw_log, filename))
{
if ((result = aeron_unmap(&mapped_raw_log->mapped_file)) < 0)
{
return -1;
}
aeron_set_err_from_last_err_code("%s:%d", __FILE__, __LINE__);
return -1;
}

return 0;
}

if (NULL != filename && remove(filename) < 0)
bool aeron_raw_log_free(aeron_mapped_raw_log_t *mapped_raw_log, const char *filename)
{
if (NULL != mapped_raw_log->mapped_file.addr)
{
if (aeron_unmap(&mapped_raw_log->mapped_file) < 0)
{
aeron_set_err_from_last_err_code("%s:%d", __FILE__, __LINE__);
return -1;
return false;
}

mapped_raw_log->mapped_file.addr = NULL;
}

return result;
if (NULL != filename && remove(filename) < 0)
{
return false;
}

return true;
}

#if defined(__clang__)
Expand Down
3 changes: 3 additions & 0 deletions aeron-client/src/main/c/util/aeron_fileutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ size_t aeron_temp_filename(char *filename, size_t length);

typedef int (*aeron_raw_log_map_func_t)(aeron_mapped_raw_log_t *, const char *, bool, uint64_t, uint64_t);
typedef int (*aeron_raw_log_close_func_t)(aeron_mapped_raw_log_t *, const char *filename);
typedef bool (*aeron_raw_log_free_func_t)(aeron_mapped_raw_log_t *, const char *filename);

int aeron_raw_log_map(
aeron_mapped_raw_log_t *mapped_raw_log,
Expand All @@ -107,4 +108,6 @@ int aeron_raw_log_map_existing(aeron_mapped_raw_log_t *mapped_raw_log, const cha

int aeron_raw_log_close(aeron_mapped_raw_log_t *mapped_raw_log, const char *filename);

bool aeron_raw_log_free(aeron_mapped_raw_log_t *mapped_raw_log, const char *filename);

#endif //AERON_FILEUTIL_H
1 change: 1 addition & 0 deletions aeron-client/src/test/c/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ aeron_c_client_test(subscription_test aeron_subscription_test.cpp)
aeron_c_client_test(image_test aeron_image_test.cpp)
aeron_c_client_test(fragment_assembler_test aeron_fragment_assembler_test.cpp)
aeron_c_client_test(array_to_ptr_hash_map_test collections/aeron_array_to_ptr_hash_map_test.cpp)
aeron_c_client_test(aeron_fileutil_test util/aeron_fileutil_test.cpp)
122 changes: 122 additions & 0 deletions aeron-client/src/test/c/util/aeron_fileutil_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2014-2020 Real Logic Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <exception>
#include <functional>

#include <gtest/gtest.h>

extern "C"
{
#include "util/aeron_fileutil.h"
#include "util/aeron_error.h"
}

class FileUtilTest : public testing::Test {
public:
FileUtilTest() {
}
};

TEST_F(FileUtilTest, rawLogCloseShouldUnmapAndDeleteLogFile) {
aeron_mapped_raw_log_t mapped_raw_log = {};
const char *file = "test_close_unused_file.log";
const size_t file_length = 16384;
ASSERT_EQ(0, aeron_raw_log_map(&mapped_raw_log, file, true, 4096, 4096));

EXPECT_NE(nullptr, mapped_raw_log.mapped_file.addr);
EXPECT_EQ(file_length, mapped_raw_log.mapped_file.length);
EXPECT_EQ((int64_t)file_length, aeron_file_length(file));

ASSERT_EQ(0, aeron_raw_log_close(&mapped_raw_log, file));

EXPECT_EQ(nullptr, mapped_raw_log.mapped_file.addr);
EXPECT_EQ(file_length, mapped_raw_log.mapped_file.length);
EXPECT_EQ(-1, aeron_file_length(file));
}

TEST_F(FileUtilTest, rawLogFreeShouldUnmapAndDeleteLogFile) {
aeron_mapped_raw_log_t mapped_raw_log = {};
const char *file = "test_free_unused_file.log";
const size_t file_length = 16384;
ASSERT_EQ(0, aeron_raw_log_map(&mapped_raw_log, file, true, 4096, 4096));

EXPECT_NE(nullptr, mapped_raw_log.mapped_file.addr);
EXPECT_EQ(file_length, mapped_raw_log.mapped_file.length);
EXPECT_EQ((int64_t)file_length, aeron_file_length(file));

ASSERT_EQ(true, aeron_raw_log_free(&mapped_raw_log, file));

EXPECT_EQ(nullptr, mapped_raw_log.mapped_file.addr);
EXPECT_EQ(file_length, mapped_raw_log.mapped_file.length);
EXPECT_EQ(-1, aeron_file_length(file));
}

TEST_F(FileUtilTest, rawLogCloseShouldNotDeleteFileIfUnmapFails) {
aeron_mapped_raw_log_t mapped_raw_log = {};
const char *file = "test_close_unmap_fails.log";
const size_t file_length = 16384;
ASSERT_EQ(0, aeron_raw_log_map(&mapped_raw_log, file, true, 4096, 4096));
const auto mapped_addr = mapped_raw_log.mapped_file.addr;
mapped_raw_log.mapped_file.addr = reinterpret_cast<void *>(-1);

ASSERT_EQ(-1, aeron_raw_log_close(&mapped_raw_log, file));
EXPECT_EQ((int64_t)file_length, aeron_file_length(file));

mapped_raw_log.mapped_file.addr = mapped_addr;
ASSERT_EQ(0, aeron_raw_log_close(&mapped_raw_log, file));
EXPECT_EQ(-1, aeron_file_length(file));
}

TEST_F(FileUtilTest, rawLogFreeShouldNotDeleteFileIfUnmapFails) {
aeron_mapped_raw_log_t mapped_raw_log = {};
const char *file = "test_free_unmap_fails.log";
const size_t file_length = 16384;
ASSERT_EQ(0, aeron_raw_log_map(&mapped_raw_log, file, true, 4096, 4096));
const auto mapped_addr = mapped_raw_log.mapped_file.addr;
mapped_raw_log.mapped_file.addr = reinterpret_cast<void *>(-1);

ASSERT_EQ(false, aeron_raw_log_free(&mapped_raw_log, file));
EXPECT_EQ((int64_t)file_length, aeron_file_length(file));

mapped_raw_log.mapped_file.addr = mapped_addr;
ASSERT_EQ(true, aeron_raw_log_free(&mapped_raw_log, file));
EXPECT_EQ(-1, aeron_file_length(file));
}

TEST_F(FileUtilTest, rawLogCloseShouldReturnErrorIfFileDeleteFailsAndSetError) {
aeron_mapped_raw_log_t mapped_raw_log = {};
const char *file = "test_close_delete_unknown_file.log";
aeron_set_err(0, "");
ASSERT_EQ(-1, aeron_file_length(file));
ASSERT_EQ(0, aeron_errcode());

ASSERT_EQ(-1, aeron_raw_log_close(&mapped_raw_log, file));
EXPECT_NE(0, aeron_errcode());
EXPECT_NE(std::string(""), std::string(aeron_errmsg()));
}

TEST_F(FileUtilTest, rawLogFreeShouldReturnErrorIfFileDeleteFails) {
aeron_mapped_raw_log_t mapped_raw_log = {};
const char *file = "test_free_delete_unknown_file.log";
aeron_set_err(0, "");
ASSERT_EQ(-1, aeron_file_length(file));
ASSERT_EQ(0, aeron_errcode());

ASSERT_EQ(false, aeron_raw_log_free(&mapped_raw_log, file));
EXPECT_EQ(0, aeron_errcode());
EXPECT_EQ(std::string(""), std::string(aeron_errmsg()));
}
79 changes: 75 additions & 4 deletions aeron-driver/src/main/c/aeron_driver_conductor.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,48 +212,55 @@ int aeron_driver_conductor_init(aeron_driver_conductor_t *conductor, aeron_drive
conductor->clients.on_time_event = aeron_client_on_time_event;
conductor->clients.has_reached_end_of_life = aeron_client_has_reached_end_of_life;
conductor->clients.delete_func = aeron_client_delete;
conductor->clients.free_func = aeron_client_free;

conductor->ipc_publications.array = NULL;
conductor->ipc_publications.length = 0;
conductor->ipc_publications.capacity = 0;
conductor->ipc_publications.on_time_event = aeron_ipc_publication_entry_on_time_event;
conductor->ipc_publications.has_reached_end_of_life = aeron_ipc_publication_entry_has_reached_end_of_life;
conductor->ipc_publications.delete_func = aeron_ipc_publication_entry_delete;
conductor->ipc_publications.free_func = aeron_ipc_publication_entry_free;

conductor->network_publications.array = NULL;
conductor->network_publications.length = 0;
conductor->network_publications.capacity = 0;
conductor->network_publications.on_time_event = aeron_network_publication_entry_on_time_event;
conductor->network_publications.has_reached_end_of_life = aeron_network_publication_entry_has_reached_end_of_life;
conductor->network_publications.delete_func = aeron_network_publication_entry_delete;
conductor->network_publications.free_func = aeron_network_publication_entry_free;

conductor->send_channel_endpoints.array = NULL;
conductor->send_channel_endpoints.length = 0;
conductor->send_channel_endpoints.capacity = 0;
conductor->send_channel_endpoints.on_time_event = aeron_send_channel_endpoint_entry_on_time_event;
conductor->send_channel_endpoints.has_reached_end_of_life = aeron_send_channel_endpoint_entry_has_reached_end_of_life;
conductor->send_channel_endpoints.delete_func = aeron_send_channel_endpoint_entry_delete;
conductor->send_channel_endpoints.free_func = aeron_send_channel_endpoint_entry_free;

conductor->receive_channel_endpoints.array = NULL;
conductor->receive_channel_endpoints.length = 0;
conductor->receive_channel_endpoints.capacity = 0;
conductor->receive_channel_endpoints.on_time_event = aeron_receive_channel_endpoint_entry_on_time_event;
conductor->receive_channel_endpoints.has_reached_end_of_life = aeron_receive_channel_endpoint_entry_has_reached_end_of_life;
conductor->receive_channel_endpoints.delete_func = aeron_receive_channel_endpoint_entry_delete;
conductor->receive_channel_endpoints.free_func = aeron_receive_channel_endpoint_entry_free;

conductor->publication_images.array = NULL;
conductor->publication_images.length = 0;
conductor->publication_images.capacity = 0;
conductor->publication_images.on_time_event = aeron_publication_image_entry_on_time_event;
conductor->publication_images.has_reached_end_of_life = aeron_publication_image_entry_has_reached_end_of_life;
conductor->publication_images.delete_func = aeron_publication_image_entry_delete;
conductor->publication_images.free_func = aeron_publication_image_entry_free;

conductor->lingering_resources.array = NULL;
conductor->lingering_resources.length = 0;
conductor->lingering_resources.capacity = 0;
conductor->lingering_resources.on_time_event = aeron_linger_resource_entry_on_time_event;
conductor->lingering_resources.has_reached_end_of_life = aeron_linger_resource_entry_has_reached_end_of_life;
conductor->lingering_resources.delete_func = aeron_linger_resource_entry_delete;
conductor->lingering_resources.free_func = aeron_linger_resource_entry_free;

conductor->ipc_subscriptions.array = NULL;
conductor->ipc_subscriptions.length = 0;
Expand Down Expand Up @@ -588,6 +595,11 @@ void aeron_client_delete(aeron_driver_conductor_t *conductor, aeron_client_t *cl
client->heartbeat_timestamp.value_addr = NULL;
}

bool aeron_client_free(aeron_client_t *client)
{
return true;
}

void aeron_driver_conductor_on_available_image(
aeron_driver_conductor_t *conductor,
int64_t correlation_id,
Expand Down Expand Up @@ -659,6 +671,18 @@ void aeron_ipc_publication_entry_delete(
entry->publication = NULL;
}

bool aeron_ipc_publication_entry_free(aeron_ipc_publication_entry_t *entry)
{
aeron_ipc_publication_t *publication = entry->publication;
if (publication->raw_log_free_func(&publication->mapped_raw_log, publication->log_file_name))
{
aeron_free(publication->log_file_name);
publication->log_file_name = NULL;
return true;
}
return false;
}

void aeron_network_publication_entry_on_time_event(
aeron_driver_conductor_t *conductor, aeron_network_publication_entry_t *entry, int64_t now_ns, int64_t now_ms)
{
Expand Down Expand Up @@ -696,6 +720,18 @@ void aeron_network_publication_entry_delete(
}
}

bool aeron_network_publication_entry_free(aeron_network_publication_entry_t *entry)
{
aeron_network_publication_t *publication = entry->publication;
if (publication->raw_log_free_func(&publication->mapped_raw_log, publication->log_file_name))
{
aeron_free(publication->log_file_name);
publication->log_file_name = NULL;
return true;
}
return false;
}

void aeron_driver_conductor_cleanup_spies(aeron_driver_conductor_t *conductor, aeron_network_publication_t *publication)
{
for (size_t i = 0, size = conductor->spy_subscriptions.length; i < size; i++)
Expand Down Expand Up @@ -741,6 +777,11 @@ void aeron_send_channel_endpoint_entry_delete(
aeron_send_channel_endpoint_delete(&conductor->counters_manager, entry->endpoint);
}

bool aeron_send_channel_endpoint_entry_free(aeron_send_channel_endpoint_entry_t *entry)
{
return true;
}

void aeron_receive_channel_endpoint_entry_on_time_event(
aeron_driver_conductor_t *conductor, aeron_receive_channel_endpoint_entry_t *entry, int64_t now_ns, int64_t now_ms)
{
Expand Down Expand Up @@ -769,6 +810,11 @@ void aeron_receive_channel_endpoint_entry_delete(
aeron_receive_channel_endpoint_delete(&conductor->counters_manager, entry->endpoint);
}

bool aeron_receive_channel_endpoint_entry_free(aeron_receive_channel_endpoint_entry_t *entry)
{
return true;
}

void aeron_publication_image_entry_on_time_event(
aeron_driver_conductor_t *conductor, aeron_publication_image_entry_t *entry, int64_t now_ns, int64_t now_ms)
{
Expand All @@ -794,6 +840,18 @@ void aeron_publication_image_entry_delete(
entry->image = NULL;
}

bool aeron_publication_image_entry_free(aeron_publication_image_entry_t *entry)
{
aeron_publication_image_t *image = entry->image;
if (image->raw_log_free_func(&image->mapped_raw_log, image->log_file_name))
{
aeron_free(image->log_file_name);
image->log_file_name = NULL;
return true;
}
return false;
}

void aeron_linger_resource_entry_on_time_event(
aeron_driver_conductor_t *conductor, aeron_linger_resource_entry_t *entry, int64_t now_ns, int64_t now_ms)
{
Expand All @@ -814,6 +872,11 @@ void aeron_linger_resource_entry_delete(aeron_driver_conductor_t *conductor, aer
aeron_free(entry->buffer);
}

bool aeron_linger_resource_entry_free(aeron_linger_resource_entry_t *entry)
{
return true;
}

void aeron_driver_conductor_image_transition_to_linger(
aeron_driver_conductor_t *conductor, aeron_publication_image_t *image)
{
Expand Down Expand Up @@ -854,10 +917,18 @@ for (int last_index = (int)l.length - 1, i = last_index; i >= 0; i--) \
l.on_time_event(c, elem, now_ns, now_ms); \
if (l.has_reached_end_of_life(c, elem)) \
{ \
l.delete_func(c, elem); \
aeron_array_fast_unordered_remove((uint8_t *)l.array, sizeof(t), i, last_index); \
last_index--; \
l.length--; \
if (l.free_func(elem)) \
{ \
l.delete_func(c, elem); \
aeron_array_fast_unordered_remove((uint8_t *)l.array, sizeof(t), i, last_index); \
last_index--; \
l.length--; \
} \
else \
{ \
int64_t *counter = aeron_system_counter_addr(&c->system_counters, AERON_SYSTEM_COUNTER_FREE_FAILS); \
aeron_counter_ordered_increment(counter, 1); \
} \
} \
}

Expand Down
Loading

0 comments on commit 2042835

Please sign in to comment.