From b956c1fc61946e6b587d653176a3bc0ab65ff260 Mon Sep 17 00:00:00 2001
From: Alex Toker
Date: Mon, 19 Aug 2024 13:01:27 +0300
Subject: [PATCH 1/2] [NUC-233] Ensure the call to setShardSequenceNumberInPersistency fails for non-existing or non-shard objects.

---
 pkg/dataplane/streamconsumergroup/streamconsumergroup.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go
index 8a21087..288e031 100644
--- a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go
+++ b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go
@@ -367,6 +367,7 @@ func (scg *streamConsumerGroup) setShardSequenceNumberInPersistency(shardID int,
 		Attributes: map[string]interface{}{
 			scg.getShardCommittedSequenceNumberAttributeName(): sequenceNumber,
 		},
+		Condition: "__obj_type == 3",
 	})
 	return err
 }
From 9c22073a13f32903d7b1fee0786a0cfccb88d518 Mon Sep 17 00:00:00 2001
From: Alex Toker
Date: Mon, 26 Aug 2024 16:14:56 +0300
Subject: [PATCH 2/2] [NUC-233] streamconsumergroup - add retries on network errors

---
Review note: EngineErrorIsNonFatal returns false for a nil error. The check
added to setState runs after the "err != nil && !ErrNotFound" guard, i.e. also
when the state was read successfully; treating nil as "non-fatal" would make
that path ask for a retry instead of proceeding.

 pkg/common/helper.go                           | 34 ++++++++++++++++++++++++++++++++++
 pkg/dataplane/streamconsumergroup/claim.go     |  4 +++-
 .../streamconsumergroup/streamconsumergroup.go |  3 +++
 3 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/pkg/common/helper.go b/pkg/common/helper.go
index 6177456..86bb110 100644
--- a/pkg/common/helper.go
+++ b/pkg/common/helper.go
@@ -24,6 +24,7 @@ import (
 	"context"
 	"reflect"
 	"runtime"
+	"strings"
 	"time"
 
 	"github.com/nuclio/errors"
@@ -170,3 +171,36 @@ func StringSlicesEqual(slice1 []string, slice2 []string) bool {
 
 	return true
 }
+
+// EngineErrorIsNonFatal reports whether err looks like a transient engine/network
+// failure (dial timeout, generic timeout, connection refused) that is worth
+// retrying. Both the error's own message and the message of its root cause are
+// checked. A nil error is not a failure and is never reported as non-fatal;
+// otherwise callers that retry on "non-fatal" would retry after a success.
+func EngineErrorIsNonFatal(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	nonFatalEngineErrorsPartialMatch := []string{
+		"dialing to the given TCP address timed out",
+		"timeout",
+		"refused",
+	}
+
+	// match against the error itself and, when available, its root cause
+	messages := []string{err.Error()}
+	if cause := errors.Cause(err); cause != nil {
+		messages = append(messages, cause.Error())
+	}
+
+	for _, message := range messages {
+		for _, nonFatalError := range nonFatalEngineErrorsPartialMatch {
+			if strings.Contains(message, nonFatalError) {
+				return true
+			}
+		}
+	}
+
+	return false
+}
diff --git a/pkg/dataplane/streamconsumergroup/claim.go b/pkg/dataplane/streamconsumergroup/claim.go
index 8e6e45c..6555a15 100644
--- a/pkg/dataplane/streamconsumergroup/claim.go
+++ b/pkg/dataplane/streamconsumergroup/claim.go
@@ -129,7 +129,9 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time
 				&c.getShardLocationBackoff, func(attempt int) (bool, error) {
 					c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID)
 					if err != nil {
-
+						if common.EngineErrorIsNonFatal(err) {
+							return true, errors.Wrap(err, "Failed to get shard location due to a network error")
+						}
 						// requested for an immediate stop
 						if err == v3ioerrors.ErrStopped {
 							return false, nil
diff --git a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go
index 288e031..fd5b5a6 100644
--- a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go
+++ b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go
@@ -118,6 +118,9 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier,
 		if err != nil && !errors.Is(err, v3ioerrors.ErrNotFound) {
 			return true, errors.Wrap(err, "Failed getting current state from persistency")
 		}
+		if common.EngineErrorIsNonFatal(err) {
+			return true, errors.Wrap(err, "Failed getting current state from persistency due to a network error")
+		}
 
 		if state == nil {
 			state, err = newState()