Skip to content

Commit

Permalink
core: mbox: introduce thread decoupled message queues
Browse files Browse the repository at this point in the history
  • Loading branch information
kaspar030 committed Jul 18, 2016
1 parent 9bfea94 commit 8e9c5f7
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 4 deletions.
1 change: 1 addition & 0 deletions Makefile.pseudomodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ PSEUDOMODULES += conn_ip
PSEUDOMODULES += conn_tcp
PSEUDOMODULES += conn_udp
PSEUDOMODULES += core_msg
PSEUDOMODULES += core_mbox
PSEUDOMODULES += core_thread_flags
PSEUDOMODULES += emb6_router
PSEUDOMODULES += gnrc_ipv6_default
Expand Down
163 changes: 163 additions & 0 deletions core/include/mbox.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright (C) 2016 Kaspar Schleiser <kaspar@schleiser.de>
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/

/**
* @defgroup core_mbox Mailboxes
* @ingroup core
* @brief Mailbox implementation
*
* @{
*
* @file
* @brief Mailbox API
*
* @author Kaspar Schleiser <kaspar@schleiser.de>
*/

#ifndef MBOX_H
#define MBOX_H

#include "list.h"
#include "cib.h"
#include "msg.h"

#ifdef __cplusplus
extern "C" {
#endif

/** Static initializer for mbox objects */
#define MBOX_INIT(queue, queue_size) {{0}, {0}, CIB_INIT(queue_size), queue, NULL}

/**
* @brief Mailbox struct definition
*/
typedef struct {
list_node_t readers; /**< list of threads waiting for message */
list_node_t writers; /**< list of threads waiting to send */
cib_t cib; /**< cib for msg array */
msg_t *msg_array; /**< ptr to array of msg queue */
} mbox_t;

enum {
NON_BLOCKING = 0, /**< non-blocking mode */
BLOCKING, /**< blocking mode */
};

/**
* @brief Initialize mbox object
*
* @note The message queue size must be a power of two!
*
* @param[in] mbox ptr to mailbox to initialize
* @param[in] queue array of msg_t used as queue
* @param[in] queue_size number of msg_t objects in queue
*/
static inline void mbox_init(mbox_t *mbox, msg_t *queue, unsigned int queue_size)
{
mbox_t m = MBOX_INIT(queue, queue_size);
*mbox = m;
}

/**
* @brief Add message to mailbox
*
* If the mailbox is full, this fuction will return right away.
*
* @internal
*
* @param[in] mbox ptr to mailbox to operate on
* @param[in] msg ptr to message that will be copied into mailbox
* @param[in] blocking block if 1, don't block if 0
*
* @return 1 if msg could be delivered
* @return 0 otherwise
*/
int _mbox_put(mbox_t *mbox, msg_t *msg, int blocking);

/**
* @brief Get message from mailbox
*
* If the mailbox is empty, this fuction will return right away.
*
* @internal
*
* @param[in] mbox ptr to mailbox to operate on
* @param[in] msg ptr to storage for retrieved message
* @param[in] blocking block if 1, don't block if 0
*
* @return 1 if msg could be retrieved
* @return 0 otherwise
*/
int _mbox_get(mbox_t *mbox, msg_t *msg, int blocking);

/**
* @brief Add message to mailbox
*
* If the mailbox is full, this fuction will block until space becomes
* available.
*
* @param[in] mbox ptr to mailbox to operate on
* @param[in] msg ptr to message that will be copied into mailbox
*/
static inline void mbox_put(mbox_t *mbox, msg_t *msg)
{
_mbox_put(mbox, msg, BLOCKING);
}

/**
* @brief Add message to mailbox
*
* If the mailbox is full, this fuction will return right away.
*
* @param[in] mbox ptr to mailbox to operate on
* @param[in] msg ptr to message that will be copied into mailbox
*
* @return 1 if msg could be delivered
* @return 0 otherwise
*/
static inline int mbox_try_put(mbox_t *mbox, msg_t *msg)
{
return _mbox_put(mbox, msg, NON_BLOCKING);
}

/**
* @brief Get message from mailbox
*
* If the mailbox is empty, this fuction will block until a message becomes
* available.
*
* @param[in] mbox ptr to mailbox to operate on
* @param[in] msg ptr to storage for retrieved message
*/
static inline void mbox_get(mbox_t *mbox, msg_t *msg)
{
_mbox_get(mbox, msg, BLOCKING);
}

/**
* @brief Get message from mailbox
*
* If the mailbox is empty, this fuction will return right away.
*
* @param[in] mbox ptr to mailbox to operate on
* @param[in] msg ptr to storage for retrieved message
*
* @return 1 if msg could be retrieved
* @return 0 otherwise
*/
static inline int mbox_try_get(mbox_t *mbox, msg_t *msg)
{
return _mbox_get(mbox, msg, NON_BLOCKING);
}

#ifdef __cplusplus
}
#endif

/** @} */
#endif /* MBOX_H */
11 changes: 7 additions & 4 deletions core/include/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,16 @@
#define STATUS_REPLY_BLOCKED 5 /**< waiting for a message response */
#define STATUS_FLAG_BLOCKED_ANY 6 /**< waiting for any flag from flag_mask*/
#define STATUS_FLAG_BLOCKED_ALL 7 /**< waiting for all flags in flag_mask */
#define STATUS_MBOX_BLOCKED 8 /**< waiting for get/put on mbox */
/** @} */

