Skip to content

Commit

Permalink
Merge pull request #78 from chrovis/feature/io-protocols
Browse files Browse the repository at this point in the history
Add protocols for I/O APIs.
  • Loading branch information
totakke authored May 29, 2017
2 parents e45187a + 60f08b0 commit 7da4841
Show file tree
Hide file tree
Showing 50 changed files with 880 additions and 388 deletions.
13 changes: 6 additions & 7 deletions src/cljam/bam/core.clj
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
(ns cljam.bam.core
"The core of BAM features."
(:require [clojure.java.io :refer [file]]
(:require [clojure.java.io :as cio]
[me.raynes.fs :as fs]
[cljam.io]
[cljam.lsb :as lsb]
(cljam.bam [common :refer [bam-magic]]
[reader :as reader]
Expand All @@ -27,19 +26,19 @@

(defn ^BAMReader reader [f {:keys [ignore-index]
:or {ignore-index false}}]
(let [rdr (BGZFInputStream. (file f))
(let [rdr (BGZFInputStream. (cio/file f))
data-rdr (DataInputStream. rdr)]
(when-not (Arrays/equals ^bytes (lsb/read-bytes data-rdr 4) (.getBytes ^String bam-magic))
(throw (IOException. "Invalid BAM file")))
(let [{:keys [header refs]} (reader/load-headers data-rdr)
index-delay (delay (bam-index f :ignore ignore-index))]
(BAMReader. (.getAbsolutePath (file f))
(BAMReader. (.getAbsolutePath (cio/file f))
header refs rdr data-rdr index-delay (.getFilePointer rdr)))))

(defn ^BAMReader clone-reader
"Clones bam reader sharing persistent objects."
[^BAMReader rdr]
(let [bgzf-rdr (BGZFInputStream. (file (.f rdr)))
(let [bgzf-rdr (BGZFInputStream. (cio/file (.f rdr)))
data-rdr (DataInputStream. bgzf-rdr)]
(.seek bgzf-rdr (.start-pos rdr))
(BAMReader. (.f rdr) (.header rdr) (.refs rdr) bgzf-rdr data-rdr (.index-delay rdr) (.start-pos rdr))))
Expand All @@ -48,5 +47,5 @@
;; -------

(defn ^BAMWriter writer [f]
(BAMWriter. (.getAbsolutePath (file f))
(DataOutputStream. (BGZFOutputStream. (file f)))))
(BAMWriter. (.getAbsolutePath (cio/file f))
(DataOutputStream. (BGZFOutputStream. (cio/file f)))))
48 changes: 33 additions & 15 deletions src/cljam/bam/reader.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns cljam.bam.reader
(:require [cljam.lsb :as lsb]
[cljam.io]
[cljam.io :as io]
[cljam.util.sam-util :refer [ref-id ref-name parse-header get-end]]
[cljam.bam-index :refer [get-spans]]
[cljam.bam-index.core :as bai]
Expand All @@ -22,33 +22,51 @@
Closeable
(close [this]
(.close ^Closeable (.reader this)))
cljam.io/ISAMReader
io/IReader
(reader-path [this]
(.f this))
(read [this]
(io/read this {}))
(read [this option]
(io/read-alignments this {} option))
io/IAlignmentReader
(read-header [this]
(.header this))
(read-refs [this]
(.refs this))
(read-alignments [this]
(read-alignments-sequentially* this :deep))
(read-alignments [this {:keys [chr start end depth]
:or {chr nil
start 1
end Long/MAX_VALUE
depth :deep}}]
(io/read-alignments this {} {}))
(read-alignments [this region]
(io/read-alignments this region {}))
(read-alignments [this
{:keys [chr start end]
:or {chr nil
start 1
end Long/MAX_VALUE}}
{:keys [depth]
:or {depth :deep}}]
(if (nil? chr)
(read-alignments-sequentially* this depth)
(read-alignments* this chr start end depth)))
(read-blocks [this]
(read-blocks-sequentially* this :normal))
(read-blocks [this {:keys [chr start end mode]
:or {chr nil
start 1
end Long/MAX_VALUE
mode :normal}}]
(io/read-blocks this {} {}))
(read-blocks [this region]
(io/read-blocks this region {}))
(read-blocks [this
{:keys [chr start end]
:or {chr nil
start 1
end Long/MAX_VALUE}}
{:keys [mode]
:or {mode :normal}}]
(if (nil? chr)
(read-blocks-sequentially* this mode)
(read-blocks* this chr start end))))
(read-blocks* this chr start end)))
io/IRegionReader
(read-in-region [this region]
(io/read-in-region this region {}))
(read-in-region [this region option]
(io/read-alignments this region option)))

;; Reading a single block
;; --------------------
Expand Down
5 changes: 3 additions & 2 deletions src/cljam/bam/writer.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns cljam.bam.writer
(:require [clojure.string :refer [split]]
[cljam.io]
[cljam.io :as io]
(cljam [cigar :as cgr]
[lsb :as lsb]
[util :refer [string->bytes ubyte]])
Expand All @@ -21,9 +21,10 @@
Closeable
(close [this]
(.close ^Closeable (.writer this)))
cljam.io/ISAMWriter
io/IWriter
(writer-path [this]
(.f this))
io/IAlignmentWriter
(write-header [this header]
(write-header* this header))
(write-refs [this header]
Expand Down
6 changes: 3 additions & 3 deletions src/cljam/bam_index/core.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns cljam.bam-index.core
"The core of BAM index features."
(:require [clojure.java.io :as io]
(:require [clojure.java.io :as cio]
[clojure.tools.logging :as logging]
[me.raynes.fs :as fs]
(cljam.bam-index [common :refer :all]
Expand Down Expand Up @@ -77,9 +77,9 @@

(defn ^BAIWriter writer
[f refs]
(BAIWriter. (DataOutputStream. (FileOutputStream. (io/file f)))
(BAIWriter. (DataOutputStream. (FileOutputStream. (cio/file f)))
refs
(.getAbsolutePath (io/file f))))
(.getAbsolutePath (cio/file f))))

(defn create-index
"Creates a BAM index file from the alignments and references data."
Expand Down
4 changes: 2 additions & 2 deletions src/cljam/bam_index/reader.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(ns cljam.bam-index.reader
(:require [clojure.java.io :as io]
(:require [clojure.java.io :as cio]
[cljam.lsb :as lsb]
[cljam.bam-index.common :refer [bai-magic]])
(:import java.util.Arrays
Expand Down Expand Up @@ -113,7 +113,7 @@
:lidx lidx}))

(defn reader [f]
(let [r (DataInputStream. (FileInputStream. (io/file f)))]
(let [r (DataInputStream. (FileInputStream. (cio/file f)))]
(when-not (Arrays/equals ^bytes (lsb/read-bytes r 4) (.getBytes ^String bai-magic))
(throw (IOException. "Invalid BAI file")))
(->BAIReader f r)))
3 changes: 1 addition & 2 deletions src/cljam/bam_index/writer.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
(ns cljam.bam-index.writer
(:require [clojure.java.io :as io]
[com.climate.claypoole :as cp]
(:require [com.climate.claypoole :as cp]
[cljam.common :refer [get-exec-n-threads]]
[cljam.lsb :as lsb]
[cljam.util.sam-util :as sam-util]
Expand Down
2 changes: 1 addition & 1 deletion src/cljam/bam_indexer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
(with-open [r (bam/reader in-bam :ignore-index true)]
(binding [*n-threads* n-threads]
(bai-core/create-index out-bai
(io/read-blocks r {:mode :pointer})
(io/read-blocks r {} {:mode :pointer})
(io/read-refs r)))))
12 changes: 5 additions & 7 deletions src/cljam/bcf.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@
(defn meta-info
"Returns meta-information of the BCF from rdr as a map."
[^BCFReader rdr]
(-> (.meta-info rdr)
(update :contig (fn [xs] (map (fn [m] (dissoc m :idx)) xs)))
(update :filter (fn [xs] (keep (fn [m] (when-not (= (:id m) "PASS") (dissoc m :idx))) xs)))
(update :info (fn [xs] (map (fn [m] (dissoc m :idx)) xs)))
(update :format (fn [xs] (map (fn [m] (dissoc m :idx)) xs)))))
(bcf-reader/meta-info rdr))

(defn header
"Returns header of the BCF from rdr as a vector including header field strings."
Expand All @@ -39,8 +35,10 @@
:bcf BCF-style map. CHROM, FILTER, INFO and :genotype contains indices to meta-info.
:shallow Only CHROM, POS and ref-length are parsed.
:raw Raw map of ByteBufers."
[rdr & {:keys [depth] :or {depth :deep}}]
(bcf-reader/read-variants rdr :depth depth))
([rdr]
(read-variants rdr {}))
([rdr {:keys [depth] :or {depth :deep}}]
(bcf-reader/read-variants rdr {:depth depth})))

;; Writing
;; -------
Expand Down
83 changes: 61 additions & 22 deletions src/cljam/bcf/reader.clj
Original file line number Diff line number Diff line change
@@ -1,17 +1,45 @@
(ns cljam.bcf.reader
(:require [clojure.java.io :as io]
(:refer-clojure :exclude [read])
(:require [clojure.java.io :as cio]
[clojure.string :as cstr]
[clojure.tools.logging :as logging]
[cljam.io :as io]
[cljam.lsb :as lsb]
[cljam.vcf.reader :as vcf-reader]
[cljam.util.vcf-util :as vcf-util])
(:import [java.io Closeable IOException]
[java.nio ByteBuffer]
[bgzf4j BGZFInputStream]))

(declare read-variants meta-info)

(deftype BCFReader [^String f meta-info header ^BGZFInputStream reader ^long start-pos]
Closeable
(close [this]
(.close ^Closeable (.reader this))))
(.close ^Closeable (.reader this)))
io/IReader
(reader-path [this] (.f this))
(read [this] (io/read this {}))
(read [this option] (read-variants this option))
io/IRegionReader
(read-in-region [this region]
(io/read-in-region this region {}))
(read-in-region [this {:keys [chr start end]} option]
(logging/warn "May cause degradation of performance.")
(filter
(fn [v] (and (if chr (= (:chr v) chr) true)
(if start (<= start (:pos v)) true)
(if end (<= (+ (:pos v) (count (:ref v))) end) true)))
(read-variants this option))))

;; need dynamic extension for namespace issue.
(extend-type BCFReader
io/IVariantReader
(meta-info [this] (meta-info this))
(header [this] (.header this))
(read-variants
([this] (io/read-variants this {}))
([this option] (read-variants this option))))

(defn- parse-meta-and-header
"Parses meta-info and header of BCF files and returns them as a map.
Expand All @@ -35,7 +63,7 @@
ensure the Reader is properly closed.
Throws IOException if failed to parse BCF file format."
[f]
(let [rdr (BGZFInputStream. (io/file f))
(let [rdr (BGZFInputStream. (cio/file f))
magic (lsb/read-bytes rdr 5)]
(if (= (seq magic) (map byte "BCF\2\2"))
(let [hlen (lsb/read-int rdr)
Expand All @@ -44,7 +72,7 @@
(let [{:keys [header meta]} (->> (String. ^bytes header-buf 0 (int (dec hlen)))
cstr/split-lines
parse-meta-and-header)]
(BCFReader. (.getAbsolutePath (io/file f)) meta header rdr (.getFilePointer rdr)))
(BCFReader. (.getAbsolutePath (cio/file f)) meta header rdr (.getFilePointer rdr)))
(do
(.close rdr)
(throw (IOException. (str "Invalid file format. BCF header must be NULL-terminated."))))))
Expand Down Expand Up @@ -193,6 +221,15 @@
[meta]
(into {} (map (fn [m] [(Integer/parseInt (:idx m)) (assoc m :kw (keyword (:id m)))])) meta))

(defn meta-info
"Returns meta-information of the BCF from rdr as a map."
[^BCFReader rdr]
(-> (.meta-info rdr)
(update :contig (fn [xs] (map (fn [m] (dissoc m :idx)) xs)))
(update :filter (fn [xs] (keep (fn [m] (when-not (= (:id m) "PASS") (dissoc m :idx))) xs)))
(update :info (fn [xs] (map (fn [m] (dissoc m :idx)) xs)))
(update :format (fn [xs] (map (fn [m] (dissoc m :idx)) xs)))))

(defn read-variants
"Returns data lines of the BCF from rdr as a lazy sequence of maps.
rdr must implement cljam.bcf.BCFReader.
Expand All @@ -203,22 +240,24 @@
:bcf BCF-style map. CHROM, FILTER, INFO and :genotype contains indices to meta-info.
:shallow Only CHROM, POS and ref-length are parsed.
:raw Raw map of ByteBufers."
[^BCFReader rdr & {:keys [depth] :or {depth :deep}}]
(.seek ^BGZFInputStream (.reader rdr) ^long (.start-pos rdr))
(let [contigs (meta->map (:contig (.meta-info rdr)))
filters (assoc (meta->map (:filter (.meta-info rdr))) 0 {:id "PASS" :kw :PASS})
formats (meta->map (:format (.meta-info rdr)))
info (meta->map (:info (.meta-info rdr)))
kws (mapv keyword (drop 8 (.header rdr)))
parse-fn (case depth
:deep (comp (partial bcf-map->parsed-variant contigs filters formats info kws)
([rdr]
(read-variants rdr {}))
([^BCFReader rdr {:keys [depth] :or {depth :deep}}]
(.seek ^BGZFInputStream (.reader rdr) ^long (.start-pos rdr))
(let [contigs (meta->map (:contig (.meta-info rdr)))
filters (assoc (meta->map (:filter (.meta-info rdr))) 0 {:id "PASS" :kw :PASS})
formats (meta->map (:format (.meta-info rdr)))
info (meta->map (:info (.meta-info rdr)))
kws (mapv keyword (drop 8 (.header rdr)))
parse-fn (case depth
:deep (comp (partial bcf-map->parsed-variant contigs filters formats info kws)
parse-data-line-deep)
:vcf (comp (vcf-util/variant-vals-stringifier (.meta-info rdr) (.header rdr))
(partial bcf-map->parsed-variant contigs filters formats info kws)
parse-data-line-deep)
:vcf (comp (vcf-util/variant-vals-stringifier (.meta-info rdr) (.header rdr))
(partial bcf-map->parsed-variant contigs filters formats info kws)
parse-data-line-deep)
:bcf parse-data-line-deep
:shallow (partial parse-data-line-shallow contigs)
:raw identity)]
(read-data-lines (.reader rdr)
(fn [rdr] (parse-fn (read-data-line-buffer rdr)))
(.start-pos rdr))))
:bcf parse-data-line-deep
:shallow (partial parse-data-line-shallow contigs)
:raw identity)]
(read-data-lines (.reader rdr)
(fn [rdr] (parse-fn (read-data-line-buffer rdr)))
(.start-pos rdr)))))
14 changes: 11 additions & 3 deletions src/cljam/bcf/writer.clj
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
(ns cljam.bcf.writer
(:require [clojure.java.io :as io]
(:require [clojure.java.io :as cio]
[clojure.string :as cstr]
[cljam.io :as io]
[cljam.lsb :as lsb]
[cljam.vcf.writer :as vw]
[cljam.util.vcf-util :as vcf-util])
(:import [java.io Closeable IOException DataOutputStream]
[java.nio ByteBuffer ByteOrder]
[bgzf4j BGZFOutputStream]))

(declare write-variants)

(deftype BCFWriter [^String f meta-info header ^DataOutputStream writer]
Closeable
(close [this]
(.close ^Closeable (.writer this))))
(.close ^Closeable (.writer this)))
io/IWriter
(writer-path [this] (.f this))
io/IVariantWriter
(write-variants [this variants]
(write-variants this variants)))

(def ^:private ^:const bcf-meta-keys
[:fileformat :file-date :source :reference :contig :phasing :info :filter :format :alt :sample :pedigree])
Expand Down Expand Up @@ -66,7 +74,7 @@
(map-indexed (fn [i m] (assoc m :idx (str (inc i)))) others)))))
(update :info (fn [xs] (map-indexed (fn [i m] (assoc m :idx (str i))) xs)))
(update :format (fn [xs] (map-indexed (fn [i m] (assoc m :idx (str i))) xs))))
w (BCFWriter. (.getAbsolutePath (io/file f)) indexed-meta header dos)]
w (BCFWriter. (.getAbsolutePath (cio/file f)) indexed-meta header dos)]
(write-file-header w)
w))

Expand Down
Loading

0 comments on commit 7da4841

Please sign in to comment.