Skip to content

Commit

Permalink
feat: optimize network topology in evaluator (#3053)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Jan 29, 2024
1 parent 0f9c400 commit e1d8e05
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 47 deletions.
6 changes: 3 additions & 3 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
}

// Initialize options of evaluator.
evaluatorOptions := []evaluator.Option{}
evaluatorNetworkTopologyOptions := []evaluator.NetworkTopologyOption{}
// Initialize network topology service.
if cfg.Scheduler.Algorithm == evaluator.NetworkTopologyAlgorithm {
cache := cache.New(cfg.Scheduler.NetworkTopology.Cache.TTL, cfg.Scheduler.NetworkTopology.Cache.Interval)
Expand All @@ -268,11 +268,11 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
return nil, err
}

evaluatorOptions = append(evaluatorOptions, evaluator.WithNetworkTopology(s.networkTopology))
evaluatorNetworkTopologyOptions = append(evaluatorNetworkTopologyOptions, evaluator.WithNetworkTopology(s.networkTopology))
}

// Initialize scheduling.
scheduling := scheduling.New(&cfg.Scheduler, dynconfig, d.PluginDir(), evaluatorOptions...)
scheduling := scheduling.New(&cfg.Scheduler, dynconfig, d.PluginDir(), evaluatorNetworkTopologyOptions...)

// Initialize server options of scheduler grpc server.
schedulerServerOptions := []grpc.ServerOption{}
Expand Down
16 changes: 10 additions & 6 deletions scheduler/scheduling/evaluator/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ const (
minAvailableCostLen = 2
)

type evaluator struct{}

