Merge pull request 'Revert "avoid superfluous synchronized pull_request run when opening a PR"' (#2848) from earl-warren/forgejo:wip-revert-bad-race-fix into forgejo
Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/2848 Reviewed-by: oliverpool <oliverpool@noreply.codeberg.org>
This commit is contained in:
commit
26f6a3b578
9 changed files with 94 additions and 336 deletions
|
@ -9,14 +9,6 @@ import (
|
|||
"code.gitea.io/gitea/models/db"
|
||||
)
|
||||
|
||||
func GetMaxIssueIndexForRepo(ctx context.Context, repoID int64) (int64, error) {
|
||||
var max int64
|
||||
if _, err := db.GetEngine(ctx).Select("MAX(`index`)").Table("issue").Where("repo_id=?", repoID).Get(&max); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return max, nil
|
||||
}
|
||||
|
||||
// RecalculateIssueIndexForRepo create issue_index for repo if not exist and
|
||||
// update it based on highest index of existing issues assigned to a repo
|
||||
func RecalculateIssueIndexForRepo(ctx context.Context, repoID int64) error {
|
||||
|
@ -26,8 +18,8 @@ func RecalculateIssueIndexForRepo(ctx context.Context, repoID int64) error {
|
|||
}
|
||||
defer committer.Close()
|
||||
|
||||
max, err := GetMaxIssueIndexForRepo(ctx, repoID)
|
||||
if err != nil {
|
||||
var max int64
|
||||
if _, err = db.GetEngine(ctx).Select(" MAX(`index`)").Table("issue").Where("repo_id=?", repoID).Get(&max); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -1,38 +0,0 @@
|
|||
// Copyright 2024 The Forgejo Authors
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package issues_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"code.gitea.io/gitea/models/db"
|
||||
issues_model "code.gitea.io/gitea/models/issues"
|
||||
repo_model "code.gitea.io/gitea/models/repo"
|
||||
"code.gitea.io/gitea/models/unittest"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestGetMaxIssueIndexForRepo(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
|
||||
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 1})
|
||||
|
||||
maxPR, err := issues_model.GetMaxIssueIndexForRepo(db.DefaultContext, repo.ID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
issue := testCreateIssue(t, repo.ID, repo.OwnerID, "title1", "content1", false)
|
||||
assert.Greater(t, issue.Index, maxPR)
|
||||
|
||||
maxPR, err = issues_model.GetMaxIssueIndexForRepo(db.DefaultContext, repo.ID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
pull := testCreateIssue(t, repo.ID, repo.OwnerID, "title2", "content2", true)
|
||||
assert.Greater(t, pull.Index, maxPR)
|
||||
|
||||
maxPR, err = issues_model.GetMaxIssueIndexForRepo(db.DefaultContext, repo.ID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, maxPR, pull.Index)
|
||||
}
|
|
@ -47,14 +47,6 @@ func listPullRequestStatement(ctx context.Context, baseRepoID int64, opts *PullR
|
|||
return sess, nil
|
||||
}
|
||||
|
||||
func GetUnmergedPullRequestsByHeadInfoMax(ctx context.Context, repoID, maxIndex int64, branch string) ([]*PullRequest, error) {
|
||||
prs := make([]*PullRequest, 0, 2)
|
||||
sess := db.GetEngine(ctx).
|
||||
Join("INNER", "issue", "issue.id = `pull_request`.issue_id").
|
||||
Where("`pull_request`.head_repo_id = ? AND `pull_request`.head_branch = ? AND `pull_request`.has_merged = ? AND `issue`.is_closed = ? AND `pull_request`.flow = ? AND `issue`.`index` <= ?", repoID, branch, false, false, PullRequestFlowGithub, maxIndex)
|
||||
return prs, sess.Find(&prs)
|
||||
}
|
||||
|
||||
// GetUnmergedPullRequestsByHeadInfo returns all pull requests that are open and has not been merged
|
||||
func GetUnmergedPullRequestsByHeadInfo(ctx context.Context, repoID int64, branch string) ([]*PullRequest, error) {
|
||||
prs := make([]*PullRequest, 0, 2)
|
||||
|
|
|
@ -4,7 +4,6 @@
|
|||
package issues_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"code.gitea.io/gitea/models/db"
|
||||
|
@ -157,91 +156,6 @@ func TestGetUnmergedPullRequestsByHeadInfo(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestGetUnmergedPullRequestsByHeadInfoMax(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
|
||||
repoID := int64(1)
|
||||
maxPR := int64(0)
|
||||
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfoMax(db.DefaultContext, repoID, maxPR, "branch2")
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, prs, 0)
|
||||
maxPR, err = issues_model.GetMaxIssueIndexForRepo(db.DefaultContext, repoID)
|
||||
assert.NoError(t, err)
|
||||
prs, err = issues_model.GetUnmergedPullRequestsByHeadInfoMax(db.DefaultContext, repoID, maxPR, "branch2")
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, prs, 1)
|
||||
for _, pr := range prs {
|
||||
assert.Equal(t, int64(1), pr.HeadRepoID)
|
||||
assert.Equal(t, "branch2", pr.HeadBranch)
|
||||
}
|
||||
pr := prs[0]
|
||||
|
||||
for _, testCase := range []struct {
|
||||
table string
|
||||
field string
|
||||
id int64
|
||||
match any
|
||||
nomatch any
|
||||
}{
|
||||
{
|
||||
table: "issue",
|
||||
field: "is_closed",
|
||||
id: pr.IssueID,
|
||||
match: false,
|
||||
nomatch: true,
|
||||
},
|
||||
{
|
||||
table: "pull_request",
|
||||
field: "flow",
|
||||
id: pr.ID,
|
||||
match: issues_model.PullRequestFlowGithub,
|
||||
nomatch: issues_model.PullRequestFlowAGit,
|
||||
},
|
||||
{
|
||||
table: "pull_request",
|
||||
field: "head_repo_id",
|
||||
id: pr.ID,
|
||||
match: pr.HeadRepoID,
|
||||
nomatch: 0,
|
||||
},
|
||||
{
|
||||
table: "pull_request",
|
||||
field: "head_branch",
|
||||
id: pr.ID,
|
||||
match: pr.HeadBranch,
|
||||
nomatch: "something else",
|
||||
},
|
||||
{
|
||||
table: "pull_request",
|
||||
field: "has_merged",
|
||||
id: pr.ID,
|
||||
match: false,
|
||||
nomatch: true,
|
||||
},
|
||||
} {
|
||||
t.Run(testCase.field, func(t *testing.T) {
|
||||
update := fmt.Sprintf("UPDATE `%s` SET `%s` = ? WHERE `id` = ?", testCase.table, testCase.field)
|
||||
|
||||
// expect no match
|
||||
_, err = db.GetEngine(db.DefaultContext).Exec(update, testCase.nomatch, testCase.id)
|
||||
assert.NoError(t, err)
|
||||
prs, err = issues_model.GetUnmergedPullRequestsByHeadInfoMax(db.DefaultContext, repoID, maxPR, "branch2")
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, prs, 0)
|
||||
|
||||
// expect one match
|
||||
_, err = db.GetEngine(db.DefaultContext).Exec(update, testCase.match, testCase.id)
|
||||
assert.NoError(t, err)
|
||||
prs, err = issues_model.GetUnmergedPullRequestsByHeadInfoMax(db.DefaultContext, repoID, maxPR, "branch2")
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, prs, 1)
|
||||
|
||||
// identical to the known PR
|
||||
assert.Equal(t, pr.ID, prs[0].ID)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetUnmergedPullRequestsByBaseInfo(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
prs, err := issues_model.GetUnmergedPullRequestsByBaseInfo(db.DefaultContext, 1, "master")
|
||||
|
|
|
@ -187,7 +187,7 @@ func Merge(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.U
|
|||
}
|
||||
|
||||
defer func() {
|
||||
AddTestPullRequestTask(ctx, doer, pr.BaseRepo.ID, pr.BaseBranch, false, "", "")
|
||||
go AddTestPullRequestTask(doer, pr.BaseRepo.ID, pr.BaseBranch, false, "", "")
|
||||
}()
|
||||
|
||||
pr.MergedCommitID, err = doMergeAndPush(ctx, pr, doer, mergeStyle, expectedHeadCommitID, message)
|
||||
|
|
|
@ -292,39 +292,22 @@ func checkForInvalidation(ctx context.Context, requests issues_model.PullRequest
|
|||
|
||||
// AddTestPullRequestTask adds new test tasks by given head/base repository and head/base branch,
|
||||
// and generate new patch for testing as needed.
|
||||
func AddTestPullRequestTask(ctx context.Context, doer *user_model.User, repoID int64, branch string, isSync bool, oldCommitID, newCommitID string) {
|
||||
// When TestPullRequest runs it must ignore any PR with an index > maxPR because they
|
||||
// would have been created after the goroutine started. They are in the future.
|
||||
// This guards the following race:
|
||||
// * commit A is pushed
|
||||
// * goroutine starts but does not run TestPullRequest yet
|
||||
// * a pull request with commit A as the head is created
|
||||
// * goroutine continues and runs TestPullRequest
|
||||
maxPR, err := issues_model.GetMaxIssueIndexForRepo(ctx, repoID)
|
||||
if err != nil {
|
||||
log.Error("AddTestPullRequestTask GetMaxIssueIndexForRepo(%d): %v", repoID, err)
|
||||
return
|
||||
}
|
||||
log.Trace("AddTestPullRequestTask [head_repo_id: %d, head_branch: %s]: only pull requests with index <= %d will be considered", repoID, branch, maxPR)
|
||||
go graceful.GetManager().RunWithShutdownContext(func(ctx context.Context) {
|
||||
func AddTestPullRequestTask(doer *user_model.User, repoID int64, branch string, isSync bool, oldCommitID, newCommitID string) {
|
||||
log.Trace("AddTestPullRequestTask [head_repo_id: %d, head_branch: %s]: finding pull requests", repoID, branch)
|
||||
graceful.GetManager().RunWithShutdownContext(func(ctx context.Context) {
|
||||
// There is no sensible way to shut this down ":-("
|
||||
// If you don't let it run all the way then you will lose data
|
||||
// TODO: graceful: TestPullRequest needs to become a queue!
|
||||
// TODO: graceful: AddTestPullRequestTask needs to become a queue!
|
||||
|
||||
TestPullRequest(ctx, doer, repoID, maxPR, branch, isSync, oldCommitID, newCommitID)
|
||||
})
|
||||
}
|
||||
|
||||
func TestPullRequest(ctx context.Context, doer *user_model.User, repoID, maxPR int64, branch string, isSync bool, oldCommitID, newCommitID string) {
|
||||
// GetUnmergedPullRequestsByHeadInfo() only return open and unmerged PR.
|
||||
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfoMax(ctx, repoID, maxPR, branch)
|
||||
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfo(ctx, repoID, branch)
|
||||
if err != nil {
|
||||
log.Error("Find pull requests [head_repo_id: %d, head_branch: %s]: %v", repoID, branch, err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, pr := range prs {
|
||||
log.Trace("Updating PR[id=%d,index=%d]: composing new test task", pr.ID, pr.Index)
|
||||
log.Trace("Updating PR[%d]: composing new test task", pr.ID)
|
||||
if pr.Flow == issues_model.PullRequestFlowGithub {
|
||||
if err := PushToBaseRepo(ctx, pr); err != nil {
|
||||
log.Error("PushToBaseRepo: %v", err)
|
||||
|
@ -393,7 +376,7 @@ func TestPullRequest(ctx context.Context, doer *user_model.User, repoID, maxPR i
|
|||
}
|
||||
}
|
||||
|
||||
log.Trace("TestPullRequest [base_repo_id: %d, base_branch: %s]: finding pull requests", repoID, branch)
|
||||
log.Trace("AddTestPullRequestTask [base_repo_id: %d, base_branch: %s]: finding pull requests", repoID, branch)
|
||||
prs, err = issues_model.GetUnmergedPullRequestsByBaseInfo(ctx, repoID, branch)
|
||||
if err != nil {
|
||||
log.Error("Find pull requests [base_repo_id: %d, base_branch: %s]: %v", repoID, branch, err)
|
||||
|
@ -415,6 +398,7 @@ func TestPullRequest(ctx context.Context, doer *user_model.User, repoID, maxPR i
|
|||
}
|
||||
AddToTaskQueue(ctx, pr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// checkIfPRContentChanged checks if diff to target branch has changed by push
|
||||
|
|
|
@ -36,7 +36,7 @@ func Update(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.
|
|||
|
||||
if rebase {
|
||||
defer func() {
|
||||
AddTestPullRequestTask(ctx, doer, pr.BaseRepo.ID, pr.BaseBranch, false, "", "")
|
||||
go AddTestPullRequestTask(doer, pr.BaseRepo.ID, pr.BaseBranch, false, "", "")
|
||||
}()
|
||||
|
||||
return updateHeadByRebaseOnToBase(ctx, pr, doer, message)
|
||||
|
@ -75,7 +75,7 @@ func Update(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.
|
|||
_, err = doMergeAndPush(ctx, reversePR, doer, repo_model.MergeStyleMerge, "", message)
|
||||
|
||||
defer func() {
|
||||
AddTestPullRequestTask(ctx, doer, reversePR.HeadRepo.ID, reversePR.HeadBranch, false, "", "")
|
||||
go AddTestPullRequestTask(doer, reversePR.HeadRepo.ID, reversePR.HeadBranch, false, "", "")
|
||||
}()
|
||||
|
||||
return err
|
||||
|
|
|
@ -166,7 +166,7 @@ func pushUpdates(optsList []*repo_module.PushUpdateOptions) error {
|
|||
branch := opts.RefFullName.BranchName()
|
||||
if !opts.IsDelRef() {
|
||||
log.Trace("TriggerTask '%s/%s' by %s", repo.Name, branch, pusher.Name)
|
||||
pull_service.AddTestPullRequestTask(ctx, pusher, repo.ID, branch, true, opts.OldCommitID, opts.NewCommitID)
|
||||
go pull_service.AddTestPullRequestTask(pusher, repo.ID, branch, true, opts.OldCommitID, opts.NewCommitID)
|
||||
|
||||
newCommit, err := gitRepo.GetCommit(opts.NewCommitID)
|
||||
if err != nil {
|
||||
|
|
|
@ -1,86 +0,0 @@
|
|||
// Copyright 2024 The Forgejo Authors
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
issues_model "code.gitea.io/gitea/models/issues"
|
||||
repo_model "code.gitea.io/gitea/models/repo"
|
||||
"code.gitea.io/gitea/models/unittest"
|
||||
user_model "code.gitea.io/gitea/models/user"
|
||||
"code.gitea.io/gitea/modules/git"
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
repo_module "code.gitea.io/gitea/modules/repository"
|
||||
"code.gitea.io/gitea/modules/test"
|
||||
pull_service "code.gitea.io/gitea/services/pull"
|
||||
repo_service "code.gitea.io/gitea/services/repository"
|
||||
"code.gitea.io/gitea/tests"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPullRequestSynchronized(t *testing.T) {
|
||||
defer tests.PrepareTestEnv(t)()
|
||||
|
||||
// unmerged pull request of user2/repo1 from branch2 to master
|
||||
pull := unittest.AssertExistsAndLoadBean(t, &issues_model.PullRequest{ID: 2})
|
||||
// tip of tests/gitea-repositories-meta/user2/repo1 branch2
|
||||
pull.HeadCommitID = "985f0301dba5e7b34be866819cd15ad3d8f508ee"
|
||||
|
||||
require.Equal(t, pull.HeadRepoID, pull.BaseRepoID)
|
||||
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: pull.HeadRepoID})
|
||||
owner := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: repo.OwnerID})
|
||||
|
||||
t.Run("AddTestPullRequestTask", func(t *testing.T) {
|
||||
logChecker, cleanup := test.NewLogChecker(log.DEFAULT, log.TRACE)
|
||||
logChecker.Filter("Updating PR").StopMark("TestPullRequest ")
|
||||
defer cleanup()
|
||||
|
||||
opt := &repo_module.PushUpdateOptions{
|
||||
PusherID: owner.ID,
|
||||
PusherName: owner.Name,
|
||||
RepoUserName: owner.Name,
|
||||
RepoName: repo.Name,
|
||||
RefFullName: git.RefName("refs/heads/branch2"),
|
||||
OldCommitID: pull.HeadCommitID,
|
||||
NewCommitID: pull.HeadCommitID,
|
||||
}
|
||||
require.NoError(t, repo_service.PushUpdate(opt))
|
||||
logFiltered, logStopped := logChecker.Check(5 * time.Second)
|
||||
assert.True(t, logStopped)
|
||||
assert.True(t, logFiltered[0])
|
||||
})
|
||||
|
||||
for _, testCase := range []struct {
|
||||
name string
|
||||
maxPR int64
|
||||
expected bool
|
||||
}{
|
||||
{
|
||||
name: "TestPullRequest process PR",
|
||||
maxPR: pull.Index,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "TestPullRequest skip PR",
|
||||
maxPR: pull.Index - 1,
|
||||
expected: false,
|
||||
},
|
||||
} {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
logChecker, cleanup := test.NewLogChecker(log.DEFAULT, log.TRACE)
|
||||
logChecker.Filter("Updating PR").StopMark("TestPullRequest ")
|
||||
defer cleanup()
|
||||
|
||||
pull_service.TestPullRequest(context.Background(), owner, repo.ID, testCase.maxPR, "branch2", true, pull.HeadCommitID, pull.HeadCommitID)
|
||||
logFiltered, logStopped := logChecker.Check(5 * time.Second)
|
||||
assert.True(t, logStopped)
|
||||
assert.Equal(t, testCase.expected, logFiltered[0])
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue