Skip to content

Commit

Permalink
add flushLog to redo manager
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jun 19, 2022
1 parent 82cca1c commit b516897
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 218 deletions.
2 changes: 2 additions & 0 deletions cdc/processor/pipeline/table_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func TestTableActorInterface(t *testing.T) {
tbl.redoManager.AddTable(tbl.tableID, 0)
require.Equal(t, model.Ts(0), tbl.ResolvedTs())
tbl.redoManager.FlushLog(ctx, tbl.tableID, model.Ts(6))
require.Eventually(t, func() bool { return tbl.ResolvedTs() == model.Ts(6) },
time.Second*2, time.Millisecond*500)
require.Equal(t, model.Ts(6), tbl.ResolvedTs())
tbl.redoManager.Cleanup(ctx)
}
Expand Down
180 changes: 105 additions & 75 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"math"
"path/filepath"
"sort"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -119,15 +118,10 @@ type ManagerOptions struct {
}

type cacheEvents struct {
tableID model.TableID
rows []*model.RowChangedEvent

// When calling FlushLog for a table, we must ensure that all data of this
// table has been written to the underlying writer. Since EmitRowChangedEvents
// and FlushLog of the same table can't be executed concurrently, we can
// insert a simple barrier into the data stream to achieve this goal.
flushCallback chan struct{}
resolvedTs model.Ts
tableID model.TableID
rows []*model.RowChangedEvent
resolvedTs model.Ts
eventType model.MqMessageType
}

// ManagerImpl manages redo log writer, buffers un-persistent redo logs, calculates
Expand All @@ -137,13 +131,13 @@ type ManagerImpl struct {
level ConsistentLevelType
storageType consistentStorage

writer writer.RedoLogWriter
logBuffer *chann.Chann[cacheEvents]

writer writer.RedoLogWriter
logBuffer *chann.Chann[cacheEvents]
minResolvedTs uint64
tableIDs []model.TableID
rtsMap map[model.TableID]model.Ts
rtsMapMu sync.RWMutex
needsFlush chan struct{}

rtsMap map[model.TableID]model.Ts
rtsMapMu sync.RWMutex
}

// NewManager creates a new Manager
Expand All @@ -162,6 +156,7 @@ func NewManager(ctx context.Context, cfg *config.ConsistentConfig, opts *Manager
storageType: consistentStorage(uri.Scheme),
rtsMap: make(map[model.TableID]uint64),
logBuffer: chann.New[cacheEvents](),
needsFlush: make(chan struct{}, 1),
}

