Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
fix(bulk_load): fix bug that poping all committed mutations ineffecti…
Browse files Browse the repository at this point in the history
…ve (#1102)
  • Loading branch information
hycdong authored May 12, 2022
1 parent f75c4bb commit 51900ce
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
7 changes: 7 additions & 0 deletions src/replica/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,13 @@ void replica_bulk_loader::handle_bulk_load_succeed()
_replica->_app->set_ingestion_status(ingestion_status::IS_INVALID);
_status = bulk_load_status::BLS_SUCCEED;
_stub->_counter_bulk_load_succeed_count->increment();

// send an empty prepare again to gurantee that learner should learn from checkpoint
if (status() == partition_status::PS_PRIMARY) {
mutation_ptr mu = _replica->new_mutation(invalid_decree);
mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
_replica->init_prepare(mu, false, true);
}
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand Down
5 changes: 4 additions & 1 deletion src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,13 +380,16 @@ void replica::on_prepare(dsn::message_ex *request)

replica_configuration rconfig;
mutation_ptr mu;
bool pop_all_committed_mutations = false;

{
rpc_read_stream reader(request);
unmarshall(reader, rconfig, DSF_THRIFT_BINARY);
mu = mutation::read_from(reader, request);
mu->set_is_sync_to_child(rconfig.split_sync_to_child);
pop_all_committed_mutations = rconfig.pop_all;
rconfig.split_sync_to_child = false;
rconfig.pop_all = false;
}

decree decree = mu->data.header.decree;
Expand Down Expand Up @@ -494,7 +497,7 @@ void replica::on_prepare(dsn::message_ex *request)
return;
}

error_code err = _prepare_list->prepare(mu, status(), false, false);
error_code err = _prepare_list->prepare(mu, status(), pop_all_committed_mutations);
dassert(err == ERR_OK, "prepare mutation failed, err = %s", err.to_string());

if (partition_status::PS_POTENTIAL_SECONDARY == status() ||
Expand Down

0 comments on commit 51900ce

Please sign in to comment.