-
Notifications
You must be signed in to change notification settings - Fork 143
/
udp_emitter.py
72 lines (56 loc) · 2.28 KB
/
udp_emitter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import logging
import socket
from aws_xray_sdk.core.daemon_config import DaemonConfig
from ..exceptions.exceptions import InvalidDaemonAddressException
log = logging.getLogger(__name__)
PROTOCOL_HEADER = "{\"format\":\"json\",\"version\":1}"
PROTOCOL_DELIMITER = '\n'
DEFAULT_DAEMON_ADDRESS = '127.0.0.1:2000'
class UDPEmitter(object):
"""
The default emitter the X-Ray recorder uses to send segments/subsegments
to the X-Ray daemon over UDP using a non-blocking socket. If there is an
exception on the actual data transfer between the socket and the daemon,
it logs the exception and continue.
"""
def __init__(self, daemon_address=DEFAULT_DAEMON_ADDRESS):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._socket.setblocking(0)
self.set_daemon_address(daemon_address)
def send_entity(self, entity):
"""
Serializes a segment/subsegment and sends it to the X-Ray daemon
over UDP. By default it doesn't retry on failures.
:param entity: a trace entity to send to the X-Ray daemon
"""
message = "%s%s%s" % (PROTOCOL_HEADER,
PROTOCOL_DELIMITER,
entity.serialize())
log.debug("sending: %s to %s:%s." % (message, self._ip, self._port))
self._send_data(message)
def set_daemon_address(self, address):
"""
Set up UDP ip and port from the raw daemon address
string using ``DaemonConfig`` class utlities.
"""
if address:
daemon_config = DaemonConfig(address)
self._ip, self._port = daemon_config.udp_ip, daemon_config.udp_port
@property
def ip(self):
return self._ip
@property
def port(self):
return self._port
def _send_data(self, data):
try:
self._socket.sendto(data.encode('utf-8'), (self._ip,
self._port))
except Exception:
log.exception('failed to send data to X-Ray daemon.')
def _parse_address(self, daemon_address):
try:
val = daemon_address.split(':')
return val[0], int(val[1])
except Exception:
raise InvalidDaemonAddressException('Invalid daemon address %s specified.' % daemon_address)