Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: converter lose precision #3445

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

func init() {
modules.RegisterConverter(message.FormatJson, func(_ api.StreamContext, _ string, schema map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) {
return json.NewFastJsonConverter(schema), nil
return json.NewFastJsonConverter(schema, props), nil
})
modules.RegisterConverter(message.FormatXML, func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) {
return xml.NewXMLConverter(), nil
Expand Down
36 changes: 35 additions & 1 deletion internal/converter/json/convert_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package json

import (
"encoding/json"
"os"
"testing"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
)
Expand Down Expand Up @@ -106,9 +108,41 @@ func benchmarkByFiles(filePath string, b *testing.B, schema map[string]*ast.Json
if err != nil {
b.Fatal(err)
}
f := NewFastJsonConverter(schema)
f := NewFastJsonConverter(schema, nil)
b.ResetTimer()
for i := 0; i < b.N; i++ {
f.Decode(ctx, payload)
}
}

func BenchmarkNativeFloatParse(b *testing.B) {
conf.IsTesting = false
m := make(map[string]interface{})
data := `{"id":1.2}`
b.ResetTimer()
for i := 0; i < b.N; i++ {
json.Unmarshal([]byte(data), &m)
}
}

func BenchmarkFloatParse(b *testing.B) {
conf.IsTesting = false
ctx := mockContext.NewMockContext("test", "test")
f := NewFastJsonConverter(nil, map[string]any{"use_int64": true})
data := `{"id":1.2}`
b.ResetTimer()
for i := 0; i < b.N; i++ {
f.Decode(ctx, []byte(data))
}
}

func BenchmarkIntParse(b *testing.B) {
conf.IsTesting = false
ctx := mockContext.NewMockContext("test", "test")
f := NewFastJsonConverter(nil, map[string]any{"use_int64": true})
data := `{"id":1}`
b.ResetTimer()
for i := 0; i < b.N; i++ {
f.Decode(ctx, []byte(data))
}
}
59 changes: 46 additions & 13 deletions internal/converter/json/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import (
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/lf-edge/ekuiper/contract/v2/api"
Expand All @@ -29,16 +30,31 @@

type FastJsonConverter struct {
sync.RWMutex
schema map[string]*ast.JsonStreamField
schema map[string]*ast.JsonStreamField
useInt64 bool
}

func NewFastJsonConverter(schema map[string]*ast.JsonStreamField) *FastJsonConverter {
func NewFastJsonConverter(schema map[string]*ast.JsonStreamField, props map[string]any) *FastJsonConverter {
f := &FastJsonConverter{
schema: schema,
}
f.setupProps(props)
return f
}

func (f *FastJsonConverter) setupProps(props map[string]any) {
if props == nil {
return
}
v, ok := props["use_int64"]
if ok {
b, ok := v.(bool)
if ok {
f.useInt64 = b
}
}
}

func (f *FastJsonConverter) ResetSchema(schema map[string]*ast.JsonStreamField) {
f.Lock()
defer f.Unlock()
Expand All @@ -57,11 +73,6 @@
}()
f.RLock()
defer f.RUnlock()
if f.schema == nil {
var r any
err = json.Unmarshal(b, &r)
return r, err
}
return f.decodeWithSchema(b, f.schema)
}

Expand All @@ -85,7 +96,7 @@
case fastjson.TypeString:
return vv.String(), nil
case fastjson.TypeNumber:
return vv.Float64()
return f.extractNumber(vv)
case fastjson.TypeTrue, fastjson.TypeFalse:
return vv.Bool()
}
Expand Down Expand Up @@ -340,11 +351,7 @@

func (f *FastJsonConverter) extractNumberValue(name string, v *fastjson.Value, field *ast.JsonStreamField) (interface{}, error) {
if field == nil {
f64, err := v.Float64()
if err != nil {
return nil, err
}
return f64, nil
return f.extractNumber(v)
}
switch {
case field.Type == "float", field.Type == "datetime":
Expand Down Expand Up @@ -454,3 +461,29 @@
return t.Type
}
}

func (f *FastJsonConverter) extractNumber(v *fastjson.Value) (any, error) {
if f.useInt64 {
if isFloat64(v) {
f64, err := v.Float64()
if err != nil {
return nil, err
}

Check warning on line 471 in internal/converter/json/converter.go

View check run for this annotation

Codecov / codecov/patch

internal/converter/json/converter.go#L470-L471

Added lines #L470 - L471 were not covered by tests
return f64, nil
}
i64, err := v.Int64()
if err != nil {
return nil, err
}

Check warning on line 477 in internal/converter/json/converter.go

View check run for this annotation

Codecov / codecov/patch

internal/converter/json/converter.go#L476-L477

Added lines #L476 - L477 were not covered by tests
return i64, nil
}
f64, err := v.Float64()
if err != nil {
return nil, err
}

Check warning on line 483 in internal/converter/json/converter.go

View check run for this annotation

Codecov / codecov/patch

internal/converter/json/converter.go#L482-L483

Added lines #L482 - L483 were not covered by tests
return f64, nil
}

func isFloat64(v *fastjson.Value) bool {
return strings.Contains(v.String(), ".")
}
45 changes: 32 additions & 13 deletions internal/converter/json/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
Expand Down Expand Up @@ -162,7 +163,7 @@ func TestFastJsonConverterWithSchema(t *testing.T) {
}
ctx := mockContext.NewMockContext("test", "op1")
for _, tc := range testcases {
f := NewFastJsonConverter(tc.schema)
f := NewFastJsonConverter(tc.schema, nil)
v, err := f.Decode(ctx, tc.payload)
require.NoError(t, err)
require.Equal(t, v, tc.require)
Expand All @@ -173,7 +174,7 @@ func TestFastJsonConverterWithSchema(t *testing.T) {
arrayRequire := []map[string]interface{}{
tc.require,
}
f := NewFastJsonConverter(tc.schema)
f := NewFastJsonConverter(tc.schema, nil)
v, err := f.Decode(ctx, arrayPayload)
require.NoError(t, err)
require.Equal(t, v, arrayRequire)
Expand Down Expand Up @@ -333,7 +334,7 @@ func TestFastJsonConverterWithSchemaError(t *testing.T) {
}
ctx := mockContext.NewMockContext("test", "op1")
for _, tc := range testcases {
f := NewFastJsonConverter(tc.schema)
f := NewFastJsonConverter(tc.schema, nil)
_, err := f.Decode(ctx, tc.payload)
require.Error(t, err)
require.Equal(t, err.Error(), tc.err.Error())
Expand All @@ -344,7 +345,7 @@ func TestFastJsonEncode(t *testing.T) {
a := make(map[string]int)
a["a"] = 1
ctx := mockContext.NewMockContext("test", "op1")
f := NewFastJsonConverter(nil)
f := NewFastJsonConverter(nil, nil)
v, err := f.Encode(ctx, a)
require.NoError(t, err)
require.Equal(t, v, []byte(`{"a":1}`))
Expand Down Expand Up @@ -377,7 +378,7 @@ func TestArrayWithArray(t *testing.T) {
},
}
ctx := mockContext.NewMockContext("test", "op1")
f := NewFastJsonConverter(schema)
f := NewFastJsonConverter(schema, nil)
v, err := f.Decode(ctx, payload)
require.NoError(t, err)
require.Equal(t, v, map[string]interface{}{
Expand Down Expand Up @@ -558,7 +559,7 @@ func TestTypeNull(t *testing.T) {
arrayRequire := []map[string]interface{}{
tc.require,
}
f := NewFastJsonConverter(tc.schema)
f := NewFastJsonConverter(tc.schema, nil)
v, err := f.Decode(ctx, arrayPayload)
require.NoError(t, err)
require.Equal(t, v, arrayRequire)
Expand All @@ -568,7 +569,7 @@ func TestTypeNull(t *testing.T) {
arrayRequire := []map[string]interface{}{
tc.require,
}
f := NewFastJsonConverter(tc.schema)
f := NewFastJsonConverter(tc.schema, nil)
v, err := f.Decode(ctx, arrayPayload)
require.NoError(t, err)
require.Equal(t, v, arrayRequire)
Expand All @@ -585,7 +586,7 @@ func TestConvertBytea(t *testing.T) {
},
}
ctx := mockContext.NewMockContext("test", "op1")
f := NewFastJsonConverter(schema)
f := NewFastJsonConverter(schema, nil)
v, err := f.Decode(ctx, []byte(payload))
require.NoError(t, err)
require.Equal(t, v, map[string]interface{}{
Expand All @@ -601,7 +602,7 @@ func TestConvertBytea(t *testing.T) {
},
},
}
f = NewFastJsonConverter(schema)
f = NewFastJsonConverter(schema, nil)
v, err = f.Decode(ctx, []byte(payload))
require.NoError(t, err)
require.Equal(t, v, map[string]interface{}{
Expand All @@ -613,7 +614,7 @@ func TestSchemaless(t *testing.T) {
originSchema := map[string]*ast.JsonStreamField{
"a": nil,
}
f := NewFastJsonConverter(originSchema)
f := NewFastJsonConverter(originSchema, nil)
testcases := []struct {
data map[string]interface{}
expect map[string]interface{}
Expand Down Expand Up @@ -664,7 +665,7 @@ func TestIssue(t *testing.T) {
originSchema := map[string]*ast.JsonStreamField{
"results": nil,
}
f := NewFastJsonConverter(originSchema)
f := NewFastJsonConverter(originSchema, nil)
data := `{
"results": [
{
Expand Down Expand Up @@ -704,7 +705,7 @@ func TestIssue(t *testing.T) {
schmema2 := map[string]*ast.JsonStreamField{
"others": nil,
}
f2 := NewFastJsonConverter(schmema2)
f2 := NewFastJsonConverter(schmema2, nil)
m, err = f2.Decode(context.Background(), []byte(data))
require.NoError(t, err)
require.Len(t, m, 0)
Expand Down Expand Up @@ -754,7 +755,7 @@ func TestDecodeField(t *testing.T) {
},
}
ctx := mockContext.NewMockContext("test", "op1")
f := NewFastJsonConverter(nil)
f := NewFastJsonConverter(nil, nil)
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
field, err := f.DecodeField(ctx, tc.payload, "id")
Expand All @@ -767,3 +768,21 @@ func TestDecodeField(t *testing.T) {
})
}
}

func TestIssue3441(t *testing.T) {
conf.IsTesting = false
originSchema := map[string]*ast.JsonStreamField{
"id": nil,
}
f := NewFastJsonConverter(originSchema, map[string]any{"use_int64": true})
data := `{"id":1795292668348461056}`
ctx := mockContext.NewMockContext("test", "op1")
m, err := f.Decode(ctx, []byte(data))
require.NoError(t, err)
require.Equal(t, map[string]interface{}{"id": int64(1795292668348461056)}, m)

data = `{"id":17952926683484.44}`
m, err = f.Decode(ctx, []byte(data))
require.NoError(t, err)
require.Equal(t, map[string]interface{}{"id": 17952926683484.44}, m)
}
1 change: 1 addition & 0 deletions internal/topo/node/conf/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func GetSourceConf(sourceType string, options *ast.Options) map[string]interface
props["strictValidation"] = options.STRICT_VALIDATION
props["timestamp"] = options.TIMESTAMP
props["timestampFormat"] = options.TIMESTAMP_FORMAT
props["use_int64"] = options.USE_INT64
conf.Log.Infof("get conf for %s with conf key %s: %v", sourceType, confkey, printable(props))
return props
}
Expand Down
3 changes: 3 additions & 0 deletions internal/topo/node/conf/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func TestGetSourceConf(t *testing.T) {
"strictValidation": false,
"timestamp": "",
"timestampFormat": "",
"use_int64": false,
},
},
{
Expand All @@ -93,6 +94,7 @@ func TestGetSourceConf(t *testing.T) {
"strictValidation": false,
"timestamp": "",
"timestampFormat": "",
"use_int64": false,
},
},
{
Expand Down Expand Up @@ -120,6 +122,7 @@ func TestGetSourceConf(t *testing.T) {
"timestampFormat": "",
"connectionSelector": "test11",
"a": 1,
"use_int64": false,
},
},
}
Expand Down
6 changes: 3 additions & 3 deletions internal/topo/node/decode_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ func TestJSON(t *testing.T) {
&xsql.Tuple{Emitter: "test", Message: map[string]interface{}{"a": 3.0, "b": 4.0, "sourceConf": "hello"}, Timestamp: time.UnixMilli(111), Metadata: map[string]any{"topic": "demo", "qos": 1}},
},
{errors.New("go through error")},
{errors.New("invalid character ':' after top-level value")},
{errors.New("only map[string]any inside a list is supported but got: hello")},
{errors.New("unsupported decode result: hello")},
{errors.New(`unexpected tail: ":1,\"b\":2},{\"a\":3,\"b\":4,\"sourceConf\":\"hello\"}]"`)},
{errors.New("value doesn't contain object; it contains string")},
{errors.New("only map[string]interface{} and []map[string]interface{} is supported")},
{errors.New("unsupported data received: invalid")},
}
timex.Add(2 * time.Second)
Expand Down
3 changes: 2 additions & 1 deletion internal/topo/node/window_inc_agg_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,9 @@ func TestIncAggTumblingWindow(t *testing.T) {
errCh := make(chan error, 10)
ctx, cancel := mockContext.NewMockContext("1", "2").WithCancel()
op.Exec(ctx, errCh)
time.Sleep(10 * time.Millisecond)
waitExecute()
input <- &xsql.Tuple{Message: map[string]any{"a": int64(1)}}
waitExecute()
timex.Add(1100 * time.Millisecond)
got := <-output
wt, ok := got.(*xsql.WindowTuples)
Expand Down
8 changes: 7 additions & 1 deletion internal/xsql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,12 @@
case ast.KIND:
val := strings.ToLower(lit3)
opts.KIND = val
case ast.USE_INT64:
if val := strings.ToUpper(lit3); (val != "TRUE") && (val != "FALSE") {
return nil, fmt.Errorf("found %q, expect TRUE/FALSE value in %s option.", lit3, lit1)

Check warning on line 1556 in internal/xsql/parser.go

View check run for this annotation

Codecov / codecov/patch

internal/xsql/parser.go#L1556

Added line #L1556 was not covered by tests
} else {
opts.USE_INT64 = val == "TRUE"
}
default:
f := v.Elem().FieldByName(lit1)
if f.IsValid() {
Expand All @@ -1573,7 +1579,7 @@
}
return nil, fmt.Errorf("Parenthesis is not matched in options definition.")
} else {
return nil, fmt.Errorf("found %q, unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|SHARED|STRICT_VALIDATION|TYPE|TIMESTAMP|TIMESTAMP_FORMAT|RETAIN_SIZE|SCHEMAID).", lit1)
return nil, fmt.Errorf("found %q, unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|SHARED|STRICT_VALIDATION|TYPE|TIMESTAMP|TIMESTAMP_FORMAT|RETAIN_SIZE|SCHEMAID|USE_INT64).", lit1)
}
}
} else {
Expand Down
Loading
Loading