Skip to content

Commit

Permalink
process: refactor nextTick for clarity
Browse files Browse the repository at this point in the history
Do not share unnecessary information about nextTick state
between JS & C++, instead only track whether a nextTick
is scheduled or not.

Turn nextTickQueue into an Object instead of a class
since multiple instances are never created.

Other assorted refinements and refactoring.

PR-URL: nodejs#17738
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
apapirovski committed Jan 31, 2018
1 parent dd56bd1 commit 5257623
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 132 deletions.
150 changes: 49 additions & 101 deletions lib/internal/process/next_tick.js
Original file line number Diff line number Diff line change
@@ -1,47 +1,9 @@
'use strict';

// This value is used to prevent the nextTickQueue from becoming too
// large and cause the process to run out of memory. When this value
// is reached the nextTimeQueue array will be shortened (see tickDone
// for details).
const kMaxCallbacksPerLoop = 1e4;

exports.setup = setupNextTick;
// Will be overwritten when setupNextTick() is called.
exports.nextTick = null;

class NextTickQueue {
constructor() {
this.head = null;
this.tail = null;
}

push(v) {
const entry = { data: v, next: null };
if (this.tail !== null)
this.tail.next = entry;
else
this.head = entry;
this.tail = entry;
}

shift() {
if (this.head === null)
return;
const ret = this.head.data;
if (this.head === this.tail)
this.head = this.tail = null;
else
this.head = this.head.next;
return ret;
}

clear() {
this.head = null;
this.tail = null;
}
}

