-
Notifications
You must be signed in to change notification settings - Fork 1
/
send_to_rabbitmq
executable file
·122 lines (97 loc) · 4.31 KB
/
send_to_rabbitmq
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python
import sys, json, os, urllib, urllib2, datetime, re, Queue, threading, time, optparse, base64
class ExitToken:
pass
exit_token = ExitToken()
q = Queue.Queue(1000)
hostname = os.popen("hostname").read().strip()
parser = optparse.OptionParser(usage="Usage: %prog [options] <RabbitMQ url>")
parser.add_option('-m', '--message-offset OFFSET', action="store", dest="offset", help="Offset of first byte of message (if this byte is a space or tab, the line will be added to the previous event)", default=0, type="int")
parser.add_option('-n', '--host NAME', action="store", dest="hostname", help="Override 'host' field (defaults to %s)" % hostname, default=hostname)
parser.add_option('-e', '--exchange NAME', action="store", dest="exchange", help="Destination exchange")
parser.add_option('-k', '--routing-key NAME', action="store", dest="routing_key", help="Rounting key (defaults to empty string)", default="")
parser.add_option('-v', '--virtual-host NAME', action="store", dest="vhost", help="Virtual host (defaults to '/')", default="/")
parser.add_option('-u', '--credentials USERNAME:PASSWORD', action="store", dest="credentials", help="Username:password passed as basic auth to http request")
options, args = parser.parse_args()
if not options.exchange:
parser.error('Exchange not given')
if len(args) == 0:
parser.error('RabbitMQ URL not given')
encoded_vhost = urllib.quote(options.vhost, safe='')
encoded_exchange = urllib.quote(options.exchange, safe='')
rabbitmq_url = args[0]
cred = options.credentials.split(":")
base64string = base64.standard_b64encode('%s:%s' % (cred[0], cred[1])).replace('\n', '')
uri = "%s/api/exchanges/%s/%s/publish" % (rabbitmq_url, encoded_vhost, encoded_exchange)
def to_event(event):
try:
match = re.match(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line'])
if match:
timestamp = match.group(1) + match.group(2)[0:3]
timestamp += match.group(3) if match.group(3) else "+0000"
if timestamp[10] == ' ':
timestamp = timestamp[0:10] + 'T' + timestamp[11:]
else:
timestamp = "%s%s%02d00" % (event['time'][0:-3], "+" if time.altzone <= 0 else "-", time.altzone/60/60*-1)
except ValueError, e:
timestamp = event['time']
data = {
"@timestamp": timestamp,
"host": options.hostname,
"message": event['line'].strip(),
}
return json.dumps(data)
def starts_with_space(line):
return len(line) > options.offset and line[options.offset] in [' ', '\t']
def sending():
running = True
lastEvent = None
event = None
while event != exit_token:
count = 0;
payload = ''
while count < 1 and event != exit_token and (not q.empty() or count == 0):
try:
event = q.get(True, 1)
except Queue.Empty, e:
event = None
if event and event != exit_token and starts_with_space(event['line']):
lastEvent['line'] += "\n" + event['line']
else:
if lastEvent:
payload += json.dumps({
"properties":{},
"routing_key":options.routing_key,
"payload":to_event(lastEvent),
"payload_encoding":"string"}
)
# payload += "{\"index\": {}}\n" + to_event(lastEvent) + "\n"
count += 1
lastEvent = None
if not event:
break
lastEvent = event
if count > 0:
request = urllib2.Request(uri)
request.add_header("Authorization", "Basic %s" % base64string)
# print "----------------------"
# print "sending to: %s" % uri
# print payload[0:-1]
urllib2.urlopen(request, payload)
# print "done"
# urllib2.urlopen(uri, payload)
t = threading.Thread(target=sending)
t.daemon = True
t.start()
try:
while 1:
line = sys.stdin.readline()
if not line:
q.put(exit_token)
break
line = line[0:-1]
if len(line) > 0:
q.put({'time': datetime.datetime.now().isoformat(), 'line': line})
t.join()
except KeyboardInterrupt, e:
pass