prevent users from requesting g.vcf.gz in Spark (#4277)
* prevent users from requesting g.vcf.gz in Spark

* writing these files is currently broken, see #4274
* add a check to HaplotypeCallerSpark and VariantsSparkSink that throws a clear exception in this case (sketched below)
* add a test for GVCF writing in VariantsSparkSink, which previously didn't exist
* add a new UserException.UnimplementedFeature class
* closes #4275
lbergelson authored and droazen committed Jan 30, 2018
1 parent 668a6ec commit d6e7635
Showing 5 changed files with 133 additions and 6 deletions.
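Before the file-by-file diff: the guard added to both tools boils down to a single extension predicate on the output path. A minimal sketch, assuming only the two htsjdk helpers the diff itself uses (the class and method names and the sample inputs here are illustrative, not part of the commit):

import htsjdk.samtools.util.IOUtil;
import htsjdk.tribble.AbstractFeatureReader;

final class GvcfOutputGuard {
    // True when the requested output is a block-compressed GVCF or any GBCF,
    // i.e. the combinations this commit rejects with UnimplementedFeature.
    static boolean isUnsupportedGvcfOutput(final boolean writeGvcf, final String outputFile) {
        return writeGvcf
                && (AbstractFeatureReader.hasBlockCompressedExtension(outputFile) // ".gz", ".bgz", ...
                    || outputFile.endsWith(IOUtil.BCF_FILE_EXTENSION));           // ".bcf"
    }
}
// isUnsupportedGvcfOutput(true,  "sample.g.vcf.gz") -> true   (rejected, issue #4274)
// isUnsupportedGvcfOutput(true,  "sample.g.bcf")    -> true   (rejected, issue #4303)
// isUnsupportedGvcfOutput(true,  "sample.g.vcf")    -> false  (plain GVCF still works)
// isUnsupportedGvcfOutput(false, "sample.vcf.gz")   -> false  (compression is fine outside GVCF mode)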
VariantsSparkSink.java
@@ -2,8 +2,8 @@

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.util.IOUtil;
import htsjdk.tribble.AbstractFeatureReader;
import htsjdk.variant.variantcontext.VariantContext;
import htsjdk.variant.variantcontext.writer.VariantContextWriter;
import htsjdk.variant.vcf.VCFHeader;
@@ -21,6 +21,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.gcs.BucketUtils;
import org.broadinstitute.hellbender.utils.variant.HomoSapiensConstants;
import org.broadinstitute.hellbender.utils.variant.writers.GVCFWriter;
@@ -29,7 +30,6 @@
import org.seqdoop.hadoop_bam.util.VCFFileMerger;
import scala.Tuple2;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Comparator;
@@ -100,7 +100,7 @@ public RecordWriter<NullWritable, VariantContextWritable> getRecordWriter(TaskAt
WRITE_HEADER_PROPERTY, true);

switch (format) {
case BCF: return new KeyIgnoringBCFRecordWriter<NullWritable>(out, header, wh, ctx);
case VCF: return new GvcfKeyIgnoringVCFRecordWriter<NullWritable>(out,header,wh,ctx);
default: throw new IllegalStateException("Unrecognized variant format: " + format);
}
@@ -178,6 +178,12 @@ private static void writeVariantsSingle(
final VCFHeader header, final boolean writeGvcf, final List<Integer> gqPartitions, final int defaultPloidy, final int numReducers) throws IOException {

final Configuration conf = ctx.hadoopConfiguration();

//TODO remove me when https://github.com/broadinstitute/gatk/issues/4274 and https://github.com/broadinstitute/gatk/issues/4303 are fixed
if (writeGvcf && (AbstractFeatureReader.hasBlockCompressedExtension(outputFile) || outputFile.endsWith(IOUtil.BCF_FILE_EXTENSION))) {
throw new UserException.UnimplementedFeature("It is currently not possible to write a compressed g.vcf or any g.bcf on Spark. See https://github.com/broadinstitute/gatk/issues/4274 and https://github.com/broadinstitute/gatk/issues/4303 for more details.");
}

if (outputFile.endsWith(BGZFCodec.DEFAULT_EXTENSION) || outputFile.endsWith(".gz")) {
conf.setBoolean(FileOutputFormat.COMPRESS, true);
conf.setClass(FileOutputFormat.COMPRESS_CODEC, BGZFCodec.class, CompressionCodec.class);
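For callers, the effect of the check above: VariantsSparkSink.writeVariants still accepts a plain GVCF path and now rejects the compressed/BCF variants up front. A hedged usage sketch, with argument order taken from the unit test further down (context, path, variants, header, writeGvcf, GQ band boundaries, default ploidy, number of reducers); the method name and path literals are illustrative:

static void writeGvcfExamples(final JavaSparkContext ctx, final JavaRDD<VariantContext> variants,
                              final VCFHeader header) throws IOException {
    // Still fine: plain uncompressed GVCF output, one GQ band boundary at 100.
    VariantsSparkSink.writeVariants(ctx, "out.g.vcf", variants, header, true, Arrays.asList(100), 2, 1);
    // Now throws UserException.UnimplementedFeature instead of writing a broken file.
    VariantsSparkSink.writeVariants(ctx, "out.g.vcf.gz", variants, header, true, Arrays.asList(100), 2, 1);
}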
UserException.java
@@ -427,4 +427,12 @@ public CouldNotIndexFile(final File file, final Exception e) {
file.getAbsolutePath(), e.getClass().getCanonicalName(), e.getMessage()), e);
}
}

public static final class UnimplementedFeature extends UserException {
private static final long serialVersionUID = 0L;

public UnimplementedFeature(String message){
super(message);
}
}
}
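The new exception class follows the existing UserException pattern: a serializable subclass with a message-only constructor, so any tool can fail fast on a known-unsupported configuration. A minimal illustrative use (the condition flag and the message are hypothetical, not from this commit):

if (outputIsBlockCompressedGvcf) { // hypothetical flag for an unsupported configuration
    throw new UserException.UnimplementedFeature(
            "Writing block-compressed GVCFs is not implemented yet; see issue #4274.");
}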
HaplotypeCallerSpark.java
@@ -5,6 +5,8 @@
import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.samtools.reference.ReferenceSequence;
import htsjdk.samtools.reference.ReferenceSequenceFile;
import htsjdk.samtools.util.IOUtil;
import htsjdk.tribble.AbstractFeatureReader;
import htsjdk.variant.variantcontext.VariantContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -134,6 +136,13 @@ public boolean requiresReference(){

@Override
protected void runTool(final JavaSparkContext ctx) {
//TODO remove me when https://github.com/broadinstitute/gatk/issues/4274 and https://github.com/broadinstitute/gatk/issues/4303 are fixed
if (hcArgs.emitReferenceConfidence == ReferenceConfidenceMode.GVCF
&& (AbstractFeatureReader.hasBlockCompressedExtension(output) || output.endsWith(IOUtil.BCF_FILE_EXTENSION))) {
throw new UserException.UnimplementedFeature("It is currently not possible to write a compressed g.vcf or g.bcf from HaplotypeCallerSpark. " +
"See https://github.com/broadinstitute/gatk/issues/4274 and https://github.com/broadinstitute/gatk/issues/4303 for more details.");
}

logger.info("********************************************************************************");
logger.info("The output of this tool DOES NOT match the output of HaplotypeCaller. ");
logger.info("It is under development and should not be used for production work. ");
VariantsSparkSinkUnitTest.java
@@ -1,18 +1,25 @@
package org.broadinstitute.hellbender.engine.spark.datasources;

import com.google.common.io.Files;
import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.samtools.SAMSequenceRecord;
import htsjdk.samtools.seekablestream.SeekablePathStream;
import htsjdk.samtools.seekablestream.SeekableStream;
import htsjdk.variant.variantcontext.Allele;
import htsjdk.variant.variantcontext.GenotypeBuilder;
import htsjdk.variant.variantcontext.VariantContext;
import htsjdk.variant.variantcontext.VariantContextBuilder;
import htsjdk.variant.vcf.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.broadinstitute.hellbender.CommandLineProgramTest;
import org.broadinstitute.hellbender.engine.spark.SparkContextFactory;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.tools.IndexFeatureFile;
import org.broadinstitute.hellbender.utils.gcs.BucketUtils;
import org.broadinstitute.hellbender.utils.io.IOUtils;
import org.broadinstitute.hellbender.GATKBaseTest;
@@ -27,11 +34,12 @@

import java.io.File;
import java.io.IOException;
import java.util.*;

import static org.testng.Assert.assertEquals;

public final class VariantsSparkSinkUnitTest extends GATKBaseTest {
private static final String SAMPLE = "sample";
private final String outputFileName = getClass().getSimpleName();
private MiniDFSCluster cluster;

@@ -84,6 +92,81 @@ public void testWritingToFileURL(String vcf, String outputFileExtension) throws
assertSingleShardedWritingWorks(vcf, outputUrl);
}


@DataProvider
public static Object[][] brokenGVCFCases() {
return new Object[][]{
{"g.vcf.gz"},
{"g.bcf"},
{"g.bcf.gz"}
};
}

@Test(dataProvider = "brokenGVCFCases", expectedExceptions = UserException.UnimplementedFeature.class)
public void testBrokenGVCFCasesAreDisallowed(String extension) throws IOException {
JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
VariantsSparkSink.writeVariants(ctx, createTempFile("test", extension).toString(), null,
new VCFHeader(), true, Arrays.asList(1, 2, 4, 5), 2, 1 );
}

@DataProvider
public Object[][] gvcfCases(){
return new Object[][]{
{true, ".g.vcf"},
{false, ".vcf"},
{false, ".vcf.gz"},
{false, ".bcf"},
{false, ".bcf.gz"},
// {true, "g.vcf.gz"}, TODO enable this when https://github.com/broadinstitute/gatk/issues/4274 is resolved
// {true, ".g.bcf"}, TODO enable these when https://github.com/broadinstitute/gatk/issues/4303 is resolved
// {true, ".g.bcf.gz"}
};
}

@Test(dataProvider = "gvcfCases")
public void testEnableDisableGVCFWriting(boolean writeGvcf, String extension) throws IOException {
List<VariantContext> vcs = new ArrayList<>();
for(int i = 1; i <= 10; i++) {
final Allele A = Allele.create("A", true);
final VariantContext vc = new VariantContextBuilder("hand crafted", "1", i, i,
Arrays.asList(A, Allele.NON_REF_ALLELE))
.genotypes(new GenotypeBuilder(SAMPLE).alleles(Arrays.asList(A, A)).DP(10).GQ(10).PL(new int[]{0, 60, 10}).make())
.make();
vcs.add(vc);
}

final JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
final File output = createTempFile(outputFileName, extension);
VariantsSparkSink.writeVariants(ctx, output.toString(), ctx.parallelize(vcs), getHeader(), writeGvcf, Arrays.asList(100), 2, 1 );

new CommandLineProgramTest(){
@Override
public String getTestedToolName(){
return IndexFeatureFile.class.getSimpleName();
}
}.runCommandLine(new String[]{"-F", output.getAbsolutePath()});

final List<VariantContext> writtenVcs = readVariants(output.toString());
//if we are actually writing a gvcf, all the variant blocks will be merged into a single homref block spanning positions 1 through 10 (see the sketch after this test)
Assert.assertEquals(writtenVcs.size(), writeGvcf ? 1 : 10);
Assert.assertEquals(writtenVcs.stream().mapToInt(VariantContext::getStart).min().getAsInt(), 1);
Assert.assertEquals(writtenVcs.stream().mapToInt(VariantContext::getEnd).max().getAsInt(), 10);

}
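Why writeGvcf collapses ten records into one: GVCF writing merges consecutive homozygous-reference sites whose GQs fall into the same band, and with the single band boundary of 100 passed above, all ten GQ-10 sites land in one band. A sketch of the expected transformation (record layout illustrative; the merging semantics are those of GVCFWriter, assumed here rather than shown in this diff):

// Input to writeVariants: ten hom-ref sites, each with a <NON_REF> alt:
//   contig "1", pos 1..10: GT=0/0, DP=10, GQ=10
// writeGvcf == true, GQ bands = {100}, so one band [0, 100):
//   a single reference block is emitted, roughly
//   1   1   .   A   <NON_REF>   .   .   END=10   GT:DP:GQ   0/0:10:10
//   hence size() == 1, min start == 1, max end == 10 in the assertions above.
// writeGvcf == false:
//   all ten records pass through unchanged, so size() == 10.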

private static VCFHeader getHeader() {
final Set<VCFHeaderLine> headerlines = new LinkedHashSet<>();
VCFStandardHeaderLines.addStandardFormatLines(headerlines, true,
VCFConstants.GENOTYPE_KEY,
VCFConstants.GENOTYPE_QUALITY_KEY,
VCFConstants.GENOTYPE_PL_KEY, VCFConstants.DEPTH_KEY);
final SAMSequenceDictionary dict = new SAMSequenceDictionary(
Collections.singletonList(new SAMSequenceRecord("1", 100)));
final VCFHeader header = new VCFHeader(headerlines, Collections.singleton(SAMPLE));
header.setSequenceDictionary(dict);
return header;
}

private void assertSingleShardedWritingWorks(String vcf, String outputPath) throws IOException {
JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();

HaplotypeCallerSparkIntegrationTest.java
@@ -15,6 +15,7 @@
import org.broadinstitute.hellbender.GATKBaseTest;
import org.broadinstitute.hellbender.utils.test.SparkTestUtils;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.File;
@@ -127,6 +128,26 @@ public void testGVCFModeIsConcordantWithGATK3_8Results() throws Exception {
Assert.assertTrue(concordance >= 0.99, "Concordance with GATK 3.8 in GVCF mode is < 99% (" + concordance + ")");
}

@DataProvider
public static Object[][] brokenGVCFCases() {
return new Object[][]{
{"g.vcf.gz"},
{"g.bcf"},
{"g.bcf.gz"}
};
}

@Test(dataProvider = "brokenGVCFCases", expectedExceptions = UserException.UnimplementedFeature.class)
public void testBrokenGVCFConfigurationsAreDisallowed(String extension) {
final String[] args = {
"-I", NA12878_20_21_WGS_bam,
"-R", b37_2bit_reference_20_21,
"-O", createTempFile("testGVCF_GZ_throw_exception", extension).getAbsolutePath(),
"-ERC", "GVCF",
};

runCommandLine(args);
}

@Test
public void testGVCFModeIsConcordantWithGATK3_8AlelleSpecificResults() throws Exception {