Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mutation support for StorageMemory #15127

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 68 additions & 28 deletions src/Storages/StorageMemory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

#include <DataStreams/IBlockInputStream.h>

#include <Storages/StorageMemory.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMemory.h>

#include <IO/WriteHelpers.h>
#include <Processors/Sources/SourceWithProgress.h>
Expand All @@ -27,7 +28,7 @@ class MemorySource : public SourceWithProgress
/// We don't use synchronisation here, because elements in range [first, last] won't be modified.
MemorySource(
Names column_names_,
BlocksList::iterator first_,
BlocksList::const_iterator first_,
size_t num_blocks_,
const StorageMemory & storage,
const StorageMetadataPtr & metadata_snapshot)
Expand All @@ -41,11 +42,7 @@ class MemorySource : public SourceWithProgress
/// If called, will initialize the number of blocks at first read.
/// It allows to read data which was inserted into memory table AFTER Storage::read was called.
/// This hack is needed for global subqueries.
void delayInitialization(BlocksList * data_, std::mutex * mutex_)
{
data = data_;
mutex = mutex_;
}
void delayInitialization(std::shared_ptr<const BlocksList> data_) { data = data_; }

String getName() const override { return "Memory"; }

Expand All @@ -54,13 +51,11 @@ class MemorySource : public SourceWithProgress
{
if (data)
{
std::lock_guard guard(*mutex);
current_it = data->begin();
num_blocks = data->size();
is_finished = num_blocks == 0;

data = nullptr;
mutex = nullptr;
}

if (is_finished)
Expand Down Expand Up @@ -89,13 +84,12 @@ class MemorySource : public SourceWithProgress
}
private:
Names column_names;
BlocksList::iterator current_it;
BlocksList::const_iterator current_it;
size_t current_block_idx = 0;
size_t num_blocks;
bool is_finished = false;

BlocksList * data = nullptr;
std::mutex * mutex = nullptr;
std::shared_ptr<const BlocksList> data = nullptr;
};


Expand All @@ -114,8 +108,9 @@ class MemoryBlockOutputStream : public IBlockOutputStream
void write(const Block & block) override
{
metadata_snapshot->check(block, true);
std::lock_guard lock(storage.mutex);
storage.data.push_back(block);
auto new_data = std::make_unique<BlocksList>(*(storage.data.get()));
new_data->push_back(block);
storage.data.set(std::move(new_data));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is going to be a problem due to a race condition here. Because we do not employ any CAS technique here, data can be lost when concurrent INSERTs each add a block to its own copy of data.

Copy link
Collaborator Author

@ucasfl ucasfl Oct 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, in order to solve this problem, we still need a table-level lock? @Akazz

}
private:
StorageMemory & storage;
Expand All @@ -124,7 +119,7 @@ class MemoryBlockOutputStream : public IBlockOutputStream


