Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Hack -> Config: add options to disable smart retry (#281)
Browse files Browse the repository at this point in the history
  • Loading branch information
kobeyang authored Sep 1, 2017
1 parent dd03fe5 commit c293949
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 18 deletions.
7 changes: 4 additions & 3 deletions clients/metadata/metadata_cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ type CassandraSuite struct {
TestCluster
}

const (
testPageSize = 2
)
const testPageSize = 2

func TestCassandraSuite(t *testing.T) {
s := new(CassandraSuite)
Expand Down Expand Up @@ -1758,6 +1756,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {

options := make(map[string]string)
options[common.FlagDisableNackThrottling] = "true"
options[common.FlagEnableSmartRetry] = "false"

cgName := s.generateName("/foo/bar_consumer")

Expand Down Expand Up @@ -1836,6 +1835,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
assertConsumerGroupsEqual(s, expectedCG, gotCG)

options[common.FlagDisableNackThrottling] = "false"
options[common.FlagEnableSmartRetry] = "true"

readReq.ConsumerGroupUUID = common.StringPtr(gotCG.GetConsumerGroupUUID())
gotCG, err = s.client.ReadConsumerGroupByUUID(nil, readReq)
Expand Down Expand Up @@ -2113,6 +2113,7 @@ func (s *CassandraSuite) TestListAllConsumerGroups() {

options := make(map[string]string)
options[common.FlagDisableNackThrottling] = "true"
options[common.FlagEnableSmartRetry] = "false"

for i := 0; i < 10; i++ {
name := s.generateName(fmt.Sprintf("foobar-consumer-%v", i))
Expand Down
8 changes: 8 additions & 0 deletions cmd/tools/common/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ func SetCommonCommands(
Name: common.FlagDisableNackThrottling,
Usage: "Disable nack throttling for consumer group",
},
cli.BoolFlag{
Name: common.FlagEnableSmartRetry,
Usage: "Enable smart retry for consumer group",
},
},
Action: func(c *cli.Context) {
if authEnabled {
Expand Down Expand Up @@ -374,6 +378,10 @@ func SetCommonCommands(
Name: common.FlagDisableNackThrottling,
Usage: "Disable nack throttling for consumer group",
},
cli.BoolFlag{
Name: common.FlagEnableSmartRetry,
Usage: "Enable smart retry for consumer group",
},
},
Action: func(c *cli.Context) {
if authEnabled {
Expand Down
3 changes: 3 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,7 @@ const (
const (
// FlagDisableNackThrottling is the flag string for disabling Nack throttling
FlagDisableNackThrottling = "disable_nack_throttling"

// FlagEnableSmartRetry is the flag string for enabling smart retry
FlagEnableSmartRetry = "enable_smart_retry"
)
2 changes: 2 additions & 0 deletions services/frontendhost/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ func (s *FrontendHostSuite) TestFrontendHostCreateConsumerGroup() {

options := make(map[string]string)
options[common.FlagDisableNackThrottling] = "true"
options[common.FlagEnableSmartRetry] = "false"

req := c.NewCreateConsumerGroupRequest()
req.DestinationPath = common.StringPtr(testPath)
Expand Down Expand Up @@ -1134,6 +1135,7 @@ func (s *FrontendHostSuite) TestFrontendHostUpdateConsumerGroup() {

options := make(map[string]string)
options[common.FlagDisableNackThrottling] = "true"
options[common.FlagEnableSmartRetry] = "false"

req := new(c.UpdateConsumerGroupRequest)
req.DestinationPath = common.StringPtr(testPath)
Expand Down
14 changes: 10 additions & 4 deletions services/outputhost/messagecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,10 +1075,16 @@ func (msgCache *cgMsgCache) checkTimer() {

func (msgCache *cgMsgCache) isStalled() bool {
var m3St m3HealthState
var smartRetryDisabled bool
var smartRetryEnabled bool

if atomic.LoadInt32(&msgCache.cgCache.dlqMerging) > 0 || strings.Contains(msgCache.GetOwnerEmail(), SmartRetryDisableString) {
smartRetryDisabled = true
smartRetryEnabledFlag, ok := msgCache.cgCache.cachedCGDesc.Options[common.FlagEnableSmartRetry]
if ok && smartRetryEnabledFlag == "true" {
smartRetryEnabled = true
}

if atomic.LoadInt32(&msgCache.cgCache.dlqMerging) > 0 ||
strings.Contains(msgCache.GetOwnerEmail(), SmartRetryDisableString) {
smartRetryEnabled = false
}

now := common.Now()
Expand Down Expand Up @@ -1184,7 +1190,7 @@ func (msgCache *cgMsgCache) isStalled() bool {
msgCache.consumerM3Client.UpdateGauge(metrics.ConsConnectionScope, metrics.OutputhostCGHealthState, int64(m3St))
msgCache.consumerM3Client.UpdateGauge(metrics.ConsConnectionScope, metrics.OutputhostCGOutstandingDeliveries, int64(msgCache.countStateDelivered+msgCache.countStateEarlyNACK))

if smartRetryDisabled {
if !smartRetryEnabled {
if stalled {
msgCache.lclLg.Warn("Smart Retry disabled while consumer group stalled")
}
Expand Down
6 changes: 5 additions & 1 deletion services/outputhost/messagecache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,18 @@ func (s *MessageCacheSuite) SetupTest() {

cgStatus := shared.ConsumerGroupStatus_ENABLED

options := make(map[string]string)
options[common.FlagEnableSmartRetry] = "false"

cgDesc := &shared.ConsumerGroupDescription{
ConsumerGroupUUID: common.StringPtr(uuid.New()),
DestinationUUID: common.StringPtr(uuid.New()),
ConsumerGroupName: common.StringPtr("/unittest/msgCache_cg"),
Status: &cgStatus,
MaxDeliveryCount: common.Int32Ptr(10),
LockTimeoutSeconds: common.Int32Ptr(1),
OwnerEmail: common.StringPtr("foo+smartRetryDisable@uber.com"),
OwnerEmail: common.StringPtr("foo@uber.com"),
Options: options,
}

s.msgRedeliveryCh = make(chan *cherami.ConsumerMessage, 128)
Expand Down
46 changes: 36 additions & 10 deletions tools/common/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,11 +503,15 @@ func CreateConsumerGroupSecure(
isMultiZone := len(zoneConfigs.GetConfigs()) > 0

options := make(map[string]string)
disableNackThrottling := strings.ToLower(c.String(common.FlagDisableNackThrottling))
if disableNackThrottling == "true" {

if c.Bool(common.FlagDisableNackThrottling) {
options[common.FlagDisableNackThrottling] = "true"
}

if c.Bool(common.FlagEnableSmartRetry) {
options[common.FlagEnableSmartRetry] = "true"
}

desc, err := cClient.CreateConsumerGroup(&cherami.CreateConsumerGroupRequest{
DestinationPath: &path,
ConsumerGroupName: &name,
Expand Down Expand Up @@ -573,6 +577,16 @@ func getCgZoneConfigs(c *cli.Context, mClient mcli.Client, cliHelper common.CliH
return zoneConfigs
}

func getCgFromMedatada(mClient mcli.Client, path string, name string) *shared.ConsumerGroupDescription {
cg, err := mClient.ReadConsumerGroup(&shared.ReadConsumerGroupRequest{
DestinationPath: common.StringPtr(path),
ConsumerGroupName: common.StringPtr(name),
})
ExitIfError(err)

return cg
}

// UpdateConsumerGroup updates the consumer group
func UpdateConsumerGroup(c *cli.Context, cliHelper common.CliHelper, serviceName string) {
UpdateConsumerGroupSecure(c, cliHelper, serviceName, nil)
Expand Down Expand Up @@ -633,7 +647,7 @@ func UpdateConsumerGroupSecure(
OwnerEmail: getIfSetString(c, `owner_email`, &setCount),
ActiveZone: getIfSetString(c, `active_zone`, &setCount),
ZoneConfigs: getIfSetCgZoneConfig(c, mClient, cliHelper, path, &setCount),
Options: getIfSetOptions(c, &setCount),
Options: getIfSetOptions(c, mClient, path, name, &setCount),
}

if c.IsSet(`status`) {
Expand Down Expand Up @@ -1640,15 +1654,27 @@ func getIfSetCgZoneConfig(c *cli.Context, mClient mcli.Client, cliHelper common.
return
}

func getIfSetOptions(c *cli.Context, setCount *int) (options map[string]string) {
if c.IsSet(common.FlagDisableNackThrottling) {
disableNackThrottling := "false"
if strings.ToLower(c.String(common.FlagDisableNackThrottling)) == "true" {
disableNackThrottling = "true"
func getIfSetOptions(c *cli.Context, mClient mcli.Client, path string, name string, setCount *int) (options map[string]string) {
if c.IsSet(common.FlagDisableNackThrottling) || c.IsSet(common.FlagEnableSmartRetry) {
cg := getCgFromMedatada(mClient, path, name)
options = cg.Options

if c.IsSet(common.FlagDisableNackThrottling) {
if c.Bool(common.FlagDisableNackThrottling) {
options[common.FlagDisableNackThrottling] = "true"
} else {
options[common.FlagDisableNackThrottling] = "false"
}
}

if c.IsSet(common.FlagEnableSmartRetry) {
if c.Bool(common.FlagEnableSmartRetry) {
options[common.FlagEnableSmartRetry] = "true"
} else {
options[common.FlagEnableSmartRetry] = "false"
}
}

options = make(map[string]string)
options[common.FlagDisableNackThrottling] = disableNackThrottling
*setCount++
return options
}
Expand Down

0 comments on commit c293949

Please sign in to comment.