Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #591 from github/candidate-database-instance-utc
Browse files Browse the repository at this point in the history
 Candidate databse instance: database native time
  • Loading branch information
Shlomi Noach authored Aug 29, 2018
2 parents 1c9ef99 + 4994bc0 commit 769d6cb
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 59 deletions.
2 changes: 1 addition & 1 deletion go/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -1456,7 +1456,7 @@ func Cli(command string, strict bool, instance string, destination string, owner
if err != nil {
log.Fatale(err)
}
err = inst.RegisterCandidateInstance(inst.NewCandidateDatabaseInstance(instanceKey, promotionRule))
err = inst.RegisterCandidateInstance(inst.NewCandidateDatabaseInstance(instanceKey, promotionRule).WithCurrentTime())
if err != nil {
log.Fatale(err)
}
Expand Down
12 changes: 12 additions & 0 deletions go/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,15 @@ func QueryOrchestratorBuffered(query string, argsArray []interface{}, on_row fun
}
return log.Criticale(sqlutils.QueryRowsMapBuffered(db, query, on_row, argsArray...))
}

// ReadTimeNow reads and returns the current timestamp as string. This is an unfortunate workaround
// to support both MySQL and SQLite in all possible timezones. SQLite only speaks UTC where MySQL has
// timezone support. By reading the time as string we get the database's de-facto notion of the time,
// which we can then feed back to it.
func ReadTimeNow() (timeNow string, err error) {
err = QueryOrchestrator(`select now() as time_now`, nil, func(m sqlutils.RowMap) error {
timeNow = m.GetString("time_now")
return nil
})
return timeNow, err
}
2 changes: 1 addition & 1 deletion go/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2853,7 +2853,7 @@ func (this *HttpAPI) RegisterCandidate(params martini.Params, r render.Render, r
return
}

candidate := inst.NewCandidateDatabaseInstance(&instanceKey, promotionRule)
candidate := inst.NewCandidateDatabaseInstance(&instanceKey, promotionRule).WithCurrentTime()

