Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-544]Add test cases for DM #262

Merged
merged 5 commits into from
Oct 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dbms/src/Raft/RaftService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,16 @@ RaftService::~RaftService()

grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context, CommandServerReaderWriter * stream)
{
RaftContext raft_contex(&db_context, grpc_context, stream);
RaftContext raft_context(&db_context, grpc_context, stream);

try
{
kvstore->report(raft_contex);
kvstore->report(raft_context);

enginepb::CommandRequestBatch cmds;
while (stream->Read(&cmds))
{
kvstore->onServiceCommand(std::move(cmds), raft_contex);
kvstore->onServiceCommand(std::move(cmds), raft_context);
}
}
catch (...)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings
action.task = action.segment->createAppendTask(op_context, wbs, action.update);
}

// TODO: We need to do a delta merge after write a delete range, otherwise, the rows got deleted could never be acutally removed.
// TODO: We need to do a delta merge after write a delete range, otherwise, the rows got deleted could never be actually removed.

commitWrites(actions, wbs, dm_context, op_context);
}
Expand Down
311 changes: 311 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,32 @@ TEST_F(DeltaMergeStore_test, Create)
}
}

TEST_F(DeltaMergeStore_test, OpenWithExtraColumns)
{
const ColumnDefine col_str_define(2, "col2", std::make_shared<DataTypeString>());
const ColumnDefine col_i8_define(3, "i8", std::make_shared<DataTypeInt8>());
{
ColumnDefines table_column_defines = DMTestEnv::getDefaultColumns();
table_column_defines.emplace_back(col_str_define);
table_column_defines.emplace_back(col_i8_define);
store = reload(table_column_defines);
}

{
// check column structure
const auto & cols = store->getTableColumns();
ASSERT_EQ(cols.size(), 5UL);
const auto & str_col = cols[3];
ASSERT_EQ(str_col.name, col_str_define.name);
ASSERT_EQ(str_col.id, col_str_define.id);
ASSERT_TRUE(str_col.type->equals(*col_str_define.type));
const auto & i8_col = cols[4];
ASSERT_EQ(i8_col.name, col_i8_define.name);
ASSERT_EQ(i8_col.id, col_i8_define.id);
ASSERT_TRUE(i8_col.type->equals(*col_i8_define.type));
}
}

TEST_F(DeltaMergeStore_test, SimpleWriteRead)
{
const ColumnDefine col_str_define(2, "col2", std::make_shared<DataTypeString>());
Expand Down Expand Up @@ -233,6 +259,291 @@ TEST_F(DeltaMergeStore_test, SimpleWriteRead)
}
}

TEST_F(DeltaMergeStore_test, DeleteRead)
{
const size_t num_rows_write = 128;
{
// Create a block with sequential Int64 handle in range [0, 128)
Block block = DMTestEnv::prepareSimpleWriteBlock(0, 128, false);
store->write(*context, context->getSettingsRef(), block);
}
// Test Reading first
{
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*context,
context->getSettingsRef(),
columns,
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto && iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); ++i)
{
if (iter.name == "pk")
{
ASSERT_EQ(c->getInt(i), i);
}
}
}
}

ASSERT_EQ(num_rows_read, num_rows_write);
}
// Delete range [0, 64)
const size_t num_deleted_rows = 64;
{
HandleRange range(0, num_deleted_rows);
store->deleteRange(*context, context->getSettingsRef(), range);
}
// Read after deletion
{
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*context,
context->getSettingsRef(),
columns,
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto && iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); ++i)
{
if (iter.name == "pk")
{
// Range after deletion is [64, 128)
ASSERT_EQ(c->getInt(i), i + Int64(num_deleted_rows));
}
}
}
}

ASSERT_EQ(num_rows_read, num_rows_write - num_deleted_rows);
}
}

