Skip to content

Commit

Permalink
Moving itemBroker to storeWrapper (#646)
Browse files Browse the repository at this point in the history
* Moving itemBroker to storeWrapper

* Fixing comments
  • Loading branch information
TalZaccai authored Sep 7, 2024
1 parent b92c3dc commit 6c68094
Show file tree
Hide file tree
Showing 12 changed files with 46 additions and 38 deletions.
30 changes: 16 additions & 14 deletions libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class GarnetServer : IDisposable
private IDevice aofDevice;
private TsavoriteLog appendOnlyFile;
private SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> subscribeBroker;
private CollectionItemBroker itemBroker;
private KVSettings<SpanByte, SpanByte> kvSettings;
private KVSettings<byte[], IGarnetObject> objKvSettings;
private INamedDeviceFactory logFactory;
Expand Down Expand Up @@ -184,7 +183,7 @@ private void InitializeServer()
throw new Exception($"Unable to call ThreadPool.SetMaxThreads with {opts.ThreadPoolMaxThreads}");

CreateMainStore(clusterFactory, out var checkpointDir);
CreateObjectStore(clusterFactory, customCommandManager, checkpointDir, out var objectStoreSizeTracker, out itemBroker);
CreateObjectStore(clusterFactory, customCommandManager, checkpointDir, out var objectStoreSizeTracker);

if (!opts.DisablePubSub)
subscribeBroker = new SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>(new SpanByteKeySerializer(), null, opts.PubSubPageSizeBytes(), true);
Expand All @@ -200,7 +199,7 @@ private void InitializeServer()
customCommandManager, appendOnlyFile, opts, clusterFactory: clusterFactory, loggerFactory: loggerFactory);

// Create session provider for Garnet
Provider = new GarnetProvider(storeWrapper, subscribeBroker, itemBroker);
Provider = new GarnetProvider(storeWrapper, subscribeBroker);

// Create user facing API endpoints
Metrics = new MetricsApi(Provider);
Expand Down Expand Up @@ -237,33 +236,37 @@ private void CreateMainStore(IClusterFactory clusterFactory, out string checkpoi
, (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions));
}

private void CreateObjectStore(IClusterFactory clusterFactory, CustomCommandManager customCommandManager, string CheckpointDir, out CacheSizeTracker objectStoreSizeTracker, out CollectionItemBroker itemBroker)
private void CreateObjectStore(IClusterFactory clusterFactory, CustomCommandManager customCommandManager, string CheckpointDir, out CacheSizeTracker objectStoreSizeTracker)
{
objectStoreSizeTracker = null;
itemBroker = null;
if (!opts.DisableObjects)
{
objKvSettings = opts.GetObjectStoreSettings(this.loggerFactory?.CreateLogger("TsavoriteKV [obj]"), out var objTotalMemorySize);
objKvSettings = opts.GetObjectStoreSettings(this.loggerFactory?.CreateLogger("TsavoriteKV [obj]"),
out var objTotalMemorySize);

// Run checkpoint on its own thread to control p99
objKvSettings.ThrottleCheckpointFlushDelayMs = opts.CheckpointThrottleFlushDelayMs;
objKvSettings.CheckpointVersionSwitchBarrier = opts.EnableCluster;

if (opts.EnableCluster)
objKvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(opts.DeviceFactoryCreator(),
new DefaultCheckpointNamingScheme(CheckpointDir + "/ObjectStore/checkpoints"), isMainStore: false, logger);
objKvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(
opts.DeviceFactoryCreator(),
new DefaultCheckpointNamingScheme(CheckpointDir + "/ObjectStore/checkpoints"),
isMainStore: false, logger);
else
objKvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator(),
new DefaultCheckpointNamingScheme(CheckpointDir + "/ObjectStore/checkpoints"), removeOutdated: true);
new DefaultCheckpointNamingScheme(CheckpointDir + "/ObjectStore/checkpoints"),
removeOutdated: true);

objectStore = new(objKvSettings
, StoreFunctions<byte[], IGarnetObject>.Create(new ByteArrayKeyComparer(), () => new ByteArrayBinaryObjectSerializer(), () => new GarnetObjectSerializer(customCommandManager))
, StoreFunctions<byte[], IGarnetObject>.Create(new ByteArrayKeyComparer(),
() => new ByteArrayBinaryObjectSerializer(),
() => new GarnetObjectSerializer(customCommandManager))
, (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions));

