Skip to content

Commit

Permalink
feat(client): log stream appender manager
Browse files Browse the repository at this point in the history
This PR adds a manager for log stream appenders, `pkg/varlog/x/mlsa.(Manager)`. It provides two
methods:

- `pkg/varlog/x/mlsa.(Manager).Get`: Get a LogStreamAppender specified by the given topic and log
  stream.
- `pkg/varlog/x/mlsa.(Manager).Any`: Get a writable LogStreamAppender belonging to the given topic.
  • Loading branch information
ijsong committed Jun 13, 2023
1 parent 6f916d4 commit 617dd42
Show file tree
Hide file tree
Showing 5 changed files with 358 additions and 9 deletions.
13 changes: 13 additions & 0 deletions pkg/varlog/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type Log interface {

// NewLogStreamAppender returns a new LogStreamAppender.
NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamID, opts ...LogStreamAppenderOption) (LogStreamAppender, error)

// AppendableLogStreams returns all writable log streams belonging to the
// topic specified by the argument tpid.
AppendableLogStreams(tpid types.TopicID) map[types.LogStreamID]struct{}
}

type AppendResult struct {
Expand Down Expand Up @@ -184,6 +188,15 @@ func (v *logImpl) NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamI
return v.newLogStreamAppender(context.Background(), tpid, lsid, opts...)
}

func (v *logImpl) AppendableLogStreams(tpid types.TopicID) map[types.LogStreamID]struct{} {
ids := v.lsSelector.GetAll(tpid)
ret := make(map[types.LogStreamID]struct{})
for _, id := range ids {
ret[id] = struct{}{}
}
return ret
}

