Skip to content

Commit

Permalink
Add aliyun MNS queue support. (#19)
Browse files Browse the repository at this point in the history
* Add aliyun MNS queue support.

* Update mns queue: support credential refresh.

* Add region in option.

* Add examples for MNS queue.

* Fix: do not return err when receive messages in empty queue.

* Update comment.
  • Loading branch information
zhoufeng1989 authored Dec 11, 2024
1 parent 8a915d9 commit 5267762
Show file tree
Hide file tree
Showing 6 changed files with 615 additions and 12 deletions.
18 changes: 18 additions & 0 deletions cloud/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,24 @@ const (
StandaloneRedisProviderV7 Provider = "standalone_redis_v7"
ClusterRedisProviderV7 Provider = "cluster_redis_v7"
AliCloudStorageProvider Provider = "alicloud_storage"
AliCloudMNSQueueProvider Provider = "alicloud_mns_queue"
)

type AliCloudCredentialType string

const (
AliCloudAccessKeyCredentialType AliCloudCredentialType = "access_key"
AliCloudECSRamRoleCredentialType AliCloudCredentialType = "ecs_ram_role"
AliCloudOIDCRoleARNCredentialType AliCloudCredentialType = "oidc_role_arn"
)

const (
AliCloudEnvAccessKeyID = "ALIBABA_CLOUD_ACCESS_KEY_ID"
AliCloudEnvAccessKeySecret = "ALIBABA_CLOUD_ACCESS_KEY_SECRET"
AliCloudEnvRoleArn = "ALIBABA_CLOUD_ROLE_ARN"
AliCloudEnvOIDCProviderArn = "ALIBABA_CLOUD_OIDC_PROVIDER_ARN"
AliCloudEnvOIDCTokenFile = "ALIBABA_CLOUD_OIDC_TOKEN_FILE"
AliCloudEnvRoleSessionName = "ALIBABA_CLOUD_ROLE_SESSION_NAME"
)

var (
Expand Down
77 changes: 77 additions & 0 deletions cloud/examples/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,24 @@ func main() {
},
}
queue_examples("redis_cluster_queue_v7", clusterRedisQueueOptionV7)

// alicloud MNS queue: access_key CredentialType
mnsQueueOption := queue.AliMNSClientOption{
EndPoint: "http://account-id.mns.region.aliyuncs.com",
CredentialType: cloud.AliCloudAccessKeyCredentialType,
AccessKeyId: "alicloud_access_key_id",
AccessKeySecret: "alicloud_access_key_secret",
MessagePriority: 10,
}
queue_examples("mns_queue_name", mnsQueueOption)

// alicloud MNS queue: ecs_ram_role credentialType
mnsQueueOption = queue.AliMNSClientOption{
EndPoint: "http://account-id.mns.region.aliyuncs.com",
CredentialType: cloud.AliCloudECSRamRoleCredentialType,
}
queue_examples("mns_queue_name", mnsQueueOption)
alicloud_mns_queue_examples("mns_queue_name", mnsQueueOption)
}

func queue_examples(queueOrTopicName string, option cloud.Option) {
Expand Down Expand Up @@ -125,3 +143,62 @@ func queue_examples(queueOrTopicName string, option cloud.Option) {
fmt.Printf("ack message %s\n", message.Body())
}
}

// The following examples show mns queue specific examples: How to set message priority; how to set long polling period seconds.
func alicloud_mns_queue_examples(queueOrTopicName string, option cloud.Option) {
service, err := queue.GetQueueService(queueOrTopicName, option)
if err != nil {
fmt.Printf("get queue service error %s %+v %s\n", queueOrTopicName, option, err)
return
}
defer service.Close()
fmt.Printf("get service %+v\n", service)

producer, err := service.CreateProducer()
if err != nil {
fmt.Printf("create producer error %s\n", err)
return
}
defer producer.Close()

consumer, err := service.CreateConsumer()
if err != nil {
fmt.Printf("create consumer error %s\n", err)
return
}
defer consumer.Close()

ts := int(time.Now().Unix())
var messages []string
for i := 0; i < 3; i++ {
messages = append(messages, fmt.Sprintf("message %d", ts+i))
}
for _, message := range messages {
// rewrite message priority
ctx := context.WithValue(context.TODO(), queue.ContextKeyAliMNSMessagePriority, 5)
err = producer.SendMessage(ctx, message)
if err != nil {
fmt.Printf("producer send message error %s", err)
return
}
fmt.Printf("producer send message %s\n", message)
}
// rewrite long polling period
ctx := context.WithValue(context.TODO(), queue.ContextKeyAliMNSLongPollingWaitSeconds, 10)
receivedMsgs, err := consumer.ReceiveMessages(ctx, 10)
if err != nil {
fmt.Printf("receive messages error %s", err)
return
}
for _, message := range receivedMsgs {
fmt.Printf("received message %s\n", message.Body())
}
for _, message := range receivedMsgs {
err := consumer.AckMessage(context.TODO(), message)
if err != nil {
fmt.Printf("ack message error %s %s\n", message.Body(), err)
return
}
fmt.Printf("ack message %s\n", message.Body())
}
}
Loading

0 comments on commit 5267762

Please sign in to comment.