Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NUC-233] Ensure the call to setShardSequenceNumberInPersistency fails for non-existing or non-shard objects. #151

Merged
merged 2 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions pkg/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"reflect"
"runtime"
"strings"
"time"

"github.com/nuclio/errors"
Expand Down Expand Up @@ -170,3 +171,20 @@ func StringSlicesEqual(slice1 []string, slice2 []string) bool {

return true
}

func EngineErrorIsNonFatal(err error) bool {
var nonFatalEngineErrorsPartialMatch = []string{
"dialing to the given TCP address timed out",
"timeout",
"refused",
}
if err != nil && len(err.Error()) > 0 {
for _, nonFatalError := range nonFatalEngineErrorsPartialMatch {
if strings.Contains(err.Error(), nonFatalError) || strings.Contains(errors.Cause(err).Error(), nonFatalError) {
return true
}
}
return false
}
return true
}
4 changes: 3 additions & 1 deletion pkg/dataplane/streamconsumergroup/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time
&c.getShardLocationBackoff, func(attempt int) (bool, error) {
c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID)
if err != nil {

if common.EngineErrorIsNonFatal(err) {
return true, errors.Wrap(err, "Failed to get shard location due to a network error")
}
// requested for an immediate stop
if err == v3ioerrors.ErrStopped {
return false, nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/dataplane/streamconsumergroup/streamconsumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier,
if err != nil && !errors.Is(err, v3ioerrors.ErrNotFound) {
return true, errors.Wrap(err, "Failed getting current state from persistency")
}
if common.EngineErrorIsNonFatal(err) {
return true, errors.Wrap(err, "Failed getting current state from persistency due to a network error")
}

if state == nil {
state, err = newState()
Expand Down Expand Up @@ -367,6 +370,7 @@ func (scg *streamConsumerGroup) setShardSequenceNumberInPersistency(shardID int,
Attributes: map[string]interface{}{
scg.getShardCommittedSequenceNumberAttributeName(): sequenceNumber,
},
Condition: "__obj_type == 3",
})
return err
}
Expand Down
Loading