diff --git a/.travis.yml b/.travis.yml index fa64f55b8..6ab1db883 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,9 +12,9 @@ jobs: - sudo apt-get install cmake install: - # Download spark 3.0.0 - - "[ -f spark ] || mkdir spark && cd spark && wget https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7-hive1.2.tgz && cd .." - - "tar -xf ./spark/spark-3.0.0-bin-hadoop2.7-hive1.2.tgz" - - "export SPARK_HOME=`pwd`/spark-3.0.0-bin-hadoop2.7-hive1.2" + - "[ -f spark ] || mkdir spark && cd spark && wget https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz && cd .." + - "tar -xf ./spark/spark-3.0.0-bin-hadoop2.7.tgz" + - "export SPARK_HOME=`pwd`/spark-3.0.0-bin-hadoop2.7" before_script: - cd ${TRAVIS_BUILD_DIR}/dev - ./install_vmemcache.sh diff --git a/oap-cache/oap/pom.xml b/oap-cache/oap/pom.xml index e7d53188b..7445fe451 100644 --- a/oap-cache/oap/pom.xml +++ b/oap-cache/oap/pom.xml @@ -35,7 +35,7 @@ UTF-8 false ./ - 3.0.1-Hive1.2-SNAPSHOT + 3.0.0 2.12.10 2.12 1.8 @@ -46,7 +46,7 @@ 9.3.24.v20180605 1.8 - 1.5.9 + 1.5.10 1.6.0 @@ -97,10 +97,6 @@ false - - ossrh - https://oss.sonatype.org/content/repositories/snapshots - @@ -199,65 +195,65 @@ compile - com.intel.spark + org.apache.spark spark-core_${scala.binary.version} ${spark.internal.version} - com.intel.spark + org.apache.spark spark-core_${scala.binary.version} ${spark.internal.version} test-jar test - com.intel.spark + org.apache.spark spark-sql_${scala.binary.version} ${spark.internal.version} - com.intel.spark + org.apache.spark spark-sql_${scala.binary.version} ${spark.internal.version} test-jar test - com.intel.spark + org.apache.spark spark-catalyst_${scala.binary.version} ${spark.internal.version} - com.intel.spark + org.apache.spark spark-catalyst_${scala.binary.version} ${spark.internal.version} test-jar test - com.intel.spark + org.apache.spark spark-hive_${scala.binary.version} ${spark.internal.version} - com.intel.spark + org.apache.spark spark-hive_${scala.binary.version} ${spark.internal.version} test-jar test - com.intel.spark + org.apache.spark spark-hive-thriftserver_${scala.binary.version} ${spark.internal.version} - com.intel.spark + org.apache.spark spark-unsafe_${scala.binary.version} ${spark.internal.version} - com.intel.spark + org.apache.spark spark-unsafe_${scala.binary.version} ${spark.internal.version} test-jar @@ -317,7 +313,7 @@ org.apache.orc orc-core ${orc.version} - nohive + compile @@ -342,7 +338,7 @@ org.apache.orc orc-mapreduce ${orc.version} - nohive + compile @@ -690,7 +686,7 @@ src/main/spark3.0.0/java src/main/spark3.0.0/scala src/main/parquet1.10.1/java - src/main/orc-1.5.9-nohive/java + src/main/orc-1.5.10/java diff --git a/oap-cache/oap/src/main/java/org/apache/orc/impl/ColumnDiskRangeList.java b/oap-cache/oap/src/main/java/org/apache/orc/impl/ColumnDiskRangeList.java index c4dcaa7ee..b7c141894 100644 --- a/oap-cache/oap/src/main/java/org/apache/orc/impl/ColumnDiskRangeList.java +++ b/oap-cache/oap/src/main/java/org/apache/orc/impl/ColumnDiskRangeList.java @@ -17,7 +17,7 @@ package org.apache.orc.impl; -import org.apache.orc.storage.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.DiskRangeList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/oap-cache/oap/src/main/java/org/apache/orc/impl/RecordReaderCacheImpl.java b/oap-cache/oap/src/main/java/org/apache/orc/impl/RecordReaderCacheImpl.java index a369fc9f5..e95702368 100644 --- a/oap-cache/oap/src/main/java/org/apache/orc/impl/RecordReaderCacheImpl.java +++ 
b/oap-cache/oap/src/main/java/org/apache/orc/impl/RecordReaderCacheImpl.java @@ -29,6 +29,17 @@ import java.util.TimeZone; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; +import org.apache.hadoop.hive.ql.util.TimestampUtils; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.io.Text; import org.apache.orc.BooleanColumnStatistics; import org.apache.orc.ColumnStatistics; @@ -47,18 +58,6 @@ import org.apache.orc.StripeInformation; import org.apache.orc.TimestampColumnStatistics; import org.apache.orc.TypeDescription; -import org.apache.orc.storage.common.io.DiskRange; -import org.apache.orc.storage.common.io.DiskRangeList; -import org.apache.orc.storage.common.io.DiskRangeList.CreateHelper; -import org.apache.orc.storage.common.type.HiveDecimal; -//import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; -import org.apache.orc.storage.ql.io.sarg.PredicateLeaf; -import org.apache.orc.storage.ql.io.sarg.SearchArgument; -import org.apache.orc.storage.ql.io.sarg.SearchArgument.TruthValue; -import org.apache.orc.storage.ql.util.TimestampUtils; -import org.apache.orc.storage.serde2.io.DateWritable; -import org.apache.orc.storage.serde2.io.HiveDecimalWritable; import org.apache.orc.util.BloomFilter; import org.apache.orc.util.BloomFilterIO; import org.slf4j.Logger; diff --git a/oap-cache/oap/src/main/java/org/apache/parquet/hadoop/VectorizedOapRecordReader.java b/oap-cache/oap/src/main/java/org/apache/parquet/hadoop/VectorizedOapRecordReader.java index 1acdd4bea..dcf123e3f 100644 --- a/oap-cache/oap/src/main/java/org/apache/parquet/hadoop/VectorizedOapRecordReader.java +++ b/oap-cache/oap/src/main/java/org/apache/parquet/hadoop/VectorizedOapRecordReader.java @@ -378,7 +378,7 @@ protected void initColumnReaders(PageReadStore pages) throws IOException { if (missingColumns[i]) continue; columnReaders[i] = new SkippableVectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(), pages.getPageReader(columns.get(i)), - ZoneId.systemDefault(), true); + ZoneId.systemDefault(), "LEGACY"); } totalCountLoadedSoFar += pages.getRowCount(); } diff --git a/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/oap/orc/OrcColumnarBatchReader.java b/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/oap/orc/OrcColumnarBatchReader.java index d951e57b6..65c699fca 100644 --- a/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/oap/orc/OrcColumnarBatchReader.java +++ b/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/oap/orc/OrcColumnarBatchReader.java @@ -22,15 +22,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.*; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.orc.*; import 
org.apache.orc.impl.DataReaderProperties; import org.apache.orc.impl.ReaderImpl; import org.apache.orc.impl.RecordReaderBinaryCacheImpl; import org.apache.orc.impl.RecordReaderBinaryCacheUtils; import org.apache.orc.mapred.OrcInputFormat; -import org.apache.orc.storage.common.type.HiveDecimal; -import org.apache.orc.storage.ql.exec.vector.*; -import org.apache.orc.storage.serde2.io.HiveDecimalWritable; import org.apache.spark.memory.MemoryMode; import org.apache.spark.sql.catalyst.InternalRow; diff --git a/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/oap/orc/OrcMapreduceRecordReader.java b/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/oap/orc/OrcMapreduceRecordReader.java index ad2767d6b..112cf0c2a 100644 --- a/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/oap/orc/OrcMapreduceRecordReader.java +++ b/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/oap/orc/OrcMapreduceRecordReader.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.io.WritableComparable; import org.apache.orc.*; import org.apache.orc.impl.DataReaderProperties; @@ -31,7 +32,6 @@ import org.apache.orc.impl.RecordReaderBinaryCacheUtils; import org.apache.orc.mapred.OrcMapredRecordReader; import org.apache.orc.mapred.OrcStruct; -import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; import org.apache.spark.sql.internal.oap.OapConf$; diff --git a/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorAllocator.java b/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorAllocator.java index 0041f8956..2c1724be5 100644 --- a/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorAllocator.java +++ b/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorAllocator.java @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.orc; -import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.spark.sql.types.DataType; diff --git a/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SkippableVectorizedColumnReader.java b/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SkippableVectorizedColumnReader.java index 89027db20..e060f4f8a 100644 --- a/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SkippableVectorizedColumnReader.java +++ b/oap-cache/oap/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SkippableVectorizedColumnReader.java @@ -61,7 +61,7 @@ public SkippableVectorizedColumnReader( OriginalType originalType, PageReader pageReader, ZoneId convertTz, - boolean rebaseDateTime + String rebaseDateTime ) throws IOException { super(descriptor, originalType, pageReader, convertTz, rebaseDateTime); diff --git a/oap-cache/oap/src/main/orc-1.5.9-nohive/java/org/apache/orc/impl/RecordReaderBinaryCacheImpl.java b/oap-cache/oap/src/main/orc-1.5.10/java/org/apache/orc/impl/RecordReaderBinaryCacheImpl.java similarity index 98% rename from oap-cache/oap/src/main/orc-1.5.9-nohive/java/org/apache/orc/impl/RecordReaderBinaryCacheImpl.java rename to 
oap-cache/oap/src/main/orc-1.5.10/java/org/apache/orc/impl/RecordReaderBinaryCacheImpl.java index 8fbb629f7..d526502c1 100644 --- a/oap-cache/oap/src/main/orc-1.5.9-nohive/java/org/apache/orc/impl/RecordReaderBinaryCacheImpl.java +++ b/oap-cache/oap/src/main/orc-1.5.10/java/org/apache/orc/impl/RecordReaderBinaryCacheImpl.java @@ -18,7 +18,7 @@ package org.apache.orc.impl; import org.apache.orc.*; -import org.apache.orc.storage.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.DiskRangeList; import java.io.IOException; import java.util.HashMap; diff --git a/oap-cache/oap/src/main/orc-1.5.9-nohive/java/org/apache/orc/impl/RecordReaderBinaryCacheUtils.java b/oap-cache/oap/src/main/orc-1.5.10/java/org/apache/orc/impl/RecordReaderBinaryCacheUtils.java similarity index 98% rename from oap-cache/oap/src/main/orc-1.5.9-nohive/java/org/apache/orc/impl/RecordReaderBinaryCacheUtils.java rename to oap-cache/oap/src/main/orc-1.5.10/java/org/apache/orc/impl/RecordReaderBinaryCacheUtils.java index 67e4aed80..2de2e6d71 100644 --- a/oap-cache/oap/src/main/orc-1.5.9-nohive/java/org/apache/orc/impl/RecordReaderBinaryCacheUtils.java +++ b/oap-cache/oap/src/main/orc-1.5.10/java/org/apache/orc/impl/RecordReaderBinaryCacheUtils.java @@ -21,7 +21,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.orc.DataReader; -import org.apache.orc.storage.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.spark.sql.execution.datasources.oap.filecache.FiberCache; import org.apache.spark.sql.execution.datasources.oap.filecache.FiberCacheManager; import org.apache.spark.sql.execution.datasources.oap.filecache.OrcBinaryFiberId; diff --git a/oap-cache/oap/src/main/orc-1.5.9-nohive/java/org/apache/orc/impl/RecordReaderImpl.java b/oap-cache/oap/src/main/orc-1.5.10/java/org/apache/orc/impl/RecordReaderImpl.java similarity index 98% rename from oap-cache/oap/src/main/orc-1.5.9-nohive/java/org/apache/orc/impl/RecordReaderImpl.java rename to oap-cache/oap/src/main/orc-1.5.10/java/org/apache/orc/impl/RecordReaderImpl.java index 955b931cb..d83bcc051 100644 --- a/oap-cache/oap/src/main/orc-1.5.9-nohive/java/org/apache/orc/impl/RecordReaderImpl.java +++ b/oap-cache/oap/src/main/orc-1.5.10/java/org/apache/orc/impl/RecordReaderImpl.java @@ -53,17 +53,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.Path; -import org.apache.orc.storage.common.io.DiskRange; -import org.apache.orc.storage.common.io.DiskRangeList; -import org.apache.orc.storage.common.io.DiskRangeList.CreateHelper; -import org.apache.orc.storage.common.type.HiveDecimal; -import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; -import org.apache.orc.storage.ql.io.sarg.PredicateLeaf; -import org.apache.orc.storage.ql.io.sarg.SearchArgument; -import org.apache.orc.storage.ql.io.sarg.SearchArgument.TruthValue; -import org.apache.orc.storage.serde2.io.DateWritable; -import org.apache.orc.storage.serde2.io.HiveDecimalWritable; -import org.apache.orc.storage.ql.util.TimestampUtils; +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import 
org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.ql.util.TimestampUtils; import org.apache.hadoop.io.Text; public class RecordReaderImpl implements RecordReader { @@ -310,6 +310,13 @@ public long getNext() { } } + public static final class ZeroPositionProvider implements PositionProvider { + @Override + public long getNext() { + return 0; + } + } + public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe ) throws IOException { return dataReader.readStripeFooter(stripe); @@ -1419,7 +1426,13 @@ protected void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry) PositionProvider[] index = new PositionProvider[indexes.length]; for (int i = 0; i < indexes.length; ++i) { if (indexes[i] != null) { - index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry)); + OrcProto.RowIndexEntry entry = indexes[i].getEntry(rowEntry); + // This is effectively a test for pre-ORC-569 files. + if (rowEntry == 0 && entry.getPositionsCount() == 0) { + index[i] = new ZeroPositionProvider(); + } else { + index[i] = new PositionProviderImpl(entry); + } } } reader.seek(index); diff --git a/oap-cache/oap/src/main/orc-1.5.9-nohive/java/org/apache/orc/impl/RecordReaderUtils.java b/oap-cache/oap/src/main/orc-1.5.10/java/org/apache/orc/impl/RecordReaderUtils.java similarity index 99% rename from oap-cache/oap/src/main/orc-1.5.9-nohive/java/org/apache/orc/impl/RecordReaderUtils.java rename to oap-cache/oap/src/main/orc-1.5.10/java/org/apache/orc/impl/RecordReaderUtils.java index 32f5639d8..4a14648b6 100644 --- a/oap-cache/oap/src/main/orc-1.5.9-nohive/java/org/apache/orc/impl/RecordReaderUtils.java +++ b/oap-cache/oap/src/main/orc-1.5.10/java/org/apache/orc/impl/RecordReaderUtils.java @@ -32,8 +32,8 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.orc.storage.common.io.DiskRange; -import org.apache.orc.storage.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.orc.CompressionCodec; import org.apache.orc.CompressionKind; import org.apache.orc.DataReader; @@ -381,7 +381,7 @@ public static void addRgFilteredStreamToRanges(OrcProto.Stream stream, if (!includedRowGroups[group]) continue; int posn = getIndexPosition( encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull); - long start = index.getEntry(group).getPositions(posn); + long start = group == 0 ? 0 : index.getEntry(group).getPositions(posn); final long nextGroupOffset; boolean isLast = group == (includedRowGroups.length - 1); nextGroupOffset = isLast ? 
length : index.getEntry(group + 1).getPositions(posn); diff --git a/oap-cache/oap/src/main/scala/org/apache/spark/sql/execution/datasources/oap/io/OrcCacheReader.java b/oap-cache/oap/src/main/scala/org/apache/spark/sql/execution/datasources/oap/io/OrcCacheReader.java index da644dbae..799e5ad8b 100644 --- a/oap-cache/oap/src/main/scala/org/apache/spark/sql/execution/datasources/oap/io/OrcCacheReader.java +++ b/oap-cache/oap/src/main/scala/org/apache/spark/sql/execution/datasources/oap/io/OrcCacheReader.java @@ -20,9 +20,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.orc.*; -import org.apache.orc.storage.common.type.HiveDecimal; -import org.apache.orc.storage.ql.exec.vector.*; -import org.apache.orc.storage.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.*; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.spark.memory.MemoryMode; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.datasources.oap.filecache.FiberCache; diff --git a/oap-cache/oap/src/main/scala/org/apache/spark/sql/execution/datasources/oap/io/OrcDataFile.scala b/oap-cache/oap/src/main/scala/org/apache/spark/sql/execution/datasources/oap/io/OrcDataFile.scala index 65de80670..c35516b64 100644 --- a/oap-cache/oap/src/main/scala/org/apache/spark/sql/execution/datasources/oap/io/OrcDataFile.scala +++ b/oap-cache/oap/src/main/scala/org/apache/spark/sql/execution/datasources/oap/io/OrcDataFile.scala @@ -23,12 +23,12 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.ql.exec.vector.{ColumnVector, VectorizedRowBatch} import org.apache.hadoop.mapreduce.RecordReader import org.apache.orc._ import org.apache.orc.impl.{ReaderImpl, RecordReaderCacheImpl} import org.apache.orc.mapred.{OrcInputFormat, OrcStruct} import org.apache.orc.mapreduce._ -import org.apache.orc.storage.ql.exec.vector.{ColumnVector, VectorizedRowBatch} import org.apache.parquet.hadoop.{ParquetFiberDataReader, VectorizedOapRecordReader} import org.apache.spark.sql.catalyst.InternalRow @@ -230,10 +230,10 @@ private[oap] case class OrcDataFile( val field = schema.fields(fiberId) val toColumn = new OnHeapColumnVector(rowCount, field.dataType) if (fromColumn.isRepeating) { - OrcCacheReader.putRepeatingValues(rowCount, field, fromColumn, toColumn) + OrcCacheReader.putRepeatingValues(rowCount, field, fromColumn, toColumn) } else if (fromColumn.noNulls) { - OrcCacheReader.putNonNullValues(rowCount, field, fromColumn, toColumn) + OrcCacheReader.putNonNullValues(rowCount, field, fromColumn, toColumn) } else { OrcCacheReader.putValues(rowCount, field, fromColumn, toColumn) diff --git a/oap-cache/oap/src/main/scala/org/apache/spark/sql/execution/datasources/oap/io/ParquetFiberDataLoader.scala b/oap-cache/oap/src/main/scala/org/apache/spark/sql/execution/datasources/oap/io/ParquetFiberDataLoader.scala index 0cceae5ba..a42e856ba 100644 --- a/oap-cache/oap/src/main/scala/org/apache/spark/sql/execution/datasources/oap/io/ParquetFiberDataLoader.scala +++ b/oap-cache/oap/src/main/scala/org/apache/spark/sql/execution/datasources/oap/io/ParquetFiberDataLoader.scala @@ -67,7 +67,7 @@ private[oap] case class ParquetFiberDataLoader( val fiberData = reader.readFiberData(blockMetaData, columnDescriptor) val columnReader = new VectorizedColumnReader(columnDescriptor, originalType, - 
fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, true) + fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, "LEGACY") if (OapRuntime.getOrCreate.fiberCacheManager.dataCacheCompressEnable) { ParquetDataFiberCompressedWriter.dumpToCache( diff --git a/oap-cache/oap/src/main/spark3.0.0/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/oap-cache/oap/src/main/spark3.0.0/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 8fbd73076..a24134808 100644 --- a/oap-cache/oap/src/main/spark3.0.0/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/oap-cache/oap/src/main/spark3.0.0/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -36,9 +36,9 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils; import org.apache.spark.sql.catalyst.util.RebaseDateTime; +import org.apache.spark.sql.execution.datasources.DataSourceUtils; import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; -import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.DecimalType; @@ -103,14 +103,14 @@ public class VectorizedColumnReader { // The timezone conversion to apply to int96 timestamps. Null if no conversion. protected final ZoneId convertTz; protected static final ZoneId UTC = ZoneOffset.UTC; - protected final boolean rebaseDateTime; + protected final String datetimeRebaseMode; public VectorizedColumnReader( ColumnDescriptor descriptor, OriginalType originalType, PageReader pageReader, ZoneId convertTz, - boolean rebaseDateTime) throws IOException { + String datetimeRebaseMode) throws IOException { this.descriptor = descriptor; this.pageReader = pageReader; this.convertTz = convertTz; @@ -133,7 +133,9 @@ public VectorizedColumnReader( if (totalValueCount == 0) { throw new IOException("totalValueCount == 0"); } - this.rebaseDateTime = rebaseDateTime; + assert "LEGACY".equals(datetimeRebaseMode) || "EXCEPTION".equals(datetimeRebaseMode) || + "CORRECTED".equals(datetimeRebaseMode); + this.datetimeRebaseMode = datetimeRebaseMode; } /** @@ -152,15 +154,16 @@ private boolean next() throws IOException { //repetitionLevel = repetitionLevelColumn.nextInt(); return definitionLevelColumn.nextInt() == maxDefLevel; } + private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) { boolean isSupported = false; switch (typeName) { case INT32: - isSupported = originalType != OriginalType.DATE || !rebaseDateTime; + isSupported = originalType != OriginalType.DATE || "CORRECTED".equals(datetimeRebaseMode); break; case INT64: if (originalType == OriginalType.TIMESTAMP_MICROS) { - isSupported = !rebaseDateTime; + isSupported = "CORRECTED".equals(datetimeRebaseMode); } else { isSupported = originalType != OriginalType.TIMESTAMP_MILLIS; } @@ -174,6 +177,30 @@ private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName return isSupported; } + static int rebaseDays(int julianDays, final boolean failIfRebase) { + if (failIfRebase) { + if (julianDays < RebaseDateTime.lastSwitchJulianDay()) { + throw DataSourceUtils.newRebaseExceptionInRead("Parquet"); + } else { + return julianDays; + } + } else { + return RebaseDateTime.rebaseJulianToGregorianDays(julianDays); + } + } + + static long rebaseMicros(long julianMicros, final boolean failIfRebase) { + if (failIfRebase) 
{ + if (julianMicros < RebaseDateTime.lastSwitchJulianTs()) { + throw DataSourceUtils.newRebaseExceptionInRead("Parquet"); + } else { + return julianMicros; + } + } else { + return RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros); + } + } + /** * Reads `total` values from this columnReader into column. */ @@ -283,7 +310,7 @@ private void decodeDictionaryIds( case INT32: if (column.dataType() == DataTypes.IntegerType || DecimalType.is32BitDecimalType(column.dataType()) || - (column.dataType() == DataTypes.DateType && !rebaseDateTime)) { + (column.dataType() == DataTypes.DateType && "CORRECTED".equals(datetimeRebaseMode))) { for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i))); @@ -302,11 +329,11 @@ private void decodeDictionaryIds( } } } else if (column.dataType() == DataTypes.DateType) { + final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { int julianDays = dictionary.decodeToInt(dictionaryIds.getDictId(i)); - int gregorianDays = RebaseDateTime.rebaseJulianToGregorianDays(julianDays); - column.putInt(i, gregorianDays); + column.putInt(i, rebaseDays(julianDays, failIfRebase)); } } } else { @@ -317,20 +344,40 @@ private void decodeDictionaryIds( case INT64: if (column.dataType() == DataTypes.LongType || DecimalType.is64BitDecimalType(column.dataType()) || - originalType == OriginalType.TIMESTAMP_MICROS) { + (originalType == OriginalType.TIMESTAMP_MICROS && + "CORRECTED".equals(datetimeRebaseMode))) { for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i))); } } } else if (originalType == OriginalType.TIMESTAMP_MILLIS) { + if ("CORRECTED".equals(datetimeRebaseMode)) { + for (int i = rowId; i < rowId + num; ++i) { + if (!column.isNullAt(i)) { + long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i)); + column.putLong(i, DateTimeUtils.fromMillis(gregorianMillis)); + } + } + } else { + final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); + for (int i = rowId; i < rowId + num; ++i) { + if (!column.isNullAt(i)) { + long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i)); + long julianMicros = DateTimeUtils.fromMillis(julianMillis); + column.putLong(i, rebaseMicros(julianMicros, failIfRebase)); + } + } + } + } else if (originalType == OriginalType.TIMESTAMP_MICROS) { + final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { - column.putLong(i, - DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i)))); + long julianMicros = dictionary.decodeToLong(dictionaryIds.getDictId(i)); + column.putLong(i, rebaseMicros(julianMicros, failIfRebase)); } } - } else { + } else { throw constructConvertNotSupportedException(descriptor, column); } break; @@ -447,12 +494,13 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw defColumn.readShorts( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (column.dataType() == DataTypes.DateType ) { - if (rebaseDateTime) { - defColumn.readIntegersWithRebase( - num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - } else { + if ("CORRECTED".equals(datetimeRebaseMode)) { defColumn.readIntegers( - num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + num, column, 
rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } else { + boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); + defColumn.readIntegersWithRebase( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase); } } else { throw constructConvertNotSupportedException(descriptor, column); @@ -466,27 +514,29 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro defColumn.readLongs( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (originalType == OriginalType.TIMESTAMP_MICROS) { - if (rebaseDateTime) { - defColumn.readIntegersWithRebase( - num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - } else { + if ("CORRECTED".equals(datetimeRebaseMode)) { defColumn.readLongs( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } else { + boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); + defColumn.readLongsWithRebase( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase); } } else if (originalType == OriginalType.TIMESTAMP_MILLIS) { - if (rebaseDateTime) { - for (int i = 0; i < num; i++) { - if (defColumn.readInteger() == maxDefLevel) { - long micros = DateTimeUtils.fromMillis(dataColumn.readLong()); - column.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(micros)); - } else { - column.putNull(rowId + i); + if ("CORRECTED".equals(datetimeRebaseMode)) { + for (int i = 0; i < num; i++) { + if (defColumn.readInteger() == maxDefLevel) { + column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong())); + } else { + column.putNull(rowId + i); + } } - } } else { + final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); for (int i = 0; i < num; i++) { if (defColumn.readInteger() == maxDefLevel) { - column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong())); + long julianMicros = DateTimeUtils.fromMillis(dataColumn.readLong()); + column.putLong(rowId + i, rebaseMicros(julianMicros, failIfRebase)); } else { column.putNull(rowId + i); } diff --git a/oap-cache/oap/src/main/spark3.0.0/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/oap-cache/oap/src/main/spark3.0.0/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index 926f6c7d2..d5efd44d1 100644 --- a/oap-cache/oap/src/main/spark3.0.0/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/oap-cache/oap/src/main/spark3.0.0/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -21,12 +21,14 @@ import java.nio.ByteOrder; import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.ParquetDecodingException; + import org.apache.spark.sql.catalyst.util.RebaseDateTime; +import org.apache.spark.sql.execution.datasources.DataSourceUtils; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; -import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.io.api.Binary; /** * An implementation of the Parquet PLAIN decoder that supports the vectorized interface. @@ -86,7 +88,8 @@ public final void readIntegers(int total, WritableColumnVector c, int rowId) { // iterates the values twice: check if we need to rebase first, then go to the optimized branch // if rebase is not needed. 
@Override - public final void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) { + public final void readIntegersWithRebase( + int total, WritableColumnVector c, int rowId, boolean failIfRebase) { int requiredBytes = total * 4; ByteBuffer buffer = getBuffer(requiredBytes); boolean rebase = false; @@ -94,8 +97,12 @@ public final void readIntegersWithRebase(int total, WritableColumnVector c, int rebase |= buffer.getInt(buffer.position() + i * 4) < RebaseDateTime.lastSwitchJulianDay(); } if (rebase) { - for (int i = 0; i < total; i += 1) { - c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt())); + if (failIfRebase) { + throw DataSourceUtils.newRebaseExceptionInRead("Parquet"); + } else { + for (int i = 0; i < total; i += 1) { + c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt())); + } } } else { if (buffer.hasArray()) { @@ -128,7 +135,8 @@ public final void readLongs(int total, WritableColumnVector c, int rowId) { // iterates the values twice: check if we need to rebase first, then go to the optimized branch // if rebase is not needed. @Override - public final void readLongsWithRebase(int total, WritableColumnVector c, int rowId) { + public final void readLongsWithRebase( + int total, WritableColumnVector c, int rowId, boolean failIfRebase) { int requiredBytes = total * 8; ByteBuffer buffer = getBuffer(requiredBytes); boolean rebase = false; @@ -136,8 +144,12 @@ public final void readLongsWithRebase(int total, WritableColumnVector c, int row rebase |= buffer.getLong(buffer.position() + i * 8) < RebaseDateTime.lastSwitchJulianTs(); } if (rebase) { - for (int i = 0; i < total; i += 1) { - c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong())); + if (failIfRebase) { + throw DataSourceUtils.newRebaseExceptionInRead("Parquet"); + } else { + for (int i = 0; i < total; i += 1) { + c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong())); + } } } else { if (buffer.hasArray()) { diff --git a/oap-cache/oap/src/main/spark3.0.0/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/oap-cache/oap/src/main/spark3.0.0/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index 3a5799281..e5af6bc2e 100644 --- a/oap-cache/oap/src/main/spark3.0.0/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/oap-cache/oap/src/main/spark3.0.0/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -17,6 +17,9 @@ package org.apache.spark.sql.execution.datasources.parquet; +import java.io.IOException; +import java.nio.ByteBuffer; + import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; @@ -26,12 +29,8 @@ import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; -import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; -import java.io.IOException; -import java.nio.ByteBuffer; - /** * A values reader for Parquet's run-length encoded data. 
This is based off of the version in * parquet-mr with these changes: @@ -211,7 +210,8 @@ public void readIntegersWithRebase( WritableColumnVector c, int rowId, int level, - VectorizedValuesReader data) throws IOException { + VectorizedValuesReader data, + final boolean failIfRebase) throws IOException { int left = total; while (left > 0) { if (this.currentCount == 0) this.readNextGroup(); @@ -219,7 +219,7 @@ public void readIntegersWithRebase( switch (mode) { case RLE: if (currentValue == level) { - data.readIntegersWithRebase(n, c, rowId); + data.readIntegersWithRebase(n, c, rowId, failIfRebase); } else { c.putNulls(rowId, n); } @@ -227,8 +227,8 @@ public void readIntegersWithRebase( case PACKED: for (int i = 0; i < n; ++i) { if (currentBuffer[currentBufferIdx++] == level) { - c.putInt(rowId + i, - RebaseDateTime.rebaseJulianToGregorianDays(data.readInteger())); + int julianDays = data.readInteger(); + c.putInt(rowId + i, VectorizedColumnReader.rebaseDays(julianDays, failIfRebase)); } else { c.putNull(rowId + i); } @@ -387,7 +387,8 @@ public void readLongsWithRebase( WritableColumnVector c, int rowId, int level, - VectorizedValuesReader data) throws IOException { + VectorizedValuesReader data, + final boolean failIfRebase) throws IOException { int left = total; while (left > 0) { if (this.currentCount == 0) this.readNextGroup(); @@ -395,7 +396,7 @@ public void readLongsWithRebase( switch (mode) { case RLE: if (currentValue == level) { - data.readLongsWithRebase(n, c, rowId); + data.readLongsWithRebase(n, c, rowId, failIfRebase); } else { c.putNulls(rowId, n); } @@ -403,8 +404,8 @@ public void readLongsWithRebase( case PACKED: for (int i = 0; i < n; ++i) { if (currentBuffer[currentBufferIdx++] == level) { - c.putLong(rowId + i, - RebaseDateTime.rebaseJulianToGregorianMicros(data.readLong())); + long julianMicros = data.readLong(); + c.putLong(rowId + i, VectorizedColumnReader.rebaseMicros(julianMicros, failIfRebase)); } else { c.putNull(rowId + i); } @@ -584,7 +585,8 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) { } @Override - public void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) { + public void readIntegersWithRebase( + int total, WritableColumnVector c, int rowId, boolean failIfRebase) { throw new UnsupportedOperationException("only readInts is valid."); } @@ -604,7 +606,8 @@ public void readLongs(int total, WritableColumnVector c, int rowId) { } @Override - public void readLongsWithRebase(int total, WritableColumnVector c, int rowId) { + public void readLongsWithRebase( + int total, WritableColumnVector c, int rowId, boolean failIfRebase) { throw new UnsupportedOperationException("only readInts is valid."); } diff --git a/oap-cache/oap/src/test/scala/org/apache/spark/sql/execution/datasources/oap/io/ParquetDataFileSuite.scala b/oap-cache/oap/src/test/scala/org/apache/spark/sql/execution/datasources/oap/io/ParquetDataFileSuite.scala index 08354b030..77514d9ed 100644 --- a/oap-cache/oap/src/test/scala/org/apache/spark/sql/execution/datasources/oap/io/ParquetDataFileSuite.scala +++ b/oap-cache/oap/src/test/scala/org/apache/spark/sql/execution/datasources/oap/io/ParquetDataFileSuite.scala @@ -635,7 +635,7 @@ class ParquetCacheDataWithDictionaryWithNullsCompressedSuite extends ParquetData val fiberData = reader.readFiberData(blockMetaData, columnDescriptor) val columnReader = new VectorizedColumnReader(columnDescriptor, types.get(0).getOriginalType, - fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, true) + 
fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, "LEGACY") val fiberCache = ParquetDataFiberCompressedWriter. dumpToCache(columnReader, rowCount, IntegerType) // init reader @@ -666,7 +666,7 @@ class ParquetCacheDataWithDictionaryWithNullsCompressedSuite extends ParquetData val fiberData = reader.readFiberData(blockMetaData, columnDescriptor) val columnReader = new VectorizedColumnReader(columnDescriptor, types.get(1).getOriginalType, - fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, true) + fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, "LEGACY") val fiberCache = ParquetDataFiberCompressedWriter. dumpToCache(columnReader, rowCount, LongType) // init reader @@ -698,7 +698,7 @@ class ParquetCacheDataWithDictionaryWithNullsCompressedSuite extends ParquetData val fiberData = reader.readFiberData(blockMetaData, columnDescriptor) val columnReader = new VectorizedColumnReader(columnDescriptor, types.get(2).getOriginalType, - fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, true) + fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, "LEGACY") val fiberCache = ParquetDataFiberCompressedWriter. dumpToCache(columnReader, rowCount, BooleanType) // init reader @@ -731,7 +731,7 @@ class ParquetCacheDataWithDictionaryWithNullsCompressedSuite extends ParquetData val fiberData = reader.readFiberData(blockMetaData, columnDescriptor) val columnReader = new VectorizedColumnReader(columnDescriptor, types.get(3).getOriginalType, - fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, true) + fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, "LEGACY") val fiberCache = ParquetDataFiberCompressedWriter. dumpToCache(columnReader, rowCount, FloatType) // init reader @@ -764,7 +764,7 @@ class ParquetCacheDataWithDictionaryWithNullsCompressedSuite extends ParquetData val fiberData = reader.readFiberData(blockMetaData, columnDescriptor) val columnReader = new VectorizedColumnReader(columnDescriptor, types.get(4).getOriginalType, - fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, true) + fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, "LEGACY") val fiberCache = ParquetDataFiberCompressedWriter. dumpToCache(columnReader, rowCount, DoubleType) // init reader @@ -797,7 +797,7 @@ class ParquetCacheDataWithDictionaryWithNullsCompressedSuite extends ParquetData val fiberData = reader.readFiberData(blockMetaData, columnDescriptor) val columnReader = new VectorizedColumnReader(columnDescriptor, types.get(5).getOriginalType, - fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, true) + fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, "LEGACY") val fiberCache = ParquetDataFiberCompressedWriter. dumpToCache(columnReader, rowCount, BinaryType) // init reader @@ -867,7 +867,7 @@ class ParquetCacheDataWithDictionaryWithoutNullsCompressedSuite extends ParquetD val fiberData = reader.readFiberData(blockMetaData, columnDescriptor) val columnReader = new VectorizedColumnReader(columnDescriptor, types.get(0).getOriginalType, - fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, true) + fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, "LEGACY") val fiberCache = ParquetDataFiberCompressedWriter. 
dumpToCache(columnReader, rowCount, IntegerType) // init reader @@ -896,7 +896,7 @@ class ParquetCacheDataWithDictionaryWithoutNullsCompressedSuite extends ParquetD val fiberData = reader.readFiberData(blockMetaData, columnDescriptor) val columnReader = new VectorizedColumnReader(columnDescriptor, types.get(1).getOriginalType, - fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, true) + fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, "LEGACY") val fiberCache = ParquetDataFiberCompressedWriter. dumpToCache(columnReader, rowCount, LongType) // init reader @@ -926,7 +926,7 @@ class ParquetCacheDataWithDictionaryWithoutNullsCompressedSuite extends ParquetD val fiberData = reader.readFiberData(blockMetaData, columnDescriptor) val columnReader = new VectorizedColumnReader(columnDescriptor, types.get(2).getOriginalType, - fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, true) + fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, "LEGACY") val fiberCache = ParquetDataFiberCompressedWriter. dumpToCache(columnReader, rowCount, BooleanType) // init reader @@ -957,7 +957,7 @@ class ParquetCacheDataWithDictionaryWithoutNullsCompressedSuite extends ParquetD val fiberData = reader.readFiberData(blockMetaData, columnDescriptor) val columnReader = new VectorizedColumnReader(columnDescriptor, types.get(3).getOriginalType, - fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, true) + fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, "LEGACY") val fiberCache = ParquetDataFiberCompressedWriter. dumpToCache(columnReader, rowCount, FloatType) // init reader @@ -988,7 +988,7 @@ class ParquetCacheDataWithDictionaryWithoutNullsCompressedSuite extends ParquetD val fiberData = reader.readFiberData(blockMetaData, columnDescriptor) val columnReader = new VectorizedColumnReader(columnDescriptor, types.get(4).getOriginalType, - fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, true) + fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, "LEGACY") val fiberCache = ParquetDataFiberCompressedWriter. dumpToCache(columnReader, rowCount, DoubleType) // init reader @@ -1019,7 +1019,7 @@ class ParquetCacheDataWithDictionaryWithoutNullsCompressedSuite extends ParquetD val fiberData = reader.readFiberData(blockMetaData, columnDescriptor) val columnReader = new VectorizedColumnReader(columnDescriptor, types.get(5).getOriginalType, - fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, true) + fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, "LEGACY") val fiberCache = ParquetDataFiberCompressedWriter. 
dumpToCache(columnReader, rowCount, BinaryType) // init reader @@ -1538,7 +1538,7 @@ class ParquetFiberDataReaderSuite extends ParquetDataFileSuite { val fiberData = reader.readFiberData(blockMetaData, columnDescriptor) val columnReader = new SkippableVectorizedColumnReader(columnDescriptor, types.get(0).getOriginalType, - fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, true) + fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, "LEGACY") columnReader.readBatch(rowCount, vector) for (i <- 0 until rowCount) { assert(i * 2 == vector.getInt(i)) diff --git a/oap-cache/oap/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/SkippableVectorizedColumnReaderSuite.scala b/oap-cache/oap/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/SkippableVectorizedColumnReaderSuite.scala index 0284770c9..2d4c1f5fe 100644 --- a/oap-cache/oap/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/SkippableVectorizedColumnReaderSuite.scala +++ b/oap-cache/oap/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/SkippableVectorizedColumnReaderSuite.scala @@ -904,7 +904,7 @@ class SkippableVectorizedColumnReaderSuite extends SparkFunSuite with SharedOapC val pageReader = rowGroup.getPageReader(descriptor) val timeZone = ZoneId.systemDefault val columnReader = - new SkippableVectorizedColumnReader(descriptor, originalType, pageReader, timeZone, true) + new SkippableVectorizedColumnReader(descriptor, originalType, pageReader, timeZone, "LEGACY") val columnVector = new OnHeapColumnVector(unitSize, dataType) columnReader.skipBatch(unitSize, columnVector.dataType) columnVector.reset() @@ -932,7 +932,7 @@ class SkippableVectorizedColumnReaderSuite extends SparkFunSuite with SharedOapC val pageReader = rowGroup.getPageReader(descriptor) val timeZone = ZoneId.systemDefault val columnReader = - new SkippableVectorizedColumnReader(descriptor, originalType, pageReader, timeZone, true) + new SkippableVectorizedColumnReader(descriptor, originalType, pageReader, timeZone, "LEGACY") columnReader.skipBatch(unitSize, dataType) } finally { if (reader != null) reader.close()
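
The Parquet reader constructors above now take a datetime rebase mode string ("LEGACY", "CORRECTED", or "EXCEPTION") in place of the old `rebaseDateTime` boolean, and the call sites that previously passed `true` now pass `"LEGACY"`, which preserves the old rebasing behavior. The sketch below is illustrative only and not part of the patch: it restates the `rebaseDays` helper added to `VectorizedColumnReader`, folding in the `CORRECTED` short-circuit that the patch performs at the call sites. The class name is made up for illustration; it assumes Spark 3.0.0's `RebaseDateTime` and `DataSourceUtils` (the classes imported in the diff) are on the classpath.

```java
// Illustrative sketch, not part of the patch. Assumes Spark 3.0.0 catalyst/sql jars.
import org.apache.spark.sql.catalyst.util.RebaseDateTime;
import org.apache.spark.sql.execution.datasources.DataSourceUtils;

final class DatetimeRebaseModeSketch {

  /**
   * Interprets one Parquet DATE value under the three rebase modes carried by the new
   * String parameter. The CORRECTED check happens at the call sites in the patch and is
   * folded in here for readability.
   */
  static int readDate(int julianDays, String datetimeRebaseMode) {
    if ("CORRECTED".equals(datetimeRebaseMode)) {
      // File was written with the proleptic Gregorian calendar: use the value as-is.
      return julianDays;
    }
    boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
    if (failIfRebase) {
      if (julianDays < RebaseDateTime.lastSwitchJulianDay()) {
        // Ancient date whose meaning differs between the two calendars: refuse to guess.
        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
      }
      return julianDays; // modern date, identical in both calendars
    }
    // LEGACY: rebase from the hybrid Julian/Gregorian calendar to proleptic Gregorian.
    return RebaseDateTime.rebaseJulianToGregorianDays(julianDays);
  }
}
```

The microsecond variant `rebaseMicros` added in the same hunk follows the identical pattern, with `lastSwitchJulianTs()` and `rebaseJulianToGregorianMicros()` in place of the day-based calls.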
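
`VectorizedPlainValuesReader.readIntegersWithRebase` keeps its two-pass strategy — scan the batch for values before the Julian/Gregorian switch day, then take either the rebasing loop or the fast bulk-copy branch — but the new `failIfRebase` flag lets EXCEPTION mode abort before anything is written. Below is a simplified sketch of that control flow; the `WritableColumnVector` target and the optimized bulk-copy branch of the real code are replaced by a plain `int[]`, and the class/method names are invented for the sketch, not taken from the patch.

```java
// Simplified sketch of the two-pass PLAIN decode with the new failIfRebase flag.
import java.nio.ByteBuffer;
import org.apache.spark.sql.catalyst.util.RebaseDateTime;
import org.apache.spark.sql.execution.datasources.DataSourceUtils;

final class PlainDateBatchSketch {

  /**
   * Decodes `total` 32-bit DATE values starting at buffer.position() (byte order is assumed
   * to be set by the caller): first detect whether any value needs rebasing, then fail,
   * rebase, or copy.
   */
  static int[] readDates(ByteBuffer buffer, int total, boolean failIfRebase) {
    boolean rebase = false;
    for (int i = 0; i < total; i += 1) {
      // Absolute reads: peek at the values without consuming the buffer.
      rebase |= buffer.getInt(buffer.position() + i * 4) < RebaseDateTime.lastSwitchJulianDay();
    }
    if (rebase && failIfRebase) {
      // EXCEPTION mode bails out before writing anything to the output.
      throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
    }
    int[] out = new int[total];
    for (int i = 0; i < total; i += 1) {
      int days = buffer.getInt(); // relative read: consumes 4 bytes
      out[i] = rebase ? RebaseDateTime.rebaseJulianToGregorianDays(days) : days;
    }
    return out;
  }
}
```

The RLE-level `readIntegersWithRebase` in `VectorizedRleValuesReader` threads the same flag through: RLE runs delegate to this batched path, while PACKED runs call `VectorizedColumnReader.rebaseDays` per value.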
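
For TIMESTAMP_MILLIS values the patch widens milliseconds to microseconds first and only then applies the rebase-or-fail decision, both in the dictionary-decoding path and in `readLongBatch`; in CORRECTED mode the value is only widened, never rebased. A compact per-value sketch of that conversion follows, again assuming only the Spark 3.0.0 utilities named in the diff; the wrapper class is hypothetical.

```java
// Illustrative per-value sketch of the TIMESTAMP_MILLIS handling, not part of the patch.
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.catalyst.util.RebaseDateTime;
import org.apache.spark.sql.execution.datasources.DataSourceUtils;

final class TimestampMillisRebaseSketch {

  /** Converts one TIMESTAMP_MILLIS value to the microsecond representation Spark stores. */
  static long readTimestampMillis(long millis, String datetimeRebaseMode) {
    long micros = DateTimeUtils.fromMillis(millis); // widen millis -> micros
    if ("CORRECTED".equals(datetimeRebaseMode)) {
      return micros; // already proleptic Gregorian
    }
    if ("EXCEPTION".equals(datetimeRebaseMode)) {
      if (micros < RebaseDateTime.lastSwitchJulianTs()) {
        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
      }
      return micros;
    }
    // LEGACY: rebase the hybrid-calendar microseconds to proleptic Gregorian.
    return RebaseDateTime.rebaseJulianToGregorianMicros(micros);
  }
}
```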
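
On the ORC side, the bundled `RecordReaderImpl` (now under `src/main/orc-1.5.10`) gains a `ZeroPositionProvider` so that a first row-index entry with an empty position list simply seeks every stream to offset 0; the hunk's comment calls the check "effectively a test for pre-ORC-569 files", i.e. older files still carry the explicit (all-zero) positions for entry 0 and keep using `PositionProviderImpl`. `RecordReaderUtils.addRgFilteredStreamToRanges` makes the matching change by treating row group 0 as starting at offset 0. Below is a small sketch of the selection logic; it assumes orc-core 1.5.10's `PositionProvider` interface and that `RecordReaderImpl.PositionProviderImpl` is accessible from outside the package, and the wrapper class is invented for illustration.

```java
// Illustrative sketch of the row-index seek fallback, not part of the patch.
import org.apache.orc.OrcProto;
import org.apache.orc.impl.PositionProvider;
import org.apache.orc.impl.RecordReaderImpl;

final class RowIndexSeekSketch {

  /** Always reports offset 0: every stream of the first row group starts at its beginning. */
  static final class ZeroPositionProvider implements PositionProvider {
    @Override
    public long getNext() {
      return 0;
    }
  }

  /** Mirrors the selection added to seekToRowEntry in the bundled RecordReaderImpl. */
  static PositionProvider providerFor(OrcProto.RowIndexEntry entry, int rowEntry) {
    if (rowEntry == 0 && entry.getPositionsCount() == 0) {
      // Newer writers omit the all-zero position list of the first row-index entry.
      return new ZeroPositionProvider();
    }
    return new RecordReaderImpl.PositionProviderImpl(entry);
  }
}
```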