-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstacklesssocket.py
794 lines (673 loc) · 29.8 KB
/
stacklesssocket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
#
# Stackless compatible socket module.
#
# Author: Richard Tew <richard.m.tew@gmail.com>
#
# Feel free to email me with any questions, comments, or suggestions for
# improvement.
#
# Remaining work:
#
# = Asyncore does not add that much to this module. In fact, its
# limitations and differences between implementations in different Python
# versions just complicate things.
# = Select on Windows only handles 512 sockets at a time. So if there
# are more sockets than that, then they need to be separated and
# batched around this limitation.
# = It should be possible to have this wrap different mechanisms of
# asynchronous IO, from select to IO completion ports.
# = UDP support is mostly there due to the new hands off approach, but
# there are a few spots like handle_write and timeout handling, which need
# to be dealt with.
#
# Python standard library socket unit test state:
#
# - 2.5: Bad.
# - 2.6: Excellent (two UDP failures).
# - 2.7: Excellent (two UDP failures).
#
# This module is otherwise known to generally work for 2.5, 2.6 and 2.7.
#
# Small parts of this code were contributed back with permission from an
# internal version of this module in use at CCP Games.
#
import stackless
import asyncore, weakref, time, select, types
# If you pump the scheduler and wish to prevent the scheduler from staying
# non-empty for prolonged periods of time, If you do not pump the scheduler,
# you may however wish to prevent calls to poll() from running too long.
# Doing so gives all managed sockets a fairer chance at being read from,
# rather than paying prolonged attention to sockets with more incoming data.
#
# These values govern how long a poll() call spends at a given attempt
# of reading the data present on a given socket.
#
VALUE_MAX_NONBLOCKINGREAD_SIZE = 1000000
VALUE_MAX_NONBLOCKINGREAD_CALLS = 100
## Monkey-patching support..
# We need this so that sockets are cleared out when they are no longer in use.
# In fact, it is essential to correct operation of this code.
asyncore.socket_map = weakref.WeakValueDictionary()
import socket as stdsocket # We need the "socket" name for the function we export.
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED
# If we are to masquerade as the socket module, we need to provide the constants.
if "__all__" in stdsocket.__dict__:
__all__ = stdsocket.__dict__
for k, v in stdsocket.__dict__.iteritems():
if k in __all__:
globals()[k] = v
elif k == "EBADF":
globals()[k] = v
else:
for k, v in stdsocket.__dict__.iteritems():
if k.upper() == k:
globals()[k] = v
error = stdsocket.error
timeout = stdsocket.timeout
# WARNING: this function blocks and is not thread safe.
# The only solution is to spawn a thread to handle all
# getaddrinfo requests. Implementing a stackless DNS
# lookup service is only second best as getaddrinfo may
# use other methods.
getaddrinfo = stdsocket.getaddrinfo
# urllib2 apparently uses this directly. We need to cater for that.
_fileobject = stdsocket._fileobject
# Someone needs to invoke asyncore.poll() regularly to keep the socket
# data moving. The "ManageSockets" function here is a simple example
# of such a function. It is started by StartManager(), which uses the
# global "managerRunning" to ensure that no more than one copy is
# running.
#
# If you think you can do this better, register an alternative to
# StartManager using stacklesssocket_manager(). Your function will be
# called every time a new socket is created; it's your responsibility
# to ensure it doesn't start multiple copies of itself unnecessarily.
#
# By Nike: Added poll_interval on install to have it configurable from outside,
managerRunning = False
poll_interval = 0.05
def ManageSockets():
global managerRunning
try:
while len(asyncore.socket_map):
# Check the sockets for activity.
#print "POLL"
asyncore.poll(poll_interval)
# Yield to give other tasklets a chance to be scheduled.
_schedule_func()
finally:
managerRunning = False
def StartManager():
global managerRunning
if not managerRunning:
managerRunning = True
return stackless.tasklet(ManageSockets)()
_schedule_func = stackless.schedule
_manage_sockets_func = StartManager
_sleep_func = None
_timeout_func = None
def can_timeout():
return _sleep_func is not None or _timeout_func is not None
def stacklesssocket_manager(mgr):
global _manage_sockets_func
_manage_sockets_func = mgr
def socket(*args, **kwargs):
import sys
if "socket" in sys.modules and sys.modules["socket"] is not stdsocket:
raise RuntimeError("Use 'stacklesssocket.install' instead of replacing the 'socket' module")
_realsocket_old = stdsocket._realsocket
_socketobject_old = stdsocket._socketobject
class _socketobject_new(_socketobject_old):
def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, _sock=None):
# We need to do this here.
if _sock is None:
_sock = _realsocket_old(family, type, proto)
_sock = _fakesocket(_sock)
_manage_sockets_func()
_socketobject_old.__init__(self, family, type, proto, _sock)
if not isinstance(self._sock, _fakesocket):
raise RuntimeError("bad socket")
def accept(self):
sock, addr = self._sock.accept()
sock = _fakesocket(sock)
sock.wasConnected = True
return _socketobject_new(_sock=sock), addr
accept.__doc__ = _socketobject_old.accept.__doc__
def install(pi=None):
global poll_interval
if stdsocket._realsocket is socket:
raise StandardError("Still installed")
stdsocket._realsocket = socket
stdsocket.socket = stdsocket.SocketType = stdsocket._socketobject = _socketobject_new
if pi is not None:
poll_interval = pi
def uninstall():
stdsocket._realsocket = _realsocket_old
stdsocket.socket = stdsocket.SocketType = stdsocket._socketobject = _socketobject_old
class _fakesocket(asyncore.dispatcher):
connectChannel = None
acceptChannel = None
wasConnected = False
_timeout = None
_blocking = True
lastReadChannelRef = None
lastReadArguments = None
lastReadTally = 0
lastReadCalls = 0
def __init__(self, realSocket):
# This is worth doing. I was passing in an invalid socket which
# was an instance of _fakesocket and it was causing tasklet death.
if not isinstance(realSocket, _realsocket_old):
raise StandardError("An invalid socket passed to fakesocket %s" % realSocket.__class__)
# This will register the real socket in the internal socket map.
asyncore.dispatcher.__init__(self, realSocket)
self.readQueue = []
self.writeQueue = []
self.sendToBuffers = []
if can_timeout():
self._timeout = stdsocket.getdefaulttimeout()
def receive_with_timeout(self, channel):
if self._timeout is not None:
# Start a timing out process.
# a) Engage a pre-existing external tasklet to send an exception on our channel if it has a receiver, if we are still there when it times out.
# b) Launch a tasklet that does a sleep, and sends an exception if we are still waiting, when it is awoken.
# Block waiting for a send.
if _timeout_func is not None:
# You will want to use this if you are using sockets in a different thread from your sleep functionality.
_timeout_func(self._timeout, channel, (timeout, "timed out"))
elif _sleep_func is not None:
stackless.tasklet(self._manage_receive_with_timeout)(channel)
else:
raise NotImplementedError("should not be here")
try:
ret = channel.receive()
except BaseException, e:
raise e
return ret
else:
return channel.receive()
def _manage_receive_with_timeout(self, channel):
if channel.balance < 0:
_sleep_func(self._timeout)
if channel.balance < 0:
channel.send_exception(timeout, "timed out")
def __del__(self):
# There are no more users (sockets or files) of this fake socket, we
# are safe to close it fully. If we don't, asyncore will choke on
# the weakref failures.
self.close()
# The asyncore version of this function depends on socket being set
# which is not the case when this fake socket has been closed.
def __getattr__(self, attr):
if not hasattr(self, "socket"):
raise AttributeError("socket attribute unset on '"+ attr +"' lookup")
return getattr(self.socket, attr)
## Asyncore potential activity indicators.
def readable(self):
if self.socket.type == SOCK_DGRAM:
return True
if len(self.readQueue):
return True
if self.acceptChannel is not None and self.acceptChannel.balance < 0:
return True
if self.connectChannel is not None and self.connectChannel.balance < 0:
return True
return False
def writable(self):
if self.socket.type != SOCK_DGRAM and not self.connected:
return True
if len(self.writeQueue):
return True
if len(self.sendToBuffers):
return True
return False
## Overriden socket methods.
def accept(self):
self._ensure_non_blocking_read()
if not self.acceptChannel:
self.acceptChannel = stackless.channel()
return self.receive_with_timeout(self.acceptChannel)
def connect(self, address):
"""
If a timeout is set for the connection attempt, and the timeout occurs
then it is the responsibility of the user to close the socket, should
they not wish the connection to potentially establish anyway.
"""
asyncore.dispatcher.connect(self, address)
# UDP sockets do not connect.
if self.socket.type != SOCK_DGRAM and not self.connected:
if not self.connectChannel:
self.connectChannel = stackless.channel()
# Prefer the sender. Do not block when sending, given that
# there is a tasklet known to be waiting, this will happen.
self.connectChannel.preference = 1
self.receive_with_timeout(self.connectChannel)
def _send(self, data, flags):
self._ensure_connected()
channel = stackless.channel()
channel.preference = 1 # Prefer the sender.
self.writeQueue.append((flags, data, channel))
return self.receive_with_timeout(channel)
def send(self, data, flags=0):
return self._send(data, flags)
def sendall(self, data, flags=0):
while len(data):
nbytes = self._send(data, flags)
if nbytes == 0:
raise Exception("completely unexpected situation, no data sent")
data = data[nbytes:]
def sendto(self, sendData, sendArg1=None, sendArg2=None):
# sendto(data, address)
# sendto(data [, flags], address)
if sendArg2 is not None:
flags = sendArg1
sendAddress = sendArg2
else:
flags = 0
sendAddress = sendArg1
waitChannel = None
for idx, (data, address, channel, sentBytes) in enumerate(self.sendToBuffers):
if address == sendAddress:
self.sendToBuffers[idx] = (data + sendData, address, channel, sentBytes)
waitChannel = channel
break
if waitChannel is None:
waitChannel = stackless.channel()
self.sendToBuffers.append((sendData, sendAddress, waitChannel, 0))
return self.receive_with_timeout(waitChannel)
def _recv(self, methodName, args, sizeIdx=0):
self._ensure_non_blocking_read()
if self._fileno is None:
return ""
if len(args) >= sizeIdx+1:
generalArgs = list(args)
generalArgs[sizeIdx] = 0
generalArgs = tuple(generalArgs)
else:
generalArgs = args
channelKey = methodName, generalArgs
#print self._fileno, "_recv:---ENTER---", channelKey
while True:
channel = None
if self.lastReadChannelRef is not None and self.lastReadTally < VALUE_MAX_NONBLOCKINGREAD_SIZE and self.lastReadCalls < VALUE_MAX_NONBLOCKINGREAD_CALLS:
if channelKey == self.lastReadArguments:
channel = self.lastReadChannelRef()
self.lastReadChannelRef = None
#elif self.lastReadTally >= VALUE_MAX_NONBLOCKINGREAD_SIZE or self.lastReadCalls >= VALUE_MAX_NONBLOCKINGREAD_CALLS:
#print "_recv:FORCE-CHANNEL-CHANGE %d %d" % (self.lastReadTally, self.lastReadCalls)
if channel is None:
channel = stackless.channel()
channel.preference = -1 # Prefer the receiver.
self.lastReadTally = self.lastReadCalls = 0
#print self._fileno, "_recv:NEW-CHANNEL", id(channel)
self.readQueue.append([ channel, methodName, args ])
else:
self.readQueue[0][2] = args
#print self._fileno, "_recv:RECYCLE-CHANNEL", id(channel), self.lastReadTally
try:
ret = self.receive_with_timeout(channel)
except stdsocket.error, e:
if isinstance(e, stdsocket.error) and e.args[0] == EWOULDBLOCK:
#print self._fileno, "_recv:BLOCK-RETRY", id(channel), "-" * 30
continue
else:
raise
break
self.lastReadChannelRef = weakref.ref(channel)
self.lastReadArguments = channelKey
if isinstance(ret, types.StringTypes):
self.lastReadTally += len(ret)
elif methodName == "recvfrom":
self.lastReadTally += len(ret[0])
elif methodName == "recvfrom_into":
self.lastReadTally += ret[0]
else:
self.lastReadTally += ret
self.lastReadCalls += 1
#print self._fileno, "_recv:---EXIT---", channelKey, len(ret), self.lastReadChannelRef()
return ret
def recv(self, *args):
if not self.connected:
# Sockets which have never been connected do this.
if not self.wasConnected:
raise error(10057, 'Socket is not connected')
return self._recv("recv", args)
def recv_into(self, *args):
if not self.connected:
# Sockets which have never been connected do this.
if not self.wasConnected:
raise error(10057, 'Socket is not connected')
return self._recv("recv_into", args, sizeIdx=1)
def recvfrom(self, *args):
return self._recv("recvfrom", args)
def recvfrom_into(self, *args):
return self._recv("recvfrom_into", args, sizeIdx=1)
def close(self):
if self._fileno is None:
return
asyncore.dispatcher.close(self)
self.connected = False
self.accepting = False
self.writeQueue = []
# Clear out all the channels with relevant errors.
while self.acceptChannel and self.acceptChannel.balance < 0:
self.acceptChannel.send_exception(error, 9, 'Bad file descriptor')
while self.connectChannel and self.connectChannel.balance < 0:
self.connectChannel.send_exception(error, 10061, 'Connection refused')
self._clear_read_queue()
def _clear_read_queue(self, *args):
for t in self.readQueue:
if t[0].balance < 0:
if len(args):
t[0].send_exception(*args)
else:
t[0].send("")
self.readQueue = []
# asyncore doesn't support this. Why not?
def fileno(self):
return self.socket.fileno()
def _is_non_blocking(self):
return not self._blocking or self._timeout == 0.0
def _ensure_non_blocking_read(self):
if self._is_non_blocking():
# Ensure there is something on the socket, before fetching it. Otherwise, error complaining.
r, w, e = select.select([ self ], [], [], 0.0)
if not r:
raise stdsocket.error(EWOULDBLOCK, "The socket operation could not complete without blocking")
def _ensure_connected(self):
if not self.connected:
# The socket was never connected.
if not self.wasConnected:
raise error(10057, "Socket is not connected")
# The socket has been closed already.
raise error(EBADF, 'Bad file descriptor')
def setblocking(self, flag):
self._blocking = flag
def gettimeout(self):
return self._timeout
def settimeout(self, value):
if value and not can_timeout():
raise RuntimeError("This is a stackless socket - to have timeout support you need to provide a sleep function")
self._timeout = value
def handle_accept(self):
if self.acceptChannel and self.acceptChannel.balance < 0:
t = asyncore.dispatcher.accept(self)
if t is None:
return
t[0].setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
stackless.tasklet(self.acceptChannel.send)(t)
# Inform the blocked connect call that the connection has been made.
def handle_connect(self):
if self.socket.type != SOCK_DGRAM:
if self.connectChannel and self.connectChannel.balance < 0:
self.wasConnected = True
self.connectChannel.send(None)
# Asyncore says its done but self.readBuffer may be non-empty
# so can't close yet. Do nothing and let 'recv' trigger the close.
def handle_close(self):
# This also gets called in the case that a non-blocking connect gets
# back to us with a no. If we don't reject the connect, then all
# connect calls that do not connect will block indefinitely.
if self.connectChannel is not None:
self.close()
# Some error, just close the channel and let that raise errors to
# blocked calls.
def handle_expt(self):
if False:
import traceback
print "handle_expt: START"
traceback.print_exc()
print "handle_expt: END"
self.close()
def handle_error(self):
self.close()
def handle_read(self):
"""
This will be called once per-poll call per socket with data in its buffer to be read.
If you call poll once every 30th of a second, then you are going to be rate limited
in terms of how fast you can read incoming data by the packet size they arrive in.
In order to deal with the worst case scenario, advantage is taken of how scheduling
works in order to keep reading until there is no more data left to read.
1. This function is called indicating data is present to read.
2. The desired amount is read and a send call is made on the channel with it.
3. The function is blocked on that action and the tasklet it is running in is reinserted into the scheduler.
4. The tasklet that made the read related socket call is awakened with the given data.
5. It returns the data to the function that made that call.
6. The function that made the call makes another read related socket call.
a) If the call is similar enough to the last call, then the previous channel is retrieved.
b) Otherwise, a new channel is created.
7. The tasklet that is making the read related socket call is blocked on the channel.
8. This tasklet that was blocked sending gets scheduled again.
a) If there is a tasklet blocked on the channel that it was using, then goto 2.
b) Otherwise, the function exits.
Note that if this function loops indefinitely, and the scheduler is pumped rather than
continuously run, the pumping application will stay in its pump call for a prolonged
period of time potentially starving the rest of the application for CPU time.
An attempt is made in _recv to limit the amount of data read in this manner to a fixed
amount and it lets this function exit if that amount is exceeded. However, this it is
up to the user of Stackless to understand how their application schedules and blocks,
and there are situations where small reads may still effectively loop indefinitely.
"""
if not len(self.readQueue):
return
channel, methodName, args = self.readQueue[0]
#print self._fileno, "handle_read:---ENTER---", id(channel)
while channel.balance < 0:
args = self.readQueue[0][2]
#print self._fileno, "handle_read:CALL", id(channel), args
try:
result = getattr(self.socket, methodName)(*args)
#print self._fileno, "handle_read:RESULT", id(channel), len(result)
except Exception, e:
# winsock sometimes throws ENOTCONN
#print self._fileno, "handle_read:EXCEPTION", id(channel), len(result)
if isinstance(e, stdsocket.error) and e.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]:
self.handle_close()
result = ''
elif channel.balance < 0:
channel.send_exception(e.__class__, *e.args)
if channel.balance < 0:
#print self._fileno, "handle_read:RETURN-RESULT", id(channel), len(result)
channel.send(result)
if len(self.readQueue) and self.readQueue[0][0] is channel:
del self.readQueue[0]
#print self._fileno, "handle_read:---EXIT---", id(channel)
def handle_write(self):
"""
This function still needs work WRT UDP.
"""
if len(self.writeQueue):
flags, data, channel = self.writeQueue[0]
del self.writeQueue[0]
# asyncore does not expose sending the flags.
def asyncore_send(self, data, flags=0):
try:
result = self.socket.send(data, flags)
return result
except socket.error, why:
if why.args[0] == EWOULDBLOCK:
return 0
elif why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED):
self.handle_close()
return 0
else:
raise
nbytes = asyncore_send(self, data, flags)
if channel.balance < 0:
channel.send(nbytes)
elif len(self.sendToBuffers):
data, address, channel, oldSentBytes = self.sendToBuffers[0]
sentBytes = self.socket.sendto(data, address)
totalSentBytes = oldSentBytes + sentBytes
if len(data) > sentBytes:
self.sendToBuffers[0] = data[sentBytes:], address, channel, totalSentBytes
else:
del self.sendToBuffers[0]
stackless.tasklet(channel.send)(totalSentBytes)
if False:
def dump_socket_stack_traces():
import traceback
for skt in asyncore.socket_map.values():
for k, v in skt.__dict__.items():
if isinstance(v, stackless.channel) and v.queue:
i = 0
current = v.queue
while i == 0 or v.queue is not current:
print "%s.%s.%s" % (skt, k, i)
traceback.print_stack(t)
i += 1
if __name__ == '__main__':
import sys
import struct
# Test code goes here.
testAddress = "127.0.0.1", 3000
info = -12345678
data = struct.pack("i", info)
dataLength = len(data)
def TestTCPServer(address):
global info, data, dataLength
print "server listen socket creation"
listenSocket = stdsocket.socket(AF_INET, SOCK_STREAM)
listenSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
listenSocket.bind(address)
listenSocket.listen(5)
NUM_TESTS = 2
i = 1
while i < NUM_TESTS + 1:
# No need to schedule this tasklet as the accept should yield most
# of the time on the underlying channel.
print "server connection wait", i
currentSocket, clientAddress = listenSocket.accept()
print "server", i, "listen socket", currentSocket.fileno(), "from", clientAddress
if i == 1:
print "server closing (a)", i, "fd", currentSocket.fileno(), "id", id(currentSocket)
currentSocket.close()
print "server closed (a)", i
elif i == 2:
print "server test", i, "send"
currentSocket.send(data)
print "server test", i, "recv"
if currentSocket.recv(4) != "":
print "server recv(1)", i, "FAIL"
break
# multiple empty recvs are fine
if currentSocket.recv(4) != "":
print "server recv(2)", i, "FAIL"
break
else:
print "server closing (b)", i, "fd", currentSocket.fileno(), "id", id(currentSocket)
currentSocket.close()
print "server test", i, "OK"
i += 1
if i != NUM_TESTS+1:
print "server: FAIL", i
else:
print "server: OK", i
print "Done server"
def TestTCPClient(address):
global info, data, dataLength
# Attempt 1:
clientSocket = stdsocket.socket()
clientSocket.connect(address)
print "client connection (1) fd", clientSocket.fileno(), "id", id(clientSocket._sock), "waiting to recv"
if clientSocket.recv(5) != "":
print "client test", 1, "FAIL"
else:
print "client test", 1, "OK"
# Attempt 2:
clientSocket = stdsocket.socket()
clientSocket.connect(address)
print "client connection (2) fd", clientSocket.fileno(), "id", id(clientSocket._sock), "waiting to recv"
s = clientSocket.recv(dataLength)
if s == "":
print "client test", 2, "FAIL (disconnect)"
else:
t = struct.unpack("i", s)
if t[0] == info:
print "client test", 2, "OK"
else:
print "client test", 2, "FAIL (wrong data)"
print "client exit"
def TestMonkeyPatchUrllib(uri):
# replace the system socket with this module
install()
try:
import urllib # must occur after monkey-patching!
f = urllib.urlopen(uri)
if not isinstance(f.fp._sock, _fakesocket):
raise AssertionError("failed to apply monkeypatch, got %s" % f.fp._sock.__class__)
s = f.read()
if len(s) != 0:
print "Fetched", len(s), "bytes via replaced urllib"
else:
raise AssertionError("no text received?")
finally:
uninstall()
def TestMonkeyPatchUDP(address):
# replace the system socket with this module
install()
try:
def UDPServer(address):
listenSocket = stdsocket.socket(AF_INET, SOCK_DGRAM)
listenSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
listenSocket.bind(address)
# Apparently each call to recvfrom maps to an incoming
# packet and if we only ask for part of that packet, the
# rest is lost. We really need a proper unittest suite
# which tests this module against the normal socket
# module.
print "waiting to receive"
rdata = ""
while len(rdata) < 512:
data, address = listenSocket.recvfrom(4096)
print "received", data, len(data)
rdata += data
def UDPClient(address):
clientSocket = stdsocket.socket(AF_INET, SOCK_DGRAM)
# clientSocket.connect(address)
print "sending 512 byte packet"
sentBytes = clientSocket.sendto("-"+ ("*" * 510) +"-", address)
print "sent 512 byte packet", sentBytes
stackless.tasklet(UDPServer)(address)
stackless.tasklet(UDPClient)(address)
stackless.run()
finally:
uninstall()
if len(sys.argv) == 2:
if sys.argv[1] == "client":
print "client started"
TestTCPClient(testAddress)
print "client exited"
elif sys.argv[1] == "slpclient":
print "client started"
stackless.tasklet(TestTCPClient)(testAddress)
stackless.run()
print "client exited"
elif sys.argv[1] == "server":
print "server started"
TestTCPServer(testAddress)
print "server exited"
elif sys.argv[1] == "slpserver":
print "server started"
stackless.tasklet(TestTCPServer)(testAddress)
stackless.run()
print "server exited"
else:
print "Usage:", sys.argv[0], "[client|server|slpclient|slpserver]"
sys.exit(1)
else:
print "* Running client/server test"
install()
try:
stackless.tasklet(TestTCPServer)(testAddress)
stackless.tasklet(TestTCPClient)(testAddress)
stackless.run()
finally:
uninstall()
print "* Running urllib test"
stackless.tasklet(TestMonkeyPatchUrllib)("http://python.org/")
stackless.run()
print "* Running udp test"
TestMonkeyPatchUDP(testAddress)
print "result: SUCCESS"