Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge downstream gh #225

Merged
merged 17 commits into from
Jul 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#
set -e

RELEASE_VERSION="1.5.0"
RELEASE_VERSION="1.5.5"
TOPDIR=/tmp/orchestrator-release
export RELEASE_VERSION TOPDIR
export GO15VENDOREXPERIMENT=1
Expand Down
15 changes: 13 additions & 2 deletions go/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ func Cli(command string, strict bool, instance string, destination string, owner
fmt.Println(clusterInstance.Key.DisplayString())
}
}
case registerCliCommand("which-cluster-osc-slaves", "Information", `Output a list of slaves in same cluster as given instance, that could serve as a pt-online-schema-change operation control slaves`):
case registerCliCommand("which-cluster-osc-slaves", "Information", `Output a list of slaves in a cluster, that could serve as a pt-online-schema-change operation control slaves`):
{
clusterName := getClusterName(clusterAlias, instanceKey)
instances, err := inst.GetClusterOSCSlaves(clusterName)
Expand All @@ -989,6 +989,17 @@ func Cli(command string, strict bool, instance string, destination string, owner
fmt.Println(clusterInstance.Key.DisplayString())
}
}
case registerCliCommand("which-cluster-gh-ost-slaves", "Information", `Output a list of slaves in a cluster, that could serve as a gh-ost working server`):
{
clusterName := getClusterName(clusterAlias, instanceKey)
instances, err := inst.GetClusterGhostSlaves(clusterName)
if err != nil {
log.Fatale(err)
}
for _, clusterInstance := range instances {
fmt.Println(clusterInstance.Key.DisplayString())
}
}
case registerCliCommand("which-master", "Information", `Output the fully-qualified hostname:port representation of a given instance's master`):
{
instanceKey = deduceInstanceKeyIfNeeded(instance, instanceKey, true)
Expand Down Expand Up @@ -1080,7 +1091,7 @@ func Cli(command string, strict bool, instance string, destination string, owner
log.Fatalf("Duration value must be non-negative. Given value: %d", durationSeconds)
}
}
maintenanceKey, err := inst.BeginBoundedMaintenance(instanceKey, inst.GetMaintenanceOwner(), reason, uint(durationSeconds))
maintenanceKey, err := inst.BeginBoundedMaintenance(instanceKey, inst.GetMaintenanceOwner(), reason, uint(durationSeconds), true)
if err == nil {
log.Infof("Maintenance key: %+v", maintenanceKey)
log.Infof("Maintenance duration: %d seconds", durationSeconds)
Expand Down
15 changes: 13 additions & 2 deletions go/app/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package app

import (
"net"
nethttp "net/http"
"strings"

Expand Down Expand Up @@ -122,7 +123,17 @@ func standardHttp(discovery bool) {
http.Web.RegisterRequests(m)

// Serve
if config.Config.UseSSL {
if config.Config.ListenSocket != "" {
log.Infof("Starting HTTP listener on unix socket %v", config.Config.ListenSocket)
unixListener, err := net.Listen("unix", config.Config.ListenSocket)
if err != nil {
log.Fatale(err)
}
defer unixListener.Close()
if err := nethttp.Serve(unixListener, m); err != nil {
log.Fatale(err)
}
} else if config.Config.UseSSL {
log.Info("Starting HTTPS listener")
tlsConfig, err := ssl.NewTLSConfig(config.Config.SSLCAFile, config.Config.UseMutualTLS)
if err != nil {
Expand All @@ -136,7 +147,7 @@ func standardHttp(discovery bool) {
log.Fatale(err)
}
} else {
log.Info("Starting HTTP listener")
log.Infof("Starting HTTP listener on %+v", config.Config.ListenAddress)
if err := nethttp.ListenAndServe(config.Config.ListenAddress, m); err != nil {
log.Fatale(err)
}
Expand Down
10 changes: 7 additions & 3 deletions go/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ var (
// Some of the parameters have reasonable default values, and some (like database credentials) are
// strictly expected from user.
type Configuration struct {
Debug bool // set debug mode (similar to --debug option)
EnableSyslog bool // Should logs be directed (in addition) to syslog daemon?
ListenAddress string
Debug bool // set debug mode (similar to --debug option)
EnableSyslog bool // Should logs be directed (in addition) to syslog daemon?
ListenAddress string // Where orchestrator HTTP should listen for TCP
ListenSocket string // Where orchestrator HTTP should listen for unix socket (default: empty; when given, TCP is disabled)
AgentsServerPort string // port orchestrator agents talk back to
MySQLTopologyUser string
MySQLTopologyPassword string // my.cnf style configuration file from where to pick credentials. Expecting `user`, `password` under `[client]` section
Expand Down Expand Up @@ -71,6 +72,7 @@ type Configuration struct {
SnapshotTopologiesIntervalHours uint // Interval in hour between snapshot-topologies invocation. Default: 0 (disabled)
InstanceBulkOperationsWaitTimeoutSeconds uint // Time to wait on a single instance when doing bulk (many instances) operation
ActiveNodeExpireSeconds uint // Maximum time to wait for active node to send keepalive before attempting to take over as active node.
NodeHealthExpiry bool // Do we expire the node_health table? Usually this is true but it might be disabled on command line tools if an orchestrator daemon is running.
HostnameResolveMethod string // Method by which to "normalize" hostname ("none"/"default"/"cname")
MySQLHostnameResolveMethod string // Method by which to "normalize" hostname via MySQL server. ("none"/"@@hostname"/"@@report_host"; default "@@hostname")
SkipBinlogServerUnresolveCheck bool // Skip the double-check that an unresolved hostname resolves back to same hostname for binlog servers
Expand Down Expand Up @@ -192,6 +194,7 @@ func newConfiguration() *Configuration {
Debug: false,
EnableSyslog: false,
ListenAddress: ":3000",
ListenSocket: "",
AgentsServerPort: ":3001",
StatusEndpoint: "/api/status",
StatusSimpleHealth: true,
Expand All @@ -212,6 +215,7 @@ func newConfiguration() *Configuration {
DiscoverByShowSlaveHosts: false,
InstanceBulkOperationsWaitTimeoutSeconds: 10,
ActiveNodeExpireSeconds: 5,
NodeHealthExpiry: true,
HostnameResolveMethod: "default",
MySQLHostnameResolveMethod: "@@hostname",
SkipBinlogServerUnresolveCheck: true,
Expand Down
16 changes: 16 additions & 0 deletions go/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,22 @@ var generateSQLPatches = []string{
database_instance
MODIFY cluster_name varchar(128) NOT NULL
`,
`
ALTER TABLE
node_health
ADD INDEX last_seen_active_idx (last_seen_active)
`,
`
ALTER TABLE
database_instance_maintenance
ADD COLUMN processing_node_hostname varchar(128) CHARACTER SET ascii NOT NULL,
ADD COLUMN processing_node_token varchar(128) NOT NULL
`,
`
ALTER TABLE
database_instance_maintenance
ADD COLUMN explicitly_bounded TINYINT UNSIGNED NOT NULL
`,
}

// Track if a TLS has already been configured for topology
Expand Down
2 changes: 1 addition & 1 deletion go/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (this *HttpAPI) BeginMaintenance(params martini.Params, r render.Render, re
r.JSON(200, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
key, err := inst.BeginMaintenance(&instanceKey, params["owner"], params["reason"])
key, err := inst.BeginBoundedMaintenance(&instanceKey, params["owner"], params["reason"], 0, true)
if err != nil {
r.JSON(200, &APIResponse{Code: ERROR, Message: err.Error(), Details: key})
return
Expand Down
49 changes: 47 additions & 2 deletions go/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,13 @@ func ReadTopologyInstance(instanceKey *InstanceKey) (*Instance, error) {
instance.MasterKey = *masterKey
instance.IsDetachedMaster = instance.MasterKey.IsDetached()
instance.SecondsBehindMaster = m.GetNullInt64("Seconds_Behind_Master")
if instance.SecondsBehindMaster.Valid && instance.SecondsBehindMaster.Int64 < 0 {
log.Warningf("Host: %+v, instance.SecondsBehindMaster < 0 [%+v], correcting to 0", instanceKey, instance.SecondsBehindMaster.Int64)
instance.SecondsBehindMaster.Int64 = 0
}
// And until told otherwise:
instance.SlaveLagSeconds = instance.SecondsBehindMaster

instance.AllowTLS = (m.GetString("Master_SSL_Allowed") == "Yes")
// Not breaking the flow even on error
slaveStatusFound = true
Expand Down Expand Up @@ -424,8 +429,12 @@ func ReadTopologyInstance(instanceKey *InstanceKey) (*Instance, error) {
}

if config.Config.SlaveLagQuery != "" && !isMaxScale {
err := db.QueryRow(config.Config.SlaveLagQuery).Scan(&instance.SlaveLagSeconds)
if err != nil {
if err := db.QueryRow(config.Config.SlaveLagQuery).Scan(&instance.SlaveLagSeconds); err == nil {
if instance.SlaveLagSeconds.Valid && instance.SlaveLagSeconds.Int64 < 0 {
log.Warningf("Host: %+v, instance.SlaveLagSeconds < 0 [%+v], correcting to 0", instanceKey, instance.SlaveLagSeconds.Int64)
instance.SlaveLagSeconds.Int64 = 0
}
} else {
instance.SlaveLagSeconds = instance.SecondsBehindMaster
logReadTopologyInstanceError(instanceKey, "SlaveLagQuery", err)
}
Expand Down Expand Up @@ -1119,6 +1128,42 @@ func GetClusterOSCSlaves(clusterName string) ([](*Instance), error) {
return result, nil
}

// GetClusterGhostSlaves lists the replicas of the given cluster that are eligible to act as
// the server a [gh-ost](https://github.com/github/gh-ost) operation connects to.
//
// Candidates are read with the condition replication_depth > 0, binlog_format = 'ROW' and a
// matching cluster_name, passing "num_slave_hosts asc" as the requested sort (gh-ost prefers
// a RBR replica that has no children). From those candidates, binlog servers, instances whose
// last check is not valid, and instances lacking binary logging or log-slave-updates are
// filtered out.
//
// The returned slice is nil when no instance qualifies or when reading instances fails; in
// the latter case the read error is returned as-is.
func GetClusterGhostSlaves(clusterName string) (result [](*Instance), err error) {
	candidates, err := readInstancesByCondition(`
replication_depth > 0
and binlog_format = 'ROW'
and cluster_name = ?
`, sqlutils.Args(clusterName), "num_slave_hosts asc")
	if err != nil {
		return result, err
	}

	for _, candidate := range candidates {
		// IsBinlogServer() is deliberately the first operand so it is evaluated for
		// every candidate, exactly once.
		eligible := !candidate.IsBinlogServer() &&
			candidate.IsLastCheckValid &&
			candidate.LogBinEnabled &&
			candidate.LogSlaveUpdatesEnabled
		if eligible {
			result = append(result, candidate)
		}
	}

	return result, nil
}

// GetInstancesMaxLag returns the maximum lag in a set of instances
func GetInstancesMaxLag(instances [](*Instance)) (maxLag int64, err error) {
if len(instances) == 0 {
Expand Down
62 changes: 44 additions & 18 deletions go/inst/maintenance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@ package inst

import (
"fmt"

"github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils"
"github.com/outbrain/orchestrator/go/config"
"github.com/outbrain/orchestrator/go/db"
"github.com/outbrain/orchestrator/go/process"
)

// ReadActiveMaintenance returns the list of currently active maintenance entries
func ReadActiveMaintenance() ([]Maintenance, error) {
res := []Maintenance{}
query := `
select
select
database_instance_maintenance_id,
hostname,
port,
Expand All @@ -37,7 +39,7 @@ func ReadActiveMaintenance() ([]Maintenance, error) {
maintenance_active,
owner,
reason
from
from
database_instance_maintenance
where
maintenance_active = 1
Expand Down Expand Up @@ -67,24 +69,29 @@ func ReadActiveMaintenance() ([]Maintenance, error) {
}

// BeginBoundedMaintenance will make new maintenance entry for given instanceKey.
func BeginBoundedMaintenance(instanceKey *InstanceKey, owner string, reason string, durationSeconds uint) (int64, error) {
func BeginBoundedMaintenance(instanceKey *InstanceKey, owner string, reason string, durationSeconds uint, explicitlyBounded bool) (int64, error) {
var maintenanceToken int64 = 0
if durationSeconds == 0 {
durationSeconds = config.Config.MaintenanceExpireMinutes * 60
}
res, err := db.ExecOrchestrator(`
insert ignore
into database_instance_maintenance (
hostname, port, maintenance_active, begin_timestamp, end_timestamp, owner, reason
hostname, port, maintenance_active, begin_timestamp, end_timestamp, owner, reason,
processing_node_hostname, processing_node_token, explicitly_bounded
) VALUES (
?, ?, 1, NOW(), NOW() + INTERVAL ? SECOND, ?, ?
?, ?, 1, NOW(), NOW() + INTERVAL ? SECOND, ?, ?,
?, ?, ?
)
`,
instanceKey.Hostname,
instanceKey.Port,
durationSeconds,
owner,
reason,
process.ThisHostname,
process.ProcessToken.Hash,
explicitlyBounded,
)
if err != nil {
return maintenanceToken, log.Errore(err)
Expand All @@ -102,19 +109,19 @@ func BeginBoundedMaintenance(instanceKey *InstanceKey, owner string, reason stri

// BeginMaintenance will make new maintenance entry for given instanceKey. Maintenance time is unbounded
func BeginMaintenance(instanceKey *InstanceKey, owner string, reason string) (int64, error) {
return BeginBoundedMaintenance(instanceKey, owner, reason, 0)
return BeginBoundedMaintenance(instanceKey, owner, reason, 0, false)
}

// EndMaintenanceByInstanceKey will terminate an active maintenance using given instanceKey as hint
func EndMaintenanceByInstanceKey(instanceKey *InstanceKey) error {
res, err := db.ExecOrchestrator(`
update
database_instance_maintenance
set
set
maintenance_active = NULL,
end_timestamp = NOW()
where
hostname = ?
hostname = ?
and port = ?
and maintenance_active = 1
`,
Expand All @@ -138,10 +145,10 @@ func EndMaintenanceByInstanceKey(instanceKey *InstanceKey) error {
func ReadMaintenanceInstanceKey(maintenanceToken int64) (*InstanceKey, error) {
var res *InstanceKey
query := `
select
hostname, port
from
database_instance_maintenance
select
hostname, port
from
database_instance_maintenance
where
database_instance_maintenance_id = ?
`
Expand All @@ -167,11 +174,11 @@ func EndMaintenance(maintenanceToken int64) error {
res, err := db.ExecOrchestrator(`
update
database_instance_maintenance
set
set
maintenance_active = NULL,
end_timestamp = NOW()
where
database_instance_maintenance_id = ?
database_instance_maintenance_id = ?
`,
maintenanceToken,
)
Expand All @@ -196,7 +203,7 @@ func ExpireMaintenance() error {
database_instance_maintenance
where
maintenance_active is null
and end_timestamp < NOW() - INTERVAL ? DAY
and end_timestamp < NOW() - INTERVAL ? DAY
`,
config.Config.MaintenancePurgeDays,
)
Expand All @@ -211,11 +218,11 @@ func ExpireMaintenance() error {
res, err := db.ExecOrchestrator(`
update
database_instance_maintenance
set
maintenance_active = NULL
set
maintenance_active = NULL
where
maintenance_active = 1
and end_timestamp < NOW()
and end_timestamp < NOW()
`,
)
if err != nil {
Expand All @@ -225,6 +232,25 @@ func ExpireMaintenance() error {
AuditOperation("expire-maintenance", nil, fmt.Sprintf("Expired bounded: %d", rowsAffected))
}
}
{
res, err := db.ExecOrchestrator(`
update
database_instance_maintenance
left join node_health on (processing_node_hostname = node_health.hostname AND processing_node_token = node_health.token)
set
database_instance_maintenance.maintenance_active = NULL
where
node_health.last_seen_active IS NULL
and explicitly_bounded = 0
`,
)
if err != nil {
return log.Errore(err)
}
if rowsAffected, _ := res.RowsAffected(); rowsAffected > 0 {
AuditOperation("expire-maintenance", nil, fmt.Sprintf("Expired dead: %d", rowsAffected))
}
}

return nil
}
Loading