diff --git a/pkg/varlog/log.go b/pkg/varlog/log.go index ebc76073a..e24ca1d33 100644 --- a/pkg/varlog/log.go +++ b/pkg/varlog/log.go @@ -45,6 +45,10 @@ type Log interface { // NewLogStreamAppender returns a new LogStreamAppender. NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamID, opts ...LogStreamAppenderOption) (LogStreamAppender, error) + + // AppendableLogStreams returns all writable log streams belonging to the + // topic specified by the argument tpid. + AppendableLogStreams(tpid types.TopicID) map[types.LogStreamID]struct{} } type AppendResult struct { @@ -184,6 +188,15 @@ func (v *logImpl) NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamI return v.newLogStreamAppender(context.Background(), tpid, lsid, opts...) } +func (v *logImpl) AppendableLogStreams(tpid types.TopicID) map[types.LogStreamID]struct{} { + ids := v.lsSelector.GetAll(tpid) + ret := make(map[types.LogStreamID]struct{}) + for _, id := range ids { + ret[id] = struct{}{} + } + return ret +} + func (v *logImpl) Close() (err error) { if v.closed.Load() { return diff --git a/pkg/varlog/x/mlsa/manager.go b/pkg/varlog/x/mlsa/manager.go new file mode 100644 index 000000000..f39840ab9 --- /dev/null +++ b/pkg/varlog/x/mlsa/manager.go @@ -0,0 +1,129 @@ +package mlsa + +import ( + "errors" + + "github.com/puzpuzpuz/xsync/v2" + + "github.com/kakao/varlog/pkg/types" + "github.com/kakao/varlog/pkg/varlog" +) + +type managedLSA struct { + mgr *Manager + lsa varlog.LogStreamAppender + tpid types.TopicID + lsid types.LogStreamID +} + +var _ varlog.LogStreamAppender = (*managedLSA)(nil) + +func (m *managedLSA) AppendBatch(dataBatch [][]byte, callback varlog.BatchCallback) error { + return m.lsa.AppendBatch(dataBatch, callback) +} + +func (m *managedLSA) Close() { + m.mgr.mu.Lock() + appenders, ok := m.mgr.mlsas[m.tpid] + if ok && appenders[m.lsid] == m { + delete(appenders, m.lsid) + } + m.mgr.mu.Unlock() + + m.lsa.Close() +} + +// Manager manages a set of LogStreamAppenders. +// When multiple users attempt to obtain the same LogStreamAppender using the +// same TopicID and LogStreamID, they may share the same object. Therefore, +// when one user calls the Close function, another may encounter an ErrClose +// error. To resolve this issue, users can simply get a new LogStreamAppender +// by calling the Get function again. +type Manager struct { + mlsas map[types.TopicID]map[types.LogStreamID]*managedLSA + mu *xsync.RBMutex + + vcli varlog.Log + opts []varlog.LogStreamAppenderOption +} + +// New returns a new Manager. Generally, users only require one manager. +func New(vcli varlog.Log, opts ...varlog.LogStreamAppenderOption) *Manager { + mgr := &Manager{ + vcli: vcli, + opts: opts, + mlsas: make(map[types.TopicID]map[types.LogStreamID]*managedLSA), + mu: xsync.NewRBMutex(), + } + return mgr +} + +// Get returns a LogStreamAppender specified by the arguments tpid and lsid. It +// returns an error if it cannot access the given log stream. +func (mgr *Manager) Get(tpid types.TopicID, lsid types.LogStreamID) (varlog.LogStreamAppender, error) { + t := mgr.mu.RLock() + if lsa := mgr.mlsas[tpid][lsid]; lsa != nil { + mgr.mu.RUnlock(t) + return lsa, nil + } + mgr.mu.RUnlock(t) + + return mgr.getSlow(tpid, lsid) +} + +func (mgr *Manager) getSlow(tpid types.TopicID, lsid types.LogStreamID) (varlog.LogStreamAppender, error) { + mgr.mu.Lock() + defer mgr.mu.Unlock() + + appenders, ok := mgr.mlsas[tpid] + if !ok { + appenders = make(map[types.LogStreamID]*managedLSA) + mgr.mlsas[tpid] = appenders + } + + mlsa, ok := appenders[lsid] + if !ok { + lsa, err := mgr.vcli.NewLogStreamAppender(tpid, lsid, mgr.opts...) + if err != nil { + return nil, err + } + mlsa = &managedLSA{ + lsa: lsa, + tpid: tpid, + lsid: lsid, + mgr: mgr, + } + appenders[lsid] = mlsa + } + return mlsa, nil +} + +// Any returns the LogStreamAppender that is filtered by the argument +// allowlist. If the allowlist is empty, all log streams in the topic can be +// chosen. +// It chooses the writable LogStreamAppender as possible. +func (mgr *Manager) Any(tpid types.TopicID, allowlist map[types.LogStreamID]struct{}) (varlog.LogStreamAppender, error) { + var shorter, longer map[types.LogStreamID]struct{} + candidates := mgr.vcli.AppendableLogStreams(tpid) + if len(allowlist) == 0 || len(candidates) < len(allowlist) { + shorter = candidates + longer = allowlist + } else { + shorter = allowlist + longer = candidates + } + + for lsid := range shorter { + if len(longer) > 0 { + if _, ok := longer[lsid]; !ok { + continue + } + } + + lsa, err := mgr.Get(tpid, lsid) + if err == nil { + return lsa, nil + } + } + return nil, errors.New("no appendable log stream") +} diff --git a/pkg/varlogtest/log.go b/pkg/varlogtest/log.go index a09b5c8e0..f4b0abc65 100644 --- a/pkg/varlogtest/log.go +++ b/pkg/varlogtest/log.go @@ -297,12 +297,40 @@ func (c *testLog) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid ty } +func (c *testLog) AppendableLogStreams(tpid types.TopicID) map[types.LogStreamID]struct{} { + if err := c.lock(); err != nil { + return nil + } + defer c.unlock() + + topicDesc, ok := c.vt.topics[tpid] + if !ok { + return nil + } + + ret := make(map[types.LogStreamID]struct{}, len(topicDesc.LogStreams)) + for _, lsid := range topicDesc.LogStreams { + ret[lsid] = struct{}{} + } + return ret +} + // NewLogStreamAppender returns a new fake LogStreamAppender for testing. It // ignores options; the pipeline size is five, and the default callback has no // operation. func (c *testLog) NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamID, _ ...varlog.LogStreamAppenderOption) (varlog.LogStreamAppender, error) { const pipelineSize = 5 + if err := c.lock(); err != nil { + return nil, err + } + defer c.unlock() + + _, err := c.vt.logStreamDescriptor(tpid, lsid) + if err != nil { + return nil, err + } + lsa := &logStreamAppender{ c: c, tpid: tpid, diff --git a/pkg/varlogtest/varlogtest_test.go b/pkg/varlogtest/varlogtest_test.go index 39429681b..7751fc915 100644 --- a/pkg/varlogtest/varlogtest_test.go +++ b/pkg/varlogtest/varlogtest_test.go @@ -16,6 +16,7 @@ import ( "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/container/set" "github.com/kakao/varlog/pkg/varlog" + "github.com/kakao/varlog/pkg/varlog/x/mlsa" "github.com/kakao/varlog/pkg/varlogtest" "github.com/kakao/varlog/pkg/verrors" "github.com/kakao/varlog/proto/varlogpb" @@ -30,19 +31,26 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) { tcs := []struct { name string - testf func(t *testing.T, lsa varlog.LogStreamAppender) + testf func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) }{ { name: "Closed", - testf: func(t *testing.T, lsa varlog.LogStreamAppender) { + testf: func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) { + lsa, err := vcli.NewLogStreamAppender(tpid, lsid) + require.NoError(t, err) + lsa.Close() - err := lsa.AppendBatch([][]byte{[]byte("foo")}, nil) + err = lsa.AppendBatch([][]byte{[]byte("foo")}, nil) require.Equal(t, varlog.ErrClosed, err) }, }, { name: "AppendLogs", - testf: func(t *testing.T, lsa varlog.LogStreamAppender) { + testf: func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) { + lsa, err := vcli.NewLogStreamAppender(tpid, lsid) + require.NoError(t, err) + defer lsa.Close() + cb := func(_ []varlogpb.LogEntryMeta, err error) { assert.NoError(t, err) } @@ -52,6 +60,49 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) { } }, }, + { + name: "Manager", + testf: func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) { + mgr := mlsa.New(vcli) + + _, err := mgr.Get(tpid+1, lsid) + require.Error(t, err) + + _, err = mgr.Get(tpid, lsid+1) + require.Error(t, err) + + lsa1, err := mgr.Get(tpid, lsid) + require.NoError(t, err) + lsa2, err := mgr.Get(tpid, lsid) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(2) + cb := func(_ []varlogpb.LogEntryMeta, err error) { + defer wg.Done() + assert.NoError(t, err) + } + err = lsa1.AppendBatch([][]byte{[]byte("foo")}, cb) + require.NoError(t, err) + err = lsa2.AppendBatch([][]byte{[]byte("foo")}, cb) + require.NoError(t, err) + wg.Wait() + + lsa1.Close() + err = lsa2.AppendBatch([][]byte{[]byte("foo")}, nil) + require.Equal(t, varlog.ErrClosed, err) + + lsa1, err = mgr.Get(tpid, lsid) + require.NoError(t, err) + wg.Add(1) + err = lsa1.AppendBatch([][]byte{[]byte("foo")}, cb) + require.NoError(t, err) + err = lsa2.AppendBatch([][]byte{[]byte("foo")}, nil) + require.Equal(t, varlog.ErrClosed, err) + wg.Wait() + lsa1.Close() + }, + }, } for _, tc := range tcs { @@ -87,11 +138,7 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) { require.Equal(t, varlogpb.LogStreamStatusRunning, lsd.Status) require.Len(t, lsd.Replicas, replicationFactor) - lsa, err := vlg.NewLogStreamAppender(td.TopicID, lsd.LogStreamID) - require.NoError(t, err) - defer lsa.Close() - - tc.testf(t, lsa) + tc.testf(t, vlg, td.TopicID, lsd.LogStreamID) }) } } diff --git a/tests/it/cluster/client_test.go b/tests/it/cluster/client_test.go index 74deb26d2..f6d78f71e 100644 --- a/tests/it/cluster/client_test.go +++ b/tests/it/cluster/client_test.go @@ -16,6 +16,7 @@ import ( "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/testutil" "github.com/kakao/varlog/pkg/varlog" + "github.com/kakao/varlog/pkg/varlog/x/mlsa" "github.com/kakao/varlog/proto/varlogpb" "github.com/kakao/varlog/tests/it" ) @@ -958,6 +959,137 @@ func TestLogStreamAppender(t *testing.T) { wg.Wait() }, }, + { + name: "Manager_NoSuchTopic", + testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) { + mgr := mlsa.New(vcli) + _, err := mgr.Get(tpid+1, lsid) + require.Error(t, err) + + _, err = mgr.Any(tpid+1, nil) + require.Error(t, err) + }, + }, + { + name: "Manager_NoSuchLogStream", + testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) { + mgr := mlsa.New(vcli) + _, err := mgr.Get(tpid, lsid+1) + require.Error(t, err) + + _, err = mgr.Any(tpid, map[types.LogStreamID]struct{}{ + lsid + 1: {}, + }) + require.Error(t, err) + }, + }, + { + name: "Manager_AppendBatch", + testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) { + mgr := mlsa.New(vcli) + + lsa1, err := mgr.Get(tpid, lsid) + require.NoError(t, err) + lsa2, err := mgr.Get(tpid, lsid) + require.NoError(t, err) + + var wg sync.WaitGroup + dataBatch := [][]byte{[]byte("foo")} + + wg.Add(2) + err = lsa1.AppendBatch(dataBatch, func(_ []varlogpb.LogEntryMeta, err error) { + defer wg.Done() + assert.NoError(t, err) + }) + require.NoError(t, err) + err = lsa2.AppendBatch(dataBatch, func(_ []varlogpb.LogEntryMeta, err error) { + defer wg.Done() + assert.NoError(t, err) + }) + require.NoError(t, err) + wg.Wait() + + lsa1.Close() + lsa2.Close() + }, + }, + { + name: "Manager_Close", + testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) { + mgr := mlsa.New(vcli) + + lsa1, err := mgr.Get(tpid, lsid) + require.NoError(t, err) + lsa2, err := mgr.Get(tpid, lsid) + require.NoError(t, err) + + dataBatch := [][]byte{[]byte("foo")} + + lsa1.Close() + err = lsa1.AppendBatch(dataBatch, func([]varlogpb.LogEntryMeta, error) { + assert.Fail(t, "unexpected callback") + }) + require.Error(t, err) + err = lsa2.AppendBatch(dataBatch, func([]varlogpb.LogEntryMeta, error) { + assert.Fail(t, "unexpected callback") + }) + require.Error(t, err) + }, + }, + { + name: "Manager_CloseAndGet", + testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) { + mgr := mlsa.New(vcli) + + lsa1, err := mgr.Get(tpid, lsid) + require.NoError(t, err) + lsa2, err := mgr.Get(tpid, lsid) + require.NoError(t, err) + + lsa1.Close() + lsa1, err = mgr.Get(tpid, lsid) + require.NoError(t, err) + + var wg sync.WaitGroup + dataBatch := [][]byte{[]byte("foo")} + wg.Add(1) + err = lsa1.AppendBatch(dataBatch, func(_ []varlogpb.LogEntryMeta, err error) { + defer wg.Done() + assert.NoError(t, err) + }) + require.NoError(t, err) + err = lsa2.AppendBatch(dataBatch, func([]varlogpb.LogEntryMeta, error) { + assert.Fail(t, "unexpected callback") + }) + require.Error(t, err) + wg.Wait() + + lsa1.Close() + }, + }, + { + name: "Manager_Any", + testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) { + mgr := mlsa.New(vcli) + + lsa, err := mgr.Any(tpid, map[types.LogStreamID]struct{}{ + lsid: {}, + }) + require.NoError(t, err) + + var wg sync.WaitGroup + dataBatch := [][]byte{[]byte("foo")} + wg.Add(1) + err = lsa.AppendBatch(dataBatch, func(_ []varlogpb.LogEntryMeta, err error) { + defer wg.Done() + assert.NoError(t, err) + }) + require.NoError(t, err) + wg.Wait() + + lsa.Close() + }, + }, } for _, tc := range tcs {