From f25cbf83d469be44627dfd23f070be711b8b3cbc Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 4 Apr 2024 14:47:06 +0100 Subject: [PATCH 1/2] NATS protocol parser This is an experimental NATS Protocol Parser with zero allocations. Somewhat inspired by the .NET Runtime System.Text.Json library's Utf8JsonReader. | Method | Mean | Error | StdDev | Allocated | |------- |---------:|---------:|---------:|----------:| | Parse | 862.5 ns | 320.3 ns | 17.56 ns | - | --- NATS.Client.sln | 6 + .../NatsProtocolParserProf.csproj | 14 + NatsProtocolParserProf/Program.cs | 137 +++++ .../MicroBenchmark/NatsProtoParserBench.cs | 128 +++++ src/NATS.Client.Core/NatsProtocolParser.cs | 511 ++++++++++++++++++ .../NatsProtocolParserTest.cs | 103 ++++ 6 files changed, 899 insertions(+) create mode 100644 NatsProtocolParserProf/NatsProtocolParserProf.csproj create mode 100644 NatsProtocolParserProf/Program.cs create mode 100644 sandbox/MicroBenchmark/NatsProtoParserBench.cs create mode 100644 src/NATS.Client.Core/NatsProtocolParser.cs create mode 100644 tests/NATS.Client.Core.Tests/NatsProtocolParserTest.cs diff --git a/NATS.Client.sln b/NATS.Client.sln index ee9dac34d..8f3d241e1 100644 --- a/NATS.Client.sln +++ b/NATS.Client.sln @@ -107,6 +107,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.OpenTelemetry", "sa EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Net.OpenTelemetry.Tests", "tests\NATS.Net.OpenTelemetry.Tests\NATS.Net.OpenTelemetry.Tests.csproj", "{B8554582-DE19-41A2-9784-9B27C9F22429}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NatsProtocolParserProf", "NatsProtocolParserProf\NatsProtocolParserProf.csproj", "{1981B633-D522-4468-873D-5CC49B489159}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -285,6 +287,10 @@ Global {B8554582-DE19-41A2-9784-9B27C9F22429}.Debug|Any CPU.Build.0 = Debug|Any CPU {B8554582-DE19-41A2-9784-9B27C9F22429}.Release|Any CPU.ActiveCfg = Release|Any CPU {B8554582-DE19-41A2-9784-9B27C9F22429}.Release|Any CPU.Build.0 = Release|Any CPU + {1981B633-D522-4468-873D-5CC49B489159}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1981B633-D522-4468-873D-5CC49B489159}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1981B633-D522-4468-873D-5CC49B489159}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1981B633-D522-4468-873D-5CC49B489159}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/NatsProtocolParserProf/NatsProtocolParserProf.csproj b/NatsProtocolParserProf/NatsProtocolParserProf.csproj new file mode 100644 index 000000000..177ee2ba4 --- /dev/null +++ b/NatsProtocolParserProf/NatsProtocolParserProf.csproj @@ -0,0 +1,14 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + diff --git a/NatsProtocolParserProf/Program.cs b/NatsProtocolParserProf/Program.cs new file mode 100644 index 000000000..309fa1299 --- /dev/null +++ b/NatsProtocolParserProf/Program.cs @@ -0,0 +1,137 @@ +// See https://aka.ms/new-console-template for more information + +using System.Buffers; +using NATS.Client.Core; + +var bench = new NatsProtoParserBench(); +bench.Setup(); + +Console.WriteLine("Setup completed"); +Console.ReadLine(); + +var count = 0; +for (var i = 0; i < 1_000_000; i++) +{ + count += bench.Parse(); +} + +Console.WriteLine($"count: {count}"); +Console.ReadLine(); + +public class NatsProtoParserBench +{ + private List> _sequences; + private NatsProtocolParser _parser; + + public void Setup() + { + _sequences = + [ + new SequenceBuilder() + .Append("INFO {\"server_id\":\"nats-server\""u8.ToArray()) + .Append("}\r"u8.ToArray()) + .Append("\nPI"u8.ToArray()) + .ReadOnlySequence, + + new SequenceBuilder() + .Append("NG"u8.ToArray()) + .Append("\r"u8.ToArray()) + .Append("\n"u8.ToArray()) + .Append("PO"u8.ToArray()) + .ReadOnlySequence, + + new SequenceBuilder() + .Append("NG\r\n"u8.ToArray()) + .Append("+OK\r\n"u8.ToArray()) + .Append("-ER"u8.ToArray()) + .Append("R 'cra"u8.ToArray()) + .Append("sh!'\r\nPI"u8.ToArray()) + .Append("NG\r\n"u8.ToArray()) + .ReadOnlySequence, + + new SequenceBuilder() + .Append("MSG subject sid1 reply_to 1\r\nx\r\n"u8.ToArray()) + .ReadOnlySequence, + + new SequenceBuilder() + .Append("PING\r\n"u8.ToArray()) + .ReadOnlySequence + ]; + + _parser = new NatsProtocolParser(); + } + + public int Parse() + { + var tokenizer = new NatsProtocolParser.NatsTokenizer(); + var count = 0; + + foreach (var sequence in _sequences) + { + var buffer = sequence; + + while (_parser.TryRead(ref tokenizer, ref buffer)) + { + switch (_parser.Command) + { + case NatsProtocolParser.NatsTokenizer.Command.INFO: + case NatsProtocolParser.NatsTokenizer.Command.PING: + case NatsProtocolParser.NatsTokenizer.Command.PONG: + case NatsProtocolParser.NatsTokenizer.Command.OK: + case NatsProtocolParser.NatsTokenizer.Command.ERR: + case NatsProtocolParser.NatsTokenizer.Command.MSG: + count++; + break; + } + + _parser.Reset(); + } + } + + if (count != 8) + throw new Exception("Invalid count"); + + return count; + } + + private class BufferSegment : ReadOnlySequenceSegment + { + public void SetMemory(ReadOnlyMemory memory) => Memory = memory; + + public void SetNextSegment(BufferSegment? segment) => Next = segment; + + public void SetRunningIndex(int index) => RunningIndex = index; + } + + private class SequenceBuilder + { + private BufferSegment? _start; + private BufferSegment? _end; + private int _length; + + public ReadOnlySequence ReadOnlySequence => new(_start!, 0, _end!, _end!.Memory.Length); + + // Memory is only allowed rent from ArrayPool. + public SequenceBuilder Append(ReadOnlyMemory buffer) + { + var segment = new BufferSegment(); + segment.SetMemory(buffer); + + if (_start == null) + { + _start = segment; + _end = segment; + } + else + { + _end!.SetNextSegment(segment); + segment.SetRunningIndex(_length); + _end = segment; + } + + _length += buffer.Length; + + return this; + } + } +} diff --git a/sandbox/MicroBenchmark/NatsProtoParserBench.cs b/sandbox/MicroBenchmark/NatsProtoParserBench.cs new file mode 100644 index 000000000..fc3b56dcf --- /dev/null +++ b/sandbox/MicroBenchmark/NatsProtoParserBench.cs @@ -0,0 +1,128 @@ +using System.Buffers; +using BenchmarkDotNet.Attributes; +using NATS.Client.Core; + +namespace MicroBenchmark; + +[ShortRunJob] +[MemoryDiagnoser] +[PlainExporter] +public class NatsProtoParserBench +{ + private List> _sequences; + private NatsProtocolParser _parser; + + [GlobalSetup] + public void Setup() + { + _sequences = + [ + new SequenceBuilder() + .Append("INFO {\"server_id\":\"nats-server\""u8.ToArray()) + .Append("}\r"u8.ToArray()) + .Append("\nPI"u8.ToArray()) + .ReadOnlySequence, + + new SequenceBuilder() + .Append("NG"u8.ToArray()) + .Append("\r"u8.ToArray()) + .Append("\n"u8.ToArray()) + .Append("PO"u8.ToArray()) + .ReadOnlySequence, + + new SequenceBuilder() + .Append("NG\r\n"u8.ToArray()) + .Append("+OK\r\n"u8.ToArray()) + .Append("-ER"u8.ToArray()) + .Append("R 'cra"u8.ToArray()) + .Append("sh!'\r\nPI"u8.ToArray()) + .Append("NG\r\n"u8.ToArray()) + .ReadOnlySequence, + + new SequenceBuilder() + .Append("MSG subject sid1 reply_to 1\r\nx\r\n"u8.ToArray()) + .ReadOnlySequence, + + new SequenceBuilder() + .Append("PING\r\n"u8.ToArray()) + .ReadOnlySequence + ]; + + _parser = new NatsProtocolParser(); + } + + [Benchmark] + public int Parse() + { + var tokenizer = new NatsProtocolParser.NatsTokenizer(); + var count = 0; + + foreach (var sequence in _sequences) + { + var buffer = sequence; + + while (_parser.TryRead(ref tokenizer, ref buffer)) + { + switch (_parser.Command) + { + case NatsProtocolParser.NatsTokenizer.Command.INFO: + case NatsProtocolParser.NatsTokenizer.Command.PING: + case NatsProtocolParser.NatsTokenizer.Command.PONG: + case NatsProtocolParser.NatsTokenizer.Command.OK: + case NatsProtocolParser.NatsTokenizer.Command.ERR: + case NatsProtocolParser.NatsTokenizer.Command.MSG: + count++; + break; + } + + _parser.Reset(); + } + } + + if (count != 8) + throw new Exception("Invalid count"); + + return count; + } + + private class BufferSegment : ReadOnlySequenceSegment + { + public void SetMemory(ReadOnlyMemory memory) => Memory = memory; + + public void SetNextSegment(BufferSegment? segment) => Next = segment; + + public void SetRunningIndex(int index) => RunningIndex = index; + } + + private class SequenceBuilder + { + private BufferSegment? _start; + private BufferSegment? _end; + private int _length; + + public ReadOnlySequence ReadOnlySequence => new(_start!, 0, _end!, _end!.Memory.Length); + + // Memory is only allowed rent from ArrayPool. + public SequenceBuilder Append(ReadOnlyMemory buffer) + { + var segment = new BufferSegment(); + segment.SetMemory(buffer); + + if (_start == null) + { + _start = segment; + _end = segment; + } + else + { + _end!.SetNextSegment(segment); + segment.SetRunningIndex(_length); + _end = segment; + } + + _length += buffer.Length; + + return this; + } + } +} diff --git a/src/NATS.Client.Core/NatsProtocolParser.cs b/src/NATS.Client.Core/NatsProtocolParser.cs new file mode 100644 index 000000000..7437ad390 --- /dev/null +++ b/src/NATS.Client.Core/NatsProtocolParser.cs @@ -0,0 +1,511 @@ +using System.Buffers; +using System.Buffers.Binary; +using System.Buffers.Text; +using System.Text; + +namespace NATS.Client.Core; + +public class NatsProtocolParser +{ + private int _currentToken; + + public NatsBytes Subject { get; private set; } + + public NatsBytes ReplyTo { get; private set; } + + public NatsBytes QueueGroup { get; private set; } + + public NatsBytes Sid { get; private set; } + + public NatsBytes Headers { get; private set; } + + public NatsBytes Payload { get; private set; } + + public NatsBytes? Error { get; private set; } + + public NatsTokenizer.Command Command { get; private set; } + + public NatsBytes? Json { get; private set; } + + public void Reset() + { + Subject = default; + Sid = default; + ReplyTo = default; + QueueGroup = default; + Headers = default; + Payload = default; + _currentToken = default; + Command = default; + Error = default; + Json = default; + } + + public bool TryRead(ref NatsTokenizer tokenizer, ref ReadOnlySequence buffer) + { + while (true) + { + var result = tokenizer.Read(ref buffer); + + if (result == NatsTokenizer.Result.ExamineMore) + { + continue; + } + + if (result == NatsTokenizer.Result.Done) + { + if (tokenizer.GetCommand() == NatsTokenizer.Command.INFO) + { + Json = new NatsBytes(tokenizer.GetBufferToken()); + Command = NatsTokenizer.Command.INFO; + } + else if (tokenizer.GetCommand() == NatsTokenizer.Command.PING) + { + Command = NatsTokenizer.Command.PING; + } + else if (tokenizer.GetCommand() == NatsTokenizer.Command.PONG) + { + Command = NatsTokenizer.Command.PONG; + } + else if (tokenizer.GetCommand() == NatsTokenizer.Command.OK) + { + Command = NatsTokenizer.Command.OK; + } + else if (tokenizer.GetCommand() == NatsTokenizer.Command.ERR) + { + Error = new NatsBytes(tokenizer.GetBufferToken()); + Command = NatsTokenizer.Command.ERR; + } + + tokenizer.Reset(); + return true; + } + + if (result == NatsTokenizer.Result.Token) + { + _currentToken++; + + // MSG [reply-to] <#bytes>␍␊[payload]␍␊ + if (tokenizer.GetCommand() == NatsTokenizer.Command.MSG) + { + Command = NatsTokenizer.Command.MSG; + + if (tokenizer.IsLastToken) + { + var length = tokenizer.GetIntegerToken(); + tokenizer.StartReadSize(length); + continue; + } + + if (_currentToken == 1) + { + Subject = new NatsBytes(tokenizer.GetBufferToken()); + continue; + } + + if (_currentToken == 2) + { + Sid = new NatsBytes(tokenizer.GetBufferToken()); + continue; + } + + if (_currentToken == 3) + { + ReplyTo = new NatsBytes(tokenizer.GetBufferToken()); + continue; + } + } + + continue; + } + + if (result == NatsTokenizer.Result.Payload) + { + if (tokenizer.GetCommand() == NatsTokenizer.Command.MSG) + { + Payload = new NatsBytes(tokenizer.GetBufferToken()); + tokenizer.Reset(); + return true; + } + } + + if (result == NatsTokenizer.Result.Error) + { + throw new Exception("tokenizer error"); + } + + if (result == NatsTokenizer.Result.ReadMore) + { + return false; + } + } + } + + public readonly struct NatsBytes(ReadOnlySequence sequence) + { + public string GetString() => Encoding.ASCII.GetString(sequence); + } + + public struct NatsTokenizer + { + /* + B: PING␍␊ + B: PONG␍␊ + S: +OK␍␊ + S: -ERR ␍␊ + S: INFO {"option_name":option_value,...}␍␊ + S: MSG [reply-to] <#bytes>␍␊[payload]␍␊ + S: HMSG [reply-to] <#header-bytes> <#total-bytes>␍␊[headers]␍␊␍␊[payload]␍␊ + + C: CONNECT {"option_name":option_value,...}␍␊ + C: SUB [queue group] ␍␊ + C: UNSUB [max_msgs]␍␊ + C: PUB [reply-to] <#bytes>␍␊[payload]␍␊ + C: HPUB [reply-to] <#header-bytes> <#total-bytes>␍␊[headers]␍␊␍␊[payload]␍␊ + */ + + private int _size; + private bool _isLastToken; + private short _tokenIndex; + private Command _cmd; + private State _state; + private ReadOnlySequence _buffer; + + public NatsTokenizer() => Reset(); + + public enum Result : byte + { + ReadMore, + ExamineMore, + Done, + Token, + Payload, + Error, + } + + public enum Command : byte + { + NONE, + OK, + ERR, + CONNECT, + HMSG, + HPUB, + INFO, + MSG, + PING, + PONG, + PUB, + SUB, + UNSUB, + } + + private enum State + { + Start, + Cmd, + End, + } + + public bool IsLastToken => _isLastToken; + + public void Reset() + { + _buffer = default; + _cmd = default; + _isLastToken = default; + _size = default; + _state = default; + _tokenIndex = default; + } + + public short GetTokenIndex() => _tokenIndex; + + public Command GetCommand() => _cmd; + + public ReadOnlySequence GetBufferToken() => _buffer; + + public int GetIntegerToken() + { + if (_buffer.Length > 10) + throw new Exception("number too long"); + + Span span = stackalloc byte[(int)_buffer.Length]; + _buffer.CopyTo(span); + + if (!Utf8Parser.TryParse(span, out int value, out _)) + { + throw new Exception("number format error"); + } + + return value; + } + + public Result Read(ref ReadOnlySequence buffer) + { + if (_size > 0) + { + if (buffer.Length < _size + 2) + { + return Result.ReadMore; + } + + _buffer = buffer.Slice(0, _size); + buffer = buffer.Slice(_size + 2); + _size = 0; + return Result.Payload; + } + + if (_state == State.Start) + { + if (buffer.Length < 2) + { + return Result.ReadMore; + } + + var readShort = ReadShort(ref buffer); + + if (readShort == CommandShort.INFO) + { + _cmd = Command.INFO; + _state = State.Cmd; + return Result.ExamineMore; + } + + if (readShort == CommandShort.PING) + { + _cmd = Command.PING; + _state = State.Cmd; + return Result.ExamineMore; + } + + if (readShort == CommandShort.PONG) + { + _cmd = Command.PONG; + _state = State.Cmd; + return Result.ExamineMore; + } + + if (readShort == CommandShort.OK) + { + _cmd = Command.OK; + _state = State.Cmd; + return Result.ExamineMore; + } + + if (readShort == CommandShort.ERR) + { + _cmd = Command.ERR; + _state = State.Cmd; + return Result.ExamineMore; + } + + if (readShort == CommandShort.MSG) + { + _cmd = Command.MSG; + _state = State.Cmd; + return Result.ExamineMore; + } + + return Result.Error; + } + + if (_state == State.Cmd) + { + if (_cmd == Command.INFO) + { + if (_tokenIndex == 0) + { + // IN FO. + if (buffer.Length < 3) + { + return Result.ReadMore; + } + + buffer = buffer.Slice(3); + _tokenIndex = 1; + return Result.ExamineMore; + } + + if (_tokenIndex == 1) + { + var positionOfNewLine = buffer.PositionOf((byte)'\n'); + if (positionOfNewLine == null) + return Result.ReadMore; + + _buffer = buffer.Slice(0, positionOfNewLine.Value); + _isLastToken = true; + + buffer = buffer.Slice(positionOfNewLine.Value).Slice(1); + + return Result.Done; + } + + return Result.Error; + } + + if (_cmd == Command.PING) + { + // PI NG.. + if (buffer.Length < 4) + { + return Result.ReadMore; + } + + buffer = buffer.Slice(4); + return Result.Done; + } + + if (_cmd == Command.PONG) + { + // PO NG.. + if (buffer.Length < 4) + { + return Result.ReadMore; + } + + buffer = buffer.Slice(4); + return Result.Done; + } + + if (_cmd == Command.OK) + { + // +O K.. + if (buffer.Length < 3) + { + return Result.ReadMore; + } + + buffer = buffer.Slice(3); + return Result.Done; + } + + if (_cmd == Command.ERR) + { + if (_tokenIndex == 0) + { + // -ER R. + if (buffer.Length < 3) + { + return Result.ReadMore; + } + + buffer = buffer.Slice(3); + _tokenIndex = 1; + return Result.ExamineMore; + } + + if (_tokenIndex == 1) + { + var positionOfNewLine = buffer.PositionOf((byte)'\n'); + if (positionOfNewLine == null) + return Result.ReadMore; + + _buffer = buffer.Slice(0, positionOfNewLine.Value); + + // Trim last \r + if (_buffer.Length > 0) + _buffer = buffer.Slice(0, _buffer.Length - 1); + + _isLastToken = true; + + buffer = buffer.Slice(positionOfNewLine.Value); + + // Trim last \n + if (buffer.Length > 0) + buffer = buffer.Slice(1); + + return Result.Done; + } + } + + if (_cmd == Command.MSG) + { + if (_tokenIndex == 0) + { + // MS G. + if (buffer.Length < 2) + { + return Result.ReadMore; + } + + buffer = buffer.Slice(2); + _tokenIndex = 1; + return Result.ExamineMore; + } + + if (_tokenIndex > 0) + { + _tokenIndex++; + + var position = buffer.PositionOf((byte)' '); + if (position == null) + { + position = buffer.PositionOf((byte)'\n'); + if (position == null) + return Result.ReadMore; + + _isLastToken = true; + } + + _buffer = buffer.Slice(0, position.Value); + + // Trim last \r + if (_isLastToken && _buffer.Length > 0) + _buffer = buffer.Slice(0, _buffer.Length - 1); + + buffer = buffer.Slice(position.Value); + + // Trim last space + if (buffer.Length > 0) + buffer = buffer.Slice(1); + + return Result.Token; + } + } + } + + return Result.Error; + } + + public void StartReadSize(int size) + { + _size = size; + } + + private short ReadShort(ref ReadOnlySequence buffer) + { + short cmd; + if (buffer.IsSingleSegment) + { + cmd = BinaryPrimitives.ReadInt16LittleEndian(buffer.First.Span); + } + else + { + Span b1 = stackalloc byte[2]; + buffer.Slice(0, 2).CopyTo(b1); + cmd = BinaryPrimitives.ReadInt16LittleEndian(b1); + } + + buffer = buffer.Slice(2); + + return cmd; + } + + public static class CommandShort + { + public const short OK = 20267; + public const short ERR = 17709; + public const short CONNECT = 20291; + public const short HMSG = 19784; + public const short HPUB = 20552; + public const short INFO = 20041; + public const short MSG = 21325; + public const short PING = 18768; + public const short PONG = 20304; + public const short PUB = 21840; + public const short SUB = 21843; + public const short UNSUB = 20053; + } + } +} diff --git a/tests/NATS.Client.Core.Tests/NatsProtocolParserTest.cs b/tests/NATS.Client.Core.Tests/NatsProtocolParserTest.cs new file mode 100644 index 000000000..d95c3e55c --- /dev/null +++ b/tests/NATS.Client.Core.Tests/NatsProtocolParserTest.cs @@ -0,0 +1,103 @@ +using System.Buffers; + +namespace NATS.Client.Core.Tests; + +public class NatsProtocolParserTest(ITestOutputHelper output) +{ + [Fact] + public void T() + { + var sequences = new List> + { + new SequenceBuilder() + .Append("INFO {\"server_id\":\"nats-server\""u8.ToArray()) + .Append("}\r"u8.ToArray()) + .Append("\nPI"u8.ToArray()) + .ReadOnlySequence, + new SequenceBuilder() + .Append("NG"u8.ToArray()) + .Append("\r"u8.ToArray()) + .Append("\n"u8.ToArray()) + .Append("PO"u8.ToArray()) + .ReadOnlySequence, + new SequenceBuilder() + .Append("NG\r\n"u8.ToArray()) + .Append("+OK\r\n"u8.ToArray()) + .Append("-ER"u8.ToArray()) + .Append("R 'cra"u8.ToArray()) + .Append("sh!'\r\nPI"u8.ToArray()) + .Append("NG\r\n"u8.ToArray()) + .ReadOnlySequence, + new SequenceBuilder() + .Append("MSG subject sid1 reply_to 1\r\nx\r\n"u8.ToArray()) + .ReadOnlySequence, + new SequenceBuilder() + .Append("PING\r\n"u8.ToArray()) + .ReadOnlySequence, + }; + + var tokenizer = new NatsProtocolParser.NatsTokenizer(); + var parser = new NatsProtocolParser(); + + foreach (var sequence in sequences) + { + var buffer = sequence; + + while (parser.TryRead(ref tokenizer, ref buffer)) + { + output.WriteLine($"Command: {parser.Command}"); + if (parser.Command == NatsProtocolParser.NatsTokenizer.Command.MSG) + { + output.WriteLine($" subject: {parser.Subject.GetString()}"); + output.WriteLine($" sid: {parser.Sid.GetString()}"); + output.WriteLine($" reply-to: {parser.ReplyTo.GetString()}"); + output.WriteLine($" Payload-Length: {parser.Payload.GetString().Length}"); + output.WriteLine($" Payload: {parser.Payload.GetString()}"); + } + + parser.Reset(); + } + } + } + + private class BufferSegment : ReadOnlySequenceSegment + { + public void SetMemory(ReadOnlyMemory memory) => Memory = memory; + + public void SetNextSegment(BufferSegment? segment) => Next = segment; + + public void SetRunningIndex(int index) => RunningIndex = index; + } + + private class SequenceBuilder + { + private BufferSegment? _start; + private BufferSegment? _end; + private int _length; + + public ReadOnlySequence ReadOnlySequence => new(_start!, 0, _end!, _end!.Memory.Length); + + // Memory is only allowed rent from ArrayPool. + public SequenceBuilder Append(ReadOnlyMemory buffer) + { + var segment = new BufferSegment(); + segment.SetMemory(buffer); + + if (_start == null) + { + _start = segment; + _end = segment; + } + else + { + _end!.SetNextSegment(segment); + segment.SetRunningIndex(_length); + _end = segment; + } + + _length += buffer.Length; + + return this; + } + } +} From 34cfc8aed513457a342f93c7e82a698610acb133 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 4 Apr 2024 16:07:22 +0100 Subject: [PATCH 2/2] wip --- NatsProtocolParserProf/Program.cs | 4 ++-- sandbox/MicroBenchmark/NatsProtoParserBench.cs | 4 ++-- src/NATS.Client.Core/NatsProtocolParser.cs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/NatsProtocolParserProf/Program.cs b/NatsProtocolParserProf/Program.cs index 309fa1299..4e33ea2f1 100644 --- a/NatsProtocolParserProf/Program.cs +++ b/NatsProtocolParserProf/Program.cs @@ -20,8 +20,8 @@ public class NatsProtoParserBench { - private List> _sequences; - private NatsProtocolParser _parser; + private List> _sequences = new(); + private NatsProtocolParser _parser = new(); public void Setup() { diff --git a/sandbox/MicroBenchmark/NatsProtoParserBench.cs b/sandbox/MicroBenchmark/NatsProtoParserBench.cs index fc3b56dcf..4ca2b2dac 100644 --- a/sandbox/MicroBenchmark/NatsProtoParserBench.cs +++ b/sandbox/MicroBenchmark/NatsProtoParserBench.cs @@ -9,8 +9,8 @@ namespace MicroBenchmark; [PlainExporter] public class NatsProtoParserBench { - private List> _sequences; - private NatsProtocolParser _parser; + private List> _sequences = new(); + private NatsProtocolParser _parser = new(); [GlobalSetup] public void Setup() diff --git a/src/NATS.Client.Core/NatsProtocolParser.cs b/src/NATS.Client.Core/NatsProtocolParser.cs index 7437ad390..cc1c1b4e7 100644 --- a/src/NATS.Client.Core/NatsProtocolParser.cs +++ b/src/NATS.Client.Core/NatsProtocolParser.cs @@ -146,7 +146,7 @@ public readonly struct NatsBytes(ReadOnlySequence sequence) public string GetString() => Encoding.ASCII.GetString(sequence); } - public struct NatsTokenizer + public ref struct NatsTokenizer { /* B: PING␍␊