diff --git a/Cargo.lock b/Cargo.lock index b3f169927..99e1f3841 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,6 +79,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.3" @@ -807,6 +813,30 @@ version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +[[package]] +name = "headers" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" +dependencies = [ + "base64 0.21.3", + "bytes", + "headers-core", + "http", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http", +] + [[package]] name = "heck" version = "0.3.3" @@ -878,29 +908,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "http-body" -version = "1.0.0-rc.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "951dfc2e32ac02d67c90c0d65bd27009a635dc9b381a2cc7d284ab01e3a0150d" -dependencies = [ - "bytes", - "http", -] - -[[package]] -name = "http-body-util" -version = "0.1.0-rc.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08ef12f041acdd397010e5fb6433270c147d3b8b2d0a840cd7fff8e531dca5c8" -dependencies = [ - "bytes", - "futures-util", - "http", - "http-body 1.0.0-rc.2", - "pin-project-lite", -] - [[package]] name = "httparse" version = "1.8.0" @@ -925,7 +932,7 @@ dependencies = [ "futures-util", "h2", "http", - "http-body 0.4.5", + "http-body", "httparse", "httpdate", "itoa", @@ -937,28 +944,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper" -version = "1.0.0-rc.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b75264b2003a3913f118d35c586e535293b3e22e41f074930762929d071e092" -dependencies = [ - "bytes", - "futures-channel", - "futures-core", - "futures-util", - "h2", - "http", - "http-body 1.0.0-rc.2", - "httparse", - "httpdate", - "itoa", - "pin-project-lite", - "tokio", - "tracing", - "want", -] - [[package]] name = "hyper-tls" version = "0.5.0" @@ -966,7 +951,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper 0.14.27", + "hyper", "native-tls", "tokio", "tokio-native-tls", @@ -1197,6 +1182,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -1258,6 +1253,24 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http", + "httparse", + "log", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -1575,6 +1588,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -1756,8 +1789,8 @@ dependencies = [ "futures-util", "h2", "http", - "http-body 0.4.5", - "hyper 0.14.27", + "http-body", + "hyper", "hyper-tls", "ipnet", "js-sys", @@ -1789,7 +1822,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -1805,7 +1838,7 @@ dependencies = [ "getrandom", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -1850,6 +1883,27 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "rustls" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" +dependencies = [ + "base64 0.21.3", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -1903,6 +1957,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -1933,8 +1993,6 @@ dependencies = [ "getrandom", "git-version", "hostname", - "http-body-util", - "hyper 1.0.0-rc.3", "indexmap", "lazy_static", "lexical", @@ -1969,6 +2027,17 @@ dependencies = [ "to-syn-value_derive", "tokio", "walkdir", + "warp", +] + +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", ] [[package]] @@ -2073,6 +2142,17 @@ dependencies = [ "syn 2.0.29", ] +[[package]] +name = "sha1" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha2" version = "0.10.7" @@ -2184,6 +2264,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "static_assertions" version = "1.1.0" @@ -2418,6 +2504,40 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.8" @@ -2445,6 +2565,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-core", ] @@ -2464,12 +2585,40 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "tungstenite" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788" +dependencies = [ + "base64 0.13.1", + "byteorder", + "bytes", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +[[package]] +name = "unicase" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.13" @@ -2572,6 +2721,38 @@ dependencies = [ "try-lock", ] +[[package]] +name = "warp" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba431ef570df1287f7f8b07e376491ad54f84d26ac473489427231e1718e1f69" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "headers", + "http", + "hyper", + "log", + "mime", + "mime_guess", + "multer", + "percent-encoding", + "pin-project", + "rustls-pemfile", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-rustls", + "tokio-stream", + "tokio-tungstenite", + "tokio-util", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.10.0+wasi-snapshot-preview1" @@ -2660,6 +2841,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 94f589d04..fed3df996 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ ffi = ["dep:libffi"] repl = ["dep:crossterm", "dep:ctrlc", "dep:rustyline"] hostname = ["dep:hostname"] tls = ["dep:native-tls"] -http = ["dep:hyper", "dep:reqwest"] +http = ["dep:warp", "dep:reqwest"] [build-dependencies] indexmap = "1.0.2" @@ -61,7 +61,6 @@ ryu = "1.0.9" futures = "0.3" libloading = "0.7" derive_deref = "1.1.1" -http-body-util = "0.1.0-rc.2" bytes = "1" dashu = { git = "https://github.com/coasys/dashu.git", version = "0.3.1" } rand = "0.8.5" @@ -73,7 +72,7 @@ crossterm = { version = "0.20.0", optional = true } ctrlc = { version = "3.2.2", optional = true } rustyline = { version = "12.0.0", optional = true } native-tls = { version = "0.2.4", optional = true } -hyper = { version = "=1.0.0-rc.3", features = ["full"], optional = true } +warp = { version = "=0.3.5", features = ["tls"], optional = true } reqwest = { version = "0.11.18", features = ["blocking"], optional = true } tokio = { version = "1.28.2", features = ["full"] } diff --git a/build/instructions_template.rs b/build/instructions_template.rs index 52fcd8b7a..3ade994e0 100644 --- a/build/instructions_template.rs +++ b/build/instructions_template.rs @@ -552,7 +552,7 @@ enum SystemClauseType { DeterministicLengthRundown, #[strum_discriminants(strum(props(Arity = "7", Name = "$http_open")))] HttpOpen, - #[strum_discriminants(strum(props(Arity = "2", Name = "$http_listen")))] + #[strum_discriminants(strum(props(Arity = "5", Name = "$http_listen")))] HttpListen, #[strum_discriminants(strum(props(Arity = "7", Name = "$http_accept")))] HttpAccept, diff --git a/src/http.rs b/src/http.rs index 71d1682cf..c2f5bb159 100644 --- a/src/http.rs +++ b/src/http.rs @@ -1,54 +1,23 @@ use std::sync::{Arc, Mutex, Condvar}; -use std::future::Future; -use std::pin::Pin; -use http_body_util::Full; -use bytes::Bytes; -use hyper::service::Service; -use hyper::{body::Incoming as IncomingBody, Request, Response}; +use std::io::BufRead; + +use warp::http; pub struct HttpListener { pub incoming: std::sync::mpsc::Receiver } -#[derive(Debug)] pub struct HttpRequest { - pub request: Request, + pub request_data: HttpRequestData, pub response: HttpResponse, } -pub type HttpResponse = Arc<(Mutex, Mutex>>>, Condvar)>; - -pub struct HttpService { - pub tx: std::sync::mpsc::SyncSender, -} - -impl Service> for HttpService { - type Response = Response>; - type Error = hyper::Error; - type Future = Pin> + Send>>; - - fn call(&mut self, req: Request) -> Self::Future { - // new connection! - // we send the Request info to Prolog - let response = Arc::new((Mutex::new(false), Mutex::new(None), Condvar::new())); - let http_request = HttpRequest { request: req, response: Arc::clone(&response) }; - self.tx.send(http_request).unwrap(); +pub type HttpResponse = Arc<(Mutex, Mutex>, Condvar)>; - // we wait for the Response info from Prolog - { - let (ready, _response, cvar) = &*response; - let mut ready = ready.lock().unwrap(); - while !*ready { - ready = cvar.wait(ready).unwrap(); - } - } - { - let (_, response, _) = &*response; - let response = response.lock().unwrap().take(); - let res = response.expect("Data race error in HTTP Server"); - Box::pin(async move { - Ok(res) - }) - } - } +pub struct HttpRequestData { + pub method: http::Method, + pub headers: http::HeaderMap, + pub path: String, + pub query: String, + pub body: Box, } diff --git a/src/lib/http/http_server.pl b/src/lib/http/http_server.pl index 381d56077..2ec52b6d3 100644 --- a/src/lib/http/http_server.pl +++ b/src/lib/http/http_server.pl @@ -54,6 +54,7 @@ :- module(http_server, [ http_listen/2, + http_listen/3, http_headers/2, http_status_code/2, http_body/2, @@ -62,6 +63,7 @@ ]). :- meta_predicate http_listen(?, :). +:- meta_predicate http_listen(?, :, ?). :- use_module(library(charsio)). :- use_module(library(crypto)). @@ -74,25 +76,58 @@ %% http_listen(+Port, +Handlers). % +% Equivalent to `http_listen(Port, Handlers, [])`. +http_listen(Port, Module:Handlers0) :- + must_be(integer, Port), + must_be(list, Handlers0), + maplist(module_qualification(Module), Handlers0, Handlers), + http_listen_(Port, Handlers, []). + +%% http_listen(+Port, +Handlers, +Options). +% % Listens for HTTP connections on port Port. Each handler on the list Handlers should be of the form: `HttpVerb(PathUnification, Predicate)`. % For example: `get(user/User, get_info(User))` will match an HTTP request that is a GET, the path unifies with /user/User (where User is a variable) -% and it will call `get_info` with three arguments: an `http_request` term, an `http_response` term and User. -http_listen(Port, Module:Handlers0) :- +% and it will call `get_info` with three arguments: an `http_request` term, an `http_response` term and User. +% +% The following options are supported: +% +% - `tls_key(+Key)` - a TLS key for HTTPS (string) +% - `tls_cert(+Cert)` - a TLS cert for HTTPS (string) +% - `content_length_limit(+Limit)` - maximum length (in bytes) for the incoming bodies. By default, 32KB. +% +% In order to have a HTTPS server (instead of plain HTTP), both `ssl_key` and `ssl_cert` options must be provided. +http_listen(Port, Module:Handlers0, Options) :- must_be(integer, Port), must_be(list, Handlers0), + must_be(list, Options), maplist(module_qualification(Module), Handlers0, Handlers), - http_listen_(Port, Handlers). + http_listen_(Port, Handlers, Options). module_qualification(M, H0, H) :- H0 =.. [Method, Path, Goal], H =.. [Method, Path, M:Goal]. -http_listen_(Port, Handlers) :- +http_listen_(Port, Handlers, Options) :- + parse_options(Options, TLSKey, TLSCert, ContentLengthLimit), phrase(format_("0.0.0.0:~d", [Port]), Addr), - '$http_listen'(Addr, HttpListener),!, + '$http_listen'(Addr, HttpListener, TLSKey, TLSCert, ContentLengthLimit),!, format("Listening at ~s\n", [Addr]), http_loop(HttpListener, Handlers). +parse_options(Options, TLSKey, TLSCert, ContentLengthLimit) :- + member_option_default(tls_key, Options, "", TLSKey), + member_option_default(tls_cert, Options, "", TLSCert), + member_option_default(content_length_limit, Options, 32768, ContentLengthLimit), + must_be(integer, ContentLengthLimit). + +member_option_default(Key, List, _Default, Value) :- + X =.. [Key, Value], + member(X, List). +member_option_default(Key, List, Default, Default) :- + X =.. [Key, _], + \+ member(X, List). + + http_loop(HttpListener, Handlers) :- '$http_accept'(HttpListener, RequestMethod, RequestPath, RequestHeaders, RequestQuery, RequestStream, ResponseHandle), current_time(Time), @@ -114,7 +149,7 @@ ) ; ( '$http_answer'(ResponseHandle, 404, [], ResponseStream), - call_cleanup(format(ResponseStream, "Not Found"), close(ResponseStream))) + call_cleanup(format(ResponseStream, "Not Found", []), close(ResponseStream))) ), http_loop(HttpListener, Handlers). diff --git a/src/machine/streams.rs b/src/machine/streams.rs index f4ce6cbd4..a065de3fa 100644 --- a/src/machine/streams.rs +++ b/src/machine/streams.rs @@ -30,6 +30,9 @@ use std::ptr; #[cfg(feature = "tls")] use native_tls::TlsStream; +#[cfg(feature = "http")] +use warp::hyper; + #[derive(Debug, BitfieldSpecifier, Clone, Copy, PartialEq, Eq, Hash)] #[bits = 1] pub enum StreamType { @@ -286,9 +289,9 @@ impl Read for HttpReadStream { #[cfg(feature = "http")] pub struct HttpWriteStream { status_code: u16, - headers: hyper::HeaderMap, + headers: mem::ManuallyDrop, response: TypedArenaPtr, - buffer: Vec, + buffer: mem::ManuallyDrop>, } #[cfg(feature = "http")] @@ -308,25 +311,33 @@ impl Write for HttpWriteStream { #[inline] fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +#[cfg(feature = "http")] +impl HttpWriteStream { + fn drop(&mut self) { + let headers = unsafe { mem::ManuallyDrop::take(&mut self.headers) }; + let buffer = unsafe { mem::ManuallyDrop::take(&mut self.buffer) }; + let (ready, response, cvar) = &**self.response; let mut ready = ready.lock().unwrap(); { let mut response = response.lock().unwrap(); - let bytes = bytes::Bytes::copy_from_slice(&self.buffer); - let mut response_ = hyper::Response::builder() + let mut response_ = warp::http::Response::builder() .status(self.status_code); - *response_.headers_mut().unwrap() = self.headers.clone(); - *response = Some(response_.body(http_body_util::Full::new(bytes)).unwrap()); + *response_.headers_mut().unwrap() = headers; + *response = Some(response_.body(warp::hyper::Body::from(buffer)).unwrap()); } *ready = true; cvar.notify_one(); - - Ok(()) } } + #[derive(Debug)] pub struct StandardOutputStream {} @@ -1243,8 +1254,8 @@ impl Stream { StreamLayout::new(CharReader::new(HttpWriteStream { response, status_code, - headers, - buffer: Vec::new(), + headers: mem::ManuallyDrop::new(headers), + buffer: mem::ManuallyDrop::new(Vec::new()), })), arena )) @@ -1297,7 +1308,8 @@ impl Stream { Ok(()) } #[cfg(feature = "http")] - Stream::HttpWrite(ref mut http_stream) => { + Stream::HttpWrite(ref mut http_stream) => { + http_stream.inner_mut().drop(); unsafe { http_stream.set_tag(ArenaHeaderTag::Dropped); std::ptr::drop_in_place(&mut http_stream.inner_mut().buffer as *mut _); diff --git a/src/machine/system_calls.rs b/src/machine/system_calls.rs index b158d428e..d3d978fd5 100644 --- a/src/machine/system_calls.rs +++ b/src/machine/system_calls.rs @@ -12,7 +12,7 @@ use crate::ffi::*; use crate::heap_iter::*; use crate::heap_print::*; #[cfg(feature = "http")] -use crate::http::{HttpService, HttpListener, HttpResponse}; +use crate::http::{HttpRequestData, HttpListener, HttpResponse, HttpRequest}; use crate::instructions::*; use crate::machine; use crate::machine::{Machine, VERIFY_ATTR_INTERRUPT_LOC, get_structure_index}; @@ -43,14 +43,14 @@ pub(crate) use ref_thread_local::RefThreadLocal; use std::borrow::BorrowMut; use std::cell::Cell; use std::cmp::Ordering; -use std::collections::BTreeSet; +use std::collections::{BTreeSet}; use std::convert::TryFrom; use std::env; #[cfg(feature = "ffi")] use std::ffi::CString; use std::fs; use std::hash::{BuildHasher, BuildHasherDefault}; -use std::io::{ErrorKind, Read, Write}; +use std::io::{ErrorKind, Read, BufRead, Write}; use std::iter::{once, FromIterator}; use std::mem; use std::net::{TcpListener, TcpStream, SocketAddr, ToSocketAddrs}; @@ -58,6 +58,7 @@ use std::num::NonZeroU32; use std::ops::Sub; use std::process; use std::str::FromStr; +use std::sync::{Mutex, Arc, Condvar}; use chrono::{offset::Local, DateTime}; #[cfg(not(target_os = "wasi"))] @@ -88,15 +89,14 @@ use roxmltree; use select; #[cfg(feature = "http")] -use hyper::server::conn::http1; +use warp::hyper::header::{HeaderValue, HeaderName}; #[cfg(feature = "http")] -use hyper::header::{HeaderValue, HeaderName}; +use warp::hyper::{HeaderMap, Method}; #[cfg(feature = "http")] -use hyper::{HeaderMap, Method}; -use http_body_util::BodyExt; -use bytes::Buf; +use warp::{Buf, Filter}; #[cfg(feature = "http")] use reqwest::Url; +use futures::future; #[cfg(feature = "repl")] pub(crate) fn get_key() -> KeyEvent { @@ -4363,6 +4363,44 @@ impl Machine { #[inline(always)] pub(crate) fn http_listen(&mut self) -> CallResult { let address_sink = self.deref_register(1); + let tls_key = self.deref_register(3); + let tls_cert = self.deref_register(4); + let content_length_limit = self.deref_register(5); + const CONTENT_LENGTH_LIMIT_DEFAULT: u64 = 32768; + let content_length_limit = match Number::try_from(content_length_limit) { + Ok(Number::Fixnum(n)) => if n.get_num() >= 0 { + n.get_num() as u64 + } else { + CONTENT_LENGTH_LIMIT_DEFAULT + }, + Ok(Number::Integer(n)) => match n.to_u64() { + Some(u) => u, + None => CONTENT_LENGTH_LIMIT_DEFAULT, + } + _ => CONTENT_LENGTH_LIMIT_DEFAULT, + }; + + let ssl_server: Option<(String,String)> = { + match self.machine_st.value_to_str_like(tls_key) { + Some(key) => { + match self.machine_st.value_to_str_like(tls_cert) { + Some(cert) => { + let key_str = key.as_str(); + let cert_str = cert.as_str(); + + if key_str.is_empty() || cert_str.is_empty() { + None + } else { + Some((key_str.to_string(), cert_str.to_string())) + } + } + None => None + } + } + None => None + } + }; + if let Some(address_str) = self.machine_st.value_to_str_like(address_sink) { let address_string = address_str.as_str(); let addr: SocketAddr = match address_string.to_socket_addrs().ok().and_then(|mut s| s.next()) { @@ -4373,33 +4411,67 @@ impl Machine { } }; - let (tx, rx) = std::sync::mpsc::sync_channel(1024); + let (tx, rx) = std::sync::mpsc::sync_channel(1024); - let _guard = self.runtime.enter(); - let listener = match self.runtime.block_on(async { tokio::net::TcpListener::bind(addr).await }) { - Ok(listener) => listener, - Err(_) => { - return Err(self.machine_st.open_permission_error(address_sink, atom!("http_listen"), 2)); - } - }; + fn get_reader(body: impl Buf + Send + 'static) -> Box { + Box::new(body.reader()) + } - self.runtime.spawn(async move { - loop { - let tx = tx.clone(); - let (stream, _) = listener.accept().await.unwrap(); - - tokio::task::spawn(async move { - if let Err(err) = http1::Builder::new() - .serve_connection(stream, HttpService { - tx - }) - .await - { - eprintln!("Error serving connection: {:?}", err); + let serve = warp::body::aggregate() + .and(warp::header::optional::(warp::http::header::CONTENT_LENGTH.as_str())) + .and(warp::method()) + .and(warp::header::headers_cloned()) + .and(warp::path::full()) + .and(warp::query::raw().or_else(|_| future::ready(Ok::<(String,), warp::Rejection>(("".to_string(),))))) + .map(move |body, content_length, method, headers: warp::http::HeaderMap, path: warp::filters::path::FullPath, query| { + if let Some(content_length) = content_length { + if content_length > content_length_limit { + return warp::http::Response::builder() + .status(413) + .body(warp::hyper::Body::empty()) + .unwrap(); } - }); + } + + let http_request_data = HttpRequestData { + method, + headers, + path: path.as_str().to_string(), + query, + body: get_reader(body), + }; + let response = Arc::new((Mutex::new(false), Mutex::new(None), Condvar::new())); + let http_request = HttpRequest { request_data: http_request_data, response: Arc::clone(&response) }; + // we send the request to http_accept + tx.send(http_request).unwrap(); + + // we wait for the Response info from Prolog + { + let (ready, _response, cvar) = &*response; + let mut ready = ready.lock().unwrap(); + while !*ready { + ready = cvar.wait(ready).unwrap(); + } + } + { + let (_, response, _) = &*response; + let response = response.lock().unwrap().take(); + response.expect("Data race error in HTTP server") + } + }); + + self.runtime.spawn(async move { + match ssl_server { + Some((key, cert)) => { + warp::serve(serve).tls().key(key).cert(cert).run(addr).await + } + None => { + warp::serve(serve).run(addr).await + } } }); + + let http_listener = HttpListener { incoming: rx }; let http_listener = arena_alloc!(http_listener, &mut self.machine_st.arena); let addr = self.deref_register(2); @@ -4423,18 +4495,21 @@ impl Machine { (ArenaHeaderTag::HttpListener, http_listener) => { match http_listener.incoming.recv() { Ok(request) => { - let method_atom = match *request.request.method() { + let method_atom = match request.request_data.method { Method::GET => atom!("get"), Method::POST => atom!("post"), Method::PUT => atom!("put"), Method::DELETE => atom!("delete"), Method::PATCH => atom!("patch"), Method::HEAD => atom!("head"), - _ => unreachable!(), + Method::OPTIONS => atom!("options"), + Method::TRACE => atom!("trace"), + Method::CONNECT => atom!("connect"), + _ => atom!("unsupported_extension"), }; - let path_atom = self.machine_st.atom_tbl.build_with(request.request.uri().path()); + let path_atom = self.machine_st.atom_tbl.build_with(&request.request_data.path); let path_cell = atom_as_cstr_cell!(path_atom); - let headers: Vec = request.request.headers().iter().map(|(header_name, header_value)| { + let headers: Vec = request.request_data.headers.iter().map(|(header_name, header_value)| { let h = self.machine_st.heap.len(); let header_term = functor!( @@ -4448,17 +4523,13 @@ impl Machine { let headers_list = iter_to_heap_list(&mut self.machine_st.heap, headers.into_iter()); - let query_str = request.request.uri().query().unwrap_or(""); - let query_atom = self.machine_st.atom_tbl.build_with(query_str); + let query_str = request.request_data.query; + let query_atom = self.machine_st.atom_tbl.build_with(&query_str); let query_cell = string_as_cstr_cell!(query_atom); - let hyper_req = request.request; - let buf = self.runtime.block_on(async {hyper_req.collect().await.unwrap().aggregate()}); - let reader = buf.reader(); - let mut stream = Stream::from_http_stream( path_atom, - Box::new(reader), + request.request_data.body, &mut self.machine_st.arena ); *stream.options_mut() = StreamOptions::default();