From d09c25c0e74801c0fe8114abbcf4284cd95bb801 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E4=B8=B0?= Date: Tue, 17 Dec 2024 09:29:01 +0800 Subject: [PATCH] Add universal option for object_storage and queue. (#20) * Add universal option for object_storage and queue. * Rename APIs. * Add path in ObjectStorageOption. --- cloud/common.go | 78 ++++++++-- .../examples/object_storage/object_storage.go | 59 ++++---- cloud/examples/queue/queue.go | 135 ++++++++++-------- .../object_storage/alicloud_object_storage.go | 10 +- cloud/object_storage/aws_object_storage.go | 22 +++ cloud/object_storage/object_storage.go | 40 +++++- .../tencentcloud_object_storage.go | 41 ++++++ cloud/queue/alicloud_mns_queue.go | 35 ++--- cloud/queue/aws_queue.go | 29 ++++ cloud/queue/base_redis_queue_struct.go | 58 ++++---- cloud/queue/queue.go | 49 ++++++- cloud/queue/tencent_queue.go | 4 +- 12 files changed, 407 insertions(+), 153 deletions(-) diff --git a/cloud/common.go b/cloud/common.go index 4e27530..58245d1 100644 --- a/cloud/common.go +++ b/cloud/common.go @@ -2,7 +2,6 @@ package cloud import ( "errors" - "fmt" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -13,14 +12,17 @@ import ( type Provider string const ( - AWSProvider Provider = "aws" - TencentCloudProvider Provider = "tencentcloud" + AWSProvider Provider = "aws" + TencentCloudProvider Provider = "tencentcloud" + AliCloudProvider Provider = "alicloud" + StandaloneRedisProvider Provider = "standalone_redis" ClusterRedisProvider Provider = "cluster_redis" StandaloneRedisProviderV7 Provider = "standalone_redis_v7" ClusterRedisProviderV7 Provider = "cluster_redis_v7" - AliCloudStorageProvider Provider = "alicloud_storage" - AliCloudMNSQueueProvider Provider = "alicloud_mns_queue" + + // AliCloudStorageProvider is deprecated, use AliCloudProvider instead. + AliCloudStorageProvider Provider = "alicloud_storage" ) type AliCloudCredentialType string @@ -41,7 +43,7 @@ const ( ) var ( - ErrUnsupportedCloudProvider = fmt.Errorf("unsupported provider, only support %s, %s and %s", AWSProvider, TencentCloudProvider, StandaloneRedisProvider) + ErrUnsupportedCloudProvider = errors.New("unsupported provider") ErrProviderNotTencentCloud = errors.New("provider is not tencentcloud") ErrProviderNotAWS = errors.New("provider is not aws") ErrProviderNotStandaloneRedis = errors.New("provider is not standalone redis") @@ -66,13 +68,35 @@ type Option interface { CheckAliCloudStorage() error } +type CommonCloudOption struct { + SecretID string `json:"secret_id" yaml:"secret_id"` + SecretKey string `json:"secret_key" yaml:"secret_key"` + AssumeRoleArn string `json:"assume_role_arn" yaml:"assume_role_arn"` + Region string `json:"region" yaml:"region"` + AssumeRegion string `json:"assume_region" yaml:"assume_region"` +} + +type AWSOption CommonCloudOption + +func (option AWSOption) Check() error { + if option.Region == "" { + return ErrEmptyRegion + } + validKeyConfig := option.SecretID != "" && option.SecretKey != "" + validAssumeRoleConfig := option.AssumeRoleArn != "" && option.AssumeRegion != "" + if !validKeyConfig && !validAssumeRoleConfig { + return errors.New("must have valid key pairs config or assume role config") + } + return nil +} + type CommonOption struct { - Provider Provider - SecretID string - SecretKey string - AssumeRoleArn string - Region string - AssumeRegion string + Provider Provider `json:"provider" yaml:"provider"` + SecretID string `json:"secret_id" yaml:"secret_id"` + SecretKey string `json:"secret_key" yaml:"secret_key"` + AssumeRoleArn string `json:"assume_role_arn" yaml:"assume_role_arn"` + Region string `json:"region" yaml:"region"` + AssumeRegion string `json:"assume_region" yaml:"assume_region"` } func (option CommonOption) GetProvider() Provider { @@ -134,7 +158,7 @@ func (option CommonOption) CheckTencentCloud() error { } func (option CommonOption) CheckAliCloudStorage() error { - if option.Provider != AliCloudStorageProvider { + if option.Provider != AliCloudStorageProvider && option.Provider != AliCloudProvider { return ErrProviderNotAliCloudStorage } return option.check() @@ -173,3 +197,31 @@ func AwsNewSession(option Option) (*session.Session, *aws.Config, error) { } return sess, nil, nil } + +func AwsNewSessionWithOption(option AWSOption) (*session.Session, *aws.Config, error) { + if err := option.Check(); err != nil { + return nil, nil, err + } + var creds *credentials.Credentials + if option.SecretID != "" && option.SecretKey != "" { + creds = credentials.NewStaticCredentials(option.SecretID, option.SecretKey, "") + } + + sess, err := session.NewSessionWithOptions(session.Options{ + Config: aws.Config{ + CredentialsChainVerboseErrors: aws.Bool(true), + Credentials: creds, // 可能是nil + // LogLevel: aws.LogLevel(aws.LogDebug), + Region: aws.String(option.Region), + }, + SharedConfigFiles: []string{}, + }) + if err != nil { + return nil, nil, err + } + if roleArn := option.AssumeRoleArn; roleArn != "" { // 切换 assumeRole + assumeRoleCreds := stscreds.NewCredentials(sess, roleArn) + return sess, aws.NewConfig().WithCredentials(assumeRoleCreds).WithRegion(option.AssumeRegion), nil + } + return sess, nil, nil +} diff --git a/cloud/examples/object_storage/object_storage.go b/cloud/examples/object_storage/object_storage.go index fd02b80..78bb3ad 100644 --- a/cloud/examples/object_storage/object_storage.go +++ b/cloud/examples/object_storage/object_storage.go @@ -5,49 +5,60 @@ import ( "context" "errors" "fmt" - "github.com/aws/aws-sdk-go/aws" "io" "log" "net/http" "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/byte-power/gorich/cloud" "github.com/byte-power/gorich/cloud/object_storage" ) // Configure secret_id, secret_key, region and bucket_name to run this example. func main() { - optionForTencentCloud := cloud.CommonOption{ - Provider: cloud.TencentCloudProvider, - SecretID: "tencentcloud_secret_id_xxx", - SecretKey: "tencentcloud_secret_key_xxx", - Region: "tencentcloud_region_xxx", + option := object_storage.ObjectStorageOption{ + Provider: cloud.TencentCloudProvider, + Bucket: "tencent_bucket_name_xxx", + COS: object_storage.COSOption{ + SecretID: "tencentcloud_secret_id_xxx", + SecretKey: "tencentcloud_secret_key_xxx", + Region: "tencentcloud_region_xxx", + }, } - object_storage_examples("tencentcloud_bucket_name_xxx", optionForTencentCloud) + object_storage_examples(option) - optionForAWS := cloud.CommonOption{ - Provider: cloud.AWSProvider, - SecretID: "aws_secret_id_xxx", - SecretKey: "aws_secret_key_xxx", - Region: "aws_region_xxx", + option = object_storage.ObjectStorageOption{ + Provider: cloud.AWSProvider, + Bucket: "aws_bucket_name_xxx", + S3: cloud.AWSOption{ + SecretID: "aws_secret_id_xxx", + SecretKey: "aws_secret_key_xxx", + Region: "aws_region_xxx", + }, } - object_storage_examples("aws_bucket_name_xxx", optionForAWS) + object_storage_examples(option) - optionForAliOSS := object_storage.AliCloudStorageOption{ - //CredentialType: "oidc_role_arn", - //EndPoint: "oss-cn-zhangjiakou.aliyuncs.com", - //SessionName: "test-rrsa-oidc-token", + option = object_storage.ObjectStorageOption{ + Provider: cloud.AliCloudProvider, + Bucket: "alicloud_bucket_name_xxx", + OSS: object_storage.AliCloudStorageOption{ + //CredentialType: "oidc_role_arn", + //EndPoint: "oss-cn-zhangjiakou.aliyuncs.com", + //SessionName: "test-rrsa-oidc-token", - CredentialType: "ak", - EndPoint: "oss-cn-beijing.aliyuncs.com", - AccessKeyID: "alicloud_access_key_id_xxx", - AccessKeySecret: "alicloud_access_key_secret_xxx", + CredentialType: "ak", + EndPoint: "oss-cn-beijing.aliyuncs.com", + AccessKeyID: "alicloud_access_key_id_xxx", + AccessKeySecret: "alicloud_access_key_secret_xxx", + }, } - object_storage_examples("alicloud_bucket_name_xxx", optionForAliOSS) + object_storage_examples(option) } -func object_storage_examples(bucketName string, option cloud.Option) { - service, err := object_storage.GetObjectStorageService(bucketName, option) +func object_storage_examples(option object_storage.ObjectStorageOption) { + service, err := object_storage.GetObjectStorageServiceWithOption(option) if err != nil { fmt.Printf("GetObjectStorageService error:%s\n", err) return diff --git a/cloud/examples/queue/queue.go b/cloud/examples/queue/queue.go index 8cade51..8cbcf26 100644 --- a/cloud/examples/queue/queue.go +++ b/cloud/examples/queue/queue.go @@ -14,86 +14,99 @@ import ( // Configure secret_id, secret_key, region, and queue_name to run this example. func main() { - // Redis 单节点 - //optionForBaseRedis := queue.StandaloneRedisQueueOption{ - // Addr: "localhost:6379", - // Password: "", - // ConsumerGroup: "save_task_consumer_group", - // Idle: 10, - //} - - // Redis 集群 - optionForBaseRedis := queue.ClusterRedisQueueOption{ - Addrs: []string{ - "localhost:7000", - "localhost:7001", - "localhost:7002", - "localhost:7003", - "localhost:7004", - "localhost:7005", + option := queue.QueueOption{ + Provider: cloud.ClusterRedisProvider, + QueueName: "test_queue_name", + ClusterRedis: queue.ClusterRedisQueueOption{ + Addrs: []string{ + "localhost:7000", + "localhost:7001", + "localhost:7002", + "localhost:7003", + "localhost:7004", + "localhost:7005", + }, + Password: "", + ConsumerGroup: "save_task_consumer_group", + Idle: 10, }, - Password: "", - ConsumerGroup: "save_task_consumer_group", - Idle: 10, } + queue_examples(option) - queue_examples("test_queue_name", optionForBaseRedis) - - optionForTencentCloud := queue.TencentCloudQueueOption{ - Token: "access_jwt_token_xxx", - URL: "http://pulsar-xxxxxxxxx.tdmq.ap-gz.public.tencenttdmq.com:8080", - } topicName := "pulsar-xxxxxx/namespace_name/topic_name" subscriptionName := "subscription_name" topicSub := queue.GenerateTopicAndSubName(topicName, subscriptionName) - queue_examples(topicSub, optionForTencentCloud) + option = queue.QueueOption{ + Provider: cloud.TencentCloudProvider, + QueueName: topicSub, + Pulsar: queue.TencentCloudQueueOption{ + Token: "access_jwt_token_xxx", + URL: "http://pulsar-xxxxxxxxx.tdmq.ap-gz.public.tencenttdmq.com:8080", + }, + } + queue_examples(option) - optionForAWS := cloud.CommonOption{ + option = queue.QueueOption{ Provider: cloud.AWSProvider, - SecretID: "aws_secret_id_xxxx", - SecretKey: "aws_secret_key_xxxx", - Region: "aws_region_xxx", + QueueName: "aws_queue_name", + SQS: cloud.AWSOption{ + SecretID: "aws_secret_id_xxxx", + SecretKey: "aws_secret_key_xxxx", + Region: "aws_region_xxx", + }, } - queue_examples("aws_queue_name", optionForAWS) + + queue_examples(option) dialTimeout := 5 * time.Second - clusterRedisQueueOptionV7 := queue.ClusterRedisQueueOptionV7{ - ClusterRedisQueueOption: queue.ClusterRedisQueueOption{ - Addrs: []string{ - "localhost:30001", - "localhost:30002", - "localhost:30003", + option = queue.QueueOption{ + Provider: cloud.ClusterRedisProviderV7, + QueueName: "redis_cluster_queue_v7", + ClusterRedisV7: queue.ClusterRedisQueueOptionV7{ + ClusterRedisQueueOption: queue.ClusterRedisQueueOption{ + Addrs: []string{ + "localhost:30001", + "localhost:30002", + "localhost:30003", + }, + ConsumerGroup: "save_task_consumer_group_2", + DialTimeout: &dialTimeout, + Idle: 10, }, - ConsumerGroup: "save_task_consumer_group_2", - DialTimeout: &dialTimeout, - Idle: 10, }, } - queue_examples("redis_cluster_queue_v7", clusterRedisQueueOptionV7) + queue_examples(option) // alicloud MNS queue: access_key CredentialType - mnsQueueOption := queue.AliMNSClientOption{ - EndPoint: "http://account-id.mns.region.aliyuncs.com", - CredentialType: cloud.AliCloudAccessKeyCredentialType, - AccessKeyId: "alicloud_access_key_id", - AccessKeySecret: "alicloud_access_key_secret", - MessagePriority: 10, + option = queue.QueueOption{ + Provider: cloud.AliCloudProvider, + QueueName: "mns_queue_name", + MNS: queue.AliMNSClientOption{ + EndPoint: "http://account-id.mns.region.aliyuncs.com", + CredentialType: cloud.AliCloudAccessKeyCredentialType, + AccessKeyId: "alicloud_access_key_id", + AccessKeySecret: "alicloud_access_key_secret", + MessagePriority: 10, + }, } - queue_examples("mns_queue_name", mnsQueueOption) + queue_examples(option) // alicloud MNS queue: ecs_ram_role credentialType - mnsQueueOption = queue.AliMNSClientOption{ - EndPoint: "http://account-id.mns.region.aliyuncs.com", - CredentialType: cloud.AliCloudECSRamRoleCredentialType, - } - queue_examples("mns_queue_name", mnsQueueOption) - alicloud_mns_queue_examples("mns_queue_name", mnsQueueOption) + option = queue.QueueOption{ + Provider: cloud.AliCloudProvider, + QueueName: "mns_queue_name", + MNS: queue.AliMNSClientOption{ + EndPoint: "http://account-id.mns.region.aliyuncs.com", + CredentialType: cloud.AliCloudECSRamRoleCredentialType, + }} + queue_examples(option) + alicloud_mns_queue_examples(option) } -func queue_examples(queueOrTopicName string, option cloud.Option) { - service, err := queue.GetQueueService(queueOrTopicName, option) +func queue_examples(option queue.QueueOption) { + service, err := queue.GetQueueServiceWithOption(option) if err != nil { - fmt.Printf("get queue service error %s %+v %s\n", queueOrTopicName, option, err) + fmt.Printf("get queue service error %s %+v %s\n", option.QueueName, option, err) return } defer service.Close() @@ -145,10 +158,10 @@ func queue_examples(queueOrTopicName string, option cloud.Option) { } // The following examples show mns queue specific examples: How to set message priority; how to set long polling period seconds. -func alicloud_mns_queue_examples(queueOrTopicName string, option cloud.Option) { - service, err := queue.GetQueueService(queueOrTopicName, option) +func alicloud_mns_queue_examples(option queue.QueueOption) { + service, err := queue.GetQueueServiceWithOption(option) if err != nil { - fmt.Printf("get queue service error %s %+v %s\n", queueOrTopicName, option, err) + fmt.Printf("get queue service error %s %+v %s\n", option.QueueName, option, err) return } defer service.Close() diff --git a/cloud/object_storage/alicloud_object_storage.go b/cloud/object_storage/alicloud_object_storage.go index 3c8a078..dd8af87 100644 --- a/cloud/object_storage/alicloud_object_storage.go +++ b/cloud/object_storage/alicloud_object_storage.go @@ -25,13 +25,13 @@ var ( ) type AliCloudStorageOption struct { - CredentialType string // eg: "oidc_role_arn" or "ak" - EndPoint string // eg: "oss-cn-zhangjiakou.aliyuncs.com" - SessionName string // eg: "test-rrsa-oidc-token" + CredentialType string `yaml:"credential_type" json:"credential_type"` // eg: "oidc_role_arn" or "ak" + EndPoint string `yaml:"endpoint" json:"endpoint"` // eg: "oss-cn-zhangjiakou.aliyuncs.com" + SessionName string `yaml:"session_name" json:"session_name"` // eg: "test-rrsa-oidc-token" // "ak" required - AccessKeyID string - AccessKeySecret string + AccessKeyID string `yaml:"access_key_id" json:"access_key_id"` + AccessKeySecret string `yaml:"access_key_secret" json:"access_key_secret"` } func (option AliCloudStorageOption) GetProvider() cloud.Provider { diff --git a/cloud/object_storage/aws_object_storage.go b/cloud/object_storage/aws_object_storage.go index 5b0ddaa..0a9fc86 100644 --- a/cloud/object_storage/aws_object_storage.go +++ b/cloud/object_storage/aws_object_storage.go @@ -19,6 +19,28 @@ type AWSObjectStorageService struct { bucketName string } +func getAWSObjectService(bucketName string, option cloud.AWSOption) (ObjectStorageService, error) { + if bucketName == "" { + return nil, ErrBucketNameEmpty + } + if err := option.Check(); err != nil { + return nil, err + } + sess, cfg, err := cloud.AwsNewSessionWithOption(option) + if err != nil { + return nil, err + } + var client *s3.S3 + // Assume the specified role + if cfg != nil { + client = s3.New(sess, cfg) + } else { + client = s3.New(sess) + } + return &AWSObjectStorageService{client: client, bucketName: bucketName}, nil +} + +// GetAWSObjectService is deprecated, use getAWSObjectService instead. func GetAWSObjectService(bucketName string, option cloud.Option) (ObjectStorageService, error) { if bucketName == "" { return nil, ErrBucketNameEmpty diff --git a/cloud/object_storage/object_storage.go b/cloud/object_storage/object_storage.go index beb303f..dc8f158 100644 --- a/cloud/object_storage/object_storage.go +++ b/cloud/object_storage/object_storage.go @@ -3,13 +3,14 @@ package object_storage import ( "context" "errors" - "github.com/aliyun/aliyun-oss-go-sdk/oss" - "github.com/tencentyun/cos-go-sdk-v5" "net/http" "net/url" "strconv" "time" + "github.com/aliyun/aliyun-oss-go-sdk/oss" + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/byte-power/gorich/cloud" ) @@ -71,6 +72,7 @@ func (object Object) GetContentType() string { return object.contentType } +// GetObjectStorageService is deprecated, use GetObjectStorageServiceWithOption instead. func GetObjectStorageService(bucketName string, option cloud.Option) (ObjectStorageService, error) { if option.GetProvider() == cloud.TencentCloudProvider { return GetTencentCloudObjectService(bucketName, option) @@ -82,6 +84,40 @@ func GetObjectStorageService(bucketName string, option cloud.Option) (ObjectStor return nil, cloud.ErrUnsupportedCloudProvider } +type ObjectStorageOption struct { + Provider cloud.Provider `json:"provider" yaml:"provider"` + Bucket string `json:"bucket" yaml:"bucket"` + // Path is not used currently. + Path string `json:"path" yaml:"path"` + + S3 cloud.AWSOption `json:"s3" yaml:"s3"` + OSS AliCloudStorageOption `json:"oss" yaml:"oss"` + COS COSOption `json:"cos" yaml:"cos"` +} + +func (option ObjectStorageOption) Check() error { + if option.Bucket == "" { + return ErrBucketNameEmpty + } + return nil +} + +func GetObjectStorageServiceWithOption(option ObjectStorageOption) (ObjectStorageService, error) { + if err := option.Check(); err != nil { + return nil, err + } + switch option.Provider { + case cloud.AWSProvider: + return getAWSObjectService(option.Bucket, option.S3) + case cloud.AliCloudProvider: + return GetAliCloudObjectService(option.Bucket, option.OSS) + case cloud.TencentCloudProvider: + return getTencentCloudObjectService(option.Bucket, option.COS) + default: + return nil, cloud.ErrUnsupportedCloudProvider + } +} + type PutHeaderOption struct { ContentDisposition *string ContentEncoding *string diff --git a/cloud/object_storage/tencentcloud_object_storage.go b/cloud/object_storage/tencentcloud_object_storage.go index 44e615e..b555311 100644 --- a/cloud/object_storage/tencentcloud_object_storage.go +++ b/cloud/object_storage/tencentcloud_object_storage.go @@ -35,6 +35,47 @@ type TencentCloudObjectStorageService struct { client *cos.Client } +type COSOption cloud.CommonCloudOption + +func (option COSOption) Check() error { + if option.Region == "" { + return cloud.ErrEmptyRegion + } + if option.SecretID == "" { + return cloud.ErrEmptySecretID + } + if option.SecretKey == "" { + return cloud.ErrEmptySecretKey + } + return nil +} + +func getTencentCloudObjectService(bucketName string, option COSOption) (ObjectStorageService, error) { + if bucketName == "" { + return nil, ErrBucketNameEmpty + } + if err := option.Check(); err != nil { + return nil, err + } + bucketURL, err := getTencentCloudBucketURL(bucketName, option.Region) + if err != nil { + return nil, err + } + serviceURL, err := getTencentCloudServiceURL(option.Region) + if err != nil { + return nil, err + } + baseURL := &cos.BaseURL{BucketURL: bucketURL, ServiceURL: serviceURL} + client := cos.NewClient(baseURL, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: option.SecretID, + SecretKey: option.SecretKey, + }, + }) + return &TencentCloudObjectStorageService{client: client}, nil +} + +// GetTencentCloudObjectService is deprecated, use getTencentCloudObjectService instead. func GetTencentCloudObjectService(bucketName string, option cloud.Option) (ObjectStorageService, error) { if bucketName == "" { return nil, ErrBucketNameEmpty diff --git a/cloud/queue/alicloud_mns_queue.go b/cloud/queue/alicloud_mns_queue.go index 7d3b855..8a52c99 100644 --- a/cloud/queue/alicloud_mns_queue.go +++ b/cloud/queue/alicloud_mns_queue.go @@ -41,13 +41,14 @@ type AliMNSQueueService struct { } type AliMNSClientOption struct { - EndPoint string `json:"endpoint"` - TimeoutSecond int64 `json:"timeout_second"` - MaxConnsPerHost int `json:"max_conns_per_host"` - QueueQPS int32 `json:"queue_qps"` + EndPoint string `json:"endpoint" yaml:"endpoint"` + TimeoutSecond int64 `json:"timeout_second" yaml:"timeout_second"` + MaxConnsPerHost int `json:"max_conns_per_host" yaml:"max_conns_per_host"` + QueueQPS int32 `json:"queue_qps" yaml:"queue_qps"` + // MessagePriority is used to set message priority when sending messages. // message priority can also be set in ctx parameter when calling Producer's SendMessage method. - MessagePriority int `json:"message_priority"` + MessagePriority int `json:"message_priority" yaml:"message_priority"` // ReceiveMessageLongPollingWaitSeconds is used to set long polling wait seconds. // ReceiveMessages will wait at most `ReceiveMessageLongPollingWaitSeconds` seconds before return @@ -57,27 +58,27 @@ type AliMNSClientOption struct { // 2. value set in option (i.e. here) // 3. queue's long polling period configuration // If want to disable long polling, do not set the above 3 configurations. - ReceiveMessageLongPollingWaitSeconds int `json:"receive_message_long_polling_wait_seconds"` + ReceiveMessageLongPollingWaitSeconds int `json:"receive_message_long_polling_wait_seconds" yaml:"receive_message_long_polling_wait_seconds"` - CredentialType cloud.AliCloudCredentialType `json:"credential_type"` + CredentialType cloud.AliCloudCredentialType `json:"credential_type" yaml:"credential_type"` // required when CredentialType is AliCloudAccessKeyCredentialType, get from env if not provided - AccessKeyId string `json:"access_key_id"` - AccessKeySecret string `json:"access_key_secret"` + AccessKeyId string `json:"access_key_id" yaml:"access_key_id"` + AccessKeySecret string `json:"access_key_secret" yaml:"access_key_secret"` // optional when CredentialType is AliCloudECSRamRoleCredentialType - RoleName string `json:"role_name"` + RoleName string `json:"role_name" yaml:"role_name"` // required when CredentialType is AliCloudOIDCRoleARNCredentialType, get from env if not provided - RoleArn string `json:"role_arn"` - OIDCProviderArn string `json:"oidc_provider_arn"` - OIDCTokenFilePath string `json:"oidc_token_file_path"` + RoleArn string `json:"role_arn" yaml:"role_arn"` + OIDCProviderArn string `json:"oidc_provider_arn" yaml:"oidc_provider_arn"` + OIDCTokenFilePath string `json:"oidc_token_file_path" yaml:"oidc_token_file_path"` // optional when CredentialType is AliCloudOIDCRoleARNCredentialType // RoleSessionName will get from env if not provided - RoleSessionName string `json:"role_session_name"` - Policy string `json:"policy"` - RoleSessionExpiration int `json:"role_session_expiration"` + RoleSessionName string `json:"role_session_name" yaml:"role_session_name"` + Policy string `json:"policy" yaml:"policy"` + RoleSessionExpiration int `json:"role_session_expiration" yaml:"role_session_expiration"` region string } @@ -130,7 +131,7 @@ func (option AliMNSClientOption) check() error { } func (option AliMNSClientOption) GetProvider() cloud.Provider { - return cloud.AliCloudMNSQueueProvider + return cloud.AliCloudProvider } func (option AliMNSClientOption) GetSecretID() string { diff --git a/cloud/queue/aws_queue.go b/cloud/queue/aws_queue.go index 28d71d9..d98973a 100644 --- a/cloud/queue/aws_queue.go +++ b/cloud/queue/aws_queue.go @@ -25,6 +25,35 @@ type AWSQueueService struct { var ErrAWSQueueNameEmpty = errors.New("aws queue name is empty") +func getAWSQueueService(queueName string, option cloud.AWSOption) (QueueService, error) { + if queueName == "" { + return nil, ErrAWSQueueNameEmpty + } + if err := option.Check(); err != nil { + return nil, err + } + + sess, cfg, err := cloud.AwsNewSessionWithOption(option) + if err != nil { + return nil, err + } + var client *sqs.SQS + // Assume the specified role + if cfg != nil { + client = sqs.New(sess, cfg) + } else { + client = sqs.New(sess) + } + + input := &sqs.GetQueueUrlInput{QueueName: aws.String(queueName)} + output, err := client.GetQueueUrl(input) + if err != nil { + return nil, err + } + return &AWSQueueService{client: client, queueURL: aws.StringValue(output.QueueUrl)}, nil +} + +// GetAWSQueueService is deprecated, use getAWSQueueService instead. func GetAWSQueueService(queueName string, option cloud.Option) (QueueService, error) { if queueName == "" { return nil, ErrAWSQueueNameEmpty diff --git a/cloud/queue/base_redis_queue_struct.go b/cloud/queue/base_redis_queue_struct.go index 61721bc..91c94d3 100644 --- a/cloud/queue/base_redis_queue_struct.go +++ b/cloud/queue/base_redis_queue_struct.go @@ -2,8 +2,9 @@ package queue import ( "errors" - "github.com/byte-power/gorich/cloud" "time" + + "github.com/byte-power/gorich/cloud" ) var ( @@ -16,20 +17,20 @@ var ( // StandaloneRedisQueueOption 用于 Redis Standalone 实例 type StandaloneRedisQueueOption struct { - Addr string - Password string - DB *int - MaxRetries *int - PoolSize *int - DialTimeout *time.Duration - ReadTimeout *time.Duration - WriteTimeout *time.Duration - MinIdleConns *int + Addr string `json:"addr" yaml:"addr"` + Password string `json:"password" yaml:"password"` + DB *int `json:"db" yaml:"db"` + MaxRetries *int `json:"max_retries" yaml:"max_retries"` + PoolSize *int `json:"pool_size" yaml:"pool_size"` + DialTimeout *time.Duration `json:"dial_timeout" yaml:"dial_timeout"` + ReadTimeout *time.Duration `json:"read_timeout" yaml:"read_timeout"` + WriteTimeout *time.Duration `json:"write_timeout" yaml:"write_timeout"` + MinIdleConns *int `json:"min_idle_conns" yaml:"min_idle_conns"` // queue - ConsumerGroup string - Idle int - GlobalIdle int + ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"` + Idle int `json:"idle" yaml:"idle"` + GlobalIdle int `json:"global_idle" yaml:"global_idle"` } func (option StandaloneRedisQueueOption) GetProvider() cloud.Provider { @@ -89,23 +90,24 @@ func (option StandaloneRedisQueueOption) check() error { // ClusterRedisQueueOption 用于 Redis Cluster 集群 type ClusterRedisQueueOption struct { - Addrs []string - Password string - DB *int - MaxRetries *int - PoolSize *int - DialTimeout *time.Duration - ReadTimeout *time.Duration - WriteTimeout *time.Duration - MinIdleConns *int - MaxIdleConns *int - ConnMaxIdleTime *time.Duration - ConnMaxLifetime *time.Duration + // redis cluster + Addrs []string `json:"addrs" yaml:"addrs"` + Password string `json:"password" yaml:"password"` + DB *int `json:"db" yaml:"db"` + MaxRetries *int `json:"max_retries" yaml:"max_retries"` + PoolSize *int `json:"pool_size" yaml:"pool_size"` + DialTimeout *time.Duration `json:"dial_timeout" yaml:"dial_timeout"` + ReadTimeout *time.Duration `json:"read_timeout" yaml:"read_timeout"` + WriteTimeout *time.Duration `json:"write_timeout" yaml:"write_timeout"` + MinIdleConns *int `json:"min_idle_conns" yaml:"min_idle_conns"` + MaxIdleConns *int `json:"max_idle_conns" yaml:"max_idle_conns"` + ConnMaxIdleTime *time.Duration `json:"conn_max_idle_time" yaml:"conn_max_idle_time"` + ConnMaxLifetime *time.Duration `json:"conn_max_lifetime" yaml:"conn_max_lifetime"` // queue - ConsumerGroup string - Idle int - GlobalIdle int + ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"` + Idle int `json:"idle" yaml:"idle"` + GlobalIdle int `json:"global_idle" yaml:"global_idle"` } func (option ClusterRedisQueueOption) GetProvider() cloud.Provider { diff --git a/cloud/queue/queue.go b/cloud/queue/queue.go index 8af3496..e772302 100644 --- a/cloud/queue/queue.go +++ b/cloud/queue/queue.go @@ -2,6 +2,7 @@ package queue import ( "context" + "errors" "github.com/byte-power/gorich/cloud" ) @@ -27,6 +28,7 @@ type Message interface { Body() string } +// GetQueueService is deprecated, use GetQueueServiceWithOption instead. func GetQueueService(queueOrTopicSubName string, option cloud.Option) (QueueService, error) { if option.GetProvider() == cloud.TencentCloudProvider { return GetTencentCloudQueueService(queueOrTopicSubName, option) @@ -40,8 +42,53 @@ func GetQueueService(queueOrTopicSubName string, option cloud.Option) (QueueServ return getStandaloneRedisQueueServiceForV7(queueOrTopicSubName, option) } else if option.GetProvider() == cloud.ClusterRedisProviderV7 { return getClusterRedisQueueServiceV7(queueOrTopicSubName, option) - } else if option.GetProvider() == cloud.AliCloudMNSQueueProvider { + } else if option.GetProvider() == cloud.AliCloudProvider { return getAliMNSQueueService(queueOrTopicSubName, option) } return nil, cloud.ErrUnsupportedCloudProvider } + +var ErrQueueNameRequired = errors.New("queue_name is required") + +type QueueOption struct { + Provider cloud.Provider `json:"provider" yaml:"provider"` + QueueName string `json:"queue_name" yaml:"queue_name"` + SQS cloud.AWSOption `json:"sqs" yaml:"sqs"` + MNS AliMNSClientOption `json:"mns" yaml:"mns"` + StandaloneRedis StandaloneRedisQueueOption `json:"standalone_redis" yaml:"standalone_redis"` + ClusterRedis ClusterRedisQueueOption `json:"cluster_redis" yaml:"cluster_redis"` + StandaloneRedisV7 ClusterRedisQueueOptionV7 `json:"standalone_redis_v7" yaml:"standalone_redis_v7"` + ClusterRedisV7 ClusterRedisQueueOptionV7 `json:"cluster_redis_v7" yaml:"cluster_redis_v7"` + Pulsar TencentCloudQueueOption `json:"pulsar" yaml:"pulsar"` +} + +func (option QueueOption) Check() error { + if option.QueueName == "" { + return ErrQueueNameRequired + } + return nil +} + +func GetQueueServiceWithOption(option QueueOption) (QueueService, error) { + if err := option.Check(); err != nil { + return nil, err + } + switch option.Provider { + case cloud.AWSProvider: + return getAWSQueueService(option.QueueName, option.SQS) + case cloud.AliCloudProvider: + return getAliMNSQueueService(option.QueueName, option.MNS) + case cloud.StandaloneRedisProvider: + return GetStandaloneRedisQueueService(option.QueueName, option.StandaloneRedis) + case cloud.ClusterRedisProvider: + return GetClusterRedisQueueService(option.QueueName, option.ClusterRedis) + case cloud.StandaloneRedisProviderV7: + return getStandaloneRedisQueueServiceForV7(option.QueueName, option.StandaloneRedisV7) + case cloud.ClusterRedisProviderV7: + return getClusterRedisQueueServiceV7(option.QueueName, option.ClusterRedisV7) + case cloud.TencentCloudProvider: + return GetTencentCloudQueueService(option.QueueName, option.Pulsar) + default: + return nil, cloud.ErrUnsupportedCloudProvider + } +} diff --git a/cloud/queue/tencent_queue.go b/cloud/queue/tencent_queue.go index 16c802c..28c3bcf 100644 --- a/cloud/queue/tencent_queue.go +++ b/cloud/queue/tencent_queue.go @@ -19,8 +19,8 @@ var ( ) type TencentCloudQueueOption struct { - Token string - URL string + Token string `json:"token" yaml:"token"` + URL string `json:"url" yaml:"url"` } func (option TencentCloudQueueOption) GetProvider() cloud.Provider {