diff --git a/BUILD.gn b/BUILD.gn index cda8d1ffbb5eaf..461e5fcd194a44 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -13,6 +13,8 @@ group("default") { ":deno", ":hyper_hello", ":test_rs", + "core:deno_core_http_bench", + "core:deno_core_test", "libdeno:test_cc", ] } diff --git a/core/BUILD.gn b/core/BUILD.gn new file mode 100644 index 00000000000000..be06a7f4be99ea --- /dev/null +++ b/core/BUILD.gn @@ -0,0 +1,37 @@ +import("//build_extra/rust/rust.gni") + +# deno_core does not depend on flatbuffers nor tokio. +main_extern = [ + "$rust_build:futures", + "$rust_build:libc", + "$rust_build:serde_json", + "$rust_build:log", +] + +rust_crate("deno_core") { + source_root = "lib.rs" + extern = main_extern + deps = [ + "../libdeno:libdeno_static_lib", + ] +} + +rust_test("deno_core_test") { + source_root = "lib.rs" + extern = main_extern + deps = [ + "../libdeno:libdeno_static_lib", + ] +} + +rust_executable("deno_core_http_bench") { + source_root = "http_bench.rs" + extern = [ + "$rust_build:futures", + "$rust_build:lazy_static", + "$rust_build:libc", + "$rust_build:log", + "$rust_build:tokio", + ":deno_core" + ] +} diff --git a/core/http_bench.js b/core/http_bench.js new file mode 100644 index 00000000000000..b9615e6896b80d --- /dev/null +++ b/core/http_bench.js @@ -0,0 +1,150 @@ +// This is not a real HTTP server. We read blindly one time into 'requestBuf', +// then write this fixed 'responseBuf'. The point of this benchmark is to +// exercise the event loop in a simple yet semi-realistic way. +const shared32 = new Int32Array(libdeno.shared); + +const INDEX_NUM_RECORDS = 0; +const INDEX_RECORDS = 1; +const RECORD_OFFSET_PROMISE_ID = 0; +const RECORD_OFFSET_OP = 1; +const RECORD_OFFSET_ARG = 2; +const RECORD_OFFSET_RESULT = 3; +const RECORD_SIZE = 4; +const OP_LISTEN = 1; +const OP_ACCEPT = 2; +const OP_READ = 3; +const OP_WRITE = 4; +const OP_CLOSE = 5; + +const NUM_RECORDS = (shared32.length - INDEX_RECORDS) / RECORD_SIZE; +if (NUM_RECORDS != 100) { + throw Error("expected 100 entries"); +} + +const requestBuf = new Uint8Array(64 * 1024); +const responseBuf = new Uint8Array( + "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" + .split("") + .map(c => c.charCodeAt(0)) +); + +const promiseMap = new Map(); +let nextPromiseId = 1; + +function createResolvable() { + let methods; + const promise = new Promise((resolve, reject) => { + methods = { resolve, reject }; + }); + return Object.assign(promise, methods); +} + +/** Returns Promise */ +function sendAsync(op, arg, zeroCopyData) { + const id = nextPromiseId++; + const p = createResolvable(); + shared32[INDEX_NUM_RECORDS] = 1; + setRecord(0, RECORD_OFFSET_PROMISE_ID, id); + setRecord(0, RECORD_OFFSET_OP, op); + setRecord(0, RECORD_OFFSET_ARG, arg); + setRecord(0, RECORD_OFFSET_RESULT, -1); + promiseMap.set(id, p); + libdeno.send(null, zeroCopyData); + return p; +} + +/** Returns u32 number */ +function sendSync(op, arg) { + shared32[INDEX_NUM_RECORDS] = 1; + setRecord(0, RECORD_OFFSET_PROMISE_ID, 0); + setRecord(0, RECORD_OFFSET_OP, op); + setRecord(0, RECORD_OFFSET_ARG, arg); + setRecord(0, RECORD_OFFSET_RESULT, -1); + libdeno.send(); + return getRecord(0, RECORD_OFFSET_RESULT); +} + +function setRecord(i, off, value) { + if (i >= NUM_RECORDS) { + throw Error("out of range"); + } + shared32[INDEX_RECORDS + RECORD_SIZE * i + off] = value; +} + +function getRecord(i, off) { + if (i >= NUM_RECORDS) { + throw Error("out of range"); + } + return shared32[INDEX_RECORDS + RECORD_SIZE * i + off]; +} + +function handleAsyncMsgFromRust() { + for (let i = 0; i < shared32[INDEX_NUM_RECORDS]; i++) { + let id = getRecord(i, RECORD_OFFSET_PROMISE_ID); + const p = promiseMap.get(id); + promiseMap.delete(id); + p.resolve(getRecord(i, RECORD_OFFSET_RESULT)); + } +} + +/** Listens on 0.0.0.0:4500, returns rid. */ +function listen() { + return sendSync(OP_LISTEN, -1); +} + +/** Accepts a connection, returns rid. */ +async function accept(rid) { + return await sendAsync(OP_ACCEPT, rid); +} + +/** + * Reads a packet from the rid, presumably an http request. data is ignored. + * Returns bytes read. + */ +async function read(rid, data) { + return await sendAsync(OP_READ, rid, data); +} + +/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ +async function write(rid, data) { + return await sendAsync(OP_WRITE, rid, data); +} + +function close(rid) { + return sendSync(OP_CLOSE, rid); +} + +async function serve(rid) { + while (true) { + const nread = await read(rid, requestBuf); + if (nread <= 0) { + break; + } + + const nwritten = await write(rid, responseBuf); + if (nwritten < 0) { + break; + } + } + close(rid); +} + +async function main() { + libdeno.recv(handleAsyncMsgFromRust); + + libdeno.print("http_bench.js start"); + + const listener_rid = listen(); + libdeno.print(`listening http://127.0.0.1:4544/ rid = ${listener_rid}`); + while (true) { + const rid = await accept(listener_rid); + // libdeno.print(`accepted ${rid}`); + if (rid < 0) { + libdeno.print(`accept error ${rid}`); + return; + } + serve(rid); + } +} + +main(); diff --git a/core/http_bench.rs b/core/http_bench.rs new file mode 100644 index 00000000000000..3da30433a07d6e --- /dev/null +++ b/core/http_bench.rs @@ -0,0 +1,210 @@ +/// To run this benchmark: +/// +/// > DENO_BUILD_MODE=release ./tools/build.py && \ +/// ./target/release/deno_core_http_bench --multi-thread +extern crate deno_core; +extern crate futures; +extern crate libc; +extern crate tokio; + +#[macro_use] +extern crate log; +#[macro_use] +extern crate lazy_static; + +use deno_core::deno_buf; +use deno_core::AsyncResult; +use deno_core::Isolate; +use deno_core::JSError; +use deno_core::Op; +use deno_core::RECORD_OFFSET_ARG; +use deno_core::RECORD_OFFSET_OP; +use deno_core::RECORD_OFFSET_PROMISE_ID; +use deno_core::RECORD_OFFSET_RESULT; +use futures::future::lazy; +use std::collections::HashMap; +use std::env; +use std::net::SocketAddr; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Mutex; +use tokio::prelude::*; + +const OP_LISTEN: i32 = 1; +const OP_ACCEPT: i32 = 2; +const OP_READ: i32 = 3; +const OP_WRITE: i32 = 4; +const OP_CLOSE: i32 = 5; + +fn main() { + let js_source = include_str!("http_bench.js"); + let isolate = deno_core::Isolate::new(recv_cb); + + let main_future = lazy(move || { + // TODO currently isolate.execute() must be run inside tokio, hence the + // lazy(). It would be nice to not have that contraint. Probably requires + // using v8::MicrotasksPolicy::kExplicit + js_check(isolate.execute("http_bench.js", js_source)); + isolate.then(|r| { + js_check(r); + Ok(()) + }) + }); + + let args: Vec = env::args().collect(); + if args.len() > 1 && args[1] == "--multi-thread" { + println!("multi-thread"); + tokio::run(main_future); + } else { + println!("single-thread"); + tokio::runtime::current_thread::run(main_future); + } +} + +enum Repr { + TcpListener(tokio::net::TcpListener), + TcpStream(tokio::net::TcpStream), +} + +type ResourceTable = HashMap; +lazy_static! { + static ref RESOURCE_TABLE: Mutex = Mutex::new(HashMap::new()); + static ref NEXT_RID: AtomicUsize = AtomicUsize::new(3); +} + +fn new_rid() -> i32 { + let rid = NEXT_RID.fetch_add(1, Ordering::SeqCst); + rid as i32 +} + +fn recv_cb(isolate: &mut Isolate, zero_copy_buf: deno_buf) { + isolate.test_send_counter += 1; // TODO ideally store this in isolate.state? + + let promise_id = isolate.shared.get_record(0, RECORD_OFFSET_PROMISE_ID); + let op_id = isolate.shared.get_record(0, RECORD_OFFSET_OP); + let arg = isolate.shared.get_record(0, RECORD_OFFSET_ARG); + + // dbg!(promise_id); + // dbg!(op_id); + // dbg!(arg); + + let is_sync = promise_id == 0; + + if is_sync { + // sync ops + match op_id { + OP_CLOSE => { + debug!("close"); + assert!(is_sync); + let mut table = RESOURCE_TABLE.lock().unwrap(); + let r = table.remove(&arg); + isolate.shared.set_record( + 0, + RECORD_OFFSET_RESULT, + if r.is_some() { 0 } else { -1 }, + ); + } + OP_LISTEN => { + debug!("listen"); + assert!(is_sync); + + let addr = "127.0.0.1:4544".parse::().unwrap(); + let listener = tokio::net::TcpListener::bind(&addr).unwrap(); + let rid = new_rid(); + isolate.shared.set_record(0, RECORD_OFFSET_RESULT, rid); + let mut guard = RESOURCE_TABLE.lock().unwrap(); + guard.insert(rid, Repr::TcpListener(listener)); + } + _ => panic!("bad op"), + } + } else { + // async ops + let zero_copy_id = zero_copy_buf.zero_copy_id; + let op = match op_id { + OP_ACCEPT => { + let listener_rid = arg; + op_accept(listener_rid) + } + OP_READ => { + let rid = arg; + op_read(rid, zero_copy_buf) + } + OP_WRITE => { + let rid = arg; + op_write(rid, zero_copy_buf) + } + _ => panic!("bad op"), + }; + isolate.add_op(promise_id, op, zero_copy_id); + } +} + +fn op_accept(listener_rid: i32) -> Box { + debug!("accept {}", listener_rid); + Box::new( + futures::future::poll_fn(move || { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&listener_rid); + match maybe_repr { + Some(Repr::TcpListener(ref mut listener)) => listener.poll_accept(), + _ => panic!("bad rid"), + } + }).and_then(move |(stream, addr)| { + debug!("accept success {}", addr); + let rid = new_rid(); + + let mut guard = RESOURCE_TABLE.lock().unwrap(); + guard.insert(rid, Repr::TcpStream(stream)); + + Ok(AsyncResult { result: rid }) + }), + ) +} + +fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box { + debug!("read rid={}", rid); + Box::new( + futures::future::poll_fn(move || { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&rid); + match maybe_repr { + Some(Repr::TcpStream(ref mut stream)) => { + stream.poll_read(&mut zero_copy_buf) + } + _ => panic!("bad rid"), + } + }).and_then(move |nread| { + debug!("read success {}", nread); + Ok(AsyncResult { + result: nread as i32, + }) + }), + ) +} + +fn op_write(rid: i32, zero_copy_buf: deno_buf) -> Box { + debug!("write rid={}", rid); + Box::new( + futures::future::poll_fn(move || { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&rid); + match maybe_repr { + Some(Repr::TcpStream(ref mut stream)) => { + stream.poll_write(&zero_copy_buf) + } + _ => panic!("bad rid"), + } + }).and_then(move |nwritten| { + debug!("write success {}", nwritten); + Ok(AsyncResult { + result: nwritten as i32, + }) + }), + ) +} + +fn js_check(r: Result<(), JSError>) { + if let Err(e) = r { + panic!(e.to_string()); + } +} diff --git a/core/js_errors.rs b/core/js_errors.rs new file mode 100644 index 00000000000000..c07af136fe6b2b --- /dev/null +++ b/core/js_errors.rs @@ -0,0 +1,416 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. + +// Note that source_map_mappings requires 0-indexed line and column numbers but +// V8 Exceptions are 1-indexed. + +// TODO: This currently only applies to uncaught exceptions. It would be nice to +// also have source maps for situations like this: +// const err = new Error("Boo!"); +// console.log(err.stack); +// It would require calling into Rust from Error.prototype.prepareStackTrace. + +use serde_json; +use std::fmt; +use std::str; + +#[derive(Debug, PartialEq)] +pub struct StackFrame { + pub line: i64, // zero indexed + pub column: i64, // zero indexed + pub script_name: String, + pub function_name: String, + pub is_eval: bool, + pub is_constructor: bool, + pub is_wasm: bool, +} + +#[derive(Debug, PartialEq)] +pub struct JSError { + pub message: String, + + pub source_line: Option, + pub script_resource_name: Option, + pub line_number: Option, + pub start_position: Option, + pub end_position: Option, + pub error_level: Option, + pub start_column: Option, + pub end_column: Option, + + pub frames: Vec, +} + +impl fmt::Display for StackFrame { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + // Note when we print to string, we change from 0-indexed to 1-indexed. + let function_name = self.function_name.clone(); + let script_line_column = + format_script_line_column(&self.script_name, self.line, self.column); + + if !self.function_name.is_empty() { + write!(f, " at {} ({})", function_name, script_line_column) + } else if self.is_eval { + write!(f, " at eval ({})", script_line_column) + } else { + write!(f, " at {}", script_line_column) + } + } +} + +fn format_script_line_column( + script_name: &str, + line: i64, + column: i64, +) -> String { + // TODO match this style with how typescript displays errors. + let line = (1 + line).to_string(); + let column = (1 + column).to_string(); + let script_name = script_name.to_string(); + format!("{}:{}:{}", script_name, line, column) +} + +impl fmt::Display for JSError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + if self.script_resource_name.is_some() { + let script_resource_name = self.script_resource_name.as_ref().unwrap(); + // Avoid showing internal code from gen/bundle/main.js + if script_resource_name != "gen/bundle/main.js" + && script_resource_name != "gen/bundle/compiler.js" + { + if self.line_number.is_some() && self.start_column.is_some() { + assert!(self.line_number.is_some()); + assert!(self.start_column.is_some()); + let script_line_column = format_script_line_column( + script_resource_name, + self.line_number.unwrap() - 1, + self.start_column.unwrap() - 1, + ); + write!(f, "{}", script_line_column)?; + } + if self.source_line.is_some() { + write!(f, "\n{}\n", self.source_line.as_ref().unwrap())?; + let mut s = String::new(); + for i in 0..self.end_column.unwrap() { + if i >= self.start_column.unwrap() { + s.push('^'); + } else { + s.push(' '); + } + } + writeln!(f, "{}", s)?; + } + } + } + + write!(f, "{}", self.message.clone())?; + + for frame in &self.frames { + write!(f, "\n{}", &frame.to_string())?; + } + Ok(()) + } +} + +impl StackFrame { + // TODO Maybe use serde_derive? + fn from_json_value(v: &serde_json::Value) -> Option { + if !v.is_object() { + return None; + } + let obj = v.as_object().unwrap(); + + let line_v = &obj["line"]; + if !line_v.is_u64() { + return None; + } + let line = line_v.as_u64().unwrap() as i64; + + let column_v = &obj["column"]; + if !column_v.is_u64() { + return None; + } + let column = column_v.as_u64().unwrap() as i64; + + let script_name_v = &obj["scriptName"]; + if !script_name_v.is_string() { + return None; + } + let script_name = String::from(script_name_v.as_str().unwrap()); + + // Optional fields. See EncodeExceptionAsJSON() in libdeno. + // Sometimes V8 doesn't provide all the frame information. + + let mut function_name = String::from(""); // default + if obj.contains_key("functionName") { + let function_name_v = &obj["functionName"]; + if function_name_v.is_string() { + function_name = String::from(function_name_v.as_str().unwrap()); + } + } + + let mut is_eval = false; // default + if obj.contains_key("isEval") { + let is_eval_v = &obj["isEval"]; + if is_eval_v.is_boolean() { + is_eval = is_eval_v.as_bool().unwrap(); + } + } + + let mut is_constructor = false; // default + if obj.contains_key("isConstructor") { + let is_constructor_v = &obj["isConstructor"]; + if is_constructor_v.is_boolean() { + is_constructor = is_constructor_v.as_bool().unwrap(); + } + } + + let mut is_wasm = false; // default + if obj.contains_key("isWasm") { + let is_wasm_v = &obj["isWasm"]; + if is_wasm_v.is_boolean() { + is_wasm = is_wasm_v.as_bool().unwrap(); + } + } + + Some(StackFrame { + line: line - 1, + column: column - 1, + script_name, + function_name, + is_eval, + is_constructor, + is_wasm, + }) + } +} + +impl JSError { + /// Creates a new JSError by parsing the raw exception JSON string from V8. + pub fn from_v8_exception(json_str: &str) -> Option { + let v = serde_json::from_str::(json_str); + if v.is_err() { + return None; + } + let v = v.unwrap(); + + if !v.is_object() { + return None; + } + let obj = v.as_object().unwrap(); + + let message_v = &obj["message"]; + if !message_v.is_string() { + return None; + } + let message = String::from(message_v.as_str().unwrap()); + + let source_line = obj + .get("sourceLine") + .and_then(|v| v.as_str().map(String::from)); + let script_resource_name = obj + .get("scriptResourceName") + .and_then(|v| v.as_str().map(String::from)); + let line_number = obj.get("lineNumber").and_then(|v| v.as_i64()); + let start_position = obj.get("startPosition").and_then(|v| v.as_i64()); + let end_position = obj.get("endPosition").and_then(|v| v.as_i64()); + let error_level = obj.get("errorLevel").and_then(|v| v.as_i64()); + let start_column = obj.get("startColumn").and_then(|v| v.as_i64()); + let end_column = obj.get("endColumn").and_then(|v| v.as_i64()); + + let frames_v = &obj["frames"]; + if !frames_v.is_array() { + return None; + } + let frame_values = frames_v.as_array().unwrap(); + + let mut frames = Vec::::new(); + for frame_v in frame_values { + match StackFrame::from_json_value(frame_v) { + None => return None, + Some(frame) => frames.push(frame), + } + } + + Some(JSError { + message, + source_line, + script_resource_name, + line_number, + start_position, + end_position, + error_level, + start_column, + end_column, + frames, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn error1() -> JSError { + JSError { + message: "Error: foo bar".to_string(), + source_line: None, + script_resource_name: None, + line_number: None, + start_position: None, + end_position: None, + error_level: None, + start_column: None, + end_column: None, + frames: vec![ + StackFrame { + line: 4, + column: 16, + script_name: "foo_bar.ts".to_string(), + function_name: "foo".to_string(), + is_eval: false, + is_constructor: false, + is_wasm: false, + }, + StackFrame { + line: 5, + column: 20, + script_name: "bar_baz.ts".to_string(), + function_name: "qat".to_string(), + is_eval: false, + is_constructor: false, + is_wasm: false, + }, + StackFrame { + line: 1, + column: 1, + script_name: "deno_main.js".to_string(), + function_name: "".to_string(), + is_eval: false, + is_constructor: false, + is_wasm: false, + }, + ], + } + } + + #[test] + fn stack_frame_from_json_value_1() { + let v = serde_json::from_str::( + r#"{ + "line":2, + "column":11, + "functionName":"foo", + "scriptName":"/Users/rld/src/deno/tests/error_001.ts", + "isEval":true, + "isConstructor":false, + "isWasm":false + }"#, + ).unwrap(); + let r = StackFrame::from_json_value(&v); + assert_eq!( + r, + Some(StackFrame { + line: 1, + column: 10, + script_name: "/Users/rld/src/deno/tests/error_001.ts".to_string(), + function_name: "foo".to_string(), + is_eval: true, + is_constructor: false, + is_wasm: false, + }) + ); + } + + #[test] + fn stack_frame_from_json_value_2() { + let v = serde_json::from_str::( + r#"{ + "scriptName": "/Users/rld/src/deno/tests/error_001.ts", + "line": 2, + "column": 11 + }"#, + ).unwrap(); + let r = StackFrame::from_json_value(&v); + assert!(r.is_some()); + let f = r.unwrap(); + assert_eq!(f.line, 1); + assert_eq!(f.column, 10); + assert_eq!(f.script_name, "/Users/rld/src/deno/tests/error_001.ts"); + } + + #[test] + fn js_error_from_v8_exception() { + let r = JSError::from_v8_exception( + r#"{ + "message":"Uncaught Error: bad", + "frames":[ + { + "line":2, + "column":11, + "functionName":"foo", + "scriptName":"/Users/rld/src/deno/tests/error_001.ts", + "isEval":true, + "isConstructor":false, + "isWasm":false + }, { + "line":5, + "column":5, + "functionName":"bar", + "scriptName":"/Users/rld/src/deno/tests/error_001.ts", + "isEval":true, + "isConstructor":false, + "isWasm":false + } + ]}"#, + ); + assert!(r.is_some()); + let e = r.unwrap(); + assert_eq!(e.message, "Uncaught Error: bad"); + assert_eq!(e.frames.len(), 2); + assert_eq!( + e.frames[0], + StackFrame { + line: 1, + column: 10, + script_name: "/Users/rld/src/deno/tests/error_001.ts".to_string(), + function_name: "foo".to_string(), + is_eval: true, + is_constructor: false, + is_wasm: false, + } + ) + } + + #[test] + fn js_error_from_v8_exception2() { + let r = JSError::from_v8_exception( + "{\"message\":\"Error: boo\",\"sourceLine\":\"throw Error('boo');\",\"scriptResourceName\":\"a.js\",\"lineNumber\":3,\"startPosition\":8,\"endPosition\":9,\"errorLevel\":8,\"startColumn\":6,\"endColumn\":7,\"isSharedCrossOrigin\":false,\"isOpaque\":false,\"frames\":[{\"line\":3,\"column\":7,\"functionName\":\"\",\"scriptName\":\"a.js\",\"isEval\":false,\"isConstructor\":false,\"isWasm\":false}]}" + ); + assert!(r.is_some()); + let e = r.unwrap(); + assert_eq!(e.message, "Error: boo"); + assert_eq!(e.source_line, Some("throw Error('boo');".to_string())); + assert_eq!(e.script_resource_name, Some("a.js".to_string())); + assert_eq!(e.line_number, Some(3)); + assert_eq!(e.start_position, Some(8)); + assert_eq!(e.end_position, Some(9)); + assert_eq!(e.error_level, Some(8)); + assert_eq!(e.start_column, Some(6)); + assert_eq!(e.end_column, Some(7)); + assert_eq!(e.frames.len(), 1); + } + + #[test] + fn stack_frame_to_string() { + let e = error1(); + assert_eq!(" at foo (foo_bar.ts:5:17)", &e.frames[0].to_string()); + assert_eq!(" at qat (bar_baz.ts:6:21)", &e.frames[1].to_string()); + } + + #[test] + fn js_error_to_string() { + let e = error1(); + let expected = "Error: foo bar\n at foo (foo_bar.ts:5:17)\n at qat (bar_baz.ts:6:21)\n at deno_main.js:2:2"; + assert_eq!(expected, &e.to_string()); + } +} diff --git a/core/lib.rs b/core/lib.rs new file mode 100644 index 00000000000000..d13339ee501bf4 --- /dev/null +++ b/core/lib.rs @@ -0,0 +1,364 @@ +#[macro_use] +extern crate log; +extern crate futures; +extern crate libc; + +mod js_errors; +mod libdeno; +mod shared; + +pub use crate::js_errors::JSError; +pub use crate::libdeno::deno_buf; +pub use crate::shared::*; +use futures::Async; +use futures::Future; +use futures::Poll; +use libc::c_void; +use std::collections::HashMap; +use std::ffi::CStr; +use std::ffi::CString; +use std::sync::{Once, ONCE_INIT}; + +pub struct Isolate { + libdeno_isolate: *const libdeno::isolate, + pending_ops: HashMap, // promise_id -> op + polled_recently: bool, + recv_cb: RecvCallback, + + pub shared: Shared, + pub test_send_counter: u32, // TODO only used for testing- REMOVE. +} + +pub type RecvCallback = fn(isolate: &mut Isolate, zero_copy_buf: deno_buf); + +pub const NUM_RECORDS: usize = 100; + +// TODO rename to AsyncResult +pub struct AsyncResult { + pub result: i32, +} + +pub type Op = dyn Future + Send; + +struct PendingOp { + op: Box, + polled_recently: bool, + zero_copy_id: usize, // non-zero if associated zero-copy buffer. +} + +static DENO_INIT: Once = ONCE_INIT; + +unsafe impl Send for Isolate {} + +impl Isolate { + pub fn new(recv_cb: RecvCallback) -> Self { + DENO_INIT.call_once(|| { + unsafe { libdeno::deno_init() }; + }); + + // Allocate unmanaged memory for the shared buffer by creating a Vec, + // grabbing the raw pointer, and then leaking the Vec so it is never freed. + let mut shared = Shared::new(); + let shared_deno_buf = shared.as_deno_buf(); + + let config = libdeno::deno_config { + will_snapshot: 0, + load_snapshot: deno_buf::empty(), // TODO + shared: shared_deno_buf, + recv_cb: pre_dispatch, + }; + let libdeno_isolate = unsafe { libdeno::deno_new(config) }; + + Self { + pending_ops: HashMap::new(), + polled_recently: false, + libdeno_isolate, + test_send_counter: 0, + recv_cb, + shared, + } + } + + fn zero_copy_release(&self, zero_copy_id: usize) { + unsafe { + libdeno::deno_zero_copy_release(self.libdeno_isolate, zero_copy_id) + } + } + + pub fn add_op( + self: &mut Self, + promise_id: i32, + op: Box, + zero_copy_id: usize, + ) { + debug!("add_op {}", zero_copy_id); + self.pending_ops.insert( + promise_id, + PendingOp { + op, + polled_recently: false, + zero_copy_id, + }, + ); + self.polled_recently = false; + } + + #[inline] + pub unsafe fn from_raw_ptr<'a>(ptr: *const c_void) -> &'a mut Self { + let ptr = ptr as *mut _; + &mut *ptr + } + + #[inline] + pub fn as_raw_ptr(&self) -> *const c_void { + self as *const _ as *const c_void + } + + pub fn execute( + &self, + js_filename: &str, + js_source: &str, + ) -> Result<(), JSError> { + let filename = CString::new(js_filename).unwrap(); + let source = CString::new(js_source).unwrap(); + unsafe { + libdeno::deno_execute( + self.libdeno_isolate, + self.as_raw_ptr(), + filename.as_ptr(), + source.as_ptr(), + ) + }; + if let Some(err) = self.last_exception() { + return Err(err); + } + Ok(()) + } + + pub fn last_exception(&self) -> Option { + let ptr = unsafe { libdeno::deno_last_exception(self.libdeno_isolate) }; + if ptr.is_null() { + None + } else { + let cstr = unsafe { CStr::from_ptr(ptr) }; + let v8_exception = cstr.to_str().unwrap(); + debug!("v8_exception\n{}\n", v8_exception); + let js_error = JSError::from_v8_exception(v8_exception).unwrap(); + Some(js_error) + } + } + + fn check_promise_errors(&self) { + unsafe { + libdeno::deno_check_promise_errors(self.libdeno_isolate); + } + } + + fn respond(&mut self) -> Result<(), JSError> { + let buf = deno_buf::empty(); + unsafe { + libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), buf) + } + if let Some(err) = self.last_exception() { + Err(err) + } else { + Ok(()) + } + } +} + +struct LockerScope { + libdeno_isolate: *const libdeno::isolate, +} + +impl LockerScope { + fn new(isolate: &Isolate) -> LockerScope { + let libdeno_isolate = isolate.libdeno_isolate; + unsafe { libdeno::deno_lock(libdeno_isolate) } + LockerScope { libdeno_isolate } + } +} + +impl Drop for LockerScope { + fn drop(&mut self) { + unsafe { libdeno::deno_unlock(self.libdeno_isolate) } + } +} + +impl Future for Isolate { + type Item = (); + type Error = JSError; + + fn poll(&mut self) -> Poll<(), JSError> { + // Lock the current thread for V8. + let _locker = LockerScope::new(self); + + // Clear + self.polled_recently = false; + for (_, pending) in self.pending_ops.iter_mut() { + pending.polled_recently = false; + } + + while !self.polled_recently { + let mut complete = HashMap::::new(); + + self.polled_recently = true; + for (promise_id, pending) in self.pending_ops.iter_mut() { + // Do not call poll on futures we've already polled this turn. + if pending.polled_recently { + continue; + } + pending.polled_recently = true; + + let promise_id = *promise_id; + let op = &mut pending.op; + match op.poll() { + Err(op_err) => { + eprintln!("op err {:?}", op_err); + complete.insert(promise_id, AsyncResult { result: -1 }); + debug!("pending op {} complete err", promise_id); + } + Ok(Async::Ready(async_result)) => { + complete.insert(promise_id, async_result); + debug!("pending op {} complete ready", promise_id); + } + Ok(Async::NotReady) => { + debug!("pending op {} not ready", promise_id); + continue; + } + } + } + + self.shared.set_num_records(complete.len() as i32); + if complete.len() > 0 { + // self.zero_copy_release() and self.respond() need Locker. + let mut i = 0; + for (promise_id, async_result) in complete.iter_mut() { + let pending = self.pending_ops.remove(promise_id).unwrap(); + + if pending.zero_copy_id > 0 { + self.zero_copy_release(pending.zero_copy_id); + } + + self + .shared + .set_record(i, RECORD_OFFSET_PROMISE_ID, *promise_id); + self + .shared + .set_record(i, RECORD_OFFSET_RESULT, async_result.result); + i += 1; + } + self.respond()?; + } + } + + self.check_promise_errors(); + if let Some(err) = self.last_exception() { + return Err(err); + } + + // We're idle if pending_ops is empty. + if self.pending_ops.is_empty() { + Ok(futures::Async::Ready(())) + } else { + Ok(futures::Async::NotReady) + } + } +} + +extern "C" fn pre_dispatch( + user_data: *mut c_void, + control_buf: deno_buf, + zero_copy_buf: deno_buf, +) { + let isolate = unsafe { Isolate::from_raw_ptr(user_data) }; + assert_eq!(control_buf.len(), 0); + (isolate.recv_cb)(isolate, zero_copy_buf); +} + +#[cfg(test)] +mod tests { + use super::*; + + fn inc_counter(isolate: &mut Isolate, zero_copy_buf: deno_buf) { + assert_eq!(zero_copy_buf.len(), 0); + isolate.test_send_counter += 1; // TODO ideally store this in isolate.state? + } + + fn js_check(r: Result<(), JSError>) { + if let Err(e) = r { + panic!(e.to_string()); + } + } + + #[test] + fn test_execute() { + let isolate = Isolate::new(inc_counter); + js_check(isolate.execute( + "filename.js", + r#" + libdeno.send(); + async function main() { + libdeno.send(); + } + main(); + "#, + )); + // We expect that main is executed even tho we didn't poll. + assert_eq!(isolate.test_send_counter, 2); + } + + fn async_immediate(isolate: &mut Isolate, zero_copy_buf: deno_buf) { + assert_eq!(zero_copy_buf.len(), 0); + isolate.test_send_counter += 1; // TODO ideally store this in isolate.state? + + let promise_id = 0; + let op = Box::new(futures::future::ok(AsyncResult { result: 0 })); + isolate.add_op(promise_id, op, zero_copy_buf.zero_copy_id); + } + + #[test] + fn test_poll_async_immediate_ops() { + let mut isolate = Isolate::new(async_immediate); + js_check(isolate.execute( + "setup.js", + r#" + let nrecv = 0; + libdeno.recv(() => { + nrecv++; + }); + function assertEq(actual, expected) { + if (expected != actual) { + throw Error(`actual ${actual} expected ${expected} `); + } + } + "#, + )); + assert_eq!(isolate.test_send_counter, 0); + js_check(isolate.execute( + "check1.js", + r#" + assertEq(nrecv, 0); + libdeno.send(); + assertEq(nrecv, 0); + "#, + )); + assert_eq!(isolate.test_send_counter, 1); + assert_eq!(Ok(Async::Ready(())), isolate.poll()); + assert_eq!(isolate.test_send_counter, 1); + js_check(isolate.execute( + "check2.js", + r#" + assertEq(nrecv, 1); + libdeno.send(); + assertEq(nrecv, 1); + "#, + )); + assert_eq!(isolate.test_send_counter, 2); + assert_eq!(Ok(Async::Ready(())), isolate.poll()); + js_check(isolate.execute("check3.js", "assertEq(nrecv, 2)")); + assert_eq!(isolate.test_send_counter, 2); + // We are idle, so the next poll should be the last. + assert_eq!(Ok(Async::Ready(())), isolate.poll()); + } +} diff --git a/core/libdeno.rs b/core/libdeno.rs new file mode 120000 index 00000000000000..32688906e1553e --- /dev/null +++ b/core/libdeno.rs @@ -0,0 +1 @@ +../src/libdeno.rs \ No newline at end of file diff --git a/core/shared.rs b/core/shared.rs new file mode 100644 index 00000000000000..40d83db732b173 --- /dev/null +++ b/core/shared.rs @@ -0,0 +1,49 @@ +use crate::libdeno::deno_buf; +use std::mem; + +// TODO this is where we abstract flatbuffers at. +// TODO make these constants private to this file. +const INDEX_NUM_RECORDS: usize = 0; +const INDEX_RECORDS: usize = 1; +pub const RECORD_OFFSET_PROMISE_ID: usize = 0; +pub const RECORD_OFFSET_OP: usize = 1; +pub const RECORD_OFFSET_ARG: usize = 2; +pub const RECORD_OFFSET_RESULT: usize = 3; +const RECORD_SIZE: usize = 4; +const NUM_RECORDS: usize = 100; + +/// Represents the shared buffer between JS and Rust. +/// Used for FFI. +pub struct Shared(Vec); + +impl Shared { + pub fn new() -> Shared { + let mut vec = Vec::::new(); + vec.resize(INDEX_RECORDS + RECORD_SIZE * NUM_RECORDS, 0); + Shared(vec) + } + + pub fn set_record(&mut self, i: usize, off: usize, value: i32) { + assert!(i < NUM_RECORDS); + self.0[INDEX_RECORDS + RECORD_SIZE * i + off] = value; + } + + pub fn get_record(&self, i: usize, off: usize) -> i32 { + assert!(i < NUM_RECORDS); + return self.0[INDEX_RECORDS + RECORD_SIZE * i + off]; + } + + pub fn set_num_records(&mut self, num_records: i32) { + self.0[INDEX_NUM_RECORDS] = num_records; + } + + pub fn get_num_records(&self) -> i32 { + return self.0[INDEX_NUM_RECORDS]; + } + + pub fn as_deno_buf(&mut self) -> deno_buf { + let ptr = self.0.as_mut_ptr() as *mut u8; + let len = mem::size_of::() * self.0.len(); + unsafe { deno_buf::from_raw_parts(ptr, len) } + } +} diff --git a/js/dispatch.ts b/js/dispatch.ts index e52a6f64c0eb1c..55ea682fc127de 100644 --- a/js/dispatch.ts +++ b/js/dispatch.ts @@ -17,7 +17,7 @@ export function setFireTimersCallback(fn: () => void) { export function handleAsyncMsgFromRust(ui8: Uint8Array) { // If a the buffer is empty, recv() on the native side timed out and we // did not receive a message. - if (ui8.length) { + if (ui8 && ui8.length) { const bb = new flatbuffers.ByteBuffer(ui8); const base = msg.Base.getRootAsBase(bb); const cmdId = base.cmdId(); diff --git a/libdeno/api.cc b/libdeno/api.cc index 2601862a2dea77..4cb7b846bd8f1c 100644 --- a/libdeno/api.cc +++ b/libdeno/api.cc @@ -78,6 +78,19 @@ deno::DenoIsolate* unwrap(Deno* d_) { return reinterpret_cast(d_); } +void deno_lock(Deno* d_) { + auto* d = unwrap(d_); + CHECK_NULL(d->locker_); + d->locker_ = new v8::Locker(d->isolate_); +} + +void deno_unlock(Deno* d_) { + auto* d = unwrap(d_); + CHECK_NOT_NULL(d->locker_); + delete d->locker_; + d->locker_ = nullptr; +} + deno_buf deno_get_snapshot(Deno* d_) { auto* d = unwrap(d_); CHECK_NOT_NULL(d->snapshot_creator_); @@ -87,7 +100,7 @@ deno_buf deno_get_snapshot(Deno* d_) { auto blob = d->snapshot_creator_->CreateBlob( v8::SnapshotCreator::FunctionCodeHandling::kKeep); return {nullptr, 0, reinterpret_cast(const_cast(blob.data)), - blob.raw_size}; + blob.raw_size, 0}; } static std::unique_ptr platform; @@ -127,12 +140,23 @@ void deno_execute(Deno* d_, void* user_data, const char* js_filename, deno::Execute(context, js_filename, js_source); } -void deno_respond(Deno* d_, void* user_data, int32_t req_id, deno_buf buf) { +void deno_zero_copy_release(Deno* d_, size_t zero_copy_id) { + auto* d = unwrap(d_); + v8::Isolate::Scope isolate_scope(d->isolate_); + v8::Locker locker(d->isolate_); + v8::HandleScope handle_scope(d->isolate_); + d->DeleteZeroCopyRef(zero_copy_id); +} + +void deno_respond(Deno* d_, void* user_data, deno_buf buf) { auto* d = unwrap(d_); if (d->current_args_ != nullptr) { // Synchronous response. - auto ab = deno::ImportBuf(d, buf); - d->current_args_->GetReturnValue().Set(ab); + if (buf.data_ptr != nullptr) { + DCHECK_EQ(buf.zero_copy_id, 0); + auto ab = deno::ImportBuf(d, buf); + d->current_args_->GetReturnValue().Set(ab); + } d->current_args_ = nullptr; return; } @@ -148,8 +172,6 @@ void deno_respond(Deno* d_, void* user_data, int32_t req_id, deno_buf buf) { v8::TryCatch try_catch(d->isolate_); - deno::DeleteDataRef(d, req_id); - auto recv_ = d->recv_.Get(d->isolate_); if (recv_.IsEmpty()) { d->last_exception_ = "libdeno.recv_ has not been called."; @@ -157,8 +179,17 @@ void deno_respond(Deno* d_, void* user_data, int32_t req_id, deno_buf buf) { } v8::Local args[1]; - args[0] = deno::ImportBuf(d, buf); - auto v = recv_->Call(context, context->Global(), 1, args); + int argc = 0; + + // You cannot use zero_copy_buf with deno_respond(). Use + // deno_zero_copy_release() instead. + DCHECK_EQ(buf.zero_copy_id, 0); + if (buf.data_ptr != nullptr) { + args[0] = deno::ImportBuf(d, buf); + argc = 1; + } + + auto v = recv_->Call(context, context->Global(), argc, args); if (try_catch.HasCaught()) { CHECK(v.IsEmpty()); diff --git a/libdeno/binding.cc b/libdeno/binding.cc index f640fe83c68bde..d4849db7be51c1 100644 --- a/libdeno/binding.cc +++ b/libdeno/binding.cc @@ -44,20 +44,6 @@ v8::StartupData SerializeInternalFields(v8::Local holder, int index, return {payload, size}; } -void AddDataRef(DenoIsolate* d, int32_t req_id, v8::Local data_v) { - d->async_data_map_.emplace(std::piecewise_construct, std::make_tuple(req_id), - std::make_tuple(d->isolate_, data_v)); -} - -void DeleteDataRef(DenoIsolate* d, int32_t req_id) { - // Delete persistent reference to data ArrayBuffer. - auto it = d->async_data_map_.find(req_id); - if (it != d->async_data_map_.end()) { - it->second.Reset(); - d->async_data_map_.erase(it); - } -} - // Extracts a C string from a v8::V8 Utf8Value. const char* ToCString(const v8::String::Utf8Value& value) { return *value ? *value : ""; @@ -131,6 +117,13 @@ void ErrorToJSON(const v8::FunctionCallbackInfo& args) { } v8::Local ImportBuf(DenoIsolate* d, deno_buf buf) { + // Do not use ImportBuf with zero_copy buffers. + DCHECK_EQ(buf.zero_copy_id, 0); + + if (buf.data_ptr == nullptr) { + return v8::Local(); + } + if (buf.alloc_ptr == nullptr) { // If alloc_ptr isn't set, we memcpy. // This is currently used for flatbuffers created in Rust. @@ -209,42 +202,44 @@ void Send(const v8::FunctionCallbackInfo& args) { DenoIsolate* d = DenoIsolate::FromIsolate(isolate); DCHECK_EQ(d->isolate_, isolate); - v8::Locker locker(d->isolate_); + deno_buf control = {nullptr, 0u, nullptr, 0u, 0u}; + deno_buf zero_copy = {nullptr, 0u, nullptr, 0u, 0u}; + v8::HandleScope handle_scope(isolate); - CHECK_NULL(d->current_args_); // libdeno.send re-entry forbidden. - int32_t req_id = d->next_req_id_++; + if (args.Length() > 0) { + v8::Local control_v = args[0]; + if (control_v->IsArrayBufferView()) { + control = + GetContents(isolate, v8::Local::Cast(control_v)); + } + } - v8::Local control_v = args[0]; - CHECK(control_v->IsArrayBufferView()); - deno_buf control = - GetContents(isolate, v8::Local::Cast(control_v)); - deno_buf data = {nullptr, 0u, nullptr, 0u}; - v8::Local data_v; + v8::Local zero_copy_v; if (args.Length() == 2) { if (args[1]->IsArrayBufferView()) { - data_v = args[1]; - data = GetContents(isolate, v8::Local::Cast(data_v)); + zero_copy_v = args[1]; + zero_copy = GetContents( + isolate, v8::Local::Cast(zero_copy_v)); + size_t zero_copy_id = d->next_zero_copy_id_++; + DCHECK_GT(zero_copy_id, 0); + zero_copy.zero_copy_id = zero_copy_id; + // If the zero_copy ArrayBuffer was given, we must maintain a strong + // reference to it until deno_zero_copy_release is called. + d->AddZeroCopyRef(zero_copy_id, zero_copy_v); } - } else { - CHECK_EQ(args.Length(), 1); } DCHECK_NULL(d->current_args_); d->current_args_ = &args; - d->recv_cb_(d->user_data_, req_id, control, data); + d->recv_cb_(d->user_data_, control, zero_copy); if (d->current_args_ == nullptr) { // This indicates that deno_repond() was called already. } else { // Asynchronous. d->current_args_ = nullptr; - // If the data ArrayBuffer was given, we must maintain a strong reference - // to it until deno_respond is called. - if (!data_v.IsEmpty()) { - AddDataRef(d, req_id, data_v); - } } } diff --git a/libdeno/deno.h b/libdeno/deno.h index 6be0b56256baaf..f3902985e01fe7 100644 --- a/libdeno/deno.h +++ b/libdeno/deno.h @@ -15,6 +15,7 @@ typedef struct { size_t alloc_len; // Length of the memory allocation. uint8_t* data_ptr; // Start of logical contents (within the allocation). size_t data_len; // Length of logical contents. + size_t zero_copy_id; // 0 = normal, 1 = must call deno_zero_copy_release. } deno_buf; typedef struct deno_s Deno; @@ -22,8 +23,8 @@ typedef struct deno_s Deno; // A callback to receive a message from a libdeno.send() javascript call. // control_buf is valid for only for the lifetime of this callback. // data_buf is valid until deno_respond() is called. -typedef void (*deno_recv_cb)(void* user_data, int32_t req_id, - deno_buf control_buf, deno_buf data_buf); +typedef void (*deno_recv_cb)(void* user_data, deno_buf control_buf, + deno_buf zerop_copy_buf); void deno_init(); const char* deno_v8_version(); @@ -47,6 +48,9 @@ deno_buf deno_get_snapshot(Deno* d); void deno_delete(Deno* d); +void deno_lock(Deno* d); +void deno_unlock(Deno* d); + // Compile and execute a traditional JavaScript script that does not use // module import statements. // If it succeeded deno_last_exception() will return NULL. @@ -66,11 +70,13 @@ void deno_execute(Deno* d, void* user_data, const char* js_filename, // longer owns `buf` and must not use it; deno_respond() is responsible for // releasing its memory.) // -// Calling this function more than once with the same req_id will result in -// an error. -// // If a JS exception was encountered, deno_last_exception() will be non-NULL. -void deno_respond(Deno* d, void* user_data, int32_t req_id, deno_buf buf); +void deno_respond(Deno* d, void* user_data, deno_buf buf); + +// consumes zero_copy +// Calling this function more than once with the same zero_copy_id will result +// in an error. +void deno_zero_copy_release(Deno* d, size_t zero_copy_id); void deno_check_promise_errors(Deno* d); diff --git a/libdeno/internal.h b/libdeno/internal.h index 7209664079a964..563043085b115b 100644 --- a/libdeno/internal.h +++ b/libdeno/internal.h @@ -30,12 +30,13 @@ class DenoIsolate { public: explicit DenoIsolate(deno_config config) : isolate_(nullptr), + locker_(nullptr), shared_(config.shared), current_args_(nullptr), snapshot_creator_(nullptr), global_import_buf_ptr_(nullptr), recv_cb_(config.recv_cb), - next_req_id_(0), + next_zero_copy_id_(1), // zero_copy_id must not be zero. user_data_(nullptr), resolve_cb_(nullptr) { array_buffer_allocator_ = v8::ArrayBuffer::Allocator::NewDefaultAllocator(); @@ -48,6 +49,9 @@ class DenoIsolate { ~DenoIsolate() { shared_ab_.Reset(); + if (locker_) { + delete locker_; + } if (snapshot_creator_) { delete snapshot_creator_; } else { @@ -78,14 +82,31 @@ class DenoIsolate { } } + void DeleteZeroCopyRef(size_t zero_copy_id) { + DCHECK_NE(zero_copy_id, 0); + // Delete persistent reference to data ArrayBuffer. + auto it = zero_copy_map_.find(zero_copy_id); + if (it != zero_copy_map_.end()) { + it->second.Reset(); + zero_copy_map_.erase(it); + } + } + + void AddZeroCopyRef(size_t zero_copy_id, v8::Local zero_copy_v) { + zero_copy_map_.emplace(std::piecewise_construct, + std::make_tuple(zero_copy_id), + std::make_tuple(isolate_, zero_copy_v)); + } + v8::Isolate* isolate_; + v8::Locker* locker_; v8::ArrayBuffer::Allocator* array_buffer_allocator_; deno_buf shared_; const v8::FunctionCallbackInfo* current_args_; v8::SnapshotCreator* snapshot_creator_; void* global_import_buf_ptr_; deno_recv_cb recv_cb_; - int32_t next_req_id_; + size_t next_zero_copy_id_; void* user_data_; v8::Persistent builtin_modules_; @@ -94,7 +115,7 @@ class DenoIsolate { deno_resolve_cb resolve_cb_; v8::Persistent context_; - std::map> async_data_map_; + std::map> zero_copy_map_; std::map> pending_promise_map_; std::string last_exception_; v8::Persistent recv_; @@ -152,7 +173,7 @@ static intptr_t external_references[] = { reinterpret_cast(MessageCallback), 0}; -static const deno_buf empty_buf = {nullptr, 0, nullptr, 0}; +static const deno_buf empty_buf = {nullptr, 0, nullptr, 0, 0}; Deno* NewFromSnapshot(void* user_data, deno_recv_cb cb); @@ -166,8 +187,6 @@ v8::StartupData SerializeInternalFields(v8::Local holder, int index, v8::Local ImportBuf(DenoIsolate* d, deno_buf buf); -void DeleteDataRef(DenoIsolate* d, int32_t req_id); - bool Execute(v8::Local context, const char* js_filename, const char* js_source); bool ExecuteMod(v8::Local context, const char* js_filename, diff --git a/libdeno/libdeno_test.cc b/libdeno/libdeno_test.cc index 3193e7677023d8..90fceef73b2c2a 100644 --- a/libdeno/libdeno_test.cc +++ b/libdeno/libdeno_test.cc @@ -26,9 +26,11 @@ TEST(LibDenoTest, Snapshotter) { TEST(LibDenoTest, CanCallFunction) { Deno* d = deno_new(deno_config{0, snapshot, empty, nullptr}); + deno_lock(d); deno_execute(d, nullptr, "a.js", "if (CanCallFunction() != 'foo') throw Error();"); EXPECT_EQ(nullptr, deno_last_exception(d)); + deno_unlock(d); deno_delete(d); } @@ -47,6 +49,7 @@ deno_buf strbuf(const char* str) { buf.alloc_len = len + 1; buf.data_ptr = buf.alloc_ptr; buf.data_len = len; + buf.zero_copy_id = 0; return buf; } @@ -71,8 +74,8 @@ void assert_null(deno_buf b) { TEST(LibDenoTest, RecvReturnEmpty) { static int count = 0; - auto recv_cb = [](auto _, int req_id, auto buf, auto data_buf) { - assert_null(data_buf); + auto recv_cb = [](auto _, auto buf, auto zero_copy_buf) { + assert_null(zero_copy_buf); count++; EXPECT_EQ(static_cast(3), buf.data_len); EXPECT_EQ(buf.data_ptr[0], 'a'); @@ -88,15 +91,17 @@ TEST(LibDenoTest, RecvReturnEmpty) { TEST(LibDenoTest, RecvReturnBar) { static int count = 0; - auto recv_cb = [](auto user_data, int req_id, auto buf, auto data_buf) { + auto recv_cb = [](auto user_data, auto buf, auto zero_copy_buf) { auto d = reinterpret_cast(user_data); - assert_null(data_buf); + assert_null(zero_copy_buf); count++; EXPECT_EQ(static_cast(3), buf.data_len); EXPECT_EQ(buf.data_ptr[0], 'a'); EXPECT_EQ(buf.data_ptr[1], 'b'); EXPECT_EQ(buf.data_ptr[2], 'c'); - deno_respond(d, user_data, req_id, strbuf("bar")); + EXPECT_EQ(zero_copy_buf.zero_copy_id, 0u); + EXPECT_EQ(zero_copy_buf.data_ptr, nullptr); + deno_respond(d, user_data, strbuf("bar")); }; Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb}); deno_execute(d, d, "a.js", "RecvReturnBar()"); @@ -114,9 +119,9 @@ TEST(LibDenoTest, DoubleRecvFails) { TEST(LibDenoTest, SendRecvSlice) { static int count = 0; - auto recv_cb = [](auto user_data, int req_id, auto buf, auto data_buf) { + auto recv_cb = [](auto user_data, auto buf, auto zero_copy_buf) { auto d = reinterpret_cast(user_data); - assert_null(data_buf); + assert_null(zero_copy_buf); static const size_t alloc_len = 1024; size_t i = count++; // Check the size and offset of the slice. @@ -134,12 +139,12 @@ TEST(LibDenoTest, SendRecvSlice) { memcpy(alloc_ptr, buf.alloc_ptr, alloc_len); // Make a slice that is a bit shorter than the original. deno_buf buf2{alloc_ptr, alloc_len, alloc_ptr + data_offset, - buf.data_len - 19}; + buf.data_len - 19, 0}; // Place some values into the buffer for the JS side to verify. buf2.data_ptr[0] = 200 + i; buf2.data_ptr[buf2.data_len - 1] = 200 - i; // Send back. - deno_respond(d, user_data, req_id, buf2); + deno_respond(d, user_data, buf2); }; Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb}); deno_execute(d, d, "a.js", "SendRecvSlice()"); @@ -150,8 +155,8 @@ TEST(LibDenoTest, SendRecvSlice) { TEST(LibDenoTest, JSSendArrayBufferViewTypes) { static int count = 0; - auto recv_cb = [](auto _, int req_id, auto buf, auto data_buf) { - assert_null(data_buf); + auto recv_cb = [](auto _, auto buf, auto zero_copy_buf) { + assert_null(zero_copy_buf); count++; size_t data_offset = buf.data_ptr - buf.alloc_ptr; EXPECT_EQ(data_offset, 2468u); @@ -197,33 +202,39 @@ TEST(LibDenoTest, GlobalErrorHandling) { deno_delete(d); } -TEST(LibDenoTest, DataBuf) { +TEST(LibDenoTest, ZeroCopyBuf) { static int count = 0; - static deno_buf data_buf_copy; - auto recv_cb = [](auto _, int req_id, deno_buf buf, deno_buf data_buf) { + static deno_buf zero_copy_buf2; + auto recv_cb = [](auto user_data, deno_buf buf, deno_buf zero_copy_buf) { count++; - data_buf.data_ptr[0] = 4; - data_buf.data_ptr[1] = 2; - data_buf_copy = data_buf; + EXPECT_GT(zero_copy_buf.zero_copy_id, 0u); + zero_copy_buf.data_ptr[0] = 4; + zero_copy_buf.data_ptr[1] = 2; + zero_copy_buf2 = zero_copy_buf; EXPECT_EQ(2u, buf.data_len); - EXPECT_EQ(2u, data_buf.data_len); + EXPECT_EQ(2u, zero_copy_buf.data_len); EXPECT_EQ(buf.data_ptr[0], 1); EXPECT_EQ(buf.data_ptr[1], 2); + // Note zero_copy_buf won't actually be freed here because in + // libdeno_test.js zeroCopyBuf is a rooted global. We just want to exercise + // the API here. + auto d = reinterpret_cast(user_data); + deno_zero_copy_release(d, zero_copy_buf.zero_copy_id); }; Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb}); - deno_execute(d, nullptr, "a.js", "DataBuf()"); + deno_execute(d, d, "a.js", "ZeroCopyBuf()"); EXPECT_EQ(nullptr, deno_last_exception(d)); EXPECT_EQ(count, 1); - // data_buf was subsequently changed in JS, let's check that our copy reflects - // that. - EXPECT_EQ(data_buf_copy.data_ptr[0], 9); - EXPECT_EQ(data_buf_copy.data_ptr[1], 8); + // zero_copy_buf was subsequently changed in JS, let's check that our copy + // reflects that. + EXPECT_EQ(zero_copy_buf2.data_ptr[0], 9); + EXPECT_EQ(zero_copy_buf2.data_ptr[1], 8); deno_delete(d); } TEST(LibDenoTest, CheckPromiseErrors) { static int count = 0; - auto recv_cb = [](auto _, int req_id, auto buf, auto data_buf) { count++; }; + auto recv_cb = [](auto _, auto buf, auto zero_copy_buf) { count++; }; Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb}); EXPECT_EQ(deno_last_exception(d), nullptr); deno_execute(d, nullptr, "a.js", "CheckPromiseErrors()"); @@ -271,7 +282,7 @@ TEST(LibDenoTest, EncodeErrorBug) { TEST(LibDenoTest, Shared) { uint8_t s[] = {0, 1, 2}; - deno_buf shared = {nullptr, 0, s, 3}; + deno_buf shared = {nullptr, 0, s, 3, 0}; Deno* d = deno_new(deno_config{0, snapshot, shared, nullptr}); deno_execute(d, nullptr, "a.js", "Shared()"); EXPECT_EQ(nullptr, deno_last_exception(d)); @@ -306,7 +317,7 @@ TEST(LibDenoTest, LibDenoEvalContextError) { TEST(LibDenoTest, SharedAtomics) { int32_t s[] = {0, 1, 2}; - deno_buf shared = {nullptr, 0, reinterpret_cast(s), sizeof s}; + deno_buf shared = {nullptr, 0, reinterpret_cast(s), sizeof s, 0}; Deno* d = deno_new(deno_config{0, empty, shared, nullptr}); deno_execute(d, nullptr, "a.js", "Atomics.add(new Int32Array(libdeno.shared), 0, 1)"); diff --git a/libdeno/libdeno_test.js b/libdeno/libdeno_test.js index d6ea5f98321abf..8b1ad2e04168fc 100644 --- a/libdeno/libdeno_test.js +++ b/libdeno/libdeno_test.js @@ -103,11 +103,11 @@ global.GlobalErrorHandling = () => { }; // Allocate this buf at the top level to avoid GC. -const dataBuf = new Uint8Array([3, 4]); +const zeroCopyBuf = new Uint8Array([3, 4]); -global.DataBuf = () => { +global.ZeroCopyBuf = () => { const a = new Uint8Array([1, 2]); - const b = dataBuf; + const b = zeroCopyBuf; // The second parameter of send should modified by the // privileged side. const r = libdeno.send(a, b); diff --git a/libdeno/modules_test.cc b/libdeno/modules_test.cc index d41c38b8e9a817..cb800e89a5325a 100644 --- a/libdeno/modules_test.cc +++ b/libdeno/modules_test.cc @@ -2,10 +2,12 @@ #include "test.h" static int exec_count = 0; -void recv_cb(void* user_data, int req_id, deno_buf buf, deno_buf data_buf) { +void recv_cb(void* user_data, deno_buf buf, deno_buf zero_copy_buf) { // We use this to check that scripts have executed. EXPECT_EQ(1u, buf.data_len); EXPECT_EQ(buf.data_ptr[0], 4); + EXPECT_EQ(zero_copy_buf.zero_copy_id, 0u); + EXPECT_EQ(zero_copy_buf.data_ptr, nullptr); exec_count++; } diff --git a/libdeno/test.cc b/libdeno/test.cc index a8fcbc63b64e33..1340fe8c37165c 100644 --- a/libdeno/test.cc +++ b/libdeno/test.cc @@ -3,7 +3,7 @@ #include #include "file_util.h" -deno_buf snapshot = {nullptr, 0, nullptr, 0}; +deno_buf snapshot = {nullptr, 0, nullptr, 0, 0}; int main(int argc, char** argv) { // Locate the snapshot. diff --git a/libdeno/test.h b/libdeno/test.h index 25ca939887b549..2f7c3238446d50 100644 --- a/libdeno/test.h +++ b/libdeno/test.h @@ -6,6 +6,6 @@ #include "testing/gtest/include/gtest/gtest.h" extern deno_buf snapshot; // Loaded in libdeno/test.cc -const deno_buf empty = {nullptr, 0, nullptr, 0}; +const deno_buf empty = {nullptr, 0, nullptr, 0, 0}; #endif // TEST_H_ diff --git a/src/isolate.rs b/src/isolate.rs index b723738d712b78..440c168f2b88c2 100644 --- a/src/isolate.rs +++ b/src/isolate.rs @@ -48,15 +48,17 @@ pub type Buf = Box<[u8]>; pub type Op = dyn Future + Send; // Returns (is_sync, op) -pub type Dispatch = - fn(isolate: &Isolate, buf: libdeno::deno_buf, data_buf: libdeno::deno_buf) - -> (bool, Box); +pub type Dispatch = fn( + isolate: &Isolate, + buf: libdeno::deno_buf, + zero_copy_buf: libdeno::deno_buf, +) -> (bool, Box); pub struct Isolate { libdeno_isolate: *const libdeno::isolate, dispatch: Dispatch, - rx: mpsc::Receiver<(i32, Buf)>, - tx: mpsc::Sender<(i32, Buf)>, + rx: mpsc::Receiver<(usize, Buf)>, + tx: mpsc::Sender<(usize, Buf)>, ntasks: Cell, timeout_due: Cell>, pub modules: RefCell, @@ -204,7 +206,7 @@ impl Isolate { }; let libdeno_isolate = unsafe { libdeno::deno_new(config) }; // This channel handles sending async messages back to the runtime. - let (tx, rx) = mpsc::channel::<(i32, Buf)>(); + let (tx, rx) = mpsc::channel::<(usize, Buf)>(); Self { libdeno_isolate, @@ -404,37 +406,39 @@ impl Isolate { Ok(()) } - pub fn respond(&self, req_id: i32, buf: Buf) { + pub fn respond(&self, zero_copy_id: usize, buf: Buf) { self.state.metrics_op_completed(buf.len()); + + // This will be cleaned up in the future. + if zero_copy_id > 0 { + unsafe { + libdeno::deno_zero_copy_release(self.libdeno_isolate, zero_copy_id) + } + } + // deno_respond will memcpy the buf into V8's heap, // so borrowing a reference here is sufficient. unsafe { libdeno::deno_respond( self.libdeno_isolate, self.as_raw_ptr(), - req_id, buf.as_ref().into(), ) } } - fn complete_op(&self, req_id: i32, buf: Buf) { + fn complete_op(&self, zero_copy_id: usize, buf: Buf) { // Receiving a message on rx exactly corresponds to an async task // completing. self.ntasks_decrement(); // Call into JS with the buf. - self.respond(req_id, buf); + self.respond(zero_copy_id, buf); } fn timeout(&self) { let dummy_buf = libdeno::deno_buf::empty(); unsafe { - libdeno::deno_respond( - self.libdeno_isolate, - self.as_raw_ptr(), - -1, - dummy_buf, - ) + libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), dummy_buf) } } @@ -450,7 +454,7 @@ impl Isolate { // Main thread event loop. while !self.is_idle() { match recv_deadline(&self.rx, self.get_timeout_due()) { - Ok((req_id, buf)) => self.complete_op(req_id, buf), + Ok((zero_copy_id, buf)) => self.complete_op(zero_copy_id, buf), Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(), Err(e) => panic!("recv_deadline() failed: {:?}", e), } @@ -532,23 +536,24 @@ extern "C" fn resolve_cb( // Dereferences the C pointer into the Rust Isolate object. extern "C" fn pre_dispatch( user_data: *mut c_void, - req_id: i32, control_buf: libdeno::deno_buf, - data_buf: libdeno::deno_buf, + zero_copy_buf: libdeno::deno_buf, ) { // for metrics let bytes_sent_control = control_buf.len(); - let bytes_sent_data = data_buf.len(); + let bytes_sent_zero_copy = zero_copy_buf.len(); + + let zero_copy_id = zero_copy_buf.zero_copy_id; // We should ensure that there is no other `&mut Isolate` exists. // And also, it should be in the same thread with other `&Isolate`s. let isolate = unsafe { Isolate::from_raw_ptr(user_data) }; let dispatch = isolate.dispatch; - let (is_sync, op) = dispatch(isolate, control_buf, data_buf); + let (is_sync, op) = dispatch(isolate, control_buf, zero_copy_buf); isolate .state - .metrics_op_dispatched(bytes_sent_control, bytes_sent_data); + .metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy); if is_sync { // Execute op synchronously. @@ -560,7 +565,7 @@ extern "C" fn pre_dispatch( isolate.state.metrics_op_completed(buf.len()); } else { // Set the synchronous response, the value returned from isolate.send(). - isolate.respond(req_id, buf); + isolate.respond(zero_copy_id, buf); } } else { // Execute op asynchronously. @@ -574,7 +579,7 @@ extern "C" fn pre_dispatch( let task = op .and_then(move |buf| { let sender = tx; // tx is moved to new thread - sender.send((req_id, buf)).expect("tx.send error"); + sender.send((zero_copy_id, buf)).expect("tx.send error"); Ok(()) }).map_err(|_| ()); tokio::spawn(task); diff --git a/src/libdeno.rs b/src/libdeno.rs index 204f817b5b79b5..6696a382bbb6db 100644 --- a/src/libdeno.rs +++ b/src/libdeno.rs @@ -1,4 +1,9 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. + +// TODO Remove. While core is being developed, it may not use the complete +// libdeno API. Thus we allow dead code until things settle. +#![allow(dead_code)] + use libc::c_char; use libc::c_int; use libc::c_void; @@ -25,6 +30,7 @@ pub struct deno_buf { alloc_len: usize, data_ptr: *const u8, data_len: usize, + pub zero_copy_id: usize, } /// `deno_buf` can not clone, and there is no interior mutability. @@ -39,6 +45,7 @@ impl deno_buf { alloc_len: 0, data_ptr: null(), data_len: 0, + zero_copy_id: 0, } } @@ -49,6 +56,7 @@ impl deno_buf { alloc_len: 0, data_ptr: ptr, data_len: len, + zero_copy_id: 0, } } } @@ -62,6 +70,7 @@ impl<'a> From<&'a [u8]> for deno_buf { alloc_len: 0, data_ptr: x.as_ref().as_ptr(), data_len: x.len(), + zero_copy_id: 0, } } } @@ -106,9 +115,8 @@ impl AsMut<[u8]> for deno_buf { #[allow(non_camel_case_types)] type deno_recv_cb = unsafe extern "C" fn( user_data: *mut c_void, - req_id: i32, - buf: deno_buf, - data_buf: deno_buf, + control_buf: deno_buf, // deprecated + zero_copy_buf: deno_buf, ); #[allow(non_camel_case_types)] @@ -137,12 +145,14 @@ extern "C" { pub fn deno_delete(i: *const isolate); pub fn deno_last_exception(i: *const isolate) -> *const c_char; pub fn deno_check_promise_errors(i: *const isolate); + pub fn deno_lock(i: *const isolate); + pub fn deno_unlock(i: *const isolate); pub fn deno_respond( i: *const isolate, user_data: *const c_void, - req_id: i32, buf: deno_buf, ); + pub fn deno_zero_copy_release(i: *const isolate, zero_copy_id: usize); pub fn deno_execute( i: *const isolate, user_data: *const c_void, diff --git a/tools/benchmark.py b/tools/benchmark.py index c9a3e024320e7a..53037b1e7187d0 100755 --- a/tools/benchmark.py +++ b/tools/benchmark.py @@ -204,8 +204,10 @@ def main(argv): # pipe. if os.name != 'nt': hyper_hello_path = os.path.join(build_dir, "hyper_hello") + core_http_bench_exe = os.path.join(build_dir, "deno_core_http_bench") new_data["throughput"] = run_throughput(deno_path) - new_data["req_per_sec"] = http_benchmark(deno_path, hyper_hello_path) + new_data["req_per_sec"] = http_benchmark(deno_path, hyper_hello_path, + core_http_bench_exe) if "linux" in sys.platform: # Thread count test, only on linux new_data["thread_count"] = run_thread_count_benchmark(deno_path) diff --git a/tools/format.py b/tools/format.py index fe053eaa32fe4a..083640c2272587 100755 --- a/tools/format.py +++ b/tools/format.py @@ -39,7 +39,7 @@ def qrun(cmd, env=None): print "prettier" qrun(["node", prettier, "--write", "--loglevel=error"] + ["rollup.config.js"] + glob("*.json") + glob("*.md") + - find_exts([".github", "js", "tests", "tools", "website"], + find_exts([".github", "js", "tests", "tools", "website", "core"], [".js", ".json", ".ts", ".md"], skip=["tools/clang", "js/deps"])) @@ -47,4 +47,4 @@ def qrun(cmd, env=None): qrun([ "third_party/rustfmt/" + platform() + "/rustfmt", "--config-path", rustfmt_config, "build.rs" -] + find_exts(["src"], [".rs"])) +] + find_exts(["src", "core"], [".rs"])) diff --git a/tools/http_benchmark.py b/tools/http_benchmark.py index 312e61da6f6dc1..d84a24de61125f 100755 --- a/tools/http_benchmark.py +++ b/tools/http_benchmark.py @@ -30,6 +30,16 @@ def deno_net_http_benchmark(deno_exe): }) +def deno_core_single(exe): + print "http_benchmark testing deno_core_single" + return run([exe, "--single-thread"]) + + +def deno_core_multi(exe): + print "http_benchmark testing deno_core_multi" + return run([exe, "--multi-thread"]) + + def node_http_benchmark(): node_cmd = ["node", "tools/node_http.js", ADDR.split(":")[1]] print "http_benchmark testing NODE." @@ -48,11 +58,13 @@ def hyper_http_benchmark(hyper_hello_exe): return run(hyper_cmd) -def http_benchmark(deno_exe, hyper_hello_exe): +def http_benchmark(deno_exe, hyper_hello_exe, core_http_bench_exe): r = {} # TODO Rename to "deno_tcp" r["deno"] = deno_http_benchmark(deno_exe) r["deno_net_http"] = deno_net_http_benchmark(deno_exe) + r["deno_core_single"] = deno_core_single(core_http_bench_exe) + r["deno_core_multi"] = deno_core_multi(core_http_bench_exe) r["node"] = node_http_benchmark() r["node_tcp"] = node_tcp_benchmark() r["hyper"] = hyper_http_benchmark(hyper_hello_exe) @@ -68,8 +80,14 @@ def run(server_cmd, merge_env=None): for key, value in merge_env.iteritems(): env[key] = value + # Wait for port 4544 to become available. + # TODO Need to use SO_REUSEPORT with tokio::net::TcpListener. + time.sleep(5) + server = subprocess.Popen(server_cmd, env=env) + time.sleep(5) # wait for server to wake up. TODO racy. + try: cmd = "third_party/wrk/%s/wrk -d %s http://%s/" % (util.platform(), DURATION, ADDR) diff --git a/tools/lint.py b/tools/lint.py index 148cc47281a27b..e3e7653874e513 100755 --- a/tools/lint.py +++ b/tools/lint.py @@ -21,8 +21,8 @@ run(["node", tslint, "-p", ".", "--exclude", "**/gen/**/*.ts"]) run([ - "node", tslint, "./js/**/*_test.ts", "./tests/**/*.ts", "--exclude", - "**/gen/**/*.ts", "--project", "tsconfig.json" + "node", tslint, "./js/**/*_test.ts", "./tests/**/*.ts", "./core/*.js", + "--exclude", "**/gen/**/*.ts", "--project", "tsconfig.json" ]) run([sys.executable, "third_party/depot_tools/pylint.py"] + diff --git a/tools/node_tcp_promise.js b/tools/node_tcp_promise.js new file mode 100644 index 00000000000000..c8fc54aba92a43 --- /dev/null +++ b/tools/node_tcp_promise.js @@ -0,0 +1,25 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +// Note: this is a keep-alive server. +const { Server } = require("net"); +const port = process.argv[2] || "4544"; +console.log("port", port); + +const response = Buffer.from( + "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" +); + +async function write(socket, buffer) { + let p = new Promise((resolve, reject) => { + socket.write(buffer, resolve); + }); + return p; +} + +Server(async socket => { + socket.on("error", _ => { + socket.destroy(); + }); + for await (const data of socket) { + write(socket, response); + } +}).listen(port); diff --git a/tools/test.py b/tools/test.py index 5d08e59d1c42e3..5a8c67519405df 100755 --- a/tools/test.py +++ b/tools/test.py @@ -85,6 +85,11 @@ def main(argv): check_exists(test_rs) run([test_rs]) + deno_core_test = os.path.join(build_dir, + "deno_core_test" + executable_suffix) + check_exists(deno_core_test) + run([deno_core_test]) + unit_tests(deno_exe) prefetch_test(deno_exe)