Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Change smart retry to disabled by default
Browse files Browse the repository at this point in the history
  • Loading branch information
kobeyang committed Aug 30, 2017
1 parent eac12a9 commit 74cf8f9
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 35 deletions.
6 changes: 3 additions & 3 deletions clients/metadata/metadata_cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1756,7 +1756,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {

options := make(map[string]string)
options[common.FlagDisableNackThrottling] = "true"
options[common.FlagDisableSmartRetry] = "true"
options[common.FlagEnableSmartRetry] = "false"

cgName := s.generateName("/foo/bar_consumer")

Expand Down Expand Up @@ -1835,7 +1835,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
assertConsumerGroupsEqual(s, expectedCG, gotCG)

options[common.FlagDisableNackThrottling] = "false"
options[common.FlagDisableSmartRetry] = "false"
options[common.FlagEnableSmartRetry] = "true"

readReq.ConsumerGroupUUID = common.StringPtr(gotCG.GetConsumerGroupUUID())
gotCG, err = s.client.ReadConsumerGroupByUUID(nil, readReq)
Expand Down Expand Up @@ -2113,7 +2113,7 @@ func (s *CassandraSuite) TestListAllConsumerGroups() {

options := make(map[string]string)
options[common.FlagDisableNackThrottling] = "true"
options[common.FlagDisableSmartRetry] = "true"
options[common.FlagEnableSmartRetry] = "false"

for i := 0; i < 10; i++ {
name := s.generateName(fmt.Sprintf("foobar-consumer-%v", i))
Expand Down
8 changes: 4 additions & 4 deletions cmd/tools/common/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ func SetCommonCommands(
Usage: "Disable nack throttling for consumer group",
},
cli.BoolFlag{
Name: common.FlagDisableSmartRetry,
Usage: "Disable smart retry for consumer group",
Name: common.FlagEnableSmartRetry,
Usage: "Enable smart retry for consumer group",
},
},
Action: func(c *cli.Context) {
Expand Down Expand Up @@ -379,8 +379,8 @@ func SetCommonCommands(
Usage: "Disable nack throttling for consumer group",
},
cli.BoolFlag{
Name: common.FlagDisableSmartRetry,
Usage: "Disable smart retry for consumer group",
Name: common.FlagEnableSmartRetry,
Usage: "Enable smart retry for consumer group",
},
},
Action: func(c *cli.Context) {
Expand Down
4 changes: 2 additions & 2 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,6 @@ const (
// FlagDisableNackThrottling is the flag string for disabling Nack throttling
FlagDisableNackThrottling = "disable_nack_throttling"

// FlagDisableSmartRetry is the flag string for disabling smart retry
FlagDisableSmartRetry = "disable_smart_retry"
// FlagEnableSmartRetry is the flag string for enabling smart retry
FlagEnableSmartRetry = "enable_smart_retry"
)
4 changes: 2 additions & 2 deletions services/frontendhost/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ func (s *FrontendHostSuite) TestFrontendHostCreateConsumerGroup() {

options := make(map[string]string)
options[common.FlagDisableNackThrottling] = "true"
options[common.FlagDisableSmartRetry] = "true"
options[common.FlagEnableSmartRetry] = "false"

req := c.NewCreateConsumerGroupRequest()
req.DestinationPath = common.StringPtr(testPath)
Expand Down Expand Up @@ -1135,7 +1135,7 @@ func (s *FrontendHostSuite) TestFrontendHostUpdateConsumerGroup() {

options := make(map[string]string)
options[common.FlagDisableNackThrottling] = "true"
options[common.FlagDisableSmartRetry] = "true"
options[common.FlagEnableSmartRetry] = "false"

req := new(c.UpdateConsumerGroupRequest)
req.DestinationPath = common.StringPtr(testPath)
Expand Down
14 changes: 8 additions & 6 deletions services/outputhost/messagecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,14 +1075,16 @@ func (msgCache *cgMsgCache) checkTimer() {

func (msgCache *cgMsgCache) isStalled() bool {
var m3St m3HealthState
var smartRetryDisabled bool
var smartRetryEnabled bool

smartRetryDisabledFlag, _ := msgCache.cgCache.cachedCGDesc.Options[common.FlagDisableSmartRetry]
smartRetryEnabledFlag, ok := msgCache.cgCache.cachedCGDesc.Options[common.FlagEnableSmartRetry]
if ok && smartRetryEnabledFlag == "true" {
smartRetryEnabled = true
}

if atomic.LoadInt32(&msgCache.cgCache.dlqMerging) > 0 ||
strings.Contains(msgCache.GetOwnerEmail(), SmartRetryDisableString) ||
smartRetryDisabledFlag == "true" {
smartRetryDisabled = true
strings.Contains(msgCache.GetOwnerEmail(), SmartRetryDisableString) {
smartRetryEnabled = false
}

now := common.Now()
Expand Down Expand Up @@ -1188,7 +1190,7 @@ func (msgCache *cgMsgCache) isStalled() bool {
msgCache.consumerM3Client.UpdateGauge(metrics.ConsConnectionScope, metrics.OutputhostCGHealthState, int64(m3St))
msgCache.consumerM3Client.UpdateGauge(metrics.ConsConnectionScope, metrics.OutputhostCGOutstandingDeliveries, int64(msgCache.countStateDelivered+msgCache.countStateEarlyNACK))

if smartRetryDisabled {
if !smartRetryEnabled {
if stalled {
msgCache.lclLg.Warn("Smart Retry disabled while consumer group stalled")
}
Expand Down
2 changes: 1 addition & 1 deletion services/outputhost/messagecache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *MessageCacheSuite) SetupTest() {
cgStatus := shared.ConsumerGroupStatus_ENABLED

options := make(map[string]string)
options[common.FlagDisableSmartRetry] = "true"
options[common.FlagEnableSmartRetry] = "false"

cgDesc := &shared.ConsumerGroupDescription{
ConsumerGroupUUID: common.StringPtr(uuid.New()),
Expand Down
48 changes: 31 additions & 17 deletions tools/common/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,14 +504,12 @@ func CreateConsumerGroupSecure(

options := make(map[string]string)

disableNackThrottling := c.Bool(common.FlagDisableNackThrottling)
if disableNackThrottling == true {
if c.Bool(common.FlagDisableNackThrottling) == true {
options[common.FlagDisableNackThrottling] = "true"
}

disableSmartRetry := c.Bool(common.FlagDisableSmartRetry)
if disableSmartRetry == true {
options[common.FlagDisableSmartRetry] = "true"
if c.Bool(common.FlagEnableSmartRetry) == true {
options[common.FlagEnableSmartRetry] = "true"
}

desc, err := cClient.CreateConsumerGroup(&cherami.CreateConsumerGroupRequest{
Expand Down Expand Up @@ -579,6 +577,16 @@ func getCgZoneConfigs(c *cli.Context, mClient mcli.Client, cliHelper common.CliH
return zoneConfigs
}

func getCgFromMedatada(mClient mcli.Client, path string, name string) *shared.ConsumerGroupDescription {
cg, err := mClient.ReadConsumerGroup(&shared.ReadConsumerGroupRequest{
DestinationPath: common.StringPtr(path),
ConsumerGroupName: common.StringPtr(name),
})
ExitIfError(err)

return cg
}

// UpdateConsumerGroup updates the consumer group
func UpdateConsumerGroup(c *cli.Context, cliHelper common.CliHelper, serviceName string) {
UpdateConsumerGroupSecure(c, cliHelper, serviceName, nil)
Expand Down Expand Up @@ -639,7 +647,7 @@ func UpdateConsumerGroupSecure(
OwnerEmail: getIfSetString(c, `owner_email`, &setCount),
ActiveZone: getIfSetString(c, `active_zone`, &setCount),
ZoneConfigs: getIfSetCgZoneConfig(c, mClient, cliHelper, path, &setCount),
Options: getIfSetOptions(c, &setCount),
Options: getIfSetOptions(c, mClient, path, name, &setCount),
}

if c.IsSet(`status`) {
Expand Down Expand Up @@ -1646,21 +1654,27 @@ func getIfSetCgZoneConfig(c *cli.Context, mClient mcli.Client, cliHelper common.
return
}

func getIfSetOptions(c *cli.Context, setCount *int) (options map[string]string) {
if c.IsSet(common.FlagDisableNackThrottling) || c.IsSet(common.FlagDisableSmartRetry) {
disableNackThrottling := "false"
if c.Bool(common.FlagDisableNackThrottling) == true {
disableNackThrottling = "true"
func getIfSetOptions(c *cli.Context, mClient mcli.Client, path string, name string, setCount *int) (options map[string]string) {
if c.IsSet(common.FlagDisableNackThrottling) || c.IsSet(common.FlagEnableSmartRetry) {
cg := getCgFromMedatada(mClient, path, name)
options = cg.Options

if c.IsSet(common.FlagDisableNackThrottling) {
if c.Bool(common.FlagDisableNackThrottling) == true {
options[common.FlagDisableNackThrottling] = "true"
} else {
options[common.FlagDisableNackThrottling] = "false"
}
}

disableSmartRetry := "false"
if c.Bool(common.FlagDisableSmartRetry) == true {
disableSmartRetry = "true"
if c.IsSet(common.FlagEnableSmartRetry) {
if c.Bool(common.FlagEnableSmartRetry) == true {
options[common.FlagEnableSmartRetry] = "true"
} else {
options[common.FlagEnableSmartRetry] = "false"
}
}

options = make(map[string]string)
options[common.FlagDisableNackThrottling] = disableNackThrottling
options[common.FlagDisableSmartRetry] = disableSmartRetry
*setCount++
return options
}
Expand Down

0 comments on commit 74cf8f9

Please sign in to comment.