Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Candidate databse instance: database native time #591

Merged
merged 4 commits into from
Aug 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -1456,7 +1456,7 @@ func Cli(command string, strict bool, instance string, destination string, owner
if err != nil {
log.Fatale(err)
}
err = inst.RegisterCandidateInstance(inst.NewCandidateDatabaseInstance(instanceKey, promotionRule))
err = inst.RegisterCandidateInstance(inst.NewCandidateDatabaseInstance(instanceKey, promotionRule).WithCurrentTime())
if err != nil {
log.Fatale(err)
}
Expand Down
12 changes: 12 additions & 0 deletions go/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,15 @@ func QueryOrchestratorBuffered(query string, argsArray []interface{}, on_row fun
}
return log.Criticale(sqlutils.QueryRowsMapBuffered(db, query, on_row, argsArray...))
}

// ReadTimeNow reads and returns the current timestamp as string. This is an unfortunate workaround
// to support both MySQL and SQLite in all possible timezones. SQLite only speaks UTC where MySQL has
// timezone support. By reading the time as string we get the database's de-facto notion of the time,
// which we can then feed back to it.
func ReadTimeNow() (timeNow string, err error) {
err = QueryOrchestrator(`select now() as time_now`, nil, func(m sqlutils.RowMap) error {
timeNow = m.GetString("time_now")
return nil
})
return timeNow, err
}
2 changes: 1 addition & 1 deletion go/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2853,7 +2853,7 @@ func (this *HttpAPI) RegisterCandidate(params martini.Params, r render.Render, r
return
}

candidate := inst.NewCandidateDatabaseInstance(&instanceKey, promotionRule)
candidate := inst.NewCandidateDatabaseInstance(&instanceKey, promotionRule).WithCurrentTime()

