Skip to content

Commit

Permalink
[aptos-workspace-server] fix error propagation + pull docker images +…
Browse files Browse the repository at this point in the history
… use only 1 docker network
  • Loading branch information
vgao1996 committed Dec 12, 2024
1 parent 803b7fd commit 6b4ce2a
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 71 deletions.
35 changes: 32 additions & 3 deletions aptos-move/aptos-workspace-server/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,48 @@
use futures::{future::Shared, FutureExt};
use std::{
fmt::{self, Debug, Display},
future::Future,
net::{IpAddr, Ipv4Addr},
sync::Arc,
};

/// A cheaply cloneable wrapper around [`anyhow::Error`] that ensures the chain of errors is
/// propagated.
///
/// The wrapped error is held in an `Arc`, so cloning shares the same underlying error
/// instead of copying it.
pub(crate) struct ArcError(Arc<anyhow::Error>);

impl Clone for ArcError {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

impl std::error::Error for ArcError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.0.source()
}
}

impl Display for ArcError {
    /// Writes the wrapped error using its plain `{}` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A fresh `{}` is used on purpose: formatter flags supplied by the caller are
        // not forwarded to the wrapped error.
        f.write_fmt(format_args!("{}", self.0))
    }
}

impl Debug for ArcError {
    /// Writes the wrapped error using its plain `{:?}` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A fresh `{:?}` is used on purpose: formatter flags supplied by the caller are
        // not forwarded to the wrapped error.
        f.write_fmt(format_args!("{:?}", self.0))
    }
}

/// The local IP address services are bound to.
pub(crate) const IP_LOCAL_HOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

/// Converts a future into a shared future by wrapping the error in an `Arc`.
pub(crate) fn make_shared<F, T, E>(fut: F) -> Shared<impl Future<Output = Result<T, Arc<E>>>>
pub(crate) fn make_shared<F, T>(fut: F) -> Shared<impl Future<Output = Result<T, ArcError>>>
where
T: Clone,
F: Future<Output = Result<T, E>>,
F: Future<Output = Result<T, anyhow::Error>>,
{
fut.map(|r| r.map_err(|err| Arc::new(err))).shared()
fut.map(|r| r.map_err(|err| ArcError(Arc::new(err))))
.shared()
}
21 changes: 11 additions & 10 deletions aptos-move/aptos-workspace-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
mod common;
mod services;

use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};
use common::make_shared;
use futures::TryFutureExt;
use services::{
docker_common::create_docker_network, indexer_api::start_indexer_api,
docker_common::create_docker_network_permanent, indexer_api::start_indexer_api,
processors::start_all_processors,
};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -73,9 +73,11 @@ pub async fn run_all_services() -> Result<()> {
);

// Docker Network
let docker_network_name = format!("aptos-workspace-{}", instance_id);
let (fut_docker_network, fut_docker_network_clean_up) =
create_docker_network(shutdown.clone(), docker_network_name);
let docker_network_name = "aptos-workspace".to_string();
let fut_docker_network = make_shared(create_docker_network_permanent(
shutdown.clone(),
docker_network_name,
));

