Skip to content

Commit

Permalink
Refactor: remvoe Copy bound from NodeId
Browse files Browse the repository at this point in the history
The `NodeId` type is currently defined as:

```rust
type NodeId: .. + Copy + .. + 'static;
```

This commit removes the `Copy` bound from `NodeId`.
This modification will allow the use of non-`Copy` types as `NodeId`,
providing greater flexibility for applications that prefer
variable-length strings or other non-`Copy` types for node
identification.

This change maintain compatibility by updating derived `Copy`
implementations with manual implementations:

```rust
// Before
#[derive(Copy...)]
pub struct LogId<NID: NodeId> {}

// After
impl<NID: Copy> Copy for LogId<NID> {}
```

- Fix: databendlabs#1250
  • Loading branch information
drmingdrmer committed Oct 14, 2024
1 parent 3fe7d9a commit 77d40a2
Show file tree
Hide file tree
Showing 55 changed files with 464 additions and 425 deletions.
16 changes: 8 additions & 8 deletions examples/memstore/src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
}

async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C>> {
let last = self.log.iter().next_back().map(|(_, ent)| *ent.get_log_id());
let last = self.log.iter().next_back().map(|(_, ent)| ent.get_log_id().clone());

let last_purged = self.last_purged_log_id;
let last_purged = self.last_purged_log_id.clone();

let last = match last {
None => last_purged,
None => last_purged.clone(),
Some(x) => Some(x),
};

Expand All @@ -81,16 +81,16 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
}

async fn read_committed(&mut self) -> Result<Option<LogId<C::NodeId>>, StorageError<C>> {
Ok(self.committed)
Ok(self.committed.clone())
}

async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C>> {
self.vote = Some(*vote);
self.vote = Some(vote.clone());
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C>> {
Ok(self.vote)
Ok(self.vote.clone())
}

async fn append<I>(&mut self, entries: I, callback: IOFlushed<C>) -> Result<(), StorageError<C>>
Expand All @@ -116,8 +116,8 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
async fn purge(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C>> {
{
let ld = &mut self.last_purged_log_id;
assert!(*ld <= Some(log_id));
*ld = Some(log_id);
assert!(ld.as_ref() <= Some(&log_id));
*ld = Some(log_id.clone());
}

{
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/heartbeat/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use crate::LogId;
use crate::RaftTypeConfig;

/// The information for broadcasting a heartbeat.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
#[derive(PartialEq, Eq)]
pub struct HeartbeatEvent<C>
pub(crate) struct HeartbeatEvent<C>
where C: RaftTypeConfig
{
/// The timestamp when this heartbeat is sent.
Expand Down
8 changes: 4 additions & 4 deletions openraft/src/core/heartbeat/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,19 @@ where C: RaftTypeConfig
{
for (target, node) in targets {
tracing::debug!("id={} spawn HeartbeatWorker target={}", self.id, target);
let network = network_factory.new_client(target, &node).await;
let network = network_factory.new_client(target.clone(), &node).await;

let worker = HeartbeatWorker {
id: self.id,
id: self.id.clone(),
rx: self.rx.clone(),
network,
target,
target: target.clone(),
node,
config: self.config.clone(),
tx_notification: tx_notification.clone(),
};

let span = tracing::span!(parent: &Span::current(), Level::DEBUG, "heartbeat", id=display(self.id), target=display(target));
let span = tracing::span!(parent: &Span::current(), Level::DEBUG, "heartbeat", id=display(&self.id), target=display(&target));

let (tx_shutdown, rx_shutdown) = C::oneshot();

Expand Down
10 changes: 5 additions & 5 deletions openraft/src/core/heartbeat/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
_ = self.rx.changed().fuse() => {},
}

let heartbeat: Option<HeartbeatEvent<C>> = *self.rx.borrow_watched();
let heartbeat: Option<HeartbeatEvent<C>> = self.rx.borrow_watched().clone();

// None is the initial value of the WatchReceiver, ignore it.
let Some(heartbeat) = heartbeat else {
Expand All @@ -82,9 +82,9 @@ where
let option = RPCOption::new(timeout);

let payload = AppendEntriesRequest {
vote: heartbeat.session_id.leader_vote.into_vote(),
vote: heartbeat.session_id.leader_vote.clone().into_vote(),
prev_log_id: None,
leader_commit: heartbeat.committed,
leader_commit: heartbeat.committed.clone(),
entries: vec![],
};

Expand All @@ -94,9 +94,9 @@ where
match res {
Ok(Ok(_)) => {
let res = self.tx_notification.send(Notification::HeartbeatProgress {
session_id: heartbeat.session_id,
session_id: heartbeat.session_id.clone(),
sending_time: heartbeat.time,
target: self.target,
target: self.target.clone(),
});

if res.is_err() {
Expand Down
Loading

0 comments on commit 77d40a2

Please sign in to comment.