if orcraft.IsRaftEnabled() {
_, err = orcraft.PublishCommand("register-candidate", candidate)
Expand Down
17 changes: 11 additions & 6 deletions go/inst/candidate_database_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,31 @@ package inst

import (
"fmt"
"time"

"github.com/github/orchestrator/go/db"
)

// CandidateDatabaseInstance contains information about explicit promotion rules for an instance
type CandidateDatabaseInstance struct {
Hostname string
Port int
PromotionRule CandidatePromotionRule
LastSuggested time.Time
Hostname string
Port int
PromotionRule CandidatePromotionRule
LastSuggestedString string
}

func NewCandidateDatabaseInstance(instanceKey *InstanceKey, promotionRule CandidatePromotionRule) *CandidateDatabaseInstance {
return &CandidateDatabaseInstance{
Hostname: instanceKey.Hostname,
Port: instanceKey.Port,
PromotionRule: promotionRule,
LastSuggested: time.Now(),
}
}

func (cdi *CandidateDatabaseInstance) WithCurrentTime() *CandidateDatabaseInstance {
cdi.LastSuggestedString, _ = db.ReadTimeNow()
return cdi
}

// String returns a string representation of the CandidateDatabaseInstance struct
func (cdi *CandidateDatabaseInstance) String() string {
return fmt.Sprintf("%s:%d %s", cdi.Hostname, cdi.Port, cdi.PromotionRule)
Expand Down
52 changes: 48 additions & 4 deletions go/inst/candidate_database_instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,55 @@
package inst

import (
"fmt"

"github.com/openark/golib/log"
"github.com/openark/golib/sqlutils"

"github.com/github/orchestrator/go/config"
"github.com/github/orchestrator/go/db"
)

// RegisterCandidateInstance markes a given instance as suggested for successoring a master in the event of failover.
func RegisterCandidateInstance(candidate *CandidateDatabaseInstance) error {
if candidate.LastSuggestedString == "" {
candidate = candidate.WithCurrentTime()
}
args := sqlutils.Args(candidate.Hostname, candidate.Port, string(candidate.PromotionRule), candidate.LastSuggestedString)

query := fmt.Sprintf(`
insert into candidate_database_instance (
hostname,
port,
promotion_rule,
last_suggested
) values (
?, ?, ?, ?
) on duplicate key update
last_suggested=values(last_suggested),
promotion_rule=values(promotion_rule)
`)
writeFunc := func() error {
_, err := db.ExecOrchestrator(query, args...)
AuditOperation("register-candidate", candidate.Key(), string(candidate.PromotionRule))
return log.Errore(err)
}
return ExecDBWriteFunc(writeFunc)
}

// ExpireCandidateInstances removes stale master candidate suggestions.
func ExpireCandidateInstances() error {
writeFunc := func() error {
_, err := db.ExecOrchestrator(`
delete from candidate_database_instance
where last_suggested < NOW() - INTERVAL ? MINUTE
`, config.Config.CandidateInstanceExpireMinutes,
)
return log.Errore(err)
}
return ExecDBWriteFunc(writeFunc)
}

// BulkReadCandidateDatabaseInstance returns a slice of
// CandidateDatabaseInstance converted to JSON.
/*
Expand Down Expand Up @@ -49,10 +93,10 @@ func BulkReadCandidateDatabaseInstance() ([]CandidateDatabaseInstance, error) {
`
err := db.QueryOrchestrator(query, nil, func(m sqlutils.RowMap) error {
cdi := CandidateDatabaseInstance{
Hostname: m.GetString("hostname"),
Port: m.GetInt("port"),
PromotionRule: CandidatePromotionRule(m.GetString("promotion_rule")),
LastSuggested: m.GetTime("last_suggested"),
Hostname: m.GetString("hostname"),
Port: m.GetInt("port"),
PromotionRule: CandidatePromotionRule(m.GetString("promotion_rule")),
LastSuggestedString: m.GetString("last_suggested"),
}
// add to end of candidateDatabaseInstances
candidateDatabaseInstances = append(candidateDatabaseInstances, cdi)
Expand Down
48 changes: 1 addition & 47 deletions go/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
// We register the rule even if it hasn't changed,
// to bump the last_suggested time.
instance.PromotionRule = promotionRule
err = RegisterCandidateInstance(NewCandidateDatabaseInstance(instanceKey, promotionRule))
err = RegisterCandidateInstance(NewCandidateDatabaseInstance(instanceKey, promotionRule).WithCurrentTime())
logReadTopologyInstanceError(instanceKey, "RegisterCandidateInstance", err)
}
}()
Expand Down Expand Up @@ -2550,52 +2550,6 @@ func ReadHistoryClusterInstances(clusterName string, historyTimestampPattern str
return instances, err
}

// RegisterCandidateInstance markes a given instance as suggested for successoring a master in the event of failover.
func RegisterCandidateInstance(candidate *CandidateDatabaseInstance) error {
args := sqlutils.Args(candidate.Hostname, candidate.Port, string(candidate.PromotionRule))
lastSuggestedHint := "now()"
if !candidate.LastSuggested.IsZero() {
lastSuggestedHint = "?"
args = append(args, candidate.LastSuggested)
}
query := fmt.Sprintf(`
insert into candidate_database_instance (
hostname,
port,
promotion_rule,
last_suggested
) values (?, ?, ?, %s)
on duplicate key update
hostname=values(hostname),
port=values(port),
last_suggested=now(),
promotion_rule=values(promotion_rule)
`, lastSuggestedHint)
writeFunc := func() error {
_, err := db.ExecOrchestrator(query, args...)
AuditOperation("register-candidate", candidate.Key(), string(candidate.PromotionRule))
return log.Errore(err)
}
return ExecDBWriteFunc(writeFunc)
}

// ExpireCandidateInstances removes stale master candidate suggestions.
func ExpireCandidateInstances() error {
writeFunc := func() error {
_, err := db.ExecOrchestrator(`
delete from candidate_database_instance
where last_suggested < NOW() - INTERVAL ? MINUTE
`, config.Config.CandidateInstanceExpireMinutes,
)
if err != nil {
return log.Errore(err)
}

return nil
}
return ExecDBWriteFunc(writeFunc)
}

// RecordInstanceCoordinatesHistory snapshots the binlog coordinates of instances
func RecordInstanceCoordinatesHistory() error {
{
Expand Down