Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1267 from marcosvm/marcosvm/deploy-replication
Browse files Browse the repository at this point in the history
thank you!
  • Loading branch information
shlomi-noach authored Nov 19, 2020
2 parents 97eb9c2 + adfffdf commit 8ba9274
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 3 deletions.
26 changes: 26 additions & 0 deletions go/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,31 @@ func (this *HttpAPI) DisableSemiSyncReplica(params martini.Params, r render.Rend
this.setSemiSyncReplica(params, r, req, user, false)
}

// DelayReplication delays replication on given instance with given seconds
func (this *HttpAPI) DelayReplication(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
return
}
instanceKey, err := this.getInstanceKey(params["host"], params["port"])
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
seconds, err := strconv.Atoi(params["seconds"])
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: "Invalid value provided for seconds"})
return
}
err = inst.DelayReplication(&instanceKey, seconds)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}

Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Replication delayed: %+v", instanceKey), Details: seconds})
}

// SetReadOnly sets the global read_only variable
func (this *HttpAPI) SetReadOnly(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Expand Down Expand Up @@ -3724,6 +3749,7 @@ func (this *HttpAPI) RegisterRequests(m *martini.ClassicMartini) {
this.registerAPIRequest(m, "disable-semi-sync-master/:host/:port", this.DisableSemiSyncMaster)
this.registerAPIRequest(m, "enable-semi-sync-replica/:host/:port", this.EnableSemiSyncReplica)
this.registerAPIRequest(m, "disable-semi-sync-replica/:host/:port", this.DisableSemiSyncReplica)
this.registerAPIRequest(m, "delay-replication/:host/:port/:seconds", this.DelayReplication)

// Replication information:
this.registerAPIRequest(m, "can-replicate-from/:host/:port/:belowHost/:belowPort", this.CanReplicateFrom)
Expand Down
22 changes: 22 additions & 0 deletions go/inst/instance_topology_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,28 @@ func EnableSemiSync(instanceKey *InstanceKey, master, replica bool) error {
return err
}

// DelayReplication set the replication delay given seconds
// keeping the current state of the replication threads.
func DelayReplication(instanceKey *InstanceKey, seconds int) error {
if seconds < 0 {
return fmt.Errorf("invalid seconds: %d, it should be greater or equal to 0", seconds)
}
query := fmt.Sprintf("change master to master_delay=%d", seconds)
statements, err := GetReplicationRestartPreserveStatements(instanceKey, query)
if err != nil {
return err
}
for _, cmd := range statements {
if _, err := ExecInstance(instanceKey, cmd); err != nil {
return log.Errorf("%+v: DelayReplication: '%q' failed: %+v", *instanceKey, cmd, err)
} else {
log.Infof("DelayReplication: %s on %+v", cmd, *instanceKey)
}
}
AuditOperation("delay-replication", instanceKey, fmt.Sprintf("set to %d", seconds))
return nil
}

// ChangeMasterCredentials issues a CHANGE MASTER TO... MASTER_USER=, MASTER_PASSWORD=...
func ChangeMasterCredentials(instanceKey *InstanceKey, creds *ReplicationCredentials) (*Instance, error) {
instance, err := ReadTopologyInstance(instanceKey)
Expand Down
20 changes: 17 additions & 3 deletions resources/bin/orchestrator-client
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ api_path=
basic_auth="${ORCHESTRATOR_AUTH_USER:-}:${ORCHESTRATOR_AUTH_PASSWORD:-}"
headers_auth="${ORCHESTRATOR_AUTH_USER_HEADER}"
binlog=
seconds=

instance_hostport=
destination_hostport=
Expand Down Expand Up @@ -92,11 +93,12 @@ for arg in "$@"; do
"-auth"|"--auth") set -- "$@" "-b" ;;
"-headers-auth"|"--headers-auth") set -- "$@" "-e" ;;
"-binlog"|"--binlog") set -- "$@" "-n" ;;
"-seconds"|"--seconds") set -- "$@" "-S" ;;
*) set -- "$@" "$arg"
esac
done

while getopts "c:i:d:s:a:D:U:o:r:u:R:t:l:H:P:q:b:e:n:h" OPTION
while getopts "c:i:d:s:a:D:U:o:r:u:R:t:l:H:P:q:b:e:n:h:S:" OPTION
do
case $OPTION in
h) command="help" ;;
Expand All @@ -118,7 +120,8 @@ do
b) basic_auth="$OPTARG" ;;
e) headers_auth="$OPTARG" ;;
n) binlog="$OPTARG" ;;
q) query="$OPTARG"
q) query="$OPTARG" ;;
S) seconds="$OPTARG"
esac
done

Expand Down Expand Up @@ -372,6 +375,8 @@ function prompt_help {
pool name for pool related commands
-H <hostname> -h <hostname>
indicate host for resolve and raft operations
-S <seconds> --seconds
seconds for delaying replication
"

cat "$0" | universal_sed -n '/run_command/,/esac/p' | egrep '".*"[)].*;;' | universal_sed -r -e 's/"(.*?)".*#(.*)/\1~\2/' | column -t -s "~"
Expand Down Expand Up @@ -756,6 +761,15 @@ function general_instance_command {
print_details | filter_key | print_key
}

function delay_replication_command {
path="${1:-$command}"

assert_nonempty "instance" "$instance_hostport"
assert_nonempty "seconds" "$seconds"
api "$path/$instance_hostport/$seconds"
print_details
}

function replication_analysis {
api "replication-analysis"
print_details | jq -r '.[] |
Expand Down Expand Up @@ -978,7 +992,7 @@ function run_command {
"enable-semi-sync-replica") general_instance_command ;; # Enable semi-sync (replica-side)
"disable-semi-sync-replica") general_instance_command ;; # Disable semi-sync (replica-side)
"restart-replica-statements") restart_replica_statements ;; # Given `-q "<query>"` that requires replication restart to apply, wrap query with stop/start slave statements as required to restore instance to same replication state. Print out set of statements

"delay-replication") delay_replication_command ;; # Issue a CHANGE MASTER TO DELAY=seconds preserving the replication threads state
"can-replicate-from") can_replicate_from ;; # Check if an instance can potentially replicate from another, according to replication rules
"can-replicate-from-gtid") can_replicate_from_gtid ;; # Check if an instance can potentially replicate from another, according to replication rules and assuming Oracle GTID
"is-replicating") is_replicating ;; # Check if an instance is replicating at this time (both SQL and IO threads running)
Expand Down

0 comments on commit 8ba9274

Please sign in to comment.