Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Hyper with Warp for HTTP server #1998

Merged
merged 1 commit into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
472 changes: 328 additions & 144 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ ffi = ["dep:libffi"]
repl = ["dep:crossterm", "dep:ctrlc", "dep:rustyline"]
hostname = ["dep:hostname"]
tls = ["dep:native-tls"]
http = ["dep:hyper", "dep:reqwest"]
http = ["dep:warp", "dep:reqwest"]
rust_beta_channel = []
crypto-full = []

Expand Down Expand Up @@ -66,7 +66,6 @@ ryu = "1.0.9"
futures = "0.3"
libloading = "0.7"
derive_deref = "1.1.1"
http-body-util = "0.1.0-rc.2"
bytes = "1"
dashu = "0.4.0"
num-order = { version = "1.2.0" }
Expand All @@ -79,7 +78,7 @@ crossterm = { version = "0.20.0", optional = true }
ctrlc = { version = "3.2.2", optional = true }
rustyline = { version = "12.0.0", optional = true }
native-tls = { version = "0.2.4", optional = true }
hyper = { version = "=1.0.0-rc.3", features = ["full"], optional = true }
warp = { version = "=0.3.5", features = ["tls"], optional = true }
reqwest = { version = "0.11.18", features = ["blocking"], optional = true }
tokio = { version = "1.28.2", features = ["full"] }

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ The modules that ship with Scryer Prolog are also called
Probabilistic predicates and random number generators.
* [`http/http_open`](src/lib/http/http_open.pl) Open a stream to
read answers from web servers. HTTPS is also supported.
* [`http/http_server`](src/lib/http/http_server.pl) Runs a HTTP/1.1 and HTTP/2.0 web server. Uses [Hyper](https://hyper.rs) as a backend. Supports some query and form handling.
* [`http/http_server`](src/lib/http/http_server.pl) Runs a HTTP/1.1 and HTTP/2.0 web server. Uses [Warp](https://github.com/seanmonstar/warp) as a backend. Supports some query and form handling.
* [`sgml`](src/lib/sgml.pl)
`load_html/3` and `load_xml/3` represent HTML and XML documents
as Prolog terms for convenient and efficient reasoning. Use
Expand Down
2 changes: 1 addition & 1 deletion build/instructions_template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ enum SystemClauseType {
DeterministicLengthRundown,
#[strum_discriminants(strum(props(Arity = "7", Name = "$http_open")))]
HttpOpen,
#[strum_discriminants(strum(props(Arity = "2", Name = "$http_listen")))]
#[strum_discriminants(strum(props(Arity = "5", Name = "$http_listen")))]
HttpListen,
#[strum_discriminants(strum(props(Arity = "7", Name = "$http_accept")))]
HttpAccept,
Expand Down
56 changes: 12 additions & 44 deletions src/http.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,23 @@
use bytes::Bytes;
use http_body_util::Full;
use hyper::service::Service;
use hyper::{body::Incoming as IncomingBody, Request, Response};
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{Arc, Mutex, Condvar};
use std::io::BufRead;

use warp::http;

pub struct HttpListener {
pub incoming: std::sync::mpsc::Receiver<HttpRequest>,
}

#[derive(Debug)]
pub struct HttpRequest {
pub request: Request<IncomingBody>,
pub request_data: HttpRequestData,
pub response: HttpResponse,
}

pub type HttpResponse = Arc<(Mutex<bool>, Mutex<Option<Response<Full<Bytes>>>>, Condvar)>;

pub struct HttpService {
pub tx: std::sync::mpsc::SyncSender<HttpRequest>,
}

impl Service<Request<IncomingBody>> for HttpService {
type Response = Response<Full<Bytes>>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn call(&mut self, req: Request<IncomingBody>) -> Self::Future {
// new connection!
// we send the Request info to Prolog
let response = Arc::new((Mutex::new(false), Mutex::new(None), Condvar::new()));
let http_request = HttpRequest {
request: req,
response: Arc::clone(&response),
};
self.tx.send(http_request).unwrap();
pub type HttpResponse = Arc<(Mutex<bool>, Mutex<Option<warp::reply::Response>>, Condvar)>;

// we wait for the Response info from Prolog
{
let (ready, _response, cvar) = &*response;
let mut ready = ready.lock().unwrap();
while !*ready {
ready = cvar.wait(ready).unwrap();
}
}
{
let (_, response, _) = &*response;
let response = response.lock().unwrap().take();
let res = response.expect("Data race error in HTTP Server");
Box::pin(async move { Ok(res) })
}
}
pub struct HttpRequestData {
pub method: http::Method,
pub headers: http::HeaderMap,
pub path: String,
pub query: String,
pub body: Box<dyn BufRead + Send>,
}
103 changes: 93 additions & 10 deletions src/lib/http/http_server.pl
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
*/

/** This library provides an starting point to build HTTP server based applications.
It is based on [Hyper](https://hyper.rs/), which allows for HTTP/1.0, HTTP/1.1 and HTTP/2. However,
some advanced features that Hyper provides are still not accesible.
It is based on [Warp](https://github.com/seanmonstar/warp), which allows for HTTP/1.0, HTTP/1.1 and HTTP/2. However,
some advanced features that Warp provides are still not accesible.

## Usage

Expand Down Expand Up @@ -46,22 +46,26 @@
Some things that are still missing:

- Read forms in multipart format
- HTTP Basic Auth
- Session handling via cookies
- HTML Templating (but you can use [Teruel](https://github.com/aarroyoc/teruel/), [Marquete](https://github.com/aarroyoc/marquete/) or [Djota](https://github.com/aarroyoc/djota) for that)
*/


:- module(http_server, [
http_listen/2,
http_listen/3,
http_headers/2,
http_status_code/2,
http_body/2,
http_redirect/2,
http_query/3
http_query/3,
http_basic_auth/4
]).

:- meta_predicate http_listen(?, :).
:- meta_predicate http_listen(?, :, ?).

:- meta_predicate http_basic_auth(:, :, ?, ?).

:- use_module(library(charsio)).
:- use_module(library(crypto)).
Expand All @@ -74,25 +78,58 @@

%% http_listen(+Port, +Handlers).
%
% Equivalent to `http_listen(Port, Handlers, [])`.
http_listen(Port, Module:Handlers0) :-
must_be(integer, Port),
must_be(list, Handlers0),
maplist(module_qualification(Module), Handlers0, Handlers),
http_listen_(Port, Handlers, []).

%% http_listen(+Port, +Handlers, +Options).
%
% Listens for HTTP connections on port Port. Each handler on the list Handlers should be of the form: `HttpVerb(PathUnification, Predicate)`.
% For example: `get(user/User, get_info(User))` will match an HTTP request that is a GET, the path unifies with /user/User (where User is a variable)
% and it will call `get_info` with three arguments: an `http_request` term, an `http_response` term and User.
http_listen(Port, Module:Handlers0) :-
% and it will call `get_info` with three arguments: an `http_request` term, an `http_response` term and User.
%
% The following options are supported:
%
% - `tls_key(+Key)` - a TLS key for HTTPS (string)
% - `tls_cert(+Cert)` - a TLS cert for HTTPS (string)
% - `content_length_limit(+Limit)` - maximum length (in bytes) for the incoming bodies. By default, 32KB.
%
% In order to have a HTTPS server (instead of plain HTTP), both `tls_key` and `tls_cert` options must be provided.
http_listen(Port, Module:Handlers0, Options) :-
must_be(integer, Port),
must_be(list, Handlers0),
must_be(list, Options),
maplist(module_qualification(Module), Handlers0, Handlers),
http_listen_(Port, Handlers).
http_listen_(Port, Handlers, Options).

module_qualification(M, H0, H) :-
H0 =.. [Method, Path, Goal],
H =.. [Method, Path, M:Goal].

http_listen_(Port, Handlers) :-
http_listen_(Port, Handlers, Options) :-
parse_options(Options, TLSKey, TLSCert, ContentLengthLimit),
phrase(format_("0.0.0.0:~d", [Port]), Addr),
'$http_listen'(Addr, HttpListener),!,
'$http_listen'(Addr, HttpListener, TLSKey, TLSCert, ContentLengthLimit),!,
format("Listening at ~s\n", [Addr]),
http_loop(HttpListener, Handlers).

parse_options(Options, TLSKey, TLSCert, ContentLengthLimit) :-
member_option_default(tls_key, Options, "", TLSKey),
member_option_default(tls_cert, Options, "", TLSCert),
member_option_default(content_length_limit, Options, 32768, ContentLengthLimit),
must_be(integer, ContentLengthLimit).

member_option_default(Key, List, _Default, Value) :-
X =.. [Key, Value],
member(X, List).
member_option_default(Key, List, Default, Default) :-
X =.. [Key, _],
\+ member(X, List).


http_loop(HttpListener, Handlers) :-
'$http_accept'(HttpListener, RequestMethod, RequestPath, RequestHeaders, RequestQuery, RequestStream, ResponseHandle),
current_time(Time),
Expand All @@ -114,7 +151,7 @@
)
; (
'$http_answer'(ResponseHandle, 404, [], ResponseStream),
call_cleanup(format(ResponseStream, "Not Found"), close(ResponseStream)))
call_cleanup(format(ResponseStream, "Not Found", []), close(ResponseStream)))
),
http_loop(HttpListener, Handlers).

Expand Down Expand Up @@ -352,3 +389,49 @@
url_decode(Chars).

url_decode([]) --> [].

%% http_basic_auth(+LoginPredicate, +Handler, +Request, -Response)
%
% Metapredicate that wraps an existing Handler with an HTTP Basic Auth flow.
% Checks if a given user + password is authorized to execute that handler, returning 401
% if it's not satisfied.
%
% `LoginPredicate` must be a predicate of arity 2 that takes a User and a Password.
% `Handler` will have, in addition to the Request and Response arguments, a User argument
% containing the User given in the authentication.
%
% Example:
%
% ```
% main :-
% http_listen(8800,[get('/', http_basic_auth(login, inside_handler("data")))]).
%
% login(User, Pass) :-
% User = "aarroyoc",
% Pass = "123456".
%
% inside_handler(Data, User, Request, Response) :-
% http_body(Response, text(User)).
% ```
http_basic_auth(LoginPredicate, Handler, Request, Response) :-
http_headers(Request, Headers),
member("authorization"-AuthorizationStr, Headers),
append("Basic ", Coded, AuthorizationStr),
chars_base64(UserPass, Coded, []),
append(User, [':'|Password], UserPass),
(
call(LoginPredicate, User, Password) ->
call(Handler, User, Request, Response)
; http_basic_auth_unauthorized_response(Response)
).

http_basic_auth(_LoginPredicate, _Handler, Request, Response) :-
http_headers(Request, Headers),
\+ member("authorization"-_, Headers),
http_basic_auth_unauthorized_response(Response).

http_basic_auth_unauthorized_response(Response) :-
http_status_code(Response, 401),
http_headers(Response, ["www-authenticate"-"Basic realm=\"Scryer Prolog\", charset=\"UTF-8\""]),
http_body(Response, text("Unauthorized")).

11 changes: 11 additions & 0 deletions src/machine/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5499,6 +5499,17 @@ impl Machine {
if interruption {
self.machine_st.throw_interrupt_exception();
self.machine_st.backtrack();

#[cfg(not(target_arch = "wasm32"))]
let runtime = tokio::runtime::Runtime::new().unwrap();
#[cfg(target_arch = "wasm32")]
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

let old_runtime = std::mem::replace(&mut self.runtime, runtime);
old_runtime.shutdown_background();
}
}
Err(_) => unreachable!(),
Expand Down
65 changes: 39 additions & 26 deletions src/machine/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ use std::ptr;
#[cfg(feature = "tls")]
use native_tls::TlsStream;

#[cfg(feature = "http")]
use warp::hyper;

#[derive(Debug, BitfieldSpecifier, Clone, Copy, PartialEq, Eq, Hash)]
#[bits = 1]
pub enum StreamType {
Expand Down Expand Up @@ -290,9 +293,9 @@ impl Read for HttpReadStream {
#[cfg(feature = "http")]
pub struct HttpWriteStream {
status_code: u16,
headers: hyper::HeaderMap,
headers: mem::ManuallyDrop<hyper::HeaderMap>,
response: TypedArenaPtr<HttpResponse>,
buffer: Vec<u8>,
buffer: mem::ManuallyDrop<Vec<u8>>,
}

#[cfg(feature = "http")]
Expand All @@ -312,24 +315,33 @@ impl Write for HttpWriteStream {

#[inline]
fn flush(&mut self) -> std::io::Result<()> {
let (ready, response, cvar) = &**self.response;

let mut ready = ready.lock().unwrap();
{
let mut response = response.lock().unwrap();

let bytes = bytes::Bytes::copy_from_slice(&self.buffer);
let mut response_ = hyper::Response::builder().status(self.status_code);
*response_.headers_mut().unwrap() = self.headers.clone();
*response = Some(response_.body(http_body_util::Full::new(bytes)).unwrap());
}
*ready = true;
cvar.notify_one();
Ok(())
}
}

Ok(())
#[cfg(feature = "http")]
impl HttpWriteStream {
fn drop(&mut self) {
let headers = unsafe { mem::ManuallyDrop::take(&mut self.headers) };
let buffer = unsafe { mem::ManuallyDrop::take(&mut self.buffer) };

let (ready, response, cvar) = &**self.response;

let mut ready = ready.lock().unwrap();
{
let mut response = response.lock().unwrap();

let mut response_ = warp::http::Response::builder()
.status(self.status_code);
*response_.headers_mut().unwrap() = headers;
*response = Some(response_.body(warp::hyper::Body::from(buffer)).unwrap());
}
*ready = true;
cvar.notify_one();
}
}


#[derive(Debug)]
pub struct StandardOutputStream {}

Expand Down Expand Up @@ -1243,15 +1255,15 @@ impl Stream {
headers: hyper::HeaderMap,
arena: &mut Arena,
) -> Self {
Stream::HttpWrite(arena_alloc!(
StreamLayout::new(CharReader::new(HttpWriteStream {
response,
status_code,
headers,
buffer: Vec::new(),
})),
arena
))
Stream::HttpWrite(arena_alloc!(
StreamLayout::new(CharReader::new(HttpWriteStream {
response,
status_code,
headers: mem::ManuallyDrop::new(headers),
buffer: mem::ManuallyDrop::new(Vec::new()),
})),
arena
))
}

#[inline]
Expand Down Expand Up @@ -1299,7 +1311,8 @@ impl Stream {
Ok(())
}
#[cfg(feature = "http")]
Stream::HttpWrite(ref mut http_stream) => {
Stream::HttpWrite(ref mut http_stream) => {
http_stream.inner_mut().drop();
unsafe {
http_stream.set_tag(ArenaHeaderTag::Dropped);
std::ptr::drop_in_place(&mut http_stream.inner_mut().buffer as *mut _);
Expand Down
Loading
Loading