From 55d253bb57f87a12f76c3c299f24a473586c02ec Mon Sep 17 00:00:00 2001
From: Costa Tsaousis <costa@netdata.cloud>
Date: Sat, 14 Dec 2024 10:12:30 +0200
Subject: [PATCH] allow up to 5 replication workers to work on the same host
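
The replication queue cursor (after/unique_id) used to live in
replication_globals, shared by all replication workers. Make it thread-local,
so that workers can walk the queue independently, and add a per-sender atomic
counter of the workers currently servicing it: during the normal scan rounds
a worker skips senders that already have 5 workers on them, and on the final
round it force-acquires a slot so that no request is starved.

The shared cursor reset, replication_set_next_point_in_time(0, 0), is replaced
by a session counter: incrementing it makes every worker reset its thread-local
cursor and rescan the queue from the start.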

---
 src/streaming/replication.c             | 78 +++++++++++++++++--------
 src/streaming/stream-sender-internals.h |  1 +
 src/streaming/stream-sender.c           |  1 +
 3 files changed, 57 insertions(+), 23 deletions(-)
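
Notes:

The per-sender cap is a lock-free bounded counter. A minimal standalone sketch
of the pattern, with illustrative names (MAX_WORKERS_PER_SENDER stands in for
the hard-coded 5):

    #include <stdbool.h>
    #include <sys/types.h>

    #define MAX_WORKERS_PER_SENDER 5

    static ssize_t threads;    // in the patch, one counter lives in each sender_state

    // Try to take a worker slot on this sender. 'force' bypasses the cap;
    // the caller passes force=true on the final scan round to avoid starvation.
    static bool acquire(bool force) {
        ssize_t expected = __atomic_load_n(&threads, __ATOMIC_RELAXED);
        ssize_t desired;

        do {
            if(!force && expected >= MAX_WORKERS_PER_SENDER)
                return false;    // saturated - the caller skips this sender

            desired = expected + 1;

        } while(!__atomic_compare_exchange_n(
            &threads, &expected, desired, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));

        return true;
    }

    static void release(void) {
        __atomic_sub_fetch(&threads, 1, __ATOMIC_RELAXED);
    }

The compare-exchange re-checks the cap on every retry, so two workers racing on
a sender with four active workers cannot both slip past the limit.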

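The full-rescan trigger pairs a global generation counter with the thread-local
cursor. A sketch under the same assumptions (illustrative names; Word_t
simplified to unsigned long):

    static unsigned long global_session;    // only bumped under the replication lock

    static void request_full_rescan(void) {
        global_session++;    // replication_session_increment() in the patch
    }

    static void worker_scan(void) {
        static __thread unsigned long after, unique_id, session;

        if(session != global_session) {    // a rescan was requested since our last pass
            after = 0;        // restart the outer scan (keyed by 'after' time)
            unique_id = 0;    // restart the inner scan (keyed by request id)
            session = global_session;
        }

        // ... walk the two-level JudyL queue starting at (after, unique_id) ...
    }
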
diff --git a/src/streaming/replication.c b/src/streaming/replication.c
index afdec4f9de7064..432a09bde9b4fa 100644
--- a/src/streaming/replication.c
+++ b/src/streaming/replication.c
@@ -1031,8 +1031,7 @@ static struct replication_thread {
         time_t first_time_t;            // the minimum 'after' we encountered
 
         struct {
-            Word_t after;
-            Word_t unique_id;
+            Word_t session;
             Pvoid_t JudyL_array;
         } queue;
 
@@ -1067,8 +1066,7 @@ static struct replication_thread {
                 .first_time_t = 0,
 
                 .queue = {
-                        .after = 0,
-                        .unique_id = 0,
+                        .session = 0,
                         .JudyL_array = NULL,
                 },
         },
@@ -1127,10 +1125,9 @@ static inline bool replication_recursive_lock_mode(char mode) {
         fatal("REPLICATION: reached %s, but replication is not locked by this thread.", __FUNCTION__); \
 } while(0)
 
-void replication_set_next_point_in_time(time_t after, size_t unique_id) {
+static void replication_session_increment(void) {
     replication_recursive_lock();
-    replication_globals.unsafe.queue.after = after;
-    replication_globals.unsafe.queue.unique_id = unique_id;
+    replication_globals.unsafe.queue.session++;
     replication_recursive_unlock();
 }
 
@@ -1289,19 +1286,47 @@ static void replication_sort_entry_del(struct replication_request *rq, bool buff
     replication_recursive_unlock();
 }
 
-static struct replication_request replication_request_get_first_available() {
+static bool rq_sender_acquire(struct replication_request *rq, bool force) {
+    ssize_t expected = __atomic_load_n(&rq->sender->replication.atomic.threads, __ATOMIC_RELAXED);
+    ssize_t desired;
+
+    do {
+        if(!force && expected >= 5)    // hard cap: at most 5 workers per sender unless forced
+            return false;
+
+        desired = expected + 1;
+
+    } while(!__atomic_compare_exchange_n(
+        &rq->sender->replication.atomic.threads, &expected, desired, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+    return true;
+}
+
+static void rq_sender_release(struct replication_request *rq) {
+    __atomic_sub_fetch(&rq->sender->replication.atomic.threads, 1, __ATOMIC_RELAXED);
+}
+
+static struct replication_request replication_request_get_first_available(void) {
+    static __thread Word_t after = 0, unique_id = 0, session = 0;    // per-worker scan cursor
+
     Pvoid_t *inner_judy_pptr;
 
     replication_recursive_lock();
+    if(session != replication_globals.unsafe.queue.session) {    // a full rescan was requested - reset our cursor
+        after = 0;
+        unique_id = 0;
+        session = replication_globals.unsafe.queue.session;
+    }
 
     struct replication_request rq_to_return = (struct replication_request){ .found = false };
 
-    if(unlikely(!replication_globals.unsafe.queue.after || !replication_globals.unsafe.queue.unique_id)) {
-        replication_globals.unsafe.queue.after = 0;
-        replication_globals.unsafe.queue.unique_id = 0;
+    if(unlikely(!after || !unique_id)) {
+        after = 0;
+        unique_id = 0;
     }
 
-    Word_t started_after = replication_globals.unsafe.queue.after;
+    Word_t started_after = after;
+    Word_t skipped = 0;    // requests passed over because their sender was saturated
 
     size_t round = 0;
     while(!rq_to_return.found) {
@@ -1311,24 +1336,29 @@ static struct replication_request replication_request_get_first_available() {
             break;
 
         if(round == 2) {
-            if(started_after == 0)
+            if(started_after == 0 && !skipped)    // we scanned from the start and skipped nothing - truly empty
                 break;
 
-            replication_globals.unsafe.queue.after = 0;
-            replication_globals.unsafe.queue.unique_id = 0;
+            after = 0;
+            unique_id = 0;
         }
 
         bool find_same_after = true;
-        while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstThenNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, &find_same_after))) {
+        while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstThenNext(replication_globals.unsafe.queue.JudyL_array, &after, &find_same_after))) {
             Pvoid_t *our_item_pptr;
 
-            if(unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after))
+            if(unlikely(round == 2 && after > started_after))
                 break;
 
-            while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) {
+            while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &unique_id, PJE0))) {
                 struct replication_sort_entry *rse = *our_item_pptr;
                 struct replication_request *rq = rse->rq;
 
+                if(!rq_sender_acquire(rq, round == 2)) {    // sender saturated; round 2 forces the acquire
+                    skipped++;
+                    continue;
+                }
+
                 // copy the request to return it
                 rq_to_return = *rq;
                 rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
@@ -1342,7 +1372,7 @@ static struct replication_request replication_request_get_first_available() {
             }
 
             // prepare for the next iteration on the outer loop
-            replication_globals.unsafe.queue.unique_id = 0;
+            unique_id = 0;
         }
     }
 
@@ -1470,8 +1500,7 @@ static bool replication_execute_request(struct replication_request *rq, bool wor
     // send the replication data
     rq->q->rq = rq;
     replication_response_execute_and_finalize(
-            rq->q,
-        (size_t)((unsigned long long)stream_circular_buffer_get_max_size(rq->sender->scb) * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL), workers);
+        rq->q, (size_t)((unsigned long long)stream_circular_buffer_get_max_size(rq->sender->scb) * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL), workers);
 
     rq->q = NULL;
 
@@ -1776,6 +1805,7 @@ static int replication_pipeline_execute_next(void) {
             if (rq->sender_circular_buffer_since_ut != stream_circular_buffer_get_since_ut(rq->sender->scb)) {
                 // the sender has reconnected since this request was queued,
                 // we can safely throw it away, since the parent will resend it
+                rq_sender_release(rq);    // sender reconnected - give our slot back before cancelling
                 replication_response_cancel_and_finalize(rq->q);
                 rq->executed = true;
                 rq->found = false;
@@ -1786,6 +1816,7 @@ static int replication_pipeline_execute_next(void) {
                 // it has already been marked as 'preprocessed' in the dictionary,
                 // and the sender will put it back in when there is
                 // enough room in the buffer for processing replication requests
+                rq_sender_release(rq);    // no room in the buffer - give our slot back; the retry will re-acquire
                 replication_response_cancel_and_finalize(rq->q);
                 rq->executed = true;
                 rq->found = false;
@@ -1811,6 +1842,7 @@ static int replication_pipeline_execute_next(void) {
     replication_set_latest_first_time(rq->after);
 
     bool chart_found = replication_execute_request(rq, true);
+    rq_sender_release(rq);    // request executed - release our worker slot on this sender
     rq->executed = true;
     rq->found = false;
     rq->q = NULL;
@@ -1956,7 +1988,7 @@ void *replication_thread_main(void *ptr) {
 
             if(replication_reset_next_point_in_time_countdown-- == 0) {
                 // once per second, make it scan all the pending requests next time
-                replication_set_next_point_in_time(0, 0);
+                replication_session_increment();
 //                replication_globals.protected.skipped_no_room_since_last_reset = 0;
                 replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
             }
@@ -2039,7 +2071,7 @@ void *replication_thread_main(void *ptr) {
             sleep_usec(timeout);
 
             // make it scan all the pending requests next time
-            replication_set_next_point_in_time(0, 0);
+            replication_session_increment();
             replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
 
             continue;
diff --git a/src/streaming/stream-sender-internals.h b/src/streaming/stream-sender-internals.h
index ec3e3f4eedce33..ecc0c22ddb0417 100644
--- a/src/streaming/stream-sender-internals.h
+++ b/src/streaming/stream-sender-internals.h
@@ -90,6 +90,7 @@ struct sender_state {
         time_t latest_completed_before_t;       // the timestamp of the latest replication request
 
         struct {
+            ssize_t threads;                    // the number of replication threads currently servicing this sender
             size_t pending_requests;            // the currently outstanding replication requests
             size_t charts_replicating;          // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
             bool reached_max;                   // true when the sender buffer should not get more replication responses
diff --git a/src/streaming/stream-sender.c b/src/streaming/stream-sender.c
index 08f3b52306e743..f45b5d5a183886 100644
--- a/src/streaming/stream-sender.c
+++ b/src/streaming/stream-sender.c
@@ -66,6 +66,7 @@ static void stream_sender_charts_and_replication_reset(struct sender_state *s) {
 
     rrdhost_sender_replicating_charts_zero(s->host);
     stream_sender_replicating_charts_zero(s);
+    __atomic_store_n(&s->replication.atomic.threads, 0, __ATOMIC_RELAXED);
 }
 
 // --------------------------------------------------------------------------------------------------------------------
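
A note on the new counter: it is declared ssize_t rather than size_t,
presumably so that a release racing with the zeroing above only drives the
count transiently negative instead of wrapping an unsigned value;
rq_sender_acquire() tolerates a negative count, since it only checks the
upper bound.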