Merge pull request #825 from github/orchestrator-client-purge-binlogs
purge-binary-logs API, safe operation
Shlomi Noach authored Mar 3, 2019
2 parents 792f42d + 480acd4 commit 5bd6e2a
Showing 7 changed files with 85 additions and 24 deletions.
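
A rough usage sketch of the new API route added below. The orchestrator host, the :3000 port, the /api prefix, the instance address and the binlog file name are illustrative placeholders, not part of this commit; the route shape and the "latest"/force behavior follow the handler and registration in go/http/api.go further down.

  # Purge binary logs on an instance up to, but not including, the named binlog file:
  curl -s "http://orchestrator.example.com:3000/api/purge-binary-logs/my.instance.com/3306/mysql-bin.000123"

  # The handler also accepts the literal "latest", and force=true to skip the
  # replica-position safety check:
  curl -s "http://orchestrator.example.com:3000/api/purge-binary-logs/my.instance.com/3306/latest?force=true"
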
2 changes: 1 addition & 1 deletion go/app/cli.go
@@ -803,7 +803,7 @@ func Cli(command string, strict bool, instance string, destination string, owner
log.Fatal("expecting --binlog value")
}

_, err = inst.PurgeBinaryLogsTo(instanceKey, *config.RuntimeCLIFlags.BinlogFile)
_, err = inst.PurgeBinaryLogsTo(instanceKey, *config.RuntimeCLIFlags.BinlogFile, false)
if err != nil {
log.Fatale(err)
}
33 changes: 33 additions & 0 deletions go/http/api.go
@@ -1320,6 +1320,38 @@ func (this *HttpAPI) FlushBinaryLogs(params martini.Params, r render.Render, req
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Binary logs flushed on: %+v", instance.Key), Details: instance})
}

// PurgeBinaryLogs purges binary logs up to given binlog file
func (this *HttpAPI) PurgeBinaryLogs(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
return
}
instanceKey, err := this.getInstanceKey(params["host"], params["port"])

if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
logFile := params["logFile"]
if logFile == "" {
Respond(r, &APIResponse{Code: ERROR, Message: "purge-binary-logs: expected log file name or 'latest'"})
return
}
force := (req.URL.Query().Get("force") == "true") || (params["force"] == "true")
var instance *inst.Instance
if logFile == "latest" {
instance, err = inst.PurgeBinaryLogsToLatest(&instanceKey, force)
} else {
instance, err = inst.PurgeBinaryLogsTo(&instanceKey, logFile, force)
}
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}

Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Binary logs purged on: %+v", instance.Key), Details: instance})
}

// RestartSlaveStatements receives a query to execute that requires a replication restart to apply.
// As an example, this may be `set global rpl_semi_sync_slave_enabled=1`. orchestrator will check
// replication status on given host and will wrap with appropriate stop/start statements, if need be.
@@ -3508,6 +3540,7 @@ func (this *HttpAPI) RegisterRequests(m *martini.ClassicMartini) {
this.registerAPIRequest(m, "detach-slave-master-host/:host/:port", this.DetachReplicaMasterHost)
this.registerAPIRequest(m, "reattach-slave-master-host/:host/:port", this.ReattachReplicaMasterHost)
this.registerAPIRequest(m, "flush-binary-logs/:host/:port", this.FlushBinaryLogs)
this.registerAPIRequest(m, "purge-binary-logs/:host/:port/:logFile", this.PurgeBinaryLogs)
this.registerAPIRequest(m, "restart-slave-statements/:host/:port", this.RestartSlaveStatements)
this.registerAPIRequest(m, "enable-semi-sync-master/:host/:port", this.EnableSemiSyncMaster)
this.registerAPIRequest(m, "disable-semi-sync-master/:host/:port", this.DisableSemiSyncMaster)
20 changes: 10 additions & 10 deletions go/inst/analysis_dao.go
@@ -68,23 +68,23 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
AND replica_instance.slave_io_running = 0
AND replica_instance.last_io_error like '%error %connecting to master%'
AND replica_instance.slave_sql_running = 1),
0) /* AS count_slaves_failing_to_connect_to_master */ > 0)
0) /* AS count_replicas_failing_to_connect_to_master */ > 0)
OR (IFNULL(SUM(replica_instance.last_checked <= replica_instance.last_seen),
0) /* AS count_valid_slaves */ < COUNT(replica_instance.server_id) /* AS count_slaves */)
0) /* AS count_valid_slaves */ < COUNT(replica_instance.server_id) /* AS count_replicas */)
OR (IFNULL(SUM(replica_instance.last_checked <= replica_instance.last_seen
AND replica_instance.slave_io_running != 0
AND replica_instance.slave_sql_running != 0),
0) /* AS count_valid_replicating_slaves */ < COUNT(replica_instance.server_id) /* AS count_slaves */)
0) /* AS count_valid_replicating_slaves */ < COUNT(replica_instance.server_id) /* AS count_replicas */)
OR (MIN(
master_instance.slave_sql_running = 1
AND master_instance.slave_io_running = 0
AND master_instance.last_io_error like '%error %connecting to master%'
) /* AS is_failing_to_connect_to_master */)
OR (COUNT(replica_instance.server_id) /* AS count_slaves */ > 0)
OR (COUNT(replica_instance.server_id) /* AS count_replicas */ > 0)
`
args = append(args, ValidSecondsFromSeenToLastAttemptedCheck())
}
// "OR count_slaves > 0" above is a recent addition, which, granted, makes some previous conditions redundant.
// "OR count_replicas > 0" above is a recent addition, which, granted, makes some previous conditions redundant.
// It gives more output, and more "NoProblem" messages that I am now interested in for purpose of auditing in database_instance_analysis_changelog
query := fmt.Sprintf(`
SELECT
@@ -109,7 +109,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
':',
master_instance.port) = master_instance.cluster_name) AS is_cluster_master,
MIN(master_instance.gtid_mode) AS gtid_mode,
COUNT(replica_instance.server_id) AS count_slaves,
COUNT(replica_instance.server_id) AS count_replicas,
IFNULL(SUM(replica_instance.last_checked <= replica_instance.last_seen),
0) AS count_valid_slaves,
IFNULL(SUM(replica_instance.last_checked <= replica_instance.last_seen
@@ -120,7 +120,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
AND replica_instance.slave_io_running = 0
AND replica_instance.last_io_error like '%%error %%connecting to master%%'
AND replica_instance.slave_sql_running = 1),
0) AS count_slaves_failing_to_connect_to_master,
0) AS count_replicas_failing_to_connect_to_master,
MIN(master_instance.replication_depth) AS replication_depth,
GROUP_CONCAT(concat(replica_instance.Hostname, ':', replica_instance.Port)) as slave_hosts,
MIN(
@@ -236,7 +236,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
ORDER BY
is_master DESC ,
is_cluster_master DESC,
count_slaves DESC
count_replicas DESC
`, analysisQueryReductionClause)
err := db.QueryOrchestrator(query, args, func(m sqlutils.RowMap) error {
a := ReplicationAnalysis{
@@ -256,10 +256,10 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
a.GTIDMode = m.GetString("gtid_mode")
a.LastCheckValid = m.GetBool("is_last_check_valid")
a.LastCheckPartialSuccess = m.GetBool("last_check_partial_success")
a.CountReplicas = m.GetUint("count_slaves")
a.CountReplicas = m.GetUint("count_replicas")
a.CountValidReplicas = m.GetUint("count_valid_slaves")
a.CountValidReplicatingReplicas = m.GetUint("count_valid_replicating_slaves")
a.CountReplicasFailingToConnectToMaster = m.GetUint("count_slaves_failing_to_connect_to_master")
a.CountReplicasFailingToConnectToMaster = m.GetUint("count_replicas_failing_to_connect_to_master")
a.CountDowntimedReplicas = m.GetUint("count_downtimed_replicas")
a.ReplicationDepth = m.GetUint("replication_depth")
a.IsFailingToConnectToMaster = m.GetBool("is_failing_to_connect_to_master")
26 changes: 26 additions & 0 deletions go/inst/instance_topology.go
@@ -2700,3 +2700,29 @@ func RelocateReplicas(instanceKey, otherKey *InstanceKey, pattern string) (repli
}
return replicas, other, err, errs
}

// PurgeBinaryLogsTo attempts to 'PURGE BINARY LOGS' until given binary log is reached
func PurgeBinaryLogsTo(instanceKey *InstanceKey, logFile string, force bool) (*Instance, error) {
replicas, err := ReadReplicaInstances(instanceKey)
if err != nil {
return nil, err
}
if !force {
purgeCoordinates := &BinlogCoordinates{LogFile: logFile, LogPos: 0}
for _, replica := range replicas {
if !purgeCoordinates.SmallerThan(&replica.ExecBinlogCoordinates) {
return nil, log.Errorf("Unsafe to purge binary logs on %+v up to %s because replica %+v has only applied up to %+v", *instanceKey, logFile, replica.Key, replica.ExecBinlogCoordinates)
}
}
}
return purgeBinaryLogsTo(instanceKey, logFile)
}

// PurgeBinaryLogsToLatest attempts to 'PURGE BINARY LOGS' until latest binary log
func PurgeBinaryLogsToLatest(instanceKey *InstanceKey, force bool) (*Instance, error) {
instance, err := ReadTopologyInstance(instanceKey)
if err != nil {
return instance, log.Errore(err)
}
return PurgeBinaryLogsTo(instanceKey, instance.SelfBinlogCoordinates.LogFile, force)
}
13 changes: 2 additions & 11 deletions go/inst/instance_topology_dao.go
@@ -190,8 +190,8 @@ func FlushBinaryLogsTo(instanceKey *InstanceKey, logFile string) (*Instance, err
return FlushBinaryLogs(instanceKey, distance)
}

// FlushBinaryLogsTo attempts to 'PURGE BINARY LOGS' until given binary log is reached
func PurgeBinaryLogsTo(instanceKey *InstanceKey, logFile string) (*Instance, error) {
// purgeBinaryLogsTo attempts to 'PURGE BINARY LOGS' until given binary log is reached
func purgeBinaryLogsTo(instanceKey *InstanceKey, logFile string) (*Instance, error) {
if *config.RuntimeCLIFlags.Noop {
return nil, fmt.Errorf("noop: aborting purge-binary-logs operation on %+v; signalling error but nothing went wrong.", *instanceKey)
}
@@ -207,15 +207,6 @@ func PurgeBinaryLogsTo(instanceKey *InstanceKey, logFile string) (*Instance, err
return ReadTopologyInstance(instanceKey)
}

// FlushBinaryLogsTo attempts to 'PURGE BINARY LOGS' until given binary log is reached
func PurgeBinaryLogsToCurrent(instanceKey *InstanceKey) (*Instance, error) {
instance, err := ReadTopologyInstance(instanceKey)
if err != nil {
return instance, log.Errore(err)
}
return PurgeBinaryLogsTo(instanceKey, instance.SelfBinlogCoordinates.LogFile)
}

func SetSemiSyncMaster(instanceKey *InstanceKey, enableMaster bool) (*Instance, error) {
instance, err := ReadTopologyInstance(instanceKey)
if err != nil {
2 changes: 1 addition & 1 deletion go/logic/topology_recovery.go
@@ -412,7 +412,7 @@ func recoverDeadMasterInBinlogServerTopology(topologyRecovery *TopologyRecovery)
if err != nil {
return promotedReplica, log.Errore(err)
}
promotedReplica, err = inst.PurgeBinaryLogsToCurrent(&promotedReplica.Key)
promotedReplica, err = inst.PurgeBinaryLogsToLatest(&promotedReplica.Key, false)
if err != nil {
return promotedReplica, log.Errore(err)
}
13 changes: 12 additions & 1 deletion resources/bin/orchestrator-client
@@ -52,6 +52,7 @@ pool=
hostname_flag=
api_path=
basic_auth="${ORCHESTRATOR_AUTH_USER:-}:${ORCHESTRATOR_AUTH_PASSWORD:-}"
binlog=

instance_hostport=
destination_hostport=
@@ -79,11 +80,12 @@ for arg in "$@"; do
"-path"|"--path") set -- "$@" "-P" ;;
"-query"|"--query") set -- "$@" "-q" ;;
"-auth"|"--auth") set -- "$@" "-b" ;;
"-binlog"|"--binlog") set -- "$@" "-n" ;;
*) set -- "$@" "$arg"
esac
done

while getopts "c:i:d:s:a:D:U:o:r:u:R:t:l:H:P:q:b:h" OPTION
while getopts "c:i:d:s:a:D:U:o:r:u:R:t:l:H:P:q:b:n:h" OPTION
do
case $OPTION in
h) command="help" ;;
@@ -103,6 +105,7 @@ do
U) [ ! -z "$OPTARG" ] && orchestrator_api="$OPTARG" ;;
P) api_path="$OPTARG" ;;
b) basic_auth="$OPTARG" ;;
n) binlog="$OPTARG" ;;
q) query="$OPTARG"
esac
done
@@ -402,6 +405,13 @@ function is_replication_stopped {
print_response | jq '. | select(.ReplicationSQLThreadState==0 and .ReplicationIOThreadState==0)' | filter_key | print_key
}

function purge_binary_logs {
assert_nonempty "instance" "$instance_hostport"
assert_nonempty "binlog" "$binlog"
api "purge-binary-logs/$instance_hostport/$binlog"
print_details | filter_key | print_key
}

function last_pseudo_gtid {
assert_nonempty "instance" "$instance_hostport"
api "last-pseudo-gtid/$instance_hostport"
@@ -888,6 +898,7 @@ function run_command {
"set-read-only") general_instance_command ;; # Turn an instance read-only, via SET GLOBAL read_only := 1
"set-writeable") general_instance_command ;; # Turn an instance writeable, via SET GLOBAL read_only := 0
"flush-binary-logs") general_instance_command ;; # Flush binary logs on an instance
"purge-binary-logs") purge_binary_logs ;; # Purge binary logs on an instance
"last-pseudo-gtid") last_pseudo_gtid ;; # Dump last injected Pseudo-GTID entry on a server

"recover") recover ;; # Do auto-recovery given a dead instance, assuming orchestrator agrees there's a problem. Override blocking.
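
A matching sketch for the orchestrator-client command added above. The instance address and binlog file name are placeholders; the -c/-i flags follow the script's existing conventions, and --binlog maps to the new -n option introduced in this commit.

  # Purge binary logs on my.instance.com:3306 up to (not including) mysql-bin.000123:
  orchestrator-client -c purge-binary-logs -i my.instance.com:3306 --binlog mysql-bin.000123

  # Or purge up to the instance's latest binary log; the API handler treats "latest" specially:
  orchestrator-client -c purge-binary-logs -i my.instance.com:3306 --binlog latest
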
