Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract depth algorithm and improve performance. #91

Merged
merged 1 commit into from
Jul 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions src/cljam/algo/depth.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
(ns cljam.algo.depth
"Provides algorithms for calculating simple depth of coverage."
(:require [com.climate.claypoole :as cp]
[cljam.common :as common]
[cljam.util :as util]
[cljam.io.sam :as sam]
[cljam.io.sam.util :as sam-util]))

(def ^:const default-step 1000000)

;; lazy
;; ----

(defn- count-for-positions
"Piles the alignments up and counts them in the positions, returning it as a seq."
[alns beg end]
(let [pile (long-array (inc (- end beg)))]
(doseq [aln alns]
(let [left (max (:pos aln) beg)
right (min (sam-util/get-end aln) end)
left-index (- left beg)]
(dotimes [i (inc (- right left))]
(aset-long pile (+ i left-index) (inc (aget pile (+ i left-index)))))))
(seq pile)))

(defn- lazy-depth*
"Internal lazy-depth function returning lazy sequence of depth."
[rdr rname start end step]
(let [n-threads (common/get-exec-n-threads)
read-fn (fn [r start end]
(sam/read-alignments r {:chr rname :start start :end end :depth :shallow}))
count-fn (fn [xs]
(if (= n-threads 1)
(map (fn [[start end]]
(count-for-positions (read-fn rdr start end) start end)) xs)
(cp/pmap (dec n-threads)
(fn [[start end]]
(with-open [r (sam/clone-bam-reader rdr)]
(count-for-positions (read-fn r start end) start end))) xs)))]
(->> (util/divide-region start end step)
count-fn
(apply concat))))

