Skip to content

Commit

Permalink
fix(repo-server): excess git requests, cache lock on revisions
Browse files Browse the repository at this point in the history
Signed-off-by: nromriell <nateromriell@gmail.com>
  • Loading branch information
nromriell committed Feb 22, 2024
1 parent 6aa79f2 commit fd0fa44
Show file tree
Hide file tree
Showing 13 changed files with 896 additions and 103 deletions.
181 changes: 156 additions & 25 deletions reposerver/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/argoproj/gitops-engine/pkg/utils/text"
"github.com/go-git/go-git/v5/plumbing"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

Expand All @@ -24,11 +25,15 @@ import (
)

var ErrCacheMiss = cacheutil.ErrCacheMiss
var ErrCacheKeyLocked = cacheutil.ErrCacheKeyLocked

const RevisionCacheLockTimeout = 10 * time.Second

type Cache struct {
cache *cacheutil.Cache
repoCacheExpiration time.Duration
revisionCacheExpiration time.Duration
cache *cacheutil.Cache
repoCacheExpiration time.Duration
revisionCacheExpiration time.Duration
revisionCacheLockTimeout time.Duration
}

// ClusterRuntimeInfo holds cluster runtime information
Expand All @@ -39,8 +44,8 @@ type ClusterRuntimeInfo interface {
GetKubeVersion() string
}

