diff --git a/README.md b/README.md index ed44813..db8f31b 100644 --- a/README.md +++ b/README.md @@ -38,19 +38,22 @@ Replicator reads primary keys from MySQL table info and sync them automatically. Updating primary key in MySQL causes two Tarantool requests: delete an old row and insert a new one, because it is illegal to update primary key in Tarantool. -### Force cast MySQL value +### Custom mapping rules for columns Replicator can cast the value from MySQL to the required type if your Tarantool schema does not comply with the MySQL schema. For example, MySQL column stores `bigint(20)` values, but Tarantool expects `unsigned`. -Without explicit casting you get an error, e.g.: +Without explicit casting you will get an error, e.g.: > Tuple field 1 type does not match one required by operation: expected unsigned Supported types to cast to: * `unsigned`: try to cast any number to unsigned value. -To enable this feature specify which column should be casted: +If MySQL column stores `null` values, you can replace them by another value. +It is useful when the space format is defined or you have an index on this field in Tarantool. + +Custom column mapping configuration example: ```yaml ... @@ -62,8 +65,14 @@ To enable this feature specify which column should be casted: - client_id dest: space: 'users' - cast: - client_id: 'unsigned' + column: + id: + cast: 'unsigned' + email: + on_null: 'my_default_value' + client_id: + cast: 'unsigned' + on_null: 0 ``` ## Docker image diff --git a/config/dev.conf.yml b/config/dev.conf.yml index a1aa3d0..a6e92f7 100644 --- a/config/dev.conf.yml +++ b/config/dev.conf.yml @@ -45,3 +45,6 @@ replication: - email dest: space: 'users' + column: + email: + on_null: 'stub@mail.ru' diff --git a/config/replicator.conf.yml b/config/replicator.conf.yml index 86d22b5..58da7c3 100644 --- a/config/replicator.conf.yml +++ b/config/replicator.conf.yml @@ -45,3 +45,6 @@ replication: - email dest: space: 'users' + column: + email: + on_null: 'stub@mail.ru' diff --git a/docker/mysql/init.d/init.sql b/docker/mysql/init.d/init.sql index 9230a7c..e9b1aea 100644 --- a/docker/mysql/init.d/init.sql +++ b/docker/mysql/init.d/init.sql @@ -6,7 +6,7 @@ CREATE TABLE users username varchar(16) not null, password varchar(254) not null, name varchar(50) default '' not null, - email varchar(254) not null + email varchar(254) ) charset = utf8; CREATE TABLE logins diff --git a/internal/bridge/attribute.go b/internal/bridge/attribute.go index 4d7429f..da4d0d5 100644 --- a/internal/bridge/attribute.go +++ b/internal/bridge/attribute.go @@ -45,12 +45,13 @@ func castTypeFromString(str string) castType { // attribute represents MySQL column mapped to Tarantool. type attribute struct { - colIndex uint64 // column sequence number in MySQL table - tupIndex uint64 // attribute sequence number in Tarantool tuple - name string // unique attribute name - vType attrType // value type stored in the column - cType castType // value must be casted to this type - unsigned bool // whether attribute contains unsigned number or not + colIndex uint64 // column sequence number in MySQL table + tupIndex uint64 // attribute sequence number in Tarantool tuple + name string // unique attribute name + vType attrType // value type stored in the column + cType castType // value must be casted to this type + onNull interface{} // replace null by this value + unsigned bool // whether attribute contains unsigned number or not } func newAttr(table *schema.Table, tupIndex uint64, name string) (*attribute, error) { @@ -99,6 +100,10 @@ func (a *attribute) fetchValue(row []interface{}) (interface{}, error) { value := row[a.colIndex] + if value == nil && a.onNull != nil { + value = a.onNull + } + if a.shouldCastToUInt64(value) { v, err := toUint64(value) if err != nil { @@ -112,14 +117,14 @@ func (a *attribute) fetchValue(row []interface{}) (interface{}, error) { } func (a *attribute) shouldCastToUInt64(value interface{}) bool { - if value == nil { - return false - } - if a.cType == castUnsigned { return true } + if value == nil { + return false + } + if !a.unsigned { return false } diff --git a/internal/bridge/attribute_test.go b/internal/bridge/attribute_test.go index 490db4b..ce7d971 100644 --- a/internal/bridge/attribute_test.go +++ b/internal/bridge/attribute_test.go @@ -13,6 +13,7 @@ func Test_attribute_fetchValue(t *testing.T) { name string vType attrType cType castType + onNull interface{} unsigned bool } type args struct { @@ -121,6 +122,40 @@ func Test_attribute_fetchValue(t *testing.T) { want: uint64(10), wantErr: false, }, + { + name: "String_ReplaceOnNull", + fields: fields{ + colIndex: 0, + tupIndex: 0, + name: "email", + vType: typeString, + cType: castNone, + onNull: "replacement", + unsigned: false, + }, + args: args{ + row: []interface{}{nil}, + }, + want: "replacement", + wantErr: false, + }, + { + name: "UnsignedNumber_ReplaceOnNull", + fields: fields{ + colIndex: 0, + tupIndex: 0, + name: "id", + vType: typeNumber, + cType: castUnsigned, + onNull: 102, + unsigned: false, + }, + args: args{ + row: []interface{}{nil}, + }, + want: uint64(102), + wantErr: false, + }, { name: "ColumnIndexEqualRowLen", fields: fields{ @@ -164,6 +199,7 @@ func Test_attribute_fetchValue(t *testing.T) { name: tt.fields.name, vType: tt.fields.vType, cType: tt.fields.cType, + onNull: tt.fields.onNull, unsigned: tt.fields.unsigned, } got, err := a.fetchValue(tt.args.row) diff --git a/internal/bridge/replicator.go b/internal/bridge/replicator.go index 84a092c..3366db1 100644 --- a/internal/bridge/replicator.go +++ b/internal/bridge/replicator.go @@ -65,8 +65,6 @@ func New(cfg *config.Config, logger zerolog.Logger) (*Bridge, error) { return nil, err } - b.syncRulesAndCanalDump() - // We must use binlog full row image. if err := b.canal.CheckBinlogRowImage("FULL"); err != nil { return nil, err @@ -97,7 +95,7 @@ func (b *Bridge) newRules(cfg *config.Config) error { rules := make(map[string]*rule, len(cfg.Replication.Mappings)) for _, mapping := range cfg.Replication.Mappings { source := mapping.Source - cast := mapping.Dest.Cast + colmap := mapping.Dest.Column tableInfo, err := b.canal.GetTable(source.Schema, source.Table) if err != nil { @@ -109,8 +107,10 @@ func (b *Bridge) newRules(cfg *config.Config) error { return fmt.Errorf("no primary keys found, schema: %s, table: %s", source.Schema, source.Table) } for _, pk := range pks { - typ := castTypeFromString(cast[pk.name]) - pk.castTo(typ) + if m, ok := colmap[pk.name]; ok { + pk.castTo(castTypeFromString(m.Cast)) + pk.onNull = m.OnNull + } } attrs := make([]*attribute, 0, len(source.Columns)) @@ -130,8 +130,11 @@ func (b *Bridge) newRules(cfg *config.Config) error { if err != nil { return err } - typ := castTypeFromString(cast[name]) - attr.castTo(typ) + + if m, ok := colmap[name]; ok { + attr.castTo(castTypeFromString(m.Cast)) + attr.onNull = m.OnNull + } attrs = append(attrs, attr) } @@ -151,6 +154,7 @@ func (b *Bridge) newRules(cfg *config.Config) error { } b.rules = rules + b.syncRulesAndCanalDump() return nil } diff --git a/internal/bridge/replicator_test.go b/internal/bridge/replicator_test.go index ad36e59..6fa824f 100644 --- a/internal/bridge/replicator_test.go +++ b/internal/bridge/replicator_test.go @@ -300,8 +300,11 @@ func (s *bridgeSuite) TestForceCast() { cfg := *s.cfg if wantErr { + cfg.Replication.Mappings = make([]config.Mapping, len(s.cfg.Replication.Mappings)) + copy(cfg.Replication.Mappings, s.cfg.Replication.Mappings) + for i := range cfg.Replication.Mappings { - cfg.Replication.Mappings[i].Dest.Cast = nil + cfg.Replication.Mappings[i].Dest.Column = nil } } @@ -343,6 +346,46 @@ func (s *bridgeSuite) TestForceCast() { } } +func (s *bridgeSuite) TestReplaceOnNull() { + t := s.T() + s.init(s.cfg) + + go func() { + errors := s.bridge.Run() + for err := range errors { + assert.NoError(t, err) + } + }() + + _, err := s.executeSQL("INSERT INTO city.users (username, password, name, email) VALUES (?, ?, ?, ?)", "alice", "12345", "Alice", nil) + require.NoError(t, err) + + err = s.bridge.canal.CatchMasterPos(500 * time.Millisecond) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return s.hasSyncedData("users", 1) + }, 500*time.Millisecond, 50*time.Millisecond) + + err = s.bridge.Close() + assert.NoError(t, err) + + got, err := s.executeTNT(&tarantool.Select{ + Space: "users", + Iterator: tarantool.IterAll, + }) + require.NoError(t, err) + require.NotNil(t, got) + require.NotEmpty(t, got.Data) + require.Len(t, got.Data, 1) + want := []interface{}{1, "alice", "12345", "null"} + gotTuple := got.Data[0] + require.Len(t, gotTuple, len(want)) + for i, v := range want { + require.EqualValues(t, v, gotTuple[i]) + } +} + func (s *bridgeSuite) TestReconnect() { t := s.T() diff --git a/internal/bridge/testdata/cfg.yml b/internal/bridge/testdata/cfg.yml index 167467d..cdc1f0a 100644 --- a/internal/bridge/testdata/cfg.yml +++ b/internal/bridge/testdata/cfg.yml @@ -45,6 +45,9 @@ replication: - email dest: space: 'users' + column: + email: + on_null: 'null' - source: schema: 'city' table: 'logins' @@ -54,5 +57,7 @@ replication: - latitude dest: space: 'logins' - cast: - attempts: 'unsigned' \ No newline at end of file + column: + attempts: + cast: 'unsigned' + on_null: 0 \ No newline at end of file diff --git a/internal/config/cfg.go b/internal/config/cfg.go index e6ac154..1374363 100644 --- a/internal/config/cfg.go +++ b/internal/config/cfg.go @@ -134,11 +134,16 @@ type Mapping struct { } `yaml:"source"` Dest struct { - Space string `yaml:"space"` - Cast map[string]string `yaml:"cast"` + Space string `yaml:"space"` + Column map[string]MappingColumn `yaml:"column"` } `yaml:"dest"` } +type MappingColumn struct { + Cast string `yaml:"cast"` + OnNull interface{} `yaml:"on_null,omitempty"` +} + func ReadFromFile(path string) (*Config, error) { file, err := os.Open(path) if err != nil { diff --git a/internal/config/cfg_test.go b/internal/config/cfg_test.go index f5e4e70..19aa97b 100644 --- a/internal/config/cfg_test.go +++ b/internal/config/cfg_test.go @@ -67,6 +67,20 @@ func TestReadFromFile_ValidPath(t *testing.T) { assert.Equal(t, "users", mapping.Source.Table) assert.Equal(t, []string{"username", "password", "email"}, mapping.Source.Columns) assert.Equal(t, "users", mapping.Dest.Space) - assert.Len(t, mapping.Dest.Cast, 1) - assert.Contains(t, mapping.Dest.Cast, "attempts") + assert.Len(t, mapping.Dest.Column, 3) + columnMapping, ok := mapping.Dest.Column["attempts"] + if assert.True(t, ok) { + assert.Equal(t, "unsigned", columnMapping.Cast) + assert.Equal(t, 0, columnMapping.OnNull) + } + columnMapping, ok = mapping.Dest.Column["email"] + if assert.True(t, ok) { + assert.Equal(t, "", columnMapping.Cast) + assert.Equal(t, "", columnMapping.OnNull) + } + columnMapping, ok = mapping.Dest.Column["client_id"] + if assert.True(t, ok) { + assert.Equal(t, "unsigned", columnMapping.Cast) + assert.Nil(t, columnMapping.OnNull) + } } diff --git a/internal/config/testdata/replicator.conf.yml b/internal/config/testdata/replicator.conf.yml index bdbba28..f71ccbc 100644 --- a/internal/config/testdata/replicator.conf.yml +++ b/internal/config/testdata/replicator.conf.yml @@ -45,5 +45,11 @@ replication: - email dest: space: 'users' - cast: - attempts: 'unsigned' + column: + attempts: + cast: 'unsigned' + on_null: 0 + email: + on_null: '' + client_id: + cast: 'unsigned'