Skip to content

kostiantyn-matsebora/streamstore

Repository files navigation

StreamStore

Build Quality Gate Status Coverage NuGet version (StreamStore)

Asynchronous event sourcing.

Library provides a logical layer for storing and querying events as a stream.

Heavily inspired by Greg Young's Event Store and Streamstone solutions.

Overview

Designed to be easily extended with custom database backends. Despite the fact that component implements a logical layer for storing and querying events as a stream, it does not provide functionality of DDD aggregate, such as state mutation, conflict resolution etc., but serves more as persistence layer for it.

Storage packages

Package Description Multitenancy Package
StreamStore.NoSql.Cassandra Apache Cassandra and Azure Cosmos DB for Apache Cassandra port NuGet version (StreamStore.NoSql.Cassandra)
StreamStore.Sql.PostgreSql PostgreSQL port NuGet version (StreamStore.Sql.PostgreSql)
StreamStore.Sql.Sqlite SQLite port NuGet version (StreamStore.Sql.Sqlite)
StreamStore.InMemory In-memory port is provided for testing and educational purposes only NuGet version (StreamStore.InMemory)
StreamStore.S3.AWS Amazon S3 port NuGet version (StreamStore.S3.AWS)
StreamStore.S3.B2 Backblaze B2 port NuGet version (StreamStore.S3.B2)

Concepts

About basic concepts you can read in CONCEPTS.md.

Features

The general idea is to highlight the common characteristics and features of event sourcing storage:

  • Asynchronous read and write operations.
  • Multitenancy support.
  • Automatic provisioning of storage schema.
  • Event ordering.
  • Serialization/deserialization of events.
  • Optimistic concurrency control.
  • Event duplication detection based on event ID.
  • Database agnostic test framework, including benchmarking test scenarios.
  • Binary serialization support.

Storages

Also add implementations of particular storage, such as:

Roadmap

  • Composite stream identifier
  • Custom event properties (?).
  • External transaction support (?).
  • Transactional outbox pattern implementation (?).

Installation

To install the package, you can use the following command from the command line:

  # Install StreamStore package
  
  dotnet add package StreamStore

  # Install package of particular database implementation, for instance InMemory

  dotnet add package StreamStore.InMemory

or from NuGet Package Manager Console:

   # Install StreamStore package
  Install-Package StreamStore

   # Install package of particular database implementation, for instance SQLite database backend
  Install-Package StreamStore.Sql.Sqlite

Usage

  • Register store in DI container
    services.ConfigureStreamStore(x =>              // Register StreamStore
      x.EnableSchemaProvisioning()                  // Optional. Enable schema provisioning, default: false.
      
      // Register single database implementation, see details in documentation for particular database
      x.WithSingleDatabase(c =>                 
          c.UseSqliteDatabase(x =>                  // For instance, SQLite database backend
              x.ConfigureDatabase(c =>
                c.WithConnectionString(connectionString)
              )
          )
      )
      // Or enable multitenancy, see details in documentation for particular database.
      x.WithMultitenancy(c => 
          c.UseInMemoryDatabase()                   // For instance, InMemory database backend
           .UseTenantProvider<MyTenantProvider>()   // Optional. Register your  ITenantProvider implementation.
                                                    // Required if you want schema to be provisioned for each tenant.
      )
    ); 
  • Use store in your application
   // Inject IStreamStore in your service or controller for single database implementation
    public class MyService
    {
        private readonly IStreamStore store;
  
        public MyService(IStreamStore store)
        {
            this.store = store;
        }
    }
 
  // Or IStreamStoreFactory for multitenancy
    public class MyService
    {
        private readonly IStreamStoreFactory storeFactory;
  
        public MyService(IStreamStoreFactory storeFactory)
        {
            this.storeFactory = storeFactory;
        }
    }

  // Append events to stream or create a new stream if it does not exist
  // EventObject property is where you store your event
  var events = new Event[]  {
        new Event { Id = "event-1", Timestamp = DateTime.Now, EventObject = eventObject } 
        ...
      };

  try {
    store
      .BeginWriteAsync("stream-1")       // Open stream like new since revision is not provided
         .AppendEventAsync(x =>          // Append events one by one using fluent API
            x.WithId("event-3")
             .Dated(DateTime.Now)
             .WithEvent(eventObject)
         )
        ...
        .AppendRangeAsync(events)       // Or append range of events by passing IEnumerable<Event>
      .CommitAsync(token);

  } catch (StreamConcurrencyException ex) {

    // Read from stream and implement your logic for handling optimistic concurrency exception
    await foreach(var @event in await store.BeginReadAsync("stream-1", token)) {
        ...
    }
    
    // Push result to the end of stream
    store
        .BeginWriteAsync("stream-1", ex.ActualRevision)
           .AppendEventAsync(x =>                // Append events one by one using fluent API
            x.WithId( "event-4")
             .Dated(DateTime.Now)
             .WithEvent(yourEventObject)
           )
        ...
        .CommitAsync(streamId);
  } catch (StreamLockedException ex) {
    // Some database backends like S3 do not support optimistic concurrency control
    // So, the only way to handle concurrency is to lock the stream
  }

More examples of reading and writing events you can find in test scenarios of StreamStore.Testing project.

Example

Each type of storage has its own example project, for instance, you can find an example of usage in the StreamStore.Sql.Example project.

Example projects provides a simple console application that demonstrates how to configure and use StreamStore in your application as single database or multitenancy.

Single database examples demonstrates:

  • optimistic concurrency control
  • asynchronous reading and writing operations

Multitenancy examples, in turn, demonstrates asynchronous reading and writing operations in isolated tenant storage.

For getting all running options simply run the application with --help argument.

For configuring application via configuration file, create appsettings.Development.json file.

Create your own storage implementation

How to create your own storage implementation you can find in CUSTOMIZATION.md.

Contributing

If you experience any issues, have a question or a suggestion, or if you wish to contribute, feel free to open an issue or start a discussion.

License

MIT License