Skip to content

Commit

Permalink
Merge branch 'main' into zz-flow-12.09
Browse files Browse the repository at this point in the history
  • Loading branch information
ZzIsGod1019 committed Dec 14, 2024
2 parents 36a9dd9 + 6aaed2f commit f43b90f
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 71 deletions.
2 changes: 1 addition & 1 deletion backend/middlewares/schedule/src/serv/schedule_job_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub(crate) async fn find_task(
funs: &TardisFunsInst,
ctx: &TardisContext,
) -> TardisResult<TardisPage<ScheduleTaskInfoResp>> {
let resp = SpiLogClient::find(
let resp = SpiLogClient::findv2(
LogItemFindReq {
tag: "schedule_task".to_string(),
keys: Some(vec![job_code.into()]),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{sync::Arc, task::ready, time::Duration};

use bios_sdk_invoke::clients::spi_log_client::{LogItemAddReq, LogItemAddV2Req, LogItemFindReq, SpiLogClient};
use bios_sdk_invoke::clients::spi_log_client::{LogItemAddV2Req, LogItemFindReq, SpiLogClient};
use tardis::{
basic::dto::TardisContext,
chrono::Utc,
Expand Down Expand Up @@ -34,10 +34,10 @@ impl EventComponent for SpiLog {
let ctx = self.ctx.clone();
let code = code.to_string();
let _handle = tokio::spawn(async move {
let result = SpiLogClient::add(
LogItemAddReq {
let result = SpiLogClient::addv2(
LogItemAddV2Req {
tag: JOB_TAG.to_string(),
content: "add job".into(),
content: tardis::serde_json::Value::Null,
key: Some(code.to_string()),
op: Some(OP_ADD.to_string()),
ts: Some(Utc::now()),
Expand All @@ -59,10 +59,10 @@ impl EventComponent for SpiLog {
let ctx = self.ctx.clone();
let code = code.to_string();
let _handle = tokio::spawn(async move {
let result = SpiLogClient::add(
LogItemAddReq {
let result = SpiLogClient::addv2(
LogItemAddV2Req {
tag: JOB_TAG.to_string(),
content: "delete job".into(),
content: tardis::serde_json::Value::Null,
key: Some(code.to_string()),
op: Some(OP_DELETE.to_string()),
ts: Some(Utc::now()),
Expand All @@ -84,10 +84,10 @@ impl EventComponent for SpiLog {
let ctx = self.ctx.clone();
let code = code.to_string();
let _handle = tokio::spawn(async move {
let result = SpiLogClient::add(
LogItemAddReq {
let result = SpiLogClient::addv2(
LogItemAddV2Req {
tag: TASK_TAG.to_string(),
content: "start request".into(),
content: tardis::serde_json::Value::Null,
key: Some(code.to_string()),
op: Some(OP_EXECUTE_START.to_string()),
ts: Some(Utc::now()),
Expand Down
6 changes: 6 additions & 0 deletions backend/spi/spi-log/src/serv/pg/log_pg_item_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@ pub async fn find(find_req: &mut LogItemFindReq, funs: &TardisFunsInst, ctx: &Ta
for val in value {
sql_vals.push(val);
}
} else if ext_or_item.op == BasicQueryOpKind::IsNull {
where_fragments.push(format!("ext ->> '{}' is null", ext_or_item.field));
} else if ext_or_item.op == BasicQueryOpKind::IsNotNull {
where_fragments.push(format!("(ext ->> '{}' is not null or ext ->> '{}' != '')", ext_or_item.field, ext_or_item.field));
} else if ext_or_item.op == BasicQueryOpKind::IsNullOrEmpty {
where_fragments.push(format!("(ext ->> '{}' is null or ext ->> '{}' = '')", ext_or_item.field, ext_or_item.field));
} else {
if value.len() > 1 {
return err_notfound(ext_or_item);
Expand Down
74 changes: 62 additions & 12 deletions backend/spi/spi-log/src/serv/pgv2/log_pg_item_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub async fn addv2(add_req: &mut LogItemAddV2Req, funs: &TardisFunsInst, ctx: &T
let id = add_req.idempotent_id.clone().unwrap_or(TardisFuns::field.nanoid());

let bs_inst = inst.inst::<TardisRelDBClient>();
// 初始化要保存的内容
let mut insert_content = add_req.content.clone();
let (mut conn, table_name) = log_pg_initializer::init_table_and_conn(bs_inst, &add_req.tag, ctx, true).await?;
conn.begin().await?;
Expand All @@ -59,34 +60,64 @@ pub async fn addv2(add_req: &mut LogItemAddV2Req, funs: &TardisFunsInst, ctx: &T
.await?;

if let Some(last_record) = get_last_record {
let last_content: JsonValue = last_record.try_get("", "content")?;
let mut last_content: JsonValue = last_record.try_get("", "content")?;
let last_ts: DateTime<Utc> = last_record.try_get("", "ts")?;
let last_key: String = last_record.try_get("", "key")?;

insert_content = last_content;
insert_content = last_content.clone();
merge(&mut insert_content, add_req.content.clone());

//把上次的内容有ref字段的改为ref_key
for ref_field in &ref_fields {
if let Some(field_value) = insert_content.get_mut(ref_field) {
if !is_log_ref(field_value) {
*field_value = JsonValue::String(get_ref_filed_value(&last_ts, &last_key));
//如果上次和这次同时有这个ref字段
if let (Some(insert_field_value), Some(last_field_value)) = (insert_content.get_mut(ref_field), last_content.get_mut(ref_field)) {
//上次ref_key默认指向为上一条
let mut last_ref_key = JsonValue::String(get_ref_filed_value(&last_ts, &last_key));
// ref字段的原始值 默认为上一条
let mut ref_origin_value = last_field_value.clone();
// 判断上次ref字段是否已经是ref_key了
if is_log_ref(last_field_value) {
if let Some(last_field_value_str) = last_field_value.as_str() {
//是的话 把ref_key指回上一条所指的ref_key
last_ref_key = last_field_value.clone();
// 取出原始字段
let (last_ts, last_key) = parse_ref_ts_key(last_field_value_str)?;
let ref_record = conn
.query_one(
&format!(
r#"
select ts,key,content from {table_name} where key = $1 and ts = $2 order by ts desc limit 1
"#
),
vec![Value::from(last_key.to_string()), Value::from(last_ts)],
)
.await?;
if let Some(ref_record) = ref_record {
let ref_record_content: JsonValue = ref_record.try_get("", "content")?;
if let Some(ref_old_value) = ref_record_content.get(ref_field) {
ref_origin_value = ref_old_value.clone();
}
}
}
}
// 对比一下这个字段现在的值和ref字段的原始值
if insert_field_value.to_string() == ref_origin_value.to_string() {
// 如果一样,那么把这次ref字段改成ref_key
*insert_field_value = last_ref_key;
}
}
}

if let (Some(insert_content), Some(add_req_content)) = (insert_content.as_object_mut(), add_req.content.as_object()) {
for (k, v) in add_req_content {
insert_content.insert(k.to_string(), v.clone());
}
}
}
}

add_req.content = insert_content;
let mut params = vec![
Value::from(id.clone()),
Value::from(add_req.kind.as_ref().unwrap_or(&"".into()).to_string()),
Value::from(add_req.key.as_ref().unwrap_or(&"".into()).to_string()),
Value::from(add_req.tag.clone()),
Value::from(add_req.op.as_ref().unwrap_or(&"".to_string()).as_str()),
Value::from(insert_content),
Value::from(add_req.content.clone()),
Value::from(add_req.owner.as_ref().unwrap_or(&"".to_string()).as_str()),
Value::from(add_req.owner_name.as_ref().unwrap_or(&"".to_string()).as_str()),
Value::from(add_req.own_paths.as_ref().unwrap_or(&"".to_string()).as_str()),
Expand Down Expand Up @@ -331,6 +362,12 @@ pub async fn findv2(find_req: &mut LogItemFindReq, funs: &TardisFunsInst, ctx: &
for val in value {
sql_vals.push(val);
}
} else if ext_or_item.op == BasicQueryOpKind::IsNull {
where_fragments.push(format!("ext ->> '{}' is null", ext_or_item.field));
} else if ext_or_item.op == BasicQueryOpKind::IsNotNull {
where_fragments.push(format!("(ext ->> '{}' is not null or ext ->> '{}' != '')", ext_or_item.field, ext_or_item.field));
} else if ext_or_item.op == BasicQueryOpKind::IsNullOrEmpty {
where_fragments.push(format!("(ext ->> '{}' is null or ext ->> '{}' = '')", ext_or_item.field, ext_or_item.field));
} else {
if value.len() > 1 {
return err_notfound(ext_or_item);
Expand Down Expand Up @@ -741,6 +778,19 @@ async fn push_to_eda(req: &LogItemAddV2Req, ref_fields: &Vec<String>, funs: &Tar
Ok(())
}

fn merge(a: &mut serde_json::Value, b: serde_json::Value) {
match (a, b) {
(a @ &mut serde_json::Value::Object(_), serde_json::Value::Object(b)) => {
if let Some(a) = a.as_object_mut() {
for (k, v) in b {
merge(a.entry(k).or_insert(serde_json::Value::Null), v);
}
}
}
(a, b) => *a = b,
}
}

#[cfg(test)]
mod test {
use tardis::{chrono::Utc, serde_json::Value};
Expand Down
1 change: 1 addition & 0 deletions backend/spi/spi-search/src/serv/pg/search_pg_item_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,7 @@ fn package_ext(
}
Ok(())
}

fn merge(a: &mut serde_json::Value, b: serde_json::Value) {
match (a, b) {
(a @ &mut serde_json::Value::Object(_), serde_json::Value::Object(b)) => {
Expand Down
4 changes: 2 additions & 2 deletions backend/spi/spi-stats/src/serv/pg/stats_pg_conf_fact_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ VALUES
AddOrModifySyncTaskReq {
code: format!("{}_{}", SYNC_FACT_TASK_CODE, add_req.key),
cron: add_req.sync_cron.clone().unwrap_or("".to_string()),
callback_url: format!("{}/ci/{}/sync", funs.conf::<StatsConfig>().base_url, add_req.key),
callback_url: format!("{}/ci/fact/{}/sync", funs.conf::<StatsConfig>().base_url, add_req.key),
callback_method: "PUT".to_string(),
callback_body: None,
enable: add_req.is_sync.unwrap_or_default(),
Expand Down Expand Up @@ -165,7 +165,7 @@ WHERE key = $1
code: format!("{}_{}", SYNC_FACT_TASK_CODE, fact_conf_key),
enable: modify_req.is_sync.unwrap_or_default(),
cron: modify_req.sync_cron.clone().unwrap_or("".to_string()),
callback_url: format!("{}/ci/{}/sync", funs.conf::<StatsConfig>().base_url, fact_conf_key),
callback_url: format!("{}/ci/fact/{}/sync", funs.conf::<StatsConfig>().base_url, fact_conf_key),
callback_method: "PUT".to_string(),
callback_body: None,
callback_headers,
Expand Down
6 changes: 3 additions & 3 deletions backend/spi/spi-stats/src/serv/pg/stats_pg_record_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ async fn fact_records_modify(
));
};
// TODO check value enum when stable_ds = true
sql_sets.push(format!("{} = ${}", req_fact_col_key, params.len() + 1));
sql_sets.push(format!("{} = ${}", fact_col_conf.key, params.len() + 1));
if fact_col_conf.dim_multi_values.unwrap_or(false) {
params.push(dim_conf.data_type.json_to_sea_orm_value_array(&req_fact_col_value, false)?);
} else {
Expand All @@ -547,7 +547,7 @@ async fn fact_records_modify(
"400-spi-stats-invalid-request",
));
};
sql_sets.push(format!("{} = ${}", req_fact_col_key, params.len() + 1));
sql_sets.push(format!("{} = ${}", fact_col_conf.key, params.len() + 1));
params.push(mes_data_type.json_to_sea_orm_value(&req_fact_col_value, false)?);
} else {
let Some(req_fact_col_value) = req_fact_col_value.as_str() else {
Expand All @@ -558,7 +558,7 @@ async fn fact_records_modify(
"400-spi-stats-invalid-request",
));
};
sql_sets.push(format!("{} = ${}", req_fact_col_key, params.len() + 1));
sql_sets.push(format!("{} = ${}", fact_col_conf.key, params.len() + 1));
params.push(req_fact_col_value.into());
}
}
Expand Down
45 changes: 39 additions & 6 deletions backend/spi/spi-stats/src/serv/pg/stats_pg_sync_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use bios_basic::{
spi_initializer::common_pg::{self},
},
};
use serde_json::Map;
use serde_json::{error, json, Map};
use tardis::{
basic::{dto::TardisContext, error::TardisError, field::TrimString, result::TardisResult},
chrono::{DateTime, Utc},
Expand All @@ -26,7 +26,7 @@ use tardis::{
reldb_client::{TardisRelDBClient, TardisRelDBlConnection},
sea_orm::{FromQueryResult, Value},
},
log,
log::{self, error_span},
regex::{self, Regex},
TardisFunsInst,
};
Expand Down Expand Up @@ -204,11 +204,15 @@ pub(crate) async fn fact_record_sync(fact_conf_key: &str, funs: &TardisFunsInst,
let fact_conf_key = fact_conf_key.to_string();
TaskProcessor::execute_task_with_ctx(
&funs.conf::<StatsConfig>().cache_key_async_task_status,
move |_task_id| async move {
move |task_id| async move {
let funs = stats_initializer::get_tardis_inst();
let inst = funs.init(None, &task_ctx, true, stats_initializer::init_fun).await?;
let db_source_conn = get_db_conn_by_cert_id(&cert_id, &funs, &task_ctx, inst.as_ref()).await?;
let db_source_list = db_source_conn.query_all(&sync_sql, vec![]).await?;
let mut success = 0;
let mut error = 0;
let mut error_list = vec![];
let total = db_source_list.len();
for db_source_record in db_source_list {
let fact_record_key = db_source_record.try_get::<String>("", "key")?;
let add_req = StatsFactRecordLoadReq {
Expand All @@ -219,7 +223,20 @@ pub(crate) async fn fact_record_sync(fact_conf_key: &str, funs: &TardisFunsInst,
data: serde_json::Value::from_query_result(&db_source_record, "")?,
ext: None,
};
stats_pg_record_serv::fact_record_load(&fact_conf_key, &fact_record_key, add_req, &funs, &task_ctx, inst.as_ref()).await?;
let load_resp = stats_pg_record_serv::fact_record_load(&fact_conf_key, &fact_record_key, add_req, &funs, &task_ctx, inst.as_ref()).await;
if load_resp.is_ok() {
success += 1;
} else {
error += 1;
error_list.push(json!({"key":fact_record_key,"error":load_resp.unwrap_err().to_string()}));
}
let _ = TaskProcessor::set_process_data(
&funs.conf::<StatsConfig>().cache_key_async_task_status,
task_id,
json!({"success":success,"error":error,"total":total,"error_list":error_list}),
&funs.cache(),
)
.await;
}
Ok(())
},
Expand All @@ -244,11 +261,14 @@ pub(crate) async fn fact_col_record_sync(fact_conf_key: &str, fact_col_conf_key:
let fact_col_conf_key = fact_col_conf_key.to_string();
TaskProcessor::execute_task_with_ctx(
&funs.conf::<StatsConfig>().cache_key_async_task_status,
move |_task_id| async move {
move |task_id| async move {
let funs = stats_initializer::get_tardis_inst();
let inst = funs.init(None, &task_ctx, true, stats_initializer::init_fun).await?;
let mut page_number = 1;
let page_size = 10;
let mut success = 0;
let mut error = 0;
let mut error_list = vec![];
loop {
let fact_record_pages =
stats_pg_record_serv::get_fact_record_paginated(&fact_conf_key, None, page_number, page_size, Some(true), &funs, &task_ctx, inst.as_ref()).await?;
Expand All @@ -272,7 +292,20 @@ pub(crate) async fn fact_col_record_sync(fact_conf_key: &str, fact_col_conf_key:
},
ext: None,
};
stats_pg_record_serv::fact_record_load(&fact_conf_key, fact_record_key, add_req, &funs, &task_ctx, inst.as_ref()).await?;
let load_resp = stats_pg_record_serv::fact_record_load(&fact_conf_key, fact_record_key, add_req, &funs, &task_ctx, inst.as_ref()).await;
if load_resp.is_ok() {
success += 1;
} else {
error += 1;
error_list.push(json!({"key":fact_record_key,"error":load_resp.unwrap_err().to_string()}));
}
let _ = TaskProcessor::set_process_data(
&funs.conf::<StatsConfig>().cache_key_async_task_status,
task_id,
json!({"success":success,"error":error,"total":fact_record_pages.total_size,"error_list":error_list}),
&funs.cache(),
)
.await;
}
}
}
Expand Down
Loading

0 comments on commit f43b90f

Please sign in to comment.