Skip to content

Commit

Permalink
feat: v2 migration Cloudflare KV
Browse files Browse the repository at this point in the history
  • Loading branch information
chris13524 committed Jun 24, 2024
1 parent 7c8cd49 commit 71c3a74
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 2 deletions.
86 changes: 86 additions & 0 deletions src/attestation_store/cf_kv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use {
super::{AttestationStore, Result},
async_trait::async_trait,
serde::Serialize,
std::time::Duration,
};

const ATTESTATION_TTL_SECS: usize = 300;

pub struct CloudflareKv {
pub account_id: String,
pub namespace_id: String,
pub bearer_token: String,
pub http_client: reqwest::Client,
}

impl CloudflareKv {
pub fn new(account_id: String, namespace_id: String, bearer_token: String) -> Self {
Self {
account_id,
namespace_id,
bearer_token,
http_client: reqwest::Client::new(),
}
}
}

#[derive(Serialize)]
struct SetBulkBody<'a> {
expiration: usize,
key: &'a str,
value: &'a str,
}

#[async_trait]
impl AttestationStore for CloudflareKv {
async fn set_attestation(&self, id: &str, origin: &str) -> Result<()> {
let url = format!(
"https://api.cloudflare.com/client/v4/accounts/{account_id}/storage/kv/namespaces/{namespace_id}/bulk",
account_id = self.account_id, namespace_id = self.namespace_id
);
let res = self
.http_client
.put(&url)
.bearer_auth(&self.bearer_token)
.json(&vec![SetBulkBody {
expiration: ATTESTATION_TTL_SECS,
key: id,
value: origin,
}])
.timeout(Duration::from_secs(1))
.send()
.await?;
if res.status().is_success() {
Ok(())
} else {
Err(anyhow::anyhow!(
"Failed to set attestation: status:{} response body:{:?}",
res.status(),
res.text().await
))
}
}

async fn get_attestation(&self, id: &str) -> Result<Option<String>> {
let url = format!(
"https://api.cloudflare.com/client/v4/accounts/{account_id}/storage/kv/namespaces/{namespace_id}/values/{id}",
account_id = self.account_id, namespace_id = self.namespace_id
);
let response = self
.http_client
.get(&url)
.bearer_auth(&self.bearer_token)
.timeout(Duration::from_secs(1))
.send()
.await?;
// TODO what is the status code for a key not found?
// TODO for not-key not found errors throw error instead of None
if response.status().is_success() {
let value = response.text().await?;
Ok(Some(value))
} else {
Ok(None)
}
}
}
33 changes: 33 additions & 0 deletions src/attestation_store/migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use {
super::{cf_kv::CloudflareKv, AttestationStore, Result},
crate::util::redis,
async_trait::async_trait,
};

pub struct MigrationStore {
redis: redis::Adapter,
cf_kv: CloudflareKv,
}

impl MigrationStore {
pub fn new(redis: redis::Adapter, cf_kv: CloudflareKv) -> Self {
Self { redis, cf_kv }
}
}

#[async_trait]
impl AttestationStore for MigrationStore {
async fn set_attestation(&self, id: &str, origin: &str) -> Result<()> {
let redis_fut = self.redis.set_attestation(id, origin);
let cf_kv_fut = self.cf_kv.set_attestation(id, origin);
tokio::try_join!(redis_fut, cf_kv_fut).map(|_| ())
}

async fn get_attestation(&self, id: &str) -> Result<Option<String>> {
if let Some(attestation) = self.redis.get_attestation(id).await? {
Ok(Some(attestation))
} else {
self.cf_kv.get_attestation(id).await
}
}
}
2 changes: 2 additions & 0 deletions src/attestation_store/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod cf_kv;
pub mod migration;
pub mod redis;

