From 4e0594cbff320e38744bcc21cf873fe48590c647 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Mon, 9 Nov 2020 18:16:36 +0100 Subject: [PATCH] perf(filecoin-proofs): speed up Fr32Reader --- filecoin-proofs/Cargo.toml | 2 +- filecoin-proofs/benches/preprocessing.rs | 37 +- filecoin-proofs/src/fr32_reader.rs | 461 +++++++---------------- 3 files changed, 172 insertions(+), 328 deletions(-) diff --git a/filecoin-proofs/Cargo.toml b/filecoin-proofs/Cargo.toml index ba90f2ca7..0cd7ef2b1 100644 --- a/filecoin-proofs/Cargo.toml +++ b/filecoin-proofs/Cargo.toml @@ -36,7 +36,6 @@ anyhow = "1.0.23" rand_xorshift = "0.2.0" sha2 = "0.9.1" typenum = "1.11.2" -bitintr = "0.3.0" gperftools = { version = "0.2", optional = true } generic-array = "0.14.4" structopt = "0.3.12" @@ -45,6 +44,7 @@ indicatif = "0.15.0" groupy = "0.3.0" dialoguer = "0.7.1" clap = "2.33.3" +byte-slice-cast = "1.0.0" [dependencies.reqwest] version = "0.10" diff --git a/filecoin-proofs/benches/preprocessing.rs b/filecoin-proofs/benches/preprocessing.rs index defad7189..ad66aaf2d 100644 --- a/filecoin-proofs/benches/preprocessing.rs +++ b/filecoin-proofs/benches/preprocessing.rs @@ -2,7 +2,7 @@ use std::io::{self, Read}; use std::time::Duration; use criterion::{criterion_group, criterion_main, Criterion, ParameterizedBenchmark, Throughput}; -use filecoin_proofs::fr32_reader::Fr32Reader; +use filecoin_proofs::{add_piece, fr32_reader::Fr32Reader, PaddedBytesAmount, UnpaddedBytesAmount}; use rand::{thread_rng, Rng}; #[cfg(feature = "cpu-profile")] @@ -52,6 +52,7 @@ fn preprocessing_benchmark(c: &mut Criterion) { let mut reader = Fr32Reader::new(io::Cursor::new(&data)); reader.read_to_end(&mut buf).expect("in memory read error"); assert!(buf.len() >= data.len()); + buf.clear(); }); stop_profile(); }, @@ -63,5 +64,37 @@ fn preprocessing_benchmark(c: &mut Criterion) { ); } -criterion_group!(benches, preprocessing_benchmark); +fn add_piece_benchmark(c: &mut Criterion) { + c.bench( + "preprocessing", + ParameterizedBenchmark::new( + "add_piece", + |b, size| { + let padded_size = PaddedBytesAmount(*size as u64); + let unpadded_size: UnpaddedBytesAmount = padded_size.into(); + let data = random_data(unpadded_size.0 as usize); + let mut buf = Vec::with_capacity(*size); + + start_profile(&format!("add_piece_{}", *size)); + b.iter(|| { + add_piece( + io::Cursor::new(&data), + &mut buf, + unpadded_size, + &[unpadded_size][..], + ) + .unwrap(); + buf.clear(); + }); + stop_profile(); + }, + vec![512, 256 * 1024, 512 * 1024, 1024 * 1024, 2 * 1024 * 1024], + ) + .sample_size(10) + .throughput(|s| Throughput::Bytes(*s as u64)) + .warm_up_time(Duration::from_secs(1)), + ); +} + +criterion_group!(benches, preprocessing_benchmark, add_piece_benchmark); criterion_main!(benches); diff --git a/filecoin-proofs/src/fr32_reader.rs b/filecoin-proofs/src/fr32_reader.rs index df6e5b3da..4491c5d0d 100644 --- a/filecoin-proofs/src/fr32_reader.rs +++ b/filecoin-proofs/src/fr32_reader.rs @@ -1,157 +1,118 @@ +use byte_slice_cast::*; use std::io; -const DATA_BITS: u64 = 254; -const TARGET_BITS: u64 = 256; +/// The number of Frs per Block. +const NUM_FRS_PER_BLOCK: usize = 4; +/// The amount of bits in an Fr when not padded. +const IN_BITS_FR: usize = 254; +/// The amount of bits in an Fr when padded. +const OUT_BITS_FR: usize = 256; -#[derive(Debug)] +const NUM_BYTES_IN_BLOCK: usize = NUM_FRS_PER_BLOCK * IN_BITS_FR / 8; +const NUM_BYTES_OUT_BLOCK: usize = NUM_FRS_PER_BLOCK * OUT_BITS_FR / 8; + +const NUM_U128S_PER_BLOCK: usize = NUM_BYTES_OUT_BLOCK / std::mem::size_of::(); + +const MASK_SKIP_HIGH_2: u128 = 0b0011_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111; + +/// An `io::Reader` that converts unpadded input into valid `Fr32` padded output. pub struct Fr32Reader { /// The source being padded. source: R, - /// How much of the target already was `read` from, in bits. - target_offset: u64, - /// Currently read byte. - buffer: Buffer, + /// Currently read block. + /// This is padded to 128 bytes to allow reading all values as `u128`s, but only the first + /// 127 bytes are ever valid. + in_buffer: [u8; NUM_BYTES_IN_BLOCK + 1], + /// Currently writing out block. + out_buffer: [u128; NUM_U128S_PER_BLOCK], + /// The current offset into the `out_buffer` in bytes. + out_offset: usize, + /// How many `Fr32`s are available in the `out_buffer`. + available_frs: usize, /// Are we done reading? done: bool, } +macro_rules! process_fr { + ( + $in_buffer:expr, + $out0:expr, + $out1:expr, + $bit_offset:expr + ) => {{ + $out0 = $in_buffer[0] >> 128 - $bit_offset; + $out0 |= $in_buffer[1] << $bit_offset; + $out1 = $in_buffer[1] >> 128 - $bit_offset; + $out1 |= $in_buffer[2] << $bit_offset; + $out1 &= MASK_SKIP_HIGH_2; // zero high 2 bits + }}; +} + impl Fr32Reader { pub fn new(source: R) -> Self { Fr32Reader { source, - target_offset: 0, - buffer: Default::default(), + in_buffer: [0; NUM_BYTES_IN_BLOCK + 1], + out_buffer: [0; NUM_U128S_PER_BLOCK], + out_offset: 0, + available_frs: 0, done: false, } } - fn read_u8_no_pad(&mut self, target: &mut [u8]) -> io::Result { - target[0] = self.buffer.read_u8(); - self.target_offset += 8; - - Ok(1) - } - - fn read_u16_no_pad(&mut self, target: &mut [u8]) -> io::Result { - self.buffer.read_u16_into(&mut target[..2]); - self.target_offset += 16; - - Ok(2) - } - - fn read_u32_no_pad(&mut self, target: &mut [u8]) -> io::Result { - self.buffer.read_u32_into(&mut target[..4]); - self.target_offset += 32; - - Ok(4) - } - - fn read_u64_no_pad(&mut self, target: &mut [u8]) -> io::Result { - self.buffer.read_u64_into(&mut target[..8]); - self.target_offset += 64; - - Ok(8) - } - - /// Read up to 8 bytes into the targets first element. - /// Assumes that target is not empty. - fn read_bytes(&mut self, target: &mut [u8]) -> io::Result { - let bit_pos = self.target_offset % TARGET_BITS; - let bits_to_padding = if bit_pos < DATA_BITS { - DATA_BITS as usize - bit_pos as usize - } else { - 0 - }; - - if bits_to_padding >= 8 { - self.fill_buffer()?; - } - - let available = self.buffer.available(); - if available > 0 { - let target_len = target.len(); - // Try to avoid padding, and copy as much as possible over at once. - - if bits_to_padding >= 64 && available >= 64 && target_len >= 8 { - return self.read_u64_no_pad(target); - } - - if bits_to_padding >= 32 && available >= 32 && target_len >= 4 { - return self.read_u32_no_pad(target); - } - - if bits_to_padding >= 16 && available >= 16 && target_len >= 2 { - return self.read_u16_no_pad(target); - } + /// Processes a single block in in_buffer, writing the result to out_buffer. + fn process_block(&mut self) { + let in_buffer: &[u128] = self.in_buffer.as_slice_of::().unwrap(); + let out = &mut self.out_buffer; - if bits_to_padding >= 8 && available >= 8 && target_len >= 1 { - return self.read_u8_no_pad(target); - } + // 0..254 + { + out[0] = in_buffer[0]; + out[1] = in_buffer[1] & MASK_SKIP_HIGH_2; } + // 254..508 + process_fr!(&in_buffer[1..], out[2], out[3], 2); + // 508..762 + process_fr!(&in_buffer[3..], out[4], out[5], 4); + // 762..1016 + process_fr!(&in_buffer[5..], out[6], out[7], 6); - self.read_u8_padded(target, bits_to_padding, available) + // Reset buffer offset. + self.out_offset = 0; } - fn read_u8_padded( - &mut self, - target: &mut [u8], - bits_to_padding: usize, - available: u64, - ) -> io::Result { - target[0] = 0; - - if available >= 6 { - match bits_to_padding { - 6 => { - target[0] = self.buffer.read_u8_range(6); - self.target_offset += 8; - return Ok(1); + fn fill_in_buffer(&mut self) -> io::Result { + let mut bytes_read = 0; + let mut buf = &mut self.in_buffer[..NUM_BYTES_IN_BLOCK]; + + while !buf.is_empty() { + match self.source.read(buf) { + Ok(0) => { + break; } - 5 => { - target[0] = self.buffer.read_u8_range(5); - if self.buffer.read_bit() { - set_bit(&mut target[0], 7); - } - self.target_offset += 8; - return Ok(1); + Ok(n) => { + buf = &mut buf[n..]; + bytes_read += n; } - _ => {} + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), } } - for i in 0..8 { - if self.target_offset % TARGET_BITS < DATA_BITS { - if !self.fill_buffer()? { - if i > 0 { - return Ok(1); - } else { - return Ok(0); - } - } - - if self.buffer.read_bit() { - set_bit(&mut target[0], i); - } - }; - - self.target_offset += 1; + // Clear unfilled memory. + for val in &mut self.in_buffer[bytes_read..NUM_BYTES_IN_BLOCK] { + *val = 0; } - Ok(1) + Ok(bytes_read) } +} - /// Fill the inner buffer, only if necessary. Returns `true` if more data is available. - fn fill_buffer(&mut self) -> io::Result { - if self.buffer.available() > 0 { - // Nothing to do, already some data available. - return Ok(true); - } - - let read = self.source.read(&mut self.buffer[..])?; - self.buffer.reset_available(read as u64 * 8); - - Ok(read > 0) - } +/// Division of x by y, rounding up. +/// x must be > 0 +#[inline] +const fn div_ceil(x: usize, y: usize) -> usize { + 1 + ((x - 1) / y) } impl io::Read for Fr32Reader { @@ -160,146 +121,48 @@ impl io::Read for Fr32Reader { return Ok(0); } - let mut read = 0; - while read < target.len() { - let current_read = self.read_bytes(&mut target[read..])?; - read += current_read; - - if current_read == 0 { - self.done = true; - break; - } - } - - Ok(read) - } -} - -fn set_bit(x: &mut u8, bit: usize) { - *x |= 1 << bit -} - -use std::ops::{Deref, DerefMut}; - -#[derive(Debug, Default, Clone, Copy)] -struct Buffer { - data: u64, - /// Bits already consumed. - pos: u64, - /// Bits available. - avail: u64, -} + // The number of bytes already read and written into `target`. + let mut bytes_read = 0; + // The number of bytes to read. + let bytes_to_read = target.len(); -impl Deref for Buffer { - type Target = [u8; 8]; + while bytes_read < bytes_to_read { + // Load and process the next block, if no Frs are available anymore. + if self.available_frs == 0 { + let bytes_read = self.fill_in_buffer()?; - fn deref(&self) -> &Self::Target { - unsafe { &*(&self.data as *const u64 as *const [u8; 8]) } - } -} - -impl DerefMut for Buffer { - fn deref_mut(&mut self) -> &mut Self::Target { - unsafe { &mut *(&mut self.data as *mut u64 as *mut [u8; 8]) } - } -} - -impl Buffer { - /// How many bits are available to read. - #[inline] - pub fn available(&self) -> u64 { - self.avail - self.pos - } - - pub fn reset_available(&mut self, bits: u64) { - self.pos = 0; - self.avail = bits; - } - - /// Read a single bit at the current position. - pub fn read_bit(&mut self) -> bool { - let res = self.data & (1 << self.pos) != 0; - debug_assert!(self.available() >= 1); - self.pos += 1; - res - } - - #[cfg(target_endian = "little")] - pub fn read_u8_range(&mut self, len: u64) -> u8 { - use bitintr::Bextr; - debug_assert!(self.available() >= len); - let res = self.data.bextr(self.pos, len) as u8; - self.pos += len; - res - } - - #[cfg(target_endian = "little")] - pub fn read_u8(&mut self) -> u8 { - use bitintr::Bextr; - debug_assert!(self.available() >= 8); - let res = self.data.bextr(self.pos, 8) as u8; - self.pos += 8; - res - } - - #[cfg(target_endian = "little")] - pub fn read_u16(&mut self) -> u16 { - debug_assert!(self.available() >= 16); - - use bitintr::Bextr; - let res = self.data.bextr(self.pos, 16) as u16; - self.pos += 16; - res - } - - #[cfg(target_endian = "little")] - pub fn read_u16_into(&mut self, target: &mut [u8]) { - assert!(target.len() >= 2); + // All data was read from the source, no new data in the buffer. + if bytes_read == 0 { + self.done = true; + break; + } - let value = self.read_u16().to_le_bytes(); - target[0] = value[0]; - target[1] = value[1]; - } + self.process_block(); - #[cfg(target_endian = "little")] - pub fn read_u32(&mut self) -> u32 { - debug_assert!(self.available() >= 32); + // Update state of how many new Frs are now available. + self.available_frs = div_ceil(bytes_read * 8, IN_BITS_FR); + } - use bitintr::Bextr; - let res = self.data.bextr(self.pos, 32) as u32; - self.pos += 32; - res - } + // Write out as many Frs as available and requested + { + let available_bytes = self.available_frs * (OUT_BITS_FR / 8); - #[cfg(target_endian = "little")] - pub fn read_u32_into(&mut self, target: &mut [u8]) { - assert!(target.len() >= 4); - let value = self.read_u32().to_le_bytes(); - target[0] = value[0]; - target[1] = value[1]; - target[2] = value[2]; - target[3] = value[3]; - } + let target_start = bytes_read; + let target_end = std::cmp::min(target_start + available_bytes, bytes_to_read); + let len = target_end - target_start; - pub fn read_u64(&mut self) -> u64 { - debug_assert!(self.available() >= 64); + let out_start = self.out_offset; + let out_end = out_start + len; - self.pos += 64; - self.data - } + target[target_start..target_end] + .copy_from_slice(&self.out_buffer.as_byte_slice()[out_start..out_end]); + bytes_read += len; + self.out_offset += len; + self.available_frs -= div_ceil(len * 8, OUT_BITS_FR); + } + } - #[cfg(target_endian = "little")] - pub fn read_u64_into(&mut self, target: &mut [u8]) { - assert!(target.len() >= 8); - let value = self.read_u64().to_le_bytes(); - target[0] = value[0]; - target[1] = value[1]; - target[2] = value[2]; - target[3] = value[3]; - target[4] = value[4]; - target[5] = value[5]; - target[6] = value[6]; - target[7] = value[7]; + Ok(bytes_read) } } @@ -309,67 +172,8 @@ mod tests { use pretty_assertions::assert_eq; use std::io::Read; - #[test] - fn test_buffer_read_bit() { - let mut buffer = Buffer::default(); - let val = 12345u64.to_le_bytes(); - buffer.copy_from_slice(&val[..]); - buffer.reset_available(64); - - for i in 0..8 { - assert_eq!(buffer.read_bit(), 0 != val[0] & (1 << i)); - } - } - - #[test] - fn test_buffer_read_u8() { - let mut buffer = Buffer::default(); - let val = 12345u64.to_le_bytes(); - buffer.copy_from_slice(&val[..]); - buffer.reset_available(64); - - for (i, &byte) in val.iter().enumerate().take(8) { - let read = buffer.read_u8(); - assert_eq!(read, byte, "failed to read byte {}", i); - } - } - - #[test] - fn test_buffer_read_u16() { - let mut buffer = Buffer::default(); - let val = 12345u64.to_le_bytes(); - buffer.copy_from_slice(&val[..]); - buffer.reset_available(64); - - for val in val.chunks(2) { - let read = buffer.read_u16(); - assert_eq!(read, u16::from_le_bytes([val[0], val[1]])); - } - } - - #[test] - fn test_buffer_read_u32() { - let mut buffer = Buffer::default(); - let val = 12345u64.to_le_bytes(); - buffer.copy_from_slice(&val[..]); - buffer.reset_available(64); - - for val in val.chunks(4) { - let read = buffer.read_u32(); - assert_eq!(read, u32::from_le_bytes([val[0], val[1], val[2], val[3]])); - } - } - - #[test] - fn test_buffer_read_u64() { - let mut buffer = Buffer::default(); - let val = 12345u64; - buffer.copy_from_slice(&val.to_le_bytes()[..]); - buffer.reset_available(64); - - let read = buffer.read_u64(); - assert_eq!(read, val); - } + const DATA_BITS: u64 = 254; + const TARGET_BITS: u64 = 256; #[test] fn test_simple_short() { @@ -380,7 +184,8 @@ mod tests { reader .read_to_end(&mut padded) .expect("in-memory read failed"); - assert_eq!(&data[..], &padded[..]); + assert_eq!(padded.len(), 32); + assert_eq!(&data[..], &padded[..30]); assert_eq!(padded.into_boxed_slice(), bit_vec_padding(data)); } @@ -397,9 +202,10 @@ mod tests { assert_eq!(&padded[0..31], &data[0..31]); assert_eq!(padded[31], 0b0011_1111); assert_eq!(padded[32], 0b0000_0011); - assert_eq!(padded.len(), 33); - - assert_eq!(padded.into_boxed_slice(), bit_vec_padding(data)); + assert_eq!(padded.len(), 64); + let bv = bit_vec_padding(data); + assert_eq!(bv.len(), 64); + assert_eq!(padded.into_boxed_slice(), bv); } #[test] @@ -479,7 +285,7 @@ mod tests { let mut rng = rand::thread_rng(); for i in 1..100 { - for j in 0..50 { + for j in 1..50 { let mut data = vec![0u8; i * j]; rng.fill_bytes(&mut data); @@ -487,16 +293,17 @@ mod tests { let mut reader = Fr32Reader::new(io::Cursor::new(&data)); reader.read_to_end(&mut buf).expect("in-memory read failed"); - assert_eq!(buf.clone().into_boxed_slice(), bit_vec_padding(data)); + assert_eq!( + buf.into_boxed_slice(), + bit_vec_padding(data), + "{} - {}", + i, + j + ); } } } - // Simple (and slow) padder implementation using `BitVec`. - // It is technically not quite right to use `BitVec` to test this, since at - // the moment that function still uses - // it for some corner cases, but since largely this implementation - // has been replaced it seems reasonable. fn bit_vec_padding(raw_data: Vec) -> Box<[u8]> { use bitvec::{order::Lsb0 as LittleEndian, vec::BitVec}; use itertools::Itertools; @@ -517,6 +324,10 @@ mod tests { } } + while padded_data.len() % (TARGET_BITS as usize) != 0 { + padded_data.push(false); + } + padded_data.into_boxed_slice() }