Skip to content

Commit

Permalink
Merge pull request kakao#199 from ijsong/change-sync
Browse files Browse the repository at this point in the history
feat(storagenode): change the synchronization method to accept only the last commit context
  • Loading branch information
ijsong authored Nov 3, 2022
2 parents 67812dc + 8d331f6 commit 34fed84
Show file tree
Hide file tree
Showing 8 changed files with 1,357 additions and 660 deletions.
25 changes: 25 additions & 0 deletions internal/storage/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)

func TestNewStorage(tb testing.TB, opts ...Option) *Storage {
Expand Down Expand Up @@ -44,3 +45,27 @@ func TestSetCommitContext(tb testing.TB, stg *Storage, cc CommitContext) {
require.NoError(tb, batch.Apply())
require.NoError(tb, batch.Close())
}

func TestDeleteCommitContext(tb testing.TB, stg *Storage) {
err := stg.db.Delete(commitContextKey, pebble.Sync)
require.NoError(tb, err)
}

func TestDeleteLogEntry(tb testing.TB, stg *Storage, lsn varlogpb.LogSequenceNumber) {
batch := stg.db.NewBatch()
defer func() {
err := batch.Close()
require.NoError(tb, err)
}()

dk := make([]byte, dataKeyLength)
err := batch.Delete(encodeDataKeyInternal(lsn.LLSN, dk), nil)
require.NoError(tb, err)

ck := make([]byte, commitKeyLength)
err = batch.Delete(encodeCommitKeyInternal(lsn.GLSN, ck), nil)
require.NoError(tb, err)

err = batch.Commit(pebble.Sync)
require.NoError(tb, err)
}
13 changes: 8 additions & 5 deletions internal/storagenode/logstream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
DefaultWriteQueueCapacity = 1024
DefaultCommitQueueCapacity = 1024
DefaultReplicateClientQueueCapacity = 1024
DefaultSyncInitTimeout = 10 * time.Second
DefaultSyncTimeout = 10 * time.Second
)

type executorConfig struct {
Expand All @@ -33,7 +33,7 @@ type executorConfig struct {
replicateClientGRPCOptions []grpc.DialOption
logger *zap.Logger
lsm *telemetry.LogStreamMetrics
syncInitTimeout time.Duration
syncTimeout time.Duration
}

func newExecutorConfig(opts []ExecutorOption) (executorConfig, error) {
Expand All @@ -43,7 +43,7 @@ func newExecutorConfig(opts []ExecutorOption) (executorConfig, error) {
commitQueueCapacity: DefaultCommitQueueCapacity,
replicateClientQueueCapacity: DefaultReplicateClientQueueCapacity,
logger: zap.NewNop(),
syncInitTimeout: DefaultSyncInitTimeout,
syncTimeout: DefaultSyncTimeout,
}
for _, opt := range opts {
opt.applyExecutor(&cfg)
Expand Down Expand Up @@ -174,8 +174,11 @@ func WithLogStreamMetrics(lsm *telemetry.LogStreamMetrics) ExecutorOption {
})
}

func WithSyncInitTimeout(syncInitTimeout time.Duration) ExecutorOption {
// WithSyncTimeout sets timeout for synchronization in the destination replica.
// If the destination replica doesn't receive the SyncReplicate RPC within
// syncTimeout, other SyncInit RPC can cancel the synchronization.
func WithSyncTimeout(syncTimeout time.Duration) ExecutorOption {
return newFuncExecutorOption(func(cfg *executorConfig) {
cfg.syncInitTimeout = syncInitTimeout
cfg.syncTimeout = syncTimeout
})
}
14 changes: 12 additions & 2 deletions internal/storagenode/logstream/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,14 @@ type Executor struct {
// primaryBackups is a slice of replicas of a log stream.
// It is updated by Unseal and is read by many codes.
primaryBackups []varlogpb.LogStreamReplica
// sync replicate buffer
srb *syncReplicateBuffer
dstSyncInfo struct {
// lastSyncTime is when the replica handles SyncInit or SyncReplicate
// successfully. If the elapsed time since the last RPC is greater than
// configured syncDurationTimeout, the synchronization process can be
// canceled by another SyncInit.
lastSyncTime time.Time
srcReplica types.StorageNodeID
}
sts map[types.StorageNodeID]*syncTracker
syncRunner *runner.Runner

Expand Down Expand Up @@ -241,6 +247,10 @@ func (lse *Executor) Seal(_ context.Context, lastCommittedGLSN types.GLSN) (stat
// log stream.
lse.logger.Panic("log stream: seal: unexpected last committed GLSN")
}
if lse.esm.load() == executorStateLearning {
return varlogpb.LogStreamStatusSealing, localHWM, nil
}

if localHighWatermark.GLSN < lastCommittedGLSN || invalid {
status = varlogpb.LogStreamStatusSealing
return status, localHWM, nil
Expand Down
Loading

0 comments on commit 34fed84

Please sign in to comment.