From cb5189c66ec1f8d2052b58821b7d98803c41525a Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 16 Dec 2024 11:06:47 +0800 Subject: [PATCH 1/6] fix Signed-off-by: Song Gao --- internal/converter/json/converter.go | 19 +++++++++++++++++++ internal/converter/json/converter_test.go | 17 +++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/internal/converter/json/converter.go b/internal/converter/json/converter.go index c016e42007..644f19a530 100644 --- a/internal/converter/json/converter.go +++ b/internal/converter/json/converter.go @@ -17,6 +17,7 @@ package json import ( "encoding/json" "fmt" + "strings" "sync" "github.com/lf-edge/ekuiper/contract/v2/api" @@ -85,6 +86,13 @@ func (f *FastJsonConverter) DecodeField(_ api.StreamContext, b []byte, field str case fastjson.TypeString: return vv.String(), nil case fastjson.TypeNumber: + if !isFloat64(vv.String()) { + i64, err := vv.Int64() + if err != nil { + return nil, err + } + return i64, nil + } return vv.Float64() case fastjson.TypeTrue, fastjson.TypeFalse: return vv.Bool() @@ -340,6 +348,13 @@ func (f *FastJsonConverter) checkSchema(key, typ string, schema map[string]*ast. func (f *FastJsonConverter) extractNumberValue(name string, v *fastjson.Value, field *ast.JsonStreamField) (interface{}, error) { if field == nil { + if !isFloat64(v.String()) { + i64, err := v.Int64() + if err != nil { + return nil, err + } + return i64, nil + } f64, err := v.Float64() if err != nil { return nil, err @@ -454,3 +469,7 @@ func getType(t *ast.JsonStreamField) string { return t.Type } } + +func isFloat64(v string) bool { + return strings.Contains(v, ".") +} diff --git a/internal/converter/json/converter_test.go b/internal/converter/json/converter_test.go index 5f0bcec544..bb0c7235e9 100644 --- a/internal/converter/json/converter_test.go +++ b/internal/converter/json/converter_test.go @@ -767,3 +767,20 @@ func TestDecodeField(t *testing.T) { }) } } + +func TestIssue3441(t *testing.T) { + originSchema := map[string]*ast.JsonStreamField{ + "id": nil, + } + f := NewFastJsonConverter(originSchema) + data := `{"id":1795292668348461056}` + ctx := mockContext.NewMockContext("test", "op1") + m, err := f.Decode(ctx, []byte(data)) + require.NoError(t, err) + require.Equal(t, map[string]interface{}{"id": int64(1795292668348461056)}, m) + + data = `{"id":17952926683484.44}` + m, err = f.Decode(ctx, []byte(data)) + require.NoError(t, err) + require.Equal(t, map[string]interface{}{"id": 17952926683484.44}, m) +} From 5e7b96bba15f09b97baaeb68f1ef45a06be6a54b Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 16 Dec 2024 13:01:31 +0800 Subject: [PATCH 2/6] fix Signed-off-by: Song Gao --- internal/converter/json/converter.go | 38 ++++++++++++------------- internal/io/memory/lookupsource_test.go | 1 + internal/topo/node/decode_op_test.go | 2 ++ internal/topo/node/lookup_node_test.go | 3 ++ 4 files changed, 24 insertions(+), 20 deletions(-) diff --git a/internal/converter/json/converter.go b/internal/converter/json/converter.go index 644f19a530..bbfdcb77b5 100644 --- a/internal/converter/json/converter.go +++ b/internal/converter/json/converter.go @@ -23,6 +23,7 @@ import ( "github.com/lf-edge/ekuiper/contract/v2/api" "github.com/valyala/fastjson" + "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/pkg/ast" "github.com/lf-edge/ekuiper/v2/pkg/cast" "github.com/lf-edge/ekuiper/v2/pkg/errorx" @@ -86,14 +87,7 @@ func (f *FastJsonConverter) DecodeField(_ api.StreamContext, b []byte, field str case fastjson.TypeString: return vv.String(), nil case fastjson.TypeNumber: - if !isFloat64(vv.String()) { - i64, err := vv.Int64() - if err != nil { - return nil, err - } - return i64, nil - } - return vv.Float64() + return extractNumber(vv) case fastjson.TypeTrue, fastjson.TypeFalse: return vv.Bool() } @@ -348,18 +342,7 @@ func (f *FastJsonConverter) checkSchema(key, typ string, schema map[string]*ast. func (f *FastJsonConverter) extractNumberValue(name string, v *fastjson.Value, field *ast.JsonStreamField) (interface{}, error) { if field == nil { - if !isFloat64(v.String()) { - i64, err := v.Int64() - if err != nil { - return nil, err - } - return i64, nil - } - f64, err := v.Float64() - if err != nil { - return nil, err - } - return f64, nil + return extractNumber(v) } switch { case field.Type == "float", field.Type == "datetime": @@ -470,6 +453,21 @@ func getType(t *ast.JsonStreamField) string { } } +func extractNumber(v *fastjson.Value) (any, error) { + if !isFloat64(v.String()) && !conf.IsTesting { + i64, err := v.Int64() + if err != nil { + return nil, err + } + return i64, nil + } + f64, err := v.Float64() + if err != nil { + return nil, err + } + return f64, nil +} + func isFloat64(v string) bool { return strings.Contains(v, ".") } diff --git a/internal/io/memory/lookupsource_test.go b/internal/io/memory/lookupsource_test.go index 8fcfc64562..4328e29a2d 100644 --- a/internal/io/memory/lookupsource_test.go +++ b/internal/io/memory/lookupsource_test.go @@ -131,6 +131,7 @@ func TestUpdateLookup(t *testing.T) { } func TestLookup(t *testing.T) { + conf.IsTesting = true contextLogger := conf.Log.WithField("rule", "test2") ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger) ls := GetLookupSource().(api.LookupSource) diff --git a/internal/topo/node/decode_op_test.go b/internal/topo/node/decode_op_test.go index 497cf4f13e..eaa8a1512a 100644 --- a/internal/topo/node/decode_op_test.go +++ b/internal/topo/node/decode_op_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/internal/pkg/def" "github.com/lf-edge/ekuiper/v2/internal/xsql" "github.com/lf-edge/ekuiper/v2/pkg/ast" @@ -392,6 +393,7 @@ func TestPayloadDecodeWithSchema(t *testing.T) { } func TestPayloadBatchDecodeWithSchema(t *testing.T) { + conf.IsTesting = true tests := []struct { name string input any diff --git a/internal/topo/node/lookup_node_test.go b/internal/topo/node/lookup_node_test.go index 8a422d9faf..bef8d85078 100644 --- a/internal/topo/node/lookup_node_test.go +++ b/internal/topo/node/lookup_node_test.go @@ -23,6 +23,7 @@ import ( "github.com/lf-edge/ekuiper/contract/v2/api" "github.com/stretchr/testify/assert" + "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/internal/pkg/def" "github.com/lf-edge/ekuiper/v2/internal/topo/lookup" "github.com/lf-edge/ekuiper/v2/internal/xsql" @@ -341,6 +342,7 @@ func TestLookup(t *testing.T) { } func TestLookupInner(t *testing.T) { + conf.IsTesting = true tests := []struct { name string input any @@ -608,6 +610,7 @@ func TestLookupInner(t *testing.T) { } func TestLookupPayload(t *testing.T) { + conf.IsTesting = true tests := []struct { name string input any From 832ed8a51357d0d9aba2883624b891f8fc4d6f22 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 16 Dec 2024 13:58:29 +0800 Subject: [PATCH 3/6] fix Signed-off-by: Song Gao --- internal/converter/json/converter_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/converter/json/converter_test.go b/internal/converter/json/converter_test.go index bb0c7235e9..d78f06c6fd 100644 --- a/internal/converter/json/converter_test.go +++ b/internal/converter/json/converter_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/internal/topo/context" "github.com/lf-edge/ekuiper/v2/pkg/ast" mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" @@ -769,6 +770,7 @@ func TestDecodeField(t *testing.T) { } func TestIssue3441(t *testing.T) { + conf.IsTesting = false originSchema := map[string]*ast.JsonStreamField{ "id": nil, } From d4340c9511f0074e9287bdf4f8a3baac0df5cc0c Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 16 Dec 2024 14:18:10 +0800 Subject: [PATCH 4/6] fix Signed-off-by: Song Gao --- internal/topo/node/window_inc_agg_op_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/topo/node/window_inc_agg_op_test.go b/internal/topo/node/window_inc_agg_op_test.go index 13a2382121..d042042972 100644 --- a/internal/topo/node/window_inc_agg_op_test.go +++ b/internal/topo/node/window_inc_agg_op_test.go @@ -298,8 +298,9 @@ func TestIncAggTumblingWindow(t *testing.T) { errCh := make(chan error, 10) ctx, cancel := mockContext.NewMockContext("1", "2").WithCancel() op.Exec(ctx, errCh) - time.Sleep(10 * time.Millisecond) + waitExecute() input <- &xsql.Tuple{Message: map[string]any{"a": int64(1)}} + waitExecute() timex.Add(1100 * time.Millisecond) got := <-output wt, ok := got.(*xsql.WindowTuples) From aef7ac900d49c70c57667cc1d2ca9d084061fadb Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 16 Dec 2024 15:57:48 +0800 Subject: [PATCH 5/6] fix Signed-off-by: Song Gao --- internal/converter/json/converter.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/converter/json/converter.go b/internal/converter/json/converter.go index bbfdcb77b5..58e06643c0 100644 --- a/internal/converter/json/converter.go +++ b/internal/converter/json/converter.go @@ -59,11 +59,6 @@ func (f *FastJsonConverter) Decode(ctx api.StreamContext, b []byte) (m any, err }() f.RLock() defer f.RUnlock() - if f.schema == nil { - var r any - err = json.Unmarshal(b, &r) - return r, err - } return f.decodeWithSchema(b, f.schema) } From f31e9ec3bcf479559673ce176837053ce4004274 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Thu, 19 Dec 2024 11:20:34 +0800 Subject: [PATCH 6/6] fix Signed-off-by: Song Gao fix Signed-off-by: Song Gao fix Signed-off-by: Song Gao --- etc/mqtt_source.yaml | 1 + internal/converter/converter.go | 2 +- internal/converter/json/convert_bench_test.go | 32 +++++++++++- internal/converter/json/converter.go | 50 ++++++++++++------- internal/converter/json/converter_test.go | 30 ++++++----- internal/io/memory/lookupsource_test.go | 1 - internal/topo/node/decode_op_test.go | 8 ++- internal/topo/node/lookup_node_test.go | 3 -- 8 files changed, 81 insertions(+), 46 deletions(-) diff --git a/etc/mqtt_source.yaml b/etc/mqtt_source.yaml index 3036ccf42e..b435b8420f 100644 --- a/etc/mqtt_source.yaml +++ b/etc/mqtt_source.yaml @@ -14,6 +14,7 @@ default: #connectionSelector: mqtt.mqtt_conf1 #kubeedgeVersion: #kubeedgeModelFile: "" + #useInt64ForWholeNumber: true # demo_conf: #Conf_key # qos: 0 diff --git a/internal/converter/converter.go b/internal/converter/converter.go index f9b16ccd23..e622459723 100644 --- a/internal/converter/converter.go +++ b/internal/converter/converter.go @@ -33,7 +33,7 @@ import ( func init() { modules.RegisterConverter(message.FormatJson, func(_ api.StreamContext, _ string, schema map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) { - return json.NewFastJsonConverter(schema), nil + return json.NewFastJsonConverter(schema, props), nil }) modules.RegisterConverter(message.FormatXML, func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) { return xml.NewXMLConverter(), nil diff --git a/internal/converter/json/convert_bench_test.go b/internal/converter/json/convert_bench_test.go index d88064bfdd..546f918f80 100644 --- a/internal/converter/json/convert_bench_test.go +++ b/internal/converter/json/convert_bench_test.go @@ -15,6 +15,7 @@ package json import ( + "encoding/json" "os" "testing" @@ -106,9 +107,38 @@ func benchmarkByFiles(filePath string, b *testing.B, schema map[string]*ast.Json if err != nil { b.Fatal(err) } - f := NewFastJsonConverter(schema) + f := NewFastJsonConverter(schema, nil) b.ResetTimer() for i := 0; i < b.N; i++ { f.Decode(ctx, payload) } } + +func BenchmarkNativeFloatParse(b *testing.B) { + m := make(map[string]interface{}) + data := `{"id":1.2}` + b.ResetTimer() + for i := 0; i < b.N; i++ { + json.Unmarshal([]byte(data), &m) + } +} + +func BenchmarkFloatParse(b *testing.B) { + ctx := mockContext.NewMockContext("test", "test") + f := NewFastJsonConverter(nil, map[string]any{"useInt64ForWholeNumber": true}) + data := `{"id":1.2}` + b.ResetTimer() + for i := 0; i < b.N; i++ { + f.Decode(ctx, []byte(data)) + } +} + +func BenchmarkIntParse(b *testing.B) { + ctx := mockContext.NewMockContext("test", "test") + f := NewFastJsonConverter(nil, map[string]any{"useInt64ForWholeNumber": true}) + data := `{"id":1}` + b.ResetTimer() + for i := 0; i < b.N; i++ { + f.Decode(ctx, []byte(data)) + } +} diff --git a/internal/converter/json/converter.go b/internal/converter/json/converter.go index 58e06643c0..22da770d3e 100644 --- a/internal/converter/json/converter.go +++ b/internal/converter/json/converter.go @@ -23,7 +23,6 @@ import ( "github.com/lf-edge/ekuiper/contract/v2/api" "github.com/valyala/fastjson" - "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/pkg/ast" "github.com/lf-edge/ekuiper/v2/pkg/cast" "github.com/lf-edge/ekuiper/v2/pkg/errorx" @@ -32,15 +31,28 @@ import ( type FastJsonConverter struct { sync.RWMutex schema map[string]*ast.JsonStreamField + FastJsonConverterConf } -func NewFastJsonConverter(schema map[string]*ast.JsonStreamField) *FastJsonConverter { +type FastJsonConverterConf struct { + UseInt64 bool `json:"useInt64ForWholeNumber"` +} + +func NewFastJsonConverter(schema map[string]*ast.JsonStreamField, props map[string]any) *FastJsonConverter { f := &FastJsonConverter{ schema: schema, } + f.setupProps(props) return f } +func (f *FastJsonConverter) setupProps(props map[string]any) { + if props == nil { + return + } + cast.MapToStruct(props, &f.FastJsonConverterConf) +} + func (f *FastJsonConverter) ResetSchema(schema map[string]*ast.JsonStreamField) { f.Lock() defer f.Unlock() @@ -82,7 +94,7 @@ func (f *FastJsonConverter) DecodeField(_ api.StreamContext, b []byte, field str case fastjson.TypeString: return vv.String(), nil case fastjson.TypeNumber: - return extractNumber(vv) + return f.extractNumber(vv) case fastjson.TypeTrue, fastjson.TypeFalse: return vv.Bool() } @@ -337,7 +349,7 @@ func (f *FastJsonConverter) checkSchema(key, typ string, schema map[string]*ast. func (f *FastJsonConverter) extractNumberValue(name string, v *fastjson.Value, field *ast.JsonStreamField) (interface{}, error) { if field == nil { - return extractNumber(v) + return f.extractNumber(v) } switch { case field.Type == "float", field.Type == "datetime": @@ -413,6 +425,21 @@ func (f *FastJsonConverter) extractBooleanFromValue(name string, v *fastjson.Val return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", name, v.Type().String(), getType(field)) } +func (f *FastJsonConverter) extractNumber(v *fastjson.Value) (any, error) { + if f.UseInt64 && !isFloat64(v.String()) { + i64, err := v.Int64() + if err != nil { + return nil, err + } + return i64, nil + } + f64, err := v.Float64() + if err != nil { + return nil, err + } + return f64, nil +} + func getBooleanFromValue(value *fastjson.Value) (interface{}, error) { typ := value.Type() switch typ { @@ -448,21 +475,6 @@ func getType(t *ast.JsonStreamField) string { } } -func extractNumber(v *fastjson.Value) (any, error) { - if !isFloat64(v.String()) && !conf.IsTesting { - i64, err := v.Int64() - if err != nil { - return nil, err - } - return i64, nil - } - f64, err := v.Float64() - if err != nil { - return nil, err - } - return f64, nil -} - func isFloat64(v string) bool { return strings.Contains(v, ".") } diff --git a/internal/converter/json/converter_test.go b/internal/converter/json/converter_test.go index d78f06c6fd..f962a96f0f 100644 --- a/internal/converter/json/converter_test.go +++ b/internal/converter/json/converter_test.go @@ -23,7 +23,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/internal/topo/context" "github.com/lf-edge/ekuiper/v2/pkg/ast" mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" @@ -163,7 +162,7 @@ func TestFastJsonConverterWithSchema(t *testing.T) { } ctx := mockContext.NewMockContext("test", "op1") for _, tc := range testcases { - f := NewFastJsonConverter(tc.schema) + f := NewFastJsonConverter(tc.schema, nil) v, err := f.Decode(ctx, tc.payload) require.NoError(t, err) require.Equal(t, v, tc.require) @@ -174,7 +173,7 @@ func TestFastJsonConverterWithSchema(t *testing.T) { arrayRequire := []map[string]interface{}{ tc.require, } - f := NewFastJsonConverter(tc.schema) + f := NewFastJsonConverter(tc.schema, nil) v, err := f.Decode(ctx, arrayPayload) require.NoError(t, err) require.Equal(t, v, arrayRequire) @@ -334,7 +333,7 @@ func TestFastJsonConverterWithSchemaError(t *testing.T) { } ctx := mockContext.NewMockContext("test", "op1") for _, tc := range testcases { - f := NewFastJsonConverter(tc.schema) + f := NewFastJsonConverter(tc.schema, nil) _, err := f.Decode(ctx, tc.payload) require.Error(t, err) require.Equal(t, err.Error(), tc.err.Error()) @@ -345,7 +344,7 @@ func TestFastJsonEncode(t *testing.T) { a := make(map[string]int) a["a"] = 1 ctx := mockContext.NewMockContext("test", "op1") - f := NewFastJsonConverter(nil) + f := NewFastJsonConverter(nil, nil) v, err := f.Encode(ctx, a) require.NoError(t, err) require.Equal(t, v, []byte(`{"a":1}`)) @@ -378,7 +377,7 @@ func TestArrayWithArray(t *testing.T) { }, } ctx := mockContext.NewMockContext("test", "op1") - f := NewFastJsonConverter(schema) + f := NewFastJsonConverter(schema, nil) v, err := f.Decode(ctx, payload) require.NoError(t, err) require.Equal(t, v, map[string]interface{}{ @@ -559,7 +558,7 @@ func TestTypeNull(t *testing.T) { arrayRequire := []map[string]interface{}{ tc.require, } - f := NewFastJsonConverter(tc.schema) + f := NewFastJsonConverter(tc.schema, nil) v, err := f.Decode(ctx, arrayPayload) require.NoError(t, err) require.Equal(t, v, arrayRequire) @@ -569,7 +568,7 @@ func TestTypeNull(t *testing.T) { arrayRequire := []map[string]interface{}{ tc.require, } - f := NewFastJsonConverter(tc.schema) + f := NewFastJsonConverter(tc.schema, nil) v, err := f.Decode(ctx, arrayPayload) require.NoError(t, err) require.Equal(t, v, arrayRequire) @@ -586,7 +585,7 @@ func TestConvertBytea(t *testing.T) { }, } ctx := mockContext.NewMockContext("test", "op1") - f := NewFastJsonConverter(schema) + f := NewFastJsonConverter(schema, nil) v, err := f.Decode(ctx, []byte(payload)) require.NoError(t, err) require.Equal(t, v, map[string]interface{}{ @@ -602,7 +601,7 @@ func TestConvertBytea(t *testing.T) { }, }, } - f = NewFastJsonConverter(schema) + f = NewFastJsonConverter(schema, nil) v, err = f.Decode(ctx, []byte(payload)) require.NoError(t, err) require.Equal(t, v, map[string]interface{}{ @@ -614,7 +613,7 @@ func TestSchemaless(t *testing.T) { originSchema := map[string]*ast.JsonStreamField{ "a": nil, } - f := NewFastJsonConverter(originSchema) + f := NewFastJsonConverter(originSchema, nil) testcases := []struct { data map[string]interface{} expect map[string]interface{} @@ -665,7 +664,7 @@ func TestIssue(t *testing.T) { originSchema := map[string]*ast.JsonStreamField{ "results": nil, } - f := NewFastJsonConverter(originSchema) + f := NewFastJsonConverter(originSchema, nil) data := `{ "results": [ { @@ -705,7 +704,7 @@ func TestIssue(t *testing.T) { schmema2 := map[string]*ast.JsonStreamField{ "others": nil, } - f2 := NewFastJsonConverter(schmema2) + f2 := NewFastJsonConverter(schmema2, nil) m, err = f2.Decode(context.Background(), []byte(data)) require.NoError(t, err) require.Len(t, m, 0) @@ -755,7 +754,7 @@ func TestDecodeField(t *testing.T) { }, } ctx := mockContext.NewMockContext("test", "op1") - f := NewFastJsonConverter(nil) + f := NewFastJsonConverter(nil, nil) for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { field, err := f.DecodeField(ctx, tc.payload, "id") @@ -770,11 +769,10 @@ func TestDecodeField(t *testing.T) { } func TestIssue3441(t *testing.T) { - conf.IsTesting = false originSchema := map[string]*ast.JsonStreamField{ "id": nil, } - f := NewFastJsonConverter(originSchema) + f := NewFastJsonConverter(originSchema, map[string]any{"useInt64ForWholeNumber": true}) data := `{"id":1795292668348461056}` ctx := mockContext.NewMockContext("test", "op1") m, err := f.Decode(ctx, []byte(data)) diff --git a/internal/io/memory/lookupsource_test.go b/internal/io/memory/lookupsource_test.go index 4328e29a2d..8fcfc64562 100644 --- a/internal/io/memory/lookupsource_test.go +++ b/internal/io/memory/lookupsource_test.go @@ -131,7 +131,6 @@ func TestUpdateLookup(t *testing.T) { } func TestLookup(t *testing.T) { - conf.IsTesting = true contextLogger := conf.Log.WithField("rule", "test2") ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger) ls := GetLookupSource().(api.LookupSource) diff --git a/internal/topo/node/decode_op_test.go b/internal/topo/node/decode_op_test.go index eaa8a1512a..d63bed2fd8 100644 --- a/internal/topo/node/decode_op_test.go +++ b/internal/topo/node/decode_op_test.go @@ -24,7 +24,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/internal/pkg/def" "github.com/lf-edge/ekuiper/v2/internal/xsql" "github.com/lf-edge/ekuiper/v2/pkg/ast" @@ -70,9 +69,9 @@ func TestJSON(t *testing.T) { &xsql.Tuple{Emitter: "test", Message: map[string]interface{}{"a": 3.0, "b": 4.0, "sourceConf": "hello"}, Timestamp: time.UnixMilli(111), Metadata: map[string]any{"topic": "demo", "qos": 1}}, }, {errors.New("go through error")}, - {errors.New("invalid character ':' after top-level value")}, - {errors.New("only map[string]any inside a list is supported but got: hello")}, - {errors.New("unsupported decode result: hello")}, + {errors.New(`unexpected tail: ":1,\"b\":2},{\"a\":3,\"b\":4,\"sourceConf\":\"hello\"}]"`)}, + {errors.New(`value doesn't contain object; it contains string`)}, + {errors.New(`only map[string]interface{} and []map[string]interface{} is supported`)}, {errors.New("unsupported data received: invalid")}, } timex.Add(2 * time.Second) @@ -393,7 +392,6 @@ func TestPayloadDecodeWithSchema(t *testing.T) { } func TestPayloadBatchDecodeWithSchema(t *testing.T) { - conf.IsTesting = true tests := []struct { name string input any diff --git a/internal/topo/node/lookup_node_test.go b/internal/topo/node/lookup_node_test.go index bef8d85078..8a422d9faf 100644 --- a/internal/topo/node/lookup_node_test.go +++ b/internal/topo/node/lookup_node_test.go @@ -23,7 +23,6 @@ import ( "github.com/lf-edge/ekuiper/contract/v2/api" "github.com/stretchr/testify/assert" - "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/internal/pkg/def" "github.com/lf-edge/ekuiper/v2/internal/topo/lookup" "github.com/lf-edge/ekuiper/v2/internal/xsql" @@ -342,7 +341,6 @@ func TestLookup(t *testing.T) { } func TestLookupInner(t *testing.T) { - conf.IsTesting = true tests := []struct { name string input any @@ -610,7 +608,6 @@ func TestLookupInner(t *testing.T) { } func TestLookupPayload(t *testing.T) { - conf.IsTesting = true tests := []struct { name string input any