Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for DynamoDBTimeWindowEvent, KinesisTimeWindowEvent and StreamsEventResponse (for KinesisEvent). #1559

Merged
merged 3 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<TargetFrameworks>netstandard2.0;netcoreapp3.1</TargetFrameworks>
<Description>Amazon Lambda .NET Core support - DynamoDBEvents package.</Description>
<AssemblyTitle>Amazon.Lambda.DynamoDBEvents</AssemblyTitle>
<VersionPrefix>2.1.1</VersionPrefix>
<VersionPrefix>2.2.0</VersionPrefix>
<AssemblyName>Amazon.Lambda.DynamoDBEvents</AssemblyName>
<PackageId>Amazon.Lambda.DynamoDBEvents</PackageId>
<PackageTags>AWS;Amazon;Lambda</PackageTags>
Expand All @@ -16,4 +16,12 @@
<PackageReference Include="AWSSDK.DynamoDBv2" Version="3.7.3.24" />
</ItemGroup>

<ItemGroup>
<Compile Remove="Converters\DictionaryLongToStringJsonConverter.cs" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' != 'netstandard2.0' ">
<Compile Include="Converters\DictionaryLongToStringJsonConverter.cs" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace Amazon.Lambda.DynamoDBEvents.Converters
{
public class DictionaryLongToStringJsonConverter : JsonConverter<Dictionary<string, string>>
{
public override Dictionary<string, string> Read(ref Utf8JsonReader reader, Type type, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.StartObject)
{
throw new JsonException($"JsonTokenType was of type {reader.TokenType}, only objects are supported.");
}

var dictionary = new Dictionary<string, string>();

while (reader.Read())
{
if (reader.TokenType == JsonTokenType.EndObject)
{
return dictionary;
}

// Get the key.
if (reader.TokenType != JsonTokenType.PropertyName)
{
throw new JsonException("JsonTokenType was not PropertyName.");
}

var propertyName = reader.GetString();

if (string.IsNullOrWhiteSpace(propertyName))
{
throw new JsonException("Failed to get property name.");
}

// Get the value.
reader.Read();
var keyValue = ExtractValue(ref reader, options);
dictionary.Add(propertyName, keyValue);
}

return dictionary;
}

public override void Write(Utf8JsonWriter writer, Dictionary<string, string> value, JsonSerializerOptions options)
{
// Use the built-in serializer, because it can handle dictionaries with string keys.
JsonSerializer.Serialize(writer, value, options);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this write the value as a string but given the context of this converter shouldn't it be written as a long?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it would. I still haven’t received response from Lambda team. My hunch is that the reason there is a note to use Map<string, string> for Java is that sometimes the state might contain string values. In such case, we would not be sure on whether to convert it to long or string. That’s the reason I’m handling token type string as well in ExtractValue. Please advise.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, got response from Lambda team that State field could have values represented as strings and in some cases long.

}

private string ExtractValue(ref Utf8JsonReader reader, JsonSerializerOptions options)
{
switch (reader.TokenType)
{
case JsonTokenType.Number:
if (reader.TryGetInt64(out var result))
{
return result.ToString();
}
throw new JsonException($"Unable to convert '{reader.TokenType}' to long value.");
case JsonTokenType.String: // If it is string, then use as it is.
return reader.GetString();
default:
throw new JsonException($"'{reader.TokenType}' is not supported.");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
namespace Amazon.Lambda.DynamoDBEvents
ashishdhingra marked this conversation as resolved.
Show resolved Hide resolved
{
using System;
using System.Collections.Generic;
ashishdhingra marked this conversation as resolved.
Show resolved Hide resolved

#if NETCOREAPP3_1_OR_GREATER
using Amazon.Lambda.DynamoDBEvents.Converters;
using System.Text.Json.Serialization;
#endif

/// <summary>
/// Represents an Amazon DynamodDB event when using time windows.
/// https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
/// </summary>
public class DynamoDBTimeWindowEvent : DynamoDBEvent
{
/// <summary>
/// Time window for the records in the event.
/// </summary>
public TimeWindow Window { get; set; }

/// <summary>
/// State being built up to this invoke in the time window.
/// </summary>
#if NETCOREAPP3_1_OR_GREATER
[JsonConverter(typeof(DictionaryLongToStringJsonConverter))]
#endif
public Dictionary<string, string> State { get; set; }

/// <summary>
/// Shard Id of the records.
/// </summary>
public string ShardId { get; set; }

/// <summary>
/// DynamoDB stream Arn.
/// </summary>
public string EventSourceArn { get; set; }

/// <summary>
/// Set to true for the last invoke of the time window. Subsequent invoke will start a new time window along with a fresh state.
/// </summary>
public bool? IsFinalInvokeForWindow { get; set; }

/// <summary>
/// Set to true if window is terminated prematurely. Subsequent invoke will continue the same window with a fresh state.
/// </summary>
public bool? IsWindowTerminatedEarly { get; set; }

/// <summary>
/// Time window for the records in the event.
/// </summary>
public class TimeWindow
{
/// <summary>
/// Window start instant.
/// </summary>
public DateTime Start { get; set; }

/// <summary>
/// Window end instant.
/// </summary>
public DateTime End { get; set; }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
namespace Amazon.Lambda.DynamoDBEvents
ashishdhingra marked this conversation as resolved.
Show resolved Hide resolved
{
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
ashishdhingra marked this conversation as resolved.
Show resolved Hide resolved

#if NETCOREAPP3_1_OR_GREATER
using Amazon.Lambda.DynamoDBEvents.Converters;
using System.Text.Json.Serialization;
#endif

/// <summary>
/// Response type to return a new state for the time window and to report batch item failures.
/// </summary>
[DataContract]
public class DynamoDBTimeWindowResponse
{
/// <summary>
/// New state after processing a batch of records.
/// </summary>
[DataMember(Name = "state")]
#if NETCOREAPP3_1_OR_GREATER
[System.Text.Json.Serialization.JsonPropertyName("state")]
[JsonConverter(typeof(DictionaryLongToStringJsonConverter))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be combined with the previous NETCOREAPP_3_1 condition block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed it to NETCOREAPP3_1_OR_GREATER (else it was failing test case) since test case was having this conditional compilation check. Tested locally.

#endif
public Dictionary<String, String> State { get; set; }

/// <summary>
/// A list of records which failed processing.
/// Returning the first record which failed would retry all remaining records from the batch.
/// </summary>
[DataMember(Name = "batchItemFailures")]
#if NETCOREAPP3_1_OR_GREATER
[System.Text.Json.Serialization.JsonPropertyName("batchItemFailures")]
#endif
public IList<BatchItemFailure> BatchItemFailures { get; set; }

/// <summary>
/// Class representing the individual record which failed processing.
/// </summary>
[DataContract]
public class BatchItemFailure
{
/// <summary>
/// Sequence number of the record which failed processing.
/// </summary>
[DataMember(Name = "itemIdentifier")]
#if NETCOREAPP3_1_OR_GREATER
[System.Text.Json.Serialization.JsonPropertyName("itemIdentifier")]
#endif
public string ItemIdentifier { get; set; }
}
}
}
51 changes: 51 additions & 0 deletions Libraries/src/Amazon.Lambda.DynamoDBEvents/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,54 @@ public class Function
}
}
```

The following is a sample class and Lambda function that receives Amazon DynamoDB event when using time windows as an input and uses `DynamoDBTimeWindowResponse` object to demonstrate how to aggregate and then process the final state. (Note that by default anything written to Console will be logged as CloudWatch Logs events.)

```csharp
public class Function
{
public DynamoDBTimeWindowResponse Handler(DynamoDBTimeWindowEvent ddbTimeWindowEvent)
{
Console.WriteLine($"Incoming event, Source Arn: {ddbTimeWindowEvent.EventSourceArn}, Shard Id: {ddbTimeWindowEvent.ShardId}");
Console.WriteLine($"Incoming state: {string.Join(';', ddbTimeWindowEvent.State.Select(s => s.Key + "=" + s.Value))}");

//Check if this is the end of the window to either aggregate or process.
if (ddbTimeWindowEvent.IsFinalInvokeForWindow.HasValue && ddbTimeWindowEvent.IsFinalInvokeForWindow.Value)
{
// Logic to handle final state of the window
Console.WriteLine("Destination invoke");
}
else
{
Console.WriteLine("Aggregate invoke");
}

//Check for early terminations
if (ddbTimeWindowEvent.IsWindowTerminatedEarly.HasValue && ddbTimeWindowEvent.IsWindowTerminatedEarly.Value)
{
Console.WriteLine("Window terminated early");
}

// Aggregation logic
var state = ddbTimeWindowEvent.State;
foreach (var record in ddbTimeWindowEvent.Records)
{
int id;
if (!state.ContainsKey(record.Dynamodb.NewImage["Id"].N) || int.TryParse(record.Dynamodb.NewImage["Id"].N, out id))
{
state[record.Dynamodb.NewImage["Id"].N] = "1";
}
else
{
state[record.Dynamodb.NewImage["Id"].N] = (id + 1).ToString();
}
}

Console.WriteLine($"Returning state: {string.Join(';', state.Select(s => s.Key + "=" + s.Value))}");
return new DynamoDBTimeWindowResponse()
{
State = state
};
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
<Import Project="..\..\..\buildtools\common.props" />

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFrameworks>netstandard2.0;netcoreapp3.1</TargetFrameworks>
<Description>Amazon Lambda .NET Core support - KinesisEvents package.</Description>
<AssemblyTitle>Amazon.Lambda.KinesisEvents</AssemblyTitle>
<VersionPrefix>2.0.0</VersionPrefix>
<VersionPrefix>2.1.0</VersionPrefix>
<AssemblyName>Amazon.Lambda.KinesisEvents</AssemblyName>
<PackageId>Amazon.Lambda.KinesisEvents</PackageId>
<PackageTags>AWS;Amazon;Lambda</PackageTags>
Expand All @@ -16,4 +16,12 @@
<PackageReference Include="AWSSDK.Kinesis" Version="3.7.0" />
</ItemGroup>

<ItemGroup>
<Compile Remove="Converters\DictionaryLongToStringJsonConverter.cs" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' != 'netstandard2.0' ">
<Compile Include="Converters\DictionaryLongToStringJsonConverter.cs" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace Amazon.Lambda.KinesisEvents.Converters
{
public class DictionaryLongToStringJsonConverter : JsonConverter<Dictionary<string, string>>
{
public override Dictionary<string, string> Read(ref Utf8JsonReader reader, Type type, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.StartObject)
{
throw new JsonException($"JsonTokenType was of type {reader.TokenType}, only objects are supported.");
}

var dictionary = new Dictionary<string, string>();

while (reader.Read())
{
if (reader.TokenType == JsonTokenType.EndObject)
{
return dictionary;
}

// Get the key.
if (reader.TokenType != JsonTokenType.PropertyName)
{
throw new JsonException("JsonTokenType was not PropertyName.");
}

var propertyName = reader.GetString();

if (string.IsNullOrWhiteSpace(propertyName))
{
throw new JsonException("Failed to get property name.");
}

// Get the value.
reader.Read();
var keyValue = ExtractValue(ref reader, options);
dictionary.Add(propertyName, keyValue);
}

return dictionary;
}

public override void Write(Utf8JsonWriter writer, Dictionary<string, string> value, JsonSerializerOptions options)
{
// Use the built-in serializer, because it can handle dictionaries with string keys.
JsonSerializer.Serialize(writer, value, options);
}

private string ExtractValue(ref Utf8JsonReader reader, JsonSerializerOptions options)
{
switch (reader.TokenType)
{
case JsonTokenType.Number:
if (reader.TryGetInt64(out var result))
{
return result.ToString();
}
throw new JsonException($"Unable to convert '{reader.TokenType}' to long value.");
case JsonTokenType.String: // If it is string, then use as it is.
return reader.GetString();
default:
throw new JsonException($"'{reader.TokenType}' is not supported.");
}
}
}
}
Loading
Loading