Skip to content

Commit

Permalink
fix(cache): fix sync cache context (#3451)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
Yisaer authored Dec 18, 2024
1 parent b8f0763 commit 0bc2ae8
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 3 deletions.
7 changes: 5 additions & 2 deletions internal/topo/node/cache/sync_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,11 @@ func NewSyncCache(ctx api.StreamContext, cacheConf *conf.SinkConf) (*SyncCache,
writeBufferPage: newPage(cacheConf.BufferPageSize),
readBufferPage: newPage(cacheConf.BufferPageSize),
}
err := c.initStore(ctx)
return c, err
return c, nil
}

func (c *SyncCache) InitStore(ctx api.StreamContext) error {
return c.initStore(ctx)
}

func (c *SyncCache) SetupMeta(ctx api.StreamContext) {
Expand Down
6 changes: 6 additions & 0 deletions internal/topo/node/cache/sync_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
Expand Down Expand Up @@ -130,6 +131,7 @@ func TestCache(t *testing.T) {
CleanCacheAtStop: false,
})
assert.NoError(t, err)
require.NoError(t, s.InitStore(ctx))
// prepare data
tuples := make([]any, 15)
for i := 0; i < 15; i++ {
Expand Down Expand Up @@ -234,6 +236,7 @@ func TestCacheCase2(t *testing.T) {
ResendInterval: cast.DurationConf(10 * time.Millisecond),
})
assert.NoError(t, err)
require.NoError(t, s.InitStore(ctx))
// prepare data
tuples := make([]any, 15)
for i := 0; i < 15; i++ {
Expand Down Expand Up @@ -337,6 +340,7 @@ func TestCacheInit(t *testing.T) {
CleanCacheAtStop: false,
})
assert.NoError(t, err)
require.NoError(t, s.InitStore(ctx))
// prepare data
tuples := make([]any, 10)
for i := 0; i < 10; i++ {
Expand All @@ -361,6 +365,7 @@ func TestCacheInit(t *testing.T) {
CleanCacheAtStop: false,
})
assert.NoError(t, err)
require.NoError(t, s.InitStore(ctx))
r, _ := s.PopCache(ctx)
assert.Equal(t, 3, s.CacheLength, "cache length after pop")
assert.Equal(t, &xsql.RawTuple{
Expand All @@ -380,6 +385,7 @@ func TestCacheInit(t *testing.T) {
CleanCacheAtStop: false,
})
assert.NoError(t, err)
require.NoError(t, s.InitStore(ctx))
r, _ = s.PopCache(ctx)
assert.Equal(t, 2, s.CacheLength, "cache length after pop")
assert.Equal(t, &xsql.RawTuple{
Expand Down
6 changes: 5 additions & 1 deletion internal/topo/node/cache_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (s *CacheOp) Exec(ctx api.StreamContext, errCh chan<- error) {
infra.DrainError(ctx, fmt.Errorf("cache op should have only 1 output but got %+v", s.outputs), errCh)
}
s.cache.SetupMeta(ctx)
if err := s.cache.InitStore(ctx); err != nil {
infra.DrainError(ctx, fmt.Errorf("cache op init store error:%v", err), errCh)
return
}
s.prepareExec(ctx, errCh, "op")
go func() {
err := infra.SafeRun(func() error {
Expand Down Expand Up @@ -105,7 +109,7 @@ func (s *CacheOp) Exec(ctx api.StreamContext, errCh chan<- error) {
s.send()
s.span = nil
s.onProcessEnd(ctx)
l := int64(len(s.input) + s.cache.CacheLength)
l := int64(len(s.input)) + int64(s.cache.CacheLength)
if s.currItem != nil {
l += 1
}
Expand Down

0 comments on commit 0bc2ae8

Please sign in to comment.