diff --git a/storage/tianmu/data/pack_str.cpp b/storage/tianmu/data/pack_str.cpp index 2eacafe6c..d6b636d88 100644 --- a/storage/tianmu/data/pack_str.cpp +++ b/storage/tianmu/data/pack_str.cpp @@ -205,7 +205,6 @@ void PackStr::UpdateValue(size_t locationInPack, const Value &v) { dpn_->max_i = -1; } - // ASSERT(v.HasValue(), col_share_->DataFile() + " locationInPack: " + std::to_string(locationInPack)); UnsetNull(locationInPack); dpn_->numOfNulls--; auto &str = v.GetString(); @@ -227,10 +226,13 @@ void PackStr::UpdateValue(size_t locationInPack, const Value &v) { SetPtrSize(locationInPack, nullptr, 0); SetNull(locationInPack); dpn_->numOfNulls++; + if(dpn_->NullOnly()){ + data_.sum_len = 0; + } } else { // update non-null to another nonull auto vsize = GetValueBinary(locationInPack).size(); - ASSERT(data_.sum_len >= vsize); + ASSERT(data_.sum_len >= vsize, col_share_->DataFile() + ", data_.sum_len:" + std::to_string(data_.sum_len) + ", vsize:"+std::to_string(vsize)); data_.sum_len -= vsize; auto &str = v.GetString(); if (str.size() <= vsize) { @@ -260,6 +262,9 @@ void PackStr::DeleteByRow(size_t locationInPack) { SetPtrSize(locationInPack, nullptr, 0); SetNull(locationInPack); dpn_->numOfNulls++; + if(dpn_->NullOnly()){ + data_.sum_len = 0; + } } SetDeleted(locationInPack); dpn_->numOfDeleted++; @@ -381,7 +386,7 @@ std::pair PackStr::Compress() { reinterpret_cast(alloc(comp_len_buf_size / 4 * sizeof(uint), mm::BLOCK_TYPE::BLOCK_TEMPORARY)), *this); uint tmp_comp_len_buf_size = comp_len_buf_size - 8; compress::NumCompressor nc; - CprsErr res = nc.Compress(reinterpret_cast(comp_len_buf.get() + sizeof(ushort)), tmp_comp_len_buf_size, + CprsErr res = nc.Compress(reinterpret_cast(comp_len_buf.get() + 2), tmp_comp_len_buf_size, nc_buffer.get(), onn, maxv); if (res != CprsErr::CPRS_SUCCESS) { throw common::InternalException("Compression of lengths of values failed for column " + @@ -409,7 +414,7 @@ std::pair PackStr::Compress() { mm::MMGuard comp_buf(reinterpret_cast(alloc(dlen, mm::BLOCK_TYPE::BLOCK_TEMPORARY)), *this); - if (data_.sum_len) { + if (data_.sum_len > 0) { int objs = (dpn_->numOfRecords - dpn_->numOfNulls) - zlo; mm::MMGuard tmp_index( @@ -559,7 +564,7 @@ void PackStr::Save() { SaveUncompressed(&f); } - ASSERT(f.Tell() == off_t(dpn_->dataAddress + dpn_->dataLength), + ASSERT(f.Tell() == off_t(dpn_->dataAddress + dpn_->dataLength), col_share_->DataFile() + ", " + std::to_string(dpn_->dataAddress) + ":" + std::to_string(dpn_->dataLength) + "/" + std::to_string(f.Tell())); dpn_->synced = true; } @@ -568,8 +573,10 @@ void PackStr::SaveUncompressed(system::Stream *f) { f->WriteExact(nulls_ptr_.get(), bitmap_size_); f->WriteExact(deletes_ptr_.get(), bitmap_size_); f->WriteExact(data_.lens, (data_.len_mode * (1 << col_share_->pss))); - if (data_.v.empty()) + if (data_.v.empty() || dpn_->NullOnly()){ return; + } + std::unique_ptr buff(new char[data_.sum_len]); char *ptr = buff.get(); @@ -586,7 +593,7 @@ void PackStr::SaveUncompressed(system::Stream *f) { } void PackStr::LoadCompressed(system::Stream *f) { - ASSERT(IsModeCompressionApplied()); + ASSERT(IsModeCompressionApplied() , "path:" + col_share_->DataFile()); auto compressed_buf = alloc_ptr(dpn_->dataLength + 1, mm::BLOCK_TYPE::BLOCK_COMPRESSED); f->ReadExact(compressed_buf.get(), dpn_->dataLength); @@ -619,7 +626,7 @@ void PackStr::LoadCompressed(system::Stream *f) { } if (dpn_->numOfDeleted > 0) { - uint delete_buf_size = 0; + ushort delete_buf_size = 0; delete_buf_size = (*reinterpret_cast(cur_buf)); if (delete_buf_size > bitmap_size_) throw common::DatabaseException("Unexpected bytes found in data pack."); @@ -628,7 +635,7 @@ void PackStr::LoadCompressed(system::Stream *f) { else { compress::BitstreamCompressor bsc; CprsErr res = bsc.Decompress(reinterpret_cast(deletes_ptr_.get()), delete_buf_size, - reinterpret_cast(cur_buf) + 2, dpn_->numOfRecords, dpn_->numOfDeleted); + reinterpret_cast(cur_buf) + sizeof(ushort), dpn_->numOfRecords, dpn_->numOfDeleted); if (res != CprsErr::CPRS_SUCCESS) { throw common::DatabaseException("Decompression of deletes failed for column " + std::to_string(pc_column(GetCoordinate().co.pack) + 1) + ", pack " + @@ -646,7 +653,7 @@ void PackStr::LoadCompressed(system::Stream *f) { compress::NumCompressor nc; mm::MMGuard cn_ptr((uint *)alloc((1 << col_share_->pss) * sizeof(uint), mm::BLOCK_TYPE::BLOCK_TEMPORARY), *this); - CprsErr res = nc.Decompress(cn_ptr.get(), reinterpret_cast(cur_buf + 8), comp_len_buf_size - 8, + CprsErr res = nc.Decompress(cn_ptr.get(), reinterpret_cast(cur_buf + sizeof(uint32_t)*2), comp_len_buf_size - 8, dpn_->numOfRecords - dpn_->numOfNulls, maxv); if (res != CprsErr::CPRS_SUCCESS) { std::stringstream msg_buf; @@ -714,7 +721,7 @@ void PackStr::LoadCompressed(system::Stream *f) { } void PackStr::LoadCompressedTrie(system::Stream *f) { - ASSERT(IsModeCompressionApplied()); + ASSERT(IsModeCompressionApplied() , "path:" + col_share_->DataFile()); compressed_data_.reset(nullptr); compressed_data_ = alloc_ptr(dpn_->dataLength + 1, mm::BLOCK_TYPE::BLOCK_COMPRESSED); f->ReadExact(compressed_data_.get(), dpn_->dataLength);