// Evaluator is an interface that evaluates the parents.
type Evaluator interface {
// EvaluateParents sort parents by evaluating multiple feature scores.
EvaluateParents(parents []*resource.Peer, child *resource.Peer, taskPieceCount int32) []*resource.Peer
Expand All @@ -70,22 +69,27 @@ type Evaluator interface {
IsBadNode(peer *resource.Peer) bool
}

func New(algorithm string, pluginDir string, options ...Option) Evaluator {
// evaluator is an implementation of Evaluator.
type evaluator struct{}

// New returns a new Evaluator.
func New(algorithm string, pluginDir string, networkTopologyOptions ...NetworkTopologyOption) Evaluator {
switch algorithm {
case PluginAlgorithm:
if plugin, err := LoadPlugin(pluginDir); err == nil {
return plugin
}
case NetworkTopologyAlgorithm:
return NewEvaluatorNetworkTopology(options...)
return newEvaluatorNetworkTopology(networkTopologyOptions...)
// TODO Implement MLAlgorithm.
case MLAlgorithm, DefaultAlgorithm:
return NewEvaluatorBase()
return newEvaluatorBase()
}

return NewEvaluatorBase()
return newEvaluatorBase()
}

// IsBadNode determine if peer is a failed node.
func (e *evaluator) IsBadNode(peer *resource.Peer) bool {
if peer.FSM.Is(resource.PeerStateFailed) || peer.FSM.Is(resource.PeerStateLeave) || peer.FSM.Is(resource.PeerStatePending) ||
peer.FSM.Is(resource.PeerStateReceivedTiny) || peer.FSM.Is(resource.PeerStateReceivedSmall) ||
Expand Down
4 changes: 3 additions & 1 deletion scheduler/scheduling/evaluator/evaluator_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ const (
locationAffinityWeight = 0.15
)

// evaluatorBase is an implementation of Evaluator.
type evaluatorBase struct {
evaluator
}

func NewEvaluatorBase() Evaluator {
// NewEvaluatorBase returns a new EvaluatorBase.
func newEvaluatorBase() Evaluator {
return &evaluatorBase{}
}

Expand Down
22 changes: 11 additions & 11 deletions scheduler/scheduling/evaluator/evaluator_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ var (
mockPeerID = idgen.PeerIDV2()
)

func TestEvaluatorBase_NewEvaluatorBase(t *testing.T) {
func TestEvaluatorBase_newEvaluatorBase(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, e any)
Expand All @@ -172,7 +172,7 @@ func TestEvaluatorBase_NewEvaluatorBase(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, NewEvaluatorBase())
tc.expect(t, newEvaluatorBase())
})
}
}
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
e := NewEvaluatorBase()
e := newEvaluatorBase()
tc.mock(tc.parents, tc.child)
tc.expect(t, e.EvaluateParents(tc.parents, tc.child, tc.totalPieceCount))
})
Expand Down Expand Up @@ -395,7 +395,7 @@ func TestEvaluatorBase_evaluate(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
e := NewEvaluatorBase()
e := newEvaluatorBase()
tc.mock(tc.parent, tc.child)
tc.expect(t, e.(*evaluatorBase).evaluate(tc.parent, tc.child, tc.totalPieceCount))
})
Expand Down Expand Up @@ -515,7 +515,7 @@ func TestEvaluatorBase_calculatePieceScore(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
e := NewEvaluatorBase()
e := newEvaluatorBase()
tc.mock(tc.parent, tc.child)
tc.expect(t, e.(*evaluatorBase).calculatePieceScore(tc.parent, tc.child, tc.totalPieceCount))
})
Expand Down Expand Up @@ -570,7 +570,7 @@ func TestEvaluatorBase_calculatehostUploadSuccessScore(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, host)
e := NewEvaluatorBase()
e := newEvaluatorBase()
tc.mock(host)
tc.expect(t, e.(*evaluatorBase).calculateParentHostUploadSuccessScore(mockPeer))
})
Expand Down Expand Up @@ -620,7 +620,7 @@ func TestEvaluatorBase_calculateFreeUploadScore(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, host)
e := NewEvaluatorBase()
e := newEvaluatorBase()
tc.mock(host, mockPeer)
tc.expect(t, e.(*evaluatorBase).calculateFreeUploadScore(host))
})
Expand Down Expand Up @@ -672,7 +672,7 @@ func TestEvaluatorBase_calculateHostTypeScore(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
e := NewEvaluatorBase()
e := newEvaluatorBase()
tc.mock(peer)
tc.expect(t, e.(*evaluatorBase).calculateHostTypeScore(peer))
})
Expand Down Expand Up @@ -748,7 +748,7 @@ func TestEvaluatorBase_calculateIDCAffinityScore(t *testing.T) {
srcHost := resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)
e := NewEvaluatorBase()
e := newEvaluatorBase()
tc.mock(dstHost, srcHost)
tc.expect(t, e.(*evaluatorBase).calculateIDCAffinityScore(dstHost.Network.IDC, srcHost.Network.IDC))
})
Expand Down Expand Up @@ -874,7 +874,7 @@ func TestEvaluatorBase_calculateMultiElementAffinityScore(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
e := NewEvaluatorBase()
e := newEvaluatorBase()
tc.expect(t, e.(*evaluatorBase).calculateMultiElementAffinityScore(tc.dst, tc.src))
})
}
Expand Down Expand Up @@ -1045,7 +1045,7 @@ func TestEvaluatorBase_IsBadNode(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
e := NewEvaluatorBase()
e := newEvaluatorBase()
tc.mock(tc.peer)
tc.expect(t, e.IsBadNode(tc.peer))
})
Expand Down
10 changes: 6 additions & 4 deletions scheduler/scheduling/evaluator/evaluator_network_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,26 +55,28 @@ const (
defaultPingTimeout = 1 * time.Second
)

// evaluatorNetworkTopology is an implementation of Evaluator.
type evaluatorNetworkTopology struct {
evaluator
networktopology networktopology.NetworkTopology
}

type Option func(e *evaluatorNetworkTopology)
// NetworkTopologyOption is a functional option for configuring the evaluatorNetworkTopology.
type NetworkTopologyOption func(e *evaluatorNetworkTopology)

