Skip to content

Commit

Permalink
test: add necessary tests for region meta manager
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Nov 24, 2023
1 parent 4fc127e commit 697b893
Showing 1 changed file with 129 additions and 1 deletion.
130 changes: 129 additions & 1 deletion src/common/meta/src/key/region_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,18 @@ use crate::rpc::KeyValue;
pub type KafkaTopic = String;

/// A region's unique metadata.
#[derive(Clone)]
pub struct RegionMeta {
region_id: RegionId,
topic: Option<KafkaTopic>,
}

impl RegionMeta {
fn new(region_id: RegionId, topic: Option<KafkaTopic>) -> Self {
Self { region_id, topic }
}
}

// The table id is included to support efficiently scan metadata of all regions of a table.
pub struct RegionMetaKey {
table_id: TableId,
Expand Down Expand Up @@ -66,7 +73,7 @@ impl TableMetaKey for RegionMetaKey {
}
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, PartialEq, Debug)]
pub struct RegionMetaValue {
topic: Option<KafkaTopic>,
}
Expand Down Expand Up @@ -155,3 +162,124 @@ impl RegionMetaManager {
Ok(Txn::new().and_then(txns))
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::txn::{TxnOpResponse, TxnService};

#[test]
fn test_key_serde() {
let table_id = 22;
let region_number = 33;
let key = RegionMetaKey::new(table_id, region_number);
let got = key.as_raw_key();
let expected = format!("{}/{}/{}", REGION_META_KEY_PREFIX, 22, 33).into_bytes();
assert_eq!(got, expected);
}

#[test]
fn test_value_serde() {
let value = RegionMetaValue { topic: None };
let raw_value = value.try_as_raw_value().unwrap();
let parsed_value = RegionMetaValue::try_from_raw_value(&raw_value).unwrap();
assert_eq!(value, parsed_value);

let value = RegionMetaValue {
topic: Some("test_topic".to_string()),
};
let raw_value = value.try_as_raw_value().unwrap();
let parsed_value = RegionMetaValue::try_from_raw_value(&raw_value).unwrap();
assert_eq!(value, parsed_value);
}

#[tokio::test]
async fn test_create_then_get() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let region_meta_manager = RegionMetaManager::new(kv_backend.clone());

let region_id = RegionId::from_u64(1);
let topic = Some("test_topic".to_string());
let region_metas = vec![RegionMeta::new(region_id, topic.clone())];
let create_txn = region_meta_manager.build_create_txn(region_metas).unwrap();
let create_response = kv_backend.txn(create_txn).await.unwrap();
assert!(create_response.succeeded);

let region_meta_key = RegionMetaKey::new(region_id.table_id(), region_id.region_number());
let region_meta_value = region_meta_manager.get(&region_meta_key).await.unwrap();
assert_eq!(region_meta_value, Some(RegionMetaValue { topic }));
}

#[tokio::test]
async fn test_create_then_range() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let region_meta_manager = RegionMetaManager::new(kv_backend.clone());

let table_id = 42;
let region_metas = vec![
RegionMeta::new(RegionId::new(table_id, 1), Some("test_topic_1".to_string())),
RegionMeta::new(RegionId::new(table_id, 2), Some("test_topic_2".to_string())),
RegionMeta::new(RegionId::new(table_id, 3), Some("test_topic_3".to_string())),
];
let create_txn = region_meta_manager
.build_create_txn(region_metas.clone())
.unwrap();
let create_response = kv_backend.txn(create_txn).await.unwrap();
assert!(create_response.succeeded);

let region_meta_stream = region_meta_manager.region_metas(table_id);
// FIXME(niebayes): Simplify the codes.
let got = region_meta_stream
.collect::<Vec<_>>()
.await
.into_iter()
.map(|res| res.unwrap())
.collect::<Vec<_>>();
assert_eq!(format!("{:?}", got).into_bytes(), b"[RegionMetaValue { topic: Some(\"test_topic_1\") }, RegionMetaValue { topic: Some(\"test_topic_2\") }, RegionMetaValue { topic: Some(\"test_topic_3\") }]");

let expected = region_metas
.into_iter()
.map(|region_meta| RegionMetaValue::new(region_meta.topic))
.collect::<Vec<_>>();
assert_eq!(expected, got);
}

#[tokio::test]
async fn test_create_then_delete() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let region_meta_manager = RegionMetaManager::new(kv_backend.clone());

let table_id = 42;
let region_metas = vec![
RegionMeta::new(RegionId::new(table_id, 1), Some("test_topic_1".to_string())),
RegionMeta::new(RegionId::new(table_id, 2), Some("test_topic_2".to_string())),
RegionMeta::new(RegionId::new(table_id, 3), Some("test_topic_3".to_string())),
];
let create_txn = region_meta_manager
.build_create_txn(region_metas.clone())
.unwrap();
let create_response = kv_backend.txn(create_txn).await.unwrap();
assert!(create_response.succeeded);

let region_ids = region_metas
.into_iter()
.map(|region_meta| region_meta.region_id)
.collect::<Vec<_>>();
let delete_txn = region_meta_manager.build_delete_txn(region_ids).unwrap();
let delete_response = kv_backend.txn(delete_txn).await.unwrap();
assert!(delete_response.succeeded);

let num_deleted = delete_response
.responses
.into_iter()
.map(|response| {
let TxnOpResponse::ResponseDelete(inner) = response else {
panic!();
};
inner.deleted
})
.sum::<i64>();
assert_eq!(num_deleted, 3);
}
}

0 comments on commit 697b893

Please sign in to comment.