Skip to content

Commit

Permalink
[FLASH-544]Add test cases for DM (pingcap#262)
Browse files Browse the repository at this point in the history
* fix typo

Signed-off-by: leiysky <leiysky@outlook.com>

* Add test cases

* resolve conflict

* disable WriteLargeBlock test

* make temp vector a stack variable
  • Loading branch information
leiysky authored and zanmato1984 committed Nov 1, 2019
1 parent 5463764 commit aa3ea0b
Show file tree
Hide file tree
Showing 4 changed files with 721 additions and 6 deletions.
6 changes: 3 additions & 3 deletions dbms/src/Raft/RaftService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,16 @@ RaftService::~RaftService()

grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context, CommandServerReaderWriter * stream)
{
RaftContext raft_contex(&db_context, grpc_context, stream);
RaftContext raft_context(&db_context, grpc_context, stream);

try
{
kvstore->report(raft_contex);
kvstore->report(raft_context);

enginepb::CommandRequestBatch cmds;
while (stream->Read(&cmds))
{
kvstore->onServiceCommand(std::move(cmds), raft_contex);
kvstore->onServiceCommand(std::move(cmds), raft_context);
}
}
catch (...)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings
action.task = action.segment->createAppendTask(op_context, wbs, action.update);
}

// TODO: We need to do a delta merge after write a delete range, otherwise, the rows got deleted could never be acutally removed.
// TODO: We need to do a delta merge after write a delete range, otherwise, the rows got deleted could never be actually removed.

commitWrites(actions, wbs, dm_context, op_context);
}
Expand Down
311 changes: 311 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,32 @@ TEST_F(DeltaMergeStore_test, Create)
}
}

TEST_F(DeltaMergeStore_test, OpenWithExtraColumns)
{
const ColumnDefine col_str_define(2, "col2", std::make_shared<DataTypeString>());
const ColumnDefine col_i8_define(3, "i8", std::make_shared<DataTypeInt8>());
{
ColumnDefines table_column_defines = DMTestEnv::getDefaultColumns();
table_column_defines.emplace_back(col_str_define);
table_column_defines.emplace_back(col_i8_define);
store = reload(table_column_defines);
}

{
// check column structure
const auto & cols = store->getTableColumns();
ASSERT_EQ(cols.size(), 5UL);
const auto & str_col = cols[3];
ASSERT_EQ(str_col.name, col_str_define.name);
ASSERT_EQ(str_col.id, col_str_define.id);
ASSERT_TRUE(str_col.type->equals(*col_str_define.type));
const auto & i8_col = cols[4];
ASSERT_EQ(i8_col.name, col_i8_define.name);
ASSERT_EQ(i8_col.id, col_i8_define.id);
ASSERT_TRUE(i8_col.type->equals(*col_i8_define.type));
}
}

TEST_F(DeltaMergeStore_test, SimpleWriteRead)
{
const ColumnDefine col_str_define(2, "col2", std::make_shared<DataTypeString>());
Expand Down Expand Up @@ -233,6 +259,291 @@ TEST_F(DeltaMergeStore_test, SimpleWriteRead)
}
}

TEST_F(DeltaMergeStore_test, DeleteRead)
{
const size_t num_rows_write = 128;
{
// Create a block with sequential Int64 handle in range [0, 128)
Block block = DMTestEnv::prepareSimpleWriteBlock(0, 128, false);
store->write(*context, context->getSettingsRef(), block);
}
// Test Reading first
{
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*context,
context->getSettingsRef(),
columns,
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto && iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); ++i)
{
if (iter.name == "pk")
{
ASSERT_EQ(c->getInt(i), i);
}
}
}
}

ASSERT_EQ(num_rows_read, num_rows_write);
}
// Delete range [0, 64)
const size_t num_deleted_rows = 64;
{
HandleRange range(0, num_deleted_rows);
store->deleteRange(*context, context->getSettingsRef(), range);
}
// Read after deletion
{
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*context,
context->getSettingsRef(),
columns,
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto && iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); ++i)
{
if (iter.name == "pk")
{
// Range after deletion is [64, 128)
ASSERT_EQ(c->getInt(i), i + Int64(num_deleted_rows));
}
}
}
}

