Skip to content

Commit

Permalink
Added option to replace values on null (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
pparshin authored Nov 25, 2020
1 parent 566057a commit 14f670d
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 32 deletions.
19 changes: 14 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,22 @@ Replicator reads primary keys from MySQL table info and sync them automatically.
Updating primary key in MySQL causes two Tarantool requests: delete an old row and insert a new one, because
it is illegal to update primary key in Tarantool.

### Force cast MySQL value
### Custom mapping rules for columns

Replicator can cast the value from MySQL to the required type
if your Tarantool schema does not comply with the MySQL schema.
For example, MySQL column stores `bigint(20)` values, but Tarantool
expects `unsigned`.
Without explicit casting you get an error, e.g.:
Without explicit casting you will get an error, e.g.:
> Tuple field 1 type does not match one required by operation: expected unsigned
Supported types to cast to:
* `unsigned`: try to cast any number to unsigned value.

To enable this feature specify which column should be casted:
If MySQL column stores `null` values, you can replace them by another value.
It is useful when the space format is defined or you have an index on this field in Tarantool.

Custom column mapping configuration example:

```yaml
...
Expand All @@ -62,8 +65,14 @@ To enable this feature specify which column should be casted:
- client_id
dest:
space: 'users'
cast:
client_id: 'unsigned'
column:
id:
cast: 'unsigned'
email:
on_null: 'my_default_value'
client_id:
cast: 'unsigned'
on_null: 0
```
## Docker image
Expand Down
3 changes: 3 additions & 0 deletions config/dev.conf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ replication:
- email
dest:
space: 'users'
column:
email:
on_null: 'stub@mail.ru'
3 changes: 3 additions & 0 deletions config/replicator.conf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ replication:
- email
dest:
space: 'users'
column:
email:
on_null: 'stub@mail.ru'
2 changes: 1 addition & 1 deletion docker/mysql/init.d/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ CREATE TABLE users
username varchar(16) not null,
password varchar(254) not null,
name varchar(50) default '' not null,
email varchar(254) not null
email varchar(254)
) charset = utf8;

