Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Low-memory sequence I/O #118

Merged
merged 6 commits into from
Nov 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions bench/cljam/io/sequence_bench.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
(ns cljam.io.sequence-bench
(:require [libra.bench :refer :all]
[libra.criterium :as c]
[cljam.test-common :as tcommon]
[cljam.io.sequence :as cseq]))

(defbench read-all-sequences-test
(are [f opts] (c/quick-bench (with-open [rdr (cseq/reader f)]
(dorun (cseq/read-all-sequences rdr opts))))
tcommon/medium-fa-file {}
tcommon/medium-fa-file {:mask? true}
tcommon/medium-twobit-file {}
tcommon/medium-twobit-file {:mask? true}))
4 changes: 3 additions & 1 deletion src/cljam/algo/convert.clj
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@
"Converts file format between FASTA and TwoBit based on the file extension."
[in out]
(with-open [rdr (cseq/reader in)
wtr (cseq/writer out)]
wtr (cseq/writer out {:index (if (cseq/indexed? rdr)
(cseq/read-indices rdr)
(logging/warn "Non-indexed sequence may use stupendous memory."))})]
(cseq/write-sequences wtr (cseq/read-all-sequences rdr {:mask? true}))))

;;; General converter
Expand Down
17 changes: 9 additions & 8 deletions src/cljam/io/fasta/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@
[f]
(let [f (.getAbsolutePath (cio/file f))]
(FASTAReader. (RandomAccessFile. f "r")
(util/compressor-input-stream f)
f
(delay (fasta-index f)))))

(defn ^FASTAReader clone-reader
"Clones fasta reader sharing persistent objects."
[^FASTAReader rdr]
(let [f (.f rdr)
raf (RandomAccessFile. ^String f "r")]
(FASTAReader. raf f (.index-delay rdr))))
raf (RandomAccessFile. ^String f "r")
stream (util/compressor-input-stream f)]
(FASTAReader. raf stream f (.index-delay rdr))))

(defn read-headers
[^FASTAReader rdr]
Expand Down Expand Up @@ -70,11 +72,10 @@
(reader/reset rdr))

(defn sequential-read
([f]
(sequential-read f {}))
([f opts]
(with-open [stream (util/compressor-input-stream f)]
(reader/sequential-read-string stream (* 1024 1024 10) 536870912 opts))))
([rdr]
(sequential-read rdr {}))
([^FASTAReader rdr opts]
(reader/sequential-read-string (.stream rdr) (* 1024 1024 10) 536870912 opts)))

(extend-type FASTAReader
protocols/IReader
Expand All @@ -100,7 +101,7 @@
(read-all-sequences
([this] (protocols/read-all-sequences this {}))
([this opts]
(sequential-read (.f this) opts)))
(sequential-read this opts)))
(read-sequence
([this region]
(protocols/read-sequence this region {}))
Expand Down
115 changes: 68 additions & 47 deletions src/cljam/io/fasta/reader.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
[cljam.util :refer [graph?]]
[cljam.io.fasta.util :refer [header-line? parse-header-line]]
[cljam.io.fasta-index.core :as fasta-index])
(:import [java.io RandomAccessFile InputStream]))
(:import [java.io RandomAccessFile InputStream]
[java.nio ByteBuffer]))

;; FASTAReader
;; -----------

(deftype FASTAReader [reader f index-delay]
(deftype FASTAReader [reader stream f index-delay]
java.io.Closeable
(close [this]
(.close ^java.io.Closeable (.reader this))))
(.close ^java.io.Closeable (.reader this))
(.close ^java.io.Closeable (.stream this))))

;; Reading
;; -------
Expand Down Expand Up @@ -111,7 +113,7 @@
(let [r ^RandomAccessFile (.reader rdr)]
(.seek r 0)))

