Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parallel creating commit status bug with tests (#21911) #21989

Merged
merged 5 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions models/db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"context"
"errors"
"fmt"
"strconv"

"code.gitea.io/gitea/modules/setting"
)

// ResourceIndex represents a resource index which could be used as issue/release and others
Expand All @@ -24,11 +27,6 @@ var (
ErrGetResourceIndexFailed = errors.New("get resource index failed")
)

const (
// MaxDupIndexAttempts max retry times to create index
MaxDupIndexAttempts = 3
)

// SyncMaxResourceIndex sync the max index with the resource
func SyncMaxResourceIndex(ctx context.Context, tableName string, groupID, maxIndex int64) (err error) {
e := GetEngine(ctx)
Expand Down Expand Up @@ -61,8 +59,25 @@ func SyncMaxResourceIndex(ctx context.Context, tableName string, groupID, maxInd
return nil
}

func postgresGetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) {
res, err := GetEngine(ctx).Query(fmt.Sprintf("INSERT INTO %s (group_id, max_index) "+
"VALUES (?,1) ON CONFLICT (group_id) DO UPDATE SET max_index = %s.max_index+1 RETURNING max_index",
tableName, tableName), groupID)
if err != nil {
return 0, err
}
if len(res) == 0 {
return 0, ErrGetResourceIndexFailed
}
return strconv.ParseInt(string(res[0]["max_index"]), 10, 64)
}

