Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zabbix MySQL replication bridge local DB cache and refresh. #175

Merged
merged 16 commits into from
Sep 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 32 additions & 31 deletions collectors/0/zabbix_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
# Dump all replication item/metric insert events from a zabbix mysql server
#

import re
import sqlite3
import sys
import time

try:
from pymysqlreplication import BinLogStreamReader
Expand All @@ -26,11 +27,6 @@
except ImportError:
BinLogStreamReader = None # This is handled gracefully in main()

try:
import pymysql
except ImportError:
pymysql = None # This is handled gracefully in main()

from collectors.etc import zabbix_bridge_conf
from collectors.lib import utils

Expand All @@ -40,9 +36,6 @@ def main():
if BinLogStreamReader is None:
utils.err("error: Python module `pymysqlreplication' is missing")
return 1
if pymysql is None:
utils.err("error: Python module `pymysql' is missing")
return 1
settings = zabbix_bridge_conf.get_settings()

# Set blocking to True if you want to block and wait for the next event at
Expand All @@ -53,7 +46,19 @@ def main():
resume_stream=True,
blocking=True)

hostmap = gethostmap(settings) # Prime initial hostmap
db_filename = settings['sqlitedb']
dbcache = sqlite3.connect(':memory:')
cachecur = dbcache.cursor()
cachecur.execute("ATTACH DATABASE '%s' as 'dbfile'" % (db_filename,))
cachecur.execute('CREATE TABLE zabbix_cache AS SELECT * FROM dbfile.zabbix_cache')
cachecur.execute('CREATE UNIQUE INDEX uniq_zid on zabbix_cache (id)')

# tcollector.zabbix_bridge namespace for internal Zabbix bridge metrics.
log_pos = 0
key_lookup_miss = 0
sample_last_ts = int(time.time())
last_key_lookup_miss = 0

for binlogevent in stream:
if binlogevent.schema == settings['mysql']['db']:
table = binlogevent.table
Expand All @@ -62,34 +67,30 @@ def main():
for row in binlogevent.rows:
r = row['values']
itemid = r['itemid']
try:
hm = hostmap[itemid]
print "zbx.%s %d %s host=%s proxy=%s" % (hm['key'], r['clock'], r['value'], hm['host'], hm['proxy'])
except KeyError:
cachecur.execute('SELECT id, key, host, proxy FROM zabbix_cache WHERE id=?', (itemid,))
row = cachecur.fetchone()
if (row is not None):
print "zbx.%s %d %s host=%s proxy=%s" % (row[1], r['clock'], r['value'], row[2], row[3])
if ((int(time.time()) - sample_last_ts) > settings['internal_metric_interval']): # Sample internal metrics @ 10s intervals
sample_last_ts = int(time.time())
print "tcollector.zabbix_bridge.log_pos %d %s" % (sample_last_ts, log_pos)
print "tcollector.zabbix_bridge.key_lookup_miss %d %s" % (sample_last_ts, key_lookup_miss)
print "tcollector.zabbix_bridge.timestamp_drift %d %s" % (sample_last_ts, (sample_last_ts - r['clock']))
if ((key_lookup_miss - last_key_lookup_miss) > settings['dbrefresh']):
print "tcollector.zabbix_bridge.key_lookup_miss_reload %d %s" % (sample_last_ts, (key_lookup_miss - last_key_lookup_miss))
cachecur.execute('DROP TABLE zabbix_cache')
cachecur.execute('CREATE TABLE zabbix_cache AS SELECT * FROM dbfile.zabbix_cache')
last_key_lookup_miss = key_lookup_miss
else:
# TODO: Consider https://wiki.python.org/moin/PythonDecoratorLibrary#Retry
hostmap = gethostmap(settings)
utils.err("error: Key lookup miss for %s" % (itemid))
key_lookup_miss += 1
sys.stdout.flush()
# if n seconds old, reload
# settings['gethostmap_interval']

dbcache.close()
stream.close()


def gethostmap(settings):
conn = pymysql.connect(**settings['mysql'])
cur = conn.cursor()
cur.execute("SELECT i.itemid, i.key_, h.host, h2.host AS proxy FROM items i JOIN hosts h ON i.hostid=h.hostid LEFT JOIN hosts h2 ON h2.hostid=h.proxy_hostid")
# Translation of item key_
# Note: http://opentsdb.net/docs/build/html/user_guide/writing.html#metrics-and-tags
disallow = re.compile(settings['disallow'])
hostmap = {}
for row in cur:
hostmap[row[0]] = { 'key': re.sub(disallow, '_', row[1]), 'host': re.sub(disallow, '_', row[2]), 'proxy': row[3] }
cur.close()
conn.close()
return hostmap

