Skip to content

Commit

Permalink
feat(client): add Clear to the log stream appender manager
Browse files Browse the repository at this point in the history
This change adds a new method, Clear to the log stream appender manager. Clear closes all the log
stream appenders registered to the Manager and removes them. It does not destroy the Manager.
Therefore, users can continue using it after calling Clear. However, users have to call Clear after
completing the use of the Manager to release associated resources.
  • Loading branch information
ijsong committed Jul 31, 2023
1 parent 5a7a3b0 commit 9a89065
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 0 deletions.
37 changes: 37 additions & 0 deletions pkg/varlog/x/mlsa/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func (m *managedLSA) Close() {
appenders, ok := m.mgr.mlsas[m.tpid]
if ok && appenders[m.lsid] == m {
delete(appenders, m.lsid)
m.mgr.count--
}
m.mgr.mu.Unlock()

Expand All @@ -41,6 +42,7 @@ func (m *managedLSA) Close() {
// by calling the Get function again.
type Manager struct {
mlsas map[types.TopicID]map[types.LogStreamID]*managedLSA
count int
mu *xsync.RBMutex

vcli varlog.Log
Expand Down Expand Up @@ -94,6 +96,7 @@ func (mgr *Manager) getSlow(tpid types.TopicID, lsid types.LogStreamID) (varlog.
mgr: mgr,
}
appenders[lsid] = mlsa
mgr.count++
}
return mlsa, nil
}
Expand Down Expand Up @@ -127,3 +130,37 @@ func (mgr *Manager) Any(tpid types.TopicID, allowlist map[types.LogStreamID]stru
}
return nil, errors.New("no appendable log stream")
}

// Clear closes all the managed LogStreamAppender, and clears them. Clients can
// continue to use this Manager after calling Clear.
//
// After using the Manager, clients should call Clear to release any associated
// resources.
func (mgr *Manager) Clear() {
lsas := mgr.clear()
for _, lsa := range lsas {
lsa.Close()
}
}

func (mgr *Manager) clear() []varlog.LogStreamAppender {
mgr.mu.Lock()
defer func() {
mgr.count = 0
mgr.mu.Unlock()
}()

if len(mgr.mlsas) == 0 {
return nil
}

lsas := make([]varlog.LogStreamAppender, 0, mgr.count)
for tpid, appenders := range mgr.mlsas {
for lsid, mlsa := range appenders {
lsas = append(lsas, mlsa.lsa)
delete(appenders, lsid)
}
delete(mgr.mlsas, tpid)
}
return lsas
}
38 changes: 38 additions & 0 deletions tests/it/cluster/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,44 @@ func TestLogStreamAppender(t *testing.T) {
lsa.Close()
},
},
{
name: "Manager_Clear",
testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) {
mgr := mlsa.New(vcli)

lsa, err := mgr.Get(tpid, lsid)
require.NoError(t, err)

var wg sync.WaitGroup
dataBatch := [][]byte{[]byte("foo")}
wg.Add(1)
err = lsa.AppendBatch(dataBatch, func(_ []varlogpb.LogEntryMeta, err error) {
defer wg.Done()
assert.NoError(t, err)
})
require.NoError(t, err)
wg.Wait()

mgr.Clear()
err = lsa.AppendBatch(dataBatch, func([]varlogpb.LogEntryMeta, error) {
assert.Fail(t, "unexpected callback")
})
require.Error(t, err)

lsa, err = mgr.Get(tpid, lsid)
require.NoError(t, err)

wg.Add(1)
err = lsa.AppendBatch(dataBatch, func(_ []varlogpb.LogEntryMeta, err error) {
defer wg.Done()
assert.NoError(t, err)
})
require.NoError(t, err)
wg.Wait()

mgr.Clear()
},
},
}

for _, tc := range tcs {
Expand Down

0 comments on commit 9a89065

Please sign in to comment.