(definline create-ba [^java.nio.ByteBuffer buffer]
(definline create-ba [^ByteBuffer buffer]
`(when (pos? (.position ~buffer))
(let [ba# (byte-array (.position ~buffer))]
(.clear ~buffer)
Expand All @@ -122,57 +124,76 @@
(def ^:private ^:const gt-byte (byte \>))
(def ^:private ^:const newline-byte (byte \newline))

(defn- sequqntial-read*
"Core function to read FASTA sequentially.
Function f is called every time a single sequence finishes reading.
When finished reading entire file, f is called with nil."
[^InputStream stream page-size seq-buf-size ^bytes byte-map f]
(defn- read-buffer!
[^bytes buf ^long size buffers ^bytes byte-map]
(let [{:keys [^ByteBuffer name-buf ^ByteBuffer seq-buf ^ByteBuffer rest-buf]} buffers]
(loop [i 0, name-line? false]
(if (< i size)
(let [b (aget buf i)]
(cond
(= b gt-byte) (if (pos? (.position seq-buf))
(do (.put rest-buf buf i (- size i))
true)
(recur (inc i) true))
(= b newline-byte) (if name-line?
(recur (inc i) false)
(recur (inc i) name-line?))
:else (do (if name-line?
(.put name-buf b)
(.put seq-buf (aget byte-map b)))
(recur (inc i) name-line?))))
false))))

(defn- sequential-read1!
[^InputStream stream buf buffers byte-map loaded-bytes]
(let [{:keys [^ByteBuffer name-buf ^ByteBuffer seq-buf ^ByteBuffer rest-buf]} buffers
read-preload? (atom (some? (seq loaded-bytes)))]
(loop [new-ref? false]
(if-not new-ref?
(if @read-preload?
(let [new-ref*? (read-buffer! loaded-bytes (count loaded-bytes) buffers byte-map)]
(reset! read-preload? false)
(recur new-ref*?))
(let [n (.read stream buf)]
(if (pos? n)
(recur (read-buffer! buf n buffers byte-map))
{:name (create-ba name-buf) :sequence (create-ba seq-buf) :rest-bytes (create-ba rest-buf) :eof? true})))
{:name (create-ba name-buf) :sequence (create-ba seq-buf) :rest-bytes (create-ba rest-buf) :eof? false}))))

(defn- sequential-read!
[stream buf buffers byte-map loaded-bytes eof?]
(when (or (not eof?) (seq loaded-bytes))
(lazy-seq
(let [m (sequential-read1! stream buf buffers byte-map loaded-bytes)]
(cons (select-keys m [:name :sequence])
(sequential-read! stream buf buffers byte-map (:rest-bytes m) (:eof? m)))))))

(defn- sequential-read
[stream page-size seq-buf-size byte-map]
(let [buf (byte-array page-size)
n-buf (java.nio.ByteBuffer/allocate 1024)
s-buf (java.nio.ByteBuffer/allocate seq-buf-size)]
(loop [rname* nil]
(let [bytes (.read stream buf)]
(if (pos? bytes)
(recur
(loop [i 0 name-line? false rname rname*]
(if (< i bytes)
(let [b (aget buf i)]
(if (= b gt-byte)
(do (when-let [s (create-ba s-buf)] (f {:name rname :sequence s}))
(recur (inc i) true rname))
(if (= b newline-byte)
(if name-line?
(recur (inc i) false (create-ba n-buf))
(recur (inc i) name-line? rname))
(do (if name-line?
(.put n-buf b)
(.put s-buf (aget byte-map b)))
(recur (inc i) name-line? rname)))))
rname)))
(f {:name rname* :sequence (create-ba s-buf)}))))
(f nil)))
name-buf (ByteBuffer/allocate 1024)
seq-buf (ByteBuffer/allocate seq-buf-size)
rest-buf (ByteBuffer/allocate page-size)]
(sequential-read! stream buf
{:name-buf name-buf :seq-buf seq-buf :rest-buf rest-buf}
byte-map (byte-array 0) false)))

(defn sequential-read-byte-array
"Returns list of maps containing sequence as byte-array.
Bases ACGTN are encoded as 1~5"
[^InputStream stream page-size seq-buf-size]
(let [s (atom [])
byte-map (byte-array (range 128))]
"Returns list of maps containing sequence as byte-array. Bases ACGTN are
encoded as 1-5."
[stream page-size seq-buf-size]
(let [byte-map (byte-array (range 128))]
(doseq [[i v] [[\a 1] [\A 1] [\c 2] [\C 2] [\g 3] [\G 3] [\t 4] [\T 4] [\n 5] [\N 5]]]
(aset-byte byte-map (byte i) (byte v)))
(sequqntial-read* stream page-size seq-buf-size byte-map #(when % (swap! s conj %)))
@s))
(sequential-read stream page-size seq-buf-size byte-map)))

(defn sequential-read-string
"Returns list of maps containing sequence as upper-case string."
[^InputStream stream page-size seq-buf-size {:keys [mask?]}]
(let [s (atom [])
byte-map (byte-array (range 128))
handler (fn [{:keys [name sequence]}]
(when (and name sequence)
(swap! s conj {:name (String. ^bytes name) :sequence (String. ^bytes sequence)})))]
[stream page-size seq-buf-size {:keys [mask?]}]
(let [byte-map (byte-array (range 128))]
(when-not mask?
(doseq [[i v] [[\a \A] [\c \C] [\g \G] [\t \T] [\n \N]]]
(aset-byte byte-map (byte i) (byte v))))
(sequqntial-read* stream page-size seq-buf-size byte-map handler)
@s))
(map (fn [{:keys [^bytes name ^bytes sequence]}]
{:name (String. name) :sequence (String. sequence)})
(sequential-read stream page-size seq-buf-size byte-map))))
14 changes: 9 additions & 5 deletions src/cljam/io/sequence.clj
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,22 @@
(fa-writer/writer f options)))

(defn ^TwoBitWriter twobit-writer
"Returns an open cljam.io.twobit.writer.TwoBitWriter of f. Should be used
inside with-open to ensure the writer is properly closed."
[f]
(tb-writer/writer f))
"Returns an open cljam.io.twobit.writer.TwoBitWriter of f with options:
:index - metadata of indexed sequences. The amount of memory usage can be
reduced if index is supplied.
Should be used inside with-open to ensure the writer is properly closed."
([f]
(twobit-writer f {}))
([f options]
(tb-writer/writer f options)))

(defn ^Closeable writer
"Selects suitable writer from f's extension, returning the open writer. This
function supports FASTA and TwoBit format."
[f & options]
(case (io-util/file-type f)
:fasta (apply fasta-writer f options)
:2bit (twobit-writer f)
:2bit (apply twobit-writer f options)
(throw (IllegalArgumentException. "Invalid file type"))))

(defn write-sequences
Expand Down
12 changes: 10 additions & 2 deletions src/cljam/io/twobit/reader.clj
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,21 @@
(.rewind cb)
(.toString cb)))))

(defn- read-all-sequences*
[rdr chrs option]
(when (seq chrs)
(let [[{:keys [name]} & nxt] chrs]
(lazy-seq
(cons {:name name
:sequence (read-sequence rdr {:chr name} option)}
(read-all-sequences* rdr nxt option))))))

(defn read-all-sequences
"Reads all sequences in file."
([rdr]
(read-all-sequences rdr {}))
([^TwoBitReader rdr option]
(for [{:keys [name offset]} (.file-index rdr)]
{:name name :sequence (read-sequence rdr {:chr name} option)})))
(read-all-sequences* rdr (.file-index rdr) option)))

(defn read-indices
"Reads metadata of indexed sequences."
Expand Down
Loading