Skip to content

Commit

Permalink
Merging Develop into Main (#868)
Browse files Browse the repository at this point in the history
  • Loading branch information
KarnveerSGill authored Dec 14, 2023
1 parent 2d424b9 commit b4078bd
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 25 deletions.
4 changes: 2 additions & 2 deletions trcflow/core/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,8 +650,8 @@ func (tfmContext *TrcFlowMachineContext) SyncTableCycle(tfContext *TrcFlowContex

// Second row here
// Not sure if necessary to copy entire ReportStatistics method
//tenantIndexPath, tenantDFSIdPath := utilcore.GetDFSPathName()
//df.FinishStatistic(tfmContext, tfContext, tfContext.GoMod, "flume", tenantIndexPath, tenantDFSIdPath, tfmContext.Config.Log, false)
tenantIndexPath, tenantDFSIdPath := utilcore.GetDFSPathName()
df.FinishStatistic(tfmContext, tfContext, tfContext.GoMod, "flume", tenantIndexPath, tenantDFSIdPath, tfmContext.Config.Log, false)

//df.FinishStatistic(tfmContext, tfContext, tfContext.GoMod, ...)
tfmContext.FlowControllerLock.Lock()
Expand Down
46 changes: 23 additions & 23 deletions vaulthelper/kv/Modifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func PreCheckEnvironment(environment string) (string, string, bool, error) {
// Any errors generated in creating the client
func NewModifier(insecure bool, token string, address string, env string, regions []string, useCache bool, logger *log.Logger) (*Modifier, error) {
if useCache {
PruneCache(env, 10)
checkoutModifier, err := cachedModifierHelper(env)
PruneCache(env, address, 10)
checkoutModifier, err := cachedModifierHelper(env, address)
if err == nil && checkoutModifier != nil {
checkoutModifier.Insecure = insecure
checkoutModifier.RawEnv = env
Expand Down Expand Up @@ -142,20 +142,20 @@ func NewModifier(insecure bool, token string, address string, env string, region
return newModifier, nil
}

func checkInitModCache(env string) {
if _, ok := modifierCache[env]; !ok {
func checkInitModCache(env string, addr string) {
if _, ok := modifierCache[fmt.Sprintf("%s+%s", env, addr)]; !ok {
modifierCachLock.Lock()
modifierCache[env] = &modCache{modCount: 0, modifierChan: make(chan *Modifier, 20)}
modifierCache[fmt.Sprintf("%s+%s", env, addr)] = &modCache{modCount: 0, modifierChan: make(chan *Modifier, 20)}
modifierCachLock.Unlock()
}
}

func cachedModifierHelper(env string) (*Modifier, error) {
checkInitModCache(env)
func cachedModifierHelper(env string, addr string) (*Modifier, error) {
checkInitModCache(env, addr)

for {
select {
case checkoutModifier := <-modifierCache[env].modifierChan:
case checkoutModifier := <-modifierCache[fmt.Sprintf("%s+%s", env, addr)].modifierChan:
atomic.AddUint64(&modifierCache[env].modCount, ^uint64(0))
return checkoutModifier, nil
case <-time.After(time.Millisecond * 200):
Expand All @@ -174,32 +174,32 @@ func (m *Modifier) Release() {
}

func (m *Modifier) releaseHelper(env string) {
checkInitModCache(env)
checkInitModCache(env, m.client.Address())

// Since modifiers are re-used now, this may not be necessary or even desired for that
// matter.
// m.httpClient.CloseIdleConnections()
if modifierCache[env].modCount > 10 {
if modifierCache[fmt.Sprintf("%s+%s", env, m.client.Address())].modCount > 10 {
m.CleanCache(10)
}

atomic.AddUint64(&modifierCache[env].modCount, 1)
modifierCache[env].modifierChan <- m
atomic.AddUint64(&modifierCache[fmt.Sprintf("%s+%s", env, m.client.Address())].modCount, 1)
modifierCache[fmt.Sprintf("%s+%s", env, m.client.Address())].modifierChan <- m
}

func (m *Modifier) RemoveFromCache() {
m.CleanCache(20)
}

func cleanCacheHelper(env string, limit uint64) {
func cleanCacheHelper(env string, addr string, limit uint64) {
modifierCachLock.Lock()
if modifierCache[env].modCount > 1 {
if modifierCache[fmt.Sprintf("%s+%s", env, addr)].modCount > 1 {
emptied:
for i := uint64(0); i < limit; i++ {
select {
case mod := <-modifierCache[env].modifierChan:
case mod := <-modifierCache[fmt.Sprintf("%s+%s", env, addr)].modifierChan:
mod.Close()
atomic.AddUint64(&modifierCache[env].modCount, ^uint64(0))
atomic.AddUint64(&modifierCache[fmt.Sprintf("%s+%s", env, addr)].modCount, ^uint64(0))
default:
break emptied
}
Expand All @@ -208,11 +208,11 @@ func cleanCacheHelper(env string, limit uint64) {
modifierCachLock.Unlock()
}

func PruneCache(env string, limit uint64) {
if modifierCache != nil && modifierCache[env] != nil {
if modifierCache[env].modCount > limit {
if _, ok := modifierCache[env]; ok {
cleanCacheHelper(env, limit)
func PruneCache(env string, addr string, limit uint64) {
if modifierCache != nil && modifierCache[fmt.Sprintf("%s+%s", env, addr)] != nil {
if modifierCache[fmt.Sprintf("%s+%s", env, addr)].modCount > limit {
if _, ok := modifierCache[fmt.Sprintf("%s+%s", env, addr)]; ok {
cleanCacheHelper(env, addr, limit)
}
}
}
Expand All @@ -221,9 +221,9 @@ func PruneCache(env string, limit uint64) {
func (m *Modifier) CleanCache(limit uint64) {
m.Close()
if _, ok := modifierCache[m.Env]; ok {
cleanCacheHelper(m.Env, limit)
cleanCacheHelper(m.Env, m.client.Address(), limit)
} else {
cleanCacheHelper(m.RawEnv, limit)
cleanCacheHelper(m.RawEnv, m.client.Address(), limit)
}
}

Expand Down

0 comments on commit b4078bd

Please sign in to comment.