
Commit

Add --cache-size to zarrs_reencode
LDeakin committed Aug 25, 2024
1 parent 02f2be8 commit da623cb
Showing 3 changed files with 35 additions and 9 deletions.
2 changes: 2 additions & 0 deletions docs/zarrs_reencode.md
@@ -15,9 +15,11 @@ Reencode `array.zarr` (`uint16`) with:
- a shard shape of [128, 128, 0]
  - the last dimension of the shard shape will match the array shape to the nearest multiple of the chunk shape
- level 9 blosclz compression with bitshuffling
- an input chunk cache with a size of 1GB

```bash
zarrs_reencode \
--cache-size 1000000000 \
--chunk-shape 32,32,32 \
--shard-shape 128,128,0 \
--bytes-to-bytes-codecs '[ { "name": "blosc", "configuration": { "cname": "blosclz", "clevel": 9, "shuffle": "bitshuffle", "typesize": 2, "blocksize": 0 } } ]' \
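
For quick reference, a minimal sketch that exercises only the new flag: the positional input/output array paths below are placeholders (assuming the tool's usual `<input> <output>` arguments), and all other options keep their defaults. The value is in bytes; a cache is mainly useful when the output chunking differs from the input, since the same input chunks may otherwise be decoded more than once.

```bash
# Sketch: reencode with a ~1 GB input chunk cache; other options use defaults.
zarrs_reencode --cache-size 1000000000 array.zarr array_reencoded.zarr
```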
20 changes: 15 additions & 5 deletions src/bin/zarrs_reencode.rs
@@ -3,11 +3,14 @@ use std::sync::Arc;

use clap::Parser;
use indicatif::{ProgressBar, ProgressStyle};
use zarrs::storage::{
storage_adapter::async_to_sync::{AsyncToSyncBlockOn, AsyncToSyncStorageAdapter},
store::AsyncOpendalStore,
AsyncReadableListableStorage, ListableStorageTraits, ReadableListableStorage, StorePrefix,
WritableStorageTraits,
use zarrs::{
array::ChunkCacheLruSizeLimit,
storage::{
storage_adapter::async_to_sync::{AsyncToSyncBlockOn, AsyncToSyncStorageAdapter},
store::AsyncOpendalStore,
AsyncReadableListableStorage, ListableStorageTraits, ReadableListableStorage, StorePrefix,
WritableStorageTraits,
},
};
use zarrs_tools::{
do_reencode, get_array_builder_reencode,
@@ -45,6 +48,10 @@ struct Args {
/// Print verbose information, such as the array header.
#[arg(long, short, default_value_t = false)]
verbose: bool,

/// An optional chunk cache size (in bytes).
#[arg(long)]
cache_size: Option<u64>,
}

fn bar_style_run() -> ProgressStyle {
@@ -143,12 +150,15 @@ fn main() -> anyhow::Result<()> {
let array_out = builder.build(storage_out.clone(), "/").unwrap();
array_out.store_metadata().unwrap();

let chunk_cache = args.cache_size.map(ChunkCacheLruSizeLimit::new);

let (duration, duration_read, duration_write, bytes_decoded) = do_reencode(
&array_in,
&array_out,
args.validate,
args.concurrent_chunks,
&progress_callback,
chunk_cache.as_ref(),
)?;
bar.set_style(bar_style_finish());
bar.finish_and_clear();
22 changes: 18 additions & 4 deletions src/lib.rs
@@ -15,8 +15,8 @@ use zarrs::{
CodecOptionsBuilder, Crc32cCodec, ShardingCodec,
},
concurrency::RecommendedConcurrency,
Array, ArrayBuilder, ArrayError, CodecChain, DataType, DimensionName, FillValue,
FillValueMetadata,
Array, ArrayBuilder, ArrayChunkCacheExt, ArrayError, ChunkCache, CodecChain, DataType,
DimensionName, FillValue, FillValueMetadata,
},
array_subset::ArraySubset,
config::global_config,
@@ -595,6 +595,7 @@ pub fn do_reencode<
validate: bool,
concurrent_chunks: Option<usize>,
progress_callback: &ProgressCallback,
cache: Option<&impl ChunkCache>,
) -> Result<(f32, f32, f32, usize), ArrayError> {
let start = SystemTime::now();
let bytes_decoded = Mutex::new(0);
@@ -637,8 +638,17 @@ try_for_each,
try_for_each,
|chunk_indices: Vec<u64>| {
let chunk_subset = array_out.chunk_subset(&chunk_indices).unwrap();
let bytes = progress
.read(|| array_in.retrieve_array_subset_opt(&chunk_subset, &codec_options))?;
let bytes = progress.read(|| {
if let Some(cache) = cache {
array_in.retrieve_array_subset_opt_cached(
cache,
&chunk_subset,
&codec_options,
)
} else {
array_in.retrieve_array_subset_opt(&chunk_subset, &codec_options)
}
})?;
*bytes_decoded.lock().unwrap() += bytes.size();

if validate {
@@ -649,6 +659,10 @@
.retrieve_chunk_opt(&chunk_indices, &codec_options)
.unwrap();
assert!(bytes == bytes_out);
// let bytes_in = array_in
// .retrieve_array_subset_opt(&chunk_subset, &codec_options)
// .unwrap();
// assert!(bytes_in == bytes_out);
} else {
progress.write(|| {
array_out.store_chunk_opt(&chunk_indices, bytes, &codec_options)
