diff --git a/taos-ws-sys/src/native/tmq.rs b/taos-ws-sys/src/native/tmq.rs index d78b0624..d50fee80 100644 --- a/taos-ws-sys/src/native/tmq.rs +++ b/taos-ws-sys/src/native/tmq.rs @@ -68,6 +68,7 @@ pub struct tmq_topic_assignment { #[repr(C)] #[allow(non_camel_case_types)] +#[derive(Debug, PartialEq, Eq)] pub enum tmq_res_t { TMQ_RES_INVALID = -1, TMQ_RES_DATA = 1, @@ -382,7 +383,6 @@ pub extern "C" fn tmq_commit_async( todo!() } -// TODO: test case #[no_mangle] #[allow(non_snake_case)] pub unsafe extern "C" fn tmq_commit_offset_sync( @@ -669,7 +669,6 @@ pub unsafe extern "C" fn tmq_position( } } -// TODO: test case #[no_mangle] #[allow(non_snake_case)] pub unsafe extern "C" fn tmq_committed( @@ -736,7 +735,6 @@ pub unsafe extern "C" fn tmq_committed( } } -// TODO: test case #[no_mangle] pub unsafe extern "C" fn tmq_get_table_name(res: *mut TAOS_RES) -> *const c_char { trace!("tmq_get_table_name start, res: {res:?}"); @@ -761,7 +759,6 @@ pub unsafe extern "C" fn tmq_get_table_name(res: *mut TAOS_RES) -> *const c_char } } -// TODO: test case #[no_mangle] pub extern "C" fn tmq_get_res_type(res: *mut TAOS_RES) -> tmq_res_t { trace!("tmq_get_res_type start, res: {res:?}"); @@ -773,7 +770,6 @@ pub extern "C" fn tmq_get_res_type(res: *mut TAOS_RES) -> tmq_res_t { tmq_res_t::TMQ_RES_DATA } -// TODO: test case #[no_mangle] pub unsafe extern "C" fn tmq_get_topic_name(res: *mut TAOS_RES) -> *const c_char { trace!("tmq_get_topic_name start, res: {res:?}"); @@ -798,7 +794,6 @@ pub unsafe extern "C" fn tmq_get_topic_name(res: *mut TAOS_RES) -> *const c_char } } -// TODO: test case #[no_mangle] pub unsafe extern "C" fn tmq_get_db_name(res: *mut TAOS_RES) -> *const c_char { trace!("tmq_get_db_name start, res: {res:?}"); @@ -823,7 +818,6 @@ pub unsafe extern "C" fn tmq_get_db_name(res: *mut TAOS_RES) -> *const c_char { } } -// TODO: test case #[no_mangle] pub unsafe extern "C" fn tmq_get_vgroup_id(res: *mut TAOS_RES) -> i32 { trace!("tmq_get_vgroup_id start, res: {res:?}"); @@ -848,7 +842,6 @@ pub unsafe extern "C" fn tmq_get_vgroup_id(res: *mut TAOS_RES) -> i32 { } } -// TODO: test case #[no_mangle] pub unsafe extern "C" fn tmq_get_vgroup_offset(res: *mut TAOS_RES) -> i64 { trace!("tmq_get_vgroup_offset start, res: {res:?}"); @@ -1276,7 +1269,7 @@ mod tests { #[test] fn test_tmq_subscribe() { unsafe { - let db = "test_1737357704"; + let db = "test_1737357513"; let topic = "topic_1737357513"; let taos = test_connect(); @@ -1343,7 +1336,7 @@ mod tests { #[test] fn test_tmq_get_topic_assignment() { unsafe { - let db = "test_1737423087"; + let db = "test_1737423043"; let topic = "topic_1737423043"; let taos = test_connect(); @@ -1522,4 +1515,141 @@ mod tests { ); } } + + #[test] + fn test_tmq_commit_offset_sync() { + unsafe { + let db = "test_1737444552"; + let topic = "topic_1737444552"; + let table = "t0"; + + let taos = test_connect(); + test_exec_many( + taos, + &[ + &format!("drop topic if exists {topic}"), + &format!("drop database if exists {db}"), + &format!("create database {db}"), + &format!("use {db}"), + &format!("create table {table} (ts timestamp, c1 int)"), + &format!("insert into {table} values (now, 1)"), + &format!("insert into {table} values (now, 2)"), + &format!("create topic {topic} as database {db}"), + ], + ); + + let conf = tmq_conf_new(); + assert!(!conf.is_null()); + + let key = c"group.id".as_ptr(); + let value = c"1".as_ptr(); + let res = tmq_conf_set(conf, key, value); + assert_eq!(res, tmq_conf_res_t::TMQ_CONF_OK); + + let key = c"auto.offset.reset".as_ptr(); + let value = c"earliest".as_ptr(); + let res = tmq_conf_set(conf, key, value); + assert_eq!(res, tmq_conf_res_t::TMQ_CONF_OK); + + let mut errstr = [0; 256]; + let consumer = tmq_consumer_new(conf, errstr.as_mut_ptr(), errstr.len() as _); + assert!(!consumer.is_null()); + + let list = tmq_list_new(); + assert!(!list.is_null()); + + let value = CString::from_str(topic).unwrap(); + let errno = tmq_list_append(list, value.as_ptr()); + assert_eq!(errno, 0); + + let errno = tmq_subscribe(consumer, list); + assert_eq!(errno, 0); + + tmq_conf_destroy(conf); + tmq_list_destroy(list); + + let topic_name = CString::from_str(topic).unwrap(); + let mut assignment = ptr::null_mut(); + let mut num_of_assignment = 0; + + let errno = tmq_get_topic_assignment( + consumer, + topic_name.as_ptr(), + &mut assignment, + &mut num_of_assignment, + ); + assert_eq!(errno, 0); + + let (_, (len, cap)) = TOPIC_ASSIGNMETN_MAP.remove(&(assignment as usize)).unwrap(); + let assigns = Vec::from_raw_parts(assignment, len, cap); + + let mut vg_ids = Vec::new(); + for assign in &assigns { + vg_ids.push(assign.vgId); + } + + loop { + let res = tmq_consumer_poll(consumer, 1000); + if res.is_null() { + break; + } + + let table_name = tmq_get_table_name(res); + assert!(!table_name.is_null()); + assert_eq!( + CStr::from_ptr(table_name), + CString::new(table).unwrap().as_c_str() + ); + + let db_name = tmq_get_db_name(res); + assert!(!db_name.is_null()); + assert_eq!( + CStr::from_ptr(db_name), + CString::new(db).unwrap().as_c_str() + ); + + let res_type = tmq_get_res_type(res); + assert_eq!(res_type, tmq_res_t::TMQ_RES_DATA); + + let topic_name = tmq_get_topic_name(res); + assert!(!topic_name.is_null()); + assert_eq!( + CStr::from_ptr(topic_name), + CString::new(topic).unwrap().as_c_str() + ); + + let vg_id = tmq_get_vgroup_id(res); + assert!(vg_ids.contains(&vg_id)); + + let offset = tmq_get_vgroup_offset(res); + assert_eq!(offset, 0); + + let mut current_offset = 0; + for assign in &assigns { + if assign.vgId == vg_id { + current_offset = assign.currentOffset; + println!("current_offset: {current_offset}"); + break; + } + } + + let errno = tmq_commit_offset_sync(consumer, topic_name, vg_id, current_offset); + assert_eq!(errno, 0); + + let committed_offset = tmq_committed(consumer, topic_name, vg_id); + assert_eq!(committed_offset, current_offset); + } + + let errno = tmq_unsubscribe(consumer); + assert_eq!(errno, 0); + + let errno = tmq_consumer_close(consumer); + assert_eq!(errno, 0); + + test_exec_many( + taos, + &[format!("drop topic {topic}"), format!("drop database {db}")], + ); + } + } }