Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(scanner): Restart scanning where left #8080

Merged
merged 24 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
30a034a
start scanner where it was left
oxarbitrage Dec 8, 2023
b5bd607
fix tests
oxarbitrage Dec 8, 2023
db38a27
add a `scan_start_where_left` test
oxarbitrage Dec 9, 2023
39a0bfe
refactor a log msg
oxarbitrage Dec 9, 2023
369ae38
fix some comments
oxarbitrage Dec 9, 2023
53dcb31
remove function
oxarbitrage Dec 11, 2023
68bd53f
fix doc comment
oxarbitrage Dec 11, 2023
ae5bc94
Merge remote-tracking branch 'origin/main' into issue8022
oxarbitrage Dec 12, 2023
bf27cfd
clippy
oxarbitrage Dec 12, 2023
b73a28a
fix `sapling_keys_and_last_scanned_heights()`
oxarbitrage Dec 12, 2023
d7c3440
simplify start height
oxarbitrage Dec 12, 2023
024a1dd
i went too far, revert some changes back
oxarbitrage Dec 12, 2023
fea879c
change log info to every 10k blocks
oxarbitrage Dec 12, 2023
f7fc876
Merge remote-tracking branch 'origin/main' into issue8022
oxarbitrage Dec 12, 2023
6b04fb0
fix build
oxarbitrage Dec 12, 2023
dc7f7d2
Merge branch 'main' into issue8022
teor2345 Dec 13, 2023
9d3b21d
Update height snapshot code and check last height is consistent
teor2345 Dec 13, 2023
24ea29b
Add strictly before and strictly after database key gets
teor2345 Dec 13, 2023
cf44fef
Move to the previous key using strictly before ops
teor2345 Dec 13, 2023
c8ac308
Assert that keys are only inserted once
teor2345 Dec 13, 2023
42bf26d
Update the index in each loop
teor2345 Dec 13, 2023
454ca9a
Update snapshots
teor2345 Dec 13, 2023
371b558
Remove debugging code
teor2345 Dec 13, 2023
0a5c086
start scanning at min available height
oxarbitrage Dec 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions zebra-scan/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ impl Config {
pub fn db_config(&self) -> &DbConfig {
&self.db_config
}

/// Returns the database-specific config as mutable.
pub fn db_config_mut(&mut self) -> &mut DbConfig {
&mut self.db_config
}
}
52 changes: 25 additions & 27 deletions zebra-scan/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const INITIAL_WAIT: Duration = Duration::from_secs(15);
const CHECK_INTERVAL: Duration = Duration::from_secs(30);

/// We log an info log with progress after this many blocks.
const INFO_LOG_INTERVAL: u32 = 100_000;
const INFO_LOG_INTERVAL: u32 = 10_000;

