Skip to content

Commit

Permalink
spi-stats:fix sync fact check cert_id and sql.
Browse files Browse the repository at this point in the history
  • Loading branch information
ljl committed Dec 11, 2024
1 parent b539aba commit 4e9349e
Showing 1 changed file with 45 additions and 45 deletions.
90 changes: 45 additions & 45 deletions backend/spi/spi-stats/src/serv/pg/stats_pg_sync_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,34 +191,35 @@ pub(crate) async fn fact_record_sync(fact_conf_key: &str, funs: &TardisFunsInst,
let Some(fact_conf) = stats_pg_conf_fact_serv::get(fact_conf_key, &conn, ctx).await? else {
return Err(funs.err().not_found("starsys_stats_conf_fact", "find", "fact conf not found", "404-fact-conf-not-found"));
};
let Some(cert_id) = fact_conf.rel_cert_id else {
return Err(funs.err().bad_request("starsys_stats_conf_fact", "sync", "The rel_cert_id is required", "404-fact-conf-not-found"));
};
let Some(sync_sql) = fact_conf.sync_sql else {
return Err(funs.err().bad_request("starsys_stats_conf_fact", "sync", "The sync_sql is required", "404-fact-conf-not-found"));
};
if cert_id.is_empty() || sync_sql.is_empty() {
return Err(funs.err().bad_request("starsys_stats_conf_fact", "sync", "The rel_cert_id and sync_sql is required", "404-fact-conf-not-found"));
}
let task_ctx = ctx.clone();
let fact_conf_key = fact_conf_key.to_string();
TaskProcessor::execute_task_with_ctx(
&funs.conf::<StatsConfig>().cache_key_async_task_status,
move |_task_id| async move {
let funs = stats_initializer::get_tardis_inst();
let inst = funs.init(None, &task_ctx, true, stats_initializer::init_fun).await?;
if let Some(cert_id) = fact_conf.rel_cert_id {
if let Some(sync_sql) = fact_conf.sync_sql {
let db_source_conn = get_db_conn_by_cert_id(&cert_id, &funs, &task_ctx, inst.as_ref()).await?;
let db_source_list = db_source_conn.query_all(&sync_sql, vec![]).await?;
for db_source_record in db_source_list {
let fact_record_key = db_source_record.try_get::<String>("", "key")?;
let add_req = StatsFactRecordLoadReq {
own_paths: db_source_record.try_get::<Option<String>>("", "own_paths")?.unwrap_or_default(),
ct: db_source_record.try_get::<Option<DateTime<Utc>>>("", "ct")?.unwrap_or_default(),
idempotent_id: db_source_record.try_get::<Option<String>>("", "idempotent_id")?,
ignore_updates: Some(false),
data: serde_json::Value::from_query_result(&db_source_record, "")?,
ext: None,
};
stats_pg_record_serv::fact_record_load(&fact_conf_key, &fact_record_key, add_req, &funs, &task_ctx, inst.as_ref()).await?;
}
} else {
log::warn!("[spi-stats] sync_sql not found for fact: {}", fact_conf_key);
}
} else {
log::warn!("[spi-stats] cert_id not found for fact: {}", fact_conf_key);
let db_source_conn = get_db_conn_by_cert_id(&cert_id, &funs, &task_ctx, inst.as_ref()).await?;
let db_source_list = db_source_conn.query_all(&sync_sql, vec![]).await?;
for db_source_record in db_source_list {
let fact_record_key = db_source_record.try_get::<String>("", "key")?;
let add_req = StatsFactRecordLoadReq {
own_paths: db_source_record.try_get::<Option<String>>("", "own_paths")?.unwrap_or_default(),
ct: db_source_record.try_get::<Option<DateTime<Utc>>>("", "ct")?.unwrap_or_default(),
idempotent_id: db_source_record.try_get::<Option<String>>("", "idempotent_id")?,
ignore_updates: Some(false),
data: serde_json::Value::from_query_result(&db_source_record, "")?,
ext: None,
};
stats_pg_record_serv::fact_record_load(&fact_conf_key, &fact_record_key, add_req, &funs, &task_ctx, inst.as_ref()).await?;
}
Ok(())
},
Expand Down Expand Up @@ -298,32 +299,31 @@ pub(crate) async fn fact_col_record_result(
ctx: &TardisContext,
inst: &SpiBsInst,
) -> TardisResult<Option<Value>> {
if let Some(cert_id) = fact_col.rel_cert_id {
if let Some(sql) = fact_col.rel_sql {
let data_source_conn = get_db_conn_by_cert_id(&cert_id, funs, ctx, inst).await?;
let (sql, params) = process_sql(&sql, &fact_record)?;
if let Some(rel_record) = data_source_conn.query_one(&sql, params).await? {
if let Some(first_column) = rel_record.column_names().get(0) {
let result = match fact_col.kind {
StatsFactColKind::Dimension | StatsFactColKind::Ext => {
if fact_col.dim_multi_values.unwrap_or(false) {
fact_col.dim_data_type.clone().unwrap_or(StatsDataTypeKind::String).result_to_sea_orm_value_array(&rel_record, first_column)?
} else {
fact_col.dim_data_type.clone().unwrap_or(StatsDataTypeKind::String).result_to_sea_orm_value(&rel_record, first_column)?
}
}
StatsFactColKind::Measure => fact_col.mes_data_type.clone().unwrap_or(StatsDataTypeKind::Int).result_to_sea_orm_value(&rel_record, first_column)?,
};
return Ok(Some(result));
} else {
log::warn!("[spi-stats] rel_field not found for fact_col: {}", fact_col.key);
let Some(cert_id) = fact_col.rel_cert_id else {
return Ok(None);
};
let Some(sql) = fact_col.rel_sql else {
return Ok(None);
};
if cert_id.is_empty() || sql.is_empty() {
return Ok(None);
}
let data_source_conn = get_db_conn_by_cert_id(&cert_id, funs, ctx, inst).await?;
let (sql, params) = process_sql(&sql, &fact_record)?;
if let Some(rel_record) = data_source_conn.query_one(&sql, params).await? {
if let Some(first_column) = rel_record.column_names().get(0) {
let result = match fact_col.kind {
StatsFactColKind::Dimension | StatsFactColKind::Ext => {
if fact_col.dim_multi_values.unwrap_or(false) {
fact_col.dim_data_type.clone().unwrap_or(StatsDataTypeKind::String).result_to_sea_orm_value_array(&rel_record, first_column)?
} else {
fact_col.dim_data_type.clone().unwrap_or(StatsDataTypeKind::String).result_to_sea_orm_value(&rel_record, first_column)?
}
}
}
} else {
log::warn!("[spi-stats] rel_sql not found for fact_col: {}", fact_col.key);
StatsFactColKind::Measure => fact_col.mes_data_type.clone().unwrap_or(StatsDataTypeKind::Int).result_to_sea_orm_value(&rel_record, first_column)?,
};
return Ok(Some(result));
}
} else {
log::warn!("[spi-stats] cert_id not found for fact_col: {}", fact_col.key);
}
Ok(None)
}
Expand Down

0 comments on commit 4e9349e

Please sign in to comment.