function setupNextTick() {
const async_wrap = process.binding('async_wrap');
const async_hooks = require('internal/async_hooks');
Expand All @@ -56,15 +18,47 @@ function setupNextTick() {
// Grab the constants necessary for working with internal arrays.
const { kInit, kDestroy, kAsyncIdCounter } = async_wrap.constants;
const { async_id_symbol, trigger_async_id_symbol } = async_wrap;
const nextTickQueue = new NextTickQueue();
var microtasksScheduled = false;

// Used to run V8's micro task queue.
var _runMicrotasks = {};
// tickInfo is used so that the C++ code in src/node.cc can
// have easy access to our nextTick state, and avoid unnecessary
// calls into JS land.
// runMicrotasks is used to run V8's micro task queue.
const [
tickInfo,
runMicrotasks
] = process._setupNextTick(_tickCallback);

// *Must* match Environment::TickInfo::Fields in src/env.h.
var kIndex = 0;
var kLength = 1;
const kScheduled = 0;

const nextTickQueue = {
head: null,
tail: null,
push(data) {
const entry = { data, next: null };
if (this.tail !== null) {
this.tail.next = entry;
} else {
this.head = entry;
tickInfo[kScheduled] = 1;
}
this.tail = entry;
},
shift() {
if (this.head === null)
return;
const ret = this.head.data;
if (this.head === this.tail) {
this.head = this.tail = null;
tickInfo[kScheduled] = 0;
} else {
this.head = this.head.next;
}
return ret;
}
};

var microtasksScheduled = false;

process.nextTick = nextTick;
// Needs to be accessible from beyond this scope.
Expand All @@ -73,25 +67,6 @@ function setupNextTick() {
// Set the nextTick() function for internal usage.
exports.nextTick = internalNextTick;

// This tickInfo thing is used so that the C++ code in src/node.cc
// can have easy access to our nextTick state, and avoid unnecessary
// calls into JS land.
const tickInfo = process._setupNextTick(_tickCallback, _runMicrotasks);

_runMicrotasks = _runMicrotasks.runMicrotasks;

function tickDone() {
if (tickInfo[kLength] !== 0) {
if (tickInfo[kLength] <= tickInfo[kIndex]) {
nextTickQueue.clear();
tickInfo[kLength] = 0;
} else {
tickInfo[kLength] -= tickInfo[kIndex];
}
}
tickInfo[kIndex] = 0;
}

const microTasksTickObject = {
callback: runMicrotasksCallback,
args: undefined,
Expand All @@ -105,38 +80,27 @@ function setupNextTick() {
// For the moment all microtasks come from the void until the PromiseHook
// API is implemented.
nextTickQueue.push(microTasksTickObject);

tickInfo[kLength]++;
microtasksScheduled = true;
}

function runMicrotasksCallback() {
microtasksScheduled = false;
_runMicrotasks();
runMicrotasks();

if (tickInfo[kIndex] < tickInfo[kLength] ||
emitPendingUnhandledRejections()) {
if (nextTickQueue.head !== null || emitPendingUnhandledRejections())
scheduleMicrotasks();
}
}

function _tickCallback() {
let tock;
do {
while (tickInfo[kIndex] < tickInfo[kLength]) {
++tickInfo[kIndex];
const tock = nextTickQueue.shift();

// CHECK(Number.isSafeInteger(tock[async_id_symbol]))
// CHECK(tock[async_id_symbol] > 0)
// CHECK(Number.isSafeInteger(tock[trigger_async_id_symbol]))
// CHECK(tock[trigger_async_id_symbol] > 0)

while (tock = nextTickQueue.shift()) {
const asyncId = tock[async_id_symbol];
emitBefore(asyncId, tock[trigger_async_id_symbol]);
// emitDestroy() places the async_id_symbol into an asynchronous queue
// that calls the destroy callback in the future. It's called before
// calling tock.callback so destroy will be called even if the callback
// throws an exception that is handles by 'uncaughtException' or a
// throws an exception that is handled by 'uncaughtException' or a
// domain.
// TODO(trevnorris): This is a bit of a hack. It relies on the fact
// that nextTick() doesn't allow the event loop to proceed, but if
Expand All @@ -152,24 +116,21 @@ function setupNextTick() {
Reflect.apply(callback, undefined, tock.args);

emitAfter(asyncId);

if (kMaxCallbacksPerLoop < tickInfo[kIndex])
tickDone();
}
tickDone();
_runMicrotasks();
runMicrotasks();
emitPendingUnhandledRejections();
} while (tickInfo[kLength] !== 0);
} while (nextTickQueue.head !== null);
}

class TickObject {
constructor(callback, args, asyncId, triggerAsyncId) {
constructor(callback, args, triggerAsyncId) {
// this must be set to null first to avoid function tracking
// on the hidden class, revisit in V8 versions after 6.2
this.callback = null;
this.callback = callback;
this.args = args;

const asyncId = ++async_id_fields[kAsyncIdCounter];
this[async_id_symbol] = asyncId;
this[trigger_async_id_symbol] = triggerAsyncId;

Expand Down Expand Up @@ -203,13 +164,7 @@ function setupNextTick() {
args[i - 1] = arguments[i];
}

// In V8 6.2, moving tickInfo & async_id_fields[kAsyncIdCounter] into the
// TickObject incurs a significant performance penalty in the
// next-tick-breadth-args benchmark (revisit later)
++tickInfo[kLength];
nextTickQueue.push(new TickObject(callback,
args,
++async_id_fields[kAsyncIdCounter],
nextTickQueue.push(new TickObject(callback, args,
getDefaultTriggerAsyncId()));
}

Expand Down Expand Up @@ -238,13 +193,6 @@ function setupNextTick() {

if (triggerAsyncId === null)
triggerAsyncId = getDefaultTriggerAsyncId();
// In V8 6.2, moving tickInfo & async_id_fields[kAsyncIdCounter] into the
// TickObject incurs a significant performance penalty in the
// next-tick-breadth-args benchmark (revisit later)
++tickInfo[kLength];
nextTickQueue.push(new TickObject(callback,
args,
++async_id_fields[kAsyncIdCounter],
triggerAsyncId));
nextTickQueue.push(new TickObject(callback, args, triggerAsyncId));
}
}
14 changes: 3 additions & 11 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,24 +222,16 @@ inline Environment::TickInfo::TickInfo() {
fields_[i] = 0;
}

inline uint32_t* Environment::TickInfo::fields() {
inline uint8_t* Environment::TickInfo::fields() {
return fields_;
}

inline int Environment::TickInfo::fields_count() const {
return kFieldsCount;
}

inline uint32_t Environment::TickInfo::index() const {
return fields_[kIndex];
}

inline uint32_t Environment::TickInfo::length() const {
return fields_[kLength];
}

inline void Environment::TickInfo::set_index(uint32_t value) {
fields_[kIndex] = value;
inline uint8_t Environment::TickInfo::scheduled() const {
return fields_[kScheduled];
}

inline void Environment::AssignToContext(v8::Local<v8::Context> context,
Expand Down
11 changes: 4 additions & 7 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -455,23 +455,20 @@ class Environment {

class TickInfo {
public:
inline uint32_t* fields();
inline uint8_t* fields();
inline int fields_count() const;
inline uint32_t index() const;
inline uint32_t length() const;
inline void set_index(uint32_t value);
inline uint8_t scheduled() const;

private:
friend class Environment; // So we can call the constructor.
inline TickInfo();

enum Fields {
kIndex,
kLength,
kScheduled,
kFieldsCount
};

uint32_t fields_[kFieldsCount];
uint8_t fields_[kFieldsCount];

DISALLOW_COPY_AND_ASSIGN(TickInfo);
};
Expand Down
33 changes: 20 additions & 13 deletions src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ using v8::SealHandleScope;
using v8::String;
using v8::TryCatch;
using v8::Uint32Array;
using v8::Uint8Array;
using v8::Undefined;
using v8::V8;
using v8::Value;
Expand Down Expand Up @@ -1195,25 +1196,32 @@ void SetupNextTick(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);

CHECK(args[0]->IsFunction());
CHECK(args[1]->IsObject());

env->set_tick_callback_function(args[0].As<Function>());

env->SetMethod(args[1].As<Object>(), "runMicrotasks", RunMicrotasks);

// Do a little housekeeping.
env->process_object()->Delete(
env->context(),
FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupNextTick")).FromJust();
FIXED_ONE_BYTE_STRING(env->isolate(), "_setupNextTick")).FromJust();

// Values use to cross communicate with processNextTick.
uint32_t* const fields = env->tick_info()->fields();
uint32_t const fields_count = env->tick_info()->fields_count();
uint8_t* const fields = env->tick_info()->fields();
uint8_t const fields_count = env->tick_info()->fields_count();

Local<ArrayBuffer> array_buffer =
ArrayBuffer::New(env->isolate(), fields, sizeof(*fields) * fields_count);

args.GetReturnValue().Set(Uint32Array::New(array_buffer, 0, fields_count));
v8::Local<v8::Function> run_microtasks_fn =
env->NewFunctionTemplate(RunMicrotasks)->GetFunction(env->context())
.ToLocalChecked();
run_microtasks_fn->SetName(
FIXED_ONE_BYTE_STRING(env->isolate(), "runMicrotasks"));

Local<Array> ret = Array::New(env->isolate(), 2);
ret->Set(env->context(), 0,
Uint8Array::New(array_buffer, 0, fields_count)).FromJust();
ret->Set(env->context(), 1, run_microtasks_fn).FromJust();

args.GetReturnValue().Set(ret);
}

void PromiseRejectCallback(PromiseRejectMessage message) {
Expand Down Expand Up @@ -1339,7 +1347,7 @@ void InternalCallbackScope::Close() {

Environment::TickInfo* tick_info = env_->tick_info();

if (tick_info->length() == 0) {
if (tick_info->scheduled() == 0) {
env_->isolate()->RunMicrotasks();
}

Expand All @@ -1350,10 +1358,7 @@ void InternalCallbackScope::Close() {
CHECK_EQ(env_->trigger_async_id(), 0);
}

Local<Object> process = env_->process_object();

if (tick_info->length() == 0) {
tick_info->set_index(0);
if (tick_info->scheduled() == 0) {
return;
}

Expand All @@ -1362,6 +1367,8 @@ void InternalCallbackScope::Close() {
CHECK_EQ(env_->trigger_async_id(), 0);
}

Local<Object> process = env_->process_object();

if (env_->tick_callback_function()->Call(process, 0, nullptr).IsEmpty()) {
failed_ = true;
}
Expand Down

0 comments on commit 5257623

Please sign in to comment.