Skip to content

ksqlDB.RestApi.Client workshop

Tomas Fabian edited this page May 10, 2024 · 28 revisions

Create a new ksqlDB.RestApi.Client service project

Prerequisites

  • Visual Studio 2022
  • .NET 5.0 or higher

Hosted services provide a way to encapsulate background operations and have them managed by the hosting environment in a structured and controlled manner. It enables the execution of asynchronous tasks alongside the main application logic, contributing to the overall responsiveness and efficiency of the application.

To develop a streaming microservice that runs as an IHostedService, providing continuous event streaming using a streaming platform or technology, we will employ the ksqlDB.RestApi.Client library. The microservice we will create takes advantage of the IHostedService interface in .NET to handle the lifecycle management of the microservice within the hosting environment.

  1. create a new solution and worker service project
mkdir ksqlDB.RestApi.Client.Sensors
cd ksqlDB.RestApi.Client.Sensors

Add a nuget.config file in the root of your project repository:

dotnet new nugetconfig

If you haven't installed the latest .NET preview version, you can include a global.json file and specify a preferred SDK version for overriding:

dotnet --list-sdks

The following example shows the global.json syntax:

{
  "sdk": {
    "version": "6.0.302"
  }
}

Now we are ready to create our solution:

dotnet new sln -n SensorsStreamProcessor
dotnet new worker -n ksqlDB.RestApi.Client.Sensors
dotnet sln add ksqlDB.RestApi.Client.Sensors

Add a reference to the ksqlDB.RestApi.Client NuGet package.

dotnet add ksqlDB.RestApi.Client.Sensors package ksqlDb.RestApi.Client --version 6.0.0

We prepared the basic structure of the solution and therefore we can open it now with Visual Studio:

./SensorsStreamProcessor.sln

Note: Many of the commands mentioned above could have been executed directly within Visual Studio.

Register the KSqlDbContext

Add two new folders named 'KSqlDb' and 'Model' where we want to place all our database-related code. In Visual Studio right-click the project with the name KSqlDB.RestApi.Client.Sensors and select Add->New Folder.

Add a new file Sensor.cs in the 'Model' directory using the following code:

namespace ksqlDB.RestApi.Client.Sensors.Model;

public record Sensor
{
  public int Id { get; init; }

  public string Name { get; init; } = null!;

  public bool IsAvailable { get; init; }
}

In the KSqlDB.RestApi.Client library, the KSqlDBContext class provides a convenient way to interact with ksqlDB and execute push queries. The CreatePushQuerym method of KSqlDBContext is used to create a continuous query stream that allows you to receive real-time updates from ksqlDB as the query results change.

Next, add a file named SensorsKSqlDbContext.cs in the 'KSqlDb' directory using the following code:

using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.Sensors.Model;

namespace ksqlDB.RestApi.Client.Sensors.KSqlDb;

internal interface ISensorsKSqlDbContext : IKSqlDBContext
{
  IQbservable<Sensor> Sensors { get; }
}

internal class SensorsKSqlDbContext : KSqlDBContext, ISensorsKSqlDbContext
{
  public SensorsKSqlDbContext(string ksqlDbUrl, ILoggerFactory? loggerFactory = null) 
    : base(ksqlDbUrl, loggerFactory)
  {
  }

  public SensorsKSqlDbContext(KSqlDBContextOptions contextOptions, ILoggerFactory? loggerFactory = null) 
    : base(contextOptions, loggerFactory)
  {
  }

  public IQbservable<Sensor> Sensors => CreatePushQuery<Sensor>();
}

Extracting an interface from the KSqlDBContext class can provide several benefits in your application's design and development process:

  • Abstraction and Loose Coupling: By defining an interface for KSqlDBContext, you create an abstraction layer that allows you to decouple the code that depends on KSqlDBContext from its concrete implementation. This promotes loose coupling, making it easier to swap out implementations or mock the behavior of the KSqlDBContext during testing.

  • Testability: By extracting an interface, you enable easier unit testing. You can create mock or stub implementations of the interface to simulate the behavior of the KSqlDBContext without needing a real connection to ksqlDB. This allows for more isolated and focused testing of the dependent code.

  • Dependency Inversion: By depending on the interface rather than the concrete implementation, you adhere to the Dependency Inversion Principle. This principle promotes dependencies on abstractions rather than concrete implementations, which leads to more flexible and maintainable code.

