Skip to content

Commit

Permalink
Merge pull request #62 from FraMecca/master
Browse files Browse the repository at this point in the history
TCPConnection.waitForDataAsync
  • Loading branch information
s-ludwig committed Feb 26, 2018
2 parents 98280be + 57d516a commit 6aa5cdd
Showing 1 changed file with 65 additions and 0 deletions.
65 changes: 65 additions & 0 deletions source/vibe/core/net.d
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,61 @@ mixin(tracer);
return m_context.readBuffer.length > 0;
}

/** Waits asynchronously for new data to arrive.
This function can be used to detach the `TCPConnection` from a
running task while waiting for data, so that the associated memory
resources are available for other operations.
Note that `read_ready_callback` may be called from outside of a
task, so no blocking operations may be performed. Instead, an existing
task should be notified, or a new one started with `runTask`.
Params:
read_ready_callback = A callback taking a `bool` parameter that
signals the read-readiness of the connection
timeout = Optional timeout to limit the maximum wait time
Returns:
If the read readiness can be determined immediately, it will be
returned as WaitForDataAsyncStatus.sataAvailable` or
`WaitForDataAsyncStatus.noModeData` and the callback will not be
invoked. Otherwise `WaitForDataAsyncStatus.waiting` is returned
and the callback will be invoked once the status can be
determined or the specified timeout is reached.
*/
WaitForDataAsyncStatus waitForDataAsync(CALLABLE)(CALLABLE read_ready_callback, Duration timeout = Duration.max)
if (is(typeof(read_ready_callback(true))))
{
mixin(tracer);
import vibe.core.core : setTimer;

if (!m_context)
return WaitForDataAsyncStatus.noMoreData;

if (m_context.readBuffer.length > 0)
return WaitForDataAsyncStatus.dataAvailable;

if (timeout <= 0.seconds) {
auto rs = waitForData(0.seconds);
return rs ? WaitForDataAsyncStatus.dataAvailable : WaitForDataAsyncStatus.noMoreData;
}

auto tm = setTimer(timeout, {
eventDriver.sockets.cancelRead(m_socket);
read_ready_callback(false);
});
eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), IOMode.once,
(sock, st, nb) {
tm.stop();
assert(m_context.readBuffer.length == 0);
m_context.readBuffer.putN(nb);
read_ready_callback(m_context.readBuffer.length > 0);
});

return WaitForDataAsyncStatus.waiting;
}

const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; }

void skip(ulong count)
Expand Down Expand Up @@ -684,6 +739,16 @@ mixin(tracer);
}
}

/** Represents possible return values for
TCPConnection.waitForDataAsync.
*/
enum WaitForDataAsyncStatus {
noMoreData,
dataAvailable,
waiting,
}


mixin validateConnectionStream!TCPConnection;

private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration timeout)
Expand Down

0 comments on commit 6aa5cdd

Please sign in to comment.