Commit

Merge branch 'master' into SankarshanMishra-patch-1
SankarshanMishra authored Apr 2, 2024
2 parents c4297f5 + fde3fbf commit 1b82d24
Showing 1,813 changed files with 4,594 additions and 122,050 deletions.
66 changes: 33 additions & 33 deletions .github/workflows/build-extension.yml
@@ -84,39 +84,39 @@ jobs:
- name: Build broker
run: |
cd fs_brokers/apache_hdfs_broker/ && /bin/bash build.sh
build-docs:
name: Build Documents
needs: changes
if: ${{ needs.changes.outputs.docs_changes == 'true' }}
runs-on: ubuntu-latest
steps:
- name: Checkout ${{ github.ref }}
uses: actions/checkout@v3
# build-docs:
# name: Build Documents
# needs: changes
# if: ${{ needs.changes.outputs.docs_changes == 'true' }}
# runs-on: ubuntu-latest
# steps:
# - name: Checkout ${{ github.ref }}
# uses: actions/checkout@v3

- name: Build docs
run: |
cd docs && /bin/bash build_help_zip.sh
- name: Build
run: |
git clone https://github.com/apache/doris-website.git website
cd website
echo "[\"current\"]" > versions.json
mkdir -p docs
cp -R ../docs/en/docs/* docs/
cp -R ../docs/sidebars.json sidebars.json
mkdir -p i18n/zh-CN/docusaurus-plugin-content-docs/current
cp -R ../docs/zh-CN/docs/* i18n/zh-CN/docusaurus-plugin-content-docs/current/
cp -R ../docs/dev.json i18n/zh-CN/docusaurus-plugin-content-docs/current.json
# - name: Build docs
# run: |
# cd docs && /bin/bash build_help_zip.sh
# - name: Build
# run: |
# git clone https://github.com/apache/doris-website.git website
# cd website
# echo "[\"current\"]" > versions.json
# mkdir -p docs
# cp -R ../docs/en/docs/* docs/
# cp -R ../docs/sidebars.json sidebars.json
# mkdir -p i18n/zh-CN/docusaurus-plugin-content-docs/current
# cp -R ../docs/zh-CN/docs/* i18n/zh-CN/docusaurus-plugin-content-docs/current/
# cp -R ../docs/dev.json i18n/zh-CN/docusaurus-plugin-content-docs/current.json

mkdir -p community
cp -R ../docs/en/community/* community/
mkdir -p i18n/zh-CN/docusaurus-plugin-content-docs-community/current/
cp -R ../docs/zh-CN/community/* i18n/zh-CN/docusaurus-plugin-content-docs-community/current/
cp -R ../docs/sidebarsCommunity.json .
# mkdir -p community
# cp -R ../docs/en/community/* community/
# mkdir -p i18n/zh-CN/docusaurus-plugin-content-docs-community/current/
# cp -R ../docs/zh-CN/community/* i18n/zh-CN/docusaurus-plugin-content-docs-community/current/
# cp -R ../docs/sidebarsCommunity.json .

cp -R ../docs/images static/
npm install -g yarn
yarn cache clean
yarn && yarn build
cd ../
rm -rf website
# cp -R ../docs/images static/
# npm install -g yarn
# yarn cache clean
# yarn && yarn build
# cd ../
# rm -rf website
7 changes: 4 additions & 3 deletions be/src/cloud/cloud_delete_task.cpp
@@ -62,10 +62,11 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ
load_id.set_hi(0);
load_id.set_lo(0);
RowsetWriterContext context;
if (engine.latest_fs() == nullptr) [[unlikely]] {
return Status::IOError("Invalid latest fs");
context.fs = engine.get_fs_by_vault_id(request.storage_vault_id);
if (context.fs == nullptr) {
return Status::InternalError("vault id not found, maybe not sync, vault id {}",
request.storage_vault_id);
}
context.fs = engine.latest_fs();
context.txn_id = request.transaction_id;
context.load_id = load_id;
context.rowset_state = PREPARED;
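
The delete path no longer writes through whatever vault happens to be newest; it resolves the filesystem for the vault named in the push request. A minimal sketch of the intent, relying only on the get_fs_by_vault_id() helper this commit adds to CloudStorageEngine (see cloud_storage_engine.h below); types and error text follow the diff:

    // Before: every delete wrote to the most recently added vault.
    //     context.fs = engine.latest_fs();
    // After: honour the vault the FE assigned to this load; an empty vault id
    // still falls back to the latest vault inside get_fs_by_vault_id().
    context.fs = engine.get_fs_by_vault_id(request.storage_vault_id);
    if (context.fs == nullptr) {
        // Vault metadata may not have been synced to this BE yet.
        return Status::InternalError("vault id not found, maybe not sync, vault id {}",
                                     request.storage_vault_id);
    }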
15 changes: 4 additions & 11 deletions be/src/cloud/cloud_rowset_builder.cpp
@@ -66,17 +66,10 @@ Status CloudRowsetBuilder::init() {
context.write_file_cache = _req.write_file_cache;
context.partial_update_info = _partial_update_info;
context.file_cache_ttl_sec = _tablet->ttl_seconds();
// New loaded data is always written to latest shared storage
// TODO(AlexYue): use the passed resource id to retrive the corresponding
// fs to pass to the RowsetWriterContext
if (_req.storage_vault_id.empty()) {
if (_engine.latest_fs() == nullptr) [[unlikely]] {
return Status::IOError("Invalid latest fs");
}
context.fs = _engine.latest_fs();
} else {
// TODO(ByteYue): What if the corresponding fs does not exists temporarily?
context.fs = get_filesystem(_req.storage_vault_id);
context.fs = _engine.get_fs_by_vault_id(_req.storage_vault_id);
if (context.fs == nullptr) {
return Status::InternalError("vault id not found, maybe not sync, vault id {}",
_req.storage_vault_id);
}
context.rowset_dir = _tablet->tablet_path();
_rowset_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false));
8 changes: 5 additions & 3 deletions be/src/cloud/cloud_schema_change_job.cpp
@@ -155,6 +155,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
sc_params.alter_tablet_type = AlterTabletType::MIGRATION;
break;
}
sc_params.vault_id = request.storage_vault_id;
if (!request.__isset.materialized_view_params) {
return _convert_historical_rowsets(sc_params);
}
@@ -251,10 +252,11 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap();
context.tablet_schema = _new_tablet->tablet_schema();
context.newest_write_timestamp = rs_reader->newest_write_timestamp();
if (_cloud_storage_engine.latest_fs() == nullptr) [[unlikely]] {
return Status::IOError("Invalid latest fs");
context.fs = _cloud_storage_engine.get_fs_by_vault_id(sc_params.vault_id);
if (context.fs == nullptr) {
return Status::InternalError("vault id not found, maybe not sync, vault id {}",
sc_params.vault_id);
}
context.fs = _cloud_storage_engine.latest_fs();
context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
auto rowset_writer = DORIS_TRY(_new_tablet->create_rowset_writer(context, false));
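
Carrying the vault id on SchemaChangeParams is what lets _convert_historical_rowsets() above resolve the same vault when it rewrites historical rowsets. A hypothetical excerpt of the struct, for illustration only — its full definition is not part of this diff, and only the new field is implied by the change:

    // Assumed shape: vault_id is filled from TAlterTabletReqV2::storage_vault_id in
    // process_alter_tablet() and consumed via CloudStorageEngine::get_fs_by_vault_id()
    // when the output rowset writer is built.
    struct SchemaChangeParams {
        AlterTabletType alter_tablet_type;
        // ... existing members elided ...
        std::string vault_id;
    };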

29 changes: 23 additions & 6 deletions be/src/cloud/cloud_storage_engine.cpp
@@ -24,6 +24,8 @@
#include <rapidjson/prettywriter.h>
#include <rapidjson/stringbuffer.h>

#include <variant>

#include "cloud/cloud_base_compaction.h"
#include "cloud/cloud_cumulative_compaction.h"
#include "cloud/cloud_cumulative_compaction_policy.h"
@@ -72,6 +74,21 @@ CloudStorageEngine::CloudStorageEngine(const UniqueId& backend_uid)

CloudStorageEngine::~CloudStorageEngine() = default;

static Status vault_process_error(std::string_view id,
std::variant<S3Conf, cloud::HdfsVaultInfo>& vault, Status err) {
std::stringstream ss;
std::visit(
[&]<typename T>(T& val) {
if constexpr (std::is_same_v<T, S3Conf>) {
ss << val.to_string();
} else if constexpr (std::is_same_v<T, cloud::HdfsVaultInfo>) {
val.SerializeToOstream(&ss);
}
},
vault);
return Status::IOError("Invalid vault, id {}, err {}, detail conf {}", id, err, ss.str());
}

struct VaultCreateFSVisitor {
VaultCreateFSVisitor(const std::string& id) : id(id) {}
Status operator()(const S3Conf& s3_conf) const {
Expand Down Expand Up @@ -136,12 +153,12 @@ Status CloudStorageEngine::open() {

LOG(WARNING) << "failed to get vault info, retry after 5s, err=" << st;
std::this_thread::sleep_for(5s);
} while (true);

CHECK(!vault_infos.empty()) << "no vault infos";
} while (vault_infos.empty());

for (auto& [id, vault_info] : vault_infos) {
RETURN_IF_ERROR(std::visit(VaultCreateFSVisitor {id}, vault_info));
if (auto st = std::visit(VaultCreateFSVisitor {id}, vault_info); !st.ok()) [[unlikely]] {
return vault_process_error(id, vault_info, std::move(st));
}
}
set_latest_fs(get_filesystem(std::get<0>(vault_infos.back())));

@@ -261,8 +278,8 @@ void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() {
auto st = (fs == nullptr)
? std::visit(VaultCreateFSVisitor {id}, vault_info)
: std::visit(RefreshFSVaultVisitor {id, std::move(fs)}, vault_info);
if (!st.ok()) {
LOG(WARNING) << "failed to refresh storage vault. err=" << st;
if (!st.ok()) [[unlikely]] {
LOG(WARNING) << vault_process_error(id, vault_info, std::move(st));
}
}
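
vault_process_error() folds both vault flavours into one readable error string by visiting the variant with a templated lambda; if constexpr selects the S3 or HDFS branch at compile time. The same technique in isolation, with stand-in types so the snippet is self-contained — Doris's S3Conf and cloud::HdfsVaultInfo are not reproduced here, and the templated lambda requires C++20:

    #include <iostream>
    #include <sstream>
    #include <string>
    #include <type_traits>
    #include <variant>

    struct S3LikeConf {
        std::string to_string() const { return "bucket=demo, endpoint=s3.example.com"; }
    };
    struct HdfsLikeInfo {
        std::string fs_name = "hdfs://demo:8020";
    };

    // Render whichever alternative the variant currently holds, mirroring the way
    // vault_process_error() prints S3Conf::to_string() or serializes the HDFS protobuf.
    std::string describe(const std::variant<S3LikeConf, HdfsLikeInfo>& vault) {
        std::stringstream ss;
        std::visit(
                [&]<typename T>(const T& val) {
                    if constexpr (std::is_same_v<T, S3LikeConf>) {
                        ss << val.to_string();
                    } else {
                        ss << val.fs_name; // the real code calls SerializeToOstream(&ss)
                    }
                },
                vault);
        return ss.str();
    }

    int main() {
        std::variant<S3LikeConf, HdfsLikeInfo> v = HdfsLikeInfo {};
        std::cout << describe(v) << '\n'; // prints hdfs://demo:8020
    }

Putting the full vault configuration into the error message makes a misconfigured or unsynced vault diagnosable from the BE log alone, both at startup and in the periodic refresh thread.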

8 changes: 8 additions & 0 deletions be/src/cloud/cloud_storage_engine.h
@@ -26,6 +26,7 @@
#include "cloud/cloud_tablet.h"
#include "cloud_txn_delete_bitmap_cache.h"
#include "olap/storage_engine.h"
#include "olap/storage_policy.h"
#include "util/threadpool.h"

namespace doris {
@@ -66,6 +67,13 @@ class CloudStorageEngine final : public BaseStorageEngine {
return _calc_tablet_delete_bitmap_task_thread_pool;
}

io::FileSystemSPtr get_fs_by_vault_id(const std::string& vault_id) const {
if (vault_id.empty()) {
return latest_fs();
}
return get_filesystem(vault_id);
}

io::FileSystemSPtr latest_fs() const {
std::lock_guard lock(_latest_fs_mtx);
return _latest_fs;
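
get_fs_by_vault_id() centralizes the fallback rule: an empty vault id means the latest vault, which preserves the old behaviour for loads that never name a vault, while a nullptr result means the vault has not been synced to this BE (or no vault is initialised yet). A sketch of the caller-side contract, mirroring the call sites changed in this commit; the names are illustrative:

    // Illustrative caller; vault_id comes from the FE request and may be empty.
    io::FileSystemSPtr fs = engine.get_fs_by_vault_id(vault_id);
    if (fs == nullptr) {
        // Unknown vault id, or the latest fs is not set yet -- fail the write
        // instead of silently falling back to an arbitrary filesystem.
        return Status::InternalError("vault id not found, maybe not sync, vault id {}", vault_id);
    }
    context.fs = std::move(fs);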
4 changes: 4 additions & 0 deletions be/src/exec/tablet_info.cpp
@@ -122,6 +122,8 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
if (_is_partial_update) {
_auto_increment_column = pschema.auto_increment_column();
}
_timestamp_ms = pschema.timestamp_ms();
_timezone = pschema.timezone();

for (const auto& col : pschema.partial_update_input_columns()) {
_partial_update_input_columns.insert(col);
@@ -256,6 +258,8 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_partial_update(_is_partial_update);
pschema->set_is_strict_mode(_is_strict_mode);
pschema->set_auto_increment_column(_auto_increment_column);
pschema->set_timestamp_ms(_timestamp_ms);
pschema->set_timezone(_timezone);
for (auto col : _partial_update_input_columns) {
*pschema->add_partial_update_input_columns() = col;
}
6 changes: 6 additions & 0 deletions be/src/exec/tablet_info.h
@@ -93,6 +93,10 @@ class OlapTableSchemaParam {
return _partial_update_input_columns;
}
std::string auto_increment_coulumn() const { return _auto_increment_column; }
void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; }
int64_t timestamp_ms() const { return _timestamp_ms; }
void set_timezone(std::string timezone) { _timezone = timezone; }
std::string timezone() const { return _timezone; }
bool is_strict_mode() const { return _is_strict_mode; }
std::string debug_string() const;

@@ -109,6 +113,8 @@ class OlapTableSchemaParam {
std::set<std::string> _partial_update_input_columns;
bool _is_strict_mode = false;
std::string _auto_increment_column;
int64_t _timestamp_ms = 0;
std::string _timezone;
};

using OlapTableIndexTablets = TOlapTableIndexTablets;
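
The load's wall-clock time and session time zone now ride along with the table schema so that BEs can evaluate CURRENT_TIMESTAMP defaults deterministically (see the segment_writer.cpp change below). A rough sketch of the round trip, assuming only the accessors added above and the new POlapTableSchemaParam protobuf fields; construction details and error handling are abbreviated:

    OlapTableSchemaParam schema_param;
    schema_param.set_timestamp_ms(1712044800000);  // load start, ms since epoch (2024-04-02 08:00:00 UTC)
    schema_param.set_timezone("Asia/Shanghai");    // session time zone of the load

    POlapTableSchemaParam pschema;
    schema_param.to_protobuf(&pschema);            // now also carries timestamp_ms / timezone

    OlapTableSchemaParam received;
    Status st = received.init(pschema);            // reads them back on the receiving side
    // DeltaWriterV2 then forwards received.timestamp_ms() / received.timezone() into
    // PartialUpdateInfo::init(), as shown in delta_writer_v2.cpp below.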
12 changes: 9 additions & 3 deletions be/src/olap/compaction.cpp
@@ -1072,10 +1072,16 @@ Status CloudCompactionMixin::modify_rowsets() {
}

Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) {
if (_engine.latest_fs() == nullptr) [[unlikely]] {
return Status::IOError("Invalid latest fs");
// Use the vault id of the previous rowset
for (const auto& rs : _input_rowsets) {
if (nullptr != rs->rowset_meta()->fs()) {
ctx.fs = rs->rowset_meta()->fs();
break;
}
}
if (nullptr == ctx.fs) [[unlikely]] {
return Status::InternalError("Failed to find fs");
}
ctx.fs = _engine.latest_fs();
ctx.txn_id = boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
std::numeric_limits<int64_t>::max(); // MUST be positive
ctx.txn_expiration = _expiration;
3 changes: 2 additions & 1 deletion be/src/olap/delta_writer_v2.cpp
@@ -242,7 +242,8 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode());
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(), table_schema_param->timezone());
}

} // namespace doris
5 changes: 2 additions & 3 deletions be/src/olap/inverted_index_parser.h
@@ -99,12 +99,11 @@ std::string get_parser_ignore_above_value_from_properties(
template <bool ReturnTrue = false>
std::string get_parser_lowercase_from_properties(
const std::map<std::string, std::string>& properties) {
DBUG_EXECUTE_IF("inverted_index_parser.get_parser_lowercase_from_properties", { return ""; })

if (properties.find(INVERTED_INDEX_PARSER_LOWERCASE_KEY) != properties.end()) {
return properties.at(INVERTED_INDEX_PARSER_LOWERCASE_KEY);
} else {
DBUG_EXECUTE_IF("inverted_index_parser.get_parser_lowercase_from_properties",
{ return ""; })

if constexpr (ReturnTrue) {
return INVERTED_INDEX_PARSER_TRUE;
} else {
7 changes: 6 additions & 1 deletion be/src/olap/partial_update_info.h
@@ -23,9 +23,12 @@ namespace doris {

struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<string>& partial_update_cols, bool is_strict_mode) {
const std::set<string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone) {
is_partial_update = partial_update;
partial_update_input_columns = partial_update_cols;
this->timestamp_ms = timestamp_ms;
this->timezone = timezone;
missing_cids.clear();
update_cids.clear();
for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
@@ -51,5 +54,7 @@ struct PartialUpdateInfo {
// to generate a new row, only available in non-strict mode
bool can_insert_new_rows_in_partial_update {true};
bool is_strict_mode {false};
int64_t timestamp_ms {0};
std::string timezone;
};
} // namespace doris
16 changes: 15 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -60,13 +60,15 @@
#include "util/faststring.h"
#include "util/key_util.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/columns_number.h"
#include "vec/common/schema_util.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/io/reader_buffer.h"
#include "vec/jsonb/serialize.h"
#include "vec/olap/olap_data_convertor.h"
#include "vec/runtime/vdatetime_value.h"

namespace doris {
namespace segment_v2 {
@@ -693,7 +695,19 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
for (auto i = 0; i < cids_missing.size(); ++i) {
const auto& column = _tablet_schema->column(cids_missing[i]);
if (column.has_default_value()) {
auto default_value = _tablet_schema->column(cids_missing[i]).default_value();
std::string default_value;
if (UNLIKELY(_tablet_schema->column(cids_missing[i]).type() ==
FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
to_lower(_tablet_schema->column(cids_missing[i]).default_value())
.find(to_lower("CURRENT_TIMESTAMP")) !=
std::string::npos)) {
DateV2Value<DateTimeV2ValueType> dtv;
dtv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
_opts.rowset_ctx->partial_update_info->timezone);
default_value = dtv.debug_string();
} else {
default_value = _tablet_schema->column(cids_missing[i]).default_value();
}
vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
default_value.size());
RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
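
The new branch only fires for DATETIMEV2 columns whose default contains CURRENT_TIMESTAMP: instead of parsing that literal, it materializes the load's own timestamp in the load's time zone, so a partial update fills the missing column with the time the load ran. An illustration of the conversion in standard C++20 chrono — Doris itself uses DateV2Value<DateTimeV2ValueType>::from_unixtime(); this stand-in only shows the timestamp_ms/1000 plus timezone semantics and assumes a standard library with time-zone and chrono-format support:

    #include <chrono>
    #include <format>
    #include <iostream>
    #include <string>

    // timestamp_ms and timezone come from PartialUpdateInfo, which received them from
    // OlapTableSchemaParam (see tablet_info.h / partial_update_info.h above).
    std::string materialize_current_timestamp(int64_t timestamp_ms, const std::string& timezone) {
        using namespace std::chrono;
        sys_seconds tp {seconds {timestamp_ms / 1000}};   // same /1000 as the diff above
        zoned_time zt {locate_zone(timezone), tp};        // e.g. "Asia/Shanghai"
        return std::format("{:%Y-%m-%d %H:%M:%S}", zt);   // string handed to from_string() as the default
    }

    int main() {
        // 1712044800000 ms = 2024-04-02 08:00:00 UTC -> 16:00:00 in Asia/Shanghai.
        std::cout << materialize_current_timestamp(1712044800000, "Asia/Shanghai") << '\n';
    }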