switch m.storageType {
Expand Down Expand Up @@ -207,7 +202,7 @@ func NewManager(ctx context.Context, cfg *config.ConsistentConfig, opts *Manager
}

if opts.EnableBgRunner {
go m.bgUpdateResolvedTs(ctx, opts.ErrCh)
go m.bgUpdateFlushFlag(ctx)
go m.bgUpdateLog(ctx, opts.ErrCh)
}
return m, nil
Expand Down Expand Up @@ -246,8 +241,9 @@ func (m *ManagerImpl) EmitRowChangedEvents(
case <-timer.C:
return cerror.ErrBufferLogTimeout.GenWithStackByArgs()
case m.logBuffer.In() <- cacheEvents{
tableID: tableID,
rows: rows,
tableID: tableID,
rows: rows,
eventType: model.MqMessageTypeRow,
}:
}
return nil
Expand All @@ -259,19 +255,18 @@ func (m *ManagerImpl) FlushLog(
tableID model.TableID,
resolvedTs uint64,
) error {
// Add a barrier to the data stream to ensure all logs of this table have been
// written to the underlying writer.
flushCallbackCh := make(chan struct{})
m.logBuffer.In() <- cacheEvents{
tableID: tableID,
flushCallback: flushCallbackCh,
resolvedTs: resolvedTs,
}

timer := time.NewTimer(logBufferTimeout)
defer timer.Stop()
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-flushCallbackCh:
case <-timer.C:
return cerror.ErrBufferLogTimeout.GenWithStackByArgs()
case m.logBuffer.In() <- cacheEvents{
tableID: tableID,
resolvedTs: resolvedTs,
eventType: model.MqMessageTypeResolved,
}:
}

return nil
Expand All @@ -291,6 +286,7 @@ func (m *ManagerImpl) GetResolvedTs(tableID model.TableID) model.Ts {

// GetMinResolvedTs returns the minimum resolved ts of all tables in this redo log manager.
//
// The value is read with a single atomic load of m.minResolvedTs, so the value
// that is logged is exactly the value that is returned.
func (m *ManagerImpl) GetMinResolvedTs() model.Ts {
	minResolvedTs := atomic.LoadUint64(&m.minResolvedTs)
	// Reading the minimum resolved ts is not an error condition, so trace it at
	// debug level with a descriptive message instead of an empty error-level entry.
	log.Debug("get min resolved ts from redo log manager",
		zap.Uint64("minResolvedTs", minResolvedTs))
	return minResolvedTs
}

Expand All @@ -308,32 +304,18 @@ func (m *ManagerImpl) FlushResolvedAndCheckpointTs(ctx context.Context, resolved
func (m *ManagerImpl) AddTable(tableID model.TableID, startTs uint64) {
m.rtsMapMu.Lock()
defer m.rtsMapMu.Unlock()
i := sort.Search(len(m.tableIDs), func(i int) bool {
return m.tableIDs[i] >= tableID
})
if i < len(m.tableIDs) && m.tableIDs[i] == tableID {
if _, ok := m.rtsMap[tableID]; ok {
log.Warn("add duplicated table in redo log manager", zap.Int64("tableID", tableID))
return
}
if i == len(m.tableIDs) {
m.tableIDs = append(m.tableIDs, tableID)
} else {
m.tableIDs = append(m.tableIDs[:i+1], m.tableIDs[i:]...)
m.tableIDs[i] = tableID
}
m.rtsMap[tableID] = startTs
}

// RemoveTable removes a table from redo log manager
func (m *ManagerImpl) RemoveTable(tableID model.TableID) {
m.rtsMapMu.Lock()
defer m.rtsMapMu.Unlock()
i := sort.Search(len(m.tableIDs), func(i int) bool {
return m.tableIDs[i] >= tableID
})
if i < len(m.tableIDs) && m.tableIDs[i] == tableID {
copy(m.tableIDs[i:], m.tableIDs[i+1:])
m.tableIDs = m.tableIDs[:len(m.tableIDs)-1]
if _, ok := m.rtsMap[tableID]; ok {
delete(m.rtsMap, tableID)
} else {
log.Warn("remove a table not maintained in redo log manager", zap.Int64("tableID", tableID))
Expand All @@ -351,83 +333,131 @@ func (m *ManagerImpl) Cleanup(ctx context.Context) error {
return m.writer.DeleteAllLogs(ctx)
}

func (m *ManagerImpl) updateTableResolvedTs(ctx context.Context) error {
func (m *ManagerImpl) flushLog(
ctx context.Context, tableRtsMap map[model.TableID]model.Ts,
) (map[model.TableID]model.Ts, error) {
emptyRtsMap := make(map[model.TableID]model.Ts)
// err := m.writer.FlushLog(ctx, tableRtsMap)
// if err != nil {
// return emptyRtsMap, err
// }

m.rtsMapMu.Lock()
defer m.rtsMapMu.Unlock()

minResolvedTs := uint64(math.MaxUint64)
for tableID := range m.rtsMap {
if newRts, ok := tableRtsMap[tableID]; ok {
if newRts < m.rtsMap[tableID] {
log.Panic("resolvedTs in redoManager regressed, report a bug",
zap.Int64("tableID", tableID),
zap.Uint64("oldResolvedTs", m.rtsMap[tableID]),
zap.Uint64("currentReolvedTs", newRts))
}
m.rtsMap[tableID] = newRts
}

rts := m.rtsMap[tableID]
if rts < minResolvedTs {
minResolvedTs = rts
}
}
atomic.StoreUint64(&m.minResolvedTs, minResolvedTs)
return nil
return emptyRtsMap, nil
}

func (m *ManagerImpl) bgUpdateResolvedTs(ctx context.Context, errCh chan<- error) {
func (m *ManagerImpl) bgUpdateFlushFlag(ctx context.Context) {
ticker := time.NewTicker(updateRtsInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := m.updateTableResolvedTs(ctx)
if err != nil {
select {
case errCh <- err:
default:
log.Error("err channel is full", zap.Error(err))
}
return
select {
case m.needsFlush <- struct{}{}:
default:
log.Warn("Fail to update flush flag, " +
"the previous flush operation hasn't finished yet")
}
}
}
}

func (m *ManagerImpl) bgUpdateLog(ctx context.Context, errCh chan<- error) {
handleErr := func(err error) {
select {
case errCh <- err:
default:
log.Error("err channel is full", zap.Error(err))
}
}
tableRtsMap := make(map[int64]uint64)

go func() {
// insert an unknown-type event periodically to prevent the flush operation from blocking
ticker := time.NewTicker(time.Millisecond * 500)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
m.logBuffer.In() <- cacheEvents{
eventType: model.MqMessageTypeUnknown,
}
}
}
}()

for {
var err error
select {
case <-ctx.Done():
return
case cache, ok := <-m.logBuffer.Out():
if !ok {
return // channel closed
}
if cache.rows != nil {
switch cache.eventType {
case model.MqMessageTypeRow:
logs := make([]*model.RedoRowChangedEvent, 0, len(cache.rows))
for _, row := range cache.rows {
logs = append(logs, RowToRedo(row))
}
_, err = m.writer.WriteLog(ctx, cache.tableID, logs)
} else {
_, err := m.writer.WriteLog(ctx, cache.tableID, logs)
if err != nil {
handleErr(err)
return
}
case model.MqMessageTypeResolved:
// handle resolved ts
err = m.writer.FlushLog(ctx, cache.tableID, cache.resolvedTs)
_ = err
m.rtsMapMu.Lock()
if v, ok := m.rtsMap[cache.tableID]; ok {
if v > cache.resolvedTs {
if oldRts, ok := tableRtsMap[cache.tableID]; ok {
if cache.resolvedTs < oldRts {
log.Panic("resolvedTs in redoManager regressed, report a bug",
zap.Int64("tableID", cache.tableID),
zap.Uint64("oldResolvedTs", v),
zap.Uint64("oldResolvedTs", oldRts),
zap.Uint64("currentReolvedTs", cache.resolvedTs))
}
m.rtsMap[cache.tableID] = cache.resolvedTs
}
m.rtsMapMu.Unlock()
close(cache.flushCallback)
tableRtsMap[cache.tableID] = cache.resolvedTs
default:
log.Debug("handle unknown event type")
}
}

if err != nil {
select {
case errCh <- err:
default:
log.Error("err channel is full", zap.Error(err))
// flush writer if needed
select {
case <-m.needsFlush:
log.Info("flushing redo log >>>>>>")
var err error
tableRtsMap, err = m.flushLog(ctx, tableRtsMap)
log.Info("flushing done <<<<<<======")
if err != nil {
handleErr(err)
return
}
return
default:
log.Info("no need to flush log")
}
}
}
Loading

0 comments on commit b516897

Please sign in to comment.