From 809385229a54a209168b16e3998f5c4c97b0f424 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Sun, 2 May 2021 23:53:07 +0200 Subject: [PATCH 01/20] Extremely crude implementation --- .../main/scala/fs2/io/internal/IOBuffer.scala | 86 +++++++++++++++++++ io/src/main/scala/fs2/io/io.scala | 7 +- 2 files changed, 89 insertions(+), 4 deletions(-) create mode 100644 io/src/main/scala/fs2/io/internal/IOBuffer.scala diff --git a/io/src/main/scala/fs2/io/internal/IOBuffer.scala b/io/src/main/scala/fs2/io/internal/IOBuffer.scala new file mode 100644 index 0000000000..3e0d6ed541 --- /dev/null +++ b/io/src/main/scala/fs2/io/internal/IOBuffer.scala @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.io.internal + +import java.io.{InputStream, OutputStream} + +private[io] final class IOBuffer(private[this] val capacity: Int) { self => + + private[this] val buffer: Array[Byte] = new Array(capacity) + + private[this] var head: Int = 0 + private[this] var tail: Int = 0 + + private[this] var closed: Boolean = false + + val inputStream: InputStream = new InputStream { + def read(): Int = { + var res = 0 + var cont = true + while (cont) { + self.synchronized { + if (head != tail) { + res = buffer(head % capacity) & 0xff + head += 1 + cont = false + } else if (closed) { + res = -1 + cont = false + } + } + + if (cont) { + Thread.sleep(100L) + } + } + + res + } + + override def close(): Unit = self.synchronized { + closed = true + } + } + + val outputStream: OutputStream = new OutputStream { + def write(b: Int): Unit = { + var cont = true + while (cont) { + self.synchronized { + if (tail - head < capacity) { + buffer(tail % capacity) = (b & 0xff).toByte + tail += 1 + cont = false + } + } + + if (cont) { + Thread.sleep(100L) + } + } + } + + override def close(): Unit = self.synchronized { + closed = true + } + } +} diff --git a/io/src/main/scala/fs2/io/io.scala b/io/src/main/scala/fs2/io/io.scala index 5c05ff639d..1ccfac4b08 100644 --- a/io/src/main/scala/fs2/io/io.scala +++ b/io/src/main/scala/fs2/io/io.scala @@ -27,7 +27,7 @@ import cats.effect.kernel.implicits._ import cats.effect.kernel.Deferred import cats.syntax.all._ -import java.io.{InputStream, OutputStream, PipedInputStream, PipedOutputStream} +import java.io.{InputStream, OutputStream} import java.nio.charset.Charset /** Provides various ways to work with streams that perform IO. @@ -133,9 +133,8 @@ package object io { ): Stream[F, Byte] = { val mkOutput: Resource[F, (OutputStream, InputStream)] = Resource.make(Sync[F].delay { - val os = new PipedOutputStream() - val is = new PipedInputStream(os, chunkSize) - (os: OutputStream, is: InputStream) + val buf = new internal.IOBuffer(chunkSize) + (buf.outputStream, buf.inputStream) })(ois => Sync[F].blocking { // Piped(I/O)Stream implementations cant't throw on close, no need to nest the handling here. From fa7fdea6deb1e63f274848bb59045352fef166f8 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 3 May 2021 01:34:50 +0200 Subject: [PATCH 02/20] Read chunks at a time --- .../main/scala/fs2/io/internal/IOBuffer.scala | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/io/src/main/scala/fs2/io/internal/IOBuffer.scala b/io/src/main/scala/fs2/io/internal/IOBuffer.scala index 3e0d6ed541..feb6c01858 100644 --- a/io/src/main/scala/fs2/io/internal/IOBuffer.scala +++ b/io/src/main/scala/fs2/io/internal/IOBuffer.scala @@ -56,6 +56,41 @@ private[io] final class IOBuffer(private[this] val capacity: Int) { self => res } + override def read(b: Array[Byte], off: Int, len: Int): Int = { + var offset = off + var length = len + + var success = false + var res = 0 + var cont = true + + while (cont) { + self.synchronized { + if (head != tail) { + val available = tail - head + val toRead = math.min(available, length) + System.arraycopy(buffer, head % capacity, b, offset, toRead) + head += toRead + offset += toRead + length -= toRead + res += toRead + success = true + if (length == 0) { + cont = false + } + } else if (closed) { + cont = false + } + } + + if (cont) { + Thread.sleep(100L) + } + } + + if (success) res else -1 + } + override def close(): Unit = self.synchronized { closed = true } From 534f48a56b7acc224a286bfc554f26ec3e34afd8 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 3 May 2021 01:45:21 +0200 Subject: [PATCH 03/20] Write whole chunks at a time --- .../main/scala/fs2/io/internal/IOBuffer.scala | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/io/src/main/scala/fs2/io/internal/IOBuffer.scala b/io/src/main/scala/fs2/io/internal/IOBuffer.scala index feb6c01858..e47cf0a201 100644 --- a/io/src/main/scala/fs2/io/internal/IOBuffer.scala +++ b/io/src/main/scala/fs2/io/internal/IOBuffer.scala @@ -105,6 +105,36 @@ private[io] final class IOBuffer(private[this] val capacity: Int) { self => buffer(tail % capacity) = (b & 0xff).toByte tail += 1 cont = false + } else if (closed) { + cont = false + } + } + + if (cont) { + Thread.sleep(100L) + } + } + } + + override def write(b: Array[Byte], off: Int, len: Int): Unit = { + var offset = off + var length = len + + var cont = true + while (cont) { + self.synchronized { + if (tail - head < capacity) { + val available = capacity - (tail - head) + val toWrite = math.min(available, length) + System.arraycopy(b, offset, buffer, tail % capacity, toWrite) + tail += toWrite + offset += toWrite + length -= toWrite + if (length == 0) { + cont = false + } + } else if (closed) { + cont = false } } From c411aaf35189f14b66a0d192205b48cbb381edb7 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 3 May 2021 02:18:57 +0200 Subject: [PATCH 04/20] Backpressure readers and writers --- .../main/scala/fs2/io/internal/IOBuffer.scala | 31 ++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/io/src/main/scala/fs2/io/internal/IOBuffer.scala b/io/src/main/scala/fs2/io/internal/IOBuffer.scala index e47cf0a201..0405f10de7 100644 --- a/io/src/main/scala/fs2/io/internal/IOBuffer.scala +++ b/io/src/main/scala/fs2/io/internal/IOBuffer.scala @@ -22,6 +22,7 @@ package fs2.io.internal import java.io.{InputStream, OutputStream} +import java.util.concurrent.Semaphore private[io] final class IOBuffer(private[this] val capacity: Int) { self => @@ -32,8 +33,13 @@ private[io] final class IOBuffer(private[this] val capacity: Int) { self => private[this] var closed: Boolean = false + private[this] val readerPermit: Semaphore = new Semaphore(1) + private[this] val writerPermit: Semaphore = new Semaphore(1) + val inputStream: InputStream = new InputStream { def read(): Int = { + readerPermit.acquire() + var res = 0 var cont = true while (cont) { @@ -41,15 +47,18 @@ private[io] final class IOBuffer(private[this] val capacity: Int) { self => if (head != tail) { res = buffer(head % capacity) & 0xff head += 1 + writerPermit.release() + readerPermit.release() cont = false } else if (closed) { res = -1 + readerPermit.release() cont = false } } if (cont) { - Thread.sleep(100L) + readerPermit.acquire() } } @@ -57,6 +66,8 @@ private[io] final class IOBuffer(private[this] val capacity: Int) { self => } override def read(b: Array[Byte], off: Int, len: Int): Int = { + readerPermit.acquire() + var offset = off var length = len @@ -75,16 +86,19 @@ private[io] final class IOBuffer(private[this] val capacity: Int) { self => length -= toRead res += toRead success = true + writerPermit.release() if (length == 0) { + readerPermit.release() cont = false } } else if (closed) { + readerPermit.release() cont = false } } if (cont) { - Thread.sleep(100L) + readerPermit.acquire() } } @@ -93,6 +107,8 @@ private[io] final class IOBuffer(private[this] val capacity: Int) { self => override def close(): Unit = self.synchronized { closed = true + readerPermit.release() + writerPermit.release() } } @@ -104,14 +120,17 @@ private[io] final class IOBuffer(private[this] val capacity: Int) { self => if (tail - head < capacity) { buffer(tail % capacity) = (b & 0xff).toByte tail += 1 + readerPermit.release() + writerPermit.release() cont = false } else if (closed) { + writerPermit.release() cont = false } } if (cont) { - Thread.sleep(100L) + writerPermit.acquire() } } } @@ -130,22 +149,26 @@ private[io] final class IOBuffer(private[this] val capacity: Int) { self => tail += toWrite offset += toWrite length -= toWrite + readerPermit.release() if (length == 0) { + writerPermit.release() cont = false } } else if (closed) { + readerPermit.release() cont = false } } if (cont) { - Thread.sleep(100L) + writerPermit.acquire() } } } override def close(): Unit = self.synchronized { closed = true + readerPermit.release() } } } From 1e680dd1bd68325305209fea8baf3c43cf55f147 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 3 May 2021 02:21:06 +0200 Subject: [PATCH 05/20] Rename to InputOutputBuffer --- .../fs2/io/internal/{IOBuffer.scala => InputOutputBuffer.scala} | 2 +- io/src/main/scala/fs2/io/io.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename io/src/main/scala/fs2/io/internal/{IOBuffer.scala => InputOutputBuffer.scala} (98%) diff --git a/io/src/main/scala/fs2/io/internal/IOBuffer.scala b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala similarity index 98% rename from io/src/main/scala/fs2/io/internal/IOBuffer.scala rename to io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala index 0405f10de7..09f35a25a2 100644 --- a/io/src/main/scala/fs2/io/internal/IOBuffer.scala +++ b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala @@ -24,7 +24,7 @@ package fs2.io.internal import java.io.{InputStream, OutputStream} import java.util.concurrent.Semaphore -private[io] final class IOBuffer(private[this] val capacity: Int) { self => +private[io] final class InputOutputBuffer(private[this] val capacity: Int) { self => private[this] val buffer: Array[Byte] = new Array(capacity) diff --git a/io/src/main/scala/fs2/io/io.scala b/io/src/main/scala/fs2/io/io.scala index 1ccfac4b08..868a9a3dc2 100644 --- a/io/src/main/scala/fs2/io/io.scala +++ b/io/src/main/scala/fs2/io/io.scala @@ -133,7 +133,7 @@ package object io { ): Stream[F, Byte] = { val mkOutput: Resource[F, (OutputStream, InputStream)] = Resource.make(Sync[F].delay { - val buf = new internal.IOBuffer(chunkSize) + val buf = new internal.InputOutputBuffer(chunkSize) (buf.outputStream, buf.inputStream) })(ois => Sync[F].blocking { From 727c36b2ce638e6ec32ba50de656186cdbb737b6 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 3 May 2021 02:23:53 +0200 Subject: [PATCH 06/20] Import InputOutputBuffer --- io/src/main/scala/fs2/io/io.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/io/src/main/scala/fs2/io/io.scala b/io/src/main/scala/fs2/io/io.scala index 868a9a3dc2..78d8aa35a3 100644 --- a/io/src/main/scala/fs2/io/io.scala +++ b/io/src/main/scala/fs2/io/io.scala @@ -26,6 +26,7 @@ import cats.effect.kernel.{Async, Outcome, Resource, Sync} import cats.effect.kernel.implicits._ import cats.effect.kernel.Deferred import cats.syntax.all._ +import fs2.io.internal.InputOutputBuffer import java.io.{InputStream, OutputStream} import java.nio.charset.Charset @@ -133,7 +134,7 @@ package object io { ): Stream[F, Byte] = { val mkOutput: Resource[F, (OutputStream, InputStream)] = Resource.make(Sync[F].delay { - val buf = new internal.InputOutputBuffer(chunkSize) + val buf = new InputOutputBuffer(chunkSize) (buf.outputStream, buf.inputStream) })(ois => Sync[F].blocking { From 19f6666aceab64d791454184cb3e05787b18215c Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 3 May 2021 02:42:20 +0200 Subject: [PATCH 07/20] Simplify read and write --- .../fs2/io/internal/InputOutputBuffer.scala | 29 +++++++------------ 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala index 09f35a25a2..8ced6d6c29 100644 --- a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala +++ b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala @@ -40,29 +40,24 @@ private[io] final class InputOutputBuffer(private[this] val capacity: Int) { sel def read(): Int = { readerPermit.acquire() - var res = 0 - var cont = true - while (cont) { + while (true) { self.synchronized { if (head != tail) { - res = buffer(head % capacity) & 0xff + val byte = buffer(head % capacity) & 0xff head += 1 writerPermit.release() readerPermit.release() - cont = false + return byte } else if (closed) { - res = -1 readerPermit.release() - cont = false + return -1 } } - if (cont) { - readerPermit.acquire() - } + readerPermit.acquire() } - res + -1 } override def read(b: Array[Byte], off: Int, len: Int): Int = { @@ -114,25 +109,21 @@ private[io] final class InputOutputBuffer(private[this] val capacity: Int) { sel val outputStream: OutputStream = new OutputStream { def write(b: Int): Unit = { - var cont = true - while (cont) { + while (true) self.synchronized { if (tail - head < capacity) { buffer(tail % capacity) = (b & 0xff).toByte tail += 1 readerPermit.release() writerPermit.release() - cont = false + return } else if (closed) { writerPermit.release() - cont = false + return } } - if (cont) { - writerPermit.acquire() - } - } + writerPermit.acquire() } override def write(b: Array[Byte], off: Int, len: Int): Unit = { From fe07922ab3ecb0135c7911ffa0c91d6bad19e8c7 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 3 May 2021 02:46:36 +0200 Subject: [PATCH 08/20] Simplify write batch --- .../scala/fs2/io/internal/InputOutputBuffer.scala | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala index 8ced6d6c29..32fb429ef4 100644 --- a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala +++ b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala @@ -130,8 +130,7 @@ private[io] final class InputOutputBuffer(private[this] val capacity: Int) { sel var offset = off var length = len - var cont = true - while (cont) { + while (true) { self.synchronized { if (tail - head < capacity) { val available = capacity - (tail - head) @@ -142,18 +141,14 @@ private[io] final class InputOutputBuffer(private[this] val capacity: Int) { sel length -= toWrite readerPermit.release() if (length == 0) { - writerPermit.release() - cont = false + return } } else if (closed) { - readerPermit.release() - cont = false + return } } - if (cont) { - writerPermit.acquire() - } + writerPermit.acquire() } } From 47f9e41ffff9eb0a9b042ec54b65489d8c73cb81 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 3 May 2021 02:50:45 +0200 Subject: [PATCH 09/20] Override available for the input stream --- io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala index 32fb429ef4..2600fcd152 100644 --- a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala +++ b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala @@ -105,6 +105,10 @@ private[io] final class InputOutputBuffer(private[this] val capacity: Int) { sel readerPermit.release() writerPermit.release() } + + override def available(): Int = self.synchronized { + if (closed) 0 else tail - head + } } val outputStream: OutputStream = new OutputStream { From 89c6d55d19d75f1df8b7b7501cdc880c5e460198 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 3 May 2021 02:58:20 +0200 Subject: [PATCH 10/20] Satisfy the Input/OutputStream read/write interface --- .../fs2/io/internal/InputOutputBuffer.scala | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala index 2600fcd152..4d2bb14566 100644 --- a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala +++ b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala @@ -61,6 +61,15 @@ private[io] final class InputOutputBuffer(private[this] val capacity: Int) { sel } override def read(b: Array[Byte], off: Int, len: Int): Int = { + if (b eq null) throw new NullPointerException("Cannot read into a null byte array") + else if (off < 0) + throw new IndexOutOfBoundsException(s"Negative offset into the byte array: $off") + else if (len < 0) throw new IndexOutOfBoundsException(s"Negative read length specified: $len") + else if (len > b.length - off) + throw new IndexOutOfBoundsException( + s"Specified length is greater than the remaining length of the byte array after the offset: len = $len, capacity = ${b.length - off}" + ) + readerPermit.acquire() var offset = off @@ -131,6 +140,18 @@ private[io] final class InputOutputBuffer(private[this] val capacity: Int) { sel } override def write(b: Array[Byte], off: Int, len: Int): Unit = { + if (b eq null) throw new NullPointerException("Cannot read into a null byte array") + else if (off < 0) + throw new IndexOutOfBoundsException(s"Negative offset into the byte array: $off") + else if (len < 0) + throw new IndexOutOfBoundsException(s"Negative write length specified: $len") + else if (len > b.length - off) + throw new IndexOutOfBoundsException( + s"Specified length is greater than the remaining length of the byte array after the offset: len = $len, capacity = ${b.length - off}" + ) + + writerPermit.acquire() + var offset = off var length = len From 16b347ac8d57b6258a5fe48fb701de532c521598 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 3 May 2021 03:12:05 +0200 Subject: [PATCH 11/20] Add scaladoc for InputOutputBuffer --- .../fs2/io/internal/InputOutputBuffer.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala index 4d2bb14566..fa8c3463d4 100644 --- a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala +++ b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala @@ -24,6 +24,23 @@ package fs2.io.internal import java.io.{InputStream, OutputStream} import java.util.concurrent.Semaphore +/** Thread safe circular byte buffer which connects a [[java.io.OutputStream]] + * to a [[java.io.OutputStream]] in a memory efficient manner, without copying + * bytes unnecessarily. + * + * @note As per the interfaces of the [[java.io]] classes, all of the + * operations are blocking in nature and extra care should be taken when using + * the exposed input/output streams. Thread safety is ensured by + * synchronizing on individual objects of this class. + * + * This is, in spirit, a clean room reimplementation of the + * [[java.io.PipedInputStream]] and [[java.io.PipedOutputStream]] pair of + * classes which can be used to achieve similar functionality, without the + * thread bookkeeping which is confusing in a multi threaded environment like + * the effect systems in which this code runs. + * + * @param capacity the capacity of the allocated circular buffer + */ private[io] final class InputOutputBuffer(private[this] val capacity: Int) { self => private[this] val buffer: Array[Byte] = new Array(capacity) From 989c74c60022ea75ec3f8fce5957a3b5136921fd Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 3 May 2021 03:49:30 +0200 Subject: [PATCH 12/20] Add comments for the code in InputOutputBuffer --- .../fs2/io/internal/InputOutputBuffer.scala | 79 ++++++++++++++++++- 1 file changed, 77 insertions(+), 2 deletions(-) diff --git a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala index fa8c3463d4..8d066e15dd 100644 --- a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala +++ b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala @@ -55,29 +55,42 @@ private[io] final class InputOutputBuffer(private[this] val capacity: Int) { sel val inputStream: InputStream = new InputStream { def read(): Int = { + // Obtain permission to read from the buffer. Used for backpressuring + // readers when the buffer is empty. readerPermit.acquire() while (true) { self.synchronized { if (head != tail) { + // There is at least one byte to read. val byte = buffer(head % capacity) & 0xff + // The byte is marked as read by advancing the head of the + // circular buffer. head += 1 + // Notify a writer that some space has been freed up in the buffer. writerPermit.release() + // Notify a next reader. readerPermit.release() return byte } else if (closed) { + // The Input/OutputStream pipe has been closed. Release the obtained + // permit such that future readers are not blocked forever. readerPermit.release() return -1 } } + // There is nothing to be read from the buffer at this moment. + // Wait until notified by a writer. readerPermit.acquire() } + // Unreachable code. -1 } override def read(b: Array[Byte], off: Int, len: Int): Int = { + // This branching satisfies the InputStream#read interface. if (b eq null) throw new NullPointerException("Cannot read into a null byte array") else if (off < 0) throw new IndexOutOfBoundsException(s"Negative offset into the byte array: $off") @@ -87,11 +100,19 @@ private[io] final class InputOutputBuffer(private[this] val capacity: Int) { sel s"Specified length is greater than the remaining length of the byte array after the offset: len = $len, capacity = ${b.length - off}" ) + // Obtain permission to read from the buffer. Used for backpressuring + // readers when the buffer is empty. readerPermit.acquire() + // Variables used to track the progress of the reading. It can happen that + // the current contents of the buffer cannot fulfill the read request and + // it needs to be done in several iterations after more data has been + // written into the buffer. var offset = off var length = len + // This method needs to return the number of read bytes, or -1 if the read + // was unsuccessful. var success = false var res = 0 var cont = true @@ -99,26 +120,39 @@ private[io] final class InputOutputBuffer(private[this] val capacity: Int) { sel while (cont) { self.synchronized { if (head != tail) { + // There is at least one byte available for reading. val available = tail - head + // Check whether the whole read request can be fulfilled right now, + // or just a part of it. val toRead = math.min(available, length) + // Transfer the bytes to the provided byte array. System.arraycopy(buffer, head % capacity, b, offset, toRead) + // The bytes are marked as read by advancing the head of the + // circular buffer. head += toRead + // Read request bookkeeping. offset += toRead length -= toRead res += toRead success = true + // Notify a writer that some space has been freed up in the buffer. writerPermit.release() if (length == 0) { + // Notify a next reader. readerPermit.release() cont = false } } else if (closed) { + // The Input/OutputStream pipe has been closed. Release the obtained + // permit such that future writers are not blocked forever. readerPermit.release() cont = false } } if (cont) { + // There is nothing to be read from the buffer at this moment. + // Wait until notified by a writer. readerPermit.acquire() } } @@ -128,6 +162,9 @@ private[io] final class InputOutputBuffer(private[this] val capacity: Int) { sel override def close(): Unit = self.synchronized { closed = true + // Immediately notify the first registered reader/writer. The rest will + // be notified by the read/write mechanism which takes into account the + // state of the Input/OutputStream. readerPermit.release() writerPermit.release() } @@ -139,24 +176,38 @@ private[io] final class InputOutputBuffer(private[this] val capacity: Int) { sel val outputStream: OutputStream = new OutputStream { def write(b: Int): Unit = { - while (true) + // Obtain permission to write to the buffer. Used for backpressuring + // writers when the buffer is full. + writerPermit.acquire() + + while (true) { self.synchronized { if (tail - head < capacity) { + // There is capacity for at least one byte to be written. buffer(tail % capacity) = (b & 0xff).toByte + // The byte is marked as written by advancing the tail of the + // circular buffer. tail += 1 + // Notify a reader that there is new data in the buffer. readerPermit.release() + // Notify a next writer. writerPermit.release() return } else if (closed) { + // The Input/OutputStream pipe has been closed. Release the obtained + // permit such that future writers are not blocked forever. writerPermit.release() return } } - writerPermit.acquire() + // The buffer is currently full. Wait until notified by a reader. + writerPermit.acquire() + } } override def write(b: Array[Byte], off: Int, len: Int): Unit = { + // This branching satisfies the OutputStream#write interface. if (b eq null) throw new NullPointerException("Cannot read into a null byte array") else if (off < 0) throw new IndexOutOfBoundsException(s"Negative offset into the byte array: $off") @@ -167,35 +218,59 @@ private[io] final class InputOutputBuffer(private[this] val capacity: Int) { sel s"Specified length is greater than the remaining length of the byte array after the offset: len = $len, capacity = ${b.length - off}" ) + // Obtain permission to write to the buffer. Used for backpressuring + // writers when the buffer is full. writerPermit.acquire() + // Variables used to track the progress of the writing. It can happen that + // the current leftover capacity of the buffer cannot fulfill the write + // request and it needs to be done in several iterations after more data + // has been written into the buffer. var offset = off var length = len while (true) { self.synchronized { if (tail - head < capacity) { + // There is capacity for at least one byte to be written. val available = capacity - (tail - head) + // Check whether the whole write request can be fulfilled right now, + // or just a part of it. val toWrite = math.min(available, length) + // Transfer the bytes to the provided byte array. System.arraycopy(b, offset, buffer, tail % capacity, toWrite) + // The bytes are marked as written by advancing the tail of the + // circular buffer. tail += toWrite + // Write request bookkeeping. offset += toWrite length -= toWrite + // Notify a reader that there is new data in the buffer. readerPermit.release() if (length == 0) { + // Notify a next writer. + writerPermit.release() return } } else if (closed) { + // The Input/OutputStream pipe has been closed. Release the obtained + // permit such that future writers are not blocked forever. + writerPermit.release() return } } + // The buffer is currently full. Wait until notified by a reader. writerPermit.acquire() } } override def close(): Unit = self.synchronized { closed = true + // Immediately notify the first registered reader/writer. The rest will + // be notified by the read/write mechanism which takes into account the + // state of the Input/OutputStream. + writerPermit.release() readerPermit.release() } } From eaab9dacda3457f1ee7555f8f7a1e2bd8a19b27b Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 3 May 2021 04:00:24 +0200 Subject: [PATCH 13/20] Add the original reproduction as a unit test --- io/src/test/scala/fs2/io/IoSuite.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/io/src/test/scala/fs2/io/IoSuite.scala b/io/src/test/scala/fs2/io/IoSuite.scala index 9fef0443a9..10ddba379e 100644 --- a/io/src/test/scala/fs2/io/IoSuite.scala +++ b/io/src/test/scala/fs2/io/IoSuite.scala @@ -147,6 +147,17 @@ class IoSuite extends Fs2Suite { .map(chunk => assertEquals(chunk.size, chunkSize.value)) } } + + test("PipedInput/OutputStream used to track threads, fs2 reimplementation works") { + readOutputStream(1024) { os => + IO.blocking { + val t = new Thread(() => os.write(123)) + t.start + t.join + Thread.sleep(100L) + } + }.compile.drain.map(_ => assert(true)) + } } group("unsafeReadInputStream") { From 46731dd3e2ad84d1fb6f9b8e9c49daf45911de1a Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 3 May 2021 11:14:03 +0200 Subject: [PATCH 14/20] Fix InputOutputStream scaladoc --- io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala index 8d066e15dd..cd5e9d805c 100644 --- a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala +++ b/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala @@ -24,9 +24,9 @@ package fs2.io.internal import java.io.{InputStream, OutputStream} import java.util.concurrent.Semaphore -/** Thread safe circular byte buffer which connects a [[java.io.OutputStream]] - * to a [[java.io.OutputStream]] in a memory efficient manner, without copying - * bytes unnecessarily. +/** Thread safe circular byte buffer which pipes a [[java.io.OutputStream]] + * through a [[java.io.InputStream]] in a memory efficient manner, without + * copying bytes unnecessarily. * * @note As per the interfaces of the [[java.io]] classes, all of the * operations are blocking in nature and extra care should be taken when using From 9a3ccd807558e21818196bc459f4cc460342b555 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Wed, 5 May 2021 01:01:08 +0200 Subject: [PATCH 15/20] Rename to PipedStreamBuffer to avoid confusion --- .../{InputOutputBuffer.scala => PipedStreamBuffer.scala} | 2 +- io/src/main/scala/fs2/io/io.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) rename io/src/main/scala/fs2/io/internal/{InputOutputBuffer.scala => PipedStreamBuffer.scala} (99%) diff --git a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala b/io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala similarity index 99% rename from io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala rename to io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala index cd5e9d805c..3241da510e 100644 --- a/io/src/main/scala/fs2/io/internal/InputOutputBuffer.scala +++ b/io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala @@ -41,7 +41,7 @@ import java.util.concurrent.Semaphore * * @param capacity the capacity of the allocated circular buffer */ -private[io] final class InputOutputBuffer(private[this] val capacity: Int) { self => +private[io] final class PipedStreamBuffer(private[this] val capacity: Int) { self => private[this] val buffer: Array[Byte] = new Array(capacity) diff --git a/io/src/main/scala/fs2/io/io.scala b/io/src/main/scala/fs2/io/io.scala index 78d8aa35a3..e2d77b77b7 100644 --- a/io/src/main/scala/fs2/io/io.scala +++ b/io/src/main/scala/fs2/io/io.scala @@ -26,7 +26,7 @@ import cats.effect.kernel.{Async, Outcome, Resource, Sync} import cats.effect.kernel.implicits._ import cats.effect.kernel.Deferred import cats.syntax.all._ -import fs2.io.internal.InputOutputBuffer +import fs2.io.internal.PipedStreamBuffer import java.io.{InputStream, OutputStream} import java.nio.charset.Charset @@ -134,7 +134,7 @@ package object io { ): Stream[F, Byte] = { val mkOutput: Resource[F, (OutputStream, InputStream)] = Resource.make(Sync[F].delay { - val buf = new InputOutputBuffer(chunkSize) + val buf = new PipedStreamBuffer(chunkSize) (buf.outputStream, buf.inputStream) })(ois => Sync[F].blocking { From 9efc6ba86e806c3f91ea2120f041e8b8746fab1d Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Wed, 5 May 2021 01:05:00 +0200 Subject: [PATCH 16/20] Calling close multiple times is a noop --- .../fs2/io/internal/PipedStreamBuffer.scala | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala b/io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala index 3241da510e..25c96f6b03 100644 --- a/io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala +++ b/io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala @@ -161,12 +161,14 @@ private[io] final class PipedStreamBuffer(private[this] val capacity: Int) { sel } override def close(): Unit = self.synchronized { - closed = true - // Immediately notify the first registered reader/writer. The rest will - // be notified by the read/write mechanism which takes into account the - // state of the Input/OutputStream. - readerPermit.release() - writerPermit.release() + if (!closed) { + closed = true + // Immediately notify the first registered reader/writer. The rest will + // be notified by the read/write mechanism which takes into account the + // state of the Input/OutputStream. + readerPermit.release() + writerPermit.release() + } } override def available(): Int = self.synchronized { @@ -266,12 +268,14 @@ private[io] final class PipedStreamBuffer(private[this] val capacity: Int) { sel } override def close(): Unit = self.synchronized { - closed = true - // Immediately notify the first registered reader/writer. The rest will - // be notified by the read/write mechanism which takes into account the - // state of the Input/OutputStream. - writerPermit.release() - readerPermit.release() + if (!closed) { + closed = true + // Immediately notify the first registered reader/writer. The rest will + // be notified by the read/write mechanism which takes into account the + // state of the Input/OutputStream. + writerPermit.release() + readerPermit.release() + } } } } From 5da8ed4ea6408bef13fd326c9ef49c7a3d6102ee Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Wed, 5 May 2021 02:22:15 +0200 Subject: [PATCH 17/20] Introduce Synchronizer --- .../fs2/io/internal/PipedStreamBuffer.scala | 5 ++- .../scala/fs2/io/internal/Synchronizer.scala | 32 +++++++++++++++++++ 2 files changed, 34 insertions(+), 3 deletions(-) create mode 100644 io/src/main/scala/fs2/io/internal/Synchronizer.scala diff --git a/io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala b/io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala index 25c96f6b03..b2b12e8e5b 100644 --- a/io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala +++ b/io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala @@ -22,7 +22,6 @@ package fs2.io.internal import java.io.{InputStream, OutputStream} -import java.util.concurrent.Semaphore /** Thread safe circular byte buffer which pipes a [[java.io.OutputStream]] * through a [[java.io.InputStream]] in a memory efficient manner, without @@ -50,8 +49,8 @@ private[io] final class PipedStreamBuffer(private[this] val capacity: Int) { sel private[this] var closed: Boolean = false - private[this] val readerPermit: Semaphore = new Semaphore(1) - private[this] val writerPermit: Semaphore = new Semaphore(1) + private[this] val readerPermit: Synchronizer = new Synchronizer() + private[this] val writerPermit: Synchronizer = new Synchronizer() val inputStream: InputStream = new InputStream { def read(): Int = { diff --git a/io/src/main/scala/fs2/io/internal/Synchronizer.scala b/io/src/main/scala/fs2/io/internal/Synchronizer.scala new file mode 100644 index 0000000000..8ad60a767d --- /dev/null +++ b/io/src/main/scala/fs2/io/internal/Synchronizer.scala @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.io.internal + +import java.util.concurrent.Semaphore + +private final class Synchronizer { + private[this] val underlying: Semaphore = new Semaphore(1) + + def acquire(): Unit = underlying.acquire() + + def release(): Unit = underlying.release() +} From b96568775dcaaed3d27c8076c7a40de2802c3b8c Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Wed, 5 May 2021 09:57:20 +0200 Subject: [PATCH 18/20] Implement a custom queued synchronizer --- .../scala/fs2/io/internal/Synchronizer.scala | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/io/src/main/scala/fs2/io/internal/Synchronizer.scala b/io/src/main/scala/fs2/io/internal/Synchronizer.scala index 8ad60a767d..c20f174a3a 100644 --- a/io/src/main/scala/fs2/io/internal/Synchronizer.scala +++ b/io/src/main/scala/fs2/io/internal/Synchronizer.scala @@ -21,12 +21,24 @@ package fs2.io.internal -import java.util.concurrent.Semaphore +import java.util.concurrent.locks.AbstractQueuedSynchronizer private final class Synchronizer { - private[this] val underlying: Semaphore = new Semaphore(1) + private[this] val underlying = new AbstractQueuedSynchronizer { + setState(1) - def acquire(): Unit = underlying.acquire() + override def tryAcquire(arg: Int): Boolean = compareAndSetState(1, 0) - def release(): Unit = underlying.release() + override def tryRelease(arg: Int): Boolean = { + setState(1) + true + } + } + + def acquire(): Unit = underlying.acquireInterruptibly(0) + + def release(): Unit = { + underlying.release(0) + () + } } From 859eb18f84d6c8407b8e1c03b4485f334d8f3820 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Wed, 5 May 2021 11:29:14 +0200 Subject: [PATCH 19/20] Check if the pipe has been closed before blocking --- .../scala/fs2/io/internal/PipedStreamBuffer.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala b/io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala index b2b12e8e5b..ddd13915d6 100644 --- a/io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala +++ b/io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala @@ -149,7 +149,10 @@ private[io] final class PipedStreamBuffer(private[this] val capacity: Int) { sel } } - if (cont) { + // We need to be careful not to block the thread if the pipe has been + // closed, otherwise we risk a deadlock. When the pipe is closed, this + // reader will loop again and execute the correct logic. + if (!closed && cont) { // There is nothing to be read from the buffer at this moment. // Wait until notified by a writer. readerPermit.acquire() @@ -262,7 +265,12 @@ private[io] final class PipedStreamBuffer(private[this] val capacity: Int) { sel } // The buffer is currently full. Wait until notified by a reader. - writerPermit.acquire() + // We need to be careful not to block the thread if the pipe has been + // closed, otherwise we risk a deadlock. When the pipe is closed, this + // writer will loop again and execute the correct logic. + if (!closed) { + writerPermit.acquire() + } } } From 69601ef0b71e2ca64dd116ca47b12a0cba50974b Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Wed, 5 May 2021 12:47:06 +0200 Subject: [PATCH 20/20] Add Synchronizer scaladoc --- io/src/main/scala/fs2/io/internal/Synchronizer.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/io/src/main/scala/fs2/io/internal/Synchronizer.scala b/io/src/main/scala/fs2/io/internal/Synchronizer.scala index c20f174a3a..962d01cb43 100644 --- a/io/src/main/scala/fs2/io/internal/Synchronizer.scala +++ b/io/src/main/scala/fs2/io/internal/Synchronizer.scala @@ -23,18 +23,28 @@ package fs2.io.internal import java.util.concurrent.locks.AbstractQueuedSynchronizer +/** An alternative implementation of [[java.util.concurrent.Semaphore]] which + * holds ''at most'' 1 permit. In the case that this synchronizer is not + * acquired, any calls to [[Synchronizer#release]] will ''not'' increase the + * permit count, i.e. this synchronizer can still only be acquired by a + * ''single'' thread calling [[Synchronizer#acquire]]. + */ private final class Synchronizer { private[this] val underlying = new AbstractQueuedSynchronizer { + // There is 1 available permit when the synchronizer is constructed. setState(1) override def tryAcquire(arg: Int): Boolean = compareAndSetState(1, 0) override def tryRelease(arg: Int): Boolean = { + // Unconditionally make 1 permit available for taking. This operation + // limits the available permits to at most 1 at any time. setState(1) true } } + @throws[InterruptedException] def acquire(): Unit = underlying.acquireInterruptibly(0) def release(): Unit = {