-
Notifications
You must be signed in to change notification settings - Fork 24
/
test.py
257 lines (199 loc) · 6.83 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# coding: utf-8
from __future__ import print_function
import io
import logging
import os
import platform
import sys
import time
from fcntl import fcntl
from tempfile import TemporaryFile
from unittest import mock
import pytest
import wurlitzer
from wurlitzer import (
PIPE,
STDOUT,
Wurlitzer,
c_stderr_p,
c_stdout_p,
libc,
pipes,
stop_sys_pipes,
sys_pipes,
sys_pipes_forever,
)
def printf(msg):
"""Call C printf"""
libc.printf((msg + '\n').encode('utf8'))
def printf_err(msg):
"""Cal C fprintf on stderr"""
libc.fprintf(c_stderr_p, (msg + '\n').encode('utf8'))
def test_pipes():
with pipes(stdout=PIPE, stderr=PIPE) as (stdout, stderr):
printf(u"Hellø")
printf_err(u"Hi, stdérr")
assert stdout.read() == u"Hellø\n"
assert stderr.read() == u"Hi, stdérr\n"
def test_pipe_bytes():
with pipes(encoding=None) as (stdout, stderr):
printf(u"Hellø")
printf_err(u"Hi, stdérr")
assert stdout.read() == u"Hellø\n".encode('utf8')
assert stderr.read() == u"Hi, stdérr\n".encode('utf8')
def test_forward():
stdout = io.StringIO()
stderr = io.StringIO()
with pipes(stdout=stdout, stderr=stderr) as (_stdout, _stderr):
printf(u"Hellø")
printf_err(u"Hi, stdérr")
assert _stdout is stdout
assert _stderr is stderr
assert stdout.getvalue() == u"Hellø\n"
assert stderr.getvalue() == u"Hi, stdérr\n"
def test_pipes_stderr():
stdout = io.StringIO()
with pipes(stdout=stdout, stderr=STDOUT) as (_stdout, _stderr):
printf(u"Hellø")
libc.fflush(c_stdout_p)
time.sleep(0.1)
printf_err(u"Hi, stdérr")
assert _stdout is stdout
assert _stderr is None
assert stdout.getvalue() == u"Hellø\nHi, stdérr\n"
def test_flush():
stdout = io.StringIO()
w = Wurlitzer(stdout=stdout, stderr=STDOUT)
with w:
printf_err(u"Hellø")
time.sleep(0.5)
assert stdout.getvalue().strip() == u"Hellø"
def test_sys_pipes():
stdout = io.StringIO()
stderr = io.StringIO()
with mock.patch('sys.stdout', stdout), mock.patch(
'sys.stderr', stderr
), sys_pipes():
printf(u"Hellø")
printf_err(u"Hi, stdérr")
assert stdout.getvalue() == u"Hellø\n"
assert stderr.getvalue() == u"Hi, stdérr\n"
def test_sys_pipes_check():
# pytest redirects stdout; un-redirect it for the test
with mock.patch('sys.stdout', sys.__stdout__), mock.patch(
'sys.stderr', sys.__stderr__
):
with pytest.raises(ValueError):
with sys_pipes():
pass
def test_redirect_everything():
stdout = io.StringIO()
stderr = io.StringIO()
with mock.patch('sys.stdout', stdout), mock.patch('sys.stderr', stderr):
sys_pipes_forever()
printf(u"Hellø")
printf_err(u"Hi, stdérr")
stop_sys_pipes()
assert stdout.getvalue() == u"Hellø\n"
assert stderr.getvalue() == u"Hi, stdérr\n"
def count_fds():
"""utility for counting file descriptors"""
proc_fds = '/proc/{}/fd'.format(os.getpid())
if os.path.isdir(proc_fds):
return len(proc_fds)
else:
# this is an approximate count,
# but it should at least be stable if we aren't leaking
with TemporaryFile() as tf:
return tf.fileno()
def test_fd_leak():
base_count = count_fds()
with pipes():
print('ok')
assert count_fds() == base_count
for i in range(10):
with pipes():
print('ok')
assert count_fds() == base_count
def test_buffer_full():
with pipes(stdout=None, stderr=io.StringIO()) as (stdout, stderr):
long_string = "x" * 100000 # create a long string (longer than 65536)
printf_err(long_string)
# Test never reaches here as the process hangs.
assert stderr.getvalue() == long_string + "\n"
def test_buffer_full_default():
with pipes() as (stdout, stderr):
long_string = "x" * 100000 # create a long string (longer than 65536)
printf(long_string)
# Test never reaches here as the process hangs.
assert stdout.read() == long_string + "\n"
def test_pipe_max_size():
max_pipe_size = wurlitzer._get_max_pipe_size()
if platform.system() == 'Linux':
assert 65535 <= max_pipe_size <= 1024 * 1024
else:
assert max_pipe_size is None
@pytest.mark.skipif(
wurlitzer._get_max_pipe_size() is None, reason="requires _get_max_pipe_size"
)
def test_bufsize():
default_bufsize = wurlitzer._get_max_pipe_size()
with wurlitzer.pipes() as (stdout, stderr):
assert fcntl(sys.__stdout__, wurlitzer.F_GETPIPE_SZ) == default_bufsize
assert fcntl(sys.__stderr__, wurlitzer.F_GETPIPE_SZ) == default_bufsize
bufsize = 2**18 # seems to only accept powers of two?
with wurlitzer.pipes(bufsize=bufsize) as (stdout, stderr):
assert fcntl(sys.__stdout__, wurlitzer.F_GETPIPE_SZ) == bufsize
assert fcntl(sys.__stderr__, wurlitzer.F_GETPIPE_SZ) == bufsize
def test_log_pipes(caplog):
with caplog.at_level(logging.INFO), wurlitzer.pipes(
logging.getLogger("wurlitzer.stdout"), logging.getLogger("wurlitzer.stderr")
):
printf("some stdout")
printf_err("some stderr")
stdout_logs = []
stderr_logs = []
for t in caplog.record_tuples:
if "stdout" in t[0]:
stdout_logs.append(t)
else:
stderr_logs.append(t)
assert stdout_logs == [
("wurlitzer.stdout", logging.INFO, "some stdout"),
]
assert stderr_logs == [
("wurlitzer.stderr", logging.ERROR, "some stderr"),
]
for record in caplog.records:
# check 'stream' extra
assert record.stream
assert record.name == "wurlitzer." + record.stream
def test_two_file_pipes(tmpdir):
test_stdout = tmpdir / "stdout.txt"
test_stderr = tmpdir / "stderr.txt"
with test_stdout.open("ab") as stdout_f, test_stderr.open("ab") as stderr_f:
w = Wurlitzer(stdout_f, stderr_f)
with w:
assert w.thread is None
printf("some stdout")
printf_err("some stderr")
# make sure capture stopped
printf("after stdout")
printf_err("after stderr")
with test_stdout.open() as f:
assert f.read() == "some stdout\n"
with test_stderr.open() as f:
assert f.read() == "some stderr\n"
def test_one_file_pipe(tmpdir):
test_stdout = tmpdir / "stdout.txt"
with test_stdout.open("ab") as stdout_f:
stderr = io.StringIO()
w = Wurlitzer(stdout_f, stderr)
with w as (stdout, stderr):
assert w.thread is not None
printf("some stdout")
printf_err("some stderr")
assert not w.thread.is_alive()
with test_stdout.open() as f:
assert f.read() == "some stdout\n"
assert stderr.getvalue() == "some stderr\n"