Skip to content

Commit

Permalink
fix: rollback catchup flow blocking snapshot events
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Jan 24, 2025
1 parent 36ee24f commit 3bbd4e7
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions couchbase/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ func (so *observer) waitRollbackMitigation(seqNo uint64) {
}
}

func (so *observer) canForward(seqNo uint64) bool {
func (so *observer) canForward(seqNo uint64, isControl bool) bool {
if !so.config.RollbackMitigation.Disabled {
so.waitRollbackMitigation(seqNo)
}

return !so.needCatchup(seqNo)
return isControl || !so.needCatchup(seqNo)
}

func (so *observer) isBeforeSkipWindow(eventTime time.Time) bool {
Expand Down Expand Up @@ -166,7 +166,7 @@ func (so *observer) sendOrSkip(args models.ListenerArgs) {
}

func (so *observer) SnapshotMarker(event models.DcpSnapshotMarker) {
if !so.canForward(event.StartSeqNo) {
if !so.canForward(event.StartSeqNo, true) {
return
}

Expand All @@ -192,7 +192,7 @@ func (so *observer) IsInSnapshotMarker(seqNo uint64) bool {
}

func (so *observer) Mutation(event gocbcore.DcpMutation) { //nolint:dupl
if !so.canForward(event.SeqNo) {
if !so.canForward(event.SeqNo, false) {
return
}

Expand Down Expand Up @@ -220,7 +220,7 @@ func (so *observer) Mutation(event gocbcore.DcpMutation) { //nolint:dupl
}

func (so *observer) Deletion(event gocbcore.DcpDeletion) { //nolint:dupl
if !so.canForward(event.SeqNo) {
if !so.canForward(event.SeqNo, false) {
return
}

Expand Down Expand Up @@ -248,7 +248,7 @@ func (so *observer) Deletion(event gocbcore.DcpDeletion) { //nolint:dupl
}

func (so *observer) Expiration(event gocbcore.DcpExpiration) { //nolint:dupl
if !so.canForward(event.SeqNo) {
if !so.canForward(event.SeqNo, false) {
return
}

Expand Down Expand Up @@ -288,7 +288,7 @@ func (so *observer) End(event models.DcpStreamEnd, err error) {
}

func (so *observer) CreateCollection(event gocbcore.DcpCollectionCreation) {
if !so.canForward(event.SeqNo) {
if !so.canForward(event.SeqNo, false) {
return
}

Expand All @@ -308,7 +308,7 @@ func (so *observer) CreateCollection(event gocbcore.DcpCollectionCreation) {
}

func (so *observer) DeleteCollection(event gocbcore.DcpCollectionDeletion) {
if !so.canForward(event.SeqNo) {
if !so.canForward(event.SeqNo, false) {
return
}

Expand All @@ -328,7 +328,7 @@ func (so *observer) DeleteCollection(event gocbcore.DcpCollectionDeletion) {
}

func (so *observer) FlushCollection(event gocbcore.DcpCollectionFlush) {
if !so.canForward(event.SeqNo) {
if !so.canForward(event.SeqNo, false) {
return
}

Expand All @@ -348,7 +348,7 @@ func (so *observer) FlushCollection(event gocbcore.DcpCollectionFlush) {
}

func (so *observer) CreateScope(event gocbcore.DcpScopeCreation) {
if !so.canForward(event.SeqNo) {
if !so.canForward(event.SeqNo, false) {
return
}

Expand All @@ -367,7 +367,7 @@ func (so *observer) CreateScope(event gocbcore.DcpScopeCreation) {
}

func (so *observer) DeleteScope(event gocbcore.DcpScopeDeletion) {
if !so.canForward(event.SeqNo) {
if !so.canForward(event.SeqNo, false) {
return
}

Expand All @@ -386,7 +386,7 @@ func (so *observer) DeleteScope(event gocbcore.DcpScopeDeletion) {
}

func (so *observer) ModifyCollection(event gocbcore.DcpCollectionModification) {
if !so.canForward(event.SeqNo) {
if !so.canForward(event.SeqNo, false) {
return
}

Expand All @@ -412,7 +412,7 @@ func (so *observer) OSOSnapshot(event gocbcore.DcpOSOSnapshot) {
}

func (so *observer) SeqNoAdvanced(advanced gocbcore.DcpSeqNoAdvanced) {
if !so.canForward(advanced.SeqNo) {
if !so.canForward(advanced.SeqNo, true) {
return
}

Expand Down

0 comments on commit 3bbd4e7

Please sign in to comment.