-
Notifications
You must be signed in to change notification settings - Fork 3
/
Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs
135 lines (118 loc) · 4.86 KB
/
Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
using System.Threading.Tasks;
using FluentAssertions;
using Kafka.Protocol;
using Xunit;
using Xunit.Abstractions;
namespace Kafka.TestFramework.Tests
{
public partial class Given_an_inmemory_kafka_test_framework_and_a_message_subscription
{
public partial class
When_the_client_sends_the_message_subscribed :
TestSpecificationAsync
{
private readonly InMemoryKafkaTestFramework _testServer =
KafkaTestFramework.InMemory();
private ResponsePayload _response;
public When_the_client_sends_the_message_subscribed(
ITestOutputHelper testOutputHelper)
: base(testOutputHelper)
{
}
protected override Task GivenAsync()
{
_testServer.On<ApiVersionsRequest, ApiVersionsResponse>(
request => request.Respond()
.WithThrottleTimeMs(Int32.From(100))
.WithApiKeysCollection(
key => key
.WithApiKey(FetchRequest.ApiKey)
.WithMinVersion(FetchRequest.MinVersion)
.WithMaxVersion(FetchRequest.MaxVersion)));
return Task.CompletedTask;
}
protected override async Task WhenAsync()
{
await using (_testServer.Start()
.ConfigureAwait(false))
{
var client = await _testServer
.CreateRequestClientAsync()
.ConfigureAwait(false);
var message = new ApiVersionsRequest(ApiVersionsRequest.MaxVersion);
var requestPayload = new RequestPayload(
new RequestHeader(message.HeaderVersion)
.WithRequestApiKey(ApiVersionsRequest.ApiKey)
.WithRequestApiVersion(
message.Version)
.WithCorrelationId(Int32.From(12)),
message
);
await client
.SendAsync(requestPayload)
.ConfigureAwait(false);
_response = await client
.ReadAsync(requestPayload)
.ConfigureAwait(false);
}
}
[Fact]
public void
The_subscription_should_receive_a_api_versions_response()
{
_response.Message.Should().BeOfType<ApiVersionsResponse>();
}
[Fact]
public void
The_subscription_should_receive_a_api_versions_response_with_correlation_id()
{
_response.Header.CorrelationId.Should().Be(Int32.From(12));
}
[Fact]
public void
The_subscription_should_receive_a_api_versions_response_with_throttle_time()
{
_response.Message.As<ApiVersionsResponse>()
.ThrottleTimeMs.Should().Be(Int32.From(100));
}
[Fact]
public void
The_subscription_should_receive_a_api_versions_response_with_one_api_key()
{
_response.Message.As<ApiVersionsResponse>()
.ApiKeysCollection.Should().HaveCount(1);
}
[Fact]
public void
The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_key()
{
_response.Message.As<ApiVersionsResponse>()
.ApiKeysCollection.Value.Should().ContainKey(FetchRequest.ApiKey);
}
[Fact]
public void
The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_index()
{
_response.Message.As<ApiVersionsResponse>()
.ApiKeysCollection[FetchRequest.ApiKey].ApiKey.Should()
.Be(FetchRequest.ApiKey);
}
[Fact]
public void
The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_min_version()
{
_response.Message.As<ApiVersionsResponse>()
.ApiKeysCollection[FetchRequest.ApiKey].MinVersion.Should()
.Be(FetchRequest.MinVersion);
}
[Fact]
public void
The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_max_version()
{
_response.Message.As<ApiVersionsResponse>()
.ApiKeysCollection[FetchRequest.ApiKey].MaxVersion.Should()
.Be(FetchRequest.MaxVersion);
}
}
}
}