From e4a46ba50c45ff732629a77ca48465a88e3f4c69 Mon Sep 17 00:00:00 2001 From: Ashish Dhingra Date: Tue, 1 Aug 2023 15:46:21 -0700 Subject: [PATCH 1/3] Added support for DynamoDBTimeWindowEvent and KinesisTimeWindowEvent. --- .../Amazon.Lambda.DynamoDBEvents.csproj | 2 +- .../DynamoDBTimeWindowEvent.cs | 58 ++++++ .../DynamoDBTimeWindowResponse.cs | 48 +++++ .../Amazon.Lambda.DynamoDBEvents/README.md | 51 +++++ .../Amazon.Lambda.KinesisEvents.csproj | 2 +- .../KinesisTimeWindowEvent.cs | 58 ++++++ .../KinesisTimeWindowResponse.cs | 48 +++++ .../src/Amazon.Lambda.KinesisEvents/README.md | 51 +++++ .../AbstractLambdaJsonSerializer.cs | 3 +- ...Lambda.Serialization.SystemTextJson.csproj | 2 +- .../Converters/LongToStringJsonConverter.cs | 35 ++++ .../LambdaJsonSerializer.cs | 1 + .../test/EventsTests.Shared/EventTests.cs | 185 ++++++++++++++++++ .../EventsTests.Shared.projitems | 4 + .../dynamodb-timewindow-event.json | 101 ++++++++++ .../dynamodb-timewindow-response.json | 7 + .../kinesis-timewindow-event.json | 32 +++ .../kinesis-timewindow-response.json | 7 + 18 files changed, 691 insertions(+), 4 deletions(-) create mode 100644 Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowEvent.cs create mode 100644 Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowResponse.cs create mode 100644 Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs create mode 100644 Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowResponse.cs create mode 100644 Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Converters/LongToStringJsonConverter.cs create mode 100644 Libraries/test/EventsTests.Shared/dynamodb-timewindow-event.json create mode 100644 Libraries/test/EventsTests.Shared/dynamodb-timewindow-response.json create mode 100644 Libraries/test/EventsTests.Shared/kinesis-timewindow-event.json create mode 100644 Libraries/test/EventsTests.Shared/kinesis-timewindow-response.json diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj b/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj index 7d50d1fe8..bb33c3a40 100644 --- a/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj @@ -6,7 +6,7 @@ netstandard2.0;netcoreapp3.1 Amazon Lambda .NET Core support - DynamoDBEvents package. Amazon.Lambda.DynamoDBEvents - 2.1.1 + 2.2.0 Amazon.Lambda.DynamoDBEvents Amazon.Lambda.DynamoDBEvents AWS;Amazon;Lambda diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowEvent.cs b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowEvent.cs new file mode 100644 index 000000000..321c44092 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowEvent.cs @@ -0,0 +1,58 @@ +namespace Amazon.Lambda.DynamoDBEvents +{ + using System; + using System.Collections.Generic; + + /// + /// Represents an Amazon DynamodDB event when using time windows. + /// https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows + /// + public class DynamoDBTimeWindowEvent : DynamoDBEvent + { + /// + /// Time window for the records in the event. + /// + public TimeWindow Window { get; set; } + + /// + /// State being built up to this invoke in the time window. + /// + public Dictionary State { get; set; } + + /// + /// Shard Id of the records. + /// + public string ShardId { get; set; } + + /// + /// DynamoDB stream Arn. + /// + public string EventSourceArn { get; set; } + + /// + /// Set to true for the last invoke of the time window. Subsequent invoke will start a new time window along with a fresh state. + /// + public bool? IsFinalInvokeForWindow { get; set; } + + /// + /// Set to true if window is terminated prematurely. Subsequent invoke will continue the same window with a fresh state. + /// + public bool? IsWindowTerminatedEarly { get; set; } + + /// + /// Time window for the records in the event. + /// + public class TimeWindow + { + /// + /// Window start instant. + /// + public DateTime Start { get; set; } + + /// + /// Window end instant. + /// + public DateTime End { get; set; } + } + } +} diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowResponse.cs b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowResponse.cs new file mode 100644 index 000000000..d4350a849 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowResponse.cs @@ -0,0 +1,48 @@ +namespace Amazon.Lambda.DynamoDBEvents +{ + using System; + using System.Collections.Generic; + using System.Runtime.Serialization; + + /// + /// Response type to return a new state for the time window and to report batch item failures. + /// + [DataContract] + public class DynamoDBTimeWindowResponse + { + /// + /// New state after processing a batch of records. + /// + [DataMember(Name = "state")] +#if NETCOREAPP_3_1 + [System.Text.Json.Serialization.JsonPropertyName("state")] +#endif + public Dictionary State { get; set; } + + /// + /// A list of records which failed processing. + /// Returning the first record which failed would retry all remaining records from the batch. + /// + [DataMember(Name = "batchItemFailures")] +#if NETCOREAPP_3_1 + [System.Text.Json.Serialization.JsonPropertyName("batchItemFailures")] +#endif + public IList BatchItemFailures { get; set; } + + /// + /// Class representing the individual record which failed processing. + /// + [DataContract] + public class BatchItemFailure + { + /// + /// Sequence number of the record which failed processing. + /// + [DataMember(Name = "itemIdentifier")] +#if NETCOREAPP_3_1 + [System.Text.Json.Serialization.JsonPropertyName("itemIdentifier")] +#endif + public string ItemIdentifier { get; set; } + } + } +} diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/README.md b/Libraries/src/Amazon.Lambda.DynamoDBEvents/README.md index 165af79e0..3088cc7cd 100644 --- a/Libraries/src/Amazon.Lambda.DynamoDBEvents/README.md +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/README.md @@ -56,3 +56,54 @@ public class Function } } ``` + +The following is a sample class and Lambda function that receives Amazon DynamoDB event when using time windows as an input and uses `DynamoDBTimeWindowResponse` object to demonstrate how to aggregate and then process the final state. (Note that by default anything written to Console will be logged as CloudWatch Logs events.) + +```csharp +public class Function +{ + public DynamoDBTimeWindowResponse Handler(DynamoDBTimeWindowEvent ddbTimeWindowEvent) + { + Console.WriteLine($"Incoming event, Source Arn: {ddbTimeWindowEvent.EventSourceArn}, Shard Id: {ddbTimeWindowEvent.ShardId}"); + Console.WriteLine($"Incoming state: {string.Join(';', ddbTimeWindowEvent.State.Select(s => s.Key + "=" + s.Value))}"); + + //Check if this is the end of the window to either aggregate or process. + if (ddbTimeWindowEvent.IsFinalInvokeForWindow.HasValue && ddbTimeWindowEvent.IsFinalInvokeForWindow.Value) + { + // Logic to handle final state of the window + Console.WriteLine("Destination invoke"); + } + else + { + Console.WriteLine("Aggregate invoke"); + } + + //Check for early terminations + if (ddbTimeWindowEvent.IsWindowTerminatedEarly.HasValue && ddbTimeWindowEvent.IsWindowTerminatedEarly.Value) + { + Console.WriteLine("Window terminated early"); + } + + // Aggregation logic + var state = ddbTimeWindowEvent.State; + foreach (var record in ddbTimeWindowEvent.Records) + { + int id; + if (!state.ContainsKey(record.Dynamodb.NewImage["Id"].N) || int.TryParse(record.Dynamodb.NewImage["Id"].N, out id)) + { + state[record.Dynamodb.NewImage["Id"].N] = "1"; + } + else + { + state[record.Dynamodb.NewImage["Id"].N] = (id + 1).ToString(); + } + } + + Console.WriteLine($"Returning state: {string.Join(';', state.Select(s => s.Key + "=" + s.Value))}"); + return new DynamoDBTimeWindowResponse() + { + State = state + }; + } +} +``` diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj b/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj index f904b9ed9..578eda695 100644 --- a/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj @@ -6,7 +6,7 @@ netstandard2.0 Amazon Lambda .NET Core support - KinesisEvents package. Amazon.Lambda.KinesisEvents - 2.0.0 + 2.1.0 Amazon.Lambda.KinesisEvents Amazon.Lambda.KinesisEvents AWS;Amazon;Lambda diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs new file mode 100644 index 000000000..41ce04515 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs @@ -0,0 +1,58 @@ +namespace Amazon.Lambda.KinesisEvents +{ + using System; + using System.Collections.Generic; + + /// + /// Represents an Amazon Kinesis event when using time windows. + /// https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows + /// + public class KinesisTimeWindowEvent : KinesisEvent + { + /// + /// Time window for the records in the event. + /// + public TimeWindow Window { get; set; } + + /// + /// State being built up to this invoke in the time window. + /// + public Dictionary State { get; set; } + + /// + /// Shard Id of the records. + /// + public string ShardId { get; set; } + + /// + /// Kinesis stream or consumer ARN. + /// + public string EventSourceARN { get; set; } + + /// + /// Set to true for the last invoke of the time window. Subsequent invoke will start a new time window along with a fresh state. + /// + public bool? IsFinalInvokeForWindow { get; set; } + + /// + /// Set to true if window is terminated prematurely. Subsequent invoke will continue the same window with a fresh state. + /// + public bool? IsWindowTerminatedEarly { get; set; } + + /// + /// Time window for the records in the event. + /// + public class TimeWindow + { + /// + /// Window start instant. + /// + public DateTime Start { get; set; } + + /// + /// Window end instant. + /// + public DateTime End { get; set; } + } + } +} diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowResponse.cs b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowResponse.cs new file mode 100644 index 000000000..d441878cd --- /dev/null +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowResponse.cs @@ -0,0 +1,48 @@ +namespace Amazon.Lambda.KinesisEvents +{ + using System; + using System.Collections.Generic; + using System.Runtime.Serialization; + + /// + /// Response type to return a new state for the time window and to report batch item failures. + /// + [DataContract] + public class KinesisTimeWindowResponse + { + /// + /// New state after processing a batch of records. + /// + [DataMember(Name = "state")] +#if NETCOREAPP_3_1 + [System.Text.Json.Serialization.JsonPropertyName("state")] +#endif + public Dictionary State { get; set; } + + /// + /// A list of records which failed processing. + /// Returning the first record which failed would retry all remaining records from the batch. + /// + [DataMember(Name = "batchItemFailures")] +#if NETCOREAPP_3_1 + [System.Text.Json.Serialization.JsonPropertyName("batchItemFailures")] +#endif + public IList BatchItemFailures { get; set; } + + /// + /// Class representing the individual record which failed processing. + /// + [DataContract] + public class BatchItemFailure + { + /// + /// Sequence number of the record which failed processing. + /// + [DataMember(Name = "itemIdentifier")] +#if NETCOREAPP_3_1 + [System.Text.Json.Serialization.JsonPropertyName("itemIdentifier")] +#endif + public string ItemIdentifier { get; set; } + } + } +} diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/README.md b/Libraries/src/Amazon.Lambda.KinesisEvents/README.md index 9e2bc9865..4d6c715ba 100644 --- a/Libraries/src/Amazon.Lambda.KinesisEvents/README.md +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/README.md @@ -36,3 +36,54 @@ public class Function } } ``` + +The following is a sample class and Lambda function that receives Amazon Kinesis event when using time windows as an input and uses `KinesisTimeWindowResponse` object to demonstrates how to aggregate and then process the final state. (Note that by default anything written to Console will be logged as CloudWatch Logs events.) + +```csharp +public class Function +{ + public KinesisTimeWindowResponse Handler(KinesisTimeWindowEvent kinesisTimeWindowEvent) + { + Console.WriteLine($"Incoming event, Source Arn: {kinesisTimeWindowEvent.EventSourceARN}, Shard Id: {kinesisTimeWindowEvent.ShardId}"); + Console.WriteLine($"Incoming state: {string.Join(';', kinesisTimeWindowEvent.State.Select(s => s.Key + "=" + s.Value))}"); + + //Check if this is the end of the window to either aggregate or process. + if (kinesisTimeWindowEvent.IsFinalInvokeForWindow.HasValue && kinesisTimeWindowEvent.IsFinalInvokeForWindow.Value) + { + // Logic to handle final state of the window + Console.WriteLine("Destination invoke"); + } + else + { + Console.WriteLine("Aggregate invoke"); + } + + //Check for early terminations + if (kinesisTimeWindowEvent.IsWindowTerminatedEarly.HasValue && kinesisTimeWindowEvent.IsWindowTerminatedEarly.Value) + { + Console.WriteLine("Window terminated early"); + } + + // Aggregation logic + var state = kinesisTimeWindowEvent.State; + foreach (var record in kinesisTimeWindowEvent.Records) + { + int id; + if (!state.ContainsKey(record.Kinesis.PartitionKey) || int.TryParse(record.Kinesis.PartitionKey, out id)) + { + state[record.Kinesis.PartitionKey] = "1"; + } + else + { + state[record.Kinesis.PartitionKey] = (id + 1).ToString(); + } + } + + Console.WriteLine($"Returning state: {string.Join(';', state.Select(s => s.Key + "=" + s.Value))}"); + return new KinesisTimeWindowResponse() + { + State = state + }; + } +} +``` diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs index 0f27cf3fa..f7364ce76 100644 --- a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs +++ b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs @@ -142,7 +142,8 @@ protected virtual JsonSerializerOptions CreateDefaultJsonSerializationOptions() new DateTimeConverter(), new MemoryStreamConverter(), new ConstantClassConverter(), - new ByteArrayConverter() + new ByteArrayConverter(), + new LongToStringJsonConverter() } }; diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj index 9987bad9f..457df206a 100644 --- a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj +++ b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj @@ -9,7 +9,7 @@ Amazon.Lambda.Serialization.SystemTextJson Amazon.Lambda.Serialization.SystemTextJson AWS;Amazon;Lambda - 2.3.1 + 2.3.2 README.md diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Converters/LongToStringJsonConverter.cs b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Converters/LongToStringJsonConverter.cs new file mode 100644 index 000000000..fd99a6e73 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Converters/LongToStringJsonConverter.cs @@ -0,0 +1,35 @@ +using System; +using System.Buffers; +using System.Buffers.Text; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Amazon.Lambda.Serialization.SystemTextJson.Converters +{ + public class LongToStringJsonConverter : JsonConverter + { + public override string Read(ref Utf8JsonReader reader, Type type, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.Number && + type == typeof(String)) + return reader.GetString(); + + var span = reader.HasValueSequence ? reader.ValueSequence.ToArray() : reader.ValueSpan; + + if (Utf8Parser.TryParse(span, out long number, out var bytesConsumed) && span.Length == bytesConsumed) + return number.ToString(); + + var data = reader.GetString(); + + throw new InvalidOperationException($"'{data}' is not a correct expected value!") + { + Source = "LongToStringJsonConverter" + }; + } + + public override void Write(Utf8JsonWriter writer, string value, JsonSerializerOptions options) + { + writer.WriteStringValue(value.ToString()); + } + } +} diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs index ffe817119..f1c5c0d6a 100644 --- a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs +++ b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs @@ -46,6 +46,7 @@ public LambdaJsonSerializer() _options.Converters.Add(new MemoryStreamConverter()); _options.Converters.Add(new ConstantClassConverter()); _options.Converters.Add(new ByteArrayConverter()); + _options.Converters.Add(new LongToStringJsonConverter()); WriterOptions = new JsonWriterOptions() { diff --git a/Libraries/test/EventsTests.Shared/EventTests.cs b/Libraries/test/EventsTests.Shared/EventTests.cs index d1079cb1c..97777caa9 100644 --- a/Libraries/test/EventsTests.Shared/EventTests.cs +++ b/Libraries/test/EventsTests.Shared/EventTests.cs @@ -336,6 +336,88 @@ private void Handle(KinesisEvent kinesisEvent) } } + [Theory] + [InlineData(typeof(JsonSerializer))] +#if NETCOREAPP3_1_OR_GREATER + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] +#endif + public void KinesisTimeWindowTest(Type serializerType) + { + var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; + using (var fileStream = LoadJsonTestFile("kinesis-timewindow-event.json")) + { + var kinesisTimeWindowEvent = serializer.Deserialize(fileStream); + + Assert.Equal(kinesisTimeWindowEvent.ShardId, "shardId-000000000006"); + Assert.Equal(kinesisTimeWindowEvent.EventSourceARN, "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"); + Assert.False(kinesisTimeWindowEvent.IsFinalInvokeForWindow); + Assert.False(kinesisTimeWindowEvent.IsWindowTerminatedEarly); + Assert.Equal(kinesisTimeWindowEvent.State.Count, 2); + Assert.True(kinesisTimeWindowEvent.State.ContainsKey("1")); + Assert.Equal(kinesisTimeWindowEvent.State["1"], "282"); + Assert.True(kinesisTimeWindowEvent.State.ContainsKey("2")); + Assert.Equal(kinesisTimeWindowEvent.State["2"], "715"); + Assert.NotNull(kinesisTimeWindowEvent.Window); + Assert.Equal(637430942400000000, kinesisTimeWindowEvent.Window.Start.Ticks); + Assert.Equal(637430943600000000, kinesisTimeWindowEvent.Window.End.Ticks); + + Assert.Equal(kinesisTimeWindowEvent.Records.Count, 1); + var record = kinesisTimeWindowEvent.Records[0]; + Assert.Equal(record.EventId, "shardId-000000000006:49590338271490256608559692538361571095921575989136588898"); + Assert.Equal(record.EventName, "aws:kinesis:record"); + Assert.Equal(record.EventVersion, "1.0"); + Assert.Equal(record.EventSource, "aws:kinesis"); + Assert.Equal(record.InvokeIdentityArn, "arn:aws:iam::123456789012:role/lambda-kinesis-role"); + Assert.Equal(record.AwsRegion, "us-east-1"); + Assert.Equal(record.EventSourceARN, "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"); + + Assert.Equal(record.Kinesis.KinesisSchemaVersion, "1.0"); + Assert.Equal(record.Kinesis.PartitionKey, "1"); + Assert.Equal(record.Kinesis.SequenceNumber, "49590338271490256608559692538361571095921575989136588898"); + var dataBytes = record.Kinesis.Data.ToArray(); + Assert.Equal(Convert.ToBase64String(dataBytes), "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=="); + Assert.Equal(Encoding.UTF8.GetString(dataBytes), "Hello, this is a test."); + Assert.Equal(637430942750000000, record.Kinesis.ApproximateArrivalTimestamp.ToUniversalTime().Ticks); + + Handle(kinesisTimeWindowEvent); + } + } + + private void Handle(KinesisTimeWindowEvent kinesisTimeWindowEvent) + { + foreach (var record in kinesisTimeWindowEvent.Records) + { + var kinesisRecord = record.Kinesis; + var dataBytes = kinesisRecord.Data.ToArray(); + var dataText = Encoding.UTF8.GetString(dataBytes); + Assert.Equal("Hello, this is a test.", dataText); + Console.WriteLine($"[{record.EventName}] Data = '{dataText}'."); + } + } + + [Theory] + [InlineData(typeof(JsonSerializer))] +#if NETCOREAPP3_1_OR_GREATER + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] +#endif + public void KinesisTimeWindowResponseTest(Type serializerType) + { + var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; + using (var fileStream = LoadJsonTestFile("kinesis-timewindow-response.json")) + { + var kinesisTimeWindowResponse = serializer.Deserialize(fileStream); + + Assert.Equal(kinesisTimeWindowResponse.State.Count, 2); + Assert.True(kinesisTimeWindowResponse.State.ContainsKey("1")); + Assert.Equal(kinesisTimeWindowResponse.State["1"], "282"); + Assert.True(kinesisTimeWindowResponse.State.ContainsKey("2")); + Assert.Equal(kinesisTimeWindowResponse.State["2"], "715"); + Assert.Equal(kinesisTimeWindowResponse.BatchItemFailures.Count, 0); + } + } + [Theory] [InlineData(typeof(JsonSerializer))] #if NETCOREAPP3_1_OR_GREATER @@ -412,6 +494,109 @@ private static void Handle(DynamoDBEvent ddbEvent) Console.WriteLine($"Successfully processed {ddbEvent.Records.Count} records."); } + [Theory] + [InlineData(typeof(JsonSerializer))] +#if NETCOREAPP3_1_OR_GREATER + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] +#endif + public void DynamoDBTimeWindowTest(Type serializerType) + { + var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; + using (var fileStream = LoadJsonTestFile("dynamodb-timewindow-event.json")) + { + var dynamoDBTimeWindowEvent = serializer.Deserialize(fileStream); + + Assert.Equal(dynamoDBTimeWindowEvent.ShardId, "shard123456789"); + Assert.Equal(dynamoDBTimeWindowEvent.EventSourceArn, "stream-ARN"); + Assert.False(dynamoDBTimeWindowEvent.IsFinalInvokeForWindow); + Assert.False(dynamoDBTimeWindowEvent.IsWindowTerminatedEarly); + Assert.Equal(dynamoDBTimeWindowEvent.State.Count, 1); + Assert.True(dynamoDBTimeWindowEvent.State.ContainsKey("1")); + Assert.Equal(dynamoDBTimeWindowEvent.State["1"], "state1"); + Assert.NotNull(dynamoDBTimeWindowEvent.Window); + Assert.Equal(637317252000000000, dynamoDBTimeWindowEvent.Window.Start.Ticks); + Assert.Equal(637317255000000000, dynamoDBTimeWindowEvent.Window.End.Ticks); + + Assert.Equal(dynamoDBTimeWindowEvent.Records.Count, 3); + + var record1 = dynamoDBTimeWindowEvent.Records[0]; + Assert.Equal(record1.EventID, "1"); + Assert.Equal(record1.EventName, "INSERT"); + Assert.Equal(record1.EventVersion, "1.0"); + Assert.Equal(record1.EventSource, "aws:dynamodb"); + Assert.Equal(record1.AwsRegion, "us-east-1"); + Assert.Equal(record1.EventSourceArn, "stream-ARN"); + Assert.Equal(record1.Dynamodb.Keys.Count, 1); + Assert.Equal(record1.Dynamodb.Keys["Id"].N, "101"); + Assert.Equal(record1.Dynamodb.SequenceNumber, "111"); + Assert.Equal(record1.Dynamodb.SizeBytes, 26); + Assert.Equal(record1.Dynamodb.StreamViewType, "NEW_IMAGE"); + Assert.Equal(record1.Dynamodb.NewImage.Count, 2); + Assert.Equal(record1.Dynamodb.NewImage["Message"].S, "New item!"); + Assert.Equal(record1.Dynamodb.NewImage["Id"].N, "101"); + Assert.Equal(record1.Dynamodb.OldImage.Count, 0); + + var record2 = dynamoDBTimeWindowEvent.Records[1]; + Assert.Equal(record2.EventID, "2"); + Assert.Equal(record2.EventName, "MODIFY"); + Assert.Equal(record2.EventVersion, "1.0"); + Assert.Equal(record2.EventSource, "aws:dynamodb"); + Assert.Equal(record2.AwsRegion, "us-east-1"); + Assert.Equal(record2.EventSourceArn, "stream-ARN"); + Assert.Equal(record2.Dynamodb.Keys.Count, 1); + Assert.Equal(record2.Dynamodb.Keys["Id"].N, "101"); + Assert.Equal(record2.Dynamodb.SequenceNumber, "222"); + Assert.Equal(record2.Dynamodb.SizeBytes, 59); + Assert.Equal(record2.Dynamodb.StreamViewType, "NEW_AND_OLD_IMAGES"); + Assert.Equal(record2.Dynamodb.NewImage.Count, 2); + Assert.Equal(record2.Dynamodb.NewImage["Message"].S, "This item has changed"); + Assert.Equal(record2.Dynamodb.NewImage["Id"].N, "101"); + Assert.Equal(record2.Dynamodb.OldImage.Count, 2); + Assert.Equal(record2.Dynamodb.OldImage["Message"].S, "New item!"); + Assert.Equal(record2.Dynamodb.OldImage["Id"].N, "101"); + + var record3 = dynamoDBTimeWindowEvent.Records[2]; + Assert.Equal(record3.EventID, "3"); + Assert.Equal(record3.EventName, "REMOVE"); + Assert.Equal(record3.EventVersion, "1.0"); + Assert.Equal(record3.EventSource, "aws:dynamodb"); + Assert.Equal(record3.AwsRegion, "us-east-1"); + Assert.Equal(record3.EventSourceArn, "stream-ARN"); + Assert.Equal(record3.Dynamodb.Keys.Count, 1); + Assert.Equal(record3.Dynamodb.Keys["Id"].N, "101"); + Assert.Equal(record3.Dynamodb.SequenceNumber, "333"); + Assert.Equal(record3.Dynamodb.SizeBytes, 38); + Assert.Equal(record3.Dynamodb.StreamViewType, "NEW_AND_OLD_IMAGES"); + Assert.Equal(record3.Dynamodb.NewImage.Count, 0); + Assert.Equal(record3.Dynamodb.OldImage.Count, 2); + Assert.Equal(record3.Dynamodb.OldImage["Message"].S, "This item has changed"); + Assert.Equal(record3.Dynamodb.OldImage["Id"].N, "101"); + } + } + + [Theory] + [InlineData(typeof(JsonSerializer))] +#if NETCOREAPP3_1_OR_GREATER + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] +#endif + public void DynamoDBTimeWindowResponseTest(Type serializerType) + { + var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; + using (var fileStream = LoadJsonTestFile("dynamodb-timewindow-response.json")) + { + var dynamoDBTimeWindowResponse = serializer.Deserialize(fileStream); + + Assert.Equal(dynamoDBTimeWindowResponse.State.Count, 2); + Assert.True(dynamoDBTimeWindowResponse.State.ContainsKey("1")); + Assert.Equal(dynamoDBTimeWindowResponse.State["1"], "282"); + Assert.True(dynamoDBTimeWindowResponse.State.ContainsKey("2")); + Assert.Equal(dynamoDBTimeWindowResponse.State["2"], "715"); + Assert.Equal(dynamoDBTimeWindowResponse.BatchItemFailures.Count, 0); + } + } + [Theory] [InlineData(typeof(JsonSerializer))] #if NETCOREAPP_3_1 diff --git a/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems b/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems index 1da4a7f78..12fbe4fba 100644 --- a/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems +++ b/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems @@ -31,6 +31,10 @@ + + + + diff --git a/Libraries/test/EventsTests.Shared/dynamodb-timewindow-event.json b/Libraries/test/EventsTests.Shared/dynamodb-timewindow-event.json new file mode 100644 index 000000000..9c31e19e3 --- /dev/null +++ b/Libraries/test/EventsTests.Shared/dynamodb-timewindow-event.json @@ -0,0 +1,101 @@ +{ + "Records": [ + { + "eventID": "1", + "eventName": "INSERT", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "111", + "SizeBytes": 26, + "StreamViewType": "NEW_IMAGE" + }, + "eventSourceARN": "stream-ARN" + }, + { + "eventID": "2", + "eventName": "MODIFY", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "222", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "stream-ARN" + }, + { + "eventID": "3", + "eventName": "REMOVE", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "333", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "stream-ARN" + } + ], + "window": { + "start": "2020-07-30T17:00:00Z", + "end": "2020-07-30T17:05:00Z" + }, + "state": { + "1": "state1" + }, + "shardId": "shard123456789", + "eventSourceARN": "stream-ARN", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} \ No newline at end of file diff --git a/Libraries/test/EventsTests.Shared/dynamodb-timewindow-response.json b/Libraries/test/EventsTests.Shared/dynamodb-timewindow-response.json new file mode 100644 index 000000000..ae2c46b6e --- /dev/null +++ b/Libraries/test/EventsTests.Shared/dynamodb-timewindow-response.json @@ -0,0 +1,7 @@ +{ + "state": { + "1": 282, + "2": 715 + }, + "batchItemFailures": [] +} \ No newline at end of file diff --git a/Libraries/test/EventsTests.Shared/kinesis-timewindow-event.json b/Libraries/test/EventsTests.Shared/kinesis-timewindow-event.json new file mode 100644 index 000000000..6c14af23a --- /dev/null +++ b/Libraries/test/EventsTests.Shared/kinesis-timewindow-event.json @@ -0,0 +1,32 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", + "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "approximateArrivalTimestamp": 1607497475.000 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", + "awsRegion": "us-east-1", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" + } + ], + "window": { + "start": "2020-12-09T07:04:00Z", + "end": "2020-12-09T07:06:00Z" + }, + "state": { + "1": 282, + "2": 715 + }, + "shardId": "shardId-000000000006", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} \ No newline at end of file diff --git a/Libraries/test/EventsTests.Shared/kinesis-timewindow-response.json b/Libraries/test/EventsTests.Shared/kinesis-timewindow-response.json new file mode 100644 index 000000000..ae2c46b6e --- /dev/null +++ b/Libraries/test/EventsTests.Shared/kinesis-timewindow-response.json @@ -0,0 +1,7 @@ +{ + "state": { + "1": 282, + "2": 715 + }, + "batchItemFailures": [] +} \ No newline at end of file From 9e0be8116579c50277e924150c8c1de0809c8b0c Mon Sep 17 00:00:00 2001 From: Ashish Dhingra Date: Wed, 23 Aug 2023 15:14:32 -0700 Subject: [PATCH 2/3] Implemented custom DictionaryLongToStringJsonConverter (System.Text.Json.Serialization.JsonConverter) for non-NetStandard 2.0 target(s) for handling Kinesis and DynamoDB TimeWindowEvent State property. --- .../Amazon.Lambda.DynamoDBEvents.csproj | 8 +++ .../DictionaryLongToStringJsonConverter.cs | 71 +++++++++++++++++++ .../DynamoDBTimeWindowEvent.cs | 8 +++ .../DynamoDBTimeWindowResponse.cs | 12 +++- .../Amazon.Lambda.KinesisEvents.csproj | 8 +++ .../DictionaryLongToStringJsonConverter.cs | 71 +++++++++++++++++++ .../KinesisTimeWindowEvent.cs | 8 +++ .../KinesisTimeWindowResponse.cs | 12 +++- .../AbstractLambdaJsonSerializer.cs | 3 +- ...Lambda.Serialization.SystemTextJson.csproj | 2 +- .../Converters/LongToStringJsonConverter.cs | 35 --------- .../LambdaJsonSerializer.cs | 1 - 12 files changed, 194 insertions(+), 45 deletions(-) create mode 100644 Libraries/src/Amazon.Lambda.DynamoDBEvents/Converters/DictionaryLongToStringJsonConverter.cs create mode 100644 Libraries/src/Amazon.Lambda.KinesisEvents/Converters/DictionaryLongToStringJsonConverter.cs delete mode 100644 Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Converters/LongToStringJsonConverter.cs diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj b/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj index bb33c3a40..022982798 100644 --- a/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj @@ -16,4 +16,12 @@ + + + + + + + + diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/Converters/DictionaryLongToStringJsonConverter.cs b/Libraries/src/Amazon.Lambda.DynamoDBEvents/Converters/DictionaryLongToStringJsonConverter.cs new file mode 100644 index 000000000..50dc180a0 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/Converters/DictionaryLongToStringJsonConverter.cs @@ -0,0 +1,71 @@ +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Amazon.Lambda.DynamoDBEvents.Converters +{ + public class DictionaryLongToStringJsonConverter : JsonConverter> + { + public override Dictionary Read(ref Utf8JsonReader reader, Type type, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.StartObject) + { + throw new JsonException($"JsonTokenType was of type {reader.TokenType}, only objects are supported."); + } + + var dictionary = new Dictionary(); + + while (reader.Read()) + { + if (reader.TokenType == JsonTokenType.EndObject) + { + return dictionary; + } + + // Get the key. + if (reader.TokenType != JsonTokenType.PropertyName) + { + throw new JsonException("JsonTokenType was not PropertyName."); + } + + var propertyName = reader.GetString(); + + if (string.IsNullOrWhiteSpace(propertyName)) + { + throw new JsonException("Failed to get property name."); + } + + // Get the value. + reader.Read(); + var keyValue = ExtractValue(ref reader, options); + dictionary.Add(propertyName, keyValue); + } + + return dictionary; + } + + public override void Write(Utf8JsonWriter writer, Dictionary value, JsonSerializerOptions options) + { + // Use the built-in serializer, because it can handle dictionaries with string keys. + JsonSerializer.Serialize(writer, value, options); + } + + private string ExtractValue(ref Utf8JsonReader reader, JsonSerializerOptions options) + { + switch (reader.TokenType) + { + case JsonTokenType.Number: + if (reader.TryGetInt64(out var result)) + { + return result.ToString(); + } + throw new JsonException($"Unable to convert '{reader.TokenType}' to long value."); + case JsonTokenType.String: // If it is string, then use as it is. + return reader.GetString(); + default: + throw new JsonException($"'{reader.TokenType}' is not supported."); + } + } + } +} \ No newline at end of file diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowEvent.cs b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowEvent.cs index 321c44092..97e5644e8 100644 --- a/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowEvent.cs +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowEvent.cs @@ -3,6 +3,11 @@ namespace Amazon.Lambda.DynamoDBEvents using System; using System.Collections.Generic; +#if NETCOREAPP3_1_OR_GREATER + using Amazon.Lambda.DynamoDBEvents.Converters; + using System.Text.Json.Serialization; +#endif + /// /// Represents an Amazon DynamodDB event when using time windows. /// https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows @@ -17,6 +22,9 @@ public class DynamoDBTimeWindowEvent : DynamoDBEvent /// /// State being built up to this invoke in the time window. /// +#if NETCOREAPP3_1_OR_GREATER + [JsonConverter(typeof(DictionaryLongToStringJsonConverter))] +#endif public Dictionary State { get; set; } /// diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowResponse.cs b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowResponse.cs index d4350a849..73a14ab7e 100644 --- a/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowResponse.cs +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowResponse.cs @@ -4,6 +4,11 @@ using System.Collections.Generic; using System.Runtime.Serialization; +#if NETCOREAPP3_1_OR_GREATER + using Amazon.Lambda.DynamoDBEvents.Converters; + using System.Text.Json.Serialization; +#endif + /// /// Response type to return a new state for the time window and to report batch item failures. /// @@ -14,8 +19,9 @@ public class DynamoDBTimeWindowResponse /// New state after processing a batch of records. /// [DataMember(Name = "state")] -#if NETCOREAPP_3_1 +#if NETCOREAPP3_1_OR_GREATER [System.Text.Json.Serialization.JsonPropertyName("state")] + [JsonConverter(typeof(DictionaryLongToStringJsonConverter))] #endif public Dictionary State { get; set; } @@ -24,7 +30,7 @@ public class DynamoDBTimeWindowResponse /// Returning the first record which failed would retry all remaining records from the batch. /// [DataMember(Name = "batchItemFailures")] -#if NETCOREAPP_3_1 +#if NETCOREAPP3_1_OR_GREATER [System.Text.Json.Serialization.JsonPropertyName("batchItemFailures")] #endif public IList BatchItemFailures { get; set; } @@ -39,7 +45,7 @@ public class BatchItemFailure /// Sequence number of the record which failed processing. /// [DataMember(Name = "itemIdentifier")] -#if NETCOREAPP_3_1 +#if NETCOREAPP3_1_OR_GREATER [System.Text.Json.Serialization.JsonPropertyName("itemIdentifier")] #endif public string ItemIdentifier { get; set; } diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj b/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj index 578eda695..b59f8b130 100644 --- a/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj @@ -16,4 +16,12 @@ + + + + + + + + diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/Converters/DictionaryLongToStringJsonConverter.cs b/Libraries/src/Amazon.Lambda.KinesisEvents/Converters/DictionaryLongToStringJsonConverter.cs new file mode 100644 index 000000000..39100a90c --- /dev/null +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/Converters/DictionaryLongToStringJsonConverter.cs @@ -0,0 +1,71 @@ +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Amazon.Lambda.KinesisEvents.Converters +{ + public class DictionaryLongToStringJsonConverter : JsonConverter> + { + public override Dictionary Read(ref Utf8JsonReader reader, Type type, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.StartObject) + { + throw new JsonException($"JsonTokenType was of type {reader.TokenType}, only objects are supported."); + } + + var dictionary = new Dictionary(); + + while (reader.Read()) + { + if (reader.TokenType == JsonTokenType.EndObject) + { + return dictionary; + } + + // Get the key. + if (reader.TokenType != JsonTokenType.PropertyName) + { + throw new JsonException("JsonTokenType was not PropertyName."); + } + + var propertyName = reader.GetString(); + + if (string.IsNullOrWhiteSpace(propertyName)) + { + throw new JsonException("Failed to get property name."); + } + + // Get the value. + reader.Read(); + var keyValue = ExtractValue(ref reader, options); + dictionary.Add(propertyName, keyValue); + } + + return dictionary; + } + + public override void Write(Utf8JsonWriter writer, Dictionary value, JsonSerializerOptions options) + { + // Use the built-in serializer, because it can handle dictionaries with string keys. + JsonSerializer.Serialize(writer, value, options); + } + + private string ExtractValue(ref Utf8JsonReader reader, JsonSerializerOptions options) + { + switch (reader.TokenType) + { + case JsonTokenType.Number: + if (reader.TryGetInt64(out var result)) + { + return result.ToString(); + } + throw new JsonException($"Unable to convert '{reader.TokenType}' to long value."); + case JsonTokenType.String: // If it is string, then use as it is. + return reader.GetString(); + default: + throw new JsonException($"'{reader.TokenType}' is not supported."); + } + } + } +} \ No newline at end of file diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs index 41ce04515..d0a55658d 100644 --- a/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs @@ -3,6 +3,11 @@ namespace Amazon.Lambda.KinesisEvents using System; using System.Collections.Generic; +#if NETCOREAPP3_1_OR_GREATER + using Amazon.Lambda.KinesisEvents.Converters; + using System.Text.Json.Serialization; +#endif + /// /// Represents an Amazon Kinesis event when using time windows. /// https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows @@ -17,6 +22,9 @@ public class KinesisTimeWindowEvent : KinesisEvent /// /// State being built up to this invoke in the time window. /// +#if NETCOREAPP3_1_OR_GREATER + [JsonConverter(typeof(DictionaryLongToStringJsonConverter))] +#endif public Dictionary State { get; set; } /// diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowResponse.cs b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowResponse.cs index d441878cd..7454f7ab0 100644 --- a/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowResponse.cs +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowResponse.cs @@ -4,6 +4,11 @@ using System.Collections.Generic; using System.Runtime.Serialization; +#if NETCOREAPP3_1_OR_GREATER + using Amazon.Lambda.KinesisEvents.Converters; + using System.Text.Json.Serialization; +#endif + /// /// Response type to return a new state for the time window and to report batch item failures. /// @@ -14,8 +19,9 @@ public class KinesisTimeWindowResponse /// New state after processing a batch of records. /// [DataMember(Name = "state")] -#if NETCOREAPP_3_1 +#if NETCOREAPP3_1_OR_GREATER [System.Text.Json.Serialization.JsonPropertyName("state")] + [JsonConverter(typeof(DictionaryLongToStringJsonConverter))] #endif public Dictionary State { get; set; } @@ -24,7 +30,7 @@ public class KinesisTimeWindowResponse /// Returning the first record which failed would retry all remaining records from the batch. /// [DataMember(Name = "batchItemFailures")] -#if NETCOREAPP_3_1 +#if NETCOREAPP3_1_OR_GREATER [System.Text.Json.Serialization.JsonPropertyName("batchItemFailures")] #endif public IList BatchItemFailures { get; set; } @@ -39,7 +45,7 @@ public class BatchItemFailure /// Sequence number of the record which failed processing. /// [DataMember(Name = "itemIdentifier")] -#if NETCOREAPP_3_1 +#if NETCOREAPP3_1_OR_GREATER [System.Text.Json.Serialization.JsonPropertyName("itemIdentifier")] #endif public string ItemIdentifier { get; set; } diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs index f7364ce76..0f27cf3fa 100644 --- a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs +++ b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs @@ -142,8 +142,7 @@ protected virtual JsonSerializerOptions CreateDefaultJsonSerializationOptions() new DateTimeConverter(), new MemoryStreamConverter(), new ConstantClassConverter(), - new ByteArrayConverter(), - new LongToStringJsonConverter() + new ByteArrayConverter() } }; diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj index 457df206a..9987bad9f 100644 --- a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj +++ b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj @@ -9,7 +9,7 @@ Amazon.Lambda.Serialization.SystemTextJson Amazon.Lambda.Serialization.SystemTextJson AWS;Amazon;Lambda - 2.3.2 + 2.3.1 README.md diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Converters/LongToStringJsonConverter.cs b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Converters/LongToStringJsonConverter.cs deleted file mode 100644 index fd99a6e73..000000000 --- a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Converters/LongToStringJsonConverter.cs +++ /dev/null @@ -1,35 +0,0 @@ -using System; -using System.Buffers; -using System.Buffers.Text; -using System.Text.Json; -using System.Text.Json.Serialization; - -namespace Amazon.Lambda.Serialization.SystemTextJson.Converters -{ - public class LongToStringJsonConverter : JsonConverter - { - public override string Read(ref Utf8JsonReader reader, Type type, JsonSerializerOptions options) - { - if (reader.TokenType != JsonTokenType.Number && - type == typeof(String)) - return reader.GetString(); - - var span = reader.HasValueSequence ? reader.ValueSequence.ToArray() : reader.ValueSpan; - - if (Utf8Parser.TryParse(span, out long number, out var bytesConsumed) && span.Length == bytesConsumed) - return number.ToString(); - - var data = reader.GetString(); - - throw new InvalidOperationException($"'{data}' is not a correct expected value!") - { - Source = "LongToStringJsonConverter" - }; - } - - public override void Write(Utf8JsonWriter writer, string value, JsonSerializerOptions options) - { - writer.WriteStringValue(value.ToString()); - } - } -} diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs index f1c5c0d6a..ffe817119 100644 --- a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs +++ b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs @@ -46,7 +46,6 @@ public LambdaJsonSerializer() _options.Converters.Add(new MemoryStreamConverter()); _options.Converters.Add(new ConstantClassConverter()); _options.Converters.Add(new ByteArrayConverter()); - _options.Converters.Add(new LongToStringJsonConverter()); WriterOptions = new JsonWriterOptions() { From 2dd31a1c21af2faa1c028d2e840d8fcd854523ab Mon Sep 17 00:00:00 2001 From: Ashish Dhingra Date: Thu, 3 Aug 2023 15:48:11 -0700 Subject: [PATCH 3/3] Added new StreamsEventResponse class for reporting batch item failures when processing streams for KinesisEvent. --- .../Amazon.Lambda.KinesisEvents.csproj | 2 +- .../src/Amazon.Lambda.KinesisEvents/README.md | 34 ++++++++++++++- .../StreamsEventResponse.cs | 38 ++++++++++++++++ .../test/EventsTests.Shared/EventTests.cs | 43 ++++++++++++++----- .../EventsTests.Shared.projitems | 1 + .../kinesis-batchitemfailures-response.json | 7 +++ 6 files changed, 113 insertions(+), 12 deletions(-) create mode 100644 Libraries/src/Amazon.Lambda.KinesisEvents/StreamsEventResponse.cs create mode 100644 Libraries/test/EventsTests.Shared/kinesis-batchitemfailures-response.json diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj b/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj index b59f8b130..f8b28000c 100644 --- a/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj @@ -3,7 +3,7 @@ - netstandard2.0 + netstandard2.0;netcoreapp3.1 Amazon Lambda .NET Core support - KinesisEvents package. Amazon.Lambda.KinesisEvents 2.1.0 diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/README.md b/Libraries/src/Amazon.Lambda.KinesisEvents/README.md index 4d6c715ba..3b7675752 100644 --- a/Libraries/src/Amazon.Lambda.KinesisEvents/README.md +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/README.md @@ -37,7 +37,39 @@ public class Function } ``` -The following is a sample class and Lambda function that receives Amazon Kinesis event when using time windows as an input and uses `KinesisTimeWindowResponse` object to demonstrates how to aggregate and then process the final state. (Note that by default anything written to Console will be logged as CloudWatch Logs events.) +The following is a sample class and Lambda function that receives Amazon Kinesis event record data as an input and uses `StreamsEventResponse` object to return batch item failures, if any. (Note that by default anything written to Console will be logged as CloudWatch Logs events.) + +```csharp +public class Function +{ + public StreamsEventResponse Handler(KinesisEvent kinesisEvent) + { + var batchItemFailures = new List(); + string curRecordSequenceNumber = string.Empty; + + foreach (var record in kinesisEvent.Records) + { + try + { + //Process your record + var kinesisRecord = record.Kinesis; + curRecordSequenceNumber = kinesisRecord.SequenceNumber; + } + catch (Exception e) + { + /* Since we are working with streams, we can return the failed item immediately. + Lambda will immediately begin to retry processing from this failed item onwards. */ + batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = curRecordSequenceNumber }); + return new StreamsEventResponse() { BatchItemFailures = batchItemFailures }; + } + } + + return new StreamsEventResponse() { BatchItemFailures = batchItemFailures }; + } +} +``` + +The following is a sample class and Lambda function that receives Amazon Kinesis event when using time windows as an input and uses `KinesisTimeWindowResponse` object to demonstrate how to aggregate and then process the final state. (Note that by default anything written to Console will be logged as CloudWatch Logs events.) ```csharp public class Function diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/StreamsEventResponse.cs b/Libraries/src/Amazon.Lambda.KinesisEvents/StreamsEventResponse.cs new file mode 100644 index 000000000..390ce7d01 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/StreamsEventResponse.cs @@ -0,0 +1,38 @@ +namespace Amazon.Lambda.KinesisEvents +{ + using System.Collections.Generic; + using System.Runtime.Serialization; + + /// + /// Function response type to report batch item failures for KinesisEvent. + /// https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting + /// + [DataContract] + public class StreamsEventResponse + { + /// + /// A list of records which failed processing. Returning the first record which failed would retry all remaining records from the batch. + /// + [DataMember(Name = "batchItemFailures", EmitDefaultValue = false)] +#if NETCOREAPP3_1 + [System.Text.Json.Serialization.JsonPropertyName("batchItemFailures")] +#endif + public IList BatchItemFailures { get; set; } + + /// + /// The class representing the BatchItemFailure. + /// + [DataContract] + public class BatchItemFailure + { + /// + /// Sequence number of the record which failed processing. + /// + [DataMember(Name = "itemIdentifier", EmitDefaultValue = false)] +#if NETCOREAPP3_1 + [System.Text.Json.Serialization.JsonPropertyName("itemIdentifier")] +#endif + public string ItemIdentifier { get; set; } + } + } +} \ No newline at end of file diff --git a/Libraries/test/EventsTests.Shared/EventTests.cs b/Libraries/test/EventsTests.Shared/EventTests.cs index 97777caa9..42d4dde1a 100644 --- a/Libraries/test/EventsTests.Shared/EventTests.cs +++ b/Libraries/test/EventsTests.Shared/EventTests.cs @@ -5,6 +5,7 @@ namespace Amazon.Lambda.Tests using Amazon.Lambda.ApplicationLoadBalancerEvents; using Amazon.Lambda.CloudWatchEvents.BatchEvents; using Amazon.Lambda.CloudWatchEvents.ECSEvents; + using Amazon.Lambda.CloudWatchEvents.S3Events; using Amazon.Lambda.CloudWatchEvents.ScheduledEvents; using Amazon.Lambda.CloudWatchEvents.TranscribeEvents; using Amazon.Lambda.CloudWatchEvents.TranslateEvents; @@ -19,14 +20,13 @@ namespace Amazon.Lambda.Tests using Amazon.Lambda.KinesisEvents; using Amazon.Lambda.KinesisFirehoseEvents; using Amazon.Lambda.LexEvents; + using Amazon.Lambda.LexV2Events; using Amazon.Lambda.MQEvents; using Amazon.Lambda.S3Events; using Amazon.Lambda.SimpleEmailEvents; using Amazon.Lambda.SNSEvents; using Amazon.Lambda.SQSEvents; - using Amazon.Lambda.LexV2Events; - - + using Newtonsoft.Json; using Newtonsoft.Json.Linq; using Newtonsoft.Json.Serialization; using System; @@ -35,10 +35,6 @@ namespace Amazon.Lambda.Tests using System.Linq; using System.Text; using Xunit; - using Newtonsoft.Json; - using Amazon.Lambda.CloudWatchEvents; - using Amazon.Lambda.CloudWatchEvents.S3Events; - using JsonSerializer = Amazon.Lambda.Serialization.Json.JsonSerializer; public class EventTest @@ -336,6 +332,33 @@ private void Handle(KinesisEvent kinesisEvent) } } + [Theory] + [InlineData(typeof(JsonSerializer))] +#if NETCOREAPP3_1_OR_GREATER + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] +#endif + public void KinesisBatchItemFailuresTest(Type serializerType) + { + var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; + using (var fileStream = LoadJsonTestFile("kinesis-batchitemfailures-response.json")) + { + var kinesisStreamsEventResponse = serializer.Deserialize(fileStream); + + Assert.Equal(1, kinesisStreamsEventResponse.BatchItemFailures.Count); + Assert.Equal("1405400000000002063282832", kinesisStreamsEventResponse.BatchItemFailures[0].ItemIdentifier); + + MemoryStream ms = new MemoryStream(); + serializer.Serialize(kinesisStreamsEventResponse, ms); + ms.Position = 0; + var json = new StreamReader(ms).ReadToEnd(); + + var original = JObject.Parse(File.ReadAllText("kinesis-batchitemfailures-response.json")); + var serialized = JObject.Parse(json); + Assert.True(JToken.DeepEquals(serialized, original), "Serialized object is not the same as the original JSON"); + } + } + [Theory] [InlineData(typeof(JsonSerializer))] #if NETCOREAPP3_1_OR_GREATER @@ -458,7 +481,7 @@ public void DynamoDbUpdateTest(Type serializerType) [Theory] [InlineData(typeof(JsonSerializer))] -#if NETCOREAPP3_1_OR_GREATER +#if NETCOREAPP3_1_OR_GREATER [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] #endif @@ -467,13 +490,13 @@ public void DynamoDbBatchItemFailuresTest(Type serializerType) var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; using (var fileStream = LoadJsonTestFile("dynamodb-batchitemfailures-response.json")) { - var dynamoDbStreamsEventResponse = serializer.Deserialize(fileStream); + var dynamoDbStreamsEventResponse = serializer.Deserialize(fileStream); Assert.Equal(1, dynamoDbStreamsEventResponse.BatchItemFailures.Count); Assert.Equal("1405400000000002063282832", dynamoDbStreamsEventResponse.BatchItemFailures[0].ItemIdentifier); MemoryStream ms = new MemoryStream(); - serializer.Serialize(dynamoDbStreamsEventResponse, ms); + serializer.Serialize(dynamoDbStreamsEventResponse, ms); ms.Position = 0; var json = new StreamReader(ms).ReadToEnd(); diff --git a/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems b/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems index 12fbe4fba..e34d37f62 100644 --- a/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems +++ b/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems @@ -27,6 +27,7 @@ + diff --git a/Libraries/test/EventsTests.Shared/kinesis-batchitemfailures-response.json b/Libraries/test/EventsTests.Shared/kinesis-batchitemfailures-response.json new file mode 100644 index 000000000..e6bcc0333 --- /dev/null +++ b/Libraries/test/EventsTests.Shared/kinesis-batchitemfailures-response.json @@ -0,0 +1,7 @@ +{ + "batchItemFailures": [ + { + "itemIdentifier": "1405400000000002063282832" + } + ] +} \ No newline at end of file