Skip to content

Commit

Permalink
Merge pull request #78 from tomasfabian/77-distinguish-the-configurat…
Browse files Browse the repository at this point in the history
…ion-for-push-and-pull-queries-using-ksqldbcontextoptions

77 Distinguish the configuration for push and pull queries using KSqlDbContextOptions
  • Loading branch information
tomasfabian committed Apr 30, 2024
2 parents 8f8c1c3 + 216175e commit 861f5d7
Show file tree
Hide file tree
Showing 92 changed files with 893 additions and 577 deletions.
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ dotnet add package ksqlDb.RestApi.Client
```
This adds a `<PackageReference>` to your csproj file, similar to the following:
```XML
<PackageReference Include="ksqlDb.RestApi.Client" Version="4.0.0" />
<PackageReference Include="ksqlDb.RestApi.Client" Version="6.0.0" />
```

Alternative option is to use [Protobuf content type](https://github.com/tomasfabian/ksqlDB.RestApi.Client-DotNet/blob/main/docs/protobuf.md):
```
dotnet add package ksqlDb.RestApi.Client.ProtoBuf
```

The following example can be tried out with a [.NET interactive Notebook](https://github.com/tomasfabian/ksqlDB.RestApi.Client-DotNet/tree/main/Samples/Notebooks):
Feel free to experiment with the following example in a [.NET interactive Notebook](https://github.com/tomasfabian/ksqlDB.RestApi.Client-DotNet/tree/main/Samples/Notebooks):

```C#
using ksqlDB.RestApi.Client.KSql.Linq;
Expand All @@ -41,7 +41,7 @@ var contextOptions = new KSqlDBContextOptions(ksqlDbUrl)

await using var context = new KSqlDBContext(contextOptions);

using var subscription = context.CreateQueryStream<Tweet>()
using var subscription = context.CreatePushQuery<Tweet>()
.WithOffsetResetPolicy(AutoOffsetReset.Latest)
.Where(p => p.Message != "Hello world" || p.Id == 1)
.Select(l => new { l.Message, l.Id })
Expand All @@ -65,7 +65,8 @@ public class Tweet : Record
}
```

An entity class in **ksqlDB.RestApi.Client** represents the structure of a table or stream. An instance of the class represents a record in that stream while properties are mapped to columns respectively.
An entity class in **ksqlDB.RestApi.Client** represents the structure of a table or stream.
An instance of the class represents a record in that stream or table while properties are mapped to columns respectively.

LINQ code written in C# from the sample is equivalent to this KSQL query:
```SQL
Expand Down Expand Up @@ -164,7 +165,7 @@ This ensures smooth integration with the `ksqlDB.RestApi.Client` library, allowi
The **server-side Blazor** application communicates with ksqlDB using the `ksqlDB.RestApi.Client`.
Whenever an event in `ksqlDB` occurs, the server-side Blazor app responds and signals the UI in the client's browser to update. This setup allows a smooth and continuous update flow, creating a real-time, reactive user interface.