func NewCache(cache *cacheutil.Cache, repoCacheExpiration time.Duration, revisionCacheExpiration time.Duration) *Cache {
return &Cache{cache, repoCacheExpiration, revisionCacheExpiration}
func NewCache(cache *cacheutil.Cache, repoCacheExpiration time.Duration, revisionCacheExpiration time.Duration, revisionCacheLockTimeout time.Duration) *Cache {
return &Cache{cache, repoCacheExpiration, revisionCacheExpiration, revisionCacheLockTimeout}
}

func AddCacheFlagsToCmd(cmd *cobra.Command, opts ...cacheutil.Options) func() (*Cache, error) {
Expand All @@ -57,7 +62,7 @@ func AddCacheFlagsToCmd(cmd *cobra.Command, opts ...cacheutil.Options) func() (*
if err != nil {
return nil, fmt.Errorf("error adding cache flags to cmd: %w", err)
}
return NewCache(cache, repoCacheExpiration, revisionCacheExpiration), nil
return NewCache(cache, repoCacheExpiration, revisionCacheExpiration, RevisionCacheLockTimeout), nil
}
}

Expand Down Expand Up @@ -145,7 +150,12 @@ func (c *Cache) ListApps(repoUrl, revision string) (map[string]string, error) {
}

func (c *Cache) SetApps(repoUrl, revision string, apps map[string]string) error {
return c.cache.SetItem(listApps(repoUrl, revision), apps, c.repoCacheExpiration, apps == nil)
return c.cache.SetItem(
listApps(repoUrl, revision),
apps,
&cacheutil.CacheActionOpts{
Expiration: c.repoCacheExpiration,
Delete: apps == nil})
}

func helmIndexRefsKey(repo string) string {
Expand All @@ -154,7 +164,14 @@ func helmIndexRefsKey(repo string) string {

// SetHelmIndex stores helm repository index.yaml content to cache
func (c *Cache) SetHelmIndex(repo string, indexData []byte) error {
return c.cache.SetItem(helmIndexRefsKey(repo), indexData, c.revisionCacheExpiration, false)
if indexData == nil {
// Logged as warning upstream
return fmt.Errorf("helm index data is nil, skipping cache")
}
return c.cache.SetItem(
helmIndexRefsKey(repo),
indexData,
&cacheutil.CacheActionOpts{Expiration: c.revisionCacheExpiration})
}

// GetHelmIndex retrieves helm repository index.yaml content from cache
Expand All @@ -172,21 +189,110 @@ func (c *Cache) SetGitReferences(repo string, references []*plumbing.Reference)
for i := range references {
input = append(input, references[i].Strings())
}
return c.cache.SetItem(gitRefsKey(repo), input, c.revisionCacheExpiration, false)
return c.cache.SetItem(gitRefsKey(repo), input, &cacheutil.CacheActionOpts{Expiration: c.revisionCacheExpiration})
}

// Converts raw cache items to plumbing.Reference objects
func GitRefCacheItemToReferences(cacheItem [][2]string) *[]*plumbing.Reference {
var res []*plumbing.Reference
for i := range cacheItem {
// Skip empty data
if cacheItem[i][0] != "" || cacheItem[i][1] != "" {
res = append(res, plumbing.NewReferenceFromStrings(cacheItem[i][0], cacheItem[i][1]))
}
}
return &res
}

// GetGitReferences retrieves resolved Git repository references from cache
func (c *Cache) GetGitReferences(repo string, references *[]*plumbing.Reference) error {
// TryLockGitRefCache attempts to lock the key for the Git repository references if the key doesn't exist
func (c *Cache) TryLockGitRefCache(repo string, lockId string) error {
// This try set with DisableOverwrite is important for making sure that only one process is able to claim ownership
// A normal get + set, or just set would cause ownership to go to whoever the last writer was, and during race conditions
// leads to duplicate requests
err := c.cache.SetItem(gitRefsKey(repo), [][2]string{{cacheutil.CacheLockedValue, lockId}}, &cacheutil.CacheActionOpts{
Expiration: c.revisionCacheLockTimeout,
DisableOverwrite: true})
return err
}

// Retrieves the cache item for git repo references. Returns lockHolderId, references, error
func (c *Cache) GetGitReferences(repo string) (string, *[]*plumbing.Reference, error) {
var input [][2]string
if err := c.cache.GetItem(gitRefsKey(repo), &input); err != nil {
return err
err := c.cache.GetItem(gitRefsKey(repo), &input)
if err == ErrCacheMiss {
// Expected
return "", nil, nil
} else if err == nil && len(input) > 0 && len(input[0]) > 0 {
if input[0][0] != cacheutil.CacheLockedValue {
// Valid value in cache, convert to plumbing.Reference and return
return "", GitRefCacheItemToReferences(input), nil
} else {
// The key lock is being held
return input[0][1], nil, nil
}
}
var res []*plumbing.Reference
for i := range input {
res = append(res, plumbing.NewReferenceFromStrings(input[i][0], input[i][1]))
return "", nil, err
}

// GetOrLockGitReferences retrieves the git references if they exist, otherwise creates a lock and returns so the caller can populate the cache
// Returns isLockOwner, localLockId, error
func (c *Cache) GetOrLockGitReferences(repo string, references *[]*plumbing.Reference) (bool, string, error) {
myLockUUID, err := uuid.NewRandom()
if err != nil {
log.Debug("Error generating git references cache lock id: ", err)
return false, "", err
}
*references = res
return nil
// We need to be able to identify that our lock was the successful one, otherwise we'll still have duplicate requests
myLockId := myLockUUID.String()
// Value matches the ttl on the lock in TryLockGitRefCache
waitUntil := time.Now().Add(c.revisionCacheLockTimeout)
// Wait only the maximum amount of time configured for the lock
for time.Now().Before(waitUntil) {
// Attempt to retrieve the key from local cache only
if _, cacheReferences, err := c.GetGitReferences(repo); err != nil || cacheReferences != nil {
if cacheReferences != nil {
*references = *cacheReferences
}
return false, myLockId, err
}
// Could not get key locally attempt to get the lock
err = c.TryLockGitRefCache(repo, myLockId)
if err != nil {
// Log but ignore this error since we'll want to retry, failing to obtain the lock should not throw an error
log.Errorf("Error attempting to acquire git references cache lock: %v", err)
}
// Attempt to retrieve the key again to see if we have the lock, or the key was populated
if lockOwner, cacheReferences, err := c.GetGitReferences(repo); err != nil || cacheReferences != nil {
if cacheReferences != nil {
// Someone else populated the key
*references = *cacheReferences
}
return false, myLockId, err
} else if lockOwner == myLockId {
// We have the lock, populate the key
return true, myLockId, nil
}
// Wait for lock, valid value, or timeout
time.Sleep(1 * time.Second)
}
// Timeout waiting for lock
log.Debug("Repository cache was unable to acquire lock or valid data within timeout")
return true, myLockId, nil
}

// UnlockGitReferences unlocks the key for the Git repository references if needed
func (c *Cache) UnlockGitReferences(repo string, lockId string) error {
var input [][2]string
var err error
if err = c.cache.GetItem(gitRefsKey(repo), &input); err == nil &&
len(input) > 0 &&
len(input[0]) > 1 &&
input[0][0] == cacheutil.CacheLockedValue &&
input[0][1] == lockId {
// We have the lock, so remove it
return c.cache.SetItem(gitRefsKey(repo), input, &cacheutil.CacheActionOpts{Delete: true})
}
return err
}

// refSourceCommitSHAs is a list of resolved revisions for each ref source. This allows us to invalidate the cache
Expand Down Expand Up @@ -274,11 +380,19 @@ func (c *Cache) SetManifests(revision string, appSrc *appv1.ApplicationSource, s
res.CacheEntryHash = hash
}

return c.cache.SetItem(manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs), res, c.repoCacheExpiration, res == nil)
return c.cache.SetItem(
manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs),
res,
&cacheutil.CacheActionOpts{
Expiration: c.repoCacheExpiration,
Delete: res == nil})
}

func (c *Cache) DeleteManifests(revision string, appSrc *appv1.ApplicationSource, srcRefs appv1.RefTargetRevisionMapping, clusterInfo ClusterRuntimeInfo, namespace, trackingMethod, appLabelKey, appName string, refSourceCommitSHAs ResolvedRevisions) error {
return c.cache.SetItem(manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs), "", c.repoCacheExpiration, true)
return c.cache.SetItem(
manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs),
"",
&cacheutil.CacheActionOpts{Delete: true})
}

