diff --git a/clients/metadata/metadata_cassandra_test.go b/clients/metadata/metadata_cassandra_test.go index bb4e345e..aab9c6b8 100644 --- a/clients/metadata/metadata_cassandra_test.go +++ b/clients/metadata/metadata_cassandra_test.go @@ -52,9 +52,7 @@ type CassandraSuite struct { TestCluster } -const ( - testPageSize = 2 -) +const testPageSize = 2 func TestCassandraSuite(t *testing.T) { s := new(CassandraSuite) @@ -1758,6 +1756,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() { options := make(map[string]string) options[common.FlagDisableNackThrottling] = "true" + options[common.FlagEnableSmartRetry] = "false" cgName := s.generateName("/foo/bar_consumer") @@ -1836,6 +1835,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() { assertConsumerGroupsEqual(s, expectedCG, gotCG) options[common.FlagDisableNackThrottling] = "false" + options[common.FlagEnableSmartRetry] = "true" readReq.ConsumerGroupUUID = common.StringPtr(gotCG.GetConsumerGroupUUID()) gotCG, err = s.client.ReadConsumerGroupByUUID(nil, readReq) @@ -2113,6 +2113,7 @@ func (s *CassandraSuite) TestListAllConsumerGroups() { options := make(map[string]string) options[common.FlagDisableNackThrottling] = "true" + options[common.FlagEnableSmartRetry] = "false" for i := 0; i < 10; i++ { name := s.generateName(fmt.Sprintf("foobar-consumer-%v", i)) diff --git a/cmd/tools/common/lib.go b/cmd/tools/common/lib.go index 77d6f7d0..aaadda8a 100644 --- a/cmd/tools/common/lib.go +++ b/cmd/tools/common/lib.go @@ -224,6 +224,10 @@ func SetCommonCommands( Name: common.FlagDisableNackThrottling, Usage: "Disable nack throttling for consumer group", }, + cli.BoolFlag{ + Name: common.FlagEnableSmartRetry, + Usage: "Enable smart retry for consumer group", + }, }, Action: func(c *cli.Context) { if authEnabled { @@ -374,6 +378,10 @@ func SetCommonCommands( Name: common.FlagDisableNackThrottling, Usage: "Disable nack throttling for consumer group", }, + cli.BoolFlag{ + Name: common.FlagEnableSmartRetry, + Usage: "Enable smart retry for consumer group", + }, }, Action: func(c *cli.Context) { if authEnabled { diff --git a/common/constants.go b/common/constants.go index f91f3122..cb989957 100644 --- a/common/constants.go +++ b/common/constants.go @@ -74,4 +74,7 @@ const ( const ( // FlagDisableNackThrottling is the flag string for disabling Nack throttling FlagDisableNackThrottling = "disable_nack_throttling" + + // FlagEnableSmartRetry is the flag string for enabling smart retry + FlagEnableSmartRetry = "enable_smart_retry" ) diff --git a/services/frontendhost/frontend_test.go b/services/frontendhost/frontend_test.go index 6546f135..a4182cc4 100644 --- a/services/frontendhost/frontend_test.go +++ b/services/frontendhost/frontend_test.go @@ -829,6 +829,7 @@ func (s *FrontendHostSuite) TestFrontendHostCreateConsumerGroup() { options := make(map[string]string) options[common.FlagDisableNackThrottling] = "true" + options[common.FlagEnableSmartRetry] = "false" req := c.NewCreateConsumerGroupRequest() req.DestinationPath = common.StringPtr(testPath) @@ -1134,6 +1135,7 @@ func (s *FrontendHostSuite) TestFrontendHostUpdateConsumerGroup() { options := make(map[string]string) options[common.FlagDisableNackThrottling] = "true" + options[common.FlagEnableSmartRetry] = "false" req := new(c.UpdateConsumerGroupRequest) req.DestinationPath = common.StringPtr(testPath) diff --git a/services/outputhost/messagecache.go b/services/outputhost/messagecache.go index a2b84cff..c6c3d3a7 100644 --- a/services/outputhost/messagecache.go +++ b/services/outputhost/messagecache.go @@ -1075,10 +1075,16 @@ func (msgCache *cgMsgCache) checkTimer() { func (msgCache *cgMsgCache) isStalled() bool { var m3St m3HealthState - var smartRetryDisabled bool + var smartRetryEnabled bool - if atomic.LoadInt32(&msgCache.cgCache.dlqMerging) > 0 || strings.Contains(msgCache.GetOwnerEmail(), SmartRetryDisableString) { - smartRetryDisabled = true + smartRetryEnabledFlag, ok := msgCache.cgCache.cachedCGDesc.Options[common.FlagEnableSmartRetry] + if ok && smartRetryEnabledFlag == "true" { + smartRetryEnabled = true + } + + if atomic.LoadInt32(&msgCache.cgCache.dlqMerging) > 0 || + strings.Contains(msgCache.GetOwnerEmail(), SmartRetryDisableString) { + smartRetryEnabled = false } now := common.Now() @@ -1184,7 +1190,7 @@ func (msgCache *cgMsgCache) isStalled() bool { msgCache.consumerM3Client.UpdateGauge(metrics.ConsConnectionScope, metrics.OutputhostCGHealthState, int64(m3St)) msgCache.consumerM3Client.UpdateGauge(metrics.ConsConnectionScope, metrics.OutputhostCGOutstandingDeliveries, int64(msgCache.countStateDelivered+msgCache.countStateEarlyNACK)) - if smartRetryDisabled { + if !smartRetryEnabled { if stalled { msgCache.lclLg.Warn("Smart Retry disabled while consumer group stalled") } diff --git a/services/outputhost/messagecache_test.go b/services/outputhost/messagecache_test.go index c820877a..85d2cae6 100644 --- a/services/outputhost/messagecache_test.go +++ b/services/outputhost/messagecache_test.go @@ -55,6 +55,9 @@ func (s *MessageCacheSuite) SetupTest() { cgStatus := shared.ConsumerGroupStatus_ENABLED + options := make(map[string]string) + options[common.FlagEnableSmartRetry] = "false" + cgDesc := &shared.ConsumerGroupDescription{ ConsumerGroupUUID: common.StringPtr(uuid.New()), DestinationUUID: common.StringPtr(uuid.New()), @@ -62,7 +65,8 @@ func (s *MessageCacheSuite) SetupTest() { Status: &cgStatus, MaxDeliveryCount: common.Int32Ptr(10), LockTimeoutSeconds: common.Int32Ptr(1), - OwnerEmail: common.StringPtr("foo+smartRetryDisable@uber.com"), + OwnerEmail: common.StringPtr("foo@uber.com"), + Options: options, } s.msgRedeliveryCh = make(chan *cherami.ConsumerMessage, 128) diff --git a/tools/common/lib.go b/tools/common/lib.go index f47c6fa5..6f4f97d9 100644 --- a/tools/common/lib.go +++ b/tools/common/lib.go @@ -503,11 +503,15 @@ func CreateConsumerGroupSecure( isMultiZone := len(zoneConfigs.GetConfigs()) > 0 options := make(map[string]string) - disableNackThrottling := strings.ToLower(c.String(common.FlagDisableNackThrottling)) - if disableNackThrottling == "true" { + + if c.Bool(common.FlagDisableNackThrottling) { options[common.FlagDisableNackThrottling] = "true" } + if c.Bool(common.FlagEnableSmartRetry) { + options[common.FlagEnableSmartRetry] = "true" + } + desc, err := cClient.CreateConsumerGroup(&cherami.CreateConsumerGroupRequest{ DestinationPath: &path, ConsumerGroupName: &name, @@ -573,6 +577,16 @@ func getCgZoneConfigs(c *cli.Context, mClient mcli.Client, cliHelper common.CliH return zoneConfigs } +func getCgFromMedatada(mClient mcli.Client, path string, name string) *shared.ConsumerGroupDescription { + cg, err := mClient.ReadConsumerGroup(&shared.ReadConsumerGroupRequest{ + DestinationPath: common.StringPtr(path), + ConsumerGroupName: common.StringPtr(name), + }) + ExitIfError(err) + + return cg +} + // UpdateConsumerGroup updates the consumer group func UpdateConsumerGroup(c *cli.Context, cliHelper common.CliHelper, serviceName string) { UpdateConsumerGroupSecure(c, cliHelper, serviceName, nil) @@ -633,7 +647,7 @@ func UpdateConsumerGroupSecure( OwnerEmail: getIfSetString(c, `owner_email`, &setCount), ActiveZone: getIfSetString(c, `active_zone`, &setCount), ZoneConfigs: getIfSetCgZoneConfig(c, mClient, cliHelper, path, &setCount), - Options: getIfSetOptions(c, &setCount), + Options: getIfSetOptions(c, mClient, path, name, &setCount), } if c.IsSet(`status`) { @@ -1640,15 +1654,27 @@ func getIfSetCgZoneConfig(c *cli.Context, mClient mcli.Client, cliHelper common. return } -func getIfSetOptions(c *cli.Context, setCount *int) (options map[string]string) { - if c.IsSet(common.FlagDisableNackThrottling) { - disableNackThrottling := "false" - if strings.ToLower(c.String(common.FlagDisableNackThrottling)) == "true" { - disableNackThrottling = "true" +func getIfSetOptions(c *cli.Context, mClient mcli.Client, path string, name string, setCount *int) (options map[string]string) { + if c.IsSet(common.FlagDisableNackThrottling) || c.IsSet(common.FlagEnableSmartRetry) { + cg := getCgFromMedatada(mClient, path, name) + options = cg.Options + + if c.IsSet(common.FlagDisableNackThrottling) { + if c.Bool(common.FlagDisableNackThrottling) { + options[common.FlagDisableNackThrottling] = "true" + } else { + options[common.FlagDisableNackThrottling] = "false" + } + } + + if c.IsSet(common.FlagEnableSmartRetry) { + if c.Bool(common.FlagEnableSmartRetry) { + options[common.FlagEnableSmartRetry] = "true" + } else { + options[common.FlagEnableSmartRetry] = "false" + } } - options = make(map[string]string) - options[common.FlagDisableNackThrottling] = disableNackThrottling *setCount++ return options }