Skip to content

Commit

Permalink
Added support for DynamoDBTimeWindowEvent and KinesisTimeWindowEvent.
Browse files Browse the repository at this point in the history
  • Loading branch information
ashishdhingra committed Aug 2, 2023
1 parent 0e443c1 commit 72d4956
Show file tree
Hide file tree
Showing 18 changed files with 691 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<TargetFrameworks>netstandard2.0;netcoreapp3.1</TargetFrameworks>
<Description>Amazon Lambda .NET Core support - DynamoDBEvents package.</Description>
<AssemblyTitle>Amazon.Lambda.DynamoDBEvents</AssemblyTitle>
<VersionPrefix>2.1.1</VersionPrefix>
<VersionPrefix>2.2.0</VersionPrefix>
<AssemblyName>Amazon.Lambda.DynamoDBEvents</AssemblyName>
<PackageId>Amazon.Lambda.DynamoDBEvents</PackageId>
<PackageTags>AWS;Amazon;Lambda</PackageTags>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
namespace Amazon.Lambda.DynamoDBEvents
{
using System;
using System.Collections.Generic;

/// <summary>
/// Represents an Amazon DynamodDB event when using time windows.
/// https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
/// </summary>
public class DynamoDBTimeWindowEvent : DynamoDBEvent
{
/// <summary>
/// Time window for the records in the event.
/// </summary>
public TimeWindow Window { get; set; }

/// <summary>
/// State being built up to this invoke in the time window.
/// </summary>
public Dictionary<string, string> State { get; set; }

/// <summary>
/// Shard Id of the records.
/// </summary>
public string ShardId { get; set; }

/// <summary>
/// DynamoDB stream Arn.
/// </summary>
public string EventSourceArn { get; set; }

/// <summary>
/// Set to true for the last invoke of the time window. Subsequent invoke will start a new time window along with a fresh state.
/// </summary>
public bool? IsFinalInvokeForWindow { get; set; }

/// <summary>
/// Set to true if window is terminated prematurely. Subsequent invoke will continue the same window with a fresh state.
/// </summary>
public bool? IsWindowTerminatedEarly { get; set; }

/// <summary>
/// Time window for the records in the event.
/// </summary>
public class TimeWindow
{
/// <summary>
/// Window start instant.
/// </summary>
public DateTime Start { get; set; }

/// <summary>
/// Window end instant.
/// </summary>
public DateTime End { get; set; }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
namespace Amazon.Lambda.DynamoDBEvents
{
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;

/// <summary>
/// Response type to return a new state for the time window and to report batch item failures for DynamoDBTimeWindowResponse.
/// </summary>
[DataContract]
public class DynamoDBTimeWindowResponse
{
/// <summary>
/// New state after processing a batch of records.
/// </summary>
[DataMember(Name = "state")]
#if NETCOREAPP_3_1
[System.Text.Json.Serialization.JsonPropertyName("state")]
#endif
public Dictionary<String, String> State { get; set; }

/// <summary>
/// A list of records which failed processing.
/// Returning the first record which failed would retry all remaining records from the batch.
/// </summary>
[DataMember(Name = "batchItemFailures")]
#if NETCOREAPP_3_1
[System.Text.Json.Serialization.JsonPropertyName("batchItemFailures")]
#endif
public IList<BatchItemFailure> BatchItemFailures { get; set; }

/// <summary>
/// Class representing the individual record which failed processing.
/// </summary>
[DataContract]
public class BatchItemFailure
{
/// <summary>
/// Sequence number of the record which failed processing.
/// </summary>
[DataMember(Name = "itemIdentifier")]
#if NETCOREAPP_3_1
[System.Text.Json.Serialization.JsonPropertyName("itemIdentifier")]
#endif
public string ItemIdentifier { get; set; }
}
}
}
51 changes: 51 additions & 0 deletions Libraries/src/Amazon.Lambda.DynamoDBEvents/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,54 @@ public class Function
}
}
```

The following is a sample class and Lambda function that receives Amazon DynamoDB event when using time windows as an input and uses `DynamoDBTimeWindowResponse` object to demonstrates how to aggregate and then process the final state. (Note that by default anything written to Console will be logged as CloudWatch Logs events.)

```csharp
public class Function
{
public DynamoDBTimeWindowResponse Handler(DynamoDBTimeWindowEvent ddbTimeWindowEvent)
{
Console.WriteLine($"Incoming event, Source Arn: {ddbTimeWindowEvent.EventSourceArn}, Shard Id: {ddbTimeWindowEvent.ShardId}");
Console.WriteLine($"Incoming state: {string.Join(';', ddbTimeWindowEvent.State.Select(s => s.Key + "=" + s.Value))}");

//Check if this is the end of the window to either aggregate or process.
if (ddbTimeWindowEvent.IsFinalInvokeForWindow.HasValue && ddbTimeWindowEvent.IsFinalInvokeForWindow.Value)
{
// Logic to handle final state of the window
Console.WriteLine("Destination invoke");
}
else
{
Console.WriteLine("Aggregate invoke");
}

//Check for early terminations
if (ddbTimeWindowEvent.IsWindowTerminatedEarly.HasValue && ddbTimeWindowEvent.IsWindowTerminatedEarly.Value)
{
Console.WriteLine("Window terminated early");
}

// Aggregation logic
var state = ddbTimeWindowEvent.State;
foreach (var record in ddbTimeWindowEvent.Records)
{
int id;
if (!state.ContainsKey(record.Dynamodb.NewImage["Id"].N) || int.TryParse(record.Dynamodb.NewImage["Id"].N, out id))
{
state[record.Dynamodb.NewImage["Id"].N] = "1";
}
else
{
state[record.Dynamodb.NewImage["Id"].N] = (id + 1).ToString();
}
}

Console.WriteLine($"Returning state: {string.Join(';', state.Select(s => s.Key + "=" + s.Value))}");
return new DynamoDBTimeWindowResponse()
{
State = state
};
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<TargetFramework>netstandard2.0</TargetFramework>
<Description>Amazon Lambda .NET Core support - KinesisEvents package.</Description>
<AssemblyTitle>Amazon.Lambda.KinesisEvents</AssemblyTitle>
<VersionPrefix>2.0.0</VersionPrefix>
<VersionPrefix>2.1.0</VersionPrefix>
<AssemblyName>Amazon.Lambda.KinesisEvents</AssemblyName>
<PackageId>Amazon.Lambda.KinesisEvents</PackageId>
<PackageTags>AWS;Amazon;Lambda</PackageTags>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
namespace Amazon.Lambda.KinesisEvents
{
using System;
using System.Collections.Generic;

/// <summary>
/// Represents an Amazon Kinesis event when using time windows.
/// https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows
/// </summary>
public class KinesisTimeWindowEvent : KinesisEvent
{
/// <summary>
/// Time window for the records in the event.
/// </summary>
public TimeWindow Window { get; set; }

/// <summary>
/// State being built up to this invoke in the time window.
/// </summary>
public Dictionary<string, string> State { get; set; }

/// <summary>
/// Shard Id of the records.
/// </summary>
public string ShardId { get; set; }

/// <summary>
/// Kinesis stream or consumer ARN.
/// </summary>
public string EventSourceARN { get; set; }

/// <summary>
/// Set to true for the last invoke of the time window. Subsequent invoke will start a new time window along with a fresh state.
/// </summary>
public bool? IsFinalInvokeForWindow { get; set; }

/// <summary>
/// Set to true if window is terminated prematurely. Subsequent invoke will continue the same window with a fresh state.
/// </summary>
public bool? IsWindowTerminatedEarly { get; set; }

/// <summary>
/// Time window for the records in the event.
/// </summary>
public class TimeWindow
{
/// <summary>
/// Window start instant.
/// </summary>
public DateTime Start { get; set; }

/// <summary>
/// Window end instant.
/// </summary>
public DateTime End { get; set; }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
namespace Amazon.Lambda.KinesisEvents
{
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;

/// <summary>
/// Response type to return a new state for the time window and to report batch item failures for KinesisTimeWindowEvent.
/// </summary>
[DataContract]
public class KinesisTimeWindowResponse
{
/// <summary>
/// New state after processing a batch of records.
/// </summary>
[DataMember(Name = "state")]
#if NETCOREAPP_3_1
[System.Text.Json.Serialization.JsonPropertyName("state")]
#endif
public Dictionary<String, String> State { get; set; }

/// <summary>
/// A list of records which failed processing.
/// Returning the first record which failed would retry all remaining records from the batch.
/// </summary>
[DataMember(Name = "batchItemFailures")]
#if NETCOREAPP_3_1
[System.Text.Json.Serialization.JsonPropertyName("batchItemFailures")]
#endif
public IList<BatchItemFailure> BatchItemFailures { get; set; }

/// <summary>
/// Class representing the individual record which failed processing.
/// </summary>
[DataContract]
public class BatchItemFailure
{
/// <summary>
/// Sequence number of the record which failed processing.
/// </summary>
[DataMember(Name = "itemIdentifier")]
#if NETCOREAPP_3_1
[System.Text.Json.Serialization.JsonPropertyName("itemIdentifier")]
#endif
public string ItemIdentifier { get; set; }
}
}
}
51 changes: 51 additions & 0 deletions Libraries/src/Amazon.Lambda.KinesisEvents/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,54 @@ public class Function
}
}
```

The following is a sample class and Lambda function that receives Amazon Kinesis event when using time windows as an input and uses `KinesisTimeWindowResponse` object to demonstrates how to aggregate and then process the final state. (Note that by default anything written to Console will be logged as CloudWatch Logs events.)

```csharp
public class Function
{
public KinesisTimeWindowResponse Handler(KinesisTimeWindowEvent kinesisTimeWindowEvent)
{
Console.WriteLine($"Incoming event, Source Arn: {kinesisTimeWindowEvent.EventSourceARN}, Shard Id: {kinesisTimeWindowEvent.ShardId}");
Console.WriteLine($"Incoming state: {string.Join(';', kinesisTimeWindowEvent.State.Select(s => s.Key + "=" + s.Value))}");

//Check if this is the end of the window to either aggregate or process.
if (kinesisTimeWindowEvent.IsFinalInvokeForWindow.HasValue && kinesisTimeWindowEvent.IsFinalInvokeForWindow.Value)
{
// Logic to handle final state of the window
Console.WriteLine("Destination invoke");
}
else
{
Console.WriteLine("Aggregate invoke");
}

//Check for early terminations
if (kinesisTimeWindowEvent.IsWindowTerminatedEarly.HasValue && kinesisTimeWindowEvent.IsWindowTerminatedEarly.Value)
{
Console.WriteLine("Window terminated early");
}

// Aggregation logic
var state = kinesisTimeWindowEvent.State;
foreach (var record in kinesisTimeWindowEvent.Records)
{
int id;
if (!state.ContainsKey(record.Kinesis.PartitionKey) || int.TryParse(record.Kinesis.PartitionKey, out id))
{
state[record.Kinesis.PartitionKey] = "1";
}
else
{
state[record.Kinesis.PartitionKey] = (id + 1).ToString();
}
}

Console.WriteLine($"Returning state: {string.Join(';', state.Select(s => s.Key + "=" + s.Value))}");
return new KinesisTimeWindowResponse()
{
State = state
};
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ protected virtual JsonSerializerOptions CreateDefaultJsonSerializationOptions()
new DateTimeConverter(),
new MemoryStreamConverter(),
new ConstantClassConverter(),
new ByteArrayConverter()
new ByteArrayConverter(),
new LongToStringJsonConverter()
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<AssemblyName>Amazon.Lambda.Serialization.SystemTextJson</AssemblyName>
<PackageId>Amazon.Lambda.Serialization.SystemTextJson</PackageId>
<PackageTags>AWS;Amazon;Lambda</PackageTags>
<VersionPrefix>2.3.1</VersionPrefix>
<VersionPrefix>2.3.2</VersionPrefix>
<PackageReadmeFile>README.md</PackageReadmeFile>
</PropertyGroup>
<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System;
using System.Buffers;
using System.Buffers.Text;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace Amazon.Lambda.Serialization.SystemTextJson.Converters
{
public class LongToStringJsonConverter : JsonConverter<string>
{
public override string Read(ref Utf8JsonReader reader, Type type, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.Number &&
type == typeof(String))
return reader.GetString();

var span = reader.HasValueSequence ? reader.ValueSequence.ToArray() : reader.ValueSpan;

if (Utf8Parser.TryParse(span, out long number, out var bytesConsumed) && span.Length == bytesConsumed)
return number.ToString();

var data = reader.GetString();

throw new InvalidOperationException($"'{data}' is not a correct expected value!")
{
Source = "LongToStringJsonConverter"
};
}

public override void Write(Utf8JsonWriter writer, string value, JsonSerializerOptions options)
{
writer.WriteStringValue(value.ToString());
}
}
}
Loading

0 comments on commit 72d4956

Please sign in to comment.