Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated #124: Added AWS Instances Cloudwatch collector #246

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions collectors/300/aws_instances_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
#!/usr/bin/python
import os
import sys
import time
import datetime
import re
import json
from collections import OrderedDict
import exceptions
import threading
import Queue
from time import mktime
from collectors.etc import awsconf

try:
import boto.ec2
from boto.ec2.cloudwatch import CloudWatchConnection
from boto.ec2.cloudwatch import regions
except ImportError:
exit(13)

ILLEGAL_CHARS_REGEX = re.compile('[^a-zA-Z0-9\- _./]')

COLLECTION_INTERVAL = 300

STATISTICS = frozenset([
'Minimum',
'Maximum',
'Average',
'Sum',
'SampleCount'
])

sendQueue = Queue.Queue()

def cloudwatch_connect_to_region(region):
access_key, secret_access_key = awsconf.get_accesskey_secretkey()
try:
conn = boto.ec2.cloudwatch.connect_to_region(region, aws_access_key_id=access_key, aws_secret_access_key=secret_access_key)
except:
print "Unexpected error:", sys.exc_info()[0]
else:
return conn

def cloudwatch_list_metrics(conn):
return conn.list_metrics()

def cloudwatch_query_metric(region, metric, statistic):
end = datetime.datetime.utcnow()
start = end - datetime.timedelta(seconds=COLLECTION_INTERVAL)
dimensions = metric.dimensions
if len(dimensions) > 0:
metric_name, tags = build_tag_list(metric.name.lower(), region, dimensions)
datapoints = metric.query(start, end, statistic)
if len(datapoints) > 0:
for datapoint in datapoints:
timestamp = format_timestamp(str(datapoint['Timestamp']))
value = int(datapoint[statistic])
metric_full = " %s.%s.%s" % (metric.namespace.lower().replace('/','.'), metric_name, statistic.lower())
output = "%s.%s.%s %s %s %s" % (metric.namespace.lower().replace('/','.'), metric_name, statistic.lower(), str(timestamp), str(value), tags)
if validate_line_parses(output):
sendQueue.put({'timestamp': timestamp, 'output': output})

def format_timestamp(ts):
st = time.strptime(ts, "%Y-%m-%d %H:%M:%S")
dt = datetime.datetime.fromtimestamp(mktime(st))
return dt.strftime("%s")

def build_tag_list(metric_name, region, dimensions):
tags = "region=" + str(region)

for tagk,tagv in dimensions.iteritems():
tagkey = str(tagk)
tagval = str(tagv[0])
tags += " %s=%s" % (tagkey, tagval)

if metric_name == 'networkout':
metric_name = 'network'
tags += " %s=%s" % ('direction', 'out')

if metric_name == 'networkin':
metric_name = 'network'
tags += " %s=%s" % ('direction', 'in')

return metric_name.strip().lower(), tags.strip().lower()

def ec2_connect_to_region(region):
access_key, secret_access_key = awsconf.get_accesskey_secretkey()
return boto.ec2.connect_to_region(region, aws_access_key_id=access_key, aws_secret_access_key=secret_access_key)

def ec2_list_regions():
ec2_regions = []
for i in boto.ec2.cloudwatch.regions():
ec2_regions.append(str(i.name))
return ec2_regions

def handle_region(region, statistic):
try:
sys.stderr.write("starting region " + region + "," + statistic + "\n")
region_conn = cloudwatch_connect_to_region(region)
metrics = cloudwatch_list_metrics(region_conn)
for metric in metrics:
cloudwatch_query_metric(region, metric, statistic)
except boto.exception.BotoServerError, e:
sys.stderr.write("finished region " + region + "," + statistic + "\n")
pass
except exceptions.KeyboardInterrupt:
return 0
except:
sys.stderr.write("failed region " + region + "," + statistic + "\n")
raise
else:
sys.stderr.write("finished region " + region + "," + statistic + "\n")

def send_metrics():
sys.stderr.write("Processing sendQueue \n")
datapoints = {}
try:
while not sendQueue.empty():
item = sendQueue.get()
timestamp = item['timestamp']
output = item['output']
if not timestamp in datapoints:
datapoints[timestamp] = []
datapoints[timestamp].append(output)
sendQueue.task_done()
sys.stderr.write("Queue Emptied, sorting output")
# sortedDatapoints = OrderedDict(sorted(datapoints.iteritems(), key=lambda x: x[0]))
# for timestamp,outputs in sortedDatapoints.iteritems:
for outputs in sorted(datapoints.iteritems(), key=lambda x: x[1]):
for output in outputs:
for t in output:
print t
except exceptions.KeyboardInterrupt:
return 0

# Uses the same code as tcollector here
def validate_line_parses(line):
parsed = re.match('^([-_./a-zA-Z0-9]+)\s+' # Metric name.
'(\d+)\s+' # Timestamp.
'(\S+?)' # Value (int or float).
'((?:\s+[-_./a-zA-Z0-9]+=[-_./a-zA-Z0-9]+)*)$', # Tags
line)
if parsed is None:
sys.stderr.write("invalid data: %s \n" % (line))
return False
metric, timestamp, value, tags = parsed.groups()
return True

def main():
try:
regions = ec2_list_regions()
for reg in regions:
for statistic in STATISTICS:
t = threading.Thread(target=handle_region, kwargs={"region":reg, "statistic":statistic})
t.start()
while threading.activeCount() > 1:
time.sleep(1)
except exceptions.KeyboardInterrupt:
return 0
except:
raise
if not sendQueue.empty():
send_metrics()

if __name__ == "__main__":
sys.exit(main())
4 changes: 4 additions & 0 deletions collectors/etc/awsconf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env python

def get_accesskey_secretkey():
return ('<access_key_id>', '<secret_access_key>')