Skip to content

Commit

Permalink
tuta-sdk blob store client for uploading
Browse files Browse the repository at this point in the history
Needed for the import (attachment upload).
the customids for some aggregated data transfer types
are generated in BlobAccessTokenFacade since they're not
encrypted and bypass the place where it's usually done.
  • Loading branch information
mpfau authored and ganthern committed Nov 5, 2024
1 parent e04aaa6 commit b86c7ee
Show file tree
Hide file tree
Showing 13 changed files with 1,217 additions and 278 deletions.
637 changes: 361 additions & 276 deletions tuta-sdk/rust/Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion tuta-sdk/rust/sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ lz4_flex = { version = "0.11.3", default-features = false, features = ["safe-enc
hyper = { version = "1.4.1", features = ["client"], optional = true }
hyper-util = { version = "0.1.9", features = ["full"], optional = true }
http-body-util = { version = "0.1.2", optional = true }
hyper-rustls = { version = "0.27.3", features = ["ring", "http2", "rustls-platform-verifier"], optional = true }
hyper-rustls = { version = "0.27.3", features = ["ring", "http2"], optional = true }
rustls = { version = "*", optional = true }
form_urlencoded = "1"

[target.'cfg(target_os = "android")'.dependencies]
android_log = "0.1.3"
Expand Down
55 changes: 55 additions & 0 deletions tuta-sdk/rust/sdk/examples/uploadBlob.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::error::Error;
use std::sync::Arc;
use tutasdk::crypto::key::GenericAesKey;
use tutasdk::crypto::randomizer_facade::RandomizerFacade;
use tutasdk::net::native_rest_client::NativeRestClient;
use tutasdk::tutanota_constants::ArchiveDataType;
use tutasdk::Sdk;

fn main() -> Result<(), Box<dyn Error>> {
let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
let runtime_builder = runtime_builder.enable_all();
let Ok(runtime) = runtime_builder.build() else {
panic!("could not initialize tokio runtime");
};

runtime.block_on(async_main())
}

async fn async_main() -> Result<(), Box<dyn Error>> {
let rest_client = Arc::new(NativeRestClient::try_new().unwrap());

// this test expect local server with matching model versions to be live at: http://localhost:9000
let sdk = Sdk::new("http://localhost:9000".to_string(), rest_client.clone());

let logged_in_sdk = sdk
.create_session("map-free@tutanota.de", "map")
.await
.unwrap();

let mail_facade = logged_in_sdk.mail_facade();
let user_mailbox = mail_facade.load_user_mailbox().await.unwrap();
let owner_group_id = user_mailbox._ownerGroup.unwrap();

let randomizer_facade = RandomizerFacade::from_core(rand_core::OsRng);

let new_aes_256_key = GenericAesKey::from_bytes(
randomizer_facade
.generate_random_array::<{ tutasdk::crypto::aes::AES_256_KEY_SIZE }>()
.as_slice(),
)
.unwrap();
let result = logged_in_sdk
.blob_facade()
.encrypt_and_upload(
ArchiveDataType::Attachments,
&owner_group_id,
&new_aes_256_key,
vec![0; 1024],
)
.await?;
for tw in result {
println!("{:?}", tw.blobReferenceToken)
}
Ok(())
}
163 changes: 163 additions & 0 deletions tuta-sdk/rust/sdk/src/blobs/blob_access_token_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use crate::date::DateProvider;
use crate::entities::storage::BlobServerAccessInfo;
use std::collections::HashMap;
use std::future::Future;
use std::sync::{Arc, RwLock};

pub(super) struct BlobAccessTokenCache {
cache: RwLock<HashMap<String, BlobServerAccessInfo>>,
date_provider: Arc<dyn DateProvider>,
}

impl BlobAccessTokenCache {
pub fn new(date_provider: Arc<dyn DateProvider>) -> Self {
Self {
cache: RwLock::default(),
date_provider,
}
}

pub async fn try_get_token<F, E, Loader>(
&self,
key: &String,
loader: Loader,
) -> Result<BlobServerAccessInfo, E>
where
F: Future<Output = Result<BlobServerAccessInfo, E>> + Sized + Send,
Loader: FnOnce() -> F + Send,
{
{
let cache = self.cache.read().expect("poisoned lock");
let maybe_value = cache.get(key);
if maybe_value.is_some()
&& can_be_used_for_another_request(
maybe_value.unwrap(),
self.date_provider.as_ref(),
) {
return Ok(maybe_value.unwrap().clone());
}
}
let loaded = loader().await?;
self.insert(key, loaded.clone());
Ok(loaded)
}

fn insert(&self, key: &String, value: BlobServerAccessInfo) {
let mut cache = self.cache.write().expect("poisoned lock");
// someone else might have inserted something while we were loading.
// we're just replacing + dropping that value.
let _previous = cache.insert(key.clone(), value);
}

pub fn evict(&self, key: &String) {
let mut cache = self.cache.write().expect("poisoned lock");
cache.remove(key);
}
}

fn can_be_used_for_another_request(
blob_server_access_info: &BlobServerAccessInfo,
date_provider: &dyn DateProvider,
) -> bool {
blob_server_access_info
.expires
.is_after(&date_provider.now())
}

#[cfg(test)]
mod tests {
use crate::blobs::blob_access_token_cache::{
can_be_used_for_another_request, BlobAccessTokenCache,
};
use crate::date::date_provider::stub::DateProviderStub;
use crate::date::DateTime;
use crate::entities::storage::BlobServerAccessInfo;
use crate::util::test_utils::create_test_entity;
use std::sync::Arc;

#[tokio::test]
async fn get_cached() {
let cache = BlobAccessTokenCache::new(Arc::new(DateProviderStub::new(0)));
let key = "key".to_owned();
let test_token = BlobServerAccessInfo {
expires: DateTime::from_millis(10),
..create_test_entity()
};
cache.insert(&key, test_token.clone());
let loaded = cache.try_get_token(&key, || async {
// helps type inference
if true {
panic!("should be in cache");
}
Err(())
});
assert_eq!(test_token, loaded.await.unwrap())
}

#[tokio::test]
async fn get_uncached() {
let cache = BlobAccessTokenCache::new(Arc::new(DateProviderStub::new(0)));
let key = "key".to_owned();
let test_token = BlobServerAccessInfo {
..create_test_entity()
};
let test_clone = test_token.clone();
let loaded = cache.try_get_token(&key, || async move {
Ok(test_clone) as Result<BlobServerAccessInfo, ()>
});
assert_eq!(test_token, loaded.await.unwrap())
}

#[tokio::test]
async fn get_expired() {
let cache = BlobAccessTokenCache::new(Arc::new(DateProviderStub::new(20)));
let key = "key".to_owned();
let expired_token = BlobServerAccessInfo {
expires: DateTime::from_millis(10),
..create_test_entity()
};
cache.insert(&key, expired_token.clone());
let new_token = BlobServerAccessInfo {
expires: DateTime::from_millis(30),
..create_test_entity()
};
let expected_token = new_token.clone();
let loaded = cache.try_get_token(&key, || async move {
Ok(new_token) as Result<BlobServerAccessInfo, ()>
});
assert_eq!(expected_token, loaded.await.unwrap())
}

#[test]
fn can_be_used_for_another_request_expired_token() {
let date_provider = DateProviderStub::new(10);
assert!(!can_be_used_for_another_request(
&BlobServerAccessInfo {
expires: DateTime::from_millis(10),
..create_test_entity()
},
&date_provider
));
assert!(can_be_used_for_another_request(
&BlobServerAccessInfo {
expires: DateTime::from_millis(11),
..create_test_entity()
},
&date_provider
));
}

#[test]
fn evict() {
let cache = BlobAccessTokenCache::new(Arc::new(DateProviderStub::new(20)));
let key = "key".to_owned();
let expired_token = BlobServerAccessInfo {
expires: DateTime::from_millis(10),
..create_test_entity()
};
cache.insert(&key, expired_token.clone());

cache.evict(&key);
assert!(!cache.cache.read().unwrap().contains_key(&key));
}
}
147 changes: 147 additions & 0 deletions tuta-sdk/rust/sdk/src/blobs/blob_access_token_facade.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use crate::blobs::blob_access_token_cache::BlobAccessTokenCache;
use crate::crypto::randomizer_facade::RandomizerFacade;
use crate::custom_id::CustomId;
use crate::date::DateProvider;
use crate::entities::storage::{BlobAccessTokenPostIn, BlobServerAccessInfo, BlobWriteData};
use crate::generated_id::GeneratedId;
#[cfg_attr(test, mockall_double::double)]
use crate::services::service_executor::ResolvingServiceExecutor;
use crate::services::storage::BlobAccessTokenService;
use crate::services::ExtraServiceParams;
use crate::tutanota_constants::ArchiveDataType;
use crate::ApiCallError;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use base64::Engine;
use std::sync::Arc;

/// The BlobAccessTokenFacade requests blobAccessTokens from the BlobAccessTokenService to get
/// or post to the BlobService (binary blobs) or DefaultBlobElementResource (instances).
/// All tokens are cached.
pub(crate) struct BlobAccessTokenFacade {
cache: BlobAccessTokenCache,
randomizer_facade: RandomizerFacade,
service_executor: Arc<ResolvingServiceExecutor>,
}

#[cfg_attr(test, mockall::automock)]
impl BlobAccessTokenFacade {
pub fn new(
randomizer_facade: RandomizerFacade,
service_executor: Arc<ResolvingServiceExecutor>,
date_provider: Arc<dyn DateProvider>,
) -> Self {
Self {
cache: BlobAccessTokenCache::new(date_provider),
randomizer_facade,
service_executor,
}
}

/// Requests a token that allows uploading blobs for the given ArchiveDataType and ownerGroup.
pub async fn request_write_token(
&self,
archive_data_type: ArchiveDataType,
owner_group_id: &GeneratedId,
) -> Result<BlobServerAccessInfo, ApiCallError> {
let archive_data_type_discriminant = archive_data_type.discriminant();
let owner_group_id_clone = owner_group_id.clone();
let loader = move || async move {
let post_in: BlobAccessTokenPostIn = BlobAccessTokenPostIn {
_format: 0,
archiveDataType: Some(archive_data_type_discriminant),
read: None,
write: Some(BlobWriteData {
_id: Some(CustomId(
BASE64_URL_SAFE_NO_PAD
.encode(self.randomizer_facade.generate_random_array::<4>()),
)),
archiveOwnerGroup: owner_group_id_clone,
}),
};
self.service_executor
.post::<BlobAccessTokenService>(post_in, ExtraServiceParams::default())
.await
.map(|r| r.blobAccessInfo)
};

self.cache
.try_get_token(
&make_write_cache_key(owner_group_id, archive_data_type),
loader,
)
.await
}

/// Remove a given write token from the cache.
pub fn evict_write_token(
&self,
archive_data_type: ArchiveDataType,
owner_group_id: &GeneratedId,
) {
let key = make_write_cache_key(owner_group_id, archive_data_type);
self.cache.evict(&key);
}
}

pub(crate) fn make_write_cache_key(
owner_group_id: &GeneratedId,
archive_data_type: ArchiveDataType,
) -> String {
format!(
"{}{}",
owner_group_id.as_str(),
archive_data_type.discriminant()
)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::crypto::randomizer_facade::RandomizerFacade;
use crate::custom_id::CustomId;
use crate::date::date_provider::stub::DateProviderStub;
use crate::date::DateTime;
use crate::entities::storage::{BlobAccessTokenPostOut, BlobServerAccessInfo};
use crate::services::service_executor::MockResolvingServiceExecutor;
use crate::services::storage::BlobAccessTokenService;
use crate::tutanota_constants::ArchiveDataType;
use crate::util::test_utils::create_test_entity;
use crate::GeneratedId;
use std::sync::Arc;

#[tokio::test]
async fn request_write_token_with_uncached_and_cached() {
let owner_group_id = GeneratedId(String::from("hallo"));
let expected_access_info = BlobServerAccessInfo {
_id: Some(CustomId(String::from("123"))),
expires: DateTime::from_millis(1_000),
..create_test_entity()
};

let mut executor = MockResolvingServiceExecutor::default();
executor
.expect_post::<BlobAccessTokenService>()
.times(1)
.return_const(Ok(BlobAccessTokenPostOut {
blobAccessInfo: expected_access_info.clone(),
..create_test_entity()
}));
let facade = BlobAccessTokenFacade::new(
RandomizerFacade::from_core(rand_core::OsRng),
Arc::new(executor),
Arc::new(DateProviderStub::new(10)),
);
let actual_access_info = facade
.request_write_token(ArchiveDataType::Attachments, &owner_group_id)
.await
.expect("failed to request token");

assert_eq!(expected_access_info, actual_access_info);

let cached_access_info = facade
.request_write_token(ArchiveDataType::Attachments, &owner_group_id)
.await
.expect("failed to request token");
assert_eq!(expected_access_info, cached_access_info);
}
}
Loading

0 comments on commit b86c7ee

Please sign in to comment.