- set `docker-compose.csproj` as startup project in [InsideOut.sln](https://github.com/tomasfabian/ksqlDB.RestApi.Client-DotNet/tree/main/Samples/InsideOut) for an embedded Kafka connect integration and stream processing examples.
- set `docker-compose.csproj` as startup project in [InsideOut.sln](https://github.com/tomasfabian/ksqlDB.RestApi.Client-DotNet/tree/main/Samples/InsideOut) for embedded Kafka connect integration and stream processing examples.

# ```IQbservable<T>``` extension methods
As depicted below `IObservable<T>` is the dual of `IEnumerable<T>` and `IQbservable<T>` is the dual of `IQueryable<T>`. In all four cases LINQ providers are using deferred execution.
Expand Down Expand Up @@ -237,9 +238,8 @@ var contextOptions = new KSqlDbContextOptionsBuilder()
.SetAutoOffsetReset(AutoOffsetReset.Latest)
.SetProcessingGuarantee(ProcessingGuarantee.ExactlyOnce)
.SetIdentifierEscaping(IdentifierEscaping.Keywords)
.SetupQueryStream(options =>
.SetupPushQuery(options =>
{
//SetupQueryStream affects only IKSqlDBContext.CreateQueryStream<T>
options.Properties["ksql.query.push.v2.enabled"] = "true";
})
.Options;
Expand All @@ -253,7 +253,7 @@ This code initializes a `KSqlDbContextOptionsBuilder` to configure settings for
- `SetAutoOffsetReset(AutoOffsetReset.Latest)`: Sets the offset reset behavior to start consuming messages from the **latest** available when no committed offset is found. By default, 'auto.offset.reset' is configured to 'earliest'.
- `SetProcessingGuarantee(ProcessingGuarantee.ExactlyOnce)`: Specifies the processing guarantee as **exactly-once** semantics.
- `SetIdentifierEscaping(IdentifierEscaping.Keywords)`: Escapes identifiers such as table and column names that are SQL keywords.
- `SetupQueryStream(options => { ... })`: Configures query stream options, specifically enabling KSQL query push version 2.
- `SetupPushQuery(options => { ... })`: Configures push query options, specifically enabling KSQL query push version 2.

Finally, `.Options` returns the configured options for the `ksqlDB` context.

Expand All @@ -263,7 +263,7 @@ Stream names are generated based on the generic record types. They are pluralize
**By default the generated from item names such as stream and table names are pluralized**. This behavior could be switched off with the following `ShouldPluralizeStreamName` configuration.

```C#
context.CreateQueryStream<Person>();
context.CreatePushQuery<Person>();
```
```SQL
FROM People
Expand All @@ -275,15 +275,15 @@ var contextOptions = new KSqlDBContextOptions(@"http://localhost:8088")
ShouldPluralizeFromItemName = false
};

new KSqlDBContext(contextOptions).CreateQueryStream<Person>();
new KSqlDBContext(contextOptions).CreatePushQuery<Person>();
```
```SQL
FROM Person
```

Setting an arbitrary stream name (from_item name):
```C#
context.CreateQueryStream<Tweet>("custom_topic_name");
context.CreatePushQuery<Tweet>("custom_topic_name");
```
```SQL
FROM custom_topic_name
Expand Down
2 changes: 1 addition & 1 deletion Samples/Aggregations/Aggregations.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ksqlDb.RestApi.Client" Version="5.0.0" />
<PackageReference Include="ksqlDb.RestApi.Client" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public ApplicationKSqlDbContext(KSqlDBContextOptions contextOptions, ILoggerFact
{
}

public ksqlDB.RestApi.Client.KSql.Linq.IQbservable<Tweet> Tweets => CreateQueryStream<Tweet>();
public ksqlDB.RestApi.Client.KSql.Linq.IQbservable<Tweet> Tweets => CreatePushQuery<Tweet>();
}

public interface IApplicationKSqlDbContext : IKSqlDBContext
Expand Down
24 changes: 12 additions & 12 deletions Samples/Aggregations/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ static IDisposable Having(IKSqlDBContext context)
{
return
//https://kafka-tutorials.confluent.io/finding-distinct-events/ksql.html
context.CreateQueryStream<Click>()
context.CreatePushQuery<Click>()
.GroupBy(c => new { c.IP_ADDRESS, c.URL, c.TIMESTAMP })
.WindowedBy(new TimeWindows(Duration.OfMinutes(2)))
.Having(c => c.Count(g => c.Key.IP_ADDRESS) == 1)
Expand All @@ -91,7 +91,7 @@ static async Task GroupBy()

await using var context = new KSqlDBContext(contextOptions);

context.CreateQueryStream<Tweet>()
context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
.Select(g => new { Id = g.Key, Count = g.Count() })
.Subscribe(count =>
Expand All @@ -101,7 +101,7 @@ static async Task GroupBy()
}, error => { Console.WriteLine($"Exception: {error.Message}"); }, () => Console.WriteLine("Completed"));


context.CreateQueryStream<Tweet>()
context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
.Select(g => g.Count())
.Subscribe(count =>
Expand All @@ -110,7 +110,7 @@ static async Task GroupBy()
Console.WriteLine();
}, error => { Console.WriteLine($"Exception: {error.Message}"); }, () => Console.WriteLine("Completed"));

context.CreateQueryStream<Tweet>()
context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
.Select(g => new { Count = g.Count() })
.Subscribe(count =>
Expand All @@ -120,7 +120,7 @@ static async Task GroupBy()
}, error => { Console.WriteLine($"Exception: {error.Message}"); }, () => Console.WriteLine("Completed"));

//Sum
var subscription = context.CreateQueryStream<Tweet>()
var subscription = context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
//.Select(g => g.Sum(c => c.Id))
.Select(g => new { Id = g.Key, MySum = g.Sum(c => c.Id) })
Expand All @@ -130,7 +130,7 @@ static async Task GroupBy()
Console.WriteLine();
}, error => { Console.WriteLine($"Exception: {error.Message}"); }, () => Console.WriteLine("Completed"));

var groupBySubscription = context.CreateQueryStream<IoTSensorChange>("sqlserversensors")
var groupBySubscription = context.CreatePushQuery<IoTSensorChange>("sqlserversensors")
.GroupBy(c => new { c.Op, c.After!.Value })
.Select(g => new { g.Source.Op, g.Source.After!.Value, num_times = g.Count() })
.Subscribe(c =>
Expand All @@ -146,13 +146,13 @@ static IDisposable Window(IKSqlDBContext context)
{
new TimeWindows(Duration.OfSeconds(2), OutputRefinement.Final).WithGracePeriod(Duration.OfSeconds(2));

var subscription1 = context.CreateQueryStream<Tweet>()
var subscription1 = context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
.WindowedBy(new TimeWindows(Duration.OfSeconds(5)).WithGracePeriod(Duration.OfHours(2)))
.Select(g => new { g.WindowStart, g.WindowEnd, Id = g.Key, Count = g.Count() })
.Subscribe(c => { Console.WriteLine($"{c.Id}: {c.Count}: {c.WindowStart}: {c.WindowEnd}"); }, exception => { Console.WriteLine(exception.Message); });

var query = context.CreateQueryStream<Tweet>()
var query = context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
.WindowedBy(new HoppingWindows(Duration.OfSeconds(5)).WithAdvanceBy(Duration.OfSeconds(4))
.WithRetention(Duration.OfDays(7)))
Expand All @@ -168,7 +168,7 @@ static IDisposable Window(IKSqlDBContext context)

static IDisposable CountDistinct(IKSqlDBContext context)
{
var subscription = context.CreateQueryStream<Tweet>()
var subscription = context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
// .Select(g => new { Id = g.Key, Count = g.CountDistinct(c => c.Message) })
.Select(g => new { Id = g.Key, Count = g.LongCountDistinct(c => c.Message) })
Expand All @@ -182,7 +182,7 @@ static IDisposable CountDistinct(IKSqlDBContext context)

static IDisposable CollectSet(IKSqlDBContext context)
{
var subscription = context.CreateQueryStream<Tweet>()
var subscription = context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
.Select(g => new { Id = g.Key, Array = g.CollectSet(c => c.Message) })
//.Select(g => new { Id = g.Key, Array = g.CollectList(c => c.Message) })
Expand All @@ -200,7 +200,7 @@ static IDisposable CollectSet(IKSqlDBContext context)

static IDisposable TopKDistinct(IKSqlDBContext context)
{
return context.CreateQueryStream<Tweet>()
return context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
.Select(g => new { Id = g.Key, TopK = g.TopKDistinct(c => c.Amount, 2) })
// .Select(g => new { Id = g.Key, TopK = g.TopK(c => c.Amount, 2) })
Expand All @@ -221,7 +221,7 @@ static void EmitFinal(IKSqlDBContext ksqlDbContext)
var tumblingWindow =
new TimeWindows(Duration.OfSeconds(2), OutputRefinement.Final).WithGracePeriod(Duration.OfSeconds(2));

var query = ksqlDbContext.CreateQueryStream<Tweet>()
var query = ksqlDbContext.CreatePushQuery<Tweet>()
.WithOffsetResetPolicy(AutoOffsetReset.Earliest)
.GroupBy(c => c.Id)
.WindowedBy(tumblingWindow)
Expand Down
2 changes: 1 addition & 1 deletion Samples/Avro/Avro.Samples.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ksqlDb.RestApi.Client" Version="5.0.0" />
<PackageReference Include="ksqlDb.RestApi.Client" Version="6.0.0" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="2.3.0" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion Samples/Avro/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@

await using var context = new KSqlDBContext(ksqlDbUrl);

using var disposable = context.CreateQueryStream<IoTSensor>()
using var disposable = context.CreatePushQuery<IoTSensor>()
.ToObservable()
.Subscribe(onNext: sensor =>
{
Expand Down
4 changes: 2 additions & 2 deletions Samples/Blazor.Sample/Blazor.Sample.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="ksqlDb.RestApi.Client" Version="4.0.0" />
<PackageReference Include="ksqlDb.RestApi.Client" Version="6.0.0" />
<PackageReference Include="SqlServer.Connector" Version="1.0.0" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="7.5.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Samples\InsideOut\InsideOut.csproj" />
<!-- <ProjectReference Include="..\..\SqlServer.Connector\SqlServer.Connector.csproj" /> -->
<!-- <ProjectReference Include="..\..\ksqlDb.RestApi.Client\ksqlDb.RestApi.Client.csproj" /> -->
<!-- <ProjectReference Include="..\..\ksqlDb.RestApi.Client\ksqlDb.RestApi.Client.csproj" /> -->
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private async Task SubscribeToQuery(SynchronizationContext synchronizationContex

await using var context = new KSqlDBContext(options);

context.CreateQuery<SensorsStream>("SensorsStream")
context.CreatePushQuery<SensorsStream>("SensorsStream")
.ToObservable()
.ObserveOn(synchronizationContext)
.Subscribe(c =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected override async Task OnInitializedAsync()

await using var context = new KSqlDBContext(options);

subscription = context.CreateQuery<IoTSensorStats>(TopicNames.SensorsTable)
subscription = context.CreatePushQuery<IoTSensorStats>(TopicNames.SensorsTable)
.ToObservable()
.ObserveOn(synchronizationContext)
.Subscribe(c =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private async Task SubscribeToQuery(SynchronizationContext synchronizationContex

await using var context = new KSqlDBContext(options);

cdcSubscription = context.CreateQuery<IoTSensorChange>("sqlserversensors")
cdcSubscription = context.CreatePushQuery<IoTSensorChange>("sqlserversensors")
.WithOffsetResetPolicy(AutoOffsetReset.Latest)
.Where(c => c.Op != "r" && (c.After == null || c.After.SensorId != "d542a2b3-c"))
.ToObservable()
Expand All @@ -237,7 +237,7 @@ private async Task SubscribeToRawQuery(SynchronizationContext synchronizationCon

await using var context = new KSqlDBContext(options);

cdcSubscription = context.CreateQuery<IoTSensorRawChange>("sqlserversensors")
cdcSubscription = context.CreatePushQuery<IoTSensorRawChange>("sqlserversensors")
.WithOffsetResetPolicy(AutoOffsetReset.Latest)
.ToObservable()
.ObserveOn(synchronizationContext)
Expand Down
2 changes: 1 addition & 1 deletion Samples/DataTypes/DataTypes.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ksqlDb.RestApi.Client" Version="5.0.0" />
<PackageReference Include="ksqlDb.RestApi.Client" Version="6.0.0" />
</ItemGroup>

</Project>
18 changes: 9 additions & 9 deletions Samples/DataTypes/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

static async Task StructType(KSqlDBContext context)
{
var moviesStream = context.CreateQueryStream<Movie>();
var moviesStream = context.CreatePushQuery<Movie>();

var source = moviesStream.Select(c => new
{
Expand All @@ -46,12 +46,12 @@ static async Task StructType(KSqlDBContext context)
static IDisposable Arrays(KSqlDBContext context)
{
var subscription =
context.CreateQueryStream<Movie>()
context.CreatePushQuery<Movie>()
.Select(_ => new { FirstItem = new[] { 1, 2, 3 }[1] })
.Subscribe(onNext: c => { Console.WriteLine($"Array first value: {c}"); },
onError: error => { Console.WriteLine($"Exception: {error.Message}"); });

var arrayLengthQuery = context.CreateQueryStream<Movie>()
var arrayLengthQuery = context.CreatePushQuery<Movie>()
.Select(_ => new[] { 1, 2, 3 }.Length)
.ToQueryString();

Expand All @@ -61,7 +61,7 @@ static IDisposable Arrays(KSqlDBContext context)
static IDisposable NestedTypes(KSqlDBContext context)
{
var disposable =
context.CreateQueryStream<Movie>()
context.CreatePushQuery<Movie>()
.Select(c => new
{
MapValue = new Dictionary<string, Dictionary<string, int>>
Expand All @@ -86,7 +86,7 @@ static IDisposable NestedTypes(KSqlDBContext context)

static async Task DeeplyNestedTypes(KSqlDBContext context)
{
var moviesStream = context.CreateQueryStream<Movie>();
var moviesStream = context.CreatePushQuery<Movie>();

var source = moviesStream.Select(c => new
{
Expand Down Expand Up @@ -140,12 +140,12 @@ static async Task TimeTypes(IKSqlDbRestApiClient restApiClient, IKSqlDBContext c
var from = new TimeSpan(1, 0, 0);
var to = new TimeSpan(22, 0, 0);

var query = context.CreateQueryStream<Dates>()
var query = context.CreatePushQuery<Dates>()
.Select(c => new { c.Ts, to, FromTime = from, DateTime.Now, New = new TimeSpan(1, 0, 0) })
.ToQueryString();

//.Select(c => new { c.Ts, to, FromTime = from, DateTime.Now, New = new TimeSpan(1, 0, 0) })
using var subscription = context.CreateQueryStream<Dates>()
using var subscription = context.CreatePushQuery<Dates>()
.Where(c => c.Ts.Between(from, to))
.Subscribe(onNext: m =>
{
Expand All @@ -169,7 +169,7 @@ static async Task TimeTypes(IKSqlDbRestApiClient restApiClient, IKSqlDBContext c

static void Bytes(IKSqlDBContext ksqlDbContext)
{
var ksql = ksqlDbContext.CreateQuery<Thumbnail>()
var ksql = ksqlDbContext.CreatePushQuery<Thumbnail>()
.Select(c => new { Col = K.Functions.FromBytes(c.Image, "hex") })
.ToQueryString();
}
Expand All @@ -186,7 +186,7 @@ static async Task SubscriptionToAComplexTypeAsync(IKSqlDbRestApiClient restApiCl
httpResponseMessage = await restApiClient.CreateTypeAsync<EventCategory>();
httpResponseMessage = await restApiClient.CreateTableAsync<Event>(new EntityCreationMetadata("Events") { Partitions = 1 });

var subscription = ksqlDbContext.CreateQueryStream<Event>()
var subscription = ksqlDbContext.CreatePushQuery<Event>()
.Subscribe(value =>
{
Console.WriteLine("Categories: ");
Expand Down
2 changes: 1 addition & 1 deletion Samples/GraphQL/GraphQL.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<ItemGroup>
<PackageReference Include="HotChocolate.AspNetCore" Version="13.9.0" />
<PackageReference Include="HotChocolate.Data" Version="13.9.0" />
<PackageReference Include="ksqlDb.RestApi.Client" Version="4.0.0" />
<PackageReference Include="ksqlDb.RestApi.Client" Version="6.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.20.1" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion Samples/GraphQL/ksqlDB/MoviesKSqlDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ public MoviesKSqlDbContext(KSqlDBContextOptions contextOptions, ILoggerFactory?
{
}

public IQbservable<Movie> Movies => CreateQueryStream<Movie>();
public IQbservable<Movie> Movies => CreatePushQuery<Movie>();
}
Loading

0 comments on commit 861f5d7

Please sign in to comment.