Skip to content

Commit

Permalink
Merge pull request #50 from imageworks/grpc-server
Browse files Browse the repository at this point in the history
Initial gRPC Server Implementation
  • Loading branch information
rydrman authored Mar 3, 2022
2 parents 9ff40f5 + b464e4b commit 04beae3
Show file tree
Hide file tree
Showing 62 changed files with 3,025 additions and 400 deletions.
395 changes: 379 additions & 16 deletions Cargo.lock

Large diffs are not rendered by default.

25 changes: 24 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ path = "src/cli/cmd_init.rs"
[[bin]]
name = "spfs-render"
path = "src/cli/cmd_render.rs"
[[bin]]
name = "spfs-server"
path = "src/cli/cmd_server.rs"
required-features = ["server"]

[features]
default = ["server"]
server = [
"tokio/signal",
"tokio-stream/net",
"hyper/server",
"tokio-util/codec",
"tokio-util/io",
"tokio-util/io-util",
]

[dependencies]
async-trait = "0.1.52"
Expand All @@ -51,13 +66,16 @@ faccess = "0.2.3"
futures = "0.3.9"
gitignore = "1.0.7"
glob = "0.3.0"
hyper = { version = "0.14.16", features = ["client"] }
indicatif = "0.16.2"
itertools = "0.9.0"
libc = "0.2.80"
nix = "0.19.0"
palaver = "0.2.8"
prost = "0.9.0"
rand = "0.7.3"
relative-path = "1.3.2"
reqwest = { version = "0.11.7", features = ["stream"] }
ring = "0.16.15"
semver = "0.11.0"
sentry = "0.21.0"
Expand All @@ -79,7 +97,9 @@ tokio = { version = "1.15", features = [
"sync",
"process",
] }
tokio-stream = "0.1.8"
tokio-stream = { version = "0.1", features = ["net"], optional = true }
tokio-util = { version = "0.6", features = ["compat"] }
tonic = "0.6.1"
tracing = "0.1.22"
tracing-subscriber = "0.2.15"
unicode_reader = "1.0.1"
Expand All @@ -89,6 +109,9 @@ walkdir = "2.3.1"
whoami = "0.9.0"
thiserror = "1.0"

[build-dependencies]
tonic-build = "0.6.0"

[dev-dependencies]
rstest = { version = "0.12", default_features = false }
serial_test = "0.5.1"
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ default: debug

debug:
cd $(SOURCE_ROOT)
cargo build
cargo build --all

test:
cargo test
cargo test --all

rpm:
cd $(SOURCE_ROOT)
Expand Down
13 changes: 13 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure().compile(
&[
"src/proto/defs/database.proto",
"src/proto/defs/repository.proto",
"src/proto/defs/payload.proto",
"src/proto/defs/tag.proto",
"src/proto/defs/types.proto",
],
&["src/proto/defs"],
)?;
Ok(())
}
3 changes: 2 additions & 1 deletion spfs.spec
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Filesystem isolation, capture, and distribution.
%setup -q

%build
cargo build --release --verbose
cargo build --release --verbose --all

%install
mkdir -p %{buildroot}/usr/bin
Expand All @@ -42,6 +42,7 @@ done
/usr/bin/spfs-push
/usr/bin/spfs-pull
/usr/bin/spfs-init
/usr/bin/spfs-server
%caps(cap_chown,cap_fowner+ep) /usr/bin/spfs-render
%caps(cap_sys_chroot,cap_sys_admin+ep) /usr/bin/spfs-join
%caps(cap_setuid,cap_chown,cap_mknod,cap_sys_admin,cap_fowner+ep) /usr/bin/spfs-enter
Expand Down
2 changes: 1 addition & 1 deletion src/bootstrap_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::{build_interactive_shell_cmd, build_shell_initialized_command};
use crate::{resolve::which, runtime};
use std::{ffi::OsString, process::Command};

fixtures!();
use crate::fixtures::*;

#[rstest(
shell,
Expand Down
35 changes: 17 additions & 18 deletions src/clean_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ use crate::{graph, storage, tracking, Error};
use std::collections::HashSet;
use storage::prelude::*;

fixtures!();
use crate::fixtures::*;

