diff --git a/Cargo.lock b/Cargo.lock index 7e2cc86d67..3076c8d31b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,6 +44,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "anyhow" +version = "1.0.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee10e43ae4a853c0a3591d4e2ada1719e553be18199d9da9d4a83f5927c2f5c7" + [[package]] name = "arrayref" version = "0.3.6" @@ -79,6 +85,27 @@ dependencies = [ "syn", ] +[[package]] +name = "async-stream" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.52" @@ -433,6 +460,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "fixedbitset" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "398ea4fabe40b9b0d885340a2a991a44c8a645624075ad966d21f88688e2b69e" + [[package]] name = "fnv" version = "1.0.7" @@ -666,11 +699,30 @@ dependencies = [ "indexmap", "slab", "tokio 0.2.25", - "tokio-util", + "tokio-util 0.3.1", "tracing", "tracing-futures", ] +[[package]] +name = "h2" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f072413d126e57991455e0a922b31e4c8ba7c2ffbebf6b78b4f8521397d65cd" +dependencies = [ + "bytes 1.0.1", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio 1.15.0", + "tokio-util 0.6.9", + "tracing", +] + [[package]] name = "hash32" version = "0.1.1" @@ -748,11 +800,22 @@ dependencies = [ "http", ] +[[package]] +name = "http-body" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +dependencies = [ + "bytes 1.0.1", + "http", + "pin-project-lite 0.2.6", +] + [[package]] name = "httparse" -version = "1.4.1" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3a87b616e37e93c22fb19bcd386f02f3af5ea98a25670ad0fce773de23c5e68" +checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503" [[package]] name = "httpdate" @@ -760,6 +823,12 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" +[[package]] +name = "httpdate" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440" + [[package]] name = "hyper" version = "0.13.10" @@ -770,20 +839,56 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.2.7", "http", - "http-body", + "http-body 0.3.1", "httparse", - "httpdate", + "httpdate 0.3.2", "itoa", "pin-project", - "socket2", + "socket2 0.3.19", "tokio 0.2.25", "tower-service", "tracing", "want", ] +[[package]] +name = "hyper" +version = "0.14.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7ec3e62bdc98a2f0393a5048e4c30ef659440ea6e0e572965103e72bd836f55" +dependencies = [ + "bytes 1.0.1", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.9", + "http", + "http-body 0.4.4", + "httparse", + "httpdate 1.0.1", + "itoa", + "pin-project-lite 0.2.6", + "socket2 0.4.2", + "tokio 1.15.0", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper 0.14.16", + "pin-project-lite 0.2.6", + "tokio 1.15.0", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.4.3" @@ -791,12 +896,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d979acc56dcb5b8dddba3917601745e877576475aa046df3226eabdecef78eed" dependencies = [ "bytes 0.5.6", - "hyper", + "hyper 0.13.10", "native-tls", "tokio 0.2.25", "tokio-tls", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes 1.0.1", + "hyper 0.14.16", + "native-tls", + "tokio 1.15.0", + "tokio-native-tls", +] + [[package]] name = "idna" version = "0.2.3" @@ -877,6 +995,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69ddb889f9d0d08a67338271fa9b62996bc788c7796a5c18cf057420aaed5eaf" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.7" @@ -1076,11 +1203,17 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "native-tls" -version = "0.2.7" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8d96b2e1c8da3957d58100b09f102c6d9cfdfced01b7ec5a8974044bb09dbd4" +checksum = "48ba9f7719b5a0f42f338907614285fb5fd70e53858141f69898a1fb7203b24d" dependencies = [ "lazy_static", "libc", @@ -1305,6 +1438,16 @@ dependencies = [ "ucd-trie", ] +[[package]] +name = "petgraph" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a13a2fa9d0b63e5f22328828741e523766fff0ee9e779316902290dff3f824f" +dependencies = [ + "fixedbitset", + "indexmap", +] + [[package]] name = "pin-project" version = "1.0.7" @@ -1412,6 +1555,59 @@ dependencies = [ "rustc_version 0.2.3", ] +[[package]] +name = "prost" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001" +dependencies = [ + "bytes 1.0.1", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5" +dependencies = [ + "bytes 1.0.1", + "heck", + "itertools 0.10.1", + "lazy_static", + "log", + "multimap", + "petgraph", + "prost", + "prost-types", + "regex", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe" +dependencies = [ + "anyhow", + "itertools 0.10.1", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a" +dependencies = [ + "bytes 1.0.1", + "prost", +] + [[package]] name = "pwd" version = "1.3.1" @@ -1637,9 +1833,9 @@ dependencies = [ "futures-core", "futures-util", "http", - "http-body", - "hyper", - "hyper-tls", + "http-body 0.3.1", + "hyper 0.13.10", + "hyper-tls 0.4.3", "ipnet", "js-sys", "lazy_static", @@ -1661,6 +1857,42 @@ dependencies = [ "winreg", ] +[[package]] +name = "reqwest" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c4e0a76dc12a116108933f6301b95e83634e0c47b0afbed6abbaa0601e99258" +dependencies = [ + "base64", + "bytes 1.0.1", + "encoding_rs", + "futures-core", + "futures-util", + "http", + "http-body 0.4.4", + "hyper 0.14.16", + "hyper-tls 0.5.0", + "ipnet", + "js-sys", + "lazy_static", + "log", + "mime", + "native-tls", + "percent-encoding", + "pin-project-lite 0.2.6", + "serde 1.0.126", + "serde_json", + "serde_urlencoded", + "tokio 1.15.0", + "tokio-native-tls", + "tokio-util 0.6.9", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "ring" version = "0.16.20" @@ -1830,8 +2062,8 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "933beb0343c84eefd69a368318e9291b179e09e51982d49c65d7b362b0e9466f" dependencies = [ - "httpdate", - "reqwest", + "httpdate 0.3.2", + "reqwest 0.10.10", "sentry-backtrace", "sentry-contexts", "sentry-core", @@ -2048,6 +2280,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "socket2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516" +dependencies = [ + "libc", + "winapi 0.3.9", +] + [[package]] name = "spfs" version = "0.30.0" @@ -2064,13 +2306,16 @@ dependencies = [ "futures", "gitignore", "glob", + "hyper 0.14.16", "indicatif", - "itertools", + "itertools 0.9.0", "libc", "nix 0.19.1", "palaver", + "prost", "rand 0.7.3", "relative-path", + "reqwest 0.11.8", "ring", "rstest", "semver 0.11.0", @@ -2087,6 +2332,9 @@ dependencies = [ "thiserror", "tokio 1.15.0", "tokio-stream", + "tokio-util 0.6.9", + "tonic", + "tonic-build", "tracing", "tracing-subscriber", "unicode_reader", @@ -2330,6 +2578,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "tokio-io-timeout" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90c49f106be240de154571dd31fbe48acb10ba6c6dd6f6517ad603abffa42de9" +dependencies = [ + "pin-project-lite 0.2.6", + "tokio 1.15.0", +] + [[package]] name = "tokio-macros" version = "1.7.0" @@ -2341,6 +2599,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" +dependencies = [ + "native-tls", + "tokio 1.15.0", +] + [[package]] name = "tokio-stream" version = "0.1.8" @@ -2376,6 +2644,21 @@ dependencies = [ "tokio 0.2.25", ] +[[package]] +name = "tokio-util" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" +dependencies = [ + "bytes 1.0.1", + "futures-core", + "futures-io", + "futures-sink", + "log", + "pin-project-lite 0.2.6", + "tokio 1.15.0", +] + [[package]] name = "toml" version = "0.5.8" @@ -2385,6 +2668,75 @@ dependencies = [ "serde 1.0.126", ] +[[package]] +name = "tonic" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24203b79cf2d68909da91178db3026e77054effba0c5d93deb870d3ca7b35afa" +dependencies = [ + "async-stream", + "async-trait", + "base64", + "bytes 1.0.1", + "futures-core", + "futures-util", + "h2 0.3.9", + "http", + "http-body 0.4.4", + "hyper 0.14.16", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "prost-derive", + "tokio 1.15.0", + "tokio-stream", + "tokio-util 0.6.9", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + +[[package]] +name = "tonic-build" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88358bb1dcfeb62dcce85c63006cafb964b7be481d522b7e09589d4d1e718d2a" +dependencies = [ + "proc-macro2", + "prost-build", + "quote", + "syn", +] + +[[package]] +name = "tower" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60422bc7fefa2f3ec70359b8ff1caff59d785877eb70595904605bcc412470f" +dependencies = [ + "futures-core", + "futures-util", + "indexmap", + "pin-project", + "rand 0.8.4", + "slab", + "tokio 1.15.0", + "tokio-stream", + "tokio-util 0.6.9", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" + [[package]] name = "tower-service" version = "0.3.1" @@ -2723,6 +3075,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "which" +version = "4.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea187a8ef279bc014ec368c27a920da2024d2a711109bfbe3440585d5cf27ad9" +dependencies = [ + "either", + "lazy_static", + "libc", +] + [[package]] name = "whoami" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index 1bbebfc846..2e2dccdf3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,21 @@ path = "src/cli/cmd_init.rs" [[bin]] name = "spfs-render" path = "src/cli/cmd_render.rs" +[[bin]] +name = "spfs-server" +path = "src/cli/cmd_server.rs" +required-features = ["server"] + +[features] +default = ["server"] +server = [ + "tokio/signal", + "tokio-stream/net", + "hyper/server", + "tokio-util/codec", + "tokio-util/io", + "tokio-util/io-util", +] [dependencies] async-trait = "0.1.52" @@ -51,13 +66,16 @@ faccess = "0.2.3" futures = "0.3.9" gitignore = "1.0.7" glob = "0.3.0" +hyper = { version = "0.14.16", features = ["client"] } indicatif = "0.16.2" itertools = "0.9.0" libc = "0.2.80" nix = "0.19.0" palaver = "0.2.8" +prost = "0.9.0" rand = "0.7.3" relative-path = "1.3.2" +reqwest = { version = "0.11.7", features = ["stream"] } ring = "0.16.15" semver = "0.11.0" sentry = "0.21.0" @@ -79,7 +97,9 @@ tokio = { version = "1.15", features = [ "sync", "process", ] } -tokio-stream = "0.1.8" +tokio-stream = { version = "0.1", features = ["net"], optional = true } +tokio-util = { version = "0.6", features = ["compat"] } +tonic = "0.6.1" tracing = "0.1.22" tracing-subscriber = "0.2.15" unicode_reader = "1.0.1" @@ -89,6 +109,9 @@ walkdir = "2.3.1" whoami = "0.9.0" thiserror = "1.0" +[build-dependencies] +tonic-build = "0.6.0" + [dev-dependencies] rstest = { version = "0.12", default_features = false } serial_test = "0.5.1" diff --git a/Makefile b/Makefile index 4f6370f67b..e0d062998d 100644 --- a/Makefile +++ b/Makefile @@ -6,10 +6,10 @@ default: debug debug: cd $(SOURCE_ROOT) - cargo build + cargo build --all test: - cargo test + cargo test --all rpm: cd $(SOURCE_ROOT) diff --git a/build.rs b/build.rs new file mode 100644 index 0000000000..532a2701bd --- /dev/null +++ b/build.rs @@ -0,0 +1,13 @@ +fn main() -> Result<(), Box> { + tonic_build::configure().compile( + &[ + "src/proto/defs/database.proto", + "src/proto/defs/repository.proto", + "src/proto/defs/payload.proto", + "src/proto/defs/tag.proto", + "src/proto/defs/types.proto", + ], + &["src/proto/defs"], + )?; + Ok(()) +} diff --git a/spfs.spec b/spfs.spec index 93dbbe98b6..798112eae3 100644 --- a/spfs.spec +++ b/spfs.spec @@ -24,7 +24,7 @@ Filesystem isolation, capture, and distribution. %setup -q %build -cargo build --release --verbose +cargo build --release --verbose --all %install mkdir -p %{buildroot}/usr/bin @@ -42,6 +42,7 @@ done /usr/bin/spfs-push /usr/bin/spfs-pull /usr/bin/spfs-init +/usr/bin/spfs-server %caps(cap_chown,cap_fowner+ep) /usr/bin/spfs-render %caps(cap_sys_chroot,cap_sys_admin+ep) /usr/bin/spfs-join %caps(cap_setuid,cap_chown,cap_mknod,cap_sys_admin,cap_fowner+ep) /usr/bin/spfs-enter diff --git a/src/bootstrap_test.rs b/src/bootstrap_test.rs index 7f76bf92cd..b5a129ccaf 100644 --- a/src/bootstrap_test.rs +++ b/src/bootstrap_test.rs @@ -8,7 +8,7 @@ use super::{build_interactive_shell_cmd, build_shell_initialized_command}; use crate::{resolve::which, runtime}; use std::{ffi::OsString, process::Command}; -fixtures!(); +use crate::fixtures::*; #[rstest( shell, diff --git a/src/clean_test.rs b/src/clean_test.rs index 295001fdf6..34b55635e1 100644 --- a/src/clean_test.rs +++ b/src/clean_test.rs @@ -14,12 +14,12 @@ use crate::{graph, storage, tracking, Error}; use std::collections::HashSet; use storage::prelude::*; -fixtures!(); +use crate::fixtures::*; #[rstest] #[tokio::test] async fn test_get_attached_objects(#[future] tmprepo: TempRepo) { - let (_td, tmprepo) = tmprepo.await; + let tmprepo = tmprepo.await; let reader = Box::pin("hello, world".as_bytes()); let (payload_digest, _) = tmprepo.write_data(reader).await.unwrap(); let blob = graph::Blob::new(payload_digest, 0); @@ -42,7 +42,7 @@ async fn test_get_attached_objects(#[future] tmprepo: TempRepo) { #[rstest] #[tokio::test] async fn test_get_attached_payloads(#[future] tmprepo: TempRepo) { - let (_td, tmprepo) = tmprepo.await; + let tmprepo = tmprepo.await; let reader = Box::pin("hello, world".as_bytes()); let (payload_digest, _) = tmprepo.write_data(reader).await.unwrap(); let mut expected = HashSet::new(); @@ -65,9 +65,13 @@ async fn test_get_attached_payloads(#[future] tmprepo: TempRepo) { #[rstest] #[tokio::test] -async fn test_get_attached_unattached_objects_blob(#[future] tmprepo: TempRepo) { +async fn test_get_attached_unattached_objects_blob( + #[future] tmprepo: TempRepo, + tmpdir: tempdir::TempDir, +) { init_logging(); - let (tmpdir, tmprepo) = tmprepo.await; + let tmprepo = tmprepo.await; + let data_dir = tmpdir.path().join("data"); ensure(data_dir.join("file.txt"), "hello, world"); @@ -106,10 +110,10 @@ async fn test_get_attached_unattached_objects_blob(#[future] tmprepo: TempRepo) #[rstest] #[tokio::test] -async fn test_clean_untagged_objects(#[future] tmprepo: TempRepo) { +async fn test_clean_untagged_objects(#[future] tmprepo: TempRepo, tmpdir: tempdir::TempDir) { init_logging(); + let tmprepo = tmprepo.await; - let (tmpdir, tmprepo) = tmprepo.await; let data_dir_1 = tmpdir.path().join("data"); ensure(data_dir_1.join("dir/dir/test.file"), "1 hello"); ensure(data_dir_1.join("dir/dir/test.file2"), "1 hello, world"); @@ -168,7 +172,7 @@ async fn test_clean_untagged_objects(#[future] tmprepo: TempRepo) { #[rstest] #[tokio::test] async fn test_clean_untagged_objects_layers_platforms(#[future] tmprepo: TempRepo) { - let (_td, tmprepo) = tmprepo.await; + let tmprepo = tmprepo.await; let manifest = tracking::Manifest::default(); let layer = tmprepo .create_layer(&graph::Manifest::from(&manifest)) @@ -198,15 +202,10 @@ async fn test_clean_untagged_objects_layers_platforms(#[future] tmprepo: TempRep #[rstest] #[tokio::test] -async fn test_clean_manifest_renders(#[future] tmprepo: TempRepo) { - let (tmpdir, tmprepo) = tmprepo.await; - let tmprepo = match tmprepo { - storage::RepositoryHandle::FS(repo) => repo, - _ => { - println!("Unsupported repo for this test"); - return; - } - }; +async fn test_clean_manifest_renders(tmpdir: tempdir::TempDir) { + let tmprepo = storage::fs::FSRepository::create(tmpdir.path()) + .await + .unwrap(); let data_dir = tmpdir.path().join("data"); ensure(data_dir.join("dir/dir/file.txt"), "hello"); @@ -234,7 +233,7 @@ async fn test_clean_manifest_renders(#[future] tmprepo: TempRepo) { .await .expect("failed to clean repo"); - let files = list_files(tmprepo.renders.unwrap().root()); + let files = list_files(tmprepo.renders.as_ref().unwrap().root()); assert!(files.is_empty(), "should remove all created data files"); } diff --git a/src/cli/bin.rs b/src/cli/bin.rs index f43522807c..8caf1a0f80 100644 --- a/src/cli/bin.rs +++ b/src/cli/bin.rs @@ -42,6 +42,7 @@ main!(Opt); \n pull pull one or more object to the local repository\ \n push push one or more objects to a remote repository\ \n render render the contents of an environment or layer\ + \n server run an spfs server (if installed)\ " )] pub struct Opt { diff --git a/src/cli/cmd_check.rs b/src/cli/cmd_check.rs index a1fcbbee18..3bb86452b1 100644 --- a/src/cli/cmd_check.rs +++ b/src/cli/cmd_check.rs @@ -27,6 +27,7 @@ impl CmdCheck { let errors = match repo { RepositoryHandle::FS(repo) => spfs::graph::check_database_integrity(repo).await, RepositoryHandle::Tar(repo) => spfs::graph::check_database_integrity(repo).await, + RepositoryHandle::Rpc(repo) => spfs::graph::check_database_integrity(repo).await, }; for error in errors.iter() { tracing::error!("{:?}", error); diff --git a/src/cli/cmd_server.rs b/src/cli/cmd_server.rs new file mode 100644 index 0000000000..3a38102147 --- /dev/null +++ b/src/cli/cmd_server.rs @@ -0,0 +1,89 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +use structopt::StructOpt; + +#[macro_use] +mod args; + +main!(CmdServer); + +#[derive(Debug, StructOpt)] +pub struct CmdServer { + #[structopt(short = "v", long = "verbose", parse(from_occurrences))] + pub verbose: usize, + #[structopt( + long = "remote", + short = "r", + about = "Serve a configured remote repository instead of the local one" + )] + remote: Option, + #[structopt( + long = "payloads-root", + default_value = "http://localhost", + about = "The external root url that clients can use to connect to this server" + )] + payloads_root: url::Url, + // 7737 = spfs on a dial pad + #[structopt( + default_value = "0.0.0.0:7737", + about = "The address to listen on for grpc requests" + )] + grpc_address: std::net::SocketAddr, + #[structopt( + default_value = "0.0.0.0:7787", + about = "The address to listen on for http requests" + )] + http_address: std::net::SocketAddr, +} + +impl CmdServer { + pub async fn run(&mut self, config: &spfs::Config) -> spfs::Result { + let repo = match &self.remote { + Some(remote) => config.get_remote(remote).await?, + None => config.get_repository().await?.into(), + }; + let repo = std::sync::Arc::new(repo); + + let payload_service = + spfs::server::PayloadService::new(repo.clone(), self.payloads_root.clone()); + let http_server = { + let payload_service = payload_service.clone(); + hyper::Server::bind(&self.http_address).serve(hyper::service::make_service_fn( + move |_| { + let s = payload_service.clone(); + async move { Ok::<_, std::convert::Infallible>(s) } + }, + )) + }; + let http_future = http_server.with_graceful_shutdown(async { + if let Err(err) = tokio::signal::ctrl_c().await { + tracing::error!(?err, "Failed to setup graceful shutdown handler"); + }; + tracing::info!("shutting down http server..."); + }); + let grpc_future = tonic::transport::Server::builder() + .add_service(spfs::server::Repository::new_srv()) + .add_service(spfs::server::TagService::new_srv(repo.clone())) + .add_service(spfs::server::DatabaseService::new_srv(repo)) + .add_service(payload_service.into_srv()) + .serve_with_shutdown(self.grpc_address, async { + if let Err(err) = tokio::signal::ctrl_c().await { + tracing::error!(?err, "Failed to setup graceful shutdown handler"); + }; + tracing::info!("shutting down gRPC server..."); + }); + tracing::info!("listening on: {}, {}", self.grpc_address, self.http_address); + + // TODO: stop the other server when one fails so that + // the process can exit + let (grpc_result, http_result) = tokio::join!(grpc_future, http_future,); + if let Err(err) = grpc_result { + tracing::error!("gRPC server failed: {:?}", err); + } + if let Err(err) = http_result { + tracing::error!("http server failed: {:?}", err); + } + Ok(0) + } +} diff --git a/src/commit_test.rs b/src/commit_test.rs index 6f4348936e..c455128340 100644 --- a/src/commit_test.rs +++ b/src/commit_test.rs @@ -7,7 +7,7 @@ use rstest::rstest; use super::{commit_layer, commit_platform}; use crate::{runtime, Error}; -fixtures!(); +use crate::fixtures::*; #[rstest] #[tokio::test] async fn test_commit_empty(tmpdir: tempdir::TempDir) { diff --git a/src/encoding/hash.rs b/src/encoding/hash.rs index 06d73f30e1..09dd183f96 100644 --- a/src/encoding/hash.rs +++ b/src/encoding/hash.rs @@ -132,6 +132,13 @@ where /// Write this object in binary format. fn encode(&self, writer: &mut impl Write) -> Result<()>; + + /// Encode this object into it's binary form in memory. + fn encode_to_bytes(&self) -> Result> { + let mut buf = Vec::new(); + self.encode(&mut buf)?; + Ok(buf) + } } pub trait Decodable @@ -263,6 +270,9 @@ impl<'a> Digest { pub fn as_bytes(&'a self) -> &'a [u8] { self.0.as_ref() } + pub fn into_bytes(self) -> [u8; DIGEST_SIZE] { + self.0 + } pub fn from_bytes(digest_bytes: &[u8]) -> Result { match digest_bytes.try_into() { Err(err) => Err(Error::new(format!( diff --git a/src/error.rs b/src/error.rs index e7b360b2a1..68dab54515 100644 --- a/src/error.rs +++ b/src/error.rs @@ -27,6 +27,8 @@ pub enum Error { InvalidDateTime(#[from] chrono::ParseError), #[error(transparent)] Caps(#[from] caps::errors::CapsError), + #[error("Error communicating with the server: {0:?}")] + Tonic(#[from] tonic::Status), /// Denotes a missing object or one that is not present in the database. #[error("Unknown Object: {0}")] diff --git a/src/fixtures.rs b/src/fixtures.rs index 067547c3f6..33379d82d1 100644 --- a/src/fixtures.rs +++ b/src/fixtures.rs @@ -2,90 +2,213 @@ // SPDX-License-Identifier: Apache-2.0 // https://github.com/imageworks/spk -macro_rules! fixtures { - () => { - use rstest::fixture; - use tempdir::TempDir; +use std::sync::Arc; - #[allow(dead_code)] - type TempRepo = (TempDir, spfs::storage::RepositoryHandle); +use crate as spfs; +use rstest::fixture; +use tempdir::TempDir; - #[allow(dead_code)] - fn init_logging() { - let sub = tracing_subscriber::FmtSubscriber::builder() - .with_max_level(tracing::Level::TRACE) - .without_time() - .with_test_writer() - .finish(); - let _ = tracing::subscriber::set_global_default(sub); +pub enum TempRepo { + FS(spfs::storage::RepositoryHandle, TempDir), + Tar(spfs::storage::RepositoryHandle, TempDir), + Rpc { + repo: spfs::storage::RepositoryHandle, + grpc_join_handle: Option>, + http_join_handle: Option>, + grpc_shutdown: std::sync::mpsc::Sender<()>, + http_shutdown: std::sync::mpsc::Sender<()>, + tmpdir: TempDir, + }, +} + +impl std::ops::Deref for TempRepo { + type Target = spfs::storage::RepositoryHandle; + fn deref(&self) -> &Self::Target { + match self { + Self::FS(r, _) => r, + Self::Tar(r, _) => r, + Self::Rpc { repo, .. } => repo, } + } +} - #[fixture] - fn spfs_binary() -> std::path::PathBuf { - static BUILD_BIN: std::sync::Once = std::sync::Once::new(); - BUILD_BIN.call_once(|| { - let mut command = std::process::Command::new(std::env::var("CARGO").unwrap()); - command.args(&["build", "--all"]); - if Some(0) - != command - .status() - .expect("failed to build binary to test with") - .code() - { - panic!("failed to build binary to test with"); - }; - }); - let mut path = std::env::current_exe().expect("test must have current binary path"); - loop { - { - let parent = path.parent(); - if parent.is_none() { - panic!("cannot find spfs binary to test"); - } - let parent = parent.unwrap(); - if parent.is_dir() && parent.file_name().unwrap() == "target" { - break; - } - } - path.pop(); - } +impl std::ops::DerefMut for TempRepo { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + Self::FS(r, _) => r, + Self::Tar(r, _) => r, + Self::Rpc { repo, .. } => repo, + } + } +} - path.push(env!("CARGO_PKG_NAME").to_string()); - path +impl Drop for TempRepo { + fn drop(&mut self) { + if let Self::Rpc { + grpc_shutdown, + http_shutdown, + .. + } = self + { + grpc_shutdown + .send(()) + .expect("failed to send grpc server shutdown signal"); + http_shutdown + .send(()) + .expect("failed to send http server shutdown signal"); } + } +} - use crate as spfs; - #[fixture] - fn tmpdir() -> TempDir { - TempDir::new("spfs-test-").expect("failed to create dir for test") +pub fn init_logging() { + let sub = tracing_subscriber::FmtSubscriber::builder() + .with_env_filter("spfs=trace") + .without_time() + .with_test_writer() + .finish(); + let _ = tracing::subscriber::set_global_default(sub); +} + +#[fixture] +pub fn spfs_binary() -> std::path::PathBuf { + static BUILD_BIN: std::sync::Once = std::sync::Once::new(); + BUILD_BIN.call_once(|| { + let mut command = std::process::Command::new(std::env::var("CARGO").unwrap()); + command.args(&["build", "--all"]); + let code = command + .status() + .expect("failed to build binary to test with") + .code(); + if Some(0) != code { + panic!("failed to build binary to test with: {:?}", code); + }; + }); + let mut path = std::env::current_exe().expect("test must have current binary path"); + loop { + let parent = path.parent(); + if parent.is_none() { + panic!("cannot find spfs binary to test"); + } + let parent = parent.unwrap(); + if parent.is_dir() && parent.file_name() == Some(std::ffi::OsStr::new("debug")) { + path.pop(); + break; } - #[fixture(kind = "fs")] - async fn tmprepo(kind: &str) -> (tempdir::TempDir, spfs::storage::RepositoryHandle) { - let tmpdir = tmpdir(); - let repo = match kind { - "fs" => spfs::storage::fs::FSRepository::create(tmpdir.path().join("repo")) + path.pop(); + } + path.push(env!("CARGO_PKG_NAME").to_string()); + path +} + +#[fixture] +pub fn tmpdir() -> TempDir { + TempDir::new("spfs-test-").expect("failed to create dir for test") +} + +#[fixture(kind = "fs")] +pub async fn tmprepo(kind: &str) -> TempRepo { + init_logging(); + let tmpdir = tmpdir(); + match kind { + "fs" => { + let repo = spfs::storage::fs::FSRepository::create(tmpdir.path().join("repo")) + .await + .unwrap() + .into(); + TempRepo::FS(repo, tmpdir) + } + "tar" => { + let repo = spfs::storage::tar::TarRepository::create(tmpdir.path().join("repo.tar")) + .await + .unwrap() + .into(); + TempRepo::Tar(repo, tmpdir) + } + #[cfg(feature = "server")] + "rpc" => { + let repo = Arc::new(spfs::storage::RepositoryHandle::FS( + spfs::storage::fs::FSRepository::create(tmpdir.path().join("repo")) .await - .unwrap() - .into(), - "tar" => spfs::storage::tar::TarRepository::create(tmpdir.path().join("repo.tar")) + .unwrap(), + )); + let listen: std::net::SocketAddr = "127.0.0.1:0".parse().unwrap(); + let http_listener = std::net::TcpListener::bind(listen).unwrap(); + let local_http_addr = http_listener.local_addr().unwrap(); + let payload_service = spfs::server::PayloadService::new( + repo.clone(), + format!("http://{}", local_http_addr).parse().unwrap(), + ); + let (grpc_shutdown, grpc_shutdown_recv) = std::sync::mpsc::channel::<()>(); + let (http_shutdown, http_shutdown_recv) = std::sync::mpsc::channel::<()>(); + let grpc_listener = tokio::net::TcpListener::bind(listen).await.unwrap(); + let local_grpc_addr = grpc_listener.local_addr().unwrap(); + let incoming = tokio_stream::wrappers::TcpListenerStream::new(grpc_listener); + let grpc_future = tonic::transport::Server::builder() + .add_service(spfs::server::Repository::new_srv()) + .add_service(spfs::server::TagService::new_srv(repo.clone())) + .add_service(spfs::server::DatabaseService::new_srv(repo)) + .add_service(payload_service.clone().into_srv()) + .serve_with_incoming_shutdown(incoming, async move { + // use a blocking task to avoid locking up the whole server + // with this very synchronus channel recv process + tokio::task::spawn_blocking(move || { + grpc_shutdown_recv + .recv() + .expect("failed to get server shutdown signal"); + }) .await .unwrap() - .into(), - _ => panic!("unknown repo kind '{}'", kind), + }); + tracing::debug!("test rpc server listening: {}", local_grpc_addr); + let grpc_join_handle = + tokio::task::spawn(async move { grpc_future.await.expect("test server failed") }); + let http_server = { + hyper::Server::from_tcp(http_listener).unwrap().serve( + hyper::service::make_service_fn(move |_| { + let s = payload_service.clone(); + async move { Ok::<_, std::convert::Infallible>(s) } + }), + ) }; - (tmpdir, repo) + let http_future = http_server.with_graceful_shutdown(async { + // use a blocking task to avoid locking up the whole server + // with this very synchronus channel recv process + tokio::task::spawn_blocking(move || { + http_shutdown_recv + .recv() + .expect("failed to get http server shutdown signal"); + }) + .await + .unwrap() + }); + let http_join_handle = + tokio::task::spawn(async move { http_future.await.expect("http server failed") }); + let url = format!("http2://{}", local_grpc_addr).parse().unwrap(); + tracing::debug!("Connected to rpc test repo: {}", url); + let repo = spfs::storage::rpc::RpcRepository::connect(url) + .await + .unwrap() + .into(); + TempRepo::Rpc { + repo, + grpc_join_handle: Some(grpc_join_handle), + http_join_handle: Some(http_join_handle), + grpc_shutdown, + http_shutdown, + tmpdir, + } } + _ => panic!("unknown repo kind '{}'", kind), + } +} - #[allow(dead_code)] - fn ensure(path: std::path::PathBuf, data: &str) { - std::fs::create_dir_all(path.parent().unwrap()).expect("failed to make dirs"); - let mut file = std::fs::OpenOptions::new() - .create(true) - .write(true) - .open(path) - .expect("failed to create file"); - std::io::copy(&mut data.as_bytes(), &mut file).expect("failed to write file data"); - } - }; +pub fn ensure(path: std::path::PathBuf, data: &str) { + std::fs::create_dir_all(path.parent().unwrap()).expect("failed to make dirs"); + let mut file = std::fs::OpenOptions::new() + .create(true) + .write(true) + .open(path) + .expect("failed to create file"); + std::io::copy(&mut data.as_bytes(), &mut file).expect("failed to write file data"); } diff --git a/src/graph/entry_test.rs b/src/graph/entry_test.rs index f5bd2100b1..5ff2c111fe 100644 --- a/src/graph/entry_test.rs +++ b/src/graph/entry_test.rs @@ -8,7 +8,7 @@ use super::Entry; use crate::encoding::{self, Encodable}; use crate::tracking::EntryKind; -fixtures!(); +use crate::fixtures::*; #[rstest(entry, digest, case(Entry{ diff --git a/src/graph/manifest.rs b/src/graph/manifest.rs index 711f519a4f..26b619bca2 100644 --- a/src/graph/manifest.rs +++ b/src/graph/manifest.rs @@ -63,6 +63,18 @@ impl From<&tracking::Entry> for Manifest { } impl Manifest { + /// Create a new manifest with the given tree as the root. + /// + /// It's very possible to create an internally inconsistent manifest + /// this way, so ensure that any additional tree entries in the given + /// root tree are subsequently inserted into the created manifest + pub(crate) fn new(root: Tree) -> Self { + Self { + root, + ..Default::default() + } + } + /// Return the root tree object of this manifest. pub fn root(&self) -> &Tree { &self.root @@ -83,7 +95,7 @@ impl Manifest { /// Add a tree to be tracked in this manifest, returning /// it if the same tree already exists. - fn insert_tree(&mut self, tree: Tree) -> Result> { + pub(crate) fn insert_tree(&mut self, tree: Tree) -> Result> { let digest = tree.digest()?; if let Some(tree) = self.trees.insert(digest, tree) { Ok(Some(tree)) diff --git a/src/graph/manifest_test.rs b/src/graph/manifest_test.rs index b5af5cda1a..bfc1a133e5 100644 --- a/src/graph/manifest_test.rs +++ b/src/graph/manifest_test.rs @@ -7,8 +7,6 @@ use rstest::rstest; use super::Entry; use crate::{encoding, tracking}; -fixtures!(); - #[rstest] fn test_entry_blobs_compare_name() { let a = Entry { diff --git a/src/graph/tree_test.rs b/src/graph/tree_test.rs index e73573e3fc..8cb2907b67 100644 --- a/src/graph/tree_test.rs +++ b/src/graph/tree_test.rs @@ -8,7 +8,7 @@ use super::{Entry, Tree}; use crate::encoding::{self, Encodable}; use crate::tracking::EntryKind; -fixtures!(); +use crate::fixtures::*; #[rstest(entries, digest, case(vec![ diff --git a/src/lib.rs b/src/lib.rs index cde65693ec..76a8e4c8c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,6 @@ extern crate serde_derive; pub const VERSION: &str = env!("CARGO_PKG_VERSION"); #[cfg(test)] -#[macro_use] pub mod fixtures; pub mod encoding; @@ -18,7 +17,10 @@ pub mod env; pub mod graph; pub mod io; pub mod prelude; +pub mod proto; pub mod runtime; +#[cfg(feature = "server")] +pub mod server; pub mod storage; pub mod tracking; diff --git a/src/proto/conversions.rs b/src/proto/conversions.rs new file mode 100644 index 0000000000..a754bbcb34 --- /dev/null +++ b/src/proto/conversions.rs @@ -0,0 +1,323 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +use std::convert::{TryFrom, TryInto}; + +use crate::{encoding, graph, tracking, Error, Result}; + +fn convert_to_datetime(source: Option) -> Result> { + use std::str::FromStr; + let source = + source.ok_or_else(|| Error::String("Expected non-null digest in rpc message".into()))?; + chrono::DateTime::::from_str(&source.iso_timestamp) + .map_err(|err| Error::String(format!("Received invalid timestamp string: {:?}", err))) +} + +fn convert_from_datetime(source: &chrono::DateTime) -> super::DateTime { + super::DateTime { + iso_timestamp: source.to_string(), + } +} + +impl TryFrom> for encoding::Digest { + type Error = Error; + fn try_from(source: Option) -> Result { + source + .ok_or_else(|| Error::String("Expected non-null digest in rpc message".into()))? + .try_into() + } +} + +impl TryFrom for encoding::Digest { + type Error = Error; + fn try_from(source: super::Digest) -> Result { + Self::from_bytes(source.bytes.as_slice()) + } +} + +impl From for super::Digest { + fn from(source: encoding::Digest) -> Self { + Self { + bytes: Vec::from(source.into_bytes()), + } + } +} + +impl From<&encoding::Digest> for super::Digest { + fn from(source: &encoding::Digest) -> Self { + Self { + bytes: source.as_bytes().to_vec(), + } + } +} + +impl TryFrom> for tracking::Tag { + type Error = Error; + fn try_from(source: Option) -> Result { + source + .ok_or_else(|| Error::String("Expected non-null tag in rpc message".into()))? + .try_into() + } +} + +impl TryFrom for tracking::Tag { + type Error = Error; + fn try_from(source: super::Tag) -> Result { + let mut tag = Self::new(source.org, source.name, source.target.try_into()?)?; + tag.parent = source.parent.try_into()?; + tag.user = source.user; + tag.time = convert_to_datetime(source.time)?; + Ok(tag) + } +} + +impl From<&tracking::Tag> for super::Tag { + fn from(source: &tracking::Tag) -> Self { + Self { + org: source.org(), + name: source.name(), + target: Some((&source.target).into()), + parent: Some((&source.parent).into()), + user: source.user.clone(), + time: Some(convert_from_datetime(&source.time)), + } + } +} + +impl From for super::Error { + fn from(err: Error) -> Self { + let kind = Some(match err { + crate::Error::UnknownObject(digest) => { + super::error::Kind::UnknownObject(super::UnknownObjectError { + message: digest.to_string(), + }) + } + crate::Error::UnknownReference(message) => { + super::error::Kind::UnknownReference(super::UnknownReferenceError { message }) + } + crate::Error::AmbiguousReference(message) => { + super::error::Kind::AmbiguousReference(super::AmbiguousReferenceError { message }) + } + crate::Error::InvalidReference(message) => { + super::error::Kind::InvalidReference(super::InvalidReferenceError { message }) + } + err => super::error::Kind::Other(format!("{:?}", err)), + }); + Self { kind } + } +} + +impl From for Error { + fn from(rpc: super::Error) -> Self { + match rpc.kind { + Some(super::error::Kind::UnknownObject(rpc)) => { + match crate::encoding::Digest::parse(&rpc.message) { + Ok(digest) => crate::Error::UnknownObject(digest), + Err(_) => crate::Error::String( + "Server reported UnknownObject but did not provide a valid digest" + .to_string(), + ), + } + } + Some(super::error::Kind::UnknownReference(rpc)) => { + crate::Error::UnknownReference(rpc.message) + } + Some(super::error::Kind::AmbiguousReference(rpc)) => { + crate::Error::AmbiguousReference(rpc.message) + } + Some(super::error::Kind::InvalidReference(rpc)) => { + crate::Error::InvalidReference(rpc.message) + } + Some(super::error::Kind::Other(message)) => Error::String(message), + None => Error::String("Server did not provide an error message".to_string()), + } + } +} + +impl From<&graph::Object> for super::Object { + fn from(source: &graph::Object) -> Self { + use super::object::Kind; + super::Object { + kind: Some(match source { + graph::Object::Platform(o) => Kind::Platform(o.into()), + graph::Object::Layer(o) => Kind::Layer(o.into()), + graph::Object::Manifest(o) => Kind::Manifest(o.into()), + graph::Object::Tree(o) => Kind::Tree(o.into()), + graph::Object::Blob(o) => Kind::Blob(o.into()), + graph::Object::Mask => Kind::Mask(true), + }), + } + } +} + +impl TryFrom> for graph::Object { + type Error = Error; + fn try_from(source: Option) -> Result { + source + .ok_or_else(|| Error::String("Expected non-null object in rpc message".into()))? + .try_into() + } +} + +impl TryFrom for graph::Object { + type Error = Error; + fn try_from(source: super::Object) -> Result { + use super::object::Kind; + match source.kind { + Some(Kind::Platform(o)) => Ok(graph::Object::Platform(o.try_into()?)), + Some(Kind::Layer(o)) => Ok(graph::Object::Layer(o.try_into()?)), + Some(Kind::Manifest(o)) => Ok(graph::Object::Manifest(o.try_into()?)), + Some(Kind::Tree(o)) => Ok(graph::Object::Tree(o.try_into()?)), + Some(Kind::Blob(o)) => Ok(graph::Object::Blob(o.try_into()?)), + Some(Kind::Mask(_)) => Ok(graph::Object::Mask), + None => Err(Error::String( + "Expected non-empty object kind in rpc message".to_string(), + )), + } + } +} + +impl From<&graph::Platform> for super::Platform { + fn from(source: &graph::Platform) -> Self { + Self { + stack: source.stack.iter().map(Into::into).collect(), + } + } +} + +impl TryFrom for graph::Platform { + type Error = Error; + fn try_from(source: super::Platform) -> Result { + Ok(Self { + stack: source + .stack + .into_iter() + .map(TryInto::try_into) + .collect::>>()?, + }) + } +} + +impl From<&graph::Layer> for super::Layer { + fn from(source: &graph::Layer) -> Self { + Self { + manifest: Some((&source.manifest).into()), + } + } +} + +impl TryFrom for graph::Layer { + type Error = Error; + fn try_from(source: super::Layer) -> Result { + Ok(Self { + manifest: source.manifest.try_into()?, + }) + } +} + +impl From<&graph::Manifest> for super::Manifest { + fn from(source: &graph::Manifest) -> Self { + let mut trees = source.list_trees().into_iter(); + let root = trees.next().map(Into::into); + Self { + root, + trees: trees.map(Into::into).collect(), + } + } +} + +impl TryFrom for graph::Manifest { + type Error = Error; + fn try_from(source: super::Manifest) -> Result { + let mut out = Self::new(source.root.try_into()?); + for tree in source.trees.into_iter() { + out.insert_tree(tree.try_into()?)?; + } + Ok(out) + } +} + +impl From<&graph::Tree> for super::Tree { + fn from(source: &graph::Tree) -> Self { + Self { + entries: source.entries.iter().map(Into::into).collect(), + } + } +} + +impl TryFrom> for graph::Tree { + type Error = Error; + fn try_from(source: Option) -> Result { + source + .ok_or_else(|| Error::String("Expected non-null tree in rpc message".into()))? + .try_into() + } +} + +impl TryFrom for graph::Tree { + type Error = Error; + fn try_from(source: super::Tree) -> Result { + Ok(Self { + entries: source + .entries + .into_iter() + .map(TryInto::try_into) + .collect::>()?, + }) + } +} + +impl From<&graph::Entry> for super::Entry { + fn from(source: &graph::Entry) -> Self { + let kind = match source.kind { + tracking::EntryKind::Tree => super::EntryKind::Tree as i32, + tracking::EntryKind::Blob => super::EntryKind::Blob as i32, + tracking::EntryKind::Mask => super::EntryKind::Mask as i32, + }; + Self { + object: Some((&source.object).into()), + kind, + mode: source.mode, + size: source.size, + name: source.name.clone(), + } + } +} + +impl TryFrom for graph::Entry { + type Error = Error; + fn try_from(source: super::Entry) -> Result { + let kind = match super::EntryKind::from_i32(source.kind) { + Some(super::EntryKind::Tree) => tracking::EntryKind::Tree, + Some(super::EntryKind::Blob) => tracking::EntryKind::Blob, + Some(super::EntryKind::Mask) => tracking::EntryKind::Mask, + None => return Err("Received unknown entry kind in rpm data".into()), + }; + Ok(Self { + object: source.object.try_into()?, + kind, + mode: source.mode, + size: source.size, + name: source.name, + }) + } +} + +impl From<&graph::Blob> for super::Blob { + fn from(source: &graph::Blob) -> Self { + Self { + payload: Some((&source.payload).into()), + size: source.size, + } + } +} + +impl TryFrom for graph::Blob { + type Error = Error; + fn try_from(source: super::Blob) -> Result { + Ok(Self { + payload: source.payload.try_into()?, + size: source.size, + }) + } +} diff --git a/src/proto/defs/database.proto b/src/proto/defs/database.proto new file mode 100644 index 0000000000..00f7e9f25b --- /dev/null +++ b/src/proto/defs/database.proto @@ -0,0 +1,78 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +syntax = "proto3"; + +package spfs; + +import "types.proto"; +import "error.proto"; + +message ReadObjectRequest{ + Digest digest = 1; +} +message ReadObjectResponse{ + oneof result { + Error error = 1; + Object ok = 2; + } +} + +message IterDigestsRequest{} +message IterDigestsResponse{ + oneof result { + Error error = 1; + Digest ok = 2; + } +} + +message IterObjectsRequest{} +message IterObjectsResponse{ + oneof result { + Error error = 1; + Object ok = 2; + } +} + +message WalkObjectsRequest{ + Digest root = 1; +} +message WalkObjectsResponse{ + message WalkObjectsItem { + Digest digest = 1; + Object object = 2; + } + oneof result { + Error error = 1; + WalkObjectsItem ok = 2; + } +} + +message WriteObjectRequest{ + Object object = 1; +} +message WriteObjectResponse{ + oneof result { + Error error = 1; + Ok ok = 2; + } +} + +message RemoveObjectRequest{ + Digest digest = 1; +} +message RemoveObjectResponse{ + oneof result { + Error error = 1; + Ok ok = 2; + } +} + +service DatabaseService { + rpc ReadObject(ReadObjectRequest) returns (ReadObjectResponse); + rpc IterDigests(IterDigestsRequest) returns (stream IterDigestsResponse); + rpc IterObjects(IterObjectsRequest) returns (stream IterObjectsResponse); + rpc WalkObjects(WalkObjectsRequest) returns (stream WalkObjectsResponse); + rpc WriteObject(WriteObjectRequest) returns (WriteObjectResponse); + rpc RemoveObject(RemoveObjectRequest) returns (RemoveObjectResponse); +} diff --git a/src/proto/defs/error.proto b/src/proto/defs/error.proto new file mode 100644 index 0000000000..83d5b1ddac --- /dev/null +++ b/src/proto/defs/error.proto @@ -0,0 +1,31 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +syntax = "proto3"; + +package spfs; + +message Ok {} + +message UnknownObjectError { + string message = 1; +} +message UnknownReferenceError { + string message = 1; +} +message AmbiguousReferenceError { + string message = 1; +} +message InvalidReferenceError { + string message = 1; +} + +message Error { + oneof kind { + string other = 1; + UnknownObjectError UnknownObject = 2; + UnknownReferenceError UnknownReference = 3; + AmbiguousReferenceError AmbiguousReference = 4; + InvalidReferenceError InvalidReference = 5; + } +} diff --git a/src/proto/defs/payload.proto b/src/proto/defs/payload.proto new file mode 100644 index 0000000000..8f199e5518 --- /dev/null +++ b/src/proto/defs/payload.proto @@ -0,0 +1,74 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +syntax = "proto3"; + +package spfs; + +import "error.proto"; +import "types.proto"; +import "database.proto"; + +message HasPayloadRequest{ + Digest digest = 1; +} +message HasPayloadResponse{ + oneof result { + Error error = 1; + bool ok = 2; + } +} + +message WritePayloadRequest{} +message WritePayloadResponse{ + message UploadOption { + string url = 1; + } + // This message is not part of the grpc interface, + // but is the expected response data for http uploads + message UploadResponse { + message UploadResult { + Digest digest = 1; + uint64 size = 2; + } + oneof result { + Error error = 1; + UploadResult ok = 2; + } + } + oneof result { + Error error = 1; + UploadOption ok = 2; + } +} + +message OpenPayloadRequest{ + Digest digest = 1; +} +message OpenPayloadResponse{ + message DownloadOption { + repeated string locations = 1; + } + oneof result { + Error error = 1; + DownloadOption ok = 2; + } +} + +message RemovePayloadRequest{ + Digest digest = 1; +} +message RemovePayloadResponse{ + oneof result { + Error error = 1; + Ok ok = 2; + } +} + +service PayloadService { + rpc IterDigests(IterDigestsRequest) returns (stream IterDigestsResponse); + rpc HasPayload(HasPayloadRequest) returns (HasPayloadResponse); + rpc WritePayload(WritePayloadRequest) returns (WritePayloadResponse); + rpc OpenPayload(OpenPayloadRequest) returns (OpenPayloadResponse); + rpc RemovePayload(RemovePayloadRequest) returns (RemovePayloadResponse); +} diff --git a/src/proto/defs/repository.proto b/src/proto/defs/repository.proto new file mode 100644 index 0000000000..9974f26227 --- /dev/null +++ b/src/proto/defs/repository.proto @@ -0,0 +1,13 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +syntax = "proto3"; + +package spfs; + +message PingRequest {} +message PingResponse {} + +service Repository { + rpc Ping(PingRequest) returns (PingResponse); +} diff --git a/src/proto/defs/tag.proto b/src/proto/defs/tag.proto new file mode 100644 index 0000000000..70631fb9a3 --- /dev/null +++ b/src/proto/defs/tag.proto @@ -0,0 +1,119 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +syntax = "proto3"; + +package spfs; + +import "types.proto"; +import "error.proto"; + +message Tag { + optional string org = 1; + string name = 2; + Digest target = 3; + Digest parent = 4; + string user = 5; + DateTime time = 6; +} + +message LsTagsRequest { + string path = 1; +} +message LsTagsResponse { + message EntryList { + repeated string entries = 1; + } + oneof result { + Error error = 1; + EntryList ok = 2; + } +} + +message ResolveTagRequest { + string tag_spec = 2; +} +message ResolveTagResponse { + oneof result { + Error error = 1; + Tag ok = 2; + } +} + +message FindTagsRequest { + Digest digest = 1; +} +message FindTagsResponse { + message TagList { + repeated string tags = 1; + } + oneof result { + Error error = 1; + TagList ok = 2; + } +} + +message IterTagSpecsRequest {} +message IterTagSpecsResponse { + message TagSpecList { + repeated string tag_specs = 1; + } + oneof result { + Error error = 1; + TagSpecList ok = 2; + } +} + +message ReadTagRequest { + string tag_spec = 1; +} +message ReadTagResponse { + message TagList { + repeated Tag tags = 1; + } + oneof result { + Error error = 1; + TagList ok = 2; + } +} + +message PushRawTagRequest { + Tag tag = 1; +} +message PushRawTagResponse { + oneof result { + Error error = 1; + Ok ok = 2; + } +} + +message RemoveTagStreamRequest { + string tag_Spec = 1; +} +message RemoveTagStreamResponse { + oneof result { + Error error = 1; + Ok ok = 2; + } +} + +message RemoveTagRequest { + Tag tag = 1; +} +message RemoveTagResponse { + oneof result { + Error error = 1; + Ok ok = 2; + } +} + +service TagService { + rpc LsTags(LsTagsRequest) returns (LsTagsResponse); + rpc ResolveTag(ResolveTagRequest) returns (ResolveTagResponse); + rpc FindTags(FindTagsRequest) returns (FindTagsResponse); + rpc IterTagSpecs(IterTagSpecsRequest) returns (IterTagSpecsResponse); + rpc ReadTag(ReadTagRequest) returns (ReadTagResponse); + rpc PushRawTag(PushRawTagRequest) returns (PushRawTagResponse); + rpc RemoveTagStream(RemoveTagStreamRequest) returns (RemoveTagStreamResponse); + rpc RemoveTag(RemoveTagRequest) returns (RemoveTagResponse); +} diff --git a/src/proto/defs/types.proto b/src/proto/defs/types.proto new file mode 100644 index 0000000000..f87d8e3201 --- /dev/null +++ b/src/proto/defs/types.proto @@ -0,0 +1,61 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +syntax = "proto3"; + +package spfs; + +message Digest { + bytes bytes = 1; +} + +message DateTime { + string iso_timestamp = 1; +} + +message Object { + oneof kind { + Platform platform = 1; + Layer layer = 2; + Manifest manifest = 3; + Tree tree = 4; + Blob blob = 5; + bool mask = 6; + } +} + +message Platform { + repeated Digest stack = 1; +} + +message Layer { + Digest manifest = 1; +} + +message Manifest { + Tree root = 1; + repeated Tree trees = 2; +} + +message Tree { + repeated Entry entries = 1; +} + +message Entry { + Digest object = 1; + EntryKind kind = 2; + uint32 mode = 3; + uint64 size = 4; + string name = 5; +} + +enum EntryKind { + TREE = 0; + BLOB = 1; + MASK = 2; +} + +message Blob { + Digest payload = 1; + uint64 size = 2; +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs new file mode 100644 index 0000000000..448a1ce3cc --- /dev/null +++ b/src/proto/mod.rs @@ -0,0 +1,14 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +//! Protocol Buffer message formats and conversions. + +mod conversions; +mod result; +mod generated { + tonic::include_proto!("spfs"); +} + +pub use conversions::*; +pub use generated::*; +pub(crate) use result::{handle_error, RpcResult}; diff --git a/src/proto/result.rs b/src/proto/result.rs new file mode 100644 index 0000000000..a17f8e6e49 --- /dev/null +++ b/src/proto/result.rs @@ -0,0 +1,142 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk + +pub(crate) trait RpcResult: Sized { + type Ok; + type Result; + fn error(err: crate::Error) -> Self; + fn ok(data: Self::Ok) -> Self; + fn to_result(self) -> crate::Result; + fn from_result>(result: crate::Result) -> Self { + match result { + Err(err) => Self::error(err), + Ok(ok) => Self::ok(ok.into()), + } + } +} + +macro_rules! handle_error { + ($result:expr) => { + match $result { + Err(err) => return Ok(tonic::Response::new(RpcResult::error(err))), + Ok(data) => data, + } + }; +} + +pub(crate) use handle_error; + +macro_rules! rpc_result { + ($typename:ty, $result:ty) => { + rpc_result!($typename, $result, super::Ok); + }; + ($typename:ty, $result:ty, $ok_type:ty) => { + impl RpcResult for $typename { + type Ok = $ok_type; + type Result = $result; + fn error(err: crate::Error) -> Self { + let result = Some(Self::Result::Error(err.into())); + Self { result } + } + fn ok(data: Self::Ok) -> Self { + let result = Some(Self::Result::Ok(data)); + Self { result } + } + fn to_result(self) -> crate::Result { + match self.result { + Some(Self::Result::Error(err)) => Err(err.into()), + Some(Self::Result::Ok(data)) => Ok(data), + None => Err(crate::Error::String(format!( + "Unexpected empty result from the server" + ))), + } + } + } + }; +} + +use super::generated as gen; + +rpc_result!( + gen::LsTagsResponse, + gen::ls_tags_response::Result, + gen::ls_tags_response::EntryList +); +rpc_result!( + gen::ResolveTagResponse, + gen::resolve_tag_response::Result, + gen::Tag +); +rpc_result!( + gen::FindTagsResponse, + gen::find_tags_response::Result, + gen::find_tags_response::TagList +); +rpc_result!( + gen::IterTagSpecsResponse, + gen::iter_tag_specs_response::Result, + gen::iter_tag_specs_response::TagSpecList +); +rpc_result!( + gen::ReadTagResponse, + gen::read_tag_response::Result, + gen::read_tag_response::TagList +); +rpc_result!(gen::PushRawTagResponse, gen::push_raw_tag_response::Result); +rpc_result!( + gen::RemoveTagStreamResponse, + gen::remove_tag_stream_response::Result +); +rpc_result!(gen::RemoveTagResponse, gen::remove_tag_response::Result); + +rpc_result!( + gen::ReadObjectResponse, + gen::read_object_response::Result, + gen::Object +); +rpc_result!( + gen::IterDigestsResponse, + gen::iter_digests_response::Result, + gen::Digest +); +rpc_result!( + gen::IterObjectsResponse, + gen::iter_objects_response::Result, + gen::Object +); +rpc_result!( + gen::WalkObjectsResponse, + gen::walk_objects_response::Result, + gen::walk_objects_response::WalkObjectsItem +); +rpc_result!(gen::WriteObjectResponse, gen::write_object_response::Result); +rpc_result!( + gen::RemoveObjectResponse, + gen::remove_object_response::Result +); + +rpc_result!( + gen::HasPayloadResponse, + gen::has_payload_response::Result, + bool +); +rpc_result!( + gen::WritePayloadResponse, + gen::write_payload_response::Result, + gen::write_payload_response::UploadOption +); +rpc_result!( + gen::OpenPayloadResponse, + gen::open_payload_response::Result, + gen::open_payload_response::DownloadOption +); +rpc_result!( + gen::RemovePayloadResponse, + gen::remove_payload_response::Result +); +rpc_result!( + gen::write_payload_response::UploadResponse, + gen::write_payload_response::upload_response::Result, + gen::write_payload_response::upload_response::UploadResult +); diff --git a/src/prune_test.rs b/src/prune_test.rs index b05641d131..b21e70db4f 100644 --- a/src/prune_test.rs +++ b/src/prune_test.rs @@ -10,12 +10,12 @@ use super::{get_prunable_tags, prune_tags, PruneParameters}; use crate::{encoding, storage, tracking, Error}; use std::collections::HashMap; -fixtures!(); +use crate::fixtures::*; #[rstest] #[tokio::test] async fn test_prunable_tags_age(#[future] tmprepo: TempRepo) { - let (_td, tmprepo) = tmprepo.await; + let tmprepo = tmprepo.await; let mut old = tracking::Tag::new( Some("testing".to_string()), "prune", @@ -65,7 +65,7 @@ async fn test_prunable_tags_age(#[future] tmprepo: TempRepo) { #[rstest] #[tokio::test] async fn test_prunable_tags_version(#[future] tmprepo: TempRepo) { - let (_td, tmprepo) = tmprepo.await; + let tmprepo = tmprepo.await; let tag = tracking::TagSpec::parse("testing/versioned").unwrap(); let tag5 = tmprepo .push_tag(&tag, &encoding::EMPTY_DIGEST.into()) @@ -133,7 +133,7 @@ async fn test_prunable_tags_version(#[future] tmprepo: TempRepo) { #[tokio::test] async fn test_prune_tags(#[future] tmprepo: TempRepo) { init_logging(); - let (_td, tmprepo) = tmprepo.await; + let tmprepo = tmprepo.await; let tag = tracking::TagSpec::parse("test/prune").unwrap(); async fn reset(tmprepo: &storage::RepositoryHandle) -> HashMap { diff --git a/src/resolve_test.rs b/src/resolve_test.rs index 3c352e9d62..94948081e7 100644 --- a/src/resolve_test.rs +++ b/src/resolve_test.rs @@ -6,12 +6,12 @@ use rstest::rstest; use super::resolve_stack_to_layers; use crate::{encoding, graph, prelude::*}; -fixtures!(); +use crate::fixtures::*; #[rstest] #[tokio::test] async fn test_stack_to_layers_dedupe(#[future] tmprepo: TempRepo) { - let (_dir, repo) = tmprepo.await; + let repo = tmprepo.await; let layer = graph::Layer::new(encoding::EMPTY_DIGEST.into()); let platform = graph::Platform::new(vec![layer.clone(), layer.clone()].into_iter()).unwrap(); let stack = vec![layer.digest().unwrap(), platform.digest().unwrap()]; diff --git a/src/runtime/storage_test.rs b/src/runtime/storage_test.rs index ddbdb6b682..893d610299 100644 --- a/src/runtime/storage_test.rs +++ b/src/runtime/storage_test.rs @@ -10,7 +10,7 @@ use rstest::rstest; use super::{ensure_runtime, makedirs_with_perms, Config, Runtime, Storage}; use crate::encoding; -fixtures!(); +use crate::fixtures::*; #[rstest] fn test_config_serialization() { diff --git a/src/server/database.rs b/src/server/database.rs new file mode 100644 index 0000000000..f48bc8db89 --- /dev/null +++ b/src/server/database.rs @@ -0,0 +1,100 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +use std::convert::TryInto; +use std::pin::Pin; +use std::sync::Arc; + +use futures::{Stream, StreamExt}; +use tonic::{Request, Response, Status}; + +use crate::proto::{self, database_service_server::DatabaseServiceServer, RpcResult}; +use crate::storage; + +#[derive(Debug, Clone)] +pub struct DatabaseService { + repo: Arc, +} + +#[tonic::async_trait] +impl proto::database_service_server::DatabaseService for DatabaseService { + type IterDigestsStream = + Pin> + Send>>; + type IterObjectsStream = + tokio_stream::Iter>>; + type WalkObjectsStream = + tokio_stream::Iter>>; + + async fn read_object( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let digest = proto::handle_error!(request.digest.try_into()); + let object = { proto::handle_error!(self.repo.read_object(digest).await) }; + let result = proto::ReadObjectResponse::ok((&object).into()); + Ok(Response::new(result)) + } + + async fn iter_digests( + &self, + _request: Request, + ) -> Result, Status> { + let stream = self + .repo + .iter_digests() + .map(proto::IterDigestsResponse::from_result) + .map(Ok); + let stream: Self::IterDigestsStream = Box::pin(stream); + let response = Response::new(stream); + Ok(response) + } + + async fn iter_objects( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented( + "object iteration is no yet supported directly over gRPC", + )) + } + + async fn walk_objects( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented( + "object walking is no yet supported directly over gRPC", + )) + } + + async fn write_object( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let object = proto::handle_error!(request.object.try_into()); + { + proto::handle_error!(self.repo.write_object(&object).await) + }; + let result = proto::WriteObjectResponse::ok(proto::Ok {}); + Ok(Response::new(result)) + } + + async fn remove_object( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let digest: crate::encoding::Digest = proto::handle_error!(request.digest.try_into()); + proto::handle_error!(self.repo.remove_object(digest).await); + let result = proto::RemoveObjectResponse::ok(proto::Ok {}); + Ok(Response::new(result)) + } +} + +impl DatabaseService { + pub fn new_srv(repo: Arc) -> DatabaseServiceServer { + DatabaseServiceServer::new(Self { repo }) + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs new file mode 100644 index 0000000000..7a74e3c8fb --- /dev/null +++ b/src/server/mod.rs @@ -0,0 +1,13 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +//! Remote rpc server implementation of the spfs repository +mod database; +mod payload; +mod repository; +mod tag; + +pub use database::DatabaseService; +pub use payload::PayloadService; +pub use repository::Repository; +pub use tag::TagService; diff --git a/src/server/payload.rs b/src/server/payload.rs new file mode 100644 index 0000000000..d77d88a782 --- /dev/null +++ b/src/server/payload.rs @@ -0,0 +1,190 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +use std::convert::TryInto; +use std::pin::Pin; +use std::sync::Arc; + +use futures::{Stream, StreamExt}; +use prost::Message; +use tonic::{Request, Response, Status}; + +use crate::proto::{self, payload_service_server::PayloadServiceServer, RpcResult}; +use crate::storage; + +/// The payload service is both a gRPC service AND an http server +/// +/// The grpc portion handles payload-related requests as expected, +/// but defers actual upload and download of file data to the http +/// server. This handoff is required because gRPC is really inefficient +/// at large file transfers. It is also a useful way to allow for +/// partitioning and/or migration of the underlying file storage in +/// the future +#[derive(Debug, Clone)] +pub struct PayloadService { + repo: Arc, + external_root: url::Url, +} + +#[tonic::async_trait] +impl proto::payload_service_server::PayloadService for PayloadService { + type IterDigestsStream = + Pin> + Send>>; + + async fn iter_digests( + &self, + _request: Request, + ) -> Result, Status> { + let stream = self + .repo + .iter_payload_digests() + .map(proto::IterDigestsResponse::from_result) + .map(Ok); + let stream: Self::IterDigestsStream = Box::pin(stream); + let response = Response::new(stream); + Ok(response) + } + + async fn write_payload( + &self, + _request: Request, + ) -> Result, Status> { + let data = proto::write_payload_response::UploadOption { + url: self.external_root.to_string(), + }; + let result = proto::WritePayloadResponse::ok(data); + Ok(Response::new(result)) + } + + async fn has_payload( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let digest: crate::encoding::Digest = proto::handle_error!(request.digest.try_into()); + let exists = self.repo.has_payload(digest).await; + let result = proto::HasPayloadResponse::ok(exists); + Ok(Response::new(result)) + } + + async fn open_payload( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let digest: crate::encoding::Digest = proto::handle_error!(request.digest.try_into()); + let mut option = proto::open_payload_response::DownloadOption::default(); + let mut self_download = self.external_root.clone(); + if let Ok(mut p) = self_download.path_segments_mut() { + p.push(&digest.to_string()); + } + option.locations.push(self_download.into()); + let result = proto::OpenPayloadResponse::ok(option); + Ok(Response::new(result)) + } + + async fn remove_payload( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let digest: crate::encoding::Digest = proto::handle_error!(request.digest.try_into()); + proto::handle_error!(self.repo.remove_payload(digest).await); + let result = proto::RemovePayloadResponse::ok(proto::Ok {}); + Ok(Response::new(result)) + } +} + +impl hyper::service::Service> for PayloadService { + type Response = hyper::http::Response; + type Error = crate::Error; + type Future = + std::pin::Pin> + Send>>; + + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, req: hyper::http::Request) -> Self::Future { + match *req.method() { + hyper::Method::POST => Box::pin(handle_upload(self.repo.clone(), req.into_body())), + hyper::Method::GET => Box::pin(handle_download( + self.repo.clone(), + req.uri().path().trim_start_matches('/').to_string(), + )), + _ => Box::pin(futures::future::ready( + hyper::Response::builder() + .status(hyper::http::StatusCode::METHOD_NOT_ALLOWED) + .body(hyper::Body::empty()) + .map_err(|e| crate::Error::String(e.to_string())), + )), + } + } +} + +impl PayloadService { + pub fn new(repo: Arc, external_root: url::Url) -> Self { + Self { + repo, + external_root, + } + } + + pub fn new_srv( + repo: Arc, + external_root: url::Url, + ) -> PayloadServiceServer { + Self::new(repo, external_root).into_srv() + } + + pub fn into_srv(self) -> PayloadServiceServer { + PayloadServiceServer::new(self) + } +} + +async fn handle_upload( + repo: Arc, + body: hyper::Body, +) -> crate::Result> { + let reader = body_to_reader(body); + let (digest, size) = repo.write_data(reader).await.map_err(|err| { + crate::Error::String(format!( + "An error occurred while spwaning a thread for this operation: {:?}", + err + )) + })?; + let result = crate::proto::write_payload_response::UploadResponse::ok( + crate::proto::write_payload_response::upload_response::UploadResult { + digest: Some(digest.into()), + size, + }, + ); + let bytes = result.encode_to_vec(); + hyper::Response::builder() + .status(hyper::http::StatusCode::OK) + .body(bytes.into()) + .map_err(|e| crate::Error::String(e.to_string())) +} + +fn body_to_reader(body: hyper::Body) -> Pin> { + Box::pin(tokio_util::io::StreamReader::new(body.map(|chunk| { + chunk.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + }))) +} + +async fn handle_download( + repo: Arc, + relative_path: String, +) -> crate::Result> { + let digest = crate::encoding::Digest::parse(&relative_path)?; + let reader = repo.open_payload(digest).await?; + hyper::Response::builder() + .status(hyper::http::StatusCode::OK) + .body(hyper::Body::wrap_stream(tokio_util::io::ReaderStream::new( + reader, + ))) + .map_err(|e| crate::Error::String(e.to_string())) +} diff --git a/src/server/repository.rs b/src/server/repository.rs new file mode 100644 index 0000000000..1c3e825474 --- /dev/null +++ b/src/server/repository.rs @@ -0,0 +1,28 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk + +use tonic::{Request, Response, Status}; + +use crate::proto; +use proto::repository_server::RepositoryServer; + +#[derive(Debug, Clone)] +pub struct Repository {} + +#[tonic::async_trait] +impl proto::repository_server::Repository for Repository { + async fn ping( + &self, + _request: Request, + ) -> std::result::Result, Status> { + let data = proto::PingResponse::default(); + Ok(Response::new(data)) + } +} + +impl Repository { + pub fn new_srv() -> RepositoryServer { + RepositoryServer::new(Self {}) + } +} diff --git a/src/server/tag.rs b/src/server/tag.rs new file mode 100644 index 0000000000..ea989c3262 --- /dev/null +++ b/src/server/tag.rs @@ -0,0 +1,129 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +use std::convert::TryInto; +use std::sync::Arc; + +use futures::TryStreamExt; +use tokio_stream::StreamExt; +use tonic::{Request, Response, Status}; + +use crate::proto::{self, tag_service_server::TagServiceServer, RpcResult}; +use crate::storage; + +#[derive(Debug, Clone)] +pub struct TagService { + repo: Arc, +} + +#[tonic::async_trait] +impl proto::tag_service_server::TagService for TagService { + async fn ls_tags( + &self, + request: Request, + ) -> std::result::Result, Status> { + tracing::trace!("recieve request"); + let request = request.into_inner(); + let path = relative_path::RelativePath::new(&request.path); + let entries: crate::Result> = { self.repo.ls_tags(path).collect().await }; + let entries = proto::handle_error!(entries); + + let data = proto::LsTagsResponse::ok(proto::ls_tags_response::EntryList { entries }); + Ok(Response::new(data)) + } + + async fn resolve_tag( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let request = request.into_inner(); + let tag_spec = proto::handle_error!(request.tag_spec.parse()); + let tag = proto::handle_error!(self.repo.resolve_tag(&tag_spec).await); + let data = proto::ResolveTagResponse::ok((&tag).into()); + Ok(Response::new(data)) + } + + async fn find_tags( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let request = request.into_inner(); + let digest = proto::handle_error!(request.digest.try_into()); + let mut results = self.repo.find_tags(&digest); + let mut tags = Vec::new(); + while let Some(item) = results.next().await { + let item = proto::handle_error!(item); + tags.push(item.to_string()); + } + let data = proto::FindTagsResponse::ok(proto::find_tags_response::TagList { tags }); + Ok(Response::new(data)) + } + + async fn iter_tag_specs( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + let mut streams = self.repo.iter_tags(); + let mut tag_specs = Vec::new(); + while let Some(item) = streams.next().await { + let item = proto::handle_error!(item); + tag_specs.push(item.0.to_string()); + } + let data = proto::IterTagSpecsResponse::ok(proto::iter_tag_specs_response::TagSpecList { + tag_specs, + }); + Ok(Response::new(data)) + } + + async fn read_tag( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let request = request.into_inner(); + let tag_spec = proto::handle_error!(request.tag_spec.parse()); + let stream = proto::handle_error!(self.repo.read_tag(&tag_spec).await); + let tags: crate::Result> = stream.map_ok(|t| (&t).into()).collect().await; + let tags = proto::handle_error!(tags); + let data = proto::ReadTagResponse::ok(proto::read_tag_response::TagList { tags }); + Ok(Response::new(data)) + } + + async fn push_raw_tag( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let request = request.into_inner(); + let tag = proto::handle_error!(request.tag.try_into()); + proto::handle_error!(self.repo.push_raw_tag(&tag).await); + let data = proto::PushRawTagResponse::ok(proto::Ok {}); + Ok(Response::new(data)) + } + + async fn remove_tag_stream( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let request = request.into_inner(); + let tag_spec = proto::handle_error!(request.tag_spec.parse()); + proto::handle_error!(self.repo.remove_tag_stream(&tag_spec).await); + let data = proto::RemoveTagStreamResponse::ok(proto::Ok {}); + Ok(Response::new(data)) + } + + async fn remove_tag( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let request = request.into_inner(); + let tag = proto::handle_error!(request.tag.try_into()); + proto::handle_error!(self.repo.remove_tag(&tag).await); + let data = proto::RemoveTagResponse::ok(proto::Ok {}); + Ok(Response::new(data)) + } +} + +impl TagService { + pub fn new_srv(repo: Arc) -> TagServiceServer { + TagServiceServer::new(Self { repo }) + } +} diff --git a/src/storage/database_test.rs b/src/storage/database_test.rs new file mode 100644 index 0000000000..db1679a7f4 --- /dev/null +++ b/src/storage/database_test.rs @@ -0,0 +1,33 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +use rstest::rstest; + +use crate::{encoding, graph}; + +use crate::fixtures::*; + +#[rstest( + tmprepo, + case::fs(tmprepo("fs")), + case::tar(tmprepo("tar")), + case::rpc(tmprepo("rpc")) +)] +#[tokio::test] +async fn test_object_existance(#[future] tmprepo: TempRepo) { + let tmprepo = tmprepo.await; + let digest = encoding::EMPTY_DIGEST.into(); + let obj = graph::Blob::new(digest, 0).into(); + tmprepo + .write_object(&obj) + .await + .expect("failed to write object data"); + + let actual = tmprepo.has_object(digest).await; + assert!(actual); + + tmprepo.remove_object(digest).await.unwrap(); + + let actual = tmprepo.has_object(digest).await; + assert!(!actual, "object should not exist after being removed"); +} diff --git a/src/storage/fs/hash_store.rs b/src/storage/fs/hash_store.rs index 1fc9d4b72e..daba3af9cc 100644 --- a/src/storage/fs/hash_store.rs +++ b/src/storage/fs/hash_store.rs @@ -66,7 +66,7 @@ impl FSHashStore { /// Write all data in the given reader to a file in this storage pub async fn write_data( &self, - mut reader: Pin>, + mut reader: Pin>, ) -> Result<(encoding::Digest, u64)> { let uuid = uuid::Uuid::new_v4().to_string(); let working_file = self.workdir().join(uuid); diff --git a/src/storage/fs/hash_store_test.rs b/src/storage/fs/hash_store_test.rs index 9f70edc2f8..85277fbe43 100644 --- a/src/storage/fs/hash_store_test.rs +++ b/src/storage/fs/hash_store_test.rs @@ -5,7 +5,7 @@ use rstest::rstest; use tokio_stream::StreamExt; -fixtures!(); +use crate::fixtures::*; #[rstest] #[tokio::test] diff --git a/src/storage/fs/payloads.rs b/src/storage/fs/payloads.rs index adc4813a07..f9ba8710ac 100644 --- a/src/storage/fs/payloads.rs +++ b/src/storage/fs/payloads.rs @@ -11,13 +11,13 @@ use crate::{encoding, Error, Result}; #[async_trait::async_trait] impl crate::storage::PayloadStorage for FSRepository { - fn iter_payload_digests(&self) -> Pin>>> { + fn iter_payload_digests(&self) -> Pin> + Send>> { Box::pin(self.payloads.iter()) } async fn write_data( &self, - reader: Pin>, + reader: Pin>, ) -> Result<(encoding::Digest, u64)> { self.payloads.write_data(reader).await } @@ -25,7 +25,7 @@ impl crate::storage::PayloadStorage for FSRepository { async fn open_payload( &self, digest: encoding::Digest, - ) -> Result>> { + ) -> Result>> { let path = self.payloads.build_digest_path(&digest); match tokio::fs::File::open(&path).await { Ok(file) => Ok(Box::pin(file)), diff --git a/src/storage/fs/renderer_test.rs b/src/storage/fs/renderer_test.rs index 3d115137f5..f31e7bb632 100644 --- a/src/storage/fs/renderer_test.rs +++ b/src/storage/fs/renderer_test.rs @@ -10,7 +10,7 @@ use crate::graph::Manifest; use crate::storage::{fs::FSRepository, ManifestViewer, PayloadStorage, Repository}; use crate::tracking; -fixtures!(); +use crate::fixtures::*; #[rstest] #[tokio::test] diff --git a/src/storage/fs/tag.rs b/src/storage/fs/tag.rs index 292dac4b3c..e7fd3b12d4 100644 --- a/src/storage/fs/tag.rs +++ b/src/storage/fs/tag.rs @@ -28,10 +28,6 @@ use crate::{ }; use encoding::{Decodable, Encodable}; -#[cfg(test)] -#[path = "./tag_test.rs"] -mod tag_test; - const TAG_EXT: &str = "tag"; impl FSRepository { diff --git a/src/storage/fs/tag_test.rs b/src/storage/fs/tag_test.rs index 5c57959545..b7b87dd6de 100644 --- a/src/storage/fs/tag_test.rs +++ b/src/storage/fs/tag_test.rs @@ -1,108 +1,21 @@ // Copyright (c) 2021 Sony Pictures Imageworks, et al. // SPDX-License-Identifier: Apache-2.0 // https://github.com/imageworks/spk - use std::os::unix::fs::MetadataExt; use rstest::rstest; -use tokio_stream::StreamExt; - -use crate::storage::{fs::FSRepository, TagStorage}; -use crate::{encoding, tracking, Result}; -use relative_path::RelativePathBuf; - -fixtures!(); - -#[rstest] -#[tokio::test] -async fn test_tag_stream(tmpdir: tempdir::TempDir) { - init_logging(); - - let storage = FSRepository::create(tmpdir.path()) - .await - .expect("failed to create repo"); - - let digest1 = encoding::Hasher::default().digest(); - let mut h = encoding::Hasher::default(); - h.update(b"hello"); - let digest2 = h.digest(); - - let base = crate::tracking::TagSpec::parse("hello/world").unwrap(); - let tag1 = storage - .push_tag(&base, &digest1) - .await - .expect("failed to push tag"); - assert_eq!(storage.resolve_tag(&base).await.unwrap(), tag1); - assert_eq!( - storage.resolve_tag(&base.with_version(0)).await.unwrap(), - tag1 - ); - - let tag2 = storage - .push_tag(&base, &digest2) - .await - .expect("failed to push tag"); - let _tag3 = storage - .push_tag(&base, &digest2) - .await - .expect("failed to push tag"); - assert_eq!(storage.resolve_tag(&base).await.unwrap(), tag2); - assert_eq!( - storage.resolve_tag(&base.with_version(0)).await.unwrap(), - tag2 - ); - assert_eq!( - storage.resolve_tag(&base.with_version(1)).await.unwrap(), - tag1 - ); - let found: crate::Result> = storage.find_tags(&digest2).collect().await; - assert_eq!(found.unwrap(), vec![base.clone()]); - let found: crate::Result> = storage.find_tags(&digest1).collect().await; - assert_eq!(found.unwrap(), vec![base.with_version(1)]); -} - -#[rstest] -#[tokio::test] -async fn test_tag_no_duplication(tmpdir: tempdir::TempDir) { - init_logging(); - - let storage = FSRepository::create(tmpdir.path().join("tags")) - .await - .unwrap(); - let spec = tracking::TagSpec::parse("hello").unwrap(); - let tag1 = storage - .push_tag(&spec, &encoding::EMPTY_DIGEST.into()) - .await - .unwrap(); - let tag2 = storage - .push_tag(&spec, &encoding::EMPTY_DIGEST.into()) - .await - .unwrap(); - assert_eq!(tag1, tag2); +use super::FSRepository; +use crate::{encoding, storage::TagStorage, tracking}; - assert_eq!( - storage - .read_tag(&spec) - .await - .unwrap() - // there's no count() for streams - .fold(0, |c, _| c + 1) - .await, - 1 - ); -} +use crate::fixtures::*; #[rstest] -#[tokio::test] -async fn test_tag_permissions(tmpdir: tempdir::TempDir) { - let storage = FSRepository::create(tmpdir.path().join("repo")) - .await - .unwrap(); +fn test_tag_permissions(tmpdir: tempdir::TempDir) { + let mut storage = FSRepository::create(tmpdir.path().join("repo")).unwrap(); let spec = tracking::TagSpec::parse("hello").unwrap(); storage .push_tag(&spec, &encoding::EMPTY_DIGEST.into()) - .await .unwrap(); assert_eq!( tmpdir @@ -115,106 +28,3 @@ async fn test_tag_permissions(tmpdir: tempdir::TempDir) { 0o777 ); } - -#[rstest] -#[tokio::test] -async fn test_ls_tags(tmpdir: tempdir::TempDir) { - init_logging(); - - let storage = FSRepository::create(tmpdir.path().join("tags")) - .await - .unwrap(); - for tag in &[ - "spi/stable/my_tag", - "spi/stable/other_tag", - "spi/stable", - "spi/latest/my_tag", - ] { - let spec = tracking::TagSpec::parse(tag).unwrap(); - storage - .push_tag(&spec, &encoding::EMPTY_DIGEST.into()) - .await - .unwrap(); - } - - let mut tags: Vec<_> = storage - .ls_tags(&RelativePathBuf::from("/")) - .collect::>>() - .await - .unwrap(); - assert_eq!(tags, vec!["spi/".to_string()]); - tags = storage - .ls_tags(&RelativePathBuf::from("/spi")) - .collect::>>() - .await - .unwrap(); - tags.sort(); - assert_eq!( - tags, - vec![ - "latest/".to_string(), - "stable".to_string(), - "stable/".to_string() - ] - ); - tags = storage - .ls_tags(&RelativePathBuf::from("spi/stable")) - .collect::>>() - .await - .unwrap(); - tags.sort(); - assert_eq!(tags, vec!["my_tag".to_string(), "other_tag".to_string()]); -} - -#[rstest] -#[tokio::test] -async fn test_rm_tags(tmpdir: tempdir::TempDir) { - init_logging(); - - let storage = FSRepository::create(tmpdir.path().join("tags")) - .await - .unwrap(); - for tag in &[ - "spi/stable/my_tag", - "spi/stable/other_tag", - "spi/latest/my_tag", - ] { - let spec = tracking::TagSpec::parse(tag).unwrap(); - storage - .push_tag(&spec, &encoding::EMPTY_DIGEST.into()) - .await - .unwrap(); - } - - let mut tags: Vec<_> = storage - .ls_tags(&RelativePathBuf::from("/spi")) - .collect::>>() - .await - .unwrap(); - tags.sort(); - assert_eq!(tags, vec!["latest/", "stable/"]); - storage - .remove_tag_stream(&tracking::TagSpec::parse("spi/stable/my_tag").unwrap()) - .await - .unwrap(); - tags = storage - .ls_tags(&RelativePathBuf::from("spi/stable")) - .collect::>>() - .await - .unwrap(); - assert_eq!(tags, vec!["other_tag"]); - storage - .remove_tag_stream(&tracking::TagSpec::parse("spi/stable/other_tag").unwrap()) - .await - .unwrap(); - tags = storage - .ls_tags(&RelativePathBuf::from("spi")) - .collect::>>() - .await - .unwrap(); - assert_eq!( - tags, - vec!["latest/"], - "should remove empty tag folders during cleanup" - ); -} diff --git a/src/storage/manifest_test.rs b/src/storage/manifest_test.rs index 1bc74e8720..eb9dd5fb8f 100644 --- a/src/storage/manifest_test.rs +++ b/src/storage/manifest_test.rs @@ -5,18 +5,21 @@ use rstest::rstest; use tokio_stream::StreamExt; -use crate::graph::{Database, DatabaseView, Manifest}; -use crate::storage::{fs::FSRepository, ManifestStorage}; +use crate::graph::Manifest; use crate::{encoding::Encodable, tracking}; -fixtures!(); +use crate::fixtures::*; -#[rstest] +#[rstest( + repo, + case::fs(tmprepo("fs")), + case::tar(tmprepo("tar")), + case::rpc(tmprepo("rpc")) +)] #[tokio::test] -async fn test_read_write_manifest(tmpdir: tempdir::TempDir) { +async fn test_read_write_manifest(#[future] repo: TempRepo, tmpdir: tempdir::TempDir) { let dir = tmpdir.path(); - let repo = FSRepository::create(dir.join("repo")).await.unwrap(); - + let repo = repo.await; std::fs::File::create(dir.join("file.txt")).unwrap(); let manifest = Manifest::from(&tracking::compute_manifest(&dir).await.unwrap()); let expected = manifest.digest().unwrap(); @@ -33,26 +36,28 @@ async fn test_read_write_manifest(tmpdir: tempdir::TempDir) { assert!(digests.contains(&expected)); } -#[rstest] +#[rstest( + repo, + case::fs(tmprepo("fs")), + case::tar(tmprepo("tar")), + case::rpc(tmprepo("rpc")) +)] #[tokio::test] -async fn test_manifest_parity(tmpdir: tempdir::TempDir) { +async fn test_manifest_parity(#[future] repo: TempRepo, tmpdir: tempdir::TempDir) { init_logging(); let dir = tmpdir.path(); - let storage = FSRepository::create(dir.join("storage")) - .await - .expect("failed to make repo"); + let repo = repo.await; std::fs::create_dir(dir.join("dir")).unwrap(); std::fs::write(dir.join("dir/file.txt"), "").unwrap(); let expected = tracking::compute_manifest(&dir).await.unwrap(); let storable = Manifest::from(&expected); let digest = storable.digest().unwrap(); - storage - .write_object(&storable.into()) + repo.write_object(&storable.into()) .await .expect("failed to store manifest object"); - let out = storage + let out = repo .read_manifest(digest) .await .expect("stored manifest was not written"); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 68a3f328c6..9b539bc1d3 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -12,6 +12,7 @@ mod tag; pub mod fs; pub mod prelude; +pub mod rpc; pub mod tar; pub use blob::BlobStorage; @@ -23,9 +24,11 @@ pub use repository::Repository; pub use tag::TagStorage; #[derive(Debug)] +#[allow(clippy::large_enum_variant)] pub enum RepositoryHandle { FS(fs::FSRepository), Tar(tar::TarRepository), + Rpc(rpc::RpcRepository), } impl RepositoryHandle { @@ -33,6 +36,7 @@ impl RepositoryHandle { match self { Self::FS(repo) => Box::new(repo), Self::Tar(repo) => Box::new(repo), + Self::Rpc(repo) => Box::new(repo), } } } @@ -44,6 +48,7 @@ impl std::ops::Deref for RepositoryHandle { match self { RepositoryHandle::FS(repo) => repo, RepositoryHandle::Tar(repo) => repo, + RepositoryHandle::Rpc(repo) => repo, } } } @@ -53,6 +58,7 @@ impl std::ops::DerefMut for RepositoryHandle { match self { RepositoryHandle::FS(repo) => repo, RepositoryHandle::Tar(repo) => repo, + RepositoryHandle::Rpc(repo) => repo, } } } @@ -67,6 +73,11 @@ impl From for RepositoryHandle { RepositoryHandle::Tar(repo) } } +impl From for RepositoryHandle { + fn from(repo: rpc::RpcRepository) -> Self { + RepositoryHandle::Rpc(repo) + } +} /// Open the repository at the given url address pub async fn open_repository>(address: S) -> crate::Result { @@ -85,6 +96,7 @@ pub async fn open_repository>(address: S) -> crate::Result Ok(rpc::RpcRepository::connect(url).await?.into()), scheme => Err(format!("Unsupported repository scheme: '{}'", scheme).into()), } } diff --git a/src/storage/payload.rs b/src/storage/payload.rs index b3fe9a3d84..f4561818c5 100644 --- a/src/storage/payload.rs +++ b/src/storage/payload.rs @@ -9,11 +9,15 @@ use futures::Stream; use crate::encoding; use crate::Result; +#[cfg(test)] +#[path = "payload_test.rs"] +mod payload_test; + /// Stores arbitrary binary data payloads using their content digest. #[async_trait::async_trait] pub trait PayloadStorage: Sync + Send { /// Iterate all the payloads in this storage. - fn iter_payload_digests(&self) -> Pin>>>; + fn iter_payload_digests(&self) -> Pin> + Send>>; /// Return true if the identified payload exists. async fn has_payload(&self, digest: encoding::Digest) -> bool { @@ -23,7 +27,7 @@ pub trait PayloadStorage: Sync + Send { /// Store the contents of the given stream, returning its digest and size async fn write_data( &self, - reader: Pin>, + reader: Pin>, ) -> Result<(encoding::Digest, u64)>; /// Return a handle to the full content of a payload. @@ -33,7 +37,7 @@ pub trait PayloadStorage: Sync + Send { async fn open_payload( &self, digest: encoding::Digest, - ) -> Result>>; + ) -> Result>>; /// Remove the payload idetified by the given digest. /// @@ -44,13 +48,13 @@ pub trait PayloadStorage: Sync + Send { #[async_trait::async_trait] impl PayloadStorage for &T { - fn iter_payload_digests(&self) -> Pin>>> { + fn iter_payload_digests(&self) -> Pin> + Send>> { PayloadStorage::iter_payload_digests(&**self) } async fn write_data( &self, - reader: Pin>, + reader: Pin>, ) -> Result<(encoding::Digest, u64)> { PayloadStorage::write_data(&**self, reader).await } @@ -58,7 +62,7 @@ impl PayloadStorage for &T { async fn open_payload( &self, digest: encoding::Digest, - ) -> Result>> { + ) -> Result>> { PayloadStorage::open_payload(&**self, digest).await } diff --git a/src/storage/payload_test.rs b/src/storage/payload_test.rs new file mode 100644 index 0000000000..6ce3d2640b --- /dev/null +++ b/src/storage/payload_test.rs @@ -0,0 +1,112 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +use tokio::io::AsyncReadExt; + +use futures::TryStreamExt; +use rstest::rstest; + +use crate::fixtures::*; + +#[rstest( + tmprepo, + case::fs(tmprepo("fs")), + case::tar(tmprepo("tar")), + case::rpc(tmprepo("rpc")) +)] +#[tokio::test] +async fn test_payload_io(#[future] tmprepo: TempRepo) { + let tmprepo = tmprepo.await; + let bytes = "simple string data".as_bytes(); + let reader = Box::pin(bytes.clone()); + + let (digest, size) = tmprepo + .write_data(reader) + .await + .expect("failed to write payload data"); + assert_eq!(size, bytes.len() as u64); + + let mut actual = String::new(); + tmprepo + .open_payload(digest) + .await + .unwrap() + .read_to_string(&mut actual) + .await + .unwrap(); + assert_eq!(&actual, "simple string data"); +} + +#[rstest( + tmprepo, + case::fs(tmprepo("fs")), + case::tar(tmprepo("tar")), + case::rpc(tmprepo("rpc")) +)] +#[tokio::test] +async fn test_payload_existance(#[future] tmprepo: TempRepo) { + let tmprepo = tmprepo.await; + let bytes = "simple string data".as_bytes(); + let reader = Box::pin(bytes.clone()); + + let (digest, size) = tmprepo + .write_data(reader) + .await + .expect("failed to write payload data"); + assert_eq!(size, bytes.len() as u64); + + let actual = tmprepo.has_payload(digest).await; + assert!(actual); + + tmprepo.remove_payload(digest).await.unwrap(); + + let actual = tmprepo.has_payload(digest).await; + assert!(!actual, "payload should not exist after being removed"); +} + +#[rstest( + tmprepo, + case::fs(tmprepo("fs")), + case::tar(tmprepo("tar")), + case::rpc(tmprepo("rpc")) +)] +#[tokio::test] +async fn test_payloads_iter(#[future] tmprepo: TempRepo) { + let tmprepo = tmprepo.await; + let payloads = [ + "simple string data 1".as_bytes(), + "simple string data 2".as_bytes(), + "simple string data 3".as_bytes(), + ]; + + let reader_0 = Box::pin(payloads[0].clone()); + let reader_1 = Box::pin(payloads[1].clone()); + let reader_2 = Box::pin(payloads[2].clone()); + + let mut expected = vec![ + tmprepo + .write_data(reader_0) + .await + .expect("failed to write payload data") + .0, + tmprepo + .write_data(reader_1) + .await + .expect("failed to write payload data") + .0, + tmprepo + .write_data(reader_2) + .await + .expect("failed to write payload data") + .0, + ]; + expected.sort(); + + let mut actual = tmprepo + .iter_payload_digests() + .try_collect::>() + .await + .expect("failed to iter digests"); + actual.sort(); + assert_eq!(actual, expected, "iter should return all stored digests"); +} diff --git a/src/storage/repository.rs b/src/storage/repository.rs index 44d47494e2..be9d57b148 100644 --- a/src/storage/repository.rs +++ b/src/storage/repository.rs @@ -15,6 +15,10 @@ use graph::{Blob, Manifest}; #[path = "./repository_test.rs"] mod repository_test; +#[cfg(test)] +#[path = "./database_test.rs"] +mod database_test; + #[derive(Debug, Eq, PartialEq, Hash, Clone)] pub enum Ref { Digest(encoding::Digest), @@ -107,7 +111,7 @@ pub trait Repository: /// Commit the data from 'reader' as a blob in this repository async fn commit_blob( &self, - reader: Pin>, + reader: Pin>, ) -> Result { let (digest, size) = self.write_data(reader).await?; let blob = Blob::new(digest, size); diff --git a/src/storage/repository_test.rs b/src/storage/repository_test.rs index 697c1842dc..836dd27abd 100644 --- a/src/storage/repository_test.rs +++ b/src/storage/repository_test.rs @@ -14,13 +14,18 @@ use crate::graph::Manifest; use crate::storage::{fs, prelude::*}; use crate::{encoding::Encodable, tracking::TagSpec}; -fixtures!(); +use crate::fixtures::*; -#[rstest(tmprepo, case(tmprepo("fs")), case(tmprepo("tar")))] +#[rstest( + tmprepo, + case(tmprepo("fs")), + case(tmprepo("tar")), + case(tmprepo("rpc")) +)] #[tokio::test] async fn test_find_aliases(#[future] tmprepo: TempRepo) { init_logging(); - let (_td, tmprepo) = tmprepo.await; + let tmprepo = tmprepo.await; tmprepo .find_aliases("not-existant") .await @@ -90,10 +95,15 @@ async fn test_commit_mode_fs(tmpdir: tempdir::TempDir) { ) } -#[rstest(tmprepo, case(tmprepo("fs")), case(tmprepo("tar")))] +#[rstest( + tmprepo, + case(tmprepo("fs")), + case(tmprepo("tar")), + case(tmprepo("rpc")) +)] #[tokio::test] -async fn test_commit_broken_link(#[future] tmprepo: TempRepo) { - let (tmpdir, tmprepo) = tmprepo.await; +async fn test_commit_broken_link(#[future] tmprepo: TempRepo, tmpdir: tempdir::TempDir) { + let tmprepo = tmprepo.await; let src_dir = tmpdir.path().join("source"); std::fs::create_dir_all(&src_dir).unwrap(); std::os::unix::fs::symlink( @@ -106,10 +116,15 @@ async fn test_commit_broken_link(#[future] tmprepo: TempRepo) { assert!(manifest.get_path("broken-link").is_some()); } -#[rstest(tmprepo, case::fs(tmprepo("fs")), case::fs(tmprepo("tar")))] +#[rstest( + tmprepo, + case(tmprepo("fs")), + case(tmprepo("tar")), + case(tmprepo("rpc")) +)] #[tokio::test] -async fn test_commit_dir(#[future] tmprepo: TempRepo) { - let (tmpdir, tmprepo) = tmprepo.await; +async fn test_commit_dir(#[future] tmprepo: TempRepo, tmpdir: tempdir::TempDir) { + let tmprepo = tmprepo.await; let src_dir = tmpdir.path().join("source"); ensure(src_dir.join("dir1.0/dir2.0/file.txt"), "somedata"); ensure(src_dir.join("dir1.0/dir2.1/file.txt"), "someotherdata"); diff --git a/src/storage/rpc/database.rs b/src/storage/rpc/database.rs new file mode 100644 index 0000000000..46e7c21613 --- /dev/null +++ b/src/storage/rpc/database.rs @@ -0,0 +1,82 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk + +use std::convert::TryInto; +use std::pin::Pin; + +use futures::{Stream, TryStreamExt}; + +use crate::{encoding, graph, proto, storage, Result}; +use proto::RpcResult; + +#[async_trait::async_trait] +impl graph::DatabaseView for super::RpcRepository { + async fn read_object(&self, digest: encoding::Digest) -> Result { + let request = proto::ReadObjectRequest { + digest: Some(digest.into()), + }; + let obj = self + .db_client + .clone() + .read_object(request) + .await? + .into_inner() + .to_result()?; + obj.try_into() + } + + fn iter_digests(&self) -> Pin> + Send>> { + let request = proto::IterDigestsRequest {}; + let mut client = self.db_client.clone(); + let stream = futures::stream::once(async move { client.iter_digests(request).await }) + .map_err(crate::Error::from) + .map_ok(|r| r.into_inner().map_err(crate::Error::from)) + .try_flatten() + .and_then(|d| async { d.to_result() }) + .and_then(|d| async { d.try_into() }); + Box::pin(stream) + } + + fn iter_objects(&self) -> graph::DatabaseIterator<'_> { + graph::DatabaseIterator::new(self) + } + + fn walk_objects<'db>(&'db self, root: &encoding::Digest) -> graph::DatabaseWalker<'db> { + graph::DatabaseWalker::new(self, *root) + } +} + +#[async_trait::async_trait] +impl graph::Database for super::RpcRepository { + async fn write_object(&self, obj: &graph::Object) -> Result<()> { + let request = proto::WriteObjectRequest { + object: Some(obj.into()), + }; + self.db_client + .clone() + .write_object(request) + .await? + .into_inner() + .to_result()?; + Ok(()) + } + + async fn remove_object(&self, digest: encoding::Digest) -> Result<()> { + let request = proto::RemoveObjectRequest { + digest: Some(digest.into()), + }; + self.db_client + .clone() + .remove_object(request) + .await? + .into_inner() + .to_result()?; + Ok(()) + } +} + +impl storage::PlatformStorage for super::RpcRepository {} +impl storage::LayerStorage for super::RpcRepository {} +impl storage::ManifestStorage for super::RpcRepository {} +impl storage::BlobStorage for super::RpcRepository {} diff --git a/src/storage/rpc/mod.rs b/src/storage/rpc/mod.rs new file mode 100644 index 0000000000..152f49debd --- /dev/null +++ b/src/storage/rpc/mod.rs @@ -0,0 +1,11 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +//! Storage implemetation which is a client of the built-in spfs server + +mod database; +mod payload; +mod repository; +mod tag; + +pub use repository::RpcRepository; diff --git a/src/storage/rpc/payload.rs b/src/storage/rpc/payload.rs new file mode 100644 index 0000000000..bb7e3e58be --- /dev/null +++ b/src/storage/rpc/payload.rs @@ -0,0 +1,125 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +use std::convert::TryInto; +use std::pin::Pin; + +use futures::{Stream, TryStreamExt}; +use prost::Message; + +use crate::{ + encoding, + proto::{self, RpcResult}, + storage, Result, +}; + +#[async_trait::async_trait] +impl storage::PayloadStorage for super::RpcRepository { + fn iter_payload_digests(&self) -> Pin> + Send>> { + let request = proto::IterDigestsRequest {}; + let mut client = self.payload_client.clone(); + let stream = futures::stream::once(async move { client.iter_digests(request).await }) + .map_err(crate::Error::from) + .map_ok(|r| r.into_inner().map_err(crate::Error::from)) + .try_flatten() + .and_then(|d| async { d.to_result() }) + .and_then(|d| async { d.try_into() }); + Box::pin(stream) + } + + async fn write_data( + &self, + reader: Pin>, + ) -> Result<(encoding::Digest, u64)> { + let request = proto::WritePayloadRequest {}; + let option = self + .payload_client + .clone() + .write_payload(request) + .await? + .into_inner() + .to_result()?; + let client = reqwest::Client::new(); + let stream = + tokio_util::codec::FramedRead::new(reader, tokio_util::codec::BytesCodec::new()); + let resp = client + .post(&option.url) + .body(reqwest::Body::wrap_stream(stream)) + .send() + .await + .map_err(|err| { + crate::Error::String(format!("Failed to send upload request: {:?}", err)) + })? + .error_for_status() + .map_err(|err| crate::Error::String(format!("Upload failed: {:?}", err)))?; + if !resp.status().is_success() { + // the server is expected to return all errors via the gRPC message + // payload in the body. Any other status code is unexpected + return Err(crate::Error::String(format!( + "Unexpected status code from payload server: {}", + resp.status() + ))); + } + let bytes = resp + .bytes() + .await + .map_err(|err| format!("Failed to read response from payload server: {:?}", err))?; + let result = crate::proto::write_payload_response::UploadResponse::decode(bytes) + .map_err(|err| format!("Payload server returned invalid response data: {:?}", err))? + .to_result()?; + Ok((result.digest.try_into()?, result.size)) + } + + async fn open_payload( + &self, + digest: encoding::Digest, + ) -> Result>> { + let request = proto::OpenPayloadRequest { + digest: Some(digest.into()), + }; + let option = self + .payload_client + .clone() + .open_payload(request) + .await? + .into_inner() + .to_result()?; + let client = reqwest::Client::new(); + let url = option.locations.get(0).map(String::as_str).unwrap_or(""); + let resp = client + .get(url) + .send() + .await + .map_err(|err| { + crate::Error::String(format!("Failed to send download request: {:?}", err)) + })? + .error_for_status() + .map_err(|err| crate::Error::String(format!("Download failed: {:?}", err)))?; + if !resp.status().is_success() { + // the server is expected to return all errors via the gRPC message + // payload in the body. Any other status code is unexpected + return Err(crate::Error::String(format!( + "Unexpected status code from payload server: {}", + resp.status() + ))); + } + let stream = resp + .bytes_stream() + .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e)); + use tokio_util::compat::FuturesAsyncReadCompatExt; + Ok(Box::pin(stream.into_async_read().compat())) + } + + async fn remove_payload(&self, digest: encoding::Digest) -> Result<()> { + let request = proto::RemovePayloadRequest { + digest: Some(digest.into()), + }; + self.payload_client + .clone() + .remove_payload(request) + .await? + .into_inner() + .to_result()?; + Ok(()) + } +} diff --git a/src/storage/rpc/repository.rs b/src/storage/rpc/repository.rs new file mode 100644 index 0000000000..02bd1f56c5 --- /dev/null +++ b/src/storage/rpc/repository.rs @@ -0,0 +1,67 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk + +use crate::proto::{ + database_service_client::DatabaseServiceClient, payload_service_client::PayloadServiceClient, + repository_client::RepositoryClient, tag_service_client::TagServiceClient, +}; +use crate::{proto, storage, Error, Result}; + +#[derive(Debug)] +pub struct RpcRepository { + address: url::Url, + pub(super) repo_client: RepositoryClient, + pub(super) tag_client: TagServiceClient, + pub(super) db_client: DatabaseServiceClient, + pub(super) payload_client: PayloadServiceClient, +} + +impl RpcRepository { + pub async fn connect(address: url::Url) -> Result { + let endpoint = + tonic::transport::Endpoint::from_shared(address.to_string()).map_err(|err| { + Error::String(format!("invalid address for rpc repository: {:?}", err)) + })?; + let repo_client = RepositoryClient::connect(endpoint.clone()) + .await + .map_err(|err| { + Error::String(format!("failed to connect to rpc repository: {:?}", err)) + })?; + let tag_client = TagServiceClient::connect(endpoint.clone()) + .await + .map_err(|err| { + Error::String(format!("failed to connect to rpc repository: {:?}", err)) + })?; + let db_client = DatabaseServiceClient::connect(endpoint.clone()) + .await + .map_err(|err| { + Error::String(format!("failed to connect to rpc repository: {:?}", err)) + })?; + let payload_client = PayloadServiceClient::connect(endpoint) + .await + .map_err(|err| { + Error::String(format!("failed to connect to rpc repository: {:?}", err)) + })?; + Ok(Self { + address, + repo_client, + tag_client, + db_client, + payload_client, + }) + } + + /// The round-trip time taken to ping this repository over grpc, if successful + pub async fn ping(&self) -> Result { + let start = std::time::Instant::now(); + self.repo_client.clone().ping(proto::PingRequest {}).await?; + Ok(start.elapsed()) + } +} + +impl storage::Repository for RpcRepository { + fn address(&self) -> url::Url { + self.address.clone() + } +} diff --git a/src/storage/rpc/tag.rs b/src/storage/rpc/tag.rs new file mode 100644 index 0000000000..188e01d856 --- /dev/null +++ b/src/storage/rpc/tag.rs @@ -0,0 +1,155 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk + +use std::convert::{TryFrom, TryInto}; +use std::pin::Pin; + +use futures::{Stream, TryStreamExt}; +use relative_path::RelativePath; + +use crate::proto::{self, tag_service_client::TagServiceClient, RpcResult}; +use crate::{ + encoding, + storage::{self, tag::TagSpecAndTagStream}, + tracking, Result, +}; + +#[async_trait::async_trait] +impl storage::TagStorage for super::RpcRepository { + async fn resolve_tag( + &self, + tag_spec: &crate::tracking::TagSpec, + ) -> Result { + let request = proto::ResolveTagRequest { + tag_spec: tag_spec.to_string(), + }; + let response = self + .tag_client + .clone() + .resolve_tag(request) + .await? + .into_inner(); + response.to_result()?.try_into() + } + + fn ls_tags(&self, path: &RelativePath) -> Pin> + Send>> { + let request = proto::LsTagsRequest { + path: path.to_string(), + }; + let mut client = self.tag_client.clone(); + let stream = futures::stream::once(async move { client.ls_tags(request).await }) + .map_err(crate::Error::from) + .and_then(|r| async { r.into_inner().to_result() }) + .map_ok(|resp| futures::stream::iter(resp.entries.into_iter().map(Ok))) + .try_flatten(); + Box::pin(stream) + } + + fn find_tags( + &self, + digest: &encoding::Digest, + ) -> Pin> + Send>> { + let request = proto::FindTagsRequest { + digest: Some(digest.into()), + }; + let mut client = self.tag_client.clone(); + let stream = futures::stream::once(async move { client.find_tags(request).await }) + .map_err(crate::Error::from) + .and_then(|r| async { r.into_inner().to_result() }) + .map_ok(|tag_list| { + futures::stream::iter(tag_list.tags.into_iter().map(tracking::TagSpec::parse)) + }) + .try_flatten(); + Box::pin(stream) + } + + fn iter_tag_streams(&self) -> Pin> + Send>> { + let request = proto::IterTagSpecsRequest {}; + let mut client = self.tag_client.clone(); + let stream = futures::stream::once(async move { client.iter_tag_specs(request).await }) + .map_err(crate::Error::from) + .and_then(|r| async { r.into_inner().to_result() }) + .map_ok(|response| { + futures::stream::iter(response.tag_specs.into_iter().map(tracking::TagSpec::parse)) + }) + .try_flatten(); + let client = self.tag_client.clone(); + let stream = stream.and_then(move |spec| { + let client = client.clone(); + async move { + match read_tag(client, &spec).await { + Ok(tags) => Ok((spec, tags)), + Err(err) => Err(err), + } + } + }); + + Box::pin(stream) + } + + async fn read_tag( + &self, + tag: &tracking::TagSpec, + ) -> Result> + Send>>> { + read_tag(self.tag_client.clone(), tag).await + } + + async fn push_raw_tag(&self, tag: &tracking::Tag) -> Result<()> { + let request = proto::PushRawTagRequest { + tag: Some(tag.into()), + }; + let _response = self + .tag_client + .clone() + .push_raw_tag(request) + .await? + .into_inner() + .to_result()?; + Ok(()) + } + + async fn remove_tag_stream(&self, tag: &tracking::TagSpec) -> Result<()> { + let request = proto::RemoveTagStreamRequest { + tag_spec: tag.to_string(), + }; + let _response = self + .tag_client + .clone() + .remove_tag_stream(request) + .await? + .into_inner() + .to_result()?; + Ok(()) + } + + async fn remove_tag(&self, tag: &tracking::Tag) -> Result<()> { + let request = proto::RemoveTagRequest { + tag: Some(tag.into()), + }; + let _reponse = self + .tag_client + .clone() + .remove_tag(request) + .await? + .into_inner() + .to_result()?; + Ok(()) + } +} + +async fn read_tag( + mut client: TagServiceClient, + tag: &tracking::TagSpec, +) -> Result> + Send>>> { + let request = proto::ReadTagRequest { + tag_spec: tag.to_string(), + }; + let response = client.read_tag(request).await?.into_inner().to_result()?; + let items: Result> = response + .tags + .into_iter() + .map(tracking::Tag::try_from) + .collect(); + Ok(Box::pin(futures::stream::iter(items?.into_iter().map(Ok)))) +} diff --git a/src/storage/tag.rs b/src/storage/tag.rs index fb680f14f6..ad25ececac 100644 --- a/src/storage/tag.rs +++ b/src/storage/tag.rs @@ -14,6 +14,10 @@ pub(crate) type TagStream = Pin> + S pub(crate) type TagSpecAndTagStream = (tracking::TagSpec, TagStream); pub(crate) type IterTagsItem = Result<(tracking::TagSpec, tracking::Tag)>; +#[cfg(test)] +#[path = "./tag_test.rs"] +mod tag_test; + /// A location where tags are tracked and persisted. #[async_trait::async_trait] pub trait TagStorage: Send + Sync { @@ -55,7 +59,7 @@ pub trait TagStorage: Send + Sync { ) -> Pin> + Send>>; /// Iterate through the available tags in this storage. - fn iter_tags(&self) -> Pin>> { + fn iter_tags(&self) -> Pin + Send>> { let stream = self.iter_tag_streams(); let mapped = futures::StreamExt::filter_map(stream, |res| async { match res { diff --git a/src/storage/tag_test.rs b/src/storage/tag_test.rs new file mode 100644 index 0000000000..b5f37bbfab --- /dev/null +++ b/src/storage/tag_test.rs @@ -0,0 +1,230 @@ +// Copyright (c) 2021 Sony Pictures Imageworks, et al. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/imageworks/spk +use std::os::unix::fs::MetadataExt; + +use rstest::rstest; +use tokio_stream::StreamExt; + +use crate::storage::{fs::FSRepository, TagStorage}; +use crate::{encoding, tracking, Result}; +use relative_path::RelativePathBuf; + +use crate::fixtures::*; + +#[rstest( + tmprepo, + case::fs(tmprepo("fs")), + case::tar(tmprepo("tar")), + case::rpc(tmprepo("rpc")) +)] +#[tokio::test] +async fn test_tag_stream(#[future] tmprepo: TempRepo) { + init_logging(); + let tmprepo = tmprepo.await; + + let digest1 = encoding::Hasher::default().digest(); + let mut h = encoding::Hasher::default(); + h.update(b"hello"); + let digest2 = h.digest(); + + let base = crate::tracking::TagSpec::parse("hello/world").unwrap(); + let tag1 = tmprepo + .push_tag(&base, &digest1) + .await + .expect("failed to push tag"); + assert_eq!(tmprepo.resolve_tag(&base).await.unwrap(), tag1); + assert_eq!( + tmprepo.resolve_tag(&base.with_version(0)).await.unwrap(), + tag1 + ); + + let tag2 = tmprepo + .push_tag(&base, &digest2) + .await + .expect("failed to push tag"); + let _tag3 = tmprepo + .push_tag(&base, &digest2) + .await + .expect("failed to push tag"); + assert_eq!(tmprepo.resolve_tag(&base).await.unwrap(), tag2); + assert_eq!( + tmprepo.resolve_tag(&base.with_version(0)).await.unwrap(), + tag2 + ); + assert_eq!( + tmprepo.resolve_tag(&base.with_version(1)).await.unwrap(), + tag1 + ); + let found: crate::Result> = tmprepo.find_tags(&digest2).collect().await; + assert_eq!(found.unwrap(), vec![base.clone()]); + let found: crate::Result> = tmprepo.find_tags(&digest1).collect().await; + assert_eq!(found.unwrap(), vec![base.with_version(1)]); +} + +#[rstest( + tmprepo, + case::fs(tmprepo("fs")), + case::tar(tmprepo("tar")), + case::rpc(tmprepo("rpc")) +)] +#[tokio::test] +async fn test_tag_no_duplication(#[future] tmprepo: TempRepo) { + init_logging(); + let tmprepo = tmprepo.await; + + let spec = tracking::TagSpec::parse("hello").unwrap(); + let tag1 = tmprepo + .push_tag(&spec, &encoding::EMPTY_DIGEST.into()) + .await + .unwrap(); + let tag2 = tmprepo + .push_tag(&spec, &encoding::EMPTY_DIGEST.into()) + .await + .unwrap(); + + assert_eq!(tag1, tag2); + + assert_eq!( + tmprepo + .read_tag(&spec) + .await + .unwrap() + // there's no count() for streams + .fold(0, |c, _| c + 1) + .await, + 1 + ); +} + +#[rstest] +#[tokio::test] +async fn test_tag_permissions(tmpdir: tempdir::TempDir) { + let storage = FSRepository::create(tmpdir.path().join("repo")) + .await + .unwrap(); + let spec = tracking::TagSpec::parse("hello").unwrap(); + storage + .push_tag(&spec, &encoding::EMPTY_DIGEST.into()) + .await + .unwrap(); + assert_eq!( + tmpdir + .path() + .join("repo/tags/hello.tag") + .metadata() + .unwrap() + .mode() + & 0o777, + 0o777 + ); +} + +#[rstest( + tmprepo, + case::fs(tmprepo("fs")), + case::tar(tmprepo("tar")), + case::rpc(tmprepo("rpc")) +)] +#[tokio::test] +async fn test_ls_tags(#[future] tmprepo: TempRepo) { + init_logging(); + let tmprepo = tmprepo.await; + + for tag in &[ + "spi/stable/my_tag", + "spi/stable/other_tag", + "spi/stable", + "spi/latest/my_tag", + ] { + let spec = tracking::TagSpec::parse(tag).unwrap(); + tmprepo + .push_tag(&spec, &encoding::EMPTY_DIGEST.into()) + .await + .unwrap(); + } + + let mut tags: Vec<_> = tmprepo + .ls_tags(&RelativePathBuf::from("/")) + .collect::>>() + .await + .unwrap(); + assert_eq!(tags, vec!["spi/".to_string()]); + tags = tmprepo + .ls_tags(&RelativePathBuf::from("/spi")) + .collect::>>() + .await + .unwrap(); + tags.sort(); + assert_eq!( + tags, + vec![ + "latest/".to_string(), + "stable".to_string(), + "stable/".to_string() + ] + ); + tags = tmprepo + .ls_tags(&RelativePathBuf::from("spi/stable")) + .collect::>>() + .await + .unwrap(); + tags.sort(); + assert_eq!(tags, vec!["my_tag".to_string(), "other_tag".to_string()]); +} + +#[rstest( + tmprepo, + case::fs(tmprepo("fs")), + case::tar(tmprepo("tar")), + case::rpc(tmprepo("rpc")) +)] +#[tokio::test] +async fn test_rm_tags(#[future] tmprepo: TempRepo) { + init_logging(); + let tmprepo = tmprepo.await; + + for tag in &[ + "spi/stable/my_tag", + "spi/stable/other_tag", + "spi/latest/my_tag", + ] { + let spec = tracking::TagSpec::parse(tag).unwrap(); + tmprepo + .push_tag(&spec, &encoding::EMPTY_DIGEST.into()) + .await + .unwrap(); + } + + let mut tags: Vec<_> = tmprepo + .ls_tags(&RelativePathBuf::from("/spi")) + .collect::>>() + .await + .unwrap(); + tags.sort(); + assert_eq!(tags, vec!["latest/", "stable/"]); + tmprepo + .remove_tag_stream(&tracking::TagSpec::parse("spi/stable/my_tag").unwrap()) + .await + .unwrap(); + tags = tmprepo + .ls_tags(&RelativePathBuf::from("spi/stable")) + .collect::>>() + .await + .unwrap(); + assert_eq!(tags, vec!["other_tag"]); + tmprepo + .remove_tag_stream(&tracking::TagSpec::parse("spi/stable/other_tag").unwrap()) + .await + .unwrap(); + tags = tmprepo + .ls_tags(&RelativePathBuf::from("spi")) + .collect::>>() + .await + .unwrap(); + assert_eq!( + tags, + vec!["latest/"], + "should remove empty tag folders during cleanup" + ); +} diff --git a/src/storage/tar/repository.rs b/src/storage/tar/repository.rs index 268d7deb0d..323e4b4114 100644 --- a/src/storage/tar/repository.rs +++ b/src/storage/tar/repository.rs @@ -137,13 +137,13 @@ impl graph::Database for TarRepository { #[async_trait::async_trait] impl PayloadStorage for TarRepository { - fn iter_payload_digests(&self) -> Pin>>> { + fn iter_payload_digests(&self) -> Pin> + Send>> { self.repo.iter_payload_digests() } async fn write_data( &self, - reader: Pin>, + reader: Pin>, ) -> Result<(encoding::Digest, u64)> { let res = self.repo.write_data(reader).await?; self.up_to_date @@ -154,7 +154,7 @@ impl PayloadStorage for TarRepository { async fn open_payload( &self, digest: encoding::Digest, - ) -> Result>> { + ) -> Result>> { self.repo.open_payload(digest).await } diff --git a/src/sync.rs b/src/sync.rs index e23a19f6e4..d90924f7b3 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -224,7 +224,14 @@ async fn sync_blob( } else { let payload = src.open_payload(blob.payload).await?; tracing::debug!(digest = ?blob.payload, "syncing payload"); - dest.write_data(payload).await?; + let (digest, _) = dest.write_data(payload).await?; + if digest != blob.payload { + return Err(Error::String(format!( + "Source repository provided blob that did not match the requested digest: wanted {}, got {}", + blob.payload, + digest + ))); + } } dest.write_blob(blob.clone()).await?; Ok(()) diff --git a/src/sync_test.rs b/src/sync_test.rs index 346fc20fae..116952751f 100644 --- a/src/sync_test.rs +++ b/src/sync_test.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 // https://github.com/imageworks/spk -use rstest::rstest; +use rstest::{fixture, rstest}; use super::{push_ref, sync_ref}; use crate::config::Config; @@ -10,7 +10,7 @@ use crate::prelude::*; use crate::{encoding, graph, storage, tracking, Error}; use storage::RepositoryHandle; -fixtures!(); +use crate::fixtures::*; #[rstest] #[tokio::test] @@ -71,26 +71,28 @@ async fn test_push_ref(#[future] config: (tempdir::TempDir, Config)) { assert!(sync_ref(tag.to_string(), &local, &remote).await.is_ok()); } -#[rstest] +#[rstest( + repo_a, + repo_b, + case::fs(tmprepo("fs"), tmprepo("fs")), + case::tar(tmprepo("tar"), tmprepo("tar")), + case::rpc(tmprepo("rpc"), tmprepo("rpc")) +)] #[tokio::test] -async fn test_sync_ref(tmpdir: tempdir::TempDir) { +async fn test_sync_ref( + #[future] repo_a: TempRepo, + #[future] repo_b: TempRepo, + tmpdir: tempdir::TempDir, +) { init_logging(); + let repo_a = repo_a.await; + let repo_b = repo_b.await; + let src_dir = tmpdir.path().join("source"); ensure(src_dir.join("dir/file.txt"), "hello"); ensure(src_dir.join("dir2/otherfile.txt"), "hello2"); ensure(src_dir.join("dir//dir/dir/file.txt"), "hello, world"); - let repo_a: RepositoryHandle = - storage::fs::FSRepository::create(tmpdir.path().join("repo_a").as_path()) - .await - .unwrap() - .into(); - let repo_b: RepositoryHandle = - storage::fs::FSRepository::create(tmpdir.path().join("repo_b").as_path()) - .await - .unwrap() - .into(); - let manifest = repo_a.commit_dir(src_dir.as_path()).await.unwrap(); let layer = repo_a .create_layer(&graph::Manifest::from(&manifest)) @@ -114,12 +116,6 @@ async fn test_sync_ref(tmpdir: tempdir::TempDir) { assert!(repo_b.has_platform(platform.digest().unwrap()).await); assert!(repo_b.has_layer(layer.digest().unwrap()).await); - std::fs::remove_dir_all(tmpdir.path().join("repo_a/objects")).unwrap(); - std::fs::remove_dir_all(tmpdir.path().join("repo_a/payloads")).unwrap(); - std::fs::remove_dir_all(tmpdir.path().join("repo_a/tags")).unwrap(); - std::fs::create_dir_all(tmpdir.path().join("repo_a/objects")).unwrap(); - std::fs::create_dir_all(tmpdir.path().join("repo_a/payloads")).unwrap(); - std::fs::create_dir_all(tmpdir.path().join("repo_a/tags")).unwrap(); sync_ref("testing", &repo_b, &repo_a) .await .expect("failed to sync back"); @@ -128,28 +124,33 @@ async fn test_sync_ref(tmpdir: tempdir::TempDir) { assert!(repo_a.has_layer(layer.digest().unwrap()).await); } -#[rstest] +#[rstest( + repo_a, + repo_b, + case::fs(tmprepo("fs"), tmprepo("fs")), + case::tar(tmprepo("tar"), tmprepo("tar")), + case::rpc(tmprepo("rpc"), tmprepo("rpc")) +)] #[tokio::test] -async fn test_sync_through_tar(tmpdir: tempdir::TempDir) { +async fn test_sync_through_tar( + #[future] repo_a: TempRepo, + #[future] repo_b: TempRepo, + tmpdir: tempdir::TempDir, +) { init_logging(); + let repo_a = repo_a.await; + let repo_b = repo_b.await; + let dir = tmpdir.path(); let src_dir = dir.join("source"); ensure(src_dir.join("dir/file.txt"), "hello"); ensure(src_dir.join("dir2/otherfile.txt"), "hello2"); ensure(src_dir.join("dir//dir/dir/file.txt"), "hello, world"); - let repo_a: RepositoryHandle = storage::fs::FSRepository::create(dir.join("repo_a")) - .await - .unwrap() - .into(); let repo_tar: RepositoryHandle = storage::tar::TarRepository::create(dir.join("repo.tar")) .await .unwrap() .into(); - let repo_b: RepositoryHandle = storage::fs::FSRepository::create(dir.join("repo_b")) - .await - .unwrap() - .into(); let manifest = repo_a.commit_dir(src_dir.as_path()).await.unwrap(); let layer = repo_a diff --git a/src/tracking/diff_test.rs b/src/tracking/diff_test.rs index 5bddf76398..763a512327 100644 --- a/src/tracking/diff_test.rs +++ b/src/tracking/diff_test.rs @@ -7,7 +7,7 @@ use rstest::rstest; use super::{compute_diff, Diff, DiffMode}; use crate::tracking::{compute_manifest, Manifest}; -fixtures!(); +use crate::fixtures::*; #[rstest] fn test_diff_str() { diff --git a/src/tracking/manifest.rs b/src/tracking/manifest.rs index abec93f3ef..4ea37a2b71 100644 --- a/src/tracking/manifest.rs +++ b/src/tracking/manifest.rs @@ -245,7 +245,7 @@ pub async fn compute_manifest + Send>(path: P) -> Resu pub struct ManifestBuilder where - H: FnMut(Pin>) -> F + Send, + H: FnMut(Pin>) -> F + Send, F: Future> + Send, { hasher: H, @@ -253,7 +253,7 @@ where impl ManifestBuilder where - H: FnMut(Pin>) -> F + Send, + H: FnMut(Pin>) -> F + Send, F: Future> + Send, { pub fn new(hasher: H) -> Self { diff --git a/src/tracking/manifest_test.rs b/src/tracking/manifest_test.rs index 12785d2add..f1e42e5bc7 100644 --- a/src/tracking/manifest_test.rs +++ b/src/tracking/manifest_test.rs @@ -7,7 +7,7 @@ use rstest::rstest; use super::{compute_manifest, EntryKind, Manifest}; use crate::graph; -fixtures!(); +use crate::fixtures::*; #[rstest] #[tokio::test]