Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Commit

Permalink
Merge branch 'master' into mariadb-gtid-io-thread
Browse files Browse the repository at this point in the history
  • Loading branch information
shlomi-noach authored Nov 19, 2020
2 parents 0339367 + e305ad0 commit 4b95079
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 18 deletions.
20 changes: 20 additions & 0 deletions docs/tags.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,26 @@ vttablet_alias=dc1-0123456789
listed tags are sorted by name.
Note that we added `old-hardware` tag without value. It exports as `old-hardware=`, with implicit empty value.


### Listing instance tags for a cluster

For a given instance or cluster alias `-c topology-tags` or `api/topology-tags` lists the cluster topology with all known tags for each instance.

Example:
```shell
$ orchestrator-client -c tag -i db-host-01:3306 --tag vttablet_alias=dc1-0123456789
$ orchestrator-client -c tag -i db-host-01:3306 --tag old-hardware

$ orchestrator-client -c topology-tags -alias mycluster
db-host-01:3306 [0s,ok,5.7.23-log,rw,ROW,>>,GTID,P-GTID] [vttablet_alias=dc1-0123456789, old-hardware]
+ db-host-02:3306 [0s,ok,5.7.23-log,ro,ROW,>>,GTID,P-GTID] []

$ orchestrator-client -c topology-tags -i db-host-01:3306
db-host-01:3306 [0s,ok,5.7.23-log,rw,ROW,>>,GTID,P-GTID] [vttablet_alias=dc1-0123456789, old-hardware]
+ db-host-02:3306 [0s,ok,5.7.23-log,ro,ROW,>>,GTID,P-GTID] []
```


### Getting the value of a specific tag

`-c tag-value` or `api/tag-value` return the value of a specific tag on an instance.
Expand Down
26 changes: 26 additions & 0 deletions go/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,31 @@ func (this *HttpAPI) DisableSemiSyncReplica(params martini.Params, r render.Rend
this.setSemiSyncReplica(params, r, req, user, false)
}

// DelayReplication delays replication on given instance with given seconds
func (this *HttpAPI) DelayReplication(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
return
}
instanceKey, err := this.getInstanceKey(params["host"], params["port"])
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
seconds, err := strconv.Atoi(params["seconds"])
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: "Invalid value provided for seconds"})
return
}
err = inst.DelayReplication(&instanceKey, seconds)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}

Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Replication delayed: %+v", instanceKey), Details: seconds})
}

