diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/CompressedVectorLoader.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/CompressedVectorLoader.java
index 6a0cdbd76..8def18b04 100644
--- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/CompressedVectorLoader.java
+++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/CompressedVectorLoader.java
@@ -65,4 +65,11 @@ public void loadCompressed(ArrowRecordBatch recordBatch) {
               + Collections2.toList(buffers).toString());
     }
   }
+
+  /**
+   * Direct router to VectorLoader#load()
+   */
+  public void loadUncompressed(ArrowRecordBatch recordBatch) {
+    super.load(recordBatch);
+  }
 }
diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SchemaAwareArrowCompressedStreamReader.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SchemaAwareArrowCompressedStreamReader.java
index fc5fd0c2b..102427262 100644
--- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SchemaAwareArrowCompressedStreamReader.java
+++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SchemaAwareArrowCompressedStreamReader.java
@@ -23,6 +23,7 @@
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
@@ -42,7 +43,11 @@
  * ArrowRecordBatches.
  */
 public class SchemaAwareArrowCompressedStreamReader extends ArrowStreamReader {
+  public static final String COMPRESS_TYPE_NONE = "none";
+
   private final Schema originalSchema;
+
+  // fixme: the design can be improved to avoid relying on this stateful field
   private String compressType;
 
   public SchemaAwareArrowCompressedStreamReader(Schema originalSchema, InputStream in,
@@ -57,7 +62,7 @@ public SchemaAwareArrowCompressedStreamReader(InputStream in,
     this(null, in, allocator);
   }
 
-  public String GetCompressType() {
+  public String getCompressType() {
     return compressType;
   }
 
@@ -112,12 +117,17 @@ public boolean loadNextBatch() throws IOException {
       }
       ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(result.getMessage(),
           bodyBuffer);
-      String codecName = CompressionType.name(batch.getBodyCompression().getCodec());
-
-      if (codecName.equals("LZ4_FRAME")) {
-        compressType = "lz4";
+      byte codec = batch.getBodyCompression().getCodec();
+      final String codecName;
+      if (codec == NoCompressionCodec.COMPRESSION_TYPE) {
+        compressType = COMPRESS_TYPE_NONE;
       } else {
-        compressType = codecName;
+        codecName = CompressionType.name(codec);
+        if (codecName.equals("LZ4_FRAME")) {
+          compressType = "lz4";
+        } else {
+          compressType = codecName;
+        }
       }
 
       loadRecordBatch(batch);
@@ -138,9 +148,18 @@ public boolean loadNextBatch() throws IOException {
   @Override
   protected void loadRecordBatch(ArrowRecordBatch batch) {
     try {
-      ((CompressedVectorLoader) loader).loadCompressed(batch);
+      CompressedVectorLoader loader = (CompressedVectorLoader) this.loader;
+      if (isCurrentBatchCompressed()) {
+        loader.loadCompressed(batch);
+      } else {
+        loader.loadUncompressed(batch);
+      }
     } finally {
       batch.close();
     }
   }
+
+  public boolean isCurrentBatchCompressed() {
+    return !Objects.equals(getCompressType(), COMPRESS_TYPE_NONE);
+  }
 }
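
Note: with the two files above, the reader records the compress type "none" when a batch's body carries the NoCompressionCodec marker and loads it through the plain VectorLoader path, while compressed batches still go through loadCompressed(). Below is a minimal consumer-side sketch of that per-batch branching; the input stream, the allocator, and the process() callback are illustrative assumptions and not part of this patch.

    // Sketch only: "in", "allocator" and "process()" are assumed; imports omitted.
    static void readShuffleStream(InputStream in, BufferAllocator allocator) throws IOException {
      try (SchemaAwareArrowCompressedStreamReader reader =
               new SchemaAwareArrowCompressedStreamReader(in, allocator)) {
        while (reader.loadNextBatch()) {
          VectorSchemaRoot root = reader.getVectorSchemaRoot();
          if (reader.isCurrentBatchCompressed()) {
            // buffers were loaded via loadCompressed() and still need native decompression;
            // the codec name (e.g. "lz4") is what the JNI decompress call expects
            process(root, reader.getCompressType());
          } else {
            // tiny batches arrive uncompressed and were loaded via loadUncompressed()
            process(root, SchemaAwareArrowCompressedStreamReader.COMPRESS_TYPE_NONE);
          }
        }
      }
    }

The same check drives the serializer change further down: decompression is only attempted when isCurrentBatchCompressed() returns true.
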
diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java
index b9c362c04..93d5f3223 100644
--- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java
+++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java
@@ -43,6 +43,7 @@ public long make(
       long offheapPerTask,
       int bufferSize,
       String codec,
+      int batchCompressThreshold,
       String dataFile,
       int subDirsPerLocalDir,
       String localDirs,
@@ -57,6 +58,7 @@ public long make(
         offheapPerTask,
         bufferSize,
         codec,
+        batchCompressThreshold,
         dataFile,
         subDirsPerLocalDir,
         localDirs,
@@ -73,6 +75,7 @@ public native long nativeMake(
       long offheapPerTask,
       int bufferSize,
       String codec,
+      int batchCompressThreshold,
       String dataFile,
       int subDirsPerLocalDir,
       String localDirs,
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala
index d7e243654..53b2f836f 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala
@@ -187,7 +187,10 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
   val columnarShuffleUseCustomizedCompressionCodec: String =
     conf.getConfString("spark.oap.sql.columnar.shuffle.customizedCompression.codec", "lz4")
 
-  val shuffleSplitDefaultSize: Int =
+  val columnarShuffleBatchCompressThreshold: Int =
+    conf.getConfString("spark.oap.sql.columnar.shuffle.batchCompressThreshold", "100").toInt
+
+  val shuffleSplitDefaultSize: Int =
     conf
       .getConfString("spark.oap.sql.columnar.shuffleSplitDefaultSize", "8192").toInt
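
Note: the new threshold is an ordinary string conf (default "100" rows) and is forwarded through make()/nativeMake() into the native splitter. A hedged configuration sketch follows; only the two keys shown in the hunks above are real, the surrounding Spark job setup is assumed and the "256" value is just an example.

    // Sketch: tune when shuffle batches skip compression; key names come from GazellePluginConfig.
    import org.apache.spark.SparkConf;

    public class ShuffleCompressionConf {
      public static SparkConf apply(SparkConf conf) {
        return conf
            // codec used for batches large enough to be worth compressing (existing key)
            .set("spark.oap.sql.columnar.shuffle.customizedCompression.codec", "lz4")
            // batches with at most this many rows are written uncompressed (new key, default "100")
            .set("spark.oap.sql.columnar.shuffle.batchCompressThreshold", "256");
      }
    }
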
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala
index 9ee99179f..a5e4d814b 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala
@@ -131,7 +131,9 @@ private class ArrowColumnarBatchSerializerInstance(
           numRowsTotal += numRows
 
           // jni call to decompress buffers
-          if (compressionEnabled) {
+          if (compressionEnabled &&
+              reader.asInstanceOf[SchemaAwareArrowCompressedStreamReader]
+                  .isCurrentBatchCompressed) {
             try {
               decompressVectors()
             } catch {
@@ -231,7 +233,7 @@ private class ArrowColumnarBatchSerializerInstance(
     val serializedBatch = jniWrapper.decompress(
       schemaHolderId,
-      reader.asInstanceOf[SchemaAwareArrowCompressedStreamReader].GetCompressType(),
+      reader.asInstanceOf[SchemaAwareArrowCompressedStreamReader].getCompressType,
       root.getRowCount,
       bufAddrs.toArray,
       bufSizes.toArray,
diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 24757ba24..a3b05e30c 100644
--- a/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++ b/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -71,6 +71,9 @@ class ColumnarShuffleWriter[K, V](
   } else {
     "uncompressed"
   }
+  private val batchCompressThreshold =
+    GazellePluginConfig.getConf.columnarShuffleBatchCompressThreshold;
+
   private val preferSpill = GazellePluginConfig.getConf.columnarShufflePreferSpill
 
   private val writeSchema = GazellePluginConfig.getConf.columnarShuffleWriteSchema
@@ -103,6 +106,7 @@ class ColumnarShuffleWriter[K, V](
       offheapPerTask,
       nativeBufferSize,
       defaultCompressionCodec,
+      batchCompressThreshold,
       dataTmp.getAbsolutePath,
       blockManager.subDirsPerLocalDir,
       localDirs,
diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc
index a000a8af3..f82068c37 100644
--- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc
+++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc
@@ -1026,9 +1026,9 @@ JNIEXPORT jlong JNICALL
 Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_nativeMake(
     JNIEnv* env, jobject, jstring partitioning_name_jstr, jint num_partitions,
     jbyteArray schema_arr, jbyteArray expr_arr, jlong offheap_per_task, jint buffer_size,
-    jstring compression_type_jstr, jstring data_file_jstr, jint num_sub_dirs,
-    jstring local_dirs_jstr, jboolean prefer_spill, jlong memory_pool_id,
-    jboolean write_schema) {
+    jstring compression_type_jstr, jint batch_compress_threshold, jstring data_file_jstr,
+    jint num_sub_dirs, jstring local_dirs_jstr, jboolean prefer_spill,
+    jlong memory_pool_id, jboolean write_schema) {
   JNI_METHOD_START
   if (partitioning_name_jstr == NULL) {
     JniThrow(std::string("Short partitioning name can't be null"));
@@ -1114,6 +1114,7 @@ Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_nativeMake(
     jlong attmpt_id = env->CallLongMethod(tc_obj, get_tsk_attmpt_mid);
     splitOptions.task_attempt_id = (int64_t)attmpt_id;
   }
+  splitOptions.batch_compress_threshold = batch_compress_threshold;
 
   auto splitter =
       JniGetOrThrow(Splitter::Make(partitioning_name, std::move(schema), num_partitions,
diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc
index 44f21567b..d58f0605d 100644
--- a/native-sql-engine/cpp/src/shuffle/splitter.cc
+++ b/native-sql-engine/cpp/src/shuffle/splitter.cc
@@ -361,15 +361,45 @@ arrow::Status Splitter::Init() {
                                             arrow::Compression::UNCOMPRESSED));
   }
 
+  // initialize tiny batch write options
+  tiny_bach_write_options_ = ipc_write_options;
+  ARROW_ASSIGN_OR_RAISE(
+      tiny_bach_write_options_.codec,
+      arrow::util::Codec::CreateInt32(arrow::Compression::UNCOMPRESSED));
+
   return arrow::Status::OK();
 }
 
+int64_t batch_nbytes(const arrow::RecordBatch& batch) {
+  int64_t accumulated = 0L;
+  for (const auto& array : batch.columns()) {
+    if (array == nullptr || array->data() == nullptr) {
+      continue;
+    }
+    for (const auto& buf : array->data()->buffers) {
+      if (buf == nullptr) {
+        continue;
+      }
+      accumulated += buf->capacity();
+    }
+  }
+  return accumulated;
+}
+
+int64_t batch_nbytes(std::shared_ptr<arrow::RecordBatch> batch) {
+  if (batch == nullptr) {
+    return 0;
+  }
+  return batch_nbytes(*batch);
+}
+
 int64_t Splitter::CompressedSize(const arrow::RecordBatch& rb) {
   auto payload = std::make_shared<arrow::ipc::IpcPayload>();
-  auto result =
+  arrow::Status result;
+  result =
       arrow::ipc::GetRecordBatchPayload(rb, options_.ipc_write_options, payload.get());
   if (result.ok()) {
-    return payload.get()->body_length;
+    return payload->body_length;
   } else {
     result.UnknownError("Failed to get the compressed size.");
     return -1;
@@ -433,25 +463,6 @@ arrow::Status Splitter::Stop() {
   return arrow::Status::OK();
 }
 
-int64_t batch_nbytes(std::shared_ptr<arrow::RecordBatch> batch) {
-  int64_t accumulated = 0L;
-  if (batch == nullptr) {
-    return accumulated;
-  }
-  for (const auto& array : batch->columns()) {
-    if (array == nullptr || array->data() == nullptr) {
-      continue;
-    }
-    for (const auto& buf : array->data()->buffers) {
-      if (buf == nullptr) {
-        continue;
-      }
-      accumulated += buf->capacity();
-    }
-  }
-  return accumulated;
-}
-
 arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffers) {
   if (partition_buffer_idx_base_[partition_id] > 0) {
     auto fixed_width_idx = 0;
@@ -549,12 +560,18 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffers) {
      }
    }
    auto batch = arrow::RecordBatch::Make(schema_, num_rows, std::move(arrays));
-
+   int64_t raw_size = batch_nbytes(batch);
+   raw_partition_lengths_[partition_id] += raw_size;
    auto payload = std::make_shared<arrow::ipc::IpcPayload>();
-   TIME_NANO_OR_RAISE(total_compress_time_,
-                      arrow::ipc::GetRecordBatchPayload(
-                          *batch, options_.ipc_write_options, payload.get()));
-   raw_partition_lengths_[partition_id] += batch_nbytes(batch);
+   if (num_rows <= options_.batch_compress_threshold) {
+     TIME_NANO_OR_RAISE(total_compress_time_,
+                        arrow::ipc::GetRecordBatchPayload(
+                            *batch, tiny_bach_write_options_, payload.get()));
+   } else {
+     TIME_NANO_OR_RAISE(total_compress_time_,
+                        arrow::ipc::GetRecordBatchPayload(
+                            *batch, options_.ipc_write_options, payload.get()));
+   }
    partition_cached_recordbatch_size_[partition_id] += payload->body_length;
    partition_cached_recordbatch_[partition_id].push_back(std::move(payload));
    partition_buffer_idx_base_[partition_id] = 0;
diff --git a/native-sql-engine/cpp/src/shuffle/splitter.h b/native-sql-engine/cpp/src/shuffle/splitter.h
index 1bdd6aa50..dbf07aa87 100644
--- a/native-sql-engine/cpp/src/shuffle/splitter.h
+++ b/native-sql-engine/cpp/src/shuffle/splitter.h
@@ -209,6 +209,9 @@ class Splitter {
   std::shared_ptr<arrow::Schema> schema_;
   SplitOptions options_;
 
+  // write options for tiny batches
+  arrow::ipc::IpcWriteOptions tiny_bach_write_options_;
+
   int64_t total_bytes_written_ = 0;
   int64_t total_bytes_spilled_ = 0;
   int64_t total_write_time_ = 0;
diff --git a/native-sql-engine/cpp/src/shuffle/type.h b/native-sql-engine/cpp/src/shuffle/type.h
index f7d43ca69..e73974243 100644
--- a/native-sql-engine/cpp/src/shuffle/type.h
+++ b/native-sql-engine/cpp/src/shuffle/type.h
@@ -29,6 +29,7 @@ namespace shuffle {
 static constexpr int32_t kDefaultSplitterBufferSize = 4096;
 static constexpr int32_t kDefaultNumSubDirs = 64;
+static constexpr int32_t kDefaultBatchCompressThreshold = 256;
 
 // This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message
 static constexpr int32_t kIpcContinuationToken = -1;
@@ -39,6 +40,7 @@ struct SplitOptions {
   int64_t offheap_per_task = 0;
   int32_t buffer_size = kDefaultSplitterBufferSize;
   int32_t num_sub_dirs = kDefaultNumSubDirs;
+  int32_t batch_compress_threshold = kDefaultBatchCompressThreshold;
   arrow::Compression::type compression_type = arrow::Compression::UNCOMPRESSED;
   bool prefer_spill = true;
   bool write_schema = true;
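
Note: taken together, the native write path now keeps two IPC write option sets: options_.ipc_write_options with the configured codec, and tiny_bach_write_options_ forced to UNCOMPRESSED. Splitter::CacheRecordBatch picks between them by comparing the cached batch's row count with batch_compress_threshold, and the Java reader later reports such batches as "none". A rough Java-side model of that decision follows; the class and method names are illustrative only, the real implementation is the C++ shown above.

    // Rough model of the per-batch choice made in Splitter::CacheRecordBatch (C++); names are illustrative.
    final class BatchCompressPolicy {
      private BatchCompressPolicy() {}

      /** True when the cached batch should be compressed with the configured shuffle codec. */
      static boolean shouldCompress(int numRows, int batchCompressThreshold) {
        // at or below the threshold the batch is serialized with UNCOMPRESSED write options,
        // so tiny batches avoid per-batch compression overhead on both the write and read sides
        return numRows > batchCompressThreshold;
      }
    }
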