-
Notifications
You must be signed in to change notification settings - Fork 104
set kafka offset to retention - segmentSize #850
Conversation
zk = KazooClient(hosts='zookeeper:2181') | ||
zk.start() | ||
try: | ||
resp = zk.get("/config/topics/%s" % topic) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it's safe to assume that kafka won't change that too often
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, this wont change. Kafka 0.11 and newer have an AdminApi for querying/setting topic config, but there are no clients for it yet.
scripts/k8s/getOffset.py
Outdated
print "oldest" | ||
sys.exit(0) | ||
|
||
data = json.loads(resp[0]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this could throw a ValueError
exception if there is malformed data, probably better to catch it and print oldest
scripts/k8s/getOffset.py
Outdated
|
||
data = json.loads(resp[0]) | ||
|
||
retention=int(data['config'].get('retention.ms', 0)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this could throw KeyError
if config
isn't present, data.get('config', {}).get('retention.ms', 0)
should be safer. also ValueError
if the value isn't numeric
scripts/k8s/getOffset.py
Outdated
sys.exit(0) | ||
|
||
# oldest we can safetly consume from in minutes | ||
oldest = int((retention - segmentSize)/(1000 * 60)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it necessary to convert that into int()
? i think it should be int
anyway
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i want an int not a float
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't think python would make this a float
>>> segmentSize=int(100)
>>> retention=int(1234512345)
>>> (retention - segmentSize)/(1000 * 60)
20575
>>> type((retention - segmentSize)/(1000 * 60))
<type 'int'>
That's a nice idea. The only problem I see is if for some reason it stops working, like for example if kafka changed the location or format of that config, and if all the exceptions get handled and it just outputs |
If the kafka offset is set to "oldest", query zookeeper for the topic retention and segment size. Then set the offset config var to retention - segmentSize. This ensures that MT doesnt start consuming from the oldest log segment which could be deleted while MT is still consuming it.
e9527fe
to
40be383
Compare
scripts/k8s/entrypoint.sh
Outdated
fi | ||
|
||
if [ x"$MT_KAFKA_CLUSTER_OFFSET" = "xoldest" ]; then | ||
MT_KAFKA_CLUSTER_OFFSET=$(/getOffset.py $MT_KAFKA_CLUSTER_TOPIC) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to export these vars
can we name this new option "smart" or "auto" or something, to avoid confusion and clearly distinguish is from "oldest". |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comments
If the kafka offset is set to "oldest", query zookeeper for the topic retention
and segment size. Then set the offset config var to retention - segmentSize.
This ensures that MT doesnt start consuming from the oldest log segment which
could be deleted while MT is still consuming it, which leads to MT having to shutdown and restart
this is a step towards improving backlog replay
relates to https://github.com/raintank/hosted-metrics-api/issues/82
and
issue #614