if (objTotalMemorySize > 0)
objectStoreSizeTracker = new CacheSizeTracker(objectStore, objKvSettings, objTotalMemorySize, this.loggerFactory);

itemBroker = new CollectionItemBroker();
objectStoreSizeTracker = new CacheSizeTracker(objectStore, objKvSettings, objTotalMemorySize,
this.loggerFactory);
}
}

Expand Down Expand Up @@ -331,7 +334,6 @@ private void InternalDispose()
Provider?.Dispose();
server.Dispose();
subscribeBroker?.Dispose();
itemBroker?.Dispose();
store.Dispose();
appendOnlyFile?.Dispose();
aofDevice?.Dispose();
Expand Down
2 changes: 1 addition & 1 deletion libs/server/AOF/AofProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public AofProcessor(
accessControlList: storeWrapper.accessControlList,
loggerFactory: storeWrapper.loggerFactory);

this.respServerSession = new RespServerSession(0, networkSender: null, storeWrapper: replayAofStoreWrapper, subscribeBroker: null, itemBroker: null, authenticator: null, enableScripts: false);
this.respServerSession = new RespServerSession(0, networkSender: null, storeWrapper: replayAofStoreWrapper, subscribeBroker: null, authenticator: null, enableScripts: false);

var session = respServerSession.storageSession.basicContext.Session;
basicContext = session.BasicContext;
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Lua/SessionScriptCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public SessionScriptCache(StoreWrapper storeWrapper, IGarnetAuthenticator authen
{
this.scratchBufferNetworkSender = new ScratchBufferNetworkSender();
this.storeWrapper = storeWrapper;
this.processor = new RespServerSession(0, scratchBufferNetworkSender, storeWrapper, null, null, authenticator, false);
this.processor = new RespServerSession(0, scratchBufferNetworkSender, storeWrapper, null, authenticator, false);
this.logger = logger;
}

Expand Down
6 changes: 1 addition & 5 deletions libs/server/Providers/GarnetProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ public sealed class GarnetProvider : TsavoriteKVProviderBase<SpanByte, SpanByte,
/// </summary>
internal StoreWrapper StoreWrapper => storeWrapper;

internal CollectionItemBroker itemBroker;

/// <summary>
/// Create SpanByte TsavoriteKV backend for Garnet
/// </summary>
Expand All @@ -36,12 +34,10 @@ public sealed class GarnetProvider : TsavoriteKVProviderBase<SpanByte, SpanByte,
/// <param name="maxSizeSettings"></param>
public GarnetProvider(StoreWrapper storeWrapper,
SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> broker = null,
CollectionItemBroker itemBroker = null,
MaxSizeSettings maxSizeSettings = default)
: base(storeWrapper.store, new(), broker, false, maxSizeSettings)
{
this.storeWrapper = storeWrapper;
this.itemBroker = itemBroker;
}

/// <summary>
Expand Down Expand Up @@ -70,7 +66,7 @@ public void Dispose()
/// <inheritdoc />
public override IMessageConsumer GetSession(WireFormat wireFormat, INetworkSender networkSender)
=> (wireFormat == WireFormat.ASCII)
? new RespServerSession(Interlocked.Increment(ref lastSessionId), networkSender, storeWrapper, broker, itemBroker, null, true)
? new RespServerSession(Interlocked.Increment(ref lastSessionId), networkSender, storeWrapper, broker, null, true)
: throw new GarnetException($"Unsupported wireFormat {wireFormat}");
}
}
2 changes: 1 addition & 1 deletion libs/server/Resp/LocalServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public LocalServerSession(StoreWrapper storeWrapper)
this.scratchBufferManager = new ScratchBufferManager();

// Create storage session and API
this.storageSession = new StorageSession(storeWrapper, scratchBufferManager, sessionMetrics, LatencyMetrics, null, logger);
this.storageSession = new StorageSession(storeWrapper, scratchBufferManager, sessionMetrics, LatencyMetrics, logger);

this.BasicGarnetApi = new BasicGarnetApi(storageSession, storageSession.basicContext, storageSession.objectStoreBasicContext);
}
Expand Down
10 changes: 8 additions & 2 deletions libs/server/Resp/Objects/ListCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,10 @@ private bool ListBlockingPop(RespCommand command)
return true;
}

var result = itemBroker.GetCollectionItemAsync(command, keysBytes, this, timeout).Result;
if (storeWrapper.itemBroker == null)
throw new GarnetException("Object store is disabled");

