From 77e15b0a69c38f5b6e800e8b5a34aae8ba0e7271 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 8 Jun 2023 01:39:02 +0200 Subject: [PATCH 01/17] fix(ext/node): handle 'upgrade' responses --- ext/node/polyfills/http.ts | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 250d34e7cbf355..7fe98c9a635ab1 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -38,7 +38,7 @@ import { Agent, globalAgent } from "ext:deno_node/_http_agent.mjs"; import { urlToHttpOptions } from "ext:deno_node/internal/url.ts"; import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts"; -import { notImplemented } from "ext:deno_node/_utils.ts"; +import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts"; import { connResetException, ERR_HTTP_HEADERS_SENT, @@ -504,7 +504,7 @@ class ClientRequest extends OutgoingMessage { } if (options!.createConnection) { - notImplemented("ClientRequest.options.createConnection"); + warnNotImplemented("ClientRequest.options.createConnection"); } if (options!.lookup) { @@ -567,6 +567,7 @@ class ClientRequest extends OutgoingMessage { this.method === "POST" || this.method === "PATCH" || this.method === "PUT", ); + console.log("this._req", this._req); this._bodyWriteRid = this._req.requestBodyRid; } @@ -631,6 +632,16 @@ class ClientRequest extends OutgoingMessage { incoming.statusCode = res.status; incoming.statusMessage = res.statusText; + console.log("res.headers", res.headers, incoming.rawHeaders); + + for (const [key, value] of res.headers) { + console.log("key", key, value); + if (key.toLowerCase() === "upgrade") { + incoming.upgrade = true; + break; + } + } + incoming._addHeaderLines( res.headers, Object.entries(res.headers).flat().length, @@ -641,7 +652,14 @@ class ClientRequest extends OutgoingMessage { core.tryClose(this._req.cancelHandleRid); } - this.emit("response", incoming); + console.log("incoming", incoming.headers); + if (incoming.upgrade) { + // TODO(bartlomieju): throw on connect request + // handle actual upgrade + this.emit("upgrade", incoming, new FakeSocket(), []); + } else { + this.emit("response", incoming); + } } catch (err) { if (this._req.cancelHandleRid !== null) { core.tryClose(this._req.cancelHandleRid); From 6fc27a2f3ad8551c465014878d9936394c6000ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 8 Jun 2023 01:39:44 +0200 Subject: [PATCH 02/17] Revert "refactor(core): use JoinSet instead of FuturesUnordered (#19378)" This reverts commit 19f82b0eaa14f0df58fdfc685e60c8560582c5a4. --- cli/tools/test.rs | 17 ----------- core/ops.rs | 4 --- core/realm.rs | 10 ++----- core/runtime.rs | 72 ++++++++++++++++++++++++++++----------------- ext/ffi/callback.rs | 21 ++++++------- 5 files changed, 59 insertions(+), 65 deletions(-) diff --git a/cli/tools/test.rs b/cli/tools/test.rs index 6f32d69e49dadd..ebe4deb9ae954c 100644 --- a/cli/tools/test.rs +++ b/cli/tools/test.rs @@ -28,7 +28,6 @@ use deno_core::error::AnyError; use deno_core::error::JsError; use deno_core::futures::future; use deno_core::futures::stream; -use deno_core::futures::task::noop_waker; use deno_core::futures::FutureExt; use deno_core::futures::StreamExt; use deno_core::located_script_name; @@ -67,7 +66,6 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::task::Context; use std::time::Duration; use std::time::Instant; use std::time::SystemTime; @@ -1008,21 +1006,6 @@ pub async fn test_specifier( continue; } sender.send(TestEvent::Wait(desc.id))?; - - // TODO(bartlomieju): this is a nasty (beautiful) hack, that was required - // when switching `JsRuntime` from `FuturesUnordered` to `JoinSet`. With - // `JoinSet` all pending ops are immediately polled and that caused a problem - // when some async ops were fired and canceled before running tests (giving - // false positives in the ops sanitizer). We should probably rewrite sanitizers - // to be done in Rust instead of in JS (40_testing.js). - { - // Poll event loop once, this will allow all ops that are already resolved, - // but haven't responded to settle. - let waker = noop_waker(); - let mut cx = Context::from_waker(&waker); - let _ = worker.js_runtime.poll_event_loop(&mut cx, false); - } - let earlier = SystemTime::now(); let result = match worker.js_runtime.call_and_await(&function).await { Ok(r) => r, diff --git a/core/ops.rs b/core/ops.rs index b766eb60d20af6..5f1bf67ef6f8f3 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -10,7 +10,6 @@ use crate::OpDecl; use crate::OpsTracker; use anyhow::Error; use futures::future::MaybeDone; -use futures::task::AtomicWaker; use futures::Future; use futures::FutureExt; use pin_project::pin_project; @@ -22,7 +21,6 @@ use std::pin::Pin; use std::ptr::NonNull; use std::rc::Rc; use std::rc::Weak; -use std::sync::Arc; use v8::fast_api::CFunctionInfo; use v8::fast_api::CTypeInfo; @@ -186,7 +184,6 @@ pub struct OpState { pub tracker: OpsTracker, pub last_fast_op_error: Option, pub(crate) gotham_state: GothamState, - pub waker: Arc, } impl OpState { @@ -197,7 +194,6 @@ impl OpState { gotham_state: Default::default(), last_fast_op_error: None, tracker: OpsTracker::new(ops_count), - waker: Arc::new(AtomicWaker::new()), } } diff --git a/core/realm.rs b/core/realm.rs index d18f41e662dcd7..94ce77464d1648 100644 --- a/core/realm.rs +++ b/core/realm.rs @@ -5,12 +5,10 @@ use crate::modules::ModuleCode; use crate::ops::OpCtx; use crate::runtime::exception_to_err_result; use crate::runtime::JsRuntimeState; -use crate::task::MaskResultAsSend; use crate::JsRuntime; -use crate::OpId; -use crate::OpResult; -use crate::PromiseId; +use crate::OpCall; use anyhow::Error; +use futures::stream::FuturesUnordered; use std::cell::RefCell; use std::collections::HashSet; use std::collections::VecDeque; @@ -18,7 +16,6 @@ use std::hash::BuildHasherDefault; use std::hash::Hasher; use std::option::Option; use std::rc::Rc; -use tokio::task::JoinSet; use v8::HandleScope; use v8::Local; @@ -51,8 +48,7 @@ pub(crate) struct ContextState { pub(crate) pending_promise_rejections: VecDeque<(v8::Global, v8::Global)>, pub(crate) unrefed_ops: HashSet>, - pub(crate) pending_ops: - JoinSet>, + pub(crate) pending_ops: FuturesUnordered, // We don't explicitly re-read this prop but need the slice to live alongside // the context pub(crate) op_ctxs: Box<[OpCtx]>, diff --git a/core/runtime.rs b/core/runtime.rs index ecfd0bd571e81f..a27717a8b45c69 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -41,6 +41,7 @@ use futures::future::FutureExt; use futures::future::MaybeDone; use futures::stream::StreamExt; use futures::task::noop_waker; +use futures::task::AtomicWaker; use smallvec::SmallVec; use std::any::Any; use std::cell::RefCell; @@ -308,6 +309,7 @@ pub struct JsRuntimeState { dyn_module_evaluate_idle_counter: u32, pub(crate) source_map_getter: Option>>, pub(crate) source_map_cache: Rc>, + pub(crate) have_unpolled_ops: bool, pub(crate) op_state: Rc>, pub(crate) shared_array_buffer_store: Option, pub(crate) compiled_wasm_module_store: Option, @@ -318,6 +320,7 @@ pub struct JsRuntimeState { // flimsy. Try to poll it similarly to `pending_promise_rejections`. pub(crate) dispatched_exception: Option>, pub(crate) inspector: Option>>, + waker: AtomicWaker, } impl JsRuntimeState { @@ -543,6 +546,8 @@ impl JsRuntime { shared_array_buffer_store: options.shared_array_buffer_store, compiled_wasm_module_store: options.compiled_wasm_module_store, op_state: op_state.clone(), + waker: AtomicWaker::new(), + have_unpolled_ops: false, dispatched_exception: None, // Some fields are initialized later after isolate is created inspector: None, @@ -1323,7 +1328,7 @@ impl JsRuntime { { let state = self.inner.state.borrow(); has_inspector = state.inspector.is_some(); - state.op_state.borrow().waker.register(cx.waker()); + state.waker.register(cx.waker()); } if has_inspector { @@ -1414,11 +1419,12 @@ impl JsRuntime { // TODO(andreubotella) The event loop will spin as long as there are pending // background tasks. We should look into having V8 notify us when a // background task is done. - if pending_state.has_pending_background_tasks + if state.have_unpolled_ops + || pending_state.has_pending_background_tasks || pending_state.has_tick_scheduled || maybe_scheduling { - state.op_state.borrow().waker.wake(); + state.waker.wake(); } drop(state); @@ -1471,7 +1477,7 @@ impl JsRuntime { // evaluation may complete during this, in which case the counter will // reset. state.dyn_module_evaluate_idle_counter += 1; - state.op_state.borrow().waker.wake(); + state.waker.wake(); } } @@ -1664,7 +1670,7 @@ impl JsRuntimeState { /// after initiating new dynamic import load. pub fn notify_new_dynamic_import(&mut self) { // Notify event loop to poll again soon. - self.op_state.borrow().waker.wake(); + self.waker.wake(); } } @@ -2398,6 +2404,12 @@ impl JsRuntime { // Polls pending ops and then runs `Deno.core.eventLoopTick` callback. fn do_js_event_loop_tick(&mut self, cx: &mut Context) -> Result<(), Error> { + // Now handle actual ops. + { + let mut state = self.inner.state.borrow_mut(); + state.have_unpolled_ops = false; + } + // Handle responses for each realm. let state = self.inner.state.clone(); let isolate = &mut self.inner.v8_isolate; @@ -2421,15 +2433,10 @@ impl JsRuntime { let mut args: SmallVec<[v8::Local; 32]> = SmallVec::with_capacity(32); - loop { - let item = { - let next = std::pin::pin!(context_state.pending_ops.join_next()); - let Poll::Ready(Some(item)) = next.poll(cx) else { - break; - }; - item - }; - let (promise_id, op_id, mut resp) = item.unwrap().into_inner(); + while let Poll::Ready(Some(item)) = + context_state.pending_ops.poll_next_unpin(cx) + { + let (promise_id, op_id, mut resp) = item; state .borrow() .op_state @@ -2479,6 +2486,11 @@ pub fn queue_fast_async_op( promise_id: PromiseId, op: impl Future> + 'static, ) { + let runtime_state = match ctx.runtime_state.upgrade() { + Some(rc_state) => rc_state, + // at least 1 Rc is held by the JsRuntime. + None => unreachable!(), + }; let get_class = { let state = RefCell::borrow(&ctx.state); state.tracker.track_async(ctx.id); @@ -2487,10 +2499,13 @@ pub fn queue_fast_async_op( let fut = op .map(|result| crate::_ops::to_op_result(get_class, result)) .boxed_local(); - // SAFETY: this this is guaranteed to be running on a current-thread executor - ctx.context_state.borrow_mut().pending_ops.spawn(unsafe { - crate::task::MaskFutureAsSend::new(OpCall::pending(ctx, promise_id, fut)) - }); + let mut state = runtime_state.borrow_mut(); + ctx + .context_state + .borrow_mut() + .pending_ops + .push(OpCall::pending(ctx, promise_id, fut)); + state.have_unpolled_ops = true; } #[inline] @@ -2569,6 +2584,12 @@ pub fn queue_async_op<'s>( promise_id: PromiseId, mut op: MaybeDone>>>, ) -> Option> { + let runtime_state = match ctx.runtime_state.upgrade() { + Some(rc_state) => rc_state, + // at least 1 Rc is held by the JsRuntime. + None => unreachable!(), + }; + // An op's realm (as given by `OpCtx::realm_idx`) must match the realm in // which it is invoked. Otherwise, we might have cross-realm object exposure. // deno_core doesn't currently support such exposure, even though embedders @@ -2606,12 +2627,9 @@ pub fn queue_async_op<'s>( // Otherwise we will push it to the `pending_ops` and let it be polled again // or resolved on the next tick of the event loop. - ctx - .context_state - .borrow_mut() - .pending_ops - // SAFETY: this this is guaranteed to be running on a current-thread executor - .spawn(unsafe { crate::task::MaskFutureAsSend::new(op_call) }); + let mut state = runtime_state.borrow_mut(); + ctx.context_state.borrow_mut().pending_ops.push(op_call); + state.have_unpolled_ops = true; None } @@ -2726,8 +2744,8 @@ pub mod tests { (runtime, dispatch_count) } - #[tokio::test] - async fn test_ref_unref_ops() { + #[test] + fn test_ref_unref_ops() { let (mut runtime, _dispatch_count) = setup(Mode::AsyncDeferred); runtime .execute_script_static( @@ -4717,7 +4735,6 @@ Deno.core.opAsync("op_async_serialize_object_with_numbers_as_keys", { } } - #[ignore] #[tokio::test] async fn js_realm_gc() { static INVOKE_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -4776,6 +4793,7 @@ Deno.core.opAsync("op_async_serialize_object_with_numbers_as_keys", { .await .unwrap(); } + drop(runtime); // Make sure the OpState was dropped properly when the runtime dropped diff --git a/ext/ffi/callback.rs b/ext/ffi/callback.rs index 78a21ab8f42777..2d2cf491bee451 100644 --- a/ext/ffi/callback.rs +++ b/ext/ffi/callback.rs @@ -10,7 +10,6 @@ use crate::MAX_SAFE_INTEGER; use crate::MIN_SAFE_INTEGER; use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; -use deno_core::futures::task::AtomicWaker; use deno_core::op; use deno_core::serde_v8; use deno_core::v8; @@ -33,8 +32,8 @@ use std::rc::Rc; use std::sync::atomic; use std::sync::atomic::AtomicU32; use std::sync::mpsc::sync_channel; -use std::sync::Arc; use std::task::Poll; +use std::task::Waker; static THREAD_ID_COUNTER: AtomicU32 = AtomicU32::new(1); @@ -100,20 +99,21 @@ struct CallbackInfo { pub parameters: Box<[NativeType]>, pub result: NativeType, pub thread_id: u32, - pub waker: Arc, + pub waker: Option, } impl Future for CallbackInfo { type Output = (); fn poll( - self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, ) -> std::task::Poll { + // Always replace the waker to make sure it's bound to the proper Future. + self.waker.replace(cx.waker().clone()); // The future for the CallbackInfo never resolves: It can only be canceled. Poll::Pending } } - unsafe extern "C" fn deno_ffi_callback( cif: &libffi::low::ffi_cif, result: &mut c_void, @@ -136,8 +136,10 @@ unsafe extern "C" fn deno_ffi_callback( response_sender.send(()).unwrap(); }); async_work_sender.unbounded_send(fut).unwrap(); - // Make sure event loop wakes up to receive our message before we start waiting for a response. - info.waker.wake(); + if let Some(waker) = info.waker.as_ref() { + // Make sure event loop wakes up to receive our message before we start waiting for a response. + waker.wake_by_ref(); + } response_receiver.recv().unwrap(); } }); @@ -572,7 +574,6 @@ where let current_context = scope.get_current_context(); let context = v8::Global::new(scope, current_context).into_raw(); - let waker = state.waker.clone(); let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo { async_work_sender, callback, @@ -580,7 +581,7 @@ where parameters: args.parameters.clone().into(), result: args.result.clone(), thread_id, - waker, + waker: None, })); let cif = Cif::new( args From 1f2b3c5050b132f0329eace683de7231a1795568 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 10 Jun 2023 01:25:10 +0200 Subject: [PATCH 03/17] fill properties, throw on connect --- ext/node/polyfills/http.ts | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 4ebfddc307ad3c..7327653eab3971 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -563,7 +563,6 @@ class ClientRequest extends OutgoingMessage { this.method === "POST" || this.method === "PATCH" || this.method === "PUT", ); - console.log("this._req", this._req); this._bodyWriteRid = this._req.requestBodyRid; } @@ -656,6 +655,7 @@ class ClientRequest extends OutgoingMessage { incoming.url = res.url; incoming.statusCode = res.status; incoming.statusMessage = res.statusText; + incoming.upgrade = null; console.log("res.headers", res.headers, incoming.rawHeaders); @@ -679,9 +679,23 @@ class ClientRequest extends OutgoingMessage { console.log("incoming", incoming.headers); if (incoming.upgrade) { - // TODO(bartlomieju): throw on connect request - // handle actual upgrade - this.emit("upgrade", incoming, new FakeSocket(), []); + if (this.listenerCount("upgrade") === 0) { + // No listeners, so we got nothing to do + // destroy? + return; + } + + if (this.method === "CONNECT") { + throw new Error("not implemented CONNECT"); + } + + this.upgradeOrConnect = true; + + // TODO(bartlomieju): handle actual upgrade + this.emit("upgrade", incoming, null, []); + this.destroyed = true; + this._closed = true; + this.emit("close"); } else { this.emit("response", incoming); } From 126ed3848523bf9d563b1ddbe3ba1f384a0936da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 10 Jun 2023 01:41:37 +0200 Subject: [PATCH 04/17] perform the actual upgrade --- ext/fetch/lib.rs | 104 +++++++++++++++++++++++++++++++++++++ ext/node/polyfills/http.ts | 11 +++- 2 files changed, 113 insertions(+), 2 deletions(-) diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index a36512c774235d..4ee4b66d4247d7 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -109,6 +109,9 @@ deno_core::extension!(deno_fetch, ops = [ op_fetch, op_fetch_send, + op_fetch_send2, + op_fetch_raw_response_consume, + op_fetch_upgrade_raw_response, op_fetch_custom_client, ], esm = [ @@ -468,6 +471,95 @@ pub async fn op_fetch_send( }) } +#[op] +pub async fn op_fetch_send2( + state: Rc>, + rid: ResourceId, +) -> Result { + eprintln!("in op_fetch_send2"); + let request = state + .borrow_mut() + .resource_table + .take::(rid)?; + + let request = Rc::try_unwrap(request) + .ok() + .expect("multiple op_fetch_send ongoing"); + + let res = match request.0.await { + Ok(Ok(res)) => res, + Ok(Err(err)) => return Err(type_error(err.to_string())), + Err(_) => return Err(type_error("request was cancelled")), + }; + + //debug!("Fetch response {}", url); + let status = res.status(); + let url = res.url().to_string(); + let mut res_headers = Vec::new(); + for (key, val) in res.headers().iter() { + res_headers.push((key.as_str().into(), val.as_bytes().into())); + } + + let content_length = res.content_length(); + + let rid = state + .borrow_mut() + .resource_table + .add(FetchRawResponseResource { + response: res, + size: content_length, + }); + + Ok(FetchResponse { + status: status.as_u16(), + status_text: status.canonical_reason().unwrap_or("").to_string(), + headers: res_headers, + url, + response_rid: rid, + content_length, + }) +} + +#[op] +pub fn op_fetch_raw_response_consume( + state: &mut OpState, + rid: ResourceId, +) -> Result { + let raw_response = + state.resource_table.take::(rid)?; + let raw_response = Rc::try_unwrap(raw_response) + .expect("Someone is holding onto FetchRawResponseResource"); + let stream: BytesStream = + Box::pin(raw_response.response.bytes_stream().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + + let rid = state.resource_table.add(FetchResponseBodyResource { + reader: AsyncRefCell::new(stream.peekable()), + cancel: CancelHandle::default(), + size: raw_response.size, + }); + + Ok(rid) +} + +#[op] +pub async fn op_fetch_upgrade_raw_response( + state: Rc>, + rid: ResourceId, +) -> Result<(), AnyError> { + let raw_response = state + .borrow_mut() + .resource_table + .take::(rid)?; + let raw_response = Rc::try_unwrap(raw_response) + .expect("Someone is holding onto FetchRawResponseResource"); + + let a = raw_response.response.upgrade().await?; + eprintln!("upgraded the connection!"); + Ok(()) +} + type CancelableResponseResult = Result, Canceled>; pub struct FetchRequestResource( @@ -545,6 +637,18 @@ impl Resource for FetchRequestBodyResource { type BytesStream = Pin> + Unpin>>; +#[derive(Debug)] +pub struct FetchRawResponseResource { + pub response: Response, + pub size: Option, +} + +impl Resource for FetchRawResponseResource { + fn name(&self) -> Cow { + "fetchRawResponse".into() + } +} + pub struct FetchResponseBodyResource { pub reader: AsyncRefCell>, pub cancel: CancelHandle, diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 7327653eab3971..a42d9aeb895070 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -617,7 +617,7 @@ class ClientRequest extends OutgoingMessage { (async () => { try { const [res, _] = await Promise.all([ - core.opAsync("op_fetch_send", this._req.requestRid), + core.opAsync("op_fetch_send2", this._req.requestRid), (async () => { if (this._bodyWriteRid) { try { @@ -671,7 +671,6 @@ class ClientRequest extends OutgoingMessage { res.headers, Object.entries(res.headers).flat().length, ); - incoming._bodyRid = res.responseRid; if (this._req.cancelHandleRid !== null) { core.tryClose(this._req.cancelHandleRid); @@ -689,6 +688,7 @@ class ClientRequest extends OutgoingMessage { throw new Error("not implemented CONNECT"); } + await core.opAsync("op_fetch_upgrade_raw_response", res.responseRid); this.upgradeOrConnect = true; // TODO(bartlomieju): handle actual upgrade @@ -697,6 +697,13 @@ class ClientRequest extends OutgoingMessage { this._closed = true; this.emit("close"); } else { + // FIXME(bartlomieju): handle upgrade + { + const responseRid = core.ops.op_fetch_raw_response_consume( + res.responseRid, + ); + incoming._bodyRid = responseRid; + } this.emit("response", incoming); } } catch (err) { From e7f1338779901db8a71946ab042695aa49f45704 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 10 Jun 2023 02:50:05 +0200 Subject: [PATCH 05/17] it's working, but addresses are faked out --- ext/fetch/lib.rs | 101 +++++++++++++++++++++++++++++++++++-- ext/node/polyfills/http.ts | 25 ++++++++- 2 files changed, 121 insertions(+), 5 deletions(-) diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 4ee4b66d4247d7..7bd015a86ce881 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -543,11 +543,13 @@ pub fn op_fetch_raw_response_consume( Ok(rid) } +use deno_core::task::spawn; + #[op] pub async fn op_fetch_upgrade_raw_response( state: Rc>, rid: ResourceId, -) -> Result<(), AnyError> { +) -> Result { let raw_response = state .borrow_mut() .resource_table @@ -555,9 +557,102 @@ pub async fn op_fetch_upgrade_raw_response( let raw_response = Rc::try_unwrap(raw_response) .expect("Someone is holding onto FetchRawResponseResource"); - let a = raw_response.response.upgrade().await?; + let (read, write) = tokio::io::duplex(1024); + let (read_rx, write_tx) = tokio::io::split(read); + let (mut write_rx, mut read_tx) = tokio::io::split(write); + let upgraded = raw_response.response.upgrade().await?; eprintln!("upgraded the connection!"); - Ok(()) + { + // Stage 3: Pump the data + let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded); + + spawn(async move { + let mut buf = [0; 1024]; + loop { + let read = upgraded_rx.read(&mut buf).await?; + if read == 0 { + break; + } + read_tx.write_all(&buf[..read]).await?; + } + Ok::<_, AnyError>(()) + }); + spawn(async move { + let mut buf = [0; 1024]; + loop { + let read = write_rx.read(&mut buf).await?; + if read == 0 { + break; + } + upgraded_tx.write_all(&buf[..read]).await?; + } + Ok::<_, AnyError>(()) + }); + } + + Ok( + state + .borrow_mut() + .resource_table + .add(UpgradeStream::new(read_rx, write_tx)), + ) +} + +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; + +struct UpgradeStream { + read: AsyncRefCell>, + write: AsyncRefCell>, + cancel_handle: CancelHandle, +} + +impl UpgradeStream { + pub fn new( + read: tokio::io::ReadHalf, + write: tokio::io::WriteHalf, + ) -> Self { + Self { + read: AsyncRefCell::new(read), + write: AsyncRefCell::new(write), + cancel_handle: CancelHandle::new(), + } + } + + async fn read(self: Rc, buf: &mut [u8]) -> Result { + let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); + async { + let read = RcRef::map(self, |this| &this.read); + let mut read = read.borrow_mut().await; + Ok(Pin::new(&mut *read).read(buf).await?) + } + .try_or_cancel(cancel_handle) + .await + } + + async fn write(self: Rc, buf: &[u8]) -> Result { + let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); + async { + let write = RcRef::map(self, |this| &this.write); + let mut write = write.borrow_mut().await; + Ok(Pin::new(&mut *write).write(buf).await?) + } + .try_or_cancel(cancel_handle) + .await + } +} + +impl Resource for UpgradeStream { + fn name(&self) -> Cow { + "fetchRawUpgradeStream".into() + } + + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + + fn close(self: Rc) { + self.cancel_handle.cancel(); + } } type CancelableResponseResult = Result, Canceled>; diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index a42d9aeb895070..e1bae6e5aef384 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -52,6 +52,7 @@ import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.js"; import { createHttpClient } from "ext:deno_fetch/22_http_client.js"; import { timerId } from "ext:deno_web/03_abort_signal.js"; import { clearTimeout as webClearTimeout } from "ext:deno_web/02_timers.js"; +import { TcpConn } from "ext:deno_net/01_net.js"; enum STATUS_CODES { /** RFC 7231, 6.2.1 */ @@ -688,11 +689,31 @@ class ClientRequest extends OutgoingMessage { throw new Error("not implemented CONNECT"); } - await core.opAsync("op_fetch_upgrade_raw_response", res.responseRid); + const upgradeRid = await core.opAsync( + "op_fetch_upgrade_raw_response", + res.responseRid, + ); + const conn = new TcpConn( + upgradeRid, + { + transport: "tcp", + hostname: "127.0.0.1", + port: 90, + }, + { + transport: "tcp", + hostname: "127.0.0.1", + port: 90, + }, + ); + const socket = new Socket({ + handle: new TCP(constants.SERVER, conn), + }); + this.upgradeOrConnect = true; // TODO(bartlomieju): handle actual upgrade - this.emit("upgrade", incoming, null, []); + this.emit("upgrade", incoming, socket, Buffer.from([])); this.destroyed = true; this._closed = true; this.emit("close"); From f88b72098138198a9ab104c7b6ede904251c8f38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 10 Jun 2023 02:56:41 +0200 Subject: [PATCH 06/17] revert unrelated changes --- cli/tools/test.rs | 17 +++++++++++ core/ops.rs | 4 +++ core/realm.rs | 10 +++++-- core/runtime.rs | 72 +++++++++++++++++---------------------------- ext/ffi/callback.rs | 21 +++++++------ 5 files changed, 65 insertions(+), 59 deletions(-) diff --git a/cli/tools/test.rs b/cli/tools/test.rs index ebe4deb9ae954c..6f32d69e49dadd 100644 --- a/cli/tools/test.rs +++ b/cli/tools/test.rs @@ -28,6 +28,7 @@ use deno_core::error::AnyError; use deno_core::error::JsError; use deno_core::futures::future; use deno_core::futures::stream; +use deno_core::futures::task::noop_waker; use deno_core::futures::FutureExt; use deno_core::futures::StreamExt; use deno_core::located_script_name; @@ -66,6 +67,7 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::task::Context; use std::time::Duration; use std::time::Instant; use std::time::SystemTime; @@ -1006,6 +1008,21 @@ pub async fn test_specifier( continue; } sender.send(TestEvent::Wait(desc.id))?; + + // TODO(bartlomieju): this is a nasty (beautiful) hack, that was required + // when switching `JsRuntime` from `FuturesUnordered` to `JoinSet`. With + // `JoinSet` all pending ops are immediately polled and that caused a problem + // when some async ops were fired and canceled before running tests (giving + // false positives in the ops sanitizer). We should probably rewrite sanitizers + // to be done in Rust instead of in JS (40_testing.js). + { + // Poll event loop once, this will allow all ops that are already resolved, + // but haven't responded to settle. + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + let _ = worker.js_runtime.poll_event_loop(&mut cx, false); + } + let earlier = SystemTime::now(); let result = match worker.js_runtime.call_and_await(&function).await { Ok(r) => r, diff --git a/core/ops.rs b/core/ops.rs index 5f1bf67ef6f8f3..b766eb60d20af6 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -10,6 +10,7 @@ use crate::OpDecl; use crate::OpsTracker; use anyhow::Error; use futures::future::MaybeDone; +use futures::task::AtomicWaker; use futures::Future; use futures::FutureExt; use pin_project::pin_project; @@ -21,6 +22,7 @@ use std::pin::Pin; use std::ptr::NonNull; use std::rc::Rc; use std::rc::Weak; +use std::sync::Arc; use v8::fast_api::CFunctionInfo; use v8::fast_api::CTypeInfo; @@ -184,6 +186,7 @@ pub struct OpState { pub tracker: OpsTracker, pub last_fast_op_error: Option, pub(crate) gotham_state: GothamState, + pub waker: Arc, } impl OpState { @@ -194,6 +197,7 @@ impl OpState { gotham_state: Default::default(), last_fast_op_error: None, tracker: OpsTracker::new(ops_count), + waker: Arc::new(AtomicWaker::new()), } } diff --git a/core/realm.rs b/core/realm.rs index 94ce77464d1648..d18f41e662dcd7 100644 --- a/core/realm.rs +++ b/core/realm.rs @@ -5,10 +5,12 @@ use crate::modules::ModuleCode; use crate::ops::OpCtx; use crate::runtime::exception_to_err_result; use crate::runtime::JsRuntimeState; +use crate::task::MaskResultAsSend; use crate::JsRuntime; -use crate::OpCall; +use crate::OpId; +use crate::OpResult; +use crate::PromiseId; use anyhow::Error; -use futures::stream::FuturesUnordered; use std::cell::RefCell; use std::collections::HashSet; use std::collections::VecDeque; @@ -16,6 +18,7 @@ use std::hash::BuildHasherDefault; use std::hash::Hasher; use std::option::Option; use std::rc::Rc; +use tokio::task::JoinSet; use v8::HandleScope; use v8::Local; @@ -48,7 +51,8 @@ pub(crate) struct ContextState { pub(crate) pending_promise_rejections: VecDeque<(v8::Global, v8::Global)>, pub(crate) unrefed_ops: HashSet>, - pub(crate) pending_ops: FuturesUnordered, + pub(crate) pending_ops: + JoinSet>, // We don't explicitly re-read this prop but need the slice to live alongside // the context pub(crate) op_ctxs: Box<[OpCtx]>, diff --git a/core/runtime.rs b/core/runtime.rs index a27717a8b45c69..ecfd0bd571e81f 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -41,7 +41,6 @@ use futures::future::FutureExt; use futures::future::MaybeDone; use futures::stream::StreamExt; use futures::task::noop_waker; -use futures::task::AtomicWaker; use smallvec::SmallVec; use std::any::Any; use std::cell::RefCell; @@ -309,7 +308,6 @@ pub struct JsRuntimeState { dyn_module_evaluate_idle_counter: u32, pub(crate) source_map_getter: Option>>, pub(crate) source_map_cache: Rc>, - pub(crate) have_unpolled_ops: bool, pub(crate) op_state: Rc>, pub(crate) shared_array_buffer_store: Option, pub(crate) compiled_wasm_module_store: Option, @@ -320,7 +318,6 @@ pub struct JsRuntimeState { // flimsy. Try to poll it similarly to `pending_promise_rejections`. pub(crate) dispatched_exception: Option>, pub(crate) inspector: Option>>, - waker: AtomicWaker, } impl JsRuntimeState { @@ -546,8 +543,6 @@ impl JsRuntime { shared_array_buffer_store: options.shared_array_buffer_store, compiled_wasm_module_store: options.compiled_wasm_module_store, op_state: op_state.clone(), - waker: AtomicWaker::new(), - have_unpolled_ops: false, dispatched_exception: None, // Some fields are initialized later after isolate is created inspector: None, @@ -1328,7 +1323,7 @@ impl JsRuntime { { let state = self.inner.state.borrow(); has_inspector = state.inspector.is_some(); - state.waker.register(cx.waker()); + state.op_state.borrow().waker.register(cx.waker()); } if has_inspector { @@ -1419,12 +1414,11 @@ impl JsRuntime { // TODO(andreubotella) The event loop will spin as long as there are pending // background tasks. We should look into having V8 notify us when a // background task is done. - if state.have_unpolled_ops - || pending_state.has_pending_background_tasks + if pending_state.has_pending_background_tasks || pending_state.has_tick_scheduled || maybe_scheduling { - state.waker.wake(); + state.op_state.borrow().waker.wake(); } drop(state); @@ -1477,7 +1471,7 @@ impl JsRuntime { // evaluation may complete during this, in which case the counter will // reset. state.dyn_module_evaluate_idle_counter += 1; - state.waker.wake(); + state.op_state.borrow().waker.wake(); } } @@ -1670,7 +1664,7 @@ impl JsRuntimeState { /// after initiating new dynamic import load. pub fn notify_new_dynamic_import(&mut self) { // Notify event loop to poll again soon. - self.waker.wake(); + self.op_state.borrow().waker.wake(); } } @@ -2404,12 +2398,6 @@ impl JsRuntime { // Polls pending ops and then runs `Deno.core.eventLoopTick` callback. fn do_js_event_loop_tick(&mut self, cx: &mut Context) -> Result<(), Error> { - // Now handle actual ops. - { - let mut state = self.inner.state.borrow_mut(); - state.have_unpolled_ops = false; - } - // Handle responses for each realm. let state = self.inner.state.clone(); let isolate = &mut self.inner.v8_isolate; @@ -2433,10 +2421,15 @@ impl JsRuntime { let mut args: SmallVec<[v8::Local; 32]> = SmallVec::with_capacity(32); - while let Poll::Ready(Some(item)) = - context_state.pending_ops.poll_next_unpin(cx) - { - let (promise_id, op_id, mut resp) = item; + loop { + let item = { + let next = std::pin::pin!(context_state.pending_ops.join_next()); + let Poll::Ready(Some(item)) = next.poll(cx) else { + break; + }; + item + }; + let (promise_id, op_id, mut resp) = item.unwrap().into_inner(); state .borrow() .op_state @@ -2486,11 +2479,6 @@ pub fn queue_fast_async_op( promise_id: PromiseId, op: impl Future> + 'static, ) { - let runtime_state = match ctx.runtime_state.upgrade() { - Some(rc_state) => rc_state, - // at least 1 Rc is held by the JsRuntime. - None => unreachable!(), - }; let get_class = { let state = RefCell::borrow(&ctx.state); state.tracker.track_async(ctx.id); @@ -2499,13 +2487,10 @@ pub fn queue_fast_async_op( let fut = op .map(|result| crate::_ops::to_op_result(get_class, result)) .boxed_local(); - let mut state = runtime_state.borrow_mut(); - ctx - .context_state - .borrow_mut() - .pending_ops - .push(OpCall::pending(ctx, promise_id, fut)); - state.have_unpolled_ops = true; + // SAFETY: this this is guaranteed to be running on a current-thread executor + ctx.context_state.borrow_mut().pending_ops.spawn(unsafe { + crate::task::MaskFutureAsSend::new(OpCall::pending(ctx, promise_id, fut)) + }); } #[inline] @@ -2584,12 +2569,6 @@ pub fn queue_async_op<'s>( promise_id: PromiseId, mut op: MaybeDone>>>, ) -> Option> { - let runtime_state = match ctx.runtime_state.upgrade() { - Some(rc_state) => rc_state, - // at least 1 Rc is held by the JsRuntime. - None => unreachable!(), - }; - // An op's realm (as given by `OpCtx::realm_idx`) must match the realm in // which it is invoked. Otherwise, we might have cross-realm object exposure. // deno_core doesn't currently support such exposure, even though embedders @@ -2627,9 +2606,12 @@ pub fn queue_async_op<'s>( // Otherwise we will push it to the `pending_ops` and let it be polled again // or resolved on the next tick of the event loop. - let mut state = runtime_state.borrow_mut(); - ctx.context_state.borrow_mut().pending_ops.push(op_call); - state.have_unpolled_ops = true; + ctx + .context_state + .borrow_mut() + .pending_ops + // SAFETY: this this is guaranteed to be running on a current-thread executor + .spawn(unsafe { crate::task::MaskFutureAsSend::new(op_call) }); None } @@ -2744,8 +2726,8 @@ pub mod tests { (runtime, dispatch_count) } - #[test] - fn test_ref_unref_ops() { + #[tokio::test] + async fn test_ref_unref_ops() { let (mut runtime, _dispatch_count) = setup(Mode::AsyncDeferred); runtime .execute_script_static( @@ -4735,6 +4717,7 @@ Deno.core.opAsync("op_async_serialize_object_with_numbers_as_keys", { } } + #[ignore] #[tokio::test] async fn js_realm_gc() { static INVOKE_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -4793,7 +4776,6 @@ Deno.core.opAsync("op_async_serialize_object_with_numbers_as_keys", { .await .unwrap(); } - drop(runtime); // Make sure the OpState was dropped properly when the runtime dropped diff --git a/ext/ffi/callback.rs b/ext/ffi/callback.rs index 2d2cf491bee451..78a21ab8f42777 100644 --- a/ext/ffi/callback.rs +++ b/ext/ffi/callback.rs @@ -10,6 +10,7 @@ use crate::MAX_SAFE_INTEGER; use crate::MIN_SAFE_INTEGER; use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; +use deno_core::futures::task::AtomicWaker; use deno_core::op; use deno_core::serde_v8; use deno_core::v8; @@ -32,8 +33,8 @@ use std::rc::Rc; use std::sync::atomic; use std::sync::atomic::AtomicU32; use std::sync::mpsc::sync_channel; +use std::sync::Arc; use std::task::Poll; -use std::task::Waker; static THREAD_ID_COUNTER: AtomicU32 = AtomicU32::new(1); @@ -99,21 +100,20 @@ struct CallbackInfo { pub parameters: Box<[NativeType]>, pub result: NativeType, pub thread_id: u32, - pub waker: Option, + pub waker: Arc, } impl Future for CallbackInfo { type Output = (); fn poll( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll { - // Always replace the waker to make sure it's bound to the proper Future. - self.waker.replace(cx.waker().clone()); // The future for the CallbackInfo never resolves: It can only be canceled. Poll::Pending } } + unsafe extern "C" fn deno_ffi_callback( cif: &libffi::low::ffi_cif, result: &mut c_void, @@ -136,10 +136,8 @@ unsafe extern "C" fn deno_ffi_callback( response_sender.send(()).unwrap(); }); async_work_sender.unbounded_send(fut).unwrap(); - if let Some(waker) = info.waker.as_ref() { - // Make sure event loop wakes up to receive our message before we start waiting for a response. - waker.wake_by_ref(); - } + // Make sure event loop wakes up to receive our message before we start waiting for a response. + info.waker.wake(); response_receiver.recv().unwrap(); } }); @@ -574,6 +572,7 @@ where let current_context = scope.get_current_context(); let context = v8::Global::new(scope, current_context).into_raw(); + let waker = state.waker.clone(); let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo { async_work_sender, callback, @@ -581,7 +580,7 @@ where parameters: args.parameters.clone().into(), result: args.result.clone(), thread_id, - waker: None, + waker, })); let cif = Cif::new( args From ccbe367be278f4d741415e0a1a20d3c3ba2c27ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 10 Jun 2023 02:56:47 +0200 Subject: [PATCH 07/17] remove console.logs --- ext/node/polyfills/http.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index e1bae6e5aef384..ea0818f5a1835c 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -658,8 +658,7 @@ class ClientRequest extends OutgoingMessage { incoming.statusMessage = res.statusText; incoming.upgrade = null; - console.log("res.headers", res.headers, incoming.rawHeaders); - + // TODO(bartlomieju): use object lookup instead of iterating for (const [key, value] of res.headers) { console.log("key", key, value); if (key.toLowerCase() === "upgrade") { @@ -677,7 +676,6 @@ class ClientRequest extends OutgoingMessage { core.tryClose(this._req.cancelHandleRid); } - console.log("incoming", incoming.headers); if (incoming.upgrade) { if (this.listenerCount("upgrade") === 0) { // No listeners, so we got nothing to do @@ -695,6 +693,7 @@ class ClientRequest extends OutgoingMessage { ); const conn = new TcpConn( upgradeRid, + // TODO(bartlomieju): figure out actual values { transport: "tcp", hostname: "127.0.0.1", @@ -718,7 +717,6 @@ class ClientRequest extends OutgoingMessage { this._closed = true; this.emit("close"); } else { - // FIXME(bartlomieju): handle upgrade { const responseRid = core.ops.op_fetch_raw_response_consume( res.responseRid, From b5dd4c6b8f5f4d9be8249fc4abe198dae7c2bf2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 10 Jun 2023 15:55:36 +0200 Subject: [PATCH 08/17] debug --- ext/node/polyfills/_stream.mjs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/ext/node/polyfills/_stream.mjs b/ext/node/polyfills/_stream.mjs index 2e2fcce8c36366..f7c6a2ea297e4a 100644 --- a/ext/node/polyfills/_stream.mjs +++ b/ext/node/polyfills/_stream.mjs @@ -3822,6 +3822,7 @@ var require_writable = __commonJS({ destroyImpl.construct(this, () => { const state = this._writableState; if (!state.writing) { + console.log("clear buffer1"); clearBuffer(this, state); } finishMaybe(this, state); @@ -3843,6 +3844,7 @@ var require_writable = __commonJS({ errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); }; function _write(stream, chunk, encoding, cb) { + console.log("_write", encoding); const state = stream._writableState; if (typeof encoding === "function") { cb = encoding; @@ -3903,6 +3905,7 @@ var require_writable = __commonJS({ if (state.corked) { state.corked--; if (!state.writing) { + console.log("clear buffer2"); clearBuffer(this, state); } } @@ -3979,6 +3982,7 @@ var require_writable = __commonJS({ return; } state.writing = false; + console.log("writing false", new Error()); state.writecb = null; state.length -= state.writelen; state.writelen = 0; @@ -3997,6 +4001,7 @@ var require_writable = __commonJS({ } } else { if (state.buffered.length > state.bufferedIndex) { + console.log("clearing buffer"); clearBuffer(stream, state); } if (sync) { @@ -4080,6 +4085,7 @@ var require_writable = __commonJS({ } let i = bufferedIndex; state.bufferProcessing = true; + console.log("clearBuffer"); if (bufferedLength > 1 && stream._writev) { state.pendingcb -= bufferedLength - 1; const callback = state.allNoop ? nop : (err) => { @@ -4362,6 +4368,7 @@ var require_writable = __commonJS({ }); var destroy = destroyImpl.destroy; Writable.prototype.destroy = function (err, cb) { + console.log("destroy", new Error()); const state = this._writableState; if ( !state.destroyed && From f75275627d7a42358e54afa71a92262f9d266813 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 13 Jun 2023 01:10:53 +0200 Subject: [PATCH 09/17] deduplicate code --- ext/fetch/26_fetch.js | 2 +- ext/fetch/lib.rs | 109 ++++++++++++------------------------- ext/node/polyfills/http.ts | 8 ++- 3 files changed, 42 insertions(+), 77 deletions(-) diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index 5084fab3433df6..0dc06db021905c 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -86,7 +86,7 @@ function opFetch(method, url, headers, clientRid, hasBody, bodyLength, body) { * @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number }>} */ function opFetchSend(rid) { - return core.opAsync("op_fetch_send", rid); + return core.opAsync("op_fetch_send", rid, true); } /** diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 7bd015a86ce881..ed608ed21dc505 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -109,9 +109,8 @@ deno_core::extension!(deno_fetch, ops = [ op_fetch, op_fetch_send, - op_fetch_send2, - op_fetch_raw_response_consume, - op_fetch_upgrade_raw_response, + op_fetch_response_into_byte_stream, + op_fetch_response_upgrade, op_fetch_custom_client, ], esm = [ @@ -423,6 +422,7 @@ pub struct FetchResponse { pub async fn op_fetch_send( state: Rc>, rid: ResourceId, + into_byte_stream: bool, ) -> Result { let request = state .borrow_mut() @@ -439,7 +439,6 @@ pub async fn op_fetch_send( Err(_) => return Err(type_error("request was cancelled")), }; - //debug!("Fetch response {}", url); let status = res.status(); let url = res.url().to_string(); let mut res_headers = Vec::new(); @@ -449,86 +448,46 @@ pub async fn op_fetch_send( let content_length = res.content_length(); - let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| { - r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) - })); - let rid = state - .borrow_mut() - .resource_table - .add(FetchResponseBodyResource { - reader: AsyncRefCell::new(stream.peekable()), - cancel: CancelHandle::default(), - size: content_length, - }); - - Ok(FetchResponse { - status: status.as_u16(), - status_text: status.canonical_reason().unwrap_or("").to_string(), - headers: res_headers, - url, - response_rid: rid, - content_length, - }) -} - -#[op] -pub async fn op_fetch_send2( - state: Rc>, - rid: ResourceId, -) -> Result { - eprintln!("in op_fetch_send2"); - let request = state - .borrow_mut() - .resource_table - .take::(rid)?; - - let request = Rc::try_unwrap(request) - .ok() - .expect("multiple op_fetch_send ongoing"); - - let res = match request.0.await { - Ok(Ok(res)) => res, - Ok(Err(err)) => return Err(type_error(err.to_string())), - Err(_) => return Err(type_error("request was cancelled")), + let response_rid = if !into_byte_stream { + state + .borrow_mut() + .resource_table + .add(FetchResponseResource { + response: res, + size: content_length, + }) + } else { + let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + state + .borrow_mut() + .resource_table + .add(FetchResponseBodyResource { + reader: AsyncRefCell::new(stream.peekable()), + cancel: CancelHandle::default(), + size: content_length, + }) }; - //debug!("Fetch response {}", url); - let status = res.status(); - let url = res.url().to_string(); - let mut res_headers = Vec::new(); - for (key, val) in res.headers().iter() { - res_headers.push((key.as_str().into(), val.as_bytes().into())); - } - - let content_length = res.content_length(); - - let rid = state - .borrow_mut() - .resource_table - .add(FetchRawResponseResource { - response: res, - size: content_length, - }); - Ok(FetchResponse { status: status.as_u16(), status_text: status.canonical_reason().unwrap_or("").to_string(), headers: res_headers, url, - response_rid: rid, + response_rid, content_length, }) } #[op] -pub fn op_fetch_raw_response_consume( +pub fn op_fetch_response_into_byte_stream( state: &mut OpState, rid: ResourceId, ) -> Result { - let raw_response = - state.resource_table.take::(rid)?; + let raw_response = state.resource_table.take::(rid)?; let raw_response = Rc::try_unwrap(raw_response) - .expect("Someone is holding onto FetchRawResponseResource"); + .expect("Someone is holding onto FetchResponseResource"); let stream: BytesStream = Box::pin(raw_response.response.bytes_stream().map(|r| { r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) @@ -546,16 +505,16 @@ pub fn op_fetch_raw_response_consume( use deno_core::task::spawn; #[op] -pub async fn op_fetch_upgrade_raw_response( +pub async fn op_fetch_response_upgrade( state: Rc>, rid: ResourceId, ) -> Result { let raw_response = state .borrow_mut() .resource_table - .take::(rid)?; + .take::(rid)?; let raw_response = Rc::try_unwrap(raw_response) - .expect("Someone is holding onto FetchRawResponseResource"); + .expect("Someone is holding onto FetchResponseResource"); let (read, write) = tokio::io::duplex(1024); let (read_rx, write_tx) = tokio::io::split(read); @@ -644,7 +603,7 @@ impl UpgradeStream { impl Resource for UpgradeStream { fn name(&self) -> Cow { - "fetchRawUpgradeStream".into() + "fetchUpgradedStream".into() } deno_core::impl_readable_byob!(); @@ -733,14 +692,14 @@ type BytesStream = Pin> + Unpin>>; #[derive(Debug)] -pub struct FetchRawResponseResource { +pub struct FetchResponseResource { pub response: Response, pub size: Option, } -impl Resource for FetchRawResponseResource { +impl Resource for FetchResponseResource { fn name(&self) -> Cow { - "fetchRawResponse".into() + "fetchResponse".into() } } diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index ea0818f5a1835c..4d3ca4b21dcd3a 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -618,7 +618,13 @@ class ClientRequest extends OutgoingMessage { (async () => { try { const [res, _] = await Promise.all([ - core.opAsync("op_fetch_send2", this._req.requestRid), + core.opAsync( + "op_fetch_send", + this._req.requestRid, + /* false because we want to have access to actual Response, + not the bytes stream of response (because we need to handle upgrades) */ + false, + ), (async () => { if (this._bodyWriteRid) { try { From fbf1f34fa3abde1205852011c39d974295564b58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 13 Jun 2023 01:18:47 +0200 Subject: [PATCH 10/17] update ops to call --- ext/node/polyfills/http.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 4d3ca4b21dcd3a..a1d7c3a40743da 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -694,7 +694,7 @@ class ClientRequest extends OutgoingMessage { } const upgradeRid = await core.opAsync( - "op_fetch_upgrade_raw_response", + "op_fetch_response_upgrade", res.responseRid, ); const conn = new TcpConn( @@ -724,7 +724,7 @@ class ClientRequest extends OutgoingMessage { this.emit("close"); } else { { - const responseRid = core.ops.op_fetch_raw_response_consume( + const responseRid = core.ops.op_fetch_response_into_byte_stream( res.responseRid, ); incoming._bodyRid = responseRid; From 3f03a543a6357ea5403dde252438c862b44a9a13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 13 Jun 2023 01:18:53 +0200 Subject: [PATCH 11/17] add some debug files --- ws.js | 15 +++++++++++++++ ws1.js | 14 ++++++++++++++ ws_client.js | 28 ++++++++++++++++++++++++++++ ws_client2.js | 26 ++++++++++++++++++++++++++ 4 files changed, 83 insertions(+) create mode 100644 ws.js create mode 100644 ws1.js create mode 100644 ws_client.js create mode 100644 ws_client2.js diff --git a/ws.js b/ws.js new file mode 100644 index 00000000000000..c931f5928565fd --- /dev/null +++ b/ws.js @@ -0,0 +1,15 @@ +Deno.serve({ port: 7000 }, (req) => { + const { socket, response } = Deno.upgradeWebSocket(req, { + idleTimeout: 0, + }); + let i = 0; + socket.onmessage = (e) => { + if (i == 2) { + socket.close(); + return; + } + i++; + socket.send(e.data); + }; + return response; +}); diff --git a/ws1.js b/ws1.js new file mode 100644 index 00000000000000..70eaa72da227aa --- /dev/null +++ b/ws1.js @@ -0,0 +1,14 @@ +import { WebSocketServer } from "npm:ws"; + +const wss = new WebSocketServer({ port: 7000 }); +console.log("Listening on http://127.0.0.1:7000"); + +wss.on("connection", function connection(ws) { + ws.on("error", console.error); + + ws.on("message", function message(data) { + console.log("received: %s", data); + }); + + ws.send("something"); +}); diff --git a/ws_client.js b/ws_client.js new file mode 100644 index 00000000000000..49196d5449445b --- /dev/null +++ b/ws_client.js @@ -0,0 +1,28 @@ +import WebSocket from "npm:ws"; + +const ws = new WebSocket("ws://127.0.0.1:7000"); + +ws.on("error", console.error); + +ws.on("open", function open() { + ws.send("something"); + let i = 0; + let id; + id = setInterval(() => { + i++; + if (i > 2) { + clearInterval(id); + ws.close(); + return; + } + ws.send("hello " + i); + }, 1000); +}); + +ws.on("message", function message(data) { + console.log("received: %s", data); +}); + +ws.on("close", function close() { + console.log("websocket closed"); +}); diff --git a/ws_client2.js b/ws_client2.js new file mode 100644 index 00000000000000..f30a6fde5f2eab --- /dev/null +++ b/ws_client2.js @@ -0,0 +1,26 @@ +const ws = new WebSocket("ws://127.0.0.1:7000"); + +ws.addEventListener("error", console.error); + +ws.addEventListener("open", function open() { + ws.send("something"); + let i = 0; + let id; + id = setInterval(() => { + i++; + if (i > 2) { + clearInterval(id); + ws.close(); + return; + } + ws.send("hello " + i); + }, 1000); +}); + +ws.addEventListener("message", function message(data) { + console.log("received: %s", data); +}); + +ws.addEventListener("close", function close() { + console.log("websocket closed"); +}); From d7b69d2eccd714920a6b90c2f1077fc94ab289fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 13 Jun 2023 02:10:17 +0200 Subject: [PATCH 12/17] use better remote addr --- ext/fetch/lib.rs | 10 ++++++++++ ext/node/polyfills/_stream.mjs | 7 ------- ext/node/polyfills/http.ts | 12 +++++++----- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index ed608ed21dc505..87d9206337e12d 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -416,6 +416,8 @@ pub struct FetchResponse { pub url: String, pub response_rid: ResourceId, pub content_length: Option, + pub remote_addr_ip: Option, + pub remote_addr_port: Option, } #[op] @@ -447,6 +449,12 @@ pub async fn op_fetch_send( } let content_length = res.content_length(); + let remote_addr = res.remote_addr(); + let (remote_addr_ip, remote_addr_port) = if let Some(addr) = remote_addr { + (Some(addr.ip().to_string()), Some(addr.port())) + } else { + (None, None) + }; let response_rid = if !into_byte_stream { state @@ -477,6 +485,8 @@ pub async fn op_fetch_send( url, response_rid, content_length, + remote_addr_ip, + remote_addr_port, }) } diff --git a/ext/node/polyfills/_stream.mjs b/ext/node/polyfills/_stream.mjs index f7c6a2ea297e4a..2e2fcce8c36366 100644 --- a/ext/node/polyfills/_stream.mjs +++ b/ext/node/polyfills/_stream.mjs @@ -3822,7 +3822,6 @@ var require_writable = __commonJS({ destroyImpl.construct(this, () => { const state = this._writableState; if (!state.writing) { - console.log("clear buffer1"); clearBuffer(this, state); } finishMaybe(this, state); @@ -3844,7 +3843,6 @@ var require_writable = __commonJS({ errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); }; function _write(stream, chunk, encoding, cb) { - console.log("_write", encoding); const state = stream._writableState; if (typeof encoding === "function") { cb = encoding; @@ -3905,7 +3903,6 @@ var require_writable = __commonJS({ if (state.corked) { state.corked--; if (!state.writing) { - console.log("clear buffer2"); clearBuffer(this, state); } } @@ -3982,7 +3979,6 @@ var require_writable = __commonJS({ return; } state.writing = false; - console.log("writing false", new Error()); state.writecb = null; state.length -= state.writelen; state.writelen = 0; @@ -4001,7 +3997,6 @@ var require_writable = __commonJS({ } } else { if (state.buffered.length > state.bufferedIndex) { - console.log("clearing buffer"); clearBuffer(stream, state); } if (sync) { @@ -4085,7 +4080,6 @@ var require_writable = __commonJS({ } let i = bufferedIndex; state.bufferProcessing = true; - console.log("clearBuffer"); if (bufferedLength > 1 && stream._writev) { state.pendingcb -= bufferedLength - 1; const callback = state.allNoop ? nop : (err) => { @@ -4368,7 +4362,6 @@ var require_writable = __commonJS({ }); var destroy = destroyImpl.destroy; Writable.prototype.destroy = function (err, cb) { - console.log("destroy", new Error()); const state = this._writableState; if ( !state.destroyed && diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index a1d7c3a40743da..b40a1de75309bc 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -31,6 +31,7 @@ import { parseUniqueHeadersOption, validateHeaderName, } from "ext:deno_node/_http_outgoing.ts"; +import { ok as assert } from "ext:deno_node/assert.ts"; import { kOutHeaders } from "ext:deno_node/internal/http.ts"; import { _checkIsHttpToken as checkIsHttpToken } from "ext:deno_node/_http_common.ts"; import { Agent, globalAgent } from "ext:deno_node/_http_agent.mjs"; @@ -697,18 +698,20 @@ class ClientRequest extends OutgoingMessage { "op_fetch_response_upgrade", res.responseRid, ); + assert(typeof res.remoteAddrIp !== "undefined"); + assert(typeof res.remoteAddrIp !== "undefined"); const conn = new TcpConn( upgradeRid, - // TODO(bartlomieju): figure out actual values { transport: "tcp", - hostname: "127.0.0.1", - port: 90, + hostname: res.remoteAddrIp, + port: res.remoteAddrIp, }, + // TODO(bartlomieju): figure out actual values { transport: "tcp", hostname: "127.0.0.1", - port: 90, + port: 80, }, ); const socket = new Socket({ @@ -717,7 +720,6 @@ class ClientRequest extends OutgoingMessage { this.upgradeOrConnect = true; - // TODO(bartlomieju): handle actual upgrade this.emit("upgrade", incoming, socket, Buffer.from([])); this.destroyed = true; this._closed = true; From e300a8526906e5a41af545f86cda71bf69935e5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 13 Jun 2023 02:15:05 +0200 Subject: [PATCH 13/17] remove debug files --- ws.js | 15 --------------- ws1.js | 14 -------------- ws_client.js | 28 ---------------------------- ws_client2.js | 26 -------------------------- 4 files changed, 83 deletions(-) delete mode 100644 ws.js delete mode 100644 ws1.js delete mode 100644 ws_client.js delete mode 100644 ws_client2.js diff --git a/ws.js b/ws.js deleted file mode 100644 index c931f5928565fd..00000000000000 --- a/ws.js +++ /dev/null @@ -1,15 +0,0 @@ -Deno.serve({ port: 7000 }, (req) => { - const { socket, response } = Deno.upgradeWebSocket(req, { - idleTimeout: 0, - }); - let i = 0; - socket.onmessage = (e) => { - if (i == 2) { - socket.close(); - return; - } - i++; - socket.send(e.data); - }; - return response; -}); diff --git a/ws1.js b/ws1.js deleted file mode 100644 index 70eaa72da227aa..00000000000000 --- a/ws1.js +++ /dev/null @@ -1,14 +0,0 @@ -import { WebSocketServer } from "npm:ws"; - -const wss = new WebSocketServer({ port: 7000 }); -console.log("Listening on http://127.0.0.1:7000"); - -wss.on("connection", function connection(ws) { - ws.on("error", console.error); - - ws.on("message", function message(data) { - console.log("received: %s", data); - }); - - ws.send("something"); -}); diff --git a/ws_client.js b/ws_client.js deleted file mode 100644 index 49196d5449445b..00000000000000 --- a/ws_client.js +++ /dev/null @@ -1,28 +0,0 @@ -import WebSocket from "npm:ws"; - -const ws = new WebSocket("ws://127.0.0.1:7000"); - -ws.on("error", console.error); - -ws.on("open", function open() { - ws.send("something"); - let i = 0; - let id; - id = setInterval(() => { - i++; - if (i > 2) { - clearInterval(id); - ws.close(); - return; - } - ws.send("hello " + i); - }, 1000); -}); - -ws.on("message", function message(data) { - console.log("received: %s", data); -}); - -ws.on("close", function close() { - console.log("websocket closed"); -}); diff --git a/ws_client2.js b/ws_client2.js deleted file mode 100644 index f30a6fde5f2eab..00000000000000 --- a/ws_client2.js +++ /dev/null @@ -1,26 +0,0 @@ -const ws = new WebSocket("ws://127.0.0.1:7000"); - -ws.addEventListener("error", console.error); - -ws.addEventListener("open", function open() { - ws.send("something"); - let i = 0; - let id; - id = setInterval(() => { - i++; - if (i > 2) { - clearInterval(id); - ws.close(); - return; - } - ws.send("hello " + i); - }, 1000); -}); - -ws.addEventListener("message", function message(data) { - console.log("received: %s", data); -}); - -ws.addEventListener("close", function close() { - console.log("websocket closed"); -}); From a4c480ccd817d7eddcc141a9415a3eefdd2fe15d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 13 Jun 2023 02:15:39 +0200 Subject: [PATCH 14/17] Remove debug log --- ext/fetch/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 87d9206337e12d..4ae5a33183713e 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -530,7 +530,6 @@ pub async fn op_fetch_response_upgrade( let (read_rx, write_tx) = tokio::io::split(read); let (mut write_rx, mut read_tx) = tokio::io::split(write); let upgraded = raw_response.response.upgrade().await?; - eprintln!("upgraded the connection!"); { // Stage 3: Pump the data let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded); From 545d115536d57e690ebe12b0c9ca56af52a1fcb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 13 Jun 2023 02:41:02 +0200 Subject: [PATCH 15/17] add test --- cli/tests/unit_node/http_test.ts | 46 ++++++++++++++++++++++++++++++++ ext/node/polyfills/http.ts | 2 -- 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/cli/tests/unit_node/http_test.ts b/cli/tests/unit_node/http_test.ts index 8f87b1fd27831e..f2ea2baa1c67bd 100644 --- a/cli/tests/unit_node/http_test.ts +++ b/cli/tests/unit_node/http_test.ts @@ -617,3 +617,49 @@ Deno.test("[node/http] ClientRequest search params", async () => { await def; assertEquals(body, "foo=bar"); }); + +Deno.test( + "[node/http] client upgrade", + { permissions: { net: true } }, + async () => { + const promise = deferred(); + const server = http.createServer((_req, res) => { + res.writeHead(200, { "Content-Type": "text/plain" }); + res.end("okay"); + }); + server.on("upgrade", (_req, socket, _head) => { + socket.write( + "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" + + "Upgrade: WebSocket\r\n" + + "Connection: Upgrade\r\n" + + "\r\n", + ); + }); + + // Now that server is running + server.listen(1337, "127.0.0.1", () => { + // make a request + const options = { + port: 1337, + host: "127.0.0.1", + headers: { + "Connection": "Upgrade", + "Upgrade": "websocket", + }, + }; + + const req = http.request(options); + req.end(); + + req.on("upgrade", (_res, socket, _upgradeHead) => { + socket.end(); + server.close(() => { + promise.resolve(); + }); + }); + }); + + await promise; + await new Promise((resolve) => setTimeout(resolve, 1000)); + }, +); diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index b40a1de75309bc..9dacd2f6116d7a 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -665,9 +665,7 @@ class ClientRequest extends OutgoingMessage { incoming.statusMessage = res.statusText; incoming.upgrade = null; - // TODO(bartlomieju): use object lookup instead of iterating for (const [key, value] of res.headers) { - console.log("key", key, value); if (key.toLowerCase() === "upgrade") { incoming.upgrade = true; break; From cddece866c40abd59ebd36f02c6590a0f5cb974c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 13 Jun 2023 02:43:29 +0200 Subject: [PATCH 16/17] fix the test --- cli/tests/unit_node/http_test.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cli/tests/unit_node/http_test.ts b/cli/tests/unit_node/http_test.ts index f2ea2baa1c67bd..d8e1060d005c82 100644 --- a/cli/tests/unit_node/http_test.ts +++ b/cli/tests/unit_node/http_test.ts @@ -627,6 +627,8 @@ Deno.test( res.writeHead(200, { "Content-Type": "text/plain" }); res.end("okay"); }); + // @ts-ignore it's a socket for real + let serverSocket; server.on("upgrade", (_req, socket, _head) => { socket.write( "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" + @@ -634,6 +636,7 @@ Deno.test( "Connection: Upgrade\r\n" + "\r\n", ); + serverSocket = socket; }); // Now that server is running @@ -653,6 +656,8 @@ Deno.test( req.on("upgrade", (_res, socket, _upgradeHead) => { socket.end(); + // @ts-ignore it's a socket for real + serverSocket!.end(); server.close(() => { promise.resolve(); }); @@ -660,6 +665,5 @@ Deno.test( }); await promise; - await new Promise((resolve) => setTimeout(resolve, 1000)); }, ); From d2897cb17831cfe34f3a30237d31fef82b7b308b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 13 Jun 2023 02:47:12 +0200 Subject: [PATCH 17/17] lint --- ext/node/polyfills/http.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 9dacd2f6116d7a..979bda0137aa85 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -665,7 +665,7 @@ class ClientRequest extends OutgoingMessage { incoming.statusMessage = res.statusText; incoming.upgrade = null; - for (const [key, value] of res.headers) { + for (const [key, _value] of res.headers) { if (key.toLowerCase() === "upgrade") { incoming.upgrade = true; break;