Skip to content

Commit

Permalink
feat: improve wallet sql queries (#6232)
Browse files Browse the repository at this point in the history
Description
---
- Improved wallet sql queries in:
  - `fn update_last_validation_timestamps`
- `async fn select_utxos` + `pub fn fetch_unspent_outputs_for_spending`
- Added the `balance_enquiry_cooldown_period` config option back in that
was removed by a previous PR to minimize balance query impacts for busy
console wallets.

Motivation and Context
---
The console wallet could not efficiently submit transactions if it had
many unspent outputs (> 80,000) in its database.

How Has This Been Tested?
---
System-level stress testing. Previously, each of these selections,
`fetch_unspent_outputs_for_spending` up to `final_selection`, would take
multiple seconds.
```rust
2024-03-21 06:44:12.344337500 [wallet::output_manager_service] TRACE select_utxos profile - fetch_unspent_outputs_for_spending: 4000 outputs, 577 ms (at 577)
2024-03-21 06:44:12.346397400 [wallet::output_manager_service] TRACE select_utxos profile - final_selection: 1 outputs from 4000, 2 ms (at 579)
2024-03-21 06:44:13.547512200 [wallet::output_manager_service] TRACE select_utxos profile - fetch_unspent_outputs_for_spending: 4000 outputs, 557 ms (at 557)
2024-03-21 06:44:13.549151900 [wallet::output_manager_service] TRACE select_utxos profile - final_selection: 1 outputs from 4000, 1 ms (at 559)
2024-03-21 06:44:15.137607600 [wallet::output_manager_service] TRACE select_utxos profile - fetch_unspent_outputs_for_spending: 4000 outputs, 552 ms (at 552)
2024-03-21 06:44:15.139724100 [wallet::output_manager_service] TRACE select_utxos profile - final_selection: 1 outputs from 4000, 2 ms (at 554)
2024-03-21 06:44:16.432081200 [wallet::output_manager_service] TRACE select_utxos profile - fetch_unspent_outputs_for_spending: 4000 outputs, 593 ms (at 593)
2024-03-21 06:44:16.433796800 [wallet::output_manager_service] TRACE select_utxos profile - final_selection: 1 outputs from 4000, 1 ms (at 594)
2024-03-21 06:44:17.691752400 [wallet::output_manager_service] TRACE select_utxos profile - fetch_unspent_outputs_for_spending: 4000 outputs, 583 ms (at 583)
2024-03-21 06:44:17.693400000 [wallet::output_manager_service] TRACE select_utxos profile - final_selection: 1 outputs from 4000, 1 ms (at 584)
```

What process can a PR reviewer use to test or verify this change?
---
Code review.
System-level stress test (optional).

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->
  • Loading branch information
hansieodendaal authored Mar 27, 2024
1 parent c451d20 commit 0290204
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl AppState {
wallet_connectivity,
balance_enquiry_debouncer: BalanceEnquiryDebouncer::new(
inner,
Duration::from_secs(5),
wallet_config.balance_enquiry_cooldown_period,
output_manager_service,
),
wallet_config,
Expand Down
6 changes: 6 additions & 0 deletions base_layer/wallet/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ pub struct WalletConfig {
pub identity_file: Option<PathBuf>,
/// The type of wallet software, or specific type of hardware
pub wallet_type: Option<WalletType>,
/// The cool down period between balance enquiry checks in seconds; requests faster than this will be ignored.
/// For specialized wallets processing many batch transactions this setting could be increased to 60 s to retain
/// responsiveness of the wallet with slightly delayed balance updates
#[serde(with = "serializers::seconds")]
pub balance_enquiry_cooldown_period: Duration,
}

impl Default for WalletConfig {
Expand Down Expand Up @@ -159,6 +164,7 @@ impl Default for WalletConfig {
use_libtor: true,
identity_file: None,
wallet_type: None,
balance_enquiry_cooldown_period: Duration::from_secs(5),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions base_layer/wallet/src/output_manager_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ pub enum OutputManagerError {
ValidationInProgress,
#[error("Invalid data: `{0}`")]
RangeProofError(String),
#[error("Transaction is over sized: `{0}`")]
TooManyInputsToFulfillTransaction(String),
}

impl From<RangeProofError> for OutputManagerError {
Expand Down
3 changes: 3 additions & 0 deletions base_layer/wallet/src/output_manager_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ use crate::{
util::wallet_identity::WalletIdentity,
};

/// The maximum number of transaction inputs that can be created in a single transaction, slightly less than the maximum
/// that a single comms message can hold.
pub const TRANSACTION_INPUTS_LIMIT: u32 = 4000;
const LOG_TARGET: &str = "wallet::output_manager_service::initializer";

pub struct OutputManagerServiceInitializer<T, TKeyManagerInterface>
Expand Down
30 changes: 28 additions & 2 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use tari_script::{inputs, script, ExecutionStack, Opcode, TariScript};
use tari_service_framework::reply_channel;
use tari_shutdown::ShutdownSignal;
use tari_utilities::{hex::Hex, ByteArray};
use tokio::sync::Mutex;
use tokio::{sync::Mutex, time::Instant};

use crate::{
base_node_service::handle::{BaseNodeEvent, BaseNodeServiceHandle},
Expand All @@ -88,6 +88,7 @@ use crate::{
OutputStatus,
},
tasks::TxoValidationTask,
TRANSACTION_INPUTS_LIMIT,
},
util::wallet_identity::WalletIdentity,
};
Expand Down Expand Up @@ -1256,6 +1257,7 @@ where
num_outputs: usize,
total_output_features_and_scripts_byte_size: usize,
) -> Result<UtxoSelection, OutputManagerError> {
let start = Instant::now();
debug!(
target: LOG_TARGET,
"select_utxos amount: {}, fee_per_gram: {}, num_outputs: {}, output_features_and_scripts_byte_size: {}, \
Expand Down Expand Up @@ -1283,10 +1285,20 @@ where
"select_utxos selection criteria: {}", selection_criteria
);
let tip_height = chain_metadata.as_ref().map(|m| m.best_block_height());
let start_new = Instant::now();
let uo = self
.resources
.db
.fetch_unspent_outputs_for_spending(&selection_criteria, amount, tip_height)?;
let uo_len = uo.len();
trace!(
target: LOG_TARGET,
"select_utxos profile - fetch_unspent_outputs_for_spending: {} outputs, {} ms (at {})",
uo_len,
start_new.elapsed().as_millis(),
start.elapsed().as_millis(),
);
let start_new = Instant::now();

// For non-standard queries, we want to ensure that the intended UTXOs are selected
if !selection_criteria.filter.is_standard() && uo.is_empty() {
Expand All @@ -1309,7 +1321,7 @@ where
.map_err(|e| OutputManagerError::ConversionError(e.to_string()))?,
);

trace!(target: LOG_TARGET, "We found {} UTXOs to select from", uo.len());
trace!(target: LOG_TARGET, "We found {} UTXOs to select from", uo_len);

let mut requires_change_output = false;
let mut utxos_total_value = MicroMinotari::from(0);
Expand Down Expand Up @@ -1348,8 +1360,22 @@ where

let perfect_utxo_selection = utxos_total_value == amount + fee_without_change;
let enough_spendable = utxos_total_value > amount + fee_with_change;
trace!(
target: LOG_TARGET,
"select_utxos profile - final_selection: {} outputs from {}, {} ms (at {})",
utxos.len(),
uo_len,
start_new.elapsed().as_millis(),
start.elapsed().as_millis(),
);

if !perfect_utxo_selection && !enough_spendable {
if uo_len == TRANSACTION_INPUTS_LIMIT as usize {
return Err(OutputManagerError::TooManyInputsToFulfillTransaction(format!(
"Input limit '{}' reached",
TRANSACTION_INPUTS_LIMIT
)));
}
let current_tip_for_time_lock_calculation = chain_metadata.map(|cm| cm.best_block_height());
let balance = self.get_balance(current_tip_for_time_lock_calculation)?;
let pending_incoming = balance.pending_incoming_balance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub trait OutputManagerBackend: Send + Sync + Clone {
/// Perform a batch update of the outputs' unmined and invalid state
fn set_outputs_to_unmined_and_invalid(&self, hashes: Vec<FixedHash>) -> Result<(), OutputManagerStorageError>;
/// Perform a batch update of the outputs' last validation timestamp
fn update_last_validation_timestamps(&self, hashes: Vec<FixedHash>) -> Result<(), OutputManagerStorageError>;
fn update_last_validation_timestamps(&self, commitments: Vec<Commitment>) -> Result<(), OutputManagerStorageError>;
fn set_outputs_to_be_revalidated(&self) -> Result<(), OutputManagerStorageError>;
/// Perform a batch update of the outputs' spent status
fn mark_outputs_as_spent(&self, updates: Vec<SpentOutputInfoForBatch>) -> Result<(), OutputManagerStorageError>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,12 @@ where T: OutputManagerBackend + 'static
Ok(())
}

pub fn update_last_validation_timestamps(&self, hashes: Vec<FixedHash>) -> Result<(), OutputManagerStorageError> {
pub fn update_last_validation_timestamps(
&self,
commitments: Vec<Commitment>,
) -> Result<(), OutputManagerStorageError> {
let db = self.db.clone();
db.update_last_validation_timestamps(hashes)?;
db.update_last_validation_timestamps(commitments)?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,12 +425,6 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
let mut conn = self.database_connection.get_pooled_connection()?;
let acquire_lock = start.elapsed();

debug!(
target: LOG_TARGET,
"`set_received_outputs_mined_height_and_statuses` for {} outputs",
updates.len()
);

let commitments: Vec<Commitment> = updates.iter().map(|update| update.commitment.clone()).collect();
if !OutputSql::verify_outputs_exist(&commitments, &mut conn)? {
return Err(OutputManagerStorageError::ValuesNotFound);
Expand Down Expand Up @@ -569,30 +563,58 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
Ok(())
}

fn update_last_validation_timestamps(&self, hashes: Vec<FixedHash>) -> Result<(), OutputManagerStorageError> {
fn update_last_validation_timestamps(&self, commitments: Vec<Commitment>) -> Result<(), OutputManagerStorageError> {
let start = Instant::now();
let mut conn = self.database_connection.get_pooled_connection()?;
let acquire_lock = start.elapsed();

// Using a raw query here, as the obvious diesel query is not as performant as expected:
// `diesel::update(outputs::table.filter(outputs::hash.eq_any(hashes.iter().map(|hash| hash.to_vec()))))
// ` .set(outputs::last_validation_timestamp.eq(Some(Utc::now().naive_utc())))
// ` .execute(&mut conn)
// ` .num_rows_affected_or_not_found(hashes.len())?;
let sql_query = format!(
r#"
UPDATE outputs
SET last_validation_timestamp = '{}'
WHERE hash IN ({})
"#,
Utc::now().naive_utc(),
hashes
if !OutputSql::verify_outputs_exist(&commitments, &mut conn)? {
return Err(OutputManagerStorageError::ValuesNotFound);
}

let last_validation_timestamp = Utc::now().naive_utc();

// Three queries were evaluated to determine the most efficient way to update the last validation timestamp
// during system-level stress testing:
// - Using `diesel`:
// - `diesel::update(outputs::table.filter(outputs::hash.eq_any(hashes)).set(...).execute(&mut conn)`
// - Note: `diesel` does not support batch updates, so we have to do it manually.
// - Using a raw query that mimicked the `diesel` query:
// - `UPDATE outputs SET last_validation_timestamp = '{}' WHERE hash IN ({})`
// - 20% faster than `diesel` on average
// - Using a raw query with a batch insert (as implemented below):
// - `INSERT INTO outputs (..) VALUES (...) ON CONFLICT (commitment) DO UPDATE SET ...`
// - 1011% faster than `diesel` on average

let mut query = String::from(
"INSERT INTO outputs ( commitment, last_validation_timestamp, mined_height, mined_in_block, status, \
mined_timestamp, spending_key, value, output_type, maturity, hash, script, input_data, \
script_private_key, sender_offset_public_key, metadata_signature_ephemeral_commitment, \
metadata_signature_ephemeral_pubkey, metadata_signature_u_a, metadata_signature_u_x, \
metadata_signature_u_y, spending_priority, covenant, encrypted_data, minimum_value_promise
)
VALUES ",
);

query.push_str(
&commitments
.iter()
.map(|hash| format!("'{}'", hash))
.collect::<Vec<_>>()
.join(",")
.map(|commitment| {
format!(
"(x'{}', '{}', 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)",
commitment.to_hex(),
last_validation_timestamp,
)
})
.collect::<Vec<String>>()
.join(", "),
);

query.push_str(
" ON CONFLICT (commitment) DO UPDATE SET last_validation_timestamp = excluded.last_validation_timestamp",
);
conn.batch_execute(&sql_query)?;

conn.batch_execute(&query)?;

if start.elapsed().as_millis() > 0 {
trace!(
Expand All @@ -601,7 +623,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
acquire_lock.as_millis(),
(start.elapsed() - acquire_lock).as_millis(),
start.elapsed().as_millis(),
hashes.len()
commitments.len(),
);
}

Expand All @@ -614,12 +636,6 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
let mut conn = self.database_connection.get_pooled_connection()?;
let acquire_lock = start.elapsed();

debug!(
target: LOG_TARGET,
"`mark_outputs_as_spent` for {} outputs",
updates.len()
);

let commitments: Vec<Commitment> = updates.iter().map(|update| update.commitment.clone()).collect();
if !OutputSql::verify_outputs_exist(&commitments, &mut conn)? {
return Err(OutputManagerStorageError::ValuesNotFound);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::{
},
UtxoSelectionFilter,
UtxoSelectionOrdering,
TRANSACTION_INPUTS_LIMIT,
},
schema::outputs,
};
Expand Down Expand Up @@ -264,7 +265,7 @@ impl OutputSql {
},
};

Ok(query.load(conn)?)
Ok(query.limit(i64::from(TRANSACTION_INPUTS_LIMIT)).load(conn)?)
}

/// Return all unspent outputs that have a maturity above the provided chain tip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,10 @@ where
.for_protocol(self.operation_id)?;
}

let unmined_hashes: Vec<_> = unmined.iter().map(|o| o.hash).collect();
if !unmined_hashes.is_empty() {
let unmined_info: Vec<_> = unmined.iter().map(|o| o.commitment.clone()).collect();
if !unmined_info.is_empty() {
self.db
.update_last_validation_timestamps(unmined_hashes)
.update_last_validation_timestamps(unmined_info)
.for_protocol(self.operation_id)?;
}
}
Expand Down
5 changes: 5 additions & 0 deletions common/config/presets/d_console_wallet.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@
# An example script is available here: applications/minotari_console_wallet/src/notifier/notify_example.sh
#notify_file = "/path/to/script"

# The cool down period between balance enquiry checks in seconds; requests faster than this will be ignored.
# For specialized wallets processing many batch transactions this setting could be increased to 60 s to retain
# responsiveness of the wallet with slightly delayed balance updates (default = 5):
#balance_enquiry_cooldown_period = 5

[wallet.transactions]
# This is the timeout period that will be used for base node broadcast monitoring tasks (default = 30)
broadcast_monitoring_timeout = 180
Expand Down

0 comments on commit 0290204

Please sign in to comment.