Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stop/delete rule should wait operator close #3056

Merged
merged 4 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions internal/server/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

"github.com/gorilla/mux"
"github.com/pingcap/failpoint"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/stretchr/testify/suite"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/io/http/httpserver"
"github.com/lf-edge/ekuiper/v2/internal/meta"
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
"github.com/lf-edge/ekuiper/v2/internal/pkg/store"
Expand Down Expand Up @@ -1014,3 +1016,61 @@ func (suite *RestTestSuite) TestSinkHiddenPassword() {
m := r.Actions[0]["mqtt"].(map[string]interface{})
require.Equal(suite.T(), "12345", m["password"])
}

// TestWaitStopRule verifies that deleting and stopping a rule are synchronous
// with respect to operator shutdown: the REST call must not return before the
// rule's operators have closed.
//
// The mockTimeConsumingClose failpoint makes operator close sleep for 300ms,
// so each delete/stop request issued while it is enabled is expected to take
// at least that long.
func (suite *RestTestSuite) TestWaitStopRule() {
	const (
		streamsURL     = "http://localhost:8080/streams"
		streamURL      = "http://localhost:8080/streams/demo221"
		rulesURL       = "http://localhost:8080/rules"
		ruleURL        = "http://localhost:8080/rules/rule221"
		stopRuleURL    = "http://localhost:8080/rules/rule221/stop"
		closeFailpoint = "github.com/lf-edge/ekuiper/v2/internal/topo/node/mockTimeConsumingClose"
		// Must match the sleep injected by the failpoint.
		minCloseDelay = 300 * time.Millisecond
		streamJSON    = `{"sql":"CREATE stream demo221() WITH (DATASOURCE=\"/data1\", TYPE=\"websocket\")"}`
		ruleJSON      = `{"id": "rule221","sql": "select a,b from demo221","actions": [{"log": {}}]}`
	)

	httpserver.InitGlobalServerManager("127.0.0.1", 10085, nil)
	connection.InitConnectionManager4Test()

	// send issues one request against the suite router and returns the recorder.
	send := func(method, url, body string) *httptest.ResponseRecorder {
		req, err := http.NewRequest(method, url, bytes.NewBufferString(body))
		require.NoError(suite.T(), err)
		w := httptest.NewRecorder()
		suite.r.ServeHTTP(w, req)
		return w
	}

	// Best-effort cleanup of leftovers from earlier runs; the status codes are
	// intentionally not checked because the resources may not exist.
	send(http.MethodDelete, streamURL, "any") // delete stream
	send(http.MethodDelete, ruleURL, "any")   // delete rule

	// create stream
	w := send(http.MethodPost, streamsURL, streamJSON)
	require.Equal(suite.T(), http.StatusCreated, w.Code)

	// create rule
	w = send(http.MethodPost, rulesURL, ruleJSON)
	require.Equal(suite.T(), http.StatusCreated, w.Code)

	// Fail fast if the failpoint cannot be armed; otherwise the timing
	// assertions below would fail for a misleading reason.
	require.NoError(suite.T(), failpoint.Enable(closeFailpoint, "return(true)"))
	defer func() {
		// Nothing useful can be done if disabling fails during teardown.
		_ = failpoint.Disable(closeFailpoint)
	}()

	// delete rule: must block until the (slowed-down) operators have closed.
	start := time.Now()
	send(http.MethodDelete, ruleURL, "any")
	elapsed := time.Since(start)
	require.True(suite.T(), elapsed >= minCloseDelay, "delete rule returned after %s, want >= %s", elapsed, minCloseDelay)

	// Re-create the rule so that it can be stopped.
	w = send(http.MethodPost, rulesURL, ruleJSON)
	require.Equal(suite.T(), http.StatusCreated, w.Code)

	// stop rule: must block until the (slowed-down) operators have closed.
	start = time.Now()
	send(http.MethodPost, stopRuleURL, "any")
	elapsed = time.Since(start)
	require.True(suite.T(), elapsed >= minCloseDelay, "stop rule returned after %s, want >= %s", elapsed, minCloseDelay)

	waitAllRuleStop()
}
39 changes: 2 additions & 37 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,7 @@
wg.Add(2)
go func() {
conf.Log.Info("start to stop all rules")
if err := waitAllRuleStop(ctx); err != nil {
conf.Log.Warnf(err.Error())
}
waitAllRuleStop()

Check warning on line 290 in internal/server/server.go

View check run for this annotation

Codecov / codecov/patch

internal/server/server.go#L290

Added line #L290 was not covered by tests
wg.Done()
}()
go func() {
Expand Down Expand Up @@ -503,42 +501,9 @@
return nil
}

