Skip to content

Commit

Permalink
feat: change state set when trigger seed peer (#3003)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Jan 9, 2024
1 parent 70dd9b1 commit dce1890
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 32 deletions.
3 changes: 1 addition & 2 deletions scheduler/resource/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,7 @@ func (t *Task) HasAvailablePeer(blocklist set.SafeSet[string]) bool {
continue
}

if peer.FSM.Is(PeerStatePending) ||
peer.FSM.Is(PeerStateRunning) ||
if peer.FSM.Is(PeerStateRunning) ||
peer.FSM.Is(PeerStateSucceeded) ||
peer.FSM.Is(PeerStateBackToSource) {
hasAvailablePeer = true
Expand Down
11 changes: 0 additions & 11 deletions scheduler/resource/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,17 +1003,6 @@ func TestTask_HasAvailablePeer(t *testing.T) {
assert.Equal(task.HasAvailablePeer(blocklist), false)
},
},
{
name: "peer state is PeerStatePending",
expect: func(t *testing.T, task *Task, mockPeer *Peer) {
assert := assert.New(t)
task.StorePeer(mockPeer)
mockPeer.ID = idgen.PeerIDV2()
mockPeer.FSM.SetState(PeerStatePending)
task.StorePeer(mockPeer)
assert.Equal(task.HasAvailablePeer(set.NewSafeSet[string]()), true)
},
},
{
name: "peer state is PeerStateSucceeded",
expect: func(t *testing.T, task *Task, mockPeer *Peer) {
Expand Down
1 change: 1 addition & 0 deletions scheduler/scheduling/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.S
filterParentLimit = int(config.FilterParentLimit)
}
}
peer.Log.Debugf("filter parent limit is %d", filterParentLimit)

var (
candidateParents []*resource.Peer
Expand Down
39 changes: 20 additions & 19 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
// Handle resource included host, task, and peer.
host, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.GetDownload())
_, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.GetDownload())
if err != nil {
return err
}
Expand All @@ -827,29 +827,30 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
metrics.RegisterPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()

// When there are no available peers for a task, the scheduler needs to trigger
// the first task download in the p2p cluster.
blocklist := set.NewSafeSet[string]()
blocklist.Add(peer.ID)
if task.FSM.Is(resource.TaskStatePending) ||
download := proto.Clone(req.Download).(*commonv2.Download)
switch {
// If scheduler trigger seed peer download back-to-source,
// the needBackToSource flag should be true.
case download.GetNeedBackToSource():
peer.Log.Infof("peer need back to source")
peer.NeedBackToSource.Store(true)
// If task is pending, failed, leave, or succeeded and has no available peer,
// scheduler trigger seed peer download back-to-source.
case task.FSM.Is(resource.TaskStatePending) ||
task.FSM.Is(resource.TaskStateFailed) ||
task.FSM.Is(resource.TaskStateLeave) ||
task.FSM.Is(resource.TaskStateSucceeded) &&
!task.HasAvailablePeer(blocklist) {
download := proto.Clone(req.Download).(*commonv2.Download)
if download.GetNeedBackToSource() || host.Type != types.HostTypeNormal {
peer.Log.Infof("peer need back to source")
peer.NeedBackToSource.Store(true)
} else {
// If trigger the seed peer download back-to-source,
// the need back-to-source flag should be true.
download.NeedBackToSource = true
if err := v.downloadTaskBySeedPeer(ctx, taskID, download, peer); err != nil {
// Collect RegisterPeerFailureCount metrics.
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
return err
}
!task.HasAvailablePeer(blocklist):
// If trigger the seed peer download back-to-source,
// the need back-to-source flag should be true.
download.NeedBackToSource = true
if err := v.downloadTaskBySeedPeer(ctx, taskID, download, peer); err != nil {
// Collect RegisterPeerFailureCount metrics.
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
return err
}
}

Expand Down

0 comments on commit dce1890

Please sign in to comment.