Skip to content

Commit

Permalink
[MySQL] Supporting GTID (#607)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Dec 18, 2024
1 parent 02cee52 commit 7b37af0
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 69 deletions.
5 changes: 1 addition & 4 deletions config/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ import (
)

type MySQLStreamingSettings struct {
Enabled bool `yaml:"enabled,omitempty"`
// TODO: Remove TODO once GTID is fully functional.
EnableGTID bool `yaml:"enableGTID,omitempty"`

Enabled bool `yaml:"enabled,omitempty"`
OffsetFile string `yaml:"offsetFile,omitempty"`
SchemaHistoryFile string `yaml:"schemaHistoryFile,omitempty"`
// ServerID - Unique ID in the cluster.
Expand Down
6 changes: 4 additions & 2 deletions sources/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@ func Load(ctx context.Context, cfg config.MySQL) (sources.Source, bool, error) {
return nil, false, fmt.Errorf("failed to connect to MySQL: %w", err)
}

settings, err := retrieveSettings(db)
settings, err := retrieveSettings(ctx, db)
if err != nil {
return nil, false, fmt.Errorf("failed to retrieve MySQL settings: %w", err)
}

slog.Info("Loading MySQL connector",
slog.String("version", settings.Version),
slog.Any("sqlMode", settings.SQLMode),
slog.Bool("gtidEnabled", settings.GTIDEnabled),
)

if cfg.StreamingSettings.Enabled {
stream, err := buildStreamingConfig(ctx, db, cfg, settings.SQLMode)
stream, err := buildStreamingConfig(ctx, db, cfg, settings.SQLMode, settings.GTIDEnabled)
if err != nil {
return nil, false, fmt.Errorf("failed to build streaming config: %w", err)
}
Expand Down
34 changes: 29 additions & 5 deletions sources/mysql/settings.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package mysql

import (
"context"
"database/sql"
"fmt"
"strings"
)

type Settings struct {
Version string
SQLMode []string
Version string
SQLMode []string
GTIDEnabled bool
}

func retrieveSettings(db *sql.DB) (Settings, error) {
func retrieveSettings(ctx context.Context, db *sql.DB) (Settings, error) {
version, err := retrieveVersion(db)
if err != nil {
return Settings{}, fmt.Errorf("failed to retrieve MySQL version: %w", err)
Expand All @@ -22,9 +24,15 @@ func retrieveSettings(db *sql.DB) (Settings, error) {
return Settings{}, fmt.Errorf("failed to retrieve MySQL session sql_mode: %w", err)
}

gtidEnabled, err := hasGTIDEnabled(ctx, db)
if err != nil {
return Settings{}, fmt.Errorf("failed to check if GTID is enabled: %w", err)
}

return Settings{
Version: version,
SQLMode: sqlMode,
Version: version,
SQLMode: sqlMode,
GTIDEnabled: gtidEnabled,
}, nil
}

Expand All @@ -45,3 +53,19 @@ func retrieveSessionSQLMode(db *sql.DB) ([]string, error) {

return strings.Split(sqlMode, ","), nil
}

func hasGTIDEnabled(ctx context.Context, db *sql.DB) (bool, error) {
requiredVariables := []string{"gtid_mode", "enforce_gtid_consistency"}
for _, requiredVariable := range requiredVariables {
value, err := fetchVariable(ctx, db, requiredVariable)
if err != nil {
return false, err
}

if strings.ToUpper(value) != "ON" {
return false, nil
}
}

return true, nil
}
7 changes: 3 additions & 4 deletions sources/mysql/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"fmt"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/sources/mysql/streaming"
"github.com/artie-labs/reader/writers"
Expand All @@ -15,13 +14,13 @@ type Streaming struct {
db *sql.DB
}

func buildStreamingConfig(ctx context.Context, db *sql.DB, cfg config.MySQL, sqlMode []string) (Streaming, error) {
func buildStreamingConfig(ctx context.Context, db *sql.DB, cfg config.MySQL, sqlMode []string, gtidEnabled bool) (Streaming, error) {
// Validate to ensure that we can use streaming.
if err := ValidateMySQL(ctx, db, true, cfg.StreamingSettings.EnableGTID); err != nil {
if err := ValidateMySQL(ctx, db, true); err != nil {
return Streaming{}, fmt.Errorf("failed validation: %w", err)
}

iter, err := streaming.BuildStreamingIterator(db, cfg, sqlMode)
iter, err := streaming.BuildStreamingIterator(db, cfg, sqlMode, gtidEnabled)
if err != nil {
return Streaming{}, err
}
Expand Down
25 changes: 21 additions & 4 deletions sources/mysql/streaming/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func buildSchemaAdapter(db *sql.DB, cfg config.MySQL, schemaHistoryList persiste
return schemaAdapter, nil
}

func BuildStreamingIterator(db *sql.DB, cfg config.MySQL, sqlMode []string) (Iterator, error) {
func BuildStreamingIterator(db *sql.DB, cfg config.MySQL, sqlMode []string, gtidEnabled bool) (Iterator, error) {
var pos Position
offsets := persistedmap.NewPersistedMap[Position](cfg.StreamingSettings.OffsetFile)
if _pos, isOk := offsets.Get(offsetKey); isOk {
Expand Down Expand Up @@ -91,9 +91,22 @@ func BuildStreamingIterator(db *sql.DB, cfg config.MySQL, sqlMode []string) (Ite
},
)

streamer, err := syncer.StartSync(pos.ToMySQLPosition())
if err != nil {
return Iterator{}, fmt.Errorf("failed to start sync: %w", err)
var streamer *replication.BinlogStreamer
if gtidEnabled {
gtidSet, err := pos.ToGTIDSet()
if err != nil {
return Iterator{}, fmt.Errorf("failed to parse GTID: %w", err)
}

streamer, err = syncer.StartSyncGTID(gtidSet)
if err != nil {
return Iterator{}, fmt.Errorf("failed to start sync: %w", err)
}
} else {
streamer, err = syncer.StartSync(pos.ToMySQLPosition())
if err != nil {
return Iterator{}, fmt.Errorf("failed to start sync: %w", err)
}
}

return Iterator{
Expand Down Expand Up @@ -152,6 +165,10 @@ func (i *Iterator) Next() ([]lib.RawMessage, error) {

switch event.Header.EventType {
case
// We don't need these events, [GTID_EVENT] will contain the offsets via GTID sets, which is handled in [UpdatePosition]
replication.GTID_EVENT,
replication.PREVIOUS_GTIDS_EVENT,
replication.FORMAT_DESCRIPTION_EVENT,
replication.ANONYMOUS_GTID_EVENT,
replication.TABLE_MAP_EVENT,
// We don't need TableMapEvent because we are handling it by consuming DDL queries, applying it to our schema adapter
Expand Down
64 changes: 64 additions & 0 deletions sources/mysql/streaming/offset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package streaming

import (
"fmt"
"time"

"github.com/artie-labs/transfer/lib/typing"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
)

type Position struct {
// Binlog position
File string `yaml:"file"`
Pos uint32 `yaml:"pos"`
// GTID
GTIDSet string `yaml:"gtidSet"`

UnixTs int64 `yaml:"unixTs"`
}

func (p Position) String() string {
return fmt.Sprintf("File: %q, Pos: %d, GTIDSet (if enabled): %q", p.File, p.Pos, p.GTIDSet)
}

func (p Position) ToGTIDSet() (mysql.GTIDSet, error) {
return mysql.ParseGTIDSet(mysql.MySQLFlavor, p.GTIDSet)
}

func (p Position) ToMySQLPosition() mysql.Position {
return mysql.Position{Name: p.File, Pos: p.Pos}
}

func (p *Position) UpdatePosition(ts time.Time, evt *replication.BinlogEvent) error {
// We should always update the log position
p.Pos = evt.Header.LogPos
p.UnixTs = ts.Unix()

if evt.Header.EventType == replication.GTID_EVENT {
gtidEvent, err := typing.AssertType[*replication.GTIDEvent](evt.Event)
if err != nil {
return err
}

set, err := gtidEvent.GTIDNext()
if err != nil {
return fmt.Errorf("failed to retrieve next GTID set: %w", err)
}

p.GTIDSet = set.String()
}

if evt.Header.EventType == replication.ROTATE_EVENT {
// When we encounter a rotate event, we'll then update the log file
rotate, err := typing.AssertType[*replication.RotateEvent](evt.Event)
if err != nil {
return err
}

p.File = string(rotate.NextLogName)
}

return nil
}
36 changes: 0 additions & 36 deletions sources/mysql/streaming/types.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package streaming

import (
"fmt"
"time"

"github.com/artie-labs/transfer/lib/typing"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"

"github.com/artie-labs/reader/config"
Expand All @@ -31,34 +26,3 @@ type SchemaHistory struct {
Query string `json:"query"`
UnixTs int64 `json:"unixTs"`
}

type Position struct {
File string `yaml:"file"`
Pos uint32 `yaml:"pos"`
UnixTs int64 `yaml:"unixTs"`
}

func (p Position) String() string {
return fmt.Sprintf("File: %s, Pos: %d", p.File, p.Pos)
}

func (p Position) ToMySQLPosition() mysql.Position {
return mysql.Position{Name: p.File, Pos: p.Pos}
}

func (p *Position) UpdatePosition(ts time.Time, evt *replication.BinlogEvent) error {
// We should always update the log position
p.UnixTs = ts.Unix()
p.Pos = evt.Header.LogPos
if evt.Header.EventType == replication.ROTATE_EVENT {
// When we encounter a rotate event, we'll then update the log file
rotate, err := typing.AssertType[*replication.RotateEvent](evt.Event)
if err != nil {
return err
}

p.File = string(rotate.NextLogName)
}

return nil
}
18 changes: 4 additions & 14 deletions sources/mysql/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,15 @@ func fetchVariable(ctx context.Context, db *sql.DB, name string) (string, error)
return value, nil
}

func ValidateMySQL(ctx context.Context, db *sql.DB, validateStreaming bool, validateGTID bool) error {
requiredVariableToValueMap := make(map[string]string)
func ValidateMySQL(ctx context.Context, db *sql.DB, validateStreaming bool) error {
if validateStreaming {
requiredVariableToValueMap["binlog_format"] = "ROW"
}

if validateGTID {
requiredVariableToValueMap["gtid_mode"] = "ON"
requiredVariableToValueMap["enforce_gtid_consistency"] = "ON"
}

for requiredVariable, requiredValue := range requiredVariableToValueMap {
value, err := fetchVariable(ctx, db, requiredVariable)
value, err := fetchVariable(ctx, db, "binlog_format")
if err != nil {
return err
}

if strings.ToUpper(value) != requiredValue {
return fmt.Errorf("%s must be set to %q, current value is %q", requiredVariable, requiredValue, value)
if strings.ToUpper(value) != "ROW" {
return fmt.Errorf("'binlog_format' must be set to 'ROW', current value is '%s'", value)
}
}

Expand Down

0 comments on commit 7b37af0

Please sign in to comment.