Skip to content

Commit

Permalink
WIP - added throttling of foreach
Browse files Browse the repository at this point in the history
  • Loading branch information
skliper committed Oct 19, 2020
1 parent 825ea0a commit fa8a216
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 42 deletions.
24 changes: 17 additions & 7 deletions fsw/cfe-core/src/inc/private/cfe_sbr.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ typedef struct
CFE_SB_RouteId_Atom_t RouteId; /**< \brief Holding value, do not use directly in code */
} CFE_SBR_RouteId_t;

/** \brief Callback throttling structure */
typedef struct
{
uint32 StartIndex; /**< /brief 0 based index to start at */
uint32 MaxLoop; /**< /brief Max number to process */
uint32 NextIndex; /**< /brief Next start index (output), 0 if completed */
} CFE_SBR_Throttle_t;

/** \brief For each id callback function prototype */
typedef void (*CFE_SBR_CallbackPtr_t)(CFE_SBR_RouteId_t RouteId, void *ArgPtr);

Expand Down Expand Up @@ -143,10 +151,11 @@ CFE_MSG_SequenceCount_t CFE_SBR_GetSequenceCounter(CFE_SBR_RouteId_t RouteId);
* but guaranteed to be called in order of increasing MsgId raw values. Only
* invokes callback if route exists.
*
* \param[in] CallbackPtr Function to invoke for each matching ID
* \param[in] ArgPtr Opaque argument to pass to callback function
* \param[in] CallbackPtr Function to invoke for each matching ID
* \param[in] ArgPtr Opaque argument to pass to callback function
* \param[in,out] ThrottlePtr Throttling structure, NULL for no throttle
*/
void CFE_SBR_ForEachMsgId(CFE_SBR_CallbackPtr_t CallbackPtr, void *ArgPtr);
void CFE_SBR_ForEachMsgId(CFE_SBR_CallbackPtr_t CallbackPtr, void *ArgPtr, CFE_SBR_Throttle_t *ThrottlePtr);

/**
* \brief Call the supplied callback function for all routes
Expand All @@ -155,10 +164,11 @@ void CFE_SBR_ForEachMsgId(CFE_SBR_CallbackPtr_t CallbackPtr, void *ArgPtr);
* but should only be used when MsgId order doesn't matter. Invokes callback
* for each route in the table.
*
* \param[in] CallbackPtr Function to invoke for each matching ID
* \param[in] ArgPtr Opaque argument to pass to callback function
* \param[in] CallbackPtr Function to invoke for each matching ID
* \param[in] ArgPtr Opaque argument to pass to callback function
* \param[in,out] ThrottlePtr Throttling structure, NULL for no throttle
*/
void CFE_SBR_ForEachRouteId(CFE_SBR_CallbackPtr_t CallbackPtr, void *ArgPtr);
void CFE_SBR_ForEachRouteId(CFE_SBR_CallbackPtr_t CallbackPtr, void *ArgPtr, CFE_SBR_Throttle_t *ThrottlePtr);

/******************************************************************************
** Inline functions
Expand All @@ -185,7 +195,7 @@ static inline bool CFE_SBR_IsValidRouteId(CFE_SBR_RouteId_t RouteId)
*/
static inline CFE_SBR_RouteId_t CFE_SBR_ValueToRouteId(CFE_SB_RouteId_Atom_t Value)
{
return ((CFE_SBR_RouteId_t){ .RouteId = 1 + Value });
return ((CFE_SBR_RouteId_t) {.RouteId = 1 + Value});
}

/**
Expand Down
2 changes: 1 addition & 1 deletion fsw/cfe-core/src/sb/cfe_sb_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ int32 CFE_SB_DeletePipeFull(CFE_SB_PipeId_t PipeId,CFE_ES_ResourceID_t AppId)
/* Remove the pipe from all routes */
Args.PipeId = PipeId;
Args.FullName = FullName;
CFE_SBR_ForEachRouteId(CFE_SB_RemovePipeFromRoute, &Args);
CFE_SBR_ForEachRouteId(CFE_SB_RemovePipeFromRoute, &Args, NULL);

