Skip to content

Commit

Permalink
wip, tests and planner
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Oct 23, 2023
1 parent 556f9fc commit a2a552f
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/v/cluster/partition_balancer_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ ss::future<> partition_balancer_backend::do_tick() {
auto f = _topics_frontend.move_partition_replicas(
reassignment.ntp,
reassignment.allocated.replicas(),
reconfiguration_policy::full_local_retention,
reconfiguration_policy::target_initial_retention,
model::timeout_clock::now() + add_move_cmd_timeout,
_cur_term->id);
return f.then([reassignment = std::move(reassignment)](auto errc) {
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/partition_balancer_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "cluster/partition_balancer_types.h"
#include "cluster/scheduling/partition_allocator.h"
#include "cluster/topic_table.h"
#include "cluster/types.h"
#include "model/metadata.h"

#include <absl/container/flat_hash_map.h>
Expand All @@ -25,6 +26,7 @@ namespace cluster {
struct ntp_reassignment {
model::ntp ntp;
allocated_partition allocated;
reconfiguration_policy reconfiguration_policy;
};

struct planner_config {
Expand Down
78 changes: 76 additions & 2 deletions tests/rptest/tests/scaling_up_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from rptest.clients.types import TopicSpec
from rptest.clients.default import DefaultClient
from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer
from rptest.services.redpanda import make_redpanda_service
from rptest.services.redpanda import SISettings
import concurrent

from rptest.tests.prealloc_nodes import PreallocNodesTest

Expand Down Expand Up @@ -381,4 +382,77 @@ def initial_rebalance_finished():
self.redpanda.start_node(unavailable_node)
self.wait_for_partitions_rebalanced(total_replicas=total_replicas,
timeout_sec=self.rebalance_timeout)
self.verify()
self.verify()

def _kafka_usage(self, nodes):
usage_per_node = {}
for n in nodes:
id = self.redpanda.node_id(n)
disk_usage = self.redpanda.data_dir_usage("kafka", n)
usage_per_node[id] = disk_usage

return usage_per_node

@cluster(num_nodes=7)
def test_fast_node_addition(self):

log_segment_size = 5 * 1024 * 1024
total_segments_per_partition = 5
partition_cnt = 64
msg_size = 16 * 1024 # 16 KiB
data_size = log_segment_size * total_segments_per_partition * partition_cnt
msg_cnt = data_size // msg_size

# configure and start redpanda
extra_rp_conf = {
'cloud_storage_segment_max_upload_interval_sec': 10,
'cloud_storage_manifest_max_upload_interval_sec': 10,
'initial_retention_local_target_bytes_default': log_segment_size,
}
# shadow indexing is required when we want to leverage fast partition movements
si_settings = SISettings(test_context=self.test_context,
log_segment_size=log_segment_size,
retention_local_strict=False)

self.redpanda.set_extra_rp_conf(extra_rp_conf)
self.redpanda.set_si_settings(si_settings)
self.redpanda.start(nodes=self.redpanda.nodes[0:4])
topic = TopicSpec(replication_factor=3,
partition_count=partition_cnt,
redpanda_remote_write=True,
redpanda_remote_read=True)

total_replicas = 3 * partition_cnt

self.client().create_topic(topic)
self.logger.info(
f"Producing {data_size} bytes of data in {msg_cnt} total messages")
self.producer = KgoVerifierProducer(
self.test_context,
self.redpanda,
topic.name,
msg_size=msg_size,
msg_count=msg_cnt,
custom_node=self.preallocated_nodes)
self.producer.start()
self.producer.wait()

# add fourth node
self.redpanda.start_node(self.redpanda.nodes[4])
self.wait_for_partitions_rebalanced(total_replicas=total_replicas,
timeout_sec=self.rebalance_timeout)

def print_disk_usage(usage):
for n, b in usage.items():
self.logger.info(
f"node: {n} total partitions size: {b/(1024*1024)} Mb")

usage = self._kafka_usage(nodes=self.redpanda.nodes[0:5])
print_disk_usage(usage)
# add fifth node
self.redpanda.start_node(self.redpanda.nodes[5])
self.wait_for_partitions_rebalanced(total_replicas=total_replicas,
timeout_sec=self.rebalance_timeout)

usage = self._kafka_usage(self.redpanda.nodes)
print_disk_usage(usage)

0 comments on commit a2a552f

Please sign in to comment.