Skip to content

Commit

Permalink
* Some minor cleanup of pgclient.rs and such.
Browse files Browse the repository at this point in the history
  • Loading branch information
Venryx committed Nov 22, 2023
1 parent 68cbcaf commit 18cd7cc
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 12 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@
// have RA's cargo-check use a separate cache-directory (otherwise, eg. running cargo-check in terminal with different flags will mess up RA's cargo-check cache, requiring a full re-check for RA)
"rust-analyzer.checkOnSave.extraArgs": [
"--target-dir", "./Temp/rust-analyzer-check"
]
],
// maybe use these in the future, in place of the "-Awarnings" flag in config.toml and Dockerfile
/*"rust-analyzer.diagnostics.warningsAsHint": [],
"rust-analyzer.diagnostics.warningsAsInfo": [],*/
"rust-analyzer.diagnostics.disabled": ["unused_braces"],
}
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ members = [

[patch.crates-io]
# force deadpool-postgres to use the same version of tokio-postgres as our root project
# sync:version with root Cargo.toml
# sync:version with "rust-shared/Cargo.toml"
#tokio-postgres = "0.7.1"
#tokio-postgres = {git = "https://github.com/petrosagg/rust-postgres.git", branch = "copy-both", features = ["with-serde_json-1", "array-impls"]}
#tokio-postgres = {git = "https://github.com/Venryx/rust-postgres.git", branch = "copy-both", features = ["with-serde_json-1", "array-impls"]}
Expand Down
21 changes: 13 additions & 8 deletions Packages/app-server/src/links/pgclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,12 @@ pub async fn start_streaming_changes(
let event = match event_res {
Ok(event) => event,
Err(err) => {
warn!("Duplex-stream from pgclient returned an error; resuming listen loop. @error:{:?}", err);
// if error is of type that signifies that the connection has closed, just break the loop
if err.is_closed() { break; }
if err.is_closed() {
warn!("Duplex-stream from pgclient returned a connecting-closing error; breaking listen loop. @error:{:?}", err);
break;
}
warn!("Duplex-stream from pgclient returned a non-connecting-closing error; resuming listen loop. @error:{:?}", err);
continue;
},
};
Expand All @@ -170,18 +173,18 @@ pub async fn start_streaming_changes(
XLogData(body) => {
wal_pos_last_processed = max(wal_pos_last_processed, body.wal_end());
let core_data = body.into_data();
info!("Got XLogData/data-change event. @wal_pos_last_processed:{}", wal_pos_last_processed);
debug!("Got XLogData/data-change event. @wal_pos_last_processed:{}", wal_pos_last_processed);
match core_data {
Relation(body2) => {
info!("Got relation event:{:?}", body2);
debug!("Got relation event:{:?}", body2);
table_infos.insert(body2.rel_id(), TableInfo {
name: body2.name().unwrap().to_owned(),
columns: body2.columns().into_iter().map(|c| ColumnInfo::from_column(c)).collect_vec(),
});
},
// todo: maybe rework my code to just use the existing LogicalReplicationMessage struct, rather than the LDChange struct (which was the data-structure sent by wal2json, but arguably not relevant anymore)
Insert(body2) => {
info!("Got insert event:{:?}", body2);
debug!("Got insert event:{:?}", body2);
let table_info = table_infos.get(&body2.rel_id()).unwrap();
let new_data = wal_data_tuple_to_row_data(body2.tuple(), table_info, 100).unwrap();
let change = LDChange {
Expand All @@ -198,7 +201,7 @@ pub async fn start_streaming_changes(
storage_wrapper.notify_of_ld_change(change).await;
},
Update(body2) => {
info!("Got update event:{:?}", body2);
debug!("Got update event:{:?}", body2);
let table_info = table_infos.get(&body2.rel_id()).unwrap();
let new_data = wal_data_tuple_to_row_data(body2.new_tuple(), table_info, 100).unwrap();
let change = LDChange {
Expand All @@ -215,7 +218,7 @@ pub async fn start_streaming_changes(
storage_wrapper.notify_of_ld_change(change).await;
},
Delete(body2) => {
info!("Got delete event:{:?}", body2);
debug!("Got delete event:{:?}", body2);
let table_info = table_infos.get(&body2.rel_id()).unwrap();
let key_tuple = body2.key_tuple().ok_or(anyhow!("Delete event didn't have key-tuple!"))?;
let old_data_partial = wal_data_tuple_to_row_data(key_tuple, table_info, 100).unwrap();
Expand All @@ -239,7 +242,7 @@ pub async fn start_streaming_changes(
},
// ignore all other message-types
enum_type => {
info!("Got other event: {:?}", enum_type);
debug!("Got other event: {:?}", enum_type);
},
}
},
Expand All @@ -257,6 +260,8 @@ pub async fn start_streaming_changes(
stream.as_mut().standby_status_update(lsn, lsn, lsn, ts, request_server_response).await?;
}
},
// todo: maybe delay ingesting insert/update/delete events until after we get the corresponding "commit" message (unsure if this is necessary)
//Commit(commit) => {},
_ => debug!("Got unknown replication event:{:?}", event),
}
}
Expand Down
1 change: 1 addition & 0 deletions Packages/app-server/src/links/pgclient_/wal_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ fn pg_array_type_to_basic_type(array_type: &Type) -> Option<Type> {
Type::JSONB_ARRAY => Some(Type::JSONB),
Type::FLOAT4_ARRAY => Some(Type::FLOAT4),
Type::FLOAT8_ARRAY => Some(Type::FLOAT8),
//Type::TS_VECTOR_ARRAY => Some(Type::TS_VECTOR), // not needed atm, since tsvector columns are currently ignored by data_tuple_entry_to_json_value
_ => None,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl LQGroup {
columntypes: Some(new_data.keys().map(|_| "unknown".to_owned()).collect()),
oldkeys: None,
schema: "".to_owned(),
needs_wal2json_jsonval_fixes: Some(true), // todo: probably not needed as true, so probably change to false
needs_wal2json_jsonval_fixes: Some(false), // don't apply fixes, since fixes already applied (if needed) during initial ingestion ("columntypes" being "unknown" precludes the fixes from running anyway)
};

lqi.on_table_changed(&new_data_as_change, None).await;
Expand Down
2 changes: 1 addition & 1 deletion Packages/app-server/src/utils/db/pg_stream_parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ fn clone_ldchange_val_0with_type_fixes(value: &JSONValue, typ: &str) -> JSONValu
}*/
match typ {
"jsonb" => {
// the LDChange vals of type jsonb are initially stored as strings
// the wal2json-sourced LDChange vals of type jsonb are initially stored as strings
// convert that to a serde_json::Value::Object, so serde_json::from_value(...) can auto-deserialize it to a nested struct
match value.as_str() {
Some(val_as_str) => {
Expand Down

0 comments on commit 18cd7cc

Please sign in to comment.