chore : add an initial wait period before syncing (#67)
* add an initial wait period before syncing
mdevilliers authored Mar 25, 2020
1 parent ded7004 commit 4749dee
Showing 3 changed files with 34 additions and 20 deletions.
4 changes: 4 additions & 0 deletions internal/ctl/connectors/manage.go
@@ -26,6 +26,7 @@ type manageDefaults struct {
 	Files []string `envconfig:"FILES"`
 	Directory string `envconfig:"DIRECTORY"`
 	EnvVar string `envconfig:"ENV_VAR"`
+	InitialWaitPeriod time.Duration `envconfig:"INITIAL_WAIT_PERIOD"`
 	SyncPeriod time.Duration `envconfig:"SYNC_PERIOD"`
 	SyncErrorRetryMax int `envconfig:"SYNC_ERROR_RETRY_MAX"`
 	SyncErrorRetryPeriod time.Duration `envconfig:"SYNC_ERROR_RETRY_PERIOD"`
@@ -43,6 +44,7 @@ type manageDefaults struct {
 
 func manageConnectorsCmd() *cobra.Command { // nolint: funlen
 	params := &manageDefaults{
+		InitialWaitPeriod: 3 * time.Minute,
 		SyncPeriod: 5 * time.Minute,
 		SyncErrorRetryMax: 10,
 		SyncErrorRetryPeriod: 1 * time.Minute,
@@ -78,6 +80,7 @@ if you specify --once then it will sync once and then exit.`,
 	ctl.AddDefinitionFilesFlags(manageCmd, &params.Files, &params.Directory, &params.EnvVar)
 
 	ctl.BindDurationVarP(manageCmd.Flags(), &params.SyncPeriod, params.SyncPeriod, "sync-period", "s", "how often to sync with the connect cluster")
+	ctl.BindDurationVar(manageCmd.Flags(), &params.InitialWaitPeriod, params.InitialWaitPeriod, "wait-period", "time period to wait before starting the first sync")
 
 	ctl.BindBoolVar(manageCmd.Flags(), &params.AllowPurge, false, "allow-purge", "if set connectctl will manage all connectors in a cluster. If connectors exist in the cluster that aren't specified in --files then the connectors will be deleted")
 	ctl.BindBoolVar(manageCmd.Flags(), &params.AutoRestart, false, "auto-restart", "if set connectors and tasks that are failed with automatically be restarted")
@@ -112,6 +115,7 @@ func doManageConnectors(cmd *cobra.Command, params *manageDefaults) error {
 
 	config := &manager.Config{
 		ClusterURL: params.ClusterURL,
+		InitialWaitPeriod: params.InitialWaitPeriod,
 		SyncPeriod: params.SyncPeriod,
 		AllowPurge: params.AllowPurge,
 		AutoRestart: params.AutoRestart,
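ctl.BindDurationVar and ctl.BindDurationVarP are connectctl's own helpers and are not part of this diff. The sketch below only illustrates the same default → environment variable → flag layering using cobra/pflag and envconfig directly; the names and wiring here are made up for illustration, not the actual connectctl code.

	package main

	import (
		"fmt"
		"time"

		"github.com/kelseyhightower/envconfig"
		"github.com/spf13/cobra"
	)

	// illustrative stand-in for manageDefaults; not the real connectctl type
	type manageDefaults struct {
		InitialWaitPeriod time.Duration `envconfig:"INITIAL_WAIT_PERIOD"`
		SyncPeriod        time.Duration `envconfig:"SYNC_PERIOD"`
	}

	func main() {
		// compiled-in defaults, matching the values added in this commit
		params := &manageDefaults{
			InitialWaitPeriod: 3 * time.Minute,
			SyncPeriod:        5 * time.Minute,
		}

		// environment variables (e.g. INITIAL_WAIT_PERIOD=90s) override the
		// defaults; envconfig parses durations with time.ParseDuration
		if err := envconfig.Process("", params); err != nil {
			panic(err)
		}

		cmd := &cobra.Command{
			Use: "manage",
			RunE: func(cmd *cobra.Command, args []string) error {
				fmt.Printf("wait %s before the first sync, then sync every %s\n",
					params.InitialWaitPeriod, params.SyncPeriod)
				return nil
			},
		}

		// flags set on the command line take precedence over both of the above
		cmd.Flags().DurationVarP(&params.SyncPeriod, "sync-period", "s", params.SyncPeriod,
			"how often to sync with the connect cluster")
		cmd.Flags().DurationVar(&params.InitialWaitPeriod, "wait-period", params.InitialWaitPeriod,
			"time period to wait before starting the first sync")

		_ = cmd.Execute()
	}

Run with nothing set it prints the 3m/5m defaults; setting INITIAL_WAIT_PERIOD=90s or passing --wait-period=30s overrides them, in that order of precedence.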
9 changes: 5 additions & 4 deletions pkg/manager/config.go
@@ -6,10 +6,11 @@ import (
 
 // Config represents the connect manager configuration
 type Config struct {
-	ClusterURL  string        `json:"cluster_url"`
-	SyncPeriod  time.Duration `json:"sync_period"`
-	AllowPurge  bool          `json:"allow_purge"`
-	AutoRestart bool          `json:"auto_restart"`
+	ClusterURL        string        `json:"cluster_url"`
+	SyncPeriod        time.Duration `json:"sync_period"`
+	InitialWaitPeriod time.Duration `json:"initial_wait_period"`
+	AllowPurge        bool          `json:"allow_purge"`
+	AutoRestart       bool          `json:"auto_restart"`
 
 	Version string `json:"version"`
 
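The new initial_wait_period field is an ordinary time.Duration with a json tag, just like sync_period. As a small aside (the struct below is a cut-down stand-in, not the real manager.Config), encoding/json renders time.Duration values as integer nanoseconds:

	package main

	import (
		"encoding/json"
		"fmt"
		"time"
	)

	// cut-down stand-in for manager.Config, duration fields only
	type config struct {
		SyncPeriod        time.Duration `json:"sync_period"`
		InitialWaitPeriod time.Duration `json:"initial_wait_period"`
	}

	func main() {
		c := config{
			SyncPeriod:        5 * time.Minute,
			InitialWaitPeriod: 3 * time.Minute,
		}
		// prints {"sync_period":300000000000,"initial_wait_period":180000000000}
		b, _ := json.Marshal(c)
		fmt.Println(string(b))
	}

So anything that logs or stores this configuration as JSON sees nanosecond integers rather than strings such as "3m".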
41 changes: 25 additions & 16 deletions pkg/manager/manage.go
@@ -15,39 +15,48 @@ func (c *ConnectorManager) Manage(source ConnectorSource, stopCH <-chan struct{}
 	// successfully configured the kafka-connect instance
 	c.readinessState = errorState
 
-	syncChannel := time.NewTicker(c.config.SyncPeriod).C
+	syncChannel := time.NewTicker(c.config.InitialWaitPeriod).C
 	for {
 		select {
 		case <-syncChannel:
 
-			// we only want to try Syncing if we can contact the kafka-connect instance.
-			// Using the LivenessCheck as a proxy for calculating the connection
-			if c.livenessState == okState {
-				err := c.Sync(source)
-				if err != nil {
-					// set back into an unhealthy state
-					c.readinessState = errorState
-					return errors.Wrap(err, "error synchronising connectors for source")
-				}
-				// mark ourselves as being in an ok state as we have
-				// started syncing without any error
-				c.readinessState = okState
-			} else {
-				c.logger.Infof("skipping sync as livenessState == %v", c.livenessState)
+			err := c.trySync(source)
+			if err != nil {
+				return errors.Wrap(err, "error synchronising connectors for source")
 			}
+			syncChannel = time.NewTicker(c.config.SyncPeriod).C
 
 		case <-stopCH:
 			return nil
 		}
 	}
 }
 
+func (c *ConnectorManager) trySync(source ConnectorSource) error {
+	// we only want to try Syncing if we can contact the kafka-connect instance.
+	// Using the LivenessCheck as a proxy for calculating the connection
+	if c.livenessState == okState {
+		err := c.Sync(source)
+		if err != nil {
+			// set back into an unhealthy state
+			c.readinessState = errorState
+			return err
+		}
+		// mark ourselves as being in an ok state as we have
+		// started syncing without any error
+		c.readinessState = okState
+		return nil
+	}
+	c.logger.Infof("skipping sync as livenessState == %v", c.livenessState)
+	return nil
+}
+
 // Sync will synchronise the desired and actual state of connectors in a cluster
 func (c *ConnectorManager) Sync(source ConnectorSource) error {
 	c.logger.Infof("loading connectors")
 	connectors, err := source()
 	if err != nil {
-		return errors.Wrap(err, "error getting connector configurations")
+		return errors.Wrap(err, "error getting connectors configuration")
 	}
 	c.logger.Infof("connectors loaded : %d", len(connectors))
 	// creating a runtime restart policy here, overriding with the supplied one (if any)
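The upshot of the Manage change: the ticker driving the loop is first created with InitialWaitPeriod, and after each tick it is swapped for one that fires every SyncPeriod, so only the first sync is delayed and later syncs keep the usual cadence. A stripped-down, runnable sketch of that ticker-swap pattern (a hypothetical helper, not the connectctl code; the sync body is a stub):

	package main

	import (
		"fmt"
		"time"
	)

	func manage(initialWait, syncPeriod time.Duration, stopCh <-chan struct{}, sync func() error) error {
		// the first tick arrives after the initial wait period...
		syncChannel := time.NewTicker(initialWait).C
		for {
			select {
			case <-syncChannel:
				if err := sync(); err != nil {
					return err
				}
				// ...then a fresh ticker firing every sync period takes over,
				// mirroring the reassignment in the commit
				syncChannel = time.NewTicker(syncPeriod).C
			case <-stopCh:
				return nil
			}
		}
	}

	func main() {
		stop := make(chan struct{})
		go func() {
			time.Sleep(2 * time.Second)
			close(stop)
		}()

		start := time.Now()
		_ = manage(500*time.Millisecond, 300*time.Millisecond, stop, func() error {
			fmt.Printf("sync at %v\n", time.Since(start).Round(10*time.Millisecond))
			return nil
		})
	}

With the defaults added in this commit, that translates to roughly a 3 minute delay before the first sync and a 5 minute interval between syncs afterwards.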
