diff --git a/components/configstores/etcdv3/etcdv3.go b/components/configstores/etcdv3/etcdv3.go index f5e53ae659..a0fa4bd8fc 100644 --- a/components/configstores/etcdv3/etcdv3.go +++ b/components/configstores/etcdv3/etcdv3.go @@ -10,6 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package etcdv3 import ( @@ -20,6 +21,7 @@ import ( "sync" "time" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/pkg/utils" "go.etcd.io/etcd/api/v3/mvccpb" @@ -31,10 +33,22 @@ import ( ) const ( - defaultGroup = "default" - defaultLabel = "default" + defaultGroup = "default" + defaultLabel = "default" + componentName = "configstore-etcdv3" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator ) +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type EtcdV3ConfigStore struct { client *clientv3.Client sync.RWMutex @@ -56,6 +70,10 @@ func (c *EtcdV3ConfigStore) GetDefaultLabel() string { } func NewStore() configstores.Store { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) return &EtcdV3ConfigStore{subscribeKey: make(map[string]string), watchRespCh: make(chan *configstores.SubscribeResp)} } @@ -71,6 +89,13 @@ func (c *EtcdV3ConfigStore) Init(config *configstores.StoreConfig) error { DialTimeout: time.Duration(t) * time.Second, }) c.storeName = config.StoreName + if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) + } else { + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() + } return err } diff --git a/components/configstores/nacos/configstore.go b/components/configstores/nacos/configstore.go index 2527391795..27389b4eea 100644 --- a/components/configstores/nacos/configstore.go +++ b/components/configstores/nacos/configstore.go @@ -30,8 +30,24 @@ import ( "mosn.io/pkg/log" "mosn.io/layotto/components/configstores" + "mosn.io/layotto/components/pkg/actuators" ) +const ( + componentName = "configstore-nacos" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type ConfigStore struct { client config_client.IConfigClient storeName string @@ -41,6 +57,10 @@ type ConfigStore struct { } func NewStore() configstores.Store { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) return &ConfigStore{} } @@ -91,10 +111,15 @@ func (n *ConfigStore) Init(config *configstores.StoreConfig) (err error) { } else { client, err = n.init(config.Address, timeoutMs, metadata) } + if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err + } else { + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() } - n.client = client // replace nacos sdk log return n.setupLogger(metadata) diff --git a/components/cryption/aliyun/kms.go b/components/cryption/aliyun/kms.go index 18f76da25e..e2682840af 100644 --- a/components/cryption/aliyun/kms.go +++ b/components/cryption/aliyun/kms.go @@ -18,6 +18,9 @@ package aliyun import ( "context" "fmt" + "sync" + + "mosn.io/layotto/components/pkg/actuators" openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client" kms20160120 "github.com/alibabacloud-go/kms-20160120/v3/client" @@ -27,6 +30,21 @@ import ( "mosn.io/layotto/components/cryption" ) +const ( + componentName = "kms-aliyun" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type cy struct { client *kms20160120.Client keyID string @@ -36,6 +54,10 @@ type cy struct { refer: https://help.aliyun.com/document_detail/611325.html */ func NewCryption() cryption.CryptionService { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) return &cy{} } @@ -54,7 +76,12 @@ func (k *cy) Init(ctx context.Context, conf *cryption.Config) error { client, err := kms20160120.NewClient(config) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err + } else { + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() } k.client = client k.keyID = conf.Metadata[cryption.KeyID] diff --git a/components/cryption/aws/kms.go b/components/cryption/aws/kms.go index fa2a90c599..2fdaca8f29 100644 --- a/components/cryption/aws/kms.go +++ b/components/cryption/aws/kms.go @@ -18,6 +18,9 @@ package aws import ( "context" "fmt" + "sync" + + "mosn.io/layotto/components/pkg/actuators" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -28,12 +31,31 @@ import ( "mosn.io/layotto/components/cryption" ) +const ( + componentName = "kms-aws" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type cy struct { client *kms.KMS keyID string } func NewCryption() cryption.CryptionService { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) return &cy{} } @@ -49,6 +71,13 @@ func (k *cy) Init(ctx context.Context, conf *cryption.Config) error { Credentials: staticCredentials, } client := kms.New(session.New(), awsConf) + if client == nil { + readinessIndicator.ReportError("fail to create aws kms client") + livenessIndicator.ReportError("fail to create aws kms client") + } else { + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() + } k.client = client k.keyID = keyID return nil diff --git a/components/delay_queue/azure/servicebus/servicebus.go b/components/delay_queue/azure/servicebus/servicebus.go index e3cca942e2..37582e8bda 100644 --- a/components/delay_queue/azure/servicebus/servicebus.go +++ b/components/delay_queue/azure/servicebus/servicebus.go @@ -10,6 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package servicebus import ( diff --git a/components/file/aliyun/file.go b/components/file/aliyun/file.go index c31151844f..1016c8dc47 100644 --- a/components/file/aliyun/file.go +++ b/components/file/aliyun/file.go @@ -22,6 +22,9 @@ import ( "fmt" "io" "strconv" + "sync" + + "mosn.io/layotto/components/pkg/actuators" "github.com/aliyun/aliyun-oss-go-sdk/oss" @@ -32,14 +35,30 @@ import ( const ( storageTypeKey = "storageType" + componentName = "file-aliyun" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator ) +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + // AliyunFile is a binding for an AliCloud OSS storage bucketKey type AliyunFile struct { client *oss.Client } func NewAliyunFile() file.File { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) oss := &AliyunFile{} return oss } @@ -49,6 +68,8 @@ func (s *AliyunFile) Init(ctx context.Context, metadata *file.FileConfig) error m := make([]*utils.OssMetadata, 0) err := json.Unmarshal(metadata.Metadata, &m) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return file.ErrInvalid } @@ -58,10 +79,14 @@ func (s *AliyunFile) Init(ctx context.Context, metadata *file.FileConfig) error } client, err := oss.New(v.Endpoint, v.AccessKeyID, v.AccessKeySecret) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } s.client = client } + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return nil } diff --git a/components/file/aws/file.go b/components/file/aws/file.go index 474cefc0c0..09c5d98cf8 100644 --- a/components/file/aws/file.go +++ b/components/file/aws/file.go @@ -23,7 +23,9 @@ import ( "fmt" "io" "strings" + "sync" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/pkg/utils" "github.com/aws/aws-sdk-go-v2/aws" @@ -38,14 +40,30 @@ import ( const ( defaultCredentialsSource = "provider" + componentName = "file-aws" ) +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + // AwsOss is a binding for aws oss storage. type AwsOss struct { client *s3.Client } func NewAwsFile() file.File { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) return &AwsOss{} } @@ -54,6 +72,8 @@ func (a *AwsOss) Init(ctx context.Context, config *file.FileConfig) error { m := make([]*utils.OssMetadata, 0) err := json.Unmarshal(config.Metadata, &m) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return errors.New("invalid config for aws oss") } for _, data := range m { @@ -64,6 +84,10 @@ func (a *AwsOss) Init(ctx context.Context, config *file.FileConfig) error { if err != nil { continue } + if client != nil { + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() + } a.client = client } return nil diff --git a/components/file/hdfs/hdfs.go b/components/file/hdfs/hdfs.go index 1620b407b1..c19372ad8e 100644 --- a/components/file/hdfs/hdfs.go +++ b/components/file/hdfs/hdfs.go @@ -25,8 +25,10 @@ import ( "io" "io/ioutil" "strconv" + "sync" "mosn.io/layotto/components/file" + "mosn.io/layotto/components/pkg/actuators" store "go.beyondstorage.io/services/hdfs" "go.beyondstorage.io/v5/pairs" @@ -34,8 +36,9 @@ import ( ) const ( - endpointKey = "endpoint" - fileSize = "filesize" + endpointKey = "endpoint" + fileSize = "filesize" + componentName = "file-hdfs" ) var ( @@ -45,8 +48,16 @@ var ( ErrNotSpecifyEndpoint error = errors.New("other error happend in metadata") ErrHdfsListFail error = errors.New("hdfs list opt failed") ErrInitFailed error = errors.New("hdfs client init failed") + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator ) +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type hdfs struct { client map[string]types.Storager meta map[string]*HdfsMetaData @@ -57,6 +68,10 @@ type HdfsMetaData struct { } func NewHdfs() file.File { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) return &hdfs{ client: make(map[string]types.Storager), meta: make(map[string]*HdfsMetaData), @@ -77,6 +92,8 @@ func (h *hdfs) Init(ctx context.Context, config *file.FileConfig) error { client, err := h.createHdfsClient(data) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return ErrInitFailed } @@ -84,6 +101,9 @@ func (h *hdfs) Init(ctx context.Context, config *file.FileConfig) error { h.meta[data.EndPoint] = data } + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() + return nil } diff --git a/components/file/local/file.go b/components/file/local/file.go index 5cdd513631..0900f67f89 100644 --- a/components/file/local/file.go +++ b/components/file/local/file.go @@ -24,24 +24,44 @@ import ( "os" "strconv" "strings" + "sync" "mosn.io/layotto/components/file" + "mosn.io/layotto/components/pkg/actuators" ) const ( - FileMode = "FileMode" - FileFlag = "FileFlag" - FileIsDir = "IsDir" + FileMode = "FileMode" + FileFlag = "FileFlag" + FileIsDir = "IsDir" + componentName = "file-local" ) +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type LocalStore struct { } func NewLocalStore() file.File { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) return &LocalStore{} } func (lf *LocalStore) Init(ctx context.Context, f *file.FileConfig) error { + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return nil } func (lf *LocalStore) Put(ctx context.Context, f *file.PutFileStu) error { diff --git a/components/file/minio/oss.go b/components/file/minio/oss.go index 5cca66763b..0923943a61 100644 --- a/components/file/minio/oss.go +++ b/components/file/minio/oss.go @@ -23,8 +23,10 @@ import ( "fmt" "io" "strconv" + "sync" "mosn.io/layotto/components/file/util" + "mosn.io/layotto/components/pkg/actuators" "github.com/minio/minio-go/v7/pkg/credentials" @@ -34,8 +36,9 @@ import ( ) const ( - endpointKey = "endpoint" - fileSize = "fileSize" + endpointKey = "endpoint" + fileSize = "fileSize" + componentName = "file-monio" ) var ( @@ -43,8 +46,16 @@ var ( ErrClientNotExist error = errors.New("specific client not exist") ErrInvalidConfig error = errors.New("invalid minio oss config") ErrNotSpecifyEndPoint error = errors.New("not specify endpoint in metadata") + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator ) +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type MinioOss struct { client map[string]*minio.Core meta map[string]*MinioMetaData @@ -59,6 +70,10 @@ type MinioMetaData struct { } func NewMinioOss() file.File { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) return &MinioOss{ client: make(map[string]*minio.Core), meta: make(map[string]*MinioMetaData), @@ -69,6 +84,8 @@ func (m *MinioOss) Init(ctx context.Context, config *file.FileConfig) error { md := make([]*MinioMetaData, 0) err := json.Unmarshal(config.Metadata, &md) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return ErrInvalidConfig } for _, data := range md { @@ -79,6 +96,10 @@ func (m *MinioOss) Init(ctx context.Context, config *file.FileConfig) error { if err != nil { continue } + if client != nil { + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() + } m.client[data.EndPoint] = client m.meta[data.EndPoint] = data } diff --git a/components/file/qiniu/quniu_oss.go b/components/file/qiniu/quniu_oss.go index 1c9a853859..72484011d7 100644 --- a/components/file/qiniu/quniu_oss.go +++ b/components/file/qiniu/quniu_oss.go @@ -22,20 +22,31 @@ import ( "errors" "io" "strconv" + "sync" "mosn.io/layotto/components/file" + "mosn.io/layotto/components/pkg/actuators" ) const ( - endpointKey = "endpoint" - fileSizeKey = "filesize" + endpointKey = "endpoint" + fileSizeKey = "filesize" + componentName = "file-qiniu" ) var ( ErrClientNotExist = errors.New("specific client not exist") ErrNotSpecifyEndPoint = errors.New("not specify endpoint in metadata") + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator ) +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type QiniuOSS struct { metadata map[string]*OssMetadata client map[string]*QiniuOSSClient @@ -52,6 +63,10 @@ type OssMetadata struct { } func NewQiniuOSS() file.File { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) return &QiniuOSS{ metadata: make(map[string]*OssMetadata), client: make(map[string]*QiniuOSSClient), @@ -62,6 +77,8 @@ func (q *QiniuOSS) Init(ctx context.Context, metadata *file.FileConfig) error { m := make([]*OssMetadata, 0) err := json.Unmarshal(metadata.Metadata, &m) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return file.ErrInvalid } @@ -85,6 +102,15 @@ func (q *QiniuOSS) Init(ctx context.Context, metadata *file.FileConfig) error { v.UseHTTPS, v.UseCdnDomains, ) + + if client != nil { + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() + } else { + readinessIndicator.ReportError("failed to create qiniu oss client") + livenessIndicator.ReportError("failed to create qiniu oss client") + } + q.metadata[v.Endpoint] = v q.client[v.Endpoint] = client } diff --git a/components/file/tencentcloud/oss.go b/components/file/tencentcloud/oss.go index b833fda35a..bb891aa5ae 100644 --- a/components/file/tencentcloud/oss.go +++ b/components/file/tencentcloud/oss.go @@ -25,25 +25,36 @@ import ( "net/url" "strconv" "strings" + "sync" "time" "github.com/pkg/errors" "github.com/tencentyun/cos-go-sdk-v5" "mosn.io/layotto/components/file" + "mosn.io/layotto/components/pkg/actuators" ) const ( endpointKey = "endpoint" aclKey = "ACL" contentTypeKey = "content-type" + componentName = "file-tencentcloud" ) var ( ErrClientNotExist = errors.New("specific client not exist") ErrNotSpecifyEndPoint = errors.New("not specify endpoint in metadata") + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator ) +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type TencentCloudOSS struct { metadata map[string]*OssMetadata client map[string]*cos.Client @@ -58,6 +69,10 @@ type OssMetadata struct { } func NewTencentCloudOSS() file.File { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) oss := &TencentCloudOSS{metadata: make(map[string]*OssMetadata), client: make(map[string]*cos.Client)} return oss } @@ -67,6 +82,8 @@ func (t *TencentCloudOSS) Init(ctx context.Context, metadata *file.FileConfig) e m := make([]*OssMetadata, 0) err := json.Unmarshal(metadata.Metadata, &m) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return file.ErrInvalid } @@ -76,11 +93,15 @@ func (t *TencentCloudOSS) Init(ctx context.Context, metadata *file.FileConfig) e } client, err := t.getClient(v) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } t.metadata[v.Endpoint] = v t.client[v.Endpoint] = client } + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return nil } diff --git a/components/lock/consul/consul_lock.go b/components/lock/consul/consul_lock.go index 5547978c6d..bab45a4799 100644 --- a/components/lock/consul/consul_lock.go +++ b/components/lock/consul/consul_lock.go @@ -10,6 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package consul import ( @@ -23,9 +24,25 @@ import ( "mosn.io/pkg/log" "mosn.io/layotto/components/lock" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/pkg/utils" ) +const ( + componentName = "lock-consul" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type ConsulLock struct { metadata utils.ConsulMetadata logger log.ErrorLogger @@ -37,6 +54,10 @@ type ConsulLock struct { } func NewConsulLock(logger log.ErrorLogger) *ConsulLock { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) consulLock := &ConsulLock{logger: logger} return consulLock } @@ -44,6 +65,8 @@ func NewConsulLock(logger log.ErrorLogger) *ConsulLock { func (c *ConsulLock) Init(metadata lock.Metadata) error { consulMetadata, err := utils.ParseConsulMetadata(metadata) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } c.metadata = consulMetadata @@ -52,12 +75,16 @@ func (c *ConsulLock) Init(metadata lock.Metadata) error { Scheme: consulMetadata.Scheme, }) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } c.client = client c.sessionFactory = client.Session() c.kv = client.KV() c.workPool = msync.NewWorkerPool(runtime.NumCPU()) + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return nil } func (c *ConsulLock) Features() []lock.Feature { diff --git a/components/lock/etcd/etcd_lock.go b/components/lock/etcd/etcd_lock.go index 663c574909..b888d89a38 100644 --- a/components/lock/etcd/etcd_lock.go +++ b/components/lock/etcd/etcd_lock.go @@ -10,11 +10,13 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package etcd import ( "context" "fmt" + "sync" clientv3 "go.etcd.io/etcd/client/v3" @@ -23,8 +25,24 @@ import ( "mosn.io/pkg/log" "mosn.io/layotto/components/lock" + "mosn.io/layotto/components/pkg/actuators" +) + +const ( + componentName = "lock-etcd" ) +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + // Etcd lock store type EtcdLock struct { client *clientv3.Client @@ -39,6 +57,10 @@ type EtcdLock struct { // NewEtcdLock returns a new etcd lock func NewEtcdLock(logger log.ErrorLogger) *EtcdLock { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) s := &EtcdLock{ features: make([]lock.Feature, 0), logger: logger, @@ -52,15 +74,21 @@ func (e *EtcdLock) Init(metadata lock.Metadata) error { // 1. parse config m, err := utils.ParseEtcdMetadata(metadata.Properties) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } e.metadata = m // 2. construct client if e.client, err = utils.NewEtcdClient(m); err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } e.ctx, e.cancel = context.WithCancel(context.Background()) + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return err } diff --git a/components/lock/in-memory/in_memory_lock.go b/components/lock/in-memory/in_memory_lock.go index 1213f07cb9..bb009e100e 100644 --- a/components/lock/in-memory/in_memory_lock.go +++ b/components/lock/in-memory/in_memory_lock.go @@ -22,8 +22,24 @@ import ( "time" "mosn.io/layotto/components/lock" + "mosn.io/layotto/components/pkg/actuators" ) +const ( + componentName = "lock-memory" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type InMemoryLock struct { features []lock.Feature data *lockMap @@ -43,6 +59,10 @@ type lockMap struct { } func NewInMemoryLock() *InMemoryLock { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) return &InMemoryLock{ features: make([]lock.Feature, 0), data: &lockMap{ @@ -52,6 +72,8 @@ func NewInMemoryLock() *InMemoryLock { } func (s *InMemoryLock) Init(_ lock.Metadata) error { + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return nil } diff --git a/components/lock/mongo/mongo_lock.go b/components/lock/mongo/mongo_lock.go index 52f3421149..58818db8e4 100644 --- a/components/lock/mongo/mongo_lock.go +++ b/components/lock/mongo/mongo_lock.go @@ -10,11 +10,13 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package mongo import ( "context" "fmt" + "sync" "time" "go.mongodb.org/mongo-driver/bson" @@ -26,6 +28,7 @@ import ( "mosn.io/pkg/log" "mosn.io/layotto/components/lock" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/pkg/utils" ) @@ -36,8 +39,20 @@ const ( UNLOCK_UNEXIST = 4 UNLOCK_BELONG_TO_OTHERS = 5 UNLOCK_FAIL = 6 + componentName = "lock-mongo" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator ) +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + // mongo lock store type MongoLock struct { factory utils.MongoFactory @@ -56,6 +71,10 @@ type MongoLock struct { // NewMongoLock returns a new mongo lock func NewMongoLock(logger log.ErrorLogger) *MongoLock { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) s := &MongoLock{ features: make([]lock.Feature, 0), logger: logger, @@ -68,6 +87,8 @@ func (e *MongoLock) Init(metadata lock.Metadata) error { // 1.parse config m, err := utils.ParseMongoMetadata(metadata.Properties) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } e.metadata = m @@ -76,17 +97,23 @@ func (e *MongoLock) Init(metadata lock.Metadata) error { // 2. construct client if client, err = e.factory.NewMongoClient(m); err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } e.ctx, e.cancel = context.WithCancel(context.Background()) if err := client.Ping(e.ctx, nil); err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } // Connections Collection e.collection, err = utils.SetCollection(client, e.factory, e.metadata) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } @@ -99,6 +126,8 @@ func (e *MongoLock) Init(metadata lock.Metadata) error { e.client = client + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return err } diff --git a/components/lock/redis/cluster_redis_lock.go b/components/lock/redis/cluster_redis_lock.go index 2f38b6dd7f..c2faba95b7 100644 --- a/components/lock/redis/cluster_redis_lock.go +++ b/components/lock/redis/cluster_redis_lock.go @@ -10,6 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package redis import ( @@ -24,9 +25,25 @@ import ( "mosn.io/pkg/log" "mosn.io/layotto/components/lock" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/pkg/utils" ) +const ( + componentName = "lock-redis-cluster" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + // RedLock // it will be best to use at least 5 hosts type ClusterRedisLock struct { @@ -43,6 +60,10 @@ type ClusterRedisLock struct { // NewClusterRedisLock returns a new redis lock store func NewClusterRedisLock(logger log.ErrorLogger) *ClusterRedisLock { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) s := &ClusterRedisLock{ features: make([]lock.Feature, 0), logger: logger, @@ -62,6 +83,8 @@ func (c *ClusterRedisLock) Init(metadata lock.Metadata) error { m, err := utils.ParseRedisClusterMetadata(metadata.Properties) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } c.metadata = m @@ -70,9 +93,13 @@ func (c *ClusterRedisLock) Init(metadata lock.Metadata) error { c.workpool = msync.NewWorkerPool(m.Concurrency) for i, client := range c.clients { if _, err = client.Ping(c.ctx).Result(); err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return fmt.Errorf("[ClusterRedisLock]: error connecting to redis at %s: %s", c.metadata.Hosts[i], err) } } + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return err } diff --git a/components/lock/redis/standalone_redis_lock.go b/components/lock/redis/standalone_redis_lock.go index e0977a73f5..eb7e72ba1d 100644 --- a/components/lock/redis/standalone_redis_lock.go +++ b/components/lock/redis/standalone_redis_lock.go @@ -10,6 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package redis import ( @@ -21,9 +22,15 @@ import ( "mosn.io/pkg/log" "mosn.io/layotto/components/lock" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/pkg/utils" ) +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + // Standalone Redis lock store.Any fail-over related features are not supported,such as Sentinel and Redis Cluster. type StandaloneRedisLock struct { client *redis.Client @@ -38,6 +45,10 @@ type StandaloneRedisLock struct { // NewStandaloneRedisLock returns a new redis lock store func NewStandaloneRedisLock(logger log.ErrorLogger) *StandaloneRedisLock { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator("lock-redis-standalone", indicators) + }) s := &StandaloneRedisLock{ features: make([]lock.Feature, 0), logger: logger, @@ -51,6 +62,8 @@ func (p *StandaloneRedisLock) Init(metadata lock.Metadata) error { // 1. parse config m, err := utils.ParseRedisMetadata(metadata.Properties) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } p.metadata = m @@ -59,8 +72,12 @@ func (p *StandaloneRedisLock) Init(metadata lock.Metadata) error { p.ctx, p.cancel = context.WithCancel(context.Background()) // 3. connect to redis if _, err = p.client.Ping(p.ctx).Result(); err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return fmt.Errorf("[standaloneRedisLock]: error connecting to redis at %s: %s", m.Host, err) } + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return err } diff --git a/components/lock/zookeeper/zookeeper_lock.go b/components/lock/zookeeper/zookeeper_lock.go index e456e4988d..e0b6326c0e 100644 --- a/components/lock/zookeeper/zookeeper_lock.go +++ b/components/lock/zookeeper/zookeeper_lock.go @@ -16,6 +16,7 @@ package zookeeper import ( "context" + "sync" "time" "github.com/go-zookeeper/zk" @@ -23,15 +24,30 @@ import ( util "mosn.io/pkg/utils" "mosn.io/layotto/components/lock" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/pkg/utils" ) -var closeConn = func(conn utils.ZKConnection, expireInSecond int32) { - //can also - //time.Sleep(time.Second * time.Duration(expireInSecond)) - <-time.After(time.Second * time.Duration(expireInSecond)) - // make sure close connecion - conn.Close() +var ( + closeConn = func(conn utils.ZKConnection, expireInSecond int32) { + //can also + //time.Sleep(time.Second * time.Duration(expireInSecond)) + <-time.After(time.Second * time.Duration(expireInSecond)) + // make sure close connecion + conn.Close() + } + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +const ( + componentName = "lock-zookeeper" +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() } // ZookeeperLock lock store @@ -46,6 +62,10 @@ type ZookeeperLock struct { // NewZookeeperLock Create ZookeeperLock func NewZookeeperLock(logger log.ErrorLogger) *ZookeeperLock { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) lock := &ZookeeperLock{ logger: logger, } @@ -57,6 +77,8 @@ func (p *ZookeeperLock) Init(metadata lock.Metadata) error { m, err := utils.ParseZookeeperMetadata(metadata.Properties) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } @@ -66,9 +88,13 @@ func (p *ZookeeperLock) Init(metadata lock.Metadata) error { //init unlock connection zkConn, err := p.factory.NewConnection(p.metadata.SessionTimeout, p.metadata) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } p.unlockConn = zkConn + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return nil } diff --git a/components/oss/aliyun/oss.go b/components/oss/aliyun/oss.go index 1f31ab8a45..2687256d16 100644 --- a/components/oss/aliyun/oss.go +++ b/components/oss/aliyun/oss.go @@ -21,7 +21,9 @@ import ( "encoding/json" "net/http" "strconv" + "sync" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/pkg/utils" l8oss "mosn.io/layotto/components/oss" @@ -32,14 +34,30 @@ import ( const ( connectTimeoutSec = "connectTimeoutSec" readWriteTimeoutSec = "readWriteTimeout" + componentName = "oss-aliyun" ) +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type AliyunOSS struct { client *oss.Client basicConf json.RawMessage } func NewAliyunOss() l8oss.Oss { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) return &AliyunOSS{} } @@ -48,6 +66,8 @@ func (a *AliyunOSS) Init(ctx context.Context, config *l8oss.Config) error { a.basicConf = config.Metadata[l8oss.BasicConfiguration] m := utils.OssMetadata{} if err := json.Unmarshal(a.basicConf, &m); err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return l8oss.ErrInvalid } if t, ok := config.Metadata[connectTimeoutSec]; ok { @@ -63,9 +83,13 @@ func (a *AliyunOSS) Init(ctx context.Context, config *l8oss.Config) error { client, err := oss.New(m.Endpoint, m.AccessKeyID, m.AccessKeySecret, oss.Timeout(int64(connectTimeout), int64(readWriteTimeout))) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } a.client = client + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return nil } diff --git a/components/oss/aws/oss.go b/components/oss/aws/oss.go index 29fc1dd6d8..a719dec21e 100644 --- a/components/oss/aws/oss.go +++ b/components/oss/aws/oss.go @@ -23,12 +23,14 @@ import ( "fmt" "net/url" "strings" + "sync" "time" "github.com/aws/aws-sdk-go-v2/aws" aws_config "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/pkg/utils" "mosn.io/layotto/components/oss" @@ -41,12 +43,31 @@ import ( "mosn.io/pkg/log" ) +const ( + componentName = "oss-aws" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type AwsOss struct { client *s3.Client basicConf json.RawMessage } func NewAwsOss() oss.Oss { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) return &AwsOss{} } @@ -55,6 +76,8 @@ func (a *AwsOss) Init(ctx context.Context, config *oss.Config) error { m := &utils.OssMetadata{} err := json.Unmarshal(a.basicConf, &m) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return oss.ErrInvalid } optFunc := []func(options *aws_config.LoadOptions) error{ @@ -68,10 +91,14 @@ func (a *AwsOss) Init(ctx context.Context, config *oss.Config) error { } cfg, err := aws_config.LoadDefaultConfig(context.TODO(), optFunc...) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } client := s3.NewFromConfig(cfg) a.client = client + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return nil } diff --git a/components/oss/ceph/oss.go b/components/oss/ceph/oss.go index 561d6c320d..20fe565201 100644 --- a/components/oss/ceph/oss.go +++ b/components/oss/ceph/oss.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -35,15 +36,35 @@ import ( "mosn.io/pkg/log" "mosn.io/layotto/components/oss" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/pkg/utils" ) +const ( + componentName = "oss-ceph" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type CephOSS struct { client *s3.Client basicConf json.RawMessage } func NewCephOss() oss.Oss { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) return &CephOSS{} } @@ -52,6 +73,8 @@ func (c *CephOSS) Init(ctx context.Context, config *oss.Config) error { m := &utils.OssMetadata{} err := json.Unmarshal(c.basicConf, &m) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return oss.ErrInvalid } @@ -72,12 +95,16 @@ func (c *CephOSS) Init(ctx context.Context, config *oss.Config) error { } cfg, err := aws_config.LoadDefaultConfig(context.TODO(), optFunc...) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } client := s3.NewFromConfig(cfg, func(options *s3.Options) { options.UsePathStyle = true }) c.client = client + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return nil } diff --git a/components/oss/huaweicloud/oss.go b/components/oss/huaweicloud/oss.go index fa941f392e..9ec51f85b1 100644 --- a/components/oss/huaweicloud/oss.go +++ b/components/oss/huaweicloud/oss.go @@ -13,21 +13,38 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package huaweicloud import ( "context" "encoding/json" "strconv" + "sync" "github.com/huaweicloud/huaweicloud-sdk-go-obs/obs" "github.com/jinzhu/copier" "mosn.io/layotto/components/oss" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/pkg/utils" ) -const connectTimeoutSec = "connectTimeoutSec" +const ( + componentName = "oss-huaweicloud" + connectTimeoutSec = "connectTimeoutSec" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} type HuaweicloudOSS struct { client *obs.ObsClient @@ -35,6 +52,10 @@ type HuaweicloudOSS struct { } func NewHuaweicloudOSS() oss.Oss { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) return &HuaweicloudOSS{} } @@ -43,6 +64,8 @@ func (h *HuaweicloudOSS) Init(ctx context.Context, config *oss.Config) error { jsonRawMessage := config.Metadata[oss.BasicConfiguration] err := json.Unmarshal(jsonRawMessage, &h.metadata) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return oss.ErrInvalid } if t, ok := config.Metadata[connectTimeoutSec]; ok { @@ -53,9 +76,13 @@ func (h *HuaweicloudOSS) Init(ctx context.Context, config *oss.Config) error { client, err := obs.New(h.metadata.AccessKeyID, h.metadata.AccessKeySecret, h.metadata.Endpoint, obs.WithConnectTimeout(connectTimeout)) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } h.client = client + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return nil } diff --git a/components/sequencer/etcd/store.go b/components/sequencer/etcd/store.go index 20a211873d..9ee4f191f8 100644 --- a/components/sequencer/etcd/store.go +++ b/components/sequencer/etcd/store.go @@ -10,19 +10,37 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package etcd import ( "context" "fmt" + "sync" clientv3 "go.etcd.io/etcd/client/v3" "mosn.io/pkg/log" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/pkg/utils" "mosn.io/layotto/components/sequencer" ) +const ( + componentName = "sequencer-etcd" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type EtcdSequencer struct { client *clientv3.Client metadata utils.EtcdMetadata @@ -36,6 +54,10 @@ type EtcdSequencer struct { // EtcdSequencer returns a new etcd sequencer func NewEtcdSequencer(logger log.ErrorLogger) *EtcdSequencer { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) s := &EtcdSequencer{ logger: logger, } @@ -54,6 +76,8 @@ func (e *EtcdSequencer) Init(config sequencer.Configuration) error { // 2. construct client if e.client, err = utils.NewEtcdClient(m); err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } e.ctx, e.cancel = context.WithCancel(context.Background()) @@ -68,6 +92,8 @@ func (e *EtcdSequencer) Init(config sequencer.Configuration) error { actualKey := e.getKeyInEtcd(k) get, err := kv.Get(e.ctx, actualKey) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } var cur int64 = 0 @@ -80,6 +106,8 @@ func (e *EtcdSequencer) Init(config sequencer.Configuration) error { } } // TODO close component? + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return nil } diff --git a/components/sequencer/mongo/mongo_sequencer.go b/components/sequencer/mongo/mongo_sequencer.go index 4932504795..5080bfa5ce 100644 --- a/components/sequencer/mongo/mongo_sequencer.go +++ b/components/sequencer/mongo/mongo_sequencer.go @@ -15,6 +15,7 @@ package mongo import ( "context" "fmt" + "sync" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -23,10 +24,26 @@ import ( "go.mongodb.org/mongo-driver/mongo/writeconcern" "mosn.io/pkg/log" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/pkg/utils" "mosn.io/layotto/components/sequencer" ) +const ( + componentName = "sequencer-mongo" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type MongoSequencer struct { factory utils.MongoFactory @@ -50,6 +67,10 @@ type SequencerDocument struct { // MongoSequencer returns a new mongo sequencer func NewMongoSequencer(logger log.ErrorLogger) *MongoSequencer { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) m := &MongoSequencer{ logger: logger, } @@ -62,6 +83,8 @@ func (e *MongoSequencer) Init(config sequencer.Configuration) error { // 1.parse config m, err := utils.ParseMongoMetadata(config.Properties) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } e.metadata = m @@ -73,16 +96,22 @@ func (e *MongoSequencer) Init(config sequencer.Configuration) error { e.ctx, e.cancel = context.WithCancel(context.Background()) if e.client, err = e.factory.NewMongoClient(m); err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } if err := e.client.Ping(e.ctx, nil); err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } // Connections Collection e.collection, err = utils.SetCollection(e.client, e.factory, e.metadata) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } @@ -94,6 +123,8 @@ func (e *MongoSequencer) Init(config sequencer.Configuration) error { // find key of biggerThan cursor, err := e.collection.Find(e.ctx, bson.M{"_id": k}) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } if cursor != nil && cursor.RemainingBatchLength() > 0 { @@ -105,7 +136,8 @@ func (e *MongoSequencer) Init(config sequencer.Configuration) error { } } } - + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return err } diff --git a/components/sequencer/mysql/mysql.go b/components/sequencer/mysql/mysql.go index 255fe86f11..1e8f7dbb2e 100644 --- a/components/sequencer/mysql/mysql.go +++ b/components/sequencer/mysql/mysql.go @@ -15,13 +15,30 @@ package mysql import ( "database/sql" "fmt" + "sync" "mosn.io/pkg/log" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/pkg/utils" "mosn.io/layotto/components/sequencer" ) +const ( + componentName = "sequencer-mysql" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type MySQLSequencer struct { metadata utils.MySQLMetadata biggerThan map[string]int64 @@ -30,6 +47,10 @@ type MySQLSequencer struct { } func NewMySQLSequencer(logger log.ErrorLogger) *MySQLSequencer { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) s := &MySQLSequencer{ logger: logger, } @@ -42,6 +63,8 @@ func (e *MySQLSequencer) Init(config sequencer.Configuration) error { m, err := utils.ParseMySQLMetadata(config.Properties) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } e.metadata = m @@ -49,6 +72,8 @@ func (e *MySQLSequencer) Init(config sequencer.Configuration) error { e.biggerThan = config.BiggerThan if err = utils.NewMySQLClient(e.metadata); err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } @@ -64,6 +89,8 @@ func (e *MySQLSequencer) Init(config sequencer.Configuration) error { } } } + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return nil } diff --git a/components/sequencer/redis/standalone_redis_sequencer.go b/components/sequencer/redis/standalone_redis_sequencer.go index cff92f46be..95282fd8ab 100644 --- a/components/sequencer/redis/standalone_redis_sequencer.go +++ b/components/sequencer/redis/standalone_redis_sequencer.go @@ -14,14 +14,31 @@ package redis import ( "context" + "sync" "github.com/go-redis/redis/v8" "mosn.io/pkg/log" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/pkg/utils" "mosn.io/layotto/components/sequencer" ) +const ( + componentName = "sequencer-redis-standalone" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type StandaloneRedisSequencer struct { client *redis.Client metadata utils.RedisMetadata @@ -35,6 +52,10 @@ type StandaloneRedisSequencer struct { // NewStandaloneRedisSequencer returns a new redis sequencer func NewStandaloneRedisSequencer(logger log.ErrorLogger) *StandaloneRedisSequencer { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) s := &StandaloneRedisSequencer{ logger: logger, } @@ -58,6 +79,8 @@ end func (s *StandaloneRedisSequencer) Init(config sequencer.Configuration) error { m, err := utils.ParseRedisMetadata(config.Properties) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } //init @@ -78,11 +101,15 @@ func (s *StandaloneRedisSequencer) Init(config sequencer.Configuration) error { err = eval.Err() //occur error, such as value is string type if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } //As long as there is no error, the initialization is successful //It may be a reset value or it may be satisfied before } + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return nil } diff --git a/components/sequencer/snowflake/snowflake_sequencer.go b/components/sequencer/snowflake/snowflake_sequencer.go index bd1d772b6b..22019cadaf 100755 --- a/components/sequencer/snowflake/snowflake_sequencer.go +++ b/components/sequencer/snowflake/snowflake_sequencer.go @@ -10,6 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package snowflake import ( @@ -21,9 +22,25 @@ import ( "mosn.io/pkg/log" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/sequencer" ) +const ( + componentName = "sequencer-snowflake" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + type SnowFlakeSequencer struct { metadata SnowflakeMetadata workerId int64 @@ -37,6 +54,10 @@ type SnowFlakeSequencer struct { } func NewSnowFlakeSequencer(logger log.ErrorLogger) *SnowFlakeSequencer { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) return &SnowFlakeSequencer{ logger: logger, smap: make(map[string]chan int64), @@ -47,6 +68,8 @@ func (s *SnowFlakeSequencer) Init(config sequencer.Configuration) error { var err error s.metadata, err = ParseSnowflakeMetadata(config.Properties) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } //for unit test @@ -56,8 +79,12 @@ func (s *SnowFlakeSequencer) Init(config sequencer.Configuration) error { s.ctx, s.cancel = context.WithCancel(context.Background()) if s.workerId, err = NewMysqlClient(&s.metadata.MysqlMetadata); err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return err } diff --git a/components/sequencer/zookeeper/zookeeper_sequencer.go b/components/sequencer/zookeeper/zookeeper_sequencer.go index ab27683509..a69f21f22c 100644 --- a/components/sequencer/zookeeper/zookeeper_sequencer.go +++ b/components/sequencer/zookeeper/zookeeper_sequencer.go @@ -10,20 +10,37 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package zookeeper import ( "context" "fmt" + "sync" "github.com/go-zookeeper/zk" "mosn.io/pkg/log" + "mosn.io/layotto/components/pkg/actuators" "mosn.io/layotto/components/pkg/utils" "mosn.io/layotto/components/sequencer" ) -const maxInt32 = 2147483647 +const ( + componentName = "sequencer-zookeeper" + maxInt32 = 2147483647 +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} type ZookeeperSequencer struct { client utils.ZKConnection @@ -37,6 +54,10 @@ type ZookeeperSequencer struct { // NewZookeeperSequencer returns a new zookeeper sequencer func NewZookeeperSequencer(logger log.ErrorLogger) *ZookeeperSequencer { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) s := &ZookeeperSequencer{ logger: logger, } @@ -47,6 +68,8 @@ func NewZookeeperSequencer(logger log.ErrorLogger) *ZookeeperSequencer { func (s *ZookeeperSequencer) Init(config sequencer.Configuration) error { m, err := utils.ParseZookeeperMetadata(config.Properties) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } //init @@ -55,6 +78,8 @@ func (s *ZookeeperSequencer) Init(config sequencer.Configuration) error { s.factory = &utils.ConnectionFactoryImpl{} connection, err := s.factory.NewConnection(0, s.metadata) if err != nil { + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) return err } s.client = connection @@ -75,6 +100,8 @@ func (s *ZookeeperSequencer) Init(config sequencer.Configuration) error { if err == zk.ErrNoNode { return fmt.Errorf("zookeeper sequencer error: can not satisfy biggerThan guarantee.key: %s, current key does not exist", k) } + readinessIndicator.ReportError(err.Error()) + livenessIndicator.ReportError(err.Error()) //other error return err } @@ -85,6 +112,8 @@ func (s *ZookeeperSequencer) Init(config sequencer.Configuration) error { } } + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() return err }