Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add unique constraint restriction for KV #3723

Merged
merged 6 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions crates/redis_interface/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ use common_utils::{
};
use error_stack::{IntoReport, ResultExt};
use fred::{
interfaces::{HashesInterface, KeysInterface, StreamsInterface},
interfaces::{HashesInterface, KeysInterface, SetsInterface, StreamsInterface},
prelude::RedisErrorKind,
types::{
Expiration, FromRedis, MultipleIDs, MultipleKeys, MultipleOrderedPairs, MultipleStrings,
RedisKey, RedisMap, RedisValue, Scanner, SetOptions, XCap, XReadResponse,
MultipleValues, RedisKey, RedisMap, RedisValue, Scanner, SetOptions, XCap, XReadResponse,
},
};
use futures::StreamExt;
use router_env::{instrument, logger, tracing};

use crate::{
errors,
types::{DelReply, HsetnxReply, MsetnxReply, RedisEntryId, SetnxReply},
types::{DelReply, HsetnxReply, MsetnxReply, RedisEntryId, SaddReply, SetnxReply},
};

impl super::RedisConnectionPool {
Expand Down Expand Up @@ -466,6 +466,23 @@ impl super::RedisConnectionPool {
.change_context(errors::RedisError::JsonDeserializationFailed)
}

#[instrument(level = "DEBUG", skip(self))]
pub async fn sadd<V>(
&self,
key: &str,
members: V,
) -> CustomResult<SaddReply, errors::RedisError>
where
V: TryInto<MultipleValues> + Debug + Send,
V::Error: Into<fred::error::RedisError> + Send,
{
self.pool
.sadd(key, members)
.await
.into_report()
.change_context(errors::RedisError::SetAddMembersFailed)
}

#[instrument(level = "DEBUG", skip(self))]
pub async fn stream_append_entry<F>(
&self,
Expand Down
2 changes: 2 additions & 0 deletions crates/redis_interface/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub enum RedisError {
SetHashFailed,
#[error("Failed to set hash field in Redis")]
SetHashFieldFailed,
#[error("Failed to add members to set in Redis")]
SetAddMembersFailed,
#[error("Failed to get hash field in Redis")]
GetHashFieldFailed,
#[error("The requested value was not found in Redis")]
Expand Down
19 changes: 19 additions & 0 deletions crates/redis_interface/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,22 @@ impl fred::types::FromRedis for DelReply {
}
}
}

#[derive(Debug)]
pub enum SaddReply {
KeySet,
KeyNotSet,
}

impl fred::types::FromRedis for SaddReply {
fn from_value(value: fred::types::RedisValue) -> Result<Self, fred::error::RedisError> {
match value {
fred::types::RedisValue::Integer(1) => Ok(Self::KeySet),
fred::types::RedisValue::Integer(0) => Ok(Self::KeyNotSet),
_ => Err(fred::error::RedisError::new(
fred::error::RedisErrorKind::Unknown,
"Unexpected sadd command reply",
)),
}
}
}
10 changes: 6 additions & 4 deletions crates/storage_impl/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,12 @@ impl RedisErrorExt for error_stack::Report<RedisError> {
RedisError::NotFound => self.change_context(DataStorageError::ValueNotFound(format!(
"Data does not exist for key {key}",
))),
RedisError::SetNxFailed => self.change_context(DataStorageError::DuplicateValue {
entity: "redis",
key: Some(key.to_string()),
}),
RedisError::SetNxFailed | RedisError::SetAddMembersFailed => {
self.change_context(DataStorageError::DuplicateValue {
entity: "redis",
key: Some(key.to_string()),
})
}
_ => self.change_context(DataStorageError::KVError),
}
}
Expand Down
74 changes: 73 additions & 1 deletion crates/storage_impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ pub mod refund;
mod reverse_lookup;
mod utils;

use common_utils::errors::CustomResult;
use database::store::PgPool;
pub use mock_db::MockDb;
use redis_interface::errors::RedisError;
use redis_interface::{errors::RedisError, SaddReply};

