From a2558524dbface8b98dcf0b6afea45a2dbd3a508 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 23 Jan 2023 22:58:23 +0300 Subject: [PATCH] feat(cubestore): Correct metrics for queue/cache (ignore long commands) --- rust/cubestore/cubestore/src/sql/mod.rs | 272 ++++++++++++++---------- 1 file changed, 158 insertions(+), 114 deletions(-) diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 2768cc3c7f012..da726329d4337 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -1167,7 +1167,7 @@ impl SqlService for SqlServiceImpl { app_metrics::QUEUE_QUERIES.increment(); let execution_time = SystemTime::now(); - let result = match command { + let (result, track_time) = match command { QueueCommand::Add { key, priority, @@ -1183,21 +1183,24 @@ impl SqlService for SqlServiceImpl { )) .await?; - Ok(Arc::new(DataFrame::new( - vec![ - Column::new("added".to_string(), ColumnType::Boolean, 0), - Column::new("pending".to_string(), ColumnType::Int, 1), - ], - vec![Row::new(vec![ - TableValue::Boolean(response.added), - TableValue::Int(response.pending as i64), - ])], - ))) + ( + Arc::new(DataFrame::new( + vec![ + Column::new("added".to_string(), ColumnType::Boolean, 0), + Column::new("pending".to_string(), ColumnType::Int, 1), + ], + vec![Row::new(vec![ + TableValue::Boolean(response.added), + TableValue::Int(response.pending as i64), + ])], + )), + true, + ) } QueueCommand::Truncate {} => { self.cachestore.queue_truncate().await?; - Ok(Arc::new(DataFrame::new(vec![], vec![]))) + (Arc::new(DataFrame::new(vec![], vec![])), false) } QueueCommand::Cancel { key } => { let columns = vec![ @@ -1206,47 +1209,49 @@ impl SqlService for SqlServiceImpl { ]; let result = self.cachestore.queue_cancel(key.value).await?; - if let Some(result) = result { - Ok(Arc::new(DataFrame::new( - columns, - vec![result.into_row().into_queue_cancel_row()], - ))) + let rows = if let Some(result) = result { + vec![result.into_row().into_queue_cancel_row()] } else { - Ok(Arc::new(DataFrame::new(columns, vec![]))) - } + vec![] + }; + + (Arc::new(DataFrame::new(columns, rows)), true) } QueueCommand::Heartbeat { key } => { self.cachestore.queue_heartbeat(key.value).await?; - Ok(Arc::new(DataFrame::new(vec![], vec![]))) + (Arc::new(DataFrame::new(vec![], vec![])), true) } QueueCommand::MergeExtra { key, payload } => { self.cachestore .queue_merge_extra(key.value, payload) .await?; - Ok(Arc::new(DataFrame::new(vec![], vec![]))) + (Arc::new(DataFrame::new(vec![], vec![])), true) } QueueCommand::Ack { key, result } => { self.cachestore.queue_ack(key.value, result).await?; - Ok(Arc::new(DataFrame::new(vec![], vec![]))) + (Arc::new(DataFrame::new(vec![], vec![])), true) } QueueCommand::Get { key } => { - let columns = vec![ - Column::new("payload".to_string(), ColumnType::String, 0), - Column::new("extra".to_string(), ColumnType::String, 1), - ]; - let result = self.cachestore.queue_get(key.value).await?; - if let Some(result) = result { - Ok(Arc::new(DataFrame::new( - columns, - vec![result.into_row().into_queue_get_row()], - ))) + let rows = if let Some(result) = result { + vec![result.into_row().into_queue_get_row()] } else { - Ok(Arc::new(DataFrame::new(columns, vec![]))) - } + vec![] + }; + + ( + Arc::new(DataFrame::new( + vec![ + Column::new("payload".to_string(), ColumnType::String, 0), + Column::new("extra".to_string(), ColumnType::String, 1), + ], + rows, + )), + true, + ) } QueueCommand::ToCancel { prefix, @@ -1260,16 +1265,19 @@ impl SqlService for SqlServiceImpl { let columns = vec![Column::new("id".to_string(), ColumnType::String, 0)]; - Ok(Arc::new(DataFrame::new( - columns, - rows.into_iter() - .map(|item| { - Row::new(vec![TableValue::String( - item.get_row().get_key().clone(), - )]) - }) - .collect(), - ))) + ( + Arc::new(DataFrame::new( + columns, + rows.into_iter() + .map(|item| { + Row::new(vec![TableValue::String( + item.get_row().get_key().clone(), + )]) + }) + .collect(), + )), + true, + ) } QueueCommand::List { prefix, @@ -1292,88 +1300,106 @@ impl SqlService for SqlServiceImpl { columns.push(Column::new("payload".to_string(), ColumnType::String, 3)); } - Ok(Arc::new(DataFrame::new( - columns, - rows.into_iter() - .map(|item| item.into_row().into_queue_list_row(with_payload)) - .collect(), - ))) + ( + Arc::new(DataFrame::new( + columns, + rows.into_iter() + .map(|item| item.into_row().into_queue_list_row(with_payload)) + .collect(), + )), + true, + ) } QueueCommand::Retrieve { key, concurrency } => { let result = self .cachestore .queue_retrieve(key.value, concurrency) .await?; + let rows = if let Some(result) = result { vec![result.into_row().into_queue_retrieve_row()] } else { vec![] }; - Ok(Arc::new(DataFrame::new( - vec![ - Column::new("payload".to_string(), ColumnType::String, 0), - Column::new("extra".to_string(), ColumnType::String, 1), - ], - rows, - ))) + ( + Arc::new(DataFrame::new( + vec![ + Column::new("payload".to_string(), ColumnType::String, 0), + Column::new("extra".to_string(), ColumnType::String, 1), + ], + rows, + )), + true, + ) } QueueCommand::Result { key } => { - let columns = vec![ - Column::new("payload".to_string(), ColumnType::String, 0), - Column::new("type".to_string(), ColumnType::String, 1), - ]; - let ack_result = self.cachestore.queue_result(key.value).await?; - if let Some(ack_result) = ack_result { + let rows = if let Some(ack_result) = ack_result { match ack_result { QueueResultResponse::Success { value } => { - Ok(Arc::new(DataFrame::new( - columns, - vec![Row::new(vec![ - TableValue::String(value), - TableValue::String("success".to_string()), - ])], - ))) + vec![Row::new(vec![ + TableValue::String(value), + TableValue::String("success".to_string()), + ])] } } } else { - Ok(Arc::new(DataFrame::new(columns, vec![]))) - } + vec![] + }; + + ( + Arc::new(DataFrame::new( + vec![ + Column::new("payload".to_string(), ColumnType::String, 0), + Column::new("type".to_string(), ColumnType::String, 1), + ], + rows, + )), + true, + ) } QueueCommand::ResultBlocking { timeout, key } => { - let columns = vec![ - Column::new("payload".to_string(), ColumnType::String, 0), - Column::new("type".to_string(), ColumnType::String, 1), - ]; - let ack_result = self .cachestore .queue_result_blocking(key.value, timeout) .await?; - if let Some(ack_result) = ack_result { + + let rows = if let Some(ack_result) = ack_result { match ack_result { QueueResultResponse::Success { value } => { - Ok(Arc::new(DataFrame::new( - columns, - vec![Row::new(vec![ - TableValue::String(value), - TableValue::String("success".to_string()), - ])], - ))) + vec![Row::new(vec![ + TableValue::String(value), + TableValue::String("success".to_string()), + ])] } } } else { - Ok(Arc::new(DataFrame::new(vec![], vec![]))) - } + vec![] + }; + + ( + Arc::new(DataFrame::new( + vec![ + Column::new("payload".to_string(), ColumnType::String, 0), + Column::new("type".to_string(), ColumnType::String, 1), + ], + rows, + )), + false, + ) } }; let execution_time = execution_time.elapsed()?; - app_metrics::QUEUE_QUERY_TIME_MS.report(execution_time.as_millis() as i64); + + if track_time { + app_metrics::QUEUE_QUERY_TIME_MS.report(execution_time.as_millis() as i64); + } + debug!("Queue command processing time: {:?}", execution_time,); - result + Ok(result) } CubeStoreStatement::Statement(Statement::Query(q)) => { let logical_plan = self @@ -1443,7 +1469,7 @@ impl SqlService for SqlServiceImpl { app_metrics::CACHE_QUERIES.increment(); let execution_time = SystemTime::now(); - let result = match command { + let (result, track_time) = match command { CacheCommand::Set { key, value, @@ -1457,10 +1483,13 @@ impl SqlService for SqlServiceImpl { .cache_set(CacheItem::new(key, ttl, value), nx) .await?; - Ok(Arc::new(DataFrame::new( - vec![Column::new("success".to_string(), ColumnType::Boolean, 0)], - vec![Row::new(vec![TableValue::Boolean(success)])], - ))) + ( + Arc::new(DataFrame::new( + vec![Column::new("success".to_string(), ColumnType::Boolean, 0)], + vec![Row::new(vec![TableValue::Boolean(success)])], + )), + true, + ) } CacheCommand::Get { key } => { let result = self.cachestore.cache_get(key.value).await?; @@ -1470,47 +1499,62 @@ impl SqlService for SqlServiceImpl { TableValue::Null }; - Ok(Arc::new(DataFrame::new( - vec![Column::new("value".to_string(), ColumnType::String, 0)], - vec![Row::new(vec![value])], - ))) + ( + Arc::new(DataFrame::new( + vec![Column::new("value".to_string(), ColumnType::String, 0)], + vec![Row::new(vec![value])], + )), + true, + ) } CacheCommand::Keys { prefix } => { let rows = self.cachestore.cache_keys(prefix.value).await?; - Ok(Arc::new(DataFrame::new( - vec![Column::new("key".to_string(), ColumnType::String, 0)], - rows.iter() - .map(|i| Row::new(vec![TableValue::String(i.get_row().get_path())])) - .collect(), - ))) + ( + Arc::new(DataFrame::new( + vec![Column::new("key".to_string(), ColumnType::String, 0)], + rows.iter() + .map(|i| { + Row::new(vec![TableValue::String(i.get_row().get_path())]) + }) + .collect(), + )), + true, + ) } CacheCommand::Remove { key } => { self.cachestore.cache_delete(key.value).await?; - Ok(Arc::new(DataFrame::new(vec![], vec![]))) + (Arc::new(DataFrame::new(vec![], vec![])), true) } CacheCommand::Truncate {} => { self.cachestore.cache_truncate().await?; - Ok(Arc::new(DataFrame::new(vec![], vec![]))) + (Arc::new(DataFrame::new(vec![], vec![])), false) } CacheCommand::Incr { path } => { let row = self.cachestore.cache_incr(path.value).await?; - Ok(Arc::new(DataFrame::new( - vec![Column::new("value".to_string(), ColumnType::String, 0)], - vec![Row::new(vec![TableValue::String( - row.get_row().get_value().clone(), - )])], - ))) + ( + Arc::new(DataFrame::new( + vec![Column::new("value".to_string(), ColumnType::String, 0)], + vec![Row::new(vec![TableValue::String( + row.get_row().get_value().clone(), + )])], + )), + true, + ) } }; let execution_time = execution_time.elapsed()?; - app_metrics::CACHE_QUERY_TIME_MS.report(execution_time.as_millis() as i64); + + if track_time { + app_metrics::CACHE_QUERY_TIME_MS.report(execution_time.as_millis() as i64); + } + debug!("Cache command processing time: {:?}", execution_time,); - result + Ok(result) } _ => Err(CubeError::user(format!("Unsupported SQL: '{}'", query))),