From 998b8819a4be38de2dab9defcbb00d6beb3b9940 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Fri, 8 Jul 2022 18:39:26 -0700 Subject: [PATCH] rkyv support in signature try to skip md5sum derive typed builder for GatherResult expose md5 start moving mastiff remove unused bigsi and sbt indices Expose iterator methods for Signature remove unused function preparing for MinHashOps Bump version, there will be many breaking changes... more splits default to large minhash, expose frozen use enum for MinHash thru FFI bug fixes and cleanup add c++ stdlib package for mamba fix finch feature try out roaring Add back check command initial update impl add semver check for rust bump once_cell fix rust ci start bringing #1943 more selection more picklist use dashmap in mem_revindex Revert "use dashmap in mem_revindex" This reverts commit 22727b7091dee2dbafd10ce707578c5085aa6cfc. bump rocksdb to 0.19 bump rocksdb to 0.20, bump MSRV to 1.60 update deps add cargo deny config use cibuildwheel configs in pyproject.toml flake cleanup fix cargo.lock tox updates don't worry with in-mem sigs for now --- .github/workflows/dev_envs.yml | 2 +- .github/workflows/rust.yml | 6 + .readthedocs.yml | 4 + Cargo.lock | 420 ++++++- Makefile | 3 +- deny.toml | 1 + doc/developer.md | 2 +- flake.nix | 2 + include/sourmash.h | 1 + pyproject.toml | 16 +- src/core/Cargo.toml | 22 +- src/core/build.rs | 6 +- src/core/cbindgen.toml | 2 +- src/core/src/encodings.rs | 5 + src/core/src/errors.rs | 5 + src/core/src/ffi/index/mod.rs | 1 + src/core/src/ffi/index/revindex.rs | 2 +- src/core/src/ffi/storage.rs | 7 +- src/core/src/from.rs | 6 +- src/core/src/index/linear.rs | 32 +- src/core/src/index/mod.rs | 113 +- src/core/src/index/revindex.rs | 699 ----------- src/core/src/index/revindex/disk_revindex.rs | 549 +++++++++ src/core/src/index/revindex/mem_revindex.rs | 1118 ++++++++++++++++++ src/core/src/index/revindex/mod.rs | 509 ++++++++ src/core/src/lib.rs | 2 + src/core/src/manifest.rs | 186 +++ src/core/src/picklist.rs | 29 + src/core/src/signature.rs | 94 ++ src/core/src/sketch/hyperloglog/mod.rs | 4 + src/core/src/sketch/minhash.rs | 12 + src/core/src/sketch/mod.rs | 4 + src/core/src/sketch/nodegraph.rs | 11 +- src/core/src/storage.rs | 9 +- src/core/tests/minhash.rs | 1 + src/core/tests/storage.rs | 38 + src/sourmash/sbt_storage.py | 4 +- tests/test_index.py | 3 + tox.ini | 7 +- 39 files changed, 3124 insertions(+), 813 deletions(-) delete mode 100644 src/core/src/index/revindex.rs create mode 100644 src/core/src/index/revindex/disk_revindex.rs create mode 100644 src/core/src/index/revindex/mem_revindex.rs create mode 100644 src/core/src/index/revindex/mod.rs create mode 100644 src/core/src/manifest.rs create mode 100644 src/core/src/picklist.rs diff --git a/.github/workflows/dev_envs.yml b/.github/workflows/dev_envs.yml index 33b5069d00..5c8c359bfb 100644 --- a/.github/workflows/dev_envs.yml +++ b/.github/workflows/dev_envs.yml @@ -57,7 +57,7 @@ jobs: - name: install dependencies shell: bash -l {0} - run: mamba install 'tox>=3.27,<4' tox-conda rust git compilers pandoc + run: mamba install 'tox>=3.27,<4' tox-conda rust git compilers pandoc libstdcxx-ng - name: run tests for 3.9 shell: bash -l {0} diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 300169efa6..89c87c7551 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -234,6 +234,12 @@ jobs: toolchain: stable override: true + - name: Check semver + uses: obi1kenobi/cargo-semver-checks-action@v2 + with: + crate-name: sourmash + 
version-tag-prefix: r + - name: Make sure we can publish the sourmash crate uses: actions-rs/cargo@v1 with: diff --git a/.readthedocs.yml b/.readthedocs.yml index 5b33921869..5479606af7 100644 --- a/.readthedocs.yml +++ b/.readthedocs.yml @@ -9,6 +9,10 @@ build: tools: python: "3.10" rust: "1.64" + apt_packages: + - llvm-dev + - libclang-dev + - clang # Build documentation in the docs/ directory with Sphinx sphinx: diff --git a/Cargo.lock b/Cargo.lock index bb1633c7c6..c2f3ac9cb3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + [[package]] name = "aliasable" version = "0.1.3" @@ -59,6 +70,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b7e4c2464d97fe331d41de9d5db0def0a96f4d823b8b32a2efd503578988973" +[[package]] +name = "binary-merge" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597bb81c80a54b6a4381b23faba8d7774b144c94cbd1d6fe3f1329bd776554ab" + [[package]] name = "bincode" version = "1.3.3" @@ -68,6 +85,27 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.65.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5" +dependencies = [ + "bitflags 1.3.2", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "peeking_take_while", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.23", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -80,18 +118,6 @@ version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42" -[[package]] -name = "bstr" -version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" -dependencies = [ - "lazy_static", - "memchr", - "regex-automata", - "serde", -] - [[package]] name = "buffer-redux" version = "1.0.0" @@ -108,12 +134,39 @@ version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" +[[package]] +name = "bytecheck" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a31f923c2db9513e4298b72df143e6e655a759b3d6a0966df18f81223fff54f" +dependencies = [ + "bytecheck_derive", + "ptr_meta", +] + +[[package]] +name = "bytecheck_derive" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edb17c862a905d912174daa27ae002326fff56dc8b8ada50a0a5f0976cb174f0" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.104", +] + [[package]] name = "bytecount" version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" +[[package]] +name = "bytemuck" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"2f5715e491b5a1598fc2bef5a606847b5dc1d48ea625bd3c02c00de8285591da" + [[package]] name = "byteorder" version = "1.4.3" @@ -164,6 +217,18 @@ name = "cc" version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" +dependencies = [ + "jobserver", +] + +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] [[package]] name = "cfg-if" @@ -213,6 +278,17 @@ dependencies = [ "half", ] +[[package]] +name = "clang-sys" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a050e2153c5be08febd6734e29298e844fdb0fa21aeddd63b4eb7baa106c69b" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "4.3.0" @@ -374,13 +450,12 @@ dependencies = [ [[package]] name = "csv" -version = "1.1.6" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" +checksum = "af91f40b7355f82b0a891f50e70399475945bb0b0da4f1700ce60761c9d3e359" dependencies = [ - "bstr", "csv-core", - "itoa 0.4.8", + "itoa", "ryu", "serde", ] @@ -396,9 +471,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.85" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5add3fc1717409d029b20c5b6903fc0c0b02fa6741d820054f4a2efa5e5816fd" +checksum = "86d3488e7665a7a483b57e25bdd90d0aeb2bc7608c8d0346acf2ad3f1caf1d62" dependencies = [ "cc", "cxxbridge-flags", @@ -408,9 +483,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.85" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4c87959ba14bc6fbc61df77c3fcfe180fc32b93538c4f1031dd802ccb5f2ff0" +checksum = "48fcaf066a053a41a81dfb14d57d99738b767febb8b735c3016e469fac5da690" dependencies = [ "cc", "codespan-reporting", @@ -423,15 +498,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.85" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69a3e162fde4e594ed2b07d0f83c6c67b745e7f28ce58c6df5e6b6bef99dfb59" +checksum = "a2ef98b8b717a829ca5603af80e1f9e2e48013ab227b68ef37872ef84ee479bf" [[package]] name = "cxxbridge-macro" -version = "1.0.85" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e7e2adeb6a0d4a282e581096b06e1791532b7d576dcde5ccd9382acf55db8e6" +checksum = "086c685979a698443656e5cf7856c95c642295a38599f12fb1ff76fb28d19892" dependencies = [ "proc-macro2", "quote", @@ -531,12 +606,27 @@ dependencies = [ "syn 1.0.104", ] +[[package]] +name = "glob" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" + [[package]] name = "half" version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" +[[package]] +name = "hashbrown" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db0d4cf898abf0081f964436dc980e96670a0f36863e4b83aaacdb65c9d7ccc3" +dependencies = [ + "ahash", +] + [[package]] name = "heck" version = "0.4.1" @@ -558,6 +648,12 @@ version = "0.3.2" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" +[[package]] +name = "histogram" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669" + [[package]] name = "iana-time-zone" version = "0.1.53" @@ -582,6 +678,15 @@ dependencies = [ "cxx-build", ] +[[package]] +name = "inplace-vec-builder" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf64c2edc8226891a71f127587a2861b132d2b942310843814d5001d99a1d307" +dependencies = [ + "smallvec", +] + [[package]] name = "io-lifetimes" version = "1.0.11" @@ -616,15 +721,18 @@ dependencies = [ [[package]] name = "itoa" -version = "0.4.8" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" +checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" [[package]] -name = "itoa" -version = "1.0.1" +name = "jobserver" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" +checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa" +dependencies = [ + "libc", +] [[package]] name = "js-sys" @@ -641,18 +749,61 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.146" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b" +[[package]] +name = "libloading" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f" +dependencies = [ + "cfg-if", + "winapi", +] + [[package]] name = "libm" version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb" +[[package]] +name = "librocksdb-sys" +version = "0.11.0+8.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3386f101bcb4bd252d8e9d2fb41ec3b0862a15a62b478c355b2982efa469e3e" +dependencies = [ + "bindgen", + "bzip2-sys", + "cc", + "glob", + "libc", + "libz-sys", + "lz4-sys", + "zstd-sys", +] + +[[package]] +name = "libz-sys" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9702761c3935f8cc2f101793272e202c72b99da8f4224a19ddcf1279a6450bbf" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "link-cplusplus" version = "1.0.8" @@ -680,6 +831,16 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lz4-sys" +version = "1.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "lzma-sys" version = "0.1.17" @@ -731,6 
+892,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.4.4" @@ -763,9 +930,9 @@ dependencies = [ [[package]] name = "niffler" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68c7ffd42bdba05fc9fbfda31283d44c5c8a88fed1a191f68795dba23cc8204b" +checksum = "470dd05a938a5ad42c2cb80ceea4255e275990ee530b86ca164e6d8a19fa407f" dependencies = [ "cfg-if", "flate2", @@ -778,6 +945,16 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "num-integer" version = "0.1.44" @@ -819,6 +996,15 @@ dependencies = [ "libc", ] +[[package]] +name = "numsep" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad5c49c3e12c314efb1f43cba136031b657dcd59ee26936ab2be313c5e97da22" +dependencies = [ + "slicestring", +] + [[package]] name = "once_cell" version = "1.18.0" @@ -855,6 +1041,12 @@ dependencies = [ "syn 2.0.23", ] +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "piz" version = "0.5.1" @@ -911,6 +1103,16 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "prettyplease" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c64d9ba0963cdcea2e1b2230fbae2bab30eb25a174be395c41e764bfb65dd62" +dependencies = [ + "proc-macro2", + "syn 2.0.23", +] + [[package]] name = "primal-check" version = "0.3.3" @@ -970,6 +1172,26 @@ dependencies = [ "unarray", ] +[[package]] +name = "ptr_meta" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" +dependencies = [ + "ptr_meta_derive", +] + +[[package]] +name = "ptr_meta_derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.104", +] + [[package]] name = "quote" version = "1.0.29" @@ -1058,18 +1280,79 @@ dependencies = [ "regex-syntax", ] -[[package]] -name = "regex-automata" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" - [[package]] name = "regex-syntax" version = "0.6.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49b3de9ec5dc0a3417da371aab17d729997c15010e7fd24ff707773a33bddb64" +[[package]] +name = "rend" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "581008d2099240d37fb08d77ad713bcaec2c4d89d50b5b21a8bb1996bbab68ab" +dependencies = [ + 
"bytecheck", +] + +[[package]] +name = "retain_mut" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c31b5c4033f8fdde8700e4657be2c497e7288f01515be52168c631e2e4d4086" + +[[package]] +name = "rkyv" +version = "0.7.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c30f1d45d9aa61cbc8cd1eb87705470892289bb2d01943e7803b873a57404dc3" +dependencies = [ + "bytecheck", + "hashbrown", + "ptr_meta", + "rend", + "rkyv_derive", + "seahash", +] + +[[package]] +name = "rkyv_derive" +version = "0.7.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff26ed6c7c4dfc2aa9480b86a60e3c7233543a270a680e10758a507c5a4ce476" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.104", +] + +[[package]] +name = "roaring" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef0fb5e826a8bde011ecae6a8539dd333884335c57ff0f003fbe27c25bbe8f71" +dependencies = [ + "bytemuck", + "byteorder", + "retain_mut", +] + +[[package]] +name = "rocksdb" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb6f170a4041d50a0ce04b0d2e14916d6ca863ea2e422689a5b694395d299ffe" +dependencies = [ + "libc", + "librocksdb-sys", +] + +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustix" version = "0.37.20" @@ -1136,6 +1419,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" +[[package]] +name = "seahash" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" + [[package]] name = "serde" version = "1.0.168" @@ -1162,11 +1451,29 @@ version = "1.0.105" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360" dependencies = [ - "itoa 1.0.1", + "itoa", "ryu", "serde", ] +[[package]] +name = "shlex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" + +[[package]] +name = "size" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fed904c7fb2856d868b92464fc8fa597fce366edea1a9cbfaa8cb5fe080bd6d" + +[[package]] +name = "slicestring" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "636b979c5672ac7c2a1120ca0a9a6074cd090dadfec42af6f8a5baea1223d180" + [[package]] name = "smallvec" version = "1.8.0" @@ -1181,7 +1488,7 @@ checksum = "9f1341053f34bb13b5e9590afb7d94b48b48d4b87467ec28e3c238693bb553de" [[package]] name = "sourmash" -version = "0.11.0" +version = "0.12.0" dependencies = [ "assert_matches", "az", @@ -1191,10 +1498,12 @@ dependencies = [ "chrono", "counter", "criterion", + "csv", "finch", "fixedbitset", "getrandom", "getset", + "histogram", "log", "md5", "memmap2", @@ -1203,6 +1512,7 @@ dependencies = [ "niffler", "nohash-hasher", "num-iter", + "numsep", "once_cell", "ouroboros", "piz", @@ -1210,8 +1520,12 @@ dependencies = [ "proptest", "rand", "rayon", + "rkyv", + "roaring", + "rocksdb", "serde", "serde_json", + "size", "tempfile", "thiserror", "twox-hash", @@ -1349,16 
+1663,25 @@ checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3" [[package]] name = "unicode-width" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" +checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" [[package]] name = "vec-collections" -version = "0.3.6" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f2390c4dc8ae8640c57d067b1a3d40bc05c124cc6bc7394d761b53435d41b76" +checksum = "3c9965c8f2ffed1dbcd16cafe18a009642f540fa22661c6cfd6309ddb02e4982" dependencies = [ + "binary-merge", + "inplace-vec-builder", + "lazy_static", "num-traits", "serde", "smallvec", @@ -1601,3 +1924,14 @@ checksum = "c179869f34fc7c01830d3ce7ea2086bc3a07e0d35289b667d0a8bf910258926c" dependencies = [ "lzma-sys", ] + +[[package]] +name = "zstd-sys" +version = "2.0.7+zstd.1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94509c3ba2fe55294d752b79842c530ccfab760192521df74a081a78d2b3c7f5" +dependencies = [ + "cc", + "libc", + "pkg-config", +] diff --git a/Makefile b/Makefile index 4c2ef69abb..f964bc3cce 100644 --- a/Makefile +++ b/Makefile @@ -32,7 +32,8 @@ include/sourmash.h: src/core/src/lib.rs \ src/core/src/ffi/index/mod.rs \ src/core/src/ffi/index/revindex.rs \ src/core/src/ffi/storage.rs \ - src/core/src/errors.rs + src/core/src/errors.rs \ + src/core/cbindgen.toml cd src/core && \ RUSTC_BOOTSTRAP=1 cbindgen -c cbindgen.toml . -o ../../$@ diff --git a/deny.toml b/deny.toml index 29d148d50b..99f3b442c7 100644 --- a/deny.toml +++ b/deny.toml @@ -29,6 +29,7 @@ default = "deny" confidence-threshold = 0.8 exceptions = [ { allow = ["Zlib"], name = "piz", version = "*" }, + { allow = ["ISC"], name = "libloading", version = "*" }, ] [bans] diff --git a/doc/developer.md b/doc/developer.md index 2368611e7a..b169d557de 100644 --- a/doc/developer.md +++ b/doc/developer.md @@ -25,7 +25,7 @@ and the [`conda-forge`](https://conda-forge.org/) channel by default). Once `mamba` is installed, run ``` -mamba create -n sourmash_dev 'tox>=3.27,<4' tox-conda rust git compilers pandoc +mamba create -n sourmash_dev 'tox>=3.27,<4' tox-conda rust git compilers pandoc libstdcxx-ng ``` to create an environment called `sourmash_dev` containing the programs needed for development. 
diff --git a/flake.nix b/flake.nix index 43ad3b8d78..cf004e183d 100644 --- a/flake.nix +++ b/flake.nix @@ -106,6 +106,7 @@ #py-spy #heaptrack + cargo-all-features cargo-watch cargo-limit cargo-outdated @@ -114,6 +115,7 @@ nixpkgs-fmt ]; + # Needed for matplotlib LD_LIBRARY_PATH = lib.makeLibraryPath [ pkgs.stdenv.cc.cc.lib ]; # workaround for https://github.com/NixOS/nixpkgs/blob/48dfc9fa97d762bce28cc8372a2dd3805d14c633/doc/languages-frameworks/python.section.md#python-setuppy-bdist_wheel-cannot-create-whl diff --git a/include/sourmash.h b/include/sourmash.h index 6fa7854880..011aee2925 100644 --- a/include/sourmash.h +++ b/include/sourmash.h @@ -42,6 +42,7 @@ enum SourmashErrorCode { SOURMASH_ERROR_CODE_PARSE_INT = 100003, SOURMASH_ERROR_CODE_SERDE_ERROR = 100004, SOURMASH_ERROR_CODE_NIFFLER_ERROR = 100005, + SOURMASH_ERROR_CODE_CSV_ERROR = 100006, }; typedef uint32_t SourmashErrorCode; diff --git a/pyproject.toml b/pyproject.toml index ff3b831c20..dc77145fc2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -144,7 +144,7 @@ include = [ exclude = [ { path = "**/__pycache__/*", format = ["sdist", "wheel"] }, ] -features = ["maturin"] +features = ["maturin", "mastiff"] locked = true module-name = "sourmash._lowlevel" @@ -164,7 +164,7 @@ known_first_party = ["sourmash"] [tool.cibuildwheel] build = "cp39-*" -skip = "*-win32 *-manylinux_i686 *-musllinux_ppc64le *-musllinux_s390x" +skip = "*-win32 *-manylinux_i686 *-musllinux_*" before-all = [ "curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain=stable", "cargo update --dry-run", @@ -178,6 +178,18 @@ build-verbosity = 3 CARGO_REGISTRIES_CRATES_IO_PROTOCOL="sparse" PATH="$HOME/.cargo/bin:$PATH" +[tool.cibuildwheel.linux] +before-all = [ + "curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain=stable", + "cargo update --dry-run", + "if [ -f /etc/system-release ]; then yum -y install centos-release-scl; fi", + "if [ -f /etc/system-release ]; then yum -y install llvm-toolset-7.0; fi", +] +before-build = [ + "if [ -f /etc/system-release ]; then source scl_source enable llvm-toolset-7.0; fi", + "if [ -f /etc/system-release ]; then source scl_source enable devtoolset-10; fi", +] + [tool.cibuildwheel.linux.environment] CARGO_REGISTRIES_CRATES_IO_PROTOCOL="sparse" PATH="$HOME/.cargo/bin:$PATH" diff --git a/src/core/Cargo.toml b/src/core/Cargo.toml index cbc897b28b..87c80ee3dc 100644 --- a/src/core/Cargo.toml +++ b/src/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sourmash" -version = "0.11.0" +version = "0.12.0" authors = ["Luiz Irber "] description = "MinHash sketches for genomic data" repository = "https://github.com/sourmash-bio/sourmash" @@ -22,6 +22,8 @@ bench = false from-finch = ["finch"] parallel = ["rayon"] maturin = [] +mastiff = ["rocksdb", "rkyv", "parallel"] +default = [] [dependencies] az = "1.0.0" @@ -29,6 +31,7 @@ bytecount = "0.6.0" byteorder = "1.4.3" cfg-if = "1.0" counter = "0.5.7" +csv = "1.1.6" finch = { version = "0.6.0", optional = true } fixedbitset = "0.4.0" getrandom = { version = "0.2", features = ["js"] } @@ -47,10 +50,15 @@ primal-check = "0.3.1" thiserror = "1.0" typed-builder = "0.14.0" twox-hash = "1.6.0" -vec-collections = "0.3.4" +vec-collections = "0.4.3" piz = "0.5.0" memmap2 = "0.7.1" ouroboros = "0.17.2" +rkyv = { version = "0.7.39", optional = true } +roaring = "0.10.0" +histogram = "0.6.9" +numsep = "0.1.12" +size = "0.4.0" [dev-dependencies] assert_matches = "1.3.0" @@ -72,6 +80,13 @@ harness = false name = "minhash" harness = false +[package.metadata.cargo-all-features] 
+skip_optional_dependencies = true +denylist = ["maturin"] +skip_feature_sets = [ + ["mastiff", "parallel"], # mastiff implies parallel +] + ## Wasm section. Crates only used for WASM, as well as specific configurations [target.'cfg(all(target_arch = "wasm32", target_os="unknown"))'.dependencies.wasm-bindgen] @@ -90,4 +105,5 @@ features = ["wasmbind"] wasm-bindgen-test = "0.3.37" ### These crates don't compile on wasm -[target.'cfg(not(all(target_arch = "wasm32", target_os="unknown")))'.dependencies] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +rocksdb = { version = "0.21.0", optional = true } diff --git a/src/core/build.rs b/src/core/build.rs index a22396c25a..f067828d50 100644 --- a/src/core/build.rs +++ b/src/core/build.rs @@ -55,12 +55,12 @@ fn copy_c_bindings(crate_dir: &str) { let new_header: String = header .lines() .filter_map(|s| { - if s.starts_with("#") { + if s.starts_with('#') { None } else { Some({ let mut s = s.to_owned(); - s.push_str("\n"); + s.push('\n'); s }) } @@ -71,5 +71,5 @@ fn copy_c_bindings(crate_dir: &str) { let target_dir = find_target_dir(&out_dir); std::fs::create_dir_all(&target_dir).expect("error creating target dir"); let out_path = target_dir.join("header.h"); - std::fs::write(out_path, &new_header).expect("error writing header"); + std::fs::write(out_path, new_header).expect("error writing header"); } diff --git a/src/core/cbindgen.toml b/src/core/cbindgen.toml index cd6cd781c2..bc1538056b 100644 --- a/src/core/cbindgen.toml +++ b/src/core/cbindgen.toml @@ -8,7 +8,7 @@ clean = true [parse.expand] crates = ["sourmash"] -features = [] +features = ["mastiff"] [enum] rename_variants = "QualifiedScreamingSnakeCase" diff --git a/src/core/src/encodings.rs b/src/core/src/encodings.rs index 6010cf2f6d..443db90b50 100644 --- a/src/core/src/encodings.rs +++ b/src/core/src/encodings.rs @@ -7,6 +7,7 @@ use std::str; use nohash_hasher::BuildNoHashHasher; use once_cell::sync::Lazy; +use vec_collections::AbstractVecSet; use crate::Error; @@ -23,6 +24,10 @@ type ColorToIdx = HashMap>; #[allow(non_camel_case_types)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[cfg_attr( + feature = "rkyv", + derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive) +)] #[repr(u32)] pub enum HashFunctions { murmur64_DNA = 1, diff --git a/src/core/src/errors.rs b/src/core/src/errors.rs index cd4ddcfaf1..f6c3ce311e 100644 --- a/src/core/src/errors.rs +++ b/src/core/src/errors.rs @@ -63,6 +63,9 @@ pub enum SourmashError { #[error(transparent)] IOError(#[from] std::io::Error), + #[error(transparent)] + CsvError(#[from] csv::Error), + #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] #[error(transparent)] Panic(#[from] crate::ffi::utils::Panic), @@ -108,6 +111,7 @@ pub enum SourmashErrorCode { ParseInt = 100_003, SerdeError = 100_004, NifflerError = 100_005, + CsvError = 100_006, } #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] @@ -137,6 +141,7 @@ impl SourmashErrorCode { SourmashError::IOError { .. } => SourmashErrorCode::Io, SourmashError::NifflerError { .. } => SourmashErrorCode::NifflerError, SourmashError::Utf8Error { .. } => SourmashErrorCode::Utf8Error, + SourmashError::CsvError { .. 
} => SourmashErrorCode::CsvError, } } } diff --git a/src/core/src/ffi/index/mod.rs b/src/core/src/ffi/index/mod.rs index 932a97b222..cb98f0963b 100644 --- a/src/core/src/ffi/index/mod.rs +++ b/src/core/src/ffi/index/mod.rs @@ -1,3 +1,4 @@ +#[cfg(feature = "mastiff")] pub mod revindex; use crate::signature::Signature; diff --git a/src/core/src/ffi/index/revindex.rs b/src/core/src/ffi/index/revindex.rs index 3597121bce..8f4691dbad 100644 --- a/src/core/src/ffi/index/revindex.rs +++ b/src/core/src/ffi/index/revindex.rs @@ -1,7 +1,7 @@ use std::path::PathBuf; use std::slice; -use crate::index::revindex::RevIndex; +use crate::index::revindex::mem_revindex::RevIndex; use crate::index::Index; use crate::signature::{Signature, SigsTrait}; use crate::sketch::minhash::KmerMinHash; diff --git a/src/core/src/ffi/storage.rs b/src/core/src/ffi/storage.rs index 86d3834201..882a8d5f20 100644 --- a/src/core/src/ffi/storage.rs +++ b/src/core/src/ffi/storage.rs @@ -1,5 +1,6 @@ use std::os::raw::c_char; use std::slice; +use std::sync::Arc; use crate::ffi::utils::{ForeignObject, SourmashStr}; use crate::prelude::*; @@ -8,7 +9,7 @@ use crate::storage::ZipStorage; pub struct SourmashZipStorage; impl ForeignObject for SourmashZipStorage { - type RustObject = ZipStorage; + type RustObject = Arc; } ffi_fn! { @@ -20,7 +21,7 @@ unsafe fn zipstorage_new(ptr: *const c_char, insize: usize) -> Result<*mut Sourm }; let zipstorage = ZipStorage::from_file(path)?; - Ok(SourmashZipStorage::from_rust(zipstorage)) + Ok(SourmashZipStorage::from_rust(Arc::new(zipstorage))) } } @@ -110,7 +111,7 @@ unsafe fn zipstorage_set_subdir( std::str::from_utf8(path)? }; - storage.set_subdir(path.to_string()); + (*Arc::get_mut(storage).unwrap()).set_subdir(path.to_string()); Ok(()) } } diff --git a/src/core/src/from.rs b/src/core/src/from.rs index dfc384236e..37c98bf40d 100644 --- a/src/core/src/from.rs +++ b/src/core/src/from.rs @@ -23,10 +23,8 @@ impl From for KmerMinHash { values.len() as u32, ); - let hash_with_abunds: Vec<(u64, u64)> = values - .iter() - .map(|x| (x.hash as u64, x.count as u64)) - .collect(); + let hash_with_abunds: Vec<(u64, u64)> = + values.iter().map(|x| (x.hash, x.count as u64)).collect(); new_mh .add_many_with_abund(&hash_with_abunds) diff --git a/src/core/src/index/linear.rs b/src/core/src/index/linear.rs index 78b2c6f1f5..6ae2916f16 100644 --- a/src/core/src/index/linear.rs +++ b/src/core/src/index/linear.rs @@ -6,18 +6,18 @@ use std::path::PathBuf; use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; -use crate::index::{Comparable, DatasetInfo, Index, SigStore}; +use crate::index::{DatasetInfo, Index, SigStore}; use crate::prelude::*; use crate::storage::{FSStorage, InnerStorage, Storage, StorageInfo}; use crate::Error; #[derive(TypedBuilder)] -pub struct LinearIndex { +pub struct LinearIndex { #[builder(default)] storage: Option, #[builder(default)] - datasets: Vec>, + datasets: Vec, } #[derive(Serialize, Deserialize)] @@ -27,15 +27,11 @@ struct LinearInfo { leaves: Vec, } -impl<'a, L> Index<'a> for LinearIndex -where - L: Clone + Comparable + 'a, - SigStore: From, -{ - type Item = L; +impl<'a> Index<'a> for LinearIndex { + type Item = Signature; //type SignatureIterator = std::slice::Iter<'a, Self::Item>; - fn insert(&mut self, node: L) -> Result<(), Error> { + fn insert(&mut self, node: Self::Item) -> Result<(), Error> { self.datasets.push(node.into()); Ok(()) } @@ -76,11 +72,7 @@ where */ } -impl LinearIndex -where - L: ToWriter, - SigStore: ReadData, -{ +impl LinearIndex { pub fn save_file>( 
&mut self, path: P, @@ -115,7 +107,7 @@ where .iter_mut() .map(|l| { // Trigger data loading - let _: &L = (*l).data().unwrap(); + let _: &Signature = (*l).data().unwrap(); // set storage to new one l.storage = Some(storage.clone()); @@ -137,7 +129,7 @@ where Ok(()) } - pub fn from_path>(path: P) -> Result, Error> { + pub fn from_path>(path: P) -> Result { let file = File::open(&path)?; let mut reader = BufReader::new(file); @@ -147,11 +139,11 @@ where basepath.push(path); basepath.canonicalize()?; - let linear = LinearIndex::::from_reader(&mut reader, basepath.parent().unwrap())?; + let linear = LinearIndex::from_reader(&mut reader, basepath.parent().unwrap())?; Ok(linear) } - pub fn from_reader(rdr: R, path: P) -> Result, Error> + pub fn from_reader(rdr: R, path: P) -> Result where R: Read, P: AsRef, @@ -171,7 +163,7 @@ where .leaves .into_iter() .map(|l| { - let mut v: SigStore = l.into(); + let mut v: SigStore = l.into(); v.storage = Some(storage.clone()); v }) diff --git a/src/core/src/index/mod.rs b/src/core/src/index/mod.rs index 832fdf9091..97b5e743d2 100644 --- a/src/core/src/index/mod.rs +++ b/src/core/src/index/mod.rs @@ -4,6 +4,9 @@ //! Some indices also support containment searches. pub mod linear; + +#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] +#[cfg(feature = "mastiff")] pub mod revindex; pub mod search; @@ -15,14 +18,85 @@ use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; +use crate::encodings::HashFunctions; use crate::errors::ReadDataError; use crate::index::search::{search_minhashes, search_minhashes_containment}; +use crate::picklist::Picklist; use crate::prelude::*; use crate::signature::SigsTrait; use crate::sketch::Sketch; use crate::storage::{InnerStorage, Storage}; use crate::Error; +#[derive(Default)] +pub struct Selection { + ksize: Option, + abund: Option, + num: Option, + scaled: Option, + containment: Option, + moltype: Option, + picklist: Option, +} + +impl Selection { + pub fn ksize(&self) -> Option { + self.ksize + } + + pub fn set_ksize(&mut self, ksize: u32) { + self.ksize = Some(ksize); + } + + pub fn abund(&self) -> Option { + self.abund + } + + pub fn set_abund(&mut self, value: bool) { + self.abund = Some(value); + } + + pub fn num(&self) -> Option { + self.num + } + + pub fn set_num(&mut self, num: u32) { + self.num = Some(num); + } + + pub fn scaled(&self) -> Option { + self.scaled + } + + pub fn set_scaled(&mut self, scaled: u32) { + self.scaled = Some(scaled); + } + + pub fn containment(&self) -> Option { + self.containment + } + + pub fn set_containment(&mut self, containment: bool) { + self.containment = Some(containment); + } + + pub fn moltype(&self) -> Option { + self.moltype + } + + pub fn set_moltype(&mut self, value: HashFunctions) { + self.moltype = Some(value); + } + + pub fn picklist(&self) -> Option { + self.picklist.clone() + } + + pub fn set_picklist(&mut self, value: Picklist) { + self.picklist = Some(value); + } +} + pub trait Index<'a> { type Item: Comparable; //type SignatureIterator: Iterator; @@ -116,7 +190,7 @@ pub struct DatasetInfo { } #[derive(TypedBuilder, Default, Clone)] -pub struct SigStore { +pub struct SigStore { #[builder(setter(into))] filename: String, @@ -129,16 +203,16 @@ pub struct SigStore { storage: Option, #[builder(setter(into), default)] - data: OnceCell, + data: OnceCell, } -impl SigStore { +impl SigStore { pub fn name(&self) -> String { self.name.clone() } } -impl std::fmt::Debug for SigStore { +impl std::fmt::Debug for SigStore { fn 
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, @@ -148,7 +222,7 @@ impl std::fmt::Debug for SigStore { } } -impl ReadData for SigStore { +impl ReadData for SigStore { fn data(&self) -> Result<&Signature, Error> { if let Some(sig) = self.data.get() { Ok(sig) @@ -172,10 +246,7 @@ impl ReadData for SigStore { } } -impl SigStore -where - T: ToWriter, -{ +impl SigStore { pub fn save(&self, path: &str) -> Result { if let Some(storage) = &self.storage { if let Some(data) = self.data.get() { @@ -190,10 +261,8 @@ where unimplemented!() } } -} -impl SigStore { - pub fn count_common(&self, other: &SigStore) -> u64 { + pub fn count_common(&self, other: &SigStore) -> u64 { let ng: &Signature = self.data().unwrap(); let ong: &Signature = other.data().unwrap(); @@ -220,13 +289,13 @@ impl SigStore { } } -impl From> for Signature { - fn from(other: SigStore) -> Signature { +impl From for Signature { + fn from(other: SigStore) -> Signature { other.data.get().unwrap().to_owned() } } -impl Deref for SigStore { +impl Deref for SigStore { type Target = Signature; fn deref(&self) -> &Signature { @@ -234,8 +303,8 @@ impl Deref for SigStore { } } -impl From for SigStore { - fn from(other: Signature) -> SigStore { +impl From for SigStore { + fn from(other: Signature) -> SigStore { let name = other.name(); let filename = other.filename(); @@ -249,8 +318,8 @@ impl From for SigStore { } } -impl Comparable> for SigStore { - fn similarity(&self, other: &SigStore) -> f64 { +impl Comparable for SigStore { + fn similarity(&self, other: &SigStore) -> f64 { let ng: &Signature = self.data().unwrap(); let ong: &Signature = other.data().unwrap(); @@ -273,7 +342,7 @@ impl Comparable> for SigStore { unimplemented!() } - fn containment(&self, other: &SigStore) -> f64 { + fn containment(&self, other: &SigStore) -> f64 { let ng: &Signature = self.data().unwrap(); let ong: &Signature = other.data().unwrap(); @@ -325,8 +394,8 @@ impl Comparable for Signature { } } -impl From for SigStore { - fn from(other: DatasetInfo) -> SigStore { +impl From for SigStore { + fn from(other: DatasetInfo) -> SigStore { SigStore { filename: other.filename, name: other.name, diff --git a/src/core/src/index/revindex.rs b/src/core/src/index/revindex.rs deleted file mode 100644 index 0a1fc25d18..0000000000 --- a/src/core/src/index/revindex.rs +++ /dev/null @@ -1,699 +0,0 @@ -use std::collections::{HashMap, HashSet}; -use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicUsize, Ordering}; - -use getset::{CopyGetters, Getters, Setters}; -use log::{debug, info}; -use nohash_hasher::BuildNoHashHasher; -use serde::{Deserialize, Serialize}; - -#[cfg(feature = "parallel")] -use rayon::prelude::*; - -use crate::encodings::{Color, Colors, Idx}; -use crate::index::Index; -use crate::signature::{Signature, SigsTrait}; -use crate::sketch::minhash::KmerMinHash; -use crate::sketch::Sketch; -use crate::Error; -use crate::HashIntoType; - -type SigCounter = counter::Counter; - -#[derive(Serialize, Deserialize)] -struct HashToColor(HashMap>); - -impl HashToColor { - fn new() -> Self { - HashToColor(HashMap::< - HashIntoType, - Color, - BuildNoHashHasher, - >::with_hasher(BuildNoHashHasher::default())) - } - - fn get(&self, hash: &HashIntoType) -> Option<&Color> { - self.0.get(hash) - } - - fn retain(&mut self, hashes: &HashSet) { - self.0.retain(|hash, _| hashes.contains(hash)) - } - - fn len(&self) -> usize { - self.0.len() - } - - fn is_empty(&self) -> bool { - self.0.is_empty() - } - - fn add_to(&mut self, colors: &mut Colors, 
dataset_id: usize, matched_hashes: Vec) { - let mut color = None; - - matched_hashes.into_iter().for_each(|hash| { - color = Some(colors.update(color, &[dataset_id as Idx]).unwrap()); - self.0.insert(hash, color.unwrap()); - }); - } - - fn reduce_hashes_colors( - a: (HashToColor, Colors), - b: (HashToColor, Colors), - ) -> (HashToColor, Colors) { - let ((small_hashes, small_colors), (mut large_hashes, mut large_colors)) = - if a.0.len() > b.0.len() { - (b, a) - } else { - (a, b) - }; - - small_hashes.0.into_iter().for_each(|(hash, color)| { - large_hashes - .0 - .entry(hash) - .and_modify(|entry| { - // Hash is already present. - // Update the current color by adding the indices from - // small_colors. - let ids = small_colors.indices(&color); - let new_color = large_colors.update(Some(*entry), ids).unwrap(); - *entry = new_color; - }) - .or_insert_with(|| { - // In this case, the hash was not present yet. - // we need to create the same color from small_colors - // into large_colors. - let ids = small_colors.indices(&color); - let new_color = large_colors.update(None, ids).unwrap(); - assert_eq!(new_color, color); - new_color - }); - }); - - (large_hashes, large_colors) - } -} - -// Use rkyv for serialization? -// https://davidkoloski.me/rkyv/ -#[derive(Serialize, Deserialize)] -pub struct RevIndex { - hash_to_color: HashToColor, - - sig_files: Vec, - - #[serde(skip)] - ref_sigs: Option>, - - template: Sketch, - colors: Colors, - //#[serde(skip)] - //storage: Option, -} - -impl RevIndex { - pub fn load>( - index_path: P, - queries: Option<&[KmerMinHash]>, - ) -> Result> { - let (rdr, _) = niffler::from_path(index_path)?; - let revindex = if let Some(qs) = queries { - // TODO: avoid loading full revindex if query != None - /* - struct PartialRevIndex { - hashes_to_keep: Option>, - marker: PhantomData T>, - } - - impl PartialRevIndex { - pub fn new(hashes_to_keep: HashSet) -> Self { - PartialRevIndex { - hashes_to_keep: Some(hashes_to_keep), - marker: PhantomData, - } - } - } - */ - - let mut hashes: HashSet = HashSet::new(); - for q in qs { - hashes.extend(q.iter_mins()); - } - - //let mut revindex: RevIndex = PartialRevIndex::new(hashes).deserialize(&rdr).unwrap(); - - let mut revindex: RevIndex = serde_json::from_reader(rdr)?; - revindex.hash_to_color.retain(&hashes); - revindex - } else { - // Load the full revindex - serde_json::from_reader(rdr)? 
- }; - - Ok(revindex) - } - - pub fn new( - search_sigs: &[PathBuf], - template: &Sketch, - threshold: usize, - queries: Option<&[KmerMinHash]>, - keep_sigs: bool, - ) -> RevIndex { - // If threshold is zero, let's merge all queries and save time later - let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold)); - - let processed_sigs = AtomicUsize::new(0); - - #[cfg(feature = "parallel")] - let sig_iter = search_sigs.par_iter(); - - #[cfg(not(feature = "parallel"))] - let sig_iter = search_sigs.iter(); - - let filtered_sigs = sig_iter.enumerate().filter_map(|(dataset_id, filename)| { - let i = processed_sigs.fetch_add(1, Ordering::SeqCst); - if i % 1000 == 0 { - info!("Processed {} reference sigs", i); - } - - let search_sig = Signature::from_path(filename) - .unwrap_or_else(|_| panic!("Error processing {:?}", filename)) - .swap_remove(0); - - RevIndex::map_hashes_colors( - dataset_id, - &search_sig, - queries, - &merged_query, - threshold, - template, - ) - }); - - #[cfg(feature = "parallel")] - let (hash_to_color, colors) = filtered_sigs.reduce( - || (HashToColor::new(), Colors::default()), - HashToColor::reduce_hashes_colors, - ); - - #[cfg(not(feature = "parallel"))] - let (hash_to_color, colors) = filtered_sigs.fold( - (HashToColor::new(), Colors::default()), - HashToColor::reduce_hashes_colors, - ); - - // TODO: build this together with hash_to_idx? - let ref_sigs = if keep_sigs { - #[cfg(feature = "parallel")] - let sigs_iter = search_sigs.par_iter(); - - #[cfg(not(feature = "parallel"))] - let sigs_iter = search_sigs.iter(); - - Some( - sigs_iter - .map(|ref_path| { - Signature::from_path(ref_path) - .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) - .swap_remove(0) - }) - .collect(), - ) - } else { - None - }; - - RevIndex { - hash_to_color, - sig_files: search_sigs.into(), - ref_sigs, - template: template.clone(), - colors, - // storage: Some(InnerStorage::new(MemStorage::default())), - } - } - - fn merge_queries(qs: &[KmerMinHash], threshold: usize) -> Option { - if threshold == 0 { - let mut merged = qs[0].clone(); - for query in &qs[1..] 
{ - merged.merge(query).unwrap(); - } - Some(merged) - } else { - None - } - } - - pub fn new_with_sigs( - search_sigs: Vec, - template: &Sketch, - threshold: usize, - queries: Option<&[KmerMinHash]>, - ) -> RevIndex { - // If threshold is zero, let's merge all queries and save time later - let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold)); - - let processed_sigs = AtomicUsize::new(0); - - #[cfg(feature = "parallel")] - let sigs_iter = search_sigs.par_iter(); - #[cfg(not(feature = "parallel"))] - let sigs_iter = search_sigs.iter(); - - let filtered_sigs = sigs_iter.enumerate().filter_map(|(dataset_id, sig)| { - let i = processed_sigs.fetch_add(1, Ordering::SeqCst); - if i % 1000 == 0 { - info!("Processed {} reference sigs", i); - } - - RevIndex::map_hashes_colors( - dataset_id, - sig, - queries, - &merged_query, - threshold, - template, - ) - }); - - #[cfg(feature = "parallel")] - let (hash_to_color, colors) = filtered_sigs.reduce( - || (HashToColor::new(), Colors::default()), - HashToColor::reduce_hashes_colors, - ); - - #[cfg(not(feature = "parallel"))] - let (hash_to_color, colors) = filtered_sigs.fold( - (HashToColor::new(), Colors::default()), - HashToColor::reduce_hashes_colors, - ); - - RevIndex { - hash_to_color, - sig_files: vec![], - ref_sigs: search_sigs.into(), - template: template.clone(), - colors, - //storage: None, - } - } - - fn map_hashes_colors( - dataset_id: usize, - search_sig: &Signature, - queries: Option<&[KmerMinHash]>, - merged_query: &Option, - threshold: usize, - template: &Sketch, - ) -> Option<(HashToColor, Colors)> { - let mut search_mh = None; - if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(template) { - search_mh = Some(mh); - } - - let search_mh = search_mh.expect("Couldn't find a compatible MinHash"); - let mut hash_to_color = HashToColor::new(); - let mut colors = Colors::default(); - - if let Some(qs) = queries { - if let Some(ref merged) = merged_query { - let (matched_hashes, intersection) = merged.intersection(search_mh).unwrap(); - if !matched_hashes.is_empty() || intersection > threshold as u64 { - hash_to_color.add_to(&mut colors, dataset_id, matched_hashes); - } - } else { - for query in qs { - let (matched_hashes, intersection) = query.intersection(search_mh).unwrap(); - if !matched_hashes.is_empty() || intersection > threshold as u64 { - hash_to_color.add_to(&mut colors, dataset_id, matched_hashes); - } - } - } - } else { - let matched = search_mh.mins(); - let size = matched.len() as u64; - if !matched.is_empty() || size > threshold as u64 { - hash_to_color.add_to(&mut colors, dataset_id, matched); - } - }; - - if hash_to_color.is_empty() { - None - } else { - Some((hash_to_color, colors)) - } - } - - pub fn search( - &self, - counter: SigCounter, - similarity: bool, - threshold: usize, - ) -> Result, Box> { - let mut matches = vec![]; - if similarity { - unimplemented!("TODO: threshold correction") - } - - for (dataset_id, size) in counter.most_common() { - if size >= threshold { - matches.push(self.sig_files[dataset_id as usize].to_str().unwrap().into()); - } else { - break; - }; - } - Ok(matches) - } - - pub fn gather( - &self, - mut counter: SigCounter, - threshold: usize, - query: &KmerMinHash, - ) -> Result, Box> { - let mut match_size = usize::max_value(); - let mut matches = vec![]; - - while match_size > threshold && !counter.is_empty() { - let (dataset_id, size) = counter.most_common()[0]; - match_size = if size >= threshold { size } else { break }; - - let p; - let match_path = if 
self.sig_files.is_empty() { - p = PathBuf::new(); // TODO: Fix somehow? - &p - } else { - &self.sig_files[dataset_id as usize] - }; - - let ref_match; - let match_sig = if let Some(refsigs) = &self.ref_sigs { - &refsigs[dataset_id as usize] - } else { - // TODO: remove swap_remove - ref_match = Signature::from_path(match_path)?.swap_remove(0); - &ref_match - }; - - let mut match_mh = None; - if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(&self.template) { - match_mh = Some(mh); - } - let match_mh = match_mh.expect("Couldn't find a compatible MinHash"); - - // Calculate stats - let f_orig_query = match_size as f64 / query.size() as f64; - let f_match = match_size as f64 / match_mh.size() as f64; - let filename = match_path.to_str().unwrap().into(); - let name = match_sig.name(); - let unique_intersect_bp = match_mh.scaled() as usize * match_size; - let gather_result_rank = matches.len(); - - let (intersect_orig, _) = match_mh.intersection_size(query)?; - let intersect_bp = (match_mh.scaled() * intersect_orig) as usize; - - let f_unique_to_query = intersect_orig as f64 / query.size() as f64; - let match_ = match_sig.clone(); - - // TODO: all of these - let f_unique_weighted = 0.; - let average_abund = 0; - let median_abund = 0; - let std_abund = 0; - let md5 = "".into(); - let f_match_orig = 0.; - let remaining_bp = 0; - - let result = GatherResult { - intersect_bp, - f_orig_query, - f_match, - f_unique_to_query, - f_unique_weighted, - average_abund, - median_abund, - std_abund, - filename, - name, - md5, - match_, - f_match_orig, - unique_intersect_bp, - gather_result_rank, - remaining_bp, - }; - matches.push(result); - - // Prepare counter for finding the next match by decrementing - // all hashes found in the current match in other datasets - for hash in match_mh.iter_mins() { - if let Some(color) = self.hash_to_color.get(hash) { - for dataset in self.colors.indices(color) { - counter.entry(*dataset).and_modify(|e| { - if *e > 0 { - *e -= 1 - } - }); - } - } - } - counter.remove(&dataset_id); - } - Ok(matches) - } - - pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter { - query - .iter_mins() - .filter_map(|hash| self.hash_to_color.get(hash)) - .flat_map(|color| self.colors.indices(color)) - .cloned() - .collect() - } - - pub fn template(&self) -> Sketch { - self.template.clone() - } - - // TODO: mh should be a sketch, or even a sig... 
- pub(crate) fn find_signatures( - &self, - mh: &KmerMinHash, - threshold: f64, - containment: bool, - _ignore_scaled: bool, - ) -> Result, Error> { - /* - let template_mh = None; - if let Sketch::MinHash(mh) = self.template { - template_mh = Some(mh); - }; - // TODO: throw error - let template_mh = template_mh.unwrap(); - - let tmp_mh; - let mh = if template_mh.scaled() > mh.scaled() { - // TODO: proper error here - tmp_mh = mh.downsample_scaled(self.scaled)?; - &tmp_mh - } else { - mh - }; - - if self.scaled < mh.scaled() && !ignore_scaled { - return Err(LcaDBError::ScaledMismatchError { - db: self.scaled, - query: mh.scaled(), - } - .into()); - } - */ - - // TODO: proper threshold calculation - let threshold: usize = (threshold * (mh.size() as f64)) as _; - - let counter = self.counter_for_query(mh); - - debug!( - "number of matching signatures for hashes: {}", - counter.len() - ); - - let mut results = vec![]; - for (dataset_id, size) in counter.most_common() { - let match_size = if size >= threshold { size } else { break }; - - let p; - let match_path = if self.sig_files.is_empty() { - p = PathBuf::new(); // TODO: Fix somehow? - &p - } else { - &self.sig_files[dataset_id as usize] - }; - - let ref_match; - let match_sig = if let Some(refsigs) = &self.ref_sigs { - &refsigs[dataset_id as usize] - } else { - // TODO: remove swap_remove - ref_match = Signature::from_path(match_path)?.swap_remove(0); - &ref_match - }; - - let mut match_mh = None; - if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(&self.template) { - match_mh = Some(mh); - } - let match_mh = match_mh.unwrap(); - - if size >= threshold { - let score = if containment { - size as f64 / mh.size() as f64 - } else { - size as f64 / (mh.size() + match_size - size) as f64 - }; - let filename = match_path.to_str().unwrap().into(); - let mut sig = match_sig.clone(); - sig.reset_sketches(); - sig.push(Sketch::MinHash(match_mh.clone())); - results.push((score, sig, filename)); - } else { - break; - }; - } - Ok(results) - } -} - -#[derive(CopyGetters, Getters, Setters, Serialize, Deserialize, Debug)] -pub struct GatherResult { - #[getset(get_copy = "pub")] - intersect_bp: usize, - - #[getset(get_copy = "pub")] - f_orig_query: f64, - - #[getset(get_copy = "pub")] - f_match: f64, - - f_unique_to_query: f64, - f_unique_weighted: f64, - average_abund: usize, - median_abund: usize, - std_abund: usize, - - #[getset(get = "pub")] - filename: String, - - #[getset(get = "pub")] - name: String, - - md5: String, - match_: Signature, - f_match_orig: f64, - unique_intersect_bp: usize, - gather_result_rank: usize, - remaining_bp: usize, -} - -impl GatherResult { - pub fn get_match(&self) -> Signature { - self.match_.clone() - } -} - -impl<'a> Index<'a> for RevIndex { - type Item = Signature; - - fn insert(&mut self, _node: Self::Item) -> Result<(), Error> { - unimplemented!() - } - - fn save>(&self, _path: P) -> Result<(), Error> { - unimplemented!() - } - - fn load>(_path: P) -> Result<(), Error> { - unimplemented!() - } - - fn len(&self) -> usize { - if let Some(refs) = &self.ref_sigs { - refs.len() - } else { - self.sig_files.len() - } - } - - fn signatures(&self) -> Vec { - if let Some(ref sigs) = self.ref_sigs { - sigs.to_vec() - } else { - unimplemented!() - } - } - - fn signature_refs(&self) -> Vec<&Self::Item> { - unimplemented!() - } -} - -#[cfg(test)] -mod test { - use super::*; - - use crate::sketch::minhash::max_hash_for_scaled; - - #[test] - fn revindex_new() { - let max_hash = max_hash_for_scaled(10000); - let template = 
Sketch::MinHash( - KmerMinHash::builder() - .num(0u32) - .ksize(31) - .max_hash(max_hash) - .build(), - ); - let search_sigs = [ - "../../tests/test-data/gather/GCF_000006945.2_ASM694v2_genomic.fna.gz.sig".into(), - "../../tests/test-data/gather/GCF_000007545.1_ASM754v1_genomic.fna.gz.sig".into(), - ]; - let index = RevIndex::new(&search_sigs, &template, 0, None, false); - assert_eq!(index.colors.len(), 3); - } - - #[test] - fn revindex_many() { - let max_hash = max_hash_for_scaled(10000); - let template = Sketch::MinHash( - KmerMinHash::builder() - .num(0u32) - .ksize(31) - .max_hash(max_hash) - .build(), - ); - let search_sigs = [ - "../../tests/test-data/gather/GCF_000006945.2_ASM694v2_genomic.fna.gz.sig".into(), - "../../tests/test-data/gather/GCF_000007545.1_ASM754v1_genomic.fna.gz.sig".into(), - "../../tests/test-data/gather/GCF_000008105.1_ASM810v1_genomic.fna.gz.sig".into(), - ]; - - let index = RevIndex::new(&search_sigs, &template, 0, None, false); - /* - dbg!(&index.colors.colors); - 0: 86 - 1: 132 - 2: 91 - (0, 1): 53 - (0, 2): 90 - (1, 2): 26 - (0, 1, 2): 261 - union: 739 - */ - //assert_eq!(index.colors.len(), 3); - assert_eq!(index.colors.len(), 7); - } -} diff --git a/src/core/src/index/revindex/disk_revindex.rs b/src/core/src/index/revindex/disk_revindex.rs new file mode 100644 index 0000000000..37be806343 --- /dev/null +++ b/src/core/src/index/revindex/disk_revindex.rs @@ -0,0 +1,549 @@ +use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher}; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use byteorder::{LittleEndian, WriteBytesExt}; +use log::{info, trace}; +use rayon::prelude::*; +use rocksdb::{ColumnFamilyDescriptor, MergeOperands, Options}; + +use crate::index::revindex::mem_revindex::GatherResult; +use crate::signature::{Signature, SigsTrait}; +use crate::sketch::minhash::KmerMinHash; +use crate::sketch::Sketch; + +use crate::index::revindex::prepare_query; +use crate::index::revindex::{ + self as module, sig_save_to_db, stats_for_cf, Color, DatasetID, Datasets, HashToColor, + QueryColors, SigCounter, SignatureData, DB, HASHES, SIGS, +}; + +fn compute_color(idxs: &Datasets) -> Color { + let s = BuildHasherDefault::::default(); + let mut hasher = s.build_hasher(); + /* + // TODO: remove this... + let mut sorted: Vec<_> = idxs.iter().collect(); + sorted.sort(); + */ + idxs.hash(&mut hasher); + hasher.finish() +} + +#[derive(Debug, Clone)] +pub struct RevIndex { + db: Arc, +} + +fn merge_datasets( + _: &[u8], + existing_val: Option<&[u8]>, + operands: &MergeOperands, +) -> Option> { + let mut datasets = existing_val + .and_then(Datasets::from_slice) + .unwrap_or_default(); + + for op in operands { + let new_vals = Datasets::from_slice(op).unwrap(); + datasets.union(new_vals); + } + // TODO: optimization! 
if nothing changed, skip as_bytes() + datasets.as_bytes() +} + +/* TODO: need the repair_cf variant, not available in rocksdb-rust yet +pub fn repair(path: &Path) { + let opts = db_options(); + + DB::repair(&opts, path).unwrap() +} +*/ + +impl RevIndex { + pub fn create(path: &Path) -> module::RevIndex { + let mut opts = module::RevIndex::db_options(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + // prepare column family descriptors + let cfs = cf_descriptors(); + + let db = Arc::new(DB::open_cf_descriptors(&opts, path, cfs).unwrap()); + + module::RevIndex::Plain(Self { db }) + } + + pub fn open(path: &Path, read_only: bool) -> module::RevIndex { + let opts = module::RevIndex::db_options(); + + // prepare column family descriptors + let cfs = cf_descriptors(); + + let db = if read_only { + Arc::new(DB::open_cf_descriptors_read_only(&opts, path, cfs, false).unwrap()) + } else { + Arc::new(DB::open_cf_descriptors(&opts, path, cfs).unwrap()) + }; + + module::RevIndex::Plain(Self { db }) + } + + fn map_hashes_colors( + &self, + dataset_id: DatasetID, + filename: &PathBuf, + threshold: f64, + template: &Sketch, + save_paths: bool, + ) { + let search_sig = Signature::from_path(filename) + .unwrap_or_else(|_| panic!("Error processing {:?}", filename)) + .swap_remove(0); + + let search_mh = + prepare_query(&search_sig, template).expect("Couldn't find a compatible MinHash"); + + let colors = Datasets::new(&[dataset_id]).as_bytes().unwrap(); + + let cf_hashes = self.db.cf_handle(HASHES).unwrap(); + + let matched = search_mh.mins(); + let size = matched.len() as u64; + if !matched.is_empty() || size > threshold as u64 { + // FIXME threshold is f64 + let mut hash_bytes = [0u8; 8]; + for hash in matched { + (&mut hash_bytes[..]) + .write_u64::(hash) + .expect("error writing bytes"); + self.db + .merge_cf(&cf_hashes, &hash_bytes[..], colors.as_slice()) + .expect("error merging"); + } + } + + sig_save_to_db( + self.db.clone(), + search_sig, + search_mh, + size, + threshold, + save_paths, + filename, + dataset_id, + ); + } + + pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter { + info!("Collecting hashes"); + let cf_hashes = self.db.cf_handle(HASHES).unwrap(); + let hashes_iter = query.iter_mins().map(|hash| { + let mut v = vec![0_u8; 8]; + (&mut v[..]) + .write_u64::(*hash) + .expect("error writing bytes"); + (&cf_hashes, v) + }); + + info!("Multi get"); + self.db + .multi_get_cf(hashes_iter) + .into_iter() + .filter_map(|r| r.ok().unwrap_or(None)) + .flat_map(|raw_datasets| { + let new_vals = Datasets::from_slice(&raw_datasets).unwrap(); + new_vals.into_iter() + }) + .collect() + } + + pub fn prepare_gather_counters( + &self, + query: &KmerMinHash, + ) -> (SigCounter, QueryColors, HashToColor) { + let cf_hashes = self.db.cf_handle(HASHES).unwrap(); + let hashes_iter = query.iter_mins().map(|hash| { + let mut v = vec![0_u8; 8]; + (&mut v[..]) + .write_u64::(*hash) + .expect("error writing bytes"); + (&cf_hashes, v) + }); + + /* + build a HashToColors for query, + and a QueryColors (Color -> Datasets) mapping. + Loading Datasets from rocksdb for every hash takes too long. 
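+       Instead, the color for each distinct Datasets value seen in the
+       query is computed once up front, so gather() can decrement
+       candidate counts later without reading from the DB again.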
+ */ + let mut query_colors: QueryColors = Default::default(); + let mut counter: SigCounter = Default::default(); + + info!("Building hash_to_colors and query_colors"); + let hash_to_colors = query + .iter_mins() + .zip(self.db.multi_get_cf(hashes_iter)) + .filter_map(|(k, r)| { + let raw = r.ok().unwrap_or(None); + raw.map(|raw| { + let new_vals = Datasets::from_slice(&raw).unwrap(); + let color = compute_color(&new_vals); + query_colors + .entry(color) + .or_insert_with(|| new_vals.clone()); + counter.update(new_vals); + (*k, color) + }) + }) + .collect(); + + (counter, query_colors, hash_to_colors) + } + + pub fn matches_from_counter( + &self, + counter: SigCounter, + threshold: usize, + ) -> Vec<(String, usize)> { + let cf_sigs = self.db.cf_handle(SIGS).unwrap(); + + let matches_iter = counter + .most_common() + .into_iter() + .filter_map(|(dataset_id, size)| { + if size >= threshold { + let mut v = vec![0_u8; 8]; + (&mut v[..]) + .write_u64::(dataset_id) + .expect("error writing bytes"); + Some((&cf_sigs, v, size)) + } else { + None + } + }); + + let matches_sizes = matches_iter.clone().map(|(_, _, v)| v); + + info!("Multi get matches"); + self.db + .multi_get_cf(matches_iter.map(|(k, v, _)| (k, v))) + .into_iter() + .zip(matches_sizes) + .filter_map(|(r, size)| r.ok().unwrap_or(None).map(|v| (v, size))) + .filter_map( + |(sigdata, size)| match SignatureData::from_slice(&sigdata).unwrap() { + SignatureData::Empty => None, + SignatureData::External(p) => Some((p, size)), + SignatureData::Internal(sig) => Some((sig.name(), size)), + }, + ) + .collect() + } + + pub fn gather( + &self, + mut counter: SigCounter, + query_colors: QueryColors, + hash_to_color: HashToColor, + threshold: usize, + orig_query: &KmerMinHash, + template: &Sketch, + ) -> Result, Box> { + let mut match_size = usize::max_value(); + let mut matches = vec![]; + let mut key_bytes = [0u8; 8]; + //let mut query: KmerMinHashBTree = orig_query.clone().into(); + + let cf_sigs = self.db.cf_handle(SIGS).unwrap(); + + while match_size > threshold && !counter.is_empty() { + trace!("counter len: {}", counter.len()); + trace!("match size: {}", match_size); + + let (dataset_id, size) = counter.k_most_common_ordered(1)[0]; + match_size = if size >= threshold { size } else { break }; + + (&mut key_bytes[..]) + .write_u64::(dataset_id) + .expect("error writing bytes"); + + let match_sig = self + .db + .get_cf(&cf_sigs, &key_bytes[..]) + .ok() + .map( + |sigdata| match SignatureData::from_slice(&(sigdata.unwrap())).unwrap() { + SignatureData::Empty => todo!("throw error, empty sig"), + SignatureData::External(_p) => todo!("Load from external"), + SignatureData::Internal(sig) => sig, + }, + ) + .unwrap_or_else(|| panic!("Unknown dataset {}", dataset_id)); + + let match_mh = + prepare_query(&match_sig, template).expect("Couldn't find a compatible MinHash"); + + // Calculate stats + let f_orig_query = match_size as f64 / orig_query.size() as f64; + let f_match = match_size as f64 / match_mh.size() as f64; + let name = match_sig.name(); + let unique_intersect_bp = match_mh.scaled() as usize * match_size; + let gather_result_rank = matches.len(); + + let (intersect_orig, _) = match_mh.intersection_size(orig_query)?; + let intersect_bp = (match_mh.scaled() * intersect_orig) as usize; + + let f_unique_to_query = intersect_orig as f64 / orig_query.size() as f64; + let match_ = match_sig.clone(); + let md5 = match_sig.md5sum(); + + // TODO: all of these + let filename = "".into(); + let f_unique_weighted = 0.; + let average_abund = 0; + let 
median_abund = 0; + let std_abund = 0; + let f_match_orig = 0.; + let remaining_bp = 0; + + let result = GatherResult::builder() + .intersect_bp(intersect_bp) + .f_orig_query(f_orig_query) + .f_match(f_match) + .f_unique_to_query(f_unique_to_query) + .f_unique_weighted(f_unique_weighted) + .average_abund(average_abund) + .median_abund(median_abund) + .std_abund(std_abund) + .filename(filename) + .name(name) + .md5(md5) + .match_(match_) + .f_match_orig(f_match_orig) + .unique_intersect_bp(unique_intersect_bp) + .gather_result_rank(gather_result_rank) + .remaining_bp(remaining_bp) + .build(); + matches.push(result); + + trace!("Preparing counter for next round"); + // Prepare counter for finding the next match by decrementing + // all hashes found in the current match in other datasets + // TODO: not used at the moment, so just skip. + //query.remove_many(match_mh.to_vec().as_slice())?; + + // TODO: Use HashesToColors here instead. If not initialized, + // build it. + match_mh + .iter_mins() + .filter_map(|hash| hash_to_color.get(hash)) + .flat_map(|color| { + // TODO: remove this clone + query_colors.get(color).unwrap().clone().into_iter() + }) + .for_each(|dataset| { + // TODO: collect the flat_map into a Counter, and remove more + // than one at a time... + counter.entry(dataset).and_modify(|e| { + if *e > 0 { + *e -= 1 + } + }); + }); + + counter.remove(&dataset_id); + } + Ok(matches) + } + + pub fn index( + &self, + index_sigs: Vec, + template: &Sketch, + threshold: f64, + save_paths: bool, + ) { + let processed_sigs = AtomicUsize::new(0); + + index_sigs + .par_iter() + .enumerate() + .for_each(|(dataset_id, filename)| { + let i = processed_sigs.fetch_add(1, Ordering::SeqCst); + if i % 1000 == 0 { + info!("Processed {} reference sigs", i); + } + + self.map_hashes_colors( + dataset_id as DatasetID, + filename, + threshold, + template, + save_paths, + ); + }); + info!("Processed {} reference sigs", processed_sigs.into_inner()); + } + + pub fn update( + &self, + index_sigs: Vec, + template: &Sketch, + threshold: f64, + save_paths: bool, + ) { + use byteorder::ReadBytesExt; + + if !save_paths { + todo!("only supports with save_paths=True for now"); + } + + let cf_sigs = self.db.cf_handle(SIGS).unwrap(); + let iter = self.db.iterator_cf(&cf_sigs, rocksdb::IteratorMode::Start); + + info!("Verifying existing sigs"); + // verify data match up to this point + let mut max_dataset_id = 0; + let to_skip = iter + .map(|result| { + let (key, value) = result.unwrap(); + let current_dataset_id = (&key[..]).read_u64::().unwrap(); + + let filename = &index_sigs[current_dataset_id as usize]; + let sig_data = SignatureData::from_slice(&value).unwrap(); + match sig_data { + SignatureData::External(sig) => { + assert_eq!(sig, filename.as_os_str().to_str().unwrap().to_string()) + } + SignatureData::Empty => (), + SignatureData::Internal(_) => { + todo!("only supports with save_paths=True for now") + } + }; + max_dataset_id = max_dataset_id.max(current_dataset_id); + }) + .count(); + + max_dataset_id += 1; + assert_eq!(max_dataset_id as usize, to_skip); + + // process the remainder + let processed_sigs = AtomicUsize::new(0); + + index_sigs + .par_iter() + .skip(to_skip) + .enumerate() + .for_each(|(i, filename)| { + let dataset_id = i + to_skip; + + let i = processed_sigs.fetch_add(1, Ordering::SeqCst); + if i % 1000 == 0 { + info!("Processed {} reference sigs", i); + } + + self.map_hashes_colors( + dataset_id as DatasetID, + filename, + threshold, + template, + save_paths, + ); + }); + + info!( + "Processed 
additional {} reference sigs", + processed_sigs.into_inner() + ); + } + + pub fn check(&self, quick: bool) { + stats_for_cf(self.db.clone(), HASHES, true, quick); + info!(""); + stats_for_cf(self.db.clone(), SIGS, false, quick); + } + + pub fn compact(&self) { + for cf_name in [HASHES, SIGS] { + let cf = self.db.cf_handle(cf_name).unwrap(); + self.db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>) + } + } + + pub fn flush(&self) -> Result<(), Box> { + self.db.flush_wal(true)?; + + for cf_name in [HASHES, SIGS] { + let cf = self.db.cf_handle(cf_name).unwrap(); + self.db.flush_cf(&cf)?; + } + + Ok(()) + } + + pub fn convert(&self, _output_db: module::RevIndex) -> Result<(), Box> { + todo!() + /* + if let RevIndex::Color(db) = output_db { + let other_db = db.db; + + let cf_hashes = self.db.cf_handle(HASHES).unwrap(); + + info!("start converting colors"); + let mut color_bytes = [0u8; 8]; + let iter = self + .db + .iterator_cf(&cf_hashes, rocksdb::IteratorMode::Start); + for (key, value) in iter { + let datasets = Datasets::from_slice(&value).unwrap(); + let new_idx: Vec<_> = datasets.into_iter().collect(); + let new_color = Colors::update(other_db.clone(), None, new_idx.as_slice()).unwrap(); + + (&mut color_bytes[..]) + .write_u64::(new_color) + .expect("error writing bytes"); + other_db + .put_cf(&cf_hashes, &key[..], &color_bytes[..]) + .unwrap(); + } + info!("finished converting colors"); + + info!("copying sigs to output"); + let cf_sigs = self.db.cf_handle(SIGS).unwrap(); + let iter = self.db.iterator_cf(&cf_sigs, rocksdb::IteratorMode::Start); + for (key, value) in iter { + other_db.put_cf(&cf_sigs, &key[..], &value[..]).unwrap(); + } + info!("finished copying sigs to output"); + + Ok(()) + } else { + todo!() + } + */ + } +} + +fn cf_descriptors() -> Vec { + let mut cfopts = Options::default(); + cfopts.set_max_write_buffer_number(16); + cfopts.set_merge_operator_associative("datasets operator", merge_datasets); + cfopts.set_min_write_buffer_number_to_merge(10); + + // Updated default from + // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options + cfopts.set_level_compaction_dynamic_level_bytes(true); + + let cf_hashes = ColumnFamilyDescriptor::new(HASHES, cfopts); + + let mut cfopts = Options::default(); + cfopts.set_max_write_buffer_number(16); + // Updated default + cfopts.set_level_compaction_dynamic_level_bytes(true); + //cfopts.set_merge_operator_associative("colors operator", merge_colors); + + let cf_sigs = ColumnFamilyDescriptor::new(SIGS, cfopts); + + vec![cf_hashes, cf_sigs] +} diff --git a/src/core/src/index/revindex/mem_revindex.rs b/src/core/src/index/revindex/mem_revindex.rs new file mode 100644 index 0000000000..ed8a1b5152 --- /dev/null +++ b/src/core/src/index/revindex/mem_revindex.rs @@ -0,0 +1,1118 @@ +use std::collections::{HashMap, HashSet}; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use getset::{CopyGetters, Getters, Setters}; +use log::{debug, info}; +use nohash_hasher::BuildNoHashHasher; +use serde::{Deserialize, Serialize}; +use typed_builder::TypedBuilder; + +#[cfg(feature = "parallel")] +use rayon::prelude::*; + +use crate::encodings::{Color, Colors, Idx}; +use crate::index::{Index, Selection, SigStore}; +use crate::manifest::Manifest; +use crate::signature::{Signature, SigsTrait}; +use crate::sketch::minhash::KmerMinHash; +use crate::sketch::Sketch; +use crate::storage::{Storage, ZipStorage}; +use crate::Error; +use crate::HashIntoType; + +type 
SigCounter = counter::Counter; + +#[derive(Serialize, Deserialize)] +struct HashToColor(HashMap>); + +impl HashToColor { + fn new() -> Self { + HashToColor(HashMap::< + HashIntoType, + Color, + BuildNoHashHasher, + >::with_hasher(BuildNoHashHasher::default())) + } + + fn get(&self, hash: &HashIntoType) -> Option<&Color> { + self.0.get(hash) + } + + fn retain(&mut self, hashes: &HashSet) { + self.0.retain(|hash, _| hashes.contains(hash)) + } + + fn len(&self) -> usize { + self.0.len() + } + + fn is_empty(&self) -> bool { + self.0.is_empty() + } + + fn add_to(&mut self, colors: &mut Colors, dataset_id: usize, matched_hashes: Vec) { + let mut color = None; + + matched_hashes.into_iter().for_each(|hash| { + color = Some(colors.update(color, &[dataset_id as Idx]).unwrap()); + self.0.insert(hash, color.unwrap()); + }); + } + + fn reduce_hashes_colors( + a: (HashToColor, Colors), + b: (HashToColor, Colors), + ) -> (HashToColor, Colors) { + let ((small_hashes, small_colors), (mut large_hashes, mut large_colors)) = + if a.0.len() > b.0.len() { + (b, a) + } else { + (a, b) + }; + + small_hashes.0.into_iter().for_each(|(hash, color)| { + large_hashes + .0 + .entry(hash) + .and_modify(|entry| { + // Hash is already present. + // Update the current color by adding the indices from + // small_colors. + let ids = small_colors.indices(&color); + let new_color = large_colors.update(Some(*entry), ids).unwrap(); + *entry = new_color; + }) + .or_insert_with(|| { + // In this case, the hash was not present yet. + // we need to create the same color from small_colors + // into large_colors. + let ids = small_colors.indices(&color); + let new_color = large_colors.update(None, ids).unwrap(); + assert_eq!(new_color, color); + new_color + }); + }); + + (large_hashes, large_colors) + } +} + +// Use rkyv for serialization? 
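+// (rkyv supports zero-copy deserialization, so a saved index could be
+// memory-mapped instead of fully re-parsed on load.)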
+// https://davidkoloski.me/rkyv/ +#[derive(Serialize, Deserialize)] +pub struct RevIndex { + linear: LinearRevIndex, + hash_to_color: HashToColor, + colors: Colors, +} + +#[derive(Serialize, Deserialize)] +pub struct LinearRevIndex { + sig_files: Manifest, + + #[serde(skip)] + ref_sigs: Option>, + + template: Sketch, + + #[serde(skip)] + storage: Option>, +} + +impl LinearRevIndex { + pub fn new( + sig_files: Option, + template: &Sketch, + keep_sigs: bool, + ref_sigs: Option>, + storage: Option, + ) -> Self { + if ref_sigs.is_none() && sig_files.is_none() { + todo!("throw error, one need to be set"); + } + + let ref_sigs = if let Some(ref_sigs) = ref_sigs { + Some(ref_sigs.into_iter().map(|m| m.into()).collect()) + } else if keep_sigs { + let search_sigs: Vec<_> = sig_files + .as_ref() + .unwrap() + .internal_locations() + .map(PathBuf::from) + .collect(); + + #[cfg(feature = "parallel")] + let sigs_iter = search_sigs.par_iter(); + + #[cfg(not(feature = "parallel"))] + let sigs_iter = search_sigs.iter(); + + Some( + sigs_iter + .map(|ref_path| { + if let Some(storage) = &storage { + let sig_data = storage + .load(ref_path.to_str().unwrap_or_else(|| { + panic!("error converting path {:?}", ref_path) + })) + .unwrap_or_else(|_| panic!("error loading {:?}", ref_path)); + Signature::from_reader(sig_data.as_slice()) + .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) + .swap_remove(0) + .into() + } else { + Signature::from_path(ref_path) + .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) + .swap_remove(0) + .into() + } + }) + .collect(), + ) + } else { + None + }; + + let storage = storage.map(Arc::new); + + let sig_files = sig_files.unwrap_or_else(|| { + todo!("generate manifest for ref_sigs"); + }); + + LinearRevIndex { + sig_files, + template: template.clone(), + ref_sigs, + storage, + } + } + + fn index( + self, + threshold: usize, + merged_query: Option, + queries: Option<&[KmerMinHash]>, + ) -> RevIndex { + let processed_sigs = AtomicUsize::new(0); + + let search_sigs: Vec<_> = self + .sig_files + .internal_locations() + .map(PathBuf::from) + .collect(); + + #[cfg(feature = "parallel")] + let sig_iter = search_sigs.par_iter(); + + #[cfg(not(feature = "parallel"))] + let sig_iter = search_sigs.iter(); + + let filtered_sigs = sig_iter.enumerate().filter_map(|(dataset_id, filename)| { + let i = processed_sigs.fetch_add(1, Ordering::SeqCst); + if i % 1000 == 0 { + info!("Processed {} reference sigs", i); + } + + let search_sig = if let Some(storage) = &self.storage { + let sig_data = storage + .load( + filename + .to_str() + .unwrap_or_else(|| panic!("error converting path {:?}", filename)), + ) + .unwrap_or_else(|_| panic!("error loading {:?}", filename)); + + Signature::from_reader(sig_data.as_slice()) + } else { + Signature::from_path(filename) + } + .unwrap_or_else(|_| panic!("Error processing {:?}", filename)) + .swap_remove(0); + + RevIndex::map_hashes_colors( + dataset_id, + &search_sig, + queries, + &merged_query, + threshold, + &self.template, + ) + }); + + #[cfg(feature = "parallel")] + let (hash_to_color, colors) = filtered_sigs.reduce( + || (HashToColor::new(), Colors::default()), + HashToColor::reduce_hashes_colors, + ); + + #[cfg(not(feature = "parallel"))] + let (hash_to_color, colors) = filtered_sigs.fold( + (HashToColor::new(), Colors::default()), + HashToColor::reduce_hashes_colors, + ); + + RevIndex { + hash_to_color, + colors, + linear: self, + } + } + + pub fn location(&self) -> Option { + if let Some(storage) = &self.storage { + storage.path() 
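+            // the location is the path of the backing ZipStorage, if any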
+ } else { + None + } + } + + pub fn storage(&self) -> Option> { + self.storage.clone() + } + + pub fn select(mut self, selection: &Selection) -> Result { + let manifest = self.sig_files.select_to_manifest(selection)?; + self.sig_files = manifest; + + Ok(self) + /* + # if we have a manifest, run 'select' on the manifest. + manifest = self.manifest + traverse_yield_all = self.traverse_yield_all + + if manifest is not None: + manifest = manifest.select_to_manifest(**kwargs) + return ZipFileLinearIndex(self.storage, + selection_dict=None, + traverse_yield_all=traverse_yield_all, + manifest=manifest, + use_manifest=True) + else: + # no manifest? just pass along all the selection kwargs to + # the new ZipFileLinearIndex. + + assert manifest is None + if self.selection_dict: + # combine selects... + d = dict(self.selection_dict) + for k, v in kwargs.items(): + if k in d: + if d[k] is not None and d[k] != v: + raise ValueError(f"incompatible select on '{k}'") + d[k] = v + kwargs = d + + return ZipFileLinearIndex(self.storage, + selection_dict=kwargs, + traverse_yield_all=traverse_yield_all, + manifest=None, + use_manifest=False) + */ + } + + pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter { + let processed_sigs = AtomicUsize::new(0); + + // TODO: Some(ref_sigs) case + + let search_sigs: Vec<_> = self + .sig_files + .internal_locations() + .map(PathBuf::from) + .collect(); + + #[cfg(feature = "parallel")] + let sig_iter = search_sigs.par_iter(); + + #[cfg(not(feature = "parallel"))] + let sig_iter = search_sigs.iter(); + + let counters = sig_iter.enumerate().filter_map(|(dataset_id, filename)| { + let i = processed_sigs.fetch_add(1, Ordering::SeqCst); + if i % 1000 == 0 { + info!("Processed {} reference sigs", i); + } + + let search_sig = if let Some(storage) = &self.storage { + let sig_data = storage + .load( + filename + .to_str() + .unwrap_or_else(|| panic!("error converting path {:?}", filename)), + ) + .unwrap_or_else(|_| panic!("error loading {:?}", filename)); + + Signature::from_reader(sig_data.as_slice()) + } else { + Signature::from_path(filename) + } + .unwrap_or_else(|_| panic!("Error processing {:?}", filename)) + .swap_remove(0); + + let mut search_mh = None; + if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(&self.template) { + search_mh = Some(mh); + }; + let search_mh = search_mh.expect("Couldn't find a compatible MinHash"); + + let (large_mh, small_mh) = if query.size() > search_mh.size() { + (query, search_mh) + } else { + (search_mh, query) + }; + + let (size, _) = small_mh + .intersection_size(large_mh) + .unwrap_or_else(|_| panic!("error computing intersection for {:?}", filename)); + + if size == 0 { + None + } else { + let mut counter: SigCounter = Default::default(); + counter[&(dataset_id as u64)] += size as usize; + Some(counter) + } + }); + + let reduce_counters = |mut a: SigCounter, b: SigCounter| { + a.extend(&b); + a + }; + + #[cfg(feature = "parallel")] + let counter = counters.reduce(SigCounter::new, reduce_counters); + + #[cfg(not(feature = "parallel"))] + let counter = counters.fold(SigCounter::new(), reduce_counters); + + counter + } + + pub fn search( + &self, + counter: SigCounter, + similarity: bool, + threshold: usize, + ) -> Result, Box> { + let mut matches = vec![]; + if similarity { + unimplemented!("TODO: threshold correction") + } + + for (dataset_id, size) in counter.most_common() { + if size >= threshold { + matches.push( + self.sig_files[dataset_id as usize] + .internal_location() + .to_str() + .unwrap() + .into(), + ); 
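+                // most_common() yields counts in descending order, so the
+                // first entry below the threshold ends the scan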
+ } else { + break; + }; + } + Ok(matches) + } + + fn gather_round( + &self, + dataset_id: u64, + match_size: usize, + query: &KmerMinHash, + round: usize, + ) -> Result { + let match_path = if self.sig_files.is_empty() { + PathBuf::new() + } else { + self.sig_files[dataset_id as usize].internal_location() + }; + let match_sig = self.sig_for_dataset(dataset_id as usize)?; + let result = self.stats_for_match(&match_sig, query, match_size, match_path, round)?; + Ok(result) + } + + fn sig_for_dataset(&self, dataset_id: usize) -> Result { + let match_path = if self.sig_files.is_empty() { + PathBuf::new() + } else { + self.sig_files[dataset_id].internal_location() + }; + + let match_sig = if let Some(refsigs) = &self.ref_sigs { + refsigs[dataset_id].clone() + } else { + let mut sig = if let Some(storage) = &self.storage { + let sig_data = storage + .load( + match_path + .to_str() + .unwrap_or_else(|| panic!("error converting path {:?}", match_path)), + ) + .unwrap_or_else(|_| panic!("error loading {:?}", match_path)); + Signature::from_reader(sig_data.as_slice())? + } else { + Signature::from_path(&match_path)? + }; + // TODO: remove swap_remove + sig.swap_remove(0).into() + }; + Ok(match_sig) + } + + fn stats_for_match( + &self, + match_sig: &Signature, + query: &KmerMinHash, + match_size: usize, + match_path: PathBuf, + gather_result_rank: usize, + ) -> Result { + let mut match_mh = None; + if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(&self.template) { + match_mh = Some(mh); + } + let match_mh = match_mh.expect("Couldn't find a compatible MinHash"); + + // Calculate stats + let f_orig_query = match_size as f64 / query.size() as f64; + let f_match = match_size as f64 / match_mh.size() as f64; + let filename = match_path.to_str().unwrap().into(); + let name = match_sig.name(); + let unique_intersect_bp = match_mh.scaled() as usize * match_size; + + let (intersect_orig, _) = match_mh.intersection_size(query)?; + let intersect_bp = (match_mh.scaled() * intersect_orig) as usize; + + let f_unique_to_query = intersect_orig as f64 / query.size() as f64; + let match_ = match_sig.clone(); + + // TODO: all of these + let f_unique_weighted = 0.; + let average_abund = 0; + let median_abund = 0; + let std_abund = 0; + let md5 = "".into(); + let f_match_orig = 0.; + let remaining_bp = 0; + + Ok(GatherResult { + intersect_bp, + f_orig_query, + f_match, + f_unique_to_query, + f_unique_weighted, + average_abund, + median_abund, + std_abund, + filename, + name, + md5, + match_, + f_match_orig, + unique_intersect_bp, + gather_result_rank, + remaining_bp, + }) + } + + pub fn gather( + &self, + mut counter: SigCounter, + threshold: usize, + query: &KmerMinHash, + ) -> Result, Box> { + let mut match_size = usize::max_value(); + let mut matches = vec![]; + + while match_size > threshold && !counter.is_empty() { + let (dataset_id, size) = counter.most_common()[0]; + if threshold == 0 && size == 0 { + break; + } + + match_size = if size >= threshold { + size + } else { + break; + }; + + let result = self.gather_round(dataset_id, match_size, query, matches.len())?; + + // Prepare counter for finding the next match by decrementing + // all hashes found in the current match in other datasets + // TODO: maybe par_iter? 
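+            // Every remaining candidate is reloaded and its overlap with the
+            // query is subtracted from its count; candidates whose count
+            // would go negative are dropped outright.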
+ let mut to_remove: HashSet = Default::default(); + to_remove.insert(dataset_id); + + for (dataset, value) in counter.iter_mut() { + let dataset_sig = self.sig_for_dataset(*dataset as usize)?; + let mut match_mh = None; + if let Some(Sketch::MinHash(mh)) = dataset_sig.select_sketch(&self.template) { + match_mh = Some(mh); + } + let match_mh = match_mh.expect("Couldn't find a compatible MinHash"); + + let (intersection, _) = query.intersection_size(match_mh)?; + if intersection as usize > *value { + to_remove.insert(*dataset); + } else { + *value -= intersection as usize; + }; + } + to_remove.iter().for_each(|dataset_id| { + counter.remove(dataset_id); + }); + matches.push(result); + } + Ok(matches) + } + + pub fn manifest(&self) -> Manifest { + self.sig_files.clone() + } + + pub fn set_manifest(&mut self, new_manifest: Manifest) -> Result<(), Error> { + self.sig_files = new_manifest; + Ok(()) + } + + pub fn signatures_iter(&self) -> impl Iterator + '_ { + if let Some(_sigs) = &self.ref_sigs { + //sigs.iter().cloned() + todo!("this works, but need to match return types") + } else { + // FIXME temp solution, must find better one! + (0..self.sig_files.len()) + .map(move |dataset_id| self.sig_for_dataset(dataset_id).expect("error loading sig")) + } + } +} + +impl<'a> Index<'a> for LinearRevIndex { + type Item = SigStore; + + fn insert(&mut self, _node: Self::Item) -> Result<(), Error> { + unimplemented!() + } + + fn save>(&self, _path: P) -> Result<(), Error> { + unimplemented!() + } + + fn load>(_path: P) -> Result<(), Error> { + unimplemented!() + } + + fn len(&self) -> usize { + if let Some(refs) = &self.ref_sigs { + refs.len() + } else { + self.sig_files.len() + } + } + + fn signatures(&self) -> Vec { + if let Some(ref sigs) = self.ref_sigs { + sigs.to_vec() + } else { + unimplemented!() + } + } + + fn signature_refs(&self) -> Vec<&Self::Item> { + unimplemented!() + } +} + +impl RevIndex { + pub fn load>( + index_path: P, + queries: Option<&[KmerMinHash]>, + ) -> Result> { + let (rdr, _) = niffler::from_path(index_path)?; + let revindex = if let Some(qs) = queries { + // TODO: avoid loading full revindex if query != None + /* + struct PartialRevIndex { + hashes_to_keep: Option>, + marker: PhantomData T>, + } + + impl PartialRevIndex { + pub fn new(hashes_to_keep: HashSet) -> Self { + PartialRevIndex { + hashes_to_keep: Some(hashes_to_keep), + marker: PhantomData, + } + } + } + */ + + let mut hashes: HashSet = HashSet::new(); + for q in qs { + hashes.extend(q.iter_mins()); + } + + //let mut revindex: RevIndex = PartialRevIndex::new(hashes).deserialize(&rdr).unwrap(); + + let mut revindex: RevIndex = serde_json::from_reader(rdr)?; + revindex.hash_to_color.retain(&hashes); + revindex + } else { + // Load the full revindex + serde_json::from_reader(rdr)? 
+ }; + + Ok(revindex) + } + + pub fn new( + search_sigs: &[PathBuf], + template: &Sketch, + threshold: usize, + queries: Option<&[KmerMinHash]>, + keep_sigs: bool, + ) -> RevIndex { + // If threshold is zero, let's merge all queries and save time later + let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold)); + + let linear = LinearRevIndex::new(Some(search_sigs.into()), template, keep_sigs, None, None); + linear.index(threshold, merged_query, queries) + } + + pub fn from_zipstorage( + storage: ZipStorage, + template: &Sketch, + threshold: usize, + queries: Option<&[KmerMinHash]>, + keep_sigs: bool, + ) -> Result { + // If threshold is zero, let's merge all queries and save time later + let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold)); + + // Load manifest from zipstorage + let manifest = Manifest::from_reader(storage.load("SOURMASH-MANIFEST.csv")?.as_slice())?; + let search_sigs: Vec<_> = manifest.internal_locations().map(PathBuf::from).collect(); + + let linear = LinearRevIndex::new( + Some(search_sigs.as_slice().into()), + template, + keep_sigs, + None, + Some(storage), + ); + + Ok(linear.index(threshold, merged_query, queries)) + } + + fn merge_queries(qs: &[KmerMinHash], threshold: usize) -> Option { + if threshold == 0 { + let mut merged = qs[0].clone(); + for query in &qs[1..] { + merged.merge(query).unwrap(); + } + Some(merged) + } else { + None + } + } + + pub fn new_with_sigs( + search_sigs: Vec, + template: &Sketch, + threshold: usize, + queries: Option<&[KmerMinHash]>, + ) -> RevIndex { + // If threshold is zero, let's merge all queries and save time later + let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold)); + + let linear = LinearRevIndex::new( + Default::default(), + template, + false, + search_sigs.into(), + None, + ); + + linear.index(threshold, merged_query, queries) + } + + fn map_hashes_colors( + dataset_id: usize, + search_sig: &Signature, + queries: Option<&[KmerMinHash]>, + merged_query: &Option, + threshold: usize, + template: &Sketch, + ) -> Option<(HashToColor, Colors)> { + let mut search_mh = None; + if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(template) { + search_mh = Some(mh); + } + + let search_mh = search_mh.expect("Couldn't find a compatible MinHash"); + let mut hash_to_color = HashToColor::new(); + let mut colors = Colors::default(); + + if let Some(qs) = queries { + if let Some(ref merged) = merged_query { + let (matched_hashes, intersection) = merged.intersection(search_mh).unwrap(); + if !matched_hashes.is_empty() || intersection > threshold as u64 { + hash_to_color.add_to(&mut colors, dataset_id, matched_hashes); + } + } else { + for query in qs { + let (matched_hashes, intersection) = query.intersection(search_mh).unwrap(); + if !matched_hashes.is_empty() || intersection > threshold as u64 { + hash_to_color.add_to(&mut colors, dataset_id, matched_hashes); + } + } + } + } else { + let matched = search_mh.mins(); + let size = matched.len() as u64; + if !matched.is_empty() || size > threshold as u64 { + hash_to_color.add_to(&mut colors, dataset_id, matched); + } + }; + + if hash_to_color.is_empty() { + None + } else { + Some((hash_to_color, colors)) + } + } + + pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter { + query + .iter_mins() + .filter_map(|hash| self.hash_to_color.get(hash)) + .flat_map(|color| self.colors.indices(color)) + .cloned() + .collect() + } + + pub fn search( + &self, + counter: SigCounter, + similarity: bool, + threshold: 
usize, + ) -> Result, Box> { + self.linear.search(counter, similarity, threshold) + } + + pub fn gather( + &self, + mut counter: SigCounter, + threshold: usize, + query: &KmerMinHash, + ) -> Result, Box> { + let mut match_size = usize::max_value(); + let mut matches = vec![]; + + while match_size > threshold && !counter.is_empty() { + let (dataset_id, size) = counter.most_common()[0]; + match_size = if size >= threshold { size } else { break }; + let result = self + .linear + .gather_round(dataset_id, match_size, query, matches.len())?; + if let Some(Sketch::MinHash(match_mh)) = + result.match_.select_sketch(&self.linear.template) + { + // Prepare counter for finding the next match by decrementing + // all hashes found in the current match in other datasets + for hash in match_mh.iter_mins() { + if let Some(color) = self.hash_to_color.get(hash) { + counter.subtract(self.colors.indices(color).cloned()); + } + } + counter.remove(&dataset_id); + matches.push(result); + } else { + unimplemented!() + } + } + Ok(matches) + } + + pub fn template(&self) -> Sketch { + self.linear.template.clone() + } + + // TODO: mh should be a sketch, or even a sig... + pub(crate) fn find_signatures( + &self, + mh: &KmerMinHash, + threshold: f64, + containment: bool, + _ignore_scaled: bool, + ) -> Result, Error> { + /* + let template_mh = None; + if let Sketch::MinHash(mh) = self.template { + template_mh = Some(mh); + }; + // TODO: throw error + let template_mh = template_mh.unwrap(); + + let tmp_mh; + let mh = if template_mh.scaled() > mh.scaled() { + // TODO: proper error here + tmp_mh = mh.downsample_scaled(self.scaled)?; + &tmp_mh + } else { + mh + }; + + if self.scaled < mh.scaled() && !ignore_scaled { + return Err(LcaDBError::ScaledMismatchError { + db: self.scaled, + query: mh.scaled(), + } + .into()); + } + */ + + // TODO: proper threshold calculation + let threshold: usize = (threshold * (mh.size() as f64)) as _; + + let counter = self.counter_for_query(mh); + + debug!( + "number of matching signatures for hashes: {}", + counter.len() + ); + + let mut results = vec![]; + for (dataset_id, size) in counter.most_common() { + let match_size = if size >= threshold { size } else { break }; + + let match_path = if self.linear.sig_files.is_empty() { + PathBuf::new() + } else { + self.linear.sig_files[dataset_id as usize].internal_location() + }; + + let ref_match; + let match_sig = if let Some(refsigs) = &self.linear.ref_sigs { + &refsigs[dataset_id as usize] + } else { + let mut sig = if let Some(storage) = &self.linear.storage { + let sig_data = + storage + .load(match_path.to_str().unwrap_or_else(|| { + panic!("error converting path {:?}", match_path) + })) + .unwrap_or_else(|_| panic!("error loading {:?}", match_path)); + Signature::from_reader(sig_data.as_slice())? + } else { + Signature::from_path(&match_path)? 
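+                // no storage attached: fall back to loading from the filesystem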
+ }; + // TODO: remove swap_remove + ref_match = sig.swap_remove(0); + &ref_match + }; + + let mut match_mh = None; + if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(&self.linear.template) { + match_mh = Some(mh); + } + let match_mh = match_mh.unwrap(); + + if size >= threshold { + let score = if containment { + size as f64 / mh.size() as f64 + } else { + size as f64 / (mh.size() + match_size - size) as f64 + }; + let filename = match_path.to_str().unwrap().into(); + let mut sig = match_sig.clone(); + sig.reset_sketches(); + sig.push(Sketch::MinHash(match_mh.clone())); + results.push((score, sig, filename)); + } else { + break; + }; + } + Ok(results) + } +} + +#[derive(TypedBuilder, CopyGetters, Getters, Setters, Serialize, Deserialize, Debug, PartialEq)] +pub struct GatherResult { + #[getset(get_copy = "pub")] + intersect_bp: usize, + + #[getset(get_copy = "pub")] + f_orig_query: f64, + + #[getset(get_copy = "pub")] + f_match: f64, + + f_unique_to_query: f64, + f_unique_weighted: f64, + average_abund: usize, + median_abund: usize, + std_abund: usize, + + #[getset(get = "pub")] + filename: String, + + #[getset(get = "pub")] + name: String, + + #[getset(get = "pub")] + md5: String, + match_: Signature, + f_match_orig: f64, + unique_intersect_bp: usize, + gather_result_rank: usize, + remaining_bp: usize, +} + +impl GatherResult { + pub fn get_match(&self) -> Signature { + self.match_.clone() + } +} + +impl<'a> Index<'a> for RevIndex { + type Item = Signature; + + fn insert(&mut self, _node: Self::Item) -> Result<(), Error> { + unimplemented!() + } + + fn save>(&self, _path: P) -> Result<(), Error> { + unimplemented!() + } + + fn load>(_path: P) -> Result<(), Error> { + unimplemented!() + } + + fn len(&self) -> usize { + if let Some(refs) = &self.linear.ref_sigs { + refs.len() + } else { + self.linear.sig_files.len() + } + } + + fn signatures(&self) -> Vec { + if let Some(ref sigs) = self.linear.ref_sigs { + sigs.iter().map(|s| s.clone().into()).collect() + } else { + unimplemented!() + } + } + + fn signature_refs(&self) -> Vec<&Self::Item> { + unimplemented!() + } +} + +#[cfg(test)] +mod test { + use super::*; + + use crate::sketch::minhash::max_hash_for_scaled; + + #[test] + fn revindex_new() { + let max_hash = max_hash_for_scaled(10000); + let template = Sketch::MinHash( + KmerMinHash::builder() + .num(0u32) + .ksize(31) + .max_hash(max_hash) + .build(), + ); + let search_sigs = [ + "../../tests/test-data/gather/GCF_000006945.2_ASM694v2_genomic.fna.gz.sig".into(), + "../../tests/test-data/gather/GCF_000007545.1_ASM754v1_genomic.fna.gz.sig".into(), + ]; + let index = RevIndex::new(&search_sigs, &template, 0, None, false); + assert_eq!(index.colors.len(), 3); + } + + #[test] + fn revindex_many() { + let max_hash = max_hash_for_scaled(10000); + let template = Sketch::MinHash( + KmerMinHash::builder() + .num(0u32) + .ksize(31) + .max_hash(max_hash) + .build(), + ); + let search_sigs = [ + "../../tests/test-data/gather/GCF_000006945.2_ASM694v2_genomic.fna.gz.sig".into(), + "../../tests/test-data/gather/GCF_000007545.1_ASM754v1_genomic.fna.gz.sig".into(), + "../../tests/test-data/gather/GCF_000008105.1_ASM810v1_genomic.fna.gz.sig".into(), + ]; + + let index = RevIndex::new(&search_sigs, &template, 0, None, false); + /* + dbg!(&index.colors.colors); + 0: 86 + 1: 132 + 2: 91 + (0, 1): 53 + (0, 2): 90 + (1, 2): 26 + (0, 1, 2): 261 + union: 739 + */ + //assert_eq!(index.colors.len(), 3); + assert_eq!(index.colors.len(), 7); + } + + #[test] + fn revindex_from_zipstorage() { + let max_hash 
= max_hash_for_scaled(100);
+        let template = Sketch::MinHash(
+            KmerMinHash::builder()
+                .num(0u32)
+                .ksize(57)
+                .hash_function(crate::encodings::HashFunctions::murmur64_protein)
+                .max_hash(max_hash)
+                .build(),
+        );
+        let storage = ZipStorage::from_file("../../tests/test-data/prot/protein.zip")
+            .expect("error loading zipfile");
+        let index = RevIndex::from_zipstorage(storage, &template, 0, None, false)
+            .expect("error building from zipstorage");
+
+        assert_eq!(index.colors.len(), 3);
+
+        let query_sig = Signature::from_path(
+            "../../tests/test-data/prot/protein/GCA_001593925.1_ASM159392v1_protein.faa.gz.sig",
+        )
+        .expect("Error processing query")
+        .swap_remove(0);
+        let mut query_mh = None;
+        if let Some(Sketch::MinHash(mh)) = query_sig.select_sketch(&template) {
+            query_mh = Some(mh);
+        }
+        let query_mh = query_mh.expect("Couldn't find a compatible MinHash");
+
+        let counter_rev = index.counter_for_query(query_mh);
+        let counter_lin = index.linear.counter_for_query(query_mh);
+
+        let results_rev = index.search(counter_rev, false, 0).unwrap();
+        let results_linear = index.linear.search(counter_lin, false, 0).unwrap();
+        assert_eq!(results_rev, results_linear);
+
+        let counter_rev = index.counter_for_query(query_mh);
+        let counter_lin = index.linear.counter_for_query(query_mh);
+
+        let results_rev = index.gather(counter_rev, 0, query_mh).unwrap();
+        let results_linear = index.linear.gather(counter_lin, 0, query_mh).unwrap();
+        assert_eq!(results_rev.len(), 1);
+        assert_eq!(results_rev, results_linear);
+    }
+}
diff --git a/src/core/src/index/revindex/mod.rs b/src/core/src/index/revindex/mod.rs
new file mode 100644
index 0000000000..314344620d
--- /dev/null
+++ b/src/core/src/index/revindex/mod.rs
@@ -0,0 +1,509 @@
+pub mod disk_revindex;
+pub mod mem_revindex;
+
+use std::collections::HashMap;
+use std::hash::{Hash, Hasher};
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use byteorder::{LittleEndian, WriteBytesExt};
+use rkyv::{Archive, Deserialize, Serialize};
+use roaring::RoaringTreemap;
+
+use crate::index::revindex::mem_revindex::GatherResult;
+use crate::signature::{Signature, SigsTrait};
+use crate::sketch::minhash::{max_hash_for_scaled, KmerMinHash};
+use crate::sketch::Sketch;
+
+use crate::encodings::Color;
+
+//type DB = rocksdb::DBWithThreadMode<rocksdb::SingleThreaded>;
+type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
+
+type DatasetID = u64;
+type SigCounter = counter::Counter<DatasetID>;
+type QueryColors = HashMap<Color, Datasets>;
+type HashToColor = HashMap<u64, Color>;
+
+const HASHES: &str = "hashes";
+const SIGS: &str = "signatures";
+const COLORS: &str = "colors";
+
+pub enum RevIndex {
+    //Color(color_revindex::ColorRevIndex),
+    Plain(disk_revindex::RevIndex),
+}
+
+impl RevIndex {
+    /* TODO: need the repair_cf variant, not available in rocksdb-rust yet
+    pub fn repair(index: &Path, colors: bool) {
+        if colors {
+            color_revindex::repair(index);
+        } else {
+            disk_revindex::repair(index);
+        }
+    }
+    */
+
+    pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter {
+        match self {
+            //Self::Color(db) => db.counter_for_query(query),
+            Self::Plain(db) => db.counter_for_query(query),
+        }
+    }
+
+    pub fn matches_from_counter(
+        &self,
+        counter: SigCounter,
+        threshold: usize,
+    ) -> Vec<(String, usize)> {
+        match self {
+            //Self::Color(db) => todo!(), //db.matches_from_counter(counter, threshold),
+            Self::Plain(db) => db.matches_from_counter(counter, threshold),
+        }
+    }
+
+    pub fn prepare_gather_counters(
+        &self,
+        query: &KmerMinHash,
+    ) -> (SigCounter, QueryColors, HashToColor) {
+        match self {
+            //Self::Color(_db) =>
todo!(), //db.prepare_gather_counters(query), + Self::Plain(db) => db.prepare_gather_counters(query), + } + } + + pub fn index( + &self, + index_sigs: Vec, + template: &Sketch, + threshold: f64, + save_paths: bool, + ) { + match self { + //Self::Color(db) => db.index(index_sigs, template, threshold, save_paths), + Self::Plain(db) => db.index(index_sigs, template, threshold, save_paths), + } + } + + pub fn update( + &self, + index_sigs: Vec, + template: &Sketch, + threshold: f64, + save_paths: bool, + ) { + match self { + //Self::Color(db) => db.update(index_sigs, template, threshold, save_paths), + Self::Plain(db) => db.update(index_sigs, template, threshold, save_paths), + } + } + + pub fn compact(&self) { + match self { + //Self::Color(db) => db.compact(), + Self::Plain(db) => db.compact(), + }; + } + + pub fn flush(&self) -> Result<(), Box> { + match self { + //Self::Color(db) => db.flush(), + Self::Plain(db) => db.flush(), + } + } + + pub fn convert(&self, output_db: RevIndex) -> Result<(), Box> { + match self { + //Self::Color(_db) => todo!(), + Self::Plain(db) => db.convert(output_db), + } + } + + pub fn check(&self, quick: bool) { + match self { + //Self::Color(db) => db.check(quick), + Self::Plain(db) => db.check(quick), + } + } + + pub fn create(index: &Path, colors: bool) -> Self { + if colors { + todo!() //color_revindex::ColorRevIndex::create(index) + } else { + disk_revindex::RevIndex::create(index) + } + } + + pub fn open(index: &Path, read_only: bool) -> Self { + let opts = Self::db_options(); + let cfs = DB::list_cf(&opts, index).unwrap(); + + if cfs.into_iter().any(|c| c == COLORS) { + // TODO: ColorRevIndex can't be read-only for now, + // due to pending unmerged colors + todo!() //color_revindex::ColorRevIndex::open(index, false) + } else { + disk_revindex::RevIndex::open(index, read_only) + } + } + + fn db_options() -> rocksdb::Options { + let mut opts = rocksdb::Options::default(); + opts.set_max_open_files(500); + + // Updated defaults from + // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options + opts.set_bytes_per_sync(1048576); + let mut block_opts = rocksdb::BlockBasedOptions::default(); + block_opts.set_block_size(16 * 1024); + block_opts.set_cache_index_and_filter_blocks(true); + block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); + block_opts.set_format_version(5); + opts.set_block_based_table_factory(&block_opts); + // End of updated defaults + + opts + } + + pub fn gather( + &self, + counter: SigCounter, + query_colors: QueryColors, + hash_to_color: HashToColor, + threshold: usize, + query: &KmerMinHash, + template: &Sketch, + ) -> Result, Box> { + match self { + //Self::Color(_db) => todo!(), + Self::Plain(db) => db.gather( + counter, + query_colors, + hash_to_color, + threshold, + query, + template, + ), + } + } +} + +#[derive(Debug, Default, PartialEq, Clone, Archive, Serialize, Deserialize)] +enum SignatureData { + #[default] + Empty, + Internal(Signature), + External(String), +} + +impl SignatureData { + fn from_slice(slice: &[u8]) -> Option { + // TODO: avoid the aligned vec allocation here + let mut vec = rkyv::AlignedVec::new(); + vec.extend_from_slice(slice); + let archived_value = unsafe { rkyv::archived_root::(vec.as_ref()) }; + let inner = archived_value.deserialize(&mut rkyv::Infallible).unwrap(); + Some(inner) + } + + fn as_bytes(&self) -> Option> { + let bytes = rkyv::to_bytes::<_, 256>(self).unwrap(); + Some(bytes.into_vec()) + + /* + let mut serializer = DefaultSerializer::default(); + let 
v = serializer.serialize_value(self).unwrap(); + debug_assert_eq!(v, 0); + let buf = serializer.into_serializer().into_inner(); + debug_assert!(Datasets::from_slice(&buf.to_vec()).is_some()); + Some(buf.to_vec()) + */ + } +} + +fn check_compatible_downsample(me: &KmerMinHash, other: &KmerMinHash) -> Result<(), crate::Error> { + /* + if self.num != other.num { + return Err(Error::MismatchNum { + n1: self.num, + n2: other.num, + } + .into()); + } + */ + use crate::Error; + + if me.ksize() != other.ksize() { + return Err(Error::MismatchKSizes); + } + if me.hash_function() != other.hash_function() { + // TODO: fix this error + return Err(Error::MismatchDNAProt); + } + if me.max_hash() < other.max_hash() { + return Err(Error::MismatchScaled); + } + if me.seed() != other.seed() { + return Err(Error::MismatchSeed); + } + Ok(()) +} + +pub fn prepare_query(search_sig: &Signature, template: &Sketch) -> Option { + let mut search_mh = None; + if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(template) { + search_mh = Some(mh.clone()); + } else { + // try to find one that can be downsampled + if let Sketch::MinHash(template_mh) = template { + for sketch in search_sig.sketches() { + if let Sketch::MinHash(ref_mh) = sketch { + if check_compatible_downsample(&ref_mh, template_mh).is_ok() { + let max_hash = max_hash_for_scaled(template_mh.scaled()); + let mh = ref_mh.downsample_max_hash(max_hash).unwrap(); + search_mh = Some(mh); + } + } + } + } + } + search_mh +} + +#[derive(Debug, Default, PartialEq, Clone)] +pub enum Datasets { + #[default] + Empty, + Unique(DatasetID), + Many(RoaringTreemap), +} + +impl Hash for Datasets { + fn hash(&self, state: &mut H) + where + H: Hasher, + { + match self { + Self::Empty => todo!(), + Self::Unique(v) => v.hash(state), + Self::Many(_) => todo!(), + } + } +} + +impl IntoIterator for Datasets { + type Item = DatasetID; + type IntoIter = Box>; + + fn into_iter(self) -> Self::IntoIter { + match self { + Self::Empty => Box::new(std::iter::empty()), + Self::Unique(v) => Box::new(std::iter::once(v)), + Self::Many(v) => Box::new(v.into_iter()), + } + } +} + +impl Extend for Datasets { + fn extend(&mut self, iter: T) + where + T: IntoIterator, + { + if let Self::Many(v) = self { + v.extend(iter); + return; + } + + let mut it = iter.into_iter(); + while let Some(value) = it.next() { + match self { + Self::Empty => *self = Datasets::Unique(value), + Self::Unique(v) => { + if *v != value { + *self = Self::Many([*v, value].iter().copied().collect()); + } + } + Self::Many(v) => { + v.extend(it); + return; + } + } + } + } +} + +impl Datasets { + fn new(vals: &[DatasetID]) -> Self { + if vals.is_empty() { + Self::Empty + } else if vals.len() == 1 { + Self::Unique(vals[0]) + } else { + Self::Many(RoaringTreemap::from_sorted_iter(vals.iter().copied()).unwrap()) + } + } + + fn from_slice(slice: &[u8]) -> Option { + use byteorder::ReadBytesExt; + + if slice.len() == 8 { + // Unique + Some(Self::Unique( + (&slice[..]).read_u64::().unwrap(), + )) + } else if slice.len() == 1 { + // Empty + Some(Self::Empty) + } else { + // Many + Some(Self::Many(RoaringTreemap::deserialize_from(slice).unwrap())) + } + } + + fn as_bytes(&self) -> Option> { + use byteorder::WriteBytesExt; + + match self { + Self::Empty => Some(vec![42_u8]), + Self::Unique(v) => { + let mut buf = vec![0u8; 8]; + (&mut buf[..]) + .write_u64::(*v) + .expect("error writing bytes"); + Some(buf) + } + Self::Many(v) => { + let mut buf = vec![]; + v.serialize_into(&mut buf).unwrap(); + Some(buf) + } + } + } + + fn 
union(&mut self, other: Datasets) { + match self { + Datasets::Empty => match other { + Datasets::Empty => (), + Datasets::Unique(_) | Datasets::Many(_) => *self = other, + }, + Datasets::Unique(v) => match other { + Datasets::Empty => (), + Datasets::Unique(o) => { + if *v != o { + *self = Datasets::Many([*v, o].iter().copied().collect()) + } + } + Datasets::Many(mut o) => { + o.extend([*v]); + *self = Datasets::Many(o); + } + }, + Datasets::Many(ref mut v) => v.extend(other), + } + } + + fn len(&self) -> usize { + match self { + Self::Empty => 0, + Self::Unique(_) => 1, + Self::Many(ref v) => v.len() as usize, + } + } + + /* + fn contains(&self, value: &DatasetID) -> bool { + match self { + Self::Empty => false, + Self::Unique(v) => v == value, + Self::Many(ref v) => v.contains(*value), + } + } + */ +} + +fn sig_save_to_db( + db: Arc, + mut search_sig: Signature, + search_mh: KmerMinHash, + size: u64, + threshold: f64, + save_paths: bool, + filename: &Path, + dataset_id: u64, +) { + // Save signature to DB + let sig = if search_mh.is_empty() || size < threshold as u64 { + SignatureData::Empty + } else if save_paths { + SignatureData::External(filename.to_str().unwrap().to_string()) + } else { + search_sig.reset_sketches(); + search_sig.push(Sketch::MinHash(search_mh)); + SignatureData::Internal(search_sig) + }; + + let sig_bytes = sig.as_bytes().unwrap(); + let cf_sigs = db.cf_handle(SIGS).unwrap(); + let mut hash_bytes = [0u8; 8]; + (&mut hash_bytes[..]) + .write_u64::(dataset_id) + .expect("error writing bytes"); + db.put_cf(&cf_sigs, &hash_bytes[..], sig_bytes.as_slice()) + .expect("error saving sig"); +} + +fn stats_for_cf(db: Arc, cf_name: &str, deep_check: bool, quick: bool) { + use byteorder::ReadBytesExt; + use histogram::Histogram; + use log::info; + use numsep::{separate, Locale}; + + let cf = db.cf_handle(cf_name).unwrap(); + + let iter = db.iterator_cf(&cf, rocksdb::IteratorMode::Start); + let mut kcount = 0; + let mut vcount = 0; + let mut vcounts = Histogram::new(); + let mut datasets: Datasets = Default::default(); + + for result in iter { + let (key, value) = result.unwrap(); + let _k = (&key[..]).read_u64::().unwrap(); + kcount += key.len(); + + //println!("Saw {} {:?}", k, Datasets::from_slice(&value)); + vcount += value.len(); + + if !quick && deep_check { + let v = Datasets::from_slice(&value).expect("Error with value"); + vcounts.increment(v.len() as u64).unwrap(); + datasets.union(v); + } + //println!("Saw {} {:?}", k, value); + } + + info!("*** {} ***", cf_name); + use size::Size; + let ksize = Size::from_bytes(kcount); + let vsize = Size::from_bytes(vcount); + if !quick && cf_name == COLORS { + info!( + "total datasets: {}", + separate(datasets.len(), Locale::English) + ); + } + info!("total keys: {}", separate(kcount / 8, Locale::English)); + + info!("k: {}", ksize.to_string()); + info!("v: {}", vsize.to_string()); + + if !quick && kcount > 0 && deep_check { + info!("max v: {}", vcounts.maximum().unwrap()); + info!("mean v: {}", vcounts.mean().unwrap()); + info!("stddev: {}", vcounts.stddev().unwrap()); + info!("median v: {}", vcounts.percentile(50.0).unwrap()); + info!("p25 v: {}", vcounts.percentile(25.0).unwrap()); + info!("p75 v: {}", vcounts.percentile(75.0).unwrap()); + } +} diff --git a/src/core/src/lib.rs b/src/core/src/lib.rs index 66de82e6a0..afbd3d9f7f 100644 --- a/src/core/src/lib.rs +++ b/src/core/src/lib.rs @@ -45,6 +45,8 @@ cfg_if! 
{ } else { pub mod ffi; pub mod index; + pub mod manifest; + pub mod picklist; } } diff --git a/src/core/src/manifest.rs b/src/core/src/manifest.rs new file mode 100644 index 0000000000..ce740c638b --- /dev/null +++ b/src/core/src/manifest.rs @@ -0,0 +1,186 @@ +use std::convert::TryInto; +use std::io::Read; +use std::ops::Deref; +use std::path::PathBuf; + +use serde::de; +use serde::{Deserialize, Serialize}; + +use crate::encodings::HashFunctions; +use crate::index::Selection; +use crate::Error; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Record { + internal_location: String, + ksize: u32, + + #[serde(deserialize_with = "to_bool")] + with_abundance: bool, + + md5: String, + name: String, + moltype: String, + /* + md5short: String, + num: String, + scaled: String, + n_hashes: String, + filename: String, + */ +} + +fn to_bool<'de, D>(deserializer: D) -> Result +where + D: de::Deserializer<'de>, +{ + match String::deserialize(deserializer)? + .to_ascii_lowercase() + .as_ref() + { + "0" | "false" => Ok(false), + "1" | "true" => Ok(true), + other => Err(de::Error::invalid_value( + de::Unexpected::Str(other), + &"0/1 or true/false are the only supported values", + )), + } +} + +#[derive(Debug, Default, Serialize, Deserialize, Clone)] +pub struct Manifest { + records: Vec, +} + +impl Record { + pub fn internal_location(&self) -> PathBuf { + self.internal_location.clone().into() + } + + pub fn ksize(&self) -> u32 { + self.ksize + } + + pub fn with_abundance(&self) -> bool { + self.with_abundance + } + + pub fn md5(&self) -> &str { + self.md5.as_ref() + } + + pub fn name(&self) -> &str { + self.name.as_ref() + } + + pub fn moltype(&self) -> HashFunctions { + self.moltype.as_str().try_into().unwrap() + } +} + +impl Manifest { + pub fn from_reader(rdr: R) -> Result { + let mut records = vec![]; + + let mut rdr = csv::ReaderBuilder::new() + .comment(Some(b'#')) + .from_reader(rdr); + for result in rdr.deserialize() { + let record: Record = result?; + records.push(record); + } + Ok(Manifest { records }) + } + + pub fn internal_locations(&self) -> impl Iterator { + self.records.iter().map(|r| r.internal_location.as_str()) + } + + pub fn iter(&self) -> impl Iterator { + self.records.iter() + } + + pub fn select_to_manifest(&self, selection: &Selection) -> Result { + let rows = self.records.iter().filter(|row| { + let mut valid = true; + valid = if let Some(ksize) = selection.ksize() { + row.ksize == ksize + } else { + valid + }; + valid = if let Some(abund) = selection.abund() { + valid && row.with_abundance() == abund + } else { + valid + }; + valid = if let Some(moltype) = selection.moltype() { + valid && row.moltype() == moltype + } else { + valid + }; + valid + }); + + Ok(Manifest { + records: rows.cloned().collect(), + }) + + /* + matching_rows = self.rows + if ksize: + matching_rows = ( row for row in matching_rows + if row['ksize'] == ksize ) + if moltype: + matching_rows = ( row for row in matching_rows + if row['moltype'] == moltype ) + if scaled or containment: + if containment and not scaled: + raise ValueError("'containment' requires 'scaled' in Index.select'") + + matching_rows = ( row for row in matching_rows + if row['scaled'] and not row['num'] ) + if num: + matching_rows = ( row for row in matching_rows + if row['num'] and not row['scaled'] ) + + if abund: + # only need to concern ourselves if abundance is _required_ + matching_rows = ( row for row in matching_rows + if row['with_abundance'] ) + + if picklist: + matching_rows = ( row for row in matching_rows + if 
picklist.matches_manifest_row(row) ) + + # return only the internal filenames! + for row in matching_rows: + yield row + */ + } +} + +impl From<&[PathBuf]> for Manifest { + fn from(v: &[PathBuf]) -> Self { + Manifest { + records: v + .iter() + .map(|p| Record { + internal_location: p.to_str().unwrap().into(), + ksize: 0, // FIXME + with_abundance: false, // FIXME + md5: "".into(), // FIXME + name: "".into(), // FIXME + moltype: "".into(), // FIXME + }) + .collect(), + } + } +} + +impl Deref for Manifest { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.records + } +} diff --git a/src/core/src/picklist.rs b/src/core/src/picklist.rs new file mode 100644 index 0000000000..ddb3183d14 --- /dev/null +++ b/src/core/src/picklist.rs @@ -0,0 +1,29 @@ +use getset::{CopyGetters, Getters, Setters}; +use typed_builder::TypedBuilder; + +#[derive(Default, TypedBuilder, CopyGetters, Getters, Setters, Clone)] +pub struct Picklist { + #[getset(get = "pub", set = "pub")] + #[builder(default = "".into())] + coltype: String, + + #[getset(get = "pub", set = "pub")] + #[builder(default = "".into())] + pickfile: String, + + #[getset(get = "pub", set = "pub")] + #[builder(default = "".into())] + column_name: String, + + #[getset(get = "pub", set = "pub")] + #[builder] + pickstyle: PickStyle, +} + +#[derive(Clone, Default)] +#[repr(u32)] +pub enum PickStyle { + #[default] + Include = 1, + Exclude = 2, +} diff --git a/src/core/src/signature.rs b/src/core/src/signature.rs index db2a85ea05..290b76788d 100644 --- a/src/core/src/signature.rs +++ b/src/core/src/signature.rs @@ -2,6 +2,8 @@ //! //! A signature is a collection of sketches for a genomic dataset. +use core::iter::FusedIterator; + use std::fs::File; use std::io; use std::iter::Iterator; @@ -20,6 +22,8 @@ use crate::sketch::Sketch; use crate::Error; use crate::HashIntoType; +// TODO: this is the behavior expected from Sketch, but that name is already +// used. Sketchable? 
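+// (It collects the operations shared by every sketch type, e.g. size() and
+// to_vec() below.)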
diff --git a/src/core/src/signature.rs b/src/core/src/signature.rs
index db2a85ea05..290b76788d 100644
--- a/src/core/src/signature.rs
+++ b/src/core/src/signature.rs
@@ -2,6 +2,8 @@
 //!
 //! A signature is a collection of sketches for a genomic dataset.
 
+use core::iter::FusedIterator;
+
 use std::fs::File;
 use std::io;
 use std::iter::Iterator;
@@ -20,6 +22,8 @@ use crate::sketch::Sketch;
 use crate::Error;
 use crate::HashIntoType;
 
+// TODO: this is the behavior expected from Sketch, but that name is already
+// used. Sketchable?
 pub trait SigsTrait {
     fn size(&self) -> usize;
     fn to_vec(&self) -> Vec<u64>;
@@ -395,6 +399,10 @@ impl Iterator for SeqToHashes {
 }
 
 #[derive(Serialize, Deserialize, Debug, Clone, TypedBuilder)]
+#[cfg_attr(
+    feature = "rkyv",
+    derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive)
+)]
 pub struct Signature {
     #[serde(default = "default_class")]
     #[builder(default = default_class())]
@@ -654,6 +662,92 @@ impl Signature {
         Ok(())
     }
+
+    pub fn iter_mut(&mut self) -> IterMut<'_> {
+        let length = self.signatures.len();
+        IterMut {
+            iter: self.signatures.iter_mut(),
+            length,
+        }
+    }
+
+    pub fn iter(&self) -> Iter<'_> {
+        let length = self.signatures.len();
+        Iter {
+            iter: self.signatures.iter(),
+            length,
+        }
+    }
+}
+
+pub struct IterMut<'a> {
+    iter: std::slice::IterMut<'a, Sketch>,
+    length: usize,
+}
+
+impl<'a> IntoIterator for &'a mut Signature {
+    type Item = &'a mut Sketch;
+    type IntoIter = IterMut<'a>;
+
+    fn into_iter(self) -> IterMut<'a> {
+        self.iter_mut()
+    }
+}
+
+impl<'a> Iterator for IterMut<'a> {
+    type Item = &'a mut Sketch;
+
+    fn next(&mut self) -> Option<&'a mut Sketch> {
+        if self.length == 0 {
+            None
+        } else {
+            self.length -= 1;
+            self.iter.next()
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.length, Some(self.length))
+    }
+}
+
+pub struct Iter<'a> {
+    iter: std::slice::Iter<'a, Sketch>,
+    length: usize,
+}
+
+impl<'a> Iterator for Iter<'a> {
+    type Item = &'a Sketch;
+
+    fn next(&mut self) -> Option<&'a Sketch> {
+        if self.length == 0 {
+            None
+        } else {
+            self.length -= 1;
+            self.iter.next()
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.length, Some(self.length))
+    }
+}
+
+impl FusedIterator for Iter<'_> {}
+
+impl ExactSizeIterator for Iter<'_> {
+    fn len(&self) -> usize {
+        self.length
+    }
+}
+
+impl Clone for Iter<'_> {
+    fn clone(&self) -> Self {
+        Iter {
+            iter: self.iter.clone(),
+            length: self.length,
+        }
+    }
 }
 
 impl ToWriter for Signature {
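These iterators mirror std's slice iterators, so the sketches in a Signature
can be walked without cloning. A minimal sketch, assuming iter() taking &self
as above; total_size is a hypothetical helper:

    use sourmash::signature::{Signature, SigsTrait};

    // Sum the sizes of every sketch in a signature. iter() yields &Sketch,
    // and size() comes from SigsTrait, which Sketch implements.
    fn total_size(sig: &Signature) -> usize {
        sig.iter().map(|sketch| sketch.size()).sum()
    }

Mutable traversal goes through the IntoIterator impl for &mut Signature:
`for sketch in &mut sig { ... }` yields &mut Sketch items.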
diff --git a/src/core/src/sketch/hyperloglog/mod.rs b/src/core/src/sketch/hyperloglog/mod.rs
index 409d2a2c44..d575d50f70 100644
--- a/src/core/src/sketch/hyperloglog/mod.rs
+++ b/src/core/src/sketch/hyperloglog/mod.rs
@@ -26,6 +26,10 @@ pub mod estimators;
 use estimators::CounterType;
 
 #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[cfg_attr(
+    feature = "rkyv",
+    derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive)
+)]
 pub struct HyperLogLog {
     registers: Vec<CounterType>,
     p: usize,
diff --git a/src/core/src/sketch/minhash.rs b/src/core/src/sketch/minhash.rs
index 5c5f1114f8..454a50f7a7 100644
--- a/src/core/src/sketch/minhash.rs
+++ b/src/core/src/sketch/minhash.rs
@@ -33,6 +33,10 @@ pub fn scaled_for_max_hash(max_hash: u64) -> u64 {
 }
 
 #[derive(Debug, TypedBuilder)]
+#[cfg_attr(
+    feature = "rkyv",
+    derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive)
+)]
 pub struct KmerMinHash {
     num: u32,
     ksize: u32,
@@ -53,6 +57,8 @@ pub struct KmerMinHash {
     abunds: Option<Vec<u64>>,
 
     #[builder(default)]
+    //#[cfg_attr(feature = "rkyv", with(rkyv::with::Lock))]
+    #[cfg_attr(feature = "rkyv", with(rkyv::with::Skip))]
     md5sum: Mutex<Option<String>>,
 }
@@ -927,6 +933,10 @@ mod test {
 
 // A MinHash implementation for low scaled or large cardinalities
 #[derive(Debug, TypedBuilder)]
+#[cfg_attr(
+    feature = "rkyv",
+    derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive)
+)]
 pub struct KmerMinHashBTree {
     num: u32,
     ksize: u32,
@@ -950,6 +960,8 @@ pub struct KmerMinHashBTree {
     current_max: u64,
 
     #[builder(default)]
+    //#[cfg_attr(feature = "rkyv", with(rkyv::with::Lock))]
+    #[cfg_attr(feature = "rkyv", with(rkyv::with::Skip))]
     md5sum: Mutex<Option<String>>,
 }
diff --git a/src/core/src/sketch/mod.rs b/src/core/src/sketch/mod.rs
index 09bd51085c..3ef04e43df 100644
--- a/src/core/src/sketch/mod.rs
+++ b/src/core/src/sketch/mod.rs
@@ -10,6 +10,10 @@ use crate::sketch::minhash::{KmerMinHash, KmerMinHashBTree};
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(untagged)]
+#[cfg_attr(
+    feature = "rkyv",
+    derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive)
+)]
 pub enum Sketch {
     MinHash(KmerMinHash),
     LargeMinHash(KmerMinHashBTree),
diff --git a/src/core/src/sketch/nodegraph.rs b/src/core/src/sketch/nodegraph.rs
index cbca8915ba..bbfef5cd0d 100644
--- a/src/core/src/sketch/nodegraph.rs
+++ b/src/core/src/sketch/nodegraph.rs
@@ -7,7 +7,7 @@ use byteorder::{BigEndian, ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt}
 use fixedbitset::FixedBitSet;
 
 use crate::prelude::*;
-use crate::sketch::minhash::KmerMinHash;
+use crate::sketch::minhash::{KmerMinHash, KmerMinHashBTree};
 use crate::Error;
 use crate::HashIntoType;
 
@@ -58,6 +58,15 @@ impl Update<Nodegraph> for KmerMinHash {
     }
 }
 
+impl Update<Nodegraph> for KmerMinHashBTree {
+    fn update(&self, other: &mut Nodegraph) -> Result<(), Error> {
+        for h in self.mins() {
+            other.count(h);
+        }
+        Ok(())
+    }
+}
+
 impl Nodegraph {
     pub fn new(tablesizes: &[usize], ksize: usize) -> Nodegraph {
         let mut bs = Vec::with_capacity(tablesizes.len());
diff --git a/src/core/src/storage.rs b/src/core/src/storage.rs
index f4f942d330..08a990d687 100644
--- a/src/core/src/storage.rs
+++ b/src/core/src/storage.rs
@@ -3,8 +3,7 @@ use std::ffi::OsStr;
 use std::fs::{DirBuilder, File};
 use std::io::{BufReader, BufWriter, Read, Write};
 use std::path::{Path, PathBuf};
-use std::rc::Rc;
-use std::sync::RwLock;
+use std::sync::{Arc, RwLock};
 
 use serde::{Deserialize, Serialize};
 use thiserror::Error;
@@ -25,11 +24,11 @@ pub trait Storage {
 }
 
 #[derive(Clone)]
-pub struct InnerStorage(Rc<RwLock<dyn Storage>>);
+pub struct InnerStorage(Arc<RwLock<dyn Storage>>);
 
 impl InnerStorage {
-    pub fn new(inner: impl Storage + 'static) -> InnerStorage {
-        InnerStorage(Rc::new(RwLock::new(inner)))
+    pub fn new(inner: impl Storage + Send + Sync + 'static) -> InnerStorage {
+        InnerStorage(Arc::new(RwLock::new(inner)))
     }
 }
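With the rkyv derives above on Signature, Sketch, HyperLogLog, and both
minhash types (the md5sum caches are skipped via rkyv::with::Skip), archived
values can be inspected without a full deserialize. A minimal round-trip
sketch against the rkyv 0.7 API, assuming the crate builds with the rkyv
feature enabled (the commit message says "try to skip md5sum", so treat this
as unverified):

    #[cfg(feature = "rkyv")]
    fn archive_roundtrip(sig: &sourmash::signature::Signature) {
        use sourmash::signature::Signature;

        // serialize into an aligned buffer; 1024 is the serializer scratch size
        let bytes = rkyv::to_bytes::<_, 1024>(sig).unwrap();

        // zero-copy view of the archived value; unsafe because the caller must
        // guarantee that `bytes` really holds an archived Signature
        let archived = unsafe { rkyv::archived_root::<Signature>(&bytes[..]) };
        let _ = archived;
    }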
".sbt.v3/0107d767a345eff67ecdaed2ee5cd7ba", + ".sbt.v3/b59473c94ff2889eca5d7165936e64b3", + ".sbt.v3/60f7e23c24a8d94791cc7a8680c493f9", + ] + .par_iter() + .map(|path| { + let data = zs.load(path).unwrap(); + let sigs: Vec = serde_json::from_reader(&data[..]).expect("Loading error"); + sigs.iter() + .map(|v| v.sketches().iter().map(|mh| mh.size()).sum::()) + .sum::() + }) + .sum(); + + assert_eq!(total_hashes, 3500); + + Ok(()) +} diff --git a/src/sourmash/sbt_storage.py b/src/sourmash/sbt_storage.py index a22e782d69..42a4fceaa6 100644 --- a/src/sourmash/sbt_storage.py +++ b/src/sourmash/sbt_storage.py @@ -130,7 +130,7 @@ def subdir(self, value): self._methodcall(lib.zipstorage_set_subdir, to_bytes(value), len(value)) def _filenames(self): - if self.__inner: + if not self._objptr: return self.__inner._filenames() size = ffi.new("uintptr_t *") @@ -150,7 +150,7 @@ def save(self, path, content, *, overwrite=False, compress=False): raise NotImplementedError() def load(self, path): - if self.__inner: + if not self._objptr: return self.__inner.load(path) try: diff --git a/tests/test_index.py b/tests/test_index.py index af0c1da890..1067422c5f 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -1775,6 +1775,7 @@ def test_lazy_index_wraps_multi_index_location(): lazy2.signatures_with_location()): assert ss_tup == ss_lazy_tup +@pytest.mark.skip("no support for in-memory sigs yet") def test_revindex_index_search(): # confirm that RevIndex works sig2 = utils.get_test_data("2.fa.sig") @@ -1820,6 +1821,7 @@ def test_revindex_index_search(): assert sr[0][1] == ss63 +@pytest.mark.skip("no support for in-memory sigs yet") def test_revindex_gather(): # check that RevIndex.best_containment works. sig2 = utils.get_test_data("2.fa.sig") @@ -1846,6 +1848,7 @@ def test_revindex_gather(): assert match.signature == ss47 +@pytest.mark.skip("no support for in-memory sigs yet") def test_revindex_gather_ignore(): # check that RevIndex gather ignores things properly. sig2 = utils.get_test_data('2.fa.sig') diff --git a/tox.ini b/tox.ini index 41734a6a3b..f73758ab7f 100644 --- a/tox.ini +++ b/tox.ini @@ -50,6 +50,11 @@ commands = pytest \ --junitxml {toxworkdir}/junit.{envname}.xml \ {posargs:doc tests} +[testenv:.pkg] +pass_env = + LIBCLANG_PATH + BINDGEN_EXTRA_CLANG_ARGS + [testenv:pypy3] deps = pip >= 19.3.1 @@ -104,7 +109,7 @@ commands = description = invoke sphinx-build to build the HTML docs basepython = python3.10 extras = doc -whitelist_externals = pandoc +allowlist_externals = pandoc pass_env = HOME change_dir = {toxinidir} #commands = sphinx-build -d "{toxworkdir}/docs_doctree" doc "{toxworkdir}/docs_out" --color -W -bhtml {posargs}