Skip to content

Commit

Permalink
[CT-1190] Emit FinalizeBlock updates in single batch. (#2260)
Browse files Browse the repository at this point in the history
  • Loading branch information
teddyding authored Sep 16, 2024
1 parent 9577fe0 commit 9fe7566
Showing 1 changed file with 141 additions and 44 deletions.
185 changes: 141 additions & 44 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,19 +490,11 @@ func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes.
return exists
}

// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
func getStreamUpdatesFromOffchainUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookUpdatesLatency,
time.Now(),
)

) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
Expand All @@ -514,8 +506,8 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
}

// Unmarshal each per-clob pair message to v1 updates.
streamUpdates := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
streamUpdates = make([]clobtypes.StreamUpdate, 0)
clobPairIds = make([]uint32, 0)
for clobPairId, update := range updates {
v1updates, err := streaming_util.GetOffchainUpdatesV1(update)
if err != nil {
Expand All @@ -535,26 +527,39 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
clobPairIds = append(clobPairIds, clobPairId)
}

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
return streamUpdates, clobPairIds
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
metrics.GrpcSendOrderbookUpdatesLatency,
time.Now(),
)

streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(offchainUpdates, blockHeight, execMode)

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
}

func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills(
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) (
streamUpdates []clobtypes.StreamUpdate,
clobPairIds []uint32,
) {
// Group fills by clob pair id.
streamUpdates := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
streamUpdates = make([]clobtypes.StreamUpdate, 0)
clobPairIds = make([]uint32, 0)
for _, orderbookFill := range orderbookFills {
// If this is a deleveraging fill, fetch the clob pair id from the deleveraged
// perpetual id.
Expand All @@ -577,6 +582,29 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
streamUpdates = append(streamUpdates, streamUpdate)
clobPairIds = append(clobPairIds, clobPairId)
}
return streamUpdates, clobPairIds
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)

streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills(
orderbookFills,
blockHeight,
execMode,
perpetualIdToClobPairId,
)

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
}
Expand Down Expand Up @@ -609,6 +637,31 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
)
}

func getStreamUpdatesForSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) (
streamUpdates []clobtypes.StreamUpdate,
subaccountIds []*satypes.SubaccountId,
) {
// Group subaccount updates by subaccount id.
streamUpdates = make([]clobtypes.StreamUpdate, 0)
subaccountIds = make([]*satypes.SubaccountId, 0)
for _, subaccountUpdate := range subaccountUpdates {
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
streamUpdates = append(streamUpdates, streamUpdate)
subaccountIds = append(subaccountIds, subaccountUpdate.SubaccountId)
}
return streamUpdates, subaccountIds
}

// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
Expand All @@ -626,20 +679,11 @@ func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize")
}

// Group subaccount updates by subaccount id.
streamUpdates := make([]clobtypes.StreamUpdate, 0)
subaccountIds := make([]*satypes.SubaccountId, 0)
for _, subaccountUpdate := range subaccountUpdates {
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
streamUpdates = append(streamUpdates, streamUpdate)
subaccountIds = append(subaccountIds, subaccountUpdate.SubaccountId)
}
streamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
subaccountUpdates,
blockHeight,
execMode,
)

sm.AddSubaccountUpdatesToCache(streamUpdates, subaccountIds)
}
Expand Down Expand Up @@ -796,6 +840,47 @@ func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams(
return ret
}

// addBatchUpdatesToCacheWithLock adds batched updates to the cache.
// Used by `StreamBatchUpdatesAfterFinalizeBlock` to batch orderbook, fill
// and subaccount updates in a single stream.
// Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *FullNodeStreamingManagerImpl) addBatchUpdatesToCacheWithLock(
orderbookStreamUpdates []clobtypes.StreamUpdate,
orderbookClobPairIds []uint32,
fillStreamUpdates []clobtypes.StreamUpdate,
fillClobPairIds []uint32,
subaccountStreamUpdates []clobtypes.StreamUpdate,
subaccountIds []*satypes.SubaccountId,
) {
// Add orderbook updates to cache.
sm.streamUpdateCache = append(sm.streamUpdateCache, orderbookStreamUpdates...)
for _, clobPairId := range orderbookClobPairIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
)
}

// Add fill updates to cache.
sm.streamUpdateCache = append(sm.streamUpdateCache, fillStreamUpdates...)
for _, clobPairId := range fillClobPairIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
)
}

// Add subaccount updates to cache.
sm.streamUpdateCache = append(sm.streamUpdateCache, subaccountStreamUpdates...)
for _, subaccountId := range subaccountIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.subaccountIdToSubscriptionIdMapping[*subaccountId],
)
}
}

// Grpc Streaming logic after consensus agrees on a block.
// - Stream all events staged during `FinalizeBlock`.
// - Stream orderbook updates to sync fills in local ops queue.
Expand All @@ -804,33 +889,45 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
// Flush all pending updates, since we want the onchain updates to arrive in a batch.
sm.FlushStreamUpdates()

finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx)

// TODO(CT-1190): Stream below in a single batch.
// Send orderbook updates to sync optimistic orderbook onchain state after FinalizeBlock.
sm.SendOrderbookUpdates(
orderbookStreamUpdates, orderbookClobPairIds := getStreamUpdatesFromOffchainUpdates(
orderBookUpdatesToSyncLocalOpsQueue,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)

// Send finalized fills from FinalizeBlock.
sm.SendOrderbookFillUpdates(
fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills(
finalizedFills,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
perpetualIdToClobPairId,
)

// Send finalized subaccount updates from FinalizeBlock.
sm.SendFinalizedSubaccountUpdates(
subaccountStreamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
finalizedSubaccountUpdates,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)

sm.Lock()
defer sm.Unlock()

// Flush all pending updates, since we want the onchain updates to arrive in a batch.
sm.FlushStreamUpdatesWithLock()

sm.addBatchUpdatesToCacheWithLock(
orderbookStreamUpdates,
orderbookClobPairIds,
fillStreamUpdates,
fillClobPairIds,
subaccountStreamUpdates,
subaccountIds,
)

// Emit all stream updates in a single batch.
// Note we still have the lock, which is released right before function returns.
sm.FlushStreamUpdatesWithLock()
}

// getStagedEventsFromFinalizeBlock returns staged events from `FinalizeBlock`.
Expand Down

0 comments on commit 9fe7566

Please sign in to comment.