Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP registry implementation #10470

Merged
merged 1 commit into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 162 additions & 1 deletion crates/cargo-test-support/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use std::collections::BTreeMap;
use std::fmt::Write as _;
use std::fs::{self, File};
use std::io::{BufRead, BufReader, Write};
use std::net::TcpListener;
use std::net::{SocketAddr, TcpListener};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use tar::{Builder, Header};
use url::Url;
Expand Down Expand Up @@ -368,6 +370,165 @@ pub fn alt_init() {
RegistryBuilder::new().alternative(true).build();
}

pub struct RegistryServer {
done: Arc<AtomicBool>,
server: Option<thread::JoinHandle<()>>,
addr: SocketAddr,
}

impl RegistryServer {
pub fn addr(&self) -> SocketAddr {
self.addr
}
}

impl Drop for RegistryServer {
fn drop(&mut self) {
self.done.store(true, Ordering::SeqCst);
// NOTE: we can't actually await the server since it's blocked in accept()
let _ = self.server.take();
}
}

#[must_use]
pub fn serve_registry(registry_path: PathBuf) -> RegistryServer {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let done = Arc::new(AtomicBool::new(false));
let done2 = done.clone();

let t = thread::spawn(move || {
let mut line = String::new();
'server: while !done2.load(Ordering::SeqCst) {
let (socket, _) = listener.accept().unwrap();
// Let's implement a very naive static file HTTP server.
let mut buf = BufReader::new(socket);

// First, the request line:
// GET /path HTTPVERSION
line.clear();
if buf.read_line(&mut line).unwrap() == 0 {
// Connection terminated.
continue;
}

assert!(line.starts_with("GET "), "got non-GET request: {}", line);
let path = PathBuf::from(
line.split_whitespace()
.skip(1)
.next()
.unwrap()
.trim_start_matches('/'),
);

let file = registry_path.join(path);
if file.exists() {
// Grab some other headers we may care about.
let mut if_modified_since = None;
let mut if_none_match = None;
loop {
line.clear();
if buf.read_line(&mut line).unwrap() == 0 {
continue 'server;
}

if line == "\r\n" {
// End of headers.
line.clear();
break;
}

let value = line
.splitn(2, ':')
.skip(1)
.next()
.map(|v| v.trim())
.unwrap();

if line.starts_with("If-Modified-Since:") {
if_modified_since = Some(value.to_owned());
} else if line.starts_with("If-None-Match:") {
if_none_match = Some(value.trim_matches('"').to_owned());
}
}

// Now grab info about the file.
let data = fs::read(&file).unwrap();
let etag = Sha256::new().update(&data).finish_hex();
let last_modified = format!("{:?}", file.metadata().unwrap().modified().unwrap());

// Start to construct our response:
let mut any_match = false;
let mut all_match = true;
if let Some(expected) = if_none_match {
if etag != expected {
all_match = false;
} else {
any_match = true;
}
}
if let Some(expected) = if_modified_since {
// NOTE: Equality comparison is good enough for tests.
if last_modified != expected {
all_match = false;
} else {
any_match = true;
}
}

// Write out the main response line.
if any_match && all_match {
buf.get_mut()
.write_all(b"HTTP/1.1 304 Not Modified\r\n")
.unwrap();
} else {
buf.get_mut().write_all(b"HTTP/1.1 200 OK\r\n").unwrap();
}
// TODO: Support 451 for crate index deletions.

// Write out other headers.
buf.get_mut()
.write_all(format!("Content-Length: {}\r\n", data.len()).as_bytes())
.unwrap();
buf.get_mut()
.write_all(format!("ETag: \"{}\"\r\n", etag).as_bytes())
.unwrap();
buf.get_mut()
.write_all(format!("Last-Modified: {}\r\n", last_modified).as_bytes())
.unwrap();

// And finally, write out the body.
buf.get_mut().write_all(b"\r\n").unwrap();
buf.get_mut().write_all(&data).unwrap();
} else {
loop {
line.clear();
if buf.read_line(&mut line).unwrap() == 0 {
// Connection terminated.
continue 'server;
}

if line == "\r\n" {
break;
}
}

buf.get_mut()
.write_all(b"HTTP/1.1 404 Not Found\r\n\r\n")
.unwrap();
buf.get_mut().write_all(b"\r\n").unwrap();
}
buf.get_mut().flush().unwrap();
}
});