/**
* @brief These have to be on a run queue.
* @{*/
#define STATUS_ON_RUNQUEUE STATUS_RUNNING /**< to check if on run queue:
`st >= STATUS_ON_RUNQUEUE` */
#define STATUS_RUNNING 8 /**< currently running */
#define STATUS_PENDING 9 /**< waiting to be scheduled to run */
#define STATUS_RUNNING 9 /**< currently running */
#define STATUS_PENDING 10 /**< waiting to be scheduled to run */
/** @} */
/** @} */

Expand All @@ -84,8 +85,10 @@ struct _thread {

clist_node_t rq_entry; /**< run queue entry */

#if defined(MODULE_CORE_MSG) || defined(MODULE_CORE_THREAD_FLAGS)
void *wait_data; /**< used by msg and thread flags */
#if defined(MODULE_CORE_MSG) || defined(MODULE_CORE_THREAD_FLAGS) \
|| defined(MODULE_CORE_MBOX)
void *wait_data; /**< used by msg, mbox and thread
flags */
#endif
#if defined(MODULE_CORE_MSG)
list_node_t msg_waiters; /**< threads waiting on message */
Expand Down
126 changes: 126 additions & 0 deletions core/mbox.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (C) 2016 Kaspar Schleiser <kaspar@schleiser.de>
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/

/**
* @ingroup core_mbox
* @{
*
* @file
* @brief mailbox implementation
*
* @author Kaspar Schleiser <kaspar@schleiser.de>
*
* @}
*/

#include <string.h>

#include "mbox.h"
#include "irq.h"
#include "sched.h"
#include "thread.h"

#define ENABLE_DEBUG (0)
#include "debug.h"

#ifdef MODULE_CORE_MBOX

static void _wake_waiter(thread_t *thread, unsigned irqstate)
{
sched_set_status(thread, STATUS_PENDING);

DEBUG("mbox: Thread %"PRIkernel_pid": _wake_waiter(): waking up "
"%"PRIkernel_pid".\n", sched_active_pid, thread->pid);

uint16_t process_priority = thread->priority;
irq_restore(irqstate);
sched_switch(process_priority);
}

static void _wait(list_node_t *wait_list, unsigned irqstate)
{
DEBUG("mbox: Thread %"PRIkernel_pid" _wait(): going blocked.\n",
sched_active_pid);

thread_t *me = (thread_t*) sched_active_thread;
sched_set_status(me, STATUS_MBOX_BLOCKED);
thread_add_to_list(wait_list, me);
irq_restore(irqstate);
thread_yield();

DEBUG("mbox: Thread %"PRIkernel_pid" _wait(): woke up.\n",
sched_active_pid);
}

int _mbox_put(mbox_t *mbox, msg_t *msg, int blocking)
{
unsigned irqstate = irq_disable();

list_node_t *next = (list_node_t*) list_remove_head(&mbox->readers);
if (next) {
DEBUG("mbox: Thread %"PRIkernel_pid" mbox 0x%08x: _tryput(): "
"there's a waiter.\n", sched_active_pid, (unsigned)mbox);
thread_t *thread = container_of((clist_node_t*)next, thread_t, rq_entry);
*(msg_t *)thread->wait_data = *msg;
_wake_waiter(thread, irqstate);
return 1;
}
else {
if (cib_full(&mbox->cib)) {
if (blocking) {
_wait(&mbox->writers, irqstate);
irqstate = irq_disable();
}
else {
irq_restore(irqstate);
return 0;
}
}

DEBUG("mbox: Thread %"PRIkernel_pid" mbox 0x%08x: _tryput(): "
"queued message.\n", sched_active_pid, (unsigned)mbox);
msg->sender_pid = sched_active_pid;
/* copy msg into queue */
mbox->msg_array[cib_put_unsafe(&mbox->cib)] = *msg;
irq_restore(irqstate);
return 1;
}
}

int _mbox_get(mbox_t *mbox, msg_t *msg, int blocking)
{
unsigned irqstate = irq_disable();

if (cib_avail(&mbox->cib)) {
DEBUG("mbox: Thread %"PRIkernel_pid" mbox 0x%08x: _tryget(): "
"got queued message.\n", sched_active_pid, (unsigned)mbox);
/* copy msg from queue */
*msg = mbox->msg_array[cib_get_unsafe(&mbox->cib)];
list_node_t *next = (list_node_t*) list_remove_head(&mbox->writers);
if (next) {
thread_t *thread = container_of((clist_node_t*)next, thread_t, rq_entry);
_wake_waiter(thread, irqstate);
}
else {
irq_restore(irqstate);
}
return 1;
}
else if (blocking) {
sched_active_thread->wait_data = (void*)msg;
_wait(&mbox->readers, irqstate);
/* sender has copied message */
return 1;
}
else {
irq_restore(irqstate);
return 0;
}
}

#endif /* MODULE_CORE_MBOX */

0 comments on commit 8e9c5f7

Please sign in to comment.