var result = storeWrapper.itemBroker.GetCollectionItemAsync(command, keysBytes, this, timeout).Result;

if (!result.Found)
{
Expand Down Expand Up @@ -347,7 +350,10 @@ private unsafe bool ListBlockingMove(RespCommand command)
return true;
}

var result = itemBroker.MoveCollectionItemAsync(command, srcKey.ToArray(), this, timeout,
if (storeWrapper.itemBroker == null)
throw new GarnetException("Object store is disabled");

var result = storeWrapper.itemBroker.MoveCollectionItemAsync(command, srcKey.ToArray(), this, timeout,
cmdArgs).Result;

if (!result.Found)
Expand Down
7 changes: 2 additions & 5 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
public readonly StorageSession storageSession;
internal BasicGarnetApi basicGarnetApi;
internal LockableGarnetApi lockableGarnetApi;
internal CollectionItemBroker itemBroker;

readonly IGarnetAuthenticator _authenticator;

Expand Down Expand Up @@ -182,7 +181,6 @@ public RespServerSession(
INetworkSender networkSender,
StoreWrapper storeWrapper,
SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> subscribeBroker,
CollectionItemBroker itemBroker,
IGarnetAuthenticator authenticator,
bool enableScripts)
: base(networkSender)
Expand All @@ -201,14 +199,13 @@ public RespServerSession(
this.scratchBufferManager = new ScratchBufferManager();

// Create storage session and API
this.storageSession = new StorageSession(storeWrapper, scratchBufferManager, sessionMetrics, LatencyMetrics, itemBroker, logger);
this.storageSession = new StorageSession(storeWrapper, scratchBufferManager, sessionMetrics, LatencyMetrics, logger);

this.basicGarnetApi = new BasicGarnetApi(storageSession, storageSession.basicContext, storageSession.objectStoreBasicContext);
this.lockableGarnetApi = new LockableGarnetApi(storageSession, storageSession.lockableContext, storageSession.objectStoreLockableContext);

this.storeWrapper = storeWrapper;
this.subscribeBroker = subscribeBroker;
this.itemBroker = itemBroker;
this._authenticator = authenticator ?? storeWrapper.serverOptions.AuthSettings?.CreateAuthenticator(this.storeWrapper) ?? new GarnetNoAuthAuthenticator();

if (storeWrapper.serverOptions.EnableLua && enableScripts)
Expand Down Expand Up @@ -256,7 +253,7 @@ public override void Dispose()
storeWrapper.monitor.AddMetricsHistorySessionDispose(sessionMetrics, latencyMetrics);

subscribeBroker?.RemoveSubscription(this);
itemBroker?.HandleSessionDisposed(this);
storeWrapper.itemBroker?.HandleSessionDisposed(this);
sessionScriptCache?.Dispose();

// Cancel the async processor, if any
Expand Down
13 changes: 8 additions & 5 deletions libs/server/Storage/Session/ObjectStore/ListOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public unsafe GarnetStatus ListPush<TObjectContext>(ArgSlice key, ArgSlice[] ele
var status = RMWObjectStoreOperation(arrKey, ref input, out var output, ref objectStoreContext);

itemsDoneCount = output.result1;
itemBroker?.HandleCollectionUpdate(arrKey);
itemBroker.HandleCollectionUpdate(arrKey);
return status;
}

Expand Down Expand Up @@ -96,7 +96,7 @@ public unsafe GarnetStatus ListPush<TObjectContext>(ArgSlice key, ArgSlice eleme
var status = RMWObjectStoreOperation(key.ToArray(), ref input, out var output, ref objectStoreContext);
itemsDoneCount = output.result1;

itemBroker?.HandleCollectionUpdate(key.Span.ToArray());
itemBroker.HandleCollectionUpdate(key.Span.ToArray());
return status;
}

Expand Down Expand Up @@ -241,6 +241,9 @@ public GarnetStatus ListMove(ArgSlice sourceKey, ArgSlice destinationKey, Operat
element = default;
var objectLockableContext = txnManager.ObjectStoreLockableContext;

if (itemBroker == null)
ThrowObjectStoreUninitializedException();

// If source and destination are the same, the operation is equivalent to removing the last element from the list
// and pushing it as first element of the list, so it can be considered as a list rotation command.
bool sameKey = sourceKey.ReadOnlySpan.SequenceEqual(destinationKey.ReadOnlySpan);
Expand Down Expand Up @@ -344,7 +347,7 @@ public GarnetStatus ListMove(ArgSlice sourceKey, ArgSlice destinationKey, Operat
txnManager.Commit(true);
}

itemBroker?.HandleCollectionUpdate(destinationKey.Span.ToArray());
itemBroker.HandleCollectionUpdate(destinationKey.Span.ToArray());
return GarnetStatus.OK;
}

Expand Down Expand Up @@ -390,7 +393,7 @@ public GarnetStatus ListPush<TObjectContext>(byte[] key, ref ObjectInput input,
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions, ObjectStoreFunctions, ObjectStoreAllocator>
{
var status = RMWObjectStoreOperation(key, ref input, out output, ref objectStoreContext);
itemBroker?.HandleCollectionUpdate(key);
itemBroker.HandleCollectionUpdate(key);
return status;
}

Expand Down Expand Up @@ -432,7 +435,7 @@ public GarnetStatus ListInsert<TObjectContext>(byte[] key, ref ObjectInput input
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions, ObjectStoreFunctions, ObjectStoreAllocator>
{
var status = RMWObjectStoreOperation(key, ref input, out output, ref objectStoreContext);
itemBroker?.HandleCollectionUpdate(key);
itemBroker.HandleCollectionUpdate(key);
return status;
}

Expand Down
3 changes: 1 addition & 2 deletions libs/server/Storage/Session/StorageSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,13 @@ public StorageSession(StoreWrapper storeWrapper,
ScratchBufferManager scratchBufferManager,
GarnetSessionMetrics sessionMetrics,
GarnetLatencyMetricsSession LatencyMetrics,
CollectionItemBroker itemBroker,
ILogger logger = null)
{
this.sessionMetrics = sessionMetrics;
this.LatencyMetrics = LatencyMetrics;
this.scratchBufferManager = scratchBufferManager;
this.logger = logger;
this.itemBroker = itemBroker;
this.itemBroker = storeWrapper.itemBroker;

functionsState = storeWrapper.CreateFunctionsState();

Expand Down
5 changes: 5 additions & 0 deletions libs/server/StoreWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public sealed class StoreWrapper
/// </summary>
public readonly ILoggerFactory loggerFactory;

internal readonly CollectionItemBroker itemBroker;
internal readonly CustomCommandManager customCommandManager;
internal readonly GarnetServerMonitor monitor;
internal readonly WatchVersionMap versionMap;
Expand Down Expand Up @@ -141,6 +142,9 @@ public StoreWrapper(
this.GarnetObjectSerializer = new GarnetObjectSerializer(this.customCommandManager);
this.loggingFrequncy = TimeSpan.FromSeconds(serverOptions.LoggingFrequency);

if (!serverOptions.DisableObjects)
this.itemBroker = new CollectionItemBroker();

// Initialize store scripting cache
if (serverOptions.EnableLua)
this.storeScriptCache = new ConcurrentDictionary<byte[], byte[]>(new ByteArrayComparer());
Expand Down Expand Up @@ -604,6 +608,7 @@ public void Dispose()
//Wait for checkpoints to complete and disable checkpointing
_checkpointTaskLock.WriteLock();

itemBroker?.Dispose();
monitor?.Dispose();
ctsCommit?.Cancel();

Expand Down
2 changes: 1 addition & 1 deletion playground/Embedded.perftest/EmbeddedRespServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public EmbeddedRespServer(GarnetServerOptions opts, ILoggerFactory loggerFactory
/// <returns>A new RESP server session</returns>
internal RespServerSession GetRespSession()
{
return new RespServerSession(0, new DummyNetworkSender(), storeWrapper, null, null, null, false);
return new RespServerSession(0, new DummyNetworkSender(), storeWrapper, null, null, false);
}
}
}
2 changes: 1 addition & 1 deletion test/Garnet.test/RespSortedSetTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public unsafe void SortedSetPopTest()
db.SortedSetAdd("key1", "a", 1);
db.SortedSetAdd("key1", "b", 2);

var session = new RespServerSession(0, new DummyNetworkSender(), server.Provider.StoreWrapper, null, null, null, false);
var session = new RespServerSession(0, new DummyNetworkSender(), server.Provider.StoreWrapper, null, null, false);
var api = new TestBasicGarnetApi(session.storageSession, session.storageSession.basicContext, session.storageSession.objectStoreBasicContext);
var key = Encoding.ASCII.GetBytes("key1");
fixed (byte* keyPtr = key)
Expand Down

0 comments on commit 6c68094

Please sign in to comment.