diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj b/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj index f904b9ed9..578eda695 100644 --- a/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/Amazon.Lambda.KinesisEvents.csproj @@ -6,7 +6,7 @@ netstandard2.0 Amazon Lambda .NET Core support - KinesisEvents package. Amazon.Lambda.KinesisEvents - 2.0.0 + 2.1.0 Amazon.Lambda.KinesisEvents Amazon.Lambda.KinesisEvents AWS;Amazon;Lambda diff --git a/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs new file mode 100644 index 000000000..8a99851c7 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.KinesisEvents/KinesisTimeWindowEvent.cs @@ -0,0 +1,58 @@ +namespace Amazon.Lambda.KinesisEvents +{ + using System; + using System.Collections.Generic; + + /// + /// AWS Kinesis stream event + /// https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows + /// + public class KinesisTimeWindowEvent : KinesisEvent + { + /// + /// Time window for the records in the event. + /// + public TimeWindow Window { get; set; } + + /// + /// State being built up to this invoke in the time window. + /// + public Dictionary State { get; set; } + + /// + /// Shard Id of the records. + /// + public string ShardId { get; set; } + + /// + /// Kinesis stream or consumer ARN. + /// + public string EventSourceARN { get; set; } + + /// + /// Set to true for the last invoke of the time window. Subsequent invoke will start a new time window along with a fresh state. + /// + public bool? IsFinalInvokeForWindow { get; set; } + + /// + /// Set to true if window is terminated prematurely. Subsequent invoke will continue the same window with a fresh state. + /// + public bool? IsWindowTerminatedEarly { get; set; } + + /// + /// Kinesis event record. + /// + public class TimeWindow + { + /// + /// Window start instant. + /// + public DateTime Start { get; set; } + + /// + /// Window end instant. + /// + public DateTime End { get; set; } + } + } +} diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs index 0f27cf3fa..f7364ce76 100644 --- a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs +++ b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/AbstractLambdaJsonSerializer.cs @@ -142,7 +142,8 @@ protected virtual JsonSerializerOptions CreateDefaultJsonSerializationOptions() new DateTimeConverter(), new MemoryStreamConverter(), new ConstantClassConverter(), - new ByteArrayConverter() + new ByteArrayConverter(), + new LongToStringJsonConverter() } }; diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj index 9987bad9f..457df206a 100644 --- a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj +++ b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj @@ -9,7 +9,7 @@ Amazon.Lambda.Serialization.SystemTextJson Amazon.Lambda.Serialization.SystemTextJson AWS;Amazon;Lambda - 2.3.1 + 2.3.2 README.md diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Converters/LongToStringJsonConverter.cs b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Converters/LongToStringJsonConverter.cs new file mode 100644 index 000000000..fd99a6e73 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/Converters/LongToStringJsonConverter.cs @@ -0,0 +1,35 @@ +using System; +using System.Buffers; +using System.Buffers.Text; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Amazon.Lambda.Serialization.SystemTextJson.Converters +{ + public class LongToStringJsonConverter : JsonConverter + { + public override string Read(ref Utf8JsonReader reader, Type type, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.Number && + type == typeof(String)) + return reader.GetString(); + + var span = reader.HasValueSequence ? reader.ValueSequence.ToArray() : reader.ValueSpan; + + if (Utf8Parser.TryParse(span, out long number, out var bytesConsumed) && span.Length == bytesConsumed) + return number.ToString(); + + var data = reader.GetString(); + + throw new InvalidOperationException($"'{data}' is not a correct expected value!") + { + Source = "LongToStringJsonConverter" + }; + } + + public override void Write(Utf8JsonWriter writer, string value, JsonSerializerOptions options) + { + writer.WriteStringValue(value.ToString()); + } + } +} diff --git a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs index ffe817119..f1c5c0d6a 100644 --- a/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs +++ b/Libraries/src/Amazon.Lambda.Serialization.SystemTextJson/LambdaJsonSerializer.cs @@ -46,6 +46,7 @@ public LambdaJsonSerializer() _options.Converters.Add(new MemoryStreamConverter()); _options.Converters.Add(new ConstantClassConverter()); _options.Converters.Add(new ByteArrayConverter()); + _options.Converters.Add(new LongToStringJsonConverter()); WriterOptions = new JsonWriterOptions() { diff --git a/Libraries/test/EventsTests.Shared/EventTests.cs b/Libraries/test/EventsTests.Shared/EventTests.cs index 95afe5ef6..bdaafb992 100644 --- a/Libraries/test/EventsTests.Shared/EventTests.cs +++ b/Libraries/test/EventsTests.Shared/EventTests.cs @@ -336,6 +336,66 @@ private void Handle(KinesisEvent kinesisEvent) } } + [Theory] + [InlineData(typeof(JsonSerializer))] +#if NETCOREAPP3_1_OR_GREATER + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))] + [InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] +#endif + public void KinesisTimeWindowTest(Type serializerType) + { + var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer; + using (var fileStream = LoadJsonTestFile("kinesis-timewindow-event.json")) + { + var kinesisTimeWindowEvent = serializer.Deserialize(fileStream); + + Assert.Equal(kinesisTimeWindowEvent.ShardId, "shardId-000000000006"); + Assert.Equal(kinesisTimeWindowEvent.EventSourceARN, "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"); + Assert.False(kinesisTimeWindowEvent.IsFinalInvokeForWindow); + Assert.False(kinesisTimeWindowEvent.IsWindowTerminatedEarly); + Assert.Equal(kinesisTimeWindowEvent.State.Count, 2); + Assert.True(kinesisTimeWindowEvent.State.ContainsKey("1")); + Assert.Equal(kinesisTimeWindowEvent.State["1"], "282"); + Assert.True(kinesisTimeWindowEvent.State.ContainsKey("2")); + Assert.Equal(kinesisTimeWindowEvent.State["2"], "715"); + Assert.NotNull(kinesisTimeWindowEvent.Window); + Assert.Equal(637430942400000000, kinesisTimeWindowEvent.Window.Start.Ticks); + Assert.Equal(637430943600000000, kinesisTimeWindowEvent.Window.End.Ticks); + + Assert.Equal(kinesisTimeWindowEvent.Records.Count, 1); + var record = kinesisTimeWindowEvent.Records[0]; + Assert.Equal(record.EventId, "shardId-000000000006:49590338271490256608559692538361571095921575989136588898"); + Assert.Equal(record.EventName, "aws:kinesis:record"); + Assert.Equal(record.EventVersion, "1.0"); + Assert.Equal(record.EventSource, "aws:kinesis"); + Assert.Equal(record.InvokeIdentityArn, "arn:aws:iam::123456789012:role/lambda-kinesis-role"); + Assert.Equal(record.AwsRegion, "us-east-1"); + Assert.Equal(record.EventSourceARN, "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"); + + Assert.Equal(record.Kinesis.KinesisSchemaVersion, "1.0"); + Assert.Equal(record.Kinesis.PartitionKey, "1"); + Assert.Equal(record.Kinesis.SequenceNumber, "49590338271490256608559692538361571095921575989136588898"); + var dataBytes = record.Kinesis.Data.ToArray(); + Assert.Equal(Convert.ToBase64String(dataBytes), "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=="); + Assert.Equal(Encoding.UTF8.GetString(dataBytes), "Hello, this is a test."); + Assert.Equal(637430942750000000, record.Kinesis.ApproximateArrivalTimestamp.ToUniversalTime().Ticks); + + Handle(kinesisTimeWindowEvent); + } + } + + private void Handle(KinesisTimeWindowEvent kinesisTimeWindowEvent) + { + foreach (var record in kinesisTimeWindowEvent.Records) + { + var kinesisRecord = record.Kinesis; + var dataBytes = kinesisRecord.Data.ToArray(); + var dataText = Encoding.UTF8.GetString(dataBytes); + Assert.Equal("Hello, this is a test.", dataText); + Console.WriteLine($"[{record.EventName}] Data = '{dataText}'."); + } + } + [Theory] [InlineData(typeof(JsonSerializer))] #if NETCOREAPP3_1_OR_GREATER diff --git a/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems b/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems index 1da4a7f78..aa67865e7 100644 --- a/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems +++ b/Libraries/test/EventsTests.Shared/EventsTests.Shared.projitems @@ -31,6 +31,7 @@ + diff --git a/Libraries/test/EventsTests.Shared/kinesis-timewindow-event.json b/Libraries/test/EventsTests.Shared/kinesis-timewindow-event.json new file mode 100644 index 000000000..6c14af23a --- /dev/null +++ b/Libraries/test/EventsTests.Shared/kinesis-timewindow-event.json @@ -0,0 +1,32 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", + "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "approximateArrivalTimestamp": 1607497475.000 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", + "awsRegion": "us-east-1", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" + } + ], + "window": { + "start": "2020-12-09T07:04:00Z", + "end": "2020-12-09T07:06:00Z" + }, + "state": { + "1": 282, + "2": 715 + }, + "shardId": "shardId-000000000006", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} \ No newline at end of file