func (v *logImpl) Close() (err error) {
if v.closed.Load() {
return
Expand Down
129 changes: 129 additions & 0 deletions pkg/varlog/x/mlsa/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package mlsa

import (
"errors"

"github.com/puzpuzpuz/xsync/v2"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/varlog"
)

type managedLSA struct {
mgr *Manager
lsa varlog.LogStreamAppender
tpid types.TopicID
lsid types.LogStreamID
}

var _ varlog.LogStreamAppender = (*managedLSA)(nil)

func (m *managedLSA) AppendBatch(dataBatch [][]byte, callback varlog.BatchCallback) error {
return m.lsa.AppendBatch(dataBatch, callback)
}

func (m *managedLSA) Close() {
m.mgr.mu.Lock()
appenders, ok := m.mgr.mlsas[m.tpid]
if ok && appenders[m.lsid] == m {
delete(appenders, m.lsid)
}
m.mgr.mu.Unlock()

m.lsa.Close()
}

// Manager manages a set of LogStreamAppenders.
// When multiple users attempt to obtain the same LogStreamAppender using the
// same TopicID and LogStreamID, they may share the same object. Therefore,
// when one user calls the Close function, another may encounter an ErrClose
// error. To resolve this issue, users can simply get a new LogStreamAppender
// by calling the Get function again.
type Manager struct {
mlsas map[types.TopicID]map[types.LogStreamID]*managedLSA
mu *xsync.RBMutex

vcli varlog.Log
opts []varlog.LogStreamAppenderOption
}

// New returns a new Manager. Generally, users only require one manager.
func New(vcli varlog.Log, opts ...varlog.LogStreamAppenderOption) *Manager {
mgr := &Manager{
vcli: vcli,
opts: opts,
mlsas: make(map[types.TopicID]map[types.LogStreamID]*managedLSA),
mu: xsync.NewRBMutex(),
}
return mgr
}

// Get returns a LogStreamAppender specified by the arguments tpid and lsid. It
// returns an error if it cannot access the given log stream.
func (mgr *Manager) Get(tpid types.TopicID, lsid types.LogStreamID) (varlog.LogStreamAppender, error) {
t := mgr.mu.RLock()
if lsa := mgr.mlsas[tpid][lsid]; lsa != nil {
mgr.mu.RUnlock(t)
return lsa, nil
}
mgr.mu.RUnlock(t)

return mgr.getSlow(tpid, lsid)
}

func (mgr *Manager) getSlow(tpid types.TopicID, lsid types.LogStreamID) (varlog.LogStreamAppender, error) {
mgr.mu.Lock()
defer mgr.mu.Unlock()

appenders, ok := mgr.mlsas[tpid]
if !ok {
appenders = make(map[types.LogStreamID]*managedLSA)
mgr.mlsas[tpid] = appenders
}

mlsa, ok := appenders[lsid]
if !ok {
lsa, err := mgr.vcli.NewLogStreamAppender(tpid, lsid, mgr.opts...)
if err != nil {
return nil, err
}
mlsa = &managedLSA{
lsa: lsa,
tpid: tpid,
lsid: lsid,
mgr: mgr,
}
appenders[lsid] = mlsa
}
return mlsa, nil
}

// Any returns the LogStreamAppender that is filtered by the argument
// allowlist. If the allowlist is empty, all log streams in the topic can be
// chosen.
// It chooses the writable LogStreamAppender as possible.
func (mgr *Manager) Any(tpid types.TopicID, allowlist map[types.LogStreamID]struct{}) (varlog.LogStreamAppender, error) {
var shorter, longer map[types.LogStreamID]struct{}
candidates := mgr.vcli.AppendableLogStreams(tpid)
if len(allowlist) == 0 || len(candidates) < len(allowlist) {
shorter = candidates
longer = allowlist
} else {
shorter = allowlist
longer = candidates
}

for lsid := range shorter {
if len(longer) > 0 {
if _, ok := longer[lsid]; !ok {
continue
}
}

lsa, err := mgr.Get(tpid, lsid)
if err == nil {
return lsa, nil
}
}
return nil, errors.New("no appendable log stream")
}
28 changes: 28 additions & 0 deletions pkg/varlogtest/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,40 @@ func (c *testLog) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid ty

}

func (c *testLog) AppendableLogStreams(tpid types.TopicID) map[types.LogStreamID]struct{} {
if err := c.lock(); err != nil {
return nil
}
defer c.unlock()

topicDesc, ok := c.vt.topics[tpid]
if !ok {
return nil
}

ret := make(map[types.LogStreamID]struct{}, len(topicDesc.LogStreams))
for _, lsid := range topicDesc.LogStreams {
ret[lsid] = struct{}{}
}
return ret
}

// NewLogStreamAppender returns a new fake LogStreamAppender for testing. It
// ignores options; the pipeline size is five, and the default callback has no
// operation.
func (c *testLog) NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamID, _ ...varlog.LogStreamAppenderOption) (varlog.LogStreamAppender, error) {
const pipelineSize = 5

if err := c.lock(); err != nil {
return nil, err
}
defer c.unlock()

_, err := c.vt.logStreamDescriptor(tpid, lsid)
if err != nil {
return nil, err
}

lsa := &logStreamAppender{
c: c,
tpid: tpid,
Expand Down
65 changes: 56 additions & 9 deletions pkg/varlogtest/varlogtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/container/set"
"github.com/kakao/varlog/pkg/varlog"
"github.com/kakao/varlog/pkg/varlog/x/mlsa"
"github.com/kakao/varlog/pkg/varlogtest"
"github.com/kakao/varlog/pkg/verrors"
"github.com/kakao/varlog/proto/varlogpb"
Expand All @@ -30,19 +31,26 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) {

tcs := []struct {
name string
testf func(t *testing.T, lsa varlog.LogStreamAppender)
testf func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID)
}{
{
name: "Closed",
testf: func(t *testing.T, lsa varlog.LogStreamAppender) {
testf: func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
lsa, err := vcli.NewLogStreamAppender(tpid, lsid)
require.NoError(t, err)

lsa.Close()
err := lsa.AppendBatch([][]byte{[]byte("foo")}, nil)
err = lsa.AppendBatch([][]byte{[]byte("foo")}, nil)
require.Equal(t, varlog.ErrClosed, err)
},
},
{
name: "AppendLogs",
testf: func(t *testing.T, lsa varlog.LogStreamAppender) {
testf: func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
lsa, err := vcli.NewLogStreamAppender(tpid, lsid)
require.NoError(t, err)
defer lsa.Close()

cb := func(_ []varlogpb.LogEntryMeta, err error) {
assert.NoError(t, err)
}
Expand All @@ -52,6 +60,49 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) {
}
},
},
{
name: "Manager",
testf: func(t *testing.T, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
mgr := mlsa.New(vcli)

_, err := mgr.Get(tpid+1, lsid)
require.Error(t, err)

_, err = mgr.Get(tpid, lsid+1)
require.Error(t, err)

lsa1, err := mgr.Get(tpid, lsid)
require.NoError(t, err)
lsa2, err := mgr.Get(tpid, lsid)
require.NoError(t, err)

var wg sync.WaitGroup
wg.Add(2)
cb := func(_ []varlogpb.LogEntryMeta, err error) {
defer wg.Done()
assert.NoError(t, err)
}
err = lsa1.AppendBatch([][]byte{[]byte("foo")}, cb)
require.NoError(t, err)
err = lsa2.AppendBatch([][]byte{[]byte("foo")}, cb)
require.NoError(t, err)
wg.Wait()

lsa1.Close()
err = lsa2.AppendBatch([][]byte{[]byte("foo")}, nil)
require.Equal(t, varlog.ErrClosed, err)

lsa1, err = mgr.Get(tpid, lsid)
require.NoError(t, err)
wg.Add(1)
err = lsa1.AppendBatch([][]byte{[]byte("foo")}, cb)
require.NoError(t, err)
err = lsa2.AppendBatch([][]byte{[]byte("foo")}, nil)
require.Equal(t, varlog.ErrClosed, err)
wg.Wait()
lsa1.Close()
},
},
}

for _, tc := range tcs {
Expand Down Expand Up @@ -87,11 +138,7 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) {
require.Equal(t, varlogpb.LogStreamStatusRunning, lsd.Status)
require.Len(t, lsd.Replicas, replicationFactor)

lsa, err := vlg.NewLogStreamAppender(td.TopicID, lsd.LogStreamID)
require.NoError(t, err)
defer lsa.Close()

tc.testf(t, lsa)
tc.testf(t, vlg, td.TopicID, lsd.LogStreamID)
})
}
}
Expand Down
Loading

0 comments on commit 617dd42

Please sign in to comment.