// WithNetworkTopology sets the networkTopology.
func WithNetworkTopology(networktopology networktopology.NetworkTopology) Option {
func WithNetworkTopology(networktopology networktopology.NetworkTopology) NetworkTopologyOption {
return func(e *evaluatorNetworkTopology) {
e.networktopology = networktopology
}
}

func NewEvaluatorNetworkTopology(options ...Option) Evaluator {
func newEvaluatorNetworkTopology(options ...NetworkTopologyOption) Evaluator {
e := &evaluatorNetworkTopology{}

for _, opt := range options {
opt(e)
}

return e
}

Expand Down
22 changes: 11 additions & 11 deletions scheduler/scheduling/evaluator/evaluator_network_topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"d7y.io/dragonfly/v2/scheduler/resource"
)

func TestEvaluatorNetworkTopology_NewEvaluatorNetworkTopology(t *testing.T) {
func TestEvaluatorNetworkTopology_newEvaluatorNetworkTopology(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, e any)
Expand All @@ -53,7 +53,7 @@ func TestEvaluatorNetworkTopology_NewEvaluatorNetworkTopology(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
mockNetworkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
tc.expect(t, NewEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology)))
tc.expect(t, newEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology)))
})
}
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestEvaluatorNetworkTopology_EvaluateParents(t *testing.T) {
defer ctl.Finish()
mockNetworkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
mockProbe := networktopologymocks.NewMockProbes(ctl)
e := NewEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
e := newEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
tc.mock(tc.parents, tc.child, mockProbe, mockNetworkTopology.EXPECT(), mockProbe.EXPECT())
tc.expect(t, e.EvaluateParents(tc.parents, tc.child, tc.totalPieceCount))
})
Expand Down Expand Up @@ -323,7 +323,7 @@ func TestEvaluatorNetworkTopology_evaluate(t *testing.T) {
defer ctl.Finish()
mockNetworkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
mockProbe := networktopologymocks.NewMockProbes(ctl)
e := NewEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
e := newEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
tc.mock(tc.parent, tc.child, mockProbe, mockProbe.EXPECT(), mockNetworkTopology.EXPECT())
tc.expect(t, e.(*evaluatorNetworkTopology).evaluate(tc.parent, tc.child, tc.totalPieceCount))
})
Expand Down Expand Up @@ -446,7 +446,7 @@ func TestEvaluatorNetworkTopology_calculatePieceScore(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
mockNetworkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
e := NewEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
e := newEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
tc.mock(tc.parent, tc.child)
tc.expect(t, e.(*evaluatorNetworkTopology).calculatePieceScore(tc.parent, tc.child, tc.totalPieceCount))
})
Expand Down Expand Up @@ -504,7 +504,7 @@ func TestEvaluatorNetworkTopology_calculatehostUploadSuccessScore(t *testing.T)
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, host)
e := NewEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
e := newEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
tc.mock(host)
tc.expect(t, e.(*evaluatorNetworkTopology).calculateParentHostUploadSuccessScore(mockPeer))
})
Expand Down Expand Up @@ -557,7 +557,7 @@ func TestEvaluatorNetworkTopology_calculateFreeUploadScore(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, host)
e := NewEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
e := newEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
tc.mock(host, mockPeer)
tc.expect(t, e.(*evaluatorNetworkTopology).calculateFreeUploadScore(host))
})
Expand Down Expand Up @@ -612,7 +612,7 @@ func TestEvaluatorNetworkTopology_calculateHostTypeScore(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
e := NewEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
e := newEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
tc.mock(peer)
tc.expect(t, e.(*evaluatorNetworkTopology).calculateHostTypeScore(peer))
})
Expand Down Expand Up @@ -692,7 +692,7 @@ func TestEvaluatorNetworkTopology_calculateIDCAffinityScore(t *testing.T) {
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)
tc.mock(dstHost, srcHost)
e := NewEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
e := newEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
tc.expect(t, e.(*evaluatorNetworkTopology).calculateIDCAffinityScore(dstHost.Network.IDC, srcHost.Network.IDC))
})
}
Expand Down Expand Up @@ -820,7 +820,7 @@ func TestEvaluatorNetworkTopology_calculateMultiElementAffinityScore(t *testing.
ctl := gomock.NewController(t)
defer ctl.Finish()
mockNetworkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
e := NewEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
e := newEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
tc.expect(t, e.(*evaluatorNetworkTopology).calculateMultiElementAffinityScore(tc.dst, tc.src))
})
}
Expand Down Expand Up @@ -879,7 +879,7 @@ func TestEvaluatorNetworkTopology_calculateNetworkTopologyScore(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
mockNetworkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
e := NewEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
e := newEvaluatorNetworkTopology(WithNetworkTopology(mockNetworkTopology))
mockProbe := networktopologymocks.NewMockProbes(ctl)
tc.mock(tc.parent, tc.child, mockProbe, mockNetworkTopology.EXPECT(), mockProbe.EXPECT())
tc.expect(t, tc.parent, tc.child, e.(*evaluatorNetworkTopology).calculateNetworkTopologyScore(tc.parent.ID, tc.child.ID))
Expand Down
12 changes: 6 additions & 6 deletions scheduler/scheduling/evaluator/evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ func TestEvaluator_New(t *testing.T) {
tests := []struct {
name string
algorithm string
options []Option
options []NetworkTopologyOption
expect func(t *testing.T, e any)
}{
{
name: "new evaluator with default algorithm",
algorithm: "default",
options: []Option{},
options: []NetworkTopologyOption{},
expect: func(t *testing.T, e any) {
assert := assert.New(t)
assert.Equal(reflect.TypeOf(e).Elem().Name(), "evaluatorBase")
Expand All @@ -49,7 +49,7 @@ func TestEvaluator_New(t *testing.T) {
{
name: "new evaluator with machine learning algorithm",
algorithm: "ml",
options: []Option{},
options: []NetworkTopologyOption{},
expect: func(t *testing.T, e any) {
assert := assert.New(t)
assert.Equal(reflect.TypeOf(e).Elem().Name(), "evaluatorBase")
Expand All @@ -58,7 +58,7 @@ func TestEvaluator_New(t *testing.T) {
{
name: "new evaluator with plugin",
algorithm: "plugin",
options: []Option{},
options: []NetworkTopologyOption{},
expect: func(t *testing.T, e any) {
assert := assert.New(t)
assert.Equal(reflect.TypeOf(e).Elem().Name(), "evaluatorBase")
Expand All @@ -67,7 +67,7 @@ func TestEvaluator_New(t *testing.T) {
{
name: "new evaluator with empty string",
algorithm: "",
options: []Option{},
options: []NetworkTopologyOption{},
expect: func(t *testing.T, e any) {
assert := assert.New(t)
assert.Equal(reflect.TypeOf(e).Elem().Name(), "evaluatorBase")
Expand All @@ -76,7 +76,7 @@ func TestEvaluator_New(t *testing.T) {
{
name: "new evaluator with default algorithm and networkTopology",
algorithm: "nt",
options: []Option{WithNetworkTopology(mockNetworkTopology)},
options: []NetworkTopologyOption{WithNetworkTopology(mockNetworkTopology)},
expect: func(t *testing.T, e any) {
assert := assert.New(t)
assert.Equal(reflect.TypeOf(e).Elem().Name(), "evaluatorNetworkTopology")
Expand Down
4 changes: 2 additions & 2 deletions scheduler/scheduling/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ type scheduling struct {
dynconfig config.DynconfigInterface
}

func New(cfg *config.SchedulerConfig, dynconfig config.DynconfigInterface, pluginDir string, evaluatorOpts ...evaluator.Option) Scheduling {
func New(cfg *config.SchedulerConfig, dynconfig config.DynconfigInterface, pluginDir string, networkTopologyOptions ...evaluator.NetworkTopologyOption) Scheduling {
return &scheduling{
evaluator: evaluator.New(cfg.Algorithm, pluginDir, evaluatorOpts...),
evaluator: evaluator.New(cfg.Algorithm, pluginDir, networkTopologyOptions...),
config: cfg,
dynconfig: dynconfig,
}
Expand Down
6 changes: 3 additions & 3 deletions scheduler/scheduling/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@ func TestScheduling_New(t *testing.T) {
tests := []struct {
name string
pluginDir string
options []evaluator.Option
options []evaluator.NetworkTopologyOption
expect func(t *testing.T, s any)
}{
{
name: "new scheduling",
pluginDir: "bar",
options: []evaluator.Option{},
options: []evaluator.NetworkTopologyOption{},
expect: func(t *testing.T, s any) {
assert := assert.New(t)
assert.Equal(reflect.TypeOf(s).Elem().Name(), "scheduling")
Expand All @@ -214,7 +214,7 @@ func TestScheduling_New(t *testing.T) {
{
name: "new scheduling with empty pluginDir",
pluginDir: "",
options: []evaluator.Option{},
options: []evaluator.NetworkTopologyOption{},
expect: func(t *testing.T, s any) {
assert := assert.New(t)
assert.Equal(reflect.TypeOf(s).Elem().Name(), "scheduling")
Expand Down

0 comments on commit e1d8e05

Please sign in to comment.