C# SDK for Iggy

Getting Started

The whole SDK revolves around the IMessageStream interface. To create an instance of it, use the following code:

var bus = MessageStreamFactory.CreateMessageStream(x =>
{
    x.Protocol = Protocol.Http;
    x.BaseAddress = "http://localhost:8080";
});

Currently supported transfer protocols:

  • TCP
  • HTTP

Creating first stream

To create a stream, call the CreateStreamAsync method:

await bus.CreateStreamAsync(new StreamRequest
{
    Name = "First Stream",
    StreamId = 1,
});

Every stream has topics to which you can broadcast messages. To create a topic, use the CreateTopicAsync method:

await bus.CreateTopicAsync(streamId, new TopicRequest
{
    Name = "First Topic",
    PartitionsCount = 3,
    TopicId = 1
});

To send messages, use the SendMessagesAsync method:

var messages = new List<Message>();
await bus.SendMessagesAsync(streamId, topicId, new MessageSendRequest
{
    Messages = messages,
    Partitioning = Partitioning.PartitionId(partitionId)
});

The Message struct has two properties, Id and Payload:

struct Message
{
    public required Guid Id { get; init; }
    public required byte[] Payload { get; init; }
}
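As an illustration of filling the list passed to SendMessagesAsync, here is a minimal sketch that turns a few strings into messages. The `Message` struct at the bottom is a local stand-in copied from the definition above so the snippet compiles without the SDK; in a real project you would use the SDK's own type instead. The UTF-8 encoding and the sample strings are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Sample payloads; in practice these come from your application.
string[] texts = { "first", "second" };

// Each message gets a fresh Id and a UTF-8 encoded payload (encoding is an assumption).
List<Message> messages = texts
    .Select(text => new Message
    {
        Id = Guid.NewGuid(),
        Payload = Encoding.UTF8.GetBytes(text)
    })
    .ToList();

Console.WriteLine(messages.Count);

// Local stand-in mirroring the struct shown above; not the SDK type itself.
struct Message
{
    public required Guid Id { get; init; }
    public required byte[] Payload { get; init; }
}
```

The resulting list can then be assigned to the Messages property of the send request.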

Polling messages is done with PollMessagesAsync:

var messages = await bus.PollMessagesAsync(new MessageFetchRequest
{
    StreamId = streamId,
    TopicId = topicId,
    Consumer = Consumer.New(consumerId),
    Count = 1,
    PartitionId = partitionId,
    PollingStrategy = MessagePolling.Next,
    Value = 0,
    AutoCommit = true
});

Version 0.0.5 added new overloads of PollMessagesAsync and SendMessagesAsync that let you provide a custom serializer/deserializer.

SendMessages:

Func<Product, byte[]> serializer = // provide your own serializer.
var messages = new List<Product>();
await bus.SendMessagesAsync<Product>(streamId, topicId, Partitioning.PartitionId(partitionId), messages, serializer);

PollMessages:

Func<byte[], Product> deserializer = // provide your own deserializer.
var messages = await bus.PollMessagesAsync<Product>(new MessageFetchRequest<Product>
{
    StreamId = streamId,
    TopicId = topicId,
    Consumer = Consumer.New(consumerId),
    Count = 1,
    PartitionId = partitionId,
    PollingStrategy = MessagePolling.Next,
    Value = 0,
    AutoCommit = true
}, deserializer);
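One possible serializer/deserializer pair, sketched with System.Text.Json. The `Product` record here is hypothetical (the SDK does not define it), and JSON is just one choice of wire format; any functions with these signatures should fit the delegates shown above.

```csharp
using System;
using System.Text.Json;

// Serialize a Product to UTF-8 JSON bytes.
Func<Product, byte[]> serializer = product => JsonSerializer.SerializeToUtf8Bytes(product);

// Deserialize UTF-8 JSON bytes back into a Product, failing loudly on a null result.
Func<byte[], Product> deserializer = bytes =>
    JsonSerializer.Deserialize<Product>(bytes)
    ?? throw new InvalidOperationException("Payload did not contain a Product.");

// Round trip to check that the two functions are inverses of each other.
var original = new Product(1, "Keyboard");
Product copy = deserializer(serializer(original));
Console.WriteLine(copy == original);

// Hypothetical payload type used only for this example.
record Product(int Id, string Name);
```

Records compare by value, so the round trip check confirms that no data is lost between the two delegates.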

Version 0.0.6 added an optional encryptor/decryptor parameter to SendMessagesAsync and PollMessagesAsync.

Func<byte[], byte[]> encryptor = payload =>
{
    //your encryption logic goes here...
};
await bus.SendMessagesAsync<Product>(streamId, topicId, Partitioning.PartitionId(partitionId), messages, serializer, encryptor);

Func<byte[], byte[]> decryptor = payload =>
{
    //your decryption logic goes here...
};
var messages = await bus.PollMessagesAsync(new MessageFetchRequest
{
    Consumer = Consumer.New(1),
    Count = 1,
    TopicId = topicId,
    StreamId = streamId,
    PartitionId = partitionId,
    PollingStrategy = PollingStrategy.Next(),
    AutoCommit = true
}, deserializer, decryptor);
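To make the placeholders above concrete, here is a hedged sketch of an encryptor/decryptor pair built on AES-CBC from System.Security.Cryptography (.NET 6 or later). The random in-memory key and the layout that prepends the IV to the ciphertext are assumptions for illustration, not something the SDK prescribes; CBC on its own also provides no integrity check, so treat this as a starting point only.

```csharp
using System;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

// Hypothetical 256-bit key generated on the fly; load a real key from secure storage.
byte[] key = RandomNumberGenerator.GetBytes(32);

Func<byte[], byte[]> encryptor = payload =>
{
    using var aes = Aes.Create();
    aes.Key = key;
    byte[] iv = aes.IV; // Aes.Create() generates a random IV per instance
    byte[] cipher = aes.EncryptCbc(payload, iv);
    // Prepend the 16-byte IV so the decryptor can recover it.
    return iv.Concat(cipher).ToArray();
};

Func<byte[], byte[]> decryptor = payload =>
{
    using var aes = Aes.Create();
    aes.Key = key;
    byte[] iv = payload[..16];      // first block is the IV
    byte[] cipher = payload[16..];  // remainder is the ciphertext
    return aes.DecryptCbc(cipher, iv);
};

// Round trip: decrypting the encrypted bytes yields the original payload.
byte[] plain = Encoding.UTF8.GetBytes("hello");
byte[] roundTrip = decryptor(encryptor(plain));
Console.WriteLine(Encoding.UTF8.GetString(roundTrip));
```

Both delegates have the `Func<byte[], byte[]>` shape used above, so they can be passed in place of the placeholder lambdas.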

As of version 0.0.7, optional headers have been added to SendMessagesAsync. You can create a headers object with the following code:

var headers = new Dictionary<HeaderKey, HeaderValue>();
headers.Add(new HeaderKey { Value = "key_1".ToLower() }, HeaderValue.String("test-value-1"));
headers.Add(new HeaderKey { Value = "key_2".ToLower() }, HeaderValue.Int32(69));
headers.Add(new HeaderKey { Value = "key_3".ToLower() }, HeaderValue.Float32(420.69f));
headers.Add(new HeaderKey { Value = "key_4".ToLower() }, HeaderValue.Bool(true));
headers.Add(new HeaderKey { Value = "key_5".ToLower() }, HeaderValue.Raw(byteArray));
headers.Add(new HeaderKey { Value = "key_6".ToLower() }, HeaderValue.Int128(new Int128(6969696969, 420420420)));
headers.Add(new HeaderKey { Value = "key7".ToLower() }, HeaderValue.Guid(Guid.NewGuid()));

Then pass them as an argument to SendMessagesAsync:

await bus.SendMessagesAsync<Product>(streamId, topicId, Partitioning.PartitionId(partitionId), messages, serializer, encryptor, headers);

Note that every method throws an InvalidResponseException when it encounters an error.

If you register IMessageStream in a dependency injection container, you will have access to interfaces that encapsulate smaller parts of the system: IStreamClient, ITopicClient, IMessageClient, IOffsetClient, IConsumerGroupClient, IUtilsClient, and IPartitionClient.

For more information about how Iggy works, check its documentation.

Producer / Consumer Sample


To run the samples, first get Iggy and run the server with cargo r --bin server. Then get the SDK, cd into Iggy_SDK, and run the following commands: dotnet run -c Release --project Iggy_Sample_Producer for the producer, and dotnet run -c Release --project Iggy_Sample_Consumer for the consumer.

TODO

  • Add support for ASP.NET Core Dependency Injection