// GetNextResourceIndex generates a resource index, it must run in the same transaction where the resource is created
func GetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) {
if setting.Database.UsePostgreSQL {
return postgresGetNextResourceIndex(ctx, tableName, groupID)
}

e := GetEngine(ctx)

// try to update the max_index to next value, and acquire the write-lock for the record
Expand Down
110 changes: 50 additions & 60 deletions models/git/commit_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ package git
import (
"context"
"crypto/sha1"
"errors"
"fmt"
"net/url"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -49,79 +51,67 @@ func init() {
db.RegisterModel(new(CommitStatusIndex))
}

// upsertCommitStatusIndex the function will not return until it acquires the lock or receives an error.
func upsertCommitStatusIndex(ctx context.Context, repoID int64, sha string) (err error) {
// An atomic UPSERT operation (INSERT/UPDATE) is the only operation
// that ensures that the key is actually locked.
switch {
case setting.Database.UseSQLite3 || setting.Database.UsePostgreSQL:
_, err = db.Exec(ctx, "INSERT INTO `commit_status_index` (repo_id, sha, max_index) "+
"VALUES (?,?,1) ON CONFLICT (repo_id,sha) DO UPDATE SET max_index = `commit_status_index`.max_index+1",
repoID, sha)
case setting.Database.UseMySQL:
_, err = db.Exec(ctx, "INSERT INTO `commit_status_index` (repo_id, sha, max_index) "+
"VALUES (?,?,1) ON DUPLICATE KEY UPDATE max_index = max_index+1",
repoID, sha)
case setting.Database.UseMSSQL:
// https://weblogs.sqlteam.com/dang/2009/01/31/upsert-race-condition-with-merge/
_, err = db.Exec(ctx, "MERGE `commit_status_index` WITH (HOLDLOCK) as target "+
"USING (SELECT ? AS repo_id, ? AS sha) AS src "+
"ON src.repo_id = target.repo_id AND src.sha = target.sha "+
"WHEN MATCHED THEN UPDATE SET target.max_index = target.max_index+1 "+
"WHEN NOT MATCHED THEN INSERT (repo_id, sha, max_index) "+
"VALUES (src.repo_id, src.sha, 1);",
repoID, sha)
default:
return fmt.Errorf("database type not supported")
func postgresGetCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) {
res, err := db.GetEngine(ctx).Query("INSERT INTO `commit_status_index` (repo_id, sha, max_index) "+
"VALUES (?,?,1) ON CONFLICT (repo_id, sha) DO UPDATE SET max_index = `commit_status_index`.max_index+1 RETURNING max_index",
repoID, sha)
if err != nil {
return 0, err
}
if len(res) == 0 {
return 0, db.ErrGetResourceIndexFailed
}
return err
return strconv.ParseInt(string(res[0]["max_index"]), 10, 64)
}

// GetNextCommitStatusIndex retried 3 times to generate a resource index
func GetNextCommitStatusIndex(repoID int64, sha string) (int64, error) {
for i := 0; i < db.MaxDupIndexAttempts; i++ {
idx, err := getNextCommitStatusIndex(repoID, sha)
if err == db.ErrResouceOutdated {
continue
}
if err != nil {
return 0, err
}
return idx, nil
func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) {
if setting.Database.UsePostgreSQL {
return postgresGetCommitStatusIndex(ctx, repoID, sha)
}
return 0, db.ErrGetResourceIndexFailed
}

// getNextCommitStatusIndex return the next index
func getNextCommitStatusIndex(repoID int64, sha string) (int64, error) {
ctx, commiter, err := db.TxContext()
e := db.GetEngine(ctx)

// try to update the max_index to next value, and acquire the write-lock for the record
res, err := e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha)
if err != nil {
return 0, err
}
defer commiter.Close()

var preIdx int64
_, err = db.GetEngine(ctx).SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id = ? AND sha = ?", repoID, sha).Get(&preIdx)
affected, err := res.RowsAffected()
if err != nil {
return 0, err
}
if affected == 0 {
// this slow path is only for the first time of creating a resource index
_, errIns := e.Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) VALUES (?, ?, 0)", repoID, sha)
res, err = e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha)
if err != nil {
return 0, err
}

if err := upsertCommitStatusIndex(ctx, repoID, sha); err != nil {
return 0, err
affected, err = res.RowsAffected()
if err != nil {
return 0, err
}
// if the update still can not update any records, the record must not exist and there must be some errors (insert error)
if affected == 0 {
if errIns == nil {
return 0, errors.New("impossible error when GetNextCommitStatusIndex, insert and update both succeeded but no record is updated")
}
return 0, errIns
}
}

var curIdx int64
has, err := db.GetEngine(ctx).SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id = ? AND sha = ? AND max_index=?", repoID, sha, preIdx+1).Get(&curIdx)
// now, the new index is in database (protected by the transaction and write-lock)
var newIdx int64
has, err := e.SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id=? AND sha=?", repoID, sha).Get(&newIdx)
if err != nil {
return 0, err
}
if !has {
return 0, db.ErrResouceOutdated
}
if err := commiter.Commit(); err != nil {
return 0, err
return 0, errors.New("impossible error when GetNextCommitStatusIndex, upsert succeeded but no record can be selected")
}
return curIdx, nil
return newIdx, nil
}

func (status *CommitStatus) loadAttributes(ctx context.Context) (err error) {
Expand Down Expand Up @@ -291,18 +281,18 @@ func NewCommitStatus(opts NewCommitStatusOptions) error {
return fmt.Errorf("NewCommitStatus[%s, %s]: no user specified", repoPath, opts.SHA)
}

// Get the next Status Index
idx, err := GetNextCommitStatusIndex(opts.Repo.ID, opts.SHA)
if err != nil {
return fmt.Errorf("generate commit status index failed: %w", err)
}

ctx, committer, err := db.TxContext()
if err != nil {
return fmt.Errorf("NewCommitStatus[repo_id: %d, user_id: %d, sha: %s]: %w", opts.Repo.ID, opts.Creator.ID, opts.SHA, err)
}
defer committer.Close()

// Get the next Status Index
idx, err := GetNextCommitStatusIndex(ctx, opts.Repo.ID, opts.SHA)
if err != nil {
return fmt.Errorf("generate commit status index failed: %w", err)
}

opts.CommitStatus.Description = strings.TrimSpace(opts.CommitStatus.Description)
opts.CommitStatus.Context = strings.TrimSpace(opts.CommitStatus.Context)
opts.CommitStatus.TargetURL = strings.TrimSpace(opts.CommitStatus.TargetURL)
Expand All @@ -316,7 +306,7 @@ func NewCommitStatus(opts NewCommitStatusOptions) error {

// Insert new CommitStatus
if _, err = db.GetEngine(ctx).Insert(opts.CommitStatus); err != nil {
return fmt.Errorf("Insert CommitStatus[%s, %s]: %w", repoPath, opts.SHA, err)
return fmt.Errorf("insert CommitStatus[%s, %s]: %w", repoPath, opts.SHA, err)
}

return committer.Commit()
Expand Down
31 changes: 31 additions & 0 deletions tests/integration/repo_commits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package integration

import (
"fmt"
"net/http"
"net/http/httptest"
"path"
"sync"
"testing"

"code.gitea.io/gitea/modules/json"
Expand Down Expand Up @@ -115,3 +117,32 @@ func TestRepoCommitsWithStatusFailure(t *testing.T) {
func TestRepoCommitsWithStatusWarning(t *testing.T) {
doTestRepoCommitWithStatus(t, "warning", "gitea-exclamation", "yellow")
}

func TestRepoCommitsStatusParallel(t *testing.T) {
defer tests.PrepareTestEnv(t)()

session := loginUser(t, "user2")

// Request repository commits page
req := NewRequest(t, "GET", "/user2/repo1/commits/branch/master")
resp := session.MakeRequest(t, req, http.StatusOK)

doc := NewHTMLParser(t, resp.Body)
// Get first commit URL
commitURL, exists := doc.doc.Find("#commits-table tbody tr td.sha a").Attr("href")
assert.True(t, exists)
assert.NotEmpty(t, commitURL)

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(parentT *testing.T, i int) {
parentT.Run(fmt.Sprintf("ParallelCreateStatus_%d", i), func(t *testing.T) {
runBody := doAPICreateCommitStatus(NewAPITestContext(t, "user2", "repo1"), path.Base(commitURL), api.CommitStatusState("pending"))
runBody(t)
wg.Done()
})
}(t, i)
}
wg.Wait()
}