Skip to content

Commit

Permalink
feat: scheduler detects peer survivability and clears offline peers' …
Browse files Browse the repository at this point in the history
…metadata (#3353)

feat: scheduler detects peer survivability and clears offline peers

1. The Host GC method will remove hosts that did not announce in time and their peers' metadata.
2. The scheduler Host GC interval is lowered in order to detect non-surviving hosts in time.
3. Add unit tests.

Signed-off-by: BruceAko <chongzhi@hust.edu.cn>
  • Loading branch information
BruceAko authored Jul 4, 2024
1 parent 43ef996 commit 8153e57
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 5 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.123
d7y.io/api/v2 v2.0.126
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
d7y.io/api/v2 v2.0.123 h1:GGJy9DIaVYDHpS5PfiW2/Ad93jS4237uhmZx/lH9Zhc=
d7y.io/api/v2 v2.0.123/go.mod h1:5n5c+0oceb9/Ih4xL6UNRwQEZhBztiHMf4ghb+wGx4U=
d7y.io/api/v2 v2.0.126 h1:mlVZHBJwOQL9PZcnsVN9Etcru+rr2nbDXBuDC8v0PhY=
d7y.io/api/v2 v2.0.126/go.mod h1:5n5c+0oceb9/Ih4xL6UNRwQEZhBztiHMf4ghb+wGx4U=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
2 changes: 1 addition & 1 deletion scheduler/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ const (
DefaultSchedulerTaskGCInterval = 30 * time.Minute

// DefaultSchedulerHostGCInterval is default interval for host gc.
DefaultSchedulerHostGCInterval = 6 * time.Hour
DefaultSchedulerHostGCInterval = 5 * time.Minute

// DefaultSchedulerHostTTL is default ttl for host.
DefaultSchedulerHostTTL = 1 * time.Hour
Expand Down
10 changes: 10 additions & 0 deletions scheduler/resource/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ func WithBuild(build Build) HostOption {
}
}

// WithAnnounceInterval sets host's announce interval.
func WithAnnounceInterval(announceInterval time.Duration) HostOption {
return func(h *Host) {
h.AnnounceInterval = announceInterval
}
}

// Host contains content for host.
type Host struct {
// ID is host id.
Expand Down Expand Up @@ -178,6 +185,9 @@ type Host struct {
// SchedulerClusterID is the scheduler cluster id matched by scopes.
SchedulerClusterID uint64

// AnnounceInterval is the interval between host announces to scheduler.
AnnounceInterval time.Duration

// ConcurrentUploadLimit is concurrent upload limit count.
ConcurrentUploadLimit *atomic.Int32

Expand Down
14 changes: 13 additions & 1 deletion scheduler/resource/host_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package resource

import (
"sync"
"time"

"d7y.io/dragonfly/v2/pkg/container/set"
pkggc "d7y.io/dragonfly/v2/pkg/gc"
Expand Down Expand Up @@ -142,7 +143,7 @@ func (h *hostManager) LoadRandomHosts(n int, blocklist set.SafeSet[string]) []*H
return hosts
}

// Try to reclaim host.
// RunGC tries to reclaim host.
func (h *hostManager) RunGC() error {
h.Map.Range(func(_, value any) bool {
host, ok := value.(*Host)
Expand All @@ -151,11 +152,22 @@ func (h *hostManager) RunGC() error {
return true
}

// If the host's elapsed exceeds twice the announcing interval,
// then leave peers in host.
elapsed := time.Since(host.UpdatedAt.Load())
if host.AnnounceInterval > 0 && elapsed > host.AnnounceInterval*2 {
host.Log.Info("host elapsed exceeds twice the announce interval, causing the host to leave peers")
host.LeavePeers()
return true
}

// Reclaim the host.
if host.PeerCount.Load() == 0 &&
host.ConcurrentUploadCount.Load() == 0 &&
host.Type == types.HostTypeNormal {
host.Log.Info("host has been reclaimed")
h.Delete(host.ID)
return true
}

return true
Expand Down
24 changes: 24 additions & 0 deletions scheduler/resource/host_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,30 @@ func TestHostManager_RunGC(t *testing.T) {
assert.Equal(host.ID, mockSeedHost.ID)
},
},
{
name: "host elapsed exceeds twice the announce interval",
mock: func(m *gc.MockGCMockRecorder) {
m.Add(gomock.Any()).Return(nil).Times(1)
},
expect: func(t *testing.T, hostManager HostManager, mockHost *Host, mockPeer *Peer) {
assert := assert.New(t)
mockHost.AnnounceInterval = 1 * time.Microsecond
hostManager.Store(mockHost)
mockHost.StorePeer(mockPeer)
err := hostManager.RunGC()
assert.NoError(err)

mockHost.Peers.Range(func(_, value any) bool {
peer := value.(*Peer)
assert.True(peer.FSM.Is(PeerStateLeave))
return true
})

host, loaded := hostManager.Load(mockHost.ID)
assert.Equal(loaded, true)
assert.Equal(host.ID, mockHost.ID)
},
},
}

for _, tc := range tests {
Expand Down
42 changes: 42 additions & 0 deletions scheduler/resource/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ var (
Platform: "darwin",
}

mockAnnounceInterval = 5 * time.Minute

mockHostID = idgen.HostIDV2("127.0.0.1", "foo")
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar")
mockHostLocation = "baz"
Expand All @@ -152,6 +154,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -176,6 +179,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.DownloadPort, mockRawSeedHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultSeedPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand Down Expand Up @@ -225,6 +229,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(1))
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -250,6 +255,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -276,6 +282,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.OS, "linux")
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -302,6 +309,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.Platform, "ubuntu")
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -328,6 +336,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.PlatformFamily, "debian")
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -353,6 +362,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.PlatformVersion, "22.04")
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -379,6 +389,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.KernelVersion, "5.15.0-27-generic")
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -405,6 +416,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0))
assert.EqualValues(host.CPU, mockCPU)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -431,6 +443,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0))
assert.EqualValues(host.Memory, mockMemory)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -457,6 +470,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0))
assert.EqualValues(host.Network, mockNetwork)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -483,6 +497,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0))
assert.EqualValues(host.Disk, mockDisk)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -509,6 +524,33 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.ObjectStoragePort, int32(0))
assert.EqualValues(host.Build, mockBuild)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, time.Duration(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
{
name: "new host and set announce interval",
rawHost: mockRawHost,
options: []HostOption{WithAnnounceInterval(mockAnnounceInterval)},
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.AnnounceInterval, 5*time.Minute)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand Down
2 changes: 2 additions & 0 deletions scheduler/service/service_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ var (
Platform: "darwin",
}

mockInterval = durationpb.New(5 * time.Minute).AsDuration()

mockPeerHost = &schedulerv1.PeerHost{
Id: mockHostID,
Ip: "127.0.0.1",
Expand Down
8 changes: 8 additions & 0 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,10 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
options = append(options, resource.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID)))
}

