
Commit

Merge pull request ceph#49540 from xxhdx1985126/wip-rebuild-missing-set
crimson/osd/pg: rebuild missing set when a new interval is created

Reviewed-by: Samuel Just <sjust@redhat.com>
Reviewed-by: Yingxin Cheng <yingxin.cheng@intel.com>
cyx1231st authored Jan 12, 2023
2 parents 9c97a7a + 9c2d11a commit 3c63faf
Showing 6 changed files with 175 additions and 64 deletions.
8 changes: 5 additions & 3 deletions src/crimson/osd/osd_operations/peering_event.cc
@@ -89,9 +89,11 @@ seastar::future<> PeeringEvent<T>::with_pg(
return this->template enter_stage<interruptor>(
BackfillRecovery::bp(*pg).process);
}).then_interruptible([this, pg, &shard_services] {
-    pg->do_peering_event(evt, ctx);
-    that()->get_handle().exit();
-    return complete_rctx(shard_services, pg);
+    return pg->do_peering_event(evt, ctx
+    ).then_interruptible([this, pg, &shard_services] {
+      that()->get_handle().exit();
+      return complete_rctx(shard_services, pg);
+    });
}).then_interruptible([pg, &shard_services]()
-> typename T::template interruptible_future<> {
if (!pg->get_need_up_thru()) {
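
Below is a minimal sketch (plain Seastar futures, not crimson's interruptible futures) of why this call site changes: once do_peering_event() returns a future, the stage exit and rctx completion have to be chained onto that future instead of running immediately. handle_event(), leave_stage() and flush_ctx() are hypothetical stand-ins.

#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <iostream>

static seastar::future<> handle_event() {
  // may now perform I/O internally before it resolves
  return seastar::make_ready_future<>();
}
static void leave_stage() { std::cout << "stage exited\n"; }
static seastar::future<> flush_ctx() { return seastar::make_ready_future<>(); }

static seastar::future<> process() {
  // Before: handle_event(); leave_stage(); return flush_ctx();
  // After: run the follow-up steps only once the handler's future resolves.
  return handle_event().then([] {
    leave_stage();
    return flush_ctx();
  });
}

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] { return process(); });
}
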
68 changes: 37 additions & 31 deletions src/crimson/osd/osd_operations/pg_advance_map.cc
@@ -63,39 +63,45 @@ seastar::future<> PGAdvanceMap::start()
pg->peering_request_pg_pipeline.process
).then([this] {
from = pg->get_osdmap_epoch();
+    auto fut = seastar::now();
if (do_init) {
-      pg->handle_initialize(rctx);
-      pg->handle_activate_map(rctx);
-    }
-    return seastar::do_for_each(
-      boost::make_counting_iterator(*from + 1),
-      boost::make_counting_iterator(to + 1),
-      [this](epoch_t next_epoch) {
-        return shard_services.get_map(next_epoch).then(
-          [this] (cached_map_t&& next_map) {
-            logger().debug("{}: advancing map to {}",
-                           *this, next_map->get_epoch());
-            pg->handle_advance_map(next_map, rctx);
-          });
-      }).then([this] {
-        pg->handle_activate_map(rctx);
-        logger().debug("{}: map activated", *this);
-        if (do_init) {
-          shard_services.pg_created(pg->get_pgid(), pg);
-          logger().info("PGAdvanceMap::start new pg {}", *pg);
-        }
-        return seastar::when_all_succeed(
-          pg->get_need_up_thru()
-          ? shard_services.send_alive(
-              pg->get_same_interval_since())
-          : seastar::now(),
-          shard_services.dispatch_context(
-            pg->get_collection_ref(),
-            std::move(rctx)));
-      }).then_unpack([this] {
-        logger().debug("{}: sending pg temp", *this);
-        return shard_services.send_pg_temp();
+      fut = pg->handle_initialize(rctx
+      ).then([this] {
+        return pg->handle_activate_map(rctx);
+      });
+    }
+    return fut.then([this] {
+      return seastar::do_for_each(
+        boost::make_counting_iterator(*from + 1),
+        boost::make_counting_iterator(to + 1),
+        [this](epoch_t next_epoch) {
+          return shard_services.get_map(next_epoch).then(
+            [this] (cached_map_t&& next_map) {
+              logger().debug("{}: advancing map to {}",
+                             *this, next_map->get_epoch());
+              return pg->handle_advance_map(next_map, rctx);
+            });
+        }).then([this] {
+          return pg->handle_activate_map(rctx).then([this] {
+            logger().debug("{}: map activated", *this);
+            if (do_init) {
+              shard_services.pg_created(pg->get_pgid(), pg);
+              logger().info("PGAdvanceMap::start new pg {}", *pg);
+            }
+            return seastar::when_all_succeed(
+              pg->get_need_up_thru()
+              ? shard_services.send_alive(
+                  pg->get_same_interval_since())
+              : seastar::now(),
+              shard_services.dispatch_context(
+                pg->get_collection_ref(),
+                std::move(rctx)));
+          });
+        }).then_unpack([this] {
+          logger().debug("{}: sending pg temp", *this);
+          return shard_services.send_pg_temp();
+        });
});
}).then([this, ref=std::move(ref)] {
logger().debug("{}: complete", *this);
});
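
A sketch of the two patterns this hunk introduces, using hypothetical helpers (initialize(), activate(), advance_to()) rather than the PG calls: an optional prologue built as a future that stays a ready future unless do_init is set, followed by a seastar::do_for_each walk over the epoch range that only starts once the prologue resolves.

#include <boost/iterator/counting_iterator.hpp>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>  // seastar::do_for_each; header location varies across Seastar versions

using epoch_t = unsigned;

static seastar::future<> initialize() { return seastar::now(); }
static seastar::future<> activate() { return seastar::now(); }
static seastar::future<> advance_to(epoch_t) { return seastar::now(); }

static seastar::future<> advance(bool do_init, epoch_t from, epoch_t to) {
  // Optional prologue: remains a ready future unless initialization is needed.
  auto fut = seastar::now();
  if (do_init) {
    fut = initialize().then([] { return activate(); });
  }
  // Only after the prologue resolves, advance through every epoch in (from, to].
  return fut.then([from, to] {
    return seastar::do_for_each(
      boost::make_counting_iterator(from + 1),
      boost::make_counting_iterator(to + 1),
      [](epoch_t e) { return advance_to(e); });
  });
}
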
62 changes: 37 additions & 25 deletions src/crimson/osd/pg.cc
@@ -494,49 +494,61 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
});
}

-void PG::do_peering_event(
+PG::interruptible_future<> PG::do_peering_event(
PGPeeringEvent& evt, PeeringCtx &rctx)
{
if (peering_state.pg_has_reset_since(evt.get_epoch_requested()) ||
peering_state.pg_has_reset_since(evt.get_epoch_sent())) {
logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
+    return interruptor::now();
} else {
logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
-    peering_state.handle_event(
-      evt.get_event(),
-      &rctx);
-    peering_state.write_if_dirty(rctx.transaction);
+    // all peering event handling needs to be run in a dedicated seastar::thread,
+    // so that event processing can involve I/O reqs freely, for example: PG::on_removal,
+    // PG::on_new_interval
+    return interruptor::async([this, &evt, &rctx] {
+      peering_state.handle_event(
+        evt.get_event(),
+        &rctx);
+      peering_state.write_if_dirty(rctx.transaction);
+    });
}
}

-void PG::handle_advance_map(
+seastar::future<> PG::handle_advance_map(
cached_map_t next_map, PeeringCtx &rctx)
{
-  vector<int> newup, newacting;
-  int up_primary, acting_primary;
-  next_map->pg_to_up_acting_osds(
-    pgid.pgid,
-    &newup, &up_primary,
-    &newacting, &acting_primary);
-  peering_state.advance_map(
-    next_map,
-    peering_state.get_osdmap(),
-    newup,
-    up_primary,
-    newacting,
-    acting_primary,
-    rctx);
-  osdmap_gate.got_map(next_map->get_epoch());
+  return seastar::async([this, next_map=std::move(next_map), &rctx] {
+    vector<int> newup, newacting;
+    int up_primary, acting_primary;
+    next_map->pg_to_up_acting_osds(
+      pgid.pgid,
+      &newup, &up_primary,
+      &newacting, &acting_primary);
+    peering_state.advance_map(
+      next_map,
+      peering_state.get_osdmap(),
+      newup,
+      up_primary,
+      newacting,
+      acting_primary,
+      rctx);
+    osdmap_gate.got_map(next_map->get_epoch());
+  });
}

-void PG::handle_activate_map(PeeringCtx &rctx)
+seastar::future<> PG::handle_activate_map(PeeringCtx &rctx)
{
-  peering_state.activate_map(rctx);
+  return seastar::async([this, &rctx] {
+    peering_state.activate_map(rctx);
+  });
}

-void PG::handle_initialize(PeeringCtx &rctx)
+seastar::future<> PG::handle_initialize(PeeringCtx &rctx)
{
-  peering_state.handle_event(PeeringState::Initialize{}, &rctx);
+  return seastar::async([this, &rctx] {
+    peering_state.handle_event(PeeringState::Initialize{}, &rctx);
+  });
}
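
The conversions above all follow the same shape: a handler that used to run synchronously now wraps its body in seastar::async, so it runs inside a seastar::thread where blocking-style code can wait on futures inline. A minimal sketch of that pattern (flush_to_disk() is a hypothetical stand-in for the I/O the PeeringState callbacks may now issue):

#include <seastar/core/future.hh>
#include <seastar/core/thread.hh>  // seastar::async

static seastar::future<> flush_to_disk() { return seastar::make_ready_future<>(); }

static seastar::future<> handle_sync_state_machine() {
  return seastar::async([] {
    // Plain, blocking-style control flow: .get() suspends only this
    // seastar::thread, never the reactor.
    flush_to_disk().get();
    // ... drive the non-futurized state machine here ...
  });
}

This is what makes the new rebuild_missing_set_with_deletes override in pg.h (next file) legal: PeeringState's synchronous callbacks are now always entered from within a seastar::thread.
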


13 changes: 8 additions & 5 deletions src/crimson/osd/pg.h
@@ -415,7 +415,10 @@ class PG : public boost::intrusive_ref_counter<
}

void rebuild_missing_set_with_deletes(PGLog &pglog) final {
-    ceph_assert(0 == "Impossible for crimson");
+    pglog.rebuild_missing_set_with_deletes_crimson(
+      shard_services.get_store(),
+      coll_ref,
+      peering_state.get_info()).get();
}

PerfCounters &get_peering_perf() final {
@@ -497,12 +500,12 @@

seastar::future<> read_state(crimson::os::FuturizedStore* store);

-  void do_peering_event(
+  interruptible_future<> do_peering_event(
PGPeeringEvent& evt, PeeringCtx &rctx);

-  void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
-  void handle_activate_map(PeeringCtx &rctx);
-  void handle_initialize(PeeringCtx &rctx);
+  seastar::future<> handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
+  seastar::future<> handle_activate_map(PeeringCtx &rctx);
+  seastar::future<> handle_initialize(PeeringCtx &rctx);

static hobject_t get_oid(const hobject_t& hobj);
static RWState::State get_lock_type(const OpInfo &op_info);
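
A sketch (illustrative names, not the crimson classes) of how the override above bridges PeeringState's synchronous listener interface to the asynchronous store: it blocks on the returned future with .get(), which is safe only because every call path into the listener now runs under seastar::async, as shown in pg.cc.

#include <seastar/core/future.hh>
#include <seastar/core/thread.hh>

struct Listener {  // synchronous callback interface, in the spirit of PeeringState::PeeringListener
  virtual void rebuild_missing() = 0;
  virtual ~Listener() = default;
};

static seastar::future<> rebuild_missing_async() {  // future-returning helper
  return seastar::make_ready_future<>();
}

struct CrimsonListener : Listener {
  void rebuild_missing() final {
    rebuild_missing_async().get();  // blocks this seastar::thread only
  }
};

static seastar::future<> drive(Listener& l) {
  return seastar::async([&l] { l.rebuild_missing(); });  // establishes the thread context
}
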
81 changes: 81 additions & 0 deletions src/osd/PGLog.cc
@@ -1206,4 +1206,85 @@ seastar::future<> PGLog::read_log_and_missing_crimson(
});
}

seastar::future<> PGLog::rebuild_missing_set_with_deletes_crimson(
crimson::os::FuturizedStore &store,
crimson::os::CollectionRef ch,
const pg_info_t &info)
{
// save entries not generated from the current log (e.g. added due
// to repair, EIO handling, or divergent_priors).
map<hobject_t, pg_missing_item> extra_missing;
for (const auto& p : missing.get_items()) {
if (!log.logged_object(p.first)) {
ldpp_dout(this, 20) << __func__ << " extra missing entry: " << p.first
<< " " << p.second << dendl;
extra_missing[p.first] = p.second;
}
}
missing.clear();

// go through the log and add items that are not present or older
// versions on disk, just as if we were reading the log + metadata
// off disk originally
return seastar::do_with(
set<hobject_t>(),
log.log.rbegin(),
[this, &store, ch, &info](auto &did, auto &it) {
return seastar::repeat([this, &store, ch, &info, &it, &did] {
if (it == log.log.rend()) {
return seastar::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::yes);
}
auto &log_entry = *it;
it++;
if (log_entry.version <= info.last_complete)
return seastar::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::yes);
if (log_entry.soid > info.last_backfill ||
log_entry.is_error() ||
did.find(log_entry.soid) != did.end())
return seastar::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::no);
did.insert(log_entry.soid);
return store.get_attr(
ch,
ghobject_t(log_entry.soid, ghobject_t::NO_GEN, info.pgid.shard),
OI_ATTR
).safe_then([this, &log_entry](auto bv) {
object_info_t oi(bv);
ldpp_dout(this, 20)
<< "rebuild_missing_set_with_deletes_crimson found obj "
<< log_entry.soid
<< " version = " << oi.version << dendl;
if (oi.version < log_entry.version) {
ldpp_dout(this, 20)
<< "rebuild_missing_set_with_deletes_crimson missing obj "
<< log_entry.soid
<< " for version = " << log_entry.version << dendl;
missing.add(
log_entry.soid,
log_entry.version,
oi.version,
log_entry.is_delete());
}
},
crimson::ct_error::enoent::handle([this, &log_entry] {
ldpp_dout(this, 20)
<< "rebuild_missing_set_with_deletes_crimson missing object "
<< log_entry.soid << dendl;
missing.add(
log_entry.soid,
log_entry.version,
eversion_t(),
log_entry.is_delete());
}),
crimson::ct_error::enodata::handle([] { ceph_abort("unexpected enodata"); })
).then([] {
return seastar::stop_iteration::no;
});
});
}).then([this] {
set_missing_may_contain_deletes();
});
}
#endif
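
A sketch of the loop shape used by rebuild_missing_set_with_deletes_crimson above: seastar::do_with keeps the scan state alive across suspension points, and seastar::repeat performs one asynchronous step per log entry, stopping early via stop_iteration::yes. check_entry() is a hypothetical stand-in for the per-object getattr; the errorator handling (safe_then / ct_error::enoent::handle) is crimson-specific and omitted here.

#include <cstddef>
#include <vector>
#include <seastar/core/do_with.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>  // seastar::repeat; header location varies across Seastar versions

static seastar::future<bool> check_entry(int) {  // true => keep scanning
  return seastar::make_ready_future<bool>(true);
}

static seastar::future<> scan(std::vector<int> entries) {
  return seastar::do_with(std::move(entries), std::size_t{0},
    [](auto& entries, auto& idx) {
      return seastar::repeat([&entries, &idx] {
        if (idx == entries.size()) {
          return seastar::make_ready_future<seastar::stop_iteration>(
            seastar::stop_iteration::yes);
        }
        int entry = entries[idx++];
        return check_entry(entry).then([](bool keep_scanning) {
          return keep_scanning ? seastar::stop_iteration::no
                               : seastar::stop_iteration::yes;
        });
      });
    });
}
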
7 changes: 7 additions & 0 deletions src/osd/PGLog.h
@@ -943,6 +943,13 @@ struct PGLog : DoutPrefixProvider {
ObjectStore::CollectionHandle& ch,
const pg_info_t &info);

#ifdef WITH_SEASTAR
seastar::future<> rebuild_missing_set_with_deletes_crimson(
crimson::os::FuturizedStore &store,
crimson::os::CollectionRef ch,
const pg_info_t &info);
#endif

protected:
static void split_by_object(
mempool::osd_pglog::list<pg_log_entry_t> &entries,
