Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow connection to be passed to constructor #4

Merged
merged 1 commit into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 45 additions & 13 deletions Npgmq.Example/Program.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,61 @@
using System.Reflection;
using Microsoft.Extensions.Configuration;
using Npgmq;
using Npgsql;

var configuration = new ConfigurationBuilder()
.AddEnvironmentVariables()
.AddUserSecrets(Assembly.GetExecutingAssembly())
.Build();

var npgmq = new NpgmqClient(configuration.GetConnectionString("ExampleDB")!);
var connectionString = configuration.GetConnectionString("ExampleDB")!;

await npgmq.InitAsync();
await npgmq.CreateQueueAsync("example_queue");

var msgId = await npgmq.SendAsync("example_queue", new MyMessageType
// Test Npgmq with connection string
{
Foo = "Test",
Bar = 123
});
Console.WriteLine($"Sent message with id {msgId}");
var npgmq = new NpgmqClient(connectionString);

await npgmq.InitAsync();
await npgmq.CreateQueueAsync("example_queue");

var msgId = await npgmq.SendAsync("example_queue", new MyMessageType
{
Foo = "Connection string test",
Bar = 1
});
Console.WriteLine($"Sent message with id {msgId}");

var msg = await npgmq.ReadAsync<MyMessageType>("example_queue");
if (msg != null)
{
Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
await npgmq.ArchiveAsync("example_queue", msg.MsgId);
}
}

var msg = await npgmq.ReadAsync<MyMessageType>("example_queue");
if (msg != null)
// Test Npgmq with connection object and a transaction
{
Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
await npgmq.ArchiveAsync("example_queue", msg.MsgId);
await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();
var npgmq = new NpgmqClient(connection);

await using (var tx = connection.BeginTransaction())
{
var msgId = await npgmq.SendAsync("example_queue", new MyMessageType
{
Foo = "Connection object test",
Bar = 2
});
Console.WriteLine($"Sent message with id {msgId}");

await tx.CommitAsync();
}

var msg = await npgmq.ReadAsync<MyMessageType>("example_queue");
if (msg != null)
{
Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
await npgmq.ArchiveAsync("example_queue", msg.MsgId);
}
}

internal class MyMessageType
Expand Down
89 changes: 86 additions & 3 deletions Npgmq.Test/NpgmqClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ public NpgmqClientTest()
.Build();

_connectionString = configuration.GetConnectionString("Test")!;

_connection = new NpgsqlConnection(_connectionString);
_sut = new NpgmqClient(_connectionString);
_sut = new NpgmqClient(_connection);
}

public void Dispose()
Expand Down Expand Up @@ -545,6 +544,27 @@ public async Task ReadBatchAsync_should_return_list_of_messages()
});
}

[Fact]
public async Task ConnectionString_should_be_used_to_connect()
{
// Arrange
await ResetTestQueueAsync();
var sut2 = new NpgmqClient(_connectionString);

// Act
var msgId = await sut2.SendAsync(TestQueueName, new TestMessage
{
Foo = 123,
Bar = "Test",
Baz = DateTimeOffset.Parse("2023-09-01T01:23:45-04:00")
});

// Assert
Assert.Equal(1, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(1, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt <= CURRENT_TIMESTAMP;"));
Assert.Equal(msgId, await _connection.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));
}

[Fact]
public async Task SendAsync_should_add_message()
{
Expand All @@ -565,6 +585,69 @@ public async Task SendAsync_should_add_message()
Assert.Equal(msgId, await _connection.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));
}

[Fact]
public async Task SendAsync_should_commit_with_database_transaction()
{
// Arrange
await ResetTestQueueAsync();
await using var connection2 = new NpgsqlConnection(_connectionString);
await connection2.OpenAsync();

// Act
await using var transaction = await _connection.BeginTransactionAsync();
var msgId = await _sut.SendAsync(TestQueueName, new TestMessage
{
Foo = 123,
Bar = "Test",
Baz = DateTimeOffset.Parse("2023-09-01T01:23:45-04:00")
});

// Assert
Assert.Equal(1, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(0, await connection2.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(0, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt <= CURRENT_TIMESTAMP;"));
Assert.Equal(msgId, await _connection.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));

// Act
await transaction.CommitAsync();

// Assert
Assert.Equal(1, await connection2.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(1, await connection2.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt <= CURRENT_TIMESTAMP;"));
Assert.Equal(msgId, await connection2.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));
}

[Fact]
public async Task SendAsync_should_rollback_with_database_transaction()
{
// Arrange
await ResetTestQueueAsync();
await using var connection2 = new NpgsqlConnection(_connectionString);
await connection2.OpenAsync();

// Act
await using var transaction = await _connection.BeginTransactionAsync();
var msgId = await _sut.SendAsync(TestQueueName, new TestMessage
{
Foo = 123,
Bar = "Test",
Baz = DateTimeOffset.Parse("2023-09-01T01:23:45-04:00")
});

// Assert
Assert.Equal(1, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(0, await connection2.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(0, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName} WHERE vt <= CURRENT_TIMESTAMP;"));
Assert.Equal(msgId, await _connection.ExecuteScalarAsync<long>($"SELECT msg_id FROM pgmq.q_{TestQueueName} LIMIT 1;"));

// Act
await transaction.RollbackAsync();

// Assert
Assert.Equal(0, await _connection.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
Assert.Equal(0, await connection2.ExecuteScalarAsync<long>($"SELECT count(*) FROM pgmq.q_{TestQueueName};"));
}

[Fact]
public async Task SendAsync_should_add_string_message()
{
Expand Down Expand Up @@ -640,4 +723,4 @@ public async Task SetVtAsync_should_change_vt_for_a_message()
Assert.NotNull(message2);
Assert.Equal(msgId, message2.MsgId);
}
}
}
1 change: 1 addition & 0 deletions Npgmq.sln
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
ProjectSection(SolutionItems) = preProject
LICENSE = LICENSE
README.md = README.md
global.json = global.json
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{8C37002D-05C6-4B1F-B4FC-C2F45C5E5328}"
Expand Down
Loading