Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C] Add thread affinity support for the C media driver. #1298

Merged
merged 4 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions aeron-client/src/main/c/concurrent/aeron_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
#define _GNU_SOURCE
#endif

#include "errno.h"
#include "inttypes.h"
#include "aeron_alloc.h"
#include "concurrent/aeron_thread.h"
#include "util/aeron_error.h"

#if !defined(_WIN32)
#include <unistd.h>
#else
Expand Down Expand Up @@ -86,6 +90,25 @@ void aeron_micro_sleep(unsigned int microseconds)
#endif
}

int aeron_thread_set_affinity(const char *role_name, uint8_t cpu_affinity_no)
{
#if defined(__linux__)
cpu_set_t mask;
const size_t size = sizeof(mask);
CPU_ZERO(&mask);
CPU_SET(cpu_affinity_no, &mask);
if (sched_setaffinity(0, size, &mask) < 0)
{
AERON_SET_ERR(errno, "failed to set thread affinity role_name=%s, cpu_affinity_no=%" PRIu8, role_name, cpu_affinity_no);
return -1;
}
return 0;
#else
AERON_SET_ERR(EINVAL, "%s", "thread affinity not supported");
return -1;
#endif
}

#if defined(AERON_COMPILER_GCC)

void aeron_thread_set_name(const char *role_name)
Expand All @@ -97,6 +120,7 @@ void aeron_thread_set_name(const char *role_name)
#endif
}


#elif defined(AERON_COMPILER_MSVC)

static BOOL WINAPI aeron_thread_once_callback(PINIT_ONCE init_once, void (*callback)(void), void **context)
Expand Down
1 change: 1 addition & 0 deletions aeron-client/src/main/c/concurrent/aeron_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ void aeron_thread_set_name(const char *role_name);

void aeron_nano_sleep(uint64_t nanoseconds);
void aeron_micro_sleep(unsigned int microseconds);
int aeron_thread_set_affinity(const char *role_name, uint8_t cpu_affinity_no);

#if defined(AERON_COMPILER_GCC)

Expand Down
3 changes: 3 additions & 0 deletions aeron-driver/src/main/c/aeron_driver.c
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,9 @@ void aeron_driver_context_print_configuration(aeron_driver_context_t *context)
fprintf(
fpout, "\n network_publication_max_messages_per_send=%" PRIu64,
(uint64_t)context->network_publication_max_messages_per_send);
fprintf(fpout, "\n conductor_cpu_affinity_no=%" PRId32, context->conductor_cpu_affinity_no);
fprintf(fpout, "\n receiver_cpu_affinity_no=%" PRId32, context->receiver_cpu_affinity_no);
fprintf(fpout, "\n sender_cpu_affinity_no=%" PRId32, context->sender_cpu_affinity_no);

fprintf(fpout, "\n epoch_clock=%s",
aeron_dlinfo_func((aeron_fptr_t)context->epoch_clock, buffer, sizeof(buffer)));
Expand Down
52 changes: 52 additions & 0 deletions aeron-driver/src/main/c/aeron_driver_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ static void aeron_driver_conductor_on_endpoint_change_null(const void *channel)
#define AERON_RECEIVER_IO_VECTOR_CAPACITY_DEFAULT UINT32_C(2)
#define AERON_SENDER_IO_VECTOR_CAPACITY_DEFAULT UINT32_C(2)
#define AERON_SENDER_MAX_MESSAGES_PER_SEND_DEFAULT UINT32_C(2)
#define AERON_CPU_AFFINITY_DEFAULT (-1)

int aeron_driver_context_init(aeron_driver_context_t **context)
{
Expand Down Expand Up @@ -672,6 +673,25 @@ int aeron_driver_context_init(aeron_driver_context_t **context)
0,
255);

_context->conductor_cpu_affinity_no = aeron_config_parse_int32(
AERON_CONDUCTOR_CPU_AFFINITY_ENV_VAR,
getenv(AERON_CONDUCTOR_CPU_AFFINITY_ENV_VAR),
_context->conductor_cpu_affinity_no,
-1,
255);
_context->receiver_cpu_affinity_no = aeron_config_parse_int32(
AERON_RECEIVER_CPU_AFFINITY_ENV_VAR,
getenv(AERON_RECEIVER_CPU_AFFINITY_ENV_VAR),
_context->receiver_cpu_affinity_no,
-1,
255);
_context->sender_cpu_affinity_no = aeron_config_parse_int32(
AERON_SENDER_CPU_AFFINITY_ENV_VAR,
getenv(AERON_SENDER_CPU_AFFINITY_ENV_VAR),
_context->sender_cpu_affinity_no,
-1,
255);

