Skip to content

Commit

Permalink
fix: avoid one in memory copy for unsealing (#1401)
Browse files Browse the repository at this point in the history
* fix: avoid one in memory copy for unsealing
* feat: re-factor get_unsealed_range to use mmap

unseal_range is not readily able to be converted, but we should be
able to deprecate this call from -api by having unseal call into
get_unsealed_range transparently to the caller

* fix: preserve sealed file data

Co-authored-by: nemo <nemo@protocol.ai>
  • Loading branch information
dignifiedquire and cryptonemo authored Mar 22, 2021
1 parent e619ae9 commit 7d08e87
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 66 deletions.
139 changes: 126 additions & 13 deletions filecoin-proofs/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fs::{self, File};
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};

Expand All @@ -7,6 +7,7 @@ use bincode::deserialize;
use filecoin_hashers::Hasher;
use fr32::{write_unpadded, Fr32Reader};
use log::{info, trace};
use memmap::MmapOptions;
use merkletree::store::{DiskStore, LevelCacheStore, StoreConfig};
use storage_proofs_core::{
cache_key::CacheKey,
Expand Down Expand Up @@ -83,18 +84,15 @@ pub fn get_unsealed_range<T: Into<PathBuf> + AsRef<Path>, Tree: 'static + Merkle
) -> Result<UnpaddedBytesAmount> {
info!("get_unsealed_range:start");

let f_in = File::open(&sealed_path)
.with_context(|| format!("could not open sealed_path={:?}", sealed_path.as_ref()))?;

let f_out = File::create(&output_path)
.with_context(|| format!("could not create output_path={:?}", output_path.as_ref()))?;

let buf_f_out = BufWriter::new(f_out);

let result = unseal_range::<_, _, _, Tree>(
let result = unseal_range_mapped::<_, _, Tree>(
porep_config,
cache_path,
f_in,
sealed_path.into(),
buf_f_out,
prover_id,
sector_id,
Expand Down Expand Up @@ -130,7 +128,7 @@ pub fn unseal_range<P, R, W, Tree>(
porep_config: PoRepConfig,
cache_path: P,
mut sealed_sector: R,
mut unsealed_output: W,
unsealed_output: W,
prover_id: ProverId,
sector_id: SectorId,
comm_d: Commitment,
Expand Down Expand Up @@ -161,10 +159,126 @@ where
let mut data = Vec::new();
sealed_sector.read_to_end(&mut data)?;

let res = unseal_range_inner::<_, _, Tree>(
porep_config,
cache_path,
&mut data,
unsealed_output,
replica_id,
offset,
num_bytes,
)?;

info!("unseal_range:finish");

Ok(res)
}

/// Unseals the sector read from `sealed_sector` and returns the bytes for a
/// piece whose first (unpadded) byte begins at `offset` and ends at `offset`
/// plus `num_bytes`, inclusive. Note that the entire sector is unsealed each
/// time this function is called.
///
/// # Arguments
///
/// * `porep_config` - porep configuration containing the sector size.
/// * `cache_path` - path to the directory in which the sector data's Merkle Tree is written.
/// * `sealed_sector` - a byte source from which we read sealed sector data.
/// * `unsealed_output` - a byte sink to which we write unsealed, un-bit-padded sector bytes.
/// * `prover_id` - the prover-id that sealed the sector.
/// * `sector_id` - the sector-id of the sealed sector.
/// * `comm_d` - the commitment to the sector's data.
/// * `ticket` - the ticket that was used to generate the sector's replica-id.
/// * `offset` - the byte index in the unsealed sector of the first byte that we want to read.
/// * `num_bytes` - the number of bytes that we want to read.
#[allow(clippy::too_many_arguments)]
pub fn unseal_range_mapped<P, W, Tree>(
porep_config: PoRepConfig,
cache_path: P,
sealed_path: PathBuf,
unsealed_output: W,
prover_id: ProverId,
sector_id: SectorId,
comm_d: Commitment,
ticket: Ticket,
offset: UnpaddedByteIndex,
num_bytes: UnpaddedBytesAmount,
) -> Result<UnpaddedBytesAmount>
where
P: Into<PathBuf> + AsRef<Path>,
W: Write,
Tree: 'static + MerkleTreeTrait,
{
info!("unseal_range_mapped:start");
ensure!(comm_d != [0; 32], "Invalid all zero commitment (comm_d)");

let comm_d =
as_safe_commitment::<<DefaultPieceHasher as Hasher>::Domain, _>(&comm_d, "comm_d")?;

let replica_id = generate_replica_id::<Tree::Hasher, _>(
&prover_id,
sector_id.into(),
&ticket,
comm_d,
&porep_config.porep_id,
);

let mapped_file = OpenOptions::new()
.read(true)
.write(true)
.open(&sealed_path)?;
let mut data = unsafe { MmapOptions::new().map_copy(&mapped_file)? };

let result = unseal_range_inner::<_, _, Tree>(
porep_config,
cache_path,
&mut data,
unsealed_output,
replica_id,
offset,
num_bytes,
);
info!("unseal_range_mapped:finish");

result
}

/// Unseals the sector read from `sealed_sector` and returns the bytes for a
/// piece whose first (unpadded) byte begins at `offset` and ends at `offset`
/// plus `num_bytes`, inclusive. Note that the entire sector is unsealed each
/// time this function is called.
///
/// # Arguments
///
/// * `porep_config` - porep configuration containing the sector size.
/// * `cache_path` - path to the directory in which the sector data's Merkle Tree is written.
/// * `sealed_sector` - a byte source from which we read sealed sector data.
/// * `unsealed_output` - a byte sink to which we write unsealed, un-bit-padded sector bytes.
/// * `prover_id` - the prover-id that sealed the sector.
/// * `sector_id` - the sector-id of the sealed sector.
/// * `comm_d` - the commitment to the sector's data.
/// * `ticket` - the ticket that was used to generate the sector's replica-id.
/// * `offset` - the byte index in the unsealed sector of the first byte that we want to read.
/// * `num_bytes` - the number of bytes that we want to read.
#[allow(clippy::too_many_arguments)]
fn unseal_range_inner<P, W, Tree>(
porep_config: PoRepConfig,
cache_path: P,
data: &mut [u8],
mut unsealed_output: W,
replica_id: <Tree::Hasher as Hasher>::Domain,
offset: UnpaddedByteIndex,
num_bytes: UnpaddedBytesAmount,
) -> Result<UnpaddedBytesAmount>
where
P: Into<PathBuf> + AsRef<Path>,
W: Write,
Tree: 'static + MerkleTreeTrait,
{
info!("unseal_range_inner:start");

let base_tree_size = get_base_tree_size::<DefaultBinaryTree>(porep_config.sector_size)?;
let base_tree_leafs = get_base_tree_leafs::<DefaultBinaryTree>(base_tree_size)?;
// MT for original data is always named tree-d, and it will be
// referenced later in the process as such.
let config = StoreConfig::new(
cache_path.as_ref(),
CacheKey::CommDTree.to_string(),
Expand All @@ -183,11 +297,10 @@ where
let offset_padded: PaddedBytesAmount = UnpaddedBytesAmount::from(offset).into();
let num_bytes_padded: PaddedBytesAmount = num_bytes.into();

let unsealed_all =
StackedDrg::<Tree, DefaultPieceHasher>::extract_all(&pp, &replica_id, &data, Some(config))?;
StackedDrg::<Tree, DefaultPieceHasher>::extract_all(&pp, &replica_id, data, Some(config))?;
let start: usize = offset_padded.into();
let end = start + usize::from(num_bytes_padded);
let unsealed = &unsealed_all[start..end];
let unsealed = &data[start..end];

// If the call to `extract_range` was successful, the `unsealed` vector must
// have a length which equals `num_bytes_padded`. The byte at its 0-index
Expand All @@ -197,7 +310,7 @@ where

let amount = UnpaddedBytesAmount(written as u64);

info!("unseal_range:finish");
info!("unseal_range_inner:finish");
Ok(amount)
}

Expand Down
10 changes: 5 additions & 5 deletions filecoin-proofs/tests/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use filecoin_proofs::{
add_piece, clear_cache, compute_comm_d, fauxrep_aux, generate_fallback_sector_challenges,
generate_piece_commitment, generate_single_vanilla_proof, generate_window_post,
generate_window_post_with_vanilla, generate_winning_post,
generate_winning_post_sector_challenge, generate_winning_post_with_vanilla, seal_commit_phase1,
seal_commit_phase2, seal_pre_commit_phase1, seal_pre_commit_phase2, unseal_range,
generate_winning_post_sector_challenge, generate_winning_post_with_vanilla, get_unsealed_range,
seal_commit_phase1, seal_commit_phase2, seal_pre_commit_phase1, seal_pre_commit_phase2,
validate_cache_for_commit, validate_cache_for_precommit_phase2, verify_seal,
verify_window_post, verify_winning_post, Commitment, DefaultTreeDomain, MerkleTreeTrait,
PaddedBytesAmount, PieceInfo, PoRepConfig, PoRepProofPartitions, PoStConfig, PoStType,
Expand Down Expand Up @@ -1040,11 +1040,11 @@ fn proof_and_unseal<Tree: 'static + MerkleTreeTrait>(

let commit_output = seal_commit_phase2(config, phase1_output, prover_id, sector_id)?;

let _ = unseal_range::<_, _, _, Tree>(
let _ = get_unsealed_range::<_, Tree>(
config,
cache_dir_path,
sealed_sector_file,
&unseal_file,
sealed_sector_file.path(),
unseal_file.path(),
prover_id,
sector_id,
comm_d,
Expand Down
25 changes: 16 additions & 9 deletions storage-proofs-porep/src/drg/vanilla.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,36 +496,42 @@ where
fn extract_all<'b>(
pp: &'b Self::PublicParams,
replica_id: &'b <H as Hasher>::Domain,
data: &'b [u8],
data: &'b mut [u8],
_config: Option<StoreConfig>,
) -> Result<Vec<u8>> {
) -> Result<()> {
decode(&pp.graph, replica_id, data, None)
}

fn extract(
pp: &Self::PublicParams,
replica_id: &<H as Hasher>::Domain,
data: &[u8],
data: &mut [u8],
node: usize,
_config: Option<StoreConfig>,
) -> Result<Vec<u8>> {
Ok(decode_block(&pp.graph, replica_id, data, None, node)?.into_bytes())
) -> Result<()> {
let block = decode_block(&pp.graph, replica_id, &data, None, node)?;
let start = node * NODE_SIZE;
let end = start + NODE_SIZE;
let dest = &mut data[start..end];
dest.copy_from_slice(AsRef::<[u8]>::as_ref(&block));

Ok(())
}
}

pub fn decode<'a, H, G>(
graph: &'a G,
replica_id: &'a <H as Hasher>::Domain,
data: &'a [u8],
data: &'a mut [u8],
exp_parents_data: Option<&'a [u8]>,
) -> Result<Vec<u8>>
) -> Result<()>
where
H: Hasher,
G::Key: AsRef<H::Domain>,
G: Graph<H> + Sync,
{
// TODO: proper error handling
let result = (0..graph.size())
let result: Vec<u8> = (0..graph.size())
.into_par_iter()
.flat_map(|i| {
decode_block::<H, G>(graph, replica_id, data, exp_parents_data, i)
Expand All @@ -534,7 +540,8 @@ where
})
.collect();

Ok(result)
data.copy_from_slice(&result);
Ok(())
}

pub fn decode_block<'a, H, G>(
Expand Down
8 changes: 4 additions & 4 deletions storage-proofs-porep/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ pub trait PoRep<'a, H: Hasher, G: Hasher>: ProofScheme<'a> {
fn extract_all(
pub_params: &'a Self::PublicParams,
replica_id: &H::Domain,
replica: &[u8],
data: &mut [u8],
config: Option<StoreConfig>,
) -> Result<Vec<u8>>;
) -> Result<()>;

fn extract(
pub_params: &'a Self::PublicParams,
replica_id: &H::Domain,
replica: &[u8],
data: &mut [u8],
node: usize,
config: Option<StoreConfig>,
) -> Result<Vec<u8>>;
) -> Result<()>;
}
14 changes: 6 additions & 8 deletions storage-proofs-porep/src/stacked/vanilla/porep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,27 @@ impl<'a, 'c, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> PoRep<'a, Tre
fn extract_all<'b>(
pp: &'b PublicParams<Tree>,
replica_id: &'b <Tree::Hasher as Hasher>::Domain,
data: &'b [u8],
data: &'b mut [u8],
config: Option<StoreConfig>,
) -> Result<Vec<u8>> {
let mut data = data.to_vec();

) -> Result<()> {
Self::extract_and_invert_transform_layers(
&pp.graph,
&pp.layer_challenges,
replica_id,
&mut data,
data,
config.expect("Missing store config"),
)?;

Ok(data)
Ok(())
}

fn extract(
_pp: &PublicParams<Tree>,
_replica_id: &<Tree::Hasher as Hasher>::Domain,
_data: &[u8],
_data: &mut [u8],
_node: usize,
_config: Option<StoreConfig>,
) -> Result<Vec<u8>> {
) -> Result<()> {
unimplemented!();
}
}
Loading

0 comments on commit 7d08e87

Please sign in to comment.