/// Start a scan task that reads blocks from `state`, scans them with the configured keys in
/// `storage`, and then writes the results to `storage`.
Expand All @@ -64,21 +64,21 @@ pub async fn start(
storage: Storage,
) -> Result<(), Report> {
let network = storage.network();
let mut height = storage.min_sapling_birthday_height();

// Read keys from the storage on disk, which can block async execution.
let key_storage = storage.clone();
let key_birthdays = tokio::task::spawn_blocking(move || key_storage.sapling_keys())
let key_heights = tokio::task::spawn_blocking(move || key_storage.sapling_keys_last_heights())
.wait_for_panics()
.await;
let key_birthdays = Arc::new(key_birthdays);
let key_heights = Arc::new(key_heights);

let mut height = get_min_height(&key_heights).unwrap_or(storage.min_sapling_birthday_height());

// Parse and convert keys once, then use them to scan all blocks.
// There is some cryptography here, but it should be fast even with thousands of keys.
let parsed_keys: HashMap<
SaplingScanningKey,
(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>),
> = key_birthdays
> = key_heights
.keys()
.map(|key| {
let parsed_keys = sapling_key_to_scan_block_keys(key, network)?;
Expand All @@ -96,7 +96,7 @@ pub async fn start(
state.clone(),
chain_tip_change.clone(),
storage.clone(),
key_birthdays.clone(),
key_heights.clone(),
parsed_keys.clone(),
)
.await?;
Expand Down Expand Up @@ -125,7 +125,7 @@ pub async fn scan_height_and_store_results(
mut state: State,
chain_tip_change: ChainTipChange,
storage: Storage,
key_birthdays: Arc<HashMap<SaplingScanningKey, Height>>,
key_last_scanned_heights: Arc<HashMap<SaplingScanningKey, Height>>,
parsed_keys: Arc<
HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>,
>,
Expand All @@ -135,19 +135,7 @@ pub async fn scan_height_and_store_results(
// Only log at info level every 100,000 blocks.
//
// TODO: also log progress every 5 minutes once we reach the tip?
let is_info_log =
height == storage.min_sapling_birthday_height() || height.0 % INFO_LOG_INTERVAL == 0;

// TODO: add debug logs?
if is_info_log {
info!(
"Scanning the blockchain: now at block {:?}, current tip {:?}",
height,
chain_tip_change
.latest_chain_tip()
.best_tip_height_and_hash(),
);
}
let is_info_log = height.0 % INFO_LOG_INTERVAL == 0;

// Get a block from the state.
// We can't use ServiceExt::oneshot() here, because it causes lifetime errors in init().
Expand All @@ -168,24 +156,29 @@ pub async fn scan_height_and_store_results(
// Scan it with all the keys.
//
// TODO: scan each key in parallel (after MVP?)
for (key_num, (sapling_key, birthday_height)) in key_birthdays.iter().enumerate() {
for (key_num, (sapling_key, last_scanned_height)) in key_last_scanned_heights.iter().enumerate()
{
// Only scan what was not scanned for each key
if height <= *last_scanned_height {
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
continue;
}

// # Security
//
// We can't log `sapling_key` here because it is a private viewing key. Anyone who reads
// the logs could use the key to view those transactions.
if is_info_log {
info!(
"Scanning the blockchain for key {}, started at block {:?}",
key_num, birthday_height,
"Scanning the blockchain for key {}, started at block {:?}, now at block {:?}, current tip {:?}",
key_num, last_scanned_height.next().expect("height is not maximum").as_usize(),
height.as_usize(),
chain_tip_change.latest_chain_tip().best_tip_height().expect("we should have a tip to scan").as_usize(),
);
}

// Get the pre-parsed keys for this configured key.
let (dfvks, ivks) = parsed_keys.get(sapling_key).cloned().unwrap_or_default();

// Scan the block, which blocks async execution until the scan is complete.
//
// TODO: skip scanning before birthday height (#8022)
let sapling_key = sapling_key.clone();
let block = block.clone();
let mut storage = storage.clone();
Expand Down Expand Up @@ -403,3 +396,8 @@ fn scanned_block_to_db_result<Nf>(
})
.collect()
}

/// Get the minimal height available in a key_heights map.
fn get_min_height(map: &HashMap<String, Height>) -> Option<Height> {
map.values().cloned().min()
}
21 changes: 16 additions & 5 deletions zebra-scan/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ pub use db::{SaplingScannedResult, SaplingScanningKey};

use self::db::ScannerWriteBatch;

/// We insert an empty results entry to the database every this interval for each stored key,
/// so we can track progress.
const INSERT_CONTROL_INTERVAL: u32 = 1_000;
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved

/// Store key info and results of the scan.
///
/// `rocksdb` allows concurrent writes through a shared reference,
Expand Down Expand Up @@ -87,16 +91,14 @@ impl Storage {
self.write_batch(batch);
}

/// Returns all the keys and their birthdays.
///
/// Birthdays are adjusted to sapling activation if they are too low or missing.
/// Returns all the keys and their last scanned heights.
///
/// # Performance / Hangs
///
/// This method can block while reading database files, so it must be inside spawn_blocking()
/// in async code.
pub fn sapling_keys(&self) -> HashMap<SaplingScanningKey, Height> {
self.sapling_keys_and_birthday_heights()
pub fn sapling_keys_last_heights(&self) -> HashMap<SaplingScanningKey, Height> {
self.sapling_keys_and_last_scanned_heights()
}

/// Add the sapling results for `height` to the storage. The results can be any map of
Expand All @@ -116,6 +118,10 @@ impl Storage {
// in a single batch.
let mut batch = ScannerWriteBatch::default();

// Every `INSERT_CONTROL_INTERVAL` we add a new entry to the scanner database for each key
// so we can track progress made in the last interval even if no transaction was yet found.
let is_control_time = height.0 % INSERT_CONTROL_INTERVAL == 0 && sapling_results.is_empty();

for (index, sapling_result) in sapling_results {
let index = SaplingScannedDatabaseIndex {
sapling_key: sapling_key.clone(),
Expand All @@ -130,6 +136,11 @@ impl Storage {
batch.insert_sapling_result(self, entry);
}

// Add tracking entry for key.
if is_control_time {
batch.insert_sapling_height(self, sapling_key, height);
}

self.write_batch(batch);
}

Expand Down
12 changes: 11 additions & 1 deletion zebra-scan/src/storage/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,17 @@ impl Storage {

let new_storage = Self { db };

// TODO: report the last scanned height here?
// Report where we are for each key in the database.
let keys = new_storage.sapling_keys_last_heights();
for (key_num, (_key, height)) in keys.iter().enumerate() {
tracing::info!(
"Last scanned height for key number {} is {}, resuming at {}",
key_num,
height.as_usize(),
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
height.next().expect("height is not maximum").as_usize(),
);
}

tracing::info!("loaded Zebra scanner cache");

new_storage
Expand Down
66 changes: 32 additions & 34 deletions zebra-scan/src/storage/db/sapling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,50 +97,37 @@ impl Storage {
.collect()
}

/// Returns all the keys and their birthday heights.
pub fn sapling_keys_and_birthday_heights(&self) -> HashMap<SaplingScanningKey, Height> {
// This code is a bit complex because we don't have a separate column family for keys
// and their birthday heights.
//
// TODO: make a separate column family after the MVP.

/// Returns all the keys and their last scanned heights.
pub fn sapling_keys_and_last_scanned_heights(&self) -> HashMap<SaplingScanningKey, Height> {
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
let sapling_tx_ids = self.sapling_tx_ids_cf();
let mut keys = HashMap::new();

// The minimum key is invalid or a dummy key, so we will never have an entry for it.
let mut find_next_key_index = SaplingScannedDatabaseIndex::min();
let mut last_stored_record: Option<(
SaplingScannedDatabaseIndex,
Option<SaplingScannedResult>,
)> = self.db.zs_last_key_value(&sapling_tx_ids);

loop {
// Find the next key, and the first height we have for it.
let Some(entry) = self
.db
.zs_next_key_value_from(&sapling_tx_ids, &find_next_key_index)
else {
break;
let Some((mut last_stored_record_index, _result)) = last_stored_record else {
return keys;
};

let sapling_key = entry.0.sapling_key;
let mut height = entry.0.tx_loc.height;
let _first_result: Option<SaplingScannedResult> = entry.1;

let height_results = self.sapling_results_for_key_and_height(&sapling_key, height);

// If there are no results for this block, then it's a "skip up to height" marker, and
// the birthday height is the next height. If there are some results, it's the actual
// birthday height.
if height_results.values().all(Option::is_none) {
height = height
.next()
.expect("results should only be stored for validated block heights");
}
let sapling_key = last_stored_record_index.sapling_key.clone();
let height = last_stored_record_index.tx_loc.height;

keys.insert(sapling_key.clone(), height);
let prev_height = keys.insert(sapling_key.clone(), height);
assert_eq!(
prev_height, None,
"unexpected duplicate key: keys must only be inserted once\
last_stored_record_index: {last_stored_record_index:?}",
);

// Skip all the results before the next key.
find_next_key_index = SaplingScannedDatabaseIndex::max_for_key(&sapling_key);
// Skip all the results until the next key.
last_stored_record_index = SaplingScannedDatabaseIndex::min_for_key(&sapling_key);
last_stored_record = self
.db
.zs_prev_key_value_strictly_before(&sapling_tx_ids, &last_stored_record_index);
}

keys
}

/// Returns the Sapling indexes and results in the supplied range.
Expand Down Expand Up @@ -216,4 +203,15 @@ impl ScannerWriteBatch {
SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, skip_up_to_height);
self.zs_insert(&storage.sapling_tx_ids_cf(), index, None);
}

/// Insert sapling height with no results
pub(crate) fn insert_sapling_height(
&mut self,
storage: &Storage,
sapling_key: &SaplingScanningKey,
height: Height,
) {
let index = SaplingScannedDatabaseIndex::min_for_key_and_height(sapling_key, height);
self.zs_insert(&storage.sapling_tx_ids_cf(), index, None);
}
}
18 changes: 7 additions & 11 deletions zebra-scan/src/storage/db/tests/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,30 +147,26 @@ fn snapshot_raw_rocksdb_column_family_data(db: &ScannerDb, original_cf_names: &[
/// Snapshot typed scanner result data using high-level storage methods,
/// using `cargo insta` and RON serialization.
fn snapshot_typed_result_data(storage: &Storage) {
// TODO: snapshot the latest scanned heights after PR #8080 merges
//insta::assert_ron_snapshot!("latest_heights", latest_scanned_heights);

// Make sure the typed key format doesn't accidentally change.
//
// TODO: update this after PR #8080
let sapling_keys_and_birthday_heights = storage.sapling_keys();
let sapling_keys_last_heights = storage.sapling_keys_last_heights();

// HashMap has an unstable order across Rust releases, so we need to sort it here.
insta::assert_ron_snapshot!(
"sapling_keys",
sapling_keys_and_birthday_heights,
sapling_keys_last_heights,
{
"." => insta::sorted_redaction()
}
);

// HashMap has an unstable order across Rust releases, so we need to sort it here as well.
for (key_index, (sapling_key, _birthday_height)) in sapling_keys_and_birthday_heights
.iter()
.sorted()
.enumerate()
for (key_index, (sapling_key, last_height)) in
sapling_keys_last_heights.iter().sorted().enumerate()
{
let sapling_results = storage.sapling_results(sapling_key);

assert_eq!(sapling_results.keys().max(), Some(last_height));

// Check internal database method consistency
for (height, results) in sapling_results.iter() {
let sapling_index_and_results =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
source: zebra-scan/src/storage/db/tests/snapshot.rs
expression: sapling_keys_and_birthday_heights
expression: sapling_keys_last_heights
---
{
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(0),
"zxviewsfake": Height(1000000),
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(419199),
"zxviewsfake": Height(999999),
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
source: zebra-scan/src/storage/db/tests/snapshot.rs
expression: sapling_keys_and_birthday_heights
expression: sapling_keys_last_heights
---
{
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(0),
"zxviewsfake": Height(1000000),
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(419199),
"zxviewsfake": Height(999999),
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
source: zebra-scan/src/storage/db/tests/snapshot.rs
expression: sapling_keys_and_birthday_heights
expression: sapling_keys_last_heights
---
{
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(0),
"zxviewsfake": Height(1000000),
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(419199),
"zxviewsfake": Height(999999),
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
source: zebra-scan/src/storage/db/tests/snapshot.rs
expression: sapling_keys_and_birthday_heights
expression: sapling_keys_last_heights
---
{
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(419200),
"zxviewsfake": Height(1000000),
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(419199),
"zxviewsfake": Height(999999),
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
source: zebra-scan/src/storage/db/tests/snapshot.rs
expression: sapling_keys_and_birthday_heights
expression: sapling_keys_last_heights
---
{
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(0),
"zxviewsfake": Height(1000000),
"zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": Height(279999),
"zxviewsfake": Height(999999),
}
Loading
Loading