Fix Python Kafka offsets and add tests (#343)
cbornet authored Sep 5, 2023
1 parent 2aaaab1 commit 0508e5b
Showing 2 changed files with 128 additions and 55 deletions.
@@ -136,19 +136,35 @@ def __init__(self, configs):
self.uncommitted: Dict[TopicPartition, SortedSet[int]] = {}
self.pending_commits = 0
self.commit_failure = None
self.lock = threading.Lock()
self.lock = threading.RLock()
self.commit_ever_called = False
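The switch from Lock to RLock makes the lock reentrant, presumably because the new rebalance callbacks can fire on a thread that already holds the lock (for example, on_revoke running inside close()). A minimal sketch of the difference, with hypothetical handler names:

import threading

lock = threading.RLock()  # with threading.Lock() this nesting would deadlock

def on_revoke_handler():
    with lock:  # re-acquired by the same thread
        print("revoked")

def close():
    with lock:
        on_revoke_handler()  # nested acquisition, as when close() triggers a callback

close()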

def start(self):
self.consumer = Consumer(self.configs)
self.consumer.subscribe([self.topic])
logging.info(f"Subscribing consumer to {self.topic}")
self.consumer.subscribe(
[self.topic], on_assign=self.on_assign, on_revoke=self.on_revoke
)

def close(self):
logging.info(
f"Closing consumer to {self.topic} with {self.pending_commits} pending "
f"commits"
)
if self.consumer:
self.consumer.close()
with self.lock:
if self.consumer:
logging.info(
f"Closing consumer to {self.topic} with {self.pending_commits} "
f"pending commits and {len(self.uncommitted)} uncommitted "
f"offsets: {self.uncommitted} "
)
if self.commit_ever_called:
offsets = [
TopicPartition(
topic_partition.topic,
partition=topic_partition.partition,
offset=offset,
)
for topic_partition, offset in self.committed.items()
]
self.consumer.commit(offsets=offsets, asynchronous=False)
self.consumer.close()
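The reworked close() flushes the last tracked positions synchronously before shutting down, so offsets that were committed asynchronously but not yet delivered to the broker are not lost. A standalone sketch of the pattern, assuming confluent_kafka and made-up topic and offset values:

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "example"})
committed = {TopicPartition("input-topic", 0): 42}  # hypothetical tracked positions

def close_with_commit():
    # Synchronously commit the last known position of each partition, then close.
    offsets = [
        TopicPartition(tp.topic, partition=tp.partition, offset=off)
        for tp, off in committed.items()
    ]
    consumer.commit(offsets=offsets, asynchronous=False)
    consumer.close()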

def read(self) -> List[KafkaRecord]:
if self.commit_failure:
@@ -159,7 +175,10 @@ def read(self) -> List[KafkaRecord]:
if message.error():
logging.error(f"Consumer error: {message.error()}")
return []
logging.info(f"Received message from Kafka {message}")
logging.debug(
f"Received message from Kafka topics {self.consumer.assignment()}:"
f" {message}"
)
return [KafkaRecord(message)]

def commit(self, records: List[KafkaRecord]):
@@ -179,13 +198,9 @@ def commit(self, records: List[KafkaRecord]):
return

with self.lock:
self.commit_ever_called = True
for record in records:
topic_partition = record.topic_partition()
offset = record.offset()
offsets_for_partition = self.uncommitted.setdefault(
topic_partition, SortedSet()
)
offsets_for_partition.add(offset)

current_offset = self.committed.get(topic_partition)
if current_offset is None:
@@ -197,20 +212,31 @@ def commit(self, records: List[KafkaRecord]):
f"Current position on partition {topic_partition} is "
f"{current_offset}"
)
if current_offset >= 0:
self.committed[topic_partition] = current_offset
else:
current_offset = -1
if current_offset < 0:
current_offset = 0

offset = record.offset() + 1

if offset <= current_offset:
raise RuntimeError(
f"Commit called with offset {offset} less than the currently "
f"committed offset{current_offset}."
)

offsets_for_partition = self.uncommitted.setdefault(
topic_partition, SortedSet()
)
offsets_for_partition.add(offset)

# advance the offset up to the first gap
least = offsets_for_partition[0]
while least == current_offset + 1:
current_offset = offsets_for_partition.pop(0)
if len(offsets_for_partition) == 0:
break
least = offsets_for_partition[0]

if current_offset != -1:
if current_offset > 0:
self.committed[topic_partition] = current_offset

offsets = [
@@ -225,13 +251,41 @@ def commit(self, records: List[KafkaRecord]):
self.consumer.commit(offsets=offsets, asynchronous=True)
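The heart of the fix is the gap-tracking logic above: acknowledgements can arrive out of order, so the committed watermark only advances through a contiguous run of offsets and stops at the first gap. A self-contained sketch of that algorithm, using sortedcontainers and made-up offsets:

from sortedcontainers import SortedSet

def advance(current_offset: int, acked: SortedSet) -> int:
    # Pop offsets off the front of the set while each one extends the
    # contiguous run starting at current_offset + 1.
    while acked and acked[0] == current_offset + 1:
        current_offset = acked.pop(0)
    return current_offset

acked = SortedSet([2, 3, 5])   # offset 4 not acknowledged yet
assert advance(1, acked) == 3  # stops at the gap before 5
acked.add(4)                   # the gap is filled...
assert advance(3, acked) == 5  # ...and the watermark catches up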

def on_commit(self, error, partitions):
self.pending_commits -= 1
with self.lock:
self.pending_commits -= 1
if error:
logging.error(f"Error committing offsets: {error}")
logging.error(f"Error committing offsets on topic {self.topic}: {error}")
if not self.commit_failure:
self.commit_failure = KafkaException(error)
else:
logging.info(f"Offsets committed: {partitions}")
logging.debug(f"Offsets committed: {partitions}")
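on_commit runs for each asynchronous commit once the broker responds, which is why pending_commits is decremented there, and taking the lock keeps the counter consistent with commit(). In confluent_kafka the callback is registered through the consumer config; a sketch with an assumed broker address:

from confluent_kafka import Consumer

pending_commits = 0

def on_commit(error, partitions):
    # Invoked from poll()/consume() when an async commit completes.
    global pending_commits
    pending_commits -= 1
    if error:
        print(f"commit failed: {error}")

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumed
    "group.id": "example",
    "on_commit": on_commit,
})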

def on_assign(self, consumer: Consumer, partitions: List[TopicPartition]):
with self.lock:
logging.info(f"Partitions assigned: {partitions}")
for partition in partitions:
offset = consumer.committed([partition])[0].offset
logging.info(f"Last committed offset for {partition} is {offset}")
if offset >= 0:
self.committed[partition] = offset

def on_revoke(self, _, partitions: List[TopicPartition]):
with self.lock:
logging.info(f"Partitions revoked: {partitions}")
for partition in partitions:
if partition in self.committed:
offset = self.committed.pop(partition)
logging.info(
f"Current offset {offset} on partition {partition} (revoked)"
)
if partition in self.uncommitted:
offsets = self.uncommitted.pop(partition)
if len(offsets) > 0:
logging.warning(
f"There are uncommitted offsets {offsets} on partition "
f"{partition} (revoked), these messages will be "
f"re-delivered"
)

def get_native_consumer(self) -> Any:
return self.consumer
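The rebalance hooks are wired in through the subscribe() call at the top of the diff: on_assign seeds self.committed from the broker's stored offsets, and on_revoke drops local state for partitions that move away, warning about offsets that will be re-delivered. A condensed sketch of the wiring, with hypothetical handlers and an assumed broker address:

from typing import List

from confluent_kafka import Consumer, TopicPartition

def on_assign(consumer: Consumer, partitions: List[TopicPartition]) -> None:
    for partition in partitions:
        # committed() asks the broker for the group's stored offset.
        offset = consumer.committed([partition])[0].offset
        print(f"assigned {partition}, stored offset {offset}")

def on_revoke(consumer: Consumer, partitions: List[TopicPartition]) -> None:
    print(f"revoked {partitions}")  # local tracking state would be dropped here

consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "example"})
consumer.subscribe(["input-topic"], on_assign=on_assign, on_revoke=on_revoke)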
@@ -17,6 +17,7 @@
import time
from typing import List

import pytest
import waiting
import yaml
from confluent_kafka import Consumer, Producer, TopicPartition
@@ -34,6 +35,9 @@
SASL_MECHANISM = "PLAIN"
USERNAME = "admin"
PASSWORD = "admin-secret"
KAFKA_IMAGE = "confluentinc/cp-kafka:7.4.0"
INPUT_TOPIC = "input-topic"
OUTPUT_TOPIC = "output-topic"


def test_kafka_topic_connection():
@@ -56,8 +60,6 @@ def test_kafka_topic_connection():
+ f'username="{USERNAME}" '
+ f'password="{PASSWORD}";',
) as container:
input_topic = "input-topic"
output_topic = "output-topic"
bootstrap_server = container.get_bootstrap_server()

config_yaml = f"""
@@ -70,10 +72,10 @@ def test_kafka_topic_connection():
sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username='{USERNAME}' password='{PASSWORD}';"
sasl.mechanism: {SASL_MECHANISM}
input:
topic: {input_topic}
topic: {INPUT_TOPIC}
output:
topic: {output_topic}
topic: {OUTPUT_TOPIC}
agent:
applicationId: testApplicationId
@@ -94,7 +96,7 @@ def test_kafka_topic_connection():
}
)
producer.produce(
input_topic,
INPUT_TOPIC,
StringSerializer()("verification message"),
headers=[("prop-key", b"prop-value")],
)
@@ -113,7 +115,7 @@ def test_kafka_topic_connection():
)

try:
consumer.subscribe([output_topic])
consumer.subscribe([OUTPUT_TOPIC])

msg = None
for i in range(10):
@@ -137,8 +139,7 @@


def test_kafka_commit():
with KafkaContainer(image="confluentinc/cp-kafka:7.4.0") as container:
input_topic = "input-topic"
with KafkaContainer(image=KAFKA_IMAGE) as container:
bootstrap_server = container.get_bootstrap_server()

config_yaml = f"""
@@ -151,51 +152,69 @@ def test_kafka_commit():

config = yaml.safe_load(config_yaml)
source = kafka_connection.create_topic_consumer(
"id", config["streamingCluster"], {"topic": input_topic}
"id", config["streamingCluster"], {"topic": INPUT_TOPIC}
)
source.start()

producer = Producer({"bootstrap.servers": bootstrap_server})
for _ in range(4):
producer.produce(input_topic, b"message")
for i in range(10):
producer.produce(INPUT_TOPIC, f"message {i}".encode())
producer.flush()

records = [source.read()[0], source.read()[0]]
source.commit(records)

topic_partition = TopicPartition(INPUT_TOPIC, partition=0)

waiting.wait(
lambda: source.consumer.committed(
[TopicPartition(input_topic, partition=0)]
)[0].offset
== 1,
lambda: source.consumer.committed([topic_partition])[0].offset == 2,
timeout_seconds=5,
sleep_seconds=0.1,
)

# Re-committing records fails
with pytest.raises(RuntimeError):
source.commit(records)

committed_later = source.read()
source.commit(source.read())

# There's a hole in the offsets, so the offset of the last commit is kept
# in the uncommitted set instead of being committed
assert source.uncommitted[topic_partition] == {4}

time.sleep(1)
assert (
source.consumer.committed([TopicPartition(input_topic, partition=0)])[
0
].offset
== 1
)
assert source.consumer.committed([topic_partition])[0].offset == 2
source.commit(committed_later)

waiting.wait(
lambda: source.consumer.committed(
[TopicPartition(input_topic, partition=0)]
)[0].offset
== 3,
lambda: source.consumer.committed([topic_partition])[0].offset == 4,
timeout_seconds=5,
sleep_seconds=0.1,
)

source.close()

# Create a new source with the same id, resuming from the committed offset
source = kafka_connection.create_topic_consumer(
"id", config["streamingCluster"], {"topic": INPUT_TOPIC}
)
source.start()

# Check that we resume on the correct message
assert source.read()[0].value() == "message 4"

# on_assign should have been called
assert source.committed[topic_partition] == 4

source.close()

# on_revoke should have been called
assert topic_partition not in source.committed
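A note on the numbers in this test: Kafka stores the offset of the next record to consume, which is why commit() adds 1 to each record's offset. Acknowledging the record at offset 3 therefore stores offset 4, and a consumer resuming from that group position reads "message 4" first:

last_processed = 3           # offset of the last acknowledged record
stored = last_processed + 1  # Kafka keeps the *next* offset to read
assert stored == 4           # so the resumed consumer sees "message 4"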


def test_kafka_dlq():
with KafkaContainer(image="confluentinc/cp-kafka:7.4.0") as container:
input_topic = "input-topic"
output_topic = "output-topic"
with KafkaContainer(image=KAFKA_IMAGE) as container:
dlq_topic = "dlq-topic"
bootstrap_server = container.get_bootstrap_server()

@@ -207,12 +226,12 @@ def test_kafka_dlq():
bootstrap.servers: {bootstrap_server}
input:
topic: {input_topic}
topic: {INPUT_TOPIC}
deadLetterTopicProducer:
topic: {dlq_topic}
output:
topic: {output_topic}
topic: {OUTPUT_TOPIC}
agent:
applicationId: testApplicationId
Expand All @@ -227,7 +246,7 @@ def test_kafka_dlq():
config = yaml.safe_load(config_yaml)

producer = Producer({"bootstrap.servers": bootstrap_server})
producer.produce(input_topic, StringSerializer()("verification message"))
producer.produce(INPUT_TOPIC, StringSerializer()("verification message"))
producer.flush()

consumer = Consumer(
@@ -274,7 +293,7 @@ class SaslKafkaContainer(KafkaContainer):
""" "KafkaContainer with support for SASL in the waiting probe"""

def __init__(self, **kwargs):
super().__init__("confluentinc/cp-kafka:7.4.0", **kwargs)
super().__init__(KAFKA_IMAGE, **kwargs)

@wait_container_is_ready(
UnrecognizedBrokerVersion, NoBrokersAvailable, KafkaError, ValueError