diff --git a/.github/workflows/dev_envs.yml b/.github/workflows/dev_envs.yml index 33b5069d00..5c8c359bfb 100644 --- a/.github/workflows/dev_envs.yml +++ b/.github/workflows/dev_envs.yml @@ -57,7 +57,7 @@ jobs: - name: install dependencies shell: bash -l {0} - run: mamba install 'tox>=3.27,<4' tox-conda rust git compilers pandoc + run: mamba install 'tox>=3.27,<4' tox-conda rust git compilers pandoc libstdcxx-ng - name: run tests for 3.9 shell: bash -l {0} diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 300169efa6..89c87c7551 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -234,6 +234,12 @@ jobs: toolchain: stable override: true + - name: Check semver + uses: obi1kenobi/cargo-semver-checks-action@v2 + with: + crate-name: sourmash + version-tag-prefix: r + - name: Make sure we can publish the sourmash crate uses: actions-rs/cargo@v1 with: diff --git a/.readthedocs.yml b/.readthedocs.yml index f015d9c63d..c4b7ce13ff 100644 --- a/.readthedocs.yml +++ b/.readthedocs.yml @@ -9,6 +9,10 @@ build: tools: python: "3.10" rust: "1.64" + apt_packages: + - llvm-dev + - libclang-dev + - clang # Build documentation in the docs/ directory with Sphinx sphinx: diff --git a/Cargo.lock b/Cargo.lock index bb1633c7c6..c2f3ac9cb3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + [[package]] name = "aliasable" version = "0.1.3" @@ -59,6 +70,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b7e4c2464d97fe331d41de9d5db0def0a96f4d823b8b32a2efd503578988973" +[[package]] +name = "binary-merge" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597bb81c80a54b6a4381b23faba8d7774b144c94cbd1d6fe3f1329bd776554ab" + [[package]] name = "bincode" version = "1.3.3" @@ -68,6 +85,27 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.65.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5" +dependencies = [ + "bitflags 1.3.2", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "peeking_take_while", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.23", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -80,18 +118,6 @@ version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42" -[[package]] -name = "bstr" -version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" -dependencies = [ - "lazy_static", - "memchr", - "regex-automata", - "serde", -] - [[package]] name = "buffer-redux" version = "1.0.0" @@ -108,12 +134,39 @@ version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" +[[package]] +name = "bytecheck" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a31f923c2db9513e4298b72df143e6e655a759b3d6a0966df18f81223fff54f" +dependencies = [ + "bytecheck_derive", + "ptr_meta", +] + +[[package]] +name = "bytecheck_derive" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edb17c862a905d912174daa27ae002326fff56dc8b8ada50a0a5f0976cb174f0" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.104", +] + [[package]] name = "bytecount" version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" +[[package]] +name = "bytemuck" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f5715e491b5a1598fc2bef5a606847b5dc1d48ea625bd3c02c00de8285591da" + [[package]] name = "byteorder" version = "1.4.3" @@ -164,6 +217,18 @@ name = "cc" version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" +dependencies = [ + "jobserver", +] + +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] [[package]] name = "cfg-if" @@ -213,6 +278,17 @@ dependencies = [ "half", ] +[[package]] +name = "clang-sys" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a050e2153c5be08febd6734e29298e844fdb0fa21aeddd63b4eb7baa106c69b" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "4.3.0" @@ -374,13 +450,12 @@ dependencies = [ [[package]] name = "csv" -version = "1.1.6" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" +checksum = "af91f40b7355f82b0a891f50e70399475945bb0b0da4f1700ce60761c9d3e359" dependencies = [ - "bstr", "csv-core", - "itoa 0.4.8", + "itoa", "ryu", "serde", ] @@ -396,9 +471,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.85" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5add3fc1717409d029b20c5b6903fc0c0b02fa6741d820054f4a2efa5e5816fd" +checksum = "86d3488e7665a7a483b57e25bdd90d0aeb2bc7608c8d0346acf2ad3f1caf1d62" dependencies = [ "cc", "cxxbridge-flags", @@ -408,9 +483,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.85" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4c87959ba14bc6fbc61df77c3fcfe180fc32b93538c4f1031dd802ccb5f2ff0" +checksum = "48fcaf066a053a41a81dfb14d57d99738b767febb8b735c3016e469fac5da690" dependencies = [ "cc", "codespan-reporting", @@ -423,15 +498,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.85" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69a3e162fde4e594ed2b07d0f83c6c67b745e7f28ce58c6df5e6b6bef99dfb59" +checksum = "a2ef98b8b717a829ca5603af80e1f9e2e48013ab227b68ef37872ef84ee479bf" [[package]] name = "cxxbridge-macro" -version = "1.0.85" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e7e2adeb6a0d4a282e581096b06e1791532b7d576dcde5ccd9382acf55db8e6" +checksum = "086c685979a698443656e5cf7856c95c642295a38599f12fb1ff76fb28d19892" dependencies = [ "proc-macro2", "quote", @@ -531,12 +606,27 @@ dependencies = [ "syn 1.0.104", ] +[[package]] +name = "glob" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" + [[package]] name = "half" version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" +[[package]] +name = "hashbrown" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db0d4cf898abf0081f964436dc980e96670a0f36863e4b83aaacdb65c9d7ccc3" +dependencies = [ + "ahash", +] + [[package]] name = "heck" version = "0.4.1" @@ -558,6 +648,12 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" +[[package]] +name = "histogram" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669" + [[package]] name = "iana-time-zone" version = "0.1.53" @@ -582,6 +678,15 @@ dependencies = [ "cxx-build", ] +[[package]] +name = "inplace-vec-builder" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf64c2edc8226891a71f127587a2861b132d2b942310843814d5001d99a1d307" +dependencies = [ + "smallvec", +] + [[package]] name = "io-lifetimes" version = "1.0.11" @@ -616,15 +721,18 @@ dependencies = [ [[package]] name = "itoa" -version = "0.4.8" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" +checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" [[package]] -name = "itoa" -version = "1.0.1" +name = "jobserver" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" +checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa" +dependencies = [ + "libc", +] [[package]] name = "js-sys" @@ -641,18 +749,61 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.146" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b" +[[package]] +name = "libloading" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f" +dependencies = [ + "cfg-if", + "winapi", +] + [[package]] name = "libm" version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb" +[[package]] +name = "librocksdb-sys" +version = "0.11.0+8.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3386f101bcb4bd252d8e9d2fb41ec3b0862a15a62b478c355b2982efa469e3e" +dependencies = [ + "bindgen", + "bzip2-sys", + "cc", + "glob", + "libc", + "libz-sys", + "lz4-sys", + "zstd-sys", +] + +[[package]] +name = "libz-sys" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9702761c3935f8cc2f101793272e202c72b99da8f4224a19ddcf1279a6450bbf" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "link-cplusplus" version = "1.0.8" @@ -680,6 +831,16 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lz4-sys" +version = "1.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "lzma-sys" version = "0.1.17" @@ -731,6 +892,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.4.4" @@ -763,9 +930,9 @@ dependencies = [ [[package]] name = "niffler" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68c7ffd42bdba05fc9fbfda31283d44c5c8a88fed1a191f68795dba23cc8204b" +checksum = "470dd05a938a5ad42c2cb80ceea4255e275990ee530b86ca164e6d8a19fa407f" dependencies = [ "cfg-if", "flate2", @@ -778,6 +945,16 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "num-integer" version = "0.1.44" @@ -819,6 +996,15 @@ dependencies = [ "libc", ] +[[package]] +name = "numsep" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad5c49c3e12c314efb1f43cba136031b657dcd59ee26936ab2be313c5e97da22" +dependencies = [ + "slicestring", +] + [[package]] name = "once_cell" version = "1.18.0" @@ -855,6 +1041,12 @@ dependencies = [ "syn 2.0.23", ] +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "piz" version = "0.5.1" @@ -911,6 +1103,16 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "prettyplease" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c64d9ba0963cdcea2e1b2230fbae2bab30eb25a174be395c41e764bfb65dd62" +dependencies = [ + "proc-macro2", + "syn 2.0.23", +] + [[package]] name = "primal-check" version = "0.3.3" @@ -970,6 +1172,26 @@ dependencies = [ "unarray", ] +[[package]] +name = "ptr_meta" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" +dependencies = [ + "ptr_meta_derive", +] + +[[package]] +name = "ptr_meta_derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.104", +] + [[package]] name = "quote" version = "1.0.29" @@ -1058,18 +1280,79 @@ dependencies = [ "regex-syntax", ] -[[package]] -name = "regex-automata" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" - [[package]] name = "regex-syntax" version = "0.6.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49b3de9ec5dc0a3417da371aab17d729997c15010e7fd24ff707773a33bddb64" +[[package]] +name = "rend" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "581008d2099240d37fb08d77ad713bcaec2c4d89d50b5b21a8bb1996bbab68ab" +dependencies = [ + "bytecheck", +] + +[[package]] +name = "retain_mut" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c31b5c4033f8fdde8700e4657be2c497e7288f01515be52168c631e2e4d4086" + +[[package]] +name = "rkyv" +version = "0.7.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c30f1d45d9aa61cbc8cd1eb87705470892289bb2d01943e7803b873a57404dc3" +dependencies = [ + "bytecheck", + "hashbrown", + "ptr_meta", + "rend", + "rkyv_derive", + "seahash", +] + +[[package]] +name = "rkyv_derive" +version = "0.7.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff26ed6c7c4dfc2aa9480b86a60e3c7233543a270a680e10758a507c5a4ce476" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.104", +] + +[[package]] +name = "roaring" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef0fb5e826a8bde011ecae6a8539dd333884335c57ff0f003fbe27c25bbe8f71" +dependencies = [ + "bytemuck", + "byteorder", + "retain_mut", +] + +[[package]] +name = "rocksdb" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb6f170a4041d50a0ce04b0d2e14916d6ca863ea2e422689a5b694395d299ffe" +dependencies = [ + "libc", + "librocksdb-sys", +] + +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustix" version = "0.37.20" @@ -1136,6 +1419,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" +[[package]] +name = "seahash" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" + [[package]] name = "serde" version = "1.0.168" @@ -1162,11 +1451,29 @@ version = "1.0.105" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360" dependencies = [ - "itoa 1.0.1", + "itoa", "ryu", "serde", ] +[[package]] +name = "shlex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" + +[[package]] +name = "size" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fed904c7fb2856d868b92464fc8fa597fce366edea1a9cbfaa8cb5fe080bd6d" + +[[package]] +name = "slicestring" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "636b979c5672ac7c2a1120ca0a9a6074cd090dadfec42af6f8a5baea1223d180" + [[package]] name = "smallvec" version = "1.8.0" @@ -1181,7 +1488,7 @@ checksum = "9f1341053f34bb13b5e9590afb7d94b48b48d4b87467ec28e3c238693bb553de" [[package]] name = "sourmash" -version = "0.11.0" +version = "0.12.0" dependencies = [ "assert_matches", "az", @@ -1191,10 +1498,12 @@ dependencies = [ "chrono", "counter", "criterion", + "csv", "finch", "fixedbitset", "getrandom", "getset", + "histogram", "log", "md5", "memmap2", @@ -1203,6 +1512,7 @@ dependencies = [ "niffler", "nohash-hasher", "num-iter", + "numsep", "once_cell", "ouroboros", "piz", @@ -1210,8 +1520,12 @@ dependencies = [ "proptest", "rand", "rayon", + "rkyv", + "roaring", + "rocksdb", "serde", "serde_json", + "size", "tempfile", "thiserror", "twox-hash", @@ -1349,16 +1663,25 @@ checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3" [[package]] name = "unicode-width" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" +checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" [[package]] name = "vec-collections" -version = "0.3.6" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f2390c4dc8ae8640c57d067b1a3d40bc05c124cc6bc7394d761b53435d41b76" +checksum = "3c9965c8f2ffed1dbcd16cafe18a009642f540fa22661c6cfd6309ddb02e4982" dependencies = [ + "binary-merge", + "inplace-vec-builder", + "lazy_static", "num-traits", "serde", "smallvec", @@ -1601,3 +1924,14 @@ checksum = "c179869f34fc7c01830d3ce7ea2086bc3a07e0d35289b667d0a8bf910258926c" dependencies = [ "lzma-sys", ] + +[[package]] +name = "zstd-sys" +version = "2.0.7+zstd.1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94509c3ba2fe55294d752b79842c530ccfab760192521df74a081a78d2b3c7f5" +dependencies = [ + "cc", + "libc", + "pkg-config", +] diff --git a/Makefile b/Makefile index 4c2ef69abb..f964bc3cce 100644 --- a/Makefile +++ b/Makefile @@ -32,7 +32,8 @@ include/sourmash.h: src/core/src/lib.rs \ src/core/src/ffi/index/mod.rs \ src/core/src/ffi/index/revindex.rs \ src/core/src/ffi/storage.rs \ - src/core/src/errors.rs + src/core/src/errors.rs \ + src/core/cbindgen.toml cd src/core && \ RUSTC_BOOTSTRAP=1 cbindgen -c cbindgen.toml . -o ../../$@ diff --git a/deny.toml b/deny.toml index 29d148d50b..99f3b442c7 100644 --- a/deny.toml +++ b/deny.toml @@ -29,6 +29,7 @@ default = "deny" confidence-threshold = 0.8 exceptions = [ { allow = ["Zlib"], name = "piz", version = "*" }, + { allow = ["ISC"], name = "libloading", version = "*" }, ] [bans] diff --git a/doc/developer.md b/doc/developer.md index 2368611e7a..b169d557de 100644 --- a/doc/developer.md +++ b/doc/developer.md @@ -25,7 +25,7 @@ and the [`conda-forge`](https://conda-forge.org/) channel by default). Once `mamba` is installed, run ``` -mamba create -n sourmash_dev 'tox>=3.27,<4' tox-conda rust git compilers pandoc +mamba create -n sourmash_dev 'tox>=3.27,<4' tox-conda rust git compilers pandoc libstdcxx-ng ``` to create an environment called `sourmash_dev` containing the programs needed for development. diff --git a/flake.nix b/flake.nix index 43ad3b8d78..cf004e183d 100644 --- a/flake.nix +++ b/flake.nix @@ -106,6 +106,7 @@ #py-spy #heaptrack + cargo-all-features cargo-watch cargo-limit cargo-outdated @@ -114,6 +115,7 @@ nixpkgs-fmt ]; + # Needed for matplotlib LD_LIBRARY_PATH = lib.makeLibraryPath [ pkgs.stdenv.cc.cc.lib ]; # workaround for https://github.com/NixOS/nixpkgs/blob/48dfc9fa97d762bce28cc8372a2dd3805d14c633/doc/languages-frameworks/python.section.md#python-setuppy-bdist_wheel-cannot-create-whl diff --git a/include/sourmash.h b/include/sourmash.h index 6fa7854880..011aee2925 100644 --- a/include/sourmash.h +++ b/include/sourmash.h @@ -42,6 +42,7 @@ enum SourmashErrorCode { SOURMASH_ERROR_CODE_PARSE_INT = 100003, SOURMASH_ERROR_CODE_SERDE_ERROR = 100004, SOURMASH_ERROR_CODE_NIFFLER_ERROR = 100005, + SOURMASH_ERROR_CODE_CSV_ERROR = 100006, }; typedef uint32_t SourmashErrorCode; diff --git a/pyproject.toml b/pyproject.toml index ff3b831c20..dc77145fc2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -144,7 +144,7 @@ include = [ exclude = [ { path = "**/__pycache__/*", format = ["sdist", "wheel"] }, ] -features = ["maturin"] +features = ["maturin", "mastiff"] locked = true module-name = "sourmash._lowlevel" @@ -164,7 +164,7 @@ known_first_party = ["sourmash"] [tool.cibuildwheel] build = "cp39-*" -skip = "*-win32 *-manylinux_i686 *-musllinux_ppc64le *-musllinux_s390x" +skip = "*-win32 *-manylinux_i686 *-musllinux_*" before-all = [ "curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain=stable", "cargo update --dry-run", @@ -178,6 +178,18 @@ build-verbosity = 3 CARGO_REGISTRIES_CRATES_IO_PROTOCOL="sparse" PATH="$HOME/.cargo/bin:$PATH" +[tool.cibuildwheel.linux] +before-all = [ + "curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain=stable", + "cargo update --dry-run", + "if [ -f /etc/system-release ]; then yum -y install centos-release-scl; fi", + "if [ -f /etc/system-release ]; then yum -y install llvm-toolset-7.0; fi", +] +before-build = [ + "if [ -f /etc/system-release ]; then source scl_source enable llvm-toolset-7.0; fi", + "if [ -f /etc/system-release ]; then source scl_source enable devtoolset-10; fi", +] + [tool.cibuildwheel.linux.environment] CARGO_REGISTRIES_CRATES_IO_PROTOCOL="sparse" PATH="$HOME/.cargo/bin:$PATH" diff --git a/src/core/Cargo.toml b/src/core/Cargo.toml index cbc897b28b..87c80ee3dc 100644 --- a/src/core/Cargo.toml +++ b/src/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sourmash" -version = "0.11.0" +version = "0.12.0" authors = ["Luiz Irber "] description = "MinHash sketches for genomic data" repository = "https://github.com/sourmash-bio/sourmash" @@ -22,6 +22,8 @@ bench = false from-finch = ["finch"] parallel = ["rayon"] maturin = [] +mastiff = ["rocksdb", "rkyv", "parallel"] +default = [] [dependencies] az = "1.0.0" @@ -29,6 +31,7 @@ bytecount = "0.6.0" byteorder = "1.4.3" cfg-if = "1.0" counter = "0.5.7" +csv = "1.1.6" finch = { version = "0.6.0", optional = true } fixedbitset = "0.4.0" getrandom = { version = "0.2", features = ["js"] } @@ -47,10 +50,15 @@ primal-check = "0.3.1" thiserror = "1.0" typed-builder = "0.14.0" twox-hash = "1.6.0" -vec-collections = "0.3.4" +vec-collections = "0.4.3" piz = "0.5.0" memmap2 = "0.7.1" ouroboros = "0.17.2" +rkyv = { version = "0.7.39", optional = true } +roaring = "0.10.0" +histogram = "0.6.9" +numsep = "0.1.12" +size = "0.4.0" [dev-dependencies] assert_matches = "1.3.0" @@ -72,6 +80,13 @@ harness = false name = "minhash" harness = false +[package.metadata.cargo-all-features] +skip_optional_dependencies = true +denylist = ["maturin"] +skip_feature_sets = [ + ["mastiff", "parallel"], # mastiff implies parallel +] + ## Wasm section. Crates only used for WASM, as well as specific configurations [target.'cfg(all(target_arch = "wasm32", target_os="unknown"))'.dependencies.wasm-bindgen] @@ -90,4 +105,5 @@ features = ["wasmbind"] wasm-bindgen-test = "0.3.37" ### These crates don't compile on wasm -[target.'cfg(not(all(target_arch = "wasm32", target_os="unknown")))'.dependencies] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +rocksdb = { version = "0.21.0", optional = true } diff --git a/src/core/build.rs b/src/core/build.rs index a22396c25a..f067828d50 100644 --- a/src/core/build.rs +++ b/src/core/build.rs @@ -55,12 +55,12 @@ fn copy_c_bindings(crate_dir: &str) { let new_header: String = header .lines() .filter_map(|s| { - if s.starts_with("#") { + if s.starts_with('#') { None } else { Some({ let mut s = s.to_owned(); - s.push_str("\n"); + s.push('\n'); s }) } @@ -71,5 +71,5 @@ fn copy_c_bindings(crate_dir: &str) { let target_dir = find_target_dir(&out_dir); std::fs::create_dir_all(&target_dir).expect("error creating target dir"); let out_path = target_dir.join("header.h"); - std::fs::write(out_path, &new_header).expect("error writing header"); + std::fs::write(out_path, new_header).expect("error writing header"); } diff --git a/src/core/cbindgen.toml b/src/core/cbindgen.toml index cd6cd781c2..bc1538056b 100644 --- a/src/core/cbindgen.toml +++ b/src/core/cbindgen.toml @@ -8,7 +8,7 @@ clean = true [parse.expand] crates = ["sourmash"] -features = [] +features = ["mastiff"] [enum] rename_variants = "QualifiedScreamingSnakeCase" diff --git a/src/core/src/encodings.rs b/src/core/src/encodings.rs index 6010cf2f6d..443db90b50 100644 --- a/src/core/src/encodings.rs +++ b/src/core/src/encodings.rs @@ -7,6 +7,7 @@ use std::str; use nohash_hasher::BuildNoHashHasher; use once_cell::sync::Lazy; +use vec_collections::AbstractVecSet; use crate::Error; @@ -23,6 +24,10 @@ type ColorToIdx = HashMap>; #[allow(non_camel_case_types)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[cfg_attr( + feature = "rkyv", + derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive) +)] #[repr(u32)] pub enum HashFunctions { murmur64_DNA = 1, diff --git a/src/core/src/errors.rs b/src/core/src/errors.rs index cd4ddcfaf1..f6c3ce311e 100644 --- a/src/core/src/errors.rs +++ b/src/core/src/errors.rs @@ -63,6 +63,9 @@ pub enum SourmashError { #[error(transparent)] IOError(#[from] std::io::Error), + #[error(transparent)] + CsvError(#[from] csv::Error), + #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] #[error(transparent)] Panic(#[from] crate::ffi::utils::Panic), @@ -108,6 +111,7 @@ pub enum SourmashErrorCode { ParseInt = 100_003, SerdeError = 100_004, NifflerError = 100_005, + CsvError = 100_006, } #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] @@ -137,6 +141,7 @@ impl SourmashErrorCode { SourmashError::IOError { .. } => SourmashErrorCode::Io, SourmashError::NifflerError { .. } => SourmashErrorCode::NifflerError, SourmashError::Utf8Error { .. } => SourmashErrorCode::Utf8Error, + SourmashError::CsvError { .. } => SourmashErrorCode::CsvError, } } } diff --git a/src/core/src/ffi/index/mod.rs b/src/core/src/ffi/index/mod.rs index 932a97b222..cb98f0963b 100644 --- a/src/core/src/ffi/index/mod.rs +++ b/src/core/src/ffi/index/mod.rs @@ -1,3 +1,4 @@ +#[cfg(feature = "mastiff")] pub mod revindex; use crate::signature::Signature; diff --git a/src/core/src/ffi/index/revindex.rs b/src/core/src/ffi/index/revindex.rs index 3597121bce..8f4691dbad 100644 --- a/src/core/src/ffi/index/revindex.rs +++ b/src/core/src/ffi/index/revindex.rs @@ -1,7 +1,7 @@ use std::path::PathBuf; use std::slice; -use crate::index::revindex::RevIndex; +use crate::index::revindex::mem_revindex::RevIndex; use crate::index::Index; use crate::signature::{Signature, SigsTrait}; use crate::sketch::minhash::KmerMinHash; diff --git a/src/core/src/ffi/storage.rs b/src/core/src/ffi/storage.rs index 86d3834201..882a8d5f20 100644 --- a/src/core/src/ffi/storage.rs +++ b/src/core/src/ffi/storage.rs @@ -1,5 +1,6 @@ use std::os::raw::c_char; use std::slice; +use std::sync::Arc; use crate::ffi::utils::{ForeignObject, SourmashStr}; use crate::prelude::*; @@ -8,7 +9,7 @@ use crate::storage::ZipStorage; pub struct SourmashZipStorage; impl ForeignObject for SourmashZipStorage { - type RustObject = ZipStorage; + type RustObject = Arc; } ffi_fn! { @@ -20,7 +21,7 @@ unsafe fn zipstorage_new(ptr: *const c_char, insize: usize) -> Result<*mut Sourm }; let zipstorage = ZipStorage::from_file(path)?; - Ok(SourmashZipStorage::from_rust(zipstorage)) + Ok(SourmashZipStorage::from_rust(Arc::new(zipstorage))) } } @@ -110,7 +111,7 @@ unsafe fn zipstorage_set_subdir( std::str::from_utf8(path)? }; - storage.set_subdir(path.to_string()); + (*Arc::get_mut(storage).unwrap()).set_subdir(path.to_string()); Ok(()) } } diff --git a/src/core/src/from.rs b/src/core/src/from.rs index dfc384236e..37c98bf40d 100644 --- a/src/core/src/from.rs +++ b/src/core/src/from.rs @@ -23,10 +23,8 @@ impl From for KmerMinHash { values.len() as u32, ); - let hash_with_abunds: Vec<(u64, u64)> = values - .iter() - .map(|x| (x.hash as u64, x.count as u64)) - .collect(); + let hash_with_abunds: Vec<(u64, u64)> = + values.iter().map(|x| (x.hash, x.count as u64)).collect(); new_mh .add_many_with_abund(&hash_with_abunds) diff --git a/src/core/src/index/linear.rs b/src/core/src/index/linear.rs index 78b2c6f1f5..6ae2916f16 100644 --- a/src/core/src/index/linear.rs +++ b/src/core/src/index/linear.rs @@ -6,18 +6,18 @@ use std::path::PathBuf; use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; -use crate::index::{Comparable, DatasetInfo, Index, SigStore}; +use crate::index::{DatasetInfo, Index, SigStore}; use crate::prelude::*; use crate::storage::{FSStorage, InnerStorage, Storage, StorageInfo}; use crate::Error; #[derive(TypedBuilder)] -pub struct LinearIndex { +pub struct LinearIndex { #[builder(default)] storage: Option, #[builder(default)] - datasets: Vec>, + datasets: Vec, } #[derive(Serialize, Deserialize)] @@ -27,15 +27,11 @@ struct LinearInfo { leaves: Vec, } -impl<'a, L> Index<'a> for LinearIndex -where - L: Clone + Comparable + 'a, - SigStore: From, -{ - type Item = L; +impl<'a> Index<'a> for LinearIndex { + type Item = Signature; //type SignatureIterator = std::slice::Iter<'a, Self::Item>; - fn insert(&mut self, node: L) -> Result<(), Error> { + fn insert(&mut self, node: Self::Item) -> Result<(), Error> { self.datasets.push(node.into()); Ok(()) } @@ -76,11 +72,7 @@ where */ } -impl LinearIndex -where - L: ToWriter, - SigStore: ReadData, -{ +impl LinearIndex { pub fn save_file>( &mut self, path: P, @@ -115,7 +107,7 @@ where .iter_mut() .map(|l| { // Trigger data loading - let _: &L = (*l).data().unwrap(); + let _: &Signature = (*l).data().unwrap(); // set storage to new one l.storage = Some(storage.clone()); @@ -137,7 +129,7 @@ where Ok(()) } - pub fn from_path>(path: P) -> Result, Error> { + pub fn from_path>(path: P) -> Result { let file = File::open(&path)?; let mut reader = BufReader::new(file); @@ -147,11 +139,11 @@ where basepath.push(path); basepath.canonicalize()?; - let linear = LinearIndex::::from_reader(&mut reader, basepath.parent().unwrap())?; + let linear = LinearIndex::from_reader(&mut reader, basepath.parent().unwrap())?; Ok(linear) } - pub fn from_reader(rdr: R, path: P) -> Result, Error> + pub fn from_reader(rdr: R, path: P) -> Result where R: Read, P: AsRef, @@ -171,7 +163,7 @@ where .leaves .into_iter() .map(|l| { - let mut v: SigStore = l.into(); + let mut v: SigStore = l.into(); v.storage = Some(storage.clone()); v }) diff --git a/src/core/src/index/mod.rs b/src/core/src/index/mod.rs index 832fdf9091..97b5e743d2 100644 --- a/src/core/src/index/mod.rs +++ b/src/core/src/index/mod.rs @@ -4,6 +4,9 @@ //! Some indices also support containment searches. pub mod linear; + +#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] +#[cfg(feature = "mastiff")] pub mod revindex; pub mod search; @@ -15,14 +18,85 @@ use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; +use crate::encodings::HashFunctions; use crate::errors::ReadDataError; use crate::index::search::{search_minhashes, search_minhashes_containment}; +use crate::picklist::Picklist; use crate::prelude::*; use crate::signature::SigsTrait; use crate::sketch::Sketch; use crate::storage::{InnerStorage, Storage}; use crate::Error; +#[derive(Default)] +pub struct Selection { + ksize: Option, + abund: Option, + num: Option, + scaled: Option, + containment: Option, + moltype: Option, + picklist: Option, +} + +impl Selection { + pub fn ksize(&self) -> Option { + self.ksize + } + + pub fn set_ksize(&mut self, ksize: u32) { + self.ksize = Some(ksize); + } + + pub fn abund(&self) -> Option { + self.abund + } + + pub fn set_abund(&mut self, value: bool) { + self.abund = Some(value); + } + + pub fn num(&self) -> Option { + self.num + } + + pub fn set_num(&mut self, num: u32) { + self.num = Some(num); + } + + pub fn scaled(&self) -> Option { + self.scaled + } + + pub fn set_scaled(&mut self, scaled: u32) { + self.scaled = Some(scaled); + } + + pub fn containment(&self) -> Option { + self.containment + } + + pub fn set_containment(&mut self, containment: bool) { + self.containment = Some(containment); + } + + pub fn moltype(&self) -> Option { + self.moltype + } + + pub fn set_moltype(&mut self, value: HashFunctions) { + self.moltype = Some(value); + } + + pub fn picklist(&self) -> Option { + self.picklist.clone() + } + + pub fn set_picklist(&mut self, value: Picklist) { + self.picklist = Some(value); + } +} + pub trait Index<'a> { type Item: Comparable; //type SignatureIterator: Iterator; @@ -116,7 +190,7 @@ pub struct DatasetInfo { } #[derive(TypedBuilder, Default, Clone)] -pub struct SigStore { +pub struct SigStore { #[builder(setter(into))] filename: String, @@ -129,16 +203,16 @@ pub struct SigStore { storage: Option, #[builder(setter(into), default)] - data: OnceCell, + data: OnceCell, } -impl SigStore { +impl SigStore { pub fn name(&self) -> String { self.name.clone() } } -impl std::fmt::Debug for SigStore { +impl std::fmt::Debug for SigStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, @@ -148,7 +222,7 @@ impl std::fmt::Debug for SigStore { } } -impl ReadData for SigStore { +impl ReadData for SigStore { fn data(&self) -> Result<&Signature, Error> { if let Some(sig) = self.data.get() { Ok(sig) @@ -172,10 +246,7 @@ impl ReadData for SigStore { } } -impl SigStore -where - T: ToWriter, -{ +impl SigStore { pub fn save(&self, path: &str) -> Result { if let Some(storage) = &self.storage { if let Some(data) = self.data.get() { @@ -190,10 +261,8 @@ where unimplemented!() } } -} -impl SigStore { - pub fn count_common(&self, other: &SigStore) -> u64 { + pub fn count_common(&self, other: &SigStore) -> u64 { let ng: &Signature = self.data().unwrap(); let ong: &Signature = other.data().unwrap(); @@ -220,13 +289,13 @@ impl SigStore { } } -impl From> for Signature { - fn from(other: SigStore) -> Signature { +impl From for Signature { + fn from(other: SigStore) -> Signature { other.data.get().unwrap().to_owned() } } -impl Deref for SigStore { +impl Deref for SigStore { type Target = Signature; fn deref(&self) -> &Signature { @@ -234,8 +303,8 @@ impl Deref for SigStore { } } -impl From for SigStore { - fn from(other: Signature) -> SigStore { +impl From for SigStore { + fn from(other: Signature) -> SigStore { let name = other.name(); let filename = other.filename(); @@ -249,8 +318,8 @@ impl From for SigStore { } } -impl Comparable> for SigStore { - fn similarity(&self, other: &SigStore) -> f64 { +impl Comparable for SigStore { + fn similarity(&self, other: &SigStore) -> f64 { let ng: &Signature = self.data().unwrap(); let ong: &Signature = other.data().unwrap(); @@ -273,7 +342,7 @@ impl Comparable> for SigStore { unimplemented!() } - fn containment(&self, other: &SigStore) -> f64 { + fn containment(&self, other: &SigStore) -> f64 { let ng: &Signature = self.data().unwrap(); let ong: &Signature = other.data().unwrap(); @@ -325,8 +394,8 @@ impl Comparable for Signature { } } -impl From for SigStore { - fn from(other: DatasetInfo) -> SigStore { +impl From for SigStore { + fn from(other: DatasetInfo) -> SigStore { SigStore { filename: other.filename, name: other.name, diff --git a/src/core/src/index/revindex.rs b/src/core/src/index/revindex.rs deleted file mode 100644 index 0a1fc25d18..0000000000 --- a/src/core/src/index/revindex.rs +++ /dev/null @@ -1,699 +0,0 @@ -use std::collections::{HashMap, HashSet}; -use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicUsize, Ordering}; - -use getset::{CopyGetters, Getters, Setters}; -use log::{debug, info}; -use nohash_hasher::BuildNoHashHasher; -use serde::{Deserialize, Serialize}; - -#[cfg(feature = "parallel")] -use rayon::prelude::*; - -use crate::encodings::{Color, Colors, Idx}; -use crate::index::Index; -use crate::signature::{Signature, SigsTrait}; -use crate::sketch::minhash::KmerMinHash; -use crate::sketch::Sketch; -use crate::Error; -use crate::HashIntoType; - -type SigCounter = counter::Counter; - -#[derive(Serialize, Deserialize)] -struct HashToColor(HashMap>); - -impl HashToColor { - fn new() -> Self { - HashToColor(HashMap::< - HashIntoType, - Color, - BuildNoHashHasher, - >::with_hasher(BuildNoHashHasher::default())) - } - - fn get(&self, hash: &HashIntoType) -> Option<&Color> { - self.0.get(hash) - } - - fn retain(&mut self, hashes: &HashSet) { - self.0.retain(|hash, _| hashes.contains(hash)) - } - - fn len(&self) -> usize { - self.0.len() - } - - fn is_empty(&self) -> bool { - self.0.is_empty() - } - - fn add_to(&mut self, colors: &mut Colors, dataset_id: usize, matched_hashes: Vec) { - let mut color = None; - - matched_hashes.into_iter().for_each(|hash| { - color = Some(colors.update(color, &[dataset_id as Idx]).unwrap()); - self.0.insert(hash, color.unwrap()); - }); - } - - fn reduce_hashes_colors( - a: (HashToColor, Colors), - b: (HashToColor, Colors), - ) -> (HashToColor, Colors) { - let ((small_hashes, small_colors), (mut large_hashes, mut large_colors)) = - if a.0.len() > b.0.len() { - (b, a) - } else { - (a, b) - }; - - small_hashes.0.into_iter().for_each(|(hash, color)| { - large_hashes - .0 - .entry(hash) - .and_modify(|entry| { - // Hash is already present. - // Update the current color by adding the indices from - // small_colors. - let ids = small_colors.indices(&color); - let new_color = large_colors.update(Some(*entry), ids).unwrap(); - *entry = new_color; - }) - .or_insert_with(|| { - // In this case, the hash was not present yet. - // we need to create the same color from small_colors - // into large_colors. - let ids = small_colors.indices(&color); - let new_color = large_colors.update(None, ids).unwrap(); - assert_eq!(new_color, color); - new_color - }); - }); - - (large_hashes, large_colors) - } -} - -// Use rkyv for serialization? -// https://davidkoloski.me/rkyv/ -#[derive(Serialize, Deserialize)] -pub struct RevIndex { - hash_to_color: HashToColor, - - sig_files: Vec, - - #[serde(skip)] - ref_sigs: Option>, - - template: Sketch, - colors: Colors, - //#[serde(skip)] - //storage: Option, -} - -impl RevIndex { - pub fn load>( - index_path: P, - queries: Option<&[KmerMinHash]>, - ) -> Result> { - let (rdr, _) = niffler::from_path(index_path)?; - let revindex = if let Some(qs) = queries { - // TODO: avoid loading full revindex if query != None - /* - struct PartialRevIndex { - hashes_to_keep: Option>, - marker: PhantomData T>, - } - - impl PartialRevIndex { - pub fn new(hashes_to_keep: HashSet) -> Self { - PartialRevIndex { - hashes_to_keep: Some(hashes_to_keep), - marker: PhantomData, - } - } - } - */ - - let mut hashes: HashSet = HashSet::new(); - for q in qs { - hashes.extend(q.iter_mins()); - } - - //let mut revindex: RevIndex = PartialRevIndex::new(hashes).deserialize(&rdr).unwrap(); - - let mut revindex: RevIndex = serde_json::from_reader(rdr)?; - revindex.hash_to_color.retain(&hashes); - revindex - } else { - // Load the full revindex - serde_json::from_reader(rdr)? - }; - - Ok(revindex) - } - - pub fn new( - search_sigs: &[PathBuf], - template: &Sketch, - threshold: usize, - queries: Option<&[KmerMinHash]>, - keep_sigs: bool, - ) -> RevIndex { - // If threshold is zero, let's merge all queries and save time later - let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold)); - - let processed_sigs = AtomicUsize::new(0); - - #[cfg(feature = "parallel")] - let sig_iter = search_sigs.par_iter(); - - #[cfg(not(feature = "parallel"))] - let sig_iter = search_sigs.iter(); - - let filtered_sigs = sig_iter.enumerate().filter_map(|(dataset_id, filename)| { - let i = processed_sigs.fetch_add(1, Ordering::SeqCst); - if i % 1000 == 0 { - info!("Processed {} reference sigs", i); - } - - let search_sig = Signature::from_path(filename) - .unwrap_or_else(|_| panic!("Error processing {:?}", filename)) - .swap_remove(0); - - RevIndex::map_hashes_colors( - dataset_id, - &search_sig, - queries, - &merged_query, - threshold, - template, - ) - }); - - #[cfg(feature = "parallel")] - let (hash_to_color, colors) = filtered_sigs.reduce( - || (HashToColor::new(), Colors::default()), - HashToColor::reduce_hashes_colors, - ); - - #[cfg(not(feature = "parallel"))] - let (hash_to_color, colors) = filtered_sigs.fold( - (HashToColor::new(), Colors::default()), - HashToColor::reduce_hashes_colors, - ); - - // TODO: build this together with hash_to_idx? - let ref_sigs = if keep_sigs { - #[cfg(feature = "parallel")] - let sigs_iter = search_sigs.par_iter(); - - #[cfg(not(feature = "parallel"))] - let sigs_iter = search_sigs.iter(); - - Some( - sigs_iter - .map(|ref_path| { - Signature::from_path(ref_path) - .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) - .swap_remove(0) - }) - .collect(), - ) - } else { - None - }; - - RevIndex { - hash_to_color, - sig_files: search_sigs.into(), - ref_sigs, - template: template.clone(), - colors, - // storage: Some(InnerStorage::new(MemStorage::default())), - } - } - - fn merge_queries(qs: &[KmerMinHash], threshold: usize) -> Option { - if threshold == 0 { - let mut merged = qs[0].clone(); - for query in &qs[1..] { - merged.merge(query).unwrap(); - } - Some(merged) - } else { - None - } - } - - pub fn new_with_sigs( - search_sigs: Vec, - template: &Sketch, - threshold: usize, - queries: Option<&[KmerMinHash]>, - ) -> RevIndex { - // If threshold is zero, let's merge all queries and save time later - let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold)); - - let processed_sigs = AtomicUsize::new(0); - - #[cfg(feature = "parallel")] - let sigs_iter = search_sigs.par_iter(); - #[cfg(not(feature = "parallel"))] - let sigs_iter = search_sigs.iter(); - - let filtered_sigs = sigs_iter.enumerate().filter_map(|(dataset_id, sig)| { - let i = processed_sigs.fetch_add(1, Ordering::SeqCst); - if i % 1000 == 0 { - info!("Processed {} reference sigs", i); - } - - RevIndex::map_hashes_colors( - dataset_id, - sig, - queries, - &merged_query, - threshold, - template, - ) - }); - - #[cfg(feature = "parallel")] - let (hash_to_color, colors) = filtered_sigs.reduce( - || (HashToColor::new(), Colors::default()), - HashToColor::reduce_hashes_colors, - ); - - #[cfg(not(feature = "parallel"))] - let (hash_to_color, colors) = filtered_sigs.fold( - (HashToColor::new(), Colors::default()), - HashToColor::reduce_hashes_colors, - ); - - RevIndex { - hash_to_color, - sig_files: vec![], - ref_sigs: search_sigs.into(), - template: template.clone(), - colors, - //storage: None, - } - } - - fn map_hashes_colors( - dataset_id: usize, - search_sig: &Signature, - queries: Option<&[KmerMinHash]>, - merged_query: &Option, - threshold: usize, - template: &Sketch, - ) -> Option<(HashToColor, Colors)> { - let mut search_mh = None; - if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(template) { - search_mh = Some(mh); - } - - let search_mh = search_mh.expect("Couldn't find a compatible MinHash"); - let mut hash_to_color = HashToColor::new(); - let mut colors = Colors::default(); - - if let Some(qs) = queries { - if let Some(ref merged) = merged_query { - let (matched_hashes, intersection) = merged.intersection(search_mh).unwrap(); - if !matched_hashes.is_empty() || intersection > threshold as u64 { - hash_to_color.add_to(&mut colors, dataset_id, matched_hashes); - } - } else { - for query in qs { - let (matched_hashes, intersection) = query.intersection(search_mh).unwrap(); - if !matched_hashes.is_empty() || intersection > threshold as u64 { - hash_to_color.add_to(&mut colors, dataset_id, matched_hashes); - } - } - } - } else { - let matched = search_mh.mins(); - let size = matched.len() as u64; - if !matched.is_empty() || size > threshold as u64 { - hash_to_color.add_to(&mut colors, dataset_id, matched); - } - }; - - if hash_to_color.is_empty() { - None - } else { - Some((hash_to_color, colors)) - } - } - - pub fn search( - &self, - counter: SigCounter, - similarity: bool, - threshold: usize, - ) -> Result, Box> { - let mut matches = vec![]; - if similarity { - unimplemented!("TODO: threshold correction") - } - - for (dataset_id, size) in counter.most_common() { - if size >= threshold { - matches.push(self.sig_files[dataset_id as usize].to_str().unwrap().into()); - } else { - break; - }; - } - Ok(matches) - } - - pub fn gather( - &self, - mut counter: SigCounter, - threshold: usize, - query: &KmerMinHash, - ) -> Result, Box> { - let mut match_size = usize::max_value(); - let mut matches = vec![]; - - while match_size > threshold && !counter.is_empty() { - let (dataset_id, size) = counter.most_common()[0]; - match_size = if size >= threshold { size } else { break }; - - let p; - let match_path = if self.sig_files.is_empty() { - p = PathBuf::new(); // TODO: Fix somehow? - &p - } else { - &self.sig_files[dataset_id as usize] - }; - - let ref_match; - let match_sig = if let Some(refsigs) = &self.ref_sigs { - &refsigs[dataset_id as usize] - } else { - // TODO: remove swap_remove - ref_match = Signature::from_path(match_path)?.swap_remove(0); - &ref_match - }; - - let mut match_mh = None; - if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(&self.template) { - match_mh = Some(mh); - } - let match_mh = match_mh.expect("Couldn't find a compatible MinHash"); - - // Calculate stats - let f_orig_query = match_size as f64 / query.size() as f64; - let f_match = match_size as f64 / match_mh.size() as f64; - let filename = match_path.to_str().unwrap().into(); - let name = match_sig.name(); - let unique_intersect_bp = match_mh.scaled() as usize * match_size; - let gather_result_rank = matches.len(); - - let (intersect_orig, _) = match_mh.intersection_size(query)?; - let intersect_bp = (match_mh.scaled() * intersect_orig) as usize; - - let f_unique_to_query = intersect_orig as f64 / query.size() as f64; - let match_ = match_sig.clone(); - - // TODO: all of these - let f_unique_weighted = 0.; - let average_abund = 0; - let median_abund = 0; - let std_abund = 0; - let md5 = "".into(); - let f_match_orig = 0.; - let remaining_bp = 0; - - let result = GatherResult { - intersect_bp, - f_orig_query, - f_match, - f_unique_to_query, - f_unique_weighted, - average_abund, - median_abund, - std_abund, - filename, - name, - md5, - match_, - f_match_orig, - unique_intersect_bp, - gather_result_rank, - remaining_bp, - }; - matches.push(result); - - // Prepare counter for finding the next match by decrementing - // all hashes found in the current match in other datasets - for hash in match_mh.iter_mins() { - if let Some(color) = self.hash_to_color.get(hash) { - for dataset in self.colors.indices(color) { - counter.entry(*dataset).and_modify(|e| { - if *e > 0 { - *e -= 1 - } - }); - } - } - } - counter.remove(&dataset_id); - } - Ok(matches) - } - - pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter { - query - .iter_mins() - .filter_map(|hash| self.hash_to_color.get(hash)) - .flat_map(|color| self.colors.indices(color)) - .cloned() - .collect() - } - - pub fn template(&self) -> Sketch { - self.template.clone() - } - - // TODO: mh should be a sketch, or even a sig... - pub(crate) fn find_signatures( - &self, - mh: &KmerMinHash, - threshold: f64, - containment: bool, - _ignore_scaled: bool, - ) -> Result, Error> { - /* - let template_mh = None; - if let Sketch::MinHash(mh) = self.template { - template_mh = Some(mh); - }; - // TODO: throw error - let template_mh = template_mh.unwrap(); - - let tmp_mh; - let mh = if template_mh.scaled() > mh.scaled() { - // TODO: proper error here - tmp_mh = mh.downsample_scaled(self.scaled)?; - &tmp_mh - } else { - mh - }; - - if self.scaled < mh.scaled() && !ignore_scaled { - return Err(LcaDBError::ScaledMismatchError { - db: self.scaled, - query: mh.scaled(), - } - .into()); - } - */ - - // TODO: proper threshold calculation - let threshold: usize = (threshold * (mh.size() as f64)) as _; - - let counter = self.counter_for_query(mh); - - debug!( - "number of matching signatures for hashes: {}", - counter.len() - ); - - let mut results = vec![]; - for (dataset_id, size) in counter.most_common() { - let match_size = if size >= threshold { size } else { break }; - - let p; - let match_path = if self.sig_files.is_empty() { - p = PathBuf::new(); // TODO: Fix somehow? - &p - } else { - &self.sig_files[dataset_id as usize] - }; - - let ref_match; - let match_sig = if let Some(refsigs) = &self.ref_sigs { - &refsigs[dataset_id as usize] - } else { - // TODO: remove swap_remove - ref_match = Signature::from_path(match_path)?.swap_remove(0); - &ref_match - }; - - let mut match_mh = None; - if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(&self.template) { - match_mh = Some(mh); - } - let match_mh = match_mh.unwrap(); - - if size >= threshold { - let score = if containment { - size as f64 / mh.size() as f64 - } else { - size as f64 / (mh.size() + match_size - size) as f64 - }; - let filename = match_path.to_str().unwrap().into(); - let mut sig = match_sig.clone(); - sig.reset_sketches(); - sig.push(Sketch::MinHash(match_mh.clone())); - results.push((score, sig, filename)); - } else { - break; - }; - } - Ok(results) - } -} - -#[derive(CopyGetters, Getters, Setters, Serialize, Deserialize, Debug)] -pub struct GatherResult { - #[getset(get_copy = "pub")] - intersect_bp: usize, - - #[getset(get_copy = "pub")] - f_orig_query: f64, - - #[getset(get_copy = "pub")] - f_match: f64, - - f_unique_to_query: f64, - f_unique_weighted: f64, - average_abund: usize, - median_abund: usize, - std_abund: usize, - - #[getset(get = "pub")] - filename: String, - - #[getset(get = "pub")] - name: String, - - md5: String, - match_: Signature, - f_match_orig: f64, - unique_intersect_bp: usize, - gather_result_rank: usize, - remaining_bp: usize, -} - -impl GatherResult { - pub fn get_match(&self) -> Signature { - self.match_.clone() - } -} - -impl<'a> Index<'a> for RevIndex { - type Item = Signature; - - fn insert(&mut self, _node: Self::Item) -> Result<(), Error> { - unimplemented!() - } - - fn save>(&self, _path: P) -> Result<(), Error> { - unimplemented!() - } - - fn load>(_path: P) -> Result<(), Error> { - unimplemented!() - } - - fn len(&self) -> usize { - if let Some(refs) = &self.ref_sigs { - refs.len() - } else { - self.sig_files.len() - } - } - - fn signatures(&self) -> Vec { - if let Some(ref sigs) = self.ref_sigs { - sigs.to_vec() - } else { - unimplemented!() - } - } - - fn signature_refs(&self) -> Vec<&Self::Item> { - unimplemented!() - } -} - -#[cfg(test)] -mod test { - use super::*; - - use crate::sketch::minhash::max_hash_for_scaled; - - #[test] - fn revindex_new() { - let max_hash = max_hash_for_scaled(10000); - let template = Sketch::MinHash( - KmerMinHash::builder() - .num(0u32) - .ksize(31) - .max_hash(max_hash) - .build(), - ); - let search_sigs = [ - "../../tests/test-data/gather/GCF_000006945.2_ASM694v2_genomic.fna.gz.sig".into(), - "../../tests/test-data/gather/GCF_000007545.1_ASM754v1_genomic.fna.gz.sig".into(), - ]; - let index = RevIndex::new(&search_sigs, &template, 0, None, false); - assert_eq!(index.colors.len(), 3); - } - - #[test] - fn revindex_many() { - let max_hash = max_hash_for_scaled(10000); - let template = Sketch::MinHash( - KmerMinHash::builder() - .num(0u32) - .ksize(31) - .max_hash(max_hash) - .build(), - ); - let search_sigs = [ - "../../tests/test-data/gather/GCF_000006945.2_ASM694v2_genomic.fna.gz.sig".into(), - "../../tests/test-data/gather/GCF_000007545.1_ASM754v1_genomic.fna.gz.sig".into(), - "../../tests/test-data/gather/GCF_000008105.1_ASM810v1_genomic.fna.gz.sig".into(), - ]; - - let index = RevIndex::new(&search_sigs, &template, 0, None, false); - /* - dbg!(&index.colors.colors); - 0: 86 - 1: 132 - 2: 91 - (0, 1): 53 - (0, 2): 90 - (1, 2): 26 - (0, 1, 2): 261 - union: 739 - */ - //assert_eq!(index.colors.len(), 3); - assert_eq!(index.colors.len(), 7); - } -} diff --git a/src/core/src/index/revindex/disk_revindex.rs b/src/core/src/index/revindex/disk_revindex.rs new file mode 100644 index 0000000000..37be806343 --- /dev/null +++ b/src/core/src/index/revindex/disk_revindex.rs @@ -0,0 +1,549 @@ +use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher}; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use byteorder::{LittleEndian, WriteBytesExt}; +use log::{info, trace}; +use rayon::prelude::*; +use rocksdb::{ColumnFamilyDescriptor, MergeOperands, Options}; + +use crate::index::revindex::mem_revindex::GatherResult; +use crate::signature::{Signature, SigsTrait}; +use crate::sketch::minhash::KmerMinHash; +use crate::sketch::Sketch; + +use crate::index::revindex::prepare_query; +use crate::index::revindex::{ + self as module, sig_save_to_db, stats_for_cf, Color, DatasetID, Datasets, HashToColor, + QueryColors, SigCounter, SignatureData, DB, HASHES, SIGS, +}; + +fn compute_color(idxs: &Datasets) -> Color { + let s = BuildHasherDefault::::default(); + let mut hasher = s.build_hasher(); + /* + // TODO: remove this... + let mut sorted: Vec<_> = idxs.iter().collect(); + sorted.sort(); + */ + idxs.hash(&mut hasher); + hasher.finish() +} + +#[derive(Debug, Clone)] +pub struct RevIndex { + db: Arc, +} + +fn merge_datasets( + _: &[u8], + existing_val: Option<&[u8]>, + operands: &MergeOperands, +) -> Option> { + let mut datasets = existing_val + .and_then(Datasets::from_slice) + .unwrap_or_default(); + + for op in operands { + let new_vals = Datasets::from_slice(op).unwrap(); + datasets.union(new_vals); + } + // TODO: optimization! if nothing changed, skip as_bytes() + datasets.as_bytes() +} + +/* TODO: need the repair_cf variant, not available in rocksdb-rust yet +pub fn repair(path: &Path) { + let opts = db_options(); + + DB::repair(&opts, path).unwrap() +} +*/ + +impl RevIndex { + pub fn create(path: &Path) -> module::RevIndex { + let mut opts = module::RevIndex::db_options(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + // prepare column family descriptors + let cfs = cf_descriptors(); + + let db = Arc::new(DB::open_cf_descriptors(&opts, path, cfs).unwrap()); + + module::RevIndex::Plain(Self { db }) + } + + pub fn open(path: &Path, read_only: bool) -> module::RevIndex { + let opts = module::RevIndex::db_options(); + + // prepare column family descriptors + let cfs = cf_descriptors(); + + let db = if read_only { + Arc::new(DB::open_cf_descriptors_read_only(&opts, path, cfs, false).unwrap()) + } else { + Arc::new(DB::open_cf_descriptors(&opts, path, cfs).unwrap()) + }; + + module::RevIndex::Plain(Self { db }) + } + + fn map_hashes_colors( + &self, + dataset_id: DatasetID, + filename: &PathBuf, + threshold: f64, + template: &Sketch, + save_paths: bool, + ) { + let search_sig = Signature::from_path(filename) + .unwrap_or_else(|_| panic!("Error processing {:?}", filename)) + .swap_remove(0); + + let search_mh = + prepare_query(&search_sig, template).expect("Couldn't find a compatible MinHash"); + + let colors = Datasets::new(&[dataset_id]).as_bytes().unwrap(); + + let cf_hashes = self.db.cf_handle(HASHES).unwrap(); + + let matched = search_mh.mins(); + let size = matched.len() as u64; + if !matched.is_empty() || size > threshold as u64 { + // FIXME threshold is f64 + let mut hash_bytes = [0u8; 8]; + for hash in matched { + (&mut hash_bytes[..]) + .write_u64::(hash) + .expect("error writing bytes"); + self.db + .merge_cf(&cf_hashes, &hash_bytes[..], colors.as_slice()) + .expect("error merging"); + } + } + + sig_save_to_db( + self.db.clone(), + search_sig, + search_mh, + size, + threshold, + save_paths, + filename, + dataset_id, + ); + } + + pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter { + info!("Collecting hashes"); + let cf_hashes = self.db.cf_handle(HASHES).unwrap(); + let hashes_iter = query.iter_mins().map(|hash| { + let mut v = vec![0_u8; 8]; + (&mut v[..]) + .write_u64::(*hash) + .expect("error writing bytes"); + (&cf_hashes, v) + }); + + info!("Multi get"); + self.db + .multi_get_cf(hashes_iter) + .into_iter() + .filter_map(|r| r.ok().unwrap_or(None)) + .flat_map(|raw_datasets| { + let new_vals = Datasets::from_slice(&raw_datasets).unwrap(); + new_vals.into_iter() + }) + .collect() + } + + pub fn prepare_gather_counters( + &self, + query: &KmerMinHash, + ) -> (SigCounter, QueryColors, HashToColor) { + let cf_hashes = self.db.cf_handle(HASHES).unwrap(); + let hashes_iter = query.iter_mins().map(|hash| { + let mut v = vec![0_u8; 8]; + (&mut v[..]) + .write_u64::(*hash) + .expect("error writing bytes"); + (&cf_hashes, v) + }); + + /* + build a HashToColors for query, + and a QueryColors (Color -> Datasets) mapping. + Loading Datasets from rocksdb for every hash takes too long. + */ + let mut query_colors: QueryColors = Default::default(); + let mut counter: SigCounter = Default::default(); + + info!("Building hash_to_colors and query_colors"); + let hash_to_colors = query + .iter_mins() + .zip(self.db.multi_get_cf(hashes_iter)) + .filter_map(|(k, r)| { + let raw = r.ok().unwrap_or(None); + raw.map(|raw| { + let new_vals = Datasets::from_slice(&raw).unwrap(); + let color = compute_color(&new_vals); + query_colors + .entry(color) + .or_insert_with(|| new_vals.clone()); + counter.update(new_vals); + (*k, color) + }) + }) + .collect(); + + (counter, query_colors, hash_to_colors) + } + + pub fn matches_from_counter( + &self, + counter: SigCounter, + threshold: usize, + ) -> Vec<(String, usize)> { + let cf_sigs = self.db.cf_handle(SIGS).unwrap(); + + let matches_iter = counter + .most_common() + .into_iter() + .filter_map(|(dataset_id, size)| { + if size >= threshold { + let mut v = vec![0_u8; 8]; + (&mut v[..]) + .write_u64::(dataset_id) + .expect("error writing bytes"); + Some((&cf_sigs, v, size)) + } else { + None + } + }); + + let matches_sizes = matches_iter.clone().map(|(_, _, v)| v); + + info!("Multi get matches"); + self.db + .multi_get_cf(matches_iter.map(|(k, v, _)| (k, v))) + .into_iter() + .zip(matches_sizes) + .filter_map(|(r, size)| r.ok().unwrap_or(None).map(|v| (v, size))) + .filter_map( + |(sigdata, size)| match SignatureData::from_slice(&sigdata).unwrap() { + SignatureData::Empty => None, + SignatureData::External(p) => Some((p, size)), + SignatureData::Internal(sig) => Some((sig.name(), size)), + }, + ) + .collect() + } + + pub fn gather( + &self, + mut counter: SigCounter, + query_colors: QueryColors, + hash_to_color: HashToColor, + threshold: usize, + orig_query: &KmerMinHash, + template: &Sketch, + ) -> Result, Box> { + let mut match_size = usize::max_value(); + let mut matches = vec![]; + let mut key_bytes = [0u8; 8]; + //let mut query: KmerMinHashBTree = orig_query.clone().into(); + + let cf_sigs = self.db.cf_handle(SIGS).unwrap(); + + while match_size > threshold && !counter.is_empty() { + trace!("counter len: {}", counter.len()); + trace!("match size: {}", match_size); + + let (dataset_id, size) = counter.k_most_common_ordered(1)[0]; + match_size = if size >= threshold { size } else { break }; + + (&mut key_bytes[..]) + .write_u64::(dataset_id) + .expect("error writing bytes"); + + let match_sig = self + .db + .get_cf(&cf_sigs, &key_bytes[..]) + .ok() + .map( + |sigdata| match SignatureData::from_slice(&(sigdata.unwrap())).unwrap() { + SignatureData::Empty => todo!("throw error, empty sig"), + SignatureData::External(_p) => todo!("Load from external"), + SignatureData::Internal(sig) => sig, + }, + ) + .unwrap_or_else(|| panic!("Unknown dataset {}", dataset_id)); + + let match_mh = + prepare_query(&match_sig, template).expect("Couldn't find a compatible MinHash"); + + // Calculate stats + let f_orig_query = match_size as f64 / orig_query.size() as f64; + let f_match = match_size as f64 / match_mh.size() as f64; + let name = match_sig.name(); + let unique_intersect_bp = match_mh.scaled() as usize * match_size; + let gather_result_rank = matches.len(); + + let (intersect_orig, _) = match_mh.intersection_size(orig_query)?; + let intersect_bp = (match_mh.scaled() * intersect_orig) as usize; + + let f_unique_to_query = intersect_orig as f64 / orig_query.size() as f64; + let match_ = match_sig.clone(); + let md5 = match_sig.md5sum(); + + // TODO: all of these + let filename = "".into(); + let f_unique_weighted = 0.; + let average_abund = 0; + let median_abund = 0; + let std_abund = 0; + let f_match_orig = 0.; + let remaining_bp = 0; + + let result = GatherResult::builder() + .intersect_bp(intersect_bp) + .f_orig_query(f_orig_query) + .f_match(f_match) + .f_unique_to_query(f_unique_to_query) + .f_unique_weighted(f_unique_weighted) + .average_abund(average_abund) + .median_abund(median_abund) + .std_abund(std_abund) + .filename(filename) + .name(name) + .md5(md5) + .match_(match_) + .f_match_orig(f_match_orig) + .unique_intersect_bp(unique_intersect_bp) + .gather_result_rank(gather_result_rank) + .remaining_bp(remaining_bp) + .build(); + matches.push(result); + + trace!("Preparing counter for next round"); + // Prepare counter for finding the next match by decrementing + // all hashes found in the current match in other datasets + // TODO: not used at the moment, so just skip. + //query.remove_many(match_mh.to_vec().as_slice())?; + + // TODO: Use HashesToColors here instead. If not initialized, + // build it. + match_mh + .iter_mins() + .filter_map(|hash| hash_to_color.get(hash)) + .flat_map(|color| { + // TODO: remove this clone + query_colors.get(color).unwrap().clone().into_iter() + }) + .for_each(|dataset| { + // TODO: collect the flat_map into a Counter, and remove more + // than one at a time... + counter.entry(dataset).and_modify(|e| { + if *e > 0 { + *e -= 1 + } + }); + }); + + counter.remove(&dataset_id); + } + Ok(matches) + } + + pub fn index( + &self, + index_sigs: Vec, + template: &Sketch, + threshold: f64, + save_paths: bool, + ) { + let processed_sigs = AtomicUsize::new(0); + + index_sigs + .par_iter() + .enumerate() + .for_each(|(dataset_id, filename)| { + let i = processed_sigs.fetch_add(1, Ordering::SeqCst); + if i % 1000 == 0 { + info!("Processed {} reference sigs", i); + } + + self.map_hashes_colors( + dataset_id as DatasetID, + filename, + threshold, + template, + save_paths, + ); + }); + info!("Processed {} reference sigs", processed_sigs.into_inner()); + } + + pub fn update( + &self, + index_sigs: Vec, + template: &Sketch, + threshold: f64, + save_paths: bool, + ) { + use byteorder::ReadBytesExt; + + if !save_paths { + todo!("only supports with save_paths=True for now"); + } + + let cf_sigs = self.db.cf_handle(SIGS).unwrap(); + let iter = self.db.iterator_cf(&cf_sigs, rocksdb::IteratorMode::Start); + + info!("Verifying existing sigs"); + // verify data match up to this point + let mut max_dataset_id = 0; + let to_skip = iter + .map(|result| { + let (key, value) = result.unwrap(); + let current_dataset_id = (&key[..]).read_u64::().unwrap(); + + let filename = &index_sigs[current_dataset_id as usize]; + let sig_data = SignatureData::from_slice(&value).unwrap(); + match sig_data { + SignatureData::External(sig) => { + assert_eq!(sig, filename.as_os_str().to_str().unwrap().to_string()) + } + SignatureData::Empty => (), + SignatureData::Internal(_) => { + todo!("only supports with save_paths=True for now") + } + }; + max_dataset_id = max_dataset_id.max(current_dataset_id); + }) + .count(); + + max_dataset_id += 1; + assert_eq!(max_dataset_id as usize, to_skip); + + // process the remainder + let processed_sigs = AtomicUsize::new(0); + + index_sigs + .par_iter() + .skip(to_skip) + .enumerate() + .for_each(|(i, filename)| { + let dataset_id = i + to_skip; + + let i = processed_sigs.fetch_add(1, Ordering::SeqCst); + if i % 1000 == 0 { + info!("Processed {} reference sigs", i); + } + + self.map_hashes_colors( + dataset_id as DatasetID, + filename, + threshold, + template, + save_paths, + ); + }); + + info!( + "Processed additional {} reference sigs", + processed_sigs.into_inner() + ); + } + + pub fn check(&self, quick: bool) { + stats_for_cf(self.db.clone(), HASHES, true, quick); + info!(""); + stats_for_cf(self.db.clone(), SIGS, false, quick); + } + + pub fn compact(&self) { + for cf_name in [HASHES, SIGS] { + let cf = self.db.cf_handle(cf_name).unwrap(); + self.db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>) + } + } + + pub fn flush(&self) -> Result<(), Box> { + self.db.flush_wal(true)?; + + for cf_name in [HASHES, SIGS] { + let cf = self.db.cf_handle(cf_name).unwrap(); + self.db.flush_cf(&cf)?; + } + + Ok(()) + } + + pub fn convert(&self, _output_db: module::RevIndex) -> Result<(), Box> { + todo!() + /* + if let RevIndex::Color(db) = output_db { + let other_db = db.db; + + let cf_hashes = self.db.cf_handle(HASHES).unwrap(); + + info!("start converting colors"); + let mut color_bytes = [0u8; 8]; + let iter = self + .db + .iterator_cf(&cf_hashes, rocksdb::IteratorMode::Start); + for (key, value) in iter { + let datasets = Datasets::from_slice(&value).unwrap(); + let new_idx: Vec<_> = datasets.into_iter().collect(); + let new_color = Colors::update(other_db.clone(), None, new_idx.as_slice()).unwrap(); + + (&mut color_bytes[..]) + .write_u64::(new_color) + .expect("error writing bytes"); + other_db + .put_cf(&cf_hashes, &key[..], &color_bytes[..]) + .unwrap(); + } + info!("finished converting colors"); + + info!("copying sigs to output"); + let cf_sigs = self.db.cf_handle(SIGS).unwrap(); + let iter = self.db.iterator_cf(&cf_sigs, rocksdb::IteratorMode::Start); + for (key, value) in iter { + other_db.put_cf(&cf_sigs, &key[..], &value[..]).unwrap(); + } + info!("finished copying sigs to output"); + + Ok(()) + } else { + todo!() + } + */ + } +} + +fn cf_descriptors() -> Vec { + let mut cfopts = Options::default(); + cfopts.set_max_write_buffer_number(16); + cfopts.set_merge_operator_associative("datasets operator", merge_datasets); + cfopts.set_min_write_buffer_number_to_merge(10); + + // Updated default from + // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options + cfopts.set_level_compaction_dynamic_level_bytes(true); + + let cf_hashes = ColumnFamilyDescriptor::new(HASHES, cfopts); + + let mut cfopts = Options::default(); + cfopts.set_max_write_buffer_number(16); + // Updated default + cfopts.set_level_compaction_dynamic_level_bytes(true); + //cfopts.set_merge_operator_associative("colors operator", merge_colors); + + let cf_sigs = ColumnFamilyDescriptor::new(SIGS, cfopts); + + vec![cf_hashes, cf_sigs] +} diff --git a/src/core/src/index/revindex/mem_revindex.rs b/src/core/src/index/revindex/mem_revindex.rs new file mode 100644 index 0000000000..ed8a1b5152 --- /dev/null +++ b/src/core/src/index/revindex/mem_revindex.rs @@ -0,0 +1,1118 @@ +use std::collections::{HashMap, HashSet}; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use getset::{CopyGetters, Getters, Setters}; +use log::{debug, info}; +use nohash_hasher::BuildNoHashHasher; +use serde::{Deserialize, Serialize}; +use typed_builder::TypedBuilder; + +#[cfg(feature = "parallel")] +use rayon::prelude::*; + +use crate::encodings::{Color, Colors, Idx}; +use crate::index::{Index, Selection, SigStore}; +use crate::manifest::Manifest; +use crate::signature::{Signature, SigsTrait}; +use crate::sketch::minhash::KmerMinHash; +use crate::sketch::Sketch; +use crate::storage::{Storage, ZipStorage}; +use crate::Error; +use crate::HashIntoType; + +type SigCounter = counter::Counter; + +#[derive(Serialize, Deserialize)] +struct HashToColor(HashMap>); + +impl HashToColor { + fn new() -> Self { + HashToColor(HashMap::< + HashIntoType, + Color, + BuildNoHashHasher, + >::with_hasher(BuildNoHashHasher::default())) + } + + fn get(&self, hash: &HashIntoType) -> Option<&Color> { + self.0.get(hash) + } + + fn retain(&mut self, hashes: &HashSet) { + self.0.retain(|hash, _| hashes.contains(hash)) + } + + fn len(&self) -> usize { + self.0.len() + } + + fn is_empty(&self) -> bool { + self.0.is_empty() + } + + fn add_to(&mut self, colors: &mut Colors, dataset_id: usize, matched_hashes: Vec) { + let mut color = None; + + matched_hashes.into_iter().for_each(|hash| { + color = Some(colors.update(color, &[dataset_id as Idx]).unwrap()); + self.0.insert(hash, color.unwrap()); + }); + } + + fn reduce_hashes_colors( + a: (HashToColor, Colors), + b: (HashToColor, Colors), + ) -> (HashToColor, Colors) { + let ((small_hashes, small_colors), (mut large_hashes, mut large_colors)) = + if a.0.len() > b.0.len() { + (b, a) + } else { + (a, b) + }; + + small_hashes.0.into_iter().for_each(|(hash, color)| { + large_hashes + .0 + .entry(hash) + .and_modify(|entry| { + // Hash is already present. + // Update the current color by adding the indices from + // small_colors. + let ids = small_colors.indices(&color); + let new_color = large_colors.update(Some(*entry), ids).unwrap(); + *entry = new_color; + }) + .or_insert_with(|| { + // In this case, the hash was not present yet. + // we need to create the same color from small_colors + // into large_colors. + let ids = small_colors.indices(&color); + let new_color = large_colors.update(None, ids).unwrap(); + assert_eq!(new_color, color); + new_color + }); + }); + + (large_hashes, large_colors) + } +} + +// Use rkyv for serialization? +// https://davidkoloski.me/rkyv/ +#[derive(Serialize, Deserialize)] +pub struct RevIndex { + linear: LinearRevIndex, + hash_to_color: HashToColor, + colors: Colors, +} + +#[derive(Serialize, Deserialize)] +pub struct LinearRevIndex { + sig_files: Manifest, + + #[serde(skip)] + ref_sigs: Option>, + + template: Sketch, + + #[serde(skip)] + storage: Option>, +} + +impl LinearRevIndex { + pub fn new( + sig_files: Option, + template: &Sketch, + keep_sigs: bool, + ref_sigs: Option>, + storage: Option, + ) -> Self { + if ref_sigs.is_none() && sig_files.is_none() { + todo!("throw error, one need to be set"); + } + + let ref_sigs = if let Some(ref_sigs) = ref_sigs { + Some(ref_sigs.into_iter().map(|m| m.into()).collect()) + } else if keep_sigs { + let search_sigs: Vec<_> = sig_files + .as_ref() + .unwrap() + .internal_locations() + .map(PathBuf::from) + .collect(); + + #[cfg(feature = "parallel")] + let sigs_iter = search_sigs.par_iter(); + + #[cfg(not(feature = "parallel"))] + let sigs_iter = search_sigs.iter(); + + Some( + sigs_iter + .map(|ref_path| { + if let Some(storage) = &storage { + let sig_data = storage + .load(ref_path.to_str().unwrap_or_else(|| { + panic!("error converting path {:?}", ref_path) + })) + .unwrap_or_else(|_| panic!("error loading {:?}", ref_path)); + Signature::from_reader(sig_data.as_slice()) + .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) + .swap_remove(0) + .into() + } else { + Signature::from_path(ref_path) + .unwrap_or_else(|_| panic!("Error processing {:?}", ref_path)) + .swap_remove(0) + .into() + } + }) + .collect(), + ) + } else { + None + }; + + let storage = storage.map(Arc::new); + + let sig_files = sig_files.unwrap_or_else(|| { + todo!("generate manifest for ref_sigs"); + }); + + LinearRevIndex { + sig_files, + template: template.clone(), + ref_sigs, + storage, + } + } + + fn index( + self, + threshold: usize, + merged_query: Option, + queries: Option<&[KmerMinHash]>, + ) -> RevIndex { + let processed_sigs = AtomicUsize::new(0); + + let search_sigs: Vec<_> = self + .sig_files + .internal_locations() + .map(PathBuf::from) + .collect(); + + #[cfg(feature = "parallel")] + let sig_iter = search_sigs.par_iter(); + + #[cfg(not(feature = "parallel"))] + let sig_iter = search_sigs.iter(); + + let filtered_sigs = sig_iter.enumerate().filter_map(|(dataset_id, filename)| { + let i = processed_sigs.fetch_add(1, Ordering::SeqCst); + if i % 1000 == 0 { + info!("Processed {} reference sigs", i); + } + + let search_sig = if let Some(storage) = &self.storage { + let sig_data = storage + .load( + filename + .to_str() + .unwrap_or_else(|| panic!("error converting path {:?}", filename)), + ) + .unwrap_or_else(|_| panic!("error loading {:?}", filename)); + + Signature::from_reader(sig_data.as_slice()) + } else { + Signature::from_path(filename) + } + .unwrap_or_else(|_| panic!("Error processing {:?}", filename)) + .swap_remove(0); + + RevIndex::map_hashes_colors( + dataset_id, + &search_sig, + queries, + &merged_query, + threshold, + &self.template, + ) + }); + + #[cfg(feature = "parallel")] + let (hash_to_color, colors) = filtered_sigs.reduce( + || (HashToColor::new(), Colors::default()), + HashToColor::reduce_hashes_colors, + ); + + #[cfg(not(feature = "parallel"))] + let (hash_to_color, colors) = filtered_sigs.fold( + (HashToColor::new(), Colors::default()), + HashToColor::reduce_hashes_colors, + ); + + RevIndex { + hash_to_color, + colors, + linear: self, + } + } + + pub fn location(&self) -> Option { + if let Some(storage) = &self.storage { + storage.path() + } else { + None + } + } + + pub fn storage(&self) -> Option> { + self.storage.clone() + } + + pub fn select(mut self, selection: &Selection) -> Result { + let manifest = self.sig_files.select_to_manifest(selection)?; + self.sig_files = manifest; + + Ok(self) + /* + # if we have a manifest, run 'select' on the manifest. + manifest = self.manifest + traverse_yield_all = self.traverse_yield_all + + if manifest is not None: + manifest = manifest.select_to_manifest(**kwargs) + return ZipFileLinearIndex(self.storage, + selection_dict=None, + traverse_yield_all=traverse_yield_all, + manifest=manifest, + use_manifest=True) + else: + # no manifest? just pass along all the selection kwargs to + # the new ZipFileLinearIndex. + + assert manifest is None + if self.selection_dict: + # combine selects... + d = dict(self.selection_dict) + for k, v in kwargs.items(): + if k in d: + if d[k] is not None and d[k] != v: + raise ValueError(f"incompatible select on '{k}'") + d[k] = v + kwargs = d + + return ZipFileLinearIndex(self.storage, + selection_dict=kwargs, + traverse_yield_all=traverse_yield_all, + manifest=None, + use_manifest=False) + */ + } + + pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter { + let processed_sigs = AtomicUsize::new(0); + + // TODO: Some(ref_sigs) case + + let search_sigs: Vec<_> = self + .sig_files + .internal_locations() + .map(PathBuf::from) + .collect(); + + #[cfg(feature = "parallel")] + let sig_iter = search_sigs.par_iter(); + + #[cfg(not(feature = "parallel"))] + let sig_iter = search_sigs.iter(); + + let counters = sig_iter.enumerate().filter_map(|(dataset_id, filename)| { + let i = processed_sigs.fetch_add(1, Ordering::SeqCst); + if i % 1000 == 0 { + info!("Processed {} reference sigs", i); + } + + let search_sig = if let Some(storage) = &self.storage { + let sig_data = storage + .load( + filename + .to_str() + .unwrap_or_else(|| panic!("error converting path {:?}", filename)), + ) + .unwrap_or_else(|_| panic!("error loading {:?}", filename)); + + Signature::from_reader(sig_data.as_slice()) + } else { + Signature::from_path(filename) + } + .unwrap_or_else(|_| panic!("Error processing {:?}", filename)) + .swap_remove(0); + + let mut search_mh = None; + if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(&self.template) { + search_mh = Some(mh); + }; + let search_mh = search_mh.expect("Couldn't find a compatible MinHash"); + + let (large_mh, small_mh) = if query.size() > search_mh.size() { + (query, search_mh) + } else { + (search_mh, query) + }; + + let (size, _) = small_mh + .intersection_size(large_mh) + .unwrap_or_else(|_| panic!("error computing intersection for {:?}", filename)); + + if size == 0 { + None + } else { + let mut counter: SigCounter = Default::default(); + counter[&(dataset_id as u64)] += size as usize; + Some(counter) + } + }); + + let reduce_counters = |mut a: SigCounter, b: SigCounter| { + a.extend(&b); + a + }; + + #[cfg(feature = "parallel")] + let counter = counters.reduce(SigCounter::new, reduce_counters); + + #[cfg(not(feature = "parallel"))] + let counter = counters.fold(SigCounter::new(), reduce_counters); + + counter + } + + pub fn search( + &self, + counter: SigCounter, + similarity: bool, + threshold: usize, + ) -> Result, Box> { + let mut matches = vec![]; + if similarity { + unimplemented!("TODO: threshold correction") + } + + for (dataset_id, size) in counter.most_common() { + if size >= threshold { + matches.push( + self.sig_files[dataset_id as usize] + .internal_location() + .to_str() + .unwrap() + .into(), + ); + } else { + break; + }; + } + Ok(matches) + } + + fn gather_round( + &self, + dataset_id: u64, + match_size: usize, + query: &KmerMinHash, + round: usize, + ) -> Result { + let match_path = if self.sig_files.is_empty() { + PathBuf::new() + } else { + self.sig_files[dataset_id as usize].internal_location() + }; + let match_sig = self.sig_for_dataset(dataset_id as usize)?; + let result = self.stats_for_match(&match_sig, query, match_size, match_path, round)?; + Ok(result) + } + + fn sig_for_dataset(&self, dataset_id: usize) -> Result { + let match_path = if self.sig_files.is_empty() { + PathBuf::new() + } else { + self.sig_files[dataset_id].internal_location() + }; + + let match_sig = if let Some(refsigs) = &self.ref_sigs { + refsigs[dataset_id].clone() + } else { + let mut sig = if let Some(storage) = &self.storage { + let sig_data = storage + .load( + match_path + .to_str() + .unwrap_or_else(|| panic!("error converting path {:?}", match_path)), + ) + .unwrap_or_else(|_| panic!("error loading {:?}", match_path)); + Signature::from_reader(sig_data.as_slice())? + } else { + Signature::from_path(&match_path)? + }; + // TODO: remove swap_remove + sig.swap_remove(0).into() + }; + Ok(match_sig) + } + + fn stats_for_match( + &self, + match_sig: &Signature, + query: &KmerMinHash, + match_size: usize, + match_path: PathBuf, + gather_result_rank: usize, + ) -> Result { + let mut match_mh = None; + if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(&self.template) { + match_mh = Some(mh); + } + let match_mh = match_mh.expect("Couldn't find a compatible MinHash"); + + // Calculate stats + let f_orig_query = match_size as f64 / query.size() as f64; + let f_match = match_size as f64 / match_mh.size() as f64; + let filename = match_path.to_str().unwrap().into(); + let name = match_sig.name(); + let unique_intersect_bp = match_mh.scaled() as usize * match_size; + + let (intersect_orig, _) = match_mh.intersection_size(query)?; + let intersect_bp = (match_mh.scaled() * intersect_orig) as usize; + + let f_unique_to_query = intersect_orig as f64 / query.size() as f64; + let match_ = match_sig.clone(); + + // TODO: all of these + let f_unique_weighted = 0.; + let average_abund = 0; + let median_abund = 0; + let std_abund = 0; + let md5 = "".into(); + let f_match_orig = 0.; + let remaining_bp = 0; + + Ok(GatherResult { + intersect_bp, + f_orig_query, + f_match, + f_unique_to_query, + f_unique_weighted, + average_abund, + median_abund, + std_abund, + filename, + name, + md5, + match_, + f_match_orig, + unique_intersect_bp, + gather_result_rank, + remaining_bp, + }) + } + + pub fn gather( + &self, + mut counter: SigCounter, + threshold: usize, + query: &KmerMinHash, + ) -> Result, Box> { + let mut match_size = usize::max_value(); + let mut matches = vec![]; + + while match_size > threshold && !counter.is_empty() { + let (dataset_id, size) = counter.most_common()[0]; + if threshold == 0 && size == 0 { + break; + } + + match_size = if size >= threshold { + size + } else { + break; + }; + + let result = self.gather_round(dataset_id, match_size, query, matches.len())?; + + // Prepare counter for finding the next match by decrementing + // all hashes found in the current match in other datasets + // TODO: maybe par_iter? + let mut to_remove: HashSet = Default::default(); + to_remove.insert(dataset_id); + + for (dataset, value) in counter.iter_mut() { + let dataset_sig = self.sig_for_dataset(*dataset as usize)?; + let mut match_mh = None; + if let Some(Sketch::MinHash(mh)) = dataset_sig.select_sketch(&self.template) { + match_mh = Some(mh); + } + let match_mh = match_mh.expect("Couldn't find a compatible MinHash"); + + let (intersection, _) = query.intersection_size(match_mh)?; + if intersection as usize > *value { + to_remove.insert(*dataset); + } else { + *value -= intersection as usize; + }; + } + to_remove.iter().for_each(|dataset_id| { + counter.remove(dataset_id); + }); + matches.push(result); + } + Ok(matches) + } + + pub fn manifest(&self) -> Manifest { + self.sig_files.clone() + } + + pub fn set_manifest(&mut self, new_manifest: Manifest) -> Result<(), Error> { + self.sig_files = new_manifest; + Ok(()) + } + + pub fn signatures_iter(&self) -> impl Iterator + '_ { + if let Some(_sigs) = &self.ref_sigs { + //sigs.iter().cloned() + todo!("this works, but need to match return types") + } else { + // FIXME temp solution, must find better one! + (0..self.sig_files.len()) + .map(move |dataset_id| self.sig_for_dataset(dataset_id).expect("error loading sig")) + } + } +} + +impl<'a> Index<'a> for LinearRevIndex { + type Item = SigStore; + + fn insert(&mut self, _node: Self::Item) -> Result<(), Error> { + unimplemented!() + } + + fn save>(&self, _path: P) -> Result<(), Error> { + unimplemented!() + } + + fn load>(_path: P) -> Result<(), Error> { + unimplemented!() + } + + fn len(&self) -> usize { + if let Some(refs) = &self.ref_sigs { + refs.len() + } else { + self.sig_files.len() + } + } + + fn signatures(&self) -> Vec { + if let Some(ref sigs) = self.ref_sigs { + sigs.to_vec() + } else { + unimplemented!() + } + } + + fn signature_refs(&self) -> Vec<&Self::Item> { + unimplemented!() + } +} + +impl RevIndex { + pub fn load>( + index_path: P, + queries: Option<&[KmerMinHash]>, + ) -> Result> { + let (rdr, _) = niffler::from_path(index_path)?; + let revindex = if let Some(qs) = queries { + // TODO: avoid loading full revindex if query != None + /* + struct PartialRevIndex { + hashes_to_keep: Option>, + marker: PhantomData T>, + } + + impl PartialRevIndex { + pub fn new(hashes_to_keep: HashSet) -> Self { + PartialRevIndex { + hashes_to_keep: Some(hashes_to_keep), + marker: PhantomData, + } + } + } + */ + + let mut hashes: HashSet = HashSet::new(); + for q in qs { + hashes.extend(q.iter_mins()); + } + + //let mut revindex: RevIndex = PartialRevIndex::new(hashes).deserialize(&rdr).unwrap(); + + let mut revindex: RevIndex = serde_json::from_reader(rdr)?; + revindex.hash_to_color.retain(&hashes); + revindex + } else { + // Load the full revindex + serde_json::from_reader(rdr)? + }; + + Ok(revindex) + } + + pub fn new( + search_sigs: &[PathBuf], + template: &Sketch, + threshold: usize, + queries: Option<&[KmerMinHash]>, + keep_sigs: bool, + ) -> RevIndex { + // If threshold is zero, let's merge all queries and save time later + let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold)); + + let linear = LinearRevIndex::new(Some(search_sigs.into()), template, keep_sigs, None, None); + linear.index(threshold, merged_query, queries) + } + + pub fn from_zipstorage( + storage: ZipStorage, + template: &Sketch, + threshold: usize, + queries: Option<&[KmerMinHash]>, + keep_sigs: bool, + ) -> Result { + // If threshold is zero, let's merge all queries and save time later + let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold)); + + // Load manifest from zipstorage + let manifest = Manifest::from_reader(storage.load("SOURMASH-MANIFEST.csv")?.as_slice())?; + let search_sigs: Vec<_> = manifest.internal_locations().map(PathBuf::from).collect(); + + let linear = LinearRevIndex::new( + Some(search_sigs.as_slice().into()), + template, + keep_sigs, + None, + Some(storage), + ); + + Ok(linear.index(threshold, merged_query, queries)) + } + + fn merge_queries(qs: &[KmerMinHash], threshold: usize) -> Option { + if threshold == 0 { + let mut merged = qs[0].clone(); + for query in &qs[1..] { + merged.merge(query).unwrap(); + } + Some(merged) + } else { + None + } + } + + pub fn new_with_sigs( + search_sigs: Vec, + template: &Sketch, + threshold: usize, + queries: Option<&[KmerMinHash]>, + ) -> RevIndex { + // If threshold is zero, let's merge all queries and save time later + let merged_query = queries.and_then(|qs| Self::merge_queries(qs, threshold)); + + let linear = LinearRevIndex::new( + Default::default(), + template, + false, + search_sigs.into(), + None, + ); + + linear.index(threshold, merged_query, queries) + } + + fn map_hashes_colors( + dataset_id: usize, + search_sig: &Signature, + queries: Option<&[KmerMinHash]>, + merged_query: &Option, + threshold: usize, + template: &Sketch, + ) -> Option<(HashToColor, Colors)> { + let mut search_mh = None; + if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(template) { + search_mh = Some(mh); + } + + let search_mh = search_mh.expect("Couldn't find a compatible MinHash"); + let mut hash_to_color = HashToColor::new(); + let mut colors = Colors::default(); + + if let Some(qs) = queries { + if let Some(ref merged) = merged_query { + let (matched_hashes, intersection) = merged.intersection(search_mh).unwrap(); + if !matched_hashes.is_empty() || intersection > threshold as u64 { + hash_to_color.add_to(&mut colors, dataset_id, matched_hashes); + } + } else { + for query in qs { + let (matched_hashes, intersection) = query.intersection(search_mh).unwrap(); + if !matched_hashes.is_empty() || intersection > threshold as u64 { + hash_to_color.add_to(&mut colors, dataset_id, matched_hashes); + } + } + } + } else { + let matched = search_mh.mins(); + let size = matched.len() as u64; + if !matched.is_empty() || size > threshold as u64 { + hash_to_color.add_to(&mut colors, dataset_id, matched); + } + }; + + if hash_to_color.is_empty() { + None + } else { + Some((hash_to_color, colors)) + } + } + + pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter { + query + .iter_mins() + .filter_map(|hash| self.hash_to_color.get(hash)) + .flat_map(|color| self.colors.indices(color)) + .cloned() + .collect() + } + + pub fn search( + &self, + counter: SigCounter, + similarity: bool, + threshold: usize, + ) -> Result, Box> { + self.linear.search(counter, similarity, threshold) + } + + pub fn gather( + &self, + mut counter: SigCounter, + threshold: usize, + query: &KmerMinHash, + ) -> Result, Box> { + let mut match_size = usize::max_value(); + let mut matches = vec![]; + + while match_size > threshold && !counter.is_empty() { + let (dataset_id, size) = counter.most_common()[0]; + match_size = if size >= threshold { size } else { break }; + let result = self + .linear + .gather_round(dataset_id, match_size, query, matches.len())?; + if let Some(Sketch::MinHash(match_mh)) = + result.match_.select_sketch(&self.linear.template) + { + // Prepare counter for finding the next match by decrementing + // all hashes found in the current match in other datasets + for hash in match_mh.iter_mins() { + if let Some(color) = self.hash_to_color.get(hash) { + counter.subtract(self.colors.indices(color).cloned()); + } + } + counter.remove(&dataset_id); + matches.push(result); + } else { + unimplemented!() + } + } + Ok(matches) + } + + pub fn template(&self) -> Sketch { + self.linear.template.clone() + } + + // TODO: mh should be a sketch, or even a sig... + pub(crate) fn find_signatures( + &self, + mh: &KmerMinHash, + threshold: f64, + containment: bool, + _ignore_scaled: bool, + ) -> Result, Error> { + /* + let template_mh = None; + if let Sketch::MinHash(mh) = self.template { + template_mh = Some(mh); + }; + // TODO: throw error + let template_mh = template_mh.unwrap(); + + let tmp_mh; + let mh = if template_mh.scaled() > mh.scaled() { + // TODO: proper error here + tmp_mh = mh.downsample_scaled(self.scaled)?; + &tmp_mh + } else { + mh + }; + + if self.scaled < mh.scaled() && !ignore_scaled { + return Err(LcaDBError::ScaledMismatchError { + db: self.scaled, + query: mh.scaled(), + } + .into()); + } + */ + + // TODO: proper threshold calculation + let threshold: usize = (threshold * (mh.size() as f64)) as _; + + let counter = self.counter_for_query(mh); + + debug!( + "number of matching signatures for hashes: {}", + counter.len() + ); + + let mut results = vec![]; + for (dataset_id, size) in counter.most_common() { + let match_size = if size >= threshold { size } else { break }; + + let match_path = if self.linear.sig_files.is_empty() { + PathBuf::new() + } else { + self.linear.sig_files[dataset_id as usize].internal_location() + }; + + let ref_match; + let match_sig = if let Some(refsigs) = &self.linear.ref_sigs { + &refsigs[dataset_id as usize] + } else { + let mut sig = if let Some(storage) = &self.linear.storage { + let sig_data = + storage + .load(match_path.to_str().unwrap_or_else(|| { + panic!("error converting path {:?}", match_path) + })) + .unwrap_or_else(|_| panic!("error loading {:?}", match_path)); + Signature::from_reader(sig_data.as_slice())? + } else { + Signature::from_path(&match_path)? + }; + // TODO: remove swap_remove + ref_match = sig.swap_remove(0); + &ref_match + }; + + let mut match_mh = None; + if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(&self.linear.template) { + match_mh = Some(mh); + } + let match_mh = match_mh.unwrap(); + + if size >= threshold { + let score = if containment { + size as f64 / mh.size() as f64 + } else { + size as f64 / (mh.size() + match_size - size) as f64 + }; + let filename = match_path.to_str().unwrap().into(); + let mut sig = match_sig.clone(); + sig.reset_sketches(); + sig.push(Sketch::MinHash(match_mh.clone())); + results.push((score, sig, filename)); + } else { + break; + }; + } + Ok(results) + } +} + +#[derive(TypedBuilder, CopyGetters, Getters, Setters, Serialize, Deserialize, Debug, PartialEq)] +pub struct GatherResult { + #[getset(get_copy = "pub")] + intersect_bp: usize, + + #[getset(get_copy = "pub")] + f_orig_query: f64, + + #[getset(get_copy = "pub")] + f_match: f64, + + f_unique_to_query: f64, + f_unique_weighted: f64, + average_abund: usize, + median_abund: usize, + std_abund: usize, + + #[getset(get = "pub")] + filename: String, + + #[getset(get = "pub")] + name: String, + + #[getset(get = "pub")] + md5: String, + match_: Signature, + f_match_orig: f64, + unique_intersect_bp: usize, + gather_result_rank: usize, + remaining_bp: usize, +} + +impl GatherResult { + pub fn get_match(&self) -> Signature { + self.match_.clone() + } +} + +impl<'a> Index<'a> for RevIndex { + type Item = Signature; + + fn insert(&mut self, _node: Self::Item) -> Result<(), Error> { + unimplemented!() + } + + fn save>(&self, _path: P) -> Result<(), Error> { + unimplemented!() + } + + fn load>(_path: P) -> Result<(), Error> { + unimplemented!() + } + + fn len(&self) -> usize { + if let Some(refs) = &self.linear.ref_sigs { + refs.len() + } else { + self.linear.sig_files.len() + } + } + + fn signatures(&self) -> Vec { + if let Some(ref sigs) = self.linear.ref_sigs { + sigs.iter().map(|s| s.clone().into()).collect() + } else { + unimplemented!() + } + } + + fn signature_refs(&self) -> Vec<&Self::Item> { + unimplemented!() + } +} + +#[cfg(test)] +mod test { + use super::*; + + use crate::sketch::minhash::max_hash_for_scaled; + + #[test] + fn revindex_new() { + let max_hash = max_hash_for_scaled(10000); + let template = Sketch::MinHash( + KmerMinHash::builder() + .num(0u32) + .ksize(31) + .max_hash(max_hash) + .build(), + ); + let search_sigs = [ + "../../tests/test-data/gather/GCF_000006945.2_ASM694v2_genomic.fna.gz.sig".into(), + "../../tests/test-data/gather/GCF_000007545.1_ASM754v1_genomic.fna.gz.sig".into(), + ]; + let index = RevIndex::new(&search_sigs, &template, 0, None, false); + assert_eq!(index.colors.len(), 3); + } + + #[test] + fn revindex_many() { + let max_hash = max_hash_for_scaled(10000); + let template = Sketch::MinHash( + KmerMinHash::builder() + .num(0u32) + .ksize(31) + .max_hash(max_hash) + .build(), + ); + let search_sigs = [ + "../../tests/test-data/gather/GCF_000006945.2_ASM694v2_genomic.fna.gz.sig".into(), + "../../tests/test-data/gather/GCF_000007545.1_ASM754v1_genomic.fna.gz.sig".into(), + "../../tests/test-data/gather/GCF_000008105.1_ASM810v1_genomic.fna.gz.sig".into(), + ]; + + let index = RevIndex::new(&search_sigs, &template, 0, None, false); + /* + dbg!(&index.colors.colors); + 0: 86 + 1: 132 + 2: 91 + (0, 1): 53 + (0, 2): 90 + (1, 2): 26 + (0, 1, 2): 261 + union: 739 + */ + //assert_eq!(index.colors.len(), 3); + assert_eq!(index.colors.len(), 7); + } + + #[test] + fn revindex_from_zipstorage() { + let max_hash = max_hash_for_scaled(100); + let template = Sketch::MinHash( + KmerMinHash::builder() + .num(0u32) + .ksize(57) + .hash_function(crate::encodings::HashFunctions::murmur64_protein) + .max_hash(max_hash) + .build(), + ); + let storage = ZipStorage::from_file("../../tests/test-data/prot/protein.zip") + .expect("error loading zipfile"); + let index = RevIndex::from_zipstorage(storage, &template, 0, None, false) + .expect("error building from ziptorage"); + + assert_eq!(index.colors.len(), 3); + + let query_sig = Signature::from_path( + "../../tests/test-data/prot/protein/GCA_001593925.1_ASM159392v1_protein.faa.gz.sig", + ) + .expect("Error processing query") + .swap_remove(0); + let mut query_mh = None; + if let Some(Sketch::MinHash(mh)) = query_sig.select_sketch(&template) { + query_mh = Some(mh); + } + let query_mh = query_mh.expect("Couldn't find a compatible MinHash"); + + let counter_rev = index.counter_for_query(query_mh); + let counter_lin = index.linear.counter_for_query(query_mh); + + let results_rev = index.search(counter_rev, false, 0).unwrap(); + let results_linear = index.linear.search(counter_lin, false, 0).unwrap(); + assert_eq!(results_rev, results_linear); + + let counter_rev = index.counter_for_query(query_mh); + let counter_lin = index.linear.counter_for_query(query_mh); + + let results_rev = index.gather(counter_rev, 0, query_mh).unwrap(); + let results_linear = index.linear.gather(counter_lin, 0, query_mh).unwrap(); + assert_eq!(results_rev.len(), 1); + assert_eq!(results_rev, results_linear); + } +} diff --git a/src/core/src/index/revindex/mod.rs b/src/core/src/index/revindex/mod.rs new file mode 100644 index 0000000000..314344620d --- /dev/null +++ b/src/core/src/index/revindex/mod.rs @@ -0,0 +1,509 @@ +pub mod disk_revindex; +pub mod mem_revindex; + +use std::collections::HashMap; +use std::hash::{Hash, Hasher}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use byteorder::{LittleEndian, WriteBytesExt}; +use rkyv::{Archive, Deserialize, Serialize}; +use roaring::RoaringTreemap; + +use crate::index::revindex::mem_revindex::GatherResult; +use crate::signature::{Signature, SigsTrait}; +use crate::sketch::minhash::{max_hash_for_scaled, KmerMinHash}; +use crate::sketch::Sketch; + +use crate::encodings::Color; + +//type DB = rocksdb::DBWithThreadMode; +type DB = rocksdb::DBWithThreadMode; + +type DatasetID = u64; +type SigCounter = counter::Counter; +type QueryColors = HashMap; +type HashToColor = HashMap; + +const HASHES: &str = "hashes"; +const SIGS: &str = "signatures"; +const COLORS: &str = "colors"; + +pub enum RevIndex { + //Color(color_revindex::ColorRevIndex), + Plain(disk_revindex::RevIndex), +} + +impl RevIndex { + /* TODO: need the repair_cf variant, not available in rocksdb-rust yet + pub fn repair(index: &Path, colors: bool) { + if colors { + color_revindex::repair(index); + } else { + disk_revindex::repair(index); + } + } + */ + + pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter { + match self { + //Self::Color(db) => db.counter_for_query(query), + Self::Plain(db) => db.counter_for_query(query), + } + } + + pub fn matches_from_counter( + &self, + counter: SigCounter, + threshold: usize, + ) -> Vec<(String, usize)> { + match self { + //Self::Color(db) => todo!(), //db.matches_from_counter(counter, threshold), + Self::Plain(db) => db.matches_from_counter(counter, threshold), + } + } + + pub fn prepare_gather_counters( + &self, + query: &KmerMinHash, + ) -> (SigCounter, QueryColors, HashToColor) { + match self { + //Self::Color(_db) => todo!(), //db.prepare_gather_counters(query), + Self::Plain(db) => db.prepare_gather_counters(query), + } + } + + pub fn index( + &self, + index_sigs: Vec, + template: &Sketch, + threshold: f64, + save_paths: bool, + ) { + match self { + //Self::Color(db) => db.index(index_sigs, template, threshold, save_paths), + Self::Plain(db) => db.index(index_sigs, template, threshold, save_paths), + } + } + + pub fn update( + &self, + index_sigs: Vec, + template: &Sketch, + threshold: f64, + save_paths: bool, + ) { + match self { + //Self::Color(db) => db.update(index_sigs, template, threshold, save_paths), + Self::Plain(db) => db.update(index_sigs, template, threshold, save_paths), + } + } + + pub fn compact(&self) { + match self { + //Self::Color(db) => db.compact(), + Self::Plain(db) => db.compact(), + }; + } + + pub fn flush(&self) -> Result<(), Box> { + match self { + //Self::Color(db) => db.flush(), + Self::Plain(db) => db.flush(), + } + } + + pub fn convert(&self, output_db: RevIndex) -> Result<(), Box> { + match self { + //Self::Color(_db) => todo!(), + Self::Plain(db) => db.convert(output_db), + } + } + + pub fn check(&self, quick: bool) { + match self { + //Self::Color(db) => db.check(quick), + Self::Plain(db) => db.check(quick), + } + } + + pub fn create(index: &Path, colors: bool) -> Self { + if colors { + todo!() //color_revindex::ColorRevIndex::create(index) + } else { + disk_revindex::RevIndex::create(index) + } + } + + pub fn open(index: &Path, read_only: bool) -> Self { + let opts = Self::db_options(); + let cfs = DB::list_cf(&opts, index).unwrap(); + + if cfs.into_iter().any(|c| c == COLORS) { + // TODO: ColorRevIndex can't be read-only for now, + // due to pending unmerged colors + todo!() //color_revindex::ColorRevIndex::open(index, false) + } else { + disk_revindex::RevIndex::open(index, read_only) + } + } + + fn db_options() -> rocksdb::Options { + let mut opts = rocksdb::Options::default(); + opts.set_max_open_files(500); + + // Updated defaults from + // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options + opts.set_bytes_per_sync(1048576); + let mut block_opts = rocksdb::BlockBasedOptions::default(); + block_opts.set_block_size(16 * 1024); + block_opts.set_cache_index_and_filter_blocks(true); + block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); + block_opts.set_format_version(5); + opts.set_block_based_table_factory(&block_opts); + // End of updated defaults + + opts + } + + pub fn gather( + &self, + counter: SigCounter, + query_colors: QueryColors, + hash_to_color: HashToColor, + threshold: usize, + query: &KmerMinHash, + template: &Sketch, + ) -> Result, Box> { + match self { + //Self::Color(_db) => todo!(), + Self::Plain(db) => db.gather( + counter, + query_colors, + hash_to_color, + threshold, + query, + template, + ), + } + } +} + +#[derive(Debug, Default, PartialEq, Clone, Archive, Serialize, Deserialize)] +enum SignatureData { + #[default] + Empty, + Internal(Signature), + External(String), +} + +impl SignatureData { + fn from_slice(slice: &[u8]) -> Option { + // TODO: avoid the aligned vec allocation here + let mut vec = rkyv::AlignedVec::new(); + vec.extend_from_slice(slice); + let archived_value = unsafe { rkyv::archived_root::(vec.as_ref()) }; + let inner = archived_value.deserialize(&mut rkyv::Infallible).unwrap(); + Some(inner) + } + + fn as_bytes(&self) -> Option> { + let bytes = rkyv::to_bytes::<_, 256>(self).unwrap(); + Some(bytes.into_vec()) + + /* + let mut serializer = DefaultSerializer::default(); + let v = serializer.serialize_value(self).unwrap(); + debug_assert_eq!(v, 0); + let buf = serializer.into_serializer().into_inner(); + debug_assert!(Datasets::from_slice(&buf.to_vec()).is_some()); + Some(buf.to_vec()) + */ + } +} + +fn check_compatible_downsample(me: &KmerMinHash, other: &KmerMinHash) -> Result<(), crate::Error> { + /* + if self.num != other.num { + return Err(Error::MismatchNum { + n1: self.num, + n2: other.num, + } + .into()); + } + */ + use crate::Error; + + if me.ksize() != other.ksize() { + return Err(Error::MismatchKSizes); + } + if me.hash_function() != other.hash_function() { + // TODO: fix this error + return Err(Error::MismatchDNAProt); + } + if me.max_hash() < other.max_hash() { + return Err(Error::MismatchScaled); + } + if me.seed() != other.seed() { + return Err(Error::MismatchSeed); + } + Ok(()) +} + +pub fn prepare_query(search_sig: &Signature, template: &Sketch) -> Option { + let mut search_mh = None; + if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(template) { + search_mh = Some(mh.clone()); + } else { + // try to find one that can be downsampled + if let Sketch::MinHash(template_mh) = template { + for sketch in search_sig.sketches() { + if let Sketch::MinHash(ref_mh) = sketch { + if check_compatible_downsample(&ref_mh, template_mh).is_ok() { + let max_hash = max_hash_for_scaled(template_mh.scaled()); + let mh = ref_mh.downsample_max_hash(max_hash).unwrap(); + search_mh = Some(mh); + } + } + } + } + } + search_mh +} + +#[derive(Debug, Default, PartialEq, Clone)] +pub enum Datasets { + #[default] + Empty, + Unique(DatasetID), + Many(RoaringTreemap), +} + +impl Hash for Datasets { + fn hash(&self, state: &mut H) + where + H: Hasher, + { + match self { + Self::Empty => todo!(), + Self::Unique(v) => v.hash(state), + Self::Many(_) => todo!(), + } + } +} + +impl IntoIterator for Datasets { + type Item = DatasetID; + type IntoIter = Box>; + + fn into_iter(self) -> Self::IntoIter { + match self { + Self::Empty => Box::new(std::iter::empty()), + Self::Unique(v) => Box::new(std::iter::once(v)), + Self::Many(v) => Box::new(v.into_iter()), + } + } +} + +impl Extend for Datasets { + fn extend(&mut self, iter: T) + where + T: IntoIterator, + { + if let Self::Many(v) = self { + v.extend(iter); + return; + } + + let mut it = iter.into_iter(); + while let Some(value) = it.next() { + match self { + Self::Empty => *self = Datasets::Unique(value), + Self::Unique(v) => { + if *v != value { + *self = Self::Many([*v, value].iter().copied().collect()); + } + } + Self::Many(v) => { + v.extend(it); + return; + } + } + } + } +} + +impl Datasets { + fn new(vals: &[DatasetID]) -> Self { + if vals.is_empty() { + Self::Empty + } else if vals.len() == 1 { + Self::Unique(vals[0]) + } else { + Self::Many(RoaringTreemap::from_sorted_iter(vals.iter().copied()).unwrap()) + } + } + + fn from_slice(slice: &[u8]) -> Option { + use byteorder::ReadBytesExt; + + if slice.len() == 8 { + // Unique + Some(Self::Unique( + (&slice[..]).read_u64::().unwrap(), + )) + } else if slice.len() == 1 { + // Empty + Some(Self::Empty) + } else { + // Many + Some(Self::Many(RoaringTreemap::deserialize_from(slice).unwrap())) + } + } + + fn as_bytes(&self) -> Option> { + use byteorder::WriteBytesExt; + + match self { + Self::Empty => Some(vec![42_u8]), + Self::Unique(v) => { + let mut buf = vec![0u8; 8]; + (&mut buf[..]) + .write_u64::(*v) + .expect("error writing bytes"); + Some(buf) + } + Self::Many(v) => { + let mut buf = vec![]; + v.serialize_into(&mut buf).unwrap(); + Some(buf) + } + } + } + + fn union(&mut self, other: Datasets) { + match self { + Datasets::Empty => match other { + Datasets::Empty => (), + Datasets::Unique(_) | Datasets::Many(_) => *self = other, + }, + Datasets::Unique(v) => match other { + Datasets::Empty => (), + Datasets::Unique(o) => { + if *v != o { + *self = Datasets::Many([*v, o].iter().copied().collect()) + } + } + Datasets::Many(mut o) => { + o.extend([*v]); + *self = Datasets::Many(o); + } + }, + Datasets::Many(ref mut v) => v.extend(other), + } + } + + fn len(&self) -> usize { + match self { + Self::Empty => 0, + Self::Unique(_) => 1, + Self::Many(ref v) => v.len() as usize, + } + } + + /* + fn contains(&self, value: &DatasetID) -> bool { + match self { + Self::Empty => false, + Self::Unique(v) => v == value, + Self::Many(ref v) => v.contains(*value), + } + } + */ +} + +fn sig_save_to_db( + db: Arc, + mut search_sig: Signature, + search_mh: KmerMinHash, + size: u64, + threshold: f64, + save_paths: bool, + filename: &Path, + dataset_id: u64, +) { + // Save signature to DB + let sig = if search_mh.is_empty() || size < threshold as u64 { + SignatureData::Empty + } else if save_paths { + SignatureData::External(filename.to_str().unwrap().to_string()) + } else { + search_sig.reset_sketches(); + search_sig.push(Sketch::MinHash(search_mh)); + SignatureData::Internal(search_sig) + }; + + let sig_bytes = sig.as_bytes().unwrap(); + let cf_sigs = db.cf_handle(SIGS).unwrap(); + let mut hash_bytes = [0u8; 8]; + (&mut hash_bytes[..]) + .write_u64::(dataset_id) + .expect("error writing bytes"); + db.put_cf(&cf_sigs, &hash_bytes[..], sig_bytes.as_slice()) + .expect("error saving sig"); +} + +fn stats_for_cf(db: Arc, cf_name: &str, deep_check: bool, quick: bool) { + use byteorder::ReadBytesExt; + use histogram::Histogram; + use log::info; + use numsep::{separate, Locale}; + + let cf = db.cf_handle(cf_name).unwrap(); + + let iter = db.iterator_cf(&cf, rocksdb::IteratorMode::Start); + let mut kcount = 0; + let mut vcount = 0; + let mut vcounts = Histogram::new(); + let mut datasets: Datasets = Default::default(); + + for result in iter { + let (key, value) = result.unwrap(); + let _k = (&key[..]).read_u64::().unwrap(); + kcount += key.len(); + + //println!("Saw {} {:?}", k, Datasets::from_slice(&value)); + vcount += value.len(); + + if !quick && deep_check { + let v = Datasets::from_slice(&value).expect("Error with value"); + vcounts.increment(v.len() as u64).unwrap(); + datasets.union(v); + } + //println!("Saw {} {:?}", k, value); + } + + info!("*** {} ***", cf_name); + use size::Size; + let ksize = Size::from_bytes(kcount); + let vsize = Size::from_bytes(vcount); + if !quick && cf_name == COLORS { + info!( + "total datasets: {}", + separate(datasets.len(), Locale::English) + ); + } + info!("total keys: {}", separate(kcount / 8, Locale::English)); + + info!("k: {}", ksize.to_string()); + info!("v: {}", vsize.to_string()); + + if !quick && kcount > 0 && deep_check { + info!("max v: {}", vcounts.maximum().unwrap()); + info!("mean v: {}", vcounts.mean().unwrap()); + info!("stddev: {}", vcounts.stddev().unwrap()); + info!("median v: {}", vcounts.percentile(50.0).unwrap()); + info!("p25 v: {}", vcounts.percentile(25.0).unwrap()); + info!("p75 v: {}", vcounts.percentile(75.0).unwrap()); + } +} diff --git a/src/core/src/lib.rs b/src/core/src/lib.rs index 66de82e6a0..afbd3d9f7f 100644 --- a/src/core/src/lib.rs +++ b/src/core/src/lib.rs @@ -45,6 +45,8 @@ cfg_if! { } else { pub mod ffi; pub mod index; + pub mod manifest; + pub mod picklist; } } diff --git a/src/core/src/manifest.rs b/src/core/src/manifest.rs new file mode 100644 index 0000000000..ce740c638b --- /dev/null +++ b/src/core/src/manifest.rs @@ -0,0 +1,186 @@ +use std::convert::TryInto; +use std::io::Read; +use std::ops::Deref; +use std::path::PathBuf; + +use serde::de; +use serde::{Deserialize, Serialize}; + +use crate::encodings::HashFunctions; +use crate::index::Selection; +use crate::Error; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Record { + internal_location: String, + ksize: u32, + + #[serde(deserialize_with = "to_bool")] + with_abundance: bool, + + md5: String, + name: String, + moltype: String, + /* + md5short: String, + num: String, + scaled: String, + n_hashes: String, + filename: String, + */ +} + +fn to_bool<'de, D>(deserializer: D) -> Result +where + D: de::Deserializer<'de>, +{ + match String::deserialize(deserializer)? + .to_ascii_lowercase() + .as_ref() + { + "0" | "false" => Ok(false), + "1" | "true" => Ok(true), + other => Err(de::Error::invalid_value( + de::Unexpected::Str(other), + &"0/1 or true/false are the only supported values", + )), + } +} + +#[derive(Debug, Default, Serialize, Deserialize, Clone)] +pub struct Manifest { + records: Vec, +} + +impl Record { + pub fn internal_location(&self) -> PathBuf { + self.internal_location.clone().into() + } + + pub fn ksize(&self) -> u32 { + self.ksize + } + + pub fn with_abundance(&self) -> bool { + self.with_abundance + } + + pub fn md5(&self) -> &str { + self.md5.as_ref() + } + + pub fn name(&self) -> &str { + self.name.as_ref() + } + + pub fn moltype(&self) -> HashFunctions { + self.moltype.as_str().try_into().unwrap() + } +} + +impl Manifest { + pub fn from_reader(rdr: R) -> Result { + let mut records = vec![]; + + let mut rdr = csv::ReaderBuilder::new() + .comment(Some(b'#')) + .from_reader(rdr); + for result in rdr.deserialize() { + let record: Record = result?; + records.push(record); + } + Ok(Manifest { records }) + } + + pub fn internal_locations(&self) -> impl Iterator { + self.records.iter().map(|r| r.internal_location.as_str()) + } + + pub fn iter(&self) -> impl Iterator { + self.records.iter() + } + + pub fn select_to_manifest(&self, selection: &Selection) -> Result { + let rows = self.records.iter().filter(|row| { + let mut valid = true; + valid = if let Some(ksize) = selection.ksize() { + row.ksize == ksize + } else { + valid + }; + valid = if let Some(abund) = selection.abund() { + valid && row.with_abundance() == abund + } else { + valid + }; + valid = if let Some(moltype) = selection.moltype() { + valid && row.moltype() == moltype + } else { + valid + }; + valid + }); + + Ok(Manifest { + records: rows.cloned().collect(), + }) + + /* + matching_rows = self.rows + if ksize: + matching_rows = ( row for row in matching_rows + if row['ksize'] == ksize ) + if moltype: + matching_rows = ( row for row in matching_rows + if row['moltype'] == moltype ) + if scaled or containment: + if containment and not scaled: + raise ValueError("'containment' requires 'scaled' in Index.select'") + + matching_rows = ( row for row in matching_rows + if row['scaled'] and not row['num'] ) + if num: + matching_rows = ( row for row in matching_rows + if row['num'] and not row['scaled'] ) + + if abund: + # only need to concern ourselves if abundance is _required_ + matching_rows = ( row for row in matching_rows + if row['with_abundance'] ) + + if picklist: + matching_rows = ( row for row in matching_rows + if picklist.matches_manifest_row(row) ) + + # return only the internal filenames! + for row in matching_rows: + yield row + */ + } +} + +impl From<&[PathBuf]> for Manifest { + fn from(v: &[PathBuf]) -> Self { + Manifest { + records: v + .iter() + .map(|p| Record { + internal_location: p.to_str().unwrap().into(), + ksize: 0, // FIXME + with_abundance: false, // FIXME + md5: "".into(), // FIXME + name: "".into(), // FIXME + moltype: "".into(), // FIXME + }) + .collect(), + } + } +} + +impl Deref for Manifest { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.records + } +} diff --git a/src/core/src/picklist.rs b/src/core/src/picklist.rs new file mode 100644 index 0000000000..ddb3183d14 --- /dev/null +++ b/src/core/src/picklist.rs @@ -0,0 +1,29 @@ +use getset::{CopyGetters, Getters, Setters}; +use typed_builder::TypedBuilder; + +#[derive(Default, TypedBuilder, CopyGetters, Getters, Setters, Clone)] +pub struct Picklist { + #[getset(get = "pub", set = "pub")] + #[builder(default = "".into())] + coltype: String, + + #[getset(get = "pub", set = "pub")] + #[builder(default = "".into())] + pickfile: String, + + #[getset(get = "pub", set = "pub")] + #[builder(default = "".into())] + column_name: String, + + #[getset(get = "pub", set = "pub")] + #[builder] + pickstyle: PickStyle, +} + +#[derive(Clone, Default)] +#[repr(u32)] +pub enum PickStyle { + #[default] + Include = 1, + Exclude = 2, +} diff --git a/src/core/src/signature.rs b/src/core/src/signature.rs index db2a85ea05..290b76788d 100644 --- a/src/core/src/signature.rs +++ b/src/core/src/signature.rs @@ -2,6 +2,8 @@ //! //! A signature is a collection of sketches for a genomic dataset. +use core::iter::FusedIterator; + use std::fs::File; use std::io; use std::iter::Iterator; @@ -20,6 +22,8 @@ use crate::sketch::Sketch; use crate::Error; use crate::HashIntoType; +// TODO: this is the behavior expected from Sketch, but that name is already +// used. Sketchable? pub trait SigsTrait { fn size(&self) -> usize; fn to_vec(&self) -> Vec; @@ -395,6 +399,10 @@ impl Iterator for SeqToHashes { } #[derive(Serialize, Deserialize, Debug, Clone, TypedBuilder)] +#[cfg_attr( + feature = "rkyv", + derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive) +)] pub struct Signature { #[serde(default = "default_class")] #[builder(default = default_class())] @@ -654,6 +662,92 @@ impl Signature { Ok(()) } + + pub fn iter_mut(&mut self) -> IterMut<'_> { + let length = self.signatures.len(); + IterMut { + iter: self.signatures.iter_mut(), + length, + } + } + + pub fn iter(&mut self) -> Iter<'_> { + let length = self.signatures.len(); + Iter { + iter: self.signatures.iter(), + length, + } + } +} + +pub struct IterMut<'a> { + iter: std::slice::IterMut<'a, Sketch>, + length: usize, +} + +impl<'a> IntoIterator for &'a mut Signature { + type Item = &'a mut Sketch; + type IntoIter = IterMut<'a>; + + fn into_iter(self) -> IterMut<'a> { + self.iter_mut() + } +} + +impl<'a> Iterator for IterMut<'a> { + type Item = &'a mut Sketch; + + fn next(&mut self) -> Option<&'a mut Sketch> { + if self.length == 0 { + None + } else { + self.length -= 1; + self.iter.next() + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.length, Some(self.length)) + } +} + +pub struct Iter<'a> { + iter: std::slice::Iter<'a, Sketch>, + length: usize, +} + +impl<'a> Iterator for Iter<'a> { + type Item = &'a Sketch; + + fn next(&mut self) -> Option<&'a Sketch> { + if self.length == 0 { + None + } else { + self.length -= 1; + self.iter.next() + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.length, Some(self.length)) + } +} + +impl FusedIterator for Iter<'_> {} + +impl ExactSizeIterator for Iter<'_> { + fn len(&self) -> usize { + self.length + } +} + +impl Clone for Iter<'_> { + fn clone(&self) -> Self { + Iter { + iter: self.iter.clone(), + length: self.length, + } + } } impl ToWriter for Signature { diff --git a/src/core/src/sketch/hyperloglog/mod.rs b/src/core/src/sketch/hyperloglog/mod.rs index 409d2a2c44..d575d50f70 100644 --- a/src/core/src/sketch/hyperloglog/mod.rs +++ b/src/core/src/sketch/hyperloglog/mod.rs @@ -26,6 +26,10 @@ pub mod estimators; use estimators::CounterType; #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[cfg_attr( + feature = "rkyv", + derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive) +)] pub struct HyperLogLog { registers: Vec, p: usize, diff --git a/src/core/src/sketch/minhash.rs b/src/core/src/sketch/minhash.rs index 5c5f1114f8..454a50f7a7 100644 --- a/src/core/src/sketch/minhash.rs +++ b/src/core/src/sketch/minhash.rs @@ -33,6 +33,10 @@ pub fn scaled_for_max_hash(max_hash: u64) -> u64 { } #[derive(Debug, TypedBuilder)] +#[cfg_attr( + feature = "rkyv", + derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive) +)] pub struct KmerMinHash { num: u32, ksize: u32, @@ -53,6 +57,8 @@ pub struct KmerMinHash { abunds: Option>, #[builder(default)] + //#[cfg_attr(feature = "rkyv", with(rkyv::with::Lock))] + #[cfg_attr(feature = "rkyv", with(rkyv::with::Skip))] md5sum: Mutex>, } @@ -927,6 +933,10 @@ mod test { // A MinHash implementation for low scaled or large cardinalities #[derive(Debug, TypedBuilder)] +#[cfg_attr( + feature = "rkyv", + derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive) +)] pub struct KmerMinHashBTree { num: u32, ksize: u32, @@ -950,6 +960,8 @@ pub struct KmerMinHashBTree { current_max: u64, #[builder(default)] + //#[cfg_attr(feature = "rkyv", with(rkyv::with::Lock))] + #[cfg_attr(feature = "rkyv", with(rkyv::with::Skip))] md5sum: Mutex>, } diff --git a/src/core/src/sketch/mod.rs b/src/core/src/sketch/mod.rs index 09bd51085c..3ef04e43df 100644 --- a/src/core/src/sketch/mod.rs +++ b/src/core/src/sketch/mod.rs @@ -10,6 +10,10 @@ use crate::sketch::minhash::{KmerMinHash, KmerMinHashBTree}; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] +#[cfg_attr( + feature = "rkyv", + derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive) +)] pub enum Sketch { MinHash(KmerMinHash), LargeMinHash(KmerMinHashBTree), diff --git a/src/core/src/sketch/nodegraph.rs b/src/core/src/sketch/nodegraph.rs index cbca8915ba..bbfef5cd0d 100644 --- a/src/core/src/sketch/nodegraph.rs +++ b/src/core/src/sketch/nodegraph.rs @@ -7,7 +7,7 @@ use byteorder::{BigEndian, ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt} use fixedbitset::FixedBitSet; use crate::prelude::*; -use crate::sketch::minhash::KmerMinHash; +use crate::sketch::minhash::{KmerMinHash, KmerMinHashBTree}; use crate::Error; use crate::HashIntoType; @@ -58,6 +58,15 @@ impl Update for KmerMinHash { } } +impl Update for KmerMinHashBTree { + fn update(&self, other: &mut Nodegraph) -> Result<(), Error> { + for h in self.mins() { + other.count(h); + } + Ok(()) + } +} + impl Nodegraph { pub fn new(tablesizes: &[usize], ksize: usize) -> Nodegraph { let mut bs = Vec::with_capacity(tablesizes.len()); diff --git a/src/core/src/storage.rs b/src/core/src/storage.rs index f4f942d330..08a990d687 100644 --- a/src/core/src/storage.rs +++ b/src/core/src/storage.rs @@ -3,8 +3,7 @@ use std::ffi::OsStr; use std::fs::{DirBuilder, File}; use std::io::{BufReader, BufWriter, Read, Write}; use std::path::{Path, PathBuf}; -use std::rc::Rc; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -25,11 +24,11 @@ pub trait Storage { } #[derive(Clone)] -pub struct InnerStorage(Rc>); +pub struct InnerStorage(Arc>); impl InnerStorage { - pub fn new(inner: impl Storage + 'static) -> InnerStorage { - InnerStorage(Rc::new(RwLock::new(inner))) + pub fn new(inner: impl Storage + Send + Sync + 'static) -> InnerStorage { + InnerStorage(Arc::new(RwLock::new(inner))) } } diff --git a/src/core/tests/minhash.rs b/src/core/tests/minhash.rs index bcb3fdb4fa..50c03870e0 100644 --- a/src/core/tests/minhash.rs +++ b/src/core/tests/minhash.rs @@ -6,6 +6,7 @@ use proptest::collection::vec; use proptest::num::u64; use proptest::proptest; use sourmash::encodings::HashFunctions; +use sourmash::prelude::*; use sourmash::signature::SeqToHashes; use sourmash::signature::{Signature, SigsTrait}; use sourmash::sketch::minhash::{ diff --git a/src/core/tests/storage.rs b/src/core/tests/storage.rs index 5a60e02fcc..a27fa27b14 100644 --- a/src/core/tests/storage.rs +++ b/src/core/tests/storage.rs @@ -42,3 +42,41 @@ fn zipstorage_list_sbts() -> Result<(), Box> { Ok(()) } + +#[cfg(feature = "parallel")] +#[test] +fn zipstorage_parallel_access() -> Result<(), Box> { + use std::io::BufReader; + + use rayon::prelude::*; + use sourmash::signature::{Signature, SigsTrait}; + use sourmash::sketch::minhash::KmerMinHash; + + let mut filename = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + filename.push("../../tests/test-data/v6.sbt.zip"); + + let zs = ZipStorage::from_file(filename.to_str().unwrap())?; + + let total_hashes: usize = [ + ".sbt.v3/f71e78178af9e45e6f1d87a0c53c465c", + ".sbt.v3/f0c834bc306651d2b9321fb21d3e8d8f", + ".sbt.v3/4e94e60265e04f0763142e20b52c0da1", + ".sbt.v3/6d6e87e1154e95b279e5e7db414bc37b", + ".sbt.v3/0107d767a345eff67ecdaed2ee5cd7ba", + ".sbt.v3/b59473c94ff2889eca5d7165936e64b3", + ".sbt.v3/60f7e23c24a8d94791cc7a8680c493f9", + ] + .par_iter() + .map(|path| { + let data = zs.load(path).unwrap(); + let sigs: Vec = serde_json::from_reader(&data[..]).expect("Loading error"); + sigs.iter() + .map(|v| v.sketches().iter().map(|mh| mh.size()).sum::()) + .sum::() + }) + .sum(); + + assert_eq!(total_hashes, 3500); + + Ok(()) +} diff --git a/src/sourmash/sbt_storage.py b/src/sourmash/sbt_storage.py index a22e782d69..42a4fceaa6 100644 --- a/src/sourmash/sbt_storage.py +++ b/src/sourmash/sbt_storage.py @@ -130,7 +130,7 @@ def subdir(self, value): self._methodcall(lib.zipstorage_set_subdir, to_bytes(value), len(value)) def _filenames(self): - if self.__inner: + if not self._objptr: return self.__inner._filenames() size = ffi.new("uintptr_t *") @@ -150,7 +150,7 @@ def save(self, path, content, *, overwrite=False, compress=False): raise NotImplementedError() def load(self, path): - if self.__inner: + if not self._objptr: return self.__inner.load(path) try: diff --git a/tests/test_index.py b/tests/test_index.py index af0c1da890..1067422c5f 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -1775,6 +1775,7 @@ def test_lazy_index_wraps_multi_index_location(): lazy2.signatures_with_location()): assert ss_tup == ss_lazy_tup +@pytest.mark.skip("no support for in-memory sigs yet") def test_revindex_index_search(): # confirm that RevIndex works sig2 = utils.get_test_data("2.fa.sig") @@ -1820,6 +1821,7 @@ def test_revindex_index_search(): assert sr[0][1] == ss63 +@pytest.mark.skip("no support for in-memory sigs yet") def test_revindex_gather(): # check that RevIndex.best_containment works. sig2 = utils.get_test_data("2.fa.sig") @@ -1846,6 +1848,7 @@ def test_revindex_gather(): assert match.signature == ss47 +@pytest.mark.skip("no support for in-memory sigs yet") def test_revindex_gather_ignore(): # check that RevIndex gather ignores things properly. sig2 = utils.get_test_data('2.fa.sig') diff --git a/tox.ini b/tox.ini index 41734a6a3b..f73758ab7f 100644 --- a/tox.ini +++ b/tox.ini @@ -50,6 +50,11 @@ commands = pytest \ --junitxml {toxworkdir}/junit.{envname}.xml \ {posargs:doc tests} +[testenv:.pkg] +pass_env = + LIBCLANG_PATH + BINDGEN_EXTRA_CLANG_ARGS + [testenv:pypy3] deps = pip >= 19.3.1 @@ -104,7 +109,7 @@ commands = description = invoke sphinx-build to build the HTML docs basepython = python3.10 extras = doc -whitelist_externals = pandoc +allowlist_externals = pandoc pass_env = HOME change_dir = {toxinidir} #commands = sphinx-build -d "{toxworkdir}/docs_doctree" doc "{toxworkdir}/docs_out" --color -W -bhtml {posargs}