diff --git a/Cargo.lock b/Cargo.lock index 42f1f935b..80eae615e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -210,6 +210,42 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +[[package]] +name = "cached" +version = "0.44.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b195e4fbc4b6862bbd065b991a34750399c119797efff72492f28a5864de8700" +dependencies = [ + "async-trait", + "cached_proc_macro", + "cached_proc_macro_types", + "futures", + "hashbrown 0.13.2", + "instant", + "once_cell", + "thiserror", + "tokio", +] + +[[package]] +name = "cached_proc_macro" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b48814962d2fd604c50d2b9433c2a41a0ab567779ee2c02f7fba6eca1221f082" +dependencies = [ + "cached_proc_macro_types", + "darling", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "cached_proc_macro_types" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a4f925191b4367301851c6d99b09890311d74b0d43f274c0b34c86d308a3663" + [[package]] name = "cargo-lock" version = "9.0.0" @@ -428,7 +464,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" dependencies = [ "cfg-if", - "hashbrown", + "hashbrown 0.12.3", "lock_api", "once_cell", "parking_lot_core", @@ -527,7 +563,7 @@ checksum = "96beaf9d35dbc4686bc86a4ecb851fd6a406f0bf32d9f646b1225a5c5cf5b5d7" dependencies = [ "env_logger", "fxhash", - "hashbrown", + "hashbrown 0.12.3", "indexmap", "instant", "log", @@ -814,6 +850,12 @@ dependencies = [ "ahash", ] +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" + [[package]] name = "headers" version = "0.3.8" @@ -1040,7 +1082,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.12.3", ] [[package]] @@ -1899,6 +1941,7 @@ name = "qcs" version = "0.15.4" dependencies = [ "built", + "cached", "derive_builder", "dirs", "enum-as-inner", @@ -2706,7 +2749,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32bf088d1d7df2b2b6711b06da3471bc86677383c57b27251e18c56df8deac14" dependencies = [ "ahash", - "hashbrown", + "hashbrown 0.12.3", ] [[package]] diff --git a/crates/lib/Cargo.toml b/crates/lib/Cargo.toml index 5a2b18488..6128e2138 100644 --- a/crates/lib/Cargo.toml +++ b/crates/lib/Cargo.toml @@ -16,6 +16,7 @@ tracing-config = ["tracing", "qcs-api-client-common/tracing-config", "qcs-api-cl otel-tracing = ["tracing-config", "qcs-api-client-grpc/otel-tracing", "qcs-api-client-openapi/otel-tracing"] [dependencies] +cached = "0.44.0" dirs = "5.0.0" enum-as-inner = "0.5.1" futures = "0.3.24" @@ -43,8 +44,6 @@ uuid = { version = "1.2.1", features = ["v4"] } tonic = { version = "0.9.2", features = ["tls", "tls-roots"] } zmq = { version = "0.10.0" } itertools = "0.11.0" -rstest = "0.17.0" -insta = "1.29.0" derive_builder = "0.12.0" [dev-dependencies] @@ -60,6 +59,8 @@ warp = { version = "0.3.3", default-features = false } regex = "1.7.0" test-case = "3.1.0" tracing-subscriber = "0.3.17" +rstest = "0.17.0" +insta = "1.29.0" [build-dependencies] built = "0.6.1" diff --git a/crates/lib/src/qpu/api.rs b/crates/lib/src/qpu/api.rs index 9bfd3ce26..e9cb42aab 100644 --- a/crates/lib/src/qpu/api.rs +++ b/crates/lib/src/qpu/api.rs @@ -3,6 +3,7 @@ use std::{fmt, time::Duration}; +use cached::proc_macro::cached; use derive_builder::Builder; use qcs_api_client_common::{configuration::RefreshError, ClientConfiguration}; pub use qcs_api_client_grpc::channel::Error as GrpcError; @@ -22,7 +23,8 @@ use qcs_api_client_grpc::{ pub use qcs_api_client_openapi::apis::Error as OpenApiError; use qcs_api_client_openapi::apis::{ endpoints_api::{ - get_default_endpoint, get_endpoint, GetDefaultEndpointError, GetEndpointError, + get_default_endpoint as api_get_default_endpoint, get_endpoint, GetDefaultEndpointError, + GetEndpointError, }, quantum_processors_api::{ list_quantum_processor_accessors, ListQuantumProcessorAccessorsError, @@ -307,38 +309,7 @@ impl ExecutionOptions { quantum_processor_id: &str, client: &Qcs, ) -> Result { - let mut min = None; - let mut next_page_token = None; - loop { - let accessors = list_quantum_processor_accessors( - &client.get_openapi_client(), - quantum_processor_id, - Some(100), - next_page_token.as_deref(), - ) - .await?; - - let accessor = accessors - .accessors - .into_iter() - .filter(|acc| { - acc.live - // `as_deref` needed to work around the `Option>` type. - && acc.access_type.as_deref() == Some(&QuantumProcessorAccessorType::GatewayV1) - }) - .min_by_key(|acc| acc.rank.unwrap_or(i64::MAX)); - - min = std::cmp::min_by_key(min, accessor, |acc| { - acc.as_ref().and_then(|acc| acc.rank).unwrap_or(i64::MAX) - }); - - next_page_token = accessors.next_page_token.clone(); - if next_page_token.is_none() { - break; - } - } - min.map(|accessor| accessor.url) - .ok_or_else(|| QpuApiError::GatewayNotFound(quantum_processor_id.to_string())) + get_accessor_with_cache(quantum_processor_id, client).await } async fn get_default_endpoint_address( @@ -346,16 +317,92 @@ impl ExecutionOptions { quantum_processor_id: &str, client: &Qcs, ) -> Result { - let default_endpoint = - get_default_endpoint(&client.get_openapi_client(), quantum_processor_id).await?; - let addresses = default_endpoint.addresses.as_ref(); - let grpc_address = addresses.grpc.as_ref(); - grpc_address - .ok_or_else(|| QpuApiError::QpuEndpointNotFound(quantum_processor_id.into())) - .cloned() + get_default_endpoint_with_cache(quantum_processor_id, client).await } } +#[cached( + result = true, + time = 60, + time_refresh = true, + sync_writes = true, + key = "String", + convert = r"{ String::from(quantum_processor_id)}" +)] +async fn get_accessor_with_cache( + quantum_processor_id: &str, + client: &Qcs, +) -> Result { + #[cfg(feature = "tracing")] + tracing::info!(quantum_processor_id=%quantum_processor_id, "get_accessor cache miss"); + get_accessor(quantum_processor_id, client).await +} + +async fn get_accessor(quantum_processor_id: &str, client: &Qcs) -> Result { + let mut min = None; + let mut next_page_token = None; + loop { + let accessors = list_quantum_processor_accessors( + &client.get_openapi_client(), + quantum_processor_id, + Some(100), + next_page_token.as_deref(), + ) + .await?; + + let accessor = accessors + .accessors + .into_iter() + .filter(|acc| { + acc.live + // `as_deref` needed to work around the `Option>` type. + && acc.access_type.as_deref() == Some(&QuantumProcessorAccessorType::GatewayV1) + }) + .min_by_key(|acc| acc.rank.unwrap_or(i64::MAX)); + + min = std::cmp::min_by_key(min, accessor, |acc| { + acc.as_ref().and_then(|acc| acc.rank).unwrap_or(i64::MAX) + }); + + next_page_token = accessors.next_page_token.clone(); + if next_page_token.is_none() { + break; + } + } + min.map(|accessor| accessor.url) + .ok_or_else(|| QpuApiError::GatewayNotFound(quantum_processor_id.to_string())) +} + +#[cached( + result = true, + time = 60, + time_refresh = true, + sync_writes = true, + key = "String", + convert = r"{ String::from(quantum_processor_id)}" +)] +async fn get_default_endpoint_with_cache( + quantum_processor_id: &str, + client: &Qcs, +) -> Result { + #[cfg(feature = "tracing")] + tracing::info!(quantum_processor_id=%quantum_processor_id, "get_default_endpoint cache miss"); + get_default_endpoint(quantum_processor_id, client).await +} + +async fn get_default_endpoint( + quantum_processor_id: &str, + client: &Qcs, +) -> Result { + let default_endpoint = + api_get_default_endpoint(&client.get_openapi_client(), quantum_processor_id).await?; + let addresses = default_endpoint.addresses.as_ref(); + let grpc_address = addresses.grpc.as_ref(); + grpc_address + .ok_or_else(|| QpuApiError::QpuEndpointNotFound(quantum_processor_id.into())) + .cloned() +} + /// Errors that can occur while attempting to establish a connection to the QPU. #[derive(Debug, thiserror::Error)] pub enum QpuApiError { diff --git a/crates/lib/tests/mocked_qpu.rs b/crates/lib/tests/mocked_qpu.rs index edbf581ae..568d2e49b 100644 --- a/crates/lib/tests/mocked_qpu.rs +++ b/crates/lib/tests/mocked_qpu.rs @@ -3,6 +3,7 @@ use std::time::Duration; +use futures::future; use ndarray::arr2; use qcs::{ @@ -23,19 +24,51 @@ MEASURE 1 ro[1] const QPU_ID: &str = "Aspen-M-3"; -#[tokio::test] -async fn successful_bell_state() { +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_qcs_against_mocks() { + // Shared setup setup().await; + + let mut handles = Vec::new(); + for _ in 0..3 { + // Test direct access + handles.push(tokio::spawn(run_bell_state( + ConnectionStrategy::DirectAccess, + ))); + // Check gateway access + handles.push(tokio::spawn(run_bell_state(ConnectionStrategy::Gateway))); + } + + // Ensure both access methods were cached + future::try_join_all(handles).await.unwrap(); + assert_eq!( + 1, + mock_qcs::DEFAULT_ENDPOINT_CALL_COUNT.load(std::sync::atomic::Ordering::SeqCst) + ); + assert_eq!( + 1, + mock_qcs::ACCESSORS_CALL_COUNT.load(std::sync::atomic::Ordering::SeqCst) + ); +} + +async fn setup() { + simple_logger::init_with_env().unwrap(); + std::env::set_var(SETTINGS_PATH_VAR, "tests/settings.toml"); + std::env::set_var(SECRETS_PATH_VAR, "tests/secrets.toml"); + tokio::spawn(qpu::run()); + tokio::spawn(translation::run()); + tokio::spawn(auth_server::run()); + tokio::spawn(mock_qcs::run()); +} + +async fn run_bell_state(connection_strategy: ConnectionStrategy) { + let execution_options_direct_access = ExecutionOptionsBuilder::default() + .connection_strategy(connection_strategy) + .build() + .expect("should be valid execution options"); let result = Executable::from_quil(BELL_STATE) .with_shots(std::num::NonZeroU16::new(2).expect("value is non-zero")) - .execute_on_qpu( - QPU_ID, - None, - &ExecutionOptionsBuilder::default() - .connection_strategy(ConnectionStrategy::DirectAccess) - .build() - .expect("should be valid execution options"), - ) + .execute_on_qpu(QPU_ID, None, &execution_options_direct_access) .await .expect("Failed to run program that should be successful"); assert_eq!( @@ -52,16 +85,6 @@ async fn successful_bell_state() { assert_eq!(result.duration, Some(Duration::from_micros(8675))); } -async fn setup() { - simple_logger::init_with_env().unwrap(); - std::env::set_var(SETTINGS_PATH_VAR, "tests/settings.toml"); - std::env::set_var(SECRETS_PATH_VAR, "tests/secrets.toml"); - tokio::spawn(qpu::run()); - tokio::spawn(translation::run()); - tokio::spawn(auth_server::run()); - tokio::spawn(mock_qcs::run()); -} - #[allow(dead_code)] mod auth_server { use serde::{Deserialize, Serialize}; @@ -101,10 +124,17 @@ mod mock_qcs { use warp::Filter; use qcs_api_client_openapi::models::{ - InstructionSetArchitecture, TranslateNativeQuilToEncryptedBinaryRequest, - TranslateNativeQuilToEncryptedBinaryResponse, + InstructionSetArchitecture, ListQuantumProcessorAccessorsResponse, + QuantumProcessorAccessor, QuantumProcessorAccessorType, + TranslateNativeQuilToEncryptedBinaryRequest, TranslateNativeQuilToEncryptedBinaryResponse, }; + const MOCK_QPU_ADDRESS: &str = "http://127.0.0.1:8002"; + pub(crate) static DEFAULT_ENDPOINT_CALL_COUNT: std::sync::atomic::AtomicUsize = + std::sync::atomic::AtomicUsize::new(0); + pub(crate) static ACCESSORS_CALL_COUNT: std::sync::atomic::AtomicUsize = + std::sync::atomic::AtomicUsize::new(0); + use super::QPU_ID; #[derive(Debug, Deserialize)] @@ -149,14 +179,16 @@ mod mock_qcs { }) }); + use std::sync::atomic::Ordering::SeqCst; let default_endpoint = warp::path(QPU_ID) .and(warp::path("endpoints:getDefault")) .and(warp::get()) .map(|| { + DEFAULT_ENDPOINT_CALL_COUNT.fetch_add(1, SeqCst); let endpoint = json!({ "address": "", "addresses": { - "grpc": "http://127.0.0.1:8002", + "grpc": MOCK_QPU_ADDRESS, }, "datacenter": "west-1", "healthy": true, @@ -167,8 +199,26 @@ mod mock_qcs { warp::reply::json(&endpoint) }); - let quantum_processors = - warp::path("quantumProcessors").and(isa.or(translate).or(default_endpoint)); + let accessors = warp::path(QPU_ID) + .and(warp::path("accessors")) + .and(warp::get()) + .map(|| { + ACCESSORS_CALL_COUNT.fetch_add(1, SeqCst); + let rsp = ListQuantumProcessorAccessorsResponse { + accessors: vec![QuantumProcessorAccessor { + access_type: Some(Box::new(QuantumProcessorAccessorType::GatewayV1)), + live: true, + rank: Some(0), + id: Some(QPU_ID.to_string()), + url: MOCK_QPU_ADDRESS.into(), + }], + next_page_token: None, + }; + warp::reply::json(&rsp) + }); + + let quantum_processors = warp::path("quantumProcessors") + .and(isa.or(translate).or(default_endpoint).or(accessors)); warp::serve(warp::path("v1").and(quantum_processors)) .run(([127, 0, 0, 1], 8000)) @@ -317,6 +367,7 @@ mod qpu { let service = ControllerService::default(); Server::builder() .add_service(ControllerServer::new(service)) + // port must match MOCK_QPU_ADDRESS .serve("127.0.0.1:8002".parse().expect("address can be parsed")) .await .expect("service can be awaited");