The Host.CreateDefaultBuilder method is a convenient way to create a default IHostBuilder instance. It sets up a basic host configuration with sensible defaults, allowing you to quickly bootstrap your application.

SensorsKSqlDbContext can be registered now with the IServiceCollection. Open Program.cs file and edit it with the below code:

using ksqlDB.RestApi.Client.Sensors;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDB.RestApi.Client.Sensors.KSqlDb;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
      var ksqlDbUrl = @"http://localhost:8088";

      services.AddDbContext<ISensorsKSqlDbContext, SensorsKSqlDbContext>(
        options =>
        {
          var setupParameters = options.UseKSqlDb(ksqlDbUrl);

          setupParameters.SetAutoOffsetReset(AutoOffsetReset.Earliest);

        }, ServiceLifetime.Transient, restApiLifetime: ServiceLifetime.Transient);

      services.AddHostedService<Worker>();
    })
    .Build();

await host.RunAsync();

In this example, the 'Worker' class implements the IHostedService interface. It performs an asynchronous operation to create a ksqlDB stream and then subscribes to it.

The StartAsync method performs the necessary initialization tasks when the hosted service starts. It logs a message and creates a Kafka stream in ksqlDB based on the provided metadata.

We can now proceed to set up our initial ksqlDB push query. Simply replace the code within the Worker.cs class with the following snippet:

using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.RestApi;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements;
using ksqlDB.RestApi.Client.Sensors.KSqlDb;
using ksqlDB.RestApi.Client.Sensors.Model;

namespace ksqlDB.RestApi.Client.Sensors;

internal class Worker : IHostedService
{
  private readonly ISensorsKSqlDbContext kSqlDbContext;
  private readonly IKSqlDbRestApiClient kSqlDbRestApiClient;
  private readonly ILogger<Worker> logger;

  public Worker(ISensorsKSqlDbContext kSqlDbContext, IKSqlDbRestApiClient kSqlDbRestApiClient, ILogger<Worker> logger)
  {
    this.kSqlDbContext = kSqlDbContext;
    this.kSqlDbRestApiClient = kSqlDbRestApiClient;
    this.logger = logger;
  }

  public async Task StartAsync(CancellationToken cancellationToken)
  {
    logger.LogInformation("Worker started at: {time}", DateTimeOffset.Now);
    
    EntityCreationMetadata metadata = new(kafkaTopic: nameof(Sensor))
    {
      Partitions = 3,
      Replicas = 1
    };

    var httpResponseMessage = await kSqlDbRestApiClient.CreateStreamAsync<Sensor>(metadata, ifNotExists: true, cancellationToken: cancellationToken).ConfigureAwait(false);
   
    SubscribeToSensors();
  }

  private IDisposable subscription = null!;

  private void SubscribeToSensors()
  {
    subscription = kSqlDbContext.Sensors
      .Where(c => c.IsAvailable)
      .Subscribe(
          onNext: sensor =>
          {
            Console.WriteLine($"{nameof(Sensor)}: {sensor.Id} - {sensor.Name}");
            Console.WriteLine();
          },
          onError: error => { Console.WriteLine($"Exception: {error.Message}"); },
          onCompleted: () => Console.WriteLine("Completed"));
  }

  public Task StopAsync(CancellationToken cancellationToken)
  {
    logger.LogInformation("Stopping.");

    using (subscription)
    {
    }

    return Task.CompletedTask;
  }
}

The StopAsync method in the IHostedService interface of .NET is responsible for gracefully stopping the hosted service. It is called when the application is shutting down or when the service needs to be stopped due to certain conditions.

If you are working with an IQbservable<T> that implements IDisposable, you can use the using operator to ensure that the observable is properly disposed of when it completes or encounters an error. In this example, the 'subscription' field holds the subscription object returned by the Subscribe method of the observable. In the StopAsync method, the 'subscription' object is disposed of to release any resources associated with it.

Try it out

Create a local KSqlDb server with Docker-desktop or use your existing cluster.

Run the application by pressing F5 in Visual Studio. The application will try to create a sensor stream and subscribe to it.

Congratulations! You've just successfully created your stream processor with ksqlDB.RestApi.Client.