Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful: Xorm, RepoIndexer, Cron and Others #9282

Merged
merged 41 commits into from
Dec 15, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
5b3fa6d
Graceful: use GetManager instead of global
zeripath Dec 7, 2019
00ddf85
Graceful: Make repo indexer shutdown gracefully
zeripath Dec 7, 2019
b2dea35
Graceful: Make the cron tasks graceful
zeripath Nov 15, 2019
c0ea8ef
Graceful: Make TestPullRequests shutdownable
zeripath Nov 13, 2019
7beda2d
Graceful: AddTestPullRequest run in graceful ctx
zeripath Dec 7, 2019
db022dd
Graceful: SetDefaultContext for Xorm to be HammerContext
zeripath Dec 8, 2019
1aafb56
Avoid starting graceful for migrate commands and checkout
zeripath Dec 8, 2019
eae3144
Merge branch 'master' into graceful-more-things-graceful
zeripath Dec 10, 2019
7589a49
Graceful: DeliverHooks now can be shutdown
zeripath Dec 9, 2019
89154f5
Graceful: SyncMirrors shutdown
zeripath Dec 9, 2019
9b5d465
Merge branch 'master' into graceful-more-things-graceful
zeripath Dec 11, 2019
d7d6c86
Remove unnecessary ctx check
zeripath Dec 12, 2019
1a79b2f
Fix hammer syncing
zeripath Dec 12, 2019
7ce742c
rename manager.run to manager.start
zeripath Dec 12, 2019
fa751ea
adjust ctx.Done check
zeripath Dec 12, 2019
19f2ca7
Prevent deadlock in mirror.Update on shutdown
zeripath Dec 12, 2019
ef9db2d
Lint doesn't permit passing in ctx nil
zeripath Dec 12, 2019
90bde75
Add FIXME note to indexer/code/bleve.go
zeripath Dec 12, 2019
13a808d
Say CheckRepoStats: Aborting due to Shutdown instead
zeripath Dec 12, 2019
f1c85e0
Update modules/sync/unique_queue.go
zeripath Dec 12, 2019
aa2dabd
Merge branch 'graceful-more-things-graceful' of github.com:zeripath/g…
zeripath Dec 13, 2019
d55db9e
Make channels at start up rather than delayed
zeripath Dec 13, 2019
398ab3b
Merge branch 'master' into graceful-more-things-graceful
zeripath Dec 13, 2019
c93f1a7
Make repo indexer shutdown safely
zeripath Dec 13, 2019
4411cae
Update manager.go
zeripath Dec 13, 2019
5c2b081
Merge branch 'master' into graceful-more-things-graceful
zeripath Dec 14, 2019
9faf8da
Merge branch 'master' into graceful-more-things-graceful
lunny Dec 14, 2019
cb2ac9b
oops
zeripath Dec 14, 2019
607f2c2
Better error reporting
zeripath Dec 14, 2019
a4a722f
fixup
zeripath Dec 14, 2019
a9269a5
Push TestPullRequests to the UniqueQueue and make UniqueQueue closable
zeripath Dec 14, 2019
3b97005
Ensure webhook queue is also closed to prevent blockage on add here
zeripath Dec 14, 2019
c227a34
Remove unnecessary channel check
zeripath Dec 14, 2019
0e38ea8
Double check that we are not trying to add id to the table again
zeripath Dec 14, 2019
244b54a
Sort results of getunindexrepos by descending id
zeripath Dec 14, 2019
2098dd2
Ensure repos only added once to queue
zeripath Dec 14, 2019
1374ad2
Merge branch 'master' into graceful-more-things-graceful
zeripath Dec 14, 2019
d42cf0d
D'oh
zeripath Dec 15, 2019
be4388c
Skip duplicate exist check in addfunc
zeripath Dec 15, 2019
3e79dae
Merge branch 'master' into graceful-more-things-graceful
lunny Dec 15, 2019
caeecd2
Merge branch 'master' into graceful-more-things-graceful
zeripath Dec 15, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions models/pull_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ func GetUnmergedPullRequestsByBaseInfo(repoID int64, branch string) ([]*PullRequ
Find(&prs)
}