// SetReadOnly sets the global read_only variable
func (this *HttpAPI) SetReadOnly(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Expand Down Expand Up @@ -3724,6 +3749,7 @@ func (this *HttpAPI) RegisterRequests(m *martini.ClassicMartini) {
this.registerAPIRequest(m, "disable-semi-sync-master/:host/:port", this.DisableSemiSyncMaster)
this.registerAPIRequest(m, "enable-semi-sync-replica/:host/:port", this.EnableSemiSyncReplica)
this.registerAPIRequest(m, "disable-semi-sync-replica/:host/:port", this.DisableSemiSyncReplica)
this.registerAPIRequest(m, "delay-replication/:host/:port/:seconds", this.DelayReplication)

// Replication information:
this.registerAPIRequest(m, "can-replicate-from/:host/:port/:belowHost/:belowPort", this.CanReplicateFrom)
Expand Down
16 changes: 16 additions & 0 deletions go/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -2639,6 +2639,14 @@ func mkInsertOdkuForInstances(instances []*Instance, instanceWasActuallyFound bo
}

// writeManyInstances stores instances in the orchestrator backend
//
// This may trigger this error: Error 1390: Prepared statement contains too
// many placeholders if you are monitoring several thousand servers as it
// may trigger the total number of place holders to exceed (uint)UINT_MAX16
// in the MySQL source code.

const tooManyPlaceholders = "Error 1390: Prepared statement contains too many placeholders"

func writeManyInstances(instances []*Instance, instanceWasActuallyFound bool, updateLastSeen bool) error {
writeInstances := [](*Instance){}
for _, instance := range instances {
Expand All @@ -2655,6 +2663,14 @@ func writeManyInstances(instances []*Instance, instanceWasActuallyFound bool, up
return err
}
if _, err := db.ExecOrchestrator(sql, args...); err != nil {
if strings.Contains(err.Error(), tooManyPlaceholders) {
return fmt.Errorf("writeManyInstances(?,%v,%v): error: %+v, len(instances): %v, len(args): %v. Reduce InstanceWriteBufferSize to avoid len(args) being > 64k, a limit in the MySQL source code.",
instanceWasActuallyFound,
updateLastSeen,
err.Error(),
len(writeInstances),
len(args))
}
return err
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions go/inst/instance_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,9 +1023,9 @@ func MakeCoMaster(instanceKey *InstanceKey) (*Instance, error) {
}
if !master.HasReplicationCredentials {
// Let's try , if possible, to get credentials from replica. Best effort.
if replicationUser, replicationPassword, credentialsErr := ReadReplicationCredentials(&instance.Key); credentialsErr == nil {
if credentials, credentialsErr := ReadReplicationCredentials(&instance.Key); credentialsErr == nil {
log.Debugf("Got credentials from a replica. will now apply")
_, err = ChangeMasterCredentials(&master.Key, replicationUser, replicationPassword)
_, err = ChangeMasterCredentials(&master.Key, credentials)
if err != nil {
goto Cleanup
}
Expand Down
89 changes: 78 additions & 11 deletions go/inst/instance_topology_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ const (
Error1201CouldnotInitializeMasterInfoStructure = "Error 1201:"
)

type ReplicationCredentials struct {
User string
Password string
SSLCert string
SSLKey string
SSLCaCert string
}

// ExecInstance executes a given query on the given MySQL topology instance
func ExecInstance(instanceKey *InstanceKey, query string, args ...interface{}) (sql.Result, error) {
db, err := db.OpenTopology(instanceKey.Hostname, instanceKey.Port)
Expand Down Expand Up @@ -587,13 +595,35 @@ func EnableSemiSync(instanceKey *InstanceKey, master, replica bool) error {
return err
}

// DelayReplication set the replication delay given seconds
// keeping the current state of the replication threads.
func DelayReplication(instanceKey *InstanceKey, seconds int) error {
if seconds < 0 {
return fmt.Errorf("invalid seconds: %d, it should be greater or equal to 0", seconds)
}
query := fmt.Sprintf("change master to master_delay=%d", seconds)
statements, err := GetReplicationRestartPreserveStatements(instanceKey, query)
if err != nil {
return err
}
for _, cmd := range statements {
if _, err := ExecInstance(instanceKey, cmd); err != nil {
return log.Errorf("%+v: DelayReplication: '%q' failed: %+v", *instanceKey, cmd, err)
} else {
log.Infof("DelayReplication: %s on %+v", cmd, *instanceKey)
}
}
AuditOperation("delay-replication", instanceKey, fmt.Sprintf("set to %d", seconds))
return nil
}

// ChangeMasterCredentials issues a CHANGE MASTER TO... MASTER_USER=, MASTER_PASSWORD=...
func ChangeMasterCredentials(instanceKey *InstanceKey, masterUser string, masterPassword string) (*Instance, error) {
func ChangeMasterCredentials(instanceKey *InstanceKey, creds *ReplicationCredentials) (*Instance, error) {
instance, err := ReadTopologyInstance(instanceKey)
if err != nil {
return instance, log.Errore(err)
}
if masterUser == "" {
if creds.User == "" {
return instance, log.Errorf("Empty user in ChangeMasterCredentials() for %+v", *instanceKey)
}

Expand All @@ -605,8 +635,38 @@ func ChangeMasterCredentials(instanceKey *InstanceKey, masterUser string, master
if *config.RuntimeCLIFlags.Noop {
return instance, fmt.Errorf("noop: aborting CHANGE MASTER TO operation on %+v; signalling error but nothing went wrong.", *instanceKey)
}
_, err = ExecInstance(instanceKey, "change master to master_user=?, master_password=?",
masterUser, masterPassword)

var query_params []string
var query_params_args []interface{}

// User
query_params = append(query_params, "master_user = ?")
query_params_args = append(query_params_args, creds.User)
// Password
if creds.Password != "" {
query_params = append(query_params, "master_password = ?")
query_params_args = append(query_params_args, creds.Password)
}

// SSL CA cert
if creds.SSLCaCert != "" {
query_params = append(query_params, "master_ssl_ca = ?")
query_params_args = append(query_params_args, creds.SSLCaCert)
}
// SSL cert
if creds.SSLCert != "" {
query_params = append(query_params, "master_ssl_cert = ?")
query_params_args = append(query_params_args, creds.SSLCert)
}
// SSL key
if creds.SSLKey != "" {
query_params = append(query_params, "master_ssl = 1")
query_params = append(query_params, "master_ssl_key = ?")
query_params_args = append(query_params_args, creds.SSLKey)
}

query := fmt.Sprintf("change master to %s", strings.Join(query_params, ", "))
_, err = ExecInstance(instanceKey, query, query_params_args...)

if err != nil {
return instance, log.Errore(err)
Expand Down Expand Up @@ -982,14 +1042,21 @@ func MasterPosWait(instanceKey *InstanceKey, binlogCoordinates *BinlogCoordinate
}

// Attempt to read and return replication credentials from the mysql.slave_master_info system table
func ReadReplicationCredentials(instanceKey *InstanceKey) (replicationUser string, replicationPassword string, err error) {
func ReadReplicationCredentials(instanceKey *InstanceKey) (creds *ReplicationCredentials, err error) {
creds = &ReplicationCredentials{}
if config.Config.ReplicationCredentialsQuery != "" {
err = ScanInstanceRow(instanceKey, config.Config.ReplicationCredentialsQuery, &replicationUser, &replicationPassword)
if err == nil && replicationUser == "" {
err = ScanInstanceRow(instanceKey, config.Config.ReplicationCredentialsQuery,
&creds.User,
&creds.Password,
&creds.SSLCaCert,
&creds.SSLCert,
&creds.SSLKey,
)

This comment has been minimized.

Copy link
@laurent-indermuehle

laurent-indermuehle Jun 24, 2021

Contributor

@shlomi-noach these lines broke my ReplicationCredentialsQuery.
The documentation says: "custom query to get replication credentials. Must return a single row, with two text columns: 1st is username, 2nd is password." which is now wrong.
This sentence comes from: https://github.com/openark/orchestrator/blob/88867e684eb68b2918fa7cecb97c383bdde3898d/go/config/config.go

if err == nil && creds.User == "" {
err = fmt.Errorf("Empty username retrieved by ReplicationCredentialsQuery")
}
if err == nil {
return replicationUser, replicationPassword, nil
return creds, nil
}
log.Errore(err)
}
Expand All @@ -1003,12 +1070,12 @@ func ReadReplicationCredentials(instanceKey *InstanceKey) (replicationUser strin
from
mysql.slave_master_info
`
err = ScanInstanceRow(instanceKey, query, &replicationUser, &replicationPassword)
if err == nil && replicationUser == "" {
err = ScanInstanceRow(instanceKey, query, &creds.User, &creds.Password)
if err == nil && creds.User == "" {
err = fmt.Errorf("Empty username found in mysql.slave_master_info")
}
}
return replicationUser, replicationPassword, log.Errore(err)
return creds, log.Errore(err)
}

// SetReadOnly sets or clears the instance's global read_only variable
Expand Down
4 changes: 2 additions & 2 deletions go/logic/topology_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2048,7 +2048,7 @@ func GracefulMasterTakeover(clusterName string, designatedKey *inst.InstanceKey,
}
log.Infof("GracefulMasterTakeover: Will demote %+v and promote %+v instead", clusterMaster.Key, designatedInstance.Key)

replicationUser, replicationPassword, replicationCredentialsError := inst.ReadReplicationCredentials(&designatedInstance.Key)
replicationCreds, replicationCredentialsError := inst.ReadReplicationCredentials(&designatedInstance.Key)

analysisEntry, err := forceAnalysisEntry(clusterName, inst.DeadMaster, inst.GracefulMasterTakeoverCommandHint, &clusterMaster.Key)
if err != nil {
Expand Down Expand Up @@ -2099,7 +2099,7 @@ func GracefulMasterTakeover(clusterName string, designatedKey *inst.InstanceKey,
log.Errorf("GracefulMasterTakeover: sanity problem. Demoted master's coordinates changed from %+v to %+v while supposed to have been frozen", *demotedMasterSelfBinlogCoordinates, clusterMaster.SelfBinlogCoordinates)
}
if !clusterMaster.HasReplicationCredentials && replicationCredentialsError == nil {
_, credentialsErr := inst.ChangeMasterCredentials(&clusterMaster.Key, replicationUser, replicationPassword)
_, credentialsErr := inst.ChangeMasterCredentials(&clusterMaster.Key, replicationCreds)
if err == nil {
err = credentialsErr
}
Expand Down
20 changes: 17 additions & 3 deletions resources/bin/orchestrator-client
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ api_path=
basic_auth="${ORCHESTRATOR_AUTH_USER:-}:${ORCHESTRATOR_AUTH_PASSWORD:-}"
headers_auth="${ORCHESTRATOR_AUTH_USER_HEADER}"
binlog=
seconds=

instance_hostport=
destination_hostport=
Expand Down Expand Up @@ -92,11 +93,12 @@ for arg in "$@"; do
"-auth"|"--auth") set -- "$@" "-b" ;;
"-headers-auth"|"--headers-auth") set -- "$@" "-e" ;;
"-binlog"|"--binlog") set -- "$@" "-n" ;;
"-seconds"|"--seconds") set -- "$@" "-S" ;;
*) set -- "$@" "$arg"
esac
done

while getopts "c:i:d:s:a:D:U:o:r:u:R:t:l:H:P:q:b:e:n:h" OPTION
while getopts "c:i:d:s:a:D:U:o:r:u:R:t:l:H:P:q:b:e:n:h:S:" OPTION
do
case $OPTION in
h) command="help" ;;
Expand All @@ -118,7 +120,8 @@ do
b) basic_auth="$OPTARG" ;;
e) headers_auth="$OPTARG" ;;
n) binlog="$OPTARG" ;;
q) query="$OPTARG"
q) query="$OPTARG" ;;
S) seconds="$OPTARG"
esac
done

Expand Down Expand Up @@ -372,6 +375,8 @@ function prompt_help {
pool name for pool related commands
-H <hostname> -h <hostname>
indicate host for resolve and raft operations
-S <seconds> --seconds
seconds for delaying replication
"

cat "$0" | universal_sed -n '/run_command/,/esac/p' | egrep '".*"[)].*;;' | universal_sed -r -e 's/"(.*?)".*#(.*)/\1~\2/' | column -t -s "~"
Expand Down Expand Up @@ -756,6 +761,15 @@ function general_instance_command {
print_details | filter_key | print_key
}

function delay_replication_command {
path="${1:-$command}"

assert_nonempty "instance" "$instance_hostport"
assert_nonempty "seconds" "$seconds"
api "$path/$instance_hostport/$seconds"
print_details
}

function replication_analysis {
api "replication-analysis"
print_details | jq -r '.[] |
Expand Down Expand Up @@ -978,7 +992,7 @@ function run_command {
"enable-semi-sync-replica") general_instance_command ;; # Enable semi-sync (replica-side)
"disable-semi-sync-replica") general_instance_command ;; # Disable semi-sync (replica-side)
"restart-replica-statements") restart_replica_statements ;; # Given `-q "<query>"` that requires replication restart to apply, wrap query with stop/start slave statements as required to restore instance to same replication state. Print out set of statements

"delay-replication") delay_replication_command ;; # Issue a CHANGE MASTER TO DELAY=seconds preserving the replication threads state
"can-replicate-from") can_replicate_from ;; # Check if an instance can potentially replicate from another, according to replication rules
"can-replicate-from-gtid") can_replicate_from_gtid ;; # Check if an instance can potentially replicate from another, according to replication rules and assuming Oracle GTID
"is-replicating") is_replicating ;; # Check if an instance is replicating at this time (both SQL and IO threads running)
Expand Down

0 comments on commit 4b95079

Please sign in to comment.