Skip to content

Commit

Permalink
Linting
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Castell committed Jul 9, 2019
1 parent e32262f commit c474d13
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 17 deletions.
2 changes: 1 addition & 1 deletion dkron/hclog_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type HCLogAdapter struct {
Name string
}

// HCLog has one more level than we do. As such, we will never
// Trace HCLog has one more level than we do. As such, we will never
// set trace level.
func (*HCLogAdapter) Trace(_ string, _ ...interface{}) {
return
Expand Down
4 changes: 2 additions & 2 deletions dkron/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ const (
maxBufSize = 256000
)

// Return an error in case no suitable server to send the request is found.
var ErrNoSuitableServer = errors.New("No suitable server found to send the request, aborting.")
// ErrNoSuitableServer returns an error in case no suitable server to send the request is found.
var ErrNoSuitableServer = errors.New("no suitable server found to send the request, aborting")

// invokeJob will execute the given job. Depending on the event.
func (a *Agent) invokeJob(job *Job, execution *Execution) error {
Expand Down
11 changes: 6 additions & 5 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import (
)

const (
// StatusNotSet is the initial job status.
StatusNotSet = ""
// Success is status of a job whose last run was a success.
// StatusSuccess is status of a job whose last run was a success.
StatusSuccess = "success"
// Running is status of a job whose last run has not finished.
// StatusRunning is status of a job whose last run has not finished.
StatusRunning = "running"
// Failed is status of a job whose last run was not successful on any nodes.
// StatusFailed is status of a job whose last run was not successful on any nodes.
StatusFailed = "failed"
// PartialyFailed is status of a job whose last run was successful on only some nodes.
// StatusPartialyFailed is status of a job whose last run was successful on only some nodes.
StatusPartialyFailed = "partially_failed"

// ConcurrencyAllow allows a job to execute concurrency.
Expand Down Expand Up @@ -212,7 +213,7 @@ func (j *Job) String() string {
return fmt.Sprintf("\"Job: %s, scheduled at: %s, tags:%v\"", j.Name, j.Schedule, j.Tags)
}

// Status returns the status of a job whether it's running, succeded or failed
// GetStatus returns the status of a job whether it's running, succeded or failed
func (j *Job) GetStatus() string {
// Maybe we are testing
if j.Agent == nil {
Expand Down
3 changes: 3 additions & 0 deletions dkron/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ import (
"github.com/sirupsen/logrus"
)

// Notifier represents a new notification to be sent by any of the available notificators
type Notifier struct {
Config *Config
Job *Job
Execution *Execution
ExecutionGroup []*Execution
}

// Notification creates a new Notifier instance
func Notification(config *Config, execution *Execution, exGroup []*Execution, job *Job) *Notifier {
return &Notifier{
Config: config,
Expand All @@ -30,6 +32,7 @@ func Notification(config *Config, execution *Execution, exGroup []*Execution, jo
}
}

// Send sends the notifications using any configured method
func (n *Notifier) Send() {
if n.Config.MailHost != "" && n.Config.MailPort != 0 && n.Job.OwnerEmail != "" {
n.sendExecutionEmail()
Expand Down
17 changes: 11 additions & 6 deletions dkron/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,24 @@ import (
)

const (
// QuerySchedulerRestart define the string to be sent
QuerySchedulerRestart = "scheduler:restart"
QueryRunJob = "run:job"
QueryExecutionDone = "execution:done"
// QueryRunJob define a run job query string
QueryRunJob = "run:job"
// QueryExecutionDone define the execution done query string
QueryExecutionDone = "execution:done"
)

var rescheduleThrotle *time.Timer

// RunQueryParam defines the struct used to send a Run query
// using serf.
type RunQueryParam struct {
Execution *Execution `json:"execution"`
RPCAddr string `json:"rpc_addr"`
}

// Send a serf run query to the cluster, this is used to ask a node or nodes
// RunQuery sends a serf run query to the cluster, this is used to ask a node or nodes
// to run a Job.
func (a *Agent) RunQuery(job *Job, ex *Execution) {
var params *serf.QueryParam
Expand Down Expand Up @@ -76,7 +81,7 @@ func (a *Agent) RunQuery(job *Job, ex *Execution) {
Execution: ex,
RPCAddr: a.getRPCAddr(),
}
rqpJson, _ := json.Marshal(rqp)
rqpJSON, _ := json.Marshal(rqp)

log.WithFields(logrus.Fields{
"query": QueryRunJob,
Expand All @@ -86,10 +91,10 @@ func (a *Agent) RunQuery(job *Job, ex *Execution) {
log.WithFields(logrus.Fields{
"query": QueryRunJob,
"job_name": job.Name,
"json": string(rqpJson),
"json": string(rqpJSON),
}).Debug("agent: Sending query")

qr, err := a.serf.Query(QueryRunJob, rqpJson, params)
qr, err := a.serf.Query(QueryRunJob, rqpJSON, params)
if err != nil {
log.WithField("query", QueryRunJob).WithError(err).Fatal("agent: Sending query error")
}
Expand Down
12 changes: 12 additions & 0 deletions dkron/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ var (
cronInspect = expvar.NewMap("cron_entries")
schedulerStarted = expvar.NewInt("scheduler_started")

// ErrScheduleParse is the error returned when the schdule parsing fails.
ErrScheduleParse = errors.New("Can't parse job schedule")
)

// Cron interface is the minimum set of methods that a Cron
// engine should implement to work with Dkron.
type Cron interface {
Start()
Stop()
Expand All @@ -26,17 +29,22 @@ type Cron interface {
AddTimezoneSensitiveJob(spec, timezone string, cmd cron.Job) error
}

// Scheduler represents a dkron scheduler instance, it stores the cron engine
// and the related parameters.
type Scheduler struct {
Cron Cron
Started bool
}

// NewScheduler creates a new Scheduler instance
func NewScheduler() *Scheduler {
c := cron.New()
schedulerStarted.Set(0)
return &Scheduler{Cron: c, Started: false}
}

// Start thee cron scheduler adding it's corresponding jobs and
// executiong them on time.
func (s *Scheduler) Start(jobs []*Job) {
metrics.IncrCounter([]string{"scheduler", "start"}, 1)
for _, job := range jobs {
Expand All @@ -63,6 +71,7 @@ func (s *Scheduler) Start(jobs []*Job) {
schedulerStarted.Set(1)
}

// Stop stop the scheduler efectively not running any job.
func (s *Scheduler) Stop() {
if s.Started {
log.Debug("scheduler: Stopping scheduler")
Expand All @@ -78,11 +87,14 @@ func (s *Scheduler) Stop() {
schedulerStarted.Set(0)
}

// Restart thee scheduler
func (s *Scheduler) Restart(jobs []*Job) {
s.Stop()
s.Start(jobs)
}

// GetEntry returns a scheduler entry from a snapshot in
// the current time.
func (s *Scheduler) GetEntry(job *Job) *cron.Entry {
for _, e := range s.Cron.Entries() {
j, _ := e.Job.(*Job)
Expand Down
4 changes: 4 additions & 0 deletions dkron/storage.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package dkron

// Storage is the interface that should be used by any
// storage engine implemented for dkron. It contains the
// minumum set of operations that are needed to have a working
// dkron store.
type Storage interface {
SetJob(job *Job, copyDependentJobs bool) error
DeleteJob(name string) (*Job, error)
Expand Down
16 changes: 13 additions & 3 deletions dkron/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
)

const (
MaxExecutions = 100
// MaxExecutions to maintain in the storage
MaxExecutions = 100

defaultUpdateMaxAttempts = 5
defaultGCInterval = 5 * time.Minute
defaultGCDiscardRatio = 0.7
Expand All @@ -28,18 +30,23 @@ var (
ErrTooManyUpdateConflicts = errors.New("badger: too many transaction conflicts")
)

// Store is the local implementation of the Storage interface.
// It gives dkron the ability to manipulate its embedded storage
// BadgerDB.
type Store struct {
agent *Agent
db *badger.DB
lock *sync.Mutex // for
closed bool
}

// JobOptions additional options to apply when loading a Job.
type JobOptions struct {
ComputeStatus bool
Metadata map[string]string `json:"tags"`
}

// NewStore creates a new Storage instance.
func NewStore(a *Agent, dir string) (*Store, error) {
opts := badger.DefaultOptions
opts.Dir = dir
Expand Down Expand Up @@ -83,7 +90,7 @@ func (s *Store) runGcLoop() {
}
}

// Store a job
// SetJob stores a job in the storage
func (s *Store) SetJob(job *Job, copyDependentJobs bool) error {
//Existing job that has children, let's keep it's children

Expand Down Expand Up @@ -409,6 +416,7 @@ func (s *Store) DeleteJob(name string) (*Job, error) {
return job, err
}

// GetExecutions returns the exections given a Job name.
func (s *Store) GetExecutions(jobName string) ([]*Execution, error) {
prefix := fmt.Sprintf("executions/%s", jobName)

Expand Down Expand Up @@ -466,6 +474,7 @@ func (s *Store) list(prefix string, checkRoot bool) ([]*kv, error) {
return kvs, err
}

// GetLastExecutionGroup get last execution group given the Job name.
func (s *Store) GetLastExecutionGroup(jobName string) ([]*Execution, error) {
executions, byGroup, err := s.GetGroupedExecutions(jobName)
if err != nil {
Expand Down Expand Up @@ -495,7 +504,7 @@ func (s *Store) GetExecutionGroup(execution *Execution) ([]*Execution, error) {
return executions, nil
}

// Returns executions for a job grouped and with an ordered index
// GetGroupedExecutions returns executions for a job grouped and with an ordered index
// to facilitate access.
func (s *Store) GetGroupedExecutions(jobName string) (map[int64][]*Execution, []int64, error) {
execs, err := s.GetExecutions(jobName)
Expand Down Expand Up @@ -656,6 +665,7 @@ ConflictRetry:
return ErrTooManyUpdateConflicts
}

// Shutdown close the KV store
func (s *Store) Shutdown() error {
return s.db.Close()
}
Expand Down
4 changes: 4 additions & 0 deletions dkron/version.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
package dkron

// Name store the name of this software
var Name = "Dkron"

// Version is the current version that will get replaced
// on build.
var Version = "devel"

0 comments on commit c474d13

Please sign in to comment.