Skip to content

Commit

Permalink
Merge pull request #85 from armanbilge/pr/array-ptr-optimizations
Browse files Browse the repository at this point in the history
Optimize socket read/write with array ptrs
  • Loading branch information
armanbilge authored Nov 19, 2022
2 parents 4c7a5c3 + 4d931d3 commit 9c00d8c
Showing 1 changed file with 69 additions and 67 deletions.
136 changes: 69 additions & 67 deletions core/src/main/scala/epollcat/internal/ch/EpollAsyncSocketChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,42 +114,42 @@ final class EpollAsyncSocketChannel private (
handler: CompletionHandler[Integer, _ >: A]
): Unit =
if (readReady) {
Zone { implicit z =>
val count = dst.remaining()
val buf = alloc[Byte](count.toLong)

def completed(total: Int): Unit = {
var i = 0
while (i < total) {
dst.put(buf(i.toLong))
i += 1
}
handler.completed(total, attachment)
}
val position = dst.position()
val count = dst.remaining()
val hasArray = dst.hasArray()
val buf = if (hasArray) dst.array() else new Array[Byte](count)
val offset = if (hasArray) dst.arrayOffset() + position else 0

@tailrec
def go(buf: Ptr[Byte], count: Int, total: Int): Unit = {
val readed = posix.unistd.read(fd, buf, count.toULong)
if (readed == -1) {
val e = errno.errno
if (e == posix.errno.EAGAIN || e == posix.errno.EWOULDBLOCK) {
readReady = false
completed(total)
} else
handler.failed(new RuntimeException(s"read: $e"), attachment)
} else if (readed == 0) {
if (total > 0)
completed(total)
else
handler.completed(-1, attachment)
} else if (readed < count)
go(buf + readed.toLong, count - readed, total + readed)
else // readed == count
completed(total + readed)
}
def completed(total: Int): Unit = {
if (hasArray)
dst.position(position + total)
else
dst.put(buf, 0, total)
handler.completed(total, attachment)
}

go(buf, count, 0)
@tailrec
def go(buf: Ptr[Byte], count: Int, total: Int): Unit = {
val readed = posix.unistd.read(fd, buf, count.toULong)
if (readed == -1) {
val e = errno.errno
if (e == posix.errno.EAGAIN || e == posix.errno.EWOULDBLOCK) {
readReady = false
completed(total)
} else
handler.failed(new RuntimeException(s"read: $e"), attachment)
} else if (readed == 0) {
if (total > 0)
completed(total)
else
handler.completed(-1, attachment)
} else if (readed < count)
go(buf + readed.toLong, count - readed, total + readed)
else // readed == count
completed(total + readed)
}

go(buf.at(offset), count, 0)
} else {
readCallback = () => {
readCallback = null
Expand Down Expand Up @@ -294,44 +294,46 @@ final class EpollAsyncSocketChannel private (
): Unit = if (outputShutdown)
handler.failed(new ClosedChannelException, attachment)
else if (writeReady) {
Zone { implicit z =>
val position = src.position()
val count = src.remaining()
val buf = alloc[Byte](count.toLong)
var i = 0
while (i < count) {
buf(i.toLong) = src.get(position + i)
i += 1
val position = src.position()
val count = src.remaining()

val hasArray = src.hasArray()
val buf =
if (hasArray) src.array()
else {
val buf = new Array[Byte](count)
src.get(buf)
buf
}
val offset = if (hasArray) src.arrayOffset() + position else 0

def completed(total: Int): Unit = {
src.position(position + total)
handler.completed(total, attachment)
}

@tailrec
def go(buf: Ptr[Byte], count: Int, total: Int): Unit = {
val wrote =
if (LinktimeInfo.isLinux)
posix.sys.socket.send(fd, buf, count.toULong, socket.MSG_NOSIGNAL).toInt
else
posix.unistd.write(fd, buf, count.toULong)

if (wrote == -1) {
val e = errno.errno
if (e == posix.errno.EAGAIN || e == posix.errno.EWOULDBLOCK) {
writeReady = false
completed(total)
} else
handler.failed(new RuntimeException(s"write: $e"), attachment)
} else if (wrote < count)
go(buf + wrote.toLong, count - wrote, total + wrote)
else // wrote == count
completed(total + wrote)
}
def completed(total: Int): Unit = {
src.position(position + total)
handler.completed(total, attachment)
}

go(buf, count, 0)
@tailrec
def go(buf: Ptr[Byte], count: Int, total: Int): Unit = {
val wrote =
if (LinktimeInfo.isLinux)
posix.sys.socket.send(fd, buf, count.toULong, socket.MSG_NOSIGNAL).toInt
else
posix.unistd.write(fd, buf, count.toULong)

if (wrote == -1) {
val e = errno.errno
if (e == posix.errno.EAGAIN || e == posix.errno.EWOULDBLOCK) {
writeReady = false
completed(total)
} else
handler.failed(new RuntimeException(s"write: $e"), attachment)
} else if (wrote < count)
go(buf + wrote.toLong, count - wrote, total + wrote)
else // wrote == count
completed(total + wrote)
}

go(buf.at(offset), count, 0)
} else {
writeCallback = () => {
writeCallback = null
Expand Down

0 comments on commit 9c00d8c

Please sign in to comment.