func appDetailsCacheKey(revision string, appSrc *appv1.ApplicationSource, srcRefs appv1.RefTargetRevisionMapping, trackingMethod appv1.TrackingMethod, refSourceCommitSHAs ResolvedRevisions) string {
Expand All @@ -293,7 +407,12 @@ func (c *Cache) GetAppDetails(revision string, appSrc *appv1.ApplicationSource,
}

func (c *Cache) SetAppDetails(revision string, appSrc *appv1.ApplicationSource, srcRefs appv1.RefTargetRevisionMapping, res *apiclient.RepoAppDetailsResponse, trackingMethod appv1.TrackingMethod, refSourceCommitSHAs ResolvedRevisions) error {
return c.cache.SetItem(appDetailsCacheKey(revision, appSrc, srcRefs, trackingMethod, refSourceCommitSHAs), res, c.repoCacheExpiration, res == nil)
return c.cache.SetItem(
appDetailsCacheKey(revision, appSrc, srcRefs, trackingMethod, refSourceCommitSHAs),
res,
&cacheutil.CacheActionOpts{
Expiration: c.repoCacheExpiration,
Delete: res == nil})
}

func revisionMetadataKey(repoURL, revision string) string {
Expand All @@ -306,7 +425,10 @@ func (c *Cache) GetRevisionMetadata(repoURL, revision string) (*appv1.RevisionMe
}

func (c *Cache) SetRevisionMetadata(repoURL, revision string, item *appv1.RevisionMetadata) error {
return c.cache.SetItem(revisionMetadataKey(repoURL, revision), item, c.repoCacheExpiration, false)
return c.cache.SetItem(
revisionMetadataKey(repoURL, revision),
item,
&cacheutil.CacheActionOpts{Expiration: c.repoCacheExpiration})
}

func revisionChartDetailsKey(repoURL, chart, revision string) string {
Expand All @@ -319,15 +441,21 @@ func (c *Cache) GetRevisionChartDetails(repoURL, chart, revision string) (*appv1
}

func (c *Cache) SetRevisionChartDetails(repoURL, chart, revision string, item *appv1.ChartDetails) error {
return c.cache.SetItem(revisionChartDetailsKey(repoURL, chart, revision), item, c.repoCacheExpiration, false)
return c.cache.SetItem(
revisionChartDetailsKey(repoURL, chart, revision),
item,
&cacheutil.CacheActionOpts{Expiration: c.repoCacheExpiration})
}

func gitFilesKey(repoURL, revision, pattern string) string {
return fmt.Sprintf("gitfiles|%s|%s|%s", repoURL, revision, pattern)
}

func (c *Cache) SetGitFiles(repoURL, revision, pattern string, files map[string][]byte) error {
return c.cache.SetItem(gitFilesKey(repoURL, revision, pattern), &files, c.repoCacheExpiration, false)
return c.cache.SetItem(
gitFilesKey(repoURL, revision, pattern),
&files,
&cacheutil.CacheActionOpts{Expiration: c.repoCacheExpiration})
}

func (c *Cache) GetGitFiles(repoURL, revision, pattern string) (map[string][]byte, error) {
Expand All @@ -340,7 +468,10 @@ func gitDirectoriesKey(repoURL, revision string) string {
}

func (c *Cache) SetGitDirectories(repoURL, revision string, directories []string) error {
return c.cache.SetItem(gitDirectoriesKey(repoURL, revision), &directories, c.repoCacheExpiration, false)
return c.cache.SetItem(
gitDirectoriesKey(repoURL, revision),
&directories,
&cacheutil.CacheActionOpts{Expiration: c.repoCacheExpiration})
}

func (c *Cache) GetGitDirectories(repoURL, revision string) ([]string, error) {
Expand Down
Loading

0 comments on commit fd0fa44

Please sign in to comment.