diff --git a/go/http/api.go b/go/http/api.go index 1220add3b..7e1625436 100644 --- a/go/http/api.go +++ b/go/http/api.go @@ -1577,6 +1577,31 @@ func (this *HttpAPI) DisableSemiSyncReplica(params martini.Params, r render.Rend this.setSemiSyncReplica(params, r, req, user, false) } +// DelayReplication delays replication on given instance with given seconds +func (this *HttpAPI) DelayReplication(params martini.Params, r render.Render, req *http.Request, user auth.User) { + if !isAuthorizedForAction(req, user) { + Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"}) + return + } + instanceKey, err := this.getInstanceKey(params["host"], params["port"]) + if err != nil { + Respond(r, &APIResponse{Code: ERROR, Message: err.Error()}) + return + } + seconds, err := strconv.Atoi(params["seconds"]) + if err != nil { + Respond(r, &APIResponse{Code: ERROR, Message: "Invalid value provided for seconds"}) + return + } + err = inst.DelayReplication(&instanceKey, seconds) + if err != nil { + Respond(r, &APIResponse{Code: ERROR, Message: err.Error()}) + return + } + + Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Replication delayed: %+v", instanceKey), Details: seconds}) +} + // SetReadOnly sets the global read_only variable func (this *HttpAPI) SetReadOnly(params martini.Params, r render.Render, req *http.Request, user auth.User) { if !isAuthorizedForAction(req, user) { @@ -3724,6 +3749,7 @@ func (this *HttpAPI) RegisterRequests(m *martini.ClassicMartini) { this.registerAPIRequest(m, "disable-semi-sync-master/:host/:port", this.DisableSemiSyncMaster) this.registerAPIRequest(m, "enable-semi-sync-replica/:host/:port", this.EnableSemiSyncReplica) this.registerAPIRequest(m, "disable-semi-sync-replica/:host/:port", this.DisableSemiSyncReplica) + this.registerAPIRequest(m, "delay-replication/:host/:port/:seconds", this.DelayReplication) // Replication information: this.registerAPIRequest(m, "can-replicate-from/:host/:port/:belowHost/:belowPort", this.CanReplicateFrom) diff --git a/go/inst/instance_topology_dao.go b/go/inst/instance_topology_dao.go index 32d18251c..6848e8157 100644 --- a/go/inst/instance_topology_dao.go +++ b/go/inst/instance_topology_dao.go @@ -595,6 +595,28 @@ func EnableSemiSync(instanceKey *InstanceKey, master, replica bool) error { return err } +// DelayReplication set the replication delay given seconds +// keeping the current state of the replication threads. +func DelayReplication(instanceKey *InstanceKey, seconds int) error { + if seconds < 0 { + return fmt.Errorf("invalid seconds: %d, it should be greater or equal to 0", seconds) + } + query := fmt.Sprintf("change master to master_delay=%d", seconds) + statements, err := GetReplicationRestartPreserveStatements(instanceKey, query) + if err != nil { + return err + } + for _, cmd := range statements { + if _, err := ExecInstance(instanceKey, cmd); err != nil { + return log.Errorf("%+v: DelayReplication: '%q' failed: %+v", *instanceKey, cmd, err) + } else { + log.Infof("DelayReplication: %s on %+v", cmd, *instanceKey) + } + } + AuditOperation("delay-replication", instanceKey, fmt.Sprintf("set to %d", seconds)) + return nil +} + // ChangeMasterCredentials issues a CHANGE MASTER TO... MASTER_USER=, MASTER_PASSWORD=... func ChangeMasterCredentials(instanceKey *InstanceKey, creds *ReplicationCredentials) (*Instance, error) { instance, err := ReadTopologyInstance(instanceKey) diff --git a/resources/bin/orchestrator-client b/resources/bin/orchestrator-client index d68b6ee4c..cd913e32c 100755 --- a/resources/bin/orchestrator-client +++ b/resources/bin/orchestrator-client @@ -63,6 +63,7 @@ api_path= basic_auth="${ORCHESTRATOR_AUTH_USER:-}:${ORCHESTRATOR_AUTH_PASSWORD:-}" headers_auth="${ORCHESTRATOR_AUTH_USER_HEADER}" binlog= +seconds= instance_hostport= destination_hostport= @@ -92,11 +93,12 @@ for arg in "$@"; do "-auth"|"--auth") set -- "$@" "-b" ;; "-headers-auth"|"--headers-auth") set -- "$@" "-e" ;; "-binlog"|"--binlog") set -- "$@" "-n" ;; + "-seconds"|"--seconds") set -- "$@" "-S" ;; *) set -- "$@" "$arg" esac done -while getopts "c:i:d:s:a:D:U:o:r:u:R:t:l:H:P:q:b:e:n:h" OPTION +while getopts "c:i:d:s:a:D:U:o:r:u:R:t:l:H:P:q:b:e:n:h:S:" OPTION do case $OPTION in h) command="help" ;; @@ -118,7 +120,8 @@ do b) basic_auth="$OPTARG" ;; e) headers_auth="$OPTARG" ;; n) binlog="$OPTARG" ;; - q) query="$OPTARG" + q) query="$OPTARG" ;; + S) seconds="$OPTARG" esac done @@ -372,6 +375,8 @@ function prompt_help { pool name for pool related commands -H -h indicate host for resolve and raft operations + -S --seconds + seconds for delaying replication " cat "$0" | universal_sed -n '/run_command/,/esac/p' | egrep '".*"[)].*;;' | universal_sed -r -e 's/"(.*?)".*#(.*)/\1~\2/' | column -t -s "~" @@ -756,6 +761,15 @@ function general_instance_command { print_details | filter_key | print_key } +function delay_replication_command { + path="${1:-$command}" + + assert_nonempty "instance" "$instance_hostport" + assert_nonempty "seconds" "$seconds" + api "$path/$instance_hostport/$seconds" + print_details +} + function replication_analysis { api "replication-analysis" print_details | jq -r '.[] | @@ -978,7 +992,7 @@ function run_command { "enable-semi-sync-replica") general_instance_command ;; # Enable semi-sync (replica-side) "disable-semi-sync-replica") general_instance_command ;; # Disable semi-sync (replica-side) "restart-replica-statements") restart_replica_statements ;; # Given `-q ""` that requires replication restart to apply, wrap query with stop/start slave statements as required to restore instance to same replication state. Print out set of statements - + "delay-replication") delay_replication_command ;; # Issue a CHANGE MASTER TO DELAY=seconds preserving the replication threads state "can-replicate-from") can_replicate_from ;; # Check if an instance can potentially replicate from another, according to replication rules "can-replicate-from-gtid") can_replicate_from_gtid ;; # Check if an instance can potentially replicate from another, according to replication rules and assuming Oracle GTID "is-replicating") is_replicating ;; # Check if an instance is replicating at this time (both SQL and IO threads running)