Skip to content

Commit

Permalink
Linting
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Castell committed Jul 4, 2019
1 parent 11a8973 commit 6ddab6e
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 13 deletions.
5 changes: 5 additions & 0 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,14 @@ type Agent struct {
// of a processor.
type ProcessorFactory func() (ExecutionProcessor, error)

// Plugins struct to store loaded plugins of each type
type Plugins struct {
Processors map[string]ExecutionProcessor
Executors map[string]Executor
}

// NewAgent return a new Agent instace capable of starting
// and running a Dkron instance.
func NewAgent(config *Config, plugins *Plugins) *Agent {
a := &Agent{config: config}

Expand All @@ -108,6 +111,8 @@ func NewAgent(config *Config, plugins *Plugins) *Agent {
return a
}

// Start the current agent by running all the necessary
// checks and server or client routines.
func (a *Agent) Start() error {
s, err := a.setupSerf()
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion dkron/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type Config struct {
// DefaultBindPort is the default port that dkron will use for Serf communication
const DefaultBindPort int = 8946

// DefaultConfig returns a Config struct pointer with sensible
// default settings.
func DefaultConfig() *Config {
hostname, err := os.Hostname()
if err != nil {
Expand All @@ -105,7 +107,7 @@ func DefaultConfig() *Config {
}
}

// configFlagSet creates all of our configuration flags.
// ConfigFlagSet creates all of our configuration flags.
func ConfigFlagSet() *flag.FlagSet {
c := DefaultConfig()
cmdFlags := flag.NewFlagSet("agent flagset", flag.ContinueOnError)
Expand Down
4 changes: 3 additions & 1 deletion dkron/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func newCommonDashboardData(a *Agent, nodeName, path string) *commonDashboardDat
}
}

// dashboardRoutes registers dashboard specific routes on the gin RouterGroup.
// DashboardRoutes registers dashboard specific routes on the gin RouterGroup.
func (a *Agent) DashboardRoutes(r *gin.RouterGroup) {
// If we are visiting from a browser redirect to the dashboard
r.GET("/", func(c *gin.Context) {
Expand Down Expand Up @@ -124,6 +124,8 @@ func mustLoadTemplate(path string) []byte {
return tmpl
}

// CreateMyRender returns a new custom multitemplate renderer
// to use with Gin.
func CreateMyRender() multitemplate.Render {
r := multitemplate.New()

Expand Down
3 changes: 3 additions & 0 deletions dkron/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func NewExecutionFromProto(e *proto.Execution) *Execution {
}
}

// ToProto returns the protobuf struct corresponding to
// the representation of the current execution.
func (e *Execution) ToProto() *proto.Execution {
startedAt, _ := ptypes.TimestampProto(e.StartedAt)
finishedAt, _ := ptypes.TimestampProto(e.FinishedAt)
Expand All @@ -81,6 +83,7 @@ func (e *Execution) Key() string {
return fmt.Sprintf("%d-%s", e.StartedAt.UnixNano(), e.NodeName)
}

// GetGroup is the getter for the execution group.
func (e *Execution) GetGroup() string {
return strconv.FormatInt(e.Group, 10)
}
23 changes: 15 additions & 8 deletions dkron/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,20 @@ import (
dkronpb "github.com/victorcoder/dkron/proto"
)

// MessageType is the type to encode FSM commands.
type MessageType uint8

const (
// SetJobType is the command used to store a job in the store.
SetJobType MessageType = iota
// DeleteJobType is the command used to delete a Job from the store.
DeleteJobType
// SetExecutionType is the command used to store an Execution to the store.
SetExecutionType
// DeleteExecutionsType is the command used to delete executions from the store.
DeleteExecutionsType
// ExecutionDoneType is the command to perform the logic needed once an exeuction
// is done.
ExecutionDoneType
)

Expand All @@ -25,7 +32,7 @@ type dkronFSM struct {
mu sync.Mutex
}

// NewFSMPath is used to construct a new FSM with a blank state
// NewFSM is used to construct a new FSM with a blank state
func NewFSM(store Storage) *dkronFSM {
return &dkronFSM{
store: store,
Expand Down Expand Up @@ -110,9 +117,9 @@ func (d *dkronFSM) applySetExecution(buf []byte) interface{} {
// Persist encodes the needed data from fsmsnapshot and transport it to
// Restore where the necessary data is replicated into the finite state machine.
// This allows the consensus algorithm to truncate the replicated log.
func (f *dkronFSM) Snapshot() (raft.FSMSnapshot, error) {
f.mu.Lock()
defer f.mu.Unlock()
func (d *dkronFSM) Snapshot() (raft.FSMSnapshot, error) {
d.mu.Lock()
defer d.mu.Unlock()

// Clone the kvstore into a map for easy transport
mapClone := make(map[string]string)
Expand All @@ -128,7 +135,7 @@ func (f *dkronFSM) Snapshot() (raft.FSMSnapshot, error) {
}

// Restore stores the key-value store to a previous state.
func (f *dkronFSM) Restore(kvMap io.ReadCloser) error {
func (d *dkronFSM) Restore(kvMap io.ReadCloser) error {
kvSnapshot := make(map[string]string)
if err := json.NewDecoder(kvMap).Decode(&kvSnapshot); err != nil {
return err
Expand All @@ -147,10 +154,10 @@ type dkronSnapshot struct {
kvMap map[string]string
}

func (f *dkronSnapshot) Persist(sink raft.SnapshotSink) error {
func (d *dkronSnapshot) Persist(sink raft.SnapshotSink) error {
err := func() error {
// Encode data.
b, err := json.Marshal(f.kvMap)
b, err := json.Marshal(d.kvMap)
if err != nil {
return err
}
Expand All @@ -176,4 +183,4 @@ func (f *dkronSnapshot) Persist(sink raft.SnapshotSink) error {
return nil
}

func (f *dkronSnapshot) Release() {}
func (d *dkronSnapshot) Release() {}
11 changes: 9 additions & 2 deletions dkron/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,23 @@ import (
)

var (
// ErrExecutionDoneForDeletedJob is returned when an execution done
// is received for a non existent job.
ErrExecutionDoneForDeletedJob = errors.New("rpc: Received execution done for a deleted job")
ErrRPCDialing = errors.New("rpc: Error dialing, verify the network connection to the server")
ErrNotLeader = errors.New("Error, server is not leader, this operation should be run on the leader")
// ErrRPCDialing is returned on dialing fail.
ErrRPCDialing = errors.New("rpc: Error dialing, verify the network connection to the server")
// ErrNotLeader is the error returned when the operation need the node to be the leader,
// but the current node is not the leader.
ErrNotLeader = errors.New("Error, server is not leader, this operation should be run on the leader")
)

// DkronGRPCServer defines the basics that a gRPC server should implement.
type DkronGRPCServer interface {
proto.DkronServer
Serve(net.Listener) error
}

// GRPCServer is the local implementation of the gRPC server interface.
type GRPCServer struct {
agent *Agent
}
Expand Down
7 changes: 6 additions & 1 deletion dkron/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"google.golang.org/grpc"
)

// DkronGRPCClient defines the interface that any gRPC client for
// dkron should implement.
type DkronGRPCClient interface {
Connect(string) (*grpc.ClientConn, error)
CallExecutionDone(string, *Execution) error
Expand All @@ -23,11 +25,13 @@ type DkronGRPCClient interface {
RaftRemovePeerByID(string, string) error
}

// GRPCClient is the local implementation of the DkronGRPCClient interface.
type GRPCClient struct {
dialOpt []grpc.DialOption
agent *Agent
}

// NewGRPCClient returns a new instance of the gRPC client.
func NewGRPCClient(dialOpt grpc.DialOption, agent *Agent) DkronGRPCClient {
if dialOpt == nil {
dialOpt = grpc.WithInsecure()
Expand All @@ -42,6 +46,7 @@ func NewGRPCClient(dialOpt grpc.DialOption, agent *Agent) DkronGRPCClient {
}
}

// Connect dialing to a gRPC server
func (grpcc *GRPCClient) Connect(addr string) (*grpc.ClientConn, error) {
// Initiate a connection with the server
conn, err := grpc.Dial(addr, grpcc.dialOpt...)
Expand Down Expand Up @@ -276,7 +281,7 @@ func (grpcc *GRPCClient) RaftGetConfiguration(addr string) (*proto.RaftGetConfig
return res, nil
}

// RaftRemovePeerByAddress remove a raft peer
// RaftRemovePeerByID remove a raft peer
func (grpcc *GRPCClient) RaftRemovePeerByID(addr, peerID string) error {
var conn *grpc.ClientConn

Expand Down

0 comments on commit 6ddab6e

Please sign in to comment.