Skip to content

Commit

Permalink
timers: allow Immediates to be unrefed
Browse files Browse the repository at this point in the history
Refactor Immediates handling to allow for them to be unrefed, similar
to setTimeout, but without extra handles.

Document the new `immediate.ref()` and `immediate.unref()` methods.

Add SetImmediateUnref on the C++ side.

Backport-PR-URL: #19006
PR-URL: #18139
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
apapirovski authored and addaleax committed Feb 26, 2018
1 parent 01ef0f1 commit 319cd44
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 102 deletions.
32 changes: 32 additions & 0 deletions doc/api/timers.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,38 @@ This object is created internally and is returned from [`setImmediate()`][]. It
can be passed to [`clearImmediate()`][] in order to cancel the scheduled
actions.

By default, when an immediate is scheduled, the Node.js event loop will continue
running as long as the immediate is active. The `Immediate` object returned by
[`setImmediate()`][] exports both `immediate.ref()` and `immediate.unref()`
functions that can be used to control this default behavior.

### immediate.ref()
<!-- YAML
added: REPLACEME
-->

When called, requests that the Node.js event loop *not* exit so long as the
`Immediate` is active. Calling `immediate.ref()` multiple times will have no
effect.

*Note*: By default, all `Immediate` objects are "ref'd", making it normally
unnecessary to call `immediate.ref()` unless `immediate.unref()` had been called
previously.

Returns a reference to the `Immediate`.

### immediate.unref()
<!-- YAML
added: REPLACEME
-->

When called, the active `Immediate` object will not require the Node.js event
loop to remain active. If there is no other activity keeping the event loop
running, the process may exit before the `Immediate` object's callback is
invoked. Calling `immediate.unref()` multiple times will have no effect.

Returns a reference to the `Immediate`.

## Class: Timeout

This object is created internally and is returned from [`setTimeout()`][] and
Expand Down
143 changes: 84 additions & 59 deletions lib/timers.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,14 @@ const trigger_async_id_symbol = Symbol('triggerAsyncId');

// *Must* match Environment::ImmediateInfo::Fields in src/env.h.
const kCount = 0;
const kHasOutstanding = 1;
const kRefCount = 1;
const kHasOutstanding = 2;

const [activateImmediateCheck, immediateInfo] =
const [immediateInfo, toggleImmediateRef] =
setImmediateCallback(processImmediate);

const kRefed = Symbol('refed');

// Timeout values > TIMEOUT_MAX are set to 1.
const TIMEOUT_MAX = 2 ** 31 - 1;

