Skip to content

Commit

Permalink
Added new StreamsEventResponse class for reporting batch item failure…
Browse files Browse the repository at this point in the history
…s when processing streams for KinesisEvent.
  • Loading branch information
ashishdhingra committed Sep 1, 2023
1 parent 0e62fe1 commit 5737503
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<Import Project="..\..\..\buildtools\common.props" />

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFrameworks>netstandard2.0;netcoreapp3.1</TargetFrameworks>
<Description>Amazon Lambda .NET Core support - KinesisEvents package.</Description>
<AssemblyTitle>Amazon.Lambda.KinesisEvents</AssemblyTitle>
<VersionPrefix>2.1.0</VersionPrefix>
Expand Down
34 changes: 33 additions & 1 deletion Libraries/src/Amazon.Lambda.KinesisEvents/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,39 @@ public class Function
}
```

The following is a sample class and Lambda function that receives Amazon Kinesis event when using time windows as an input and uses `KinesisTimeWindowResponse` object to demonstrates how to aggregate and then process the final state. (Note that by default anything written to Console will be logged as CloudWatch Logs events.)
The following is a sample class and Lambda function that receives Amazon Kinesis event record data as an input and uses `StreamsEventResponse` object to return batch item failures, if any. (Note that by default anything written to Console will be logged as CloudWatch Logs events.)

```csharp
public class Function
{
public StreamsEventResponse Handler(KinesisEvent kinesisEvent)
{
var batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>();
string curRecordSequenceNumber = string.Empty;

foreach (var record in kinesisEvent.Records)
{
try
{
//Process your record
var kinesisRecord = record.Kinesis;
curRecordSequenceNumber = kinesisRecord.SequenceNumber;
}
catch (Exception e)
{
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = curRecordSequenceNumber });
return new StreamsEventResponse() { BatchItemFailures = batchItemFailures };
}
}

return new StreamsEventResponse() { BatchItemFailures = batchItemFailures };
}
}
```

The following is a sample class and Lambda function that receives Amazon Kinesis event when using time windows as an input and uses `KinesisTimeWindowResponse` object to demonstrate how to aggregate and then process the final state. (Note that by default anything written to Console will be logged as CloudWatch Logs events.)

```csharp
public class Function
Expand Down
38 changes: 38 additions & 0 deletions Libraries/src/Amazon.Lambda.KinesisEvents/StreamsEventResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace Amazon.Lambda.KinesisEvents
{
using System.Collections.Generic;
using System.Runtime.Serialization;

/// <summary>
/// Function response type to report batch item failures for KinesisEvent.
/// https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting
/// </summary>
[DataContract]
public class StreamsEventResponse
{
/// <summary>
/// A list of records which failed processing. Returning the first record which failed would retry all remaining records from the batch.
/// </summary>
[DataMember(Name = "batchItemFailures", EmitDefaultValue = false)]
#if NETCOREAPP3_1
[System.Text.Json.Serialization.JsonPropertyName("batchItemFailures")]
#endif
public IList<BatchItemFailure> BatchItemFailures { get; set; }

/// <summary>
/// The class representing the BatchItemFailure.
/// </summary>
[DataContract]
public class BatchItemFailure
{
/// <summary>
/// Sequence number of the record which failed processing.
/// </summary>
[DataMember(Name = "itemIdentifier", EmitDefaultValue = false)]
#if NETCOREAPP3_1
[System.Text.Json.Serialization.JsonPropertyName("itemIdentifier")]
#endif
public string ItemIdentifier { get; set; }
}
}
}
43 changes: 33 additions & 10 deletions Libraries/test/EventsTests.Shared/EventTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace Amazon.Lambda.Tests
using Amazon.Lambda.ApplicationLoadBalancerEvents;
using Amazon.Lambda.CloudWatchEvents.BatchEvents;
using Amazon.Lambda.CloudWatchEvents.ECSEvents;
using Amazon.Lambda.CloudWatchEvents.S3Events;
using Amazon.Lambda.CloudWatchEvents.ScheduledEvents;
using Amazon.Lambda.CloudWatchEvents.TranscribeEvents;
using Amazon.Lambda.CloudWatchEvents.TranslateEvents;
Expand All @@ -19,14 +20,13 @@ namespace Amazon.Lambda.Tests
using Amazon.Lambda.KinesisEvents;
using Amazon.Lambda.KinesisFirehoseEvents;
using Amazon.Lambda.LexEvents;
using Amazon.Lambda.LexV2Events;
using Amazon.Lambda.MQEvents;
using Amazon.Lambda.S3Events;
using Amazon.Lambda.SimpleEmailEvents;
using Amazon.Lambda.SNSEvents;
using Amazon.Lambda.SQSEvents;
using Amazon.Lambda.LexV2Events;


