Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: surface some globals around restarts for the manage command #58

Merged
merged 2 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 48 additions & 24 deletions internal/ctl/connectors/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,36 @@ import (
)

type manageConnectorsCmdParams struct {
ClusterURL string
Files []string
Directory string
EnvVar string
SyncPeriod time.Duration
SyncErrorRetryMax int
SyncErrorRetryPeriod time.Duration
AllowPurge bool
AutoRestart bool
RunOnce bool
EnableHealthCheck bool
HealthCheckAddress string
HTTPClientTimeout time.Duration
ClusterURL string
Files []string
Directory string
EnvVar string
SyncPeriod time.Duration
SyncErrorRetryMax int
SyncErrorRetryPeriod time.Duration
AllowPurge bool
AutoRestart bool
RunOnce bool
EnableHealthCheck bool
HealthCheckAddress string
HTTPClientTimeout time.Duration
GlobalConnectorRestartsMax int
GlobalConnectorRestartPeriod time.Duration
GlobalTaskRestartsMax int
GlobalTaskRestartPeriod time.Duration
}

func manageConnectorsCmd() *cobra.Command {
func manageConnectorsCmd() *cobra.Command { // nolint: funlen
params := &manageConnectorsCmdParams{
SyncPeriod: 5 * time.Minute,
SyncErrorRetryMax: 10,
SyncErrorRetryPeriod: 1 * time.Minute,
HealthCheckAddress: ":9000",
HTTPClientTimeout: 20 * time.Second,
SyncPeriod: 5 * time.Minute,
SyncErrorRetryMax: 10,
SyncErrorRetryPeriod: 1 * time.Minute,
HealthCheckAddress: ":9000",
HTTPClientTimeout: 20 * time.Second,
GlobalConnectorRestartsMax: 5,
GlobalConnectorRestartPeriod: 10 * time.Second,
GlobalTaskRestartsMax: 5,
GlobalTaskRestartPeriod: 10 * time.Second,
}

manageCmd := &cobra.Command{
Expand Down Expand Up @@ -82,6 +90,18 @@ if you specify --once then it will sync once and then exit.`,
manageCmd.Flags().DurationVar(&params.HTTPClientTimeout, "http-client-timeout", params.HTTPClientTimeout, "HTTP client timeout")
_ = viper.BindPFlag("http-client-timeout", manageCmd.PersistentFlags().Lookup("http-client-timeout"))

manageCmd.Flags().IntVar(&params.GlobalConnectorRestartsMax, "global-connector-restarts-max", params.GlobalConnectorRestartsMax, "maximum times a failed connector will be restarted")
_ = viper.BindPFlag("global-connector-restarts-max", manageCmd.PersistentFlags().Lookup("global-connector-restarts-max"))

manageCmd.Flags().DurationVar(&params.GlobalConnectorRestartPeriod, "global-connector-restart-period", params.GlobalConnectorRestartPeriod, "period of time between failed connector restarts")
_ = viper.BindPFlag("global-connector-restart-period", manageCmd.PersistentFlags().Lookup("global-connector-restart-period"))

manageCmd.Flags().IntVar(&params.GlobalTaskRestartsMax, "global-task-restarts-max", params.GlobalTaskRestartsMax, "maximum times a failed task will be restarted")
_ = viper.BindPFlag("global-task-restarts-max", manageCmd.PersistentFlags().Lookup("global-task-restarts-max"))

manageCmd.Flags().DurationVar(&params.GlobalTaskRestartPeriod, "global-task-restart-period", params.GlobalTaskRestartPeriod, "period of time between failed task restarts")
_ = viper.BindPFlag("global-task-restart-period", manageCmd.PersistentFlags().Lookup("global-task-restart-period"))

return manageCmd
}

Expand All @@ -94,11 +114,15 @@ func doManageConnectors(cmd *cobra.Command, params *manageConnectorsCmdParams) e
}

config := &manager.Config{
ClusterURL: params.ClusterURL,
SyncPeriod: params.SyncPeriod,
AllowPurge: params.AllowPurge,
AutoRestart: params.AutoRestart,
Version: version.Version,
ClusterURL: params.ClusterURL,
SyncPeriod: params.SyncPeriod,
AllowPurge: params.AllowPurge,
AutoRestart: params.AutoRestart,
Version: version.Version,
GlobalConnectorRestartsMax: params.GlobalConnectorRestartsMax,
GlobalConnectorRestartPeriod: params.GlobalConnectorRestartPeriod,
GlobalTaskRestartsMax: params.GlobalTaskRestartsMax,
GlobalTaskRestartPeriod: params.GlobalTaskRestartPeriod,
}

logger.WithField("config", config).Trace("manage connectors configuration")
Expand Down
11 changes: 8 additions & 3 deletions pkg/manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ type Config struct {

Version string `json:"version"`

RestartPolicy *RestartPolicy `json:"restart_policy"`
GlobalConnectorRestartsMax int `json:"global_connector_restarts_max"`
GlobalConnectorRestartPeriod time.Duration `json:"global_connector_restart_period"`
GlobalTaskRestartsMax int `json:"global_task_restarts_max"`
GlobalTaskRestartPeriod time.Duration `json:"global_task_restart_period"`

RestartOverrides *RestartPolicy `json:"restart_policy"`
}

// RestartPolicy lists each connector's maximum restart policy
Expand All @@ -27,8 +32,8 @@ type RestartPolicy struct {

// Policy contains a collection of values to be managed
type Policy struct {
MaxConnectorRestarts int `json:"max_connector_restarts"`
ConnectorRestartsMax int `json:"connector_restarts_max"`
ConnectorRestartPeriod time.Duration `json:"connector_restart_period"`
MaxTaskRestarts int `json:"max_task_restarts"`
TaskRestartsMax int `json:"task_restarts_max"`
TaskRestartPeriod time.Duration `json:"task_restart_period"`
}
6 changes: 3 additions & 3 deletions pkg/manager/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (c *ConnectorManager) Sync(source ConnectorSource) error {
// creating a runtime restart policy here, overriding with the supplied one (if any)
// Ensuring that we have a policy defined for each connector we are managing here
// dramatically simplifies the management and restart code
policy := runtimePolicyFromConnectors(connectors, c.config.RestartPolicy)
policy := runtimePolicyFromConnectors(connectors, c.config)

if err = c.reconcileConnectors(connectors, policy); err != nil {
return errors.Wrap(err, "error synchronising connectors")
Expand Down Expand Up @@ -75,11 +75,11 @@ func (c *ConnectorManager) reconcileConnectors(connectors []connect.Connector, r
func (c *ConnectorManager) autoRestart(connectors []connect.Connector, restartPolicy runtimeRestartPolicy) error {
for _, connector := range connectors {
name := connector.Name
err := c.retryRestartConnector(name, restartPolicy[name].MaxConnectorRestarts, restartPolicy[name].ConnectorRestartPeriod)
err := c.retryRestartConnector(name, restartPolicy[name].ConnectorRestartsMax, restartPolicy[name].ConnectorRestartPeriod)
if err != nil {
return err
}
err = c.retryRestartConnectorTask(name, restartPolicy[name].MaxTaskRestarts, restartPolicy[name].TaskRestartPeriod)
err = c.retryRestartConnectorTask(name, restartPolicy[name].TaskRestartsMax, restartPolicy[name].TaskRestartPeriod)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,10 @@ func Test_Manage_ConnectorFailed_IsRestarted_WithPolicy(t *testing.T) {

config := &Config{
AutoRestart: true,
RestartPolicy: &RestartPolicy{
RestartOverrides: &RestartPolicy{
Connectors: map[string]Policy{
"foo": Policy{
MaxConnectorRestarts: 10,
ConnectorRestartsMax: 10,
ConnectorRestartPeriod: time.Millisecond,
},
},
Expand Down Expand Up @@ -275,12 +275,12 @@ func Test_Manage_ConnectorFailed_IsRestarted_WithPolicy_RestartWorks(t *testing.

config := &Config{
AutoRestart: true,
RestartPolicy: &RestartPolicy{
RestartOverrides: &RestartPolicy{
Connectors: map[string]Policy{
"foo": Policy{
MaxConnectorRestarts: 10,
ConnectorRestartsMax: 10,
ConnectorRestartPeriod: time.Millisecond,
MaxTaskRestarts: 0,
TaskRestartsMax: 0,
TaskRestartPeriod: time.Millisecond,
},
},
Expand Down
38 changes: 27 additions & 11 deletions pkg/manager/restart_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,48 @@ const (
defaultRestartPeriod = time.Second * 10
)

func runtimePolicyFromConnectors(connectors []connect.Connector, overrides *RestartPolicy) runtimeRestartPolicy {
func runtimePolicyFromConnectors(connectors []connect.Connector, config *Config) runtimeRestartPolicy {
// create restart policy here, overriding with any supplied values (if any)
policy := runtimeRestartPolicy{}

for _, c := range connectors {
policy[c.Name] = Policy{
MaxConnectorRestarts: 1,
p := Policy{
ConnectorRestartsMax: 1,
ConnectorRestartPeriod: defaultRestartPeriod,
MaxTaskRestarts: 1,
TaskRestartsMax: 1,
TaskRestartPeriod: defaultRestartPeriod,
}
if config != nil {
// apply globals (if any)
if config.GlobalConnectorRestartsMax != 0 {
p.ConnectorRestartsMax = config.GlobalConnectorRestartsMax
}
if config.GlobalTaskRestartsMax != 0 {
p.TaskRestartsMax = config.GlobalTaskRestartsMax
}
if config.GlobalConnectorRestartPeriod != 0 {
p.ConnectorRestartPeriod = config.GlobalConnectorRestartPeriod
}
if config.GlobalTaskRestartPeriod != 0 {
p.TaskRestartPeriod = config.GlobalTaskRestartPeriod
}
}
policy[c.Name] = p
}

// apply overrides
if overrides != nil {
for k, v := range overrides.Connectors {
// apply overrides (if any)
if config != nil && config.RestartOverrides != nil {
for k, v := range config.RestartOverrides.Connectors {
p := policy[k]

if v.MaxConnectorRestarts != 0 {
p.MaxConnectorRestarts = v.MaxConnectorRestarts
if v.ConnectorRestartsMax != 0 {
p.ConnectorRestartsMax = v.ConnectorRestartsMax
}
if v.ConnectorRestartPeriod != 0 {
p.ConnectorRestartPeriod = v.ConnectorRestartPeriod
}
if v.MaxTaskRestarts != 0 {
p.MaxTaskRestarts = v.MaxTaskRestarts
if v.TaskRestartsMax != 0 {
p.TaskRestartsMax = v.TaskRestartsMax
}
if v.TaskRestartPeriod != 0 {
p.TaskRestartPeriod = v.TaskRestartPeriod
Expand Down
40 changes: 33 additions & 7 deletions pkg/manager/restart_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,37 @@ func Test_RestartPolicy_Default(t *testing.T) {

foo := policy["foo"]

require.Equal(t, 1, foo.MaxConnectorRestarts)
require.Equal(t, 1, foo.MaxTaskRestarts)
require.Equal(t, 1, foo.ConnectorRestartsMax)
require.Equal(t, 1, foo.TaskRestartsMax)
require.Equal(t, defaultRestartPeriod, foo.ConnectorRestartPeriod)
require.Equal(t, defaultRestartPeriod, foo.TaskRestartPeriod)
}

// Test_RestartPolicy_Globals checks that the Global* restart settings on a
// Config are carried into the runtime policy of a connector that has no
// per-connector override.
func Test_RestartPolicy_Globals(t *testing.T) {
	t.Parallel()

	connectors := []connect.Connector{
		{Name: "foo"},
	}

	policy := runtimePolicyFromConnectors(connectors, &Config{
		GlobalConnectorRestartsMax:   97,
		GlobalConnectorRestartPeriod: time.Second * 98,
		GlobalTaskRestartsMax:        99,
		GlobalTaskRestartPeriod:      time.Second * 100,
	})

	require.Len(t, policy, 1)

	// Indexing the policy yields a Policy struct value, which is never nil,
	// so a NotNil assertion on it can never fail. Use the comma-ok form to
	// verify that an entry for the connector actually exists.
	foo, ok := policy["foo"]
	require.True(t, ok, "expected a runtime policy entry for connector foo")

	require.Equal(t, 97, foo.ConnectorRestartsMax)
	require.Equal(t, time.Second*98, foo.ConnectorRestartPeriod)
	require.Equal(t, 99, foo.TaskRestartsMax)
	require.Equal(t, time.Second*100, foo.TaskRestartPeriod)
}

func Test_RestartPolicy_Override(t *testing.T) {
t.Parallel()

Expand All @@ -39,23 +64,24 @@ func Test_RestartPolicy_Override(t *testing.T) {
ovveride := RestartPolicy{
Connectors: map[string]Policy{
"foo": Policy{
MaxConnectorRestarts: 10,
MaxTaskRestarts: 11,
ConnectorRestartsMax: 10,
TaskRestartsMax: 11,
TaskRestartPeriod: time.Second * 100,
ConnectorRestartPeriod: time.Second * 101,
},
},
}

policy := runtimePolicyFromConnectors(connectors, &ovveride)
config := &Config{RestartOverrides: &ovveride}
policy := runtimePolicyFromConnectors(connectors, config)

require.Len(t, policy, 1)
require.NotNil(t, policy["foo"])

foo := policy["foo"]

require.Equal(t, 10, foo.MaxConnectorRestarts)
require.Equal(t, 11, foo.MaxTaskRestarts)
require.Equal(t, 10, foo.ConnectorRestartsMax)
require.Equal(t, 11, foo.TaskRestartsMax)
require.Equal(t, time.Second*101, foo.ConnectorRestartPeriod)
require.Equal(t, time.Second*100, foo.TaskRestartPeriod)
}