Skip to content

Commit

Permalink
feat(storagenode): replica in the learning state does not report to a…
Browse files Browse the repository at this point in the history
… metadata repository

Previously, a log stream replica in the learning state sent reports to the metadata repository, and
the log stream replica had to use commit contexts sent from the source replica.
However, the source cannot do that in a future patch because replicas in a log stream will store the
latest commit context rather than all commit contexts.
This patch slightly changes the report's behavior in a log stream replica, not to send it while in a
learning state. When the log stream replica copies the data from the source replica, it will not
send the report.

Updates kakao#125
  • Loading branch information
ijsong committed Sep 21, 2022
1 parent b273066 commit e066495
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 2 deletions.
5 changes: 4 additions & 1 deletion internal/storagenode/logstream/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,11 @@ func (lse *Executor) Report(_ context.Context) (snpb.LogStreamUncommitReport, er
atomic.AddInt64(&lse.inflight, 1)
defer atomic.AddInt64(&lse.inflight, -1)

if lse.esm.load() == executorStateClosed {
switch lse.esm.load() {
case executorStateClosed:
return snpb.LogStreamUncommitReport{}, verrors.ErrClosed
case executorStateLearning:
return snpb.LogStreamUncommitReport{}, errors.New("log stream: learning state")
}

version, highWatermark, uncommittedLLSNBegin := lse.lsc.reportCommitBase()
Expand Down
10 changes: 10 additions & 0 deletions internal/storagenode/logstream/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/kakao/varlog/pkg/rpc"
Expand Down Expand Up @@ -74,3 +75,12 @@ func TestNewBatchData(tb testing.TB, batchLen int, msgSize int) [][]byte {
}
return batch
}

func TestNewReplicatorClient(t *testing.T, addr string) (snpb.ReplicatorClient, func()) {
cc, err := rpc.NewConn(context.Background(), addr)
require.NoError(t, err)
client := snpb.NewReplicatorClient(cc.Conn)
return client, func() {
assert.NoError(t, cc.Close())
}
}
90 changes: 90 additions & 0 deletions internal/storagenode/storagenode_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storagenode

import (
"context"
"math/rand"
"os"
"path/filepath"
Expand All @@ -10,8 +11,10 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/kakao/varlog/internal/reportcommitter"
"github.com/kakao/varlog/internal/storagenode/logstream"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/varlogpb"
Expand Down Expand Up @@ -430,3 +433,90 @@ func TestStorageNode_MakeVolumesAbsolute(t *testing.T) {
assert.True(t, filepath.IsAbs(volume))
}
}

func TestStorageNode_Report(t *testing.T) {
const (
cid = types.ClusterID(1)
tpid = types.TopicID(1)
lsid = types.LogStreamID(1)
snid = types.StorageNodeID(1)
)

tcs := []struct {
name string
testf func(t *testing.T, addr string)
}{
{
name: "Succeed",
testf: func(t *testing.T, addr string) {
reports := reportcommitter.TestGetReport(t, addr)
require.Len(t, reports, 1)
require.Equal(t, lsid, reports[0].LogStreamID)
require.EqualValues(t, 1, reports[0].UncommittedLLSNOffset)
require.Zero(t, reports[0].UncommittedLLSNLength)
},
},
{
name: "Learning",
testf: func(t *testing.T, addr string) {
rc, rcClose := logstream.TestNewReplicatorClient(t, addr)
defer rcClose()

_, err := rc.SyncInit(context.Background(), &snpb.SyncInitRequest{
ClusterID: cid,
Source: varlogpb.LogStreamReplica{
StorageNode: varlogpb.StorageNode{
StorageNodeID: snid + 1,
},
TopicLogStream: varlogpb.TopicLogStream{
TopicID: tpid,
LogStreamID: lsid,
},
},
Destination: varlogpb.LogStreamReplica{
StorageNode: varlogpb.StorageNode{
StorageNodeID: snid,
},
TopicLogStream: varlogpb.TopicLogStream{
TopicID: tpid,
LogStreamID: lsid,
},
},
Range: snpb.SyncRange{
FirstLLSN: 1,
LastLLSN: 10,
},
})
require.NoError(t, err)

reports := reportcommitter.TestGetReport(t, addr)
require.Empty(t, reports)
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
sn := TestNewSimpleStorageNode(t,
WithDefaultLogStreamExecutorOptions(
logstream.WithSyncInitTimeout(time.Minute),
),
)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_ = sn.Serve()
}()

addr := TestGetAdvertiseAddress(t, sn)

mc, mcClose := TestNewManagementClient(t, sn.cid, sn.snid, addr)
defer mcClose()

_, err := mc.AddLogStreamReplica(context.Background(), tpid, lsid, sn.snPaths[0])
require.NoError(t, err)
})
}
}
9 changes: 8 additions & 1 deletion tests/it/management/management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,14 @@ func TestSyncLogStream(t *testing.T) {
if err != nil {
return false
}
rpt := rsp.GetUncommitReports()[0]

// Log stream replica does not send a report in the learning state.
rpts := rsp.GetUncommitReports()
if len(rpts) < 1 {
return false
}

rpt := rpts[0]
return rpt.GetVersion() == types.Version(numLogs) &&
rpt.GetUncommittedLLSNOffset() == types.LLSN(numLogs+1) &&
rpt.GetUncommittedLLSNLength() == 0 &&
Expand Down

0 comments on commit e066495

Please sign in to comment.