Skip to content

Commit

Permalink
Merge branch 'main' into feat/log_management
Browse files Browse the repository at this point in the history
# Conflicts:
#	components/configstores/etcdv3/etcdv3.go
#	components/configstores/nacos/configstore.go
#	components/cryption/aliyun/kms.go
#	components/cryption/aws/kms.go
#	components/lock/consul/consul_lock.go
#	components/lock/etcd/etcd_lock.go
#	components/lock/mongo/mongo_lock.go
#	components/lock/redis/cluster_redis_lock.go
#	components/lock/redis/standalone_redis_lock.go
#	components/lock/zookeeper/zookeeper_lock.go
#	components/oss/aws/oss.go
#	components/oss/ceph/oss.go
#	components/sequencer/etcd/store.go
#	components/sequencer/mongo/mongo_sequencer.go
#	components/sequencer/mysql/mysql.go
#	components/sequencer/redis/standalone_redis_sequencer.go
#	components/sequencer/snowflake/snowflake_sequencer.go
#	components/sequencer/zookeeper/zookeeper_sequencer.go
#	pkg/grpc/default_api/api.go
#	pkg/grpc/default_api/api_pubsub.go
  • Loading branch information
CrazyHZM committed Dec 17, 2024
2 parents a693a9a + 39c3fa4 commit 6637e90
Show file tree
Hide file tree
Showing 39 changed files with 3,076 additions and 1,005 deletions.
29 changes: 27 additions & 2 deletions components/configstores/etcdv3/etcdv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdv3

