This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

[release-1.0] Consumer manager use factory merged errors (#1044) (#1048)
* Consumer manager use factory merged errors (#1044)

* ConsumerGroup Manager - use factory merged errors channel (fix for #1043)

* Attempting to de-flake the tests that use the zap test logger

* Updated knative.dev/hack@release-1.0
eric-sap authored Jan 10, 2022
1 parent 6d52f06 commit 8d69c29
Showing 10 changed files with 169 additions and 93 deletions.
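
The commit replaces the manager's direct read of the Sarama group's Errors() channel with a single merged error channel built by the factory. A minimal sketch of that fan-in pattern follows (hypothetical names, not the actual eventing-kafka code):

package main

import (
	"fmt"
	"sync"
)

// mergeErrors fans several error channels into one. The merged channel
// closes only after every input channel has closed, so a consumer can
// safely range over it across the lifetime of all inputs.
func mergeErrors(chans ...<-chan error) <-chan error {
	out := make(chan error)
	var wg sync.WaitGroup
	wg.Add(len(chans))
	for _, c := range chans {
		go func(c <-chan error) {
			defer wg.Done()
			for err := range c {
				out <- err
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	groupErrs := make(chan error, 1)   // stands in for ConsumerGroup.Errors()
	handlerErrs := make(chan error, 1) // stands in for a handler error channel
	merged := mergeErrors(groupErrs, handlerErrs)

	groupErrs <- fmt.Errorf("group error")
	handlerErrs <- fmt.Errorf("handler error")
	close(groupErrs)
	close(handlerErrs)

	for err := range merged { // prints both errors, in either order
		fmt.Println(err)
	}
}

The property the fix leans on is that someone must keep draining the merged channel; the diffs below wire it through createManagedGroup so the managed group's transferErrors goroutine becomes that consumer.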
2 changes: 1 addition & 1 deletion go.mod
@@ -35,7 +35,7 @@ require (
k8s.io/utils v0.0.0-20201110183641-67b214c5f920
knative.dev/control-protocol v0.0.0-20211101215039-a32569595694
knative.dev/eventing v0.27.0
knative.dev/hack v0.0.0-20211101195839-11d193bf617b
knative.dev/hack v0.0.0-20211122163517-fe1340f21191
knative.dev/pkg v0.0.0-20211101212339-96c0204a70dc
knative.dev/reconciler-test v0.0.0-20211101214439-9839937c9b13
)
3 changes: 2 additions & 1 deletion go.sum
@@ -1296,8 +1296,9 @@ knative.dev/control-protocol v0.0.0-20211101215039-a32569595694 h1:dcN5l0upFtQLq
knative.dev/control-protocol v0.0.0-20211101215039-a32569595694/go.mod h1:HlIoGfErGnSVKN6NI3iG5gXxCo9EcdV8sCDGW2IObJo=
knative.dev/eventing v0.27.0 h1:P9rqKvsCeFb8sEhfK32WFTG7awdOAa7XO6p3umSq/wU=
knative.dev/eventing v0.27.0/go.mod h1:4ppWQEQ/2B66/YFENDmV1Gjxs4meLpz6UTUgLkkINt4=
knative.dev/hack v0.0.0-20211101195839-11d193bf617b h1:DaW1iliZlBAwq/I8gTqVu8UnfGxyb5yR7CDsJi5jyWk=
knative.dev/hack v0.0.0-20211101195839-11d193bf617b/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack v0.0.0-20211122163517-fe1340f21191 h1:yRUruaFxjk6tkubtg44Nya0phUm1idKZr5bFO30AkmE=
knative.dev/hack v0.0.0-20211122163517-fe1340f21191/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack/schema v0.0.0-20211101195839-11d193bf617b/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0=
knative.dev/pkg v0.0.0-20211101212339-96c0204a70dc h1:ldjbTpoMXUTgzw0IJsAFLdyA6/6QYRvz8IGPZOknEDg=
knative.dev/pkg v0.0.0-20211101212339-96c0204a70dc/go.mod h1:SkfDk9bWIiNZD7XtILGkG7AKVyF/M6M0bGxLgl0SYL8=
9 changes: 6 additions & 3 deletions pkg/common/consumer/consumer_factory.go
@@ -102,19 +102,20 @@ func (c kafkaConsumerGroupFactoryImpl) startExistingConsumerGroup(
errorCh := make(chan error, 10)
releasedCh := make(chan bool)
ctx, cancel := context.WithCancel(context.Background())
groupLogger := logger.With(zap.Any("topics", topics), zap.String("groupId", groupID), zap.String("channel", channelRef.String()))

go func() {
// this is a blocking func
// do not proceed until the check is done
err := c.offsetsChecker.WaitForOffsetsInitialization(ctx, groupID, topics, logger, c.addrs, c.config)
if err != nil {
logger.Errorw("error while checking if offsets are initialized", zap.Any("topics", topics), zap.String("groupId", groupID), zap.String("channel", channelRef.String()), zap.Error(err))
groupLogger.Errorw("error while checking if offsets are initialized", zap.Error(err))
errorCh <- err
c.enqueue(channelRef)
return
}

logger.Debugw("all offsets are initialized", zap.Any("topics", topics), zap.Any("groupID", groupID))
groupLogger.Debugw("all offsets are initialized")

defer func() {
close(errorCh)
@@ -123,8 +124,9 @@ func (c kafkaConsumerGroupFactoryImpl) startExistingConsumerGroup(
for {
consumerHandler := NewConsumerHandler(logger, handler, errorCh, options...)

err := consume(ctx, topics, &consumerHandler)
err = consume(ctx, topics, &consumerHandler)
if err == sarama.ErrClosedConsumerGroup {
groupLogger.Infow("consumer group was closed", zap.Error(err))
return
}
if err != nil {
@@ -133,6 +135,7 @@ func (c kafkaConsumerGroupFactoryImpl) startExistingConsumerGroup(

select {
case <-ctx.Done():
groupLogger.Info("consumer group terminated gracefully")
return
default:
}
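
The loop changed in these hunks follows standard Sarama usage: the consume call returns on every rebalance and must be reissued, sarama.ErrClosedConsumerGroup ends the loop, and a canceled context ends it gracefully. A condensed sketch of that shape, where consume is a hypothetical stand-in for the factory's injected consume function, also showing the groupLogger field pre-binding introduced above:

package consumer

import (
	"context"

	"github.com/Shopify/sarama"
	"go.uber.org/zap"
)

// consumeLoop sketches the retry loop shape used in the hunks above: the
// consume call returns on every rebalance and must be reissued, a closed
// consumer group ends the loop, and a canceled context ends it gracefully.
func consumeLoop(ctx context.Context, groupID string, topics []string,
	consume func(context.Context, []string) error, logger *zap.Logger) {

	// Pre-bind the common fields once, as the new groupLogger does above.
	groupLogger := logger.With(zap.Strings("topics", topics), zap.String("groupId", groupID))

	for {
		err := consume(ctx, topics)
		if err == sarama.ErrClosedConsumerGroup {
			groupLogger.Info("consumer group was closed", zap.Error(err))
			return
		}
		if err != nil {
			groupLogger.Error("consume returned an error", zap.Error(err))
		}
		select {
		case <-ctx.Done():
			groupLogger.Info("consumer group terminated gracefully")
			return
		default:
			// Not canceled: reissue consume (typically after a rebalance).
		}
	}
}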
5 changes: 3 additions & 2 deletions pkg/common/consumer/consumer_manager.go
@@ -285,9 +285,10 @@ func (m *kafkaConsumerGroupManagerImpl) StartConsumerGroup(ctx context.Context,
return m.consume(ctx, groupId, topics, handler)
}

// The only thing we really want from the factory is the cancel function for the customConsumerGroup
// The only things we really want from the factory are the cancel function for the customConsumerGroup
// and the error channel (if the error channel isn't read it will fill up and block between consume calls)
customGroup := m.factory.startExistingConsumerGroup(groupId, group, consume, topics, logger, handler, ref, options...)
managedGrp := createManagedGroup(ctx, m.logger, group, cancel, customGroup.cancel)
managedGrp := createManagedGroup(ctx, m.logger, group, customGroup.Errors(), cancel, customGroup.cancel)

// Add the Sarama ConsumerGroup we obtained from the factory to the managed group map,
// so that it can be stopped and started via control-protocol messages.
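
The expanded comment is the heart of the fix for #1043: the factory's errorCh has a small buffer (10, per the first consumer_factory.go hunk), and if nobody drains the merged channel, the producing goroutine eventually blocks between consume calls. A toy illustration of an undrained buffered channel stalling its writer (hypothetical names):

package main

import (
	"fmt"
	"time"
)

func main() {
	errCh := make(chan error, 2) // small buffer, like the factory's errorCh

	go func() {
		for i := 0; i < 5; i++ {
			errCh <- fmt.Errorf("error %d", i) // blocks once the buffer is full
			fmt.Println("sent", i)
		}
		close(errCh)
	}()

	// Without a reader, only "sent 0" and "sent 1" appear: the writer is
	// parked on the third send, which is the stall this commit fixes.
	time.Sleep(100 * time.Millisecond)

	for err := range errCh { // draining the channel unblocks the writer
		fmt.Println("drained:", err)
	}
}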
33 changes: 25 additions & 8 deletions pkg/common/consumer/consumer_manager_test.go
@@ -77,7 +77,7 @@ func TestReconfigure(t *testing.T) {
},
} {
t.Run(testCase.name, func(t *testing.T) {
manager, group, _, server := getManagerWithMockGroup(t, testCase.groupId, testCase.factoryErr)
manager, group, mgdGroup, server := getManagerWithMockGroup(t, testCase.groupId, testCase.factoryErr)
if group != nil {
group.On("Close").Return(testCase.closeErr)
}
@@ -91,6 +91,10 @@
assert.Equal(t, testCase.groupId, err.GroupIds[0])
}
server.AssertExpectations(t)
if mgdGroup != nil {
close(mgdGroup.errors())
time.Sleep(shortTimeout) // Allow transferErrors routine to exit
}
})
}
}
@@ -199,14 +203,18 @@ func TestCloseConsumerGroup(t *testing.T) {
},
} {
t.Run(testCase.name, func(t *testing.T) {
manager, group, _, server := getManagerWithMockGroup(t, testCase.groupId, false)
manager, group, mgdGroup, server := getManagerWithMockGroup(t, testCase.groupId, false)
if group != nil {
group.On("Close").Return(testCase.closeErr)
group.On("Errors").Return(make(chan error))
}
err := manager.CloseConsumerGroup(testCase.groupId)
assert.Equal(t, testCase.expectErr, err != nil)
server.AssertExpectations(t)
if mgdGroup != nil {
close(mgdGroup.errors())
time.Sleep(shortTimeout) // Allow transferErrors routine to exit
}
})
}
}
@@ -293,14 +301,18 @@ func TestErrors(t *testing.T) {
},
} {
t.Run(testCase.name, func(t *testing.T) {
manager, _, _, server := getManagerWithMockGroup(t, testCase.groupId, false)
manager, _, mgdGroup, server := getManagerWithMockGroup(t, testCase.groupId, false)
valid := manager.IsManaged(testCase.groupId)
stopped := manager.IsStopped(testCase.groupId)
mgrErrors := manager.Errors(testCase.groupId)
assert.Equal(t, testCase.expectErr, mgrErrors == nil)
assert.Equal(t, !testCase.expectErr, valid)
assert.False(t, stopped) // Not actually using a stopped group in this test
server.AssertExpectations(t)
if mgdGroup != nil {
close(mgdGroup.errors())
time.Sleep(shortTimeout) // Allow transferErrors routine to exit
}
})
}
}
@@ -501,6 +513,10 @@ func TestNotifications(t *testing.T) {
group.AssertExpectations(t)
}
serverHandler.AssertExpectations(t)
if managedGrp != nil {
close(managedGrp.errors())
time.Sleep(shortTimeout) // Allow transferErrors routine to exit
}
})
}
}
@@ -550,7 +566,7 @@ func TestManagerEvents(t *testing.T) {
},
} {
t.Run(testCase.name, func(t *testing.T) {
manager, _, _, _ := getManagerWithMockGroup(t, "", false)
manager, _, mgdGroup, _ := getManagerWithMockGroup(t, "", false)
impl := manager.(*kafkaConsumerGroupManagerImpl)

var notifyChannels []<-chan ManagerEvent
@@ -592,6 +608,10 @@ func TestManagerEvents(t *testing.T) {
manager.ClearNotifications()
waitGroup.Wait()
assert.Equal(t, testCase.expectClose, count)
if mgdGroup != nil {
close(mgdGroup.errors())
time.Sleep(shortTimeout) // Allow transferErrors routine to exit
}
})
}
}
@@ -633,10 +653,7 @@ func getManagerWithMockGroup(t *testing.T, groupId string, factoryErr bool) (Kaf

func createMockAndManagedGroups(t *testing.T) (*kafkatesting.MockConsumerGroup, *managedGroupImpl) {
mockGroup := kafkatesting.NewMockConsumerGroup()
mockGroup.On("Errors").Return(make(chan error))
managedGrp := createManagedGroup(context.Background(), logtesting.TestLogger(t).Desugar(), mockGroup, func() {}, func() {})
// let the transferErrors function start (otherwise AssertExpectations will randomly fail because Errors() isn't called)
time.Sleep(shortTimeout)
managedGrp := createManagedGroup(context.Background(), logtesting.TestLogger(t).Desugar(), mockGroup, make(chan error), func() {}, func() {})
return mockGroup, managedGrp.(*managedGroupImpl)
}

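
Every test change in this file applies the same teardown idiom: close the managed group's error channel so the transferErrors goroutine's range loop ends, then sleep briefly so it can exit before the test returns. A standalone sketch of the idiom, where shortTimeout is an assumed stand-in for the package's constant:

package consumer_test

import (
	"testing"
	"time"
)

const shortTimeout = 10 * time.Millisecond // assumed stand-in for the package constant

// TestGoroutineTeardown demonstrates the close-then-sleep idiom used in the
// hunks above: closing the channel terminates the goroutine's range loop,
// and the short sleep lets it finish before the test returns.
func TestGoroutineTeardown(t *testing.T) {
	errCh := make(chan error)
	done := make(chan struct{})

	go func() { // stands in for the transferErrors goroutine
		for range errCh {
			// transfer errors somewhere
		}
		close(done)
	}()

	close(errCh)
	time.Sleep(shortTimeout) // allow the goroutine to observe the close and exit

	select {
	case <-done:
	default:
		t.Fatal("goroutine did not exit after the channel was closed")
	}
}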
28 changes: 15 additions & 13 deletions pkg/common/consumer/managed_group.go
@@ -63,7 +63,7 @@ type managedGroupImpl struct {
// createManagedGroup associates a Sarama ConsumerGroup and cancel function (usually from the factory)
// inside a new managedGroup struct. If a timeout is given (nonzero), the lockId will be reset to an
// empty string (i.e. "unlocked") after that time has passed.
func createManagedGroup(ctx context.Context, logger *zap.Logger, group sarama.ConsumerGroup, cancelErrors func(), cancelConsume func()) managedGroup {
func createManagedGroup(ctx context.Context, logger *zap.Logger, group sarama.ConsumerGroup, errors <-chan error, cancelErrors func(), cancelConsume func()) managedGroup {

managedGrp := &managedGroupImpl{
logger: logger,
@@ -79,8 +79,8 @@ func createManagedGroup(ctx context.Context, logger *zap.Logger, group sarama.Co
managedGrp.lockedBy.Store("") // Empty token string indicates "unlocked"
managedGrp.stopped.Store(false) // A managed group defaults to "started" when created

// Begin listening on the group's Errors() channel and write them to the managedGroup's errors channel
managedGrp.transferErrors(ctx)
// Begin listening on the provided errors channel and write them to the managedGroup's errors channel
managedGrp.transferErrors(ctx, errors)

return managedGrp
}
@@ -142,7 +142,7 @@ func (m *managedGroupImpl) consume(ctx context.Context, topics []string, handler
// Call the internal sarama ConsumerGroup's Consume function directly
err := m.getSaramaGroup().Consume(ctx, topics, handler)
if !m.isStopped() {
m.logger.Debug("Managed Consume Finished Without Stop", zap.Error(err))
m.logger.Warn("Managed Consume Finished Without Stop", zap.Error(err))
// This ConsumerGroup wasn't stopped by the manager, so pass the error along to the caller
return err
}
@@ -313,28 +313,30 @@ func (m *managedGroupImpl) waitForStart(ctx context.Context) bool {
}
}

// transferErrors starts a goroutine that reads errors from the managedGroup's internal group.Errors() channel
// and sends them to the m.errors channel. This is done so that when the group.Errors() channel is closed during
// a stop ("pause") of the group, the m.errors channel can remain open (so that users of the manager do not
// receive a closed error channel during stop/start events).
func (m *managedGroupImpl) transferErrors(ctx context.Context) {
// transferErrors starts a goroutine that reads errors from the provided errors channel and sends them to the m.errors
// channel. Typically, this provided channel comes from the merged errors channel created in the KafkaConsumerGroupFactory.
// This is done so that when the group.Errors() channel is closed during a stop ("pause") of the group, the m.errors
// channel can remain open (so that users of the manager do not receive a closed error channel during stop/start events).
func (m *managedGroupImpl) transferErrors(ctx context.Context, errors <-chan error) {
go func() {
for {
m.logger.Debug("Starting managed group error transfer")
for groupErr := range m.getSaramaGroup().Errors() {
m.logger.Info("Starting managed group error transfer")
for groupErr := range errors {
m.transferredErrors <- groupErr
}
if !m.isStopped() {
// If the error channel was closed without the consumergroup being marked as stopped,
// or if we were unable to wait for the group to be restarted, that is outside
// of the manager's responsibility, so we are finished transferring errors.
// the manager's responsibility, so we are finished transferring errors.
m.logger.Warn("Consumer group's error channel was closed unexpectedly")
close(m.transferredErrors)
return
}
// Wait for the manager to restart the Consumer Group before calling m.group.Errors() again
m.logger.Debug("Error transfer is waiting for managed group restart")
m.logger.Info("Error transfer is waiting for managed group restart")
if !m.waitForStart(ctx) {
// Abort if the context was canceled
m.logger.Info("Wait for managed group restart was canceled")
return
}
}
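
The reworked transferErrors isolates a reusable pattern: forward from an inner error stream that may close on every stop onto an outer channel that stays open for the manager's callers, and only close the outer channel when no restart is coming. A stripped-down sketch, where isStopped and waitForRestart are hypothetical stand-ins for the managed group's restart machinery (unlike the real code, this sketch receives a fresh inner channel on restart):

package consumer

// transferLoop copies errors from an inner stream (the factory's merged
// channel) onto an outer channel that survives stop/start cycles.
func transferLoop(inner <-chan error, out chan<- error,
	isStopped func() bool, waitForRestart func() (<-chan error, bool)) {
	go func() {
		for {
			for err := range inner { // drain until the inner channel closes
				out <- err
			}
			if !isStopped() {
				close(out) // closed without a stop: end the outer stream too
				return
			}
			next, ok := waitForRestart() // ok is false if the wait was canceled
			if !ok {
				return
			}
			inner = next // restarted: resume draining from the new channel
		}
	}()
}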
61 changes: 38 additions & 23 deletions pkg/common/consumer/managed_group_test.go
@@ -62,8 +62,9 @@ func TestManagedGroup(t *testing.T) {
defer cancel()

mockGroup := kafkatesting.NewMockConsumerGroup()
mockGroup.On("Errors").Return(make(chan error))
group := createManagedGroup(ctx, logtesting.TestLogger(t).Desugar(), mockGroup, cancel, func() {}).(*managedGroupImpl)
errorChannel := make(chan error)
mockGroup.On("Errors").Return(errorChannel)
group := createManagedGroup(ctx, logtesting.TestLogger(t).Desugar(), mockGroup, errorChannel, cancel, func() {}).(*managedGroupImpl)
waitGroup := sync.WaitGroup{}
assert.False(t, group.isStopped())

@@ -85,6 +86,8 @@
cancel()
}
waitGroup.Wait() // Let the waitForStart function finish
close(errorChannel)
time.Sleep(shortTimeout) // Let the transferErrors goroutine finish
})
}

@@ -189,6 +192,9 @@ func TestProcessLock(t *testing.T) {
assert.Equal(t, testCase.expectUnlock, managedGrp.lockedBy.Load())
assert.Equal(t, testCase.expectErrBefore, errBefore)
assert.Equal(t, testCase.expectErrAfter, errAfter)

close(managedGrp.errors())
time.Sleep(shortTimeout) // Let the transferErrors goroutine finish
})
}
}
@@ -280,6 +286,8 @@ func TestResetLockTimer(t *testing.T) {
time.Sleep(2 * testCase.timeout)
assert.Equal(t, "", managedGrp.lockedBy.Load())
}
close(managedGrp.errors())
time.Sleep(shortTimeout) // Let the transferErrors goroutine finish
})
}
}
@@ -340,6 +348,8 @@ func TestManagedGroupConsume(t *testing.T) {
assert.Nil(t, mgdGroup.saramaGroup.Close()) // Stops the MockConsumerGroup's Consume() call
}
waitGroup.Wait() // Allows the goroutine with the consume call to finish
close(mgdGroup.errors())
time.Sleep(shortTimeout) // Let the transferErrors goroutine finish
})
}
}
@@ -402,7 +412,8 @@ func TestStopStart(t *testing.T) {
assert.Equal(t, testCase.errStopping, err != nil)

mockGroup.AssertExpectations(t)

close(mgdGroup.errors())
time.Sleep(shortTimeout) // Let the transferErrors goroutine finish
})
}
}
@@ -439,6 +450,8 @@ func TestClose(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, testCase.cancel, cancelConsumeCalled)
assert.Equal(t, testCase.cancel, cancelErrorsCalled)
close(mgdGroup.errors())
time.Sleep(shortTimeout) // Let the transferErrors goroutine finish
})
}
}
@@ -471,51 +484,53 @@ func TestTransferErrors(t *testing.T) {
t.Run(testCase.name, func(t *testing.T) {
time.Sleep(time.Millisecond)

// errorChan simulates the merged error channel usually provided via customConsumerGroup.handlerErrorChannel
errorChan := make(chan error)

// mockGrp represents the internal Sarama ConsumerGroup
mockGrp := kafkatesting.NewMockConsumerGroup()
managedGrp := managedGroupImpl{
logger: logtesting.TestLogger(t).Desugar(),
saramaGroup: mockGrp,
transferredErrors: make(chan error),
groupMutex: sync.RWMutex{},
}
managedGrp.lockedBy.Store("")
managedGrp.stopped.Store(false)

mockGrp.On("Errors").Return(mockGrp.ErrorChan)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
managedGrp.transferErrors(ctx)

mockGrp.ErrorChan <- fmt.Errorf("test-error")
// Call the transferErrors function (via createManagedGroup) with the simulated customConsumerGroup error channel
managedGrp := createManagedGroup(ctx, logtesting.TestLogger(t).Desugar(), mockGrp, errorChan, func() {}, func() {})
managedGrpImpl := managedGrp.(*managedGroupImpl)

// Send an error to the simulated customConsumerGroup error channel
errorChan <- fmt.Errorf("test-error")

// Verify that the error appears in the managed group's error channel
err := <-managedGrp.errors()
assert.NotNil(t, err)
assert.Equal(t, "test-error", err.Error())

if testCase.stopGroup {
managedGrp.createRestartChannel()
managedGrpImpl.createRestartChannel()
}
if testCase.cancel {
cancel()
}
close(mockGrp.ErrorChan)
mockGrp.AssertExpectations(t)

time.Sleep(shortTimeout) // Let the error handling loop move forward
if testCase.startGroup {
time.Sleep(shortTimeout) // Let the error handling loop move forward
// Simulate the effects of startConsumerGroup (new ConsumerGroup, same managedConsumerGroup)
mockGrp = kafkatesting.NewMockConsumerGroup()
mockGrp.On("Errors").Return(mockGrp.ErrorChan)
managedGrp.saramaGroup = mockGrp
managedGrp.closeRestartChannel()
managedGrpImpl.saramaGroup = mockGrp
managedGrpImpl.closeRestartChannel()

time.Sleep(shortTimeout) // Let the waitForStart function finish
// Verify that errors work again after restart
mockGrp.ErrorChan <- fmt.Errorf("test-error-2")

// Verify that error transfer continues to work after the restart
errorChan <- fmt.Errorf("test-error-2")
err = <-managedGrp.errors()
assert.NotNil(t, err)
assert.Equal(t, "test-error-2", err.Error())
close(mockGrp.ErrorChan)
mockGrp.AssertExpectations(t)
}
close(managedGrp.errors())
time.Sleep(shortTimeout) // Let the transferErrors goroutine finish
})
}
}
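
The reshaped TestTransferErrors drives the pattern end to end: push an error into the simulated merged channel, assert it surfaces unchanged on the managed group's outer channel, then repeat after a simulated restart. A compact standalone version of the core assertion, with a plain forwarding goroutine standing in for createManagedGroup's transferErrors:

package consumer_test

import (
	"fmt"
	"testing"
)

// TestErrorForwarding mirrors the shape of TestTransferErrors: an error sent
// to the inner (merged) channel must surface unchanged on the outer channel,
// and the outer channel must close once the inner channel closes.
func TestErrorForwarding(t *testing.T) {
	inner := make(chan error)
	outer := make(chan error)

	go func() { // stand-in for the transferErrors goroutine
		for err := range inner {
			outer <- err
		}
		close(outer)
	}()

	inner <- fmt.Errorf("test-error")
	if got := <-outer; got.Error() != "test-error" {
		t.Fatalf("expected test-error, got %v", got)
	}

	close(inner)
	if _, ok := <-outer; ok {
		t.Fatal("outer channel should close after the inner channel closes")
	}
}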
(Diffs for the remaining 3 of the 10 changed files are not shown.)
