Skip to content

Commit

Permalink
refactor(*): modify logs
Browse files Browse the repository at this point in the history
  • Loading branch information
qevolg committed Dec 26, 2024
1 parent c7d188b commit 29ab1c3
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 42 deletions.
3 changes: 2 additions & 1 deletion taos-ws-sys/examples/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ get-err:
gcc -g ws-get-err.c ${WS_FLAGS} -o ./target/ws-get-err && ./target/ws-get-err
fetch-row:
gcc -g ws-fetch-row.c ${WS_FLAGS} -o ./target/ws-fetch-row && ./target/ws-fetch-row

test:
gcc -g test.c ${WS_FLAGS} -o ./target/test && ./target/test
tmq-basic:
gcc -g ws-tmq-basic.c ${WS_FLAGS} -o ./target/ws-tmq-basic && ./target/ws-tmq-basic

Expand Down
48 changes: 48 additions & 0 deletions taos-ws-sys/examples/test.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#include "taosws.h"
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>

int main()
{
char *dsn = getenv("TAOS_DSN");
if (dsn == NULL)
{
dsn = "ws://localhost:6041";
}
WS_TAOS *taos = ws_connect(dsn);

for (int i = 0; i < 50; i++)
{
// printf("query start\n");
WS_RES *res = ws_query_timeout(taos, "insert into t0 values(now, 1)", 1);
// printf("query end\n");
if (res == NULL)
{
int errno = ws_errno(res);
char *errstr = ws_errstr(taos);
printf("Query failed[%d]: %s", errno, errstr);
exit(-1);
}

int code = ws_free_result(res);
if (code != 0)
{
printf("Free result failed[%d]", code);
exit(-1);
}
}

printf("close start\n");
int code1 = ws_close(taos);
printf("close end\n");
if (code1 != 0)
{
printf("Close connection failed[%d]", code1);
exit(-1);
}

return 0;
}
61 changes: 44 additions & 17 deletions taos-ws-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::str::Utf8Error;
use std::string::FromUtf8Error;
use std::sync::OnceLock;
use std::thread;
use std::time::Duration;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use bytes::Bytes;
use cargo_metadata::MetadataCommand;
Expand All @@ -20,6 +20,7 @@ use taos_query::util::{hex, InlineBytes, InlineNChar, InlineStr};
use taos_query::{block_in_place_or_global, DsnError, Fetchable, Queryable, TBuilder};
use taos_ws::consumer::Offset;
pub use taos_ws::query::asyn::WS_ERROR_NO;
use taos_ws::query::asyn::{QT, SEND_TIME};
use taos_ws::query::{Error, ResultSet, Taos};
use taos_ws::TaosBuilder;

Expand Down Expand Up @@ -866,7 +867,7 @@ unsafe fn connect_with_dsn(dsn: *const c_char) -> WsTaos {
} else {
let mut taos = builder.build()?;

builder.ping(&mut taos)?;
// builder.ping(&mut taos)?;
Ok(taos)
}
}
Expand Down Expand Up @@ -940,13 +941,25 @@ pub unsafe extern "C" fn ws_connect(dsn: *const c_char) -> *mut WS_TAOS {
builder.init();
});

match connect_with_dsn(dsn) {
if CONNECT_START.is_none() {
CONNECT_START = Some(Instant::now());
}

let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
tracing::error!("connect start: {:?}", ts);

let client = match connect_with_dsn(dsn) {
Ok(client) => Box::into_raw(Box::new(client)) as _,
Err(err) => {
set_error_and_get_code(err);
std::ptr::null_mut()
}
}
};

let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
tracing::error!("connect end: {:?}", ts);

client
}

#[no_mangle]
Expand All @@ -972,14 +985,22 @@ pub unsafe extern "C" fn ws_get_server_info(taos: *mut WS_TAOS) -> *const c_char
version_info.as_ptr()
}

static mut CONNECT_START: Option<Instant> = None;

static mut QUERY_TIME: Duration = Duration::ZERO;
static mut FREE_TIME: Duration = Duration::ZERO;

