Skip to content

Commit

Permalink
Add universal option for object_storage and queue. (#20)
Browse files Browse the repository at this point in the history
* Add universal option for object_storage and queue.

* Rename APIs.

* Add path in ObjectStorageOption.
  • Loading branch information
zhoufeng1989 authored Dec 17, 2024
1 parent 5267762 commit d09c25c
Show file tree
Hide file tree
Showing 12 changed files with 407 additions and 153 deletions.
78 changes: 65 additions & 13 deletions cloud/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cloud

import (
"errors"
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
Expand All @@ -13,14 +12,17 @@ import (
type Provider string

const (
AWSProvider Provider = "aws"
TencentCloudProvider Provider = "tencentcloud"
AWSProvider Provider = "aws"
TencentCloudProvider Provider = "tencentcloud"
AliCloudProvider Provider = "alicloud"

StandaloneRedisProvider Provider = "standalone_redis"
ClusterRedisProvider Provider = "cluster_redis"
StandaloneRedisProviderV7 Provider = "standalone_redis_v7"
ClusterRedisProviderV7 Provider = "cluster_redis_v7"
AliCloudStorageProvider Provider = "alicloud_storage"
AliCloudMNSQueueProvider Provider = "alicloud_mns_queue"

// AliCloudStorageProvider is deprecated, use AliCloudProvider instead.
AliCloudStorageProvider Provider = "alicloud_storage"
)

type AliCloudCredentialType string
Expand All @@ -41,7 +43,7 @@ const (
)

var (
ErrUnsupportedCloudProvider = fmt.Errorf("unsupported provider, only support %s, %s and %s", AWSProvider, TencentCloudProvider, StandaloneRedisProvider)
ErrUnsupportedCloudProvider = errors.New("unsupported provider")
ErrProviderNotTencentCloud = errors.New("provider is not tencentcloud")
ErrProviderNotAWS = errors.New("provider is not aws")
ErrProviderNotStandaloneRedis = errors.New("provider is not standalone redis")
Expand All @@ -66,13 +68,35 @@ type Option interface {
CheckAliCloudStorage() error
}

type CommonCloudOption struct {
SecretID string `json:"secret_id" yaml:"secret_id"`
SecretKey string `json:"secret_key" yaml:"secret_key"`
AssumeRoleArn string `json:"assume_role_arn" yaml:"assume_role_arn"`
Region string `json:"region" yaml:"region"`
AssumeRegion string `json:"assume_region" yaml:"assume_region"`
}

type AWSOption CommonCloudOption

func (option AWSOption) Check() error {
if option.Region == "" {
return ErrEmptyRegion
}
validKeyConfig := option.SecretID != "" && option.SecretKey != ""
validAssumeRoleConfig := option.AssumeRoleArn != "" && option.AssumeRegion != ""
if !validKeyConfig && !validAssumeRoleConfig {
return errors.New("must have valid key pairs config or assume role config")
}
return nil
}

type CommonOption struct {
Provider Provider
SecretID string
SecretKey string
AssumeRoleArn string
Region string
AssumeRegion string
Provider Provider `json:"provider" yaml:"provider"`
SecretID string `json:"secret_id" yaml:"secret_id"`
SecretKey string `json:"secret_key" yaml:"secret_key"`
AssumeRoleArn string `json:"assume_role_arn" yaml:"assume_role_arn"`
Region string `json:"region" yaml:"region"`
AssumeRegion string `json:"assume_region" yaml:"assume_region"`
}

func (option CommonOption) GetProvider() Provider {
Expand Down Expand Up @@ -134,7 +158,7 @@ func (option CommonOption) CheckTencentCloud() error {
}

func (option CommonOption) CheckAliCloudStorage() error {
if option.Provider != AliCloudStorageProvider {
if option.Provider != AliCloudStorageProvider && option.Provider != AliCloudProvider {
return ErrProviderNotAliCloudStorage
}
return option.check()
Expand Down Expand Up @@ -173,3 +197,31 @@ func AwsNewSession(option Option) (*session.Session, *aws.Config, error) {
}
return sess, nil, nil
}

func AwsNewSessionWithOption(option AWSOption) (*session.Session, *aws.Config, error) {
if err := option.Check(); err != nil {
return nil, nil, err
}
var creds *credentials.Credentials
if option.SecretID != "" && option.SecretKey != "" {
creds = credentials.NewStaticCredentials(option.SecretID, option.SecretKey, "")
}

sess, err := session.NewSessionWithOptions(session.Options{
Config: aws.Config{
CredentialsChainVerboseErrors: aws.Bool(true),
Credentials: creds, // 可能是nil
// LogLevel: aws.LogLevel(aws.LogDebug),
Region: aws.String(option.Region),
},
SharedConfigFiles: []string{},
})
if err != nil {
return nil, nil, err
}
if roleArn := option.AssumeRoleArn; roleArn != "" { // 切换 assumeRole
assumeRoleCreds := stscreds.NewCredentials(sess, roleArn)
return sess, aws.NewConfig().WithCredentials(assumeRoleCreds).WithRegion(option.AssumeRegion), nil
}
return sess, nil, nil
}
59 changes: 35 additions & 24 deletions cloud/examples/object_storage/object_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,60 @@ import (
"context"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"io"
"log"
"net/http"
"time"

"github.com/aws/aws-sdk-go/aws"

"github.com/byte-power/gorich/cloud"
"github.com/byte-power/gorich/cloud/object_storage"
)

// Configure secret_id, secret_key, region and bucket_name to run this example.
func main() {
optionForTencentCloud := cloud.CommonOption{
Provider: cloud.TencentCloudProvider,
SecretID: "tencentcloud_secret_id_xxx",
SecretKey: "tencentcloud_secret_key_xxx",
Region: "tencentcloud_region_xxx",
option := object_storage.ObjectStorageOption{
Provider: cloud.TencentCloudProvider,
Bucket: "tencent_bucket_name_xxx",
COS: object_storage.COSOption{
SecretID: "tencentcloud_secret_id_xxx",
SecretKey: "tencentcloud_secret_key_xxx",
Region: "tencentcloud_region_xxx",
},
}
object_storage_examples("tencentcloud_bucket_name_xxx", optionForTencentCloud)
object_storage_examples(option)

optionForAWS := cloud.CommonOption{
Provider: cloud.AWSProvider,
SecretID: "aws_secret_id_xxx",
SecretKey: "aws_secret_key_xxx",
Region: "aws_region_xxx",
option = object_storage.ObjectStorageOption{
Provider: cloud.AWSProvider,
Bucket: "aws_bucket_name_xxx",
S3: cloud.AWSOption{
SecretID: "aws_secret_id_xxx",
SecretKey: "aws_secret_key_xxx",
Region: "aws_region_xxx",
},
}
object_storage_examples("aws_bucket_name_xxx", optionForAWS)
object_storage_examples(option)

optionForAliOSS := object_storage.AliCloudStorageOption{
//CredentialType: "oidc_role_arn",
//EndPoint: "oss-cn-zhangjiakou.aliyuncs.com",
//SessionName: "test-rrsa-oidc-token",
option = object_storage.ObjectStorageOption{
Provider: cloud.AliCloudProvider,
Bucket: "alicloud_bucket_name_xxx",
OSS: object_storage.AliCloudStorageOption{
//CredentialType: "oidc_role_arn",
//EndPoint: "oss-cn-zhangjiakou.aliyuncs.com",
//SessionName: "test-rrsa-oidc-token",

CredentialType: "ak",
EndPoint: "oss-cn-beijing.aliyuncs.com",
AccessKeyID: "alicloud_access_key_id_xxx",
AccessKeySecret: "alicloud_access_key_secret_xxx",
CredentialType: "ak",
EndPoint: "oss-cn-beijing.aliyuncs.com",
AccessKeyID: "alicloud_access_key_id_xxx",
AccessKeySecret: "alicloud_access_key_secret_xxx",
},
}
object_storage_examples("alicloud_bucket_name_xxx", optionForAliOSS)
object_storage_examples(option)
}

func object_storage_examples(bucketName string, option cloud.Option) {
service, err := object_storage.GetObjectStorageService(bucketName, option)
func object_storage_examples(option object_storage.ObjectStorageOption) {
service, err := object_storage.GetObjectStorageServiceWithOption(option)
if err != nil {
fmt.Printf("GetObjectStorageService error:%s\n", err)
return
Expand Down
135 changes: 74 additions & 61 deletions cloud/examples/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,86 +14,99 @@ import (
// Configure secret_id, secret_key, region, and queue_name to run this example.
func main() {

// Redis 单节点
//optionForBaseRedis := queue.StandaloneRedisQueueOption{
// Addr: "localhost:6379",
// Password: "",
// ConsumerGroup: "save_task_consumer_group",
// Idle: 10,
//}

// Redis 集群
optionForBaseRedis := queue.ClusterRedisQueueOption{
Addrs: []string{
"localhost:7000",
"localhost:7001",
"localhost:7002",
"localhost:7003",
"localhost:7004",
"localhost:7005",
option := queue.QueueOption{
Provider: cloud.ClusterRedisProvider,
QueueName: "test_queue_name",
ClusterRedis: queue.ClusterRedisQueueOption{
Addrs: []string{
"localhost:7000",
"localhost:7001",
"localhost:7002",
"localhost:7003",
"localhost:7004",
"localhost:7005",
},
Password: "",
ConsumerGroup: "save_task_consumer_group",
Idle: 10,
},
Password: "",
ConsumerGroup: "save_task_consumer_group",
Idle: 10,
}
queue_examples(option)

queue_examples("test_queue_name", optionForBaseRedis)

optionForTencentCloud := queue.TencentCloudQueueOption{
Token: "access_jwt_token_xxx",
URL: "http://pulsar-xxxxxxxxx.tdmq.ap-gz.public.tencenttdmq.com:8080",
}
topicName := "pulsar-xxxxxx/namespace_name/topic_name"
subscriptionName := "subscription_name"
topicSub := queue.GenerateTopicAndSubName(topicName, subscriptionName)
queue_examples(topicSub, optionForTencentCloud)
option = queue.QueueOption{
Provider: cloud.TencentCloudProvider,
QueueName: topicSub,
Pulsar: queue.TencentCloudQueueOption{
Token: "access_jwt_token_xxx",
URL: "http://pulsar-xxxxxxxxx.tdmq.ap-gz.public.tencenttdmq.com:8080",
},
}
queue_examples(option)

optionForAWS := cloud.CommonOption{
option = queue.QueueOption{
Provider: cloud.AWSProvider,
SecretID: "aws_secret_id_xxxx",
SecretKey: "aws_secret_key_xxxx",
Region: "aws_region_xxx",
QueueName: "aws_queue_name",
SQS: cloud.AWSOption{
SecretID: "aws_secret_id_xxxx",
SecretKey: "aws_secret_key_xxxx",
Region: "aws_region_xxx",
},
}
queue_examples("aws_queue_name", optionForAWS)

queue_examples(option)

dialTimeout := 5 * time.Second
clusterRedisQueueOptionV7 := queue.ClusterRedisQueueOptionV7{
ClusterRedisQueueOption: queue.ClusterRedisQueueOption{
Addrs: []string{
"localhost:30001",
"localhost:30002",
"localhost:30003",
option = queue.QueueOption{
Provider: cloud.ClusterRedisProviderV7,
QueueName: "redis_cluster_queue_v7",
ClusterRedisV7: queue.ClusterRedisQueueOptionV7{
ClusterRedisQueueOption: queue.ClusterRedisQueueOption{
Addrs: []string{
"localhost:30001",
"localhost:30002",
"localhost:30003",
},
ConsumerGroup: "save_task_consumer_group_2",
DialTimeout: &dialTimeout,
Idle: 10,
},
ConsumerGroup: "save_task_consumer_group_2",
DialTimeout: &dialTimeout,
Idle: 10,
},
}
queue_examples("redis_cluster_queue_v7", clusterRedisQueueOptionV7)
queue_examples(option)

// alicloud MNS queue: access_key CredentialType
mnsQueueOption := queue.AliMNSClientOption{
EndPoint: "http://account-id.mns.region.aliyuncs.com",
CredentialType: cloud.AliCloudAccessKeyCredentialType,
AccessKeyId: "alicloud_access_key_id",
AccessKeySecret: "alicloud_access_key_secret",
MessagePriority: 10,
option = queue.QueueOption{
Provider: cloud.AliCloudProvider,
QueueName: "mns_queue_name",
MNS: queue.AliMNSClientOption{
EndPoint: "http://account-id.mns.region.aliyuncs.com",
CredentialType: cloud.AliCloudAccessKeyCredentialType,
AccessKeyId: "alicloud_access_key_id",
AccessKeySecret: "alicloud_access_key_secret",
MessagePriority: 10,
},
}
queue_examples("mns_queue_name", mnsQueueOption)
queue_examples(option)

// alicloud MNS queue: ecs_ram_role credentialType
mnsQueueOption = queue.AliMNSClientOption{
EndPoint: "http://account-id.mns.region.aliyuncs.com",
CredentialType: cloud.AliCloudECSRamRoleCredentialType,
}
queue_examples("mns_queue_name", mnsQueueOption)
alicloud_mns_queue_examples("mns_queue_name", mnsQueueOption)
option = queue.QueueOption{
Provider: cloud.AliCloudProvider,
QueueName: "mns_queue_name",
MNS: queue.AliMNSClientOption{
EndPoint: "http://account-id.mns.region.aliyuncs.com",
CredentialType: cloud.AliCloudECSRamRoleCredentialType,
}}
queue_examples(option)
alicloud_mns_queue_examples(option)
}

func queue_examples(queueOrTopicName string, option cloud.Option) {
service, err := queue.GetQueueService(queueOrTopicName, option)
func queue_examples(option queue.QueueOption) {
service, err := queue.GetQueueServiceWithOption(option)
if err != nil {
fmt.Printf("get queue service error %s %+v %s\n", queueOrTopicName, option, err)
fmt.Printf("get queue service error %s %+v %s\n", option.QueueName, option, err)
return
}
defer service.Close()
Expand Down Expand Up @@ -145,10 +158,10 @@ func queue_examples(queueOrTopicName string, option cloud.Option) {
}

// The following examples show mns queue specific examples: How to set message priority; how to set long polling period seconds.
func alicloud_mns_queue_examples(queueOrTopicName string, option cloud.Option) {
service, err := queue.GetQueueService(queueOrTopicName, option)
func alicloud_mns_queue_examples(option queue.QueueOption) {
service, err := queue.GetQueueServiceWithOption(option)
if err != nil {
fmt.Printf("get queue service error %s %+v %s\n", queueOrTopicName, option, err)
fmt.Printf("get queue service error %s %+v %s\n", option.QueueName, option, err)
return
}
defer service.Close()
Expand Down
10 changes: 5 additions & 5 deletions cloud/object_storage/alicloud_object_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ var (
)

type AliCloudStorageOption struct {
CredentialType string // eg: "oidc_role_arn" or "ak"
EndPoint string // eg: "oss-cn-zhangjiakou.aliyuncs.com"
SessionName string // eg: "test-rrsa-oidc-token"
CredentialType string `yaml:"credential_type" json:"credential_type"` // eg: "oidc_role_arn" or "ak"
EndPoint string `yaml:"endpoint" json:"endpoint"` // eg: "oss-cn-zhangjiakou.aliyuncs.com"
SessionName string `yaml:"session_name" json:"session_name"` // eg: "test-rrsa-oidc-token"

// "ak" required
AccessKeyID string
AccessKeySecret string
AccessKeyID string `yaml:"access_key_id" json:"access_key_id"`
AccessKeySecret string `yaml:"access_key_secret" json:"access_key_secret"`
}

func (option AliCloudStorageOption) GetProvider() cloud.Provider {
Expand Down
Loading

0 comments on commit d09c25c

Please sign in to comment.