TEST_F(DeltaMergeStore_test, WriteMultipleBlock)
{
const size_t num_write_rows = 32;

// Test write multi blocks without overlap
{
Block block1 = DMTestEnv::prepareSimpleWriteBlock(0, 1 * num_write_rows, false);
Block block2 = DMTestEnv::prepareSimpleWriteBlock(1 * num_write_rows, 2 * num_write_rows, false);
Block block3 = DMTestEnv::prepareSimpleWriteBlock(2 * num_write_rows, 3 * num_write_rows, false);
store->write(*context, context->getSettingsRef(), block1);
store->write(*context, context->getSettingsRef(), block2);
store->write(*context, context->getSettingsRef(), block3);
}

{
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*context,
context->getSettingsRef(),
columns,
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto && iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); ++i)
{
if (iter.name == "pk")
{
ASSERT_EQ(c->getInt(i), i);
}
}
}
}

ASSERT_EQ(num_rows_read, 3 * num_write_rows);
}

store = reload();

// Test write multi blocks with overlap
{
UInt64 tso1 = 1;
UInt64 tso2 = 100;
Block block1 = DMTestEnv::prepareSimpleWriteBlock(0, 1 * num_write_rows, false, tso1);
Block block2 = DMTestEnv::prepareSimpleWriteBlock(1 * num_write_rows, 2 * num_write_rows, false, tso1);
Block block3 = DMTestEnv::prepareSimpleWriteBlock(num_write_rows / 2, num_write_rows / 2 + num_write_rows, false, tso2);
store->write(*context, context->getSettingsRef(), block1);
store->write(*context, context->getSettingsRef(), block2);
store->write(*context, context->getSettingsRef(), block3);
}
// Read without version
{
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*context,
context->getSettingsRef(),
columns,
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto && iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); ++i)
{
if (iter.name == "pk")
{
ASSERT_EQ(c->getInt(i), i);
}
}
}
}

ASSERT_EQ(num_rows_read, 3 * num_write_rows);
}
// Read with version
{
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*context,
context->getSettingsRef(),
columns,
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ UInt64(1),
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto && iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); ++i)
{
if (iter.name == "pk")
{
ASSERT_EQ(c->getInt(i), i);
}
}
}
}

ASSERT_EQ(num_rows_read, 2 * num_write_rows);
}
}

// DEPRECATED:
// This test case strongly depends on implementation of `shouldSplit()` and `shouldMerge()`.
// The machanism of them may be changed one day. So uncomment the test if need.
TEST_F(DeltaMergeStore_test, DISABLED_WriteLargeBlock)
{
DB::Settings settings = context->getSettings();
// Mock dm_segment_rows for test
// if rows > 8 will split
// if left->rows < 2 && right->rows + left->rows < 4 will merge
settings.dm_segment_limit_rows = 4;

{
store->check(*context);
}

{
// Write 7 rows that would not trigger a split
Block block = DMTestEnv::prepareSimpleWriteBlock(0, 8, false);
store->write(*context, settings, block);
}

{
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*context,
settings,
columns,
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto & iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); i++)
{
if (iter.name == "pk")
{
EXPECT_EQ(c->getInt(i), i);
}
}
}
}
ASSERT_EQ(num_rows_read, 8UL);
}

{
// Write rows that would trigger a split
Block block = DMTestEnv::prepareSimpleWriteBlock(8, 9, false);
store->write(*context, settings, block);
}

// Now there is 2 segments
// segment1: 0, 1, 2, 3
// segment2: 4, 5, 6, 7, 8
{
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*context,
settings,
columns,
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
// block_num represents index of current segment
int block_num = 0;
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto & iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); i++)
{
if (iter.name == "pk" && block_num == 0)
{
EXPECT_EQ(c->getInt(i), i);
}
else if (iter.name == "pk" && block_num == 1)
{
EXPECT_EQ(c->getInt(i), i + 4);
}
}
}
block_num++;
}
ASSERT_EQ(num_rows_read, 9UL);
}
}

TEST_F(DeltaMergeStore_test, ReadWithSpecifyTso)
{
const UInt64 tso1 = 4;
Expand Down
Loading