Skip to content

Commit

Permalink
Merge pull request #15127 from ucasFL/add-mutation-for-storagememory
Browse files Browse the repository at this point in the history
Add mutation support for StorageMemory
  • Loading branch information
alexey-milovidov authored Nov 26, 2020
2 parents 9291bbb + ee80ee7 commit 1cd09fa
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 29 deletions.
125 changes: 99 additions & 26 deletions src/Storages/StorageMemory.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#include <cassert>
#include <Common/Exception.h>

#include <DataStreams/IBlockInputStream.h>

#include <Storages/StorageMemory.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMemory.h>

#include <IO/WriteHelpers.h>
#include <Processors/Sources/SourceWithProgress.h>
Expand All @@ -21,7 +23,7 @@ namespace ErrorCodes

class MemorySource : public SourceWithProgress
{
using InitializerFunc = std::function<void(BlocksList::const_iterator &, size_t &)>;
using InitializerFunc = std::function<void(BlocksList::const_iterator &, size_t &, std::shared_ptr<const BlocksList> &)>;
public:
/// Blocks are stored in std::list which may be appended in another thread.
/// We use pointer to the beginning of the list and its current size.
Expand All @@ -34,11 +36,13 @@ class MemorySource : public SourceWithProgress
size_t num_blocks_,
const StorageMemory & storage,
const StorageMetadataPtr & metadata_snapshot,
InitializerFunc initializer_func_ = [](BlocksList::const_iterator &, size_t &) {})
std::shared_ptr<const BlocksList> data_,
InitializerFunc initializer_func_ = [](BlocksList::const_iterator &, size_t &, std::shared_ptr<const BlocksList> &) {})
: SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals(), storage.getStorageID()))
, column_names(std::move(column_names_))
, current_it(first_)
, num_blocks(num_blocks_)
, data(data_)
, initializer_func(std::move(initializer_func_))
{
}
Expand All @@ -50,7 +54,7 @@ class MemorySource : public SourceWithProgress
{
if (!postponed_init_done)
{
initializer_func(current_it, num_blocks);
initializer_func(current_it, num_blocks, data);
postponed_init_done = true;
}

Expand All @@ -77,6 +81,7 @@ class MemorySource : public SourceWithProgress
size_t num_blocks;
size_t current_block_idx = 0;

std::shared_ptr<const BlocksList> data;
bool postponed_init_done = false;
InitializerFunc initializer_func;
};
Expand All @@ -102,7 +107,9 @@ class MemoryBlockOutputStream : public IBlockOutputStream
metadata_snapshot->check(block, true);
{
std::lock_guard lock(storage.mutex);
storage.data.push_back(block);
auto new_data = std::make_unique<BlocksList>(*(storage.data.get()));
new_data->push_back(block);
storage.data.set(std::move(new_data));

storage.total_size_bytes.fetch_add(size_bytes_diff, std::memory_order_relaxed);
storage.total_size_rows.fetch_add(size_rows_diff, std::memory_order_relaxed);
Expand All @@ -116,7 +123,7 @@ class MemoryBlockOutputStream : public IBlockOutputStream


StorageMemory::StorageMemory(const StorageID & table_id_, ColumnsDescription columns_description_, ConstraintsDescription constraints_)
: IStorage(table_id_)
: IStorage(table_id_), data(std::make_unique<const BlocksList>())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(std::move(columns_description_));
Expand Down Expand Up @@ -146,30 +153,31 @@ Pipe StorageMemory::read(
/// set for IN or hash table for JOIN, which can't be done concurrently.
/// Since no other manipulation with data is done, multiple sources shouldn't give any profit.

return Pipe(
std::make_shared<MemorySource>(
column_names, data.end(), 0, *this, metadata_snapshot,
/// This hack is needed for global subqueries.
/// It allows to set up this Source for read AFTER Storage::read() has been called and just before actual reading
[this](BlocksList::const_iterator & current_it, size_t & num_blocks)
{
std::lock_guard guard(mutex);
current_it = data.begin();
num_blocks = data.size();
}
));
return Pipe(std::make_shared<MemorySource>(
column_names,
data.get()->end(),
0,
*this,
metadata_snapshot,
data.get(),
[this](BlocksList::const_iterator & current_it, size_t & num_blocks, std::shared_ptr<const BlocksList> & current_data)
{
current_data = data.get();
current_it = current_data->begin();
num_blocks = current_data->size();
}));
}

std::lock_guard lock(mutex);
auto current_data = data.get();

size_t size = data.size();
size_t size = current_data->size();

if (num_streams > size)
num_streams = size;

Pipes pipes;

BlocksList::const_iterator it = data.begin();
BlocksList::const_iterator it = current_data->begin();

size_t offset = 0;
for (size_t stream = 0; stream < num_streams; ++stream)
Expand All @@ -179,7 +187,7 @@ Pipe StorageMemory::read(

assert(num_blocks > 0);

pipes.emplace_back(std::make_shared<MemorySource>(column_names, it, num_blocks, *this, metadata_snapshot));
pipes.emplace_back(std::make_shared<MemorySource>(column_names, it, num_blocks, *this, metadata_snapshot, current_data));

while (offset < next_offset)
{
Expand All @@ -200,18 +208,83 @@ BlockOutputStreamPtr StorageMemory::write(const ASTPtr & /*query*/, const Storag

void StorageMemory::drop()
{
std::lock_guard lock(mutex);
data.clear();
data.set(std::make_unique<BlocksList>());
total_size_bytes.store(0, std::memory_order_relaxed);
total_size_rows.store(0, std::memory_order_relaxed);
}

/// Overwrites columns of `old_block` with the same-named columns of `new_block`.
/// Only the column data pointer is replaced; columns of `old_block` that do not
/// appear in `new_block` are left untouched.
/// NOTE(review): relies on `old_block.getByName()` to locate each column — behaviour
/// for a name that is missing from `old_block` is defined by Block, confirm there.
static inline void updateBlockData(Block & old_block, const Block & new_block)
{
    for (const auto & mutated_column : new_block)
        old_block.getByName(mutated_column.name).column = mutated_column.column;
}

/// Applies mutation commands to the data held by this storage.
///
/// The commands are executed through MutationsInterpreter, all resulting blocks are
/// collected, and a freshly built list of blocks is published with `data.set()`.
/// The previously published list is never modified in place: it is either replaced
/// by the interpreter output or copied before being patched.
/// `total_size_rows` and `total_size_bytes` are recomputed from the new list.
///
/// @param commands  mutation commands to apply.
/// @param context   query context, used to look up this table and to run the interpreter.
void StorageMemory::mutate(const MutationCommands & commands, const Context & context)
{
    std::lock_guard lock(mutex);
    auto metadata_snapshot = getInMemoryMetadataPtr();
    auto storage = getStorageID();
    auto storage_ptr = DatabaseCatalog::instance().getTable(storage, context);
    auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, context, true);
    auto in = interpreter->execute();

    /// Read the whole result of the mutation before touching the stored data.
    in->readPrefix();
    BlocksList out;
    Block block;
    while ((block = in->read()))
    {
        out.push_back(block);
    }
    in->readSuffix();

    std::unique_ptr<BlocksList> new_data;

    if (interpreter->isAffectingAllColumns())
    {
        /// All columns are affected: the interpreter output is the complete new data.
        /// `out` is not used afterwards, so move it instead of copying the list.
        new_data = std::make_unique<BlocksList>(std::move(out));
    }
    else
    {
        /// Only some of the columns are affected: start from a copy of the current data
        /// and replace the affected columns block by block.
        new_data = std::make_unique<BlocksList>(*(data.get()));
        auto data_it = new_data->begin();
        auto out_it = out.begin();

        while (data_it != new_data->end())
        {
            /// Mutation does not change the number of blocks
            assert(out_it != out.end());

            updateBlockData(*data_it, *out_it);
            ++data_it;
            ++out_it;
        }

        assert(out_it == out.end());
    }

    size_t rows = 0;
    size_t bytes = 0;
    for (const auto & buffer : *new_data)
    {
        rows += buffer.rows();
        bytes += buffer.bytes();
    }

    /// Each counter must receive its own metric (they were previously swapped).
    total_size_bytes.store(bytes, std::memory_order_relaxed);
    total_size_rows.store(rows, std::memory_order_relaxed);
    data.set(std::move(new_data));
}


void StorageMemory::truncate(
const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
{
std::lock_guard lock(mutex);
data.clear();
data.set(std::make_unique<BlocksList>());
total_size_bytes.store(0, std::memory_order_relaxed);
total_size_rows.store(0, std::memory_order_relaxed);
}
Expand Down
9 changes: 6 additions & 3 deletions src/Storages/StorageMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <Storages/IStorage.h>
#include <DataStreams/IBlockOutputStream.h>

#include <Common/MultiVersion.h>

namespace DB
{
Expand All @@ -27,7 +28,7 @@ friend struct ext::shared_ptr_helper<StorageMemory>;
public:
String getName() const override { return "Memory"; }

size_t getSize() const { return data.size(); }
size_t getSize() const { return data.get()->size(); }

Pipe read(
const Names & column_names,
Expand All @@ -44,6 +45,8 @@ friend struct ext::shared_ptr_helper<StorageMemory>;

void drop() override;

void mutate(const MutationCommands & commands, const Context & context) override;

void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override;

std::optional<UInt64> totalRows(const Settings &) const override;
Expand Down Expand Up @@ -87,8 +90,8 @@ friend struct ext::shared_ptr_helper<StorageMemory>;
void delayReadForGlobalSubqueries() { delay_read_for_global_subqueries = true; }

private:
/// The data itself. `list` - so that when inserted to the end, the existing iterators are not invalidated.
BlocksList data;
/// MultiVersion data storage, so that we can copy the list of blocks to readers.
MultiVersion<BlocksList> data;

mutable std::mutex mutex;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
1 1
2 2
3 3
4 4
5 5
100 1
2 2
3 3
4 4
5 5
2 2
3 3
4 4
5 5
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- Checks mutations (ALTER ... UPDATE / ALTER ... DELETE) on a table with the Memory engine.
DROP TABLE IF EXISTS defaults;
CREATE TABLE defaults
(
n Int32,
s String
)ENGINE = Memory();

-- Five rows, (1, '1') through (5, '5').
INSERT INTO defaults VALUES(1, '1') (2, '2') (3, '3') (4, '4') (5, '5');

-- Initial contents, before any mutation.
SELECT * FROM defaults;

-- Mutation that touches a single column: only the row with s = '1' gets n = 100.
ALTER TABLE defaults UPDATE n = 100 WHERE s = '1';

SELECT * FROM defaults;

-- Mutation that removes the row updated above.
ALTER TABLE defaults DELETE WHERE n = 100;

SELECT * FROM defaults;

DROP TABLE defaults;
Empty file.
11 changes: 11 additions & 0 deletions tests/queries/0_stateless/01498_alter_column_storage_memory.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Checks that adding or dropping a column on a Memory table is rejected with server error 48.
DROP TABLE IF EXISTS defaults;
CREATE TABLE defaults
(
n Int32,
s String
)ENGINE = Memory();

-- Both statements are expected to fail; the trailing hint tells the test runner which error code to expect.
-- NOTE(review): code 48 is presumably NOT_IMPLEMENTED — confirm against the server's error code list.
ALTER TABLE defaults ADD COLUMN m Int8; -- { serverError 48 }
ALTER TABLE defaults DROP COLUMN n; -- { serverError 48 }

DROP TABLE defaults;
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
0
1
2
3
4
5
6
7
8
9
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Checks TRUNCATE on a table with the Memory engine.
DROP TABLE IF EXISTS defaults;
CREATE TABLE defaults
(
n Int32
)ENGINE = Memory();

-- Fill the table with the numbers 0..9.
INSERT INTO defaults SELECT * FROM numbers(10);

SELECT * FROM defaults;

TRUNCATE defaults;

-- After TRUNCATE the table must be empty, so this SELECT prints nothing.
SELECT * FROM defaults;

DROP TABLE defaults;

0 comments on commit 1cd09fa

Please sign in to comment.