Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize auto-commit process #31

Merged
merged 4 commits into from
Jun 25, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(self, client, group, topic, auto_commit=True,
# Set up the auto-commit timer
if auto_commit is True and auto_commit_every_t is not None:
self.commit_timer = ReentrantTimer(auto_commit_every_t,
self._timed_commit)
self.commit)
self.commit_timer.start()

def get_or_init_offset_callback(resp):
Expand Down Expand Up @@ -150,15 +150,6 @@ def pending(self, partitions=[]):

return total

def _timed_commit(self):
"""
Commit offsets as part of timer
"""
self.commit()

# Once the commit is done, start the timer again
self.commit_timer.start()

def commit(self, partitions=[]):
"""
Commit offsets for this consumer
Expand All @@ -167,11 +158,17 @@ def commit(self, partitions=[]):
all of them
"""

# short circuit if nothing happened
# Short-circuit if nothing has happened. This check is kept outside the
# lock so that we do not acquire it unnecessarily just to inspect the state
if self.count_since_commit == 0:
return

with self.commit_lock:
# Check again, in case the state changed while we were
# waiting to acquire the lock
if self.count_since_commit == 0:
return

reqs = []
if len(partitions) == 0: # commit all partitions
partitions = self.offsets.keys()
Expand Down Expand Up @@ -201,12 +198,7 @@ def _auto_commit(self):
return

if self.count_since_commit > self.auto_commit_every_n:
if self.commit_timer is not None:
self.commit_timer.stop()
self.commit()
self.commit_timer.start()
else:
self.commit()
self.commit()

def __iter__(self):
"""
Expand Down
40 changes: 31 additions & 9 deletions kafka/util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from collections import defaultdict
from itertools import groupby
import struct
from threading import Timer
from threading import Thread, Event


def write_int_string(s):
Expand Down Expand Up @@ -81,19 +81,41 @@ class ReentrantTimer(object):

t: timer interval in milliseconds
fn: a callable to invoke
args: tuple of args to be passed to function
kwargs: keyword arguments to be passed to function
"""
def __init__(self, t, fn):
self.timer = None
self.t = t
def __init__(self, t, fn, *args, **kwargs):

if t <= 0:
raise ValueError('Invalid timeout value')

if not callable(fn):
raise ValueError('fn must be callable')

self.thread = None
self.t = t / 1000.0
self.fn = fn
self.args = args
self.kwargs = kwargs
self.active = None

def _timer(self, active):
while not active.wait(self.t):
self.fn(*self.args, **self.kwargs)

def start(self):
if self.timer is not None:
self.timer.cancel()
if self.thread is not None:
self.stop()

self.timer = Timer(self.t / 1000., self.fn)
self.timer.start()
self.active = Event()
self.thread = Thread(target=self._timer, args=(self.active,))
self.thread.daemon = True # So the app exits when main thread exits
self.thread.start()

def stop(self):
self.timer.cancel()
if self.thread is None:
return

self.active.set()
self.thread.join(self.t + 1)
self.timer = None