diff --git a/tests/rptest/tests/delete_records_test.py b/tests/rptest/tests/delete_records_test.py
index 2cfbbd401ab13..dd3fa802c9be7 100644
--- a/tests/rptest/tests/delete_records_test.py
+++ b/tests/rptest/tests/delete_records_test.py
@@ -25,11 +25,12 @@
 from rptest.clients.rpk import RpkTool, RpkException
 from ducktape.utils.util import wait_until
 from rptest.clients.types import TopicSpec
-from rptest.util import produce_until_segments
-from rptest.util import expect_exception
+from rptest.util import produce_until_segments, wait_until_result, expect_exception
 from rptest.services.redpanda import SISettings
 from rptest.utils.si_utils import BucketView, NTP
 
+TEST_TOPIC_NAME = "test-topic-1"
+
 
 class DeleteRecordsTest(RedpandaTest, PartitionMovementMixin):
     """
@@ -40,11 +41,11 @@ class DeleteRecordsTest(RedpandaTest, PartitionMovementMixin):
     log_segment_size = 1048576
 
     topics = [
-        TopicSpec(name="test-topic-1",
+        TopicSpec(name=TEST_TOPIC_NAME,
                   partition_count=1,
                   replication_factor=3,
                   retention_bytes=-1,
-                  cleanup_policy=TopicSpec.CLEANUP_DELETE)
+                  cleanup_policy=TopicSpec.CLEANUP_DELETE),
     ]
 
     def __init__(self, test_context):
@@ -88,14 +89,14 @@ def _start(self,
         if start_with_data is True:
             produce_until_segments(
                 self.redpanda,
-                topic=self.topic,
+                topic=TEST_TOPIC_NAME,
                 partition_idx=0,
                 count=10,
                 acks=-1,
             )
 
             local_snapshot = self.redpanda.storage().segments_by_node(
-                "kafka", self.topic, 0)
+                "kafka", TEST_TOPIC_NAME, 0)
             assert len(local_snapshot) > 0, "empty snapshot"
             self.redpanda.logger.info(f"Snapshot: {local_snapshot}")
 
@@ -113,18 +114,22 @@ def _apply_local_retention_settings(self):
             f"Turning on aggressive local retention, log_segment_size: {self.log_segment_size} retention.local.target.bytes: {self.local_retention}"
         )
         self.rpk.alter_topic_config(
-            self.topic, TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES,
+            TEST_TOPIC_NAME, TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES,
             self.local_retention)
 
-    def get_topic_info(self):
-        topics_info = list(self.rpk.describe_topic(self.topic))
+    def get_topic_info(self, topic_name):
+        topics_info = list(self.rpk.describe_topic(topic_name))
         self.logger.info(topics_info)
         assert len(topics_info) == 1
         return topics_info[0]
 
-    def wait_until_records(self, offset, timeout_sec=30, backoff_sec=1):
+    def wait_until_records(self,
+                           topic_name,
+                           offset,
+                           timeout_sec=30,
+                           backoff_sec=1):
         def expect_high_watermark():
-            topic_info = self.get_topic_info()
+            topic_info = self.get_topic_info(topic_name)
             return topic_info.high_watermark == offset
 
         wait_until(expect_high_watermark,
@@ -147,57 +152,84 @@ def delete_records(self, topic, partition, truncate_offset):
         assert response[0].error_msg == "", f"Err msg: {response[0].error}"
         return response[0].new_start_offset
 
-    def assert_start_partition_boundaries(self, truncate_offset):
-        def check_bound_start(offset):
+    def retry_list_offset_request(self, fn, value_on_read):
+        def check_bound():
             try:
-                r = self.rpk.consume(self.topic,
+                if fn():
+                    return value_on_read
+            except Exception as e:
+                # Transient failure, desired to retry
+                if 'unknown broker' in str(e):
+                    raise e
+            return not value_on_read
+
+        return wait_until_result(
+            check_bound,
+            timeout_sec=10,
+            backoff_sec=1,
+            err_msg="Failed to make list_offsets request, unknown broker",
+            retry_on_exc=True)
+
+    def assert_start_partition_boundaries(self, topic_name, truncate_offset):
+        def check_bound_start(offset, value_on_read=True):
+            def attempt_consume_one():
+                r = self.rpk.consume(topic_name,
                                      n=1,
                                      offset=f'{offset}-{offset+1}',
                                      quiet=True,
                                      timeout=10)
                 return r.count('_') == 1
-            except Exception as _:
-                return False
+
+            return self.retry_list_offset_request(attempt_consume_one,
+                                                  value_on_read)
+
+        def check_bound_start_fails(offset):
+            return check_bound_start(offset, value_on_read=False)
 
         assert check_bound_start(
             truncate_offset
         ), f"new log start: {truncate_offset} not consumable"
-        assert not check_bound_start(
+        assert check_bound_start_fails(
             truncate_offset -
             1), f"before log start: {truncate_offset - 1} is consumable"
 
-    def assert_new_partition_boundaries(self, truncate_offset, high_watermark):
+    def assert_new_partition_boundaries(self, topic_name, truncate_offset,
+                                        high_watermark):
         """
         Returns true if the partition contains records at the expected boundaries,
         ensuring the truncation worked at the exact requested point and that the
         number of remaining records is as expected.
         """
 
-        def check_bound_end(offset):
-            try:
+        def check_bound_end(offset, value_on_read=True):
+            def attempt_consume_last():
                 # Not timing out means data was available to read
-                _ = self.rpk.consume(self.topic,
+                _ = self.rpk.consume(topic_name,
                                      n=1,
                                      offset=offset,
                                      timeout=10)
-            except Exception as _:
-                return False
-            return True
+                return True
+
+            return self.retry_list_offset_request(attempt_consume_last,
+                                                  value_on_read)
+
+        def check_bound_end_fails(offset):
+            return check_bound_end(offset, value_on_read=False)
 
         assert truncate_offset <= high_watermark, f"Test malformed"
         if truncate_offset == high_watermark:
             # Assert no data at all can be read
-            assert not check_bound_end(truncate_offset)
+            assert check_bound_end_fails(truncate_offset)
             return
 
         # truncate_offset is inclusive start of log
         # high_watermark is exclusive end of log
         # Readable offsets: [truncate_offset, high_watermark)
-        self.assert_start_partition_boundaries(truncate_offset)
+        self.assert_start_partition_boundaries(topic_name, truncate_offset)
         assert check_bound_end(
             high_watermark -
             1), f"log end: {high_watermark - 1} not consumable"
-        assert not check_bound_end(
+        assert check_bound_end_fails(
             high_watermark), f"high watermark: {high_watermark} is consumable"
 
     @cluster(num_nodes=3)
@@ -210,6 +242,7 @@ def test_delete_records_topic_start_delta(self, cloud_storage_enabled):
         Perform this by creating only 1 segment and moving forward the start
         offset within that segment, performing verifications at each step
         """
