Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change(scan): Use the on-disk database for keys and results #8036

Merged
merged 18 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions zebra-scan/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,30 @@ use color_eyre::Report;
use tokio::task::JoinHandle;
use tracing::Instrument;

use zebra_chain::parameters::Network;
use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network};

use crate::{scan, storage::Storage, Config};

/// Initialize the scanner based on its config.
pub fn init(
/// Initialize the scanner based on its config, and spawn a task for it.
///
/// TODO: add a test for this function.
pub fn spawn_init(
config: &Config,
network: Network,
state: scan::State,
) -> JoinHandle<Result<(), Report>> {
let storage = Storage::new(config, network);
let config = config.clone();
tokio::spawn(init(config, network, state).in_current_span())
}

/// Initialize the scanner based on its config.
///
/// TODO: add a test for this function.
pub async fn init(config: Config, network: Network, state: scan::State) -> Result<(), Report> {
let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network))
.wait_for_panics()
.await;

// TODO: add more tasks here?
tokio::spawn(scan::start(state, storage).in_current_span())
scan::start(state, storage).await
}
2 changes: 1 addition & 1 deletion zebra-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ pub mod storage;
mod tests;

pub use config::Config;
pub use init::init;
pub use init::{init, spawn_init};
21 changes: 15 additions & 6 deletions zebra-scan/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use zcash_client_backend::{
use zcash_primitives::zip32::AccountId;

use zebra_chain::{
block::Block, parameters::Network, serialization::ZcashSerialize, transaction::Transaction,
block::Block, diagnostic::task::WaitForPanics, parameters::Network,
serialization::ZcashSerialize, transaction::Transaction,
};

use crate::storage::Storage;
Expand All @@ -31,7 +32,7 @@ pub type State = Buffer<
const INITIAL_WAIT: Duration = Duration::from_secs(10);

/// The amount of time between checking and starting new scans.
const CHECK_INTERVAL: Duration = Duration::from_secs(10);
const CHECK_INTERVAL: Duration = Duration::from_secs(30);

/// Start the scan task given state and storage.
///
Expand All @@ -57,13 +58,16 @@ pub async fn start(mut state: State, storage: Storage) -> Result<(), Report> {
_ => unreachable!("unmatched response to a state::Tip request"),
};

// Read keys from the storage
let available_keys = storage.get_sapling_keys();
// Read keys from the storage on disk, which can block.
let key_storage = storage.clone();
let available_keys = tokio::task::spawn_blocking(move || key_storage.sapling_keys())
.wait_for_panics()
.await;

for key in available_keys {
info!(
"Scanning the blockchain for key {} from block 1 to {:?}",
key.0, tip,
"Scanning the blockchain for key {} from block {:?} to {:?}",
key.0, key.1, tip,
);
}

Expand All @@ -73,6 +77,11 @@ pub async fn start(mut state: State, storage: Storage) -> Result<(), Report> {

/// Returns transactions belonging to the given `ScanningKey`.
///
/// # Performance / Hangs
///
/// This method can block while reading database files, so it must be inside spawn_blocking()
/// in async code.
///
/// TODO:
/// - Remove the `sapling_tree_size` parameter or turn it into an `Option` once we have access to
/// Zebra's state, and we can retrieve the tree size ourselves.
Expand Down
130 changes: 91 additions & 39 deletions zebra-scan/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
//! Store viewing keys and results of the scan.

#![allow(dead_code)]
use std::collections::{BTreeMap, HashMap};

use std::collections::HashMap;

use zebra_chain::{block::Height, parameters::Network, transaction::Hash};
use zebra_chain::{
block::Height,
parameters::{Network, NetworkUpgrade},
};
use zebra_state::{SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex};

use crate::config::Config;

pub mod db;

/// The type used in Zebra to store Sapling scanning keys.
/// It can represent a full viewing key or an individual viewing key.
pub type SaplingScanningKey = String;
// Public types and APIs
pub use db::{SaplingScannedResult, SaplingScanningKey};

use self::db::ScannerWriteBatch;

/// Store key info and results of the scan.
///
Expand All @@ -37,27 +40,19 @@ pub struct Storage {
/// `rocksdb` allows reads and writes via a shared reference,
/// so this database object can be freely cloned.
/// The last instance that is dropped will close the underlying database.
//
// This database is created but not actually used for results.
// TODO: replace the fields below with a database instance.
db: db::ScannerDb,

/// The sapling key and an optional birthday for it.
sapling_keys: HashMap<SaplingScanningKey, Option<Height>>,

/// The sapling key and the related transaction id.
sapling_results: HashMap<SaplingScanningKey, Vec<Hash>>,
}

impl Storage {
/// Opens and returns the on-disk scanner results storage for `config` and `network`.
/// If there is no existing storage, creates a new storage on disk.
///
/// TODO:
/// New keys in `config` are inserted into the database with their birthday heights. Shielded
/// activation is the minimum birthday height.
///
/// Birthdays and scanner progress are marked by inserting an empty result for that height.
///
/// # Performance / Hangs
///
/// This method can block while creating or reading database files, so it must be inside
/// spawn_blocking() in async code.
pub fn new(config: &Config, network: Network) -> Self {
let mut storage = Self::new_db(config, network);

Expand All @@ -69,32 +64,89 @@ impl Storage {
}

/// Add a sapling key to the storage.
///
/// # Performance / Hangs
///
/// This method can block while writing database files, so it must be inside spawn_blocking()
/// in async code.
pub fn add_sapling_key(&mut self, key: SaplingScanningKey, birthday: Option<Height>) {
self.sapling_keys.insert(key, birthday);
// It's ok to write some keys and not others during shutdown, so each key can get its own
// batch. (They will be re-written on startup anyway.)
let mut batch = ScannerWriteBatch::default();

batch.insert_sapling_key(self, key, birthday);

self.write_batch(batch);
}

/// Returns all the keys and their birthdays.
///
/// Birthdays are adjusted to sapling activation if they are too low or missing.
///
/// # Performance / Hangs
///
/// This method can block while reading database files, so it must be inside spawn_blocking()
/// in async code.
pub fn sapling_keys(&self) -> HashMap<SaplingScanningKey, Height> {
self.sapling_keys_and_birthday_heights()
}

/// Add a sapling result to the storage.
pub fn add_sapling_result(&mut self, key: SaplingScanningKey, txid: Hash) {
if let Some(results) = self.sapling_results.get_mut(&key) {
results.push(txid);
} else {
self.sapling_results.insert(key, vec![txid]);
}
///
/// # Performance / Hangs
///
/// This method can block while writing database files, so it must be inside spawn_blocking()
/// in async code.
pub fn add_sapling_result(
&mut self,
sapling_key: SaplingScanningKey,
height: Height,
result: Vec<SaplingScannedResult>,
) {
// It's ok to write some results and not others during shutdown, so each result can get its
// own batch. (They will be re-scanned on startup anyway.)
let mut batch = ScannerWriteBatch::default();

let index = SaplingScannedDatabaseIndex {
sapling_key,
height,
};

let entry = SaplingScannedDatabaseEntry {
index,
value: result,
};

batch.insert_sapling_result(self, entry);

self.write_batch(batch);
}

/// Get the results of a sapling key.
//
// TODO: Rust style - remove "get_" from these names
pub fn get_sapling_results(&self, key: &str) -> Vec<Hash> {
self.sapling_results.get(key).cloned().unwrap_or_default()
/// Returns all the results for a sapling key, for every scanned block height.
///
/// # Performance / Hangs
///
/// This method can block while reading database files, so it must be inside spawn_blocking()
/// in async code.
pub fn sapling_results(
&self,
sapling_key: &SaplingScanningKey,
) -> BTreeMap<Height, Vec<SaplingScannedResult>> {
self.sapling_results_for_key(sapling_key)
}

/// Get all keys and their birthdays.
//
// TODO: any value below sapling activation as the birthday height, or `None`, should default
// to sapling activation. This requires the configured network.
// Return Height not Option<Height>.
pub fn get_sapling_keys(&self) -> HashMap<String, Option<Height>> {
self.sapling_keys.clone()
// Parameters

/// Returns the minimum sapling birthday height for the configured network.
pub fn min_sapling_birthday_height(&self) -> Height {
// Assume that the genesis block never contains shielded inputs or outputs.
//
// # Consensus
//
// For Zcash mainnet and the public testnet, Sapling activates above genesis,
// so this is always true.
NetworkUpgrade::Sapling
.activation_height(self.network())
.unwrap_or(Height(0))
}
}
66 changes: 52 additions & 14 deletions zebra-scan/src/storage/db.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
//! Persistent storage for scanner results.

use std::{collections::HashMap, path::Path};
use std::path::Path;

use semver::Version;

use zebra_chain::parameters::Network;
use zebra_state::{DiskWriteBatch, ReadDisk};

use crate::Config;

use super::Storage;

// Public types and APIs
pub use zebra_state::ZebraDb as ScannerDb;
pub use zebra_state::{
SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, SaplingScannedResult,
SaplingScanningKey, ZebraDb as ScannerDb,
};

pub mod sapling;

/// The directory name used to distinguish the scanner database from Zebra's other databases or
/// flat files.
Expand All @@ -24,12 +30,14 @@ pub const SCANNER_DATABASE_KIND: &str = "private-scan";
/// Existing column families that aren't listed here are preserved when the database is opened.
pub const SCANNER_COLUMN_FAMILIES_IN_CODE: &[&str] = &[
// Sapling
"sapling_tx_ids",
sapling::SAPLING_TX_IDS,
// Orchard
// TODO
// TODO: add Orchard support
];

impl Storage {
// Creation

/// Opens and returns an on-disk scanner results database instance for `config` and `network`.
/// If there is no existing database, creates a new database on disk.
///
Expand Down Expand Up @@ -64,23 +72,15 @@ impl Storage {
.map(ToString::to_string),
);

let new_storage = Self {
db,
sapling_keys: HashMap::new(),
sapling_results: HashMap::new(),
};
let new_storage = Self { db };

// TODO: report the last scanned height here?
tracing::info!("loaded Zebra scanner cache");

new_storage
}

/// The database format version in the running scanner code.
pub fn database_format_version_in_code() -> Version {
// TODO: implement scanner database versioning
Version::new(0, 0, 0)
}
// Config

/// Returns the configured network for this database.
pub fn network(&self) -> Network {
Expand All @@ -92,6 +92,14 @@ impl Storage {
self.db.path()
}

// Versioning & Upgrades

/// The database format version in the running scanner code.
pub fn database_format_version_in_code() -> Version {
// TODO: implement scanner database versioning
Version::new(0, 0, 0)
}

/// Check for panics in code running in spawned threads.
/// If a thread exited with a panic, resume that panic.
///
Expand All @@ -101,4 +109,34 @@ impl Storage {
pub fn check_for_panics(&mut self) {
self.db.check_for_panics()
}

// General database status

/// Returns true if the database is empty.
pub fn is_empty(&self) -> bool {
// Any column family that is populated at (or near) startup can be used here.
self.db.zs_is_empty(&self.sapling_tx_ids_cf())
}
}

// General writing

/// Wrapper type for scanner database writes.
#[must_use = "batches must be written to the database"]
#[derive(Default)]
pub struct ScannerWriteBatch(pub DiskWriteBatch);

// Redirect method calls to DiskWriteBatch for convenience.
impl std::ops::Deref for ScannerWriteBatch {
type Target = DiskWriteBatch;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl std::ops::DerefMut for ScannerWriteBatch {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
Loading
Loading