CREATE TABLE logins
Expand Down
25 changes: 15 additions & 10 deletions internal/bridge/attribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ func castTypeFromString(str string) castType {

// attribute represents MySQL column mapped to Tarantool.
type attribute struct {
colIndex uint64 // column sequence number in MySQL table
tupIndex uint64 // attribute sequence number in Tarantool tuple
name string // unique attribute name
vType attrType // value type stored in the column
cType castType // value must be casted to this type
unsigned bool // whether attribute contains unsigned number or not
colIndex uint64 // column sequence number in MySQL table
tupIndex uint64 // attribute sequence number in Tarantool tuple
name string // unique attribute name
vType attrType // value type stored in the column
cType castType // value must be casted to this type
onNull interface{} // replace null by this value
unsigned bool // whether attribute contains unsigned number or not
}

func newAttr(table *schema.Table, tupIndex uint64, name string) (*attribute, error) {
Expand Down Expand Up @@ -99,6 +100,10 @@ func (a *attribute) fetchValue(row []interface{}) (interface{}, error) {

value := row[a.colIndex]

if value == nil && a.onNull != nil {
value = a.onNull
}

if a.shouldCastToUInt64(value) {
v, err := toUint64(value)
if err != nil {
Expand All @@ -112,14 +117,14 @@ func (a *attribute) fetchValue(row []interface{}) (interface{}, error) {
}

func (a *attribute) shouldCastToUInt64(value interface{}) bool {
if value == nil {
return false
}

if a.cType == castUnsigned {
return true
}

if value == nil {
return false
}

if !a.unsigned {
return false
}
Expand Down
36 changes: 36 additions & 0 deletions internal/bridge/attribute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func Test_attribute_fetchValue(t *testing.T) {
name string
vType attrType
cType castType
onNull interface{}
unsigned bool
}
type args struct {
Expand Down Expand Up @@ -121,6 +122,40 @@ func Test_attribute_fetchValue(t *testing.T) {
want: uint64(10),
wantErr: false,
},
{
name: "String_ReplaceOnNull",
fields: fields{
colIndex: 0,
tupIndex: 0,
name: "email",
vType: typeString,
cType: castNone,
onNull: "replacement",
unsigned: false,
},
args: args{
row: []interface{}{nil},
},
want: "replacement",
wantErr: false,
},
{
name: "UnsignedNumber_ReplaceOnNull",
fields: fields{
colIndex: 0,
tupIndex: 0,
name: "id",
vType: typeNumber,
cType: castUnsigned,
onNull: 102,
unsigned: false,
},
args: args{
row: []interface{}{nil},
},
want: uint64(102),
wantErr: false,
},
{
name: "ColumnIndexEqualRowLen",
fields: fields{
Expand Down Expand Up @@ -164,6 +199,7 @@ func Test_attribute_fetchValue(t *testing.T) {
name: tt.fields.name,
vType: tt.fields.vType,
cType: tt.fields.cType,
onNull: tt.fields.onNull,
unsigned: tt.fields.unsigned,
}
got, err := a.fetchValue(tt.args.row)
Expand Down
18 changes: 11 additions & 7 deletions internal/bridge/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ func New(cfg *config.Config, logger zerolog.Logger) (*Bridge, error) {
return nil, err
}

b.syncRulesAndCanalDump()

// We must use binlog full row image.
if err := b.canal.CheckBinlogRowImage("FULL"); err != nil {
return nil, err
Expand Down Expand Up @@ -97,7 +95,7 @@ func (b *Bridge) newRules(cfg *config.Config) error {
rules := make(map[string]*rule, len(cfg.Replication.Mappings))
for _, mapping := range cfg.Replication.Mappings {
source := mapping.Source
cast := mapping.Dest.Cast
colmap := mapping.Dest.Column

tableInfo, err := b.canal.GetTable(source.Schema, source.Table)
if err != nil {
Expand All @@ -109,8 +107,10 @@ func (b *Bridge) newRules(cfg *config.Config) error {
return fmt.Errorf("no primary keys found, schema: %s, table: %s", source.Schema, source.Table)
}
for _, pk := range pks {
typ := castTypeFromString(cast[pk.name])
pk.castTo(typ)
if m, ok := colmap[pk.name]; ok {
pk.castTo(castTypeFromString(m.Cast))
pk.onNull = m.OnNull
}
}

attrs := make([]*attribute, 0, len(source.Columns))
Expand All @@ -130,8 +130,11 @@ func (b *Bridge) newRules(cfg *config.Config) error {
if err != nil {
return err
}
typ := castTypeFromString(cast[name])
attr.castTo(typ)

if m, ok := colmap[name]; ok {
attr.castTo(castTypeFromString(m.Cast))
attr.onNull = m.OnNull
}

attrs = append(attrs, attr)
}
Expand All @@ -151,6 +154,7 @@ func (b *Bridge) newRules(cfg *config.Config) error {
}

b.rules = rules
b.syncRulesAndCanalDump()

return nil
}
Expand Down
45 changes: 44 additions & 1 deletion internal/bridge/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,11 @@ func (s *bridgeSuite) TestForceCast() {

cfg := *s.cfg
if wantErr {
cfg.Replication.Mappings = make([]config.Mapping, len(s.cfg.Replication.Mappings))
copy(cfg.Replication.Mappings, s.cfg.Replication.Mappings)

for i := range cfg.Replication.Mappings {
cfg.Replication.Mappings[i].Dest.Cast = nil
cfg.Replication.Mappings[i].Dest.Column = nil
}
}

Expand Down Expand Up @@ -343,6 +346,46 @@ func (s *bridgeSuite) TestForceCast() {
}
}

func (s *bridgeSuite) TestReplaceOnNull() {
t := s.T()
s.init(s.cfg)

go func() {
errors := s.bridge.Run()
for err := range errors {
assert.NoError(t, err)
}
}()

_, err := s.executeSQL("INSERT INTO city.users (username, password, name, email) VALUES (?, ?, ?, ?)", "alice", "12345", "Alice", nil)
require.NoError(t, err)

err = s.bridge.canal.CatchMasterPos(500 * time.Millisecond)
require.NoError(t, err)

require.Eventually(t, func() bool {
return s.hasSyncedData("users", 1)
}, 500*time.Millisecond, 50*time.Millisecond)

err = s.bridge.Close()
assert.NoError(t, err)

got, err := s.executeTNT(&tarantool.Select{
Space: "users",
Iterator: tarantool.IterAll,
})
require.NoError(t, err)
require.NotNil(t, got)
require.NotEmpty(t, got.Data)
require.Len(t, got.Data, 1)
want := []interface{}{1, "alice", "12345", "null"}
gotTuple := got.Data[0]
require.Len(t, gotTuple, len(want))
for i, v := range want {
require.EqualValues(t, v, gotTuple[i])
}
}

func (s *bridgeSuite) TestReconnect() {
t := s.T()

Expand Down
9 changes: 7 additions & 2 deletions internal/bridge/testdata/cfg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ replication:
- email
dest:
space: 'users'
column:
email:
on_null: 'null'
- source:
schema: 'city'
table: 'logins'
Expand All @@ -54,5 +57,7 @@ replication:
- latitude
dest:
space: 'logins'
cast:
attempts: 'unsigned'
column:
attempts:
cast: 'unsigned'
on_null: 0
9 changes: 7 additions & 2 deletions internal/config/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,16 @@ type Mapping struct {
} `yaml:"source"`

Dest struct {
Space string `yaml:"space"`
Cast map[string]string `yaml:"cast"`
Space string `yaml:"space"`
Column map[string]MappingColumn `yaml:"column"`
} `yaml:"dest"`
}

type MappingColumn struct {
Cast string `yaml:"cast"`
OnNull interface{} `yaml:"on_null,omitempty"`
}

func ReadFromFile(path string) (*Config, error) {
file, err := os.Open(path)
if err != nil {
Expand Down
18 changes: 16 additions & 2 deletions internal/config/cfg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,20 @@ func TestReadFromFile_ValidPath(t *testing.T) {
assert.Equal(t, "users", mapping.Source.Table)
assert.Equal(t, []string{"username", "password", "email"}, mapping.Source.Columns)
assert.Equal(t, "users", mapping.Dest.Space)
assert.Len(t, mapping.Dest.Cast, 1)
assert.Contains(t, mapping.Dest.Cast, "attempts")
assert.Len(t, mapping.Dest.Column, 3)
columnMapping, ok := mapping.Dest.Column["attempts"]
if assert.True(t, ok) {
assert.Equal(t, "unsigned", columnMapping.Cast)
assert.Equal(t, 0, columnMapping.OnNull)
}
columnMapping, ok = mapping.Dest.Column["email"]
if assert.True(t, ok) {
assert.Equal(t, "", columnMapping.Cast)
assert.Equal(t, "", columnMapping.OnNull)
}
columnMapping, ok = mapping.Dest.Column["client_id"]
if assert.True(t, ok) {
assert.Equal(t, "unsigned", columnMapping.Cast)
assert.Nil(t, columnMapping.OnNull)
}
}
10 changes: 8 additions & 2 deletions internal/config/testdata/replicator.conf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,11 @@ replication:
- email
dest:
space: 'users'
cast:
attempts: 'unsigned'
column:
attempts:
cast: 'unsigned'
on_null: 0
email:
on_null: ''
client_id:
cast: 'unsigned'

0 comments on commit 14f670d

Please sign in to comment.