Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Hack -> Config: add options to disable smart retry #281

Merged
merged 3 commits into from
Sep 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions clients/metadata/metadata_cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ type CassandraSuite struct {
TestCluster
}

const (
testPageSize = 2
)
const testPageSize = 2

func TestCassandraSuite(t *testing.T) {
s := new(CassandraSuite)
Expand Down Expand Up @@ -1758,6 +1756,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {

options := make(map[string]string)
options[common.FlagDisableNackThrottling] = "true"
options[common.FlagEnableSmartRetry] = "false"

cgName := s.generateName("/foo/bar_consumer")

Expand Down Expand Up @@ -1836,6 +1835,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
assertConsumerGroupsEqual(s, expectedCG, gotCG)

options[common.FlagDisableNackThrottling] = "false"
options[common.FlagEnableSmartRetry] = "true"

readReq.ConsumerGroupUUID = common.StringPtr(gotCG.GetConsumerGroupUUID())
gotCG, err = s.client.ReadConsumerGroupByUUID(nil, readReq)
Expand Down Expand Up @@ -2113,6 +2113,7 @@ func (s *CassandraSuite) TestListAllConsumerGroups() {

options := make(map[string]string)
options[common.FlagDisableNackThrottling] = "true"
options[common.FlagEnableSmartRetry] = "false"

for i := 0; i < 10; i++ {
name := s.generateName(fmt.Sprintf("foobar-consumer-%v", i))
Expand Down
8 changes: 8 additions & 0 deletions cmd/tools/common/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ func SetCommonCommands(
Name: common.FlagDisableNackThrottling,
Usage: "Disable nack throttling for consumer group",
},
cli.BoolFlag{
Name: common.FlagEnableSmartRetry,
Usage: "Enable smart retry for consumer group",
},
},
Action: func(c *cli.Context) {
if authEnabled {
Expand Down Expand Up @@ -374,6 +378,10 @@ func SetCommonCommands(
Name: common.FlagDisableNackThrottling,
Usage: "Disable nack throttling for consumer group",
},
cli.BoolFlag{
Name: common.FlagEnableSmartRetry,
Usage: "Enable smart retry for consumer group",
},
},
Action: func(c *cli.Context) {
if authEnabled {
Expand Down
3 changes: 3 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,7 @@ const (
const (
// FlagDisableNackThrottling is the flag string for disabling Nack throttling
FlagDisableNackThrottling = "disable_nack_throttling"

// FlagEnableSmartRetry is the flag string for enabling smart retry
FlagEnableSmartRetry = "enable_smart_retry"
)
2 changes: 2 additions & 0 deletions services/frontendhost/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ func (s *FrontendHostSuite) TestFrontendHostCreateConsumerGroup() {

options := make(map[string]string)
options[common.FlagDisableNackThrottling] = "true"
options[common.FlagEnableSmartRetry] = "false"

req := c.NewCreateConsumerGroupRequest()
req.DestinationPath = common.StringPtr(testPath)
Expand Down Expand Up @@ -1134,6 +1135,7 @@ func (s *FrontendHostSuite) TestFrontendHostUpdateConsumerGroup() {

options := make(map[string]string)
options[common.FlagDisableNackThrottling] = "true"
options[common.FlagEnableSmartRetry] = "false"

req := new(c.UpdateConsumerGroupRequest)
req.DestinationPath = common.StringPtr(testPath)
Expand Down
14 changes: 10 additions & 4 deletions services/outputhost/messagecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,10 +1075,16 @@ func (msgCache *cgMsgCache) checkTimer() {

func (msgCache *cgMsgCache) isStalled() bool {
var m3St m3HealthState
var smartRetryDisabled bool
var smartRetryEnabled bool

if atomic.LoadInt32(&msgCache.cgCache.dlqMerging) > 0 || strings.Contains(msgCache.GetOwnerEmail(), SmartRetryDisableString) {
smartRetryDisabled = true
smartRetryEnabledFlag, ok := msgCache.cgCache.cachedCGDesc.Options[common.FlagEnableSmartRetry]
if ok && smartRetryEnabledFlag == "true" {
smartRetryEnabled = true
}

if atomic.LoadInt32(&msgCache.cgCache.dlqMerging) > 0 ||
strings.Contains(msgCache.GetOwnerEmail(), SmartRetryDisableString) {
smartRetryEnabled = false
}

now := common.Now()
Expand Down Expand Up @@ -1184,7 +1190,7 @@ func (msgCache *cgMsgCache) isStalled() bool {
msgCache.consumerM3Client.UpdateGauge(metrics.ConsConnectionScope, metrics.OutputhostCGHealthState, int64(m3St))
msgCache.consumerM3Client.UpdateGauge(metrics.ConsConnectionScope, metrics.OutputhostCGOutstandingDeliveries, int64(msgCache.countStateDelivered+msgCache.countStateEarlyNACK))

if smartRetryDisabled {
if !smartRetryEnabled {
if stalled {
msgCache.lclLg.Warn("Smart Retry disabled while consumer group stalled")
}
Expand Down
6 changes: 5 additions & 1 deletion services/outputhost/messagecache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,18 @@ func (s *MessageCacheSuite) SetupTest() {

cgStatus := shared.ConsumerGroupStatus_ENABLED

options := make(map[string]string)
options[common.FlagEnableSmartRetry] = "false"

cgDesc := &shared.ConsumerGroupDescription{
ConsumerGroupUUID: common.StringPtr(uuid.New()),
DestinationUUID: common.StringPtr(uuid.New()),
ConsumerGroupName: common.StringPtr("/unittest/msgCache_cg"),
Status: &cgStatus,
MaxDeliveryCount: common.Int32Ptr(10),
LockTimeoutSeconds: common.Int32Ptr(1),
OwnerEmail: common.StringPtr("foo+smartRetryDisable@uber.com"),
OwnerEmail: common.StringPtr("foo@uber.com"),
Options: options,
}

s.msgRedeliveryCh = make(chan *cherami.ConsumerMessage, 128)
Expand Down
46 changes: 36 additions & 10 deletions tools/common/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,11 +503,15 @@ func CreateConsumerGroupSecure(
isMultiZone := len(zoneConfigs.GetConfigs()) > 0

options := make(map[string]string)
disableNackThrottling := strings.ToLower(c.String(common.FlagDisableNackThrottling))
if disableNackThrottling == "true" {

if c.Bool(common.FlagDisableNackThrottling) {
options[common.FlagDisableNackThrottling] = "true"
}

if c.Bool(common.FlagEnableSmartRetry) {
options[common.FlagEnableSmartRetry] = "true"
}

desc, err := cClient.CreateConsumerGroup(&cherami.CreateConsumerGroupRequest{
DestinationPath: &path,
ConsumerGroupName: &name,
Expand Down Expand Up @@ -573,6 +577,16 @@ func getCgZoneConfigs(c *cli.Context, mClient mcli.Client, cliHelper common.CliH
return zoneConfigs
}

func getCgFromMedatada(mClient mcli.Client, path string, name string) *shared.ConsumerGroupDescription {
cg, err := mClient.ReadConsumerGroup(&shared.ReadConsumerGroupRequest{
DestinationPath: common.StringPtr(path),
ConsumerGroupName: common.StringPtr(name),
})
ExitIfError(err)

return cg
}

// UpdateConsumerGroup updates the consumer group
func UpdateConsumerGroup(c *cli.Context, cliHelper common.CliHelper, serviceName string) {
UpdateConsumerGroupSecure(c, cliHelper, serviceName, nil)
Expand Down Expand Up @@ -633,7 +647,7 @@ func UpdateConsumerGroupSecure(
OwnerEmail: getIfSetString(c, `owner_email`, &setCount),
ActiveZone: getIfSetString(c, `active_zone`, &setCount),
ZoneConfigs: getIfSetCgZoneConfig(c, mClient, cliHelper, path, &setCount),
Options: getIfSetOptions(c, &setCount),
Options: getIfSetOptions(c, mClient, path, name, &setCount),
}

if c.IsSet(`status`) {
Expand Down Expand Up @@ -1640,15 +1654,27 @@ func getIfSetCgZoneConfig(c *cli.Context, mClient mcli.Client, cliHelper common.
return
}

func getIfSetOptions(c *cli.Context, setCount *int) (options map[string]string) {
if c.IsSet(common.FlagDisableNackThrottling) {
disableNackThrottling := "false"
if strings.ToLower(c.String(common.FlagDisableNackThrottling)) == "true" {
disableNackThrottling = "true"
func getIfSetOptions(c *cli.Context, mClient mcli.Client, path string, name string, setCount *int) (options map[string]string) {
if c.IsSet(common.FlagDisableNackThrottling) || c.IsSet(common.FlagEnableSmartRetry) {
cg := getCgFromMedatada(mClient, path, name)
options = cg.Options

if c.IsSet(common.FlagDisableNackThrottling) {
if c.Bool(common.FlagDisableNackThrottling) {
options[common.FlagDisableNackThrottling] = "true"
} else {
options[common.FlagDisableNackThrottling] = "false"
}
}

if c.IsSet(common.FlagEnableSmartRetry) {
if c.Bool(common.FlagEnableSmartRetry) {
options[common.FlagEnableSmartRetry] = "true"
} else {
options[common.FlagEnableSmartRetry] = "false"
}
}

options = make(map[string]string)
options[common.FlagDisableNackThrottling] = disableNackThrottling
*setCount++
return options
}
Expand Down