Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Fixed bug with MqMessageReader.NextFrame where the message position w…
Browse files Browse the repository at this point in the history
…ould not be updated.

Added MqMessageReader.ReadToEnd to read remaining bytes in the message.
Added Position and Length properties to MqMessageReader.
Added tests for the new methods and properties.
Removed redundant actions in EnsureBuffer.  Now will only check and throw if there is not enough buffer space.
  • Loading branch information
DJGosnell committed Sep 11, 2016
1 parent 8311936 commit 0f42224
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 41 deletions.
78 changes: 78 additions & 0 deletions DtronixMessageQueue.Tests/MqMessageWriterReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,84 @@ public void MessageReader_peeks_char() {
Assert.True(message_reader.IsAtEnd);
}

[Fact]
public void MessageReader_reads_to_end() {
var expected_value = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
message_builder.Write(expected_value);
var message = message_builder.ToMessage();
message_reader.Message = message;

Assert.Equal(expected_value, message_reader.ReadToEnd());
Assert.True(message_reader.IsAtEnd);
}

[Fact]
public void MessageReader_reads_to_end_multi_frame() {
var expected_value = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};

message_builder.Write(new byte[] { 1, 2, 3, 4, 5 });
message_builder.FinalizeFrame();

message_builder.Write(new byte[] { 6, 7, 8, 9, 0 });
var message = message_builder.ToMessage();
message_reader.Message = message;

Assert.Equal(expected_value, message_reader.ReadToEnd());
Assert.True(message_reader.IsAtEnd);
}

[Fact]
public void MessageReader_reads_to_end_partial() {
var expected_value = new byte[] { 4, 5, 6, 7, 8, 9, 10 };
message_builder.Write(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 });
var message = message_builder.ToMessage();
message_reader.Message = message;

message_reader.ReadBytes(3);
Assert.Equal(expected_value, message_reader.ReadToEnd());
Assert.True(message_reader.IsAtEnd);
}


[Fact]
public void MessageReader_maintains_position() {
var expected_value = 5;
message_builder.Write(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 });
var message = message_builder.ToMessage();
message_reader.Message = message;

message_reader.ReadBytes(5);
Assert.Equal(expected_value, message_reader.Position);
Assert.False(message_reader.IsAtEnd);
}

[Fact]
public void MessageReader_maintains_position_across_frames() {
var expected_value = 7;
message_builder.Write(new byte[] {1, 2, 3, 4});
message_builder.FinalizeFrame();
message_builder.Write(new byte[] {5, 6, 7, 8, 9, 10});

var message = message_builder.ToMessage();
message_reader.Message = message;


Assert.Equal(new byte[] {1, 2, 3, 4, 5, 6, 7}, message_reader.ReadBytes(7));
Assert.Equal(expected_value, message_reader.Position);
Assert.False(message_reader.IsAtEnd);
}

[Fact]
public void MessageReader_throws_when_reading_simple_type_across_frames() {
message_builder.Write(new byte[] { 1, 2 });
message_builder.FinalizeFrame();
message_builder.Write(new byte[] { 5, 6 });

var message = message_builder.ToMessage();
message_reader.Message = message;

Assert.Throws<InvalidOperationException>(() => message_reader.ReadInt32());
}


}
Expand Down
120 changes: 79 additions & 41 deletions DtronixMessageQueue/MqMessageReader.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.IO;
using System.Linq;
using System.Text;

