diff --git a/src/attestation_store/cf_kv.rs b/src/attestation_store/cf_kv.rs new file mode 100644 index 0000000..badbf86 --- /dev/null +++ b/src/attestation_store/cf_kv.rs @@ -0,0 +1,86 @@ +use { + super::{AttestationStore, Result}, + async_trait::async_trait, + serde::Serialize, + std::time::Duration, +}; + +const ATTESTATION_TTL_SECS: usize = 300; + +pub struct CloudflareKv { + pub account_id: String, + pub namespace_id: String, + pub bearer_token: String, + pub http_client: reqwest::Client, +} + +impl CloudflareKv { + pub fn new(account_id: String, namespace_id: String, bearer_token: String) -> Self { + Self { + account_id, + namespace_id, + bearer_token, + http_client: reqwest::Client::new(), + } + } +} + +#[derive(Serialize)] +struct SetBulkBody<'a> { + expiration: usize, + key: &'a str, + value: &'a str, +} + +#[async_trait] +impl AttestationStore for CloudflareKv { + async fn set_attestation(&self, id: &str, origin: &str) -> Result<()> { + let url = format!( + "https://api.cloudflare.com/client/v4/accounts/{account_id}/storage/kv/namespaces/{namespace_id}/bulk", + account_id = self.account_id, namespace_id = self.namespace_id + ); + let res = self + .http_client + .put(&url) + .bearer_auth(&self.bearer_token) + .json(&vec![SetBulkBody { + expiration: ATTESTATION_TTL_SECS, + key: id, + value: origin, + }]) + .timeout(Duration::from_secs(1)) + .send() + .await?; + if res.status().is_success() { + Ok(()) + } else { + Err(anyhow::anyhow!( + "Failed to set attestation: status:{} response body:{:?}", + res.status(), + res.text().await + )) + } + } + + async fn get_attestation(&self, id: &str) -> Result> { + let url = format!( + "https://api.cloudflare.com/client/v4/accounts/{account_id}/storage/kv/namespaces/{namespace_id}/values/{id}", + account_id = self.account_id, namespace_id = self.namespace_id + ); + let response = self + .http_client + .get(&url) + .bearer_auth(&self.bearer_token) + .timeout(Duration::from_secs(1)) + .send() + .await?; + // TODO what is the status code for a key not found? + // TODO for not-key not found errors throw error instead of None + if response.status().is_success() { + let value = response.text().await?; + Ok(Some(value)) + } else { + Ok(None) + } + } +} diff --git a/src/attestation_store/migration.rs b/src/attestation_store/migration.rs new file mode 100644 index 0000000..1c9fd99 --- /dev/null +++ b/src/attestation_store/migration.rs @@ -0,0 +1,33 @@ +use { + super::{cf_kv::CloudflareKv, AttestationStore, Result}, + crate::util::redis, + async_trait::async_trait, +}; + +pub struct MigrationStore { + redis: redis::Adapter, + cf_kv: CloudflareKv, +} + +impl MigrationStore { + pub fn new(redis: redis::Adapter, cf_kv: CloudflareKv) -> Self { + Self { redis, cf_kv } + } +} + +#[async_trait] +impl AttestationStore for MigrationStore { + async fn set_attestation(&self, id: &str, origin: &str) -> Result<()> { + let redis_fut = self.redis.set_attestation(id, origin); + let cf_kv_fut = self.cf_kv.set_attestation(id, origin); + tokio::try_join!(redis_fut, cf_kv_fut).map(|_| ()) + } + + async fn get_attestation(&self, id: &str) -> Result> { + if let Some(attestation) = self.redis.get_attestation(id).await? { + Ok(Some(attestation)) + } else { + self.cf_kv.get_attestation(id).await + } + } +} diff --git a/src/attestation_store/mod.rs b/src/attestation_store/mod.rs index f92eaf2..16adea2 100644 --- a/src/attestation_store/mod.rs +++ b/src/attestation_store/mod.rs @@ -1,3 +1,5 @@ +pub mod cf_kv; +pub mod migration; pub mod redis; use async_trait::async_trait; diff --git a/src/main.rs b/src/main.rs index 1ccbd8b..e89e20a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ use { AXUM_HTTP_REQUESTS_DURATION_SECONDS, }, bouncer::{ + attestation_store::{cf_kv::CloudflareKv, migration::MigrationStore}, event_sink, http_server::{RequestInfo, ServerConfig}, project_registry::{self, CachedExt as _}, @@ -55,6 +56,10 @@ pub struct Configuration { pub data_api_auth_token: String, pub scam_guard_cache_url: String, + pub cf_kv_account_id: String, + pub cf_kv_namespace_id: String, + pub cf_kv_bearer_token: String, + pub secret: String, pub s3_endpoint: Option, @@ -99,8 +104,17 @@ async fn main() -> Result<(), anyhow::Error> { .install_recorder() .context("Failed to install Prometheus metrics recorder")?; - let attestation_store = redis::new("attestation_store", config.attestation_cache_url.clone()) - .context("Failed to initialize AttestationStore")?; + let attestation_store = { + let redis_attestation_store = + redis::new("attestation_store", config.attestation_cache_url.clone()) + .context("Failed to initialize AttestationStore")?; + let cf_kv_attestation_store = CloudflareKv::new( + config.cf_kv_account_id, + config.cf_kv_namespace_id, + config.cf_kv_bearer_token, + ); + MigrationStore::new(redis_attestation_store, cf_kv_attestation_store) + }; let project_registry_cache = redis::new( "project_registry_cache", diff --git a/terraform/ecs/cluster.tf b/terraform/ecs/cluster.tf index 2b0ed8f..302cb50 100644 --- a/terraform/ecs/cluster.tf +++ b/terraform/ecs/cluster.tf @@ -86,6 +86,10 @@ resource "aws_ecs_task_definition" "app_task" { { name = "PROJECT_REGISTRY_CACHE_URL", value = var.project_registry_cache_url }, { name = "SCAM_GUARD_CACHE_URL", value = var.scam_guard_cache_url }, + { name = "CF_KV_ACCOUNT_ID", value = var.cf_kv_account_id }, + { name = "CF_KV_NAMESPACE_ID", value = var.cf_kv_namespace_id }, + { name = "CF_KV_BEARER_TOKEN", value = var.cf_kv_bearer_token }, + { name = "DATA_LAKE_BUCKET", value = var.analytics_datalake_bucket_name }, { name = "BLOCKED_COUNTRIES", value = var.ofac_blocked_countries }, diff --git a/terraform/ecs/variables.tf b/terraform/ecs/variables.tf index 2205d2a..764ff95 100644 --- a/terraform/ecs/variables.tf +++ b/terraform/ecs/variables.tf @@ -152,6 +152,21 @@ variable "scam_guard_cache_url" { type = string } +variable "cf_kv_account_id" { + description = "The account ID of the Cloudflare KV store" + type = string +} + +variable "cf_kv_namespace_id" { + description = "The namespace ID of the Cloudflare KV store" + type = string +} + +variable "cf_kv_bearer_token" { + description = "The Cloudflare API bearer token" + type = string +} + variable "ofac_blocked_countries" { description = "The list of countries under OFAC sanctions" type = string diff --git a/terraform/res_ecs.tf b/terraform/res_ecs.tf index d93e4a2..9e5d833 100644 --- a/terraform/res_ecs.tf +++ b/terraform/res_ecs.tf @@ -65,6 +65,10 @@ module "ecs" { project_registry_cache_url = "redis://${module.redis.endpoint}/1" scam_guard_cache_url = "redis://${module.redis.endpoint}/2" + cf_kv_account_id = var.cf_kv_account_id + cf_kv_namespace_id = var.cf_kv_namespace_id + cf_kv_bearer_token = var.cf_kv_bearer_token + ofac_blocked_countries = var.ofac_blocked_countries # Analytics diff --git a/terraform/variables.tf b/terraform/variables.tf index e364e94..b8c3320 100644 --- a/terraform/variables.tf +++ b/terraform/variables.tf @@ -75,6 +75,24 @@ variable "ofac_blocked_countries" { default = "" } +#------------------------------------------------------------------------------- +# Cloudflare KV for V2 migration + +variable "cf_kv_account_id" { + description = "The account ID of the Cloudflare KV store" + type = string +} + +variable "cf_kv_namespace_id" { + description = "The namespace ID of the Cloudflare KV store" + type = string +} + +variable "cf_kv_bearer_token" { + description = "The Cloudflare API bearer token" + type = string +} + #------------------------------------------------------------------------------- # Project Registry