From d6e7635897e1a6a773af5684511e2358d369af94 Mon Sep 17 00:00:00 2001
From: Louis Bergelson
Date: Tue, 30 Jan 2018 10:10:41 -0500
Subject: [PATCH] prevent users from requesting g.vcf.gz in Spark (#4277)

* prevent users from requesting g.vcf.gz in Spark

* this is currently broken, see https://github.com/broadinstitute/gatk/issues/4274
* add a check to HaplotypeCallerSpark and VariantsSparkSink and throw a clear exception in this case
* added a test for GVCF writing in VariantsSparkSink, which previously didn't exist
* added new UserException.UnimplementedFeature class
* closes https://github.com/broadinstitute/gatk/issues/4275
---
 .../spark/datasources/VariantsSparkSink.java  | 14 ++-
 .../hellbender/exceptions/UserException.java  |  8 ++
 .../tools/HaplotypeCallerSpark.java           |  9 ++
 .../VariantsSparkSinkUnitTest.java            | 87 ++++++++++++++++++-
 .../HaplotypeCallerSparkIntegrationTest.java  | 21 +++++
 5 files changed, 133 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/VariantsSparkSink.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/VariantsSparkSink.java
index 1af2b5c6a79..f12bc9fd4a7 100644
--- a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/VariantsSparkSink.java
+++ b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/VariantsSparkSink.java
@@ -2,8 +2,8 @@
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
-import htsjdk.samtools.SAMFileHeader;
-import htsjdk.samtools.SAMRecord;
+import htsjdk.samtools.util.IOUtil;
+import htsjdk.tribble.AbstractFeatureReader;
 import htsjdk.variant.variantcontext.VariantContext;
 import htsjdk.variant.variantcontext.writer.VariantContextWriter;
 import htsjdk.variant.vcf.VCFHeader;
@@ -21,6 +21,7 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
+import org.broadinstitute.hellbender.exceptions.UserException;
 import org.broadinstitute.hellbender.utils.gcs.BucketUtils;
 import org.broadinstitute.hellbender.utils.variant.HomoSapiensConstants;
 import org.broadinstitute.hellbender.utils.variant.writers.GVCFWriter;
@@ -29,7 +30,6 @@
 import org.seqdoop.hadoop_bam.util.VCFFileMerger;
 import scala.Tuple2;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Comparator;
@@ -100,7 +100,7 @@ public RecordWriter<NullWritable, VariantContextWritable> getRecordWriter(TaskAttemptContext ctx)
                 WRITE_HEADER_PROPERTY, true);
         switch (format) {
-            case BCF: return new KeyIgnoringBCFRecordWriter(out,header,wh,ctx);
+            case BCF: return new KeyIgnoringBCFRecordWriter(out, header, wh, ctx);
             case VCF: return new GvcfKeyIgnoringVCFRecordWriter(out,header,wh,ctx);
             default: throw new IllegalStateException("Unrecognized variant format: " + format);
         }
@@ -178,6 +178,12 @@ private static void writeVariantsSingle(
             final VCFHeader header, final boolean writeGvcf, final List<Integer> gqPartitions, final int defaultPloidy, final int numReducers) throws IOException {
         final Configuration conf = ctx.hadoopConfiguration();
+
+        //TODO remove me when https://github.com/broadinstitute/gatk/issues/4274 and https://github.com/broadinstitute/gatk/issues/4303 are fixed
+        if (writeGvcf && (AbstractFeatureReader.hasBlockCompressedExtension(outputFile) || outputFile.endsWith(IOUtil.BCF_FILE_EXTENSION))) {
+            throw new UserException.UnimplementedFeature("It is currently not possible to write a compressed g.vcf or any g.bcf on Spark. " +
+                    "See https://github.com/broadinstitute/gatk/issues/4274 and https://github.com/broadinstitute/gatk/issues/4303 for more details.");
+        }
+
         if (outputFile.endsWith(BGZFCodec.DEFAULT_EXTENSION) || outputFile.endsWith(".gz")) {
             conf.setBoolean(FileOutputFormat.COMPRESS, true);
             conf.setClass(FileOutputFormat.COMPRESS_CODEC, BGZFCodec.class, CompressionCodec.class);
diff --git a/src/main/java/org/broadinstitute/hellbender/exceptions/UserException.java b/src/main/java/org/broadinstitute/hellbender/exceptions/UserException.java
index 70c1e0959a7..5c17b950755 100644
--- a/src/main/java/org/broadinstitute/hellbender/exceptions/UserException.java
+++ b/src/main/java/org/broadinstitute/hellbender/exceptions/UserException.java
@@ -427,4 +427,12 @@ public CouldNotIndexFile(final File file, final Exception e) {
             file.getAbsolutePath(), e.getClass().getCanonicalName(), e.getMessage()), e);
         }
     }
+
+    public static final class UnimplementedFeature extends UserException {
+        private static final long serialVersionUID = 0L;
+
+        public UnimplementedFeature(String message){
+            super(message);
+        }
+    }
 }
diff --git a/src/main/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSpark.java b/src/main/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSpark.java
index 6effbae499f..86baa4e30ac 100644
--- a/src/main/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSpark.java
+++ b/src/main/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSpark.java
@@ -5,6 +5,8 @@
 import htsjdk.samtools.SAMSequenceDictionary;
 import htsjdk.samtools.reference.ReferenceSequence;
 import htsjdk.samtools.reference.ReferenceSequenceFile;
+import htsjdk.samtools.util.IOUtil;
+import htsjdk.tribble.AbstractFeatureReader;
 import htsjdk.variant.variantcontext.VariantContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -134,6 +136,13 @@ public boolean requiresReference(){
 
     @Override
     protected void runTool(final JavaSparkContext ctx) {
+        //TODO remove me when https://github.com/broadinstitute/gatk/issues/4274 and https://github.com/broadinstitute/gatk/issues/4303 are fixed
+        if(hcArgs.emitReferenceConfidence == ReferenceConfidenceMode.GVCF
+                && (AbstractFeatureReader.hasBlockCompressedExtension(output) || output.endsWith(IOUtil.BCF_FILE_EXTENSION))) {
+            throw new UserException.UnimplementedFeature("It is currently not possible to write a compressed g.vcf or g.bcf from HaplotypeCallerSpark. " +
+                    "See https://github.com/broadinstitute/gatk/issues/4274 and https://github.com/broadinstitute/gatk/issues/4303 for more details.");
+        }
+
         logger.info("********************************************************************************");
         logger.info("The output of this tool DOES NOT match the output of HaplotypeCaller. ");
         logger.info("It is under development and should not be used for production work. ");
"); diff --git a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/VariantsSparkSinkUnitTest.java b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/VariantsSparkSinkUnitTest.java index b8f32d03b55..39c16c22493 100644 --- a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/VariantsSparkSinkUnitTest.java +++ b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/VariantsSparkSinkUnitTest.java @@ -1,18 +1,25 @@ package org.broadinstitute.hellbender.engine.spark.datasources; import com.google.common.io.Files; +import htsjdk.samtools.SAMSequenceDictionary; +import htsjdk.samtools.SAMSequenceRecord; import htsjdk.samtools.seekablestream.SeekablePathStream; import htsjdk.samtools.seekablestream.SeekableStream; +import htsjdk.variant.variantcontext.Allele; +import htsjdk.variant.variantcontext.GenotypeBuilder; import htsjdk.variant.variantcontext.VariantContext; -import htsjdk.variant.vcf.VCFHeader; +import htsjdk.variant.variantcontext.VariantContextBuilder; +import htsjdk.variant.vcf.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.broadinstitute.hellbender.CommandLineProgramTest; import org.broadinstitute.hellbender.engine.spark.SparkContextFactory; import org.broadinstitute.hellbender.exceptions.UserException; +import org.broadinstitute.hellbender.tools.IndexFeatureFile; import org.broadinstitute.hellbender.utils.gcs.BucketUtils; import org.broadinstitute.hellbender.utils.io.IOUtils; import org.broadinstitute.hellbender.GATKBaseTest; @@ -27,11 +34,12 @@ import java.io.File; import java.io.IOException; -import java.util.List; +import java.util.*; import static org.testng.Assert.assertEquals; public final class VariantsSparkSinkUnitTest extends GATKBaseTest { + private static final String SAMPLE = "sample"; private final String outputFileName = getClass().getSimpleName(); private MiniDFSCluster cluster; @@ -84,6 +92,81 @@ public void testWritingToFileURL(String vcf, String outputFileExtension) throws assertSingleShardedWritingWorks(vcf, outputUrl); } + + @DataProvider + public static Object[][] brokenGVCFCases() { + return new Object[][]{ + {"g.vcf.gz"}, + {"g.bcf"}, + {"g.bcf.gz"} + }; + } + + @Test(dataProvider = "brokenGVCFCases", expectedExceptions = UserException.UnimplementedFeature.class) + public void testBrokenGVCFCasesAreDisallowed(String extension) throws IOException { + JavaSparkContext ctx = SparkContextFactory.getTestSparkContext(); + VariantsSparkSink.writeVariants(ctx, createTempFile("test", extension).toString(), null, + new VCFHeader(), true, Arrays.asList(1, 2, 4, 5), 2, 1 ); + } + + @DataProvider + public Object[][] gvcfCases(){ + return new Object[][]{ + {true, ".g.vcf"}, + {false, ".vcf"}, + {false, ".vcf.gz"}, + {false, ".bcf"}, + {false, ".bcf.gz"}, + // {true, "g.vcf.gz"}, TODO enable this when https://github.com/broadinstitute/gatk/issues/4274 is resolved + // {true, ".g.bcf"}, TODO enable these when https://github.com/broadinstitute/gatk/issues/4303 is resolved + // {true, ".g.bcf.gz"} + }; + } + + @Test(dataProvider = "gvcfCases") + public void testEnableDisableGVCFWriting(boolean writeGvcf, String extension) throws IOException { + List vcs = new ArrayList<>(); + for(int i = 1; i <= 10; i++) { + final Allele A = Allele.create("A", true); + final VariantContext vc = new 
+                    Arrays.asList(A, Allele.NON_REF_ALLELE))
+                    .genotypes(new GenotypeBuilder(SAMPLE).alleles(Arrays.asList(A, A)).DP(10).GQ(10).PL(new int[]{0, 60, 10}).make())
+                    .make();
+            vcs.add(vc);
+        }
+
+        final JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
+        final File output = createTempFile(outputFileName, extension);
+        VariantsSparkSink.writeVariants(ctx, output.toString(), ctx.parallelize(vcs), getHeader(), writeGvcf, Arrays.asList(100), 2, 1);
+
+        new CommandLineProgramTest(){
+            @Override
+            public String getTestedToolName(){
+                return IndexFeatureFile.class.getSimpleName();
+            }
+        }.runCommandLine(new String[]{"-F", output.getAbsolutePath()});
+
+        final List<VariantContext> writtenVcs = readVariants(output.toString());
+        //if we are actually writing a gvcf, all the variant blocks will be merged into a single homref block
+        Assert.assertEquals(writtenVcs.size(), writeGvcf ? 1 : 10);
+        Assert.assertEquals(writtenVcs.stream().mapToInt(VariantContext::getStart).min().getAsInt(), 1);
+        Assert.assertEquals(writtenVcs.stream().mapToInt(VariantContext::getEnd).max().getAsInt(), 10);
+    }
+
+    private static VCFHeader getHeader() {
+        final Set<VCFHeaderLine> headerlines = new LinkedHashSet<>();
+        VCFStandardHeaderLines.addStandardFormatLines(headerlines, true,
+                VCFConstants.GENOTYPE_KEY,
+                VCFConstants.GENOTYPE_QUALITY_KEY,
+                VCFConstants.GENOTYPE_PL_KEY, VCFConstants.DEPTH_KEY);
+        final SAMSequenceDictionary dict = new SAMSequenceDictionary(
+                Collections.singletonList(new SAMSequenceRecord("1", 100)));
+        final VCFHeader header = new VCFHeader(headerlines, Collections.singleton(SAMPLE));
+        header.setSequenceDictionary(dict);
+        return header;
+    }
+
     private void assertSingleShardedWritingWorks(String vcf, String outputPath) throws IOException {
         JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
diff --git a/src/test/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSparkIntegrationTest.java b/src/test/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSparkIntegrationTest.java
index c01c252c553..dca0ce8b9b6 100644
--- a/src/test/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSparkIntegrationTest.java
+++ b/src/test/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSparkIntegrationTest.java
@@ -15,6 +15,7 @@
 import org.broadinstitute.hellbender.GATKBaseTest;
 import org.broadinstitute.hellbender.utils.test.SparkTestUtils;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.io.File;
@@ -127,6 +128,26 @@ public void testGVCFModeIsConcordantWithGATK3_8Results() throws Exception {
         Assert.assertTrue(concordance >= 0.99, "Concordance with GATK 3.8 in GVCF mode is < 99% (" + concordance + ")");
     }
 
+    @DataProvider
+    public static Object[][] brokenGVCFCases() {
+        return new Object[][]{
+                {"g.vcf.gz"},
+                {"g.bcf"},
+                {"g.bcf.gz"}
+        };
+    }
+
+    @Test(dataProvider = "brokenGVCFCases", expectedExceptions = UserException.UnimplementedFeature.class)
+    public void testBrokenGVCFConfigurationsAreDisallowed(String extension) {
+        final String[] args = {
+                "-I", NA12878_20_21_WGS_bam,
+                "-R", b37_2bit_reference_20_21,
+                "-O", createTempFile("testGVCF_GZ_throw_exception", extension).getAbsolutePath(),
+                "-ERC", "GVCF",
+        };
+
+        runCommandLine(args);
+    }
 
     @Test
     public void testGVCFModeIsConcordantWithGATK3_8AlelleSpecificResults() throws Exception {
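
Editorial note, not part of the patch: the same three-line guard now appears in both VariantsSparkSink and HaplotypeCallerSpark, so code that builds GVCF output paths dynamically can apply the identical pre-check before submitting a Spark job. The sketch below is a minimal illustration of that idea; it uses only what this patch itself relies on (AbstractFeatureReader.hasBlockCompressedExtension, IOUtil.BCF_FILE_EXTENSION, and the new UserException.UnimplementedFeature), but the GvcfOutputCheck class and requireWritableGvcfPath method are hypothetical names, not part of GATK.

    import htsjdk.samtools.util.IOUtil;
    import htsjdk.tribble.AbstractFeatureReader;
    import org.broadinstitute.hellbender.exceptions.UserException;

    public final class GvcfOutputCheck {
        private GvcfOutputCheck() {}

        // Mirrors the guard added in this patch: while GVCF writing on Spark cannot
        // produce block-compressed (.gz) or BCF output, fail fast with a clear error
        // instead of producing a broken file at the end of the job.
        public static void requireWritableGvcfPath(final String outputFile, final boolean writeGvcf) {
            if (writeGvcf && (AbstractFeatureReader.hasBlockCompressedExtension(outputFile)
                    || outputFile.endsWith(IOUtil.BCF_FILE_EXTENSION))) {
                throw new UserException.UnimplementedFeature(
                        "It is currently not possible to write a compressed g.vcf or any g.bcf on Spark. " +
                        "See https://github.com/broadinstitute/gatk/issues/4274 and " +
                        "https://github.com/broadinstitute/gatk/issues/4303 for more details.");
            }
        }

        public static void main(final String[] args) {
            requireWritableGvcfPath("sample.g.vcf", true); // accepted: plain-text GVCF
            try {
                requireWritableGvcfPath("sample.g.vcf.gz", true); // block-compressed GVCF
            } catch (final UserException.UnimplementedFeature e) {
                System.out.println("rejected as expected: " + e.getMessage());
            }
        }
    }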