if __name__ == "__main__":
sys.stdin.close()
sys.exit(main())
Expand Down
79 changes: 79 additions & 0 deletions collectors/900/zabbix_bridge_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/usr/bin/env python
#
# Copyright (C) 2014 The tcollector Authors.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version. This program is distributed in the hope that it
# will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
# General Public License for more details. You should have received a copy
# of the GNU Lesser General Public License along with this program. If not,
# see <http://www.gnu.org/licenses/>.
#
# Dump all replication item/metric insert events from a zabbix mysql server
# to a local sqlite cache (that can also be shared).
#

import os
import re
import sqlite3
import sys
import time
try:
import pymysql
except ImportError:
pymysql = None # This is handled gracefully in main()

from collectors.etc import zabbix_bridge_conf
from collectors.lib import utils


def main():
    """Refresh the local SQLite cache of Zabbix item/host metadata.

    Reads the itemid -> (key, host, proxy) mapping from the Zabbix MySQL
    database and rewrites the SQLite cache file named by
    settings['sqlitedb'], sanitizing key/host through the configured
    'disallow' regex.  Emits internal bookkeeping metrics
    (tcollector.zabbix_bridge.deleterows / .rows) on stdout.

    Returns:
        1 if the optional `pymysql' dependency is missing; None on success.
    """
    utils.drop_privileges()
    if pymysql is None:
        utils.err("error: Python module `pymysql' is missing")
        return 1
    settings = zabbix_bridge_conf.get_settings()

    db_filename = settings['sqlitedb']
    db_is_new = not os.path.exists(db_filename)
    dbcache = sqlite3.connect(db_filename)
    try:
        cachecur = dbcache.cursor()
        if db_is_new:
            utils.err("Zabbix bridge SQLite DB file does not exist; creating: %s" % (db_filename,))
        else:
            utils.err("Zabbix bridge SQLite DB exists @ %s" % (db_filename,))
        # IF NOT EXISTS also repairs a pre-existing file whose table was lost,
        # instead of crashing on the INSERTs below.
        cachecur.execute('''CREATE TABLE IF NOT EXISTS zabbix_cache
                         (id integer, key text, host text, proxy text)''')
        dbcache.commit()

        dbzbx = pymysql.connect(**settings['mysql'])
        try:
            zbxcur = dbzbx.cursor()
            zbxcur.execute("SELECT i.itemid, i.key_, h.host, h2.host AS proxy FROM items i JOIN hosts h ON i.hostid=h.hostid LEFT JOIN hosts h2 ON h2.hostid=h.proxy_hostid")
            # Translation of item key_
            # Note: http://opentsdb.net/docs/build/html/user_guide/writing.html#metrics-and-tags
            disallow = re.compile(settings['disallow'])
            sanitize = disallow.sub  # hoist bound method out of the loop
            print('tcollector.zabbix_bridge.deleterows %d %s' %
                  (int(time.time()), cachecur.execute('DELETE FROM zabbix_cache').rowcount))
            rows = [(row[0], sanitize('_', row[1]), sanitize('_', row[2]), row[3])
                    for row in zbxcur]
            # Single executemany instead of one execute per row.
            cachecur.executemany('INSERT INTO zabbix_cache(id, key, host, proxy) VALUES (?,?,?,?)',
                                 rows)

            print('tcollector.zabbix_bridge.rows %d %s' % (int(time.time()), len(rows)))
            zbxcur.close()
            dbcache.commit()
        finally:
            # Always release the MySQL connection, even if the query fails.
            dbzbx.close()
    finally:
        # Always release the SQLite handle so a failed run does not leak it.
        dbcache.close()


if __name__ == "__main__":
    # This collector never reads stdin; close it, then exit with main()'s
    # return code (1 when pymysql is missing, None/0 otherwise).
    sys.stdin.close()
    sys.exit(main())
8 changes: 5 additions & 3 deletions collectors/etc/zabbix_bridge_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ def get_settings():
'passwd': '',
'db': 'zabbix'
},
'slaveid': 3, # Slave identifier, it should be unique.
'disallow': '[^a-zA-Z0-9\-_\.]', # Regex of characters to replace with _.
'gethostmap_interval': 300 # How often to reload itemid, hostmap from DB.
'slaveid': 3, # Slave identifier, it should be unique.
'disallow': '[^a-zA-Z0-9\-_\.]', # Regex of characters to replace with _.
'internal_metric_interval': 30, # Internal metric interval drift and error counts.
'dbrefresh': 10, # Number of key misses before DB reload from file occurs
'sqlitedb': '/tmp/zabbix_bridge.db' # SQLite DB to cache items from Zabbix DB.
}
3 changes: 2 additions & 1 deletion tcollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ def run(self):
# while breaking out every once in a while to setup selects
# on new children.
while ALIVE:
for col in all_living_collectors():
alc = all_living_collectors()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this change doing?

for col in alc:
for line in col.collect():
self.process_line(col, line)

Expand Down