if (CFE_SB.PipeTbl[PipeTblIdx].ToTrashBuff != NULL) {

Expand Down
6 changes: 3 additions & 3 deletions fsw/cfe-core/src/sb/cfe_sb_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ int32 CFE_SB_SendRtgInfo(const char *Filename)
args.Filename = Filename;

/* Write route info to file in MsgId order */
CFE_SBR_ForEachMsgId(CFE_SB_WriteRouteToFile, &args);
CFE_SBR_ForEachMsgId(CFE_SB_WriteRouteToFile, &args, NULL);

if (args.Status != 0)
{
Expand Down Expand Up @@ -1115,7 +1115,7 @@ int32 CFE_SB_SendMapInfo(const char *Filename)
args.Filename = Filename;

/* Write route info to file in MsgId order */
CFE_SBR_ForEachMsgId(CFE_SB_WriteMapToFile, &args);
CFE_SBR_ForEachMsgId(CFE_SB_WriteMapToFile, &args, NULL);

if (args.Status != 0)
{
Expand Down Expand Up @@ -1209,7 +1209,7 @@ int32 CFE_SB_SendPrevSubsCmd(const CFE_SB_SendPrevSubs_t *data)
CFE_SB.PrevSubMsg.Payload.Entries = 0;

/* Send subcription for each route */
CFE_SBR_ForEachRouteId(CFE_SB_SendRouteSub, NULL);
CFE_SBR_ForEachRouteId(CFE_SB_SendRouteSub, NULL, NULL);

CFE_SB_UnlockSharedData(__func__,__LINE__);

Expand Down
2 changes: 1 addition & 1 deletion modules/sbr/private_inc/cfe_sbr_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
*/

/** \brief Invalid route id */
#define CFE_SBR_INVALID_ROUTE_ID ((CFE_SBR_RouteId_t){ .RouteId = 0 })
#define CFE_SBR_INVALID_ROUTE_ID ((CFE_SBR_RouteId_t) {.RouteId = 0})

/******************************************************************************
* Function prototypes
Expand Down
8 changes: 4 additions & 4 deletions modules/sbr/src/cfe_sbr_map_direct.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@
*/

/**
* \brief Message map size
*
* For direct mapping, map size is maximum valid MsgId value + 1 (since MsgId 0 is valid)
*/
* \brief Message map size
*
* For direct mapping, map size is maximum valid MsgId value + 1 (since MsgId 0 is valid)
*/
#define CFE_SBR_MSG_MAP_SIZE (CFE_PLATFORM_SB_HIGHEST_VALID_MSGID + 1)

/******************************************************************************
Expand Down
31 changes: 16 additions & 15 deletions modules/sbr/src/cfe_sbr_map_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,29 @@
*/

/**
* \brief Message map size
*
* For hash mapping, map size is a multiple of maximum number of routes.
* The multiple impacts the number of collisions when the routes fill up.
* 4 was initially chosen to provide for plenty of holes in the map, while
* still remaining much smaller than the routing table. Note the
* multiple must be a factor of 2 to use the efficient shift logic, and
* can't be bigger than what can be indexed by CFE_SB_MsgId_Atom_t
*/
#define CFE_SBR_MSG_MAP_SIZE (4*CFE_PLATFORM_SB_MAX_MSG_IDS)
* \brief Message map size
*
* For hash mapping, map size is a multiple of maximum number of routes.
* The multiple impacts the number of collisions when the routes fill up.
* 4 was initially chosen to provide for plenty of holes in the map, while
* still remaining much smaller than the routing table. Note the
* multiple must be a factor of 2 to use the efficient shift logic, and
* can't be bigger than what can be indexed by CFE_SB_MsgId_Atom_t
*/
#define CFE_SBR_MSG_MAP_SIZE (4 * CFE_PLATFORM_SB_MAX_MSG_IDS)

/* Verify power of two */
#if ((CFE_SBR_MSG_MAP_SIZE & (CFE_SBR_MSG_MAP_SIZE - 1)) != 0)
#error CFE_SBR_MSG_MAP_SIZE must be a power of 2 for hash algorithm to work
#error CFE_SBR_MSG_MAP_SIZE must be a power of 2 for hash algorithm to work
#endif

/* TODO consider verifing MsgId_Atom index is big enough */
/* TODO since algorithm is designed for 32 bit, consider verifing Atom is 32 bit? */

/** \brief Hash algorithm magic number
*
* Ref: https://stackoverflow.com/questions/664014/what-integer-hash-function-are-good-that-accepts-an-integer-hash-key/12996028#12996028
* Ref:
* https://stackoverflow.com/questions/664014/what-integer-hash-function-are-good-that-accepts-an-integer-hash-key/12996028#12996028
*/
#define CFE_SBR_HASH_MAGIC (0x45d9f3b)

Expand Down Expand Up @@ -131,7 +132,7 @@ uint32 CFE_SBR_SetRouteId(CFE_SB_MsgId_t MsgId, CFE_SBR_RouteId_t RouteId)
CFE_SBR_MSGMAP[hash] = RouteId;
}

return collisions;
return collisions;
}

/******************************************************************************
Expand All @@ -144,7 +145,7 @@ CFE_SBR_RouteId_t CFE_SBR_GetRouteId(CFE_SB_MsgId_t MsgId)

if (CFE_SB_IsValidMsgId(MsgId))
{
hash = CFE_SBR_MsgIdHash(MsgId);
hash = CFE_SBR_MsgIdHash(MsgId);
routeid = CFE_SBR_MSGMAP[hash];

/*
Expand All @@ -155,7 +156,7 @@ CFE_SBR_RouteId_t CFE_SBR_GetRouteId(CFE_SB_MsgId_t MsgId)
while (CFE_SBR_IsValidRouteId(routeid) && !CFE_SB_MsgId_Equal(CFE_SBR_GetMsgId(routeid), MsgId))
{
/* Increment or loop to start of array */
hash = (hash + 1) & (CFE_SBR_MSG_MAP_SIZE - 1);
hash = (hash + 1) & (CFE_SBR_MSG_MAP_SIZE - 1);
routeid = CFE_SBR_MSGMAP[hash];
}
}
Expand Down
57 changes: 46 additions & 11 deletions modules/sbr/src/cfe_sbr_route_unsorted.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@
/** \brief Routing table entry */
typedef struct
{
CFE_SB_DestinationD_t *ListHeadPtr; /**< \brief Destination list head */
CFE_SB_MsgId_t MsgId; /**< \brief Message ID associated with route */
CFE_MSG_SequenceCount_t SeqCnt; /**< \brief Message sequence counter */
CFE_SB_DestinationD_t * ListHeadPtr; /**< \brief Destination list head */
CFE_SB_MsgId_t MsgId; /**< \brief Message ID associated with route */
CFE_MSG_SequenceCount_t SeqCnt; /**< \brief Message sequence counter */
} CFE_SBR_RouteEntry_t;

