from slac_utils.time import now, datetime_to_epoch, sleep
import datetime
from slac_utils.queues import PyAmqpLibQueue
from slac_utils.hashing import ConsistentHashRing
from collections import deque
import gc
from struct import pack
from pickle import dumps
import socket
import logging
class StatisticsMixin( object ):
"""
a supervisor that relays statistics of workers back to a store
"""
stats_feeder = None
stats_feeder_obj = None
    stats_preamble = ''
def init_stats(self,*args,**kwargs):
if self.stats_feeder and 'stats_host' in kwargs and 'stats_port' in kwargs:
f = getattr( self, 'stats_feeder' )
self.stats_feeder_obj = f( host=kwargs['stats_host'], port=kwargs['stats_port' ])
def statistics_key( self, job ):
return self.stats_preamble + '.' + str(job['device'])
def process_stats(self, job, retries=1 ):
if 'stats' in job['_meta']:
t = now()
# logging.warn("sending stats: " + str(t) + ', key=' + str(self.statistics_key(job)) + ": " + str(job['_meta']['stats']))
try:
self.stats_feeder_obj.send( t, self.statistics_key(job), statistics=job['_meta']['stats'] )
except Exception,e:
logging.error("Could not send statistics: " + str(e))
class StatisticsFeeder( object ):
"""
generic agent to push statistics to something
"""
stats_messages = None
def __init__(self,*args,**kwargs):
for k,v in kwargs.iteritems():
setattr( self, k, v )
self.stats_messages = deque()
self.init(*args,**kwargs)
def init(self,*args,**kwargs):
pass
def __enter__(self):
return self
def __exit__(self,*args,**kwargs):
pass
def send( self, statistics={} ):
raise NotImplementedError, 'not implemented'
class PyAmqpLibFeeder( StatisticsFeeder ):
queue = None
def init(self,*args,**kwargs):
logging.debug("ARGS: %s KWARGS: %s"%(args,kwargs))
super( PyAmqpLibFeeder, self ).init(*args,**kwargs )
        if kwargs.has_key('key_preamble'):
            # key_preamble has already been set on self by __init__; don't pass it on to the queue
            del kwargs['key_preamble']
        else:
            self.key_preamble = ''
        self.queue = PyAmqpLibQueue(**kwargs)
    def __enter__(self):
        self.queue.__enter__()
        return self
    def __exit__(self,*args,**kwargs):
        self.queue.__exit__()
def send( self, time, key, statistics={}, retries=3 ):
if isinstance( time, datetime.datetime ):
time = datetime_to_epoch( time )
for k,v in statistics.iteritems():
self.queue.put( '%f %d' % (v,time), key=self.key_preamble + '.' + k )
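# Illustrative sketch (broker host/port and key_preamble are placeholders): the
# feeder publishes each statistic as a "<value> <epoch>" string, routed on
# key_preamble + '.' + <metric name> through the underlying PyAmqpLibQueue.
#
# with PyAmqpLibFeeder( host='amqp.example.org', port=5672, key_preamble='stats.router1' ) as feeder:
#     feeder.send( now(), 'router1', statistics={ 'cpu': 12.0, 'mem': 34.5 } )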
class CarbonFeeder( StatisticsFeeder ):
"""
A generic class to send statistics to graphite/carbon
"""
pickle = True
sock = None
state = False
backing_off = False
backoff_time = 0.5
backoff_window = 0
def init(self,*args,**kwargs):
super( CarbonFeeder, self ).init(*args,**kwargs)
self.pickle = True
self.backing_off = False
self.backoff_window = 0
self.__enter__()
def __enter__(self):
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
logging.info("connecting %s with %s:%s (%r) msgs: %d" % ( self, self.host,self.port,self.sock, len(self.stats_messages), ) )
# self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.connect( ( self.host, int(self.port) ) )
self.sock.settimeout( 2*self.backoff_time )
# logging.info(" connect ok, timeout %s" % (self.sock.gettimeout()) )
self.state = True
except socket.error, e:
self.state = False
# logging.warn(" connect not ok: %s" % (e,))
if self.sock:
self.sock.close()
self.sock = None
logging.warn(" could not connect to %s:%s: %s" %(self.host,self.port,e) )
return self
def __exit__(self,*args,**kwargs):
if self.sock:
self.sock.close()
self.state = False
self.sock = None
def __del__(self):
        # flush cached datapoints to the host; if it's down, give up after 3 failed attempts
        self.backoff_time = self.backoff_time * 3
        attempts = 3
        while len(self.stats_messages) > 0 and attempts > 0:
            n = self._send( 100 )
            if n is None:
                attempts = attempts - 1
            logging.info("flushing... %s" % (n))
def reconnect( self ):
self.__exit__()
sleep( self.backoff_time )
self.__enter__()
def _send( self, number ):
size = len(self.stats_messages)
if number > size:
number = size
if not self.state or not self.sock:
self.reconnect()
# this = deque()
this = []
try:
if self.sock == None:
raise Exception, 'no socket available'
ts = now()
# as we're using deque, we have to pop if (no peeking)
for x in xrange(0,number):
this.append(self.stats_messages.popleft())
# try sending the stuff
# logging.debug("sending %s" % this)
payload = dumps(this)
header = pack("!L", len(payload))
self.sock.sendall( header + payload )
# time it and report
ts = now() - ts
ts = "%.3f" % float( (float(ts.microseconds) + float(ts.seconds*1000000))/1000000 )
logging.info("%s:%s sent %s datapoints (%s left) in %ssec" % (self.host, self.port, number, size - number, ts ) )
return len(this)
except Exception,e:
self.__exit__()
            # don't lose the data! put it back (in original order) at the front of the deque
            if len(this) > 0:
                self.stats_messages.extendleft(reversed(this))
logging.warning("%s:%s send error: %s %s" % ( self.host, self.port, type(e), e ))
# logging.error('could not store stats %s (count %s)' % (key,len(self.stats_messages)))
return None
def load_from_disk( self ):
pass
def page_to_disk( self ):
"""
in order to prevent hogging up memory, if the len(self.stats_message) gets too large
then we page the data off to disk
"""
pass
def send( self, time, key, statistics, min_chunks=500, max_chunks=750, backoff_threshold=25000 ):
"""
try to be a little clever in not stalling the updates as we can keep a cache on this
system, we use two variables: backing_off and backoff_window. if the grpahite server is
stalled, we set backing_off to true, and we do not attempt to send anything until we
have accumulated max_chunks more items (since the last try)
"""
if isinstance( time, datetime.datetime ):
time = datetime_to_epoch( time )
gc.disable()
for k,v in statistics.iteritems():
try:
v = float(v)
this_key = "%s.%s" % (key,k)
this_key = this_key.replace( '/', '.' )
# logging.info(" %s: %s\t%s" % ( time, this_key, v ))
self.stats_messages.append( ( this_key, (time, v) ) )
self.backoff_window = self.backoff_window + 1
except Exception, e:
logging.error("Error parsing %s: (%s) %s in %s" % (v, type(e), e, key) )
gc.enable()
        # avoid hammering a failed host too much and thus slowing ourselves down:
        # we use self.backoff_window as a counter for outstanding messages;
        # if it goes over backoff_threshold, we try sending again
if self.backing_off:
if self.backoff_window > backoff_threshold:
self.backing_off = False
self.backoff_window = self.backoff_window - backoff_threshold
# okay to send
if not self.backing_off:
# send! if we succeed, then good
# if we're backing off, then try sending max_chunks, else min_chunks
# also, try to avoid bursting after backing_off?
if self.backoff_window >= min_chunks:
size = len(self.stats_messages) # send everything we have
num = size
if num > max_chunks:
num = max_chunks # limit number of items sent
# send the data
sent = self._send( num )
                if sent is not None:
self.backing_off = False
self.backoff_window = self.backoff_window - sent
if self.backoff_window < 0:
self.backoff_window = 0 # important!
# logging.info("%s:%s after %s\tsize %s/%s window %s: %s"%( self.host, self.port, sent, size, len(self.stats_messages), self.backoff_window, self.backing_off ) )
return True
# if sending fails, then we wait another width before we try again
else:
                    # use the fact that we should have sent the entire size if we aren't backing off
self.backing_off = True
# logging.error("%s %s: send failed: num %s size %s window %s, backoff %s" % (self.host, self.port, num, len(self.stats_messages), self.backoff_window, self.backing_off ) )
return False
return None
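# Illustrative sketch (hostname and port are placeholders): CarbonFeeder buffers
# datapoints and, once at least min_chunks have accumulated, writes them to
# carbon's pickle listener (usually port 2004) as a 4-byte big-endian length
# header followed by a pickled list of (metric, (timestamp, value)) tuples.
#
# feeder = CarbonFeeder( host='carbon.example.org', port=2004 )
# feeder.send( now(), 'pollers.router1', { 'cpu': 12.0 } )   # buffered until enough accumulate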
class MultiCarbonFeeder( StatisticsFeeder ):
instance_ports = {}
ring = None
feeders = {}
def init(self,**kwargs):
super( MultiCarbonFeeder, self).init(**kwargs)
# expects self.instances = [ 'ip:port:instance', 'ip:port:instance', ]
if not len(self.instances) > 0:
raise Exception, 'no carbon instances defined, use CARBON_INSTANCES'
self.instance_ports = {} # { (server, instance) : port }
self.ring = ConsistentHashRing([])
self.feeders = {}
for i in self.instances:
s, p, n = i.split(':')
self.add_instance( s, p, n )
# connect to each instance
self.feeders[(s,p,n)] = CarbonFeeder( host=s, port=p )
    def __exit__(self,*args,**kwargs):
        for i in self.feeders:
            self.feeders[i].__exit__(*args,**kwargs)
def add_instance(self,server,port,instance):
if (server, instance) in self.instance_ports:
raise Exception("destination instance (%s, %s) already configured" % (server, instance))
self.instance_ports[ (server, instance) ] = port
self.ring.add_node( (server, instance) )
def remove_instance(self,server,port,instance):
if (server, instance) not in self.instance_ports:
raise Exception("destination instance (%s, %s) not configured" % (server, instance))
del self.instance_ports[ (server, instance) ]
self.ring.remove_node( (server, instance) )
def get_instance(self,key):
(s, i) = self.ring.get_node(key)
p = self.instance_ports[ (s, i) ]
k = (s, p, i)
if k in self.feeders:
# logging.info("%s:%s:%s" % (s, p, i))
return self.feeders[k]
raise Exception, 'could not find feeder for %s:%s:%s' % (s, p, i)
def send( self, time, key, statistics={}, min_chunks=500, max_chunks=1000, backoff_threshold=25000 ):
# logging.info("sending...")
        # as statistics is a dict, concatenate each metric name with key to pick the appropriate feeder
data = {}
for k,v in statistics.iteritems():
this_key = "%s.%s" % (key,k)
this_key = this_key.replace( '/', '.' )
# for f in self.get_instances(this_key):
f = self.get_instance(this_key)
if not f in data:
data[f] = {}
data[f][k] = v
for f in data:
f.send( time, key, statistics=data[f], min_chunks=min_chunks, max_chunks=max_chunks, backoff_threshold=backoff_threshold )
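# Illustrative sketch, not part of the original module: a minimal command-line
# demonstration of MultiCarbonFeeder. The instance strings are placeholders in the
# 'ip:port:instance' format expected by init(); metrics are sharded across the
# carbon-cache instances via the consistent hash ring, and min_chunks=1 forces an
# immediate send attempt rather than waiting for the usual batch size.
if __name__ == '__main__':
    logging.basicConfig( level=logging.INFO )
    feeder = MultiCarbonFeeder( instances=[ '127.0.0.1:2004:a', '127.0.0.1:2104:b' ] )
    feeder.send( now(), 'pollers.router1', statistics={ 'cpu': 12.0, 'mem': 34.5 }, min_chunks=1 )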