func waitAllRuleStop(ctx context.Context) error {
func waitAllRuleStop() {
rules, _ := ruleProcessor.GetAllRules()
for _, r := range rules {
stopRuleWhenServerStop(r)
}
wg := &sync.WaitGroup{}
m := &sync.Map{}
for _, r := range rules {
rs, ok := registry.Load(r)
if ok {
m.Store(r, struct{}{})
wg.Add(1)
go func() {
defer func() {
m.Delete(r)
wg.Done()
}()
rs.Topology.WaitClose()
}()
}
}
wgCh := make(chan struct{})
go func() {
wg.Wait()
wgCh <- struct{}{}
}()

select {
case <-ctx.Done():
timeoutRules := make([]string, 0)
m.Range(func(key, value any) bool {
timeoutRules = append(timeoutRules, key.(string))
return true
})
return fmt.Errorf("stop rules timeout, remain::%s", timeoutRules)
case <-wgCh:
return nil
}
}
11 changes: 0 additions & 11 deletions internal/topo/context/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ type DefaultContext struct {
// cache
tpReg sync.Map
jpReg sync.Map

// ops waitGroup
opsWg *sync.WaitGroup
}

func RuleBackground(ruleName string) *DefaultContext {
Expand Down Expand Up @@ -87,11 +84,6 @@ func WithValue(parent *DefaultContext, key, val interface{}) *DefaultContext {
return parent
}

// WithWg stores wg in parent's opsWg field and returns parent.
// The context is modified in place; no copy is made.
func WithWg(parent *DefaultContext, wg *sync.WaitGroup) *DefaultContext {
parent.opsWg = wg
return parent
}

// Deadline Implement context interface
func (c *DefaultContext) Deadline() (deadline time.Time, ok bool) {
return c.ctx.Deadline()
Expand Down Expand Up @@ -212,7 +204,6 @@ func (c *DefaultContext) WithMeta(ruleId string, opId string, store api.Store) a
state: s,
tpReg: sync.Map{},
jpReg: sync.Map{},
opsWg: c.opsWg,
}
}

Expand All @@ -223,7 +214,6 @@ func (c *DefaultContext) WithInstance(instanceId int) api.StreamContext {
opId: c.opId,
ctx: c.ctx,
state: c.state,
opsWg: c.opsWg,
}
}

Expand All @@ -235,7 +225,6 @@ func (c *DefaultContext) WithCancel() (api.StreamContext, context.CancelFunc) {
instanceId: c.instanceId,
ctx: ctx,
state: c.state,
opsWg: c.opsWg,
}, cancel
}

Expand Down
7 changes: 7 additions & 0 deletions internal/topo/node/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
package node

