Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor and clean up the code that reads a blob from a file #6093

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 93 additions & 55 deletions utilities/blob_db/blob_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1273,98 +1273,150 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
value->PinSelf(blob_index.value());
return Status::OK();
}
if (blob_index.size() == 0) {

CompressionType compression_type = kNoCompression;
s = GetRawBlobFromFile(key, blob_index.file_number(), blob_index.offset(),
blob_index.size(), value, &compression_type);
if (!s.ok()) {
return s;
}

if (compression_type != kNoCompression) {
BlockContents contents;
auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());

{
StopWatch decompression_sw(env_, statistics_,
BLOB_DB_DECOMPRESSION_MICROS);
UncompressionContext context(compression_type);
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
compression_type);
s = UncompressBlockContentsForCompressionType(
info, value->data(), value->size(), &contents,
kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's outside the scope to this PR but UncompressBlockContentsForCompressionType() seems to only need ImmutableCFOptions for Env and Statistics, which should be available here locally too. The complicated casting for cfh should be able to be avoided.

if (!s.ok()) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(
db_options_.info_log,
"Uncompression error during blob read from file: %" PRIu64
" blob_offset: %" PRIu64 " blob_size: %" PRIu64
" key: %s status: '%s'",
blob_index.file_number(), blob_index.offset(), blob_index.size(),
key.ToString(/* output_hex */ true).c_str(),
s.ToString().c_str());
}

return Status::Corruption("Unable to uncompress blob.");
}
}

value->PinSelf(contents.data);
}

return Status::OK();
}

Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
uint64_t offset, uint64_t size,
PinnableSlice* value,
CompressionType* compression_type) {
assert(value);
assert(compression_type);
assert(*compression_type == kNoCompression);

if (!size) {
value->PinSelf("");
return Status::OK();
}

// offset has to have certain min, as we will read CRC
// later from the Blob Header, which needs to be also a
// valid offset.
if (blob_index.offset() <
if (offset <
(BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Invalid blob index file_number: %" PRIu64
" blob_offset: %" PRIu64 " blob_size: %" PRIu64
" key: %s",
blob_index.file_number(), blob_index.offset(),
blob_index.size(), key.data());
file_number, offset, size,
key.ToString(/* output_hex */ true).c_str());
}

return Status::NotFound("Invalid blob offset");
}

std::shared_ptr<BlobFile> bfile;
std::shared_ptr<BlobFile> blob_file;

{
ReadLock rl(&mutex_);
auto hitr = blob_files_.find(blob_index.file_number());
auto it = blob_files_.find(file_number);

// file was deleted
if (hitr == blob_files_.end()) {
if (it == blob_files_.end()) {
return Status::NotFound("Blob Not Found as blob file missing");
}

bfile = hitr->second;
blob_file = it->second;
}

if (blob_index.size() == 0 && value != nullptr) {
value->PinSelf("");
return Status::OK();
}
*compression_type = blob_file->compression();

// takes locks when called
std::shared_ptr<RandomAccessFileReader> reader;
s = GetBlobFileReader(bfile, &reader);
Status s = GetBlobFileReader(blob_file, &reader);
if (!s.ok()) {
return s;
}

assert(blob_index.offset() > key.size() + sizeof(uint32_t));
uint64_t record_offset = blob_index.offset() - key.size() - sizeof(uint32_t);
uint64_t record_size = sizeof(uint32_t) + key.size() + blob_index.size();
assert(offset >= key.size() + sizeof(uint32_t));
const uint64_t record_offset = offset - key.size() - sizeof(uint32_t);
const uint64_t record_size = sizeof(uint32_t) + key.size() + size;

// Allocate the buffer. This is safe in C++11
std::string buffer_str(static_cast<size_t>(record_size), static_cast<char>(0));
char* buffer = &buffer_str[0];

// A partial blob record contain checksum, key and value.
Slice blob_record;

{
StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
s = reader->Read(record_offset, static_cast<size_t>(record_size), &blob_record, buffer);
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
}

if (!s.ok()) {
ROCKS_LOG_DEBUG(db_options_.info_log,
"Failed to read blob from blob file %" PRIu64
", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
bfile->BlobFileNumber(), blob_index.offset(),
blob_index.size(), key.size(), s.ToString().c_str());
ROCKS_LOG_DEBUG(
db_options_.info_log,
"Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
file_number, offset, size, key.size(), s.ToString().c_str());
return s;
}

if (blob_record.size() != record_size) {
ROCKS_LOG_DEBUG(
db_options_.info_log,
"Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt
", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes",
bfile->BlobFileNumber(), blob_index.offset(), blob_index.size(),
key.size(), blob_record.size(), record_size);
file_number, offset, size, key.size(), blob_record.size(), record_size);

return Status::Corruption("Failed to retrieve blob from blob index.");
}

Slice crc_slice(blob_record.data(), sizeof(uint32_t));
Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
static_cast<size_t>(blob_index.size()));
uint32_t crc_exp;
static_cast<size_t>(size));

uint32_t crc_exp = 0;
if (!GetFixed32(&crc_slice, &crc_exp)) {
ROCKS_LOG_DEBUG(db_options_.info_log,
"Unable to decode CRC from blob file %" PRIu64
", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
", key size: %" ROCKSDB_PRIszt ", status: '%s'",
bfile->BlobFileNumber(), blob_index.offset(),
blob_index.size(), key.size(), s.ToString().c_str());
ROCKS_LOG_DEBUG(
db_options_.info_log,
"Unable to decode CRC from blob file %" PRIu64 ", blob_offset: %" PRIu64
", blob_size: %" PRIu64 ", key size: %" ROCKSDB_PRIszt ", status: '%s'",
file_number, offset, size, key.size(), s.ToString().c_str());
return Status::Corruption("Unable to decode checksum.");
}

Expand All @@ -1373,34 +1425,20 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
crc = crc32c::Mask(crc); // Adjust for storage
if (crc != crc_exp) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Blob crc mismatch file: %s blob_offset: %" PRIu64
" blob_size: %" PRIu64 " key: %s status: '%s'",
bfile->PathName().c_str(), blob_index.offset(),
blob_index.size(), key.data(), s.ToString().c_str());
ROCKS_LOG_ERROR(
db_options_.info_log,
"Blob crc mismatch file: %" PRIu64 " blob_offset: %" PRIu64
" blob_size: %" PRIu64 " key: %s status: '%s'",
file_number, offset, size,
key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
}

return Status::Corruption("Corruption. Blob CRC mismatch");
}

if (bfile->compression() == kNoCompression) {
value->PinSelf(blob_value);
} else {
BlockContents contents;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
{
StopWatch decompression_sw(env_, statistics_,
BLOB_DB_DECOMPRESSION_MICROS);
UncompressionContext context(bfile->compression());
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
bfile->compression());
s = UncompressBlockContentsForCompressionType(
info, blob_value.data(), blob_value.size(), &contents,
kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
}
value->PinSelf(contents.data);
}
value->PinSelf(blob_value);

return s;
return Status::OK();
}

Status BlobDBImpl::Get(const ReadOptions& read_options,
Expand Down
5 changes: 5 additions & 0 deletions utilities/blob_db/blob_db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,11 @@ class BlobDBImpl : public BlobDB {
Status GetBlobValue(const Slice& key, const Slice& index_entry,
PinnableSlice* value, uint64_t* expiration = nullptr);

Status GetRawBlobFromFile(const Slice& key, uint64_t file_number,
uint64_t offset, uint64_t size,
PinnableSlice* value,
CompressionType* compression_type);

Slice GetCompressedSlice(const Slice& raw,
std::string* compression_output) const;

Expand Down