/** \brief Module data */
Expand Down Expand Up @@ -74,7 +74,7 @@ void CFE_SBR_Init(void)
memset(&CFE_SBR_RDATA, 0, sizeof(CFE_SBR_RDATA));

/* Only non-zero value for shared data initialization is the invalid MsgId */
for(routeidx = 0; routeidx < CFE_PLATFORM_SB_MAX_MSG_IDS; routeidx++)
for (routeidx = 0; routeidx < CFE_PLATFORM_SB_MAX_MSG_IDS; routeidx++)
{
CFE_SBR_RDATA.RoutingTbl[routeidx].MsgId = CFE_SB_INVALID_MSG_ID;
}
Expand All @@ -88,13 +88,14 @@ void CFE_SBR_Init(void)
*/
CFE_SBR_RouteId_t CFE_SBR_AddRoute(CFE_SB_MsgId_t MsgId, uint32 *CollisionsPtr)
{
CFE_SBR_RouteId_t routeid = CFE_SBR_INVALID_ROUTE_ID;
CFE_SBR_RouteId_t routeid = CFE_SBR_INVALID_ROUTE_ID;
uint32 collisions = 0;

if (CFE_SB_IsValidMsgId(MsgId) && (CFE_SBR_RDATA.RouteIdxTop < CFE_PLATFORM_SB_MAX_MSG_IDS))
{
routeid = CFE_SBR_ValueToRouteId(CFE_SBR_RDATA.RouteIdxTop);
routeid = CFE_SBR_ValueToRouteId(CFE_SBR_RDATA.RouteIdxTop);
collisions = CFE_SBR_SetRouteId(MsgId, routeid);

CFE_SBR_RDATA.RoutingTbl[CFE_SBR_RDATA.RouteIdxTop].MsgId = MsgId;
CFE_SBR_RDATA.RouteIdxTop++;
}
Expand Down Expand Up @@ -179,12 +180,29 @@ CFE_MSG_SequenceCount_t CFE_SBR_GetSequenceCounter(CFE_SBR_RouteId_t RouteId)
/******************************************************************************
* Interface function - see API for description
*/
void CFE_SBR_ForEachMsgId(CFE_SBR_CallbackPtr_t CallbackPtr, void *ArgPtr)
void CFE_SBR_ForEachMsgId(CFE_SBR_CallbackPtr_t CallbackPtr, void *ArgPtr, CFE_SBR_Throttle_t *ThrottlePtr)
{
CFE_SB_MsgId_Atom_t msgidx;
CFE_SBR_RouteId_t routeid;
CFE_SBR_RouteId_t routeid;
CFE_SB_MsgId_Atom_t startidx = 0;
CFE_SB_MsgId_Atom_t endidx = CFE_PLATFORM_SB_HIGHEST_VALID_MSGID + 1;

/* Throttle if pointer supplied */
if (ThrottlePtr != NULL)
{
startidx = ThrottlePtr->StartIndex;

/* Return next index of zero if full range is processed */
ThrottlePtr->NextIndex = 0;

if ((startidx + ThrottlePtr->MaxLoop) < endidx)
{
endidx = startidx + ThrottlePtr->MaxLoop;
ThrottlePtr->NextIndex = endidx;
}
}

for (msgidx = 0; msgidx <= CFE_PLATFORM_SB_HIGHEST_VALID_MSGID; msgidx++)
for (msgidx = startidx; msgidx < endidx; msgidx++)
{
routeid = CFE_SBR_GetRouteId(CFE_SB_ValueToMsgId(msgidx));
if (CFE_SBR_IsValidRouteId(routeid))
Expand All @@ -197,11 +215,28 @@ void CFE_SBR_ForEachMsgId(CFE_SBR_CallbackPtr_t CallbackPtr, void *ArgPtr)
/******************************************************************************
* Interface function - see API for description
*/
void CFE_SBR_ForEachRouteId(CFE_SBR_CallbackPtr_t CallbackPtr, void *ArgPtr)
void CFE_SBR_ForEachRouteId(CFE_SBR_CallbackPtr_t CallbackPtr, void *ArgPtr, CFE_SBR_Throttle_t *ThrottlePtr)
{
CFE_SB_RouteId_Atom_t routeidx;
CFE_SB_MsgId_Atom_t startidx = 0;
CFE_SB_MsgId_Atom_t endidx = CFE_SBR_RDATA.RouteIdxTop;

/* Throttle if pointer supplied */
if (ThrottlePtr != NULL)
{
startidx = ThrottlePtr->StartIndex;

/* Return next index of zero if full range is processed */
ThrottlePtr->NextIndex = 0;

if ((startidx + ThrottlePtr->MaxLoop) < endidx)
{
endidx = startidx + ThrottlePtr->MaxLoop;
ThrottlePtr->NextIndex = endidx;
}
}

for (routeidx = 0; routeidx < CFE_SBR_RDATA.RouteIdxTop; routeidx++)
for (routeidx = startidx; routeidx < endidx; routeidx++)
{
(*CallbackPtr)(CFE_SBR_ValueToRouteId(routeidx), ArgPtr);
}
Expand Down

0 comments on commit fa8a216

Please sign in to comment.