Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unique repo path and permissions #8517

Merged
merged 6 commits into from
Feb 24, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions reposerver/repository/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type repositoryLock struct {
}

// Lock acquires lock unless lock is already acquired with the same commit and allowConcurrent is set to true
func (r *repositoryLock) Lock(path string, revision string, allowConcurrent bool, init func() error) (io.Closer, error) {
func (r *repositoryLock) Lock(path string, revision string, allowConcurrent bool, init func() (io.Closer, error)) (io.Closer, error) {
r.lock.Lock()
state, ok := r.stateByKey[path]
if !ok {
Expand All @@ -30,26 +30,30 @@ func (r *repositoryLock) Lock(path string, revision string, allowConcurrent bool
state.cond.L.Lock()
notify := false
state.processCount--
var err error
if state.processCount == 0 {
notify = true
state.revision = ""
err = state.initCloser.Close()
}

state.cond.L.Unlock()
if notify {
state.cond.Broadcast()
}
return nil
return err
})

for {
state.cond.L.Lock()
if state.revision == "" {
// no in progress operation for that repo. Go ahead.
if err := init(); err != nil {
initCloser, err := init()
if err != nil {
state.cond.L.Unlock()
return nil, err
}

state.initCloser = initCloser
state.revision = revision
state.processCount = 1
state.allowConcurrent = allowConcurrent
Expand All @@ -71,6 +75,7 @@ func (r *repositoryLock) Lock(path string, revision string, allowConcurrent bool
type repositoryState struct {
cond *sync.Cond
revision string
initCloser io.Closer
processCount int
allowConcurrent bool
}
14 changes: 7 additions & 7 deletions reposerver/repository/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ func lockQuickly(action func() (io.Closer, error)) (io.Closer, bool) {
}
}

func numberOfInits(initializedTimes *int) func() error {
return func() error {
func numberOfInits(initializedTimes *int) func() (io.Closer, error) {
return func() (io.Closer, error) {
*initializedTimes++
return nil
return util.NopCloser, nil
}
}

Expand Down Expand Up @@ -120,8 +120,8 @@ func TestLock_FailedInitialization(t *testing.T) {
lock := NewRepositoryLock()

closer1, done := lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "1", true, func() error {
return errors.New("failed")
return lock.Lock("myRepo", "1", true, func() (io.Closer, error) {
return util.NopCloser, errors.New("failed")
})
})

Expand All @@ -132,8 +132,8 @@ func TestLock_FailedInitialization(t *testing.T) {
assert.Nil(t, closer1)

closer2, done := lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "1", true, func() error {
return nil
return lock.Lock("myRepo", "1", true, func() (io.Closer, error) {
return util.NopCloser, nil
})
})

Expand Down
143 changes: 109 additions & 34 deletions reposerver/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"encoding/json"
"errors"
"fmt"
goio "io"
"io/fs"
"io/ioutil"
"net/url"
"os"
Expand All @@ -16,17 +18,15 @@ import (
"strings"
"time"

"github.com/google/go-jsonnet"

"github.com/argoproj/argo-cd/v2/util/argo"

"github.com/Masterminds/semver/v3"
"github.com/TomOnTime/utfutil"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
textutils "github.com/argoproj/gitops-engine/pkg/utils/text"
"github.com/argoproj/pkg/sync"
jsonpatch "github.com/evanphx/json-patch"
"github.com/ghodss/yaml"
gogit "github.com/go-git/go-git/v5"
"github.com/google/go-jsonnet"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc/codes"
Expand All @@ -44,10 +44,12 @@ import (
"github.com/argoproj/argo-cd/v2/reposerver/metrics"
"github.com/argoproj/argo-cd/v2/util/app/discovery"
argopath "github.com/argoproj/argo-cd/v2/util/app/path"
"github.com/argoproj/argo-cd/v2/util/argo"
executil "github.com/argoproj/argo-cd/v2/util/exec"
"github.com/argoproj/argo-cd/v2/util/git"
"github.com/argoproj/argo-cd/v2/util/glob"
"github.com/argoproj/argo-cd/v2/util/gpg"
"github.com/argoproj/argo-cd/v2/util/grpc"
"github.com/argoproj/argo-cd/v2/util/helm"
"github.com/argoproj/argo-cd/v2/util/io"
"github.com/argoproj/argo-cd/v2/util/ksonnet"
Expand All @@ -68,12 +70,16 @@ const (
// Service implements ManifestService interface
type Service struct {
gitCredsStore git.CredsStore
rootDir string
gitRepoPaths *io.TempPaths
chartPaths *io.TempPaths
gitRepoInitializer func(rootPath string) goio.Closer
repoLock *repositoryLock
cache *reposervercache.Cache
parallelismLimitSemaphore *semaphore.Weighted
metricsServer *metrics.MetricsServer
resourceTracking argo.ResourceTracking
newGitClient func(rawRepoURL string, creds git.Creds, insecure bool, enableLfs bool, proxy string, opts ...git.ClientOpts) (git.Client, error)
newGitClient func(rawRepoURL string, root string, creds git.Creds, insecure bool, enableLfs bool, proxy string, opts ...git.ClientOpts) (git.Client, error)
newHelmClient func(repoURL string, creds helm.Creds, enableOci bool, proxy string, opts ...helm.ClientOpts) helm.Client
initConstants RepoServerInitConstants
// now is usually just time.Now, but may be replaced by unit tests for testing purposes
Expand All @@ -89,7 +95,7 @@ type RepoServerInitConstants struct {
}

// NewService returns a new instance of the Manifest service
func NewService(metricsServer *metrics.MetricsServer, cache *reposervercache.Cache, initConstants RepoServerInitConstants, resourceTracking argo.ResourceTracking, gitCredsStore git.CredsStore) *Service {
func NewService(metricsServer *metrics.MetricsServer, cache *reposervercache.Cache, initConstants RepoServerInitConstants, resourceTracking argo.ResourceTracking, gitCredsStore git.CredsStore, rootDir string) *Service {
var parallelismLimitSemaphore *semaphore.Weighted
if initConstants.ParallelismLimit > 0 {
parallelismLimitSemaphore = semaphore.NewWeighted(initConstants.ParallelismLimit)
Expand All @@ -100,20 +106,59 @@ func NewService(metricsServer *metrics.MetricsServer, cache *reposervercache.Cac
repoLock: repoLock,
cache: cache,
metricsServer: metricsServer,
newGitClient: git.NewClient,
newGitClient: git.NewClientExt,
resourceTracking: resourceTracking,
newHelmClient: func(repoURL string, creds helm.Creds, enableOci bool, proxy string, opts ...helm.ClientOpts) helm.Client {
return helm.NewClientWithLock(repoURL, creds, sync.NewKeyLock(), enableOci, proxy, opts...)
},
initConstants: initConstants,
now: time.Now,
gitCredsStore: gitCredsStore,
initConstants: initConstants,
now: time.Now,
gitCredsStore: gitCredsStore,
gitRepoPaths: io.NewTempPaths(rootDir),
chartPaths: io.NewTempPaths(rootDir),
gitRepoInitializer: directoryPermissionInitializer,
rootDir: rootDir,
}
}

func (s *Service) Init() error {
_, err := os.Stat(s.rootDir)
if os.IsNotExist(err) {
return os.MkdirAll(s.rootDir, 0300)
}
if err == nil {
// give itself read permissions to list previously written directories
err = os.Chmod(s.rootDir, 0700)
}
var files []fs.FileInfo
if err == nil {
files, err = ioutil.ReadDir(s.rootDir)
}
if err != nil {
log.Warnf("Failed to restore cloned repositories paths: %v", err)
return nil
}

for _, file := range files {
if !file.IsDir() {
continue
}
fullPath := filepath.Join(s.rootDir, file.Name())
closer := s.gitRepoInitializer(fullPath)
if repo, err := gogit.PlainOpen(fullPath); err == nil {
if remotes, err := repo.Remotes(); err == nil && len(remotes) > 0 && len(remotes[0].Config().URLs) > 0 {
s.gitRepoPaths.Add(git.NormalizeGitURL(remotes[0].Config().URLs[0]), fullPath)
}
}
io.Close(closer)
}
// remove read permissions since no-one should be able to list the directories
return os.Chmod(s.rootDir, 0300)
}

// List a subset of the refs (currently, branches and tags) of a git repo
func (s *Service) ListRefs(ctx context.Context, q *apiclient.ListRefsRequest) (*apiclient.Refs, error) {
gitClient, err := s.newClient(q.Repo)
gitClient, err := s.newClient(ctx, q.Repo)
if err != nil {
return nil, err
}
Expand All @@ -136,7 +181,7 @@ func (s *Service) ListRefs(ctx context.Context, q *apiclient.ListRefsRequest) (*

// ListApps lists the contents of a GitHub repo
func (s *Service) ListApps(ctx context.Context, q *apiclient.ListAppsRequest) (*apiclient.AppList, error) {
gitClient, commitSHA, err := s.newClientResolveRevision(q.Repo, q.Revision)
gitClient, commitSHA, err := s.newClientResolveRevision(ctx, q.Repo, q.Revision)
if err != nil {
return nil, err
}
Expand All @@ -148,8 +193,8 @@ func (s *Service) ListApps(ctx context.Context, q *apiclient.ListAppsRequest) (*
s.metricsServer.IncPendingRepoRequest(q.Repo.Repo)
defer s.metricsServer.DecPendingRepoRequest(q.Repo.Repo)

closer, err := s.repoLock.Lock(gitClient.Root(), commitSHA, true, func() error {
return checkoutRevision(gitClient, commitSHA, s.initConstants.SubmoduleEnabled)
closer, err := s.repoLock.Lock(gitClient.Root(), commitSHA, true, func() (goio.Closer, error) {
return s.checkoutRevision(gitClient, commitSHA, s.initConstants.SubmoduleEnabled)
})

if err != nil {
Expand Down Expand Up @@ -217,7 +262,7 @@ func (s *Service) runRepoOperation(
return err
}
} else {
gitClient, revision, err = s.newClientResolveRevision(repo, revision, git.WithCache(s.cache, !settings.noRevisionCache && !settings.noCache))
gitClient, revision, err = s.newClientResolveRevision(ctx, repo, revision, git.WithCache(s.cache, !settings.noRevisionCache && !settings.noCache))
if err != nil {
return err
}
Expand Down Expand Up @@ -260,8 +305,8 @@ func (s *Service) runRepoOperation(
return &operationContext{chartPath, ""}, nil
})
} else {
closer, err := s.repoLock.Lock(gitClient.Root(), revision, settings.allowConcurrent, func() error {
return checkoutRevision(gitClient, revision, s.initConstants.SubmoduleEnabled)
closer, err := s.repoLock.Lock(gitClient.Root(), revision, settings.allowConcurrent, func() (goio.Closer, error) {
return s.checkoutRevision(gitClient, revision, s.initConstants.SubmoduleEnabled)
})

if err != nil {
Expand Down Expand Up @@ -1618,16 +1663,16 @@ func (s *Service) GetRevisionMetadata(ctx context.Context, q *apiclient.RepoServ
}
}

gitClient, _, err := s.newClientResolveRevision(q.Repo, q.Revision)
gitClient, _, err := s.newClientResolveRevision(ctx, q.Repo, q.Revision)
if err != nil {
return nil, err
}

s.metricsServer.IncPendingRepoRequest(q.Repo.Repo)
defer s.metricsServer.DecPendingRepoRequest(q.Repo.Repo)

closer, err := s.repoLock.Lock(gitClient.Root(), q.Revision, true, func() error {
return checkoutRevision(gitClient, q.Revision, s.initConstants.SubmoduleEnabled)
closer, err := s.repoLock.Lock(gitClient.Root(), q.Revision, true, func() (goio.Closer, error) {
return s.checkoutRevision(gitClient, q.Revision, s.initConstants.SubmoduleEnabled)
})

if err != nil {
Expand Down Expand Up @@ -1674,15 +1719,23 @@ func fileParameters(q *apiclient.RepoServerAppDetailsQuery) []v1alpha1.HelmFileP
return q.Source.Helm.FileParameters
}

func (s *Service) newClient(repo *v1alpha1.Repository, opts ...git.ClientOpts) (git.Client, error) {
func (s *Service) newClient(ctx context.Context, repo *v1alpha1.Repository, opts ...git.ClientOpts) (git.Client, error) {
repoPath, err := s.gitRepoPaths.GetPath(git.NormalizeGitURL(repo.Repo))
if err != nil {
return nil, err
}
if sanitizer, ok := grpc.SanitizerFromContext(ctx); ok {
// make sure randomized path replaced with '.' in the error message
sanitizer.AddReplacement(repoPath, ".")
}
opts = append(opts, git.WithEventHandlers(metrics.NewGitClientEventHandlers(s.metricsServer)))
return s.newGitClient(repo.Repo, repo.GetGitCreds(s.gitCredsStore), repo.IsInsecure(), repo.EnableLFS, repo.Proxy, opts...)
return s.newGitClient(repo.Repo, repoPath, repo.GetGitCreds(s.gitCredsStore), repo.IsInsecure(), repo.EnableLFS, repo.Proxy, opts...)
}

// newClientResolveRevision is a helper to perform the common task of instantiating a git client
// and resolving a revision to a commit SHA
func (s *Service) newClientResolveRevision(repo *v1alpha1.Repository, revision string, opts ...git.ClientOpts) (git.Client, string, error) {
gitClient, err := s.newClient(repo, opts...)
func (s *Service) newClientResolveRevision(ctx context.Context, repo *v1alpha1.Repository, revision string, opts ...git.ClientOpts) (git.Client, string, error) {
gitClient, err := s.newClient(ctx, repo, opts...)
if err != nil {
return nil, "", err
}
Expand All @@ -1695,7 +1748,7 @@ func (s *Service) newClientResolveRevision(repo *v1alpha1.Repository, revision s

func (s *Service) newHelmClientResolveRevision(repo *v1alpha1.Repository, revision string, chart string, noRevisionCache bool) (helm.Client, string, error) {
enableOCI := repo.EnableOCI || helm.IsHelmOciRepo(repo.Repo)
helmClient := s.newHelmClient(repo.Repo, repo.GetHelmCreds(), enableOCI, repo.Proxy, helm.WithIndexCache(s.cache))
helmClient := s.newHelmClient(repo.Repo, repo.GetHelmCreds(), enableOCI, repo.Proxy, helm.WithIndexCache(s.cache), helm.WithChartPaths(s.chartPaths))
// OCI helm registers don't support semver ranges. Assuming that given revision is exact version
if helm.IsVersion(revision) || enableOCI {
return helmClient, revision, nil
Expand All @@ -1719,13 +1772,35 @@ func (s *Service) newHelmClientResolveRevision(repo *v1alpha1.Repository, revisi
return helmClient, version.String(), nil
}

// directoryPermissionInitializer ensures the directory has read/write/execute permissions and returns
// a function that can be used to remove all permissions.
func directoryPermissionInitializer(rootPath string) goio.Closer {
if _, err := os.Stat(rootPath); err == nil {
alexmt marked this conversation as resolved.
Show resolved Hide resolved
if err := os.Chmod(rootPath, 0700); err != nil {
log.Warnf("Failed to restore read/write/execute permissions on %s: %v", rootPath, err)
} else {
log.Debugf("Successfully restored read/write/execute permissions on %s", rootPath)
}
}

return io.NewCloser(func() error {
if err := os.Chmod(rootPath, 0000); err != nil {
log.Warnf("Failed to remove permissions on %s: %v", rootPath, err)
} else {
log.Debugf("Successfully removed permissions on %s", rootPath)
}
return nil
})
}

// checkoutRevision is a convenience function to initialize a repo, fetch, and checkout a revision
// Returns the 40 character commit SHA after the checkout has been performed
// nolint:unparam
func checkoutRevision(gitClient git.Client, revision string, submoduleEnabled bool) error {
func (s *Service) checkoutRevision(gitClient git.Client, revision string, submoduleEnabled bool) (goio.Closer, error) {
closer := s.gitRepoInitializer(gitClient.Root())
err := gitClient.Init()
if err != nil {
return status.Errorf(codes.Internal, "Failed to initialize git repo: %v", err)
return closer, status.Errorf(codes.Internal, "Failed to initialize git repo: %v", err)
}

err = gitClient.Fetch(revision)
Expand All @@ -1735,25 +1810,25 @@ func checkoutRevision(gitClient git.Client, revision string, submoduleEnabled bo
log.Infof("Fallback to fetch default")
err = gitClient.Fetch("")
if err != nil {
return status.Errorf(codes.Internal, "Failed to fetch default: %v", err)
return closer, status.Errorf(codes.Internal, "Failed to fetch default: %v", err)
}
err = gitClient.Checkout(revision, submoduleEnabled)
if err != nil {
return status.Errorf(codes.Internal, "Failed to checkout revision %s: %v", revision, err)
return closer, status.Errorf(codes.Internal, "Failed to checkout revision %s: %v", revision, err)
}
return err
return closer, err
}

err = gitClient.Checkout("FETCH_HEAD", submoduleEnabled)
if err != nil {
return status.Errorf(codes.Internal, "Failed to checkout FETCH_HEAD: %v", err)
return closer, status.Errorf(codes.Internal, "Failed to checkout FETCH_HEAD: %v", err)
}

return err
return closer, err
}

func (s *Service) GetHelmCharts(ctx context.Context, q *apiclient.HelmChartsRequest) (*apiclient.HelmChartsResponse, error) {
index, err := s.newHelmClient(q.Repo.Repo, q.Repo.GetHelmCreds(), q.Repo.EnableOCI, q.Repo.Proxy).GetIndex(true)
index, err := s.newHelmClient(q.Repo.Repo, q.Repo.GetHelmCreds(), q.Repo.EnableOCI, q.Repo.Proxy, helm.WithChartPaths(s.chartPaths)).GetIndex(true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1814,7 +1889,7 @@ func (s *Service) ResolveRevision(ctx context.Context, q *apiclient.ResolveRevis
if helm.IsVersion(ambiguousRevision) {
return &apiclient.ResolveRevisionResponse{Revision: ambiguousRevision, AmbiguousRevision: ambiguousRevision}, nil
}
client := helm.NewClient(repo.Repo, repo.GetHelmCreds(), repo.EnableOCI || app.Spec.Source.IsHelmOci(), repo.Proxy)
client := helm.NewClient(repo.Repo, repo.GetHelmCreds(), repo.EnableOCI || app.Spec.Source.IsHelmOci(), repo.Proxy, helm.WithChartPaths(s.chartPaths))
index, err := client.GetIndex(false)
if err != nil {
return &apiclient.ResolveRevisionResponse{Revision: "", AmbiguousRevision: ""}, err
Expand Down
Loading