Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Added blocking tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
DJGosnell committed Oct 4, 2016
1 parent 3010cb0 commit 42d8b9a
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 12 deletions.
12 changes: 10 additions & 2 deletions src/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public RpcPerformanceTest(string[] args) {

RpcSingleProcessTest(200000, 4, config, RpcTestType.Await);

RpcSingleProcessTest(100, 4, config, RpcTestType.Block);

RpcSingleProcessTest(10000, 4, config, RpcTestType.Return);

RpcSingleProcessTest(10000, 4, config, RpcTestType.Exception);
Expand Down Expand Up @@ -76,12 +78,17 @@ private void RpcSingleProcessTest(int runs, int loops, RpcConfig config, RpcTest
sw.Restart();
for (var i = 0; i < runs; i++) {
switch (type) {

case RpcTestType.Block:
service.TestNoReturnBlock();
break;

case RpcTestType.NoRetrun:
service.TestNoReturn();
break;

case RpcTestType.Await:
service.TestNoReturnBlock();
service.TestNoReturnAwait();
break;

case RpcTestType.Return:
Expand Down Expand Up @@ -133,6 +140,7 @@ enum RpcTestType {
NoRetrun,
Return,
Exception,
Await
Await,
Block
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Serialization;
using DtronixMessageQueue.Rpc;

namespace DtronixMessageQueue.Tests.Performance.Services.Server {
Expand All @@ -28,13 +29,24 @@ public void TestNoReturn() {

}

public async void TestNoReturnBlock() {
public async void TestNoReturnAwait() {
var number = Interlocked.Increment(ref call_count);
await Task.Delay(1000);

VerifyComplete();

}


public void TestNoReturnBlock() {
Task.Factory.StartNew(() => {
var number = Interlocked.Increment(ref call_count);

Thread.Sleep(1000);
VerifyComplete();
}, TaskCreationOptions.LongRunning);
}

public int TestIncrement() {
call_count++;
VerifyComplete();
Expand Down Expand Up @@ -69,6 +81,7 @@ private void VerifyComplete() {

internal interface ITestService : IRemoteService<SimpleRpcSession, RpcConfig> {
void TestNoReturn();
void TestNoReturnAwait();
void TestNoReturnBlock();
int TestIncrement();
void TestSetup(int calls);
Expand Down
8 changes: 4 additions & 4 deletions src/DtronixMessageQueue/MqSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected override void HandleIncomingBytes(byte[] buffer) {


if (inbox_task == null || inbox_task.IsCompleted) {
inbox_task = Task.Factory.StartNew(ProcessIncomingQueue);
inbox_task = Task.Run((Action)ProcessIncomingQueue);
}
}

Expand Down Expand Up @@ -144,7 +144,7 @@ private void ProcessOutbox() {

lock (outbox_lock) {
if (outbox.IsEmpty == false) {
outbox_task = Task.Factory.StartNew(ProcessOutbox);
outbox_task = Task.Run((Action)ProcessOutbox);
}
}

Expand Down Expand Up @@ -221,7 +221,7 @@ private void ProcessIncomingQueue() {

lock (inbox_lock) {
if (inbox_bytes.IsEmpty == false) {
inbox_task = Task.Factory.StartNew(ProcessIncomingQueue);
inbox_task = Task.Run((Action)ProcessIncomingQueue);
}
}

Expand Down Expand Up @@ -302,7 +302,7 @@ public void Send(MqMessage message) {
}

if (outbox_task == null || outbox_task.IsCompleted) {
outbox_task = Task.Factory.StartNew(ProcessOutbox);
outbox_task = Task.Run((Action)ProcessOutbox);
}


Expand Down
4 changes: 2 additions & 2 deletions src/DtronixMessageQueue/Rpc/RpcCallMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public override bool HandleMessage(MqMessage message) {
private void ProcessRpcCall(MqMessage message, RpcCallMessageType message_type) {

// Execute the processing on the worker thread.
Task.Factory.StartNew(() => {
Task.Run(() => {

// Retrieve a serialization cache to work with.
var serialization = Session.SerializationCache.Get(message);
Expand Down Expand Up @@ -247,7 +247,7 @@ private void ProcessRpcCall(MqMessage message, RpcCallMessageType message_type)
private void ProcessRpcReturn(MqMessage message) {

// Execute the processing on the worker thread.
Task.Factory.StartNew(() => {
Task.Run(() => {

// Retrieve a serialization cache to work with.
var serialization = Session.SerializationCache.Get(message);
Expand Down
6 changes: 3 additions & 3 deletions src/DtronixMessageQueue/Rpc/RpcSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ protected override void ProcessCommand(MqFrame frame) {
}

// Alert the server that this session is ready for usage.
Task.Factory.StartNew(() => {
Task.Run(() => {
Ready?.Invoke(this, new SessionEventArgs<TSession, TConfig>((TSession) this));
});

Expand Down Expand Up @@ -211,7 +211,7 @@ protected override void ProcessCommand(MqFrame frame) {
Close(SocketCloseReason.AuthenticationFailure);
} else {
// Alert the server that this session is ready for usage.
Task.Factory.StartNew(() => {
Task.Run(() => {
Ready?.Invoke(this, new SessionEventArgs<TSession, TConfig>((TSession) this));
});
}
Expand Down Expand Up @@ -240,7 +240,7 @@ protected override void ProcessCommand(MqFrame frame) {
AuthenticationResult?.Invoke(this, auth_args);

// Alert the client that this session is ready for usage.
Task.Factory.StartNew(() => {
Task.Run(() => {
Ready?.Invoke(this, new SessionEventArgs<TSession, TConfig>((TSession) this));
});

Expand Down

0 comments on commit 42d8b9a

Please sign in to comment.