Skip to content

Commit

Permalink
Added support for KinesisTimeWindowEvent.
Browse files Browse the repository at this point in the history
  • Loading branch information
ashishdhingra committed Aug 1, 2023
1 parent 0e443c1 commit 37d9352
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<TargetFramework>netstandard2.0</TargetFramework>
<Description>Amazon Lambda .NET Core support - KinesisEvents package.</Description>
<AssemblyTitle>Amazon.Lambda.KinesisEvents</AssemblyTitle>
<VersionPrefix>2.0.0</VersionPrefix>
<VersionPrefix>2.1.0</VersionPrefix>
<AssemblyName>Amazon.Lambda.KinesisEvents</AssemblyName>
<PackageId>Amazon.Lambda.KinesisEvents</PackageId>
<PackageTags>AWS;Amazon;Lambda</PackageTags>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
namespace Amazon.Lambda.KinesisEvents
{
using System;
using System.Collections.Generic;

/// <summary>
/// AWS Kinesis stream event
/// https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows
/// </summary>
public class KinesisTimeWindowEvent : KinesisEvent
{
/// <summary>
/// Time window for the records in the event.
/// </summary>
public TimeWindow Window { get; set; }

/// <summary>
/// State being built up to this invoke in the time window.
/// </summary>
public Dictionary<string, string> State { get; set; }

/// <summary>
/// Shard Id of the records.
/// </summary>
public string ShardId { get; set; }

/// <summary>
/// Kinesis stream or consumer ARN.
/// </summary>
public string EventSourceARN { get; set; }

/// <summary>
/// Set to true for the last invoke of the time window. Subsequent invoke will start a new time window along with a fresh state.
/// </summary>
public bool? IsFinalInvokeForWindow { get; set; }

/// <summary>
/// Set to true if window is terminated prematurely. Subsequent invoke will continue the same window with a fresh state.
/// </summary>
public bool? IsWindowTerminatedEarly { get; set; }

/// <summary>
/// Kinesis event record.
/// </summary>
public class TimeWindow
{
/// <summary>
/// Window start instant.
/// </summary>
public DateTime Start { get; set; }

/// <summary>
/// Window end instant.
/// </summary>
public DateTime End { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ protected virtual JsonSerializerOptions CreateDefaultJsonSerializationOptions()
new DateTimeConverter(),
new MemoryStreamConverter(),
new ConstantClassConverter(),
new ByteArrayConverter()
new ByteArrayConverter(),
new LongToStringJsonConverter()
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<AssemblyName>Amazon.Lambda.Serialization.SystemTextJson</AssemblyName>
<PackageId>Amazon.Lambda.Serialization.SystemTextJson</PackageId>
<PackageTags>AWS;Amazon;Lambda</PackageTags>
<VersionPrefix>2.3.1</VersionPrefix>
<VersionPrefix>2.3.2</VersionPrefix>
<PackageReadmeFile>README.md</PackageReadmeFile>
</PropertyGroup>
<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System;
using System.Buffers;
using System.Buffers.Text;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace Amazon.Lambda.Serialization.SystemTextJson.Converters
{
public class LongToStringJsonConverter : JsonConverter<string>
{
public override string Read(ref Utf8JsonReader reader, Type type, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.Number &&
type == typeof(String))
return reader.GetString();

var span = reader.HasValueSequence ? reader.ValueSequence.ToArray() : reader.ValueSpan;

if (Utf8Parser.TryParse(span, out long number, out var bytesConsumed) && span.Length == bytesConsumed)
return number.ToString();

var data = reader.GetString();

throw new InvalidOperationException($"'{data}' is not a correct expected value!")
{
Source = "LongToStringJsonConverter"
};
}

public override void Write(Utf8JsonWriter writer, string value, JsonSerializerOptions options)
{
writer.WriteStringValue(value.ToString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public LambdaJsonSerializer()
_options.Converters.Add(new MemoryStreamConverter());
_options.Converters.Add(new ConstantClassConverter());
_options.Converters.Add(new ByteArrayConverter());
_options.Converters.Add(new LongToStringJsonConverter());

WriterOptions = new JsonWriterOptions()
{
Expand Down
60 changes: 60 additions & 0 deletions Libraries/test/EventsTests.Shared/EventTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,66 @@ private void Handle(KinesisEvent kinesisEvent)
}
}

[Theory]
[InlineData(typeof(JsonSerializer))]
#if NETCOREAPP3_1_OR_GREATER
[InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))]
[InlineData(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
#endif
public void KinesisTimeWindowTest(Type serializerType)
{
var serializer = Activator.CreateInstance(serializerType) as ILambdaSerializer;
using (var fileStream = LoadJsonTestFile("kinesis-timewindow-event.json"))
{
var kinesisTimeWindowEvent = serializer.Deserialize<KinesisTimeWindowEvent>(fileStream);

Assert.Equal(kinesisTimeWindowEvent.ShardId, "shardId-000000000006");
Assert.Equal(kinesisTimeWindowEvent.EventSourceARN, "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream");
Assert.False(kinesisTimeWindowEvent.IsFinalInvokeForWindow);
Assert.False(kinesisTimeWindowEvent.IsWindowTerminatedEarly);
Assert.Equal(kinesisTimeWindowEvent.State.Count, 2);
Assert.True(kinesisTimeWindowEvent.State.ContainsKey("1"));
Assert.Equal(kinesisTimeWindowEvent.State["1"], "282");
Assert.True(kinesisTimeWindowEvent.State.ContainsKey("2"));
Assert.Equal(kinesisTimeWindowEvent.State["2"], "715");
Assert.NotNull(kinesisTimeWindowEvent.Window);
Assert.Equal(637430942400000000, kinesisTimeWindowEvent.Window.Start.Ticks);
Assert.Equal(637430943600000000, kinesisTimeWindowEvent.Window.End.Ticks);

Assert.Equal(kinesisTimeWindowEvent.Records.Count, 1);
var record = kinesisTimeWindowEvent.Records[0];
Assert.Equal(record.EventId, "shardId-000000000006:49590338271490256608559692538361571095921575989136588898");
Assert.Equal(record.EventName, "aws:kinesis:record");
Assert.Equal(record.EventVersion, "1.0");
Assert.Equal(record.EventSource, "aws:kinesis");
Assert.Equal(record.InvokeIdentityArn, "arn:aws:iam::123456789012:role/lambda-kinesis-role");
Assert.Equal(record.AwsRegion, "us-east-1");
Assert.Equal(record.EventSourceARN, "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream");

Assert.Equal(record.Kinesis.KinesisSchemaVersion, "1.0");
Assert.Equal(record.Kinesis.PartitionKey, "1");
Assert.Equal(record.Kinesis.SequenceNumber, "49590338271490256608559692538361571095921575989136588898");
var dataBytes = record.Kinesis.Data.ToArray();
Assert.Equal(Convert.ToBase64String(dataBytes), "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==");
Assert.Equal(Encoding.UTF8.GetString(dataBytes), "Hello, this is a test.");
Assert.Equal(637430942750000000, record.Kinesis.ApproximateArrivalTimestamp.ToUniversalTime().Ticks);

Handle(kinesisTimeWindowEvent);
}
}

private void Handle(KinesisTimeWindowEvent kinesisTimeWindowEvent)
{
foreach (var record in kinesisTimeWindowEvent.Records)
{
var kinesisRecord = record.Kinesis;
var dataBytes = kinesisRecord.Data.ToArray();
var dataText = Encoding.UTF8.GetString(dataBytes);
Assert.Equal("Hello, this is a test.", dataText);
Console.WriteLine($"[{record.EventName}] Data = '{dataText}'.");
}
}

[Theory]
[InlineData(typeof(JsonSerializer))]
#if NETCOREAPP3_1_OR_GREATER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<Content Include="$(MSBuildThisFileDirectory)dynamodb-event.json" />
<Content Include="$(MSBuildThisFileDirectory)ecs-container-state-change-event.json" />
<Content Include="$(MSBuildThisFileDirectory)ecs-task-state-change-event.json" />
<Content Include="$(MSBuildThisFileDirectory)kinesis-timewindow-event.json" />
<Content Include="$(MSBuildThisFileDirectory)websocket-api-connect-request.json" />
<Content Include="$(MSBuildThisFileDirectory)custom-authorizer-v2-iam-response.json" />
<Content Include="$(MSBuildThisFileDirectory)custom-authorizer-v2-simple-response.json" />
Expand Down
32 changes: 32 additions & 0 deletions Libraries/test/EventsTests.Shared/kinesis-timewindow-event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1607497475.000
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
"awsRegion": "us-east-1",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"
}
],
"window": {
"start": "2020-12-09T07:04:00Z",
"end": "2020-12-09T07:06:00Z"
},
"state": {
"1": 282,
"2": 715
},
"shardId": "shardId-000000000006",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
"isFinalInvokeForWindow": false,
"isWindowTerminatedEarly": false
}

0 comments on commit 37d9352

Please sign in to comment.