From 61d2427495c6d5380c676cca3a421c0fc33c8d2a Mon Sep 17 00:00:00 2001 From: Lee Tibbert Date: Fri, 16 Sep 2022 18:20:17 -0400 Subject: [PATCH 1/6] Fix #64: Remove a KqExec poll hang --- .../unsafe/KqueueExecutorScheduler.scala | 79 ++++++++++--------- 1 file changed, 40 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/epollcat/unsafe/KqueueExecutorScheduler.scala b/core/src/main/scala/epollcat/unsafe/KqueueExecutorScheduler.scala index e9623c6..c992fc7 100644 --- a/core/src/main/scala/epollcat/unsafe/KqueueExecutorScheduler.scala +++ b/core/src/main/scala/epollcat/unsafe/KqueueExecutorScheduler.scala @@ -65,53 +65,54 @@ private[unsafe] final class KqueueExecutorScheduler( } } - val timeoutSpec = - if (timeoutIsInfinite || timeoutIsZero) null - else { - val ts = stackalloc[time.timespec]() - val sec = timeout.toSeconds - ts.tv_sec = sec - ts.tv_nsec = (timeout - sec.seconds).toNanos - ts - } + if (!noCallbacks) { + val timeoutSpec = + if (timeoutIsInfinite || timeoutIsZero) null + else { + val ts = stackalloc[time.timespec]() + val sec = timeout.toSeconds + ts.tv_sec = sec + ts.tv_nsec = (timeout - sec.seconds).toNanos + ts + } - val eventlist = stackalloc[kevent64_s](maxEvents.toLong) - val flags = (if (timeoutIsZero) KEVENT_FLAG_IMMEDIATE else KEVENT_FLAG_NONE).toUInt - val triggeredEvents = - kevent64(kqfd, changelist, finalChangeCount, eventlist, maxEvents, flags, timeoutSpec) - - if (triggeredEvents >= 0) { - var i = 0 - var event = eventlist - while (i < triggeredEvents) { - if ((event.flags.toLong & EV_ERROR) != 0) { - - // TODO it would be interesting to propagate this failure via the callback - reportFailure( - new RuntimeException( - s"kevent64: flags=${event.flags.toHexString} errno=${event.data}" + val eventlist = stackalloc[kevent64_s](maxEvents.toLong) + val flags = (if (timeoutIsZero) KEVENT_FLAG_IMMEDIATE else KEVENT_FLAG_NONE).toUInt + val triggeredEvents = + kevent64(kqfd, changelist, finalChangeCount, eventlist, maxEvents, flags, timeoutSpec) + + if (triggeredEvents >= 0) { + var i = 0 + var event = eventlist + while (i < triggeredEvents) { + if ((event.flags.toLong & EV_ERROR) != 0) { + + // TODO it would be interesting to propagate this failure via the callback + reportFailure( + new RuntimeException( + s"kevent64: flags=${event.flags.toHexString} errno=${event.data}" + ) ) - ) - } else if (callbacks.contains(event.ident.toLong)) { - val filter = event.filter - val cb = EventNotificationCallback.fromPtr(event.udata) + } else if (callbacks.contains(event.ident.toLong)) { + val filter = event.filter + val cb = EventNotificationCallback.fromPtr(event.udata) - try { - cb.notifyEvents(filter == EVFILT_READ, filter == EVFILT_WRITE) - } catch { - case NonFatal(ex) => - reportFailure(ex) + try { + cb.notifyEvents(filter == EVFILT_READ, filter == EVFILT_WRITE) + } catch { + case NonFatal(ex) => + reportFailure(ex) + } } - } - i += 1 - event += 1 + i += 1 + event += 1 + } + } else { + throw new RuntimeException(s"kevent64: ${errno.errno}") } - } else { - throw new RuntimeException(s"kevent64: ${errno.errno}") } - !changes.isEmpty() || callbacks.nonEmpty } } From fa70f815a98d1c4f4056ce345ccff2ffc9103cd6 Mon Sep 17 00:00:00 2001 From: Lee Tibbert Date: Sat, 17 Sep 2022 13:12:46 -0400 Subject: [PATCH 2/6] Add Arman's socket close no-hang test --- tests/shared/src/test/scala/epollcat/TcpSuite.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/shared/src/test/scala/epollcat/TcpSuite.scala b/tests/shared/src/test/scala/epollcat/TcpSuite.scala index c0c7749..a4b08ad 100644 --- a/tests/shared/src/test/scala/epollcat/TcpSuite.scala +++ b/tests/shared/src/test/scala/epollcat/TcpSuite.scala @@ -238,4 +238,8 @@ class TcpSuite extends EpollcatSuite { .timeoutTo(100.millis, IO.unit) } + test("immediately closing a socket does not hang") { + IOSocketChannel.open.use_ + } + } From 1c4a0fa00e516b8ce3112241c88fa586e7d6a99f Mon Sep 17 00:00:00 2001 From: Lee Tibbert Date: Sat, 17 Sep 2022 14:12:01 -0400 Subject: [PATCH 3/6] sockets & server_sockets travel in pairs; their test must also --- tests/shared/src/test/scala/epollcat/TcpSuite.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/shared/src/test/scala/epollcat/TcpSuite.scala b/tests/shared/src/test/scala/epollcat/TcpSuite.scala index a4b08ad..948710e 100644 --- a/tests/shared/src/test/scala/epollcat/TcpSuite.scala +++ b/tests/shared/src/test/scala/epollcat/TcpSuite.scala @@ -242,4 +242,8 @@ class TcpSuite extends EpollcatSuite { IOSocketChannel.open.use_ } + test("immediately closing a server socket does not hang") { + IOServerSocketChannel.open.use_ + } + } From 63838ca4debad4753d586753c90399e63f191a13 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 18 Sep 2022 13:44:09 -0700 Subject: [PATCH 4/6] Revert "Fix #64: Remove a KqExec poll hang" This reverts commit 61d2427495c6d5380c676cca3a421c0fc33c8d2a. --- .../unsafe/KqueueExecutorScheduler.scala | 79 +++++++++---------- 1 file changed, 39 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/epollcat/unsafe/KqueueExecutorScheduler.scala b/core/src/main/scala/epollcat/unsafe/KqueueExecutorScheduler.scala index c992fc7..e9623c6 100644 --- a/core/src/main/scala/epollcat/unsafe/KqueueExecutorScheduler.scala +++ b/core/src/main/scala/epollcat/unsafe/KqueueExecutorScheduler.scala @@ -65,54 +65,53 @@ private[unsafe] final class KqueueExecutorScheduler( } } - if (!noCallbacks) { - val timeoutSpec = - if (timeoutIsInfinite || timeoutIsZero) null - else { - val ts = stackalloc[time.timespec]() - val sec = timeout.toSeconds - ts.tv_sec = sec - ts.tv_nsec = (timeout - sec.seconds).toNanos - ts - } + val timeoutSpec = + if (timeoutIsInfinite || timeoutIsZero) null + else { + val ts = stackalloc[time.timespec]() + val sec = timeout.toSeconds + ts.tv_sec = sec + ts.tv_nsec = (timeout - sec.seconds).toNanos + ts + } - val eventlist = stackalloc[kevent64_s](maxEvents.toLong) - val flags = (if (timeoutIsZero) KEVENT_FLAG_IMMEDIATE else KEVENT_FLAG_NONE).toUInt - val triggeredEvents = - kevent64(kqfd, changelist, finalChangeCount, eventlist, maxEvents, flags, timeoutSpec) - - if (triggeredEvents >= 0) { - var i = 0 - var event = eventlist - while (i < triggeredEvents) { - if ((event.flags.toLong & EV_ERROR) != 0) { - - // TODO it would be interesting to propagate this failure via the callback - reportFailure( - new RuntimeException( - s"kevent64: flags=${event.flags.toHexString} errno=${event.data}" - ) + val eventlist = stackalloc[kevent64_s](maxEvents.toLong) + val flags = (if (timeoutIsZero) KEVENT_FLAG_IMMEDIATE else KEVENT_FLAG_NONE).toUInt + val triggeredEvents = + kevent64(kqfd, changelist, finalChangeCount, eventlist, maxEvents, flags, timeoutSpec) + + if (triggeredEvents >= 0) { + var i = 0 + var event = eventlist + while (i < triggeredEvents) { + if ((event.flags.toLong & EV_ERROR) != 0) { + + // TODO it would be interesting to propagate this failure via the callback + reportFailure( + new RuntimeException( + s"kevent64: flags=${event.flags.toHexString} errno=${event.data}" ) + ) - } else if (callbacks.contains(event.ident.toLong)) { - val filter = event.filter - val cb = EventNotificationCallback.fromPtr(event.udata) + } else if (callbacks.contains(event.ident.toLong)) { + val filter = event.filter + val cb = EventNotificationCallback.fromPtr(event.udata) - try { - cb.notifyEvents(filter == EVFILT_READ, filter == EVFILT_WRITE) - } catch { - case NonFatal(ex) => - reportFailure(ex) - } + try { + cb.notifyEvents(filter == EVFILT_READ, filter == EVFILT_WRITE) + } catch { + case NonFatal(ex) => + reportFailure(ex) } - - i += 1 - event += 1 } - } else { - throw new RuntimeException(s"kevent64: ${errno.errno}") + + i += 1 + event += 1 } + } else { + throw new RuntimeException(s"kevent64: ${errno.errno}") } + !changes.isEmpty() || callbacks.nonEmpty } } From a9f368f37e1cc4c439679307d556bade0cacf6d8 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 18 Sep 2022 13:48:47 -0700 Subject: [PATCH 5/6] Add a note about the failure mode --- tests/shared/src/test/scala/epollcat/TcpSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/shared/src/test/scala/epollcat/TcpSuite.scala b/tests/shared/src/test/scala/epollcat/TcpSuite.scala index 948710e..4184892 100644 --- a/tests/shared/src/test/scala/epollcat/TcpSuite.scala +++ b/tests/shared/src/test/scala/epollcat/TcpSuite.scala @@ -239,10 +239,12 @@ class TcpSuite extends EpollcatSuite { } test("immediately closing a socket does not hang") { + // note: on failure the test passes, but the test runner hangs IOSocketChannel.open.use_ } test("immediately closing a server socket does not hang") { + // note: on failure the test passes, but the test runner hangs IOServerSocketChannel.open.use_ } From 391a739d24356ef5cdea420a6b955c986f0a7d62 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 18 Sep 2022 13:56:04 -0700 Subject: [PATCH 6/6] Use the final kqueue change count in condition --- .../unsafe/KqueueExecutorScheduler.scala | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/epollcat/unsafe/KqueueExecutorScheduler.scala b/core/src/main/scala/epollcat/unsafe/KqueueExecutorScheduler.scala index e9623c6..d602a97 100644 --- a/core/src/main/scala/epollcat/unsafe/KqueueExecutorScheduler.scala +++ b/core/src/main/scala/epollcat/unsafe/KqueueExecutorScheduler.scala @@ -44,27 +44,27 @@ private[unsafe] final class KqueueExecutorScheduler( val timeoutIsInfinite = timeout == Duration.Inf val timeoutIsZero = timeout == Duration.Zero val noCallbacks = callbacks.isEmpty - val changeCount = changes.size() + + // pre-process the changes to filter canceled ones + val changelist = stackalloc[kevent64_s](changes.size().toLong) + var change = changelist + var changeCount = 0 + while (!changes.isEmpty()) { + val evAdd = changes.poll() + if (!evAdd.canceled) { + change.ident = evAdd.fd.toULong + change.filter = evAdd.filter + change.flags = (EV_ADD | EV_CLEAR).toUShort + change.udata = EventNotificationCallback.toPtr(evAdd.cb) + change += 1 + changeCount += 1 + } + } if ((timeoutIsInfinite || timeoutIsZero) && noCallbacks && changeCount == 0) false // nothing to do here. refer to scaladoc on PollingExecutorScheduler#poll else { - val changelist = stackalloc[kevent64_s](changeCount.toLong) - var change = changelist - var finalChangeCount = 0 - while (!changes.isEmpty()) { - val evAdd = changes.poll() - if (!evAdd.canceled) { - change.ident = evAdd.fd.toULong - change.filter = evAdd.filter - change.flags = (EV_ADD | EV_CLEAR).toUShort - change.udata = EventNotificationCallback.toPtr(evAdd.cb) - change += 1 - finalChangeCount += 1 - } - } - val timeoutSpec = if (timeoutIsInfinite || timeoutIsZero) null else { @@ -78,7 +78,7 @@ private[unsafe] final class KqueueExecutorScheduler( val eventlist = stackalloc[kevent64_s](maxEvents.toLong) val flags = (if (timeoutIsZero) KEVENT_FLAG_IMMEDIATE else KEVENT_FLAG_NONE).toUInt val triggeredEvents = - kevent64(kqfd, changelist, finalChangeCount, eventlist, maxEvents, flags, timeoutSpec) + kevent64(kqfd, changelist, changeCount, eventlist, maxEvents, flags, timeoutSpec) if (triggeredEvents >= 0) { var i = 0