#[no_mangle]
/// Same to taos_close. This should always be called after everything done with the connection.
pub unsafe extern "C" fn ws_close(taos: *mut WS_TAOS) -> i32 {
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
tracing::error!("close: {ts}");
CONNECT_START.take().map(|start| {
let elapsed = start.elapsed();
tracing::error!("connect time: {:?}", elapsed);
});

tracing::error!("query time: {:?}", QUERY_TIME);
tracing::error!("send time: {:?}", SEND_TIME);
tracing::error!("send recv time: {:?}", QT.send_recv_duration());

if !taos.is_null() {
tracing::trace!("close connection {taos:p}");
Expand Down Expand Up @@ -1062,8 +1083,6 @@ pub unsafe extern "C" fn ws_stop_query(rs: *mut WS_RES) -> i32 {
}
}

static mut TOTAL_TIME: u128 = 0;

#[no_mangle]
/// Query a sql with timeout.
///
Expand All @@ -1073,14 +1092,20 @@ pub unsafe extern "C" fn ws_query_timeout(
sql: *const c_char,
seconds: u32,
) -> *mut WS_RES {
let start = std::time::Instant::now();

let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();
tracing::error!("query start: {:?}", ts);
let start = Instant::now();
let res: WsMaybeError<WsResultSet> =
query_with_sql_timeout(taos, sql, Duration::from_secs(seconds as _)).into();

TOTAL_TIME += start.elapsed().as_nanos();
tracing::error!("query total time: {TOTAL_TIME}");

QUERY_TIME += start.elapsed();
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();
tracing::error!("query end: {:?}", ts);
Box::into_raw(Box::new(res)) as _
}

Expand Down Expand Up @@ -1352,9 +1377,11 @@ pub unsafe extern "C" fn ws_num_fields(rs: *const WS_RES) -> i32 {
#[no_mangle]
/// Same to taos_free_result. Every websocket result-set object should be freed with this method.
pub unsafe extern "C" fn ws_free_result(rs: *mut WS_RES) -> i32 {
let start = Instant::now();
if !rs.is_null() {
let _ = Box::from_raw(rs as *mut WsMaybeError<WsResultSet>);
}
FREE_TIME += start.elapsed();
Code::SUCCESS.into()
}

Expand Down
129 changes: 105 additions & 24 deletions taos-ws/src/query/asyn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::task::Poll;
use std::time::{Duration, Instant};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use anyhow::bail;
use futures::stream::SplitStream;
Expand Down Expand Up @@ -531,6 +531,12 @@ async fn read_queries(
loop {
tokio::select! {
res = reader.try_next() => {
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();
tracing::error!("recv: {ts:?}");
unsafe {QT.recv();}
match res {
Ok(frame) => {
if let Some(frame) = frame {
Expand Down Expand Up @@ -606,6 +612,95 @@ pub fn is_support_binary_sql(v1: &str) -> bool {
is_greater_than_or_equal_to(v1, "3.3.2.0")
}

pub struct QueryTime {
pub query_start: Option<Instant>,
pub query_ws_send_duration: Duration,
pub query_ws_send_start: Option<Instant>,
pub query_ws_recv_duration: Duration,
pub query_handle_start: Option<Instant>,
pub query_handle_duration: Duration,
pub send_start: Option<Instant>,
pub send_recv_duration: Duration,
}

impl QueryTime {
pub const fn new() -> Self {
Self {
query_start: None,
query_ws_send_duration: Duration::ZERO,
query_ws_send_start: None,
query_ws_recv_duration: Duration::ZERO,
query_handle_start: None,
query_handle_duration: Duration::ZERO,
send_start: None,
send_recv_duration: Duration::ZERO,
}
}

pub fn with_send_start(&mut self, start: Instant) {
self.send_start = Some(start);
}

pub fn recv(&mut self) {
self.send_start.take().map(|start| {
self.send_recv_duration += start.elapsed();
});
}

pub fn send_recv_duration(&self) -> Duration {
self.send_recv_duration
}

// pub fn with_query_start(&mut self, start: Instant) {
// self.query_start = Some(start);
// }

// pub fn query_ws_send(&mut self) {
// if self.query_start.is_none() {
// return;
// }
// self.query_ws_send_duration += self.query_start.unwrap().elapsed();
// }

// pub fn query_ws_send_duration(&self) -> Duration {
// self.query_ws_send_duration
// }

// pub fn with_query_ws_send_start(&mut self, start: Instant) {
// self.query_ws_send_start = Some(start);
// }

// pub fn query_ws_recv(&mut self) {
// if self.query_ws_send_start.is_none() {
// return;
// }
// self.query_ws_recv_duration += self.query_ws_send_start.unwrap().elapsed();
// }

// pub fn query_ws_recv_duration(&self) -> Duration {
// self.query_ws_recv_duration
// }

// pub fn with_query_handle_start(&mut self, start: Instant) {
// self.query_handle_start = Some(start);
// }

// pub fn query_handle(&mut self) {
// if self.query_handle_start.is_none() {
// return;
// }
// self.query_handle_duration += self.query_handle_start.unwrap().elapsed();
// }

// pub fn query_handle_duration(&self) -> Duration {
// self.query_handle_duration
// }
}

pub static mut QT: QueryTime = QueryTime::new();

pub static mut SEND_TIME: Duration = Duration::ZERO;

impl WsTaos {
/// Build TDengine websocket client from dsn.
///
Expand All @@ -619,12 +714,6 @@ impl WsTaos {
}

pub(crate) async fn from_wsinfo(info: &TaosBuilder) -> RawResult<Self> {
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
tracing::error!("connect: {ts}");

let ws = info.build_stream(info.to_query_url()).await?;

let req_id = 0;
Expand Down Expand Up @@ -743,16 +832,8 @@ impl WsTaos {
let close_listener = rx.clone();

tokio::spawn(async move {
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
tracing::error!("consume: {ts}");

let mut interval = time::interval(Duration::from_secs(53));

let mut flag = true;

loop {
tokio::select! {
_ = interval.tick() => {
Expand All @@ -770,15 +851,10 @@ impl WsTaos {
msg = msg_recv.recv_async() => {
match msg {
Ok(msg) => {
if flag {
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
tracing::error!("first send req: {ts}");
flag = false;
}

let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
tracing::error!("query send: {ts:?}");
let start = Instant::now();
unsafe {QT.with_send_start(start);}
if let Err(err) = sender.send(msg).await {
tracing::error!("Write websocket error: {}", err);
let mut keys = Vec::new();
Expand All @@ -790,6 +866,11 @@ impl WsTaos {
}
break;
}
let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
tracing::error!("query send end: {ts:?}");
unsafe {
SEND_TIME += start.elapsed();
}
}
Err(_) => {
break;
Expand Down

0 comments on commit 29ab1c3

Please sign in to comment.