diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index 2e85f59dfe..57e98ec525 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -631,6 +631,13 @@ void replica_bulk_loader::handle_bulk_load_succeed() _replica->_app->set_ingestion_status(ingestion_status::IS_INVALID); _status = bulk_load_status::BLS_SUCCEED; _stub->_counter_bulk_load_succeed_count->increment(); + + // send an empty prepare again to gurantee that learner should learn from checkpoint + if (status() == partition_status::PS_PRIMARY) { + mutation_ptr mu = _replica->new_mutation(invalid_decree); + mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr); + _replica->init_prepare(mu, false, true); + } } // ThreadPool: THREAD_POOL_REPLICATION diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 2ddf320adf..314ded57b1 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -380,13 +380,16 @@ void replica::on_prepare(dsn::message_ex *request) replica_configuration rconfig; mutation_ptr mu; + bool pop_all_committed_mutations = false; { rpc_read_stream reader(request); unmarshall(reader, rconfig, DSF_THRIFT_BINARY); mu = mutation::read_from(reader, request); mu->set_is_sync_to_child(rconfig.split_sync_to_child); + pop_all_committed_mutations = rconfig.pop_all; rconfig.split_sync_to_child = false; + rconfig.pop_all = false; } decree decree = mu->data.header.decree; @@ -494,7 +497,7 @@ void replica::on_prepare(dsn::message_ex *request) return; } - error_code err = _prepare_list->prepare(mu, status(), false, false); + error_code err = _prepare_list->prepare(mu, status(), pop_all_committed_mutations); dassert(err == ERR_OK, "prepare mutation failed, err = %s", err.to_string()); if (partition_status::PS_POTENTIAL_SECONDARY == status() ||