Commit 825c6d6

[NSE-640] Disable compression for tiny payloads in shuffle (#641)

Closes #640. Shuffle batches holding no more than batchCompressThreshold rows are now
written with uncompressed IPC options, and the read path detects the per-batch codec so
that JNI decompression only runs for batches that were actually compressed.

zhztheplayer authored Dec 22, 2021
1 parent ad4c4e4

Showing 10 changed files with 100 additions and 39 deletions.
@@ -65,4 +65,11 @@ public void loadCompressed(ArrowRecordBatch recordBatch) {
+ Collections2.toList(buffers).toString());
}
}

+  /**
+   * Direct router to VectorLoader#load()
+   */
+  public void loadUncompressed(ArrowRecordBatch recordBatch) {
+    super.load(recordBatch);
+  }
}
@@ -23,6 +23,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
@@ -42,7 +43,11 @@
* ArrowRecordBatches.
*/
public class SchemaAwareArrowCompressedStreamReader extends ArrowStreamReader {
+  public static final String COMPRESS_TYPE_NONE = "none";

private final Schema originalSchema;

+  // fixme: the design can be improved to avoid relying on this stateful field
   private String compressType;

public SchemaAwareArrowCompressedStreamReader(Schema originalSchema, InputStream in,
@@ -57,7 +62,7 @@ public SchemaAwareArrowCompressedStreamReader(InputStream in,
this(null, in, allocator);
}

-  public String GetCompressType() {
+  public String getCompressType() {
return compressType;
}

@@ -112,12 +117,17 @@ public boolean loadNextBatch() throws IOException {
}

ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(result.getMessage(), bodyBuffer);
-    String codecName = CompressionType.name(batch.getBodyCompression().getCodec());
-
-    if (codecName.equals("LZ4_FRAME")) {
-      compressType = "lz4";
+    byte codec = batch.getBodyCompression().getCodec();
+    final String codecName;
+    if (codec == NoCompressionCodec.COMPRESSION_TYPE) {
+      compressType = COMPRESS_TYPE_NONE;
     } else {
-      compressType = codecName;
+      codecName = CompressionType.name(codec);
+      if (codecName.equals("LZ4_FRAME")) {
+        compressType = "lz4";
+      } else {
+        compressType = codecName;
+      }
     }

loadRecordBatch(batch);
@@ -138,9 +148,18 @@
@Override
protected void loadRecordBatch(ArrowRecordBatch batch) {
try {
-      ((CompressedVectorLoader) loader).loadCompressed(batch);
+      CompressedVectorLoader loader = (CompressedVectorLoader) this.loader;
+      if (isCurrentBatchCompressed()) {
+        loader.loadCompressed(batch);
+      } else {
+        loader.loadUncompressed(batch);
+      }
} finally {
batch.close();
}
}

+  public boolean isCurrentBatchCompressed() {
+    return !Objects.equals(getCompressType(), COMPRESS_TYPE_NONE);
+  }
}
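
For context, the read path now branches per batch instead of assuming every payload is
compressed. Below is a minimal consumer sketch, assuming it lives in the same package as
the reader and that an IPC stream produced by the shuffle writer is available; process()
is a hypothetical downstream step, and in the actual plugin decompression happens through
a JNI call rather than in Java:

import java.io.InputStream;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;

public class CompressedStreamExample {
  // Hypothetical downstream consumer; in the plugin this role is played by the
  // columnar batch serializer shown further below.
  static void process(VectorSchemaRoot root) {}

  public static void readAll(InputStream in) throws Exception {
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
        SchemaAwareArrowCompressedStreamReader reader =
            new SchemaAwareArrowCompressedStreamReader(in, allocator)) {
      while (reader.loadNextBatch()) {
        VectorSchemaRoot root = reader.getVectorSchemaRoot();
        if (reader.isCurrentBatchCompressed()) {
          // Buffers still hold LZ4-framed bytes at this point; decompress before use.
        }
        process(root);
      }
    }
  }
}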
@@ -43,6 +43,7 @@ public long make(
long offheapPerTask,
int bufferSize,
String codec,
+      int batchCompressThreshold,
String dataFile,
int subDirsPerLocalDir,
String localDirs,
@@ -57,6 +58,7 @@ public long make(
offheapPerTask,
bufferSize,
codec,
+        batchCompressThreshold,
dataFile,
subDirsPerLocalDir,
localDirs,
@@ -73,6 +75,7 @@ public native long nativeMake(
long offheapPerTask,
int bufferSize,
String codec,
+      int batchCompressThreshold,
String dataFile,
int subDirsPerLocalDir,
String localDirs,
@@ -187,7 +187,10 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
val columnarShuffleUseCustomizedCompressionCodec: String =
conf.getConfString("spark.oap.sql.columnar.shuffle.customizedCompression.codec", "lz4")

-  val shuffleSplitDefaultSize: Int =
+  val columnarShuffleBatchCompressThreshold: Int =
+    conf.getConfString("spark.oap.sql.columnar.shuffle.batchCompressThreshold", "100").toInt
+
+  val shuffleSplitDefaultSize: Int =
conf
.getConfString("spark.oap.sql.columnar.shuffleSplitDefaultSize", "8192").toInt

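The new threshold is surfaced as a Spark SQL conf. Below is a minimal tuning sketch,
assuming a running SparkSession; the key and its default of 100 rows come from the
GazellePluginConfig change above, and 512 is an arbitrary example value:

import org.apache.spark.sql.SparkSession;

public class ThresholdConfigExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();
    // Batches with at most this many rows are written uncompressed by the native
    // shuffle splitter; larger batches keep the configured codec (lz4 by default).
    spark.conf().set("spark.oap.sql.columnar.shuffle.batchCompressThreshold", "512");
  }
}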
@@ -131,7 +131,9 @@ private class ArrowColumnarBatchSerializerInstance(
numRowsTotal += numRows

// jni call to decompress buffers
-        if (compressionEnabled) {
+        if (compressionEnabled &&
+            reader.asInstanceOf[SchemaAwareArrowCompressedStreamReader]
+              .isCurrentBatchCompressed) {
try {
decompressVectors()
} catch {
@@ -231,7 +233,7 @@ private class ArrowColumnarBatchSerializerInstance(

val serializedBatch = jniWrapper.decompress(
schemaHolderId,
-          reader.asInstanceOf[SchemaAwareArrowCompressedStreamReader].GetCompressType(),
+          reader.asInstanceOf[SchemaAwareArrowCompressedStreamReader].getCompressType,
root.getRowCount,
bufAddrs.toArray,
bufSizes.toArray,
@@ -71,6 +71,9 @@ class ColumnarShuffleWriter[K, V](
} else {
"uncompressed"
}
+  private val batchCompressThreshold =
+    GazellePluginConfig.getConf.columnarShuffleBatchCompressThreshold;

private val preferSpill = GazellePluginConfig.getConf.columnarShufflePreferSpill

private val writeSchema = GazellePluginConfig.getConf.columnarShuffleWriteSchema
@@ -103,6 +106,7 @@
offheapPerTask,
nativeBufferSize,
defaultCompressionCodec,
+      batchCompressThreshold,
dataTmp.getAbsolutePath,
blockManager.subDirsPerLocalDir,
localDirs,
7 changes: 4 additions & 3 deletions native-sql-engine/cpp/src/jni/jni_wrapper.cc
@@ -1026,9 +1026,9 @@
Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_nativeMake(
JNIEnv* env, jobject, jstring partitioning_name_jstr, jint num_partitions,
jbyteArray schema_arr, jbyteArray expr_arr, jlong offheap_per_task, jint buffer_size,
-    jstring compression_type_jstr, jstring data_file_jstr, jint num_sub_dirs,
-    jstring local_dirs_jstr, jboolean prefer_spill, jlong memory_pool_id,
-    jboolean write_schema) {
+    jstring compression_type_jstr, jint batch_compress_threshold, jstring data_file_jstr,
+    jint num_sub_dirs, jstring local_dirs_jstr, jboolean prefer_spill,
+    jlong memory_pool_id, jboolean write_schema) {
JNI_METHOD_START
if (partitioning_name_jstr == NULL) {
JniThrow(std::string("Short partitioning name can't be null"));
@@ -1114,6 +1114,7 @@ Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_nativeMake(
jlong attmpt_id = env->CallLongMethod(tc_obj, get_tsk_attmpt_mid);
splitOptions.task_attempt_id = (int64_t)attmpt_id;
}
+  splitOptions.batch_compress_threshold = batch_compress_threshold;

auto splitter =
JniGetOrThrow(Splitter::Make(partitioning_name, std::move(schema), num_partitions,
69 changes: 43 additions & 26 deletions native-sql-engine/cpp/src/shuffle/splitter.cc
@@ -361,15 +361,45 @@ arrow::Status Splitter::Init() {
arrow::Compression::UNCOMPRESSED));
}

+  // initialize tiny batch write options
+  tiny_bach_write_options_ = ipc_write_options;
+  ARROW_ASSIGN_OR_RAISE(
+      tiny_bach_write_options_.codec,
+      arrow::util::Codec::CreateInt32(arrow::Compression::UNCOMPRESSED));

return arrow::Status::OK();
}

+int64_t batch_nbytes(const arrow::RecordBatch& batch) {
+  int64_t accumulated = 0L;
+  for (const auto& array : batch.columns()) {
+    if (array == nullptr || array->data() == nullptr) {
+      continue;
+    }
+    for (const auto& buf : array->data()->buffers) {
+      if (buf == nullptr) {
+        continue;
+      }
+      accumulated += buf->capacity();
+    }
+  }
+  return accumulated;
+}
+
+int64_t batch_nbytes(std::shared_ptr<arrow::RecordBatch> batch) {
+  if (batch == nullptr) {
+    return 0;
+  }
+  return batch_nbytes(*batch);
+}

int64_t Splitter::CompressedSize(const arrow::RecordBatch& rb) {
auto payload = std::make_shared<arrow::ipc::IpcPayload>();
-  auto result =
+  arrow::Status result;
+  result =
arrow::ipc::GetRecordBatchPayload(rb, options_.ipc_write_options, payload.get());
if (result.ok()) {
-    return payload.get()->body_length;
+    return payload->body_length;
} else {
result.UnknownError("Failed to get the compressed size.");
return -1;
@@ -433,25 +463,6 @@ arrow::Status Splitter::Stop() {
return arrow::Status::OK();
}

-int64_t batch_nbytes(std::shared_ptr<arrow::RecordBatch> batch) {
-  int64_t accumulated = 0L;
-  if (batch == nullptr) {
-    return accumulated;
-  }
-  for (const auto& array : batch->columns()) {
-    if (array == nullptr || array->data() == nullptr) {
-      continue;
-    }
-    for (const auto& buf : array->data()->buffers) {
-      if (buf == nullptr) {
-        continue;
-      }
-      accumulated += buf->capacity();
-    }
-  }
-  return accumulated;
-}

arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffers) {
if (partition_buffer_idx_base_[partition_id] > 0) {
auto fixed_width_idx = 0;
@@ -549,12 +560,18 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffers)
}
}
auto batch = arrow::RecordBatch::Make(schema_, num_rows, std::move(arrays));

+  int64_t raw_size = batch_nbytes(batch);
+  raw_partition_lengths_[partition_id] += raw_size;
auto payload = std::make_shared<arrow::ipc::IpcPayload>();
-  TIME_NANO_OR_RAISE(total_compress_time_,
-                     arrow::ipc::GetRecordBatchPayload(
-                         *batch, options_.ipc_write_options, payload.get()));
-  raw_partition_lengths_[partition_id] += batch_nbytes(batch);
+  if (num_rows <= options_.batch_compress_threshold) {
+    TIME_NANO_OR_RAISE(total_compress_time_,
+                       arrow::ipc::GetRecordBatchPayload(
+                           *batch, tiny_bach_write_options_, payload.get()));
+  } else {
+    TIME_NANO_OR_RAISE(total_compress_time_,
+                       arrow::ipc::GetRecordBatchPayload(
+                           *batch, options_.ipc_write_options, payload.get()));
+  }
partition_cached_recordbatch_size_[partition_id] += payload->body_length;
partition_cached_recordbatch_[partition_id].push_back(std::move(payload));
partition_buffer_idx_base_[partition_id] = 0;
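For intuition, here is a rough Java analogue of the batch_nbytes()/threshold logic above;
the shipped implementation is the C++ in splitter.cc, and this sketch only mirrors its
accounting (summing retained buffer capacities) and the tiny-batch rule:

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;

final class TinyBatchRule {
  // Mirrors batch_nbytes(): sum buffer capacities across all columns' buffers.
  static long batchNbytes(VectorSchemaRoot root) {
    long accumulated = 0L;
    for (FieldVector vector : root.getFieldVectors()) {
      for (ArrowBuf buf : vector.getBuffers(false)) {
        accumulated += buf.capacity();
      }
    }
    return accumulated;
  }

  // Mirrors num_rows <= options_.batch_compress_threshold in CacheRecordBatch():
  // batches at or below the threshold take the uncompressed IPC write options.
  static boolean skipCompression(int numRows, int batchCompressThreshold) {
    return numRows <= batchCompressThreshold;
  }
}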
3 changes: 3 additions & 0 deletions native-sql-engine/cpp/src/shuffle/splitter.h
@@ -209,6 +209,9 @@ class Splitter {
std::shared_ptr<arrow::Schema> schema_;
SplitOptions options_;

+  // write options for tiny batches
+  arrow::ipc::IpcWriteOptions tiny_bach_write_options_;

int64_t total_bytes_written_ = 0;
int64_t total_bytes_spilled_ = 0;
int64_t total_write_time_ = 0;
2 changes: 2 additions & 0 deletions native-sql-engine/cpp/src/shuffle/type.h
@@ -29,6 +29,7 @@ namespace shuffle {

static constexpr int32_t kDefaultSplitterBufferSize = 4096;
static constexpr int32_t kDefaultNumSubDirs = 64;
+static constexpr int32_t kDefaultBatchCompressThreshold = 256;

// This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message
static constexpr int32_t kIpcContinuationToken = -1;
@@ -39,6 +40,7 @@ struct SplitOptions {
int64_t offheap_per_task = 0;
int32_t buffer_size = kDefaultSplitterBufferSize;
int32_t num_sub_dirs = kDefaultNumSubDirs;
+  int32_t batch_compress_threshold = kDefaultBatchCompressThreshold;
arrow::Compression::type compression_type = arrow::Compression::UNCOMPRESSED;
bool prefer_spill = true;
bool write_schema = true;
