Skip to content

Commit

Permalink
sorter/leveldb(ticdc): fix input regression panic (#3986)
Browse files Browse the repository at this point in the history
  • Loading branch information
overvenus authored Dec 22, 2021
1 parent d84f15b commit bd9494d
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 18 deletions.
3 changes: 2 additions & 1 deletion cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func newSorterNode(
flowController: flowController,
mounter: mounter,
resolvedTs: startTs,
barrierTs: startTs,
replConfig: replConfig,
}
}
Expand All @@ -101,7 +102,7 @@ func (n *sorterNode) Init(ctx pipeline.NodeContext) error {
startTs := ctx.ChangefeedVars().Info.StartTs
actorID := ctx.GlobalVars().SorterSystem.ActorID(uint64(n.tableID))
router := ctx.GlobalVars().SorterSystem.Router()
levelSorter := leveldb.NewLevelDBSorter(ctx, n.tableID, startTs, router, actorID)
levelSorter := leveldb.NewSorter(ctx, n.tableID, startTs, router, actorID)
n.cleanID = actorID
n.cleanTask = levelSorter.CleanupTask()
n.cleanRouter = ctx.GlobalVars().SorterSystem.CleanerRouter()
Expand Down
15 changes: 13 additions & 2 deletions cdc/processor/pipeline/sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,31 @@ func (c *checkSorter) Output() <-chan *model.PolymorphicEvent {

func TestSorterResolvedTsLessEqualBarrierTs(t *testing.T) {
t.Parallel()
s := &checkSorter{ch: make(chan *model.PolymorphicEvent, 1)}
sch := make(chan *model.PolymorphicEvent, 1)
s := &checkSorter{ch: sch}
sn := newSorterNode("tableName", 1, 1, nil, nil, &config.ReplicaConfig{
Consistent: &config.ConsistentConfig{},
})
sn.sorter = s

ch := make(chan pipeline.Message, 1)
require.EqualValues(t, 1, sn.ResolvedTs())

// Resolved ts must not regress even if there is no barrier ts message.
resolvedTs1 := pipeline.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, 1))
nctx := pipeline.NewNodeContext(
cdcContext.NewContext(context.Background(), nil), resolvedTs1, ch)
err := sn.Receive(nctx)
require.Nil(t, err)
require.EqualValues(t, model.NewResolvedPolymorphicEvent(0, 1), <-sch)

// Advance barrier ts.
nctx = pipeline.NewNodeContext(
cdcContext.NewContext(context.Background(), nil),
pipeline.BarrierMessage(2),
ch,
)
err := sn.Receive(nctx)
err = sn.Receive(nctx)
require.Nil(t, err)
require.EqualValues(t, 2, sn.barrierTs)
// Barrier message must be passed to the next node.
Expand Down
15 changes: 10 additions & 5 deletions cdc/sorter/leveldb/table_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

const (
// Capacity of leveldb sorter input and output channels.
// Capacity of db sorter input and output channels.
sorterInputCap, sorterOutputCap = 64, 64
// Max size of received event batch.
batchReceiveEventSize = 32
Expand Down Expand Up @@ -65,8 +65,8 @@ type Sorter struct {
metricTotalEventsResolvedTs prometheus.Counter
}

// NewLevelDBSorter creates a new LevelDBSorter
func NewLevelDBSorter(
// NewSorter creates a new Sorter
func NewSorter(
ctx context.Context, tableID int64, startTs uint64,
router *actor.Router, actorID actor.ID,
) *Sorter {
Expand Down Expand Up @@ -138,9 +138,14 @@ func (ls *Sorter) wait(
inputCount, kvEventCount, resolvedEventCount := 0, 0, 0
appendInputEvent := func(ev *model.PolymorphicEvent) {
if ls.lastSentResolvedTs != 0 && ev.CRTs < ls.lastSentResolvedTs {
log.Panic("commit ts < resolved ts",
// Since TiKV/Puller may send out of order or duplicated events,
// we should not panic here.
// Regression is not a common case, use warn level to rise our
// attention.
log.Warn("commit ts < resolved ts",
zap.Uint64("lastSentResolvedTs", ls.lastSentResolvedTs),
zap.Any("event", ev), zap.Uint64("regionID", ev.RegionID()))
return
}
if ev.RawKV.OpType == model.OpTypeResolved {
if maxResolvedTs < ev.CRTs {
Expand Down Expand Up @@ -527,7 +532,7 @@ func (ls *Sorter) poll(ctx context.Context, state *pollState) error {
return nil
}

// Run runs LevelDBSorter
// Run runs Sorter
func (ls *Sorter) Run(ctx context.Context) error {
state := &pollState{
eventsBuf: make([]*model.PolymorphicEvent, batchReceiveEventSize),
Expand Down
46 changes: 36 additions & 10 deletions cdc/sorter/leveldb/table_sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,43 @@ import (
"golang.org/x/sync/semaphore"
)

func newTestLeveldbSorter(
func newTestSorter(
ctx context.Context, capacity int,
) (*Sorter, actor.Mailbox) {
id := actor.ID(1)
router := actor.NewRouter("teet")
mb := actor.NewMailbox(1, capacity)
router.InsertMailbox4Test(id, mb)
ls := NewLevelDBSorter(ctx, 1, 1, router, id)
ls := NewSorter(ctx, 1, 1, router, id)
return ls, mb
}

func TestInputOutOfOrder(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Poll twice.
capacity := 2
require.Greater(t, batchReceiveEventSize, capacity)
ls, _ := newTestSorter(ctx, capacity)

ls.AddEntry(ctx, model.NewResolvedPolymorphicEvent(0, 2))
ls.AddEntry(ctx, model.NewResolvedPolymorphicEvent(0, 3))
require.Nil(t, ls.poll(ctx, &pollState{
eventsBuf: make([]*model.PolymorphicEvent, 1),
outputBuf: newOutputBuffer(1),
}))
require.EqualValues(t, model.NewResolvedPolymorphicEvent(0, 3), <-ls.Output())

ls.AddEntry(ctx, model.NewResolvedPolymorphicEvent(0, 2))
require.Nil(t, ls.poll(ctx, &pollState{
eventsBuf: make([]*model.PolymorphicEvent, 1),
outputBuf: newOutputBuffer(1),
}))
}

func TestWaitInput(t *testing.T) {
t.Parallel()
// Make sure input capacity is larger than batch size in order to test
Expand All @@ -53,7 +79,7 @@ func TestWaitInput(t *testing.T) {

capacity := 8
require.Greater(t, batchReceiveEventSize, capacity)
ls, _ := newTestLeveldbSorter(ctx, capacity)
ls, _ := newTestSorter(ctx, capacity)
// Nonbuffered channel is unavailable during the test.
ls.outputCh = make(chan *model.PolymorphicEvent)

Expand Down Expand Up @@ -158,7 +184,7 @@ func TestWaitOutput(t *testing.T) {

capacity := 4
require.Greater(t, batchReceiveEventSize, capacity)
ls, _ := newTestLeveldbSorter(ctx, capacity)
ls, _ := newTestSorter(ctx, capacity)

eventsBuf := make([]*model.PolymorphicEvent, batchReceiveEventSize)

Expand Down Expand Up @@ -191,7 +217,7 @@ func TestAsyncWrite(t *testing.T) {

capacity := 4
require.Greater(t, batchReceiveEventSize, capacity)
ls, mb := newTestLeveldbSorter(ctx, capacity)
ls, mb := newTestSorter(ctx, capacity)

cases := []struct {
events []*model.PolymorphicEvent
Expand Down Expand Up @@ -299,7 +325,7 @@ func TestOutput(t *testing.T) {
defer cancel()

capacity := 4
ls, _ := newTestLeveldbSorter(ctx, capacity)
ls, _ := newTestSorter(ctx, capacity)

ls.outputCh = make(chan *model.PolymorphicEvent, 1)
ok := ls.output(&model.PolymorphicEvent{CRTs: 1})
Expand All @@ -326,7 +352,7 @@ func TestOutputBufferedResolvedEvents(t *testing.T) {
defer cancel()

capacity := 4
ls, _ := newTestLeveldbSorter(ctx, capacity)
ls, _ := newTestSorter(ctx, capacity)

buf := newOutputBuffer(capacity)

Expand Down Expand Up @@ -534,7 +560,7 @@ func TestOutputIterEvents(t *testing.T) {
defer cancel()

capacity := 4
ls, _ := newTestLeveldbSorter(ctx, capacity)
ls, _ := newTestSorter(ctx, capacity)

// Prepare data, 3 txns, 3 events for each.
// CRTs 2, StartTs 1, keys (0|1|2)
Expand Down Expand Up @@ -702,7 +728,7 @@ func TestPoll(t *testing.T) {
defer cancel()

capacity := 4
ls, mb := newTestLeveldbSorter(ctx, capacity)
ls, mb := newTestSorter(ctx, capacity)

// Prepare data, 3 txns, 3 events for each.
// CRTs 2, StartTs 1, keys (0|1|2)
Expand Down Expand Up @@ -941,7 +967,7 @@ func TestTryAddEntry(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
capacity := 1
ls, _ := newTestLeveldbSorter(ctx, capacity)
ls, _ := newTestSorter(ctx, capacity)

resolvedTs1 := model.NewResolvedPolymorphicEvent(0, 1)
sent, err := ls.TryAddEntry(ctx, resolvedTs1)
Expand Down
1 change: 1 addition & 0 deletions cdc/sorter/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
// sorted PolymorphicEvents in Output channel
type EventSorter interface {
Run(ctx context.Context) error
// TODO add constraints to entries, e.g., order and duplication guarantees.
AddEntry(ctx context.Context, entry *model.PolymorphicEvent)
// TryAddEntry tries to add and entry to the sorter.
// Returns false if the entry can not be added; otherwise it returns true
Expand Down

0 comments on commit bd9494d

Please sign in to comment.