feature: Cache disk files for intermediate results of aggregate operations to avoid OOM #949
Aggregate cache to disk - Development process
Step 1: Include
Step 2: Strategy
Aggregate cache to disk - Requirements analysis
Abstract: Cache the intermediate results of aggregate operations in disk files to avoid OOM. This summary design is a starting point for the subsequent detailed design, and serves only to communicate the design ideas to other developers.
Requirement analysis: 2022-11-17, MySQL column storage engine - cache the intermediate results of aggregate operations in disk files to avoid OOM - requirement analysis (Zunwu World's blog, CSDN).
RAM analysis of the current aggregation intermediate-result cache:
Static structure:
Dynamic structure:
Summary design for adding a disk cache:
Design ideas: maintain compatibility with the upper-layer module interfaces of the existing aggregation operations; avoid OOM without causing excessive disk I/O.
Design strategy: add a DiskCache to replace BlockedRowMemStorage while keeping the interface consistent; DiskCache uses an LRU internally to cache disk blocks; when the LRU reaches its upper limit and evicts a block, the block is flushed to disk and the RAM it occupied is released. A minimal sketch of this strategy follows.
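The sketch below assumes a block-granularity LRU; all names (DiskCache, Block, flushToDisk, loadFromDisk) are illustrative, not the actual StoneDB interface.

#include <cstddef>
#include <list>
#include <unordered_map>
#include <vector>

// Minimal sketch only; not the actual StoneDB interface.
struct Block
{
    std::vector<char> data;  // in-memory payload; empty after being flushed
    long disk_offset = -1;   // location in the cache file; -1 if never flushed
};

class DiskCache
{
public:
    explicit DiskCache(std::size_t max_resident_blocks) : max_resident(max_resident_blocks) {}

    // Pin a block in memory, evicting the least recently used one if needed.
    Block & access(std::size_t block_id)
    {
        auto it = index.find(block_id);
        if (it != index.end())
        {
            lru.splice(lru.begin(), lru, it->second);  // move to front (most recent)
            return blocks[block_id];
        }
        if (lru.size() >= max_resident)
            evictOne();
        lru.push_front(block_id);
        index[block_id] = lru.begin();
        loadFromDisk(blocks[block_id]);  // no-op if the block was never flushed
        return blocks[block_id];
    }

private:
    void evictOne()
    {
        std::size_t victim = lru.back();
        lru.pop_back();
        index.erase(victim);
        flushToDisk(blocks[victim]);          // write the payload to the cache file
        blocks[victim].data.clear();
        blocks[victim].data.shrink_to_fit();  // release the RAM it occupied
    }
    void flushToDisk(Block &) { /* append data to the cache file, record disk_offset (omitted) */ }
    void loadFromDisk(Block &) { /* if disk_offset >= 0, read the payload back (omitted) */ }

    std::size_t max_resident;
    std::list<std::size_t> lru;  // front = most recently used
    std::unordered_map<std::size_t, std::list<std::size_t>::iterator> index;
    std::unordered_map<std::size_t, Block> blocks;
};

Keeping access() as the single entry point mirrors the compatibility goal above: upper-layer aggregation code never needs to know whether a block is resident or has been spilled to disk.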
For asynchronous I/O, refer to the AIO used by MySQL: https://dev.mysql.com/doc/refman/5.7/en/innodb-linux-native-aio.html
More flexible memory control and disk swapping mechanisms are needed to make more efficient use of memory.
Buffer settings for MySQL 5.7.36:
mysql> show variables like '%buffer%';
+-------------------------------------+----------------+
| Variable_name | Value |
+-------------------------------------+----------------+
| bulk_insert_buffer_size | 8388608 |
| innodb_buffer_pool_chunk_size | 134217728 |
| innodb_buffer_pool_dump_at_shutdown | ON |
| innodb_buffer_pool_dump_now | OFF |
| innodb_buffer_pool_dump_pct | 40 |
| innodb_buffer_pool_filename | ib_buffer_pool |
| innodb_buffer_pool_instances | 1 |
| innodb_buffer_pool_load_abort | OFF |
| innodb_buffer_pool_load_at_startup | ON |
| innodb_buffer_pool_load_now | OFF |
| innodb_buffer_pool_size | 536870912 |
| innodb_change_buffer_max_size | 25 |
| innodb_change_buffering | all |
| innodb_log_buffer_size | 1048576 |
| innodb_sort_buffer_size | 1048576 |
| join_buffer_size | 262144 |
| key_buffer_size | 536870912 |
| myisam_sort_buffer_size | 8388608 |
| net_buffer_length | 16384 |
| preload_buffer_size | 32768 |
| read_buffer_size | 4194304 |
| read_rnd_buffer_size | 16777216 |
| sort_buffer_size | 4194304 |
| sql_buffer_result | OFF |
| tianmu_insert_buffer_size | 512 |
| tianmu_insert_max_buffered | 65536 |
| tianmu_sync_buffers | 0 |
+-------------------------------------+----------------+
27 rows in set (0.00 sec)
mysql> show global status like '%innodb_buffer_pool%';
+---------------------------------------+--------------------------------------------------+
| Variable_name | Value |
+---------------------------------------+--------------------------------------------------+
| Innodb_buffer_pool_dump_status | Dumping of buffer pool not started |
| Innodb_buffer_pool_load_status | Buffer pool(s) load completed at 230208 7:42:46 |
| Innodb_buffer_pool_resize_status | |
| Innodb_buffer_pool_pages_data | 252 |
| Innodb_buffer_pool_bytes_data | 4128768 |
| Innodb_buffer_pool_pages_dirty | 0 |
| Innodb_buffer_pool_bytes_dirty | 0 |
| Innodb_buffer_pool_pages_flushed | 36 |
| Innodb_buffer_pool_pages_free | 32512 |
| Innodb_buffer_pool_pages_misc | 0 |
| Innodb_buffer_pool_pages_total | 32764 |
| Innodb_buffer_pool_read_ahead_rnd | 0 |
| Innodb_buffer_pool_read_ahead | 0 |
| Innodb_buffer_pool_read_ahead_evicted | 0 |
| Innodb_buffer_pool_read_requests | 1055 |
| Innodb_buffer_pool_reads | 219 |
| Innodb_buffer_pool_wait_free | 0 |
| Innodb_buffer_pool_write_requests | 325 |
+---------------------------------------+--------------------------------------------------+
18 rows in set (0.00 sec)
Buffer Pool
Multiple Buffer Pool Instances
Design objectives:
No OOM, no matter how large the data volume is.
An upper limit on disk usage is not considered.
No noticeable performance degradation before memory is exhausted.
Static memory control: a simple control strategy, but unable to use memory efficiently.
Dynamic memory control:
Granularity control at the minimum allocation unit, the block.
Meta-information management for aggregation blocks: identifies whether a block is in memory or on disk, and records its location on disk; the meta information must all stay in memory.
A unified aggregation-block interface for reading and writing memory or disk: unified upper-layer interfaces facilitate unified control.
Multithreaded concurrent access control: concurrent access by multiple query threads, and a single query thread with concurrent access by multiple aggregation threads. One possible shape for the block meta information is sketched below.
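A sketch of such per-block meta information, with assumed field names (not actual StoneDB code); the metadata itself always stays in RAM, even when the block payload has been swapped to disk:

#include <cstdint>
#include <mutex>

// Sketch of per-aggregation-block meta information (assumed field names).
// The metadata always stays in RAM; only the payload may live on disk.
struct AggBlockMeta
{
    std::uint32_t block_id = 0;
    bool in_memory = false;         // is the payload currently resident in RAM?
    std::uint64_t disk_offset = 0;  // byte offset in the cache file (valid when !in_memory)
    std::uint32_t size_bytes = 0;   // payload size, needed to read it back from disk
    std::mutex latch;               // serializes concurrent access by aggregation threads
};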
The aggregator's pre-aggregation stores internal data in a hash table whose key is the "grouping key" value (for example, with GROUP BY b in a SQL statement, the keys of the hash table are all the distinct values of b in the table). The hash table is dynamic: as the number of keys grows, ClickHouse switches it to a two-level hash table to improve performance. In addition, ClickHouse provides many specializations to optimize for specific key types. For a single-level hash table, the aggregator converts data into single_level blocks; for a two-level hash table, it converts data into two_level blocks, each of which carries a block_num. You can think of block_num as the first-level key of the two-level hash table. Using two_level blocks has two benefits: blocks with the same block_num from multiple pre-aggregating nodes can be combined, so different combinations can be merged in parallel; and if the nodes producing two_level blocks are required to emit them in increasing block_num order, memory usage can be reduced, because the data that needs to be merged must be in the same combination, and seeing a new block_num means all previous merge operations have completed. In fact, the branch above that writes data to a disk file does exactly that. The GroupingAggregatedTransform node converts single_level blocks into two_level blocks and combines them by block_num; they are then merged by MergingAggregatedBucketTransform, and since there can be multiple MergingAggregatedBucketTransform instances, the merge phase can also run in parallel. Finally, the SortingAggregatedTransform node sorts by block_num. A toy illustration of the single-level to two-level conversion follows.
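This toy version is greatly simplified (ClickHouse's real implementation is its TwoLevelHashTable): block_num is derived from the hash of the grouping key, so the same key always lands in the same bucket on every node, and buckets can be merged independently and in parallel.

#include <array>
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_map>

// Toy two-level table: 256 first-level buckets, each an ordinary hash map.
// The bucket index plays the role of block_num.
constexpr std::size_t NUM_BUCKETS = 256;

using SingleLevel = std::unordered_map<std::string, long>;  // grouping key -> aggregate state

std::array<SingleLevel, NUM_BUCKETS> toTwoLevel(const SingleLevel & single)
{
    std::array<SingleLevel, NUM_BUCKETS> buckets;
    for (const auto & [key, state] : single)
    {
        std::size_t block_num = std::hash<std::string>{}(key) % NUM_BUCKETS;
        buckets[block_num].emplace(key, state);  // same key -> same block_num everywhere
    }
    return buckets;
}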
GROUP BY in External Memory (from the ClickHouse documentation):
When using max_bytes_before_external_group_by, we recommend that you set max_memory_usage about twice as high. This is necessary because there are two stages to aggregation: reading the data and forming intermediate data (1) and merging the intermediate data (2). Dumping data to the file system can only occur during stage 1. If the temporary data wasn't dumped, then stage 2 might require up to the same amount of memory as in stage 1.
For example, if max_memory_usage was set to 10000000000 and you want to use external aggregation, it makes sense to set max_bytes_before_external_group_by to 10000000000, and max_memory_usage to 20000000000. When external aggregation is triggered (if there was at least one dump of temporary data), maximum consumption of RAM is only slightly more than max_bytes_before_external_group_by.
With distributed query processing, external aggregation is performed on remote servers. In order for the requester server to use only a small amount of RAM, set distributed_aggregation_memory_efficient to 1.
Merging data flushed to the disk, as well as merging results from remote servers when the distributed_aggregation_memory_efficient setting is enabled, consumes up to 1/256 * the_number_of_threads from the total amount of RAM.
When external aggregation is enabled, if there was less than max_bytes_before_external_group_by of data (i.e. data was not flushed), the query runs just as fast as without external aggregation. If any temporary data was flushed, the run time will be several times longer (approximately three times).
If you have an ORDER BY with a LIMIT after GROUP BY, then the amount of used RAM depends on the amount of data in LIMIT, not in the whole table. But if the ORDER BY does not have LIMIT, do not forget to enable external sorting (max_bytes_before_external_sort).
template <typename Method>
void Aggregator::writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
TemporaryFileStream & out) const
{
size_t max_temporary_block_size_rows = 0;
size_t max_temporary_block_size_bytes = 0;
auto update_max_sizes = [&](const Block & block)
{
size_t block_size_rows = block.rows();
size_t block_size_bytes = block.bytes();
if (block_size_rows > max_temporary_block_size_rows)
max_temporary_block_size_rows = block_size_rows;
if (block_size_bytes > max_temporary_block_size_bytes)
max_temporary_block_size_bytes = block_size_bytes;
};
for (UInt32 bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{
Block block = convertOneBucketToBlock(data_variants, method, data_variants.aggregates_pool, false, bucket);
out.write(block);
update_max_sizes(block);
}
if (params.overflow_row)
{
Block block = prepareBlockAndFillWithoutKey(data_variants, false, true);
out.write(block);
update_max_sizes(block);
}
/// Pass ownership of the aggregate functions states:
/// `data_variants` will not destroy them in the destructor, they are now owned by ColumnAggregateFunction objects.
data_variants.aggregator = nullptr;
LOG_DEBUG(log, "Max size of temporary block: {} rows, {}.", max_temporary_block_size_rows, ReadableSize(max_temporary_block_size_bytes));
}
Data write file:
/// This class helps with the handling of temporary files or directories.
/// A unique name for the temporary file or directory is automatically chosen based on a specified prefix.
/// Create a directory in the constructor.
/// The destructor always removes the temporary file or directory with all contained files.
class TemporaryFileOnDisk
{
public:
explicit TemporaryFileOnDisk(const DiskPtr & disk_);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Value metric_scope);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix);
~TemporaryFileOnDisk();
DiskPtr getDisk() const { return disk; }
String getPath() const;
private:
DiskPtr disk;
/// Relative path in disk to the temporary file or directory
String relative_path;
CurrentMetrics::Increment metric_increment;
/// Specified if we know what for file is used (sort/aggregate/join).
std::optional<CurrentMetrics::Increment> sub_metric_increment = {};
};
/*
* Data can be written into this stream and then read.
* After finish writing, call `finishWriting` and then `read` to read the data.
* Account amount of data written to disk in parent scope.
*/
class TemporaryFileStream : boost::noncopyable
{
public:
struct Stat
{
/// Statistics for file
/// Non-atomic because we don't allow to `read` or `write` into single file from multiple threads
size_t compressed_size = 0;
size_t uncompressed_size = 0;
size_t num_rows = 0;
};
TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_);
TemporaryFileStream(FileSegmentsHolder && segments_, const Block & header_, TemporaryDataOnDisk * parent_);
size_t write(const Block & block);
void flush();
Stat finishWriting();
bool isWriteFinished() const;
Block read();
String getPath() const;
Block getHeader() const { return header; }
/// Read finished and file released
bool isEof() const;
~TemporaryFileStream();
private:
void updateAllocAndCheck();
/// Release everything, close reader and writer, delete file
void release();
TemporaryDataOnDisk * parent;
Block header;
/// Data can be stored in file directly or in the cache
TemporaryFileOnDiskHolder file;
FileSegmentsHolder segment_holder;
Stat stat;
struct OutputWriter;
std::unique_ptr<OutputWriter> out_writer;
struct InputReader;
std::unique_ptr<InputReader> in_reader;
};
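Judging from the declarations above, the intended calling sequence is write, then finishWriting, then read until EOF. A hedged usage sketch (construction of the stream and of Blocks is elided, since it depends on surrounding ClickHouse code):

#include <vector>

// Hedged usage sketch based on the interface above, not actual ClickHouse code.
void spillAndReload(TemporaryFileStream & stream, const std::vector<Block> & blocks)
{
    for (const auto & block : blocks)
        stream.write(block);  // spill partially aggregated blocks to disk

    TemporaryFileStream::Stat stat = stream.finishWriting();  // no more writes allowed
    // stat.compressed_size vs. stat.uncompressed_size gives the on-disk compression ratio.
    (void)stat;

    while (Block block = stream.read())  // an empty Block signals end of data
    {
        // ... merge `block` into the final result ...
    }
}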
Data segmentation for parallel aggregation:
auto many_data = std::make_shared<ManyAggregatedData>(streams);
for (size_t j = 0; j < streams; ++j)
{
auto aggregation_for_set = std::make_shared<AggregatingTransform>(input_header, transform_params_for_set, many_data, j, merge_threads, temporary_data_merge_threads);
// For each input stream we have `grouping_sets_size` copies, so port index
// for transform #j should skip ports of first (j-1) streams.
connect(*ports[i + grouping_sets_size * j], aggregation_for_set->getInputs().front());
ports[i + grouping_sets_size * j] = &aggregation_for_set->getOutputs().front();
processors.push_back(aggregation_for_set);
}
Debugging notes:
ExpressionTransform
execute
HashJoin: (0x7f2af22cb798) Keys: [(c_custkey) = (o_custkey)]
DiskLocal: Reserved 3.26 GiB on local disk
0 ./24773aaaaaa
#0 DB::MergingAggregatedBucketTransform::transform (this=0x7ff2bb134718, chunk=...) at ../src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp:318
static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_progress_callback)
Flush data in the RAM to disk also. It's easier than merging on-disk and RAM data.
Pre-aggregates data from ports, holding in RAM only one or more (up to merging_threads) blocks from each source. Aggregate functions in blocks should not be finalized so that their states can be combined. Used to solve two tasks:
The essence of the work: there are a number of sources. They give out blocks with partially aggregated data.
We start from the convention that split blocks are always passed in the order of bucket_num.
In this case, not all bucket_num from the range of 0..255 can be present. It is necessary to combine these sequences of blocks and return the result as a sequence with the same properties. The merge can be performed using several (merging_threads) threads. When you receive next blocks from different sources, ...
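A simplified model of that merging discipline (illustrative only, not the actual MergingAggregatedMemoryEfficientTransform): because every source emits blocks in increasing bucket_num order, a bucket is complete as soon as every source's cursor has moved past it, so only a small window of buckets needs to stay in RAM.

#include <algorithm>
#include <climits>
#include <cstddef>
#include <map>
#include <vector>

struct MiniBlock { int bucket_num; long payload; };

// sources[k] is sorted by bucket_num -- the convention stated above.
std::vector<MiniBlock> mergeByBucket(const std::vector<std::vector<MiniBlock>> & sources)
{
    std::map<int, long> pending;  // bucket_num -> merged payload (the in-RAM window)
    std::vector<MiniBlock> out;
    std::vector<std::size_t> pos(sources.size(), 0);
    for (;;)
    {
        // Smallest bucket_num currently at the front of any source.
        int min_front = INT_MAX;
        for (std::size_t k = 0; k < sources.size(); ++k)
            if (pos[k] < sources[k].size())
                min_front = std::min(min_front, sources[k][pos[k]].bucket_num);

        // Any pending bucket below min_front can no longer grow: emit and drop it.
        while (!pending.empty() && pending.begin()->first < min_front)
        {
            out.push_back({pending.begin()->first, pending.begin()->second});
            pending.erase(pending.begin());
        }
        if (min_front == INT_MAX)
            break;  // all sources exhausted (pending is already empty here)

        // Consume every block with bucket_num == min_front from each source.
        for (std::size_t k = 0; k < sources.size(); ++k)
            while (pos[k] < sources[k].size() && sources[k][pos[k]].bucket_num == min_front)
                pending[min_front] += sources[k][pos[k]++].payload;
    }
    return out;
}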
class AggregatedChunkInfo : public ChunkInfo
Working with states of aggregate functions in the pool is arranged in the following (inconvenient) way:
PS. This can be corrected by making a pool that knows about which states of aggregate functions and in which order are put in it, and knows how to destroy them.
Open questions: What are the static and dynamic flows of the aggregator's overall architecture? From a module-level analysis, into which modules can it be divided? What are the core responsibilities of the different modules, and what are the boundaries of interaction between them? How do classes interact within a single module, and what are the relationships between the classes: generalization? composition? aggregation? CRTP (the Curiously Recurring Template Pattern)?
#0 0x000000002d48c6ea in DB::AggregatingTransform::AggregatingTransform (this=0x7f332b252c18, header=..., params_=..., many_data_=..., current_variant=0, max_threads_=16, |
Why write aggregate data to disk files, and what is the purpose of splitting the aggregation into separate pipelines?
Aggregation is done with storage of temporary data on disk, and it needs to be merged in a memory-efficient way.
Notes on the Arena memory pool:
The new size fits into the last MemoryChunk, so just alloc the ...
Begin or expand a contiguous range of memory. NOTE: this method is usable only for the last allocation made on this memory pool, to append something to it (for example, short strings).
During processing of row #i we will prefetch the HashTable cell for row #(i + prefetch_look_ahead).
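That comment describes the standard software-prefetch pattern; a generic sketch (not the actual ClickHouse code), using GCC/Clang's __builtin_prefetch to pull the hash cell for row #(i + look_ahead) into cache while row #i is still being processed:

#include <cstddef>
#include <cstdint>
#include <vector>

// Generic software-prefetch sketch: while updating the cell for row #i,
// prefetch the cell that row #(i + look_ahead) will touch, hiding DRAM latency.
struct Cell { std::uint64_t key = 0; long sum = 0; };

void processRows(const std::vector<std::size_t> & row_hashes, std::vector<Cell> & cells,
                 std::size_t look_ahead = 16)
{
    const std::size_t mask = cells.size() - 1;  // cells.size() assumed to be a power of two
    for (std::size_t i = 0; i < row_hashes.size(); ++i)
    {
        if (i + look_ahead < row_hashes.size())
            __builtin_prefetch(&cells[row_hashes[i + look_ahead] & mask]);
        Cell & cell = cells[row_hashes[i] & mask];
        cell.sum += 1;  // stand-in for the real aggregate-state update
    }
}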
HUGE MMAP CRASH
7fc71f1ce000-7fce9f1ce000 rw-s 00000000 08:02 3670018 /tmp/tianmuhuge.12039
Ideas:
1. Use memory mapping (see the sketch below)
2. Set a maximum number of concurrent threads for aggregation
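For point 1, a minimal POSIX sketch of file-backed memory mapping (the /tmp/tianmuhuge mapping above, with its rw-s flags, suggests Tianmu already uses a shared file mapping of this kind, but this is not its actual code): with a shared file mapping, the kernel pages the region out to the file under memory pressure instead of the process holding everything in anonymous RAM.

#include <cstddef>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

// Map a scratch file as shared, writable memory. Under memory pressure the
// kernel writes dirty pages back to the file instead of triggering OOM.
void * mapScratchFile(const char * path, std::size_t bytes)
{
    int fd = open(path, O_RDWR | O_CREAT, 0600);
    if (fd < 0)
        return nullptr;
    if (ftruncate(fd, static_cast<off_t>(bytes)) != 0)
    {
        close(fd);
        return nullptr;
    }
    void * p = mmap(nullptr, bytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);  // the mapping remains valid after the descriptor is closed
    return p == MAP_FAILED ? nullptr : p;
}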
Abstract:
TIANMU engine - cache the intermediate results of aggregate operations in disk files to avoid OOM - requirements analysis
Related ISSUE: #21
Background:
The intermediate results of the current aggregate operation are cached in an in-memory hash table. Once the data volume exceeds available RAM, OOM occurs.
If the data volume exceeds RAM, switch the intermediate results to disk file storage to avoid OOM.
Functional requirements:
1. When the intermediate results exceed RAM, the hash results of the aggregation operation are cached in disk files and still participate correctly in the operation.
Performance requirements:
1. The compression ratio of the cache file size to the original data content, and the ratio of disk space to memory used; this is influenced by the compression algorithm.
2. The write and read speed of the disk cache files on standard disk hardware; this affects the speed of aggregation operations and the rules for writing and reading the disk cache files.
3. The overall impact on the performance of aggregation operations.
Development cycle:
TODO: