diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj b/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj index 7d50d1fe8..022982798 100644 --- a/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/Amazon.Lambda.DynamoDBEvents.csproj @@ -6,7 +6,7 @@ netstandard2.0;netcoreapp3.1 Amazon Lambda .NET Core support - DynamoDBEvents package. Amazon.Lambda.DynamoDBEvents - 2.1.1 + 2.2.0 Amazon.Lambda.DynamoDBEvents Amazon.Lambda.DynamoDBEvents AWS;Amazon;Lambda @@ -16,4 +16,12 @@ + + + + + + + + diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/Converters/DictionaryLongToStringJsonConverter.cs b/Libraries/src/Amazon.Lambda.DynamoDBEvents/Converters/DictionaryLongToStringJsonConverter.cs new file mode 100644 index 000000000..50dc180a0 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/Converters/DictionaryLongToStringJsonConverter.cs @@ -0,0 +1,71 @@ +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Amazon.Lambda.DynamoDBEvents.Converters +{ + public class DictionaryLongToStringJsonConverter : JsonConverter> + { + public override Dictionary Read(ref Utf8JsonReader reader, Type type, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.StartObject) + { + throw new JsonException($"JsonTokenType was of type {reader.TokenType}, only objects are supported."); + } + + var dictionary = new Dictionary(); + + while (reader.Read()) + { + if (reader.TokenType == JsonTokenType.EndObject) + { + return dictionary; + } + + // Get the key. + if (reader.TokenType != JsonTokenType.PropertyName) + { + throw new JsonException("JsonTokenType was not PropertyName."); + } + + var propertyName = reader.GetString(); + + if (string.IsNullOrWhiteSpace(propertyName)) + { + throw new JsonException("Failed to get property name."); + } + + // Get the value. + reader.Read(); + var keyValue = ExtractValue(ref reader, options); + dictionary.Add(propertyName, keyValue); + } + + return dictionary; + } + + public override void Write(Utf8JsonWriter writer, Dictionary value, JsonSerializerOptions options) + { + // Use the built-in serializer, because it can handle dictionaries with string keys. + JsonSerializer.Serialize(writer, value, options); + } + + private string ExtractValue(ref Utf8JsonReader reader, JsonSerializerOptions options) + { + switch (reader.TokenType) + { + case JsonTokenType.Number: + if (reader.TryGetInt64(out var result)) + { + return result.ToString(); + } + throw new JsonException($"Unable to convert '{reader.TokenType}' to long value."); + case JsonTokenType.String: // If it is string, then use as it is. + return reader.GetString(); + default: + throw new JsonException($"'{reader.TokenType}' is not supported."); + } + } + } +} \ No newline at end of file diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowEvent.cs b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowEvent.cs new file mode 100644 index 000000000..97e5644e8 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowEvent.cs @@ -0,0 +1,66 @@ +namespace Amazon.Lambda.DynamoDBEvents +{ + using System; + using System.Collections.Generic; + +#if NETCOREAPP3_1_OR_GREATER + using Amazon.Lambda.DynamoDBEvents.Converters; + using System.Text.Json.Serialization; +#endif + + /// + /// Represents an Amazon DynamodDB event when using time windows. + /// https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows + /// + public class DynamoDBTimeWindowEvent : DynamoDBEvent + { + /// + /// Time window for the records in the event. + /// + public TimeWindow Window { get; set; } + + /// + /// State being built up to this invoke in the time window. + /// +#if NETCOREAPP3_1_OR_GREATER + [JsonConverter(typeof(DictionaryLongToStringJsonConverter))] +#endif + public Dictionary State { get; set; } + + /// + /// Shard Id of the records. + /// + public string ShardId { get; set; } + + /// + /// DynamoDB stream Arn. + /// + public string EventSourceArn { get; set; } + + /// + /// Set to true for the last invoke of the time window. Subsequent invoke will start a new time window along with a fresh state. + /// + public bool? IsFinalInvokeForWindow { get; set; } + + /// + /// Set to true if window is terminated prematurely. Subsequent invoke will continue the same window with a fresh state. + /// + public bool? IsWindowTerminatedEarly { get; set; } + + /// + /// Time window for the records in the event. + /// + public class TimeWindow + { + /// + /// Window start instant. + /// + public DateTime Start { get; set; } + + /// + /// Window end instant. + /// + public DateTime End { get; set; } + } + } +} diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowResponse.cs b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowResponse.cs new file mode 100644 index 000000000..73a14ab7e --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/DynamoDBTimeWindowResponse.cs @@ -0,0 +1,54 @@ +namespace Amazon.Lambda.DynamoDBEvents +{ + using System; + using System.Collections.Generic; + using System.Runtime.Serialization; + +#if NETCOREAPP3_1_OR_GREATER + using Amazon.Lambda.DynamoDBEvents.Converters; + using System.Text.Json.Serialization; +#endif + + /// + /// Response type to return a new state for the time window and to report batch item failures. + /// + [DataContract] + public class DynamoDBTimeWindowResponse + { + /// + /// New state after processing a batch of records. + /// + [DataMember(Name = "state")] +#if NETCOREAPP3_1_OR_GREATER + [System.Text.Json.Serialization.JsonPropertyName("state")] + [JsonConverter(typeof(DictionaryLongToStringJsonConverter))] +#endif + public Dictionary State { get; set; } + + /// + /// A list of records which failed processing. + /// Returning the first record which failed would retry all remaining records from the batch. + /// + [DataMember(Name = "batchItemFailures")] +#if NETCOREAPP3_1_OR_GREATER + [System.Text.Json.Serialization.JsonPropertyName("batchItemFailures")] +#endif + public IList BatchItemFailures { get; set; } + + /// + /// Class representing the individual record which failed processing. + /// + [DataContract] + public class BatchItemFailure + { + /// + /// Sequence number of the record which failed processing. + /// + [DataMember(Name = "itemIdentifier")] +#if NETCOREAPP3_1_OR_GREATER + [System.Text.Json.Serialization.JsonPropertyName("itemIdentifier")] +#endif + public string ItemIdentifier { get; set; } + } + } +} diff --git a/Libraries/src/Amazon.Lambda.DynamoDBEvents/README.md b/Libraries/src/Amazon.Lambda.DynamoDBEvents/README.md index 165af79e0..3088cc7cd 100644 --- a/Libraries/src/Amazon.Lambda.DynamoDBEvents/README.md +++ b/Libraries/src/Amazon.Lambda.DynamoDBEvents/README.md @@ -56,3 +56,54 @@ public class Function } } ``` + +The following is a sample class and Lambda function that receives Amazon DynamoDB event when using time windows as an input and uses `DynamoDBTimeWindowResponse` object to demonstrate how to aggregate and then process the final state. (Note that by default anything written to Console will be logged as CloudWatch Logs events.) + +```csharp +public class Function +{ + public DynamoDBTimeWindowResponse Handler(DynamoDBTimeWindowEvent ddbTimeWindowEvent) + { + Console.WriteLine($"Incoming event, Source Arn: {ddbTimeWindowEvent.EventSourceArn}, Shard Id: {ddbTimeWindowEvent.ShardId}"); + Console.WriteLine($"Incoming state: {string.Join(';', ddbTimeWindowEvent.State.Select(s => s.Key + "=" + s.Value))}"); + + //Check if this is the end of the window to either aggregate or process. + if (ddbTimeWindowEvent.IsFinalInvokeForWindow.HasValue && ddbTimeWindowEvent.IsFinalInvokeForWindow.Value) + { + // Logic to handle final state of the window + Console.WriteLine("Destination invoke"); + } + else + { + Console.WriteLine("Aggregate invoke"); + } + + //Check for early terminations + if (ddbTimeWindowEvent.IsWindowTerminatedEarly.HasValue && ddbTimeWindowEvent.IsWindowTerminatedEarly.Value) + { + Console.WriteLine("Window terminated early"); + } + + // Aggregation logic + var state = ddbTimeWindowEvent.State; + foreach (var record in ddbTimeWindowEvent.Records) + { + int id; + if (!state.ContainsKey(record.Dynamodb.NewImage["Id"].N) || int.TryParse(record.Dynamodb.NewImage["Id"].N, out id)) + { + state[record.Dynamodb.NewImage["Id"].N] = "1"; + } + else + { + state[record.Dynamodb.NewImage["Id"].N] = (id + 1).ToString(); + } + } + + Console.WriteLine($"Returning state: {string.Join(';', state.Select(s => s.Key + "=" + s.Value))}"); + return new DynamoDBTimeWindowResponse() + { + State = state + }; + } +} +``` diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj b/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj index f904b9ed9..f8b28000c 100644 --- a/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj @@ -3,10 +3,10 @@ - netstandard2.0 + netstandard2.0;netcoreapp3.1 Amazon Lambda .NET Core support - KinesisEvents package. Amazon.Lambda.KinesisEvents - 2.0.0 + 2.1.0 Amazon.Lambda.KinesisEvents Amazon.Lambda.KinesisEvents AWS;Amazon;Lambda @@ -16,4 +16,12 @@ + + + + + + + + diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/Converters/DictionaryLongToStringJsonConverter.cs b/Libraries/src/Amazon.Lambda.KinesisEvents/Converters/DictionaryLongToStringJsonConverter.cs new file mode 100644 index 000000000..39100a90c --- /dev/null +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/Converters/DictionaryLongToStringJsonConverter.cs @@ -0,0 +1,71 @@ +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Amazon.Lambda.KinesisEvents.Converters +{ + public class DictionaryLongToStringJsonConverter : JsonConverter> + { + public override Dictionary Read(ref Utf8JsonReader reader, Type type, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.StartObject) + { + throw new JsonException($"JsonTokenType was of type {reader.TokenType}, only objects are supported."); + } + + var dictionary = new Dictionary(); + + while (reader.Read()) + { + if (reader.TokenType == JsonTokenType.EndObject) + { + return dictionary; + } + + // Get the key. + if (reader.TokenType != JsonTokenType.PropertyName) + { + throw new JsonException("JsonTokenType was not PropertyName."); + } + + var propertyName = reader.GetString(); + + if (string.IsNullOrWhiteSpace(propertyName)) + { + throw new JsonException("Failed to get property name."); + } + + // Get the value. + reader.Read(); + var keyValue = ExtractValue(ref reader, options); + dictionary.Add(propertyName, keyValue); + } + + return dictionary; + } + + public override void Write(Utf8JsonWriter writer, Dictionary value, JsonSerializerOptions options) + { + // Use the built-in serializer, because it can handle dictionaries with string keys. + JsonSerializer.Serialize(writer, value, options); + } + + private string ExtractValue(ref Utf8JsonReader reader, JsonSerializerOptions options) + { + switch (reader.TokenType) + { + case JsonTokenType.Number: + if (reader.TryGetInt64(out var result)) + { + return result.ToString(); + } + throw new JsonException($"Unable to convert '{reader.TokenType}' to long value."); + case JsonTokenType.String: // If it is string, then use as it is. + return reader.GetString(); + default: + throw new JsonException($"'{reader.TokenType}' is not supported."); + } + } + } +} \ No newline at end of file diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs new file mode 100644 index 000000000..d0a55658d --- /dev/null +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs @@ -0,0 +1,66 @@ +namespace Amazon.Lambda.KinesisEvents +{ + using System; + using System.Collections.Generic; + +#if NETCOREAPP3_1_OR_GREATER + using Amazon.Lambda.KinesisEvents.Converters; + using System.Text.Json.Serialization; +#endif + + /// + /// Represents an Amazon Kinesis event when using time windows. + /// https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows + /// + public class KinesisTimeWindowEvent : KinesisEvent + { + /// + /// Time window for the records in the event. + /// + public TimeWindow Window { get; set; } + + /// + /// State being built up to this invoke in the time window. + /// +#if NETCOREAPP3_1_OR_GREATER + [JsonConverter(typeof(DictionaryLongToStringJsonConverter))] +#endif + public Dictionary State { get; set; } + + /// + /// Shard Id of the records. + /// + public string ShardId { get; set; } + + /// + /// Kinesis stream or consumer ARN. + /// + public string EventSourceARN { get; set; } + + /// + /// Set to true for the last invoke of the time window. Subsequent invoke will start a new time window along with a fresh state. + /// + public bool? IsFinalInvokeForWindow { get; set; } + + /// + /// Set to true if window is terminated prematurely. Subsequent invoke will continue the same window with a fresh state. + /// + public bool? IsWindowTerminatedEarly { get; set; } + + /// + /// Time window for the records in the event. + /// + public class TimeWindow + { + /// + /// Window start instant. + /// + public DateTime Start { get; set; } + + /// + /// Window end instant. + /// + public DateTime End { get; set; } + } + } +} diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowResponse.cs b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowResponse.cs new file mode 100644 index 000000000..7454f7ab0 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowResponse.cs @@ -0,0 +1,54 @@ +namespace Amazon.Lambda.KinesisEvents +{ + using System; + using System.Collections.Generic; + using System.Runtime.Serialization; + +#if NETCOREAPP3_1_OR_GREATER + using Amazon.Lambda.KinesisEvents.Converters; + using System.Text.Json.Serialization; +#endif + + /// + /// Response type to return a new state for the time window and to report batch item failures. + /// + [DataContract] + public class KinesisTimeWindowResponse + { + /// + /// New state after processing a batch of records. + /// + [DataMember(Name = "state")] +#if NETCOREAPP3_1_OR_GREATER + [System.Text.Json.Serialization.JsonPropertyName("state")] + [JsonConverter(typeof(DictionaryLongToStringJsonConverter))] +#endif + public Dictionary State { get; set; } + + /// + /// A list of records which failed processing. + /// Returning the first record which failed would retry all remaining records from the batch. + /// + [DataMember(Name = "batchItemFailures")] +#if NETCOREAPP3_1_OR_GREATER + [System.Text.Json.Serialization.JsonPropertyName("batchItemFailures")] +#endif + public IList BatchItemFailures { get; set; } + + /// + /// Class representing the individual record which failed processing. + /// + [DataContract] + public class BatchItemFailure + { + /// + /// Sequence number of the record which failed processing. + /// + [DataMember(Name = "itemIdentifier")] +#if NETCOREAPP3_1_OR_GREATER + [System.Text.Json.Serialization.JsonPropertyName("itemIdentifier")] +#endif + public string ItemIdentifier { get; set; } + } + } +} diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/README.md b/Libraries/src/Amazon.Lambda.KinesisEvents/README.md index 9e2bc9865..3b7675752 100644 --- a/Libraries/src/Amazon.Lambda.KinesisEvents/README.md +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/README.md @@ -36,3 +36,86 @@ public class Function } } ``` + +The following is a sample class and Lambda function that receives Amazon Kinesis event record data as an input and uses `StreamsEventResponse` object to return batch item failures, if any. (Note that by default anything written to Console will be logged as CloudWatch Logs events.) + +```csharp +public class Function +{ + public StreamsEventResponse Handler(KinesisEvent kinesisEvent) + { + var batchItemFailures = new List(); + string curRecordSequenceNumber = string.Empty; + + foreach (var record in kinesisEvent.Records) + { + try + { + //Process your record + var kinesisRecord = record.Kinesis; + curRecordSequenceNumber = kinesisRecord.SequenceNumber; + } + catch (Exception e) + { + /* Since we are working with streams, we can return the failed item immediately. + Lambda will immediately begin to retry processing from this failed item onwards. */ + batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = curRecordSequenceNumber }); + return new StreamsEventResponse() { BatchItemFailures = batchItemFailures }; + } + } + + return new StreamsEventResponse() { BatchItemFailures = batchItemFailures }; + } +} +``` + +The following is a sample class and Lambda function that receives Amazon Kinesis event when using time windows as an input and uses `KinesisTimeWindowResponse` object to demonstrate how to aggregate and then process the final state. (Note that by default anything written to Console will be logged as CloudWatch Logs events.) + +```csharp +public class Function +{ + public KinesisTimeWindowResponse Handler(KinesisTimeWindowEvent kinesisTimeWindowEvent) + { + Console.WriteLine($"Incoming event, Source Arn: {kinesisTimeWindowEvent.EventSourceARN}, Shard Id: {kinesisTimeWindowEvent.ShardId}"); + Console.WriteLine($"Incoming state: {string.Join(';', kinesisTimeWindowEvent.State.Select(s => s.Key + "=" + s.Value))}"); + + //Check if this is the end of the window to either aggregate or process. + if (kinesisTimeWindowEvent.IsFinalInvokeForWindow.HasValue && kinesisTimeWindowEvent.IsFinalInvokeForWindow.Value) + { + // Logic to handle final state of the window + Console.WriteLine("Destination invoke"); + } + else + { + Console.WriteLine("Aggregate invoke"); + } + + //Check for early terminations + if (kinesisTimeWindowEvent.IsWindowTerminatedEarly.HasValue && kinesisTimeWindowEvent.IsWindowTerminatedEarly.Value) + { + Console.WriteLine("Window terminated early"); + } + + // Aggregation logic + var state = kinesisTimeWindowEvent.State; + foreach (var record in kinesisTimeWindowEvent.Records) + { + int id; + if (!state.ContainsKey(record.Kinesis.PartitionKey) || int.TryParse(record.Kinesis.PartitionKey, out id)) + { + state[record.Kinesis.PartitionKey] = "1"; + } + else + { + state[record.Kinesis.PartitionKey] = (id + 1).ToString(); + } + } + + Console.WriteLine($"Returning state: {string.Join(';', state.Select(s => s.Key + "=" + s.Value))}"); + return new KinesisTimeWindowResponse() + { + State = state + }; + } +} +``` diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/StreamsEventResponse.cs b/Libraries/src/Amazon.Lambda.KinesisEvents/StreamsEventResponse.cs new file mode 100644 index 000000000..390ce7d01 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/StreamsEventResponse.cs @@ -0,0 +1,38 @@ +namespace Amazon.Lambda.KinesisEvents +{ + using System.Collections.Generic; + using System.Runtime.Serialization; + + /// + /// Function response type to report batch item failures for KinesisEvent. + /// https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting + /// + [DataContract] + public class StreamsEventResponse + { + /// + /// A list of records which failed processing. Returning the first record which failed would retry all remaining records from the batch. + /// + [DataMember(Name = "batchItemFailures", EmitDefaultValue = false)] +#if NETCOREAPP3_1 + [System.Text.Json.Serialization.JsonPropertyName("batchItemFailures")] +#endif + public IList BatchItemFailures { get; set; } + + /// + /// The class representing the BatchItemFailure. + /// + [DataContract] + public class BatchItemFailure + { + /// + /// Sequence number of the record which failed processing. + /// + [DataMember(Name = "itemIdentifier", EmitDefaultValue = false)] +#if NETCOREAPP3_1 + [System.Text.Json.Serialization.JsonPropertyName("itemIdentifier")] +#endif + public string ItemIdentifier { get; set; } + } + } +} \ No newline at end of file diff --git a/Libraries/test/EventsTests.Shared/EventTests.cs b/Libraries/test/EventsTests.Shared/EventTests.cs index d1079cb1c..42d4dde1a 100644 --- a/Libraries/test/EventsTests.Shared/EventTests.cs +++ b/Libraries/test/EventsTests.Shared/EventTests.cs @@ -5,6 +5,7 @@ namespace Amazon.Lambda.Tests using Amazon.Lambda.ApplicationLoadBalancerEvents; using Amazon.Lambda.CloudWatchEvents.BatchEvents; using Amazon.Lambda.CloudWatchEvents.ECSEvents; + using Amazon.Lambda.CloudWatchEvents.S3Events; using Amazon.Lambda.CloudWatchEvents.ScheduledEvents; using Amazon.Lambda.CloudWatchEvents.TranscribeEvents; using Amazon.Lambda.CloudWatchEvents.TranslateEvents; @@ -19,14 +20,13 @@ namespace Amazon.Lambda.Tests using Amazon.Lambda.KinesisEvents; using Amazon.Lambda.KinesisFirehoseEvents; using Amazon.Lambda.LexEvents; + using Amazon.Lambda.LexV2Events; using Amazon.Lambda.MQEvents; using Amazon.Lambda.S3Events; using Amazon.Lambda.SimpleEmailEvents; using Amazon.Lambda.SNSEvents; using Amazon.Lambda.SQSEvents; - using Amazon.Lambda.LexV2Events; - - + using Newtonsoft.Json; using Newtonsoft.Json.Linq; using Newtonsoft.Json.Serialization; using System; @@ -35,10 +35,6 @@ namespace Amazon.Lambda.Tests using System.Linq; using System.Text; using Xunit; - using Newtonsoft.Json; - using Amazon.Lambda.CloudWatchEvents; - using Amazon.Lambda.CloudWatchEvents.S3Events; - using JsonSerializer = Amazon.Lambda.Serialization.Json.JsonSerializer; public class EventTest @@ -336,6 +332,115 @@ private void Handle(KinesisEvent kinesisEvent) } } + [Theory] + [InlineData(typeof(JsonSerializer))] +#if NETCOREAPP3_1_OR_GREATER + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] +#endif + public void KinesisBatchItemFailuresTest(Type serializerType) + { + var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; + using (var fileStream = LoadJsonTestFile("kinesis-batchitemfailures-response.json")) + { + var kinesisStreamsEventResponse = serializer.Deserialize(fileStream); + + Assert.Equal(1, kinesisStreamsEventResponse.BatchItemFailures.Count); + Assert.Equal("1405400000000002063282832", kinesisStreamsEventResponse.BatchItemFailures[0].ItemIdentifier); + + MemoryStream ms = new MemoryStream(); + serializer.Serialize(kinesisStreamsEventResponse, ms); + ms.Position = 0; + var json = new StreamReader(ms).ReadToEnd(); + + var original = JObject.Parse(File.ReadAllText("kinesis-batchitemfailures-response.json")); + var serialized = JObject.Parse(json); + Assert.True(JToken.DeepEquals(serialized, original), "Serialized object is not the same as the original JSON"); + } + } + + [Theory] + [InlineData(typeof(JsonSerializer))] +#if NETCOREAPP3_1_OR_GREATER + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] +#endif + public void KinesisTimeWindowTest(Type serializerType) + { + var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; + using (var fileStream = LoadJsonTestFile("kinesis-timewindow-event.json")) + { + var kinesisTimeWindowEvent = serializer.Deserialize(fileStream); + + Assert.Equal(kinesisTimeWindowEvent.ShardId, "shardId-000000000006"); + Assert.Equal(kinesisTimeWindowEvent.EventSourceARN, "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"); + Assert.False(kinesisTimeWindowEvent.IsFinalInvokeForWindow); + Assert.False(kinesisTimeWindowEvent.IsWindowTerminatedEarly); + Assert.Equal(kinesisTimeWindowEvent.State.Count, 2); + Assert.True(kinesisTimeWindowEvent.State.ContainsKey("1")); + Assert.Equal(kinesisTimeWindowEvent.State["1"], "282"); + Assert.True(kinesisTimeWindowEvent.State.ContainsKey("2")); + Assert.Equal(kinesisTimeWindowEvent.State["2"], "715"); + Assert.NotNull(kinesisTimeWindowEvent.Window); + Assert.Equal(637430942400000000, kinesisTimeWindowEvent.Window.Start.Ticks); + Assert.Equal(637430943600000000, kinesisTimeWindowEvent.Window.End.Ticks); + + Assert.Equal(kinesisTimeWindowEvent.Records.Count, 1); + var record = kinesisTimeWindowEvent.Records[0]; + Assert.Equal(record.EventId, "shardId-000000000006:49590338271490256608559692538361571095921575989136588898"); + Assert.Equal(record.EventName, "aws:kinesis:record"); + Assert.Equal(record.EventVersion, "1.0"); + Assert.Equal(record.EventSource, "aws:kinesis"); + Assert.Equal(record.InvokeIdentityArn, "arn:aws:iam::123456789012:role/lambda-kinesis-role"); + Assert.Equal(record.AwsRegion, "us-east-1"); + Assert.Equal(record.EventSourceARN, "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"); + + Assert.Equal(record.Kinesis.KinesisSchemaVersion, "1.0"); + Assert.Equal(record.Kinesis.PartitionKey, "1"); + Assert.Equal(record.Kinesis.SequenceNumber, "49590338271490256608559692538361571095921575989136588898"); + var dataBytes = record.Kinesis.Data.ToArray(); + Assert.Equal(Convert.ToBase64String(dataBytes), "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=="); + Assert.Equal(Encoding.UTF8.GetString(dataBytes), "Hello, this is a test."); + Assert.Equal(637430942750000000, record.Kinesis.ApproximateArrivalTimestamp.ToUniversalTime().Ticks); + + Handle(kinesisTimeWindowEvent); + } + } + + private void Handle(KinesisTimeWindowEvent kinesisTimeWindowEvent) + { + foreach (var record in kinesisTimeWindowEvent.Records) + { + var kinesisRecord = record.Kinesis; + var dataBytes = kinesisRecord.Data.ToArray(); + var dataText = Encoding.UTF8.GetString(dataBytes); + Assert.Equal("Hello, this is a test.", dataText); + Console.WriteLine($"[{record.EventName}] Data = '{dataText}'."); + } + } + + [Theory] + [InlineData(typeof(JsonSerializer))] +#if NETCOREAPP3_1_OR_GREATER + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] +#endif + public void KinesisTimeWindowResponseTest(Type serializerType) + { + var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; + using (var fileStream = LoadJsonTestFile("kinesis-timewindow-response.json")) + { + var kinesisTimeWindowResponse = serializer.Deserialize(fileStream); + + Assert.Equal(kinesisTimeWindowResponse.State.Count, 2); + Assert.True(kinesisTimeWindowResponse.State.ContainsKey("1")); + Assert.Equal(kinesisTimeWindowResponse.State["1"], "282"); + Assert.True(kinesisTimeWindowResponse.State.ContainsKey("2")); + Assert.Equal(kinesisTimeWindowResponse.State["2"], "715"); + Assert.Equal(kinesisTimeWindowResponse.BatchItemFailures.Count, 0); + } + } + [Theory] [InlineData(typeof(JsonSerializer))] #if NETCOREAPP3_1_OR_GREATER @@ -376,7 +481,7 @@ public void DynamoDbUpdateTest(Type serializerType) [Theory] [InlineData(typeof(JsonSerializer))] -#if NETCOREAPP3_1_OR_GREATER +#if NETCOREAPP3_1_OR_GREATER [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] #endif @@ -385,13 +490,13 @@ public void DynamoDbBatchItemFailuresTest(Type serializerType) var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; using (var fileStream = LoadJsonTestFile("dynamodb-batchitemfailures-response.json")) { - var dynamoDbStreamsEventResponse = serializer.Deserialize(fileStream); + var dynamoDbStreamsEventResponse = serializer.Deserialize(fileStream); Assert.Equal(1, dynamoDbStreamsEventResponse.BatchItemFailures.Count); Assert.Equal("1405400000000002063282832", dynamoDbStreamsEventResponse.BatchItemFailures[0].ItemIdentifier); MemoryStream ms = new MemoryStream(); - serializer.Serialize(dynamoDbStreamsEventResponse, ms); + serializer.Serialize(dynamoDbStreamsEventResponse, ms); ms.Position = 0; var json = new StreamReader(ms).ReadToEnd(); @@ -412,6 +517,109 @@ private static void Handle(DynamoDBEvent ddbEvent) Console.WriteLine($"Successfully processed {ddbEvent.Records.Count} records."); } + [Theory] + [InlineData(typeof(JsonSerializer))] +#if NETCOREAPP3_1_OR_GREATER + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] +#endif + public void DynamoDBTimeWindowTest(Type serializerType) + { + var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; + using (var fileStream = LoadJsonTestFile("dynamodb-timewindow-event.json")) + { + var dynamoDBTimeWindowEvent = serializer.Deserialize(fileStream); + + Assert.Equal(dynamoDBTimeWindowEvent.ShardId, "shard123456789"); + Assert.Equal(dynamoDBTimeWindowEvent.EventSourceArn, "stream-ARN"); + Assert.False(dynamoDBTimeWindowEvent.IsFinalInvokeForWindow); + Assert.False(dynamoDBTimeWindowEvent.IsWindowTerminatedEarly); + Assert.Equal(dynamoDBTimeWindowEvent.State.Count, 1); + Assert.True(dynamoDBTimeWindowEvent.State.ContainsKey("1")); + Assert.Equal(dynamoDBTimeWindowEvent.State["1"], "state1"); + Assert.NotNull(dynamoDBTimeWindowEvent.Window); + Assert.Equal(637317252000000000, dynamoDBTimeWindowEvent.Window.Start.Ticks); + Assert.Equal(637317255000000000, dynamoDBTimeWindowEvent.Window.End.Ticks); + + Assert.Equal(dynamoDBTimeWindowEvent.Records.Count, 3); + + var record1 = dynamoDBTimeWindowEvent.Records[0]; + Assert.Equal(record1.EventID, "1"); + Assert.Equal(record1.EventName, "INSERT"); + Assert.Equal(record1.EventVersion, "1.0"); + Assert.Equal(record1.EventSource, "aws:dynamodb"); + Assert.Equal(record1.AwsRegion, "us-east-1"); + Assert.Equal(record1.EventSourceArn, "stream-ARN"); + Assert.Equal(record1.Dynamodb.Keys.Count, 1); + Assert.Equal(record1.Dynamodb.Keys["Id"].N, "101"); + Assert.Equal(record1.Dynamodb.SequenceNumber, "111"); + Assert.Equal(record1.Dynamodb.SizeBytes, 26); + Assert.Equal(record1.Dynamodb.StreamViewType, "NEW_IMAGE"); + Assert.Equal(record1.Dynamodb.NewImage.Count, 2); + Assert.Equal(record1.Dynamodb.NewImage["Message"].S, "New item!"); + Assert.Equal(record1.Dynamodb.NewImage["Id"].N, "101"); + Assert.Equal(record1.Dynamodb.OldImage.Count, 0); + + var record2 = dynamoDBTimeWindowEvent.Records[1]; + Assert.Equal(record2.EventID, "2"); + Assert.Equal(record2.EventName, "MODIFY"); + Assert.Equal(record2.EventVersion, "1.0"); + Assert.Equal(record2.EventSource, "aws:dynamodb"); + Assert.Equal(record2.AwsRegion, "us-east-1"); + Assert.Equal(record2.EventSourceArn, "stream-ARN"); + Assert.Equal(record2.Dynamodb.Keys.Count, 1); + Assert.Equal(record2.Dynamodb.Keys["Id"].N, "101"); + Assert.Equal(record2.Dynamodb.SequenceNumber, "222"); + Assert.Equal(record2.Dynamodb.SizeBytes, 59); + Assert.Equal(record2.Dynamodb.StreamViewType, "NEW_AND_OLD_IMAGES"); + Assert.Equal(record2.Dynamodb.NewImage.Count, 2); + Assert.Equal(record2.Dynamodb.NewImage["Message"].S, "This item has changed"); + Assert.Equal(record2.Dynamodb.NewImage["Id"].N, "101"); + Assert.Equal(record2.Dynamodb.OldImage.Count, 2); + Assert.Equal(record2.Dynamodb.OldImage["Message"].S, "New item!"); + Assert.Equal(record2.Dynamodb.OldImage["Id"].N, "101"); + + var record3 = dynamoDBTimeWindowEvent.Records[2]; + Assert.Equal(record3.EventID, "3"); + Assert.Equal(record3.EventName, "REMOVE"); + Assert.Equal(record3.EventVersion, "1.0"); + Assert.Equal(record3.EventSource, "aws:dynamodb"); + Assert.Equal(record3.AwsRegion, "us-east-1"); + Assert.Equal(record3.EventSourceArn, "stream-ARN"); + Assert.Equal(record3.Dynamodb.Keys.Count, 1); + Assert.Equal(record3.Dynamodb.Keys["Id"].N, "101"); + Assert.Equal(record3.Dynamodb.SequenceNumber, "333"); + Assert.Equal(record3.Dynamodb.SizeBytes, 38); + Assert.Equal(record3.Dynamodb.StreamViewType, "NEW_AND_OLD_IMAGES"); + Assert.Equal(record3.Dynamodb.NewImage.Count, 0); + Assert.Equal(record3.Dynamodb.OldImage.Count, 2); + Assert.Equal(record3.Dynamodb.OldImage["Message"].S, "This item has changed"); + Assert.Equal(record3.Dynamodb.OldImage["Id"].N, "101"); + } + } + + [Theory] + [InlineData(typeof(JsonSerializer))] +#if NETCOREAPP3_1_OR_GREATER + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] +#endif + public void DynamoDBTimeWindowResponseTest(Type serializerType) + { + var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; + using (var fileStream = LoadJsonTestFile("dynamodb-timewindow-response.json")) + { + var dynamoDBTimeWindowResponse = serializer.Deserialize(fileStream); + + Assert.Equal(dynamoDBTimeWindowResponse.State.Count, 2); + Assert.True(dynamoDBTimeWindowResponse.State.ContainsKey("1")); + Assert.Equal(dynamoDBTimeWindowResponse.State["1"], "282"); + Assert.True(dynamoDBTimeWindowResponse.State.ContainsKey("2")); + Assert.Equal(dynamoDBTimeWindowResponse.State["2"], "715"); + Assert.Equal(dynamoDBTimeWindowResponse.BatchItemFailures.Count, 0); + } + } + [Theory] [InlineData(typeof(JsonSerializer))] #if NETCOREAPP_3_1 diff --git a/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems b/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems index 1da4a7f78..e34d37f62 100644 --- a/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems +++ b/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems @@ -27,10 +27,15 @@ + + + + + diff --git a/Libraries/test/EventsTests.Shared/dynamodb-timewindow-event.json b/Libraries/test/EventsTests.Shared/dynamodb-timewindow-event.json new file mode 100644 index 000000000..9c31e19e3 --- /dev/null +++ b/Libraries/test/EventsTests.Shared/dynamodb-timewindow-event.json @@ -0,0 +1,101 @@ +{ + "Records": [ + { + "eventID": "1", + "eventName": "INSERT", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "111", + "SizeBytes": 26, + "StreamViewType": "NEW_IMAGE" + }, + "eventSourceARN": "stream-ARN" + }, + { + "eventID": "2", + "eventName": "MODIFY", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "222", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "stream-ARN" + }, + { + "eventID": "3", + "eventName": "REMOVE", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "333", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "stream-ARN" + } + ], + "window": { + "start": "2020-07-30T17:00:00Z", + "end": "2020-07-30T17:05:00Z" + }, + "state": { + "1": "state1" + }, + "shardId": "shard123456789", + "eventSourceARN": "stream-ARN", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} \ No newline at end of file diff --git a/Libraries/test/EventsTests.Shared/dynamodb-timewindow-response.json b/Libraries/test/EventsTests.Shared/dynamodb-timewindow-response.json new file mode 100644 index 000000000..ae2c46b6e --- /dev/null +++ b/Libraries/test/EventsTests.Shared/dynamodb-timewindow-response.json @@ -0,0 +1,7 @@ +{ + "state": { + "1": 282, + "2": 715 + }, + "batchItemFailures": [] +} \ No newline at end of file diff --git a/Libraries/test/EventsTests.Shared/kinesis-batchitemfailures-response.json b/Libraries/test/EventsTests.Shared/kinesis-batchitemfailures-response.json new file mode 100644 index 000000000..e6bcc0333 --- /dev/null +++ b/Libraries/test/EventsTests.Shared/kinesis-batchitemfailures-response.json @@ -0,0 +1,7 @@ +{ + "batchItemFailures": [ + { + "itemIdentifier": "1405400000000002063282832" + } + ] +} \ No newline at end of file diff --git a/Libraries/test/EventsTests.Shared/kinesis-timewindow-event.json b/Libraries/test/EventsTests.Shared/kinesis-timewindow-event.json new file mode 100644 index 000000000..6c14af23a --- /dev/null +++ b/Libraries/test/EventsTests.Shared/kinesis-timewindow-event.json @@ -0,0 +1,32 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", + "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "approximateArrivalTimestamp": 1607497475.000 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", + "awsRegion": "us-east-1", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" + } + ], + "window": { + "start": "2020-12-09T07:04:00Z", + "end": "2020-12-09T07:06:00Z" + }, + "state": { + "1": 282, + "2": 715 + }, + "shardId": "shardId-000000000006", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} \ No newline at end of file diff --git a/Libraries/test/EventsTests.Shared/kinesis-timewindow-response.json b/Libraries/test/EventsTests.Shared/kinesis-timewindow-response.json new file mode 100644 index 000000000..ae2c46b6e --- /dev/null +++ b/Libraries/test/EventsTests.Shared/kinesis-timewindow-response.json @@ -0,0 +1,7 @@ +{ + "state": { + "1": 282, + "2": 715 + }, + "batchItemFailures": [] +} \ No newline at end of file