-
Notifications
You must be signed in to change notification settings - Fork 143
/
patch.py
217 lines (170 loc) · 7.33 KB
/
patch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import fnmatch
from collections import namedtuple
import urllib3.connection
import wrapt
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.exceptions.exceptions import SegmentNotFoundException
from aws_xray_sdk.core.models import http
from aws_xray_sdk.core.patcher import _PATCHED_MODULES
from aws_xray_sdk.ext.util import get_hostname, inject_trace_header, strip_url, unwrap
httplib_client_module = 'http.client'
import http.client as httplib
_XRAY_PROP = '_xray_prop'
_XRay_Data = namedtuple('xray_data', ['method', 'host', 'url'])
_XRay_Ignore = namedtuple('xray_ignore', ['subclass', 'hostname', 'urls'])
# A flag indicates whether this module is X-Ray patched or not
PATCH_FLAG = '__xray_patched'
# Calls that should be ignored
_XRAY_IGNORE = set()
def add_ignored(subclass=None, hostname=None, urls=None):
global _XRAY_IGNORE
if subclass is not None or hostname is not None or urls is not None:
urls = urls if urls is None else tuple(urls)
_XRAY_IGNORE.add(_XRay_Ignore(subclass=subclass, hostname=hostname, urls=urls))
def reset_ignored():
global _XRAY_IGNORE
_XRAY_IGNORE.clear()
_ignored_add_default()
def _ignored_add_default():
# skip httplib tracing for SDK built-in centralized sampling pollers
add_ignored(subclass='botocore.awsrequest.AWSHTTPConnection', urls=['/GetSamplingRules', '/SamplingTargets'])
# make sure we have the default rules
_ignored_add_default()
def http_response_processor(wrapped, instance, args, kwargs, return_value,
exception, subsegment, stack):
xray_data = getattr(instance, _XRAY_PROP, None)
if not xray_data:
return
subsegment.put_http_meta(http.METHOD, xray_data.method)
subsegment.put_http_meta(http.URL, strip_url(xray_data.url))
if return_value:
subsegment.put_http_meta(http.STATUS, return_value.status)
# propagate to response object
xray_data = _XRay_Data('READ', xray_data.host, xray_data.url)
setattr(return_value, _XRAY_PROP, xray_data)
if exception:
subsegment.add_exception(exception, stack)
def _xray_traced_http_getresponse(wrapped, instance, args, kwargs):
xray_data = getattr(instance, _XRAY_PROP, None)
if not xray_data:
return wrapped(*args, **kwargs)
return xray_recorder.record_subsegment(
wrapped, instance, args, kwargs,
name=get_hostname(xray_data.url),
namespace='remote',
meta_processor=http_response_processor,
)
def http_send_request_processor(wrapped, instance, args, kwargs, return_value,
exception, subsegment, stack):
xray_data = getattr(instance, _XRAY_PROP, None)
if not xray_data:
return
# we don't delete the attr as we can have multiple reads
subsegment.put_http_meta(http.METHOD, xray_data.method)
subsegment.put_http_meta(http.URL, strip_url(xray_data.url))
if exception:
subsegment.add_exception(exception, stack)
def _ignore_request(instance, hostname, url):
global _XRAY_IGNORE
module = instance.__class__.__module__
if module is None or module == str.__class__.__module__:
subclass = instance.__class__.__name__
else:
subclass = module + '.' + instance.__class__.__name__
for rule in _XRAY_IGNORE:
subclass_match = subclass == rule.subclass if rule.subclass is not None else True
host_match = fnmatch.fnmatch(hostname, rule.hostname) if rule.hostname is not None else True
url_match = url in rule.urls if rule.urls is not None else True
if url_match and host_match and subclass_match:
return True
return False
def _send_request(wrapped, instance, args, kwargs):
def decompose_args(method, url, body, headers, encode_chunked=False):
# skip any ignored requests
if _ignore_request(instance, instance.host, url):
return wrapped(*args, **kwargs)
# Only injects headers when the subsegment for the outgoing
# calls are opened successfully.
subsegment = None
try:
subsegment = xray_recorder.current_subsegment()
except SegmentNotFoundException:
pass
if subsegment:
inject_trace_header(headers, subsegment)
if issubclass(instance.__class__, urllib3.connection.HTTPSConnection):
ssl_cxt = getattr(instance, 'ssl_context', None)
elif issubclass(instance.__class__, httplib.HTTPSConnection):
ssl_cxt = getattr(instance, '_context', None)
else:
# In this case, the patcher can't determine which module the connection instance is from.
# We default to it to check ssl_context but may be None so that the default scheme would be
# (and may falsely be) http.
ssl_cxt = getattr(instance, 'ssl_context', None)
scheme = 'https' if ssl_cxt and type(ssl_cxt).__name__ == 'SSLContext' else 'http'
xray_url = '{}://{}{}'.format(scheme, instance.host, url)
xray_data = _XRay_Data(method, instance.host, xray_url)
setattr(instance, _XRAY_PROP, xray_data)
# we add a segment here in case connect fails
return xray_recorder.record_subsegment(
wrapped, instance, args, kwargs,
name=get_hostname(xray_data.url),
namespace='remote',
meta_processor=http_send_request_processor
)
return decompose_args(*args, **kwargs)
def http_read_processor(wrapped, instance, args, kwargs, return_value,
exception, subsegment, stack):
xray_data = getattr(instance, _XRAY_PROP, None)
if not xray_data:
return
# we don't delete the attr as we can have multiple reads
subsegment.put_http_meta(http.METHOD, xray_data.method)
subsegment.put_http_meta(http.URL, strip_url(xray_data.url))
subsegment.put_http_meta(http.STATUS, instance.status)
if exception:
subsegment.add_exception(exception, stack)
def _xray_traced_http_client_read(wrapped, instance, args, kwargs):
xray_data = getattr(instance, _XRAY_PROP, None)
if not xray_data:
return wrapped(*args, **kwargs)
return xray_recorder.record_subsegment(
wrapped, instance, args, kwargs,
name=get_hostname(xray_data.url),
namespace='remote',
meta_processor=http_read_processor
)
def patch():
"""
patch the built-in `urllib/httplib/httplib.client` methods for tracing.
"""
if getattr(httplib, PATCH_FLAG, False):
return
# we set an attribute to avoid multiple wrapping
setattr(httplib, PATCH_FLAG, True)
wrapt.wrap_function_wrapper(
httplib_client_module,
'HTTPConnection._send_request',
_send_request
)
wrapt.wrap_function_wrapper(
httplib_client_module,
'HTTPConnection.getresponse',
_xray_traced_http_getresponse
)
wrapt.wrap_function_wrapper(
httplib_client_module,
'HTTPResponse.read',
_xray_traced_http_client_read
)
def unpatch():
"""
Unpatch any previously patched modules.
This operation is idempotent.
"""
_PATCHED_MODULES.discard('httplib')
setattr(httplib, PATCH_FLAG, False)
# _send_request encapsulates putrequest, putheader[s], and endheaders
unwrap(httplib.HTTPConnection, '_send_request')
unwrap(httplib.HTTPConnection, 'getresponse')
unwrap(httplib.HTTPResponse, 'read')