Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

fix(bulk_load): fix bug that poping all committed mutations ineffective #1102

Merged
merged 26 commits into from
May 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
56d6144
feat: support single replica (#932)
empiredan Oct 9, 2021
289eb46
fix: mutation_log_test failed when compile with debug type (#933)
foreverneverer Oct 11, 2021
897bd0c
feat(bulk_load): async create connection with remote file provider (#…
hycdong Nov 8, 2021
6b6e99d
feat(bulk_load): not rollback to downloading if bulk load meet error …
hycdong Nov 9, 2021
c97ff5d
refactor(bulk_load): update some bulk load logs (#960)
hycdong Nov 10, 2021
157853b
feat(bulk_load): remove bulk load request short interval avoid unnece…
hycdong Nov 10, 2021
a25e2fb
refactor(bulk_load): refactor get_app for bulk_load_service (#962)
hycdong Nov 12, 2021
e165da0
feat(bulk_load): add unhealthy partition check (#964)
hycdong Nov 15, 2021
0d06aa8
fix: remove bulk_load_meta_service::check_partition_status useless th…
hycdong Nov 18, 2021
8682dd1
feat(op_status_lock): add an operation status lock for meta (#977)
GiantKing Dec 9, 2021
73a72dd
feat(bulk_load): add bulk load manager for meta (#986)
GiantKing Dec 24, 2021
2d5762b
feat(online_migration):part1 - add new app_envs to control rocksdb al…
hycdong Dec 29, 2021
0d440f7
feat(online_migration): part2 - add ingest_behind option for bulk loa…
hycdong Dec 31, 2021
c4c0d2f
fix(bulk_load): fix replica validate status check (#1004)
hycdong Dec 31, 2021
2c2d163
refactor: add a new function to store app_info into file (#1003)
hycdong Jan 5, 2022
ce76e62
fix: make is_bulk_loading filed optional in query_bulk_load_response …
hycdong Jan 6, 2022
29432f0
fix(bulk_load): update ingestion error handling with move_files mode …
hycdong Jan 17, 2022
0e67b2f
feat(bulk_load): add verify_before_ingest option (#1027)
hycdong Jan 20, 2022
0ac5051
refactor(bulk_load): rename update_partition_status_on_remote_storage…
hycdong Jan 20, 2022
8d499c5
feat(bulk_load): avoid unecessary repeated ingestion (#1018)
hycdong Jan 25, 2022
79e29f8
feat(bulk_load): support disk_level ingesting restriction part1 - add…
hycdong Jan 27, 2022
0c79653
feat(bulk_load): support disk_level ingesting restriction part2 imple…
hycdong Jan 28, 2022
29ed007
fix(bulk_load): fix pop all committed mutation not work
hycdong Apr 29, 2022
2866a55
merge master
hycdong May 7, 2022
c135ac6
fix merge error
hycdong May 7, 2022
6b0929a
small fix
hycdong May 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/replica/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,13 @@ void replica_bulk_loader::handle_bulk_load_succeed()
_replica->_app->set_ingestion_status(ingestion_status::IS_INVALID);
_status = bulk_load_status::BLS_SUCCEED;
_stub->_counter_bulk_load_succeed_count->increment();

// send an empty prepare again to gurantee that learner should learn from checkpoint
if (status() == partition_status::PS_PRIMARY) {
mutation_ptr mu = _replica->new_mutation(invalid_decree);
mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
_replica->init_prepare(mu, false, true);
}
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand Down
5 changes: 4 additions & 1 deletion src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,13 +380,16 @@ void replica::on_prepare(dsn::message_ex *request)

replica_configuration rconfig;
mutation_ptr mu;
bool pop_all_committed_mutations = false;

{
rpc_read_stream reader(request);
unmarshall(reader, rconfig, DSF_THRIFT_BINARY);
mu = mutation::read_from(reader, request);
mu->set_is_sync_to_child(rconfig.split_sync_to_child);
pop_all_committed_mutations = rconfig.pop_all;
rconfig.split_sync_to_child = false;
rconfig.pop_all = false;
}

decree decree = mu->data.header.decree;
Expand Down Expand Up @@ -494,7 +497,7 @@ void replica::on_prepare(dsn::message_ex *request)
return;
}

error_code err = _prepare_list->prepare(mu, status(), false, false);
error_code err = _prepare_list->prepare(mu, status(), pop_all_committed_mutations);
dassert(err == ERR_OK, "prepare mutation failed, err = %s", err.to_string());

if (partition_status::PS_POTENTIAL_SECONDARY == status() ||
Expand Down