From ded7004eca2b0690ae61acdd75a52f403a1bc850 Mon Sep 17 00:00:00 2001 From: Mark deVilliers Date: Fri, 20 Mar 2020 16:34:28 +0000 Subject: [PATCH] feat : only sync if kafka-connect is running (#66) * only sync if kafka-connect is running Signed-off-by: Mark deVilliers --- pkg/manager/manage.go | 26 +++++++++++++++++--------- pkg/manager/manager.go | 10 +++++++--- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/pkg/manager/manage.go b/pkg/manager/manage.go index 32f6cee..2c5e5d4 100644 --- a/pkg/manager/manage.go +++ b/pkg/manager/manage.go @@ -12,22 +12,30 @@ import ( // Manage will start the connector manager running and managing connectors func (c *ConnectorManager) Manage(source ConnectorSource, stopCH <-chan struct{}) error { // mark ourselves as having an unhealthy state until we have - // tried to contact the kafka-connect instance + // successfully configured the kafka-connect instance c.readinessState = errorState syncChannel := time.NewTicker(c.config.SyncPeriod).C for { select { case <-syncChannel: - err := c.Sync(source) - if err != nil { - // set back into an unhealthy state - c.readinessState = errorState - return errors.Wrap(err, "error synchronising connectors for source") + + // we only want to try Syncing if we can contact the kafka-connect instance. + // Using the LivenessCheck as a proxy for calculating the connection + if c.livenessState == okState { + err := c.Sync(source) + if err != nil { + // set back into an unhealthy state + c.readinessState = errorState + return errors.Wrap(err, "error synchronising connectors for source") + } + // mark ourselves as being in an ok state as we have + // started syncing without any error + c.readinessState = okState + } else { + c.logger.Infof("skipping sync as livenessState == %v", c.livenessState) } - // mark ourselves as being in an ok state as we have - // started syncing without any error - c.readinessState = okState + case <-stopCH: return nil } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index aed4e9e..ee42575 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -32,7 +32,8 @@ type ConnectorManager struct { client client logger Logger - readinessState readinessState + readinessState healthcheckState + livenessState healthcheckState } // Option can be supplied that override the default ConnectorManager properties @@ -52,6 +53,7 @@ func NewConnectorsManager(client client, config *Config, opts ...Option) (*Conne client: client, logger: newNoopLogger(), readinessState: unknownState, + livenessState: unknownState, } for _, opt := range opts { @@ -61,10 +63,10 @@ func NewConnectorsManager(client client, config *Config, opts ...Option) (*Conne return cm, nil } -type readinessState int +type healthcheckState int const ( - unknownState readinessState = iota + unknownState healthcheckState = iota okState errorState ) @@ -91,9 +93,11 @@ func (c *ConnectorManager) LivenessCheck() (string, func() error) { check := func() error { err := healthcheck.HTTPGetCheck(c.config.ClusterURL, time.Second*2)() if err != nil { + c.livenessState = errorState c.logger.Infof("healthcheck: liveness : %s", err.Error()) return err } + c.livenessState = okState c.logger.Infof("healthcheck: liveness : ok") return nil }