Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

add replication delay command #1267

Merged
merged 8 commits into from
Nov 19, 2020
26 changes: 26 additions & 0 deletions go/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,31 @@ func (this *HttpAPI) DisableSemiSyncReplica(params martini.Params, r render.Rend
this.setSemiSyncReplica(params, r, req, user, false)
}

// DelayReplication delays replication on given instance with given seconds
func (this *HttpAPI) DelayReplication(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
return
}
instanceKey, err := this.getInstanceKey(params["host"], params["port"])
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
seconds, err := strconv.Atoi(params["seconds"])
if err != nil || seconds < 0 {
Respond(r, &APIResponse{Code: ERROR, Message: "Invalid value provided for seconds"})
return
}
err = inst.DelayReplication(&instanceKey, seconds)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}

Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Replication delayed: %+v", instanceKey), Details: nil})
marcosvm marked this conversation as resolved.
Show resolved Hide resolved
}

// SetReadOnly sets the global read_only variable
func (this *HttpAPI) SetReadOnly(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Expand Down Expand Up @@ -3724,6 +3749,7 @@ func (this *HttpAPI) RegisterRequests(m *martini.ClassicMartini) {
this.registerAPIRequest(m, "disable-semi-sync-master/:host/:port", this.DisableSemiSyncMaster)
this.registerAPIRequest(m, "enable-semi-sync-replica/:host/:port", this.EnableSemiSyncReplica)
this.registerAPIRequest(m, "disable-semi-sync-replica/:host/:port", this.DisableSemiSyncReplica)
this.registerAPIRequest(m, "delay-replication/:host/:port/:seconds", this.DelayReplication)

// Replication information:
this.registerAPIRequest(m, "can-replicate-from/:host/:port/:belowHost/:belowPort", this.CanReplicateFrom)
Expand Down
18 changes: 18 additions & 0 deletions go/inst/instance_topology_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,24 @@ func EnableSemiSync(instanceKey *InstanceKey, master, replica bool) error {
return err
}

// DelayReplication set the replication delay given seconds
// keeping the current state of the replication threads.
func DelayReplication(instanceKey *InstanceKey, seconds int) error {
marcosvm marked this conversation as resolved.
Show resolved Hide resolved
query := fmt.Sprintf("change master to master_delay=%d", seconds)
statements, err := GetReplicationRestartPreserveStatements(instanceKey, query)
if err != nil {
return err
}
for _, cmd := range statements {
if _, err := ExecInstance(instanceKey, cmd); err != nil {
return log.Errorf("%+v: DelayReplication: '%q' failed: %+v", *instanceKey, cmd, err)
} else {
log.Infof("%s on %+v as part of DelayReplication", cmd, *instanceKey)
marcosvm marked this conversation as resolved.
Show resolved Hide resolved
}
}
return nil
marcosvm marked this conversation as resolved.
Show resolved Hide resolved
}

// ChangeMasterCredentials issues a CHANGE MASTER TO... MASTER_USER=, MASTER_PASSWORD=...
func ChangeMasterCredentials(instanceKey *InstanceKey, creds *ReplicationCredentials) (*Instance, error) {
instance, err := ReadTopologyInstance(instanceKey)
Expand Down