Skip to content

Commit

Permalink
[identity] Add exponential backoff for one-time key retries
Browse files Browse the repository at this point in the history
Summary:
Patrially addresses [[ https://linear.app/comm/issue/ENG-9264/introduce-transact-write-helper-in-comm-lib-to-handle-retries-and | ENG-9264 ]].
Used exponential backoff utility from our batch helper in our OTK logic, without introducing dedicated transact write helper, which would be much more involved.

Test Plan:
Started local Identity. Registered a user device and uploaded a bunch of OTKs.
Added the following test to Commtest = it spawns 9 simultaneous `GetOutboundKeys` RPC calls:
```
lang=rust
#[tokio::test]
async fn otks() {
  // the device you uploaded OTKs for
  let user_id = "** paste here **";
  let device_id = "** paste here **";
  let access_token = "** paste here **";

  let request = OutboundKeysForUserRequest {
    user_id: user_id.to_string(),
  };

  // spawn 9 simultaneous GetOutboundKeys calls
  for i in 1..10 {
    let request = request.clone();
    let user_id = request.user_id.clone();
    let device_id = device_id.to_string();
    let access_token = access_token.to_string();
    tokio::spawn(async move {
      let mut client = get_auth_client(
        &service_addr::IDENTITY_GRPC.to_string(),
        user_id,
        device_id.clone(),
        access_token,
        PlatformMetadata::new(PLACEHOLDER_CODE_VERSION, DEVICE_TYPE),
      )
      .await
      .expect("Couldn't connect to identity service");

      let result = client
        .get_outbound_keys_for_user(request.clone())
        .await
        .unwrap()
        .into_inner();

      let device = result.devices.get(&device_id).unwrap();
      println!(
        "Thread {}: content={:?}, notif={:?}",
        i, device.one_time_content_prekey, device.one_time_notif_prekey
      );
    });
  }

  println!("Now sleeping");
  tokio::time::sleep(Duration::from_secs(3)).await;
  println!("Done");
}
```

Before introducing the backoff, was able to reproduce the issue (lowered max retries to 3, localstack is too fast, it was rarely failing with 8).
```
---- otks stdout ----
Now sleeping
Thread 3: content=Some("pVPfPv8GwO6A1nmh9sTCTtgXY1SgTpkg1axzSLfpKYO"), notif=Some("0glisLhk42UpSlSzPvRG1Ni54QOdrDnBiG22MI6c0V8")
Thread 4: content=Some("6s8AJVc21lEo5b7O70HlpsHJQI0xTMLNCRhERDBI0fC"), notif=None
Thread 7: content=None, notif=Some("pVPfPv8GwO6A1nmh9sTCTtgXY1SgTpkg1axzSLfpKYO")
Thread 8: content=Some("oTPvgqPb159Xrc3lcT9AxqTZ0KmhXDQS9BCIm6NkV1Q"), notif=None
Thread 2: content=None, notif=None
Thread 5: content=None, notif=None
Thread 9: content=None, notif=None
Thread 1: content=None, notif=Some("6s8AJVc21lEo5b7O70HlpsHJQI0xTMLNCRhERDBI0fC")
Thread 6: content=Some("mUlLHPepdsZtVD9FEkwgxuuLJAOO7rwiJrG4nrllsEy"), notif=None
Done
```
Also saw several `MaxRetriesExceeded` in Identity console

After introducing the backoff, the OTKs were always returned for all threads:
```
---- otks stdout ----
Now sleeping
Thread 7: content=Some("2LCsQHnU7CtVRbmrtK9U4hASqrauEz0xumpoikNX1XH"), notif=Some("Udmu7PoHUkP9VvrcHObwlJ69AwTcdk13OfidsHgzC9l")
Thread 3: content=Some("q2AeT2WSq591qLaMP91guNmrPXnWsQp0JQUzKdtnImC"), notif=Some("wCZ558PHRHYinPWGqCxUUE2BnjpesN03IMdLMplV9ii")
Thread 5: content=Some("KedtvWnxaeBqZkA0l52H9yZFAPaaMJuyDfgQQNJ1aIn"), notif=Some("u2wNG3GhPzVWaWaeYauSvfed9RR6x8i2zCsRuQMd9lv")
Thread 4: content=Some("9LzxGjX47mr3w6MObSG30URQ6LdLOBFOpgmeh6yLn1w"), notif=Some("2LCsQHnU7CtVRbmrtK9U4hASqrauEz0xumpoikNX1XH")
Thread 2: content=Some("jhketAygIKVLRWFm454ceAjxAQOQVlZerbGL2mtOP8Y"), notif=Some("q2AeT2WSq591qLaMP91guNmrPXnWsQp0JQUzKdtnImC")
Thread 1: content=Some("Z76yL9r8hlKJ0XoVUAV9CENU8IFIoUTWOo74iuHyNK8"), notif=Some("KedtvWnxaeBqZkA0l52H9yZFAPaaMJuyDfgQQNJ1aIn")
Thread 6: content=Some("0glisLhk42UpSlSzPvRG1Ni54QOdrDnBiG22MI6c0V8"), notif=Some("9LzxGjX47mr3w6MObSG30URQ6LdLOBFOpgmeh6yLn1w")
Thread 8: content=Some("pVPfPv8GwO6A1nmh9sTCTtgXY1SgTpkg1axzSLfpKYO"), notif=Some("jhketAygIKVLRWFm454ceAjxAQOQVlZerbGL2mtOP8Y")
Thread 9: content=Some("6s8AJVc21lEo5b7O70HlpsHJQI0xTMLNCRhERDBI0fC"), notif=Some("Z76yL9r8hlKJ0XoVUAV9CENU8IFIoUTWOo74iuHyNK8")
Done
```

Reviewers: varun, kamil

Reviewed By: varun, kamil

Subscribers: ashoat, tomek

Differential Revision: https://phab.comm.dev/D13356
  • Loading branch information
barthap committed Sep 17, 2024
1 parent 1a92d95 commit 3bee0f5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
12 changes: 6 additions & 6 deletions services/identity/src/database/one_time_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,16 @@ impl DatabaseClient {

// TODO: Introduce `transact_write_helper` similar to `batch_write_helper`
// in `comm-lib` to handle transactions with retries
let mut attempt = 0;
let retry_config = ExponentialBackoffConfig {
max_attempts: retry::MAX_ATTEMPTS as u32,
..Default::default()
};

// TODO: Introduce nanny task that handles calling `spawn_refresh_keys_task`
let mut requested_more_keys = false;

let mut exponential_backoff = retry_config.new_counter();
loop {
attempt += 1;
if attempt > retry::MAX_ATTEMPTS {
return Err(Error::MaxRetriesExceeded);
}

let otk_count =
self.get_otk_count(user_id, device_id, account_type).await?;
if otk_count < ONE_TIME_KEY_MINIMUM_THRESHOLD && can_request_more_keys {
Expand Down Expand Up @@ -149,6 +148,7 @@ impl DatabaseClient {
]);
if is_transaction_retryable(&dynamo_db_error, &retryable_codes) {
info!("Encountered transaction conflict while retrieving one-time key - retrying");
exponential_backoff.sleep_and_retry().await?;
} else {
error!(
errorType = error_types::OTK_DB_LOG,
Expand Down
10 changes: 5 additions & 5 deletions shared/comm-lib/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ pub mod batch_operations {
}

impl ExponentialBackoffConfig {
fn new_counter(&self) -> ExponentialBackoffHelper {
pub fn new_counter(&self) -> ExponentialBackoffHelper {
ExponentialBackoffHelper::new(self)
}
fn backoff_enabled(&self) -> bool {
Expand Down Expand Up @@ -690,8 +690,8 @@ pub mod batch_operations {
Ok(())
}

/// internal helper struct
struct ExponentialBackoffHelper<'cfg> {
/// Utility for managing retries with exponential backoff
pub struct ExponentialBackoffHelper<'cfg> {
config: &'cfg ExponentialBackoffConfig,
attempt: u32,
}
Expand All @@ -702,12 +702,12 @@ pub mod batch_operations {
}

/// reset counter after successfull operation
fn reset(&mut self) {
pub fn reset(&mut self) {
self.attempt = 0;
}

/// increase counter and sleep in case of failure
async fn sleep_and_retry(&mut self) -> Result<(), super::Error> {
pub async fn sleep_and_retry(&mut self) -> Result<(), super::Error> {
let jitter_factor = 1f32.min(0f32.max(self.config.jitter_factor));
let random_multiplier =
1.0 + rand::thread_rng().gen_range(-jitter_factor..=jitter_factor);
Expand Down

0 comments on commit 3bee0f5

Please sign in to comment.