#[rstest]
#[tokio::test]
async fn test_get_attached_objects(#[future] tmprepo: TempRepo) {
let (_td, tmprepo) = tmprepo.await;
let tmprepo = tmprepo.await;
let reader = Box::pin("hello, world".as_bytes());
let (payload_digest, _) = tmprepo.write_data(reader).await.unwrap();
let blob = graph::Blob::new(payload_digest, 0);
Expand All @@ -42,7 +42,7 @@ async fn test_get_attached_objects(#[future] tmprepo: TempRepo) {
#[rstest]
#[tokio::test]
async fn test_get_attached_payloads(#[future] tmprepo: TempRepo) {
let (_td, tmprepo) = tmprepo.await;
let tmprepo = tmprepo.await;
let reader = Box::pin("hello, world".as_bytes());
let (payload_digest, _) = tmprepo.write_data(reader).await.unwrap();
let mut expected = HashSet::new();
Expand All @@ -65,9 +65,13 @@ async fn test_get_attached_payloads(#[future] tmprepo: TempRepo) {

#[rstest]
#[tokio::test]
async fn test_get_attached_unattached_objects_blob(#[future] tmprepo: TempRepo) {
async fn test_get_attached_unattached_objects_blob(
#[future] tmprepo: TempRepo,
tmpdir: tempdir::TempDir,
) {
init_logging();
let (tmpdir, tmprepo) = tmprepo.await;
let tmprepo = tmprepo.await;

let data_dir = tmpdir.path().join("data");
ensure(data_dir.join("file.txt"), "hello, world");

Expand Down Expand Up @@ -106,10 +110,10 @@ async fn test_get_attached_unattached_objects_blob(#[future] tmprepo: TempRepo)

#[rstest]
#[tokio::test]
async fn test_clean_untagged_objects(#[future] tmprepo: TempRepo) {
async fn test_clean_untagged_objects(#[future] tmprepo: TempRepo, tmpdir: tempdir::TempDir) {
init_logging();
let tmprepo = tmprepo.await;

let (tmpdir, tmprepo) = tmprepo.await;
let data_dir_1 = tmpdir.path().join("data");
ensure(data_dir_1.join("dir/dir/test.file"), "1 hello");
ensure(data_dir_1.join("dir/dir/test.file2"), "1 hello, world");
Expand Down Expand Up @@ -168,7 +172,7 @@ async fn test_clean_untagged_objects(#[future] tmprepo: TempRepo) {
#[rstest]
#[tokio::test]
async fn test_clean_untagged_objects_layers_platforms(#[future] tmprepo: TempRepo) {
let (_td, tmprepo) = tmprepo.await;
let tmprepo = tmprepo.await;
let manifest = tracking::Manifest::default();
let layer = tmprepo
.create_layer(&graph::Manifest::from(&manifest))
Expand Down Expand Up @@ -198,15 +202,10 @@ async fn test_clean_untagged_objects_layers_platforms(#[future] tmprepo: TempRep

#[rstest]
#[tokio::test]
async fn test_clean_manifest_renders(#[future] tmprepo: TempRepo) {
let (tmpdir, tmprepo) = tmprepo.await;
let tmprepo = match tmprepo {
storage::RepositoryHandle::FS(repo) => repo,
_ => {
println!("Unsupported repo for this test");
return;
}
};
async fn test_clean_manifest_renders(tmpdir: tempdir::TempDir) {
let tmprepo = storage::fs::FSRepository::create(tmpdir.path())
.await
.unwrap();

let data_dir = tmpdir.path().join("data");
ensure(data_dir.join("dir/dir/file.txt"), "hello");
Expand Down Expand Up @@ -234,7 +233,7 @@ async fn test_clean_manifest_renders(#[future] tmprepo: TempRepo) {
.await
.expect("failed to clean repo");

let files = list_files(tmprepo.renders.unwrap().root());
let files = list_files(tmprepo.renders.as_ref().unwrap().root());
assert!(files.is_empty(), "should remove all created data files");
}

Expand Down
1 change: 1 addition & 0 deletions src/cli/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ main!(Opt);
\n pull pull one or more object to the local repository\
\n push push one or more objects to a remote repository\
\n render render the contents of an environment or layer\
\n server run an spfs server (if installed)\
"
)]
pub struct Opt {
Expand Down
1 change: 1 addition & 0 deletions src/cli/cmd_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ impl CmdCheck {
let errors = match repo {
RepositoryHandle::FS(repo) => spfs::graph::check_database_integrity(repo).await,
RepositoryHandle::Tar(repo) => spfs::graph::check_database_integrity(repo).await,
RepositoryHandle::Rpc(repo) => spfs::graph::check_database_integrity(repo).await,
};
for error in errors.iter() {
tracing::error!("{:?}", error);
Expand Down
89 changes: 89 additions & 0 deletions src/cli/cmd_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) 2021 Sony Pictures Imageworks, et al.
// SPDX-License-Identifier: Apache-2.0
// https://github.com/imageworks/spk
use structopt::StructOpt;

#[macro_use]
mod args;

main!(CmdServer);

#[derive(Debug, StructOpt)]
pub struct CmdServer {
#[structopt(short = "v", long = "verbose", parse(from_occurrences))]
pub verbose: usize,
#[structopt(
long = "remote",
short = "r",
about = "Serve a configured remote repository instead of the local one"
)]
remote: Option<String>,
#[structopt(
long = "payloads-root",
default_value = "http://localhost",
about = "The external root url that clients can use to connect to this server"
)]
payloads_root: url::Url,
// 7737 = spfs on a dial pad
#[structopt(
default_value = "0.0.0.0:7737",
about = "The address to listen on for grpc requests"
)]
grpc_address: std::net::SocketAddr,
#[structopt(
default_value = "0.0.0.0:7787",
about = "The address to listen on for http requests"
)]
http_address: std::net::SocketAddr,
}

impl CmdServer {
pub async fn run(&mut self, config: &spfs::Config) -> spfs::Result<i32> {
let repo = match &self.remote {
Some(remote) => config.get_remote(remote).await?,
None => config.get_repository().await?.into(),
};
let repo = std::sync::Arc::new(repo);

let payload_service =
spfs::server::PayloadService::new(repo.clone(), self.payloads_root.clone());
let http_server = {
let payload_service = payload_service.clone();
hyper::Server::bind(&self.http_address).serve(hyper::service::make_service_fn(
move |_| {
let s = payload_service.clone();
async move { Ok::<_, std::convert::Infallible>(s) }
},
))
};
let http_future = http_server.with_graceful_shutdown(async {
if let Err(err) = tokio::signal::ctrl_c().await {
tracing::error!(?err, "Failed to setup graceful shutdown handler");
};
tracing::info!("shutting down http server...");
});
let grpc_future = tonic::transport::Server::builder()
.add_service(spfs::server::Repository::new_srv())
.add_service(spfs::server::TagService::new_srv(repo.clone()))
.add_service(spfs::server::DatabaseService::new_srv(repo))
.add_service(payload_service.into_srv())
.serve_with_shutdown(self.grpc_address, async {
if let Err(err) = tokio::signal::ctrl_c().await {
tracing::error!(?err, "Failed to setup graceful shutdown handler");
};
tracing::info!("shutting down gRPC server...");
});
tracing::info!("listening on: {}, {}", self.grpc_address, self.http_address);

// TODO: stop the other server when one fails so that
// the process can exit
let (grpc_result, http_result) = tokio::join!(grpc_future, http_future,);
if let Err(err) = grpc_result {
tracing::error!("gRPC server failed: {:?}", err);
}
if let Err(err) = http_result {
tracing::error!("http server failed: {:?}", err);
}
Ok(0)
}
}
2 changes: 1 addition & 1 deletion src/commit_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rstest::rstest;
use super::{commit_layer, commit_platform};
use crate::{runtime, Error};

fixtures!();
use crate::fixtures::*;
#[rstest]
#[tokio::test]
async fn test_commit_empty(tmpdir: tempdir::TempDir) {
Expand Down
10 changes: 10 additions & 0 deletions src/encoding/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ where

/// Write this object in binary format.
fn encode(&self, writer: &mut impl Write) -> Result<()>;

/// Encode this object into it's binary form in memory.
fn encode_to_bytes(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.encode(&mut buf)?;
Ok(buf)
}
}

pub trait Decodable
Expand Down Expand Up @@ -263,6 +270,9 @@ impl<'a> Digest {
pub fn as_bytes(&'a self) -> &'a [u8] {
self.0.as_ref()
}
pub fn into_bytes(self) -> [u8; DIGEST_SIZE] {
self.0
}
pub fn from_bytes(digest_bytes: &[u8]) -> Result<Self> {
match digest_bytes.try_into() {
Err(err) => Err(Error::new(format!(
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum Error {
InvalidDateTime(#[from] chrono::ParseError),
#[error(transparent)]
Caps(#[from] caps::errors::CapsError),
#[error("Error communicating with the server: {0:?}")]
Tonic(#[from] tonic::Status),

/// Denotes a missing object or one that is not present in the database.
#[error("Unknown Object: {0}")]
Expand Down
Loading

0 comments on commit 04beae3

Please sign in to comment.