if req.GetInterval() != nil {
options = append(options, resource.WithAnnounceInterval(req.GetInterval().AsDuration()))
}

host = resource.NewHost(
req.Host.GetId(), req.Host.GetIp(), req.Host.GetHostname(),
req.Host.GetPort(), req.Host.GetDownloadPort(), types.HostType(req.Host.GetType()),
Expand Down Expand Up @@ -673,6 +677,10 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
}
}

if req.GetInterval() != nil {
host.AnnounceInterval = req.GetInterval().AsDuration()
}

return nil
}

Expand Down
9 changes: 9 additions & 0 deletions scheduler/service/service_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
Expand Down Expand Up @@ -488,6 +489,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Platform: &mockBuild.Platform,
},
},
Interval: durationpb.New(5 * time.Minute),
},
run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
gomock.InOrder(
Expand All @@ -512,6 +514,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
assert.EqualValues(host.Network, mockNetwork)
assert.EqualValues(host.Disk, mockDisk)
assert.EqualValues(host.Build, mockBuild)
assert.EqualValues(host.AnnounceInterval, mockInterval)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand Down Expand Up @@ -592,6 +595,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Platform: &mockBuild.Platform,
},
},
Interval: durationpb.New(5 * time.Minute),
},
run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
gomock.InOrder(
Expand All @@ -616,6 +620,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
assert.EqualValues(host.Network, mockNetwork)
assert.EqualValues(host.Disk, mockDisk)
assert.EqualValues(host.Build, mockBuild)
assert.EqualValues(host.AnnounceInterval, mockInterval)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand Down Expand Up @@ -696,6 +701,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Platform: &mockBuild.Platform,
},
},
Interval: durationpb.New(5 * time.Minute),
},
run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
gomock.InOrder(
Expand All @@ -721,6 +727,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
assert.EqualValues(host.Network, mockNetwork)
assert.EqualValues(host.Disk, mockDisk)
assert.EqualValues(host.Build, mockBuild)
assert.EqualValues(host.AnnounceInterval, mockInterval)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand Down Expand Up @@ -796,6 +803,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Platform: &mockBuild.Platform,
},
},
Interval: durationpb.New(5 * time.Minute),
},
run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
gomock.InOrder(
Expand All @@ -821,6 +829,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
assert.EqualValues(host.Network, mockNetwork)
assert.EqualValues(host.Disk, mockDisk)
assert.EqualValues(host.Build, mockBuild)
assert.EqualValues(host.AnnounceInterval, mockInterval)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand Down

0 comments on commit 8153e57

Please sign in to comment.