pub use crate::database::store::DatabaseStore;

Expand Down Expand Up @@ -259,3 +260,74 @@ pub(crate) fn diesel_error_to_data_error(
_ => StorageError::DatabaseError(error_stack::report!(*diesel_error)),
}
}

#[async_trait::async_trait]
pub trait UniqueConstraints {
fn unique_constraints(&self) -> Vec<String>;
fn table_name(&self) -> &str;
async fn check_for_constraints(
&self,
redis_conn: &Arc<redis_interface::RedisConnectionPool>,
) -> CustomResult<(), RedisError> {
let constraints = self.unique_constraints();
let sadd_result = redis_conn
.sadd(
&format!("unique_constraint:{}", self.table_name()),
constraints,
)
.await?;

match sadd_result {
SaddReply::KeyNotSet => Err(error_stack::report!(RedisError::SetAddMembersFailed)),
SaddReply::KeySet => Ok(()),
}
}
}

impl UniqueConstraints for diesel_models::Address {
fn unique_constraints(&self) -> Vec<String> {
vec![format!("address_{}", self.address_id)]
}
fn table_name(&self) -> &str {
"Address"
}
}

impl UniqueConstraints for diesel_models::PaymentIntent {
fn unique_constraints(&self) -> Vec<String> {
vec![format!("pi_{}_{}", self.merchant_id, self.payment_id)]
}
fn table_name(&self) -> &str {
"PaymentIntent"
}
}

impl UniqueConstraints for diesel_models::PaymentAttempt {
fn unique_constraints(&self) -> Vec<String> {
vec![format!(
"pa_{}_{}_{}",
self.merchant_id, self.payment_id, self.attempt_id
)]
}
fn table_name(&self) -> &str {
"PaymentAttempt"
}
}

impl UniqueConstraints for diesel_models::Refund {
fn unique_constraints(&self) -> Vec<String> {
vec![format!("refund_{}_{}", self.merchant_id, self.refund_id)]
}
fn table_name(&self) -> &str {
"Refund"
}
}

impl UniqueConstraints for diesel_models::ReverseLookup {
fn unique_constraints(&self) -> Vec<String> {
vec![format!("reverselookup_{}", self.lookup_id)]
}
fn table_name(&self) -> &str {
"ReverseLookup"
}
}
8 changes: 6 additions & 2 deletions crates/storage_impl/src/redis/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use router_derive::TryGetEnumVariant;
use router_env::logger;
use serde::de;

use crate::{metrics, store::kv::TypedSql, KVRouterStore};
use crate::{metrics, store::kv::TypedSql, KVRouterStore, UniqueConstraints};

pub trait KvStorePartition {
fn partition_number(key: PartitionKey<'_>, num_partitions: u8) -> u32 {
Expand Down Expand Up @@ -95,7 +95,7 @@ pub async fn kv_wrapper<'a, T, D, S>(
where
T: de::DeserializeOwned,
D: crate::database::store::DatabaseStore,
S: serde::Serialize + Debug + KvStorePartition,
S: serde::Serialize + Debug + KvStorePartition + UniqueConstraints + Sync,
{
let redis_conn = store.get_redis_conn()?;

Expand Down Expand Up @@ -147,6 +147,8 @@ where
KvOperation::HSetNx(field, value, sql) => {
logger::debug!(kv_operation= %operation, value = ?value);

value.check_for_constraints(&redis_conn).await?;

let result = redis_conn
.serialize_and_set_hash_field_if_not_exist(key, field, value, Some(ttl))
.await?;
Expand All @@ -168,6 +170,8 @@ where
.serialize_and_set_key_if_not_exist(key, value, Some(ttl.into()))
.await?;

value.check_for_constraints(&redis_conn).await?;

if matches!(result, redis_interface::SetnxReply::KeySet) {
store
.push_to_drainer_stream::<S>(sql, partition_key)
Expand Down
Loading