Skip to content

Commit

Permalink
refactor(sys): impl tmq_conf_set
Browse files Browse the repository at this point in the history
  • Loading branch information
qevolg committed Jan 19, 2025
1 parent 1a8fed2 commit cac3e59
Showing 1 changed file with 90 additions and 6 deletions.
96 changes: 90 additions & 6 deletions taos-ws-sys/src/native/tmq.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::collections::HashMap;
use std::ffi::{c_char, c_int, c_void};
use std::ffi::{c_char, c_int, c_void, CStr};
use std::str::FromStr;

use crate::native::error::TaosMaybeError;
use crate::native::TAOS_RES;
use taos_error::Code;
use tracing::trace;

use crate::native::error::{TaosError, TaosMaybeError};
use crate::native::{TaosResult, TAOS_RES};

pub const TSDB_CLIENT_ID_LEN: usize = 256;
pub const TSDB_CGROUP_LEN: usize = 193;
Expand Down Expand Up @@ -403,6 +407,7 @@ pub struct tmq_conf_t {

#[repr(C)]
#[allow(non_camel_case_types)]
#[derive(Debug, PartialEq, Eq)]
pub enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
Expand Down Expand Up @@ -439,17 +444,34 @@ pub type _tmq_conf_t = c_void;

#[no_mangle]
pub extern "C" fn tmq_conf_new() -> *mut _tmq_conf_t {
trace!("tmq_conf_new");
let tmq_conf: TaosMaybeError<TmqConf> = TmqConf::new().into();
trace!(tmq_conf=?tmq_conf, "tmq_conf_new done");
Box::into_raw(Box::new(tmq_conf)) as _
}

#[no_mangle]
pub extern "C" fn tmq_conf_set(
conf: *mut tmq_conf_t,
pub unsafe extern "C" fn tmq_conf_set(
conf: *mut _tmq_conf_t,
key: *const c_char,
value: *const c_char,
) -> tmq_conf_res_t {
todo!()
trace!(conf=?conf, key=?key, value=?value, "tmq_conf_set");

if conf.is_null() || key.is_null() || value.is_null() {
return tmq_conf_res_t::TMQ_CONF_INVALID;
}

match _tmq_conf_set(conf, key, value) {
Ok(_) => tmq_conf_res_t::TMQ_CONF_OK,
Err(err) => {
trace!(err=?err, "tmq_conf_set failed");
match err.code() {
Code::INVALID_PARA => tmq_conf_res_t::TMQ_CONF_INVALID,
_ => tmq_conf_res_t::TMQ_CONF_UNKNOWN,
}
}
}
}

#[no_mangle]
Expand Down Expand Up @@ -641,6 +663,7 @@ pub extern "C" fn tmq_err2str(code: i32) -> *const c_char {
}

#[allow(dead_code)]
#[derive(Debug)]
struct TmqConf {
conf: HashMap<String, String>,
}
Expand All @@ -653,6 +676,56 @@ impl TmqConf {
}
}

unsafe fn _tmq_conf_set(
conf: *mut _tmq_conf_t,
key: *const c_char,
value: *const c_char,
) -> TaosResult<()> {
let key = CStr::from_ptr(key).to_str()?.to_string();
let value = CStr::from_ptr(value).to_str()?.to_string();

match (conf as *mut TaosMaybeError<TmqConf>)
.as_mut()
.and_then(|conf| conf.deref_mut())
{
Some(tmq_conf) => {
match key.to_lowercase().as_str() {
"group.id" | "client.id" | "td.connect.db" => {}
"enable.auto.commit" | "msg.with.table.name" => match value.to_lowercase().as_str()
{
"true" | "false" => {}
_ => {
trace!(key, value, "tmq_conf_set failed");
return Err(TaosError::new(Code::INVALID_PARA, "invalid value"));
}
},
"auto.commit.interval.ms" => match i32::from_str(&value) {
Ok(_) => {}
Err(_) => {
trace!(key, value, "tmq_conf_set failed");
return Err(TaosError::new(Code::INVALID_PARA, "invalid value"));
}
},
"auto.offset.reset" => match value.to_lowercase().as_str() {
"none" | "earliest" | "latest" => {}
_ => {
trace!(key, value, "tmq_conf_set failed");
return Err(TaosError::new(Code::INVALID_PARA, "invalid value"));
}
},
_ => {
trace!("tmq_conf_set failed, unknow key: {key}");
return Err(TaosError::new(Code::FAILED, "unknow key"));
}
}
trace!(key, value, "tmq_conf_set done");
tmq_conf.conf.insert(key, value);
Ok(())
}
None => Err(TaosError::new(Code::OBJECT_IS_NULL, "conf is null")),
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -662,4 +735,15 @@ mod tests {
let tmq_conf = tmq_conf_new();
assert!(!tmq_conf.is_null());
}

#[test]
fn test_tmq_conf_set() {
let tmq_conf = tmq_conf_new();
assert!(!tmq_conf.is_null());

let key = c"group.id".as_ptr() as *const c_char;
let value = c"test".as_ptr() as *const c_char;
let res = unsafe { tmq_conf_set(tmq_conf, key, value) };
assert_eq!(res, tmq_conf_res_t::TMQ_CONF_OK);
}
}

0 comments on commit cac3e59

Please sign in to comment.