+        topic = TEST_TOPIC_NAME
         num_records = 10240
         records_size = 512
         truncate_offset_start = 100
@@ -218,24 +251,27 @@ def test_delete_records_topic_start_delta(self, cloud_storage_enabled):
 
         # Produce some data, wait for it all to arrive
         kafka_tools = KafkaCliTools(self.redpanda)
-        kafka_tools.produce(self.topic, num_records, records_size)
-        self.wait_until_records(num_records, timeout_sec=10, backoff_sec=1)
+        kafka_tools.produce(topic, num_records, records_size)
+        self.wait_until_records(topic,
+                                num_records,
+                                timeout_sec=10,
+                                backoff_sec=1)
 
         # Call delete-records in a loop incrementing new point each time
         for truncate_offset in range(truncate_offset_start,
                                      truncate_offset_start + 5):
             # Perform truncation
-            low_watermark = self.delete_records(self.topic, 0, truncate_offset)
+            low_watermark = self.delete_records(topic, 0, truncate_offset)
             assert low_watermark == truncate_offset, f"Expected low watermark: {truncate_offset} observed: {low_watermark}"
 
             # Assert correctness of start and end offsets in topic metadata
-            topic_info = self.get_topic_info()
+            topic_info = self.get_topic_info(topic)
             assert topic_info.id == 0, f"Partition id: {topic_info.id}"
             assert topic_info.start_offset == truncate_offset, f"Start offset: {topic_info.start_offset}"
             assert topic_info.high_watermark == num_records, f"High watermark: {topic_info.high_watermark}"
 
             # ... and in actual fetch requests
-            self.assert_new_partition_boundaries(truncate_offset,
+            self.assert_new_partition_boundaries(topic, truncate_offset,
                                                  topic_info.high_watermark)
 
     # Disable checks for storage usage inconsistencies as orphaned log segments left
@@ -291,7 +327,8 @@ def obtain_test_parameters(local_snapshot):
             node = local_snapshot[list(local_snapshot.keys())[-1]]
             segment_boundaries = get_segment_boundaries_via_local_storage(node)
             truncate_offset = None
-            high_watermark = int(self.get_topic_info().high_watermark)
+            high_watermark = int(
+                self.get_topic_info(TEST_TOPIC_NAME).high_watermark)
             if truncate_point == "one_below_high_watermark":
                 truncate_offset = high_watermark - 1  # Leave 1 record
             elif truncate_point == "at_high_watermark":
@@ -307,7 +344,7 @@ def obtain_test_parameters(local_snapshot):
         # Cannot compare kafka offsets to redpanda offsets returned by storage utilities,
         # so conversion must occur to compare
         response = self.admin.get_local_offsets_translated(
-            [truncate_offset], self.topic, 0, translate_to="redpanda")
+            [truncate_offset], TEST_TOPIC_NAME, 0, translate_to="redpanda")
         assert len(response) == 1 and 'rp_offset' in response[0], response
         rp_truncate_offset = response[0]['rp_offset']
 
