up to 5 replication workers work on the same host
ktsaou committed Dec 14, 2024
1 parent 696a290 commit 55d253b
Showing 3 changed files with 57 additions and 23 deletions.
78 changes: 55 additions & 23 deletions src/streaming/replication.c
@@ -1031,8 +1031,7 @@ static struct replication_thread {
        time_t first_time_t;                // the minimum 'after' we encountered

        struct {
-            Word_t after;
-            Word_t unique_id;
+            Word_t session;
            Pvoid_t JudyL_array;
        } queue;

@@ -1067,8 +1066,7 @@ static struct replication_thread {
        .first_time_t = 0,

        .queue = {
-            .after = 0,
-            .unique_id = 0,
+            .session = 0,
            .JudyL_array = NULL,
        },
    },
@@ -1127,10 +1125,9 @@ static inline bool replication_recursive_lock_mode(char mode) {
        fatal("REPLICATION: reached %s, but replication is not locked by this thread.", __FUNCTION__); \
} while(0)

-void replication_set_next_point_in_time(time_t after, size_t unique_id) {
+static void replication_session_increment(void) {
    replication_recursive_lock();
-    replication_globals.unsafe.queue.after = after;
-    replication_globals.unsafe.queue.unique_id = unique_id;
+    replication_globals.unsafe.queue.session++;
    replication_recursive_unlock();
}
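The explicit (after, unique_id) reset is replaced by a monotonically increasing session counter: instead of writing new cursor values into the shared queue state, the caller just bumps `session` under the replication lock, and each worker thread resets its own thread-local cursors the next time it notices the counter has moved (see the `static __thread` block in `replication_request_get_first_available()` below). A minimal sketch of that invalidation pattern, with illustrative names rather than the actual Netdata structures:

```c
// Sketch of session-based cursor invalidation; names are illustrative only.
#include <stddef.h>
#include <stdio.h>

static size_t global_session = 0;          // bumped under a lock, like queue.session above

struct scan_cursor {
    size_t session;                        // last global session this thread has seen
    size_t after, unique_id;               // per-thread scan positions
};

static void cursor_sync(struct scan_cursor *c) {
    if(c->session != global_session) {     // someone requested a full rescan
        c->after = 0;                      // restart the scan from the beginning
        c->unique_id = 0;
        c->session = global_session;
    }
}

int main(void) {
    struct scan_cursor c = { 0 };
    c.after = 42; c.unique_id = 7;         // pretend this thread is mid-scan
    global_session++;                      // equivalent of replication_session_increment()
    cursor_sync(&c);
    printf("after=%zu unique_id=%zu\n", c.after, c.unique_id);  // both reset to 0
    return 0;
}
```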

@@ -1289,19 +1286,47 @@ static void replication_sort_entry_del(struct replication_request *rq, bool buff
    replication_recursive_unlock();
}

-static struct replication_request replication_request_get_first_available() {
+static bool rq_sender_acquire(struct replication_request *rq, bool force) {
+    ssize_t expected = __atomic_load_n(&rq->sender->replication.atomic.threads, __ATOMIC_RELAXED);
+    ssize_t desired;
+
+    do {
+        if(!force && expected >= 5)
+            return false;
+
+        desired = expected + 1;
+
+    } while(!__atomic_compare_exchange_n(
+        &rq->sender->replication.atomic.threads, &expected, desired, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+    return true;
+}
+
+static void rq_sender_release(struct replication_request *rq) {
+    __atomic_sub_fetch(&rq->sender->replication.atomic.threads, 1, __ATOMIC_RELAXED);
+}
+
+static struct replication_request replication_request_get_first_available(void) {
+    static __thread Word_t after = 0, unique_id = 0, session = 0;
+
    Pvoid_t *inner_judy_pptr;

    replication_recursive_lock();
+    if(session != replication_globals.unsafe.queue.session) {
+        after = 0;
+        unique_id = 0;
+        session = replication_globals.unsafe.queue.session;
+    }

    struct replication_request rq_to_return = (struct replication_request){ .found = false };

-    if(unlikely(!replication_globals.unsafe.queue.after || !replication_globals.unsafe.queue.unique_id)) {
-        replication_globals.unsafe.queue.after = 0;
-        replication_globals.unsafe.queue.unique_id = 0;
+    if(unlikely(!after || !unique_id)) {
+        after = 0;
+        unique_id = 0;
    }

-    Word_t started_after = replication_globals.unsafe.queue.after;
+    Word_t started_after = after;
+    Word_t skipped = 0;

    size_t round = 0;
    while(!rq_to_return.found) {
@@ -1311,24 +1336,29 @@ static struct replication_request replication_request_get_first_available() {
            break;

        if(round == 2) {
-            if(started_after == 0)
+            if(started_after == 0 && !skipped)
                break;

-            replication_globals.unsafe.queue.after = 0;
-            replication_globals.unsafe.queue.unique_id = 0;
+            after = 0;
+            unique_id = 0;
        }

        bool find_same_after = true;
-        while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstThenNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, &find_same_after))) {
+        while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstThenNext(replication_globals.unsafe.queue.JudyL_array, &after, &find_same_after))) {
            Pvoid_t *our_item_pptr;

-            if(unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after))
+            if(unlikely(round == 2 && after > started_after))
                break;

-            while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) {
+            while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &unique_id, PJE0))) {
                struct replication_sort_entry *rse = *our_item_pptr;
                struct replication_request *rq = rse->rq;

+                if(!rq_sender_acquire(rq, round == 2)) {
+                    skipped++;
+                    continue;
+                }
+
                // copy the request to return it
                rq_to_return = *rq;
                rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
@@ -1342,7 +1372,7 @@ static struct replication_request replication_request_get_first_available() {
            }

            // prepare for the next iteration on the outer loop
-            replication_globals.unsafe.queue.unique_id = 0;
+            unique_id = 0;
        }
    }
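The per-host cap itself lives in `rq_sender_acquire()` above: a relaxed compare-and-swap loop refuses to take a sixth slot on the same sender unless `force` is set, and the dispatcher passes `force = (round == 2)` so a later scan pass can still drain a sender that would otherwise be skipped. Below is a standalone illustration of the same bounded-acquire pattern using the GCC/Clang atomic built-ins; the counter, the limit macro and `main()` are made up for the example — only the `>= 5` limit and the CAS shape come from the code above.

```c
// Standalone sketch of the "at most 5 workers per sender" gate (illustrative names).
#include <stdbool.h>
#include <stdio.h>
#include <sys/types.h>   // ssize_t

#define WORKERS_PER_SENDER_LIMIT 5

static ssize_t workers = 0;                       // stands in for sender->replication.atomic.threads

static bool worker_acquire(bool force) {
    ssize_t expected = __atomic_load_n(&workers, __ATOMIC_RELAXED);
    ssize_t desired;
    do {
        if(!force && expected >= WORKERS_PER_SENDER_LIMIT)
            return false;                         // this sender is already saturated
        desired = expected + 1;
    } while(!__atomic_compare_exchange_n(&workers, &expected, desired,
                                         false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
    return true;
}

static void worker_release(void) {
    __atomic_sub_fetch(&workers, 1, __ATOMIC_RELAXED);
}

int main(void) {
    for(int i = 1; i <= 7; i++) {
        bool force = (i == 7);                    // like round 2 of the queue scan
        printf("acquire #%d%s -> %s\n", i, force ? " (forced)" : "",
               worker_acquire(force) ? "ok" : "refused");
    }
    worker_release();
    return 0;
}
```

The counter only bounds how many replication worker threads serve one sender at a time; releasing it on every exit path (see the hunks below) is what keeps the budget from leaking.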

@@ -1470,8 +1500,7 @@ static bool replication_execute_request(struct replication_request *rq, bool wor
    // send the replication data
    rq->q->rq = rq;
    replication_response_execute_and_finalize(
-        rq->q,
-        (size_t)((unsigned long long)stream_circular_buffer_get_max_size(rq->sender->scb) * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL), workers);
+        rq->q, (size_t)((unsigned long long)stream_circular_buffer_get_max_size(rq->sender->scb) * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL), workers);

    rq->q = NULL;
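This hunk only reflows the call; the size cap it passes is unchanged: the largest single replication response is a fixed percentage of the sender's circular buffer. For a feel of the numbers, here is a purely hypothetical calculation — the 10 MiB buffer and the 25% figure are made up for illustration, the real percentage is the MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER constant defined elsewhere in the Netdata sources:

```c
// Hypothetical numbers only - not the values Netdata actually uses.
#include <stdio.h>

int main(void) {
    unsigned long long buffer_max_size = 10ULL * 1024 * 1024;  // pretend the sender buffer caps at 10 MiB
    unsigned long long percent         = 25;                    // pretend the percentage constant is 25
    printf("max replication message: %llu bytes\n",
           buffer_max_size * percent / 100ULL);                 // -> 2621440 bytes (~2.5 MiB)
    return 0;
}
```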

@@ -1776,6 +1805,7 @@ static int replication_pipeline_execute_next(void) {
        if (rq->sender_circular_buffer_since_ut != stream_circular_buffer_get_since_ut(rq->sender->scb)) {
            // the sender has reconnected since this request was queued,
            // we can safely throw it away, since the parent will resend it
+            rq_sender_release(rq);
            replication_response_cancel_and_finalize(rq->q);
            rq->executed = true;
            rq->found = false;
@@ -1786,6 +1816,7 @@ static int replication_pipeline_execute_next(void) {
            // it has already been marked as 'preprocessed' in the dictionary,
            // and the sender will put it back in when there is
            // enough room in the buffer for processing replication requests
+            rq_sender_release(rq);
            replication_response_cancel_and_finalize(rq->q);
            rq->executed = true;
            rq->found = false;
@@ -1811,6 +1842,7 @@ static int replication_pipeline_execute_next(void) {
        replication_set_latest_first_time(rq->after);

        bool chart_found = replication_execute_request(rq, true);
+        rq_sender_release(rq);
        rq->executed = true;
        rq->found = false;
        rq->q = NULL;
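Because every request handed out by the queue now holds one of the sender's worker slots, `replication_pipeline_execute_next()` releases it on each of its exit paths: when the sender has reconnected (the request is discarded), when there is no room in the sender buffer (the request is deferred), and after the request has actually been executed. A self-contained toy version of that discipline, with stand-in types so it compiles on its own:

```c
// Toy model of the release-on-every-path rule added above (all names are stand-ins).
#include <stdbool.h>
#include <stdio.h>

struct toy_request { bool sender_reconnected; bool sender_has_room; };

static int slots_held = 1;                         // the slot taken when the request was dequeued

static void slot_release(void) { slots_held--; }   // plays the role of rq_sender_release(rq)

static bool execute_one(struct toy_request *rq) {
    if(rq->sender_reconnected) {                   // parent will resend it, throw it away
        slot_release();
        return false;
    }
    if(!rq->sender_has_room) {                     // sender will requeue it when there is room
        slot_release();
        return false;
    }
    puts("replicating chart...");                  // the real code streams the backlog here
    slot_release();                                // normal path: release after execution
    return true;
}

int main(void) {
    struct toy_request rq = { .sender_reconnected = false, .sender_has_room = true };
    execute_one(&rq);
    printf("slots still held: %d\n", slots_held);  // 0 - exactly one release on every path
    return 0;
}
```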
@@ -1956,7 +1988,7 @@ void *replication_thread_main(void *ptr) {

        if(replication_reset_next_point_in_time_countdown-- == 0) {
            // once per second, make it scan all the pending requests next time
-            replication_set_next_point_in_time(0, 0);
+            replication_session_increment();
            // replication_globals.protected.skipped_no_room_since_last_reset = 0;
            replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
        }
@@ -2039,7 +2071,7 @@ void *replication_thread_main(void *ptr) {
            sleep_usec(timeout);

            // make it scan all the pending requests next time
-            replication_set_next_point_in_time(0, 0);
+            replication_session_increment();
            replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;

            continue;
1 change: 1 addition & 0 deletions src/streaming/stream-sender-internals.h
@@ -90,6 +90,7 @@ struct sender_state {
        time_t latest_completed_before_t;   // the timestamp of the latest replication request

        struct {
+            ssize_t threads;
            size_t pending_requests;        // the currently outstanding replication requests
            size_t charts_replicating;      // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
            bool reached_max;               // true when the sender buffer should not get more replication responses
1 change: 1 addition & 0 deletions src/streaming/stream-sender.c
@@ -66,6 +66,7 @@ static void stream_sender_charts_and_replication_reset(struct sender_state *s) {

    rrdhost_sender_replicating_charts_zero(s->host);
    stream_sender_replicating_charts_zero(s);
+    __atomic_store_n(&s->replication.atomic.threads, 0, __ATOMIC_RELAXED);
}

// --------------------------------------------------------------------------------------------------------------------
