Skip to content

Commit

Permalink
Commit pending synchronously when closing Kafka sink
Browse files (browse the repository at this point in the history)
  • Loading branch information
cbornet committed Sep 5, 2023
1 parent 360f490 commit 0bc1638
Showing 1 changed file with 28 additions and 10 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -136,21 +136,35 @@ def __init__(self, configs):
self.uncommitted: Dict[TopicPartition, SortedSet[int]] = {}
self.pending_commits = 0
self.commit_failure = None
- self.lock = threading.Lock()
+ self.lock = threading.RLock()
+ self.commit_ever_called = False

def start(self):
self.consumer = Consumer(self.configs)
logging.info(f"Subscribing consumer to {self.topic}")
self.consumer.subscribe(
[self.topic], on_assign=self.on_assign, on_revoke=self.on_revoke
)

def close(self):
- logging.info(
- f"Closing consumer to {self.topic} with {self.pending_commits} pending "
- f"commits"
- )
- if self.consumer:
- self.consumer.close()
+ with self.lock:
+ if self.consumer:
+ logging.info(
+ f"Closing consumer to {self.topic} with {self.pending_commits} "
+ f"pending commits and {len(self.uncommitted)} uncommitted "
+ f"offsets: {self.uncommitted} "
+ )
+ if self.commit_ever_called:
+ offsets = [
+ TopicPartition(
+ topic_partition.topic,
+ partition=topic_partition.partition,
+ offset=offset,
+ )
+ for topic_partition, offset in self.committed.items()
+ ]
+ self.consumer.commit(offsets=offsets, asynchronous=False)
+ self.consumer.close()

def read(self) -> List[KafkaRecord]:
if self.commit_failure:
Expand All @@ -161,7 +175,10 @@ def read(self) -> List[KafkaRecord]:
if message.error():
logging.error(f"Consumer error: {message.error()}")
return []
- logging.info(f"Received message from Kafka {message}")
+ logging.debug(
+ f"Received message from Kafka topics {self.consumer.assignment()}:"
+ f" {message}"
+ )
return [KafkaRecord(message)]

def commit(self, records: List[KafkaRecord]):
Expand All @@ -181,6 +198,7 @@ def commit(self, records: List[KafkaRecord]):
return

with self.lock:
+ self.commit_ever_called = True
for record in records:
topic_partition = record.topic_partition()

Expand Down Expand Up @@ -236,11 +254,11 @@ def on_commit(self, error, partitions):
with self.lock:
self.pending_commits -= 1
if error:
- logging.error(f"Error committing offsets: {error}")
+ logging.error(f"Error committing offsets on topic {self.topic}: {error}")
if not self.commit_failure:
self.commit_failure = KafkaException(error)
else:
- logging.info(f"Offsets committed: {partitions}")
+ logging.debug(f"Offsets committed: {partitions}")

def on_assign(self, consumer: Consumer, partitions: List[TopicPartition]):
with self.lock:
Expand Down

0 comments on commit 0bc1638

Please sign in to comment.