// Indexer part 1: postgres db
let (fut_postgres, fut_postgres_finish, fut_postgres_clean_up) =
Expand Down Expand Up @@ -106,19 +108,18 @@ pub async fn run_all_services() -> Result<()> {
// Phase 2: Wait for all services to be up.
let all_services_up = async move {
tokio::try_join!(
fut_node_api.map_err(anyhow::Error::msg),
fut_indexer_grpc.map_err(anyhow::Error::msg),
fut_node_api.map_err(|err| anyhow!(err)),
fut_indexer_grpc.map_err(|err| anyhow!(err)),
fut_faucet,
fut_postgres.map_err(anyhow::Error::msg),
fut_all_processors_ready.map_err(anyhow::Error::msg),
fut_postgres.map_err(|err| anyhow!(err)),
fut_all_processors_ready.map_err(|err| anyhow!(err)),
fut_indexer_api,
)
};
let clean_up_all = async move {
eprintln!("Running shutdown steps");
fut_indexer_api_clean_up.await;
fut_postgres_clean_up.await;
fut_docker_network_clean_up.await;
};
tokio::select! {
_ = shutdown.cancelled() => {
Expand Down
86 changes: 82 additions & 4 deletions aptos-move/aptos-workspace-server/src/services/docker_common.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,68 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::common::make_shared;
use crate::common::{make_shared, ArcError};
use anyhow::{anyhow, bail, Context, Result};
use aptos_localnet::docker;
use bollard::{
container::{CreateContainerOptions, InspectContainerOptions, StartContainerOptions},
image::CreateImageOptions,
network::CreateNetworkOptions,
secret::ContainerInspectResponse,
volume::CreateVolumeOptions,
};
use futures::TryStreamExt;
use std::{future::Future, sync::Arc};
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;

/// Creates a permanent docker network which does not need to be cleaned up.
pub async fn create_docker_network_permanent(
shutdown: CancellationToken,
name: String,
) -> Result<String, anyhow::Error> {
let handle = tokio::spawn(async move {
let docker = tokio::select! {
_ = shutdown.cancelled() => {
bail!("failed to create docker network: cancelled")
}
res = docker::get_docker() => {
res.context("failed to create docker network")?
}
};

let res = docker
.create_network(CreateNetworkOptions {
name: name.clone(),
internal: false,
check_duplicate: true,
..Default::default()
})
.await;

match res {
Ok(_response) => {
println!("Created docker network {}", name);

Ok(name)
},
Err(err) => match err {
bollard::errors::Error::DockerResponseServerError {
status_code: 409, ..
} => {
println!("Docker network {} already eixsts, not creating it", name);
Ok(name)
},
err => Err(err.into()),
},
}
});

handle
.await
.map_err(|err| anyhow!("failed to join task handle: {}", err))?
}

/// Creates a Docker network asynchronously and provides a cleanup task for network removal.
///
/// A cancellation token can be used to signal an early shutdown, allowing the creation task to
Expand All @@ -28,11 +77,12 @@ use tokio_util::sync::CancellationToken;
///
/// Note that the cleanup is a "best-effort" operation -- success is not guaranteed due to
/// reliance on external commands, which may fail for various reasons.
#[allow(unused)]
pub fn create_docker_network(
shutdown: CancellationToken,
name: String,
) -> (
impl Future<Output = Result<String, Arc<anyhow::Error>>> + Clone,
impl Future<Output = Result<String, ArcError>> + Clone,
impl Future<Output = ()>,
) {
// Flag indicating whether cleanup is needed.
Expand Down Expand Up @@ -132,7 +182,7 @@ pub fn create_docker_volume(
shutdown: CancellationToken,
name: String,
) -> (
impl Future<Output = Result<String, Arc<anyhow::Error>>> + Clone,
impl Future<Output = Result<String, ArcError>> + Clone,
impl Future<Output = ()>,
) {
// Flag indicating whether cleanup is needed.
Expand Down Expand Up @@ -234,7 +284,7 @@ pub fn create_start_and_inspect_container(
options: CreateContainerOptions<String>,
config: bollard::container::Config<String>,
) -> (
impl Future<Output = Result<Arc<ContainerInspectResponse>, Arc<anyhow::Error>>> + Clone,
impl Future<Output = Result<Arc<ContainerInspectResponse>, ArcError>> + Clone,
impl Future<Output = ()>,
) {
#[derive(PartialEq, Eq, Clone, Copy)]
Expand Down Expand Up @@ -266,6 +316,34 @@ pub fn create_start_and_inspect_container(
}
};

let image_name = config.image.as_ref().unwrap();
match docker.inspect_image(image_name).await {
Ok(_) => {
println!("Docker image {} already exists", image_name);
},
Err(_err) => {
println!(
"Docker image {} does not exist. Pulling image..",
image_name
);

docker
.create_image(
Some(CreateImageOptions {
from_image: image_name.clone(),
..Default::default()
}),
None,
None,
)
.try_collect::<Vec<_>>()
.await
.context("failed to create docker container")?;

println!("Pulled docker image {}", image_name);
},
}

let mut state = state.lock().await;

*state = State::Created;
Expand Down
10 changes: 4 additions & 6 deletions aptos-move/aptos-workspace-server/src/services/faucet.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::common::IP_LOCAL_HOST;
use crate::common::{ArcError, IP_LOCAL_HOST};
use anyhow::{anyhow, Context, Result};
use aptos_faucet_core::server::{FunderKeyEnum, RunConfig};
use aptos_localnet::health_checker::HealthChecker;
use futures::channel::oneshot;
use std::{future::Future, path::PathBuf, sync::Arc};
use std::{future::Future, path::PathBuf};
use url::Url;

/// Starts the faucet service.
Expand All @@ -21,8 +21,8 @@ use url::Url;
/// there is an error.
pub fn start_faucet(
test_dir: PathBuf,
fut_node_api: impl Future<Output = Result<u16, Arc<anyhow::Error>>> + Send + 'static,
fut_indexer_grpc: impl Future<Output = Result<u16, Arc<anyhow::Error>>> + Send + 'static,
fut_node_api: impl Future<Output = Result<u16, ArcError>> + Send + 'static,
fut_indexer_grpc: impl Future<Output = Result<u16, ArcError>> + Send + 'static,
) -> (
impl Future<Output = Result<u16>>,
impl Future<Output = Result<()>> + 'static,
Expand All @@ -32,12 +32,10 @@ pub fn start_faucet(
let handle_faucet = tokio::spawn(async move {
let api_port = fut_node_api
.await
.map_err(anyhow::Error::msg)
.context("failed to start faucet: node api did not start successfully")?;

fut_indexer_grpc
.await
.map_err(anyhow::Error::msg)
.context("failed to start faucet: indexer grpc did not start successfully")?;

println!("Starting faucet..");
Expand Down
30 changes: 9 additions & 21 deletions aptos-move/aptos-workspace-server/src/services/indexer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::{
docker_common::create_start_and_inspect_container,
postgres::get_postgres_connection_string_within_docker_network,
};
use crate::common::{make_shared, IP_LOCAL_HOST};
use crate::common::{make_shared, ArcError, IP_LOCAL_HOST};
use anyhow::{anyhow, Context, Result};
use aptos_localnet::{
docker,
Expand Down Expand Up @@ -124,15 +124,9 @@ fn create_container_options_and_config(
pub fn start_indexer_api(
instance_id: Uuid,
shutdown: CancellationToken,
fut_docker_network: impl Future<Output = Result<String, Arc<anyhow::Error>>>
+ Clone
+ Send
+ 'static,
fut_postgres: impl Future<Output = Result<u16, Arc<anyhow::Error>>> + Clone + Send + 'static,
fut_all_processors_ready: impl Future<Output = Result<(), Arc<anyhow::Error>>>
+ Clone
+ Send
+ 'static,
fut_docker_network: impl Future<Output = Result<String, ArcError>> + Clone + Send + 'static,
fut_postgres: impl Future<Output = Result<u16, ArcError>> + Clone + Send + 'static,
fut_all_processors_ready: impl Future<Output = Result<(), ArcError>> + Clone + Send + 'static,
) -> (
impl Future<Output = Result<u16>>,
impl Future<Output = Result<()>>,
Expand All @@ -144,15 +138,10 @@ pub fn start_indexer_api(
let fut_container_clean_up = fut_container_clean_up.clone();

async move {
let (docker_network_name, _postgres_port, _) = try_join!(
fut_docker_network,
fut_postgres,
fut_all_processors_ready
)
.map_err(anyhow::Error::msg)
.context(
"failed to start indexer api server: one or more dependencies failed to start",
)?;
let (docker_network_name, _postgres_port, _) =
try_join!(fut_docker_network, fut_postgres, fut_all_processors_ready).context(
"failed to start indexer api server: one or more dependencies failed to start",
)?;

println!("Starting indexer API..");

Expand All @@ -164,7 +153,6 @@ pub fn start_indexer_api(

let container_info = fut_container
.await
.map_err(anyhow::Error::msg)
.context("failed to start indexer api server")?;

let indexer_api_port = get_hasura_assigned_port(&container_info)
Expand All @@ -178,7 +166,7 @@ pub fn start_indexer_api(
let fut_create_indexer_api = fut_create_indexer_api.clone();

async move {
let indexer_api_port = fut_create_indexer_api.await.map_err(anyhow::Error::msg)?;
let indexer_api_port = fut_create_indexer_api.await?;

let url =
Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, indexer_api_port)).unwrap();
Expand Down
12 changes: 4 additions & 8 deletions aptos-move/aptos-workspace-server/src/services/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
common::{make_shared, IP_LOCAL_HOST},
common::{make_shared, ArcError, IP_LOCAL_HOST},
services::docker_common::{create_docker_volume, create_start_and_inspect_container},
};
use anyhow::{anyhow, Context, Result};
Expand Down Expand Up @@ -161,7 +161,7 @@ fn create_container_options_and_config(
/// as it relies on external commands that may fail for various reasons.
pub fn start_postgres(
shutdown: CancellationToken,
fut_network: impl Future<Output = Result<String, Arc<anyhow::Error>>>,
fut_network: impl Future<Output = Result<String, ArcError>>,
instance_id: Uuid,
) -> (
impl Future<Output = Result<u16>>,
Expand All @@ -180,7 +180,6 @@ pub fn start_postgres(

async move {
let (network_name, volume_name) = try_join!(fut_network, fut_volume)
.map_err(anyhow::Error::msg)
.context("failed to start postgres: one or more dependencies failed to start")?;

let (options, config) =
Expand All @@ -189,10 +188,7 @@ pub fn start_postgres(
create_start_and_inspect_container(shutdown.clone(), options, config);
*fut_container_clean_up.lock().await = Some(fut_container_cleanup);

let container_info = fut_container
.await
.map_err(anyhow::Error::msg)
.context("failed to start postgres")?;
let container_info = fut_container.await.context("failed to start postgres")?;

let postgres_port = get_postgres_assigned_port(&container_info)
.ok_or_else(|| anyhow!("failed to get postgres port"))?;
Expand All @@ -205,7 +201,7 @@ pub fn start_postgres(
let fut_create_postgres = fut_create_postgres.clone();

async move {
let postgres_port = fut_create_postgres.await.map_err(anyhow::Error::msg)?;
let postgres_port = fut_create_postgres.await?;

let health_checker =
HealthChecker::Postgres(get_postgres_connection_string(postgres_port));
Expand Down
Loading

0 comments on commit 6b4ce2a

Please sign in to comment.