Skip to content

Commit

Permalink
[Debezium] Adding Float64 converter (#1032)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Nov 12, 2024
1 parent 6cc1c7b commit 3f2bfc2
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 3 deletions.
2 changes: 1 addition & 1 deletion lib/cdc/relational/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
type Debezium struct{}

func (Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) {
var event util.SchemaEventPayload
if len(bytes) == 0 {
return nil, fmt.Errorf("empty message")
}

var event util.SchemaEventPayload
if err := json.Unmarshal(bytes, &event); err != nil {
return nil, err
}
Expand Down
30 changes: 30 additions & 0 deletions lib/debezium/converters/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,33 @@ func (Base64) Convert(value any) (any, error) {

return base64.StdEncoding.EncodeToString(castedValue), nil
}

// Float64 converter is used when Debezium's double.handling.mode is set to double.
type Float64 struct{}

func (Float64) ToKindDetails() typing.KindDetails {
return typing.Float
}

func (Float64) Convert(value any) (any, error) {
switch castedValue := value.(type) {
case int:
return float64(castedValue), nil
case int64:
return float64(castedValue), nil
case int32:
return float64(castedValue), nil
case float32:
return float64(castedValue), nil
case float64:
return castedValue, nil
case string:
if castedValue == "NaN" {
return nil, nil
}

return nil, fmt.Errorf("unexpected type %T, with value %q", value, castedValue)
default:
return nil, fmt.Errorf("unexpected type %T", value)
}
}
37 changes: 37 additions & 0 deletions lib/debezium/converters/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,40 @@ func TestBase64_Convert(t *testing.T) {
assert.Equal(t, "MjAyNA==", value)
}
}

func TestFloat64_Convert(t *testing.T) {
{
// Invalid
{
// Wrong data type
_, err := Float64{}.Convert("123")
assert.ErrorContains(t, err, `unexpected type string, with value "123"`)
}
{
// Another wrong data type
_, err := Float64{}.Convert(false)
assert.ErrorContains(t, err, "unexpected type bool")
}
}
{
// Valid
{
// int
value, err := Float64{}.Convert(123)
assert.NoError(t, err)
assert.Equal(t, float64(123), value)
}
{
// NaN
value, err := Float64{}.Convert("NaN")
assert.NoError(t, err)
assert.Nil(t, value)
}
{
// Float
value, err := Float64{}.Convert(123.45)
assert.NoError(t, err)
assert.Equal(t, 123.45, value)
}
}
}
7 changes: 5 additions & 2 deletions lib/debezium/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ func (f Field) ToValueConverter() (converters.ValueConverter, error) {
slog.Warn("Unhandled Debezium type", slog.String("type", string(f.Type)), slog.String("debeziumType", string(f.DebeziumType)))
}

switch f.Type {
case Double, Float:
return converters.Float64{}, nil
}

return nil, nil
}
}
Expand All @@ -156,8 +161,6 @@ func (f Field) ToKindDetails() (typing.KindDetails, error) {
return typing.Struct, nil
case Int16, Int32, Int64:
return typing.Integer, nil
case Float, Double:
return typing.Float, nil
case String, Bytes:
return typing.String, nil
case Struct:
Expand Down
11 changes: 11 additions & 0 deletions lib/debezium/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,4 +469,15 @@ func TestField_Decimal_ParseValue(t *testing.T) {
assert.Equal(t, int32(-1), value.(*decimal.Decimal).Details().Precision())
assert.Equal(t, int32(2), value.(*decimal.Decimal).Details().Scale())
}
{
// Float
field := Field{Type: Double}
converter, err := field.ToValueConverter()
assert.NoError(t, err)
assert.Equal(t, typing.Float, converter.ToKindDetails())

value, err := converter.Convert(123.45)
assert.NoError(t, err)
assert.Equal(t, 123.45, value.(float64))
}
}

0 comments on commit 3f2bfc2

Please sign in to comment.