@@ -325,10 +362,11 @@ def obtain_test_parameters(local_snapshot):
 
         # Make delete-records call, assert response looks ok
         try:
-            low_watermark = self.delete_records(self.topic, 0, truncate_offset)
+            low_watermark = self.delete_records(TEST_TOPIC_NAME, 0,
+                                                truncate_offset)
             assert low_watermark == truncate_offset, f"Expected low watermark: {truncate_offset} observed: {low_watermark}"
         except Exception as e:
-            topic_info = self.get_topic_info()
+            topic_info = self.get_topic_info(TEST_TOPIC_NAME)
             self.redpanda.logger.info(
                 f"Start offset: {topic_info.start_offset}")
             raise e
@@ -339,7 +377,8 @@ def obtain_test_parameters(local_snapshot):
             self.redpanda.start_node(stopped_node)
 
         # Assert start offset is correct and there aren't any off-by-one errors
-        self.assert_new_partition_boundaries(low_watermark, high_watermark)
+        self.assert_new_partition_boundaries(TEST_TOPIC_NAME, low_watermark,
+                                             high_watermark)
 
         def all_segments_removed(segments, lwm):
             num_below_watermark = len([seg for seg in segments if seg < lwm])
@@ -352,7 +391,8 @@ def all_segments_removed(segments, lwm):
 
         def are_all_local_segments_removed():
             local_snapshot = self.redpanda.storage(
-                all_nodes=True).segments_by_node("kafka", self.topic, 0)
+                all_nodes=True).segments_by_node("kafka", TEST_TOPIC_NAME,
+                                                 0)
             return all([
                 all_segments_removed(
                     get_segment_boundaries_via_local_storage(node),
@@ -372,7 +412,7 @@ def are_all_local_segments_removed():
             f"Timed out waiting for segments, ensure no orphaned segments exist nodes that didn't crash"
         )
         snapshot = self.redpanda.storage(all_nodes=True).segments_by_node(
-            "kafka", self.topic, 0)
+            "kafka", TEST_TOPIC_NAME, 0)
         for node_name, segments in snapshot.items():
             if node_name != stopped_node.name:
                 self.redpanda.logger.debug(
@@ -386,7 +426,7 @@ def are_all_local_segments_removed():
         def are_all_cloud_segments_removed():
             bv = BucketView(self.redpanda)
             cloud_manifest = bv.get_partition_manifest(
-                NTP("kafka", self.topic, 0))
+                NTP("kafka", TEST_TOPIC_NAME, 0))
             return all_segments_removed(
                 get_segment_boundaries_via_cloud_manifest(cloud_manifest),
                 rp_truncate_offset)
@@ -412,19 +452,20 @@ def test_delete_records_bounds_checking(self,
                                             cloud_storage_enabled):
         self._start(cloud_storage_enabled)
 
-        num_records = self.get_topic_info().high_watermark
+        num_records = self.get_topic_info(TEST_TOPIC_NAME).high_watermark
 
         def bad_truncation(truncate_offset):
             with expect_exception(RpkException,
                                   lambda e: out_of_range_prefix in str(e)):
-                self.rpk.trim_prefix(self.topic, truncate_offset, [0])
+                self.rpk.trim_prefix(TEST_TOPIC_NAME, truncate_offset, [0])
 
         # Try truncating past the end of the log
         bad_truncation(num_records + 1)
 
         # Truncate to attempt to truncate before new beginning
         truncate_offset = 125
-        low_watermark = self.delete_records(self.topic, 0, truncate_offset)
+        low_watermark = self.delete_records(TEST_TOPIC_NAME, 0,
+                                            truncate_offset)
         assert low_watermark == truncate_offset
 
         # Try to truncate before and at the low_watermark
@@ -435,7 +476,7 @@ def bad_truncation(truncate_offset):
         # are 1 offset away from eachother
         truncate_offset = num_records - 2
         for t_ofs in range(truncate_offset, num_records + 1):
-            low_watermark = self.delete_records(self.topic, 0, t_ofs)
+            low_watermark = self.delete_records(TEST_TOPIC_NAME, 0, t_ofs)
             assert low_watermark == t_ofs
 
         # Assert that nothing is readable
@@ -463,24 +504,27 @@ def test_delete_records_empty_or_missing_topic_or_partition(
         with expect_exception(RpkException,
                               lambda e: unknown_topic_or_partition in str(e)):
             missing_idx = 15
-            self.rpk.trim_prefix(self.topic, 0, [missing_idx])
+            self.rpk.trim_prefix(TEST_TOPIC_NAME, 0, [missing_idx])
 
         # Assert out of range occurs on an empty topic
         with expect_exception(RpkException,
                               lambda e: out_of_range_prefix in str(e)):
-            self.rpk.trim_prefix(self.topic, 0, [0])
+            self.rpk.trim_prefix(TEST_TOPIC_NAME, 0, [0])
 
         # Assert correct behavior on a topic with 1 record
-        self.rpk.produce(self.topic, "k", "v", partition=0)
-        self.wait_until_records(1, timeout_sec=5, backoff_sec=1)
+        self.rpk.produce(TEST_TOPIC_NAME, "k", "v", partition=0)
+        self.wait_until_records(TEST_TOPIC_NAME,
+                                1,
+                                timeout_sec=5,
+                                backoff_sec=1)
         with expect_exception(RpkException,
                               lambda e: out_of_range_prefix in str(e)):
-            self.rpk.trim_prefix(self.topic, 0, [0])
+            self.rpk.trim_prefix(TEST_TOPIC_NAME, 0, [0])
 
         # ... truncating at high watermark 1 should delete all data
-        low_watermark = self.delete_records(self.topic, 0, 1)
+        low_watermark = self.delete_records(TEST_TOPIC_NAME, 0, 1)
         assert low_watermark == 1
-        topic_info = self.get_topic_info()
+        topic_info = self.get_topic_info(TEST_TOPIC_NAME)
         assert topic_info.high_watermark == 1
 
     @cluster(num_nodes=3)
@@ -502,9 +546,10 @@ def test_delete_records_with_transactions(self, cloud_storage_enabled):
 
         def delete_records_within_transaction(reporter):
             try:
-                high_watermark = int(self.get_topic_info().high_watermark)
-                response = self.rpk.trim_prefix(self.topic, high_watermark,
-                                                [0])
+                high_watermark = int(
+                    self.get_topic_info(TEST_TOPIC_NAME).high_watermark)
+                response = self.rpk.trim_prefix(TEST_TOPIC_NAME,
+                                                high_watermark, [0])
                 assert len(response) == 1
                 # Even though the on disk data may be late to evict, the start offset
                 # should have been immediately updated
@@ -516,7 +561,7 @@ def delete_records_within_transaction(reporter):
         # Write 20 records and leave the transaction open
         producer.begin_transaction()
         for idx in range(0, 20):
-            producer.produce(self.topic, '0', payload, 0)
+            producer.produce(TEST_TOPIC_NAME, '0', payload, 0)
         producer.flush()
 
         # The eviction_stm will be re-queuing the event until the max collectible
@@ -551,19 +596,19 @@ def test_delete_records_concurrent_truncations(self,
         # Should complete producing within 20s
         producer = KgoVerifierProducer(self._ctx,
                                        self.redpanda,
-                                       self.topic,
+                                       TEST_TOPIC_NAME,
                                        msg_size=512,
                                        msg_count=20000,
                                        rate_limit_bps=500000)  # 0.5/mbs
         consumer = KgoVerifierConsumerGroupConsumer(self._ctx,
                                                     self.redpanda,
-                                                    self.topic,
+                                                    TEST_TOPIC_NAME,
                                                     512,
                                                     1,
                                                     max_msgs=20000)
 
         def issue_delete_records():
-            topic_info = self.get_topic_info()
+            topic_info = self.get_topic_info(TEST_TOPIC_NAME)
             start_offset = topic_info.start_offset + 1
             high_watermark = topic_info.high_watermark - 1
             if high_watermark - start_offset <= 0:
@@ -572,17 +617,19 @@ def issue_delete_records():
             truncate_point = random.randint(start_offset, high_watermark)
             self.redpanda.logger.info(
                 f"Issuing delete_records request at offset: {truncate_point}")
-            response = self.rpk.trim_prefix(self.topic, truncate_point, [0])
+            response = self.rpk.trim_prefix(TEST_TOPIC_NAME, truncate_point,
+                                            [0])
             assert len(response) == 1
             assert response[0].new_start_offset == truncate_point
             assert response[0].error_msg == ""
             # Cannot assert end boundaries as there is a concurrent producer
             # moving the hwm forward
-            self.assert_start_partition_boundaries(truncate_point)
+            self.assert_start_partition_boundaries(TEST_TOPIC_NAME,
+                                                   truncate_point)
 
         def issue_partition_move():
-            self._dispatch_random_partition_move(self.topic, 0)
-            self._wait_for_move_in_progress(self.topic, 0, timeout=5)
+            self._dispatch_random_partition_move(TEST_TOPIC_NAME, 0)
+            self._wait_for_move_in_progress(TEST_TOPIC_NAME, 0, timeout=5)
 
         def background_test_loop(reporter, fn,