From 72d4956a4f61b2b68db1d184170c841f776cd359 Mon Sep 17 00:00:00 2001 From: Ashish Dhingra Date: Tue, 1 Aug 2023 15:46:21 -0700 Subject: [PATCH] Added support for DynamoDBTimeWindowEvent and KinesisTimeWindowEvent. --- .../Amazon.Lambda.DynamoDBEvents.csproj | 2 +- .../DynamoDBTimeWindowEvent.cs | 58 ++++++ .../DynamoDBTimeWindowResponse.cs | 48 +++++ .../Amazon.Lambda.DynamoDBEvents/README.md | 51 +++++ .../Amazon.Lambda.KinesisEvents.csproj | 2 +- .../KinesisTimeWindowEvent.cs | 58 ++++++ .../KinesisTimeWindowResponse.cs | 48 +++++ .../src/Amazon.Lambda.KinesisEvents/README.md | 51 +++++ .../AbstractLambdaJsonSerializer.cs | 3 +- ...Lambda.Serialization.SystemTextJson.csproj | 2 +- .../Converters/LongToStringJsonConverter.cs | 35 ++++ .../LambdaJsonSerializer.cs | 1 + .../test/EventsTests.Shared/EventTests.cs | 185 ++++++++++++++++++ .../EventsTests.Shared.projitems | 4 + .../dynamodb-timewindow-event.json | 101 ++++++++++ .../dynamodb-timewindow-response.json | 7 + .../kinesis-timewindow-event.json | 32 +++ .../kinesis-timewindow-response.json | 7 + 18 files changed, 691 insertions(+), 4 deletions(-) create mode 100644 Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowEvent.cs create mode 100644 Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowResponse.cs create mode 100644 Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs create mode 100644 Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowResponse.cs create mode 100644 Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Converters/LongToStringJsonConverter.cs create mode 100644 Libraries/test/EventsTests.Shared/dynamodb-timewindow-event.json create mode 100644 Libraries/test/EventsTests.Shared/dynamodb-timewindow-response.json create mode 100644 Libraries/test/EventsTests.Shared/kinesis-timewindow-event.json create mode 100644 Libraries/test/EventsTests.Shared/kinesis-timewindow-response.json diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj b/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj index 7d50d1fe8..bb33c3a40 100644 --- a/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj @@ -6,7 +6,7 @@ netstandard2.0;netcoreapp3.1 Amazon Lambda .NET Core support - DynamoDBEvents package. Amazon.Lambda.DynamoDBEvents - 2.1.1 + 2.2.0 Amazon.Lambda.DynamoDBEvents Amazon.Lambda.DynamoDBEvents AWS;Amazon;Lambda diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowEvent.cs b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowEvent.cs new file mode 100644 index 000000000..321c44092 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowEvent.cs @@ -0,0 +1,58 @@ +namespace Amazon.Lambda.DynamoDBEvents +{ + using System; + using System.Collections.Generic; + + /// + /// Represents an Amazon DynamodDB event when using time windows. + /// https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows + /// + public class DynamoDBTimeWindowEvent : DynamoDBEvent + { + /// + /// Time window for the records in the event. + /// + public TimeWindow Window { get; set; } + + /// + /// State being built up to this invoke in the time window. + /// + public Dictionary State { get; set; } + + /// + /// Shard Id of the records. + /// + public string ShardId { get; set; } + + /// + /// DynamoDB stream Arn. + /// + public string EventSourceArn { get; set; } + + /// + /// Set to true for the last invoke of the time window. Subsequent invoke will start a new time window along with a fresh state. + /// + public bool? IsFinalInvokeForWindow { get; set; } + + /// + /// Set to true if window is terminated prematurely. Subsequent invoke will continue the same window with a fresh state. + /// + public bool? IsWindowTerminatedEarly { get; set; } + + /// + /// Time window for the records in the event. + /// + public class TimeWindow + { + /// + /// Window start instant. + /// + public DateTime Start { get; set; } + + /// + /// Window end instant. + /// + public DateTime End { get; set; } + } + } +} diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowResponse.cs b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowResponse.cs new file mode 100644 index 000000000..d905515d4 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowResponse.cs @@ -0,0 +1,48 @@ +namespace Amazon.Lambda.DynamoDBEvents +{ + using System; + using System.Collections.Generic; + using System.Runtime.Serialization; + + /// + /// Response type to return a new state for the time window and to report batch item failures for DynamoDBTimeWindowResponse. + /// + [DataContract] + public class DynamoDBTimeWindowResponse + { + /// + /// New state after processing a batch of records. + /// + [DataMember(Name = "state")] +#if NETCOREAPP_3_1 + [System.Text.Json.Serialization.JsonPropertyName("state")] +#endif + public Dictionary State { get; set; } + + /// + /// A list of records which failed processing. + /// Returning the first record which failed would retry all remaining records from the batch. + /// + [DataMember(Name = "batchItemFailures")] +#if NETCOREAPP_3_1 + [System.Text.Json.Serialization.JsonPropertyName("batchItemFailures")] +#endif + public IList BatchItemFailures { get; set; } + + /// + /// Class representing the individual record which failed processing. + /// + [DataContract] + public class BatchItemFailure + { + /// + /// Sequence number of the record which failed processing. + /// + [DataMember(Name = "itemIdentifier")] +#if NETCOREAPP_3_1 + [System.Text.Json.Serialization.JsonPropertyName("itemIdentifier")] +#endif + public string ItemIdentifier { get; set; } + } + } +} diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/README.md b/Libraries/src/Amazon.Lambda.DynamoDBEvents/README.md index 165af79e0..1b9654169 100644 --- a/Libraries/src/Amazon.Lambda.DynamoDBEvents/README.md +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/README.md @@ -56,3 +56,54 @@ public class Function } } ``` + +The following is a sample class and Lambda function that receives Amazon DynamoDB event when using time windows as an input and uses `DynamoDBTimeWindowResponse` object to demonstrates how to aggregate and then process the final state. (Note that by default anything written to Console will be logged as CloudWatch Logs events.) + +```csharp +public class Function +{ + public DynamoDBTimeWindowResponse Handler(DynamoDBTimeWindowEvent ddbTimeWindowEvent) + { + Console.WriteLine($"Incoming event, Source Arn: {ddbTimeWindowEvent.EventSourceArn}, Shard Id: {ddbTimeWindowEvent.ShardId}"); + Console.WriteLine($"Incoming state: {string.Join(';', ddbTimeWindowEvent.State.Select(s => s.Key + "=" + s.Value))}"); + + //Check if this is the end of the window to either aggregate or process. + if (ddbTimeWindowEvent.IsFinalInvokeForWindow.HasValue && ddbTimeWindowEvent.IsFinalInvokeForWindow.Value) + { + // Logic to handle final state of the window + Console.WriteLine("Destination invoke"); + } + else + { + Console.WriteLine("Aggregate invoke"); + } + + //Check for early terminations + if (ddbTimeWindowEvent.IsWindowTerminatedEarly.HasValue && ddbTimeWindowEvent.IsWindowTerminatedEarly.Value) + { + Console.WriteLine("Window terminated early"); + } + + // Aggregation logic + var state = ddbTimeWindowEvent.State; + foreach (var record in ddbTimeWindowEvent.Records) + { + int id; + if (!state.ContainsKey(record.Dynamodb.NewImage["Id"].N) || int.TryParse(record.Dynamodb.NewImage["Id"].N, out id)) + { + state[record.Dynamodb.NewImage["Id"].N] = "1"; + } + else + { + state[record.Dynamodb.NewImage["Id"].N] = (id + 1).ToString(); + } + } + + Console.WriteLine($"Returning state: {string.Join(';', state.Select(s => s.Key + "=" + s.Value))}"); + return new DynamoDBTimeWindowResponse() + { + State = state + }; + } +} +``` diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj b/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj index f904b9ed9..578eda695 100644 --- a/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj @@ -6,7 +6,7 @@ netstandard2.0 Amazon Lambda .NET Core support - KinesisEvents package. Amazon.Lambda.KinesisEvents - 2.0.0 + 2.1.0 Amazon.Lambda.KinesisEvents Amazon.Lambda.KinesisEvents AWS;Amazon;Lambda diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs new file mode 100644 index 000000000..41ce04515 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs @@ -0,0 +1,58 @@ +namespace Amazon.Lambda.KinesisEvents +{ + using System; + using System.Collections.Generic; + + /// + /// Represents an Amazon Kinesis event when using time windows. + /// https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows + /// + public class KinesisTimeWindowEvent : KinesisEvent + { + /// + /// Time window for the records in the event. + /// + public TimeWindow Window { get; set; } + + /// + /// State being built up to this invoke in the time window. + /// + public Dictionary State { get; set; } + + /// + /// Shard Id of the records. + /// + public string ShardId { get; set; } + + /// + /// Kinesis stream or consumer ARN. + /// + public string EventSourceARN { get; set; } + + /// + /// Set to true for the last invoke of the time window. Subsequent invoke will start a new time window along with a fresh state. + /// + public bool? IsFinalInvokeForWindow { get; set; } + + /// + /// Set to true if window is terminated prematurely. Subsequent invoke will continue the same window with a fresh state. + /// + public bool? IsWindowTerminatedEarly { get; set; } + + /// + /// Time window for the records in the event. + /// + public class TimeWindow + { + /// + /// Window start instant. + /// + public DateTime Start { get; set; } + + /// + /// Window end instant. + /// + public DateTime End { get; set; } + } + } +} diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowResponse.cs b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowResponse.cs new file mode 100644 index 000000000..7b6da98f7 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowResponse.cs @@ -0,0 +1,48 @@ +namespace Amazon.Lambda.KinesisEvents +{ + using System; + using System.Collections.Generic; + using System.Runtime.Serialization; + + /// + /// Response type to return a new state for the time window and to report batch item failures for KinesisTimeWindowEvent. + /// + [DataContract] + public class KinesisTimeWindowResponse + { + /// + /// New state after processing a batch of records. + /// + [DataMember(Name = "state")] +#if NETCOREAPP_3_1 + [System.Text.Json.Serialization.JsonPropertyName("state")] +#endif + public Dictionary State { get; set; } + + /// + /// A list of records which failed processing. + /// Returning the first record which failed would retry all remaining records from the batch. + /// + [DataMember(Name = "batchItemFailures")] +#if NETCOREAPP_3_1 + [System.Text.Json.Serialization.JsonPropertyName("batchItemFailures")] +#endif + public IList BatchItemFailures { get; set; } + + /// + /// Class representing the individual record which failed processing. + /// + [DataContract] + public class BatchItemFailure + { + /// + /// Sequence number of the record which failed processing. + /// + [DataMember(Name = "itemIdentifier")] +#if NETCOREAPP_3_1 + [System.Text.Json.Serialization.JsonPropertyName("itemIdentifier")] +#endif + public string ItemIdentifier { get; set; } + } + } +} diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/README.md b/Libraries/src/Amazon.Lambda.KinesisEvents/README.md index 9e2bc9865..4d6c715ba 100644 --- a/Libraries/src/Amazon.Lambda.KinesisEvents/README.md +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/README.md @@ -36,3 +36,54 @@ public class Function } } ``` + +The following is a sample class and Lambda function that receives Amazon Kinesis event when using time windows as an input and uses `KinesisTimeWindowResponse` object to demonstrates how to aggregate and then process the final state. (Note that by default anything written to Console will be logged as CloudWatch Logs events.) + +```csharp +public class Function +{ + public KinesisTimeWindowResponse Handler(KinesisTimeWindowEvent kinesisTimeWindowEvent) + { + Console.WriteLine($"Incoming event, Source Arn: {kinesisTimeWindowEvent.EventSourceARN}, Shard Id: {kinesisTimeWindowEvent.ShardId}"); + Console.WriteLine($"Incoming state: {string.Join(';', kinesisTimeWindowEvent.State.Select(s => s.Key + "=" + s.Value))}"); + + //Check if this is the end of the window to either aggregate or process. + if (kinesisTimeWindowEvent.IsFinalInvokeForWindow.HasValue && kinesisTimeWindowEvent.IsFinalInvokeForWindow.Value) + { + // Logic to handle final state of the window + Console.WriteLine("Destination invoke"); + } + else + { + Console.WriteLine("Aggregate invoke"); + } + + //Check for early terminations + if (kinesisTimeWindowEvent.IsWindowTerminatedEarly.HasValue && kinesisTimeWindowEvent.IsWindowTerminatedEarly.Value) + { + Console.WriteLine("Window terminated early"); + } + + // Aggregation logic + var state = kinesisTimeWindowEvent.State; + foreach (var record in kinesisTimeWindowEvent.Records) + { + int id; + if (!state.ContainsKey(record.Kinesis.PartitionKey) || int.TryParse(record.Kinesis.PartitionKey, out id)) + { + state[record.Kinesis.PartitionKey] = "1"; + } + else + { + state[record.Kinesis.PartitionKey] = (id + 1).ToString(); + } + } + + Console.WriteLine($"Returning state: {string.Join(';', state.Select(s => s.Key + "=" + s.Value))}"); + return new KinesisTimeWindowResponse() + { + State = state + }; + } +} +``` diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs index 0f27cf3fa..f7364ce76 100644 --- a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs +++ b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs @@ -142,7 +142,8 @@ protected virtual JsonSerializerOptions CreateDefaultJsonSerializationOptions() new DateTimeConverter(), new MemoryStreamConverter(), new ConstantClassConverter(), - new ByteArrayConverter() + new ByteArrayConverter(), + new LongToStringJsonConverter() } }; diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj index 9987bad9f..457df206a 100644 --- a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj +++ b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj @@ -9,7 +9,7 @@ Amazon.Lambda.Serialization.SystemTextJson Amazon.Lambda.Serialization.SystemTextJson AWS;Amazon;Lambda - 2.3.1 + 2.3.2 README.md diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Converters/LongToStringJsonConverter.cs b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Converters/LongToStringJsonConverter.cs new file mode 100644 index 000000000..fd99a6e73 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Converters/LongToStringJsonConverter.cs @@ -0,0 +1,35 @@ +using System; +using System.Buffers; +using System.Buffers.Text; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Amazon.Lambda.Serialization.SystemTextJson.Converters +{ + public class LongToStringJsonConverter : JsonConverter + { + public override string Read(ref Utf8JsonReader reader, Type type, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.Number && + type == typeof(String)) + return reader.GetString(); + + var span = reader.HasValueSequence ? reader.ValueSequence.ToArray() : reader.ValueSpan; + + if (Utf8Parser.TryParse(span, out long number, out var bytesConsumed) && span.Length == bytesConsumed) + return number.ToString(); + + var data = reader.GetString(); + + throw new InvalidOperationException($"'{data}' is not a correct expected value!") + { + Source = "LongToStringJsonConverter" + }; + } + + public override void Write(Utf8JsonWriter writer, string value, JsonSerializerOptions options) + { + writer.WriteStringValue(value.ToString()); + } + } +} diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs index ffe817119..f1c5c0d6a 100644 --- a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs +++ b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs @@ -46,6 +46,7 @@ public LambdaJsonSerializer() _options.Converters.Add(new MemoryStreamConverter()); _options.Converters.Add(new ConstantClassConverter()); _options.Converters.Add(new ByteArrayConverter()); + _options.Converters.Add(new LongToStringJsonConverter()); WriterOptions = new JsonWriterOptions() { diff --git a/Libraries/test/EventsTests.Shared/EventTests.cs b/Libraries/test/EventsTests.Shared/EventTests.cs index 95afe5ef6..717c4a2af 100644 --- a/Libraries/test/EventsTests.Shared/EventTests.cs +++ b/Libraries/test/EventsTests.Shared/EventTests.cs @@ -336,6 +336,88 @@ private void Handle(KinesisEvent kinesisEvent) } } + [Theory] + [InlineData(typeof(JsonSerializer))] +#if NETCOREAPP3_1_OR_GREATER + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] +#endif + public void KinesisTimeWindowTest(Type serializerType) + { + var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; + using (var fileStream = LoadJsonTestFile("kinesis-timewindow-event.json")) + { + var kinesisTimeWindowEvent = serializer.Deserialize(fileStream); + + Assert.Equal(kinesisTimeWindowEvent.ShardId, "shardId-000000000006"); + Assert.Equal(kinesisTimeWindowEvent.EventSourceARN, "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"); + Assert.False(kinesisTimeWindowEvent.IsFinalInvokeForWindow); + Assert.False(kinesisTimeWindowEvent.IsWindowTerminatedEarly); + Assert.Equal(kinesisTimeWindowEvent.State.Count, 2); + Assert.True(kinesisTimeWindowEvent.State.ContainsKey("1")); + Assert.Equal(kinesisTimeWindowEvent.State["1"], "282"); + Assert.True(kinesisTimeWindowEvent.State.ContainsKey("2")); + Assert.Equal(kinesisTimeWindowEvent.State["2"], "715"); + Assert.NotNull(kinesisTimeWindowEvent.Window); + Assert.Equal(637430942400000000, kinesisTimeWindowEvent.Window.Start.Ticks); + Assert.Equal(637430943600000000, kinesisTimeWindowEvent.Window.End.Ticks); + + Assert.Equal(kinesisTimeWindowEvent.Records.Count, 1); + var record = kinesisTimeWindowEvent.Records[0]; + Assert.Equal(record.EventId, "shardId-000000000006:49590338271490256608559692538361571095921575989136588898"); + Assert.Equal(record.EventName, "aws:kinesis:record"); + Assert.Equal(record.EventVersion, "1.0"); + Assert.Equal(record.EventSource, "aws:kinesis"); + Assert.Equal(record.InvokeIdentityArn, "arn:aws:iam::123456789012:role/lambda-kinesis-role"); + Assert.Equal(record.AwsRegion, "us-east-1"); + Assert.Equal(record.EventSourceARN, "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"); + + Assert.Equal(record.Kinesis.KinesisSchemaVersion, "1.0"); + Assert.Equal(record.Kinesis.PartitionKey, "1"); + Assert.Equal(record.Kinesis.SequenceNumber, "49590338271490256608559692538361571095921575989136588898"); + var dataBytes = record.Kinesis.Data.ToArray(); + Assert.Equal(Convert.ToBase64String(dataBytes), "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=="); + Assert.Equal(Encoding.UTF8.GetString(dataBytes), "Hello, this is a test."); + Assert.Equal(637430942750000000, record.Kinesis.ApproximateArrivalTimestamp.ToUniversalTime().Ticks); + + Handle(kinesisTimeWindowEvent); + } + } + + private void Handle(KinesisTimeWindowEvent kinesisTimeWindowEvent) + { + foreach (var record in kinesisTimeWindowEvent.Records) + { + var kinesisRecord = record.Kinesis; + var dataBytes = kinesisRecord.Data.ToArray(); + var dataText = Encoding.UTF8.GetString(dataBytes); + Assert.Equal("Hello, this is a test.", dataText); + Console.WriteLine($"[{record.EventName}] Data = '{dataText}'."); + } + } + + [Theory] + [InlineData(typeof(JsonSerializer))] +#if NETCOREAPP3_1_OR_GREATER + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] +#endif + public void KinesisTimeWindowResponseTest(Type serializerType) + { + var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; + using (var fileStream = LoadJsonTestFile("kinesis-timewindow-response.json")) + { + var kinesisTimeWindowResponse = serializer.Deserialize(fileStream); + + Assert.Equal(kinesisTimeWindowResponse.State.Count, 2); + Assert.True(kinesisTimeWindowResponse.State.ContainsKey("1")); + Assert.Equal(kinesisTimeWindowResponse.State["1"], "282"); + Assert.True(kinesisTimeWindowResponse.State.ContainsKey("2")); + Assert.Equal(kinesisTimeWindowResponse.State["2"], "715"); + Assert.Equal(kinesisTimeWindowResponse.BatchItemFailures.Count, 0); + } + } + [Theory] [InlineData(typeof(JsonSerializer))] #if NETCOREAPP3_1_OR_GREATER @@ -412,6 +494,109 @@ private static void Handle(DynamoDBEvent ddbEvent) Console.WriteLine($"Successfully processed {ddbEvent.Records.Count} records."); } + [Theory] + [InlineData(typeof(JsonSerializer))] +#if NETCOREAPP3_1_OR_GREATER + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] +#endif + public void DynamoDBTimeWindowTest(Type serializerType) + { + var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; + using (var fileStream = LoadJsonTestFile("dynamodb-timewindow-event.json")) + { + var dynamoDBTimeWindowEvent = serializer.Deserialize(fileStream); + + Assert.Equal(dynamoDBTimeWindowEvent.ShardId, "shard123456789"); + Assert.Equal(dynamoDBTimeWindowEvent.EventSourceArn, "stream-ARN"); + Assert.False(dynamoDBTimeWindowEvent.IsFinalInvokeForWindow); + Assert.False(dynamoDBTimeWindowEvent.IsWindowTerminatedEarly); + Assert.Equal(dynamoDBTimeWindowEvent.State.Count, 1); + Assert.True(dynamoDBTimeWindowEvent.State.ContainsKey("1")); + Assert.Equal(dynamoDBTimeWindowEvent.State["1"], "state1"); + Assert.NotNull(dynamoDBTimeWindowEvent.Window); + Assert.Equal(637317252000000000, dynamoDBTimeWindowEvent.Window.Start.Ticks); + Assert.Equal(637317255000000000, dynamoDBTimeWindowEvent.Window.End.Ticks); + + Assert.Equal(dynamoDBTimeWindowEvent.Records.Count, 3); + + var record1 = dynamoDBTimeWindowEvent.Records[0]; + Assert.Equal(record1.EventID, "1"); + Assert.Equal(record1.EventName, "INSERT"); + Assert.Equal(record1.EventVersion, "1.0"); + Assert.Equal(record1.EventSource, "aws:dynamodb"); + Assert.Equal(record1.AwsRegion, "us-east-1"); + Assert.Equal(record1.EventSourceArn, "stream-ARN"); + Assert.Equal(record1.Dynamodb.Keys.Count, 1); + Assert.Equal(record1.Dynamodb.Keys["Id"].N, "101"); + Assert.Equal(record1.Dynamodb.SequenceNumber, "111"); + Assert.Equal(record1.Dynamodb.SizeBytes, 26); + Assert.Equal(record1.Dynamodb.StreamViewType, "NEW_IMAGE"); + Assert.Equal(record1.Dynamodb.NewImage.Count, 2); + Assert.Equal(record1.Dynamodb.NewImage["Message"].S, "New item!"); + Assert.Equal(record1.Dynamodb.NewImage["Id"].N, "101"); + Assert.Equal(record1.Dynamodb.OldImage.Count, 0); + + var record2 = dynamoDBTimeWindowEvent.Records[1]; + Assert.Equal(record2.EventID, "2"); + Assert.Equal(record2.EventName, "MODIFY"); + Assert.Equal(record2.EventVersion, "1.0"); + Assert.Equal(record2.EventSource, "aws:dynamodb"); + Assert.Equal(record2.AwsRegion, "us-east-1"); + Assert.Equal(record2.EventSourceArn, "stream-ARN"); + Assert.Equal(record2.Dynamodb.Keys.Count, 1); + Assert.Equal(record2.Dynamodb.Keys["Id"].N, "101"); + Assert.Equal(record2.Dynamodb.SequenceNumber, "222"); + Assert.Equal(record2.Dynamodb.SizeBytes, 59); + Assert.Equal(record2.Dynamodb.StreamViewType, "NEW_AND_OLD_IMAGES"); + Assert.Equal(record2.Dynamodb.NewImage.Count, 2); + Assert.Equal(record2.Dynamodb.NewImage["Message"].S, "This item has changed"); + Assert.Equal(record2.Dynamodb.NewImage["Id"].N, "101"); + Assert.Equal(record2.Dynamodb.OldImage.Count, 2); + Assert.Equal(record2.Dynamodb.OldImage["Message"].S, "New item!"); + Assert.Equal(record2.Dynamodb.OldImage["Id"].N, "101"); + + var record3 = dynamoDBTimeWindowEvent.Records[2]; + Assert.Equal(record3.EventID, "3"); + Assert.Equal(record3.EventName, "REMOVE"); + Assert.Equal(record3.EventVersion, "1.0"); + Assert.Equal(record3.EventSource, "aws:dynamodb"); + Assert.Equal(record3.AwsRegion, "us-east-1"); + Assert.Equal(record3.EventSourceArn, "stream-ARN"); + Assert.Equal(record3.Dynamodb.Keys.Count, 1); + Assert.Equal(record3.Dynamodb.Keys["Id"].N, "101"); + Assert.Equal(record3.Dynamodb.SequenceNumber, "333"); + Assert.Equal(record3.Dynamodb.SizeBytes, 38); + Assert.Equal(record3.Dynamodb.StreamViewType, "NEW_AND_OLD_IMAGES"); + Assert.Equal(record3.Dynamodb.NewImage.Count, 0); + Assert.Equal(record3.Dynamodb.OldImage.Count, 2); + Assert.Equal(record3.Dynamodb.OldImage["Message"].S, "This item has changed"); + Assert.Equal(record3.Dynamodb.OldImage["Id"].N, "101"); + } + } + + [Theory] + [InlineData(typeof(JsonSerializer))] +#if NETCOREAPP3_1_OR_GREATER + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] +#endif + public void DynamoDBTimeWindowResponseTest(Type serializerType) + { + var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; + using (var fileStream = LoadJsonTestFile("dynamodb-timewindow-response.json")) + { + var dynamoDBTimeWindowResponse = serializer.Deserialize(fileStream); + + Assert.Equal(dynamoDBTimeWindowResponse.State.Count, 2); + Assert.True(dynamoDBTimeWindowResponse.State.ContainsKey("1")); + Assert.Equal(dynamoDBTimeWindowResponse.State["1"], "282"); + Assert.True(dynamoDBTimeWindowResponse.State.ContainsKey("2")); + Assert.Equal(dynamoDBTimeWindowResponse.State["2"], "715"); + Assert.Equal(dynamoDBTimeWindowResponse.BatchItemFailures.Count, 0); + } + } + [Theory] [InlineData(typeof(JsonSerializer))] #if NETCOREAPP_3_1 diff --git a/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems b/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems index 1da4a7f78..12fbe4fba 100644 --- a/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems +++ b/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems @@ -31,6 +31,10 @@ + + + + diff --git a/Libraries/test/EventsTests.Shared/dynamodb-timewindow-event.json b/Libraries/test/EventsTests.Shared/dynamodb-timewindow-event.json new file mode 100644 index 000000000..9c31e19e3 --- /dev/null +++ b/Libraries/test/EventsTests.Shared/dynamodb-timewindow-event.json @@ -0,0 +1,101 @@ +{ + "Records": [ + { + "eventID": "1", + "eventName": "INSERT", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "111", + "SizeBytes": 26, + "StreamViewType": "NEW_IMAGE" + }, + "eventSourceARN": "stream-ARN" + }, + { + "eventID": "2", + "eventName": "MODIFY", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "222", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "stream-ARN" + }, + { + "eventID": "3", + "eventName": "REMOVE", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "333", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "stream-ARN" + } + ], + "window": { + "start": "2020-07-30T17:00:00Z", + "end": "2020-07-30T17:05:00Z" + }, + "state": { + "1": "state1" + }, + "shardId": "shard123456789", + "eventSourceARN": "stream-ARN", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} \ No newline at end of file diff --git a/Libraries/test/EventsTests.Shared/dynamodb-timewindow-response.json b/Libraries/test/EventsTests.Shared/dynamodb-timewindow-response.json new file mode 100644 index 000000000..ae2c46b6e --- /dev/null +++ b/Libraries/test/EventsTests.Shared/dynamodb-timewindow-response.json @@ -0,0 +1,7 @@ +{ + "state": { + "1": 282, + "2": 715 + }, + "batchItemFailures": [] +} \ No newline at end of file diff --git a/Libraries/test/EventsTests.Shared/kinesis-timewindow-event.json b/Libraries/test/EventsTests.Shared/kinesis-timewindow-event.json new file mode 100644 index 000000000..6c14af23a --- /dev/null +++ b/Libraries/test/EventsTests.Shared/kinesis-timewindow-event.json @@ -0,0 +1,32 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", + "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "approximateArrivalTimestamp": 1607497475.000 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", + "awsRegion": "us-east-1", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" + } + ], + "window": { + "start": "2020-12-09T07:04:00Z", + "end": "2020-12-09T07:06:00Z" + }, + "state": { + "1": 282, + "2": 715 + }, + "shardId": "shardId-000000000006", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} \ No newline at end of file diff --git a/Libraries/test/EventsTests.Shared/kinesis-timewindow-response.json b/Libraries/test/EventsTests.Shared/kinesis-timewindow-response.json new file mode 100644 index 000000000..ae2c46b6e --- /dev/null +++ b/Libraries/test/EventsTests.Shared/kinesis-timewindow-response.json @@ -0,0 +1,7 @@ +{ + "state": { + "1": 282, + "2": 715 + }, + "batchItemFailures": [] +} \ No newline at end of file