Skip to content

Commit

Permalink
This commit includes many fixes
Browse files Browse the repository at this point in the history
1) add superblock checksum
2) add hot compression method support (snappy, quicklz)
3) add db_bench --rowformat=COMPRESS-METHOD support
4) fix valgrind check warnings
5) move all crc checks into layout.cpp
  • Loading branch information
BohuTANG committed Jun 28, 2013
1 parent 8dc636c commit 1b82ffd
Show file tree
Hide file tree
Showing 30 changed files with 3,688 additions and 183 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@
#CMake libraries
bin/*
build/*

#Others
tags
15 changes: 3 additions & 12 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ ENDIF("${isSystemDir}" STREQUAL "-1")
add_definitions("-g -O2 -Wall")

include_directories(
${PROJECT_SOURCE_DIR}/thirdparty/snappy
${PROJECT_SOURCE_DIR}/thirdparty/quicklz
${PROJECT_SOURCE_DIR}/include
${PROJECT_SOURCE_DIR}/src
)
Expand All @@ -39,17 +41,6 @@ ${PROJECT_SOURCE_DIR}/lib

# check dependencies

# Check Snappy
include(${CMAKE_SOURCE_DIR}/cmake/FindSnappy.cmake)
if (SNAPPY_FOUND)
message(STATUS "Find snappy include:${SNAPPY_INCLUDE_DIR} libs:${SNAPPY_LIBRARIES}")
add_definitions("-DHAS_SNAPPY")
include_directories(${SNAPPY_INCLUDE_DIR})
link_libraries(${SNAPPY_LIBRARIES})
else (SNAPPY_FOUND)
message(WARNING "Cannot find snappy, compression is disabled in cascadb")
endif (SNAPPY_FOUND)

# Check Libaio
include(${CMAKE_SOURCE_DIR}/cmake/FindLibaio.cmake)
if (LIBAIO_FOUND)
Expand All @@ -68,7 +59,7 @@ if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
endif (${CMAKE_SYSTEM_NAME} MATCHES "Linux")

set(CASCADB_LIBS
pthread m rt dl z
pthread m rt dl z snappy quicklz
)

# Source files
Expand Down
47 changes: 33 additions & 14 deletions bench/db_bench_cascadb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ static bool FLAGS_use_existing_db = false;
// Use the db with the following name.
static const char* FLAGS_db = NULL;

static enum Compress FLAGS_method = kSnappyCompress;


// Helper for quickly generating random values.
class RandomGenerator {
Expand Down Expand Up @@ -301,33 +303,45 @@ class Benchmark {
fprintf(stdout, "RawSize: %.1f MB (estimated)\n",
((static_cast<int64_t>(kKeySize + FLAGS_value_size) * num_)
/ 1048576.0));
#ifdef HAS_SNAPPY
fprintf(stdout, "FileSize: %.1f MB (estimated)\n",
if (FLAGS_method != kNoCompress)
fprintf(stdout, "filesize: %.1f MB (estimated)\n",
(((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_)
/ 1048576.0));
#else
fprintf(stdout, "FileSize: %.1f MB (estimated, compression disabled)\n",
else
fprintf(stdout, "filesize: %.1f MB (estimated, compression disabled)\n",
(((kKeySize + FLAGS_value_size) * num_)
/ 1048576.0));
#endif
PrintWarnings();
fprintf(stdout, "------------------------------------------------\n");
}

void PrintWarnings() {
if (FLAGS_method != kNoCompress) {
switch (FLAGS_method) {
case kSnappyCompress:
fprintf(stdout,
"Compression: Snappy\n");
break;
case kQuicklzCompress:
fprintf(stdout,
"Compression: Quicklz\n");
break;
default:break;
}
} else
fprintf(stdout,
"WARNING: Snappy&Quicklz compression is disabled\n");
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
fprintf(stdout,
"WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
);
fprintf(stdout,
"WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
);
#endif

#ifndef NDEBUG
fprintf(stdout,
"WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif
#ifndef HAS_SNAPPY
fprintf(stdout,
"WARNING: Snappy compression is disabled\n");
#endif

#ifndef HAS_LIBAIO
fprintf(stdout,
"WARNING: Linux AIO is disabled, Posix AIO (simulate AIO with user threads) is used instead\n");
Expand Down Expand Up @@ -536,9 +550,7 @@ class Benchmark {
Options opts;
opts.dir = directory_;
opts.comparator = comparator_;
#ifdef HAS_SNAPPY
opts.compress = kSnappyCompress;
#endif
if (FLAGS_cache_size) {
opts.cache_limit = FLAGS_cache_size;
}
Expand Down Expand Up @@ -661,6 +673,13 @@ int main(int argc, char** argv)
FLAGS_value_size = n;
} else if(strncmp(argv[i], "--db=", 5) == 0) {
FLAGS_db = argv[i] + 5;
} else if(strncmp(argv[i], "--rowformat=", 9) == 0) {
if (strcmp(argv[i] + strlen("--rowformat="), "snappy") == 0)
FLAGS_method = kSnappyCompress;
else if (strcmp(argv[i] + strlen("--rowformat="), "quicklz") == 0)
FLAGS_method = kQuicklzCompress;
else if (strcmp(argv[i] + strlen("--rowformat="), "no") == 0)
FLAGS_method = kNoCompress;
} else {
cerr << "Invalid flag '" << argv[i] << "'" << endl;
exit(1);
Expand Down
43 changes: 0 additions & 43 deletions cmake/FindSnappy.cmake

This file was deleted.

5 changes: 3 additions & 2 deletions include/cascadb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ class Directory;
class Comparator;

enum Compress {
kNoCompress, // No compression
kSnappyCompress // Google's Snappy, used in leveldb
kNoCompress, // No compression
kSnappyCompress, // Google's Snappy, used in leveldb
kQuicklzCompress // Quicklz 1.5 final
};

class Options {
Expand Down
10 changes: 10 additions & 0 deletions src/serialize/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ class BlockReader
{
}

// Returns block_->start(), i.e. the base pointer that addr() offsets by
// pos(). Exposed so callers can checksum the bytes consumed so far, e.g.
// crc16(reader.start(), reader.pos()).
// NOTE(review): presumably this is the first byte of the block's payload —
// confirm against Block::start().
const char* start()
{
return block_->start();
}

char *addr()
{
return (char *)block_->start() + pos();
Expand Down Expand Up @@ -140,6 +145,11 @@ class BlockWriter
{
}

// Returns block_->start(), i.e. the base pointer that addr() offsets by
// pos(). Exposed so callers can checksum the bytes written so far, e.g.
// crc16(writer.start(), writer.pos()).
// NOTE(review): presumably this is the first byte of the block's payload —
// confirm against Block::start().
const char *start()
{
return block_->start();
}

char *addr()
{
return (char *)block_->start() + pos();
Expand Down
71 changes: 48 additions & 23 deletions src/serialize/layout.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,23 +124,23 @@ Block* Layout::read(bid_t bid, bool skeleton_only)
uint16_t actual_crc;

if (skeleton_only) {
expected_crc = meta.skeleton_crc;
actual_crc = crc16(block->buffer().data(), meta.skeleton_size);
expected_crc = meta.skeleton_crc;
actual_crc = crc16(block->buffer().data(), meta.skeleton_size);
} else {
expected_crc = meta.crc;
// here buffer().size is aligned, not meta.total_size
actual_crc = crc16(block->buffer().data(), block->buffer().size());
expected_crc = meta.crc;
// here buffer().size is aligned, not meta.total_size
actual_crc = crc16(block->buffer().data(), block->buffer().size());
}

if (expected_crc != actual_crc && expected_crc != 0) {
LOG_ERROR("crc error, bid " << hex << bid << dec
<< ", offset " << meta.offset
<< ", expected_crc " << expected_crc
<< ", actual_crc " << actual_crc
<< ", skeleton_only " << skeleton_only);
LOG_ERROR("crc error, bid " << hex << bid << dec
<< ", offset " << meta.offset
<< ", expected_crc " << expected_crc
<< ", actual_crc " << actual_crc
<< ", skeleton_only " << skeleton_only);

destroy(block);
return NULL;
destroy(block);
return NULL;
}

LOG_TRACE("read block ok, bid " << hex << bid << dec
Expand All @@ -149,7 +149,7 @@ Block* Layout::read(bid_t bid, bool skeleton_only)
return block;
}

Block* Layout::read(bid_t bid, uint32_t offset, uint32_t size)
Block* Layout::read(bid_t bid, uint32_t offset, uint32_t size, uint16_t subblock_crc)
{
BlockMeta meta;
if (!get_block_meta(bid, meta)) {
Expand All @@ -162,10 +162,22 @@ Block* Layout::read(bid_t bid, uint32_t offset, uint32_t size)

Block *block;
if (!read_block(meta, offset, size, &block)) {
LOG_ERROR("read block error, bid " << hex << bid << dec
LOG_ERROR("read subblock error, bid " << hex << bid << dec
<< ", offset " << (meta.offset + offset)
<< ", size " << size
<< ", crc " << meta.crc);
<< ", subblock crc " << subblock_crc);
return NULL;
}

uint16_t actual_crc = crc16(block->start(), size);
if (actual_crc != subblock_crc) {
LOG_ERROR("one msgbuf crc error " << " bid " << bid
<< ", expected_crc " << subblock_crc
<< ", actual_crc " << actual_crc
<< ", offset " << offset
<< ", length " << size);

destroy(block);
return NULL;
}

Expand Down Expand Up @@ -226,7 +238,7 @@ void Layout::handle_async_read(AsyncReadReq *req, AIOStatus status)
req->cb->exec(true);
} else {
LOG_ERROR("read block crc" << hex << req->bid << dec << req->meta.crc << " error");
free_buffer(req->buffer);
destroy(*req->block);
req->cb->exec(false);
}
} else {
Expand Down Expand Up @@ -345,6 +357,7 @@ bool Layout::load_superblock()
LOG_ERROR("alloc_aligned_buffer error, size " << SUPER_BLOCK_SIZE);
return false;
}
memset((void*)buffer.data(), 0, buffer.size());

if (!read_data(0, buffer)) {
LOG_ERROR("try to read 1st superblock error");
Expand Down Expand Up @@ -372,6 +385,7 @@ bool Layout::load_2nd_superblock()
LOG_ERROR("alloc_aligned_buffer error, size " << SUPER_BLOCK_SIZE);
return false;
}
memset((void*)buffer.data(), 0, buffer.size());

if (!read_data(SUPER_BLOCK_SIZE, buffer)) {
LOG_ERROR("try to read 2nd superblock error");
Expand Down Expand Up @@ -399,6 +413,7 @@ bool Layout::flush_superblock()
LOG_ERROR("alloc_aligned_buffer fail, size " << SUPER_BLOCK_SIZE);
return false;
}
memset((void*)buffer.data(), 0, buffer.size());

Block block(buffer, 0, 0);
BlockWriter writer(&block);
Expand Down Expand Up @@ -525,13 +540,20 @@ bool Layout::read_superblock(BlockReader& reader)
if (!read_block_meta(superblock_->index_block_meta, reader)) return false;
}

if (!reader.readUInt64(&(superblock_->magic_number1))) return false;
if (superblock_->magic_number0 != SUPER_BLOCK_MAGIC_NUM ||
superblock_->magic_number0 != superblock_->magic_number1) {
LOG_ERROR("read superblock " "magic_num0:"<< superblock_->magic_number0
<< ", magic_num1:" << superblock_->magic_number1);
return false;
uint16_t expected_crc, actual_crc;
uint32_t super_size = reader.pos();

if (!reader.readUInt16(&expected_crc)) return false;
actual_crc= crc16(reader.start(), super_size);
if (actual_crc != expected_crc) {
LOG_ERROR("superblock crc error"
<< ", expected_crc " << expected_crc
<< ", actual_crc " << actual_crc
<< ", length " << super_size);

return false;
}

return true;
}

Expand All @@ -548,7 +570,9 @@ bool Layout::write_superblock(BlockWriter& writer)
if (!writer.writeBool(false)) return false;
}

if (!writer.writeUInt64(superblock_->magic_number1)) return false;
uint16_t crc = crc16(writer.start(), writer.pos());
if (!writer.writeUInt16(crc)) return false;

return true;
}

Expand Down Expand Up @@ -961,6 +985,7 @@ Block* Layout::create(size_t size)
{
Slice buffer = alloc_aligned_buffer(size);
if (buffer.size()) {
memset((void*)buffer.data(), 0, buffer.size());
return new Block(buffer, 0, 0);
} else {
return NULL;
Expand Down
2 changes: 1 addition & 1 deletion src/serialize/layout.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class Layout {
// Blocking read
// Read from a relative offset from the beginning of block
// and get n bytes, the area should not out of bounds
Block* read(bid_t bid, uint32_t offset, uint32_t size);
Block* read(bid_t bid, uint32_t offset, uint32_t size, uint16_t subblock_crc);

// Initialize a read operation
void async_read(bid_t bid, Block** block, Callback *cb);
Expand Down
Loading

0 comments on commit 1b82ffd

Please sign in to comment.