RegistryServer {
addr,
server: Some(t),
done,
}
}

/// Creates a new on-disk registry.
pub fn init_registry(registry_path: PathBuf, dl_url: String, api_url: Url, api_path: PathBuf) {
// Initialize a new registry.
Expand Down
2 changes: 2 additions & 0 deletions src/cargo/core/features.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ unstable_cli_options!(
no_index_update: bool = ("Do not update the registry index even if the cache is outdated"),
panic_abort_tests: bool = ("Enable support to run tests with -Cpanic=abort"),
host_config: bool = ("Enable the [host] section in the .cargo/config.toml file"),
http_registry: bool = ("Support HTTP-based crate registries"),
target_applies_to_host: bool = ("Enable the `target-applies-to-host` key in the .cargo/config.toml file"),
rustdoc_map: bool = ("Allow passing external documentation mappings to rustdoc"),
separate_nightlies: bool = (HIDDEN),
Expand Down Expand Up @@ -875,6 +876,7 @@ impl CliUnstable {
"multitarget" => self.multitarget = parse_empty(k, v)?,
"rustdoc-map" => self.rustdoc_map = parse_empty(k, v)?,
"terminal-width" => self.terminal_width = Some(parse_usize_opt(v)?),
"http-registry" => self.http_registry = parse_empty(k, v)?,
"namespaced-features" => stabilized_warn(k, "1.60", STABILISED_NAMESPACED_FEATURES),
"weak-dep-features" => stabilized_warn(k, "1.60", STABILIZED_WEAK_DEP_FEATURES),
"credential-process" => self.credential_process = parse_empty(k, v)?,
Expand Down
11 changes: 2 additions & 9 deletions src/cargo/core/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,13 +405,6 @@ impl<'cfg> PackageSet<'cfg> {
) -> CargoResult<PackageSet<'cfg>> {
// We've enabled the `http2` feature of `curl` in Cargo, so treat
// failures here as fatal as it would indicate a build-time problem.
//
// Note that the multiplexing support is pretty new so we're having it
// off-by-default temporarily.
//
// Also note that pipelining is disabled as curl authors have indicated
// that it's buggy, and we've empirically seen that it's buggy with HTTP
// proxies.
let mut multi = Multi::new();
Eh2406 marked this conversation as resolved.
Show resolved Hide resolved
let multiplexing = config.http_config()?.multiplexing.unwrap_or(true);
multi
Expand Down Expand Up @@ -700,7 +693,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
return Ok(Some(pkg));
}

// Ask the original source fo this `PackageId` for the corresponding
// Ask the original source for this `PackageId` for the corresponding
// package. That may immediately come back and tell us that the package
// is ready, or it could tell us that it needs to be downloaded.
let mut sources = self.set.sources.borrow_mut();
Expand Down Expand Up @@ -757,7 +750,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
// initiate dozens of connections to crates.io, but rather only one.
// Once the main one is opened we realized that pipelining is possible
// and multiplexing is possible with static.crates.io. All in all this
// reduces the number of connections done to a more manageable state.
// reduces the number of connections down to a more manageable state.
try_old_curl!(handle.pipewait(true), "pipewait");

handle.write_function(move |buf| {
Expand Down
7 changes: 6 additions & 1 deletion src/cargo/core/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ impl<'cfg> PackageRegistry<'cfg> {
}

self.load(namespace, kind)?;

// This isn't strictly necessary since it will be called later.
// However it improves error messages for sources that issue errors
// in `block_until_ready` because the callers here have context about
// which deps are being resolved.
self.block_until_ready()?;
Ok(())
}
Expand Down Expand Up @@ -273,7 +278,7 @@ impl<'cfg> PackageRegistry<'cfg> {
// First up we need to actually resolve each `deps` specification to
// precisely one summary. We're not using the `query` method below as it
// internally uses maps we're building up as part of this method
// (`patches_available` and `patches). Instead we're going straight to
// (`patches_available` and `patches`). Instead we're going straight to
// the source to load information from it.
//
// Remember that each dependency listed in `[patch]` has to resolve to
Expand Down
7 changes: 6 additions & 1 deletion src/cargo/core/source/source_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ impl SourceId {
Ok(SourceId::new(SourceKind::Registry, url, None)?
.with_precise(Some("locked".to_string())))
}
"sparse" => {
let url = string.into_url()?;
Ok(SourceId::new(SourceKind::Registry, url, None)?
.with_precise(Some("locked".to_string())))
Comment on lines 135 to +141
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own understanding, how come sparse passes in the full string while registry passes in the trailing bit (url)? Do these show up somewhere that we have to maintain compatibility?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

http registries are still SourceKind::Registry, so the sparse+ prefix is how it knows we're in http mode. Otherwise it couldn't tell them apart.

I may be able to make the "registry" one also pass the registry+ prefix for consistency -- as long as it doesn't show up somewhere external as you mentioned.

}
"path" => {
let url = url.into_url()?;
SourceId::new(SourceKind::Path, url, None)
Expand Down Expand Up @@ -301,7 +306,7 @@ impl SourceId {
self,
yanked_whitelist,
config,
))),
)?)),
SourceKind::LocalRegistry => {
let path = match self.inner.url.to_file_path() {
Ok(p) => p,
Expand Down
9 changes: 6 additions & 3 deletions src/cargo/ops/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ fn registry(
}
let api_host = {
let _lock = config.acquire_package_cache_lock()?;
let mut src = RegistrySource::remote(sid, &HashSet::new(), config);
let mut src = RegistrySource::remote(sid, &HashSet::new(), config)?;
// Only update the index if the config is not available or `force` is set.
if force_update {
src.invalidate_cache()
Expand Down Expand Up @@ -528,8 +528,11 @@ pub fn http_handle_and_timeout(config: &Config) -> CargoResult<(Easy, HttpTimeou
specified"
)
}
if !config.network_allowed() {
bail!("can't make HTTP request in the offline mode")
if config.offline() {
bail!(
"attempting to make an HTTP request, but --offline was \
specified"
)
}

// The timeout option for libcurl by default times out the entire transfer,
Expand Down
10 changes: 3 additions & 7 deletions src/cargo/sources/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,7 @@ impl<'cfg> Debug for PathSource<'cfg> {

impl<'cfg> Source for PathSource<'cfg> {
fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll<CargoResult<()>> {
if !self.updated {
return Poll::Pending;
}
arlosi marked this conversation as resolved.
Show resolved Hide resolved
self.update()?;
for s in self.packages.iter().map(|p| p.summary()) {
if dep.matches(s) {
f(s.clone())
Expand All @@ -514,9 +512,7 @@ impl<'cfg> Source for PathSource<'cfg> {
_dep: &Dependency,
f: &mut dyn FnMut(Summary),
) -> Poll<CargoResult<()>> {
if !self.updated {
return Poll::Pending;
}
self.update()?;
for s in self.packages.iter().map(|p| p.summary()) {
f(s.clone())
}
Expand All @@ -537,7 +533,7 @@ impl<'cfg> Source for PathSource<'cfg> {

fn download(&mut self, id: PackageId) -> CargoResult<MaybePackage> {
trace!("getting packages; id={}", id);

self.update()?;
let pkg = self.packages.iter().find(|pkg| pkg.package_id() == id);
pkg.cloned()
.map(MaybePackage::Ready)
Expand Down
Loading