if orcraft.IsRaftEnabled() {
_, err = orcraft.PublishCommand("register-candidate", candidate)
Expand Down
17 changes: 11 additions & 6 deletions go/inst/candidate_database_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,31 @@ package inst

import (
"fmt"
"time"

"github.com/github/orchestrator/go/db"
)

// CandidateDatabaseInstance contains information about explicit promotion rules for an instance
type CandidateDatabaseInstance struct {
Hostname string
Port int
PromotionRule CandidatePromotionRule
LastSuggested time.Time
Hostname string
Port int
PromotionRule CandidatePromotionRule
LastSuggestedString string
}

func NewCandidateDatabaseInstance(instanceKey *InstanceKey, promotionRule CandidatePromotionRule) *CandidateDatabaseInstance {
return &CandidateDatabaseInstance{
Hostname: instanceKey.Hostname,
Port: instanceKey.Port,
PromotionRule: promotionRule,
LastSuggested: time.Now(),
}
}

func (cdi *CandidateDatabaseInstance) WithCurrentTime() *CandidateDatabaseInstance {
cdi.LastSuggestedString, _ = db.ReadTimeNow()
return cdi
}

// String returns a string representation of the CandidateDatabaseInstance struct
func (cdi *CandidateDatabaseInstance) String() string {
return fmt.Sprintf("%s:%d %s", cdi.Hostname, cdi.Port, cdi.PromotionRule)
Expand Down
52 changes: 48 additions & 4 deletions go/inst/candidate_database_instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,55 @@
package inst

import (
"fmt"

"github.com/openark/golib/log"
"github.com/openark/golib/sqlutils"

"github.com/github/orchestrator/go/config"
"github.com/github/orchestrator/go/db"
)

// RegisterCandidateInstance markes a given instance as suggested for successoring a master in the event of failover.
func RegisterCandidateInstance(candidate *CandidateDatabaseInstance) error {
if candidate.LastSuggestedString == "" {
candidate = candidate.WithCurrentTime()
}
args := sqlutils.Args(candidate.Hostname, candidate.Port, string(candidate.PromotionRule), candidate.LastSuggestedString)

query := fmt.Sprintf(`
insert into candidate_database_instance (
hostname,
port,
promotion_rule,
last_suggested
) values (
?, ?, ?, ?
) on duplicate key update
last_suggested=values(last_suggested),
promotion_rule=values(promotion_rule)
`)
writeFunc := func() error {
_, err := db.ExecOrchestrator(query, args...)
AuditOperation("register-candidate", candidate.Key(), string(candidate.PromotionRule))
return log.Errore(err)
}
return ExecDBWriteFunc(writeFunc)
}

// ExpireCandidateInstances removes stale master candidate suggestions.
func ExpireCandidateInstances() error {
writeFunc := func() error {
_, err := db.ExecOrchestrator(`
delete from candidate_database_instance
where last_suggested < NOW() - INTERVAL ? MINUTE
`, config.Config.CandidateInstanceExpireMinutes,
)
return log.Errore(err)
}
return ExecDBWriteFunc(writeFunc)
}

// BulkReadCandidateDatabaseInstance returns a slice of
// CandidateDatabaseInstance converted to JSON.
/*
Expand Down Expand Up @@ -49,10 +93,10 @@ func BulkReadCandidateDatabaseInstance() ([]CandidateDatabaseInstance, error) {
`
err := db.QueryOrchestrator(query, nil, func(m sqlutils.RowMap) error {
cdi := CandidateDatabaseInstance{
Hostname: m.GetString("hostname"),
Port: m.GetInt("port"),
PromotionRule: CandidatePromotionRule(m.GetString("promotion_rule")),
LastSuggested: m.GetTime("last_suggested"),
Hostname: m.GetString("hostname"),
Port: m.GetInt("port"),
PromotionRule: CandidatePromotionRule(m.GetString("promotion_rule")),
LastSuggestedString: m.GetString("last_suggested"),
}
// add to end of candidateDatabaseInstances
candidateDatabaseInstances = append(candidateDatabaseInstances, cdi)
Expand Down
48 changes: 1 addition & 47 deletions go/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
// We register the rule even if it hasn't changed,
// to bump the last_suggested time.
instance.PromotionRule = promotionRule
err = RegisterCandidateInstance(NewCandidateDatabaseInstance(instanceKey, promotionRule))
err = RegisterCandidateInstance(NewCandidateDatabaseInstance(instanceKey, promotionRule).WithCurrentTime())
logReadTopologyInstanceError(instanceKey, "RegisterCandidateInstance", err)
}
}()
Expand Down Expand Up @@ -2550,52 +2550,6 @@ func ReadHistoryClusterInstances(clusterName string, historyTimestampPattern str
return instances, err
}

// RegisterCandidateInstance markes a given instance as suggested for successoring a master in the event of failover.
func RegisterCandidateInstance(candidate *CandidateDatabaseInstance) error {
args := sqlutils.Args(candidate.Hostname, candidate.Port, string(candidate.PromotionRule))
lastSuggestedHint := "now()"
if !candidate.LastSuggested.IsZero() {
lastSuggestedHint = "?"
args = append(args, candidate.LastSuggested)
}
query := fmt.Sprintf(`
insert into candidate_database_instance (
hostname,
port,
promotion_rule,
last_suggested
) values (?, ?, ?, %s)
on duplicate key update
hostname=values(hostname),
port=values(port),
last_suggested=now(),
promotion_rule=values(promotion_rule)
`, lastSuggestedHint)
writeFunc := func() error {
_, err := db.ExecOrchestrator(query, args...)
AuditOperation("register-candidate", candidate.Key(), string(candidate.PromotionRule))
return log.Errore(err)
}
return ExecDBWriteFunc(writeFunc)
}

// ExpireCandidateInstances removes stale master candidate suggestions.
func ExpireCandidateInstances() error {
writeFunc := func() error {
_, err := db.ExecOrchestrator(`
delete from candidate_database_instance
where last_suggested < NOW() - INTERVAL ? MINUTE
`, config.Config.CandidateInstanceExpireMinutes,
)
if err != nil {
return log.Errore(err)
}

return nil
}
return ExecDBWriteFunc(writeFunc)
}

// RecordInstanceCoordinatesHistory snapshots the binlog coordinates of instances
func RecordInstanceCoordinatesHistory() error {
{
Expand Down

0 comments on commit 769d6cb

Please sign in to comment.