Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable servicebus batch trigger (cardinality=many) #73

Merged
merged 12 commits into from
Oct 13, 2020

Conversation

Hazhzeng
Copy link
Contributor

@Hazhzeng Hazhzeng commented Oct 12, 2020

Fixes

Fixes: #66

Description

  1. Enable cardinality=many in ServiceBus rich-binding, the customer can now access the batch of the service bus messages with List[ServiceBusMessage].
  2. Completing the service bus property based on https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messages-payloads. Adding dead_letter_source, lock_token, scheduled_enqueue_time_utc, and sequence_number.
  3. Deprecating expiration_time, should use expires_at_utc instead based on the servicebus payloads doc.
  4. Deprecating scheduled_enqueue_time, should use scheduled_enqueue_time_utc instead based on the servicebus payloads doc.
  5. Implemented a timedelta parser, _parse_timedelta() for parsing time_to_live field.
  6. Refactor CollectionString, CollectionBytes, and CollectionSint64 into testutils, this helps us writing test cases which constructs meta.Datum.
  7. Update documentations.

Todo

  1. The force_persistence property is not exposed to the customer yet. I haven't confirmed how gRPC transfer boolean value.

Usage

functions.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "msg",
      "type": "serviceBusTrigger",
      "direction": "in",
      "cardinality": "many",
      "queueName": "queue-many",
      "dataType": "binary",
      "connection": "SERVICE_BUS_CONNECTION"
    }
  ]
}

__init__.py

import logging

import azure.functions as func
from typing import List


def main(msg: List[func.ServiceBusMessage]):
    message_length = len(msg)
    if message_length > 1:
        logging.warn('Handling multiple requests')

    for m in msg:
        logging.info('.get_body().decode: %s', m.get_body().decode('utf-8'))
        logging.info('.content_type: %s', m.content_type)
        logging.info('.correlation_id: %s', m.correlation_id)
        logging.info('.dead_letter_source: %s', m.dead_letter_source)
        logging.info('.delivery_count: %s', m.delivery_count)
        logging.info('.enqueued_sequence_number: %s', m.enqueued_sequence_number)
        logging.info('.enqueued_time_utc: %s', m.enqueued_time_utc)
        logging.info('.expires_at_utc: %s', m.expires_at_utc)
        logging.info('.expiration_time: %s', m.expiration_time)
        logging.info('.label: %s', m.label)
        logging.info('.locked_until_utc: %s', m.locked_until_utc)
        logging.info('.lock_token: %s', m.lock_token)
        logging.info('.message_id: %s', m.message_id)
        logging.info('.partition_key: %s', m.partition_key)
        logging.info('.reply_to: %s', m.reply_to)
        logging.info('.reply_to_session_id: %s', m.reply_to_session_id)
        logging.info('.scheduled_enqueue_time: %s', m.scheduled_enqueue_time)
        logging.info('.scheduled_enqueue_time_utc: %s', m.scheduled_enqueue_time_utc)
        logging.info('.sequence_number: %s', m.sequence_number)
        logging.info('.time_to_live: %s', m.time_to_live)
        logging.info('.to: %s', m.to)
        logging.info('.via_partition_key: %s', m.via_partition_key)
        logging.info('.user_properties: %s', m.user_properties)
        logging.info('.metadata: %s', m.metadata)

Copy link
Member

@vrdmr vrdmr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes. LGTM. :shipit:

azure/functions/kafka.py Show resolved Hide resolved
cls, datetime_str: Optional[str]) -> Optional[datetime.datetime]:

if datetime_str is None or datetime_str == '':
return None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can simplify this by just doing

if not datetime_str

Empty string

>>> x = ''
>>> if x:
...   print ("Yes")
... else:
...   print("no")
...
no

A None object

>>> z = None
>>> if z:
...   print("Yes")
... else:
...   print("No")
...
No

String with a value

>>> y = 'Yo'
>>> if y:
...   print ("Yes")
... else:
...   print("no")
...
Yes

azure/functions/meta.py Outdated Show resolved Hide resolved
azure/functions/_servicebus.py Outdated Show resolved Hide resolved
def reply_to_session_id(self) -> typing.Optional[str]:
"""A session identifier augmenting the reply_to address."""
def scheduled_enqueue_time(self) -> Optional[datetime.datetime]:
"""(Deprecated, use scheduled_enqueue_time_utc instead)"""
pass
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think of a way to inform users through the logs (mainly for the in the runtime).

@Hazhzeng Hazhzeng merged commit 6b962c4 into dev Oct 13, 2020
@Hazhzeng Hazhzeng deleted the hazeng/servicebus-cardinality-many branch October 13, 2020 16:50
@bgawale
Copy link

bgawale commented Jul 27, 2022

Has anyone got this working? I am trying to use it with Service bus triggered azure function (python) and even after adding changes suggested above - the function is not honoring batching and is getting executed every time new message appears on the service bus queue

here is my sample function app code

import logging
import azure.functions as func
import json
from typing import List

def main(msg: List[func.ServiceBusMessage]):
    logging.info('Python ServiceBus queue trigger processed message.')

    for m in msg:
        result = json.dumps({
            'message_id': m.message_id,
            'body': m.get_body().decode('utf-8')
        }, default=str)
        logging.info(result)

and here is function.json

{
  "bindings": [
    {
      "name": "msg",
      "type": "serviceBusTrigger",
      "direction": "in",
      "queueName": "iot-queue",
      "connection": "iotpocsbns_RootManageSharedAccessKey_SERVICEBUS",
      "cardinality": "many"
    }
  ]
}

apart from that, here is host.json

{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[3.3.0, 4.0.0)"
  },
  "extensions": {
    "serviceBus": {
      "prefetchCount": 20,
      "messageHandlerOptions": {
        "autoComplete": true,
        "maxConcurrentCalls": 1,
        "maxAutoRenewDuration": "00:05:00"
      },
      "sessionHandlerOptions": {
        "autoComplete": false,
        "messageWaitTimeout": "00:00:30",
        "maxAutoRenewDuration": "00:55:00",
        "maxConcurrentSessions": 1
      },
      "batchOptions": {
        "maxMessageCount": 100,
        "operationTimeout": "00:01:00",
        "autoComplete": true
      }
    }
  }
}

is this something yet to hit GA?

@ryanotella
Copy link

ryanotella commented Sep 5, 2024

cardinality="many" is currently working for Python in "[4.*, 5.0.0)"

@app.service_bus_queue_trigger(
    arg_name="input", queue_name="my-queue", connection="AzureServiceBus", cardinality="many"
)
def create(input: List[func.ServiceBusMessage]):
  ...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Service Bus Batch Trigger
4 participants