Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update TaskManager to support nesting #794

Merged
merged 2 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tokio = { version = "1", default-features = false, features = [
"fs",
"macros",
"signal",
"sync",
"rt-multi-thread",
"rt",
"process",
Expand Down
1 change: 1 addition & 0 deletions boost_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ impl Server {
.add_task(watcher)
.add_task(updater)
.add_task(purger)
.build()
.start()
.await
}
Expand Down
1 change: 1 addition & 0 deletions ingest/src/server_iot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
.add_task(beacon_report_sink_server)
.add_task(witness_report_sink_server)
.add_task(grpc_server)
.build()
.start()
.await
}
1 change: 1 addition & 0 deletions ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
.add_task(invalidated_radio_threshold_report_sink_server)
.add_task(coverage_object_report_sink_server)
.add_task(grpc_server)
.build()
.start()
.await
}
54 changes: 45 additions & 9 deletions ingest/tests/iot_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ async fn initialize_session_and_send_beacon_and_witness() {
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let pub_key = generate_keypair();
Expand Down Expand Up @@ -77,7 +81,11 @@ async fn stream_stops_after_incorrectly_signed_init_request() {
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let pub_key = generate_keypair();
Expand Down Expand Up @@ -111,7 +119,11 @@ async fn stream_stops_after_incorrectly_signed_beacon() {
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let pub_key = generate_keypair();
Expand Down Expand Up @@ -148,7 +160,11 @@ async fn stream_stops_after_incorrect_beacon_pubkey() {
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let pub_key = generate_keypair();
Expand Down Expand Up @@ -188,7 +204,11 @@ async fn stream_stops_after_incorrectly_signed_witness() {
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let pub_key = generate_keypair();
Expand Down Expand Up @@ -225,7 +245,11 @@ async fn stream_stops_after_incorrect_witness_pubkey() {
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let pub_key = generate_keypair();
Expand Down Expand Up @@ -265,7 +289,11 @@ async fn stream_stop_if_client_attempts_to_initiliaze_2nd_session() {
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let pub_key = generate_keypair();
Expand Down Expand Up @@ -316,7 +344,11 @@ async fn stream_stops_if_init_not_sent_within_timeout() {
tokio::task::spawn_local(async move {
let server =
create_test_server(port, beacon_client, witness_client, Some(500), None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let mut client = connect_and_stream(port).await;
Expand All @@ -338,7 +370,11 @@ async fn stream_stops_on_session_timeout() {
tokio::task::spawn_local(async move {
let server =
create_test_server(port, beacon_client, witness_client, Some(500), Some(900));
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let mut client = connect_and_stream(port).await;
Expand Down
1 change: 1 addition & 0 deletions iot_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl Daemon {
TaskManager::builder()
.add_task(grpc_server)
.add_task(db_cleaner)
.build()
.start()
.await
}
Expand Down
1 change: 1 addition & 0 deletions iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl Cmd {
.add_task(verifier_daemon)
.add_task(burner)
.add_task(report_files_server)
.build()
.start()
.await
}
Expand Down
1 change: 1 addition & 0 deletions iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ impl Server {
.add_task(pk_loader_server)
.add_task(entropy_loader_server)
.add_task(rewarder)
.build()
.start()
.await
}
Expand Down
6 changes: 5 additions & 1 deletion mobile_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ impl Daemon {
hex_boosting_svc,
};

TaskManager::builder().add_task(grpc_server).start().await
TaskManager::builder()
.add_task(grpc_server)
.build()
.start()
.await
}
}

Expand Down
1 change: 1 addition & 0 deletions mobile_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ impl Cmd {
.add_task(reports_server)
.add_task(event_id_purger)
.add_task(daemon)
.build()
.start()
.await
}
Expand Down
1 change: 1 addition & 0 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ impl Cmd {
.add_task(radio_threshold_ingest_server)
.add_task(invalidated_radio_threshold_ingest_server)
.add_task(data_session_ingestor)
.build()
.start()
.await
}
Expand Down
1 change: 1 addition & 0 deletions price/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl Server {
.add_task(hnt_price_generator)
.add_task(mobile_price_generator)
.add_task(iot_price_generator)
.build()
.start()
.await
}
Expand Down
Loading
Loading