Skip to content

Commit

Permalink
[NUC-233] Ensure the call to setShardSequenceNumberInPersistency fail…
Browse files Browse the repository at this point in the history
…s for non-existing or non-shard objects. (#151)
  • Loading branch information
alxtkr77 authored Aug 28, 2024
1 parent f723111 commit da2a3f6
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 1 deletion.
18 changes: 18 additions & 0 deletions pkg/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"reflect"
"runtime"
"strings"
"time"

"github.com/nuclio/errors"
Expand Down Expand Up @@ -170,3 +171,20 @@ func StringSlicesEqual(slice1 []string, slice2 []string) bool {

return true
}

func EngineErrorIsNonFatal(err error) bool {
var nonFatalEngineErrorsPartialMatch = []string{
"dialing to the given TCP address timed out",
"timeout",
"refused",
}
if err != nil && len(err.Error()) > 0 {
for _, nonFatalError := range nonFatalEngineErrorsPartialMatch {
if strings.Contains(err.Error(), nonFatalError) || strings.Contains(errors.Cause(err).Error(), nonFatalError) {
return true
}
}
return false
}
return true
}
4 changes: 3 additions & 1 deletion pkg/dataplane/streamconsumergroup/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time
&c.getShardLocationBackoff, func(attempt int) (bool, error) {
c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID)
if err != nil {

if common.EngineErrorIsNonFatal(err) {
return true, errors.Wrap(err, "Failed to get shard location due to a network error")
}
// requested for an immediate stop
if err == v3ioerrors.ErrStopped {
return false, nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/dataplane/streamconsumergroup/streamconsumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier,
if err != nil && !errors.Is(err, v3ioerrors.ErrNotFound) {
return true, errors.Wrap(err, "Failed getting current state from persistency")
}
if common.EngineErrorIsNonFatal(err) {
return true, errors.Wrap(err, "Failed getting current state from persistency due to a network error")
}

if state == nil {
state, err = newState()
Expand Down Expand Up @@ -367,6 +370,7 @@ func (scg *streamConsumerGroup) setShardSequenceNumberInPersistency(shardID int,
Attributes: map[string]interface{}{
scg.getShardCommittedSequenceNumberAttributeName(): sequenceNumber,
},
Condition: "__obj_type == 3",
})
return err
}
Expand Down

0 comments on commit da2a3f6

Please sign in to comment.