Skip to content

Commit

Permalink
Merge pull request #56 from peterazmanov/master
Browse files Browse the repository at this point in the history
bug fix: lock to prevent event race conditions (issue #55)
  • Loading branch information
harvimt committed Nov 18, 2015
2 parents 9c3d22b + e4e21ed commit 480ab82
Showing 1 changed file with 43 additions and 21 deletions.
64 changes: 43 additions & 21 deletions quamash/_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class _IocpProactor(windows_events.IocpProactor):
def __init__(self):
self.__events = []
super(_IocpProactor, self).__init__()
self._lock = QtCore.QMutex()

def select(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
Expand All @@ -70,6 +71,14 @@ def close(self):
self._logger.debug('Closing')
super(_IocpProactor, self).close()

def recv(self, conn, nbytes, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recv(conn, nbytes, flags)

def send(self, conn, buf, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).send(conn, buf, flags)

def _poll(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
if timeout is None:
Expand All @@ -83,31 +92,44 @@ def _poll(self, timeout=None):
if ms >= UINT32_MAX:
raise ValueError("timeout too big")

while True:
# self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
# ms, threading.get_ident()))
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None:
break
with QtCore.QMutexLocker(self._lock):
while True:
# self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
# ms, threading.get_ident()))
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None:
break

err, transferred, key, address = status
try:
f, ov, obj, callback = self._cache.pop(address)
except KeyError:
# key is either zero, or it is used to return a pipe
# handle which should be closed to avoid a leak.
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
_winapi.CloseHandle(key)
ms = 0
continue

if obj in self._stopped_serving:
f.cancel()
# Futures might already be resolved or cancelled
elif not f.done():
self.__events.append((f, callback, transferred, key, ov))

err, transferred, key, address = status
try:
f, ov, obj, callback = self._cache.pop(address)
except KeyError:
# key is either zero, or it is used to return a pipe
# handle which should be closed to avoid a leak.
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
_winapi.CloseHandle(key)
ms = 0
continue

if obj in self._stopped_serving:
f.cancel()
# Futures might already be resolved or cancelled
elif not f.done():
self.__events.append((f, callback, transferred, key, ov))
def _wait_for_handle(self, handle, timeout, _is_cancel):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self)._wait_for_handle(handle, timeout, _is_cancel)

def accept(self, listener):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).accept(listener)

ms = 0
def connect(self, conn, address):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).connect(conn, address)


@with_logger
Expand Down

0 comments on commit 480ab82

Please sign in to comment.