Merge remote-tracking branch 'upstream/master' into impl-hex-int-str-arg
YangKeao committed Jul 5, 2022
2 parents e057637 + a0ecce0 commit 0ca9582
Showing 27 changed files with 248 additions and 69 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -253,15 +253,15 @@ ninja tiflash
tiup playground nightly --tiflash.binpath $BUILD/dbms/src/Server/tiflash
```
3. Check $WORKSPACE/tests/_env.sh to make the port and build dir right.
4. Run your integration tests using commands like "./run-test.sh fullstack-test2/ddl" under $WORKSPACE dir
4. Run your integration tests using commands like "./run-test.sh fullstack-test2/ddl" under $WORKSPACE/tests dir

## Run MicroBenchmark Tests

To run micro benchmark tests, you need to build with -DCMAKE_BUILD_TYPE=RELEASE -DENABLE_TESTS=ON:

```shell
cd $BUILD
cmake $WORKSPACE/tiflash -GNinja -DCMAKE_BUILD_TYPE=DEBUG -DENABLE_TESTS=ON
cmake $WORKSPACE/tiflash -GNinja -DCMAKE_BUILD_TYPE=RELEASE -DENABLE_TESTS=ON
ninja bench_dbms
```

2 changes: 1 addition & 1 deletion contrib/prometheus-cpp
Submodule prometheus-cpp updated 126 files
9 changes: 9 additions & 0 deletions contrib/prometheus-cpp-cmake/pull/CMakeLists.txt
@@ -12,9 +12,18 @@ if(ENABLE_COMPRESSION)
endif()

add_library(pull
${PROMETHEUS_SRC_DIR}/pull/src/basic_auth.cc
${PROMETHEUS_SRC_DIR}/pull/src/basic_auth.h
${PROMETHEUS_SRC_DIR}/pull/src/endpoint.cc
${PROMETHEUS_SRC_DIR}/pull/src/endpoint.h
${PROMETHEUS_SRC_DIR}/pull/src/exposer.cc
${PROMETHEUS_SRC_DIR}/pull/src/handler.cc
${PROMETHEUS_SRC_DIR}/pull/src/handler.h
${PROMETHEUS_SRC_DIR}/pull/src/metrics_collector.cc
${PROMETHEUS_SRC_DIR}/pull/src/metrics_collector.h

${PROMETHEUS_SRC_DIR}/pull/src/detail/base64.h

$<$<BOOL:${USE_THIRDPARTY_LIBRARIES}>:$<TARGET_OBJECTS:civetweb>>
)

2 changes: 2 additions & 0 deletions contrib/prometheus-cpp-cmake/push/CMakeLists.txt
@@ -3,6 +3,8 @@ if(NOT CURL_FOUND)
endif()

add_library(push
${PROMETHEUS_SRC_DIR}/push/src/curl_wrapper.cc
${PROMETHEUS_SRC_DIR}/push/src/curl_wrapper.h
${PROMETHEUS_SRC_DIR}/push/src/gateway.cc
)

2 changes: 1 addition & 1 deletion contrib/tiflash-proxy
Submodule tiflash-proxy updated 814 files
6 changes: 5 additions & 1 deletion contrib/tiflash-proxy-cmake/CMakeLists.txt
@@ -4,7 +4,11 @@ file(GLOB_RECURSE _TIFLASH_PROXY_SRCS "${_TIFLASH_PROXY_SOURCE_DIR}/*.rs")
list(FILTER _TIFLASH_PROXY_SRCS EXCLUDE REGEX ${_TIFLASH_PROXY_SOURCE_DIR}/target/.*)

# use `CFLAGS=-w CXXFLAGS=-w` to inhibit warning messages.
set(TIFLASH_RUST_ENV CMAKE=${CMAKE_COMMAND} CFLAGS=-w CXXFLAGS=-w)
if (TIFLASH_LLVM_TOOLCHAIN)
set(TIFLASH_RUST_ENV CMAKE=${CMAKE_COMMAND} "CFLAGS=-w -fuse-ld=lld" "CXXFLAGS=-w -fuse-ld=lld -stdlib=libc++")
else()
set(TIFLASH_RUST_ENV CMAKE=${CMAKE_COMMAND} CFLAGS=-w CXXFLAGS=-w)
endif()

if(TIFLASH_LLVM_TOOLCHAIN AND USE_LIBCXX)
set(TIFLASH_RUST_LINKER ${CMAKE_CURRENT_BINARY_DIR}/tiflash-linker)
13 changes: 11 additions & 2 deletions dbms/src/Debug/MockSchemaGetter.h
@@ -17,16 +17,25 @@
#include <Debug/MockTiDB.h>
#include <TiDB/Schema/SchemaGetter.h>

#include <optional>

namespace DB
{

struct MockSchemaGetter
{
TiDB::DBInfoPtr getDatabase(DatabaseID db_id) { return MockTiDB::instance().getDBInfoByID(db_id); }

Int64 getVersion() { return MockTiDB::instance().getVersion(); }

SchemaDiff getSchemaDiff(Int64 version) { return MockTiDB::instance().getSchemaDiff(version); }
std::optional<SchemaDiff> getSchemaDiff(Int64 version)
{
return MockTiDB::instance().getSchemaDiff(version);
}

bool checkSchemaDiffExists(Int64 version)
{
return MockTiDB::instance().checkSchemaDiffExists(version);
}

TiDB::TableInfoPtr getTableInfo(DatabaseID, TableID table_id) { return MockTiDB::instance().getTableInfoByID(table_id); }

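For context on the interface change above — `getSchemaDiff` now returns `std::optional<SchemaDiff>` and `checkSchemaDiffExists` is new — here is a self-contained sketch of how a caller might consume it. `SchemaDiff` and `StubSchemaGetter` below are simplified stand-ins for illustration only, not the real TiFlash types or the schema syncer's actual logic.

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <optional>

// Stand-in types; the real SchemaDiff and getter live in TiDB/Schema and Debug/MockTiDB.
struct SchemaDiff
{
    int64_t version = 0;
};

struct StubSchemaGetter
{
    std::map<int64_t, SchemaDiff> version_diff;

    bool checkSchemaDiffExists(int64_t version) const { return version_diff.count(version) > 0; }

    std::optional<SchemaDiff> getSchemaDiff(int64_t version) const
    {
        auto it = version_diff.find(version);
        if (it == version_diff.end())
            return std::nullopt;
        return it->second;
    }
};

int main()
{
    StubSchemaGetter getter;
    getter.version_diff[42] = SchemaDiff{42};

    // Callers now handle the "diff missing" case explicitly instead of
    // silently receiving a default-constructed SchemaDiff.
    for (int64_t version : {42, 43})
    {
        if (!getter.checkSchemaDiffExists(version))
        {
            std::cout << "diff " << version << " not ready, retry later\n";
            continue;
        }
        if (auto diff = getter.getSchemaDiff(version))
            std::cout << "apply diff " << diff->version << "\n";
    }
    return 0;
}
```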
7 changes: 6 additions & 1 deletion dbms/src/Debug/MockTiDB.cpp
@@ -668,9 +668,14 @@ std::pair<bool, DatabaseID> MockTiDB::getDBIDByName(const String & database_name
return std::make_pair(false, -1);
}

SchemaDiff MockTiDB::getSchemaDiff(Int64 version_)
std::optional<SchemaDiff> MockTiDB::getSchemaDiff(Int64 version_)
{
return version_diff[version_];
}

bool MockTiDB::checkSchemaDiffExists(Int64 version)
{
return version_diff.find(version) != version_diff.end();
}

} // namespace DB
4 changes: 3 additions & 1 deletion dbms/src/Debug/MockTiDB.h
@@ -127,7 +127,9 @@ class MockTiDB : public ext::Singleton<MockTiDB>

std::pair<bool, DatabaseID> getDBIDByName(const String & database_name);

SchemaDiff getSchemaDiff(Int64 version);
bool checkSchemaDiffExists(Int64 version);

std::optional<SchemaDiff> getSchemaDiff(Int64 version);

std::unordered_map<String, DatabaseID> getDatabases() { return databases; }

6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.h
@@ -349,14 +349,18 @@ class DAGContext
std::vector<tipb::FieldType> output_field_types;
std::vector<Int32> output_offsets;

/// Holds the order of list-based executors.
/// It is used to ensure that the execution summaries of list-based executors are emitted in the same order as the executors themselves.
std::vector<String> list_based_executors_order;

private:
void initExecutorIdToJoinIdMap();
void initOutputInfo();

private:
/// Hold io for correcting the destruction order.
BlockIO io;
/// profile_streams_map is a map that maps from executor_id to profile BlockInputStreams
/// profile_streams_map is a map that maps from executor_id to profile BlockInputStreams.
std::unordered_map<String, BlockInputStreams> profile_streams_map;
/// executor_id_to_join_id_map is a map that maps executor id to all the join executor id of itself and all its children.
std::unordered_map<String, std::vector<String>> executor_id_to_join_id_map;
23 changes: 23 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
@@ -20,6 +20,26 @@

namespace DB
{
namespace
{
void fillOrderForListBasedExecutors(DAGContext & dag_context, const DAGQueryBlock & query_block)
{
assert(query_block.source);
auto & list_based_executors_order = dag_context.list_based_executors_order;
list_based_executors_order.push_back(query_block.source_name);
if (query_block.selection)
list_based_executors_order.push_back(query_block.selection_name);
if (query_block.aggregation)
list_based_executors_order.push_back(query_block.aggregation_name);
if (query_block.having)
list_based_executors_order.push_back(query_block.having_name);
if (query_block.limit_or_topn)
list_based_executors_order.push_back(query_block.limit_or_topn_name);
if (query_block.exchange_sender)
dag_context.list_based_executors_order.push_back(query_block.exchange_sender_name);
}
} // namespace

DAGQuerySource::DAGQuerySource(Context & context_)
: context(context_)
{
@@ -32,6 +52,9 @@ DAGQuerySource::DAGQuerySource(Context & context_)
else
{
root_query_block = std::make_shared<DAGQueryBlock>(1, dag_request.executors());
auto & dag_context = getDAGContext();
if (!dag_context.return_executor_id)
fillOrderForListBasedExecutors(dag_context, *root_query_block);
}
}

33 changes: 25 additions & 8 deletions dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp
@@ -89,12 +89,10 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo
}
}

/// add execution_summary for local executor
for (auto & p : dag_context.getProfileStreamsMap())
{
auto fill_execution_summary = [&](const String & executor_id, const BlockInputStreams & streams) {
ExecutionSummary current;
/// part 1: local execution info
for (auto & stream_ptr : p.second)
for (const auto & stream_ptr : streams)
{
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream_ptr.get()))
{
@@ -105,16 +103,16 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo
current.concurrency++;
}
/// part 2: remote execution info
if (merged_remote_execution_summaries.find(p.first) != merged_remote_execution_summaries.end())
if (merged_remote_execution_summaries.find(executor_id) != merged_remote_execution_summaries.end())
{
for (auto & remote : merged_remote_execution_summaries[p.first])
for (auto & remote : merged_remote_execution_summaries[executor_id])
current.merge(remote, false);
}
/// part 3: for joins we need to add the build time.
/// In TiFlash, a hash join's build side is finished before the probe side starts,
/// so the probe side's running time does not include the hash table's build time;
/// when constructing ExecSummaries, we need to add the build cost to the probe executor.
auto all_join_id_it = dag_context.getExecutorIdToJoinIdMap().find(p.first);
auto all_join_id_it = dag_context.getExecutorIdToJoinIdMap().find(executor_id);
if (all_join_id_it != dag_context.getExecutorIdToJoinIdMap().end())
{
for (const auto & join_executor_id : all_join_id_it->second)
@@ -138,8 +136,27 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo
}

current.time_processed_ns += dag_context.compile_time_ns;
fillTiExecutionSummary(response.add_execution_summaries(), current, p.first, delta_mode);
fillTiExecutionSummary(response.add_execution_summaries(), current, executor_id, delta_mode);
};

/// add execution_summary for local executor
if (dag_context.return_executor_id)
{
for (auto & p : dag_context.getProfileStreamsMap())
fill_execution_summary(p.first, p.second);
}
else
{
const auto & profile_streams_map = dag_context.getProfileStreamsMap();
assert(profile_streams_map.size() == dag_context.list_based_executors_order.size());
for (const auto & executor_id : dag_context.list_based_executors_order)
{
auto it = profile_streams_map.find(executor_id);
assert(it != profile_streams_map.end());
fill_execution_summary(executor_id, it->second);
}
}

for (auto & p : merged_remote_execution_summaries)
{
if (local_executors.find(p.first) == local_executors.end())
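The reason the non-`return_executor_id` path walks `list_based_executors_order` instead of the profile-streams map can be illustrated with a small stand-alone sketch: an `unordered_map` does not preserve plan order, so the recorded order vector drives the output. The executor names and counts below are made up for illustration.

```cpp
#include <cassert>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main()
{
    // Per-executor profiling info keyed by executor name (iteration order is unspecified).
    std::unordered_map<std::string, int> profile_streams_map = {
        {"table_scan", 3},
        {"selection", 3},
        {"limit", 1}};

    // Order recorded while building the query block, as fillOrderForListBasedExecutors does.
    std::vector<std::string> list_based_executors_order = {"table_scan", "selection", "limit"};

    // Same invariant as the asserts in addExecuteSummaries: every recorded executor has profile info.
    assert(profile_streams_map.size() == list_based_executors_order.size());
    for (const auto & executor : list_based_executors_order)
    {
        auto it = profile_streams_map.find(executor);
        assert(it != profile_streams_map.end());
        std::cout << executor << ": concurrency " << it->second << "\n";
    }
    return 0;
}
```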
13 changes: 13 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
@@ -141,6 +141,19 @@ bool DeltaValueSpace::ingestColumnFiles(DMContext & /*context*/, const RowKeyRan

bool DeltaValueSpace::flush(DMContext & context)
{
bool v = false;
if (!is_flushing.compare_exchange_strong(v, true))
{
// other thread is flushing, just return.
LOG_FMT_DEBUG(log, "{}, Flush stop because other thread is flushing", simpleInfo());
return false;
}
SCOPE_EXIT({
bool v = true;
if (!is_flushing.compare_exchange_strong(v, false))
throw Exception(simpleInfo() + " is expected to be flushing", ErrorCodes::LOGICAL_ERROR);
});

LOG_FMT_DEBUG(log, "{}, Flush start", info());

/// We have two types of data needed to flush to disk:
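The `is_flushing` guard above — `compare_exchange_strong` on entry plus a `SCOPE_EXIT` reset — can be read as the following self-contained sketch. The RAII `FlushGuard` is an illustrative substitute for TiFlash's `SCOPE_EXIT` macro, not code from this commit.

```cpp
#include <atomic>
#include <iostream>

// RAII helper: try to claim the flag in the constructor, release it in the destructor.
class FlushGuard
{
public:
    explicit FlushGuard(std::atomic_bool & flag)
        : flag_(flag)
    {
        bool expected = false;
        acquired_ = flag_.compare_exchange_strong(expected, true);
    }

    ~FlushGuard()
    {
        if (acquired_)
            flag_.store(false);
    }

    bool acquired() const { return acquired_; }

private:
    std::atomic_bool & flag_;
    bool acquired_ = false;
};

std::atomic_bool is_flushing{false};

bool flush()
{
    FlushGuard guard(is_flushing);
    if (!guard.acquired())
    {
        // Another thread is flushing; doing the same work twice would only waste resources.
        std::cout << "flush skipped\n";
        return false;
    }
    std::cout << "flush running\n";
    // ... persist the in-memory column files here ...
    return true;
}

int main()
{
    flush();
    return 0;
}
```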
7 changes: 7 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
@@ -77,6 +77,11 @@ class DeltaValueSpace
/// Note that those things can not be done at the same time.
std::atomic_bool is_updating = false;

/// Note that it is safe to run multiple flushes concurrently, but only one of them can succeed,
/// and the other threads' work is just a waste of resources.
/// So we only allow one flush task to run at any time to avoid wasting resources.
std::atomic_bool is_flushing = false;

std::atomic<size_t> last_try_flush_rows = 0;
std::atomic<size_t> last_try_flush_bytes = 0;
std::atomic<size_t> last_try_compact_column_files = 0;
@@ -159,6 +164,8 @@ class DeltaValueSpace
size_t getTotalCacheBytes() const;
size_t getValidCacheRows() const;

bool isFlushing() const { return is_flushing; }

bool isUpdating() const { return is_updating; }

bool tryLockUpdating()
35 changes: 29 additions & 6 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -980,14 +980,14 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings
checkSegmentUpdate(dm_context, segment, ThreadType::Write);
}

void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range)
bool DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed)
{
RowKeyRange cur_range = range;
while (!cur_range.none())
{
RowKeyRange segment_range;

// Keep trying until succeeded.
// Keep trying until it succeeds, if requested.
while (true)
{
SegmentPtr segment;
@@ -1010,10 +1010,15 @@
{
break;
}
else if (!try_until_succeed)
{
return false;
}
}

cur_range.setStart(segment_range.end);
}
return true;
}

void DeltaMergeStore::mergeDeltaAll(const Context & context)
@@ -1347,6 +1352,12 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
&& (delta_rows - delta_last_try_flush_rows >= delta_cache_limit_rows
|| delta_bytes - delta_last_try_flush_bytes >= delta_cache_limit_bytes);
bool should_foreground_flush = unsaved_rows >= delta_cache_limit_rows * 3 || unsaved_bytes >= delta_cache_limit_bytes * 3;
/// For the write thread, we want to avoid a foreground flush blocking the processing of raft commands.
/// So we increase the foreground flush threshold for the write thread.
if (thread_type == ThreadType::Write)
{
should_foreground_flush = unsaved_rows >= delta_cache_limit_rows * 10 || unsaved_bytes >= delta_cache_limit_bytes * 10;
}

bool should_background_merge_delta = ((delta_check_rows >= delta_limit_rows || delta_check_bytes >= delta_limit_bytes) //
&& (delta_rows - delta_last_try_merge_delta_rows >= delta_cache_limit_rows
@@ -1404,9 +1415,16 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
}
else if (should_background_flush)
{
delta_last_try_flush_rows = delta_rows;
delta_last_try_flush_bytes = delta_bytes;
try_add_background_task(BackgroundTask{TaskType::Flush, dm_context, segment, {}});
/// It is meaningless to add more flush tasks if the segment is already flushing,
/// because only one flush task can proceed at any time.
/// After the current flush task finishes,
/// it will call `checkSegmentUpdate` again to check whether there are more flush tasks to do.
if (!segment->isFlushing())
{
delta_last_try_flush_rows = delta_rows;
delta_last_try_flush_bytes = delta_bytes;
try_add_background_task(BackgroundTask{TaskType::Flush, dm_context, segment, {}});
}
}
}

@@ -1502,7 +1520,12 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
return false;
};
auto try_bg_compact = [&]() {
if (should_compact)
/// The compact task should be a really low-priority task.
/// If the segment is flushing,
/// we should avoid adding a background compact task, to reduce lock contention on the segment and save disk throughput.
/// After the current flush task completes,
/// it will call `checkSegmentUpdate` again to check whether there are other kinds of tasks to do.
if (should_compact && !segment->isFlushing())
{
delta_last_try_compact_column_files = column_file_count;
try_add_background_task(BackgroundTask{TaskType::Compact, dm_context, segment, {}});
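Taken together, the changes in `checkSegmentUpdate` amount to the policy sketched below: write threads tolerate a larger unsaved backlog before a foreground flush, and no background flush task is queued while the segment is already flushing. The threshold constant, enum, and function name are illustrative assumptions, simplified to row counts only; the real logic also checks byte counts and other settings.

```cpp
#include <cstddef>
#include <iostream>

enum class ThreadType
{
    Write,
    Read,
    BG_Flush
};

// Illustrative constant; the real limit comes from the delta cache settings.
constexpr size_t delta_cache_limit_rows = 65536;

struct FlushDecision
{
    bool foreground = false;
    bool background = false;
};

FlushDecision decideFlush(size_t unsaved_rows, ThreadType thread_type, bool segment_is_flushing, bool should_background_flush)
{
    FlushDecision d;
    // Write threads use a 10x threshold so foreground flushes rarely block raft command apply.
    const size_t factor = (thread_type == ThreadType::Write) ? 10 : 3;
    d.foreground = unsaved_rows >= delta_cache_limit_rows * factor;
    // Only one flush can run at a time, so scheduling another while flushing is pointless.
    if (!d.foreground && should_background_flush && !segment_is_flushing)
        d.background = true;
    return d;
}

int main()
{
    auto d = decideFlush(/*unsaved_rows=*/700000, ThreadType::Write, /*segment_is_flushing=*/false, /*should_background_flush=*/true);
    std::cout << "foreground=" << d.foreground << " background=" << d.background << "\n";
    return 0;
}
```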
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
@@ -367,14 +367,14 @@ class DeltaMergeStore : private boost::noncopyable
const SegmentIdSet & read_segments = {},
size_t extra_table_id_index = InvalidColumnID);

/// Force flush all data to disk.
void flushCache(const Context & context, const RowKeyRange & range)
/// Try to flush all data in `range` to disk and return whether the task succeeded.
bool flushCache(const Context & context, const RowKeyRange & range, bool try_until_succeed = true)
{
auto dm_context = newDMContext(context, context.getSettingsRef());
flushCache(dm_context, range);
return flushCache(dm_context, range, try_until_succeed);
}

void flushCache(const DMContextPtr & dm_context, const RowKeyRange & range);
bool flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed = true);

/// Merge delta into the stable layer for all segments.
///
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.h
@@ -300,6 +300,8 @@ class Segment : private boost::noncopyable

void drop(const FileProviderPtr & file_provider, WriteBatches & wbs);

bool isFlushing() const { return delta->isFlushing(); }

RowsAndBytes getRowsAndBytesInRange(
DMContext & dm_context,
const SegmentSnapshotPtr & segment_snap,
2 changes: 1 addition & 1 deletion dbms/src/Storages/IManageableStorage.h
@@ -68,7 +68,7 @@ class IManageableStorage : public IStorage

virtual void flushCache(const Context & /*context*/) {}

virtual void flushCache(const Context & /*context*/, const DM::RowKeyRange & /*range_to_flush*/) {}
virtual bool flushCache(const Context & /*context*/, const DM::RowKeyRange & /*range_to_flush*/, [[maybe_unused]] bool try_until_succeed = true) { return true; }

virtual BlockInputStreamPtr status() { return {}; }

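A minimal usage sketch of the new non-blocking flush interface. `StubStorage` and `StubRange` below are placeholders that only mirror the signature; they are not `IManageableStorage`, `DeltaMergeStore`, or `RowKeyRange`.

```cpp
#include <iostream>

struct StubRange
{
};

struct StubStorage
{
    // Mirrors the new signature: returns whether the flush actually happened.
    bool flushCache(const StubRange & /*range*/, bool try_until_succeed = true)
    {
        // Pretend the segment is busy unless the caller is willing to keep retrying.
        return try_until_succeed;
    }
};

int main()
{
    StubStorage storage;
    StubRange range;

    // Best-effort flush: returns false instead of blocking when another flush is in progress.
    if (!storage.flushCache(range, /*try_until_succeed=*/false))
        std::cout << "flush skipped, will retry later\n";

    // The blocking form keeps the old behaviour; try_until_succeed defaults to true.
    storage.flushCache(range);
    return 0;
}
```

The default argument keeps existing call sites compiling unchanged.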