import (
Expand All @@ -22,6 +23,8 @@ import (

"mosn.io/pkg/utils"

"mosn.io/layotto/components/pkg/actuators"

"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"

Expand All @@ -32,10 +35,22 @@ import (
)

const (
defaultGroup = "default"
defaultLabel = "default"
defaultGroup = "default"
defaultLabel = "default"
componentName = "configstore-etcdv3"
)

var (
once sync.Once
readinessIndicator *actuators.HealthIndicator
livenessIndicator *actuators.HealthIndicator
)

func init() {
readinessIndicator = actuators.NewHealthIndicator()
livenessIndicator = actuators.NewHealthIndicator()
}

type EtcdV3ConfigStore struct {
client *clientv3.Client
sync.RWMutex
Expand All @@ -62,6 +77,10 @@ func (c *EtcdV3ConfigStore) OnLogLevelChanged(outputLevel log.LogLevel) {
}

func NewStore() configstores.Store {
once.Do(func() {
indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator}
actuators.SetComponentsIndicator(componentName, indicators)
})
cs := &EtcdV3ConfigStore{
subscribeKey: make(map[string]string),
watchRespCh: make(chan *configstores.SubscribeResp),
Expand All @@ -83,6 +102,12 @@ func (c *EtcdV3ConfigStore) Init(config *configstores.StoreConfig) error {
DialTimeout: time.Duration(t) * time.Second,
})
c.storeName = config.StoreName
if err != nil {
readinessIndicator.ReportError(err.Error())
livenessIndicator.ReportError(err.Error())
}
readinessIndicator.SetStarted()
livenessIndicator.SetStarted()
return err
}

Expand Down
26 changes: 25 additions & 1 deletion components/configstores/nacos/configstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,24 @@ import (
log "mosn.io/layotto/kit/logger"

"mosn.io/layotto/components/configstores"
"mosn.io/layotto/components/pkg/actuators"
)

const (
componentName = "configstore-nacos"
)

var (
once sync.Once
readinessIndicator *actuators.HealthIndicator
livenessIndicator *actuators.HealthIndicator
)

func init() {
readinessIndicator = actuators.NewHealthIndicator()
livenessIndicator = actuators.NewHealthIndicator()
}

type ConfigStore struct {
client config_client.IConfigClient
storeName string
Expand All @@ -42,6 +58,10 @@ type ConfigStore struct {
}

func NewStore() configstores.Store {
once.Do(func() {
indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator}
actuators.SetComponentsIndicator(componentName, indicators)
})
cs := &ConfigStore{
log: log.NewLayottoLogger("configstore/nacos"),
}
Expand Down Expand Up @@ -100,10 +120,14 @@ func (n *ConfigStore) Init(config *configstores.StoreConfig) (err error) {
} else {
client, err = n.init(config.Address, timeoutMs, metadata)
}

if err != nil {
readinessIndicator.ReportError(err.Error())
livenessIndicator.ReportError(err.Error())
return err
}

readinessIndicator.SetStarted()
livenessIndicator.SetStarted()
n.client = client
// replace nacos sdk log
return n.setupLogger(metadata)
Expand Down
26 changes: 26 additions & 0 deletions components/cryption/aliyun/kms.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions components/cryption/aws/kms.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/delay_queue/azure/servicebus/servicebus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package servicebus

import (
Expand Down
25 changes: 25 additions & 0 deletions components/file/aliyun/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"fmt"
"io"
"strconv"
"sync"

"mosn.io/layotto/components/pkg/actuators"

"github.com/aliyun/aliyun-oss-go-sdk/oss"

Expand All @@ -32,14 +35,30 @@ import (

const (
storageTypeKey = "storageType"
componentName = "file-aliyun"
)

var (
once sync.Once
readinessIndicator *actuators.HealthIndicator
livenessIndicator *actuators.HealthIndicator
)

func init() {
readinessIndicator = actuators.NewHealthIndicator()
livenessIndicator = actuators.NewHealthIndicator()
}

// AliyunFile is a binding for an AliCloud OSS storage bucketKey
type AliyunFile struct {
client *oss.Client
}

func NewAliyunFile() file.File {
once.Do(func() {
indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator}
actuators.SetComponentsIndicator(componentName, indicators)
})
oss := &AliyunFile{}
return oss
}
Expand All @@ -49,6 +68,8 @@ func (s *AliyunFile) Init(ctx context.Context, metadata *file.FileConfig) error
m := make([]*utils.OssMetadata, 0)
err := json.Unmarshal(metadata.Metadata, &m)
if err != nil {
readinessIndicator.ReportError(err.Error())
livenessIndicator.ReportError(err.Error())
return file.ErrInvalid
}

Expand All @@ -58,10 +79,14 @@ func (s *AliyunFile) Init(ctx context.Context, metadata *file.FileConfig) error
}
client, err := oss.New(v.Endpoint, v.AccessKeyID, v.AccessKeySecret)
if err != nil {
readinessIndicator.ReportError(err.Error())
livenessIndicator.ReportError(err.Error())
return err
}
s.client = client
}
readinessIndicator.SetStarted()
livenessIndicator.SetStarted()
return nil
}

Expand Down
24 changes: 24 additions & 0 deletions components/file/aws/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"fmt"
"io"
"strings"
"sync"

"mosn.io/layotto/components/pkg/actuators"
"mosn.io/layotto/components/pkg/utils"

"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -38,14 +40,30 @@ import (

const (
defaultCredentialsSource = "provider"
componentName = "file-aws"
)

var (
once sync.Once
readinessIndicator *actuators.HealthIndicator
livenessIndicator *actuators.HealthIndicator
)

func init() {
readinessIndicator = actuators.NewHealthIndicator()
livenessIndicator = actuators.NewHealthIndicator()
}

// AwsOss is a binding for aws oss storage.
type AwsOss struct {
client *s3.Client
}

func NewAwsFile() file.File {
once.Do(func() {
indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator}
actuators.SetComponentsIndicator(componentName, indicators)
})
return &AwsOss{}
}

Expand All @@ -54,6 +72,8 @@ func (a *AwsOss) Init(ctx context.Context, config *file.FileConfig) error {
m := make([]*utils.OssMetadata, 0)
err := json.Unmarshal(config.Metadata, &m)
if err != nil {
readinessIndicator.ReportError(err.Error())
livenessIndicator.ReportError(err.Error())
return errors.New("invalid config for aws oss")
}
for _, data := range m {
Expand All @@ -64,6 +84,10 @@ func (a *AwsOss) Init(ctx context.Context, config *file.FileConfig) error {
if err != nil {
continue
}
if client != nil {
readinessIndicator.SetStarted()
livenessIndicator.SetStarted()
}
a.client = client
}
return nil
Expand Down
Loading

0 comments on commit 6637e90

Please sign in to comment.