diff --git a/clair3.py b/clair3.py
index 9056867..b8a2a4c 100644
--- a/clair3.py
+++ b/clair3.py
@@ -18,7 +18,9 @@
     'RealignReads',
     'CreateTensorPileup',
     "CreateTensorFullAlignment",
+    'CreateTrainingTensor',
     'SplitExtendBed',
+    'MergeBin',
     'MergeVcf',
     'SelectHetSnp',
     'SelectCandidates',
diff --git a/clair3/utils.py b/clair3/utils.py
index ce57a16..9797553 100644
--- a/clair3/utils.py
+++ b/clair3/utils.py
@@ -1,10 +1,11 @@
 import sys
 import gc
+import copy
 import shlex
 import os
 import tables
 import numpy as np
-from random import random
+from functools import partial
 
 from clair3.task.main import *
 from shared.interval_tree import bed_tree_from, is_region_in
@@ -206,63 +207,80 @@ def print_bin_size(path, prefix=None):
     print('[INFO] total: {}'.format(total))
 
 
-def bin_reader_generator_from(subprocess_list, Y, is_tree_empty, tree, miss_variant_set, is_allow_duplicate_chr_pos=False, non_variant_subsample_ratio=1.0):
+def bin_reader_generator_from(tensor_fn, Y_true_var, Y, is_tree_empty, tree, miss_variant_set, is_allow_duplicate_chr_pos=False, maximum_non_variant_ratio=None):
 
     """
     Bin reader generator for bin file generation.
-    subprocess_list: a list includes all tensor generator of each tensor file.
+    tensor_fn: the tensor file (or stream) to read tensors from.
+    Y_true_var: dictionary (contig name: label information) containing all true variant information (should not be changed).
     Y: dictionary (contig name: label information) to store all variant and non variant information.
     tree: dictionary(contig name : intervaltree) for quick region querying.
     miss_variant_set: sometimes there will have true variant missing after downsampling reads.
     is_allow_duplicate_chr_pos: whether allow duplicate positions when training, if there exists downsampled data, lower depth will add a random prefix character.
-    non_variant_subsample_ratio: define a maximum non variant ratio for training, we always expect use more non variant data, while it would greatly increase training
+    maximum_non_variant_ratio: the maximum non-variant to variant ratio allowed for training. Using more non-variant data is usually preferable, but it greatly increases the training
     time, especially in ont data, here we usually use 1:1 or 1:2 for variant candidate: non variant candidate.
""" X = {} + ref_list = [] total = 0 - for f in subprocess_list: - for row_idx, row in enumerate(f.stdout): - chrom, coord, seq, string, alt_info = row.split("\t") - alt_info = alt_info.rstrip() - if not (is_tree_empty or is_region_in(tree, chrom, int(coord))): - continue - seq = seq.upper() - if seq[param.flankingBaseNum] not in 'ACGT': - continue - key = chrom + ":" + coord - is_reference = key not in Y - - if key in miss_variant_set: - continue + for row_idx, row in enumerate(tensor_fn): + chrom, coord, seq, string, alt_info = row.split("\t") + alt_info = alt_info.rstrip() + if not (is_tree_empty or is_region_in(tree, chrom, int(coord))): + continue + seq = seq.upper() + if seq[param.flankingBaseNum] not in 'ACGT': + continue + key = chrom + ":" + coord + is_reference = key not in Y_true_var - if is_reference and non_variant_subsample_ratio < 1.0 and random() >= non_variant_subsample_ratio: - continue - if key not in X: - X[key] = (string, alt_info, seq) - elif is_allow_duplicate_chr_pos: - new_key = "" - for character in PREFIX_CHAR_STR: - tmp_key = character + key - if tmp_key not in X: - new_key = tmp_key - break - if len(new_key) > 0: - X[new_key] = (string, alt_info, seq) + if key in miss_variant_set: + continue + if key not in X: + X[key] = (string, alt_info, seq) if is_reference: - Y[key] = output_labels_from_reference(BASE2BASE[seq[param.flankingBaseNum]]) - - if len(X) == shuffle_bin_size: - yield X, total - X = {} - total += 1 - if total % 100000 == 0: - print("[INFO] Processed %d tensors" % total, file=sys.stderr) - f.stdout.close() - f.wait() - yield X, total - yield None, total + ref_list.append(key) + elif is_allow_duplicate_chr_pos: + new_key = "" + for character in PREFIX_CHAR_STR: + tmp_key = character + key + if tmp_key not in X: + new_key = tmp_key + break + if len(new_key) > 0: + X[new_key] = (string, alt_info, seq) + if is_reference: + ref_list.append(new_key) + + if is_reference and key not in Y: + Y[key] = output_labels_from_reference(BASE2BASE[seq[param.flankingBaseNum]]) + + if len(X) == shuffle_bin_size: + if maximum_non_variant_ratio is not None: + _filter_non_variants(X, ref_list, maximum_non_variant_ratio) + yield X, total, False + X = {} + ref_list = [] + total += 1 + if total % 100000 == 0: + print("[INFO] Processed %d tensors" % total, file=sys.stderr) + + if maximum_non_variant_ratio is not None: + _filter_non_variants(X, ref_list, maximum_non_variant_ratio) + yield X, total, True + + +def _filter_non_variants(X, ref_list, maximum_non_variant_ratio): + non_variant_num = len(ref_list) + variant_num = len(X) - non_variant_num + if non_variant_num > variant_num * maximum_non_variant_ratio: + non_variant_keep_fraction = maximum_non_variant_ratio * variant_num / (1. 
* non_variant_num) + probabilities = np.random.random_sample((non_variant_num,)) + for key, p in zip(ref_list, probabilities): + if p > non_variant_keep_fraction: + X.pop(key) def get_training_array(tensor_fn, var_fn, bed_fn, bin_fn, shuffle=True, is_allow_duplicate_chr_pos=True, chunk_id=None, @@ -288,7 +306,8 @@ def get_training_array(tensor_fn, var_fn, bed_fn, bin_fn, shuffle=True, is_allow tree = bed_tree_from(bed_file_path=bed_fn) is_tree_empty = len(tree.keys()) == 0 - Y, miss_variant_set = variant_map_from(var_fn, tree, is_tree_empty) + Y_true_var, miss_variant_set = variant_map_from(var_fn, tree, is_tree_empty) + Y = copy.deepcopy(Y_true_var) global param float_type = 'int32' @@ -300,32 +319,12 @@ def get_training_array(tensor_fn, var_fn, bed_fn, bin_fn, shuffle=True, is_allow tensor_shape = param.ont_input_shape if platform == 'ont' else param.input_shape - variant_num, non_variant_num, non_variant_subsample_ratio = 0, 0, 1.0 - if maximum_non_variant_ratio is not None and candidate_details_fn_prefix: - candidate_details_fn_prefix = candidate_details_fn_prefix.split('/') - directry, file_prefix = '/'.join(candidate_details_fn_prefix[:-1]), candidate_details_fn_prefix[-1] - file_list = [f for f in os.listdir(directry) if f.startswith(file_prefix)] - for f in file_list: - fn = open(os.path.join(directry, f), 'r') - for row in fn: - chr_pos = row.split('\t')[0] - key = chr_pos.replace(' ', ':') - if key in Y: - variant_num += 1 - else: - non_variant_num += 1 - fn.close() - - max_non_variant_num = variant_num * maximum_non_variant_ratio - if max_non_variant_num < non_variant_num: - non_variant_subsample_ratio = float(max_non_variant_num / non_variant_num) - print("[INFO] variants/non variants/subsample ratio: {}/{}/{}".format(variant_num, non_variant_num, - round(non_variant_subsample_ratio, 4)), - file=sys.stderr) - # select all match prefix if file path not exists subprocess_list = [] - if os.path.exists(tensor_fn): - subprocess_list.append(subprocess_popen(shlex.split("{} -fdc {}".format(param.zstd, tensor_fn)))) + if tensor_fn == 'PIPE': + subprocess_list.append(sys.stdin) + elif os.path.exists(tensor_fn): + subprocess_list.append(subprocess_popen(shlex.split("{} -fdc {}".format(param.zstd, tensor_fn))).stdout) + # select all match prefix if file path not exists else: tensor_fn = tensor_fn.split('/') directry, file_prefix = '/'.join(tensor_fn[:-1]), tensor_fn[-1] @@ -346,7 +345,8 @@ def get_training_array(tensor_fn, var_fn, bed_fn, bin_fn, shuffle=True, is_allow return 0 for file_name in all_file_name: subprocess_list.append( - subprocess_popen(shlex.split("{} -fdc {}".format(param.zstd, os.path.join(directry, file_name))))) + subprocess_popen(shlex.split("{} -fdc {}".format(param.zstd, os.path.join(directry, file_name)))).stdout) + tables.set_blosc_max_threads(64) int_atom = tables.Atom.from_dtype(np.dtype(float_type)) string_atom = tables.StringAtom(itemsize=param.no_of_positions + 50) @@ -361,49 +361,60 @@ def get_training_array(tensor_fn, var_fn, bed_fn, bin_fn, shuffle=True, is_allow table_dict = update_table_dict() # generator to avoid high memory occupy - bin_reader_generator = bin_reader_generator_from(subprocess_list=subprocess_list, - Y=Y, - is_tree_empty=is_tree_empty, - tree=tree, - miss_variant_set=miss_variant_set, - is_allow_duplicate_chr_pos=is_allow_duplicate_chr_pos, - non_variant_subsample_ratio=non_variant_subsample_ratio) - total_compressed = 0 - while True: - X, total = next(bin_reader_generator) - if X is None or not len(X): - break - all_chr_pos = 
sorted(X.keys()) - if shuffle == True: - np.random.shuffle(all_chr_pos) - for key in all_chr_pos: - - string, alt_info, seq = X[key] - del X[key] - label = None - if key in Y: - label = Y[key] - pos = key + ':' + seq - if not is_allow_duplicate_chr_pos: - del Y[key] - elif is_allow_duplicate_chr_pos: - tmp_key = key[1:] - label = Y[tmp_key] - pos = tmp_key + ':' + seq - if label is None: - print(key) - continue - total_compressed = write_table_dict(table_dict, string, label, pos, total_compressed, alt_info, - tensor_shape, pileup) - - if total_compressed % 500 == 0 and total_compressed > 0: - table_dict = write_table_file(table_file, table_dict, tensor_shape, param.label_size, float_type) + bin_reader_generator = partial(bin_reader_generator_from, + Y_true_var=Y_true_var, + Y=Y, + is_tree_empty=is_tree_empty, + tree=tree, + miss_variant_set=miss_variant_set, + is_allow_duplicate_chr_pos=is_allow_duplicate_chr_pos, + maximum_non_variant_ratio=maximum_non_variant_ratio) - if total_compressed % 50000 == 0: - print("[INFO] Compressed %d tensor" % (total_compressed), file=sys.stderr) + total_compressed = 0 + for fin in subprocess_list: + bin_g = bin_reader_generator(tensor_fn=fin) + completed = False + while not completed: + try: + X, total, completed = next(bin_g) + except StopIteration: + completed = True + + if X is None or not len(X): + break + all_chr_pos = sorted(X.keys()) + if shuffle == True: + np.random.shuffle(all_chr_pos) + for key in all_chr_pos: + + string, alt_info, seq = X[key] + del X[key] + label = None + if key in Y: + label = Y[key] + pos = key + ':' + seq + if not is_allow_duplicate_chr_pos: + del Y[key] + elif is_allow_duplicate_chr_pos: + tmp_key = key[1:] + label = Y[tmp_key] + pos = tmp_key + ':' + seq + if label is None: + print(key) + continue + total_compressed = write_table_dict(table_dict, string, label, pos, total_compressed, alt_info, + tensor_shape, pileup) + + if total_compressed % 500 == 0 and total_compressed > 0: + table_dict = write_table_file(table_file, table_dict, tensor_shape, param.label_size, float_type) + + if total_compressed % 50000 == 0: + print("[INFO] Compressed %d tensor" % (total_compressed), file=sys.stderr) + fin.close() if total_compressed % 500 != 0 and total_compressed > 0: table_dict = write_table_file(table_file, table_dict, tensor_shape, param.label_size, float_type) table_file.close() print("[INFO] Compressed %d/%d tensor" % (total_compressed, total), file=sys.stderr) + diff --git a/docs/full_alignment_training.md b/docs/full_alignment_training.md index ecdb98c..5a84f16 100644 --- a/docs/full_alignment_training.md +++ b/docs/full_alignment_training.md @@ -25,9 +25,9 @@ This document shows how to train and fine-tune a deep learning model for Clair3 - [1. Setup variables](#1-setup-variables) - [2. Create temporary working folders for each submodule](#2-create-temporary-working-folders-for-each-submodule) - [3. Split and extend bed regions](#3-split-and-extend-bed-regions-using-the-splitextendbed-submodule) - - [4. Create full-alignment tensor](#4-create-full-alignment-tensor-using-the-createtensorfullalignment-submodule) - - [5. Get truth variants from unified VCF file](#5-get-truth-variants-from-unified-vcf-using-the-gettruth-submodule) - - [6. Convert full-alignment tensor to compressed binary files](#6-convert-full-alignment-tensor-to-compressed-binary-file-using-the-tensor2bin-submodule) + - [4. Get truth variants from unified VCF file](#4-get-truth-variants-from-unified-vcf-using-the-gettruth-submodule) + - [5. 
Create full-alignment tensor](#5-create-full-alignment-tensor-using-the-createtrainingtensor-submodule) + - [6. Merge compressed binaries](#6-merge-compressed-binaries-using-the-mergebin-submodule) * [III. Model training](#iii-model-training) - [1. full-alignment model training](#1-full-alignment-model-training) - [2. full-alignment model fine-tune using pre-trained model (optional)](#2-full-alignment-model-fine-tune-using-pre-trained-model-optional) @@ -75,6 +75,8 @@ CHR=(20) # Number of threads to be used THREADS=8 +THREADS_LOW=$((${THREADS}*3/4)) +if [[ ${THREADS_LOW} < 1 ]]; then THREADS_LOW=1; fi ``` @@ -236,14 +238,12 @@ MAXIMUM_NON_VARIANT_RATIO=1 DATASET_FOLDER_PATH="${OUTPUT_DIR}/build" TENSOR_CANDIDATE_PATH="${DATASET_FOLDER_PATH}/tensor_can" BINS_FOLDER_PATH="${DATASET_FOLDER_PATH}/bins" -CANDIDATE_DETAILS_PATH="${DATASET_FOLDER_PATH}/candidate_details" SPLIT_BED_PATH="${DATASET_FOLDER_PATH}/split_beds" VAR_OUTPUT_PATH="${DATASET_FOLDER_PATH}/var" mkdir -p ${DATASET_FOLDER_PATH} mkdir -p ${TENSOR_CANDIDATE_PATH} mkdir -p ${BINS_FOLDER_PATH} -mkdir -p ${CANDIDATE_DETAILS_PATH} mkdir -p ${SPLIT_BED_PATH} mkdir -p ${VAR_OUTPUT_PATH} @@ -263,36 +263,7 @@ ${PARALLEL} --joblog ${DATASET_FOLDER_PATH}/split_extend_bed.log -j${THREADS} \ ``` -#### 4. Create full-alignment tensor using the `CreateTensorFullAlignment` submodule - -```bash -# Create full-alignment tensor for model training -${PARALLEL} --joblog ${DATASET_FOLDER_PATH}/create_tensor_full_alignment.log -j${THREADS} \ -"${PYPY} ${CLAIR3} CreateTensorFullAlignment \ - --bam_fn {4} \ - --ref_fn {5} \ - --tensor_can_fn ${TENSOR_CANDIDATE_PATH}/tensor_can_{2}_{3}_{1}_{7} \ - --indel_fn ${CANDIDATE_DETAILS_PATH}/{2}_{3}_{1}_{7} \ - --ctgName ${CHR_PREFIX}{1} \ - --samtools ${SAMTOOLS} \ - --min_af ${MIN_AF} \ - --extend_bed ${SPLIT_BED_PATH}/{2}_{3}_{1} \ - --bed_fn {6} \ - --add_no_phasing_data_training \ - --phasing_info_in_bam \ - --chunk_id {7} \ - --platform ${PLATFORM} \ - --chunk_num ${chunk_num}" ::: ${CHR[@]} ::: ${ALL_SAMPLE[@]} :::+ ${DEPTHS[@]} :::+ ${ALL_PHASED_BAM_FILE_PATH[@]} :::+ ${ALL_REFERENCE_FILE_PATH[@]} :::+ ${ALL_BED_FILE_PATH[@]} ::: ${CHUNK_LIST[@]} - -``` - -**Options** - - - `--zstd` : we recommend using [zstd](https://github.com/facebook/zstd) , an extremely fast and lossless compression tool to compress temporary tensor output. zstd provides much higher compression ratios compared to other compression tools. - - `--phasing_info_in_bam` : enabled by default, indicating the input BAM is phased using WhatsHap's `haplotag` module, and phased alignments are having a `HP` tag with haplotype details. - - `--add_no_phasing_data_training` : also include unphased alignments in additional to the phased alignments for training. We found including unphased alignments increased model robustness. - -#### 5. Get truth variants from unified VCF using the `GetTruth` submodule +#### 4. Get truth variants from unified VCF using the `GetTruth` submodule ```bash # Covert unified VCF file into simplified var file @@ -304,30 +275,51 @@ ${PARALLEL} --joblog ${VAR_OUTPUT_PATH}/get_truth.log -j${THREADS} \ ``` -#### 6. Convert full-alignment tensor to compressed binary file using the `Tensor2Bin` submodule +#### 5. 
Create full-alignment tensor using the `CreateTrainingTensor` submodule
 
 ```bash
-# Convert full-alignment tensor to compressed bin
-${PARALLEL} --joblog ${DATASET_FOLDER_PATH}/tensor2Bin.log -j${THREADS} \
-"${PYTHON3} ${CLAIR3} Tensor2Bin \
-    --tensor_fn ${TENSOR_CANDIDATE_PATH}/tensor_can_{2}_{3}_{1} \
-    --var_fn ${VAR_OUTPUT_PATH}/var_{2}_{3}_{1} \
-    --bin_fn ${BINS_FOLDER_PATH}/{2}_{3}_{1}_{4} \
-    --chunk_id {4} \
-    --chunk_num ${bin_chunk_num} \
+# Create full-alignment tensor for model training
+${PARALLEL} --joblog ${DATASET_FOLDER_PATH}/create_tensor_full_alignment.log -j${THREADS_LOW} \
+"${PYPY} ${CLAIR3} CreateTrainingTensor \
+    --bam_fn {4} \
+    --ref_fn {5} \
+    --var_fn ${VAR_OUTPUT_PATH}/var_{2}_{3}_{1} \
+    --bin_fn ${TENSOR_CANDIDATE_PATH}/tensor_{2}_{3}_{1}_{7} \
+    --ctgName ${CHR_PREFIX}{1} \
+    --samtools ${SAMTOOLS} \
+    --min_af ${MIN_AF} \
+    --extend_bed ${SPLIT_BED_PATH}/{2}_{3}_{1} \
+    --bed_fn {6} \
+    --phasing_info_in_bam \
+    --add_no_phasing_data_training \
     --allow_duplicate_chr_pos \
     --platform ${PLATFORM} \
     --shuffle \
     --maximum_non_variant_ratio ${MAXIMUM_NON_VARIANT_RATIO} \
-    --candidate_details_fn_prefix ${CANDIDATE_DETAILS_PATH}/{2}_{3}_{1}_" ::: ${CHR[@]} ::: ${ALL_SAMPLE[@]} :::+ ${DEPTHS[@]} ::: ${BIN_CHUNK_LIST[@]}
+    --chunk_id {7} \
+    --chunk_num ${chunk_num}" ::: ${CHR[@]} ::: ${ALL_SAMPLE[@]} :::+ ${DEPTHS[@]} :::+ ${ALL_PHASED_BAM_FILE_PATH[@]} :::+ ${ALL_REFERENCE_FILE_PATH[@]} :::+ ${ALL_BED_FILE_PATH[@]} ::: ${CHUNK_LIST[@]}
 
 ```
 
 **Options**
 
+ - `--phasing_info_in_bam` : enabled by default, indicating the input BAM is phased using WhatsHap's `haplotag` module, and phased alignments have an `HP` tag with haplotype details.
  - `--allow_duplicate_chr_pos` : for multiple coverages training, this option is required to allow different coverage training samples at the same variant site.
  - `--shuffle` : as the input tensors are created in the order of starting positions, this option shuffles the training data in each chunk. During the training process, we also apply index reshuffling in each epoch.
  - `--maximum_non_variant_ratio` : we set a maximum non-variant ratio (variant:non-variant = 1:1) for full-alignment model training, non-variants are randomly selected from the candidate set if the ratio is exceeded, or all non-variants will be used for training otherwise.
+ - `--add_no_phasing_data_training` : also include unphased alignments in addition to the phased alignments for training. We found including unphased alignments increased model robustness.
+
+#### 6. Merge compressed binaries using the `MergeBin` submodule
+
+```bash
+# Merge compressed binaries
+${PARALLEL} --joblog ${DATASET_FOLDER_PATH}/mergeBin.log -j${THREADS} \
+"${PYTHON3} ${CLAIR3} MergeBin \
+    ${TENSOR_CANDIDATE_PATH}/tensor_{2}_{3}_{1}_* \
+    --out_fn ${BINS_FOLDER_PATH}/bin_{2}_{3}_{1} \
+    --chunk_num ${bin_chunk_num}" ::: ${CHR[@]} ::: ${ALL_SAMPLE[@]} :::+ ${DEPTHS[@]}
+
+```
 
 ----
diff --git a/docs/pileup_training.md b/docs/pileup_training.md
index 0b9cc17..436f0fc 100644
--- a/docs/pileup_training.md
+++ b/docs/pileup_training.md
@@ -23,9 +23,9 @@ This document shows how to train and fine-tune a deep-learning model for Clair3
   - [1. Setup variables](#1-setup-variables)
   - [2. Create working directories ](#2-create-temporary-working-folders-for-each-submodule)
   - [3. Split and extend bed regions](#3-split-bed-regions-using-the-splitextendbed-submodule)
-  - [4. Create pileup tensor](#4-create-pileup-tensor-using-the-createtensorpileup-submodule)
-  - [5. 
Get truth variants from unified VCF file](#5-get-truth-variants-from-unified-vcf-using-the-gettruth-submodule) - - [6. Convert pileup tensor to compressed binary file](#6-convert-pileup-tensor-to-compressed-binary-file-using-the-tensor2bin-submodule) + - [4. Get truth variants from unified VCF file](#4-get-truth-variants-from-unified-vcf-using-the-gettruth-submodule) + - [5. Create pileup tensor](#5-create-pileup-tensor-using-the-createtrainingtensor-submodule) + - [6. Merge compressed binaries](#6-merge-compressed-binaries-using-the-mergebin-submodule) * [III. Model training](#iii-model-training) - [1. Pileup model training](#1-pileup-model-training) - [2. Pileup model fine-tune using pre-trained model](#2-pileup-model-fine-tune-using-pre-trained-model-optional) @@ -66,6 +66,8 @@ mkdir -p ${SUBSAMPLED_BAMS_FOLDER_PATH} # Other parameters THREADS=8 +THREADS_LOW=$((${THREADS}*3/4)) +if [[ ${THREADS_LOW} < 1 ]]; then THREADS_LOW=1; fi PARALLEL='parallel' SAMTOOLS='samtools' @@ -185,14 +187,12 @@ MAXIMUM_NON_VARIANT_RATIO=5 DATASET_FOLDER_PATH="${OUTPUT_DIR}/build" TENSOR_CANDIDATE_PATH="${DATASET_FOLDER_PATH}/tensor_can" BINS_FOLDER_PATH="${DATASET_FOLDER_PATH}/bins" -CANDIDATE_DETAILS_PATH="${DATASET_FOLDER_PATH}/candidate_details" SPLIT_BED_PATH="${DATASET_FOLDER_PATH}/split_beds" VAR_OUTPUT_PATH="${DATASET_FOLDER_PATH}/var" mkdir -p ${DATASET_FOLDER_PATH} mkdir -p ${TENSOR_CANDIDATE_PATH} mkdir -p ${BINS_FOLDER_PATH} -mkdir -p ${CANDIDATE_DETAILS_PATH} mkdir -p ${SPLIT_BED_PATH} mkdir -p ${VAR_OUTPUT_PATH} @@ -211,23 +211,37 @@ ${PARALLEL} --joblog ${DATASET_FOLDER_PATH}/split_extend_bed.log -j${THREADS} \ ``` -#### 4. Create pileup tensor using the `CreateTensorPileup` submodule +#### 4. Get truth variants from unified VCF using the `GetTruth` submodule ```bash -# Create pileup tensors for model training -${PARALLEL} --joblog ${DATASET_FOLDER_PATH}/create_tensor_pileup.log -j${THREADS} \ -"${PYPY} ${CLAIR3} CreateTensorPileup \ +${PARALLEL} --joblog ${VAR_OUTPUT_PATH}/get_truth.log -j${THREADS} \ +"${PYPY} ${CLAIR3} GetTruth \ + --vcf_fn {4} \ + --ctgName ${CHR_PREFIX}{1} \ + --var_fn ${VAR_OUTPUT_PATH}/var_{2}_{3}_{1}" ::: ${CHR[@]} ::: ${ALL_SAMPLE[@]} :::+ ${DEPTHS[@]} :::+ ${UNIFIED_VCF_FILE_PATH[@]} + +``` + +#### 5. 
Create pileup tensor using the `CreateTrainingTensor` submodule
+
+```bash
+# Create pileup tensor for model training
+${PARALLEL} --joblog ${DATASET_FOLDER_PATH}/create_tensor_pileup.log -j${THREADS_LOW} \
+"${PYPY} ${CLAIR3} CreateTrainingTensor \
     --bam_fn {4} \
     --ref_fn {5} \
-    --tensor_can_fn ${TENSOR_CANDIDATE_PATH}/tensor_can_{2}_{3}_{1}_{7} \
-    --indel_fn ${CANDIDATE_DETAILS_PATH}/{2}_{3}_{1}_{7} \
+    --var_fn ${VAR_OUTPUT_PATH}/var_{2}_{3}_{1} \
+    --bin_fn ${TENSOR_CANDIDATE_PATH}/tensor_{2}_{3}_{1}_{7} \
     --ctgName ${CHR_PREFIX}{1} \
     --samtools ${SAMTOOLS} \
     --snp_min_af ${MIN_SNP_AF} \
     --indel_min_af ${MIN_INDEL_AF} \
     --extend_bed ${SPLIT_BED_PATH}/{2}_{3}_{1} \
-    --platform ${PLATFORM} \
     --bed_fn {6} \
+    --pileup \
+    --platform ${PLATFORM} \
+    --shuffle \
+    --maximum_non_variant_ratio ${MAXIMUM_NON_VARIANT_RATIO} \
     --chunk_id {7} \
     --chunk_num ${chunk_num}" ::: ${CHR[@]} ::: ${ALL_SAMPLE[@]} :::+ ${DEPTHS[@]} :::+ ${ALL_BAM_FILE_PATH[@]} :::+ ${ALL_REFERENCE_FILE_PATH[@]} :::+ ${ALL_BED_FILE_PATH[@]} ::: ${CHUNK_LIST[@]} |& tee ${DATASET_FOLDER_PATH}/CTP.log
@@ -235,45 +249,24 @@ ${PARALLEL} --joblog ${DATASET_FOLDER_PATH}/create_tensor_pileup.log -j${THREADS
 
 **Options**
 
-  - `--zstd` : we recommend using [zstd](https://github.com/facebook/zstd) , an extremely fast and lossless compression tool to compress temporary tensor output. zstd provides much higher compression ratios compared to other compression tools.
+ - `--allow_duplicate_chr_pos` : for multiple coverages training, this option is required to allow different coverage training samples at the same variant site.
+ - `--shuffle` : as the input tensors are created in the order of starting positions, this option shuffles the training data in each chunk. During the training process, we also apply index reshuffling in each epoch.
+ - `--maximum_non_variant_ratio` : we set a maximum non-variant ratio (variant:non-variant = 1:5) for pileup model training, non-variants are randomly selected from the candidate set if the ratio is exceeded, or all non-variants will be used for training otherwise.
  - `--max_depth` : set the depth cap of every genome position. Pileup input summarizes position-level read alignments where depth information varies in the training materials. If not constrained by computational resources, we recommend setting the depth cap to the maximum depth coverage of the training materials.
 
-#### 5. Get truth variants from unified VCF using the `GetTruth` submodule
+#### 6. Merge compressed binaries using the `MergeBin` submodule
 
 ```bash
-${PARALLEL} --joblog ${VAR_OUTPUT_PATH}/get_truth.log -j${THREADS} \
-"${PYPY} ${CLAIR3} GetTruth \
-    --vcf_fn {4} \
-    --ctgName ${CHR_PREFIX}{1} \
-    --var_fn ${VAR_OUTPUT_PATH}/var_{2}_{3}_{1}" ::: ${CHR[@]} ::: ${ALL_SAMPLE[@]} :::+ ${DEPTHS[@]} :::+ ${UNIFIED_VCF_FILE_PATH[@]}
-
-```
-
-#### 6. 
Convert pileup tensor to compressed binary file using the `Tensor2Bin` submodule -```bash -# Convert pileup tensor to compressed binaries -${PARALLEL} --joblog ${DATASET_FOLDER_PATH}/tensor2Bin.log -j${THREADS} \ -"${PYTHON3} ${CLAIR3} Tensor2Bin \ - --tensor_fn ${TENSOR_CANDIDATE_PATH}/tensor_can_{2}_{3}_{1} \ - --var_fn ${VAR_OUTPUT_PATH}/var_{2}_{3}_{1} \ - --bin_fn ${BINS_FOLDER_PATH}/{2}_{3}_{1}_{4} \ - --chunk_id {4} \ - --chunk_num ${bin_chunk_num} \ +# Merge compressed binaries +${PARALLEL} --joblog ${DATASET_FOLDER_PATH}/mergeBin.log -j${THREADS} \ +"${PYTHON3} ${CLAIR3} MergeBin \ + ${TENSOR_CANDIDATE_PATH}/tensor_{2}_{3}_{1}_* \ + --out_fn ${BINS_FOLDER_PATH}/bin_{2}_{3}_{1} \ --pileup \ - --allow_duplicate_chr_pos \ - --platform ${PLATFORM} \ - --shuffle \ - --maximum_non_variant_ratio ${MAXIMUM_NON_VARIANT_RATIO} \ - --candidate_details_fn_prefix ${CANDIDATE_DETAILS_PATH}/{2}_{3}_{1}_" ::: ${CHR[@]} ::: ${ALL_SAMPLE[@]} :::+ ${DEPTHS[@]} ::: ${BIN_CHUNK_LIST[@]} + --chunk_num ${bin_chunk_num}" ::: ${CHR[@]} ::: ${ALL_SAMPLE[@]} :::+ ${DEPTHS[@]} ``` -**Options** - - - `--allow_duplicate_chr_pos` : for multiple coverages training, this option is required to to allow different coverage training samples at the same variant site. - - `--shuffle` : as the input tensors are created in the order of starting positions, this option shuffles the training data in each chunk. During the training process, we also apply index reshuffling in each epoch. - - `--maximum_non_variant_ratio` : we set a maximum non-variant ratio (variant:non-variant = 1:5) for pileup model training, non-variants are randomly selected from the candidate set if the ratio is exceeded, or all non-variants will be used for training otherwise. - ---- ## III. Model training diff --git a/docs/representation_unification.md b/docs/representation_unification.md index e34edec..d0791f7 100644 --- a/docs/representation_unification.md +++ b/docs/representation_unification.md @@ -36,7 +36,7 @@ This document shows how Clair3 unifies the representation between the training m - [1. Setup variables](#1-setup-variables) - [2. Phase VCF file using WhatsHap](#2--phase-vcf-file-using-whatshap) - [3. Haplotag read alignment using WhatsHap](#3--haplotag-read-alignment-using-whatshap) -- [4. Prepare true variant set and candidate input](#4--prepare-true-variant-set-and-candidate-input) +- [4. Prepare true variant set](#4--prepare-true-variant-set) - [5. Unify Representation for true variant set and candidate sites](#5--unify-representation-for-true-variant-set-and-candidate-sites) - [6. Merge and sort unified VCF output](#6--merge-and-sort-unified-vcf-output) - [7. Benchmark using unified VCF and true variant set (optional)](#7--benchmark-using-unified-vcf-and-true-variant-set-optional) @@ -76,14 +76,12 @@ CHUNK_LIST=`seq 1 ${chunk_num}` MIN_AF=0.08 # Temporary working directory -CANDIDATE_DETAILS_PATH="${OUTPUT_DIR}/candidate_details" SPLIT_BED_PATH="${OUTPUT_DIR}/split_beds" VCF_OUTPUT_PATH="${OUTPUT_DIR}/vcf_output" VAR_OUTPUT_PATH="${OUTPUT_DIR}/var" PHASE_VCF_PATH="${OUTPUT_DIR}/phased_vcf" PHASE_BAM_PATH="${OUTPUT_DIR}/phased_bam" -mkdir -p ${CANDIDATE_DETAILS_PATH} mkdir -p ${SPLIT_BED_PATH} mkdir -p ${VCF_OUTPUT_PATH} mkdir -p ${VAR_OUTPUT_PATH} @@ -134,7 +132,7 @@ ${PARALLEL} --joblog ${PHASE_BAM_PATH}/index.log -j ${THREADS} ${SAMTOOLS} index ``` -#### 4. Prepare true variant set and candidate input +#### 4. 
Prepare true variant set ```bash # Split bed file regions according to the contig name and extend bed region @@ -151,23 +149,6 @@ ${PARALLEL} --joblog ${VAR_OUTPUT_PATH}/get_truth.log -j${THREADS} \ --ctgName ${CHR_PREFIX}{1} \ --var_fn ${VAR_OUTPUT_PATH}/var_{1}" ::: ${CHR[@]} -# Create candidate details for representation unification -${PARALLEL} --joblog ${CANDIDATE_DETAILS_PATH}/create_tensor.log -j${THREADS} \ -"${PYPY} ${CLAIR3} CreateTensorFullAlignment \ - --bam_fn ${PHASE_BAM_PATH}/{1}.bam \ - --ref_fn ${REFERENCE_FILE_PATH} \ - --indel_fn ${CANDIDATE_DETAILS_PATH}/{1}_{2} \ - --ctgName ${CHR_PREFIX}{1} \ - --samtools ${SAMTOOLS} \ - --min_af ${MIN_AF} \ - --extend_bed ${SPLIT_BED_PATH}/{1} \ - --unify_repre_fn ${CANDIDATE_DETAILS_PATH}/{1}_{2} \ - --unify_repre \ - --phasing_info_in_bam \ - --bed_fn ${BED_FILE_PATH} \ - --chunk_id {2} \ - --chunk_num ${chunk_num}" ::: ${CHR[@]} ::: ${CHUNK_LIST[@]} - ``` #### 5. Unify Representation for true variant set and candidate sites @@ -175,13 +156,16 @@ ${PARALLEL} --joblog ${CANDIDATE_DETAILS_PATH}/create_tensor.log -j${THREADS} \ ```bash ${PARALLEL} --joblog ${OUTPUT_DIR}/unify_repre.log -j${THREADS} \ "${PYPY} ${CLAIR3} UnifyRepresentation \ + --bam_fn ${PHASE_BAM_PATH}/{1}.bam \ --var_fn ${VAR_OUTPUT_PATH}/var_{1} \ - --candidate_details_fn_prefix ${CANDIDATE_DETAILS_PATH}/{1}_ \ --ref_fn ${REFERENCE_FILE_PATH} \ + --bed_fn ${BED_FILE_PATH} \ + --extend_bed ${SPLIT_BED_PATH}/{1} \ --output_vcf_fn ${VCF_OUTPUT_PATH}/vcf_{1}_{2} \ + --samtools ${SAMTOOLS} \ + --min_af ${MIN_AF} \ --chunk_id {2} \ --chunk_num ${chunk_num} \ - --bed_fn ${BED_FILE_PATH} \ --platform ${PLATFORM} \ --ctgName ${CHR_PREFIX}{1}" ::: ${CHR[@]} ::: ${CHUNK_LIST[@]} > ${OUTPUT_DIR}/RU.log diff --git a/preprocess/CreateTensorFullAlignment.py b/preprocess/CreateTensorFullAlignment.py index 0912a13..6d4fb32 100644 --- a/preprocess/CreateTensorFullAlignment.py +++ b/preprocess/CreateTensorFullAlignment.py @@ -606,14 +606,17 @@ def CreateTensorFullAlignment(args): samtools_mpileup_process = subprocess_popen( shlex.split(samtools_command), stdin=stdin) - if tensor_can_output_path != "PIPE": - tensor_can_fpo = open(tensor_can_output_path, "wb") - tensor_can_fp = subprocess_popen(shlex.split("{} -c".format(args.zstd)), stdin=PIPE, stdout=tensor_can_fpo) - elif not unify_repre: - tensor_can_fp = TensorStdout(sys.stdout) - - if unify_repre_fn: - label_fp = open(unify_repre_fn, 'w') + if not unify_repre: + if tensor_can_output_path != "PIPE": + tensor_can_fpo = open(tensor_can_output_path, "wb") + tensor_can_fp = subprocess_popen(shlex.split("{} -c".format(args.zstd)), stdin=PIPE, stdout=tensor_can_fpo) + else: + tensor_can_fp = TensorStdout(sys.stdout) + else: + if unify_repre_fn != "PIPE": + label_fp = open(unify_repre_fn, 'w') + else: + label_fp = sys.stdout if alt_fn: output_alt_fn = alt_fn alt_fp = open(output_alt_fn, 'w') @@ -800,7 +803,7 @@ def samtools_pileup_generator_from(samtools_mpileup_process): alt_info = alt_info.replace('-', '\t') alt_fp.write('\t'.join([ctg_name + ' ' + str(pos), alt_info]) + '\n') - if unify_repre and unify_repre_fn: + if unify_repre: label_info = get_alt_info(center_pos=pos, pileup_dict=pileup_dict, ref_seq=ref_seq, @@ -820,7 +823,7 @@ def samtools_pileup_generator_from(samtools_mpileup_process): if alt_fn: alt_fp.close() - if unify_repre_fn: + if unify_repre and unify_repre_fn != "PIPE": label_fp.close() diff --git a/preprocess/CreateTrainingTensor.py b/preprocess/CreateTrainingTensor.py new file mode 100644 index 0000000..664b709 --- 
/dev/null +++ b/preprocess/CreateTrainingTensor.py @@ -0,0 +1,302 @@ +import sys +import shlex +import subprocess +import signal +import random +import os +from os.path import dirname +from time import sleep +from argparse import ArgumentParser, SUPPRESS +import logging + +logging.getLogger().setLevel(logging.INFO) + + +from shared.command_options import ( + CommandOption, + CommandOptionWithNoValue, + ExecuteCommand, + command_string_from, + command_option_from +) +from shared.utils import file_path_from, executable_command_string_from, subprocess_popen, str2bool, log_warning +import shared.param_p as param + + +class InstancesClass(object): + def __init__(self): + self.create_tensor = None + self.compress_tensor = None + + def poll(self): + self.create_tensor.poll() + self.compress_tensor.poll() + + +c = InstancesClass() + + +def check_return_code(signum, frame): + c.poll() + if c.create_tensor.returncode != None and c.create_tensor.returncode != 0: + c.compress_tensor.kill() + sys.exit("CreateTensor.py exited with exceptions. Exiting...") + + if c.compress_tensor.returncode != None and c.compress_tensor.returncode != 0: + c.create_tensor.kill() + sys.exit("Tensor2Bin.py exited with exceptions. Exiting...") + + if ( + c.create_tensor.returncode == None or + c.compress_tensor.returncode == None + ): + signal.alarm(5) + + +def Run(args): + basedir = dirname(__file__) + + CTP_Bin = basedir + "/../clair3.py CreateTensorPileup" + CTFA_Bin = basedir + "/../clair3.py CreateTensorFullAlignment" + T2B_Bin = basedir + "/../clair3.py Tensor2Bin" + + if args.delay > 0: + delay = random.randrange(0, args.delay) + print("[INFO] Delay %d seconds before starting tensor creation ..." % (delay)) + sleep(delay) + + pypyBin = executable_command_string_from(args.pypy, exit_on_not_found=True) + pythonBin = executable_command_string_from(args.python, exit_on_not_found=True) + samtoolsBin = executable_command_string_from(args.samtools, exit_on_not_found=True) + + if args.pileup: + bam_fn = file_path_from(args.bam_fn, exit_on_not_found=True) + else: + bam_fn = file_path_from(args.bam_fn) + if bam_fn is None or bam_fn == "": + print(log_warning( + "[WARNING] Skip full-alignment variant calling for empty full-alignment regions")) + return + ref_fn = file_path_from(args.ref_fn, exit_on_not_found=True) + bed_fn = file_path_from(args.bed_fn) + vcf_fn = file_path_from(args.vcf_fn) + var_fn = file_path_from(args.var_fn, exit_on_not_found=True) + bin_fn = args.bin_fn + extend_bed = file_path_from(args.extend_bed) + + platform = args.platform + if not platform or platform not in param.support_platform: + sys.exit("[ERROR] Provided platform are not in support platform list [ont, hifi, ilmn]") + + pileup = args.pileup + ctgName = args.ctgName + min_af = args.min_af if args.min_af else param.min_af_dict[platform] + snp_min_af = args.snp_min_af + indel_min_af = args.indel_min_af + + if ctgName is None: + sys.exit("--ctgName must be specified. 
You can call variants on multiple chromosomes simultaneously.") + + pileup_mode = command_option_from(args.pileup, 'pileup') + phasing_info_mode = command_option_from(args.phasing_info_in_bam, 'phasing_info_in_bam') + add_no_phasing_mode = command_option_from(args.add_no_phasing_data_training, 'add_no_phasing_data_training') + allow_duplicate_mode = command_option_from(args.allow_duplicate_chr_pos, 'allow_duplicate_chr_pos') + maximum_non_variant_ratio = CommandOption('maximum_non_variant_ratio', args.maximum_non_variant_ratio) + shuffle_mode = command_option_from(args.shuffle, 'shuffle') + + ctgStart = None + ctgEnd = None + chunk_id = None + chunk_num = None + if args.ctgStart is not None and args.ctgEnd is not None and int(args.ctgStart) <= int(args.ctgEnd): + ctgStart = CommandOption('ctgStart', args.ctgStart) + ctgEnd = CommandOption('ctgEnd', args.ctgEnd) + + if args.chunk_id is not None and args.chunk_num is not None and int(args.chunk_id) <= int(args.chunk_num): + chunk_id = CommandOption('chunk_id', args.chunk_id) + chunk_num = CommandOption('chunk_num', args.chunk_num) + + CT_Bin = CTP_Bin if pileup else CTFA_Bin + create_tensor_command_options = [ + pypyBin, + CT_Bin, + CommandOption('bam_fn', bam_fn), + CommandOption('ref_fn', ref_fn), + CommandOption('vcf_fn', vcf_fn), + CommandOption('ctgName', ctgName), + CommandOption('platform', platform), + CommandOption('samtools', samtoolsBin), + CommandOption('bed_fn', bed_fn), + CommandOption('extend_bed', extend_bed), + ctgStart, + ctgEnd, + chunk_id, + chunk_num, + ] + + if not pileup: + create_tensor_command_options.append(CommandOption('min_af', min_af)) + create_tensor_command_options.append(phasing_info_mode) + create_tensor_command_options.append(add_no_phasing_mode) + else: + create_tensor_command_options.append(CommandOption('snp_min_af', snp_min_af)) + create_tensor_command_options.append(CommandOption('indel_min_af', indel_min_af)) + + compress_tensor_command_options = [ + pythonBin, + T2B_Bin, + CommandOption('platform', platform), + CommandOption('var_fn', var_fn), + CommandOption('bin_fn', bin_fn), + CommandOption('bed_fn', bed_fn), + chunk_id, + chunk_num, + allow_duplicate_mode, + maximum_non_variant_ratio, + shuffle_mode, + ] + if pileup: + compress_tensor_command_options.append(pileup_mode) + + try: + c.create_tensor = subprocess_popen( + shlex.split(command_string_from(create_tensor_command_options)), + ) + + c.compress_tensor = subprocess_popen( + shlex.split(command_string_from(compress_tensor_command_options)), + stdin=c.create_tensor.stdout, stdout=sys.stderr + ) + except Exception as e: + print(e, file=sys.stderr) + sys.exit("Failed to start required processes. 
Exiting...") + + signal.signal(signal.SIGALRM, check_return_code) + signal.alarm(2) + + try: + c.compress_tensor.wait() + signal.alarm(0) + c.create_tensor.stdout.close() + c.create_tensor.wait() + except KeyboardInterrupt as e: + print("KeyboardInterrupt received when waiting at Tensor2Bin, terminating all scripts.") + try: + c.compress_tensor.terminate() + c.create_tensor.terminate() + except Exception as e: + print(e) + + raise KeyboardInterrupt + except Exception as e: + print("Exception received when waiting at CreateTensor, terminating all scripts.") + print(e) + try: + c.compress_tensor.terminate() + c.create_tensor.terminate() + except Exception as e: + print(e) + + raise e + + +def main(): + parser = ArgumentParser(description="Call variants using a trained model and a BAM file") + + parser.add_argument('--platform', type=str, default="ont", + help="Sequencing platform of the input. Options: 'ont,hifi,ilmn', default: %(default)s") + + parser.add_argument('--bam_fn', type=str, default="bam.bam", required=True, + help="BAM file input, required") + + parser.add_argument('--ref_fn', type=str, default="ref.fa", required=True, + help="Reference fasta file input, required") + + parser.add_argument('--var_fn', type=str, default=None, required=True, + help="Unified VCF input filename, required") + + parser.add_argument('--bin_fn', type=str, default=None, required=True, + help="Compressed binary output filename, required") + + parser.add_argument('--vcf_fn', type=str, default=None, + help="Candidate sites VCF file input, if provided, variants will only be called at the sites in the VCF file, default: %(default)s") + + parser.add_argument('--ctgName', type=str, default=None, + help="The name of sequence to be processed, required if --bed_fn is not defined") + + parser.add_argument('--ctgStart', type=int, default=None, + help="The 1-based starting position of the sequence to be processed, optional, will process the whole --ctgName if not set") + + parser.add_argument('--ctgEnd', type=int, default=None, + help="The 1-based inclusive ending position of the sequence to be processed, optional, will process the whole --ctgName if not set") + + parser.add_argument('--bed_fn', type=str, nargs='?', action="store", default=None, + help="Call variant only in the provided regions. 
Will take an intersection if --ctgName and/or (--ctgStart, --ctgEnd) are set") + + parser.add_argument('--min_af', type=float, default=None, + help="Minimum allele frequency for both SNP and Indel for a site to be considered as a candidate site, default: %(default)f") + + parser.add_argument('--snp_min_af', type=float, default=0.08, + help="Minimum SNP allele frequency for a site to be considered as a candidate site, default: %(default)f") + + parser.add_argument('--indel_min_af', type=float, default=0.08, + help="Minimum Indel allele frequency for a site to be considered as a candidate site, default: %(default)f") + + parser.add_argument('--samtools', type=str, default="samtools", + help="Path to the 'samtools', samtools version >= 1.10 is required, default: %(default)s") + + parser.add_argument('--pypy', type=str, default="pypy3", + help="Path to the 'pypy', pypy3 version >= 3.6 is required, default: %(default)s") + + parser.add_argument('--python', type=str, default="python3", + help="Path to the 'python3', default: %(default)s") + + # options for advanced users + parser.add_argument('--maximum_non_variant_ratio', default=None, type=float, + help='Maximum ratio of non-variants to variants, default: %(default)f') + + parser.add_argument('--extend_bed', nargs='?', action="store", type=str, default=None, + help="DEBUG: Extend the regions in the --bed_fn by a few bp for tensor creation, default extend 16bp") + + # options for internal process control, don't use any of them unless you are sure about the consequences + ## In pileup mode or not + parser.add_argument('--pileup', action='store_true', + help=SUPPRESS) + + parser.add_argument('--phasing_info_in_bam', action='store_true', + help="DEBUG: Skip phasing and use the phasing info provided in the input BAM (HP tag), default: False") + + parser.add_argument('--add_no_phasing_data_training', action='store_true', + help=SUPPRESS) + + parser.add_argument('--allow_duplicate_chr_pos', action='store_true', + help=SUPPRESS) + + parser.add_argument('--shuffle', action='store_true', + help=SUPPRESS) + + ## The number of chucks to be divided into for parallel processing + parser.add_argument('--chunk_num', type=int, default=None, + help=SUPPRESS) + + ## The chuck ID to work on + parser.add_argument('--chunk_id', type=int, default=None, + help=SUPPRESS) + + ## Wait a short while for no more than a few seconds to start the job. 
This is to avoid starting multiple jobs simultaneously
+    ## that might use up the maximum number of threads allowed, because Tensorflow will create more threads than needed at the beginning of running the program
+    ## Obsoleted after adding --tensorflow_threads defaulted at 4
+    parser.add_argument('--delay', type=int, default=5,
+                        help=SUPPRESS)
+
+    args = parser.parse_args()
+
+    if len(sys.argv[1:]) == 0:
+        parser.print_help()
+        sys.exit(1)
+
+    Run(args)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/preprocess/MergeBin.py b/preprocess/MergeBin.py
new file mode 100644
index 0000000..42e41c4
--- /dev/null
+++ b/preprocess/MergeBin.py
@@ -0,0 +1,96 @@
+import sys
+import logging
+import numpy as np
+from argparse import ArgumentParser, SUPPRESS
+import tables
+
+import clair3.utils as utils
+
+logging.basicConfig(format='%(message)s', level=logging.INFO)
+
+def Run(args):
+    in_fn_list = args.in_fn
+    out_fn = args.out_fn
+    platform = args.platform
+    pileup = args.pileup
+
+    global param
+    float_type = 'int32'
+    if pileup:
+        import shared.param_p as param
+    else:
+        import shared.param_f as param
+        float_type = 'int8'
+
+    tensor_shape = param.ont_input_shape if platform == 'ont' else param.input_shape
+
+    # prepare the output table file and extendable arrays for the merged tensors
+    tables.set_blosc_max_threads(64)
+    int_atom = tables.Atom.from_dtype(np.dtype(float_type))
+    string_atom = tables.StringAtom(itemsize=param.no_of_positions + 50)
+    long_string_atom = tables.StringAtom(itemsize=5000)  # max alt_info length
+    table_file = tables.open_file(out_fn, mode='w', filters=utils.FILTERS)
+    table_file.create_earray(where='/', name='position_matrix', atom=int_atom, shape=[0] + tensor_shape,
+                             filters=utils.FILTERS)
+    table_file.create_earray(where='/', name='position', atom=string_atom, shape=(0, 1), filters=utils.FILTERS)
+    table_file.create_earray(where='/', name='label', atom=int_atom, shape=(0, param.label_size), filters=utils.FILTERS)
+    table_file.create_earray(where='/', name='alt_info', atom=long_string_atom, shape=(0, 1), filters=utils.FILTERS)
+
+    table_dict = utils.update_table_dict()
+    total_compressed = 0
+
+    for f in in_fn_list:
+        print("[INFO] Merging file {}".format(f))
+        fi = tables.open_file(f, mode='r')
+        assert (len(fi.root.label) == len(fi.root.position) == len(fi.root.position_matrix) == len(fi.root.alt_info))
+        for index in range(len(fi.root.label)):
+            table_dict['label'].append(fi.root.label[index])
+            table_dict['position'].append(fi.root.position[index])
+            table_dict['position_matrix'].append(fi.root.position_matrix[index])
+            table_dict['alt_info'].append(fi.root.alt_info[index])
+
+            total_compressed += 1
+
+            if total_compressed % 500 == 0 and total_compressed > 0:
+                table_dict = utils.write_table_file(table_file, table_dict, tensor_shape, param.label_size, float_type)
+
+            if total_compressed % 50000 == 0:
+                print("[INFO] Compressed %d tensor" % (total_compressed), file=sys.stderr)
+        fi.close()
+
+    if total_compressed % 500 != 0 and total_compressed > 0:
+        table_dict = utils.write_table_file(table_file, table_dict, tensor_shape, param.label_size, float_type)
+    print("[INFO] Compressed %d tensor" % (total_compressed), file=sys.stderr)
+
+    table_file.close()
+
+
+def main():
+    parser = ArgumentParser(description="Combine tensor binaries into a single file")
+
+    parser.add_argument('in_fn', type=str, nargs='+',
+                        help="Tensor input files, required")
+
+    parser.add_argument('--out_fn', type=str, default=None, required=True,
+                        help="Output a binary tensor file, required")
+
parser.add_argument('--platform', type=str, default="ont", + help="Sequencing platform of the input. Options: 'ont,hifi,ilmn', default: %(default)s") + + ## In pileup mode or not (full alignment mode), default: False + parser.add_argument('--pileup', action='store_true', + help=SUPPRESS) + + args = parser.parse_args() + + if len(sys.argv[1:]) == 0: + parser.print_help() + sys.exit(1) + + Run(args) + + +if __name__ == "__main__": + main() + + diff --git a/preprocess/Tensor2Bin.py b/preprocess/Tensor2Bin.py index 6b34088..bf86ca0 100644 --- a/preprocess/Tensor2Bin.py +++ b/preprocess/Tensor2Bin.py @@ -34,8 +34,11 @@ def main(): parser.add_argument('--platform', type=str, default="ont", help="Sequencing platform of the input. Options: 'ont,hifi,ilmn', default: %(default)s") - parser.add_argument('--tensor_fn', type=str, default=None, required=True, - help="Tensor input, required") + parser.add_argument('--tensor_fn', type=str, default="PIPE", + help="Tensor input") + + parser.add_argument('--candidate_details_fn_prefix', type=str, default=None, + help="Candidate details input (unused, retained for compatibility)") parser.add_argument('--var_fn', type=str, default=None, required=True, help="Truth variants list input, required") @@ -69,10 +72,6 @@ def main(): parser.add_argument('--maximum_non_variant_ratio', type=float, default=None, help=SUPPRESS) - ## Path to the variant candidate details - parser.add_argument('--candidate_details_fn_prefix', type=str, default=None, - help=SUPPRESS) - args = parser.parse_args() if len(sys.argv[1:]) == 0: diff --git a/preprocess/UnifyRepresentation.py b/preprocess/UnifyRepresentation.py index c386bc4..92a2ca6 100644 --- a/preprocess/UnifyRepresentation.py +++ b/preprocess/UnifyRepresentation.py @@ -3,16 +3,46 @@ import itertools import json import shlex +import signal +import sys import os from collections import Counter from argparse import ArgumentParser, SUPPRESS from collections import defaultdict -from sys import stderr from subprocess import PIPE, Popen +from shared.command_options import ( + CommandOption, + CommandOptionWithNoValue, + ExecuteCommand, + command_string_from, + command_option_from +) +from shared.utils import file_path_from, executable_command_string_from, subprocess_popen, str2bool, log_warning from shared.interval_tree import bed_tree_from, is_region_in from shared.utils import subprocess_popen, region_from, reference_sequence_from +import shared.param_p as param + +class InstancesClass(object): + def __init__(self): + self.create_tensor = None + + def poll(self): + self.create_tensor.poll() + + +c = InstancesClass() + +def check_return_code(signum, frame): + c.poll() + if c.create_tensor.returncode != None and c.create_tensor.returncode != 0: + c.compress_tensor.kill() + sys.exit("CreateTensor.py exited with exceptions. 
Exiting...") + + if c.create_tensor.returncode == None: + signal.alarm(5) + extended_window_size = 200 region_size = 50000 @@ -20,8 +50,8 @@ extend_bp = 100 reference_allele_gap = 0.9 -def subprocess_popen(args, stdin=None, stdout=PIPE, stderr=stderr, bufsize=8388608): - return Popen(args, stdin=stdin, stdout=stdout, stderr=stderr, bufsize=bufsize, universal_newlines=True) +def subprocess_popen(args, stdin=None, stdout=PIPE, stderr=sys.stderr, bufsize=8388608): + return Popen(args, stdin=stdin, stdout=stdout, stderr=sys.stderr, bufsize=bufsize, universal_newlines=True) class Reference(object): """ @@ -873,7 +903,8 @@ def unify_label(self, variants, truths, region, ctg_start, ctg_end, all_pos, var for truth in all_truths: pos = truth.start # add missing low-confident tp position - if not (pos >= split_start and pos < split_end) or not (pos >= ctg_start and pos < ctg_end): + if not (pos >= split_start and pos < split_end) or (ctg_start is not None and ctg_end is not None + and not (pos >= ctg_start and pos < ctg_end)): continue if pos in alt_dict and pos in variant_dict: ref_base = variant_dict[pos].reference_bases @@ -950,7 +981,8 @@ def unify_label(self, variants, truths, region, ctg_start, ctg_end, all_pos, var for idx, (pos, raw_genotype, truth_genotype) in enumerate( zip(match_pairs.truths_pos_list, match_pairs.raw_genotypes, match_pairs.truth_genotypes)): - if not (pos >= split_start and pos < split_end) or not (pos >= ctg_start and pos < ctg_end): + if not (pos >= split_start and pos < split_end) or (ctg_start is not None and ctg_end is not None + and not (pos >= ctg_start and pos < ctg_end)): continue if truth_genotype == (0, 0) and sum(raw_genotype) > 0:# miss genoytpe @@ -974,6 +1006,23 @@ def unify_label(self, variants, truths, region, ctg_start, ctg_end, all_pos, var rescue_dict[pos] = "%s\t%d\t.\t%s\t%s\t%d\t%s\t%s\tGT:GQ:DP:AF\t%s:%d:%d:%.4f" % ( self.contig_name, pos, ref_base, variant, 10, 'PASS', '.', genotype_string, 10, 10, 0.5) + +def parse_candidates_file(candidate_details_fn, contig_name=None): + candidate_details_list = [] + unzip_process = subprocess_popen(shlex.split("gzip -fdc %s" % candidate_details_fn)) + for row in unzip_process.stdout: + candidate_details_list.append(row) + return candidate_details_list + + +def parse_line(row): + row = row.strip().split('\t') # ['chr_pos', 'depth', 'cigar_count'] + chr_pos, depth, var_read_json = row[:3] + ctg_name, pos = chr_pos.split() + pos, depth = int(pos), int(depth) + return (ctg_name, pos, depth, var_read_json) + + def UnifyRepresentation(args): """ @@ -989,7 +1038,7 @@ def UnifyRepresentation(args): """ sample_name = args.sampleName var_fn = args.var_fn # true vcf var - candidate_details_fn_prefix = args.candidate_details_fn_prefix + candidate_details_fn = args.candidate_details_fn contig_name = args.ctgName ctg_start = args.ctgStart ctg_end = args.ctgEnd @@ -1002,42 +1051,77 @@ def UnifyRepresentation(args): max_calculate_count = args.max_calculate_count subsample_ratio = args.subsample_ratio platform = args.platform + chunk_id = args.chunk_id + chunk_num = args.chunk_num global test_pos test_pos = None alt_dict = defaultdict() - candidate_details_list = [] read_name_info_dict = defaultdict(Read) - var_read_prefix = '/'.join(candidate_details_fn_prefix.split('/')[:-1]) - var_read_p = candidate_details_fn_prefix.split('/')[-1] - for f in os.listdir(var_read_prefix): - if not f.startswith(var_read_p): - continue - unzip_process = subprocess_popen(shlex.split("gzip -fdc %s" % (os.path.join(var_read_prefix, f)))) - for 
row in unzip_process.stdout: - row = row.strip().split('\t') # ['chr_pos', 'depth', 'cigar_count'] - chr_pos, depth, var_read_json = row[:3] - ctg_name, pos = chr_pos.split() - pos, depth = int(pos), int(depth) - if contig_name and contig_name != ctg_name: - continue - - candidate_details_list.append((pos, depth, var_read_json)) + if candidate_details_fn is None: + basedir = os.path.dirname(__file__) + CTFA_Bin = basedir + "/../clair3.py CreateTensorFullAlignment" + pypyBin = executable_command_string_from(args.pypy, exit_on_not_found=True) + bam_fn = file_path_from(args.bam_fn, exit_on_not_found=True) + ref_fn = file_path_from(args.ref_fn, exit_on_not_found=True) + vcf_fn = file_path_from(args.vcf_fn) + extend_bed = file_path_from(args.extend_bed) + min_af = args.min_af + ctgStart, ctgEnd = None, None + if ctg_start is not None and ctg_end is not None and int(ctg_start) <= int(ctg_end): + ctgStart = CommandOption('ctgStart', ctg_start) + ctgEnd = CommandOption('ctgEnd', ctg_end) + chunkId, chunkNum = None, None + if chunk_id is not None and chunk_num is not None and int(chunk_id) <= int(chunk_num): + chunkId = CommandOption('chunk_id', chunk_id) + chunkNum = CommandOption('chunk_num', chunk_num) + + create_tensor_command_options = [ + pypyBin, + CTFA_Bin, + CommandOption('bam_fn', bam_fn), + CommandOption('ref_fn', ref_fn), + CommandOption('vcf_fn', vcf_fn), + CommandOption('ctgName', contig_name), + CommandOption('platform', platform), + CommandOption('bed_fn', bed_fn), + CommandOption('extend_bed', extend_bed), + ctgStart, + ctgEnd, + chunkId, + chunkNum, + CommandOptionWithNoValue('unify_repre'), + CommandOptionWithNoValue('phasing_info_in_bam'), + CommandOption('unify_repre_fn', 'PIPE') + ] + if min_af is not None: + create_tensor_command_options.append(CommandOption('min_af', min_af)) + else: + candidate_details_list = [] + if os.path.exists(candidate_details_fn): + candidate_details_list = parse_candidates_file(candidate_details_fn, contig_name) + else: + directory, prefix = os.path.split(candidate_details_fn) + for f in os.listdir(directory): + if not f.startswith(prefix): + continue + candidate_details_list += parse_candidates_file(os.path.join(directory, f), contig_name) - candidate_details_list = sorted(candidate_details_list, key=lambda x: x[0]) + candidate_details_list = sorted(candidate_details_list, key=lambda x: x[0]) - if not len(candidate_details_list): - return + if not len(candidate_details_list): + return - if ctg_start is None or ctg_end is None: - alt_start, alt_end = candidate_details_list[0][0], candidate_details_list[-1][0] - chunk_id = args.chunk_id - 1 # 1-base to 0-base - chunk_num = args.chunk_num - chunk_size = (alt_end - alt_start) // chunk_num + 1 - ctg_start = alt_start + chunk_size * chunk_id - ctg_end = ctg_start + chunk_size + if ctg_start is None or ctg_end is None: + alt_start = parse_line(candidate_details_list[0])[1] + alt_end = parse_line(candidate_details_list[-1])[1] + chunk_id = args.chunk_id - 1 # 1-base to 0-base + chunk_num = args.chunk_num + chunk_size = (alt_end - alt_start) // chunk_num + 1 + ctg_start = alt_start + chunk_size * chunk_id + ctg_end = ctg_start + chunk_size is_ctg_name_given = contig_name is not None is_ctg_range_given = is_ctg_name_given and ctg_start is not None and ctg_end is not None @@ -1067,7 +1151,8 @@ def UnifyRepresentation(args): if contig_name and contig_name != ctg_name: continue pos = int(columns[1]) - if pos < ctg_start - extended_window_size or pos > ctg_end + extended_window_size: + if ctg_start is not None 
and ctg_end is not None and \ + (pos < ctg_start - extended_window_size or pos > ctg_end + extended_window_size): continue ref_base = columns[2] alt_base = columns[3] @@ -1079,9 +1164,27 @@ def UnifyRepresentation(args): genotype1=genotype1, genotype2=genotype2) - for row in candidate_details_list: - pos, depth, var_read_json = row - if pos < ctg_start - extended_window_size or pos > ctg_end + extended_window_size: + if candidate_details_fn is None: + try: + c.create_tensor = subprocess_popen( + shlex.split(command_string_from(create_tensor_command_options)) + ) + candidate_source = c.create_tensor.stdout + + signal.signal(signal.SIGALRM, check_return_code) + signal.alarm(2) + except Exception as e: + print(e, file=sys.stderr) + sys.exit("Failed to start required processes. Exiting...") + else: + candidate_source = candidate_details_list + + for row in candidate_source: + ctg_name, pos, depth, var_read_json = parse_line(row) + if contig_name != ctg_name: + continue + if ctg_start is not None and ctg_end is not None and \ + (pos < ctg_start - extended_window_size or pos > ctg_end + extended_window_size): continue var_read_dict = json.loads(var_read_json) @@ -1160,11 +1263,16 @@ def UnifyRepresentation(args): continue read.end = max([item[1] for item in read.seq]) if len(read.seq) else None + if candidate_details_fn is None: + c.create_tensor.stdout.close() + c.create_tensor.wait() + signal.alarm(0) + if not len(alt_dict) or not len(variant_dict): return - region_start = ctg_start - region_end = ctg_end + region_start = ctg_start if ctg_start else min(alt_dict.keys()) + region_end = ctg_end if ctg_end else max(alt_dict.keys()) rescue_dict = defaultdict() all_pos = set() @@ -1274,8 +1382,8 @@ def main(): parser.add_argument('--ref_fn', type=str, default="ref.fa", help="Reference fasta file input, default: %(default)s") - parser.add_argument('--candidate_details_fn_prefix', type=str, default=None, - help="All read-level candidate details of provided contig, default: %(default)s") + parser.add_argument('--candidate_details_fn', type=str, default=None, + help="Read-level candidate details file, default: %(default)s") parser.add_argument('--output_vcf_fn', type=str, default=None, help="VCF output filename or stdout if not set,default: %(default)s") @@ -1308,6 +1416,12 @@ def main(): parser.add_argument('--bed_fn', type=str, default=None, help="Candidate sites VCF file input, if provided, will choose candidate +/- 1 or +/- 2. Use together with gen4Training. default: %(default)s") + parser.add_argument('--vcf_fn', type=str, default=None, + help="Candidate sites VCF file input, if provided, will choose candidate +/- 1 or +/- 2. Use together with gen4Training. default: %(default)s") + + parser.add_argument('--extend_bed', type=str, default=None, + help=SUPPRESS) + # options for internal process control ## Subsample ratio tag for sub-sampled BAM file parser.add_argument('--subsample_ratio', type=int, default=1000, @@ -1325,6 +1439,15 @@ def main(): parser.add_argument('--chunk_id', type=int, default=None, help=SUPPRESS) + parser.add_argument('--min_af', type=float, default=None, + help=SUPPRESS) + + parser.add_argument('--bam_fn', type=str, default=None, + help=SUPPRESS) + + parser.add_argument('--pypy', type=str, default="pypy3", + help=SUPPRESS) + args = parser.parse_args() UnifyRepresentation(args)