KSqlDbContext

KSqlDBContext provides querying capabilities that let developers express complex queries against ksqlDB in a higher-level, LINQ-style query language. This makes it possible to retrieve specific data based on filter conditions, limits and other criteria in a compile-time type-safe way. It also exposes methods for inserting records.

Creating query streams

v1.0.0

Within the ksqlDB.RestApi.Client .NET client library, the KSqlDBContext class sends requests to the /query-stream endpoint of ksqlDB over HTTP/2.

Executing pull or push queries

POST /query-stream HTTP/2.0
Accept: application/vnd.ksqlapi.delimited.v1
Content-Type: application/vnd.ksqlapi.delimited.v1

{
  "sql": "SELECT * FROM movies EMIT CHANGES;",
  "properties": {
    "auto.offset.reset": "earliest"
  }
}

A corresponding push query with KSqlDBContext:

using System;
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.Sample.Models.Movies;

var ksqlDbUrl = @"http://localhost:8088";
var contextOptions = new KSqlDBContextOptions(ksqlDbUrl);

await using var context = new KSqlDBContext(contextOptions);

using var disposable = context.CreatePushQuery<Movie>()
  .Subscribe(
    onNext: movie =>
    {
      Console.WriteLine($"{nameof(Movie)}: {movie.Id} - {movie.Title} - {movie.RowTime}");
      Console.WriteLine();
    },
    onError: error => Console.WriteLine($"Exception: {error.Message}"),
    onCompleted: () => Console.WriteLine("Completed"));

// Keep the process alive; the subscription is disposed when the program exits
Console.WriteLine("Press any key to stop the subscription");
Console.ReadKey();
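For comparison, the raw POST /query-stream request shown above can be built as a plain HttpRequestMessage. This is a minimal sketch that assumes .NET 5 or later and uses only the endpoint, headers and JSON body from the example; the QueryStreamRequest helper is hypothetical and not part of ksqlDB.RestApi.Client, which builds and sends this request for you.

```csharp
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;

// Hypothetical helper that mirrors the raw POST /query-stream request shown above.
// It is not part of ksqlDB.RestApi.Client.
public static class QueryStreamRequest
{
  private const string MediaType = "application/vnd.ksqlapi.delimited.v1";

  public static HttpRequestMessage Create(Uri ksqlDbUrl, string sql, IDictionary<string, string> properties)
  {
    // Same JSON shape as the example body: { "sql": ..., "properties": { ... } }
    var body = JsonSerializer.Serialize(new Dictionary<string, object>
    {
      ["sql"] = sql,
      ["properties"] = properties
    });

    var request = new HttpRequestMessage(HttpMethod.Post, new Uri(ksqlDbUrl, "/query-stream"))
    {
      Version = HttpVersion.Version20,
      // Require HTTP/2; for an http:// URL this means cleartext HTTP/2 (h2c) with prior knowledge
      VersionPolicy = HttpVersionPolicy.RequestVersionExact,
      Content = new StringContent(body, Encoding.UTF8, MediaType)
    };
    request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue(MediaType));

    return request;
  }
}
```

Sending it requires a running ksqlDB server, for example with `await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead)`, after which the delimited rows can be read from the response stream as they arrive.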

Creating queries

v1.0.0

Queries can also be posted to the /query endpoint of the ksqlDB REST API over HTTP/1.1. In the ksqlDB.RestApi.Client .NET client library, this is encapsulated in KSqlDBContext.CreateQuery<TEntity>.

Post a query

POST /query HTTP/1.1
Accept: application/vnd.ksql.v1+json
Content-Type: application/vnd.ksql.v1+json

{
  "ksql": "SELECT * FROM movies EMIT CHANGES;",
  "streamsProperties": {
    "ksql.streams.auto.offset.reset": "earliest"
  }
}

A corresponding query with KSqlDBContext:

using System;
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.Sample.Models.Movies;

var ksqlDbUrl = @"http://localhost:8088";
var contextOptions = new KSqlDBContextOptions(ksqlDbUrl);

await using var context = new KSqlDBContext(contextOptions);

using var disposable = context.CreateQuery<Movie>()
  .Subscribe(
    onNext: movie =>
    {
      Console.WriteLine($"{nameof(Movie)}: {movie.Id} - {movie.Title} - {movie.RowTime}");
      Console.WriteLine();
    },
    onError: error => Console.WriteLine($"Exception: {error.Message}"),
    onCompleted: () => Console.WriteLine("Completed"));

// Keep the process alive; the subscription is disposed when the program exits
Console.WriteLine("Press any key to stop the subscription");
Console.ReadKey();

⚠️ In version 6.0.0, CreateQuery has been merged into CreatePushQuery.

TFM netstandard 2.0 (.NET Framework, netcoreapp 2.0, etc.)

Because netstandard 2.0 does not support HTTP/2, IKSqlDBContext.CreatePushQuery<TEntity> is not exposed for this target framework. IKSqlDBContext.CreateQuery<TEntity> was introduced as an alternative that provides the same functionality over HTTP/1.1.

Basic auth

v1.0.0

ksqlDB supports the HTTP Basic authentication mechanism. Credentials can be configured with SetBasicAuthCredentials:

using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.Query.Context.Options;

string ksqlDbUrl = @"http://localhost:8088";

string userName = "fred";
string password = "letmein";

var options = new KSqlDbContextOptionsBuilder()
  .UseKSqlDb(ksqlDbUrl)
  .SetBasicAuthCredentials(userName, password)
  .Options;

await using var context = new KSqlDBContext(options);

See also how to intercept HTTP requests.
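On the wire, HTTP Basic authentication is an Authorization header carrying the Base64-encoded `user-id:password` pair (RFC 7617). The sketch below builds that header for the credentials used above; BasicAuth.CreateHeader is a hypothetical helper for illustration, not an API of ksqlDB.RestApi.Client. For fred/letmein it produces `Authorization: Basic ZnJlZDpsZXRtZWlu`.

```csharp
using System;
using System.Net.Http.Headers;
using System.Text;

// Hypothetical helper: builds the standard HTTP Basic Authorization header value.
public static class BasicAuth
{
  public static AuthenticationHeaderValue CreateHeader(string userName, string password)
  {
    // user-id and password are joined by a single colon, then Base64-encoded
    var credentials = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{userName}:{password}"));
    return new AuthenticationHeaderValue("Basic", credentials);
  }
}
```

Because Base64 is an encoding rather than encryption, Basic credentials should only be sent over HTTPS outside of local development.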

KSqlDbServiceCollectionExtensions - AddDbContext and AddDbContextFactory

v1.4.0

  • AddDbContext - registers the given ksqlDB context as a service in the IServiceCollection
  • AddDbContextFactory - registers the given ksqlDB context factory as a service in the IServiceCollection
using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

var serviceCollection = new ServiceCollection();

var ksqlDbUrl = @"http://localhost:8088";

serviceCollection.AddDbContext<ApplicationKSqlDbContext, IApplicationKSqlDbContext>(options =>
  options.UseKSqlDb(ksqlDbUrl), contextLifetime: ServiceLifetime.Transient);

serviceCollection.AddDbContextFactory<IApplicationKSqlDbContext>(factoryLifetime: ServiceLifetime.Scoped);

internal class ApplicationKSqlDbContext : KSqlDBContext, IApplicationKSqlDbContext
{
  public ApplicationKSqlDbContext(string ksqlDbUrl, ILoggerFactory loggerFactory = null)
    : base(ksqlDbUrl, loggerFactory)
  {
  }

  public ApplicationKSqlDbContext(KSqlDBContextOptions contextOptions, ILoggerFactory loggerFactory = null)
    : base(contextOptions, loggerFactory)
  {
  }

  public ksqlDB.RestApi.Client.KSql.Linq.IQbservable<Movie> Movies => CreatePushQuery<Movie>();
}

public interface IApplicationKSqlDbContext : IKSqlDBContext
{
  ksqlDB.RestApi.Client.KSql.Linq.IQbservable<Movie> Movies { get; }
}

public record Movie
{
  public int Id { get; set; }
  public string Title { get; set; } = null!;
  public int Release_Year { get; set; }
}

IKSqlDBContextFactory

v1.4.0

A factory for creating derived KSqlDBContext instances.

var contextFactory = serviceCollection.BuildServiceProvider().GetRequiredService<IKSqlDBContextFactory<IApplicationKSqlDbContext>>();

var context = contextFactory.Create();

Logging info and ConfigureKSqlDb

v1.2.0

The code below demonstrates two concepts: logging and the registration of services.

In this example, we use the Microsoft.Extensions.Logging library to add console and debug logging providers. You can also add additional providers like file-based logging or third-party providers.

In .NET, the ConfigureServices method is commonly used to register services, including third-party services such as KSqlDBContext, in the dependency injection container. The ConfigureKSqlDb extension method registers the ksqlDB-related service implementations with the IServiceCollection.

KSqlDbServiceCollectionExtensions.ConfigureKSqlDb - registers the following dependencies:

  • IKSqlDBContext with Scoped ServiceLifetime. This can be changed with the contextLifetime parameter.
  • IKSqlDbRestApiClient with Scoped ServiceLifetime.
  • IHttpClientFactory with Singleton ServiceLifetime.
  • KSqlDBContextOptions with Singleton ServiceLifetime.
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="5.0.0" />
using System.Threading.Tasks;
using ksqlDB.Api.Client.Samples.HostedServices;
using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace ksqlDB.Api.Client.Samples
{
  public class Program
  {
    public static async Task Main(string[] args)
    {
      await CreateHostBuilder(args).RunConsoleAsync();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
      Host.CreateDefaultBuilder(args)
        .ConfigureLogging((hostingContext, logging) =>
                          {
                            logging.AddConsole();
                            logging.AddDebug();
                          })
        .ConfigureServices((hostContext, serviceCollection) =>
                           {
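                             var ksqlDbUrl = @"http://localhost:8088";

                             // A lambda cannot be assigned to an implicitly typed (var) variable, so pass it inline
                             serviceCollection.ConfigureKSqlDb(ksqlDbUrl, setupParameters =>
                             {
                               // Start reading push queries from the earliest available offset
                               setupParameters.SetAutoOffsetReset(AutoOffsetReset.Earliest);
                             });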

                             serviceCollection.AddHostedService<Worker>();
                           });
  }
}

appsettings.json

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "ksqlDb.RestApi.Client": "Information" // "Debug"
    }
  }
}

In .NET, a hosted service is a long-running background task or service that is started and stopped by the host (for example, the .NET Generic Host). It is typically used for asynchronous or recurring operations, such as processing queues, executing scheduled tasks, or handling background data processing.

The example below demonstrates how to inject ksqlDB-related dependencies into a hosted service.

using System;
using System.Threading;
using System.Threading.Tasks;
using ksqlDB.Api.Client.Samples.Models.Movies;
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using ksqlDB.RestApi.Client.KSql.RestApi;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class Worker : IHostedService, IDisposable
{
  private readonly IKSqlDBContext context;
  private readonly IKSqlDbRestApiClient restApiClient;
  private readonly ILogger logger;

  public Worker(IKSqlDBContext context, IKSqlDbRestApiClient restApiClient, ILoggerFactory loggerFactory)
  {
    this.context = context ?? throw new ArgumentNullException(nameof(context));
    this.restApiClient = restApiClient ?? throw new ArgumentNullException(nameof(restApiClient));

    logger = loggerFactory.CreateLogger<Worker>();
  }

  private Subscription subscription;

  public async Task StartAsync(CancellationToken cancellationToken)
  {
    logger.LogInformation("Starting");

    subscription = await context.CreatePushQuery<Movie>()
      .WithOffsetResetPolicy(AutoOffsetReset.Earliest)
      .SubscribeAsync(
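        onNext: movie => logger.LogInformation("Received movie {Id}: {Title}", movie.Id, movie.Title),
        onError: e => logger.LogError(e, "Push query failed"),
        onCompleted: () => logger.LogInformation("Push query completed"),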
        cancellationToken: cancellationToken);
  }

  public Task StopAsync(CancellationToken cancellationToken)
  {
    logger.LogInformation("Stopping.");

    return Task.CompletedTask;
  }

  public void Dispose()
  {
    using (subscription) { }
  }
}

Add and SaveChangesAsync

v1.3.0

With the Add and SaveChangesAsync methods of the IKSqlDBContext interface, you can add multiple entities to the context and save them asynchronously in a single request (a batch insert). Internally, this functionality does not use an entity change tracker. Instead, it caches snapshots of the INSERT statements, which are executed when SaveChangesAsync is invoked.

private static async Task AddAndSaveChangesAsync(IKSqlDBContext context)
{
  context.Add(new Movie { Id = 1 });
  context.Add(new Movie { Id = 2 });

  var saveResponse = await context.SaveChangesAsync();
}
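The snapshot behaviour described above can be illustrated with a small self-contained sketch. MovieRow, MovieInsertBatch and Flush are hypothetical names and the statement format is simplified; this is not the library's internal implementation, only a model of the idea that each Add renders its INSERT statement immediately and SaveChangesAsync later sends the cached batch.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical stand-in for the sample Movie type
public class MovieRow
{
  public int Id { get; set; }
  public string Title { get; set; } = "";
}

// Hypothetical sketch of "snapshot instead of change tracking"; not the library's code
public class MovieInsertBatch
{
  private readonly List<string> pendingStatements = new();

  public int Count => pendingStatements.Count;

  // The INSERT statement is rendered when Add is called, so later changes to the entity are ignored
  public void Add(MovieRow movie)
  {
    var id = movie.Id.ToString(CultureInfo.InvariantCulture);
    var title = movie.Title.Replace("'", "''"); // escape single quotes for a KSQL string literal
    pendingStatements.Add($"INSERT INTO Movies (Id, Title) VALUES ({id}, '{title}');");
  }

  // Returns the cached statements as one batch (the text a single request would carry) and clears the cache
  public string Flush()
  {
    var batch = string.Join(Environment.NewLine, pendingStatements);
    pendingStatements.Clear();
    return batch;
  }
}
```

Because the statement text is captured at Add time, modifying an entity afterwards has no effect on what is saved, which is the practical consequence of not having a change tracker.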

Include read-only properties for inserts

v1.3.1

  • Inserts - include read-only properties configuration

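By default, the INSERT INTO statement is generated from all public instance properties and fields that are writable. Set InsertProperties.IncludeReadOnlyProperties to true to also include read-only properties, such as Name in the record below: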

public record Foo
{
  public Foo(string name)
  {
    Name = name;
  }

  public string Name { get; }
  public int Count { get; set; }
}

var insertProperties = new InsertProperties
{
  IncludeReadOnlyProperties = true
};

await using KSqlDBContext context = new KSqlDBContext(@"http://localhost:8088");

var model = new Foo("Bar") {
  Count = 3
};

context.Add(model, insertProperties);

var responseMessage = await context.SaveChangesAsync();
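The default convention can be modelled with reflection: a property counts as writable when PropertyInfo.CanWrite is true. The InsertColumns helper below is a hypothetical sketch of that rule for properties only (fields are omitted for brevity) and is not the library's code; it shows why Name is skipped unless read-only properties are opted in.

```csharp
using System;
using System.Linq;
using System.Reflection;

// Hypothetical helper illustrating the column-selection rule; not the library's implementation
public static class InsertColumns
{
  public static string[] Select(Type entityType, bool includeReadOnlyProperties) =>
    entityType.GetProperties(BindingFlags.Public | BindingFlags.Instance)
      .Where(p => p.GetIndexParameters().Length == 0)       // skip indexers
      .Where(p => includeReadOnlyProperties || p.CanWrite)  // get-only properties need opt-in
      .Select(p => p.Name)
      .ToArray();
}

public record Foo
{
  public Foo(string name)
  {
    Name = name;
  }

  public string Name { get; }
  public int Count { get; set; }
}
```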

KSqlDbContextOptionsBuilder

KSqlDbContextOptionsBuilder provides a fluent API for configuring various aspects of the ksqlDB context, such as the ksqlDB server URL, the processing guarantee and other options.

⚠ When KSqlDBContextOptions are created with a constructor or through KSqlDbContextOptionsBuilder, the auto.offset.reset property is set to "earliest" by default.

public static KSqlDBContextOptions CreatePushQueryOptions(string ksqlDbUrl)
{
  var contextOptions = new KSqlDbContextOptionsBuilder()
    .UseKSqlDb(ksqlDbUrl)
    .SetupPushQuery(options =>
    {
      options.AutoOffsetReset = AutoOffsetReset.Latest;
    })
    .Options;

  return contextOptions;
}

⚠ In version 6.0.0, SetupQuery was unified with SetupQueryStream. SetupQueryStream was subsequently renamed to SetupPushQuery to align with the naming of SetupPullQuery.

SetupPullQuery

v6.0.0

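  • SetupPullQuery configures the parameters used for pull queries:

var contextOptions = new KSqlDbContextOptionsBuilder()
  .UseKSqlDb(ksqlDbUrl)
  .SetupPullQuery(options =>
  {
    // Allow pull queries that scan the entire table
    options[KSqlDbConfigs.KsqlQueryPullTableScanEnabled] = "true";
  })
  .Options;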

Setting processing guarantee with KSqlDbContextOptionsBuilder

v1.0.0

The ksqlDB.RestApi.Client lets you configure the processing guarantee for your queries with the SetProcessingGuarantee method of the KSqlDbContextOptionsBuilder class, which sets the processing.guarantee streams property. For example, you can enable exactly_once_v2 or at_least_once semantics:

using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.Query.Context.Options;
using ksqlDB.RestApi.Client.KSql.Query.Options;

var ksqlDbUrl = @"http://localhost:8088";

var contextOptions = new KSqlDbContextOptionsBuilder()
  .UseKSqlDb(ksqlDbUrl)
  .SetProcessingGuarantee(ProcessingGuarantee.AtLeastOnce)
  .Options;

await using var context = new KSqlDBContext(contextOptions);

The ProcessingGuarantee enum offers three options: ExactlyOnce, ExactlyOnceV2 and AtLeastOnce.
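The processing.guarantee setting ends up as a plain string streams property. The sketch below assumes the three enum options correspond to the standard Kafka Streams values exactly_once, exactly_once_v2 and at_least_once; the Guarantee enum and ProcessingGuaranteeProperty class are hypothetical stand-ins, not the library's own ProcessingGuarantee enum or its internal mapping.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical mirror of the three ProcessingGuarantee options
public enum Guarantee { ExactlyOnce, ExactlyOnceV2, AtLeastOnce }

public static class ProcessingGuaranteeProperty
{
  public const string Name = "processing.guarantee";

  // Assumed mapping onto the Kafka Streams processing.guarantee values
  public static string ToValue(Guarantee guarantee) => guarantee switch
  {
    Guarantee.ExactlyOnce => "exactly_once",
    Guarantee.ExactlyOnceV2 => "exactly_once_v2",
    Guarantee.AtLeastOnce => "at_least_once",
    _ => throw new ArgumentOutOfRangeException(nameof(guarantee))
  };

  // Builds the streams property entry that would accompany a query request
  public static KeyValuePair<string, string> For(Guarantee guarantee) => new(Name, ToValue(guarantee));
}
```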

Setting formatting for identifiers

v3.6.0

The ksqlDB.RestApi.Client lets you configure how identifiers are formatted in your statements with the SetIdentifierEscaping method of the KSqlDbContextOptionsBuilder class. The following options are available:

  • Never - the default option where identifiers are not modified
  • Keywords - when an identifier is a reserved keyword it is escaped using backticks `
  • Always - all identifiers are escaped using backticks `

Escaping options can also be set on types that implement IEntityCreationProperties. If a stream or table is created with escaped identifiers, your insert statements must use the same escaping option to work.

See here for additional details: https://docs.ksqldb.io/en/latest/reference/sql/syntax/lexical-structure/#identifiers

using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.Query.Context.Options;
using ksqlDB.RestApi.Client.KSql.Query.Options;

var ksqlDbUrl = @"http://localhost:8088";

var contextOptions = new KSqlDbContextOptionsBuilder()
  .UseKSqlDb(ksqlDbUrl)
  .SetIdentifierEscaping(IdentifierEscaping.Always)
  .Options;

await using var context = new KSqlDBContext(contextOptions);

The resulting statements will escape identifiers like this when you use the Always option:

SELECT `Message`, `Id`, `Values`
  FROM `Tweets`
 WHERE `Message` != 'Hello world' OR `Id` = 1
  EMIT CHANGES;

INSERT INTO `Message` (`Message`, `Id`, `Values`) VALUES ('Hello', 42, '123, abc');

With the Keywords option, only ksqlDB reserved keywords are escaped:

SELECT MESSAGE, ID, `Values`
  FROM TWEETS
 WHERE MESSAGE != 'Hello world' OR ID = 1
  EMIT CHANGES;

INSERT INTO Message (Message, Id, `Values`) VALUES ('Hello', 42, '123, abc');
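The three escaping options can be modelled with a small function. This is a hypothetical sketch: EscapingMode mirrors the library's IdentifierEscaping options, and the reserved-word set is a tiny illustrative subset rather than ksqlDB's full keyword list. It does not reproduce any other formatting the library may apply, such as changing the case of identifiers.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical names; the library's own enum is IdentifierEscaping
public enum EscapingMode { Never, Keywords, Always }

public static class IdentifierEscaper
{
  // Deliberately tiny, illustrative subset of ksqlDB reserved words, not the full list
  private static readonly HashSet<string> ReservedWords = new(StringComparer.OrdinalIgnoreCase)
  {
    "SELECT", "FROM", "WHERE", "INSERT", "INTO", "VALUES"
  };

  // Wraps the identifier in backticks depending on the escaping mode
  public static string Escape(string identifier, EscapingMode mode) => mode switch
  {
    EscapingMode.Always => $"`{identifier}`",
    EscapingMode.Keywords when ReservedWords.Contains(identifier) => $"`{identifier}`",
    _ => identifier
  };
}
```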

InsertIntoAsync

v5.1.0

Stream the result of the SELECT query into an existing stream and its underlying topic.

var response = await context.CreatePushQuery<Movie>("from_stream")
  .Where(c => c.Title != "Apocalypse now")
  .InsertIntoAsync("stream_name");

var responses = await response.ToStatementResponsesAsync();
Console.WriteLine($"QueryId: {responses[0].CommandStatus?.QueryId}");

The generated statement:

INSERT INTO stream_name
SELECT *
  FROM from_stream
 WHERE Title != 'Apocalypse now'
  EMIT CHANGES;

Optionally, a query ID can be specified for the INSERT INTO query:

string queryId = "insert_query_123";

var response = await context.CreatePushQuery<Movie>("from_stream")
  .Where(c => c.Title != "Apocalypse now")
  .InsertIntoAsync("stream_name", queryId);

The generated statement:

INSERT INTO stream_name
  WITH( QUERY_ID = 'insert_query_123' )
SELECT *
  FROM from_stream
 WHERE Title != 'Apocalypse now'
  EMIT CHANGES;