From e876930ba022ecd34b3faae76e5de6bb7a11917a Mon Sep 17 00:00:00 2001 From: Lorenzo Gabriele Date: Thu, 27 Oct 2022 22:49:51 +0200 Subject: [PATCH 1/6] WIP: Hanging test with monitor --- .../test/scala/epollcat/MonitorSuite.scala | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 tests/native/src/test/scala/epollcat/MonitorSuite.scala diff --git a/tests/native/src/test/scala/epollcat/MonitorSuite.scala b/tests/native/src/test/scala/epollcat/MonitorSuite.scala new file mode 100644 index 0000000..9fa94fa --- /dev/null +++ b/tests/native/src/test/scala/epollcat/MonitorSuite.scala @@ -0,0 +1,67 @@ +package epollcat + +import cats.effect.IO +import cats.effect.Resource +import epollcat.unsafe.EventPollingExecutorScheduler +import epollcat.unsafe.EpollRuntime + +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ +import scala.scalanative.posix.unistd._ + +class MonitorSuite extends EpollcatSuite { + + val zoneResource: Resource[IO, Zone] = Resource.make(IO(Zone.open()))(z => IO(z.close())) + + val pipeResource: Resource[IO, (Int, Int)] = Resource.make { + zoneResource.use { implicit zone => + val fildes = alloc[CInt](2) + if (pipe(fildes) != 0) { + IO.raiseError(new Exception("Failed to create pipe")) + } else { + IO((fildes(0), fildes(1))) + } + } + } { + case (a, b) => + IO { + println("CLOSING PIPES") + close(a) + close(b) + () + } + } + + test("compile") { + val scheduler = EpollRuntime.global.scheduler.asInstanceOf[EventPollingExecutorScheduler] + + pipeResource.use { + case (r, w) => + IO.fromFuture { + IO { + println("TEST STARTS") + val promise = scala.concurrent.Promise[Unit]() + val byte = 10.toByte + scheduler.monitor(r, reads = true, writes = false)( + new epollcat.unsafe.EventNotificationCallback { + def notifyEvents(readReady: Boolean, writeReady: Boolean): Unit = { + println("NOTIFY EVENTS") + val buf = stackalloc[Byte]() + val bytesRead = read(r, buf, 1L.toULong) + assert(bytesRead == 1) + assert(buf(0) == byte) + println("SUCCESSFUL PROMISE") + promise.success(()) + } + }) + val buf = stackalloc[Byte]() + buf(0) = byte + val bytesWrote = write(w, buf, 1L.toULong) + assert(bytesWrote == 1) + println("RETURNING FUTURE") + promise.future + } + } + } + } +} From f4eb7294344dd8f644dcb8f900d394404fac7519 Mon Sep 17 00:00:00 2001 From: Lorenzo Gabriele Date: Fri, 28 Oct 2022 10:42:17 +0200 Subject: [PATCH 2/6] Fix test and use IO.async_ --- .../test/scala/epollcat/MonitorSuite.scala | 49 +++++++++---------- 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/tests/native/src/test/scala/epollcat/MonitorSuite.scala b/tests/native/src/test/scala/epollcat/MonitorSuite.scala index 9fa94fa..b25f9ca 100644 --- a/tests/native/src/test/scala/epollcat/MonitorSuite.scala +++ b/tests/native/src/test/scala/epollcat/MonitorSuite.scala @@ -2,6 +2,7 @@ package epollcat import cats.effect.IO import cats.effect.Resource +import epollcat.unsafe.EventNotificationCallback import epollcat.unsafe.EventPollingExecutorScheduler import epollcat.unsafe.EpollRuntime @@ -11,9 +12,10 @@ import scala.scalanative.posix.unistd._ class MonitorSuite extends EpollcatSuite { - val zoneResource: Resource[IO, Zone] = Resource.make(IO(Zone.open()))(z => IO(z.close())) + private val zoneResource: Resource[IO, Zone] = + Resource.make(IO(Zone.open()))(z => IO(z.close())) - val pipeResource: Resource[IO, (Int, Int)] = Resource.make { + private val pipeResource: Resource[IO, (Int, Int)] = Resource.make { zoneResource.use { implicit zone => val fildes = alloc[CInt](2) if (pipe(fildes) != 0) { @@ -25,42 +27,35 @@ class MonitorSuite extends EpollcatSuite { } { case (a, b) => IO { - println("CLOSING PIPES") close(a) close(b) () } } - test("compile") { + test("monitor a pipe") { val scheduler = EpollRuntime.global.scheduler.asInstanceOf[EventPollingExecutorScheduler] pipeResource.use { case (r, w) => - IO.fromFuture { - IO { - println("TEST STARTS") - val promise = scala.concurrent.Promise[Unit]() - val byte = 10.toByte - scheduler.monitor(r, reads = true, writes = false)( - new epollcat.unsafe.EventNotificationCallback { - def notifyEvents(readReady: Boolean, writeReady: Boolean): Unit = { - println("NOTIFY EVENTS") - val buf = stackalloc[Byte]() - val bytesRead = read(r, buf, 1L.toULong) - assert(bytesRead == 1) - assert(buf(0) == byte) - println("SUCCESSFUL PROMISE") - promise.success(()) - } - }) - val buf = stackalloc[Byte]() - buf(0) = byte - val bytesWrote = write(w, buf, 1L.toULong) - assert(bytesWrote == 1) - println("RETURNING FUTURE") - promise.future + IO.async_[Unit] { cb => + val byte = 10.toByte + var stop: Runnable = null + val monitorCallback = new epollcat.unsafe.EventNotificationCallback { + def notifyEvents(readReady: Boolean, writeReady: Boolean): Unit = { + val readBuf = stackalloc[Byte]() + val bytesRead = read(r, readBuf, 1L.toULong) + assertEquals(bytesRead, 1) + assertEquals(readBuf(0), byte) + stop.run() + cb(Right(())) + } } + stop = scheduler.monitor(r, reads = true, writes = false)(monitorCallback) + val writeBuf = stackalloc[Byte]() + writeBuf(0) = byte + val wroteBytes = write(w, writeBuf, 1L.toULong) + assertEquals(wroteBytes, 1) } } } From 81e9e10d28819e543b78699ccfa374cdb2e6194d Mon Sep 17 00:00:00 2001 From: Lorenzo Gabriele Date: Fri, 28 Oct 2022 11:46:19 +0200 Subject: [PATCH 3/6] Add header --- .../test/scala/epollcat/MonitorSuite.scala | 85 +++++++++++-------- 1 file changed, 50 insertions(+), 35 deletions(-) diff --git a/tests/native/src/test/scala/epollcat/MonitorSuite.scala b/tests/native/src/test/scala/epollcat/MonitorSuite.scala index b25f9ca..d5daa74 100644 --- a/tests/native/src/test/scala/epollcat/MonitorSuite.scala +++ b/tests/native/src/test/scala/epollcat/MonitorSuite.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2022 Arman Bilge + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package epollcat import cats.effect.IO @@ -12,51 +28,50 @@ import scala.scalanative.posix.unistd._ class MonitorSuite extends EpollcatSuite { - private val zoneResource: Resource[IO, Zone] = - Resource.make(IO(Zone.open()))(z => IO(z.close())) - - private val pipeResource: Resource[IO, (Int, Int)] = Resource.make { - zoneResource.use { implicit zone => - val fildes = alloc[CInt](2) - if (pipe(fildes) != 0) { - IO.raiseError(new Exception("Failed to create pipe")) - } else { - IO((fildes(0), fildes(1))) + class Pipe private (val readFd: Int, val writeFd: Int) + object Pipe { + private val zoneResource: Resource[IO, Zone] = + Resource.make(IO(Zone.open()))(z => IO(z.close())) + val make: Resource[IO, Pipe] = Resource.make { + zoneResource.use { implicit zone => + val fildes = alloc[CInt](2) + if (pipe(fildes) != 0) { + IO.raiseError(new Exception("Failed to create pipe")) + } else { + IO(new Pipe(fildes(0), fildes(1))) + } } - } - } { - case (a, b) => + }(pipe => IO { - close(a) - close(b) + close(pipe.readFd) + close(pipe.writeFd) () - } + }) } test("monitor a pipe") { val scheduler = EpollRuntime.global.scheduler.asInstanceOf[EventPollingExecutorScheduler] - pipeResource.use { - case (r, w) => - IO.async_[Unit] { cb => - val byte = 10.toByte - var stop: Runnable = null - val monitorCallback = new epollcat.unsafe.EventNotificationCallback { - def notifyEvents(readReady: Boolean, writeReady: Boolean): Unit = { - val readBuf = stackalloc[Byte]() - val bytesRead = read(r, readBuf, 1L.toULong) - assertEquals(bytesRead, 1) - assertEquals(readBuf(0), byte) - stop.run() - cb(Right(())) - } + Pipe.make.use { pipe => + IO.async_[Unit] { cb => + val byte = 10.toByte + var stop: Runnable = null + val monitorCallback = new epollcat.unsafe.EventNotificationCallback { + def notifyEvents(readReady: Boolean, writeReady: Boolean): Unit = { + val readBuf = stackalloc[Byte]() + val bytesRead = read(pipe.readFd, readBuf, 1L.toULong) + assertEquals(bytesRead, 1) + assertEquals(readBuf(0), byte) + stop.run() + cb(Right(())) } - stop = scheduler.monitor(r, reads = true, writes = false)(monitorCallback) - val writeBuf = stackalloc[Byte]() - writeBuf(0) = byte - val wroteBytes = write(w, writeBuf, 1L.toULong) - assertEquals(wroteBytes, 1) } + stop = scheduler.monitor(pipe.readFd, reads = true, writes = false)(monitorCallback) + val writeBuf = stackalloc[Byte]() + writeBuf(0) = byte + val wroteBytes = write(pipe.writeFd, writeBuf, 1L.toULong) + assertEquals(wroteBytes, 1) + } } } } From fa5b7c94832eee066386f6b581ceb78f8784dcb0 Mon Sep 17 00:00:00 2001 From: Lorenzo Gabriele Date: Fri, 28 Oct 2022 12:09:15 +0200 Subject: [PATCH 4/6] Remove unused import --- tests/native/src/test/scala/epollcat/MonitorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/native/src/test/scala/epollcat/MonitorSuite.scala b/tests/native/src/test/scala/epollcat/MonitorSuite.scala index d5daa74..f880a14 100644 --- a/tests/native/src/test/scala/epollcat/MonitorSuite.scala +++ b/tests/native/src/test/scala/epollcat/MonitorSuite.scala @@ -56,7 +56,7 @@ class MonitorSuite extends EpollcatSuite { IO.async_[Unit] { cb => val byte = 10.toByte var stop: Runnable = null - val monitorCallback = new epollcat.unsafe.EventNotificationCallback { + val monitorCallback = new EventNotificationCallback { def notifyEvents(readReady: Boolean, writeReady: Boolean): Unit = { val readBuf = stackalloc[Byte]() val bytesRead = read(pipe.readFd, readBuf, 1L.toULong) From 54d83104c929473e74fca171262877ef1656c688 Mon Sep 17 00:00:00 2001 From: Lorenzo Gabriele Date: Sat, 29 Oct 2022 11:17:49 +0200 Subject: [PATCH 5/6] Use stackalloc instead of Zone --- .../native/src/test/scala/epollcat/MonitorSuite.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/native/src/test/scala/epollcat/MonitorSuite.scala b/tests/native/src/test/scala/epollcat/MonitorSuite.scala index f880a14..57316c5 100644 --- a/tests/native/src/test/scala/epollcat/MonitorSuite.scala +++ b/tests/native/src/test/scala/epollcat/MonitorSuite.scala @@ -30,15 +30,13 @@ class MonitorSuite extends EpollcatSuite { class Pipe private (val readFd: Int, val writeFd: Int) object Pipe { - private val zoneResource: Resource[IO, Zone] = - Resource.make(IO(Zone.open()))(z => IO(z.close())) val make: Resource[IO, Pipe] = Resource.make { - zoneResource.use { implicit zone => - val fildes = alloc[CInt](2) + IO { + val fildes = stackalloc[CInt](2) if (pipe(fildes) != 0) { - IO.raiseError(new Exception("Failed to create pipe")) + throw new Exception("Failed to create pipe") } else { - IO(new Pipe(fildes(0), fildes(1))) + new Pipe(fildes(0), fildes(1)) } } }(pipe => From 118661944e3760a6e2c50074d3ab800985b9ab34 Mon Sep 17 00:00:00 2001 From: Lorenzo Gabriele Date: Sat, 29 Oct 2022 11:37:34 +0200 Subject: [PATCH 6/6] Actually fail the test when callback fails --- .../src/test/scala/epollcat/MonitorSuite.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/tests/native/src/test/scala/epollcat/MonitorSuite.scala b/tests/native/src/test/scala/epollcat/MonitorSuite.scala index 57316c5..e37817f 100644 --- a/tests/native/src/test/scala/epollcat/MonitorSuite.scala +++ b/tests/native/src/test/scala/epollcat/MonitorSuite.scala @@ -56,12 +56,17 @@ class MonitorSuite extends EpollcatSuite { var stop: Runnable = null val monitorCallback = new EventNotificationCallback { def notifyEvents(readReady: Boolean, writeReady: Boolean): Unit = { - val readBuf = stackalloc[Byte]() - val bytesRead = read(pipe.readFd, readBuf, 1L.toULong) - assertEquals(bytesRead, 1) - assertEquals(readBuf(0), byte) - stop.run() - cb(Right(())) + try { + val readBuf = stackalloc[Byte]() + val bytesRead = read(pipe.readFd, readBuf, 1L.toULong) + assertEquals(bytesRead, 1) + assertEquals(readBuf(0), byte) + cb(Right(())) + } catch { + case e: Throwable => cb(Left(e)) + } finally { + stop.run() + } } } stop = scheduler.monitor(pipe.readFd, reads = true, writes = false)(monitorCallback)