Skip to content

Commit

Permalink
Replace Hyper with Warp for HTTP server
Browse files Browse the repository at this point in the history
- Use Warp
- Optimize clones
- HTTPS server
- Content-Length limit
- HTTP Basic Auth
  • Loading branch information
aarroyoc committed Sep 23, 2023
1 parent 1c8cd85 commit 0bd94fa
Show file tree
Hide file tree
Showing 8 changed files with 633 additions and 321 deletions.
472 changes: 328 additions & 144 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ ffi = ["dep:libffi"]
repl = ["dep:crossterm", "dep:ctrlc", "dep:rustyline"]
hostname = ["dep:hostname"]
tls = ["dep:native-tls"]
http = ["dep:hyper", "dep:reqwest"]
http = ["dep:warp", "dep:reqwest"]
rust_beta_channel = []
crypto-full = []

Expand Down Expand Up @@ -66,7 +66,6 @@ ryu = "1.0.9"
futures = "0.3"
libloading = "0.7"
derive_deref = "1.1.1"
http-body-util = "0.1.0-rc.2"
bytes = "1"
dashu = "0.4.0"
num-order = { version = "1.2.0" }
Expand All @@ -79,7 +78,7 @@ crossterm = { version = "0.20.0", optional = true }
ctrlc = { version = "3.2.2", optional = true }
rustyline = { version = "12.0.0", optional = true }
native-tls = { version = "0.2.4", optional = true }
hyper = { version = "=1.0.0-rc.3", features = ["full"], optional = true }
warp = { version = "=0.3.5", features = ["tls"], optional = true }
reqwest = { version = "0.11.18", features = ["blocking"], optional = true }
tokio = { version = "1.28.2", features = ["full"] }

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ The modules that ship with Scryer Prolog are also called
Probabilistic predicates and random number generators.
* [`http/http_open`](src/lib/http/http_open.pl) Open a stream to
read answers from web servers. HTTPS is also supported.
* [`http/http_server`](src/lib/http/http_server.pl) Runs a HTTP/1.1 and HTTP/2.0 web server. Uses [Hyper](https://hyper.rs) as a backend. Supports some query and form handling.
* [`http/http_server`](src/lib/http/http_server.pl) Runs a HTTP/1.1 and HTTP/2.0 web server. Uses [Warp](https://github.com/seanmonstar/warp) as a backend. Supports some query and form handling.
* [`sgml`](src/lib/sgml.pl)
`load_html/3` and `load_xml/3` represent HTML and XML documents
as Prolog terms for convenient and efficient reasoning. Use
Expand Down
2 changes: 1 addition & 1 deletion build/instructions_template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ enum SystemClauseType {
DeterministicLengthRundown,
#[strum_discriminants(strum(props(Arity = "7", Name = "$http_open")))]
HttpOpen,
#[strum_discriminants(strum(props(Arity = "2", Name = "$http_listen")))]
#[strum_discriminants(strum(props(Arity = "5", Name = "$http_listen")))]
HttpListen,
#[strum_discriminants(strum(props(Arity = "7", Name = "$http_accept")))]
HttpAccept,
Expand Down
56 changes: 12 additions & 44 deletions src/http.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,23 @@
use bytes::Bytes;
use http_body_util::Full;
use hyper::service::Service;
use hyper::{body::Incoming as IncomingBody, Request, Response};
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{Arc, Mutex, Condvar};
use std::io::BufRead;

use warp::http;

pub struct HttpListener {
pub incoming: std::sync::mpsc::Receiver<HttpRequest>,
}

#[derive(Debug)]
pub struct HttpRequest {
pub request: Request<IncomingBody>,
pub request_data: HttpRequestData,
pub response: HttpResponse,
}

pub type HttpResponse = Arc<(Mutex<bool>, Mutex<Option<Response<Full<Bytes>>>>, Condvar)>;

pub struct HttpService {
pub tx: std::sync::mpsc::SyncSender<HttpRequest>,
}

impl Service<Request<IncomingBody>> for HttpService {
type Response = Response<Full<Bytes>>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn call(&mut self, req: Request<IncomingBody>) -> Self::Future {
// new connection!
// we send the Request info to Prolog
let response = Arc::new((Mutex::new(false), Mutex::new(None), Condvar::new()));
let http_request = HttpRequest {
request: req,
response: Arc::clone(&response),
};
self.tx.send(http_request).unwrap();
pub type HttpResponse = Arc<(Mutex<bool>, Mutex<Option<warp::reply::Response>>, Condvar)>;

// we wait for the Response info from Prolog
{
let (ready, _response, cvar) = &*response;
let mut ready = ready.lock().unwrap();
while !*ready {
ready = cvar.wait(ready).unwrap();
}
}
{
let (_, response, _) = &*response;
let response = response.lock().unwrap().take();
let res = response.expect("Data race error in HTTP Server");
Box::pin(async move { Ok(res) })
}
}
pub struct HttpRequestData {
pub method: http::Method,
pub headers: http::HeaderMap,
pub path: String,
pub query: String,
pub body: Box<dyn BufRead + Send>,
}
103 changes: 93 additions & 10 deletions src/lib/http/http_server.pl
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
*/

/** This library provides an starting point to build HTTP server based applications.
It is based on [Hyper](https://hyper.rs/), which allows for HTTP/1.0, HTTP/1.1 and HTTP/2. However,
some advanced features that Hyper provides are still not accesible.
It is based on [Warp](https://github.com/seanmonstar/warp), which allows for HTTP/1.0, HTTP/1.1 and HTTP/2. However,
some advanced features that Warp provides are still not accesible.
## Usage
Expand Down Expand Up @@ -46,22 +46,26 @@
Some things that are still missing:
- Read forms in multipart format
- HTTP Basic Auth
- Session handling via cookies
- HTML Templating (but you can use [Teruel](https://github.com/aarroyoc/teruel/), [Marquete](https://github.com/aarroyoc/marquete/) or [Djota](https://github.com/aarroyoc/djota) for that)
*/


:- module(http_server, [
http_listen/2,
http_listen/3,
http_headers/2,
http_status_code/2,
http_body/2,
http_redirect/2,
http_query/3
http_query/3,
http_basic_auth/4
]).

:- meta_predicate http_listen(?, :).
:- meta_predicate http_listen(?, :, ?).

:- meta_predicate http_basic_auth(:, :, ?, ?).

:- use_module(library(charsio)).
:- use_module(library(crypto)).
Expand All @@ -74,25 +78,58 @@

%% http_listen(+Port, +Handlers).
%
% Equivalent to `http_listen(Port, Handlers, [])`.
http_listen(Port, Module:Handlers0) :-
must_be(integer, Port),
must_be(list, Handlers0),
maplist(module_qualification(Module), Handlers0, Handlers),
http_listen_(Port, Handlers, []).

%% http_listen(+Port, +Handlers, +Options).
%
% Listens for HTTP connections on port Port. Each handler on the list Handlers should be of the form: `HttpVerb(PathUnification, Predicate)`.
% For example: `get(user/User, get_info(User))` will match an HTTP request that is a GET, the path unifies with /user/User (where User is a variable)
% and it will call `get_info` with three arguments: an `http_request` term, an `http_response` term and User.
http_listen(Port, Module:Handlers0) :-
% and it will call `get_info` with three arguments: an `http_request` term, an `http_response` term and User.
%
% The following options are supported:
%
% - `tls_key(+Key)` - a TLS key for HTTPS (string)
% - `tls_cert(+Cert)` - a TLS cert for HTTPS (string)
% - `content_length_limit(+Limit)` - maximum length (in bytes) for the incoming bodies. By default, 32KB.
%
% In order to have a HTTPS server (instead of plain HTTP), both `tls_key` and `tls_cert` options must be provided.
http_listen(Port, Module:Handlers0, Options) :-
must_be(integer, Port),
must_be(list, Handlers0),
must_be(list, Options),
maplist(module_qualification(Module), Handlers0, Handlers),
http_listen_(Port, Handlers).
http_listen_(Port, Handlers, Options).

module_qualification(M, H0, H) :-
H0 =.. [Method, Path, Goal],
H =.. [Method, Path, M:Goal].

http_listen_(Port, Handlers) :-
http_listen_(Port, Handlers, Options) :-
parse_options(Options, TLSKey, TLSCert, ContentLengthLimit),
phrase(format_("0.0.0.0:~d", [Port]), Addr),
'$http_listen'(Addr, HttpListener),!,
'$http_listen'(Addr, HttpListener, TLSKey, TLSCert, ContentLengthLimit),!,
format("Listening at ~s\n", [Addr]),
http_loop(HttpListener, Handlers).

parse_options(Options, TLSKey, TLSCert, ContentLengthLimit) :-
member_option_default(tls_key, Options, "", TLSKey),
member_option_default(tls_cert, Options, "", TLSCert),
member_option_default(content_length_limit, Options, 32768, ContentLengthLimit),
must_be(integer, ContentLengthLimit).

member_option_default(Key, List, _Default, Value) :-
X =.. [Key, Value],
member(X, List).
member_option_default(Key, List, Default, Default) :-
X =.. [Key, _],
\+ member(X, List).


http_loop(HttpListener, Handlers) :-
'$http_accept'(HttpListener, RequestMethod, RequestPath, RequestHeaders, RequestQuery, RequestStream, ResponseHandle),
current_time(Time),
Expand All @@ -114,7 +151,7 @@
)
; (
'$http_answer'(ResponseHandle, 404, [], ResponseStream),
call_cleanup(format(ResponseStream, "Not Found"), close(ResponseStream)))
call_cleanup(format(ResponseStream, "Not Found", []), close(ResponseStream)))
),
http_loop(HttpListener, Handlers).

Expand Down Expand Up @@ -352,3 +389,49 @@
url_decode(Chars).

url_decode([]) --> [].

%% http_basic_auth(+LoginPredicate, +Handler, +Request, -Response)
%
% Metapredicate that wraps an existing Handler with an HTTP Basic Auth flow.
% Checks if a given user + password is authorized to execute that handler, returning 401
% if it's not satisfied.
%
% `LoginPredicate` must be a predicate of arity 2 that takes a User and a Password.
% `Handler` will have, in addition to the Request and Response arguments, a User argument
% containing the User given in the authentication.
%
% Example:
%
% ```
% main :-
% http_listen(8800,[get('/', http_basic_auth(login, inside_handler("data")))]).
%
% login(User, Pass) :-
% User = "aarroyoc",
% Pass = "123456".
%
% inside_handler(Data, User, Request, Response) :-
% http_body(Response, text(User)).
% ```
http_basic_auth(LoginPredicate, Handler, Request, Response) :-
http_headers(Request, Headers),
member("authorization"-AuthorizationStr, Headers),
append("Basic ", Coded, AuthorizationStr),
chars_base64(UserPass, Coded, []),
append(User, [':'|Password], UserPass),
(
call(LoginPredicate, User, Password) ->
call(Handler, User, Request, Response)
; http_basic_auth_unauthorized_response(Response)
).

http_basic_auth(_LoginPredicate, _Handler, Request, Response) :-
http_headers(Request, Headers),
\+ member("authorization"-_, Headers),
http_basic_auth_unauthorized_response(Response).

http_basic_auth_unauthorized_response(Response) :-
http_status_code(Response, 401),
http_headers(Response, ["www-authenticate"-"Basic realm=\"Scryer Prolog\", charset=\"UTF-8\""]),
http_body(Response, text("Unauthorized")).

65 changes: 39 additions & 26 deletions src/machine/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ use std::ptr;
#[cfg(feature = "tls")]
use native_tls::TlsStream;

#[cfg(feature = "http")]
use warp::hyper;

#[derive(Debug, BitfieldSpecifier, Clone, Copy, PartialEq, Eq, Hash)]
#[bits = 1]
pub enum StreamType {
Expand Down Expand Up @@ -290,9 +293,9 @@ impl Read for HttpReadStream {
#[cfg(feature = "http")]
pub struct HttpWriteStream {
status_code: u16,
headers: hyper::HeaderMap,
headers: mem::ManuallyDrop<hyper::HeaderMap>,
response: TypedArenaPtr<HttpResponse>,
buffer: Vec<u8>,
buffer: mem::ManuallyDrop<Vec<u8>>,
}

#[cfg(feature = "http")]
Expand All @@ -312,24 +315,33 @@ impl Write for HttpWriteStream {

#[inline]
fn flush(&mut self) -> std::io::Result<()> {
let (ready, response, cvar) = &**self.response;

let mut ready = ready.lock().unwrap();
{
let mut response = response.lock().unwrap();

let bytes = bytes::Bytes::copy_from_slice(&self.buffer);
let mut response_ = hyper::Response::builder().status(self.status_code);
*response_.headers_mut().unwrap() = self.headers.clone();
*response = Some(response_.body(http_body_util::Full::new(bytes)).unwrap());
}
*ready = true;
cvar.notify_one();
Ok(())
}
}

Ok(())
#[cfg(feature = "http")]
impl HttpWriteStream {
fn drop(&mut self) {
let headers = unsafe { mem::ManuallyDrop::take(&mut self.headers) };
let buffer = unsafe { mem::ManuallyDrop::take(&mut self.buffer) };

let (ready, response, cvar) = &**self.response;

let mut ready = ready.lock().unwrap();
{
let mut response = response.lock().unwrap();

let mut response_ = warp::http::Response::builder()
.status(self.status_code);
*response_.headers_mut().unwrap() = headers;
*response = Some(response_.body(warp::hyper::Body::from(buffer)).unwrap());
}
*ready = true;
cvar.notify_one();
}
}


#[derive(Debug)]
pub struct StandardOutputStream {}

Expand Down Expand Up @@ -1243,15 +1255,15 @@ impl Stream {
headers: hyper::HeaderMap,
arena: &mut Arena,
) -> Self {
Stream::HttpWrite(arena_alloc!(
StreamLayout::new(CharReader::new(HttpWriteStream {
response,
status_code,
headers,
buffer: Vec::new(),
})),
arena
))
Stream::HttpWrite(arena_alloc!(
StreamLayout::new(CharReader::new(HttpWriteStream {
response,
status_code,
headers: mem::ManuallyDrop::new(headers),
buffer: mem::ManuallyDrop::new(Vec::new()),
})),
arena
))
}

#[inline]
Expand Down Expand Up @@ -1299,7 +1311,8 @@ impl Stream {
Ok(())
}
#[cfg(feature = "http")]
Stream::HttpWrite(ref mut http_stream) => {
Stream::HttpWrite(ref mut http_stream) => {
http_stream.inner_mut().drop();
unsafe {
http_stream.set_tag(ArenaHeaderTag::Dropped);
std::ptr::drop_in_place(&mut http_stream.inner_mut().buffer as *mut _);
Expand Down
Loading

0 comments on commit 0bd94fa

Please sign in to comment.