Expand Down Expand Up @@ -690,42 +693,41 @@ function processImmediate() {
const queue = outstandingQueue.head !== null ?
outstandingQueue : immediateQueue;
var immediate = queue.head;
var tail = queue.tail;
const tail = queue.tail;

// Clear the linked list early in case new `setImmediate()` calls occur while
// immediate callbacks are executed
queue.head = queue.tail = null;

while (immediate !== null) {
if (!immediate._onImmediate) {
immediate = immediate._idleNext;
continue;
}
let count = 0;
let refCount = 0;

// Save next in case `clearImmediate(immediate)` is called from callback
const next = immediate._idleNext;
while (immediate !== null) {
immediate._destroyed = true;

const asyncId = immediate[async_id_symbol];
emitBefore(asyncId, immediate[trigger_async_id_symbol]);

tryOnImmediate(immediate, next, tail);
count++;
if (immediate[kRefed])
refCount++;
immediate[kRefed] = undefined;

tryOnImmediate(immediate, tail, count, refCount);

emitAfter(asyncId);

// If `clearImmediate(immediate)` wasn't called from the callback, use the
// `immediate`'s next item
if (immediate._idleNext !== null)
immediate = immediate._idleNext;
else
immediate = next;
immediate = immediate._idleNext;
}

immediateInfo[kCount] -= count;
immediateInfo[kRefCount] -= refCount;
immediateInfo[kHasOutstanding] = 0;
}

// An optimization so that the try/finally only de-optimizes (since at least v8
// 4.7) what is in this smaller function.
function tryOnImmediate(immediate, next, oldTail) {
function tryOnImmediate(immediate, oldTail, count, refCount) {
var threw = true;
try {
// make the actual call outside the try/finally to allow it to be optimized
Expand All @@ -734,21 +736,21 @@ function tryOnImmediate(immediate, next, oldTail) {
} finally {
immediate._onImmediate = null;

if (!immediate._destroyed) {
immediate._destroyed = true;
immediateInfo[kCount]--;

if (async_hook_fields[kDestroy] > 0) {
emitDestroy(immediate[async_id_symbol]);
}
if (async_hook_fields[kDestroy] > 0) {
emitDestroy(immediate[async_id_symbol]);
}

if (threw && (immediate._idleNext !== null || next !== null)) {
// Handle any remaining Immediates after error handling has resolved,
// assuming we're still alive to do so.
outstandingQueue.head = immediate._idleNext || next;
outstandingQueue.tail = oldTail;
immediateInfo[kHasOutstanding] = 1;
if (threw) {
immediateInfo[kCount] -= count;
immediateInfo[kRefCount] -= refCount;

if (immediate._idleNext !== null) {
// Handle any remaining Immediates after error handling has resolved,
// assuming we're still alive to do so.
outstandingQueue.head = immediate._idleNext;
outstandingQueue.tail = oldTail;
immediateInfo[kHasOutstanding] = 1;
}
}
}
}
Expand All @@ -763,31 +765,51 @@ function runCallback(timer) {
}


function Immediate(callback, args) {
this._idleNext = null;
this._idlePrev = null;
// this must be set to null first to avoid function tracking
// on the hidden class, revisit in V8 versions after 6.2
this._onImmediate = null;
this._onImmediate = callback;
this._argv = args;
this._destroyed = false;
const Immediate = class Immediate {
constructor(callback, args) {
this._idleNext = null;
this._idlePrev = null;
// this must be set to null first to avoid function tracking
// on the hidden class, revisit in V8 versions after 6.2
this._onImmediate = null;
this._onImmediate = callback;
this._argv = args;
this._destroyed = false;
this[kRefed] = false;

this[async_id_symbol] = ++async_id_fields[kAsyncIdCounter];
this[trigger_async_id_symbol] = getDefaultTriggerAsyncId();
if (async_hook_fields[kInit] > 0) {
emitInit(this[async_id_symbol],
'Immediate',
this[trigger_async_id_symbol],
this);
}

this[async_id_symbol] = ++async_id_fields[kAsyncIdCounter];
this[trigger_async_id_symbol] = getDefaultTriggerAsyncId();
if (async_hook_fields[kInit] > 0) {
emitInit(this[async_id_symbol],
'Immediate',
this[trigger_async_id_symbol],
this);
this.ref();
immediateInfo[kCount]++;

immediateQueue.append(this);
}

if (immediateInfo[kCount] === 0)
activateImmediateCheck();
immediateInfo[kCount]++;
ref() {
if (this[kRefed] === false) {
this[kRefed] = true;
if (immediateInfo[kRefCount]++ === 0)
toggleImmediateRef(true);
}
return this;
}

immediateQueue.append(this);
}
unref() {
if (this[kRefed] === true) {
this[kRefed] = false;
if (--immediateInfo[kRefCount] === 0)
toggleImmediateRef(false);
}
return this;
}
};

function setImmediate(callback, arg1, arg2, arg3) {
if (typeof callback !== 'function') {
Expand Down Expand Up @@ -827,15 +849,18 @@ exports.setImmediate = setImmediate;


exports.clearImmediate = function(immediate) {
if (!immediate) return;
if (!immediate || immediate._destroyed)
return;

if (!immediate._destroyed) {
immediateInfo[kCount]--;
immediate._destroyed = true;
immediateInfo[kCount]--;
immediate._destroyed = true;

if (async_hook_fields[kDestroy] > 0) {
emitDestroy(immediate[async_id_symbol]);
}
if (immediate[kRefed] && --immediateInfo[kRefCount] === 0)
toggleImmediateRef(false);
immediate[kRefed] = undefined;

if (async_hook_fields[kDestroy] > 0) {
emitDestroy(immediate[async_id_symbol]);
}

immediate._onImmediate = null;
Expand Down
40 changes: 34 additions & 6 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ inline uint32_t Environment::ImmediateInfo::count() const {
return fields_[kCount];
}

inline uint32_t Environment::ImmediateInfo::ref_count() const {
return fields_[kRefCount];
}

inline bool Environment::ImmediateInfo::has_outstanding() const {
return fields_[kHasOutstanding] == 1;
}
Expand All @@ -241,6 +245,14 @@ inline void Environment::ImmediateInfo::count_dec(uint32_t decrement) {
fields_[kCount] = fields_[kCount] - decrement;
}

inline void Environment::ImmediateInfo::ref_count_inc(uint32_t increment) {
fields_[kRefCount] = fields_[kRefCount] + increment;
}

inline void Environment::ImmediateInfo::ref_count_dec(uint32_t decrement) {
fields_[kRefCount] = fields_[kRefCount] - decrement;
}

inline Environment::TickInfo::TickInfo(v8::Isolate* isolate)
: fields_(isolate, kFieldsCount) {}

Expand Down Expand Up @@ -514,20 +526,36 @@ inline void Environment::set_fs_stats_field_array(double* fields) {
fs_stats_field_array_ = fields;
}

void Environment::SetImmediate(native_immediate_callback cb,
void Environment::CreateImmediate(native_immediate_callback cb,
void* data,
v8::Local<v8::Object> obj) {
v8::Local<v8::Object> obj,
bool ref) {
native_immediate_callbacks_.push_back({
cb,
data,
std::unique_ptr<v8::Persistent<v8::Object>>(
obj.IsEmpty() ? nullptr : new v8::Persistent<v8::Object>(isolate_, obj))
std::unique_ptr<v8::Persistent<v8::Object>>(obj.IsEmpty() ?
nullptr : new v8::Persistent<v8::Object>(isolate_, obj)),
ref
});
if (immediate_info()->count() == 0)
ActivateImmediateCheck();
immediate_info()->count_inc(1);
}

void Environment::SetImmediate(native_immediate_callback cb,
void* data,
v8::Local<v8::Object> obj) {
CreateImmediate(cb, data, obj, true);

if (immediate_info()->ref_count() == 0)
ToggleImmediateRef(true);
immediate_info()->ref_count_inc(1);
}

void Environment::SetUnrefImmediate(native_immediate_callback cb,
void* data,
v8::Local<v8::Object> obj) {
CreateImmediate(cb, data, obj, false);
}

inline performance::performance_state* Environment::performance_state() {
return performance_state_.get();
}
Expand Down
36 changes: 19 additions & 17 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ void Environment::Start(int argc,

uv_idle_init(event_loop(), immediate_idle_handle());

uv_check_start(immediate_check_handle(), CheckImmediate);

// Inform V8's CPU profiler when we're idle. The profiler is sampling-based
// but not all samples are created equal; mark the wall clock time spent in
// epoll_wait() and friends so profiling tools can filter it out. The samples
Expand Down Expand Up @@ -272,39 +274,35 @@ void Environment::EnvPromiseHook(v8::PromiseHookType type,
void Environment::RunAndClearNativeImmediates() {
size_t count = native_immediate_callbacks_.size();
if (count > 0) {
size_t ref_count = 0;
std::vector<NativeImmediateCallback> list;
native_immediate_callbacks_.swap(list);
for (const auto& cb : list) {
cb.cb_(this, cb.data_);
if (cb.keep_alive_)
cb.keep_alive_->Reset();
if (cb.refed_)
ref_count++;
}

#ifdef DEBUG
CHECK_GE(immediate_info()->count(), count);
#endif
immediate_info()->count_dec(count);
immediate_info()->ref_count_dec(ref_count);
}
}

static bool MaybeStopImmediate(Environment* env) {
if (env->immediate_info()->count() == 0) {
uv_check_stop(env->immediate_check_handle());
uv_idle_stop(env->immediate_idle_handle());
return true;
}
return false;
}


void Environment::CheckImmediate(uv_check_t* handle) {
Environment* env = Environment::from_immediate_check_handle(handle);
HandleScope scope(env->isolate());
Context::Scope context_scope(env->context());

if (MaybeStopImmediate(env))
if (env->immediate_info()->count() == 0)
return;

HandleScope scope(env->isolate());
Context::Scope context_scope(env->context());

env->RunAndClearNativeImmediates();

do {
Expand All @@ -316,13 +314,17 @@ void Environment::CheckImmediate(uv_check_t* handle) {
{0, 0}).ToLocalChecked();
} while (env->immediate_info()->has_outstanding());

MaybeStopImmediate(env);
if (env->immediate_info()->ref_count() == 0)
env->ToggleImmediateRef(false);
}

void Environment::ActivateImmediateCheck() {
uv_check_start(&immediate_check_handle_, CheckImmediate);
// Idle handle is needed only to stop the event loop from blocking in poll.
uv_idle_start(&immediate_idle_handle_, [](uv_idle_t*){ });
void Environment::ToggleImmediateRef(bool ref) {
if (ref) {
// Idle handle is needed only to stop the event loop from blocking in poll.
uv_idle_start(immediate_idle_handle(), [](uv_idle_t*){ });
} else {
uv_idle_stop(immediate_idle_handle());
}
}

void Environment::AsyncHooks::grow_async_ids_stack() {
Expand Down
Loading

0 comments on commit 319cd44

Please sign in to comment.