Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TD-32234] feat(ws): websocket stmt2 support #371

Merged
merged 66 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
4ebb82b
feat(*): add ws stmt2
qevolg Oct 30, 2024
a8b1ac1
refactor(taos-ws): add stmt2 bind
qevolg Nov 3, 2024
9c619b5
test(stmt2): modify test cases
qevolg Nov 4, 2024
be0917e
test(stmt2): add unit tests
qevolg Nov 8, 2024
1b94180
test(stmt2): add unit tests
qevolg Nov 9, 2024
b6e8c7a
refactor(*): add Bindable and AsyncBindable trait
qevolg Nov 11, 2024
10734ce
refactor(*): add call adapter interfaces
qevolg Nov 11, 2024
422c051
refactor(stmt2): debug interface
qevolg Nov 11, 2024
20218f0
refactor(taso-query): add big endian to little endian conversion
qevolg Nov 12, 2024
b35e109
refactor(stmt2): modify Stmt2Send
qevolg Nov 12, 2024
3616b43
refactor(stmt2): add bind
qevolg Nov 17, 2024
2b7a7a9
refactor(stmt2): modify stmt2 api
qevolg Nov 17, 2024
2c8fc9f
test(stmt2): add bind test cases
qevolg Nov 18, 2024
7a4f1cc
refactor(stmt2): modify bind
qevolg Nov 18, 2024
e342052
refactor(ws): reuse ws connection
qevolg Nov 21, 2024
0ac45e1
Merge remote-tracking branch 'origin/main' into feat/TD-32234
qevolg Nov 21, 2024
396650f
refactor(taos): modify stmt2 feature
qevolg Nov 21, 2024
67e9a9c
refactor(*): generate reqid
qevolg Nov 22, 2024
1e9a2bd
refactor(query): rename reqid to req_id
qevolg Nov 22, 2024
4c8c751
test(taos): add stmt2 test cases
qevolg Nov 23, 2024
c2cefd5
refactor(*): rename stmt2 execute to exec
qevolg Nov 23, 2024
4400b88
refactor(*): modify stmt2 test cases
qevolg Nov 23, 2024
59d5a10
test(util): modify req_id test cases
qevolg Nov 23, 2024
0f026e5
refactor(query): add macros to simplify code
qevolg Nov 24, 2024
9ffda54
refactor(stmt2): modify log level
qevolg Nov 25, 2024
9e1b778
refactor(views): rename macros
qevolg Nov 26, 2024
fe95f5f
refactor(*): add stmt2 result
qevolg Nov 28, 2024
b81bf3a
refactor(stmt2): refactor stmt2 close
qevolg Nov 28, 2024
760260a
refactor(ws): refactor fetch
qevolg Nov 28, 2024
632ee1b
refactor(stmt2): add macros to reduce redundant code
qevolg Nov 28, 2024
9868a64
refactor(stmt2): refactor write timestamp
qevolg Nov 28, 2024
eb2e02b
refactor(stmt2): refactor error handling
qevolg Nov 29, 2024
0152d05
test(*): add test cases
qevolg Dec 2, 2024
e85e1c9
refactor(*): cleaning up the code
qevolg Dec 2, 2024
4a6c344
test(*): modify test cases
qevolg Dec 2, 2024
bfb661c
refactor(stmt2): optimize code and modify test case
qevolg Dec 6, 2024
d1684be
test(stmt2): modify and add test case
qevolg Dec 6, 2024
dfdefc0
test(stmt2): comment test case
qevolg Dec 6, 2024
178a456
test(stmt2): modify test case
qevolg Dec 6, 2024
0138cec
style(stmt2): format code
qevolg Dec 7, 2024
c39ab2d
refactor(query): inline generate_req_id
qevolg Dec 8, 2024
a78c439
refactor(stmt2): modify result and bind
qevolg Dec 11, 2024
2a9802d
test(taos): add stmt2 ws bench
qevolg Dec 12, 2024
da898d3
example(taos): move sql to examples and add stmt2 bench
qevolg Dec 12, 2024
7642ad7
Merge branch 'feat/TD-32234' into test/stmt2-ws-bench
qevolg Dec 13, 2024
f11cfd9
refactor(*): modify Stmt2BindData
qevolg Dec 13, 2024
dc9d0c7
test(taos): modify sql and stmt2 bench
qevolg Dec 13, 2024
cd2e38f
refactor(ws): insert cancels calling free_result
qevolg Dec 13, 2024
4666a24
test(taos): add stmt bench
qevolg Dec 16, 2024
8e975a9
test(taos): rename files
qevolg Dec 17, 2024
46ed772
test(taos): move sql, stmt and stmt2 perf tests to tests
qevolg Dec 17, 2024
f8e1834
test(taos): merge stmt perf tests
qevolg Dec 17, 2024
9694adb
bench(taos): move perf tests to benches
qevolg Dec 17, 2024
8f77ccd
test(taos): modify perf test case
qevolg Dec 18, 2024
e805d71
refactor(*): rename stmt2 trait
qevolg Dec 18, 2024
3591ca9
style(taos): format Cargo.toml
qevolg Dec 18, 2024
5bb19bb
test(taos): modify interlace ts
qevolg Dec 18, 2024
50a03a1
test(taos): check count
qevolg Dec 18, 2024
2bccccd
Merge pull request #398 from taosdata/test/stmt2-ws-bench
qevolg Dec 18, 2024
dfb3769
Merge branch 'main' into feat/TD-32234
qevolg Dec 18, 2024
39888d0
refactor(*): fix conflicts
qevolg Dec 18, 2024
c0ebc19
Merge branch 'main' into feat/TD-32234
qevolg Dec 19, 2024
9d87453
refactor(*): eliminate warnings
qevolg Dec 19, 2024
4503db0
test(taos): extract dsn
qevolg Dec 19, 2024
4099e64
test(taos): generate database name using timestamp and drop database
qevolg Dec 20, 2024
e1e6130
refactor(*): rename result to result_set and Stmt2BindData to Stmt2Bi…
qevolg Dec 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ rustflags = [
"-Wclippy::unnecessary_struct_initialization",
"-Wclippy::unused_peekable",
"-Wclippy::unused_rounding",
"-Wclippy::useless_let_if_seq",
"-Wclippy::while_float",

# Pedantic Group
Expand Down
5 changes: 5 additions & 0 deletions taos-query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ tokio = { version = "1", features = [
lazy_static = "1.4"
derive_builder = "0.12.0"
tracing = { version = "0.1", features = ["log"] }
uuid = { version = "1.11.0", features = [
"v4",
"fast-rng",
"macro-diagnostics",
] }

[dev-dependencies]
flate2 = "1"
Expand Down
45 changes: 4 additions & 41 deletions taos-query/src/common/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,18 +241,6 @@ impl RawBlock {
unsafe { (v as *const u64).read_unaligned() == 0x7FFFFF0000000000 }
}

// const BOOL_NULL: u8 = 0x2;
// const TINY_INT_NULL: i8 = i8::MIN;
// const SMALL_INT_NULL: i16 = i16::MIN;
// const INT_NULL: i32 = i32::MIN;
// const BIG_INT_NULL: i64 = i64::MIN;
// const FLOAT_NULL: f32 = 0x7FF00000i32 as f32;
// const DOUBLE_NULL: f64 = 0x7FFFFF0000000000i64 as f64;
// const U_TINY_INT_NULL: u8 = u8::MAX;
// const U_SMALL_INT_NULL: u16 = u16::MAX;
// const U_INT_NULL: u32 = u32::MAX;
// const U_BIG_INT_NULL: u64 = u64::MAX;

let layout = Rc::new(RefCell::new(Layout::INLINE_DEFAULT.with_schema_changed()));

let bytes = bytes.into();
Expand All @@ -273,8 +261,7 @@ impl RawBlock {

for (i, (field, length)) in fields.iter().zip(lengths).enumerate() {
macro_rules! _primitive_view {
($ty:ident, $prim:ty) => {
{
($ty:ident, $prim:ty) => {{
debug_assert_eq!(field.bytes(), *length);
// column start
let start = offset;
Expand All @@ -283,31 +270,17 @@ impl RawBlock {
// byte slice from start to end: `[start, end)`.
let data = bytes.slice(start..offset);
let nulls = NullBits::from_iter((0..rows).map(|row| unsafe {
paste::paste!{ [<$ty:snake _is_null>] (
paste::paste! { [<$ty:snake _is_null>] (
data
.as_ptr()
.offset(row as isize * std::mem::size_of::<$prim>() as isize)
as *const $prim,
) }
}));
// value as target type
// let value_slice = unsafe {
// std::slice::from_raw_parts(
// transmute::<*const u8, *const $prim>(data.as_ptr()),
// rows,
// )
// };

// Set data lengths for v3-compatible block.
data_lengths[i] = data.len() as u32;

// generate nulls bitmap.
// let nulls = NullsMut::from_bools(
// value_slice
// .iter()
// .map(|v| paste::paste!{ [<$ty:snake _is_null>](v as _) })
// // .map(|b| *b as u64 == paste::paste! { [<$ty:snake:upper _NULL>] }),
// )
// .into_nulls();
// build column view
let column = paste::paste! { ColumnView::$ty([<$ty View>] { nulls, data }) };
columns.push(column);
Expand Down Expand Up @@ -831,10 +804,6 @@ impl RawBlock {
self.columns.get_unchecked(col).get_ref_unchecked(row)
}

// unsafe fn get_col_unchecked(&self, col: usize) -> &ColumnView {
// self.columns.get_unchecked(col)
// }

pub fn to_values(&self) -> Vec<Vec<Value>> {
self.rows().map(RowView::into_values).collect_vec()
}
Expand All @@ -859,13 +828,6 @@ impl RawBlock {
PrettyBlock::new(self)
}

// pub fn fields_iter(&self) -> impl Iterator<Item = Field> + '_ {
// self.schemas()
// .iter()
// .zip(self.field_names())
// .map(|(schema, name)| Field::new(name, schema.ty, schema.len))
// }

pub fn to_create(&self) -> Option<MetaCreate> {
self.table_name().map(|table_name| MetaCreate::Normal {
table_name: table_name.to_string(),
Expand Down Expand Up @@ -1029,6 +991,7 @@ impl crate::prelude::sync::Inlinable for InlineBlock {
Ok(self.0.len())
}
}

#[async_trait::async_trait]
impl crate::prelude::AsyncInlinable for InlineBlock {
async fn read_inlined<R: tokio::io::AsyncRead + Send + Unpin>(
Expand Down
20 changes: 0 additions & 20 deletions taos-query/src/common/raw/views/big_int_unsigned_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,26 +257,6 @@ impl ExactSizeIterator for UBigIntViewIter<'_> {
}
}

impl<A: Into<Option<Item>>> FromIterator<A> for UBigIntView {
fn from_iter<T: IntoIterator<Item = A>>(iter: T) -> Self {
let (nulls, mut values): (Vec<bool>, Vec<_>) = iter
.into_iter()
.map(|v| match v.into() {
Some(v) => (false, v),
None => (true, Item::default()),
})
.unzip();
Self {
nulls: NullBits::from_iter(nulls),
data: Bytes::from({
let (ptr, len, cap) = (values.as_mut_ptr(), values.len(), values.capacity());
std::mem::forget(values);
unsafe { Vec::from_raw_parts(ptr as *mut u8, len * ITEM_SIZE, cap * ITEM_SIZE) }
}),
}
}
}

#[test]
fn test_slice() {
let data = [0, 1, Item::MIN, Item::MAX];
Expand Down
20 changes: 0 additions & 20 deletions taos-query/src/common/raw/views/big_int_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,26 +258,6 @@ impl ExactSizeIterator for BigIntViewIter<'_> {
}
}

impl<A: Into<Option<Item>>> FromIterator<A> for View {
fn from_iter<T: IntoIterator<Item = A>>(iter: T) -> Self {
let (nulls, mut values): (Vec<bool>, Vec<_>) = iter
.into_iter()
.map(|v| match v.into() {
Some(v) => (false, v),
None => (true, Item::default()),
})
.unzip();
Self {
nulls: NullBits::from_iter(nulls),
data: Bytes::from({
let (ptr, len, cap) = (values.as_mut_ptr(), values.len(), values.capacity());
std::mem::forget(values);
unsafe { Vec::from_raw_parts(ptr as *mut u8, len * ITEM_SIZE, cap * ITEM_SIZE) }
}),
}
}
}

#[test]
fn test_slice() {
let data = [0, 1, 2, i64::MAX];
Expand Down
20 changes: 0 additions & 20 deletions taos-query/src/common/raw/views/bool_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,26 +251,6 @@ impl ExactSizeIterator for BoolViewIter<'_> {
}
}

impl<A: Into<Option<bool>>> FromIterator<A> for BoolView {
fn from_iter<T: IntoIterator<Item = A>>(iter: T) -> Self {
let (nulls, mut values): (Vec<bool>, Vec<_>) = iter
.into_iter()
.map(|v| match v.into() {
Some(v) => (false, v),
None => (true, false),
})
.unzip();
Self {
nulls: NullBits::from_iter(nulls),
data: Bytes::from({
let (ptr, len, cap) = (values.as_mut_ptr(), values.len(), values.capacity());
std::mem::forget(values);
unsafe { Vec::from_raw_parts(ptr as *mut u8, len, cap) }
}),
}
}
}

#[test]
fn test_bool_slice() {
let data = [true, false, false, true];
Expand Down
21 changes: 0 additions & 21 deletions taos-query/src/common/raw/views/double_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,27 +258,6 @@ impl ExactSizeIterator for DoubleViewIter<'_> {
}
}

impl<A: Into<Option<Item>>> FromIterator<A> for View {
fn from_iter<T: IntoIterator<Item = A>>(iter: T) -> Self {
let (nulls, mut values): (Vec<bool>, Vec<_>) = iter
.into_iter()
.map(|v| match v.into() {
Some(v) => (false, v),
None => (true, Item::default()),
})
.unzip();
// dbg!()
Self {
nulls: NullBits::from_iter(nulls),
data: Bytes::from({
let (ptr, len, cap) = (values.as_mut_ptr(), values.len(), values.capacity());
std::mem::forget(values);
unsafe { Vec::from_raw_parts(ptr as *mut u8, len * ITEM_SIZE, cap * ITEM_SIZE) }
}),
}
}
}

#[test]
fn test_slice() {
let data = [0., 1., Item::MIN, Item::MAX];
Expand Down
20 changes: 0 additions & 20 deletions taos-query/src/common/raw/views/float_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,26 +258,6 @@ impl ExactSizeIterator for FloatViewIter<'_> {
}
}

impl<A: Into<Option<Item>>> FromIterator<A> for View {
fn from_iter<T: IntoIterator<Item = A>>(iter: T) -> Self {
let (nulls, mut values): (Vec<bool>, Vec<_>) = iter
.into_iter()
.map(|v| match v.into() {
Some(v) => (false, v),
None => (true, Item::default()),
})
.unzip();
Self {
nulls: NullBits::from_iter(nulls),
data: Bytes::from({
let (ptr, len, cap) = (values.as_mut_ptr(), values.len(), values.capacity());
std::mem::forget(values);
unsafe { Vec::from_raw_parts(ptr as *mut u8, len * ITEM_SIZE, cap * ITEM_SIZE) }
}),
}
}
}

#[test]
fn test_slice() {
let data = [0., 1., Item::MIN, Item::MAX];
Expand Down
20 changes: 0 additions & 20 deletions taos-query/src/common/raw/views/int_unsigned_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,26 +253,6 @@ impl ExactSizeIterator for UIntViewIter<'_> {
}
}

impl<A: Into<Option<Item>>> FromIterator<A> for View {
fn from_iter<T: IntoIterator<Item = A>>(iter: T) -> Self {
let (nulls, mut values): (Vec<bool>, Vec<_>) = iter
.into_iter()
.map(|v| match v.into() {
Some(v) => (false, v),
None => (true, Item::default()),
})
.unzip();
Self {
nulls: NullBits::from_iter(nulls),
data: Bytes::from({
let (ptr, len, cap) = (values.as_mut_ptr(), values.len(), values.capacity());
std::mem::forget(values);
unsafe { Vec::from_raw_parts(ptr as *mut u8, len * ITEM_SIZE, cap * ITEM_SIZE) }
}),
}
}
}

#[test]
fn test_slice() {
let data = [0, 1, Item::MIN, Item::MAX];
Expand Down
83 changes: 51 additions & 32 deletions taos-query/src/common/raw/views/int_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,40 +259,59 @@ impl ExactSizeIterator for IntViewIter<'_> {
}
}

impl<A: Into<Option<Item>>> FromIterator<A> for View {
fn from_iter<T: IntoIterator<Item = A>>(iter: T) -> Self {
let (nulls, mut values): (Vec<bool>, Vec<_>) = iter
.into_iter()
.map(|v| match v.into() {
Some(v) => (false, v),
None => (true, Item::default()),
})
.unzip();
Self {
nulls: NullBits::from_iter(nulls),
data: Bytes::from({
let (ptr, len, cap) = (values.as_mut_ptr(), values.len(), values.capacity());
std::mem::forget(values);
unsafe { Vec::from_raw_parts(ptr as *mut u8, len * ITEM_SIZE, cap * ITEM_SIZE) }
}),
#[cfg(test)]
mod tests {
use byteorder::{BigEndian, ByteOrder};

use crate::views::int_view::{Item, View, ITEM_SIZE};

#[test]
fn test_slice() {
let data = [0, 1, Item::MIN, Item::MAX];
let view = View::from_iter(data);
dbg!(&view);
let slice = view.slice(1..3);
dbg!(&slice);

let data = [None, Some(Item::MIN), Some(Item::MAX), None];
let view = View::from_iter(data);
dbg!(&view);
let range = 1..4;
let slice = view.slice(range.clone()).unwrap();
for (v, i) in slice.iter().zip(range) {
assert_eq!(v, data[i]);
}
}
}

#[test]
fn test_slice() {
let data = [0, 1, Item::MIN, Item::MAX];
let view = View::from_iter(data);
dbg!(&view);
let slice = view.slice(1..3);
dbg!(&slice);

let data = [None, Some(Item::MIN), Some(Item::MAX), None];
let view = View::from_iter(data);
dbg!(&view);
let range = 1..4;
let slice = view.slice(range.clone()).unwrap();
for (v, i) in slice.iter().zip(range) {
assert_eq!(v, data[i]);
#[cfg(target_endian = "little")]
#[test]
fn test_from_iterator() {
let data = [0x12345678];
let view = View::from_iter(data);
let bytes = view.data;
assert_eq!(bytes.to_vec(), vec![0x78, 0x56, 0x34, 0x12]);
}

#[cfg(target_endian = "little")]
#[test]
fn test_from_iterator_mock_big_endian() {
let mut bytes = [0u8; 16];
BigEndian::write_i32(&mut bytes, 0x12345678);
BigEndian::write_i32(&mut bytes[4..], 0x78563412);
BigEndian::write_i32(&mut bytes[8..], 0x12131415);
BigEndian::write_i32(&mut bytes[12..], 0x51413121);

for i in (0..bytes.len()).step_by(ITEM_SIZE) {
let j = i + ITEM_SIZE;
let val = Item::from_be_bytes(bytes[i..j].try_into().unwrap());
bytes[i..j].copy_from_slice(&val.to_le_bytes());
}

let expect = vec![
0x78, 0x56, 0x34, 0x12, 0x12, 0x34, 0x56, 0x78, 0x15, 0x14, 0x13, 0x12, 0x21, 0x31,
0x41, 0x51,
];

assert_eq!(bytes.to_vec(), expect);
}
}
Loading
Loading