Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
Hanzhang Zeng (Roger) committed Jul 14, 2020
2 parents ff8ee95 + ab828a2 commit 5b33092
Show file tree
Hide file tree
Showing 24 changed files with 459 additions and 32 deletions.
26 changes: 26 additions & 0 deletions azure_functions_worker/bindings/datumdef.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from typing import Any
import json
from .. import protos


Expand All @@ -8,6 +11,29 @@ def __init__(self, value, type):
self.value = value
self.type = type

@property
def python_value(self) -> Any:
if self.value is None or self.type is None:
return None
elif self.type in ('bytes', 'string', 'int', 'double'):
return self.value
elif self.type == 'json':
return json.loads(self.value)
elif self.type == 'collection_string':
return [v for v in self.value.string]
elif self.type == 'collection_bytes':
return [v for v in self.value.bytes]
elif self.type == 'collection_double':
return [v for v in self.value.double]
elif self.type == 'collection_sint64':
return [v for v in self.value.sint64]
else:
return self.value

@property
def python_type(self) -> type:
return type(self.python_value)

def __eq__(self, other):
if not isinstance(other, type(self)):
return False
Expand Down
4 changes: 2 additions & 2 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ def on_logging(self, record: logging.LogRecord, formatted_msg: str) -> None:
log_level = getattr(protos.RpcLog, 'None')

if is_system_log_category(record.name):
log_category = protos.RpcLog.RpcLogCategory.System
log_category = protos.RpcLog.RpcLogCategory.Value('System')
else: # customers using logging will yield 'root' in record.name
log_category = protos.RpcLog.RpcLogCategory.User
log_category = protos.RpcLog.RpcLogCategory.Value('User')

log = dict(
level=log_level,
Expand Down
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def run(self):

setup(
name='azure-functions-worker',
version='1.1.3',
version='1.1.4',
description='Python Language Worker for Azure Functions Host',
long_description=long_description,
long_description_content_type='text/markdown',
Expand Down Expand Up @@ -286,6 +286,8 @@ def run(self):
extras_require={
'dev': [
'azure-functions==1.3.0',
'azure-eventhub~=5.1.0',
'python-dateutil~=2.8.1',
'flake8~=3.7.9',
'mypy',
'pytest',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import json


# This is an actual EventHub trigger which handles Eventhub events in batches.
# It serializes multiple event data into a json and store it into a blob.
def main(events):
table_entries = []
for event in events:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import azure.functions as func


# An HttpTrigger to generating EventHub event from EventHub Output Binding
def main(req: func.HttpRequest) -> str:
events = req.get_body().decode('utf-8')
return events
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
import azure.functions as func


# Retrieve the event data from storage blob and return it as Http response
def main(req: func.HttpRequest, testEntities):
return func.HttpResponse(status_code=200, body=testEntities)
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"authLevel": "anonymous",
"methods": [
"get"
],
"name": "req"
},
{
"direction": "in",
"type": "table",
"name": "testEntities",
"partitionKey": "WillBePopulated",
"tableName": "EventHubBatchTest",
"connection": "AzureWebJobsStorage"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
{
"scriptFile": "__init__.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"authLevel": "anonymous",
"methods": [
"get"
],
"name": "req"
},
{
"direction": "in",
"type": "table",
"name": "testEntities",
"partitionKey": "WillBePopulated",
"tableName": "EventHubBatchTest",
"connection": "AzureWebJobsStorage"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import azure.functions as func


# Retrieve the event data from storage blob and return it as Http response
def main(req: func.HttpRequest, file: func.InputStream) -> str:
return func.HttpResponse(body=file.read().decode('utf-8'),
status_code=200,
mimetype='application/json')
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req"
},
{
"type": "blob",
"direction": "in",
"name": "file",
"connection": "AzureWebJobsStorage",
"path": "python-worker-tests/test-metadata-batch-triggered.txt"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import typing
import json
import azure.functions as func


# This is an actual EventHub trigger which handles Eventhub events in batches.
# It serializes multiple event data into a json and store it into a blob.
def main(events: typing.List[func.EventHubEvent]) -> bytes:
event_list = []
for event in events:
event_dict: typing.Mapping[str, typing.Any] = {
'body': event.get_body().decode('utf-8'),
'enqueued_time': event.enqueued_time.isoformat(),
'partition_key': event.partition_key,
'sequence_number': event.sequence_number,
'offset': event.offset,
'metadata': event.metadata
}
event_list.append(event_dict)

return json.dumps(event_list)
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"type": "eventHubTrigger",
"name": "events",
"direction": "in",
"cardinality": "many",
"dataType": "binary",
"eventHubName": "python-worker-ci-eventhub-batch-metadata",
"connection": "AzureWebJobsEventHubConnectionString"
},
{
"type": "blob",
"direction": "out",
"name": "$return",
"connection": "AzureWebJobsStorage",
"path": "python-worker-tests/test-metadata-batch-triggered.txt"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import os
import json

import azure.functions as func
from azure.eventhub import EventHubProducerClient, EventData


# An HttpTrigger to generating EventHub event from azure-eventhub SDK.
# Events generated from azure-eventhub contain the full metadata.
def main(req: func.HttpRequest):
# Get event count from http request query parameter
count = int(req.params.get('count', '1'))

# Parse event metadata from http request
json_string = req.get_body().decode('utf-8')
event_dict = json.loads(json_string)

# Create an EventHub Client and event batch
client = EventHubProducerClient.from_connection_string(
os.getenv('AzureWebJobsEventHubConnectionString'),
eventhub_name='python-worker-ci-eventhub-batch-metadata')

# Generate new event based on http request with full metadata
event_data_batch = client.create_batch()
random_number = int(event_dict.get('body', '0'))
for i in range(count):
event_data_batch.add(EventData(str(random_number + i)))

# Send out event into event hub
with client:
client.send_batch(event_data_batch)

return f'OK'
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req"
},
{
"direction": "out",
"name": "$return",
"type": "http"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import azure.functions as func


# An HttpTrigger to generating EventHub event from EventHub Output Binding
def main(req: func.HttpRequest, event: func.Out[str]):
event.set(req.get_body().decode('utf-8'))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@
import azure.functions as func


# This is an actual EventHub trigger which will convert the event data
# into a storage blob.
def main(event: func.EventHubEvent) -> bytes:
return event.get_body()
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
import azure.functions as func


# Retrieve the event data from storage blob and return it as Http response
def main(req: func.HttpRequest, file: func.InputStream) -> str:
return file.read().decode('utf-8')
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import azure.functions as func


# Retrieve the event data from storage blob and return it as Http response
async def main(req: func.HttpRequest, file: func.InputStream) -> str:
return func.HttpResponse(body=file.read().decode('utf-8'),
status_code=200,
mimetype='application/json')
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req"
},
{
"type": "blob",
"direction": "in",
"name": "file",
"connection": "AzureWebJobsStorage",
"path": "python-worker-tests/test-metadata-triggered.txt"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
35 changes: 35 additions & 0 deletions tests/endtoend/eventhub_functions/metadata_output/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import os
import json

import azure.functions as func
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient


# An HttpTrigger to generating EventHub event from azure-eventhub SDK.
# Events generated from azure-eventhub contain the full metadata.
async def main(req: func.HttpRequest):

# Parse event metadata from http request
json_string = req.get_body().decode('utf-8')
event_dict = json.loads(json_string)

# Create an EventHub Client and event batch
client = EventHubProducerClient.from_connection_string(
os.getenv('AzureWebJobsEventHubConnectionString'),
eventhub_name='python-worker-ci-eventhub-one-metadata')

# Generate new event based on http request with full metadata
event_data_batch = await client.create_batch()
event_data_batch.add(EventData(event_dict.get('body')))

# Send out event into event hub
try:
await client.send_batch(event_data_batch)
finally:
await client.close()

return f'OK'
16 changes: 16 additions & 0 deletions tests/endtoend/eventhub_functions/metadata_output/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"scriptFile": "__init__.py",

"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req"
},
{
"direction": "out",
"name": "$return",
"type": "http"
}
]
}
Loading

0 comments on commit 5b33092

Please sign in to comment.