Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid one in memory copy for unsealing #1401

Merged
merged 3 commits into from
Mar 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 126 additions & 13 deletions filecoin-proofs/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fs::{self, File};
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};

Expand All @@ -7,6 +7,7 @@ use bincode::deserialize;
use filecoin_hashers::Hasher;
use fr32::{write_unpadded, Fr32Reader};
use log::{info, trace};
use memmap::MmapOptions;
use merkletree::store::{DiskStore, LevelCacheStore, StoreConfig};
use storage_proofs_core::{
cache_key::CacheKey,
Expand Down Expand Up @@ -83,18 +84,15 @@ pub fn get_unsealed_range<T: Into<PathBuf> + AsRef<Path>, Tree: 'static + Merkle
) -> Result<UnpaddedBytesAmount> {
info!("get_unsealed_range:start");

let f_in = File::open(&sealed_path)
.with_context(|| format!("could not open sealed_path={:?}", sealed_path.as_ref()))?;

let f_out = File::create(&output_path)
.with_context(|| format!("could not create output_path={:?}", output_path.as_ref()))?;

let buf_f_out = BufWriter::new(f_out);

let result = unseal_range::<_, _, _, Tree>(
let result = unseal_range_mapped::<_, _, Tree>(
porep_config,
cache_path,
f_in,
sealed_path.into(),
buf_f_out,
prover_id,
sector_id,
Expand Down Expand Up @@ -130,7 +128,7 @@ pub fn unseal_range<P, R, W, Tree>(
porep_config: PoRepConfig,
cache_path: P,
mut sealed_sector: R,
mut unsealed_output: W,
unsealed_output: W,
prover_id: ProverId,
sector_id: SectorId,
comm_d: Commitment,
Expand Down Expand Up @@ -161,10 +159,126 @@ where
let mut data = Vec::new();
sealed_sector.read_to_end(&mut data)?;

let res = unseal_range_inner::<_, _, Tree>(
porep_config,
cache_path,
&mut data,
unsealed_output,
replica_id,
offset,
num_bytes,
)?;

info!("unseal_range:finish");

Ok(res)
}

/// Unseals the sector read from `sealed_sector` and returns the bytes for a
/// piece whose first (unpadded) byte begins at `offset` and ends at `offset`
/// plus `num_bytes`, inclusive. Note that the entire sector is unsealed each
/// time this function is called.
///
/// # Arguments
///
/// * `porep_config` - porep configuration containing the sector size.
/// * `cache_path` - path to the directory in which the sector data's Merkle Tree is written.
/// * `sealed_sector` - a byte source from which we read sealed sector data.
/// * `unsealed_output` - a byte sink to which we write unsealed, un-bit-padded sector bytes.
/// * `prover_id` - the prover-id that sealed the sector.
/// * `sector_id` - the sector-id of the sealed sector.
/// * `comm_d` - the commitment to the sector's data.
/// * `ticket` - the ticket that was used to generate the sector's replica-id.
/// * `offset` - the byte index in the unsealed sector of the first byte that we want to read.
/// * `num_bytes` - the number of bytes that we want to read.
#[allow(clippy::too_many_arguments)]
pub fn unseal_range_mapped<P, W, Tree>(
porep_config: PoRepConfig,
cache_path: P,
sealed_path: PathBuf,
unsealed_output: W,
prover_id: ProverId,
sector_id: SectorId,
comm_d: Commitment,
ticket: Ticket,
offset: UnpaddedByteIndex,
num_bytes: UnpaddedBytesAmount,
) -> Result<UnpaddedBytesAmount>
where
P: Into<PathBuf> + AsRef<Path>,
W: Write,
Tree: 'static + MerkleTreeTrait,
{
info!("unseal_range_mapped:start");
ensure!(comm_d != [0; 32], "Invalid all zero commitment (comm_d)");

let comm_d =
as_safe_commitment::<<DefaultPieceHasher as Hasher>::Domain, _>(&comm_d, "comm_d")?;

let replica_id = generate_replica_id::<Tree::Hasher, _>(
&prover_id,
sector_id.into(),
&ticket,
comm_d,
&porep_config.porep_id,
);

let mapped_file = OpenOptions::new()
.read(true)
.write(true)
.open(&sealed_path)?;
let mut data = unsafe { MmapOptions::new().map_copy(&mapped_file)? };

let result = unseal_range_inner::<_, _, Tree>(
porep_config,
cache_path,
&mut data,
unsealed_output,
replica_id,
offset,
num_bytes,
);
info!("unseal_range_mapped:finish");

result
}

/// Unseals the sealed sector bytes held in `data` and writes the bytes for a
/// piece whose first (unpadded) byte begins at `offset` and ends at `offset`
/// plus `num_bytes`, inclusive, to `unsealed_output`. Note that the entire
/// sector is unsealed each time this function is called.
///
/// # Arguments
///
/// * `porep_config` - porep configuration containing the sector size.
/// * `cache_path` - path to the directory in which the sector data's Merkle Tree is written.
/// * `data` - the sealed sector bytes; this buffer is handed to the decoder as a mutable slice.
/// * `unsealed_output` - a byte sink to which we write unsealed, un-bit-padded sector bytes.
/// * `replica_id` - the replica-id of the sealed sector.
/// * `offset` - the byte index in the unsealed sector of the first byte that we want to read.
/// * `num_bytes` - the number of bytes that we want to read.
#[allow(clippy::too_many_arguments)]
fn unseal_range_inner<P, W, Tree>(
porep_config: PoRepConfig,
cache_path: P,
data: &mut [u8],
mut unsealed_output: W,
replica_id: <Tree::Hasher as Hasher>::Domain,
offset: UnpaddedByteIndex,
num_bytes: UnpaddedBytesAmount,
) -> Result<UnpaddedBytesAmount>
where
P: Into<PathBuf> + AsRef<Path>,
W: Write,
Tree: 'static + MerkleTreeTrait,
{
info!("unseal_range_inner:start");

let base_tree_size = get_base_tree_size::<DefaultBinaryTree>(porep_config.sector_size)?;
let base_tree_leafs = get_base_tree_leafs::<DefaultBinaryTree>(base_tree_size)?;
// MT for original data is always named tree-d, and it will be
// referenced later in the process as such.
let config = StoreConfig::new(
cache_path.as_ref(),
CacheKey::CommDTree.to_string(),
Expand All @@ -183,11 +297,10 @@ where
let offset_padded: PaddedBytesAmount = UnpaddedBytesAmount::from(offset).into();
let num_bytes_padded: PaddedBytesAmount = num_bytes.into();

let unsealed_all =
StackedDrg::<Tree, DefaultPieceHasher>::extract_all(&pp, &replica_id, &data, Some(config))?;
StackedDrg::<Tree, DefaultPieceHasher>::extract_all(&pp, &replica_id, data, Some(config))?;
let start: usize = offset_padded.into();
let end = start + usize::from(num_bytes_padded);
let unsealed = &unsealed_all[start..end];
let unsealed = &data[start..end];

// If the call to `extract_range` was successful, the `unsealed` vector must
// have a length which equals `num_bytes_padded`. The byte at its 0-index
Expand All @@ -197,7 +310,7 @@ where

let amount = UnpaddedBytesAmount(written as u64);

info!("unseal_range:finish");
info!("unseal_range_inner:finish");
Ok(amount)
}

Expand Down
10 changes: 5 additions & 5 deletions filecoin-proofs/tests/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use filecoin_proofs::{
add_piece, clear_cache, compute_comm_d, fauxrep_aux, generate_fallback_sector_challenges,
generate_piece_commitment, generate_single_vanilla_proof, generate_window_post,
generate_window_post_with_vanilla, generate_winning_post,
generate_winning_post_sector_challenge, generate_winning_post_with_vanilla, seal_commit_phase1,
seal_commit_phase2, seal_pre_commit_phase1, seal_pre_commit_phase2, unseal_range,
generate_winning_post_sector_challenge, generate_winning_post_with_vanilla, get_unsealed_range,
seal_commit_phase1, seal_commit_phase2, seal_pre_commit_phase1, seal_pre_commit_phase2,
validate_cache_for_commit, validate_cache_for_precommit_phase2, verify_seal,
verify_window_post, verify_winning_post, Commitment, DefaultTreeDomain, MerkleTreeTrait,
PaddedBytesAmount, PieceInfo, PoRepConfig, PoRepProofPartitions, PoStConfig, PoStType,
Expand Down Expand Up @@ -1040,11 +1040,11 @@ fn proof_and_unseal<Tree: 'static + MerkleTreeTrait>(

let commit_output = seal_commit_phase2(config, phase1_output, prover_id, sector_id)?;

let _ = unseal_range::<_, _, _, Tree>(
let _ = get_unsealed_range::<_, Tree>(
config,
cache_dir_path,
sealed_sector_file,
&unseal_file,
sealed_sector_file.path(),
unseal_file.path(),
prover_id,
sector_id,
comm_d,
Expand Down
25 changes: 16 additions & 9 deletions storage-proofs-porep/src/drg/vanilla.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,36 +496,42 @@ where
fn extract_all<'b>(
pp: &'b Self::PublicParams,
replica_id: &'b <H as Hasher>::Domain,
data: &'b [u8],
data: &'b mut [u8],
_config: Option<StoreConfig>,
) -> Result<Vec<u8>> {
) -> Result<()> {
decode(&pp.graph, replica_id, data, None)
}

fn extract(
pp: &Self::PublicParams,
replica_id: &<H as Hasher>::Domain,
data: &[u8],
data: &mut [u8],
node: usize,
_config: Option<StoreConfig>,
) -> Result<Vec<u8>> {
Ok(decode_block(&pp.graph, replica_id, data, None, node)?.into_bytes())
) -> Result<()> {
let block = decode_block(&pp.graph, replica_id, &data, None, node)?;
let start = node * NODE_SIZE;
let end = start + NODE_SIZE;
let dest = &mut data[start..end];
dest.copy_from_slice(AsRef::<[u8]>::as_ref(&block));

Ok(())
}
}

pub fn decode<'a, H, G>(
graph: &'a G,
replica_id: &'a <H as Hasher>::Domain,
data: &'a [u8],
data: &'a mut [u8],
exp_parents_data: Option<&'a [u8]>,
) -> Result<Vec<u8>>
) -> Result<()>
where
H: Hasher,
G::Key: AsRef<H::Domain>,
G: Graph<H> + Sync,
{
// TODO: proper error handling
let result = (0..graph.size())
let result: Vec<u8> = (0..graph.size())
.into_par_iter()
.flat_map(|i| {
decode_block::<H, G>(graph, replica_id, data, exp_parents_data, i)
Expand All @@ -534,7 +540,8 @@ where
})
.collect();

Ok(result)
data.copy_from_slice(&result);
Ok(())
}

pub fn decode_block<'a, H, G>(
Expand Down
8 changes: 4 additions & 4 deletions storage-proofs-porep/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ pub trait PoRep<'a, H: Hasher, G: Hasher>: ProofScheme<'a> {
fn extract_all(
pub_params: &'a Self::PublicParams,
replica_id: &H::Domain,
replica: &[u8],
data: &mut [u8],
config: Option<StoreConfig>,
) -> Result<Vec<u8>>;
) -> Result<()>;

fn extract(
pub_params: &'a Self::PublicParams,
replica_id: &H::Domain,
replica: &[u8],
data: &mut [u8],
node: usize,
config: Option<StoreConfig>,
) -> Result<Vec<u8>>;
) -> Result<()>;
}
14 changes: 6 additions & 8 deletions storage-proofs-porep/src/stacked/vanilla/porep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,27 @@ impl<'a, 'c, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> PoRep<'a, Tre
fn extract_all<'b>(
pp: &'b PublicParams<Tree>,
replica_id: &'b <Tree::Hasher as Hasher>::Domain,
data: &'b [u8],
data: &'b mut [u8],
config: Option<StoreConfig>,
) -> Result<Vec<u8>> {
let mut data = data.to_vec();

) -> Result<()> {
Self::extract_and_invert_transform_layers(
&pp.graph,
&pp.layer_challenges,
replica_id,
&mut data,
data,
config.expect("Missing store config"),
)?;

Ok(data)
Ok(())
}

fn extract(
_pp: &PublicParams<Tree>,
_replica_id: &<Tree::Hasher as Hasher>::Domain,
_data: &[u8],
_data: &mut [u8],
_node: usize,
_config: Option<StoreConfig>,
) -> Result<Vec<u8>> {
) -> Result<()> {
unimplemented!();
}
}
Loading