import (
"time"

"github.com/pingcap/failpoint"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
Expand Down Expand Up @@ -61,6 +65,9 @@ func (o *UnaryOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
}
go func() {
defer func() {
failpoint.Inject("mockTimeConsumingClose", func() {
time.Sleep(300 * time.Millisecond)
})
o.Close()
}()
err := infra.SafeRun(func() error {
Expand Down
18 changes: 8 additions & 10 deletions internal/topo/node/window_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,30 +150,28 @@ func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
}
}
log.Infof("Start with window state triggerTime: %d, msgCount: %d", o.triggerTime.UnixMilli(), o.msgCount)
if o.isEventTime {
go func() {
defer func() {
o.Close()
}()
go func() {
defer func() {
o.Close()
}()
if o.isEventTime {
err := infra.SafeRun(func() error {
o.execEventWindow(ctx, inputs, errCh)
return nil
})
if err != nil {
infra.DrainError(ctx, err, errCh)
}
}()
} else {
go func() {
} else {
err := infra.SafeRun(func() error {
o.execProcessingWindow(ctx, inputs, errCh)
return nil
})
if err != nil {
infra.DrainError(ctx, err, errCh)
}
}()
}
}
}()
}

func getAlignedWindowEndTime(n time.Time, interval int, timeUnit ast.Token) time.Time {
Expand Down
3 changes: 3 additions & 0 deletions internal/topo/rule/ruleState.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ func (rs *RuleState) stop() error {
rs.triggered = 0
if rs.Topology != nil {
rs.Topology.Cancel()
// wait all operator close
rs.Topology.WaitClose()
// de-reference old Topology in order to release data memory
rs.Topology = rs.Topology.NewTopoWithSucceededCtx()
}
Expand Down Expand Up @@ -542,6 +544,7 @@ func (rs *RuleState) Close() (err error) {
}
if rs.triggered == 1 && rs.Topology != nil {
rs.Topology.Cancel()
rs.Topology.WaitClose()
}
rs.triggered = -1
rs.stopScheduleRule()
Expand Down
10 changes: 5 additions & 5 deletions internal/topo/topo.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (s *Topo) prepareContext() {
}
ctx := kctx.WithValue(kctx.RuleBackground(s.name), kctx.LoggerKey, contextLogger)
ctx = kctx.WithValue(ctx, kctx.RuleStartKey, timex.GetNowInMilli())
ctx = kctx.WithWg(ctx, s.opsWg)
ctx = kctx.WithValue(ctx, kctx.RuleWaitGroupKey, s.opsWg)
s.ctx, s.cancel = ctx.WithCancel()
}
}
Expand All @@ -257,19 +257,19 @@ func (s *Topo) Open() <-chan error {
if err := s.enableCheckpoint(s.ctx); err != nil {
return err
}
topoStore := s.store
// open stream sink, after log sink is ready.
for _, snk := range s.sinks {
snk.Exec(s.ctx.WithMeta(s.name, snk.GetName(), s.store), s.drain)
snk.Exec(s.ctx.WithMeta(s.name, snk.GetName(), topoStore), s.drain)
}

for _, op := range s.ops {
op.Exec(s.ctx.WithMeta(s.name, op.GetName(), s.store), s.drain)
op.Exec(s.ctx.WithMeta(s.name, op.GetName(), topoStore), s.drain)
}

for _, source := range s.sources {
source.Open(s.ctx.WithMeta(s.name, source.GetName(), s.store), s.drain)
source.Open(s.ctx.WithMeta(s.name, source.GetName(), topoStore), s.drain)
}

// activate checkpoint
if s.coordinator != nil {
return s.coordinator.Activate()
Expand Down
34 changes: 17 additions & 17 deletions internal/trial/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
// Run two test rules in parallel. Rerun one of the rules
func TestTrialRule(t *testing.T) {
ip := "127.0.0.1"
port := 10087
port := 10089
httpserver.InitGlobalServerManager(ip, port, nil)
connection.InitConnectionManager4Test()
conf.IsTesting = true
Expand All @@ -43,16 +43,16 @@ func TestTrialRule(t *testing.T) {
require.NoError(t, err)
require.NoError(t, store.SetupDefault(dataDir))
p := processor.NewStreamProcessor()
p.ExecStmt("DROP STREAM demo")
p.ExecStmt("DROP STREAM demo876")
// Test 1 wrong rule
mockDef1 := `{"id":"rule1","sql":"select * from demo","mockSource":{"demo":{"data":[{"name":"demo","value":1}],"interval":100,"loop":true}},"sinkProps":{"sendSingle":true}}`
mockDef1 := `{"id":"rule876","sql":"select * from demo876","mockSource":{"demo876":{"data":[{"name":"demo876","value":1}],"interval":100,"loop":true}},"sinkProps":{"sendSingle":true}}`
_, err = TrialManager.CreateRule(mockDef1)
require.Error(t, err)
require.Equal(t, "fail to run rule rule1: fail to get stream demo, please check if stream is created", err.Error())
require.Equal(t, "fail to run rule rule876: fail to get stream demo876, please check if stream is created", err.Error())

_, err = p.ExecStmt("CREATE STREAM demo () WITH (DATASOURCE=\"demo\", TYPE=\"simulator\", FORMAT=\"json\", KEY=\"ts\")")
_, err = p.ExecStmt("CREATE STREAM demo876 () WITH (DATASOURCE=\"demo876\", TYPE=\"simulator\", FORMAT=\"json\", KEY=\"ts\")")
require.NoError(t, err)
defer p.ExecStmt("DROP STREAM demo")
defer p.ExecStmt("DROP STREAM demo876")

// Test 2 valid rule with mock
testValidTrial(t, mockDef1)
Expand All @@ -69,9 +69,9 @@ func testValidTrial(t *testing.T, mockDef1 string) {
// Test 2 valid rule with mock
id, err := TrialManager.CreateRule(mockDef1)
require.NoError(t, err)
require.Equal(t, "rule1", id)
require.Equal(t, "rule876", id)
// Read from ws
u := url.URL{Scheme: "ws", Host: "localhost:10087", Path: "/test/rule1"}
u := url.URL{Scheme: "ws", Host: "localhost:10089", Path: "/test/rule876"}
c1, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
assert.NoError(t, err)
recvCh := make(chan []byte, 10)
Expand All @@ -93,21 +93,21 @@ func testValidTrial(t *testing.T, mockDef1 string) {
}
}()
time.Sleep(100 * time.Millisecond)
require.NoError(t, TrialManager.StartRule("rule1"))
require.Equal(t, []byte(`{"name":"demo","value":1}`), <-recvCh)
require.NoError(t, TrialManager.StartRule("rule876"))
require.Equal(t, []byte(`{"name":"demo876","value":1}`), <-recvCh)
c1.Close()
TrialManager.StopRule("rule1")
TrialManager.StopRule("rule876")
closeCh <- struct{}{}
}

func testRuntimeErrorTrial(t *testing.T) {
// Test 3 Runtime error rule
mockDefErr := `{"id":"ruleErr","sql":"select name + value from demo","mockSource":{"demo":{"data":[{"name":"demo","value":1}],"interval":100,"loop":true}},"sinkProps":{"sendSingle":true}}`
mockDefErr := `{"id":"ruleErr","sql":"select name + value from demo876","mockSource":{"demo876":{"data":[{"name":"demo876","value":1}],"interval":100,"loop":true}},"sinkProps":{"sendSingle":true}}`
id, err := TrialManager.CreateRule(mockDefErr)
require.NoError(t, err)
require.Equal(t, "ruleErr", id)
// Read from ws
u := url.URL{Scheme: "ws", Host: "localhost:10087", Path: "/test/ruleErr"}
u := url.URL{Scheme: "ws", Host: "localhost:10089", Path: "/test/ruleErr"}
c2, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
require.NoError(t, err)
recvCh := make(chan []byte, 10)
Expand All @@ -130,19 +130,19 @@ func testRuntimeErrorTrial(t *testing.T) {
}()
time.Sleep(10 * time.Millisecond)
require.NoError(t, TrialManager.StartRule(id))
require.Equal(t, `run Select error: expr: binaryExpr:{ demo.name + demo.value } meet error, err:invalid operation string(demo) + float64(1)`, string(<-recvCh))
require.Equal(t, `run Select error: expr: binaryExpr:{ demo876.name + demo876.value } meet error, err:invalid operation string(demo876) + float64(1)`, string(<-recvCh))
TrialManager.StopRule(id)
closeCh <- struct{}{}
c2.Close()
}

func testRealSourceTrial(t *testing.T) {
noMockDef := `{"id":"rule2","sql":"select * from demo","sinkProps":{"sendSingle":true}}`
noMockDef := `{"id":"rule8765","sql":"select * from demo876","sinkProps":{"sendSingle":true}}`
id, err := TrialManager.CreateRule(noMockDef)
assert.Equal(t, "rule2", id)
assert.Equal(t, "rule8765", id)
assert.NoError(t, err)

u := url.URL{Scheme: "ws", Host: "localhost:10087", Path: "/test/rule2"}
u := url.URL{Scheme: "ws", Host: "localhost:10089", Path: "/test/rule8765"}
c3, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
require.NoError(t, err)
recvCh := make(chan []byte, 10)
Expand Down
Loading