Skip to content

Commit

Permalink
feat: evalutorBase adds networktopology (#3015)
Browse files Browse the repository at this point in the history
Signed-off-by: huangmin <2107139596@qq.com>
  • Loading branch information
MinH-09 authored Jan 29, 2024
1 parent 1b88b4b commit 0f9c400
Show file tree
Hide file tree
Showing 17 changed files with 1,418 additions and 264 deletions.
42 changes: 21 additions & 21 deletions scheduler/announcer/announcer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ func TestAnnouncer_New(t *testing.T) {
mockTrainerClient := trainerclientmocks.NewMockV1(ctl)

tests := []struct {
name string
config *config.Config
option []Option
mock func(m *managerclientmocks.MockV2MockRecorder)
expect func(t *testing.T, announcer Announcer, err error)
name string
config *config.Config
options []Option
mock func(m *managerclientmocks.MockV2MockRecorder)
expect func(t *testing.T, announcer Announcer, err error)
}{
{
name: "new announcer",
Expand All @@ -82,7 +82,7 @@ func TestAnnouncer_New(t *testing.T) {
SchedulerClusterID: 1,
},
},
option: []Option{},
options: []Option{},
mock: func(m *managerclientmocks.MockV2MockRecorder) {
m.UpdateScheduler(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
},
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestAnnouncer_New(t *testing.T) {
SchedulerClusterID: 1,
},
},
option: []Option{WithTrainerClient(mockTrainerClient)},
options: []Option{WithTrainerClient(mockTrainerClient)},
mock: func(m *managerclientmocks.MockV2MockRecorder) {
m.UpdateScheduler(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
},
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestAnnouncer_New(t *testing.T) {
SchedulerClusterID: 1,
},
},
option: []Option{},
options: []Option{},
mock: func(m *managerclientmocks.MockV2MockRecorder) {
m.UpdateScheduler(gomock.Any(), gomock.Any()).Return(nil, errors.New("foo")).Times(1)
},
Expand All @@ -158,7 +158,7 @@ func TestAnnouncer_New(t *testing.T) {
mockStorage := storagemocks.NewMockStorage(ctl)
tc.mock(mockManagerClient.EXPECT())

a, err := New(tc.config, mockManagerClient, mockStorage, tc.option...)
a, err := New(tc.config, mockManagerClient, mockStorage, tc.options...)
tc.expect(t, a, err)
})
}
Expand All @@ -170,13 +170,13 @@ func TestAnnouncer_Serve(t *testing.T) {
mockTrainerClient := trainerclientmocks.NewMockV1(ctl)

tests := []struct {
name string
config *config.Config
data []byte
option []Option
sleep func()
mock func(stream trainerv1.Trainer_TrainClient, data []byte, m *managerclientmocks.MockV2MockRecorder, mtc *trainerclientmocks.MockV1MockRecorder, ms *storagemocks.MockStorageMockRecorder, mt *trainerv1mocks.MockTrainer_TrainClientMockRecorder)
except func(t *testing.T, a Announcer)
name string
config *config.Config
data []byte
options []Option
sleep func()
mock func(stream trainerv1.Trainer_TrainClient, data []byte, m *managerclientmocks.MockV2MockRecorder, mtc *trainerclientmocks.MockV1MockRecorder, ms *storagemocks.MockStorageMockRecorder, mt *trainerv1mocks.MockTrainer_TrainClientMockRecorder)
except func(t *testing.T, a Announcer)
}{
{
name: "started announcer server success",
Expand All @@ -202,8 +202,8 @@ func TestAnnouncer_Serve(t *testing.T) {
UploadTimeout: 10 * time.Second,
},
},
data: []byte("bar"),
option: []Option{WithTrainerClient(mockTrainerClient)},
data: []byte("bar"),
options: []Option{WithTrainerClient(mockTrainerClient)},
sleep: func() {
time.Sleep(3 * time.Second)
},
Expand Down Expand Up @@ -273,8 +273,8 @@ func TestAnnouncer_Serve(t *testing.T) {
SchedulerClusterID: 1,
},
},
data: []byte("bar"),
option: []Option{},
data: []byte("bar"),
options: []Option{},
sleep: func() {
time.Sleep(100 * time.Millisecond)
},
Expand Down Expand Up @@ -310,7 +310,7 @@ func TestAnnouncer_Serve(t *testing.T) {
mockStorage := storagemocks.NewMockStorage(ctl)

tc.mock(stream, tc.data, mockManagerClient.EXPECT(), mockTrainerClient.EXPECT(), mockStorage.EXPECT(), stream.EXPECT())
a, err := New(tc.config, mockManagerClient, mockStorage, tc.option...)
a, err := New(tc.config, mockManagerClient, mockStorage, tc.options...)
if err != nil {
t.Fatal(err)
}
Expand Down
62 changes: 30 additions & 32 deletions scheduler/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ type Config struct {
// Network configuration.
Network NetworkConfig `yaml:"network" mapstructure:"network"`

// NetworkTopology configuration.
NetworkTopology NetworkTopologyConfig `yaml:"networkTopology" mapstructure:"networkTopology"`

// Trainer configuration.
Trainer TrainerConfig `yaml:"trainer" mapstructure:"trainer"`
}
Expand Down Expand Up @@ -130,6 +127,9 @@ type SchedulerConfig struct {

// GC configuration.
GC GCConfig `yaml:"gc" mapstructure:"gc"`

// NetworkTopology configuration.
NetworkTopology NetworkTopologyConfig `yaml:"networkTopology" mapstructure:"networkTopology"`
}

type DatabaseConfig struct {
Expand Down Expand Up @@ -331,9 +331,6 @@ type NetworkConfig struct {
}

type NetworkTopologyConfig struct {
// Enable network topology service, including probe, network topology collection.
Enable bool `yaml:"enable" mapstructure:"enable"`

// CollectInterval is the interval of collecting network topology.
CollectInterval time.Duration `mapstructure:"collectInterval" yaml:"collectInterval"`

Expand Down Expand Up @@ -396,6 +393,17 @@ func New() *Config {
HostGCInterval: DefaultSchedulerHostGCInterval,
HostTTL: DefaultSchedulerHostTTL,
},
NetworkTopology: NetworkTopologyConfig{
CollectInterval: DefaultSchedulerNetworkTopologyCollectInterval,
Probe: ProbeConfig{
QueueLength: DefaultSchedulerNetworkTopologyProbeQueueLength,
Count: DefaultSchedulerNetworkTopologyProbeCount,
},
Cache: CacheConfig{
Interval: DefaultSchedulerNetworkTopologyCacheInterval,
TTL: DefaultSchedulerNetworkTopologyCacheTLL,
},
},
},
Database: DatabaseConfig{
Redis: RedisConfig{
Expand Down Expand Up @@ -458,18 +466,6 @@ func New() *Config {
Network: NetworkConfig{
EnableIPv6: DefaultNetworkEnableIPv6,
},
NetworkTopology: NetworkTopologyConfig{
Enable: true,
CollectInterval: DefaultNetworkTopologyCollectInterval,
Probe: ProbeConfig{
QueueLength: DefaultProbeQueueLength,
Count: DefaultProbeCount,
},
Cache: CacheConfig{
Interval: DefaultNetworkTopologyCacheInterval,
TTL: DefaultNetworkTopologyCacheTLL,
},
},
Trainer: TrainerConfig{
Enable: false,
Addr: DefaultTrainerAddr,
Expand Down Expand Up @@ -639,24 +635,26 @@ func (cfg *Config) Validate() error {
}
}

if cfg.NetworkTopology.CollectInterval <= 0 {
return errors.New("networkTopology requires parameter collectInterval")
}
if cfg.Scheduler.Algorithm == NetworkTopologyAlgorithm {
if cfg.Scheduler.NetworkTopology.CollectInterval <= 0 {
return errors.New("networkTopology requires parameter collectInterval")
}

if cfg.NetworkTopology.Probe.QueueLength <= 0 {
return errors.New("probe requires parameter queueLength")
}
if cfg.Scheduler.NetworkTopology.Probe.QueueLength <= 0 {
return errors.New("probe requires parameter queueLength")
}

if cfg.NetworkTopology.Probe.Count <= 0 {
return errors.New("probe requires parameter count")
}
if cfg.Scheduler.NetworkTopology.Probe.Count <= 0 {
return errors.New("probe requires parameter count")
}

if cfg.NetworkTopology.Cache.Interval <= 0 {
return errors.New("networkTopology requires parameter interval")
}
if cfg.Scheduler.NetworkTopology.Cache.Interval <= 0 {
return errors.New("networkTopology requires parameter interval")
}

if cfg.NetworkTopology.Cache.TTL <= 0 {
return errors.New("networkTopology requires parameter ttl")
if cfg.Scheduler.NetworkTopology.Cache.TTL <= 0 {
return errors.New("networkTopology requires parameter ttl")
}
}

if cfg.Trainer.Enable {
Expand Down
Loading

0 comments on commit 0f9c400

Please sign in to comment.