_context->send_to_sm_poll_ratio = (uint8_t)aeron_config_parse_uint64(
AERON_SEND_TO_STATUS_POLL_RATIO_ENV_VAR,
getenv(AERON_SEND_TO_STATUS_POLL_RATIO_ENV_VAR),
Expand Down Expand Up @@ -2660,3 +2680,35 @@ uint32_t aeron_driver_context_get_network_publication_max_messages_per_send(aero
context->network_publication_max_messages_per_send : AERON_SENDER_MAX_MESSAGES_PER_SEND_DEFAULT;
}

void aeron_set_thread_affinity_on_start(void *state, const char *role_name)
{
aeron_driver_context_t *context = (aeron_driver_context_t *)state;
int result = 0;
if (0 == strcmp("conductor", role_name) && 0 < context->conductor_cpu_affinity_no)
{
result = aeron_thread_set_affinity(role_name, (uint8_t)context->conductor_cpu_affinity_no);
}
else if (0 == strcmp("sender", role_name) && 0 < context->sender_cpu_affinity_no)
{
result = aeron_thread_set_affinity(role_name, (uint8_t)context->sender_cpu_affinity_no);
}
else if (0 == strcmp("receiver", role_name) && 0 < context->receiver_cpu_affinity_no)
{
result = aeron_thread_set_affinity(role_name, (uint8_t)context->receiver_cpu_affinity_no);
}

if (result < 0)
{
AERON_APPEND_ERR("%s", "WARNING: unable to apply affinity");
// Just in case the error log is not initialised, but it should be by this point.
if (NULL != context->error_log)
{
aeron_distinct_error_log_record(context->error_log, aeron_errcode(), aeron_errmsg());
}
else
{
fprintf(stderr, "%s", aeron_errmsg());
}
aeron_err_clear();
}
}
5 changes: 5 additions & 0 deletions aeron-driver/src/main/c/aeron_driver_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ typedef struct aeron_driver_context_stct
uint32_t receiver_io_vector_capacity; /* aeron.receiver.io.vector.capacity = 2 */
uint32_t sender_io_vector_capacity; /* aeron.sender.io.vector.capacity = 2 */
uint32_t network_publication_max_messages_per_send; /* aeron.network.publication.max.messages.per.send = 2 */

int32_t conductor_cpu_affinity_no; /* aeron.conductor.cpu.affinity = -1 */
int32_t receiver_cpu_affinity_no; /* aeron.receiver.cpu.affinity = -1 */
int32_t sender_cpu_affinity_no; /* aeron.conductor.cpu.affinity = -1 */

struct /* aeron.receiver.receiver.tag = <unset> */
{
bool is_present;
Expand Down
7 changes: 7 additions & 0 deletions aeron-driver/src/main/c/aeronmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "aeron_driver_context.h"
#include "util/aeron_properties_util.h"
#include "util/aeron_strutil.h"
#include "util/aeron_error.h"

volatile bool running = true;

Expand Down Expand Up @@ -117,6 +118,12 @@ int main(int argc, char **argv)
goto cleanup;
}

if (aeron_driver_context_set_agent_on_start_function(context, aeron_set_thread_affinity_on_start, context))
{
fprintf(stderr, "ERROR: unable to set on_start function(%d) %s\n", aeron_errcode(), aeron_errmsg());
goto cleanup;
}

if (aeron_driver_init(&driver, context) < 0)
{
fprintf(stderr, "ERROR: driver init (%d) %s\n", aeron_errcode(), aeron_errmsg());
Expand Down
15 changes: 15 additions & 0 deletions aeron-driver/src/main/c/aeronmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,10 @@ uint32_t aeron_driver_context_get_sender_io_vector_capacity(aeron_driver_context
int aeron_driver_context_set_network_publication_max_messages_per_send(aeron_driver_context_t *context, uint32_t value);
uint32_t aeron_driver_context_get_network_publication_max_messages_per_send(aeron_driver_context_t *context);

#define AERON_CONDUCTOR_CPU_AFFINITY_ENV_VAR "AERON_CONDUCTOR_CPU_AFFINITY"
#define AERON_RECEIVER_CPU_AFFINITY_ENV_VAR "AERON_RECEIVER_CPU_AFFINITY"
#define AERON_SENDER_CPU_AFFINITY_ENV_VAR "AERON_SENDER_CPU_AFFINITY"

/**
* Set the list of filenames to dynamic libraries to load upon context init.
*/
Expand Down Expand Up @@ -985,6 +989,17 @@ const char *aeron_errmsg();
*/
int aeron_default_path(char *path, size_t path_length);

/**
* Affinity setting function that complies with the aeron_agent_on_start_func_t structure that can
* be used as an agent start function. The state should be the aeron_driver_context_t* and the function
* will match the values "conductor", "sender", "receiver" and use the respective configuration options from
* the aeron_driver_context_t.
*
* @param state client information passed to function, should be the aeron_driver_context_t*.
* @param role_name name of the role specified on the agent.
*/
void aeron_set_thread_affinity_on_start(void *state, const char *role_name);

#ifdef __cplusplus
}
#endif
Expand Down
17 changes: 17 additions & 0 deletions aeron-samples/scripts/show_thread_affinity.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/usr/bin/env bash

set -euo pipefail

pname=${1:-aeronmd} # default to 'aeronmd'

for pid in $(pgrep "${pname}"); do
echo "PID: ${pid} (${pname})"
ps -T -p ${pid} -o spid,comm | while read line; do
if [[ ${line} != *"SPID"* ]]; then
IFS=" " read -a fields<<<"${line}"
aff=$(taskset -cp "${fields[0]}" | tr -d '\n')
printf "%s (%s)\n" "${aff}" "${fields[1]}"
fi
done
done