Skip to content

Commit

Permalink
fix(worker): fix concurrent map iteration and map write (#698)
Browse files Browse the repository at this point in the history
Co-authored-by: zhenghaoz <zhangzhenghao@hotmail.com>
  • Loading branch information
github-actions[bot] and zhenghaoz authored Jun 4, 2023
1 parent 3bfac13 commit 80ed93d
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ func (config *Config) OfflineRecommendDigest(option ...DigestOption) string {
})

var builder strings.Builder
config.Recommend.Offline.Lock()
builder.WriteString(fmt.Sprintf("%v-%v-%v-%v-%v-%v-%v-%v",
config.Recommend.Offline.ExploreRecommend,
config.Recommend.Offline.EnableLatestRecommend,
Expand All @@ -344,6 +345,7 @@ func (config *Config) OfflineRecommendDigest(option ...DigestOption) string {
options.enableRanking,
config.Recommend.Replacement.EnableReplacement,
))
config.Recommend.Offline.UnLock()
if config.Recommend.Offline.EnablePopularRecommend {
builder.WriteString(fmt.Sprintf("-%v", config.Recommend.Popular.PopularWindow))
}
Expand Down
49 changes: 49 additions & 0 deletions worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,55 @@ func TestWorker_Sync(t *testing.T) {
done <- struct{}{}
}

func TestWorker_SyncRecommend(t *testing.T) {
cfg := config.GetDefaultConfig()
cfg.Recommend.Offline.ExploreRecommend = map[string]float64{"popular": 0.5}
master := newMockMaster(t)
master.meta.Config = marshal(t, cfg)
go master.Start(t)
address := <-master.addr
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
assert.NoError(t, err)
worker := &Worker{
Settings: config.NewSettings(),
jobs: 1,
testMode: true,
masterClient: protocol.NewMasterClient(conn),
syncedChan: parallel.NewConditionChannel(),
ticker: time.NewTicker(time.Minute),
}
worker.Sync()

stopSync := make(chan struct{})
go func() {
for {
select {
case <-stopSync:
return
default:
worker.Sync()
}
}
}()

stopRecommend := make(chan struct{})
go func() {
for {
select {
case <-stopRecommend:
return
default:
worker.Settings.Config.OfflineRecommendDigest()
}
}
}()

time.Sleep(time.Second)
stopSync <- struct{}{}
stopRecommend <- struct{}{}
master.Stop()
}

type mockFactorizationMachine struct {
click.BaseFactorizationMachine
}
Expand Down

0 comments on commit 80ed93d

Please sign in to comment.