(defn lazy-depth
"Calculate depth of coverage lazily. Returns a lazy seq of depth for range [start, end].
Requires a `cljam.io.bam.reader.BAMReader` instance and region.
If start and end are not supplied, piles whole range up.
Note that CIGAR code in alignemnts are ignored and only start/end positions are used."
[bam-reader {:keys [chr start end] :or {start 1 end Long/MAX_VALUE}}
& [{:keys [step n-threads] :or {step default-step n-threads 1}}]]
{:pre [chr start end (pos? start) (pos? end) (<= start end)]}
(when-let [{:keys [len]} (sam-util/ref-by-name (sam/read-refs bam-reader) chr)]
(binding [common/*n-threads* n-threads]
(lazy-depth* bam-reader chr (min len start) (min len end) step))))

;; eager
;; -----

(defn- unchecked-aset-depth-in-region!
"Piles alignments up and sets depth values to a part of the given int-array."
[alns beg end offset ^ints pile]
(let [beg (int beg)
end (int end)
offset (int offset)]
(doseq [aln alns]
(let [left (Math/max ^int (:pos aln) (unchecked-int beg))
right (unchecked-inc-int (or (:end aln) (sam-util/get-end aln)))
left-index (unchecked-add-int (unchecked-subtract-int left beg) offset)
right-index (unchecked-add-int (unchecked-subtract-int right beg) offset)]
(aset-int pile left-index (unchecked-inc-int (aget pile left-index)))
(when (<= right end)
(aset-int pile right-index (unchecked-dec-int (aget pile right-index))))))
(dotimes [i (- end beg)]
(aset-int
pile
(unchecked-add-int (unchecked-inc-int i) offset)
(unchecked-add-int
(aget pile (unchecked-add-int i offset))
(aget pile (unchecked-add-int (unchecked-inc-int i) offset)))))))

(defn- aset-depth-in-region!
"Piles alignments up and sets depth values to a part of the given int-array.
It's roughly 15-25% slower than unchecked version."
[alns beg end offset ^ints pile]
(let [beg (int beg)
end (int end)
offset (int offset)]
(doseq [aln alns]
(let [left (Math/max ^int (:pos aln) beg)
right (int ^long (inc (or (:end aln) (sam-util/get-end aln))))
left-index (+ (- left beg) offset)
right-index (+ (- right beg) offset)]
(aset-int pile left-index (inc (aget pile left-index)))
(when (<= right end)
(aset-int pile right-index (dec (aget pile right-index))))))
(dotimes [i (- end beg)]
(aset-int pile (+ (inc i) offset) (+ (aget pile (+ i offset)) (aget pile (+ (inc i) offset)))))))

(defn ^"[I" depth*
"Internal depth function which returns an int-array."
[rdr {:keys [chr start end] :as region}
& [{:keys [step unchecked? n-threads] :or {step default-step unchecked? false n-threads 1}}]]
(let [pile (int-array (inc (- end start)))
f (if unchecked? unchecked-aset-depth-in-region! aset-depth-in-region!)]
(if (= n-threads 1)
(f (sam/read-alignments rdr region {:depth :shallow}) start end 0 pile)
(cp/pdoseq
n-threads
[[s e] (util/divide-region start end step)]
(with-open [r (sam/clone-bam-reader rdr)]
(-> (sam/read-alignments r {:chr chr, :start s, :end e} {:depth :shallow})
(f s e (- s start) pile)))))
pile))

(defn depth
"Calculate depth of coverage eagerly. Returns a seq of depth for range [start, end].
Requires a `cljam.io.bam.reader.BAMReader` instance and region.
If start and end are not supplied, piles whole range up.
Note that CIGAR code in alignemnts are ignored and only start/end positions are used."
[bam-reader {:keys [chr start end] :or {start 1 end Long/MAX_VALUE}}
& [{:keys [step unchecked? n-threads] :or {step default-step unchecked? false n-threads 1}}]]
{:pre [chr start end (pos? start) (pos? end) (<= start end)]}
(when-let [{:keys [len]} (sam-util/ref-by-name (sam/read-refs bam-reader) chr)]
(seq
(depth*
bam-reader
{:chr chr, :start (min len start), :end (min len end)}
{:step step, :unchecked? unchecked?, :n-threads n-threads}))))
23 changes: 1 addition & 22 deletions src/cljam/algo/pileup.clj
Original file line number Diff line number Diff line change
@@ -1,29 +1,8 @@
(ns cljam.algo.pileup
"Functions to calculate pileup from the BAM."
(:require [cljam.common :refer [*n-threads*]]
[cljam.io.sam :as sam]
[cljam.algo.pileup.pileup :as plp]
(:require [cljam.io.sam :as sam]
[cljam.algo.pileup.mpileup :as mplp]))

(defn first-pos
"Returns a position of first alignment in left-right, or nil."
[bam-reader region]
(plp/first-pos bam-reader region))

(def ^:private default-pileup-option
{:n-threads 0})

(defn pileup
"Piles alignments up, returning the pileup as a lazy seq. Requires a
cljam.bam.reader.BAMReader instance and region. If start and end are not
supplied, piles whole range up."
([reader region]
(pileup reader region {}))
([bam-reader {:keys [chr start end] :or {start -1 end -1} :as region} option]
(let [option* (merge default-pileup-option option)]
(binding [*n-threads* (:n-threads option*)]
(apply plp/pileup bam-reader region (apply concat option*))))))

(defn mpileup
"Returns a lazy sequence of cljam.algo.pileup.mpileup.MPileupElement
calculated from FASTA and BAM. If start and end are not supplied, piles whole
Expand Down
63 changes: 0 additions & 63 deletions src/cljam/algo/pileup/pileup.clj

This file was deleted.

35 changes: 14 additions & 21 deletions src/cljam/tools/cli.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
[cljam.algo.sorter :as sorter]
[cljam.algo.fasta-indexer :as fai]
[cljam.algo.dict :as dict]
[cljam.algo.depth :as depth]
[cljam.algo.pileup :as plp]
[cljam.algo.convert :as convert]
[cljam.algo.level :as level])
[cljam.algo.level :as level]
[cljam.util :as util])
(:import [java.io Closeable BufferedWriter OutputStreamWriter]))

;; CLI functions
Expand Down Expand Up @@ -207,11 +209,11 @@
(defn- pileup-simple
([rdr n-threads]
(doseq [rname (map :name (sam/read-refs rdr))]
(pileup-simple rdr n-threads rname -1 -1)))
([rdr n-threads rname start end]
(pileup-simple rdr n-threads {:chr rname})))
([rdr n-threads region]
(binding [*out* (BufferedWriter. (OutputStreamWriter. System/out))
*flush-on-newline* false]
(doseq [line (plp/pileup rdr {:chr rname :start start :end end} {:n-threads n-threads})]
(doseq [line (depth/lazy-depth rdr region {:n-threads n-threads})]
(println line))
(flush))))

Expand All @@ -225,9 +227,9 @@
:qual (cstr/join (% line))
:seq (cstr/join (% line))
(% line)) [:rname :pos :ref :count :seq :qual])))))))
([rdr ref-fa rname start end]
([rdr ref-fa region]
(with-open [fa-rdr (cseq/reader ref-fa)]
(doseq [line (plp/mpileup fa-rdr rdr {:chr rname :start start :end end})]
(doseq [line (plp/mpileup fa-rdr rdr region)]
(if-not (zero? (:count line))
(println (cstr/join \tab (map #(case %
:qual (cstr/join (% line))
Expand All @@ -243,23 +245,14 @@
:qual (cstr/join (% line))
:seq (cstr/join (% line))
(% line)) [:rname :pos :ref :count :seq :qual]))))))
([rdr rname start end]
(doseq [line (plp/mpileup nil rdr {:chr rname :start start :end end})]
([rdr region]
(doseq [line (plp/mpileup nil rdr region)]
(if-not (zero? (:count line))
(println (cstr/join \tab (map #(case %
:qual (cstr/join (% line))
:seq (cstr/join (% line))
(% line)) [:rname :pos :ref :count :seq :qual])))))))

(defn- parse-region
[region-str]
(if-let [m (re-find #"^([^:]+)$" region-str)]
(conj (vec (rest m)) -1 -1)
(if-let [m (re-find #"^([^:]+):([0-9]+)-([0-9]+)$" region-str)]
(-> (vec (rest m))
(update-in [1] #(Integer/parseInt %))
(update-in [2] #(Integer/parseInt %))))))

(defn pileup [args]
(let [{:keys [options arguments errors summary]} (parse-opts args pileup-cli-options)]
(cond
Expand All @@ -273,11 +266,11 @@
(when-not (sorter/sorted-by? r)
(exit 1 "Not sorted"))
(if (:region options)
(if-let [region (parse-region (:region options))]
(if-let [region (util/parse-region (:region options))]
(cond
(:simple options) (apply pileup-simple r (:thread options) region)
(:ref options) (apply pileup-with-ref r (:ref options) region)
:else (apply pileup-without-ref r region))
(:simple options) (pileup-simple r (:thread options) region)
(:ref options) (pileup-with-ref r (:ref options) region)
:else (pileup-without-ref r region))
(exit 1 "Invalid region format"))
(cond
(:simple options) (pileup-simple r (:thread options))
Expand Down
46 changes: 45 additions & 1 deletion src/cljam/util.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
(ns cljam.util
"General utilities."
(:require [clojure.java.io :refer [file] :as cio]
[clojure.string :as cstr])
[clojure.string :as cstr]
[proton.core :as proton])
(:import [org.apache.commons.compress.compressors
CompressorStreamFactory CompressorException]))

Expand Down Expand Up @@ -108,3 +109,46 @@
(map (fn [[s e]] {:chr name :start s :end e})
(divide-region 1 len step)))
refs))

(defn valid-rname?
"Checks if the given rname conforms to the spec of sam."
[rname]
(and rname (string? rname) (re-matches #"[!-)+-<>-~][!-~]*" rname)))

(defn valid-region?
"Checks if the given region map is a valid 1-based closed range."
[{:keys [chr start end]}]
(and start end
(valid-rname? chr)
(number? start) (pos? start)
(number? end) (pos? end)
(<= start end)))

(defn parse-region
"Parse a region string into a map."
[region-str]
(when region-str
(let [[_ chr _ start _ end] (re-matches #"([!-)+-<>-~][!-~]*?)(:(\d+)?(-(\d+))?)?" region-str)
start' (proton/as-long start)
end' (proton/as-long end)]
(when chr
(cond-> {:chr chr}
start' (assoc :start start')
end' (assoc :end end'))))))

(defn parse-region-strict
"Parse a region string into a map strictly."
[region-str]
(let [region-map (parse-region region-str)]
(when (valid-region? region-map) region-map)))

(defn format-region
"Format a region map into a string."
[{:keys [chr start end]}]
(let [result (apply str (interleave [nil \: \-] (take-while some? [chr start end])))]
(when-not (cstr/blank? result) result)))

(defn format-region-strict
"Format a region map into a string strictly."
[region-map]
(when (valid-region? region-map) (format-region region-map)))
Loading