StorageMemory::StorageMemory(const StorageID & table_id_, ColumnsDescription columns_description_, ConstraintsDescription constraints_)
: IStorage(table_id_)
: IStorage(table_id_), data(std::make_unique<const BlocksList>())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(std::move(columns_description_));
Expand All @@ -144,7 +139,7 @@ Pipe StorageMemory::read(
{
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());

std::lock_guard lock(mutex);
auto current_data = data.get();

if (delay_read_for_global_subqueries)
{
Expand All @@ -156,19 +151,19 @@ Pipe StorageMemory::read(
/// set for IN or hash table for JOIN, which can't be done concurrently.
/// Since no other manipulation with data is done, multiple sources shouldn't give any profit.

auto source = std::make_shared<MemorySource>(column_names, data.begin(), data.size(), *this, metadata_snapshot);
source->delayInitialization(&data, &mutex);
auto source = std::make_shared<MemorySource>(column_names, current_data->begin(), current_data->size(), *this, metadata_snapshot);
source->delayInitialization(current_data);
return Pipe(std::move(source));
}

size_t size = data.size();
size_t size = current_data->size();

if (num_streams > size)
num_streams = size;

Pipes pipes;

BlocksList::iterator it = data.begin();
auto it = current_data->begin();

size_t offset = 0;
for (size_t stream = 0; stream < num_streams; ++stream)
Expand Down Expand Up @@ -199,31 +194,76 @@ BlockOutputStreamPtr StorageMemory::write(const ASTPtr & /*query*/, const Storag

void StorageMemory::drop()
{
std::lock_guard lock(mutex);
data.clear();
data.set(std::make_unique<BlocksList>());
}

static inline void updateBlockData(Block & old_block, const Block & new_block)
{
for (const auto & it : new_block)
{
auto col_name = it.name;
auto & col_with_type_name = old_block.getByName(col_name);
col_with_type_name.column = it.column;
}
}

void StorageMemory::mutate(const MutationCommands & commands, const Context & context)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test checking that ALTER ADD/REMOVE column is not supported right now (that is Ok).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

auto storage = getStorageID();
auto storage_ptr = DatabaseCatalog::instance().getTable(storage, context);
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, context, true);
auto in = interpreter->execute();

in->readPrefix();
BlocksList out;
Block block;
while ((block = in->read()))
{
out.push_back(block);
}
in->readSuffix();

// all column affected
if (interpreter->isAffectingAllColumns())
{
data.set(std::make_unique<BlocksList>(out));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move calculation of rows/bytes and assignment out of the branch to avoid code duplication.

}
else
{
auto new_data = std::make_unique<BlocksList>(*(data.get()));
auto data_it = new_data->begin();
auto out_it = out.begin();
while (data_it != new_data->end() && out_it != out.end())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check that data has the same number of blocks?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarify in comments that deletion of whole blocks cannot happen here.

{
updateBlockData(*data_it, *out_it);
++data_it;
++out_it;
}
data.set(std::move(new_data));
}
}

void StorageMemory::truncate(
const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
{
std::lock_guard lock(mutex);
data.clear();
data.set(std::make_unique<BlocksList>());
}

std::optional<UInt64> StorageMemory::totalRows() const
{
UInt64 rows = 0;
std::lock_guard lock(mutex);
for (const auto & buffer : data)
auto current_data = data.get();
for (const auto & buffer : *current_data)
rows += buffer.rows();
return rows;
}

std::optional<UInt64> StorageMemory::totalBytes() const
{
UInt64 bytes = 0;
std::lock_guard lock(mutex);
for (const auto & buffer : data)
auto current_data = data.get();
for (const auto & buffer : *current_data)
bytes += buffer.allocatedBytes();
return bytes;
}
Expand Down
11 changes: 6 additions & 5 deletions src/Storages/StorageMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <Storages/IStorage.h>
#include <DataStreams/IBlockOutputStream.h>

#include <Common/MultiVersion.h>

namespace DB
{
Expand All @@ -26,7 +27,7 @@ friend struct ext::shared_ptr_helper<StorageMemory>;
public:
String getName() const override { return "Memory"; }

size_t getSize() const { return data.size(); }
size_t getSize() const { return data.get()->size(); }

Pipe read(
const Names & column_names,
Expand All @@ -43,6 +44,8 @@ friend struct ext::shared_ptr_helper<StorageMemory>;

void drop() override;

void mutate(const MutationCommands & commands, const Context & context) override;

void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override;

std::optional<UInt64> totalRows() const override;
Expand Down Expand Up @@ -86,10 +89,8 @@ friend struct ext::shared_ptr_helper<StorageMemory>;
void delayReadForGlobalSubqueries() { delay_read_for_global_subqueries = true; }

private:
/// The data itself. `list` - so that when inserted to the end, the existing iterators are not invalidated.
BlocksList data;

mutable std::mutex mutex;
/// MultiVersion data storage, so that we can copy the list of blocks to readers.
MultiVersion<BlocksList> data;

bool delay_read_for_global_subqueries = false;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
1 1
2 2
3 3
4 4
5 5
100 1
2 2
3 3
4 4
5 5
2 2
3 3
4 4
5 5
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
DROP TABLE IF EXISTS defaults;
CREATE TABLE defaults
(
n Int32,
s String
)ENGINE = Memory();

INSERT INTO defaults VALUES(1, '1') (2, '2') (3, '3') (4, '4') (5, '5');

SELECT * FROM defaults;

ALTER TABLE defaults UPDATE n = 100 WHERE s = '1';

SELECT * FROM defaults;

ALTER TABLE defaults DELETE WHERE n = 100;

SELECT * FROM defaults;

DROP TABLE defaults;
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
0
1
2
3
4
5
6
7
8
9
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
DROP TABLE IF EXISTS defaults;
CREATE TABLE defaults
(
n Int32
)ENGINE = Memory();

INSERT INTO defaults SELECT * FROM numbers(10);

SELECT * FROM defaults;

TRUNCATE defaults;

SELECT * FROM defaults;

DROP TABLE defaults;