use async_trait::async_trait;
Expand Down
18 changes: 16 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use {
AXUM_HTTP_REQUESTS_DURATION_SECONDS,
},
bouncer::{
attestation_store::{cf_kv::CloudflareKv, migration::MigrationStore},
event_sink,
http_server::{RequestInfo, ServerConfig},
project_registry::{self, CachedExt as _},
Expand Down Expand Up @@ -55,6 +56,10 @@ pub struct Configuration {
pub data_api_auth_token: String,
pub scam_guard_cache_url: String,

pub cf_kv_account_id: String,
pub cf_kv_namespace_id: String,
pub cf_kv_bearer_token: String,

pub secret: String,

pub s3_endpoint: Option<String>,
Expand Down Expand Up @@ -99,8 +104,17 @@ async fn main() -> Result<(), anyhow::Error> {
.install_recorder()
.context("Failed to install Prometheus metrics recorder")?;

let attestation_store = redis::new("attestation_store", config.attestation_cache_url.clone())
.context("Failed to initialize AttestationStore")?;
let attestation_store = {
let redis_attestation_store =
redis::new("attestation_store", config.attestation_cache_url.clone())
.context("Failed to initialize AttestationStore")?;
let cf_kv_attestation_store = CloudflareKv::new(
config.cf_kv_account_id,
config.cf_kv_namespace_id,
config.cf_kv_bearer_token,
);
MigrationStore::new(redis_attestation_store, cf_kv_attestation_store)
};

let project_registry_cache = redis::new(
"project_registry_cache",
Expand Down
4 changes: 4 additions & 0 deletions terraform/ecs/cluster.tf
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ resource "aws_ecs_task_definition" "app_task" {
{ name = "PROJECT_REGISTRY_CACHE_URL", value = var.project_registry_cache_url },
{ name = "SCAM_GUARD_CACHE_URL", value = var.scam_guard_cache_url },

{ name = "CF_KV_ACCOUNT_ID", value = var.cf_kv_account_id },
{ name = "CF_KV_NAMESPACE_ID", value = var.cf_kv_namespace_id },
{ name = "CF_KV_BEARER_TOKEN", value = var.cf_kv_bearer_token },

{ name = "DATA_LAKE_BUCKET", value = var.analytics_datalake_bucket_name },

{ name = "BLOCKED_COUNTRIES", value = var.ofac_blocked_countries },
Expand Down
15 changes: 15 additions & 0 deletions terraform/ecs/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,21 @@ variable "scam_guard_cache_url" {
type = string
}

variable "cf_kv_account_id" {
description = "The account ID of the Cloudflare KV store"
type = string
}

variable "cf_kv_namespace_id" {
description = "The namespace ID of the Cloudflare KV store"
type = string
}

variable "cf_kv_bearer_token" {
description = "The Cloudflare API bearer token"
type = string
}

variable "ofac_blocked_countries" {
description = "The list of countries under OFAC sanctions"
type = string
Expand Down
4 changes: 4 additions & 0 deletions terraform/res_ecs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ module "ecs" {
project_registry_cache_url = "redis://${module.redis.endpoint}/1"
scam_guard_cache_url = "redis://${module.redis.endpoint}/2"

cf_kv_account_id = var.cf_kv_account_id
cf_kv_namespace_id = var.cf_kv_namespace_id
cf_kv_bearer_token = var.cf_kv_bearer_token

ofac_blocked_countries = var.ofac_blocked_countries

# Analytics
Expand Down
18 changes: 18 additions & 0 deletions terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@ variable "ofac_blocked_countries" {
default = ""
}

#-------------------------------------------------------------------------------
# Cloudflare KV for V2 migration

variable "cf_kv_account_id" {
description = "The account ID of the Cloudflare KV store"
type = string
}

variable "cf_kv_namespace_id" {
description = "The namespace ID of the Cloudflare KV store"
type = string
}

variable "cf_kv_bearer_token" {
description = "The Cloudflare API bearer token"
type = string
}


#-------------------------------------------------------------------------------
# Project Registry
Expand Down

0 comments on commit 71c3a74

Please sign in to comment.