diff --git a/internal/ctl/connectors/manage.go b/internal/ctl/connectors/manage.go index 2a50348..b1adeec 100644 --- a/internal/ctl/connectors/manage.go +++ b/internal/ctl/connectors/manage.go @@ -21,28 +21,36 @@ import ( ) type manageConnectorsCmdParams struct { - ClusterURL string - Files []string - Directory string - EnvVar string - SyncPeriod time.Duration - SyncErrorRetryMax int - SyncErrorRetryPeriod time.Duration - AllowPurge bool - AutoRestart bool - RunOnce bool - EnableHealthCheck bool - HealthCheckAddress string - HTTPClientTimeout time.Duration + ClusterURL string + Files []string + Directory string + EnvVar string + SyncPeriod time.Duration + SyncErrorRetryMax int + SyncErrorRetryPeriod time.Duration + AllowPurge bool + AutoRestart bool + RunOnce bool + EnableHealthCheck bool + HealthCheckAddress string + HTTPClientTimeout time.Duration + GlobalConnectorRestartsMax int + GlobalConnectorRestartPeriod time.Duration + GlobalTaskRestartsMax int + GlobalTaskRestartPeriod time.Duration } -func manageConnectorsCmd() *cobra.Command { +func manageConnectorsCmd() *cobra.Command { // nolint: funlen params := &manageConnectorsCmdParams{ - SyncPeriod: 5 * time.Minute, - SyncErrorRetryMax: 10, - SyncErrorRetryPeriod: 1 * time.Minute, - HealthCheckAddress: ":9000", - HTTPClientTimeout: 20 * time.Second, + SyncPeriod: 5 * time.Minute, + SyncErrorRetryMax: 10, + SyncErrorRetryPeriod: 1 * time.Minute, + HealthCheckAddress: ":9000", + HTTPClientTimeout: 20 * time.Second, + GlobalConnectorRestartsMax: 5, + GlobalConnectorRestartPeriod: 10 * time.Second, + GlobalTaskRestartsMax: 5, + GlobalTaskRestartPeriod: 10 * time.Second, } manageCmd := &cobra.Command{ @@ -82,6 +90,18 @@ if you specify --once then it will sync once and then exit.`, manageCmd.Flags().DurationVar(¶ms.HTTPClientTimeout, "http-client-timeout", params.HTTPClientTimeout, "HTTP client timeout") _ = viper.BindPFlag("http-client-timeout", manageCmd.PersistentFlags().Lookup("http-client-timeout")) + manageCmd.Flags().IntVar(¶ms.GlobalConnectorRestartsMax, "global-connector-restarts-max", params.GlobalConnectorRestartsMax, "maximum times a failed connector will be restarted") + _ = viper.BindPFlag("global-connector-restarts-max", manageCmd.PersistentFlags().Lookup("global-connector-restarts-max")) + + manageCmd.Flags().DurationVar(¶ms.GlobalConnectorRestartPeriod, "global-connector-restart-period", params.GlobalConnectorRestartPeriod, "period of time between failed connector restarts") + _ = viper.BindPFlag("global-connector-restart-period", manageCmd.PersistentFlags().Lookup("global-connector-restart-period")) + + manageCmd.Flags().IntVar(¶ms.GlobalTaskRestartsMax, "global-task-restarts-max", params.GlobalTaskRestartsMax, "maximum times a failed task will be restarted") + _ = viper.BindPFlag("global-task-restarts-max", manageCmd.PersistentFlags().Lookup("global-task-restarts-max")) + + manageCmd.Flags().DurationVar(¶ms.GlobalTaskRestartPeriod, "global-task-restart-period", params.GlobalTaskRestartPeriod, "period of time between failed task restarts") + _ = viper.BindPFlag("global-task-restart-period", manageCmd.PersistentFlags().Lookup("global-task-restart-period")) + return manageCmd } @@ -94,11 +114,15 @@ func doManageConnectors(cmd *cobra.Command, params *manageConnectorsCmdParams) e } config := &manager.Config{ - ClusterURL: params.ClusterURL, - SyncPeriod: params.SyncPeriod, - AllowPurge: params.AllowPurge, - AutoRestart: params.AutoRestart, - Version: version.Version, + ClusterURL: params.ClusterURL, + SyncPeriod: params.SyncPeriod, + AllowPurge: params.AllowPurge, + AutoRestart: params.AutoRestart, + Version: version.Version, + GlobalConnectorRestartsMax: params.GlobalConnectorRestartsMax, + GlobalConnectorRestartPeriod: params.GlobalConnectorRestartPeriod, + GlobalTaskRestartsMax: params.GlobalTaskRestartsMax, + GlobalTaskRestartPeriod: params.GlobalTaskRestartPeriod, } logger.WithField("config", config).Trace("manage connectors configuration") diff --git a/pkg/manager/config.go b/pkg/manager/config.go index ae44733..9413471 100644 --- a/pkg/manager/config.go +++ b/pkg/manager/config.go @@ -13,7 +13,12 @@ type Config struct { Version string `json:"version"` - RestartPolicy *RestartPolicy `json:"restart_policy"` + GlobalConnectorRestartsMax int `json:"global_connector_restarts_max"` + GlobalConnectorRestartPeriod time.Duration `json:"global_connector_restart_period"` + GlobalTaskRestartsMax int `json:"global_task_restarts_max"` + GlobalTaskRestartPeriod time.Duration `json:"global_task_restart_period"` + + RestartOverrides *RestartPolicy `json:"restart_policy"` } // RestartPolicy lists each connectors maximum restart policy @@ -27,8 +32,8 @@ type RestartPolicy struct { // Policy contains a collection of values to be managed type Policy struct { - MaxConnectorRestarts int `json:"max_connector_restarts"` + ConnectorRestartsMax int `json:"connector_restarts_max"` ConnectorRestartPeriod time.Duration `json:"connector_restart_period"` - MaxTaskRestarts int `json:"max_task_restarts"` + TaskRestartsMax int `json:"task_restarts_max"` TaskRestartPeriod time.Duration `json:"task_restart_period"` } diff --git a/pkg/manager/manage.go b/pkg/manager/manage.go index a43733c..5412a5e 100644 --- a/pkg/manager/manage.go +++ b/pkg/manager/manage.go @@ -44,7 +44,7 @@ func (c *ConnectorManager) Sync(source ConnectorSource) error { // creating a runtime restart policy here, overriding with the supplied one (if any) // Ensuring that we have a policy defined for each connector we are manging here // dramatically simplifies the management and restart code - policy := runtimePolicyFromConnectors(connectors, c.config.RestartPolicy) + policy := runtimePolicyFromConnectors(connectors, c.config) if err = c.reconcileConnectors(connectors, policy); err != nil { return errors.Wrap(err, "error synchronising connectors") @@ -75,11 +75,11 @@ func (c *ConnectorManager) reconcileConnectors(connectors []connect.Connector, r func (c *ConnectorManager) autoRestart(connectors []connect.Connector, restartPolicy runtimeRestartPolicy) error { for _, connector := range connectors { name := connector.Name - err := c.retryRestartConnector(name, restartPolicy[name].MaxConnectorRestarts, restartPolicy[name].ConnectorRestartPeriod) + err := c.retryRestartConnector(name, restartPolicy[name].ConnectorRestartsMax, restartPolicy[name].ConnectorRestartPeriod) if err != nil { return err } - err = c.retryRestartConnectorTask(name, restartPolicy[name].MaxTaskRestarts, restartPolicy[name].TaskRestartPeriod) + err = c.retryRestartConnectorTask(name, restartPolicy[name].TaskRestartsMax, restartPolicy[name].TaskRestartPeriod) if err != nil { return err } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 63e9802..aeb725a 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -211,10 +211,10 @@ func Test_Manage_ConnectorFailed_IsRestarted_WithPolicy(t *testing.T) { config := &Config{ AutoRestart: true, - RestartPolicy: &RestartPolicy{ + RestartOverrides: &RestartPolicy{ Connectors: map[string]Policy{ "foo": Policy{ - MaxConnectorRestarts: 10, + ConnectorRestartsMax: 10, ConnectorRestartPeriod: time.Millisecond, }, }, @@ -275,12 +275,12 @@ func Test_Manage_ConnectorFailed_IsRestarted_WithPolicy_RestartWorks(t *testing. config := &Config{ AutoRestart: true, - RestartPolicy: &RestartPolicy{ + RestartOverrides: &RestartPolicy{ Connectors: map[string]Policy{ "foo": Policy{ - MaxConnectorRestarts: 10, + ConnectorRestartsMax: 10, ConnectorRestartPeriod: time.Millisecond, - MaxTaskRestarts: 0, + TaskRestartsMax: 0, TaskRestartPeriod: time.Millisecond, }, }, diff --git a/pkg/manager/restart_policy.go b/pkg/manager/restart_policy.go index fd9ca79..21a4af5 100644 --- a/pkg/manager/restart_policy.go +++ b/pkg/manager/restart_policy.go @@ -14,32 +14,48 @@ const ( defaultRestartPeriod = time.Second * 10 ) -func runtimePolicyFromConnectors(connectors []connect.Connector, overrides *RestartPolicy) runtimeRestartPolicy { +func runtimePolicyFromConnectors(connectors []connect.Connector, config *Config) runtimeRestartPolicy { // create restart policy here, overriding with any supplied values (if any) policy := runtimeRestartPolicy{} for _, c := range connectors { - policy[c.Name] = Policy{ - MaxConnectorRestarts: 1, + p := Policy{ + ConnectorRestartsMax: 1, ConnectorRestartPeriod: defaultRestartPeriod, - MaxTaskRestarts: 1, + TaskRestartsMax: 1, TaskRestartPeriod: defaultRestartPeriod, } + if config != nil { + // apply globals (if any) + if config.GlobalConnectorRestartsMax != 0 { + p.ConnectorRestartsMax = config.GlobalConnectorRestartsMax + } + if config.GlobalTaskRestartsMax != 0 { + p.TaskRestartsMax = config.GlobalTaskRestartsMax + } + if config.GlobalConnectorRestartPeriod != 0 { + p.ConnectorRestartPeriod = config.GlobalConnectorRestartPeriod + } + if config.GlobalTaskRestartPeriod != 0 { + p.TaskRestartPeriod = config.GlobalTaskRestartPeriod + } + } + policy[c.Name] = p } - // apply overrides - if overrides != nil { - for k, v := range overrides.Connectors { + // apply overrides (if any) + if config != nil && config.RestartOverrides != nil { + for k, v := range config.RestartOverrides.Connectors { p := policy[k] - if v.MaxConnectorRestarts != 0 { - p.MaxConnectorRestarts = v.MaxConnectorRestarts + if v.ConnectorRestartsMax != 0 { + p.ConnectorRestartsMax = v.ConnectorRestartsMax } if v.ConnectorRestartPeriod != 0 { p.ConnectorRestartPeriod = v.ConnectorRestartPeriod } - if v.MaxTaskRestarts != 0 { - p.MaxTaskRestarts = v.MaxTaskRestarts + if v.TaskRestartsMax != 0 { + p.TaskRestartsMax = v.TaskRestartsMax } if v.TaskRestartPeriod != 0 { p.TaskRestartPeriod = v.TaskRestartPeriod diff --git a/pkg/manager/restart_policy_test.go b/pkg/manager/restart_policy_test.go index 6f41715..742e69a 100644 --- a/pkg/manager/restart_policy_test.go +++ b/pkg/manager/restart_policy_test.go @@ -23,12 +23,37 @@ func Test_RestartPolicy_Default(t *testing.T) { foo := policy["foo"] - require.Equal(t, 1, foo.MaxConnectorRestarts) - require.Equal(t, 1, foo.MaxTaskRestarts) + require.Equal(t, 1, foo.ConnectorRestartsMax) + require.Equal(t, 1, foo.TaskRestartsMax) require.Equal(t, defaultRestartPeriod, foo.ConnectorRestartPeriod) require.Equal(t, defaultRestartPeriod, foo.TaskRestartPeriod) } +func Test_RestartPolicy_Globals(t *testing.T) { + t.Parallel() + + connectors := []connect.Connector{ + connect.Connector{Name: "foo"}, + } + + policy := runtimePolicyFromConnectors(connectors, &Config{ + GlobalConnectorRestartsMax: 97, + GlobalConnectorRestartPeriod: time.Second * 98, + GlobalTaskRestartsMax: 99, + GlobalTaskRestartPeriod: time.Second * 100, + }) + + require.Len(t, policy, 1) + require.NotNil(t, policy["foo"]) + + foo := policy["foo"] + + require.Equal(t, 97, foo.ConnectorRestartsMax) + require.Equal(t, time.Second*98, foo.ConnectorRestartPeriod) + require.Equal(t, 99, foo.TaskRestartsMax) + require.Equal(t, time.Second*100, foo.TaskRestartPeriod) +} + func Test_RestartPolicy_Override(t *testing.T) { t.Parallel() @@ -39,23 +64,24 @@ func Test_RestartPolicy_Override(t *testing.T) { ovveride := RestartPolicy{ Connectors: map[string]Policy{ "foo": Policy{ - MaxConnectorRestarts: 10, - MaxTaskRestarts: 11, + ConnectorRestartsMax: 10, + TaskRestartsMax: 11, TaskRestartPeriod: time.Second * 100, ConnectorRestartPeriod: time.Second * 101, }, }, } - policy := runtimePolicyFromConnectors(connectors, &ovveride) + config := &Config{RestartOverrides: &ovveride} + policy := runtimePolicyFromConnectors(connectors, config) require.Len(t, policy, 1) require.NotNil(t, policy["foo"]) foo := policy["foo"] - require.Equal(t, 10, foo.MaxConnectorRestarts) - require.Equal(t, 11, foo.MaxTaskRestarts) + require.Equal(t, 10, foo.ConnectorRestartsMax) + require.Equal(t, 11, foo.TaskRestartsMax) require.Equal(t, time.Second*101, foo.ConnectorRestartPeriod) require.Equal(t, time.Second*100, foo.TaskRestartPeriod) }