// GetPullRequestsByCheckStatus returns all pull requests according the special checking status.
func GetPullRequestsByCheckStatus(status PullRequestStatus) ([]*PullRequest, error) {
prs := make([]*PullRequest, 0, 10)
return prs, x.
// GetPullRequestIDsByCheckStatus returns all pull requests according the special checking status.
func GetPullRequestIDsByCheckStatus(status PullRequestStatus) ([]int64, error) {
prs := make([]int64, 0, 10)
return prs, x.Table("pull_request").
Where("status=?", status).
Cols("pull_request.id").
Find(&prs)
}

Expand Down
17 changes: 11 additions & 6 deletions models/repo_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package models

import (
"fmt"

"xorm.io/builder"
)

Expand All @@ -22,10 +24,10 @@ func GetUnindexedRepos(maxRepoID int64, page, pageSize int) ([]int64, error) {
cond := builder.Cond(builder.IsNull{
"repo_indexer_status.id",
})
sess := x.Table("repo").Join("LEFT OUTER", "repo_indexer_status", "repo.id = repo_indexer_status.repoID")
sess := x.Table("repository").Join("LEFT OUTER", "repo_indexer_status", "repository.id = repo_indexer_status.repo_id")
if maxRepoID > 0 {
cond = builder.And(cond, builder.Lte{
"repo.id": maxRepoID,
"repository.id": maxRepoID,
})
}
if page >= 0 && pageSize > 0 {
Expand All @@ -36,7 +38,7 @@ func GetUnindexedRepos(maxRepoID int64, page, pageSize int) ([]int64, error) {
sess.Limit(pageSize, start)
}

sess.Where(cond).Cols("repo.id")
sess.Where(cond).Cols("repository.id")
err := sess.Find(&ids)
return ids, err
}
Expand All @@ -60,15 +62,18 @@ func (repo *Repository) GetIndexerStatus() error {
// UpdateIndexerStatus updates indexer status
func (repo *Repository) UpdateIndexerStatus(sha string) error {
if err := repo.GetIndexerStatus(); err != nil {
return err
return fmt.Errorf("UpdateIndexerStatus: Unable to getIndexerStatus for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err)
}
if len(repo.IndexerStatus.CommitSha) == 0 {
repo.IndexerStatus.CommitSha = sha
_, err := x.Insert(repo.IndexerStatus)
return err
return fmt.Errorf("UpdateIndexerStatus: Unable to insert repoIndexerStatus for repo: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err)
}
repo.IndexerStatus.CommitSha = sha
_, err := x.ID(repo.IndexerStatus.ID).Cols("commit_sha").
Update(repo.IndexerStatus)
return err
if err != nil {
return fmt.Errorf("UpdateIndexerStatus: Unable to update repoIndexerStatus for repo: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err)
}
return nil
}
12 changes: 6 additions & 6 deletions modules/indexer/code/bleve.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,33 +124,33 @@ func populateRepoIndexer(maxRepoID int64) {
func updateRepoIndexer(repoID int64) error {
repo, err := models.GetRepositoryByID(repoID)
if err != nil {
return err
return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepositoryByID: %d, Error: %v", repoID, err)
}

sha, err := getDefaultBranchSha(repo)
if err != nil {
return err
return fmt.Errorf("UpdateRepoIndexer: Unable to GetDefaultBranchSha for: %s/%s, Error: %v", repo.MustOwnerName(), repo.Name, err)
}
changes, err := getRepoChanges(repo, sha)
if err != nil {
return err
return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepoChanges for: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err)
} else if changes == nil {
return nil
}

batch := RepoIndexerBatch()
for _, update := range changes.Updates {
if err := addUpdate(update, repo, batch); err != nil {
return err
return fmt.Errorf("UpdateRepoIndexer: Unable to addUpdate to: %s/%s Sha: %s, update: %s(%s) Error: %v", repo.MustOwnerName(), repo.Name, sha, update.Filename, update.BlobSha, err)
}
}
for _, filename := range changes.RemovedFilenames {
if err := addDelete(filename, repo, batch); err != nil {
return err
return fmt.Errorf("UpdateRepoIndexer: Unable to addDelete to: %s/%s Sha: %s, filename: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, filename, err)
}
}
if err = batch.Flush(); err != nil {
return err
return fmt.Errorf("UpdateRepoIndexer: Unable to flush batch to indexer for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err)
}
return repo.UpdateIndexerStatus(sha)
}
Expand Down
63 changes: 41 additions & 22 deletions modules/sync/unique_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
package sync

import (
"context"

"github.com/unknwon/com"
)

Expand All @@ -18,8 +16,9 @@ import (
// This queue is particularly useful for preventing duplicated task
// of same purpose.
type UniqueQueue struct {
table *StatusTable
queue chan string
table *StatusTable
queue chan string
closed chan struct{}
}

// NewUniqueQueue initializes and returns a new UniqueQueue object.
Expand All @@ -29,11 +28,43 @@ func NewUniqueQueue(queueLength int) *UniqueQueue {
}

return &UniqueQueue{
table: NewStatusTable(),
queue: make(chan string, queueLength),
table: NewStatusTable(),
queue: make(chan string, queueLength),
closed: make(chan struct{}),
}
}

// Close closes this queue
func (q *UniqueQueue) Close() {
select {
case <-q.closed:
default:
q.table.lock.Lock()
select {
case <-q.closed:
default:
close(q.closed)
}
q.table.lock.Unlock()
}
}

// IsClosed returns a channel that is closed when this Queue is closed
func (q *UniqueQueue) IsClosed() <-chan struct{} {
return q.closed
}

// IDs returns the current ids in the pool
func (q *UniqueQueue) IDs() []interface{} {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not needed in this PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it returns a snapshot of the IDs of an ever-changing queue (instead of draining it), I'd suggest renaming it to GetIDsSnapshot() or something. Otherwise, when it's used for another PR its functionality will probably be overlooked and not reviewed properly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh, it's to be used once the queue is closed... Patience my friend..

q.table.lock.Lock()
defer q.table.lock.Unlock()
ids := make([]interface{}, 0, len(q.table.pool))
for id := range q.table.pool {
ids = append(ids, id)
}
return ids
}

// Queue returns channel of queue for retrieving instances.
func (q *UniqueQueue) Queue() <-chan string {
return q.queue
Expand All @@ -48,15 +79,8 @@ func (q *UniqueQueue) Exist(id interface{}) bool {
// AddFunc adds new instance to the queue with a custom runnable function,
// the queue is blocked until the function exits.
func (q *UniqueQueue) AddFunc(id interface{}, fn func()) {
q.AddCtxFunc(context.Background(), id, fn)
}

// AddCtxFunc adds new instance to the queue with a custom runnable function,
// the queue is blocked until the function exits. If the context is done before
// the id is added to the queue it will not be added and false will be returned.
func (q *UniqueQueue) AddCtxFunc(ctx context.Context, id interface{}, fn func()) bool {
if q.Exist(id) {
return true
return
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a potential race condition when two goroutines attempt to add functions with the same ID and neither exists before this line, but they keep going end are both ran below. I know we've tried to avoid duplicating Exist() because of the lock, but I think the check for existance should be done inside the same lock as the rest of the function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Damn. I didn't actually check that this code was syncing correctly. I need to stop assuming that this has been done correctly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I've decided do a double check as the pool has a RWMutex.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(This has made me realise this code needs to be entirely eaten by the queue stuff in the WIP)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I've decided do a double check as the pool has a RWMutex.

Mutexes are pretty fast devices. The chances are high that we'll have to enter the write lock anyway. so I'd skip the read lock. There's no harm in "write locking" without doing an actual write. 😁

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean: I'd skip the read lock. 😄


idStr := com.ToStr(id)
Expand All @@ -67,10 +91,10 @@ func (q *UniqueQueue) AddCtxFunc(ctx context.Context, id interface{}, fn func())
}
q.table.lock.Unlock()
select {
case <-ctx.Done():
return false
case <-q.closed:
return
case q.queue <- idStr:
return true
return
}
}

Expand All @@ -79,11 +103,6 @@ func (q *UniqueQueue) Add(id interface{}) {
q.AddFunc(id, nil)
}

// AddCtx adds new instance to the queue with a context - if the context is done before the id is added to the queue it is cancelled
func (q *UniqueQueue) AddCtx(ctx context.Context, id interface{}) bool {
return q.AddCtxFunc(ctx, id, nil)
}

// Remove removes instance from the queue.
func (q *UniqueQueue) Remove(id interface{}) {
q.table.Stop(com.ToStr(id))
Expand Down
1 change: 1 addition & 0 deletions modules/webhook/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func DeliverHooks(ctx context.Context) {
for {
select {
case <-ctx.Done():
hookQueue.Close()
return
case repoIDStr := <-hookQueue.Queue():
log.Trace("DeliverHooks [repo_id: %v]", repoIDStr)
Expand Down
3 changes: 2 additions & 1 deletion services/mirror/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func Update(ctx context.Context) {
case <-ctx.Done():
return fmt.Errorf("Aborted due to shutdown")
default:
_ = mirrorQueue.AddCtx(ctx, m.RepoID)
mirrorQueue.Add(m.RepoID)
zeripath marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
}); err != nil {
Expand All @@ -323,6 +323,7 @@ func SyncMirrors(ctx context.Context) {
for {
select {
case <-ctx.Done():
mirrorQueue.Close()
return
case repoID := <-mirrorQueue.Queue():
syncMirror(repoID)
Expand Down
48 changes: 15 additions & 33 deletions services/pull/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,37 +154,22 @@ func manuallyMerged(pr *models.PullRequest) bool {
// TestPullRequests checks and tests untested patches of pull requests.
// TODO: test more pull requests at same time.
func TestPullRequests(ctx context.Context) {
prs, err := models.GetPullRequestsByCheckStatus(models.PullRequestStatusChecking)
if err != nil {
log.Error("Find Checking PRs: %v", err)
return
}

var checkedPRs = make(map[int64]struct{})

// Update pull request status.
for _, pr := range prs {
checkedPRs[pr.ID] = struct{}{}
if err := pr.GetBaseRepo(); err != nil {
log.Error("GetBaseRepo: %v", err)
continue
}
if manuallyMerged(pr) {
continue
}
if err := TestPatch(pr); err != nil {
log.Error("testPatch: %v", err)
continue
}

checkAndUpdateStatus(pr)
select {
case <-ctx.Done():
log.Info("PID: %d Pull Request testing shutdown", os.Getpid())
go func() {
prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking)
if err != nil {
log.Error("Find Checking PRs: %v", err)
return
default:
}
}
for _, prID := range prs {
select {
case <-ctx.Done():
return
default:
pullRequestQueue.Add(prID)
}
}
}()

// Start listening on new test requests.
for {
Expand All @@ -194,23 +179,20 @@ func TestPullRequests(ctx context.Context) {
pullRequestQueue.Remove(prID)

id := com.StrTo(prID).MustInt64()
if _, ok := checkedPRs[id]; ok {
continue
}

pr, err := models.GetPullRequestByID(id)
if err != nil {
log.Error("GetPullRequestByID[%s]: %v", prID, err)
continue
} else if manuallyMerged(pr) {
continue
} else if err = pr.TestPatch(); err != nil {
} else if err = TestPatch(pr); err != nil {
log.Error("testPatch[%d]: %v", pr.ID, err)
continue
}

checkAndUpdateStatus(pr)
case <-ctx.Done():
pullRequestQueue.Close()
log.Info("PID: %d Pull Request testing shutdown", os.Getpid())
return
}
Expand Down