diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 3403c345c9f..9c4a8a4c5ad 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -1251,15 +1251,18 @@ bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) { Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, PinnableSlice* value, uint64_t* expiration) { - assert(value != nullptr); + assert(value); + BlobIndex blob_index; Status s = blob_index.DecodeFrom(index_entry); if (!s.ok()) { return s; } + if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) { return Status::NotFound("Key expired"); } + if (expiration != nullptr) { if (blob_index.HasTTL()) { *expiration = blob_index.expiration(); @@ -1267,13 +1270,65 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, *expiration = kNoExpiration; } } + if (blob_index.IsInlined()) { // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same // memory buffer to avoid extra copy. value->PinSelf(blob_index.value()); return Status::OK(); } - if (blob_index.size() == 0) { + + CompressionType compression_type = kNoCompression; + s = GetRawBlobFromFile(key, blob_index.file_number(), blob_index.offset(), + blob_index.size(), value, &compression_type); + if (!s.ok()) { + return s; + } + + if (compression_type != kNoCompression) { + BlockContents contents; + auto cfh = static_cast(DefaultColumnFamily()); + + { + StopWatch decompression_sw(env_, statistics_, + BLOB_DB_DECOMPRESSION_MICROS); + UncompressionContext context(compression_type); + UncompressionInfo info(context, UncompressionDict::GetEmptyDict(), + compression_type); + s = UncompressBlockContentsForCompressionType( + info, value->data(), value->size(), &contents, + kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions())); + } + + if (!s.ok()) { + if (debug_level_ >= 2) { + ROCKS_LOG_ERROR( + db_options_.info_log, + "Uncompression error during blob read from file: %" PRIu64 + " blob_offset: %" PRIu64 " blob_size: %" PRIu64 + " key: %s status: '%s'", + blob_index.file_number(), blob_index.offset(), blob_index.size(), + key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str()); + } + + return Status::Corruption("Unable to uncompress blob."); + } + + value->PinSelf(contents.data); + } + + return Status::OK(); +} + +Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number, + uint64_t offset, uint64_t size, + PinnableSlice* value, + CompressionType* compression_type) { + assert(value); + assert(compression_type); + assert(*compression_type == kNoCompression); + + if (!size) { value->PinSelf(""); return Status::OK(); } @@ -1281,47 +1336,46 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, // offset has to have certain min, as we will read CRC // later from the Blob Header, which needs to be also a // valid offset. - if (blob_index.offset() < + if (offset < (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) { if (debug_level_ >= 2) { ROCKS_LOG_ERROR(db_options_.info_log, "Invalid blob index file_number: %" PRIu64 " blob_offset: %" PRIu64 " blob_size: %" PRIu64 " key: %s", - blob_index.file_number(), blob_index.offset(), - blob_index.size(), key.data()); + file_number, offset, size, + key.ToString(/* output_hex */ true).c_str()); } + return Status::NotFound("Invalid blob offset"); } - std::shared_ptr bfile; + std::shared_ptr blob_file; + { ReadLock rl(&mutex_); - auto hitr = blob_files_.find(blob_index.file_number()); + auto it = blob_files_.find(file_number); // file was deleted - if (hitr == blob_files_.end()) { + if (it == blob_files_.end()) { return Status::NotFound("Blob Not Found as blob file missing"); } - bfile = hitr->second; + blob_file = it->second; } - if (blob_index.size() == 0 && value != nullptr) { - value->PinSelf(""); - return Status::OK(); - } + *compression_type = blob_file->compression(); // takes locks when called std::shared_ptr reader; - s = GetBlobFileReader(bfile, &reader); + Status s = GetBlobFileReader(blob_file, &reader); if (!s.ok()) { return s; } - assert(blob_index.offset() > key.size() + sizeof(uint32_t)); - uint64_t record_offset = blob_index.offset() - key.size() - sizeof(uint32_t); - uint64_t record_size = sizeof(uint32_t) + key.size() + blob_index.size(); + assert(offset >= key.size() + sizeof(uint32_t)); + const uint64_t record_offset = offset - key.size() - sizeof(uint32_t); + const uint64_t record_size = sizeof(uint32_t) + key.size() + size; // Allocate the buffer. This is safe in C++11 std::string buffer_str(static_cast(record_size), static_cast(0)); @@ -1329,42 +1383,44 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, // A partial blob record contain checksum, key and value. Slice blob_record; + { StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS); s = reader->Read(record_offset, static_cast(record_size), &blob_record, buffer); RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size()); } + if (!s.ok()) { - ROCKS_LOG_DEBUG(db_options_.info_log, - "Failed to read blob from blob file %" PRIu64 - ", blob_offset: %" PRIu64 ", blob_size: %" PRIu64 - ", key_size: %" ROCKSDB_PRIszt ", status: '%s'", - bfile->BlobFileNumber(), blob_index.offset(), - blob_index.size(), key.size(), s.ToString().c_str()); + ROCKS_LOG_DEBUG( + db_options_.info_log, + "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64 + ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt ", status: '%s'", + file_number, offset, size, key.size(), s.ToString().c_str()); return s; } + if (blob_record.size() != record_size) { ROCKS_LOG_DEBUG( db_options_.info_log, "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64 ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt ", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes", - bfile->BlobFileNumber(), blob_index.offset(), blob_index.size(), - key.size(), blob_record.size(), record_size); + file_number, offset, size, key.size(), blob_record.size(), record_size); return Status::Corruption("Failed to retrieve blob from blob index."); } + Slice crc_slice(blob_record.data(), sizeof(uint32_t)); Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(), - static_cast(blob_index.size())); - uint32_t crc_exp; + static_cast(size)); + + uint32_t crc_exp = 0; if (!GetFixed32(&crc_slice, &crc_exp)) { - ROCKS_LOG_DEBUG(db_options_.info_log, - "Unable to decode CRC from blob file %" PRIu64 - ", blob_offset: %" PRIu64 ", blob_size: %" PRIu64 - ", key size: %" ROCKSDB_PRIszt ", status: '%s'", - bfile->BlobFileNumber(), blob_index.offset(), - blob_index.size(), key.size(), s.ToString().c_str()); + ROCKS_LOG_DEBUG( + db_options_.info_log, + "Unable to decode CRC from blob file %" PRIu64 ", blob_offset: %" PRIu64 + ", blob_size: %" PRIu64 ", key size: %" ROCKSDB_PRIszt ", status: '%s'", + file_number, offset, size, key.size(), s.ToString().c_str()); return Status::Corruption("Unable to decode checksum."); } @@ -1373,34 +1429,20 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, crc = crc32c::Mask(crc); // Adjust for storage if (crc != crc_exp) { if (debug_level_ >= 2) { - ROCKS_LOG_ERROR(db_options_.info_log, - "Blob crc mismatch file: %s blob_offset: %" PRIu64 - " blob_size: %" PRIu64 " key: %s status: '%s'", - bfile->PathName().c_str(), blob_index.offset(), - blob_index.size(), key.data(), s.ToString().c_str()); + ROCKS_LOG_ERROR( + db_options_.info_log, + "Blob crc mismatch file: %" PRIu64 " blob_offset: %" PRIu64 + " blob_size: %" PRIu64 " key: %s status: '%s'", + file_number, offset, size, + key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str()); } + return Status::Corruption("Corruption. Blob CRC mismatch"); } - if (bfile->compression() == kNoCompression) { - value->PinSelf(blob_value); - } else { - BlockContents contents; - auto cfh = reinterpret_cast(DefaultColumnFamily()); - { - StopWatch decompression_sw(env_, statistics_, - BLOB_DB_DECOMPRESSION_MICROS); - UncompressionContext context(bfile->compression()); - UncompressionInfo info(context, UncompressionDict::GetEmptyDict(), - bfile->compression()); - s = UncompressBlockContentsForCompressionType( - info, blob_value.data(), blob_value.size(), &contents, - kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions())); - } - value->PinSelf(contents.data); - } + value->PinSelf(blob_value); - return s; + return Status::OK(); } Status BlobDBImpl::Get(const ReadOptions& read_options, diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 7e9d572048e..18aed63f82d 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -237,6 +237,11 @@ class BlobDBImpl : public BlobDB { Status GetBlobValue(const Slice& key, const Slice& index_entry, PinnableSlice* value, uint64_t* expiration = nullptr); + Status GetRawBlobFromFile(const Slice& key, uint64_t file_number, + uint64_t offset, uint64_t size, + PinnableSlice* value, + CompressionType* compression_type); + Slice GetCompressedSlice(const Slice& raw, std::string* compression_output) const;