diff --git a/services/replicator/replicator.go b/services/replicator/replicator.go index 7202b929..ff8735a5 100644 --- a/services/replicator/replicator.go +++ b/services/replicator/replicator.go @@ -39,6 +39,7 @@ import ( dconfig "github.com/uber/cherami-server/common/dconfigclient" mm "github.com/uber/cherami-server/common/metadata" "github.com/uber/cherami-server/common/metrics" + "github.com/uber/cherami-server/common/set" storeStream "github.com/uber/cherami-server/stream" "github.com/uber/cherami-thrift/.generated/go/admin" "github.com/uber/cherami-thrift/.generated/go/metadata" @@ -67,6 +68,8 @@ type ( storehostConn map[string]*outConnection storehostConnMutex sync.RWMutex + knownCgExtents set.Set + metadataReconciler MetadataReconciler } ) @@ -127,6 +130,7 @@ func NewReplicator(serviceName string, sVice common.SCommon, metadataClient meta replicatorclientFactory: replicatorClientFactory, remoteReplicatorConn: make(map[string]*outConnection), storehostConn: make(map[string]*outConnection), + knownCgExtents: set.NewConcurrent(0), } r.metaClient = mm.NewMetadataMetricsMgr(metadataClient, r.m3Client, r.logger) @@ -1102,6 +1106,23 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs }) r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorRequests) + if !r.knownCgExtents.Contains(request.GetExtentUUID()) { + // make sure the cg extent is created locally before accepting the SetAckOffset call. + // otherwise SetAckOffset will create the cg extent entry with no store uuid or output host uuid + // and we may not be able to clean up the entry eventually. + _, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{ + ConsumerGroupUUID: common.StringPtr(request.GetConsumerGroupUUID()), + ExtentUUID: common.StringPtr(request.GetExtentUUID()), + }) + if err != nil { + lcllg.WithField(common.TagErr, err).Error(`SetAckOffset: Failed to read cg extent locally`) + r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) + return err + } + + r.knownCgExtents.Insert(request.GetExtentUUID()) + } + err := r.metaClient.SetAckOffset(nil, request) if err != nil { lcllg.WithField(common.TagErr, err).Error(`Error calling metadata to set ack offset`) diff --git a/services/replicator/replicator_test.go b/services/replicator/replicator_test.go index d8a356ed..3b7afb2a 100644 --- a/services/replicator/replicator_test.go +++ b/services/replicator/replicator_test.go @@ -941,13 +941,25 @@ func (s *ReplicatorSuite) TestCreateRemoteConsumerGroupExtentFailure() { func (s *ReplicatorSuite) TestSetAckOffset() { repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg) + cgUUID := uuid.New() extentUUID := uuid.New() + storeUUID := []string{uuid.New(), uuid.New(), uuid.New()} ackLevel := int64(20) req := &shared.SetAckOffsetRequest{ - ExtentUUID: common.StringPtr(extentUUID), - AckLevelAddress: common.Int64Ptr(ackLevel), + ConsumerGroupUUID: common.StringPtr(cgUUID), + ExtentUUID: common.StringPtr(extentUUID), + AckLevelAddress: common.Int64Ptr(ackLevel), } + s.mockMeta.On("ReadConsumerGroupExtent", mock.Anything, mock.Anything).Return(&metadata.ReadConsumerGroupExtentResult_{ + Extent: &shared.ConsumerGroupExtent{ + StoreUUIDs: storeUUID, + }, + }, nil).Run(func(args mock.Arguments) { + req := args.Get(1).(*metadata.ReadConsumerGroupExtentRequest) + s.Equal(extentUUID, req.GetExtentUUID()) + s.Equal(cgUUID, req.GetConsumerGroupUUID()) + }) s.mockMeta.On("SetAckOffset", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { req := args.Get(1).(*shared.SetAckOffsetRequest) s.Equal(extentUUID, req.GetExtentUUID()) @@ -958,6 +970,27 @@ func (s *ReplicatorSuite) TestSetAckOffset() { s.mockMeta.AssertExpectations(s.T()) } +func (s *ReplicatorSuite) TestSetAckOffsetFailure_ReadExtentFail() { + repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg) + cgUUID := uuid.New() + extentUUID := uuid.New() + ackLevel := int64(20) + req := &shared.SetAckOffsetRequest{ + ConsumerGroupUUID: common.StringPtr(cgUUID), + ExtentUUID: common.StringPtr(extentUUID), + AckLevelAddress: common.Int64Ptr(ackLevel), + } + + s.mockMeta.On("ReadConsumerGroupExtent", mock.Anything, mock.Anything).Return(nil, &shared.InternalServiceError{Message: "test2"}).Run(func(args mock.Arguments) { + req := args.Get(1).(*metadata.ReadConsumerGroupExtentRequest) + s.Equal(extentUUID, req.GetExtentUUID()) + s.Equal(cgUUID, req.GetConsumerGroupUUID()) + }) + err := repliator.SetAckOffset(nil, req) + s.Error(err) + s.mockMeta.AssertExpectations(s.T()) +} + func (s *ReplicatorSuite) TestRemoteSetAckOffsetFailed() { repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg) extentUUID := uuid.New()