-
Notifications
You must be signed in to change notification settings - Fork 511
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improvements in command handling #5 #631
base: main
Are you sure you want to change the base?
Changes from all commits
5c570c9
b29c03b
8d5c67b
57d9534
90c2489
6ac7518
ce4e972
76aa03d
2698b67
b05050a
fe19e3e
5459083
34354be
cea36f6
65a62c8
9132c87
d4ea372
9570d86
ea10ce6
ba1f67c
8d5a186
ce208e3
38fef93
abf8e28
925291d
7a410c2
6555787
6ee3652
cadfca2
9df3e68
36fcc2f
7515130
5fb1a75
c499c3c
96a38d8
7920d5b
f944d25
ea53fda
815a0a5
04b27a6
beea5d8
6eb1e03
0bfb1d9
64ea1c5
dbacb58
22ea02c
41aaaa4
4d992f9
ec63ba4
6f8ec74
5ae9fdd
ecc8716
a0c5155
79f7298
cec844a
49f0ef7
c60f844
2d43807
1c0df8e
fc97b7b
b5680f2
df5b50d
3d833d7
ac4a1dd
dd3ac07
3ec10aa
9774e71
31d2025
ab1356f
d08e71e
72615cf
509d2a5
3334a34
8b1583d
265a6c6
6a1ec72
26e8c37
745557a
51b10ed
e086031
126fdc2
55fae9b
b93bbc4
52e045c
cf3bd30
86272fe
54d4f43
e228af6
1cc29c7
242a057
3cd584d
3cfe228
eb66d5c
4c5082a
dbd0c36
6d63f80
cad4a7f
6e79a8b
b3daafe
40c16ce
460d7ca
53402bb
17c2451
20270c2
44de156
ed0f03f
8dca3a2
0beb052
e5cbae0
05188fc
4e55852
405236b
ca22307
089f4dd
97e1372
24a28a4
d05d76f
9c0dc38
df1f773
5cea7d7
ecb970b
c93d9ab
ffc438d
64d9e06
fd322f6
5487d3f
a8d4c69
e3ee243
ad702ba
0909431
c47a90b
d461a15
b848dd3
956c711
979c9f9
e2b93a6
a2e5e75
7b5d5e4
eaa00ab
b1309af
ea1e337
9fb4989
12d4ff6
0ec23cc
626ddf0
4280997
51a17ba
ca80161
7aa443e
344e5e7
f3a57fe
318bd36
b0891ce
ae0d1c5
2dbf562
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -101,28 +101,71 @@ private static bool TryReadUlong(ref byte* ptr, byte* end, out ulong value, out | |
|
||
/// <summary> | ||
/// Tries to read a signed 64-bit integer from a given ASCII-encoded input stream. | ||
/// This method will throw if an overflow occurred. | ||
/// </summary> | ||
/// <param name="ptr">Pointer to the beginning of the ASCII encoded input string.</param> | ||
/// <param name="end">The end of the string to parse.</param> | ||
/// <param name="value">If parsing was successful, contains the parsed long value.</param> | ||
/// <param name="bytesRead">If parsing was successful, contains the number of bytes that were parsed.</param> | ||
/// <param name="allowLeadingZeros">True if leading zeros allowed</param> | ||
/// <returns> | ||
/// True if a long was successfully parsed, false if the input string did not start with | ||
/// a valid integer or the end of the string was reached before finishing parsing. | ||
/// </returns> | ||
[MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
public static bool TryReadLong(ref byte* ptr, byte* end, out long value, out ulong bytesRead) | ||
public static bool TryReadLong(ref byte* ptr, byte* end, out long value, out ulong bytesRead, bool allowLeadingZeros = true) | ||
{ | ||
var parseSuccessful = TryReadLongSafe(ref ptr, end, out value, out bytesRead, out var signRead, | ||
out var overflow, allowLeadingZeros); | ||
|
||
if (parseSuccessful) return true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. better readability to have return on its own line. In fact parseSuccessful is not needed; just return true if TryReadLongSafe. and only the second of the "return false" lines below is needed |
||
|
||
if (overflow) | ||
{ | ||
var digitsRead = signRead ? bytesRead - 1 : bytesRead; | ||
RespParsingException.ThrowIntegerOverflow(ptr - digitsRead, (int)digitsRead); | ||
return false; | ||
} | ||
|
||
return false; | ||
} | ||
|
||
/// <summary> | ||
/// Tries to read a signed 64-bit integer from a given ASCII-encoded input stream. | ||
/// </summary> | ||
/// <param name="ptr">Pointer to the beginning of the ASCII encoded input string.</param> | ||
/// <param name="end">The end of the string to parse.</param> | ||
/// <param name="value">If parsing was successful, contains the parsed long value.</param> | ||
/// <param name="bytesRead">If parsing was successful, contains the number of bytes that were parsed.</param> | ||
/// <param name="signRead">True if +/- sign was read during parsing</param> | ||
/// <param name="overflow">True if overflow occured during parsing</param> | ||
/// <param name="allowLeadingZeros">True if leading zeros allowed</param> | ||
/// <returns> | ||
/// True if a long was successfully parsed, false if the input string did not start with | ||
/// a valid integer or the end of the string was reached before finishing parsing. | ||
/// </returns> | ||
[MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
public static bool TryReadLongSafe(ref byte* ptr, byte* end, out long value, out ulong bytesRead, out bool signRead, out bool overflow, bool allowLeadingZeros = true) | ||
{ | ||
bytesRead = 0; | ||
value = 0; | ||
overflow = false; | ||
|
||
// Parse optional leading sign | ||
if (TryReadSign(ptr, out var negative)) | ||
signRead = TryReadSign(ptr, out var negative); | ||
if (signRead) | ||
{ | ||
ptr++; | ||
bytesRead = 1; | ||
} | ||
|
||
if (!allowLeadingZeros) | ||
{ | ||
// Do not allow leading zeros | ||
if (end - ptr > 1 && *ptr == '0') | ||
return false; | ||
} | ||
|
||
// Parse digits as ulong | ||
if (!TryReadUlong(ref ptr, end, out var number, out var digitsRead)) | ||
{ | ||
|
@@ -134,7 +177,8 @@ public static bool TryReadLong(ref byte* ptr, byte* end, out long value, out ulo | |
{ | ||
if (number > ((ulong)long.MaxValue) + 1) | ||
{ | ||
RespParsingException.ThrowIntegerOverflow(ptr - digitsRead, (int)digitsRead); | ||
overflow = true; | ||
return false; | ||
} | ||
|
||
value = -1 - (long)(number - 1); | ||
|
@@ -143,7 +187,8 @@ public static bool TryReadLong(ref byte* ptr, byte* end, out long value, out ulo | |
{ | ||
if (number > long.MaxValue) | ||
{ | ||
RespParsingException.ThrowIntegerOverflow(ptr - digitsRead, (int)digitsRead); | ||
overflow = true; | ||
return false; | ||
} | ||
value = (long)number; | ||
} | ||
|
@@ -153,25 +198,61 @@ public static bool TryReadLong(ref byte* ptr, byte* end, out long value, out ulo | |
return true; | ||
} | ||
|
||
/// <summary> | ||
/// Tries to read a signed 32-bit integer from a given ASCII-encoded input stream. | ||
/// This method will throw if an overflow occurred. | ||
/// </summary> | ||
/// <param name="ptr">Pointer to the beginning of the ASCII encoded input string.</param> | ||
/// <param name="end">The end of the string to parse.</param> | ||
/// <param name="value">If parsing was successful, contains the parsed int value.</param> | ||
/// <param name="bytesRead">If parsing was successful, contains the number of bytes that were parsed.</param> | ||
/// <param name="allowLeadingZeros">True if leading zeros allowed</param> | ||
/// <returns> | ||
/// True if an int was successfully parsed, false if the input string did not start with | ||
/// a valid integer or the end of the string was reached before finishing parsing. | ||
/// </returns> | ||
[MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
public static bool TryReadInt(ref byte* ptr, byte* end, out int value, out ulong bytesRead, bool allowLeadingZeros = true) | ||
{ | ||
var parseSuccessful = TryReadIntSafe(ref ptr, end, out value, out bytesRead, out var signRead, | ||
out var overflow, allowLeadingZeros); | ||
|
||
if (parseSuccessful) return true; | ||
|
||
if (overflow) | ||
{ | ||
var digitsRead = signRead ? bytesRead - 1 : bytesRead; | ||
RespParsingException.ThrowIntegerOverflow(ptr - digitsRead, (int)digitsRead); | ||
return false; | ||
} | ||
|
||
return false; | ||
} | ||
|
||
/// <summary> | ||
/// Tries to read a signed 32-bit integer from a given ASCII-encoded input stream. | ||
/// </summary> | ||
/// <param name="ptr">Pointer to the beginning of the ASCII encoded input string.</param> | ||
/// <param name="end">The end of the string to parse.</param> | ||
/// <param name="value">If parsing was successful, contains the parsed int value.</param> | ||
/// <param name="bytesRead">If parsing was successful, contains the number of bytes that were parsed.</param> | ||
/// <param name="signRead">True if +/- sign was read during parsing</param> | ||
/// <param name="overflow">True if overflow occured during parsing</param> | ||
/// <param name="allowLeadingZeros">True if leading zeros allowed</param> | ||
/// <returns> | ||
/// True if an int was successfully parsed, false if the input string did not start with | ||
/// a valid integer or the end of the string was reached before finishing parsing. | ||
/// </returns> | ||
[MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
public static bool TryReadInt(ref byte* ptr, byte* end, out int value, out ulong bytesRead) | ||
public static bool TryReadIntSafe(ref byte* ptr, byte* end, out int value, out ulong bytesRead, out bool signRead, out bool overflow, bool allowLeadingZeros = true) | ||
{ | ||
bytesRead = 0; | ||
value = 0; | ||
overflow = false; | ||
|
||
// Parse optional leading sign | ||
if (TryReadSign(ptr, out var negative)) | ||
signRead = TryReadSign(ptr, out var negative); | ||
if (signRead) | ||
{ | ||
ptr++; | ||
bytesRead = 1; | ||
|
@@ -188,7 +269,8 @@ public static bool TryReadInt(ref byte* ptr, byte* end, out int value, out ulong | |
{ | ||
if (number > ((ulong)int.MaxValue) + 1) | ||
{ | ||
RespParsingException.ThrowIntegerOverflow(ptr - digitsRead, (int)digitsRead); | ||
overflow = true; | ||
return false; | ||
} | ||
|
||
value = (int)(0 - (long)number); | ||
|
@@ -197,7 +279,8 @@ public static bool TryReadInt(ref byte* ptr, byte* end, out int value, out ulong | |
{ | ||
if (number > int.MaxValue) | ||
{ | ||
RespParsingException.ThrowIntegerOverflow(ptr - digitsRead, (int)digitsRead); | ||
overflow = true; | ||
return false; | ||
} | ||
value = (int)number; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,10 @@ public sealed unsafe partial class AofProcessor | |
readonly CustomObjectCommandWrapper[] customObjectCommands; | ||
readonly RespServerSession respServerSession; | ||
|
||
static RawStringInput storeInput; | ||
static ObjectInput objectStoreInput; | ||
static SessionParseState parseState; | ||
|
||
/// <summary> | ||
/// Replication offset | ||
/// </summary> | ||
|
@@ -37,7 +41,7 @@ public sealed unsafe partial class AofProcessor | |
/// <summary> | ||
/// Session for main store | ||
/// </summary> | ||
readonly BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext; | ||
readonly BasicContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext; | ||
|
||
/// <summary> | ||
/// Session for object store | ||
|
@@ -88,6 +92,10 @@ public AofProcessor( | |
if (objectStoreSession is not null) | ||
objectStoreBasicContext = objectStoreSession.BasicContext; | ||
|
||
parseState.Initialize(); | ||
storeInput.parseState = parseState; | ||
objectStoreInput.parseState = parseState; | ||
|
||
inflightTxns = new Dictionary<int, List<byte[]>>(); | ||
buffer = new byte[BufferSizeUtils.ServerBufferSize(new MaxSizeSettings())]; | ||
handle = GCHandle.Alloc(buffer, GCHandleType.Pinned); | ||
|
@@ -259,40 +267,82 @@ private unsafe bool ReplayOp(byte* entryPtr) | |
ObjectStoreDelete(objectStoreBasicContext, entryPtr); | ||
break; | ||
case AofEntryType.StoredProcedure: | ||
ref var input = ref Unsafe.AsRef<SpanByte>(entryPtr + sizeof(AofHeader)); | ||
respServerSession.RunTransactionProc(header.type, new ArgSlice(ref input), ref output); | ||
RunStoredProc(header.type, entryPtr); | ||
break; | ||
default: | ||
throw new GarnetException($"Unknown AOF header operation type {header.opType}"); | ||
} | ||
return true; | ||
} | ||
|
||
static unsafe void StoreUpsert(BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext, byte* ptr) | ||
unsafe void RunStoredProc(byte id, byte* ptr) | ||
{ | ||
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader)); | ||
ref var input = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize); | ||
ref var value = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize + input.TotalSize); | ||
var curr = ptr; | ||
var parseStateCount = *(int*)curr; | ||
curr += sizeof(int); | ||
|
||
if (parseStateCount > 0) | ||
{ | ||
parseState.Initialize(parseStateCount); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had to go look to figure out what "count" was here. Better to name it (and the ctor arg) "argCount" |
||
for (var i = 0; i < parseStateCount; i++) | ||
{ | ||
ref var sbArgument = ref Unsafe.AsRef<SpanByte>(curr); | ||
parseState.SetArgument(i, new ArgSlice(ref sbArgument)); | ||
curr += sbArgument.TotalSize; | ||
} | ||
} | ||
|
||
respServerSession.RunTransactionProc(id, ref parseState, ref output); | ||
} | ||
|
||
static unsafe void StoreUpsert(BasicContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext, byte* ptr) | ||
{ | ||
var curr = ptr + sizeof(AofHeader); | ||
ref var key = ref Unsafe.AsRef<SpanByte>(curr); | ||
curr += key.TotalSize; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could be cleaned up with an AofRecordDescriptor or something like that, to cover these pointer manipulations |
||
ref var value = ref Unsafe.AsRef<SpanByte>(curr); | ||
curr += value.TotalSize; | ||
|
||
// Reconstructing RawStringInput | ||
|
||
// input | ||
var length = *(int*)curr; | ||
curr += sizeof(int); | ||
|
||
storeInput.DeserializeFrom(curr); | ||
|
||
SpanByteAndMemory output = default; | ||
basicContext.Upsert(ref key, ref input, ref value, ref output); | ||
basicContext.Upsert(ref key, ref storeInput, ref value, ref output); | ||
if (!output.IsSpanByte) | ||
output.Memory.Dispose(); | ||
} | ||
|
||
static unsafe void StoreRMW(BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext, byte* ptr) | ||
static unsafe void StoreRMW(BasicContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext, byte* ptr) | ||
{ | ||
byte* pbOutput = stackalloc byte[32]; | ||
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader)); | ||
ref var input = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize); | ||
var curr = ptr + sizeof(AofHeader); | ||
ref var key = ref Unsafe.AsRef<SpanByte>(curr); | ||
curr += key.TotalSize; | ||
|
||
// Reconstructing RawStringInput | ||
|
||
// input | ||
var length = *(int*)curr; | ||
curr += sizeof(int); | ||
|
||
storeInput.DeserializeFrom(curr); | ||
|
||
var pbOutput = stackalloc byte[32]; | ||
var output = new SpanByteAndMemory(pbOutput, 32); | ||
if (basicContext.RMW(ref key, ref input, ref output).IsPending) | ||
|
||
if (basicContext.RMW(ref key, ref storeInput, ref output).IsPending) | ||
basicContext.CompletePending(true); | ||
if (!output.IsSpanByte) | ||
output.Memory.Dispose(); | ||
} | ||
|
||
static unsafe void StoreDelete(BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext, byte* ptr) | ||
static unsafe void StoreDelete(BasicContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext, byte* ptr) | ||
{ | ||
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader)); | ||
basicContext.Delete(ref key); | ||
|
@@ -324,29 +374,14 @@ static unsafe void ObjectStoreRMW(BasicContext<byte[], IGarnetObject, ObjectInpu | |
// Reconstructing ObjectInput | ||
|
||
// input | ||
ref var sbInput = ref Unsafe.AsRef<SpanByte>(curr); | ||
ref var input = ref Unsafe.AsRef<ObjectInput>(sbInput.ToPointer()); | ||
curr += sbInput.TotalSize; | ||
|
||
// Reconstructing parse state | ||
var parseStateCount = input.parseState.Count; | ||
|
||
if (parseStateCount > 0) | ||
{ | ||
ArgSlice[] parseStateBuffer = default; | ||
input.parseState.Initialize(ref parseStateBuffer, parseStateCount); | ||
var length = *(int*)curr; | ||
curr += sizeof(int); | ||
|
||
for (var i = 0; i < parseStateCount; i++) | ||
{ | ||
ref var sbArgument = ref Unsafe.AsRef<SpanByte>(curr); | ||
parseStateBuffer[i] = new ArgSlice(ref sbArgument); | ||
curr += sbArgument.TotalSize; | ||
} | ||
} | ||
objectStoreInput.DeserializeFrom(curr); | ||
|
||
// Call RMW with the reconstructed key & ObjectInput | ||
var output = new GarnetObjectStoreOutput { spanByteAndMemory = new(outputPtr, outputLength) }; | ||
if (basicContext.RMW(ref keyB, ref input, ref output).IsPending) | ||
if (basicContext.RMW(ref keyB, ref objectStoreInput, ref output).IsPending) | ||
basicContext.CompletePending(true); | ||
if (!output.spanByteAndMemory.IsSpanByte) | ||
output.spanByteAndMemory.Memory.Dispose(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this pass the RespCommand as a ctor arg? The more we guarantee correct initialization, the better