namespace DtronixMessageQueue {
Expand All @@ -22,7 +23,12 @@ public class MqMessageReader : BinaryReader {
/// <summary>
/// Current position in the frame that is being read.
/// </summary>
private int position;
private int frame_position;

/// <summary>
/// Position in the entire message read.
/// </summary>
private int absolute_position;

/// <summary>
/// Current message being read.
Expand Down Expand Up @@ -69,11 +75,22 @@ public MqMessage Message {
set {
message = value;
current_frame = value?[0];
position = 0;
frame_position = 0;
message_position = 0;
absolute_position = 0;
}
}

/// <summary>
/// Byte position in the message.
/// </summary>
public int Position => absolute_position;

/// <summary>
/// Total length of the bytes in this message.
/// </summary>
public int Length => message.Frames.Sum(frm => frm.DataLength);

/// <summary>
/// Unused. Stream.Null
/// </summary>
Expand All @@ -85,7 +102,7 @@ public MqMessage Message {
public bool IsAtEnd {
get {
var last_frame = message.Frames[message.Frames.Count - 1];
return current_frame == last_frame && last_frame.DataLength == position;
return current_frame == last_frame && last_frame.DataLength == frame_position;
}
}

Expand Down Expand Up @@ -120,21 +137,16 @@ public MqMessageReader(MqMessage initial_message, Encoding encoding) : base(Stre
/// </summary>
/// <param name="length">Number of bytes to ensure available.</param>
private void EnsureBuffer(int length) {

if (position + length > current_frame.DataLength) {
if (frame_position + length > current_frame.DataLength) {
throw new InvalidOperationException("Trying to read simple type across frames which is not allowed.");
}

if (position + length > current_frame.DataLength) {
NextNonEmptyFrame();
}
}

/// <summary>
/// Skips over to the next non-empty frame in the message.
/// </summary>
private void NextNonEmptyFrame() {
position = 0;
frame_position = 0;
// Increment until we reach the next non-empty frame.
do {
message_position++;
Expand All @@ -148,8 +160,8 @@ private void NextNonEmptyFrame() {
/// Advances the reader to the next frame and resets reading positions.
/// </summary>
public void NextFrame() {
position = 0;
message_position++;
frame_position = 0;
message_position += 1;
current_frame = message[message_position];
}

Expand All @@ -159,8 +171,9 @@ public void NextFrame() {
/// </summary>
public override bool ReadBoolean() {
EnsureBuffer(1);
var value = current_frame.ReadBoolean(position);
position += 1;
var value = current_frame.ReadBoolean(frame_position);
frame_position += 1;
absolute_position += 1;
return value;
}

Expand All @@ -171,8 +184,9 @@ public override bool ReadBoolean() {
/// </summary>
public override byte ReadByte() {
EnsureBuffer(1);
var value = current_frame.ReadByte(position);
position += 1;
var value = current_frame.ReadByte(frame_position);
frame_position += 1;
absolute_position += 1;
return value;
}

Expand All @@ -182,8 +196,9 @@ public override byte ReadByte() {
/// </summary>
public override sbyte ReadSByte() {
EnsureBuffer(1);
var value = current_frame.ReadSByte(position);
position += 1;
var value = current_frame.ReadSByte(frame_position);
frame_position += 1;
absolute_position += 1;
return value;
}

Expand Down Expand Up @@ -284,14 +299,14 @@ public override int Read(char[] buffer, int index, int count) {
public override int PeekChar() {
// Store the temporary state of the reader.
var previous_frame = current_frame;
var previous_position = position;
var previous_position = frame_position;
var previous_message_position = message_position;

var value = ReadChar();

// Restore the original state of the reader.
current_frame = previous_frame;
position = previous_position;
frame_position = previous_position;
message_position = previous_message_position;

return (int) value;
Expand All @@ -303,8 +318,9 @@ public override int PeekChar() {
/// </summary>
public override short ReadInt16() {
EnsureBuffer(2);
var value = current_frame.ReadInt16(position);
position += 2;
var value = current_frame.ReadInt16(frame_position);
frame_position += 2;
absolute_position += 2;
return value;
}

Expand All @@ -314,8 +330,9 @@ public override short ReadInt16() {
/// </summary>
public override ushort ReadUInt16() {
EnsureBuffer(2);
var value = current_frame.ReadUInt16(position);
position += 2;
var value = current_frame.ReadUInt16(frame_position);
frame_position += 2;
absolute_position += 2;
return value;
}

Expand All @@ -335,8 +352,9 @@ public override int Read() {
/// </summary>
public override int ReadInt32() {
EnsureBuffer(4);
var value = current_frame.ReadInt32(position);
position += 4;
var value = current_frame.ReadInt32(frame_position);
frame_position += 4;
absolute_position += 4;
return value;
}

Expand All @@ -347,8 +365,9 @@ public override int ReadInt32() {
/// </summary>
public override uint ReadUInt32() {
EnsureBuffer(4);
var value = current_frame.ReadUInt32(position);
position += 4;
var value = current_frame.ReadUInt32(frame_position);
frame_position += 4;
absolute_position += 4;
return value;
}

Expand All @@ -358,8 +377,9 @@ public override uint ReadUInt32() {
/// </summary>
public override long ReadInt64() {
EnsureBuffer(8);
var value = current_frame.ReadInt64(position);
position += 8;
var value = current_frame.ReadInt64(frame_position);
frame_position += 8;
absolute_position += 8;
return value;
}

Expand All @@ -370,8 +390,9 @@ public override long ReadInt64() {
/// </summary>
public override ulong ReadUInt64() {
EnsureBuffer(8);
var value = current_frame.ReadUInt64(position);
position += 8;
var value = current_frame.ReadUInt64(frame_position);
frame_position += 8;
absolute_position += 8;
return value;
}

Expand All @@ -382,8 +403,9 @@ public override ulong ReadUInt64() {
/// </summary>
public override float ReadSingle() {
EnsureBuffer(4);
var value = current_frame.ReadSingle(position);
position += 4;
var value = current_frame.ReadSingle(frame_position);
frame_position += 4;
absolute_position += 4;
return value;
}

Expand All @@ -394,8 +416,9 @@ public override float ReadSingle() {
/// </summary>
public override double ReadDouble() {
EnsureBuffer(8);
var value = current_frame.ReadDouble(position);
position += 8;
var value = current_frame.ReadDouble(frame_position);
frame_position += 8;
absolute_position += 8;
return value;
}

Expand All @@ -405,8 +428,9 @@ public override double ReadDouble() {
/// </summary>
public override decimal ReadDecimal() {
EnsureBuffer(16);
var value = current_frame.ReadDecimal(position);
position += 16;
var value = current_frame.ReadDecimal(frame_position);
frame_position += 16;
absolute_position += 16;
return value;
}

Expand All @@ -426,6 +450,19 @@ public override string ReadString() {

}

/// <summary>
/// Reads the rest of the message bytes from the current position to the end.
/// >1 Byte.
/// 1 or more frames.
/// </summary>
/// <returns>Message bytes read.</returns>
public byte[] ReadToEnd() {
var remaining = Length - absolute_position;

// If there is nothing left to read, return null; otherwise return the bytes.
return remaining == 0 ? null : ReadBytes(remaining);
}

/// <summary>
/// Reads the specified number of bytes from the message.
/// >1 Byte.
Expand All @@ -449,16 +486,17 @@ public override byte[] ReadBytes(int count) {
public override int Read(byte[] byte_buffer, int offset, int count) {
var total_read = 0;
while (offset < count) {
var max_read_length = current_frame.DataLength - position;
var max_read_length = current_frame.DataLength - frame_position;
var read_length = count - total_read < max_read_length ? count - total_read : max_read_length;
// If we are at the end of this max frame size, get a new one.
if (max_read_length == 0) {
NextNonEmptyFrame();
continue;
}

var read = current_frame.Read(position, byte_buffer, offset, read_length);
position += read;
var read = current_frame.Read(frame_position, byte_buffer, offset, read_length);
frame_position += read;
absolute_position += read;
total_read += read;
offset += read;
}
Expand Down

0 comments on commit 0f42224

Please sign in to comment.