From 88fd5da26daaa8b607b01cb013f2d77a39818e40 Mon Sep 17 00:00:00 2001 From: ngjaying Date: Tue, 16 Jul 2024 08:54:39 +0800 Subject: [PATCH] fix(topo): detach subtopo from closed rule (#3010) Signed-off-by: Jiyong Huang --- internal/topo/node/contract.go | 3 ++- internal/topo/node/node.go | 28 +++++++++++++++++++------ internal/topo/node/node_test.go | 36 +++++++++++++++++++++++++++++++++ internal/topo/subtopo.go | 7 ++++++- internal/topo/subtopo_test.go | 22 +++++++++++++++----- internal/topo/topo.go | 11 ++++++++-- 6 files changed, 92 insertions(+), 15 deletions(-) create mode 100644 internal/topo/node/node_test.go diff --git a/internal/topo/node/contract.go b/internal/topo/node/contract.go index 2a10ad12d6..007a0ff21a 100644 --- a/internal/topo/node/contract.go +++ b/internal/topo/node/contract.go @@ -23,6 +23,7 @@ import ( type Emitter interface { AddOutput(chan<- interface{}, string) error + RemoveOutput(string) error } type Collector interface { @@ -83,7 +84,7 @@ type MergeableTopo interface { // SubMetrics return the metrics of the sub nodes SubMetrics() ([]string, []any) // Close notifies subtopo to deref - Close(ctx api.StreamContext, ruleId string) + Close(ctx api.StreamContext, ruleId string, runId int) } type SchemaNode interface { diff --git a/internal/topo/node/node.go b/internal/topo/node/node.go index eb8945e42e..8ebb7f5fb7 100644 --- a/internal/topo/node/node.go +++ b/internal/topo/node/node.go @@ -17,6 +17,7 @@ package node import ( "errors" "fmt" + "strings" "sync" "github.com/lf-edge/ekuiper/contract/v2/api" @@ -56,13 +57,28 @@ func newDefaultNode(name string, options *def.RuleOption) *defaultNode { } } -func (o *defaultNode) AddOutput(output chan<- interface{}, name string) error { +func (o *defaultNode) AddOutput(output chan<- any, name string) error { o.outputMu.Lock() defer o.outputMu.Unlock() o.outputs[name] = output return nil } +func (o *defaultNode) RemoveOutput(name string) error { + o.outputMu.Lock() + defer o.outputMu.Unlock() + namePre := name + "_" + for n := range o.outputs { + if strings.HasPrefix(n, namePre) { + delete(o.outputs, n) + if o.ctx != nil { + o.ctx.GetLogger().Infof("Remove output %s from %s", n, o.name) + } + } + } + return nil +} + func (o *defaultNode) GetName() string { return o.name } @@ -84,11 +100,11 @@ func (o *defaultNode) RemoveMetrics(ruleId string) { } } -func (o *defaultNode) Broadcast(val interface{}) { +func (o *defaultNode) Broadcast(val any) { o.BroadcastCustomized(val, o.doBroadcast) } -func (o *defaultNode) BroadcastCustomized(val interface{}, broadcastFunc func(val any)) { +func (o *defaultNode) BroadcastCustomized(val any, broadcastFunc func(val any)) { if _, ok := val.(error); ok && !o.sendError { return } @@ -104,7 +120,7 @@ func (o *defaultNode) BroadcastCustomized(val interface{}, broadcastFunc func(va return } -func (o *defaultNode) doBroadcast(val interface{}) { +func (o *defaultNode) doBroadcast(val any) { o.outputMu.RLock() defer o.outputMu.RUnlock() l := len(o.outputs) @@ -151,7 +167,7 @@ func newDefaultSinkNode(name string, options *def.RuleOption) *defaultSinkNode { } } -func (o *defaultSinkNode) GetInput() (chan<- interface{}, string) { +func (o *defaultSinkNode) GetInput() (chan<- any, string) { return o.input, o.name } @@ -236,7 +252,7 @@ func (o *defaultSinkNode) handleEof(ctx api.StreamContext, d xsql.EOFTuple) { } } -func SourcePing(sourceType string, config map[string]interface{}) error { +func SourcePing(sourceType string, config map[string]any) error { source, err := io.Source(sourceType) if err != nil { return err diff --git a/internal/topo/node/node_test.go b/internal/topo/node/node_test.go new file mode 100644 index 0000000000..9cd73f9228 --- /dev/null +++ b/internal/topo/node/node_test.go @@ -0,0 +1,36 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package node + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/lf-edge/ekuiper/v2/internal/pkg/def" +) + +func TestOutputs(t *testing.T) { + n := newDefaultNode("test", &def.RuleOption{}) + err := n.AddOutput(make(chan<- any), "rule.1_test") + assert.NoError(t, err) + err = n.AddOutput(make(chan<- any), "rule.2_test") + assert.NoError(t, err) + err = n.RemoveOutput("rule.1") + assert.NoError(t, err) + err = n.RemoveOutput("rule.4") + assert.NoError(t, err) + assert.Equal(t, 1, len(n.outputs)) +} diff --git a/internal/topo/subtopo.go b/internal/topo/subtopo.go index 8ce506f2e9..f8ab13e0f5 100644 --- a/internal/topo/subtopo.go +++ b/internal/topo/subtopo.go @@ -65,6 +65,10 @@ func (s *SrcSubTopo) AddOutput(output chan<- interface{}, name string) error { return s.tail.AddOutput(output, name) } +func (s *SrcSubTopo) RemoveOutput(name string) error { + return s.tail.RemoveOutput(name) +} + func (s *SrcSubTopo) Open(ctx api.StreamContext, parentErrCh chan<- error) { // Update the ref count if _, loaded := s.refRules.LoadOrStore(ctx.GetRuleId(), parentErrCh); !loaded { @@ -171,7 +175,7 @@ func (s *SrcSubTopo) StoreSchema(ruleID, dataSource string, schema map[string]*a } } -func (s *SrcSubTopo) Close(ctx api.StreamContext, ruleId string) { +func (s *SrcSubTopo) Close(ctx api.StreamContext, ruleId string, runId int) { if _, ok := s.refRules.LoadAndDelete(ruleId); ok { s.refCount.Add(-1) if s.refCount.Load() == 0 { @@ -186,6 +190,7 @@ func (s *SrcSubTopo) Close(ctx api.StreamContext, ruleId string) { } } } + _ = s.RemoveOutput(fmt.Sprintf("%s.%d", ruleId, runId)) } // RemoveMetrics is called when the rule is deleted diff --git a/internal/topo/subtopo_test.go b/internal/topo/subtopo_test.go index ec31f20619..9f9030a9e3 100644 --- a/internal/topo/subtopo_test.go +++ b/internal/topo/subtopo_test.go @@ -108,10 +108,10 @@ func TestSubtopoLC(t *testing.T) { assert.Equal(t, []checkpoint.StreamTask{srcNode}, sources) assert.Equal(t, []checkpoint.NonSourceTask{opNode}, ops) // Stop - subTopo.Close(ctx1, "rule1") + subTopo.Close(ctx1, "rule1", 1) assert.Equal(t, int32(1), subTopo.refCount.Load()) assert.Equal(t, 1, mlen(&subTopoPool)) - subTopo2.Close(ctx2, "rule2") + subTopo2.Close(ctx2, "rule2", 2) assert.Equal(t, int32(0), subTopo.refCount.Load()) assert.Equal(t, 0, mlen(&subTopoPool)) assert.Equal(t, 2, len(subTopo.schemaReg)) @@ -138,7 +138,7 @@ func TestSubtopoRunError(t *testing.T) { subTopo.Open(ctx1, make(chan error)) assert.Equal(t, int32(1), subTopo.refCount.Load()) assert.Equal(t, true, subTopo.opened.Load()) - subTopo.Close(ctx1, "rule1") + subTopo.Close(ctx1, "rule1", 1) assert.Equal(t, int32(0), subTopo.refCount.Load()) assert.Equal(t, 0, mlen(&subTopoPool)) time.Sleep(10 * time.Millisecond) @@ -156,14 +156,14 @@ func TestSubtopoRunError(t *testing.T) { select { case err := <-errCh1: assert.Equal(t, assert.AnError, err) - subTopo.Close(ctx1, "rule1") + subTopo.Close(ctx1, "rule1", 1) case <-time.After(1 * time.Second): assert.Fail(t, "Should receive error") } select { case err := <-errCh2: assert.Equal(t, assert.AnError, err) - subTopo2.Close(ctx2, "rule2") + subTopo2.Close(ctx2, "rule2", 2) case <-time.After(1 * time.Second): assert.Fail(t, "Should receive error") } @@ -239,6 +239,11 @@ func (m *mockSrc) AddOutput(c chan<- interface{}, s string) error { return nil } +func (m *mockSrc) RemoveOutput(s string) error { + m.outputs = m.outputs[1:] + return nil +} + func (m *mockSrc) Open(ctx api.StreamContext, errCh chan<- error) { if m.runCount%3 != 0 { fmt.Printf("sent error for %d \n", m.runCount) @@ -273,6 +278,13 @@ type mockOp struct { schemaCount int } +func (m *mockOp) RemoveOutput(s string) error { + if len(m.outputs) > 0 { + m.outputs = m.outputs[1:] + } + return nil +} + func (m *mockOp) AddOutput(c chan<- interface{}, s string) error { m.outputs = append(m.outputs, c) return nil diff --git a/internal/topo/topo.go b/internal/topo/topo.go index 019c7fd067..c498e3e09b 100644 --- a/internal/topo/topo.go +++ b/internal/topo/topo.go @@ -39,6 +39,10 @@ import ( "github.com/lf-edge/ekuiper/v2/pkg/timex" ) +var uid atomic.Uint32 + +// Topo is the runtime DAG for a rule +// It only run once. If the rule restarts, another topo is created. type Topo struct { streams []string sources []node.DataSourceNode @@ -48,6 +52,7 @@ type Topo struct { drain chan error ops []node.OperatorNode name string + runId int options *def.RuleOption store api.Store coordinator *checkpoint.Coordinator @@ -59,8 +64,10 @@ type Topo struct { } func NewWithNameAndOptions(name string, options *def.RuleOption) (*Topo, error) { + id := uid.Add(1) tp := &Topo{ name: name, + runId: int(id), options: options, topo: &def.PrintableTopo{ Sources: make([]string, 0), @@ -115,7 +122,7 @@ func (s *Topo) Cancel() { s.coordinator = nil for _, src := range s.sources { if rt, ok := src.(node.MergeableTopo); ok { - rt.Close(s.ctx, s.name) + rt.Close(s.ctx, s.name, s.runId) } } } @@ -158,7 +165,7 @@ func (s *Topo) AddOperator(inputs []node.Emitter, operator node.OperatorNode) *T ch, opName := operator.GetInput() for _, input := range inputs { // add rule id to make operator name unique - _ = input.AddOutput(ch, fmt.Sprintf("%s_%s", s.name, opName)) + _ = input.AddOutput(ch, fmt.Sprintf("%s.%d_%s", s.name, s.runId, opName)) operator.AddInputCount() switch rt := input.(type) { case node.MergeableTopo: