Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Getting batched events #205

Merged
merged 24 commits into from
Mar 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions extensions/Worker.Extensions.Abstractions/DefaultValueAttribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;

namespace Microsoft.Azure.Functions.Worker.Extensions.Abstractions
{
[AttributeUsage(AttributeTargets.Property)]
public class DefaultValueAttribute : Attribute
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is temporary to set default while we do not load attributes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:No need to change for right now, but I wonder if we should move this to a different namespace so it's not discoverable and polluting?

{
/// <summary>
/// Define a default value for a property on a FunctionAttribute type.
/// </summary>
/// <param name="stringDefault"></param>
public DefaultValueAttribute(string stringDefault)
{
DefaultStringValue = stringDefault;
}

public DefaultValueAttribute(bool boolDefault)
{
DefaultBoolValue = boolDefault;
}

public string? DefaultStringValue { get; }

public bool? DefaultBoolValue { get; }
}
}
26 changes: 26 additions & 0 deletions extensions/Worker.Extensions.Abstractions/ISupportCardinality.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;

namespace Microsoft.Azure.Functions.Worker.Extensions.Abstractions
{
public interface ISupportCardinality
{
/// <summary>
/// Configures the "cardinality" property
/// This property indicates that the requested type is an array. Note that for
/// inputs and outputs, the effect of cardinality may be different (ex: electing to
/// receive a collection of events vs. indicating that my return type will be
/// a collection).
/// </summary>
public Cardinality Cardinality { get; set; }
}

public enum Cardinality
{
Many,
One
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

namespace Microsoft.Azure.Functions.Worker
{
public sealed class EventHubTriggerAttribute : TriggerBindingAttribute
public sealed class EventHubTriggerAttribute : TriggerBindingAttribute, ISupportCardinality
{
// Batch by default
private bool _isBatched = true;

/// <summary>
/// Create an instance of this attribute.
/// </summary>
Expand All @@ -30,5 +33,41 @@ public EventHubTriggerAttribute(string eventHubName)
/// Gets or sets the optional app setting name that contains the Event Hub connection string. If missing, tries to use a registered event hub receiver.
/// </summary>
public string? Connection { get; set; }

/// <summary>
/// Gets or sets the configuration to enable batch processing of events. Default value is "true".
/// </summary>
[DefaultValue(true)]
public bool IsBatched
{
get => _isBatched;
set => _isBatched = value;
}

Cardinality ISupportCardinality.Cardinality
{
get
{
if (_isBatched)
{
return Cardinality.Many;
}
else
{
return Cardinality.One;
}
}
set
{
if (value.Equals(Cardinality.Many))
{
_isBatched = true;
}
else
{
_isBatched = false;
}
}
}
}
}
38 changes: 37 additions & 1 deletion extensions/Worker.Extensions.Kafka/KafkaTriggerAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@

namespace Microsoft.Azure.Functions.Worker
{
public sealed class KafkaTriggerAttribute : TriggerBindingAttribute
public sealed class KafkaTriggerAttribute : TriggerBindingAttribute, ISupportCardinality
{
private bool _isBatched = false;

public KafkaTriggerAttribute(string brokerList, string topic)
{
BrokerList = brokerList;
Expand Down Expand Up @@ -95,5 +97,39 @@ public KafkaTriggerAttribute(string brokerList, string topic)
/// </summary>
public string? SslKeyPassword { get; set; }

/// <summary>
/// Gets or sets the configuration to enable batch processing of events. Default value is "false".
/// </summary>
public bool IsBatched
{
get => _isBatched;
set => _isBatched = value;
}

Cardinality ISupportCardinality.Cardinality
{
get
{
if (_isBatched)
{
return Cardinality.Many;
}
else
{
return Cardinality.One;
}
}
set
{
if (value.Equals(Cardinality.Many))
{
_isBatched = true;
}
else
{
_isBatched = false;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

namespace Microsoft.Azure.Functions.Worker
{
public sealed class ServiceBusTriggerAttribute : TriggerBindingAttribute
public sealed class ServiceBusTriggerAttribute : TriggerBindingAttribute, ISupportCardinality
{
private bool _isBatched = false;

private readonly string? _queueName;
private readonly string? _topicName;
private readonly string? _subscriptionName;
Expand Down Expand Up @@ -67,5 +69,40 @@ public string? SubscriptionName
/// Gets or sets a value indicating whether the sessions are enabled.
/// </summary>
public bool IsSessionsEnabled { get; set; }

/// <summary>
/// Gets or sets the configuration to enable batch processing of events. Default value is "false".
/// </summary>
public bool IsBatched
{
get => _isBatched;
set => _isBatched = value;
}

Cardinality ISupportCardinality.Cardinality
{
get
{
if (_isBatched)
{
return Cardinality.Many;
}
else
{
return Cardinality.One;
}
}
set
{
if (value.Equals(Cardinality.Many))
{
_isBatched = true;
}
else
{
_isBatched = false;
}
}
}
}
}
10 changes: 10 additions & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,13 @@
- Updates to HttpResponseData
- API updates
- Support for response Cookies
- Add support for batched trigger events (#205)
- The following services allow trigger events to be batched:
- Event Hubs (batched by default)
- Service Bus (set `IsBatched = true` in trigger attribute)
- Kafka (set `IsBatched = true` in trigger attribute)
- To read batched event data in function code:
- Use array (`[]`), `IList`, `ICollection`, or `IEnumerable` if event data is `string`, `byte[]`, or `ReadOnlyMemory<byte>` (example: `string[]`).
- Note: `ReadOnlyMemory<byte>` is the more performant option to read binary data, especially for large payloads.
- Use a class that implements `IEnumerable` or `IEnumerable<T>` for serializable event data (example: `List<MyData>`).
- Fail function execution if the requested parameter cannot be converted to the specified type (#216)
37 changes: 37 additions & 0 deletions sdk/Sdk/Constants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

namespace Microsoft.Azure.Functions.Worker.Sdk
{
internal static class Constants
{
// Our types
internal const string BindingType = "Microsoft.Azure.Functions.Worker.Extensions.Abstractions.BindingAttribute";
internal const string OutputBindingType = "Microsoft.Azure.Functions.Worker.Extensions.Abstractions.OutputBindingAttribute";
internal const string FunctionNameType = "Microsoft.Azure.Functions.Worker.FunctionAttribute";
internal const string ExtensionsInformationType = "Microsoft.Azure.Functions.Worker.Extensions.Abstractions.ExtensionInformationAttribute";
internal const string HttpResponseType = "Microsoft.Azure.Functions.Worker.Http.HttpResponseData";
internal const string DefaultValueAttributeType = "Microsoft.Azure.Functions.Worker.Extensions.Abstractions.DefaultValueAttribute";

// System types
internal const string IEnumerableType = "System.Collections.IEnumerable";
internal const string IEnumerableGenericType = "System.Collections.Generic.IEnumerable`1";
internal const string IEnumerableOfStringType = "System.Collections.Generic.IEnumerable`1<System.String>";
internal const string IEnumerableOfBinaryType = "System.Collections.Generic.IEnumerable`1<System.Byte[]>";
internal const string IEnumerableOfT = "System.Collections.Generic.IEnumerable`1<T>";
internal const string IEnumerableOfKeyValuePair = "System.Collections.Generic.IEnumerable`1<System.Collections.Generic.KeyValuePair`2<TKey,TValue>>";
internal const string GenericIEnumerableArgumentName = "T";
internal const string StringType = "System.String";
internal const string ByteArrayType = "System.Byte[]";
internal const string LookupGenericType = "System.Linq.Lookup`2";
internal const string DictionaryGenericType = "System.Collections.Generic.Dictionary`2";
internal const string TaskGenericType = "System.Threading.Tasks.Task`1";
internal const string TaskType = "System.Threading.Tasks.Task";
internal const string VoidType = "System.Void";
internal const string ReadOnlyMemoryOfBytes = "System.ReadOnlyMemory`1<System.Byte>";

internal const string ReturnBindingName = "$return";
internal const string HttpTriggerBindingType = "HttpTrigger";
internal const string IsBatchedKey = "IsBatched";
}
}
39 changes: 31 additions & 8 deletions sdk/Sdk/CustomAttributeExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
Expand All @@ -14,16 +14,39 @@ public static class CustomAttributeExtensions
public static IDictionary<string, object> GetAllDefinedProperties(this CustomAttribute attribute)
{
var properties = new Dictionary<string, object>();

// To avoid needing to instantiate any types, assume that the constructor
// argument names are equal to property names.
LoadDefaultProperties(properties, attribute);
LoadConstructorArguments(properties, attribute);
LoadDefinedProperties(properties, attribute);

return properties;
}

private static void LoadConstructorArguments(IDictionary<string, object> properties, CustomAttribute attribute)
private static IEnumerable<(string, CustomAttributeArgument?)> GetDefaultValues(this CustomAttribute attribute)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is all should be removed later... after above todo is done: avoid needing to instantiate any types, assume that the constructor argument names are equal to property names.

{
return attribute.AttributeType.Resolve().Properties
.Select(p => (p.Name, p.CustomAttributes
.Where(attribute => string.Equals(attribute.AttributeType.FullName, Constants.DefaultValueAttributeType, StringComparison.Ordinal))
.SingleOrDefault()
?.ConstructorArguments.SingleOrDefault()))
.Where(t => t.Item2 is not null);
}

private static void LoadDefaultProperties(IDictionary<string, object> properties, CustomAttribute attribute)
{
var propertyDefaults = attribute.GetDefaultValues();

foreach (var propertyDefault in propertyDefaults)
{
if (propertyDefault.Item2 is not null)
{
properties[propertyDefault.Item1] = propertyDefault.Item2.Value.Value;
}
}
}

private static void LoadConstructorArguments(IDictionary<string, object> properties, CustomAttribute attribute)
{
var constructorParams = attribute.Constructor.Resolve().Parameters;
for (int i = 0; i < attribute.ConstructorArguments.Count; i++)
Expand All @@ -34,13 +57,12 @@ private static void LoadConstructorArguments(IDictionary<string, object> propert
string? paramName = param?.Name;
object? paramValue = arg.Value;

if (paramName == null || paramValue == null)
if (paramName is null || paramValue is null)
{
continue;
}

paramValue = GetEnrichedValue(param!.ParameterType, paramValue);

paramValue = GetEnrichedValue(param!.ParameterType, paramValue);
properties[paramName] = paramValue!;
}
}
Expand All @@ -50,15 +72,16 @@ private static void LoadDefinedProperties(IDictionary<string, object> properties
foreach (CustomAttributeNamedArgument property in attribute.Properties)
{
object? propVal = property.Argument.Value;
string? propName = property.Name;

if (propVal == null)
if (propVal is null || propName is null)
{
continue;
}

propVal = GetEnrichedValue(property.Argument.Type, propVal);

properties[property.Name] = propVal!;
properties[propName] = propVal!;
}
}

Expand Down
13 changes: 13 additions & 0 deletions sdk/Sdk/DataTypeEnum.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

namespace Microsoft.Azure.Functions.Worker.Sdk
{
internal enum DataType
{
Undefined,
Binary,
String,
Stream
}
}
Loading