ASSERT_EQ(num_rows_read, num_rows_write - num_deleted_rows);
}
}

TEST_F(DeltaMergeStore_test, WriteMultipleBlock)
{
const size_t num_write_rows = 32;

// Test write multi blocks without overlap
{
Block block1 = DMTestEnv::prepareSimpleWriteBlock(0, 1 * num_write_rows, false);
Block block2 = DMTestEnv::prepareSimpleWriteBlock(1 * num_write_rows, 2 * num_write_rows, false);
Block block3 = DMTestEnv::prepareSimpleWriteBlock(2 * num_write_rows, 3 * num_write_rows, false);
store->write(*context, context->getSettingsRef(), block1);
store->write(*context, context->getSettingsRef(), block2);
store->write(*context, context->getSettingsRef(), block3);
}

{
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*context,
context->getSettingsRef(),
columns,
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto && iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); ++i)
{
if (iter.name == "pk")
{
ASSERT_EQ(c->getInt(i), i);
}
}
}
}

ASSERT_EQ(num_rows_read, 3 * num_write_rows);
}

store = reload();

// Test write multi blocks with overlap
{
UInt64 tso1 = 1;
UInt64 tso2 = 100;
Block block1 = DMTestEnv::prepareSimpleWriteBlock(0, 1 * num_write_rows, false, tso1);
Block block2 = DMTestEnv::prepareSimpleWriteBlock(1 * num_write_rows, 2 * num_write_rows, false, tso1);
Block block3 = DMTestEnv::prepareSimpleWriteBlock(num_write_rows / 2, num_write_rows / 2 + num_write_rows, false, tso2);
store->write(*context, context->getSettingsRef(), block1);
store->write(*context, context->getSettingsRef(), block2);
store->write(*context, context->getSettingsRef(), block3);
}
// Read without version
{
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*context,
context->getSettingsRef(),
columns,
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto && iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); ++i)
{
if (iter.name == "pk")
{
ASSERT_EQ(c->getInt(i), i);
}
}
}
}

ASSERT_EQ(num_rows_read, 3 * num_write_rows);
}
// Read with version
{
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*context,
context->getSettingsRef(),
columns,
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ UInt64(1),
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto && iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); ++i)
{
if (iter.name == "pk")
{
ASSERT_EQ(c->getInt(i), i);
}
}
}
}

ASSERT_EQ(num_rows_read, 2 * num_write_rows);
}
}

// DEPRECATED:
// This test case strongly depends on implementation of `shouldSplit()` and `shouldMerge()`.
// The machanism of them may be changed one day. So uncomment the test if need.
TEST_F(DeltaMergeStore_test, DISABLED_WriteLargeBlock)
{
DB::Settings settings = context->getSettings();
// Mock dm_segment_rows for test
// if rows > 8 will split
// if left->rows < 2 && right->rows + left->rows < 4 will merge
settings.dm_segment_limit_rows = 4;

{
store->check(*context);
}

{
// Write 7 rows that would not trigger a split
Block block = DMTestEnv::prepareSimpleWriteBlock(0, 8, false);
store->write(*context, settings, block);
}

{
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*context,
settings,
columns,
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto & iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); i++)
{
if (iter.name == "pk")
{
EXPECT_EQ(c->getInt(i), i);
}
}
}
}
ASSERT_EQ(num_rows_read, 8UL);
}

{
// Write rows that would trigger a split
Block block = DMTestEnv::prepareSimpleWriteBlock(8, 9, false);
store->write(*context, settings, block);
}

// Now there is 2 segments
// segment1: 0, 1, 2, 3
// segment2: 4, 5, 6, 7, 8
{
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*context,
settings,
columns,
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
// block_num represents index of current segment
int block_num = 0;
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto & iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); i++)
{
if (iter.name == "pk" && block_num == 0)
{
EXPECT_EQ(c->getInt(i), i);
}
else if (iter.name == "pk" && block_num == 1)
{
EXPECT_EQ(c->getInt(i), i + 4);
}
}
}
block_num++;
}
ASSERT_EQ(num_rows_read, 9UL);
}
}

TEST_F(DeltaMergeStore_test, ReadWithSpecifyTso)
{
const UInt64 tso1 = 4;
Expand Down
Loading

0 comments on commit aa3ea0b

Please sign in to comment.