Skip to content

Commit

Permalink
test(sys): add and modify test case
Browse files Browse the repository at this point in the history
  • Loading branch information
qevolg committed Jan 21, 2025
1 parent 01dbd8d commit bb78de0
Showing 1 changed file with 140 additions and 10 deletions.
150 changes: 140 additions & 10 deletions taos-ws-sys/src/native/tmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub struct tmq_topic_assignment {

#[repr(C)]
#[allow(non_camel_case_types)]
#[derive(Debug, PartialEq, Eq)]
pub enum tmq_res_t {
TMQ_RES_INVALID = -1,
TMQ_RES_DATA = 1,
Expand Down Expand Up @@ -382,7 +383,6 @@ pub extern "C" fn tmq_commit_async(
todo!()
}

// TODO: test case
#[no_mangle]
#[allow(non_snake_case)]
pub unsafe extern "C" fn tmq_commit_offset_sync(
Expand Down Expand Up @@ -669,7 +669,6 @@ pub unsafe extern "C" fn tmq_position(
}
}

// TODO: test case
#[no_mangle]
#[allow(non_snake_case)]
pub unsafe extern "C" fn tmq_committed(
Expand Down Expand Up @@ -736,7 +735,6 @@ pub unsafe extern "C" fn tmq_committed(
}
}

// TODO: test case
#[no_mangle]
pub unsafe extern "C" fn tmq_get_table_name(res: *mut TAOS_RES) -> *const c_char {
trace!("tmq_get_table_name start, res: {res:?}");
Expand All @@ -761,7 +759,6 @@ pub unsafe extern "C" fn tmq_get_table_name(res: *mut TAOS_RES) -> *const c_char
}
}

// TODO: test case
#[no_mangle]
pub extern "C" fn tmq_get_res_type(res: *mut TAOS_RES) -> tmq_res_t {
trace!("tmq_get_res_type start, res: {res:?}");
Expand All @@ -773,7 +770,6 @@ pub extern "C" fn tmq_get_res_type(res: *mut TAOS_RES) -> tmq_res_t {
tmq_res_t::TMQ_RES_DATA
}

// TODO: test case
#[no_mangle]
pub unsafe extern "C" fn tmq_get_topic_name(res: *mut TAOS_RES) -> *const c_char {
trace!("tmq_get_topic_name start, res: {res:?}");
Expand All @@ -798,7 +794,6 @@ pub unsafe extern "C" fn tmq_get_topic_name(res: *mut TAOS_RES) -> *const c_char
}
}

// TODO: test case
#[no_mangle]
pub unsafe extern "C" fn tmq_get_db_name(res: *mut TAOS_RES) -> *const c_char {
trace!("tmq_get_db_name start, res: {res:?}");
Expand All @@ -823,7 +818,6 @@ pub unsafe extern "C" fn tmq_get_db_name(res: *mut TAOS_RES) -> *const c_char {
}
}

// TODO: test case
#[no_mangle]
pub unsafe extern "C" fn tmq_get_vgroup_id(res: *mut TAOS_RES) -> i32 {
trace!("tmq_get_vgroup_id start, res: {res:?}");
Expand All @@ -848,7 +842,6 @@ pub unsafe extern "C" fn tmq_get_vgroup_id(res: *mut TAOS_RES) -> i32 {
}
}

// TODO: test case
#[no_mangle]
pub unsafe extern "C" fn tmq_get_vgroup_offset(res: *mut TAOS_RES) -> i64 {
trace!("tmq_get_vgroup_offset start, res: {res:?}");
Expand Down Expand Up @@ -1276,7 +1269,7 @@ mod tests {
#[test]
fn test_tmq_subscribe() {
unsafe {
let db = "test_1737357704";
let db = "test_1737357513";
let topic = "topic_1737357513";

let taos = test_connect();
Expand Down Expand Up @@ -1343,7 +1336,7 @@ mod tests {
#[test]
fn test_tmq_get_topic_assignment() {
unsafe {
let db = "test_1737423087";
let db = "test_1737423043";
let topic = "topic_1737423043";

let taos = test_connect();
Expand Down Expand Up @@ -1522,4 +1515,141 @@ mod tests {
);
}
}

#[test]
fn test_tmq_commit_offset_sync() {
unsafe {
let db = "test_1737444552";
let topic = "topic_1737444552";
let table = "t0";

let taos = test_connect();
test_exec_many(
taos,
&[
&format!("drop topic if exists {topic}"),
&format!("drop database if exists {db}"),
&format!("create database {db}"),
&format!("use {db}"),
&format!("create table {table} (ts timestamp, c1 int)"),
&format!("insert into {table} values (now, 1)"),
&format!("insert into {table} values (now, 2)"),
&format!("create topic {topic} as database {db}"),
],
);

let conf = tmq_conf_new();
assert!(!conf.is_null());

let key = c"group.id".as_ptr();
let value = c"1".as_ptr();
let res = tmq_conf_set(conf, key, value);
assert_eq!(res, tmq_conf_res_t::TMQ_CONF_OK);

let key = c"auto.offset.reset".as_ptr();
let value = c"earliest".as_ptr();
let res = tmq_conf_set(conf, key, value);
assert_eq!(res, tmq_conf_res_t::TMQ_CONF_OK);

let mut errstr = [0; 256];
let consumer = tmq_consumer_new(conf, errstr.as_mut_ptr(), errstr.len() as _);
assert!(!consumer.is_null());

let list = tmq_list_new();
assert!(!list.is_null());

let value = CString::from_str(topic).unwrap();
let errno = tmq_list_append(list, value.as_ptr());
assert_eq!(errno, 0);

let errno = tmq_subscribe(consumer, list);
assert_eq!(errno, 0);

tmq_conf_destroy(conf);
tmq_list_destroy(list);

let topic_name = CString::from_str(topic).unwrap();
let mut assignment = ptr::null_mut();
let mut num_of_assignment = 0;

let errno = tmq_get_topic_assignment(
consumer,
topic_name.as_ptr(),
&mut assignment,
&mut num_of_assignment,
);
assert_eq!(errno, 0);

let (_, (len, cap)) = TOPIC_ASSIGNMETN_MAP.remove(&(assignment as usize)).unwrap();
let assigns = Vec::from_raw_parts(assignment, len, cap);

let mut vg_ids = Vec::new();
for assign in &assigns {
vg_ids.push(assign.vgId);
}

loop {
let res = tmq_consumer_poll(consumer, 1000);
if res.is_null() {
break;
}

let table_name = tmq_get_table_name(res);
assert!(!table_name.is_null());
assert_eq!(
CStr::from_ptr(table_name),
CString::new(table).unwrap().as_c_str()
);

let db_name = tmq_get_db_name(res);
assert!(!db_name.is_null());
assert_eq!(
CStr::from_ptr(db_name),
CString::new(db).unwrap().as_c_str()
);

let res_type = tmq_get_res_type(res);
assert_eq!(res_type, tmq_res_t::TMQ_RES_DATA);

let topic_name = tmq_get_topic_name(res);
assert!(!topic_name.is_null());
assert_eq!(
CStr::from_ptr(topic_name),
CString::new(topic).unwrap().as_c_str()
);

let vg_id = tmq_get_vgroup_id(res);
assert!(vg_ids.contains(&vg_id));

let offset = tmq_get_vgroup_offset(res);
assert_eq!(offset, 0);

let mut current_offset = 0;
for assign in &assigns {
if assign.vgId == vg_id {
current_offset = assign.currentOffset;
println!("current_offset: {current_offset}");
break;
}
}

let errno = tmq_commit_offset_sync(consumer, topic_name, vg_id, current_offset);
assert_eq!(errno, 0);

let committed_offset = tmq_committed(consumer, topic_name, vg_id);
assert_eq!(committed_offset, current_offset);
}

let errno = tmq_unsubscribe(consumer);
assert_eq!(errno, 0);

let errno = tmq_consumer_close(consumer);
assert_eq!(errno, 0);

test_exec_many(
taos,
&[format!("drop topic {topic}"), format!("drop database {db}")],
);
}
}
}

0 comments on commit bb78de0

Please sign in to comment.