Commit

feat(storagenode): change the synchronization method to accept only the last commit context

This patch changes the synchronization process to accept only the last commit context applied by kakao#162.
The synchronization copies log entries and the latest commit context from the source replica to the
destination replica. The destination's reportCommitBase should be valid, and its log entries should
be the same as the source's.

SyncInit can now delete log entries that the source replica has already removed. It deletes all log
entries at once to avoid leaving a hole in the middle of the log. It also invalidates the replica's
reportCommitBase, because the commit context and log entries are inconsistent while the
synchronization is in progress.
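
A rough, self-contained sketch of that idea, using illustrative types rather than the executor's
real ones:

package main

import "fmt"

// replicaState is a stand-in for the destination replica's state: its log
// entries and whether its reportCommitBase can be trusted.
type replicaState struct {
	logEntries      []string
	reportBaseValid bool
}

// syncInit mimics the behavior described above: when the source has already
// trimmed entries, delete every local entry in one step so a hole never
// appears in the middle of the log, and invalidate reportCommitBase because
// the commit context and log entries are inconsistent until the
// synchronization finishes.
func (r *replicaState) syncInit(sourceTrimmed bool) {
	if sourceTrimmed {
		r.logEntries = nil // drop everything at once
	}
	r.reportBaseValid = false
}

func main() {
	r := &replicaState{logEntries: []string{"e1", "e2", "e3"}, reportBaseValid: true}
	r.syncInit(true)
	fmt.Println(len(r.logEntries), r.reportBaseValid) // 0 false
}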

SyncReplicate no longer uses the SyncReplicateBuffer, since there is no reason to accumulate a commit
context and its corresponding log entries in a buffer and write them to disk all at once. It now
persists log entries or the commit context on every call. SyncReplicate could be further optimized to
transfer log entries in batches, which we can discuss later.
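
A similar sketch of the per-call behavior, again with illustrative types rather than the real
SyncReplicate payload: whichever part of the payload is present is persisted immediately instead of
being buffered.

package main

import "fmt"

type commitContext struct{ version uint64 }

// syncPayload stands in for a SyncReplicate payload: it carries either log
// entries or the last commit context.
type syncPayload struct {
	logEntries []string
	cc         *commitContext
}

type destReplica struct {
	entries []string
	lastCC  *commitContext
}

// syncReplicate persists whatever the payload carries as soon as it arrives,
// instead of accumulating it in a buffer and flushing once at the end.
func (d *destReplica) syncReplicate(p syncPayload) {
	if len(p.logEntries) > 0 {
		d.entries = append(d.entries, p.logEntries...)
	}
	if p.cc != nil {
		d.lastCC = p.cc
	}
}

func main() {
	d := &destReplica{}
	d.syncReplicate(syncPayload{logEntries: []string{"e1", "e2"}})
	d.syncReplicate(syncPayload{cc: &commitContext{version: 7}})
	fmt.Println(len(d.entries), d.lastCC.version) // 2 7
}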

Updates kakao#125
ijsong committed Nov 3, 2022
1 parent 67812dc commit 8d331f6
Showing 8 changed files with 1,357 additions and 660 deletions.
25 changes: 25 additions & 0 deletions internal/storage/testing.go
@@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)

func TestNewStorage(tb testing.TB, opts ...Option) *Storage {
@@ -44,3 +45,27 @@ func TestSetCommitContext(tb testing.TB, stg *Storage, cc CommitContext) {
require.NoError(tb, batch.Apply())
require.NoError(tb, batch.Close())
}

func TestDeleteCommitContext(tb testing.TB, stg *Storage) {
err := stg.db.Delete(commitContextKey, pebble.Sync)
require.NoError(tb, err)
}

func TestDeleteLogEntry(tb testing.TB, stg *Storage, lsn varlogpb.LogSequenceNumber) {
batch := stg.db.NewBatch()
defer func() {
err := batch.Close()
require.NoError(tb, err)
}()

dk := make([]byte, dataKeyLength)
err := batch.Delete(encodeDataKeyInternal(lsn.LLSN, dk), nil)
require.NoError(tb, err)

ck := make([]byte, commitKeyLength)
err = batch.Delete(encodeCommitKeyInternal(lsn.GLSN, ck), nil)
require.NoError(tb, err)

err = batch.Commit(pebble.Sync)
require.NoError(tb, err)
}
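
For context, this is how a hypothetical test elsewhere in the repository might use the new helpers
to fabricate a replica with missing state; the population step is elided because the write-path
helpers are not part of this diff.

import (
	"testing"

	"github.com/kakao/varlog/internal/storage"
	"github.com/kakao/varlog/proto/varlogpb"
)

func TestSyncWithMissingState(t *testing.T) {
	stg := storage.TestNewStorage(t)
	// ... populate stg with log entries and a commit context using helpers not shown in this diff ...

	// Pretend the last commit context was removed (or never persisted).
	storage.TestDeleteCommitContext(t, stg)

	// Remove the data and commit keys of one log entry to mimic trimming.
	storage.TestDeleteLogEntry(t, stg, varlogpb.LogSequenceNumber{LLSN: 1, GLSN: 1})
}
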
13 changes: 8 additions & 5 deletions internal/storagenode/logstream/config.go
@@ -16,7 +16,7 @@ const (
DefaultWriteQueueCapacity = 1024
DefaultCommitQueueCapacity = 1024
DefaultReplicateClientQueueCapacity = 1024
DefaultSyncInitTimeout = 10 * time.Second
DefaultSyncTimeout = 10 * time.Second
)

type executorConfig struct {
@@ -33,7 +33,7 @@ type executorConfig struct {
replicateClientGRPCOptions []grpc.DialOption
logger *zap.Logger
lsm *telemetry.LogStreamMetrics
syncInitTimeout time.Duration
syncTimeout time.Duration
}

func newExecutorConfig(opts []ExecutorOption) (executorConfig, error) {
@@ -43,7 +43,7 @@ func newExecutorConfig(opts []ExecutorOption) (executorConfig, error) {
commitQueueCapacity: DefaultCommitQueueCapacity,
replicateClientQueueCapacity: DefaultReplicateClientQueueCapacity,
logger: zap.NewNop(),
syncInitTimeout: DefaultSyncInitTimeout,
syncTimeout: DefaultSyncTimeout,
}
for _, opt := range opts {
opt.applyExecutor(&cfg)
@@ -174,8 +174,11 @@ func WithLogStreamMetrics(lsm *telemetry.LogStreamMetrics) ExecutorOption {
})
}

func WithSyncInitTimeout(syncInitTimeout time.Duration) ExecutorOption {
// WithSyncTimeout sets the timeout for synchronization in the destination replica.
// If the destination replica does not receive a SyncReplicate RPC within
// syncTimeout, another SyncInit RPC can cancel the synchronization.
func WithSyncTimeout(syncTimeout time.Duration) ExecutorOption {
return newFuncExecutorOption(func(cfg *executorConfig) {
cfg.syncInitTimeout = syncInitTimeout
cfg.syncTimeout = syncTimeout
})
}
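
For illustration, using only names that appear in this diff (the 30-second value below is arbitrary,
not a recommendation), the new option plugs into the executor configuration like any other
ExecutorOption:

cfg, err := newExecutorConfig([]ExecutorOption{
	WithSyncTimeout(30 * time.Second), // allow takeover after 30s without SyncInit or SyncReplicate
})
if err != nil {
	panic(err) // placeholder error handling for this sketch
}
_ = cfg.syncTimeout // 30s instead of DefaultSyncTimeout
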
14 changes: 12 additions & 2 deletions internal/storagenode/logstream/executor.go
@@ -53,8 +53,14 @@ type Executor struct {
// primaryBackups is a slice of replicas of a log stream.
// It is updated by Unseal and is read by many codes.
primaryBackups []varlogpb.LogStreamReplica
// sync replicate buffer
srb *syncReplicateBuffer
dstSyncInfo struct {
// lastSyncTime is when the replica last handled SyncInit or SyncReplicate
// successfully. If the elapsed time since that RPC is greater than the
// configured syncTimeout, the synchronization process can be canceled by
// another SyncInit.
lastSyncTime time.Time
srcReplica types.StorageNodeID
}
sts map[types.StorageNodeID]*syncTracker
syncRunner *runner.Runner

Expand Down Expand Up @@ -241,6 +247,10 @@ func (lse *Executor) Seal(_ context.Context, lastCommittedGLSN types.GLSN) (stat
// log stream.
lse.logger.Panic("log stream: seal: unexpected last committed GLSN")
}
if lse.esm.load() == executorStateLearning {
return varlogpb.LogStreamStatusSealing, localHWM, nil
}

if localHighWatermark.GLSN < lastCommittedGLSN || invalid {
status = varlogpb.LogStreamStatusSealing
return status, localHWM, nil
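The dstSyncInfo comment above describes when a stalled synchronization may be taken over. A
self-contained sketch of that elapsed-time rule, with illustrative names rather than the executor's
actual fields and methods:

package main

import (
	"fmt"
	"time"
)

// dstSyncState mirrors the idea of dstSyncInfo: remember which source replica
// is synchronizing to us and when it last made progress.
type dstSyncState struct {
	lastSyncTime time.Time
	srcReplica   int32
}

// canCancelOngoingSync reports whether a new SyncInit may cancel the ongoing
// synchronization: the previous source has been silent for longer than
// syncTimeout, so the destination assumes that synchronization is dead.
func canCancelOngoingSync(st dstSyncState, syncTimeout time.Duration, now time.Time) bool {
	return now.Sub(st.lastSyncTime) > syncTimeout
}

func main() {
	st := dstSyncState{lastSyncTime: time.Now().Add(-15 * time.Second), srcReplica: 1}
	fmt.Println(canCancelOngoingSync(st, 10*time.Second, time.Now())) // true: the source went silent
}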