using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Serialization;
using System;
Expand All @@ -35,10 +35,6 @@ namespace Amazon.Lambda.Tests
using System.Linq;
using System.Text;
using Xunit;
using Newtonsoft.Json;
using Amazon.Lambda.CloudWatchEvents;
using Amazon.Lambda.CloudWatchEvents.S3Events;

using JsonSerializer = Amazon.Lambda.Serialization.Json.JsonSerializer;

public class EventTest
Expand Down Expand Up @@ -336,6 +332,33 @@ private void Handle(KinesisEvent kinesisEvent)
}
}

[Theory]
[InlineData(typeof(JsonSerializer))]
#if NETCOREAPP3_1_OR_GREATER
[InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))]
[InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
#endif
public void KinesisBatchItemFailuresTest(Type serializerType)
{
var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer;
using (var fileStream = LoadJsonTestFile("kinesis-batchitemfailures-response.json"))
{
var kinesisStreamsEventResponse = serializer.Deserialize<KinesisEvents.StreamsEventResponse>(fileStream);

Assert.Equal(1, kinesisStreamsEventResponse.BatchItemFailures.Count);
Assert.Equal("1405400000000002063282832", kinesisStreamsEventResponse.BatchItemFailures[0].ItemIdentifier);

MemoryStream ms = new MemoryStream();
serializer.Serialize(kinesisStreamsEventResponse, ms);
ms.Position = 0;
var json = new StreamReader(ms).ReadToEnd();

var original = JObject.Parse(File.ReadAllText("kinesis-batchitemfailures-response.json"));
var serialized = JObject.Parse(json);
Assert.True(JToken.DeepEquals(serialized, original), "Serialized object is not the same as the original JSON");
}
}

[Theory]
[InlineData(typeof(JsonSerializer))]
#if NETCOREAPP3_1_OR_GREATER
Expand Down Expand Up @@ -458,7 +481,7 @@ public void DynamoDbUpdateTest(Type serializerType)

[Theory]
[InlineData(typeof(JsonSerializer))]
#if NETCOREAPP3_1_OR_GREATER
#if NETCOREAPP3_1_OR_GREATER
[InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))]
[InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
#endif
Expand All @@ -467,13 +490,13 @@ public void DynamoDbBatchItemFailuresTest(Type serializerType)
var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer;
using (var fileStream = LoadJsonTestFile("dynamodb-batchitemfailures-response.json"))
{
var dynamoDbStreamsEventResponse = serializer.Deserialize<StreamsEventResponse>(fileStream);
var dynamoDbStreamsEventResponse = serializer.Deserialize<DynamoDBEvents.StreamsEventResponse>(fileStream);

Assert.Equal(1, dynamoDbStreamsEventResponse.BatchItemFailures.Count);
Assert.Equal("1405400000000002063282832", dynamoDbStreamsEventResponse.BatchItemFailures[0].ItemIdentifier);

MemoryStream ms = new MemoryStream();
serializer.Serialize<StreamsEventResponse>(dynamoDbStreamsEventResponse, ms);
serializer.Serialize(dynamoDbStreamsEventResponse, ms);
ms.Position = 0;
var json = new StreamReader(ms).ReadToEnd();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
<Content Include="$(MSBuildThisFileDirectory)cognito-presignup-event.json" />
<Content Include="$(MSBuildThisFileDirectory)cognito-event.json" />
<Content Include="$(MSBuildThisFileDirectory)config-event.json" />
<Content Include="$(MSBuildThisFileDirectory)kinesis-batchitemfailures-response.json" />
<Content Include="$(MSBuildThisFileDirectory)dynamodb-batchitemfailures-response.json" />
<Content Include="$(MSBuildThisFileDirectory)dynamodb-event.json" />
<Content Include="$(MSBuildThisFileDirectory)ecs-container-state-change-event.json" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"batchItemFailures": [
{
"itemIdentifier": "1405400000000002063282832"
}
]
}

0 comments on commit 5737503

Please sign in to comment.