Fix #806: Reduce count of IPI during work_queue processing. #1072

Merged (5 commits) on Oct 15, 2018
10 changes: 7 additions & 3 deletions tempesta_fw/cache.c
@@ -1673,7 +1673,7 @@ static void
tfw_cache_ipi(struct irq_work *work)
{
TfwWorkTasklet *ct = container_of(work, TfwWorkTasklet, ipi_work);

clear_bit(TFW_QUEUE_IPI, &ct->wq.flags);
tasklet_schedule(&ct->tasklet);
}

@@ -1726,7 +1726,6 @@ tfw_cache_process(TfwHttpMsg *msg, tfw_http_cache_cb_t action)
TFW_DBG2("Cache: schedule tasklet w/ work: to_cpu=%d from_cpu=%d"
" msg=%p key=%lx\n", cpu, smp_processor_id(),
cw.msg, key);

if (tfw_wq_push(&ct->wq, &cw, cpu, &ct->ipi_work, tfw_cache_ipi)) {
TFW_WARN("Cache work queue overrun: [%s]\n",
resp ? "response" : "request");
@@ -1743,10 +1742,15 @@ static void
tfw_wq_tasklet(unsigned long data)
{
TfwWorkTasklet *ct = (TfwWorkTasklet *)data;
TfwRBQueue *wq = &ct->wq;
TfwCWork cw;

while (!tfw_wq_pop(&ct->wq, &cw))
while (!tfw_wq_pop(wq, &cw))
tfw_cache_do_action(cw.msg, cw.action);

TFW_WQ_IPI_SYNC(tfw_wq_size, wq);

tasklet_schedule(&ct->tasklet);
}

/**
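For readability, here is the new tfw_wq_tasklet() logic with the TFW_WQ_IPI_SYNC() macro (defined in work_queue.h below) expanded inline. This is only an illustrative sketch of what the merged code does; the name tfw_wq_tasklet_expanded() is invented and does not exist in the tree.

static void
tfw_wq_tasklet_expanded(unsigned long data)
{
	TfwWorkTasklet *ct = (TfwWorkTasklet *)data;
	TfwRBQueue *wq = &ct->wq;
	TfwCWork cw;

	/* Drain everything that is currently queued. */
	while (!tfw_wq_pop(wq, &cw))
		tfw_cache_do_action(cw.msg, cw.action);

	/*
	 * Re-enable IPI generation, then re-check the queue: an item may
	 * have been pushed after the last pop but before the bit was set,
	 * and its producer saw IPIs disabled, so it will not wake us up.
	 */
	set_bit(TFW_QUEUE_IPI, &wq->flags);
	smp_mb__after_atomic();
	if (!tfw_wq_size(wq))
		return;

	/* Work arrived in the race window: keep IPIs off and re-run. */
	clear_bit(TFW_QUEUE_IPI, &wq->flags);
	tasklet_schedule(&ct->tasklet);
}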
10 changes: 8 additions & 2 deletions tempesta_fw/http_sched_ratio.c
@@ -1011,8 +1011,14 @@ tfw_sched_ratio_del_grp(TfwSrvGroup *sg)
if (!ratio)
return;
/*
* Make sure the timer doesn't re-arms itself. This
* also ensures that no more RCU callbacks are created.
* Make sure the timer doesn't re-arm itself. This also ensures
* that no more RCU callbacks are created.
*
* TODO: check whether the memory barriers are redundant here (and in
* several similar places, as well as in the corresponding timer
* callbacks); it also seems that del_timer_sync() correctly handles
* a timer being reactivated from its callback, so perhaps we don't
* need the 'rearm' flag at all.
*/
if (sg->flags & (TFW_SG_F_SCHED_RATIO_DYNAMIC
| TFW_SG_F_SCHED_RATIO_PREDICT))
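The TODO above questions the 'rearm' flag pattern used with these timers. Below is a minimal, hypothetical sketch of that pattern (the names poll_ctx, poll_timer_cb and poll_ctx_stop are invented for illustration); the TODO's point is that del_timer_sync() already guarantees the timer is neither pending nor running when it returns, which may make the flag redundant.

#include <linux/timer.h>
#include <linux/jiffies.h>
#include <linux/atomic.h>

/* Hypothetical context; assume timer_setup(&pc->timer, poll_timer_cb, 0). */
struct poll_ctx {
	struct timer_list	timer;
	atomic_t		rearm;	/* non-zero while re-arming is allowed */
};

static void
poll_timer_cb(struct timer_list *t)
{
	struct poll_ctx *pc = from_timer(pc, t, timer);

	/* ... periodic work ... */

	/* Re-arm only if shutdown has not started. */
	if (atomic_read(&pc->rearm))
		mod_timer(&pc->timer, jiffies + HZ);
}

static void
poll_ctx_stop(struct poll_ctx *pc)
{
	/* Forbid further re-arming, then wait out a pending/running callback. */
	atomic_set(&pc->rearm, 0);
	del_timer_sync(&pc->timer);
}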
45 changes: 30 additions & 15 deletions tempesta_fw/sock.c
@@ -187,6 +187,8 @@ do { \
static void
ss_ipi(struct irq_work *work)
{
TfwRBQueue *wq = &per_cpu(si_wq, smp_processor_id());
clear_bit(TFW_QUEUE_IPI, &wq->flags);
raise_softirq(NET_TX_SOFTIRQ);
}

@@ -200,6 +202,7 @@ ss_turnstile_push(long ticket, SsWork *sw, int cpu)
{
struct irq_work *iw = &per_cpu(ipi_work, cpu);
SsCloseBacklog *cb = &per_cpu(close_backlog, cpu);
TfwRBQueue *wq = &per_cpu(si_wq, cpu);
SsCblNode *cn;

cn = kmem_cache_alloc(ss_cbacklog_cache, GFP_ATOMIC);
@@ -208,13 +211,18 @@ ss_turnstile_push(long ticket, SsWork *sw, int cpu)
cn->ticket = ticket;
memcpy(&cn->sw, sw, sizeof(*sw));
spin_lock_bh(&cb->lock);
list_add(&cn->list, &cb->head);
list_add_tail(&cn->list, &cb->head);
cb->size++;
if (cb->turn > ticket)
cb->turn = ticket;
spin_unlock_bh(&cb->lock);

Contributor: OK, we don't need a barrier after the spinlock operation, but having a comment here would be good in case we ever remove the spinlock.

Contributor (author): Done.

tfw_raise_softirq(cpu, iw, ss_ipi);
/*
* We do not need explicit memory barriers after the
* spinlock operation.
*/
if (test_bit(TFW_QUEUE_IPI, &wq->flags))
tfw_raise_softirq(cpu, iw, ss_ipi);

return 0;
}
@@ -254,9 +262,8 @@ ss_wq_push(SsWork *sw, int cpu)
}

static int
ss_wq_pop(SsWork *sw, long *ticket)
ss_wq_pop(TfwRBQueue *wq, SsWork *sw, long *ticket)
{
TfwRBQueue *wq = this_cpu_ptr(&si_wq);
SsCloseBacklog *cb = this_cpu_ptr(&close_backlog);

/*
@@ -296,7 +303,7 @@ ss_wq_pop(SsWork *sw, long *ticket)
}

static size_t
__ss_close_q_sz(int cpu)
ss_wq_size(int cpu)
{
TfwRBQueue *wq = &per_cpu(si_wq, cpu);
SsCloseBacklog *cb = &per_cpu(close_backlog, cpu);
@@ -305,9 +312,11 @@ __ss_close_q_sz(int cpu)
}

static size_t
ss_close_q_sz(void)
ss_wq_local_size(TfwRBQueue *wq)
{
return __ss_close_q_sz(smp_processor_id());
SsCloseBacklog *cb = this_cpu_ptr(&close_backlog);

return tfw_wq_size(wq) + cb->size;
}

/*
@@ -1308,17 +1317,18 @@ static void
ss_tx_action(void)
{
SsWork sw;
int budget;
struct sk_buff *skb;
TfwRBQueue *wq = this_cpu_ptr(&si_wq);
long ticket = 0;
int budget;

/*
* @budget limits the loop to prevent a live lock on constantly arriving
* new items. We use a small integer as a lower bound to catch items
* that are just arriving.
*/
budget = max(10UL, ss_close_q_sz());
while ((!ss_active() || budget--) && !ss_wq_pop(&sw, &ticket)) {
budget = max(10UL, ss_wq_local_size(wq));
while ((!ss_active() || budget--) && !ss_wq_pop(wq, &sw, &ticket)) {
struct sock *sk = sw.sk;

bh_lock_sock(sk);
@@ -1358,11 +1368,16 @@ ss_tx_action(void)

/*
* Rearm softirq for local CPU if there are more jobs to do.
* ss_synchronize() is responsible for raising the softirq if there are
* more jobs in the work queue or the backlog.
* If all jobs are finished and both the work queue and the backlog
* are empty, then re-enable IPI generation by producers (it is
* disabled in the 'ss_ipi()' handler).
* ss_synchronize() is responsible for raising the softirq
* if there are more jobs in the work queue or the backlog.
*/
if (!budget)
raise_softirq(NET_TX_SOFTIRQ);
if (budget)
TFW_WQ_IPI_SYNC(ss_wq_local_size, wq);

raise_softirq(NET_TX_SOFTIRQ);
}

/*
@@ -1439,7 +1454,7 @@ ss_synchronize(void)
while (1) {
for_each_online_cpu(cpu) {
int n_conn = atomic64_read(&per_cpu(__ss_act_cnt, cpu));
int n_q = __ss_close_q_sz(cpu);
int n_q = ss_wq_size(cpu);
if (n_conn + n_q) {
irq_work_sync(&per_cpu(ipi_work, cpu));
schedule(); /* let softirq finish works */
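For clarity, here is the tail of ss_tx_action() with TFW_WQ_IPI_SYNC() expanded inline, so the two exit paths are explicit. This is a sketch only; the merged code uses the macro.

	/* ... end of the pop/processing loop of ss_tx_action() ... */

	if (budget) {
		/*
		 * The loop stopped because the work queue and the close
		 * backlog are empty, not because the budget ran out:
		 * re-enable IPI generation and re-check for items pushed
		 * in the race window.
		 */
		set_bit(TFW_QUEUE_IPI, &wq->flags);
		smp_mb__after_atomic();
		if (!ss_wq_local_size(wq))
			return;	/* nothing left; producers will send an IPI */
		clear_bit(TFW_QUEUE_IPI, &wq->flags);
	}

	/*
	 * Either the budget was exhausted or new work arrived: re-run the
	 * softirq on this CPU with IPI generation still disabled.
	 */
	raise_softirq(NET_TX_SOFTIRQ);
}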
1 change: 1 addition & 0 deletions tempesta_fw/work_queue.c
@@ -49,6 +49,7 @@ tfw_wq_init(TfwRBQueue *q, int node)
q->last_head = 0;
atomic64_set(&q->head, 0);
atomic64_set(&q->tail, 0);
set_bit(TFW_QUEUE_IPI, &q->flags);

q->array = kmalloc_node(QSZ * WQ_ITEM_SZ, GFP_KERNEL, node);
if (!q->array) {
41 changes: 31 additions & 10 deletions tempesta_fw/work_queue.h
@@ -38,13 +38,37 @@ typedef struct {
long last_head;
atomic64_t head ____cacheline_aligned;
atomic64_t tail ____cacheline_aligned;
unsigned long flags;
} TfwRBQueue;

enum {
/* Enable IPI generation. */
TFW_QUEUE_IPI = 0
};

#define TFW_WQ_IPI_SYNC(size_cb, wq) \
do { \
set_bit(TFW_QUEUE_IPI, &(wq)->flags); \
smp_mb__after_atomic(); \
if (!size_cb(wq)) \
return; \
clear_bit(TFW_QUEUE_IPI, &(wq)->flags); \
} while (0)

int tfw_wq_init(TfwRBQueue *wq, int node);
void tfw_wq_destroy(TfwRBQueue *wq);
long __tfw_wq_push(TfwRBQueue *wq, void *ptr);
int tfw_wq_pop_ticket(TfwRBQueue *wq, void *buf, long *ticket);

static inline int
tfw_wq_size(TfwRBQueue *q)
{
long t = atomic64_read(&q->tail);
long h = atomic64_read(&q->head);

return t > h ? 0 : h - t;
}

static inline void
tfw_raise_softirq(int cpu, struct irq_work *work,
void (*local_cpu_cb)(struct irq_work *))
@@ -62,8 +86,14 @@ tfw_wq_push(TfwRBQueue *q, void *ptr, int cpu, struct irq_work *work,
long ticket = __tfw_wq_push(q, ptr);
if (unlikely(ticket))
return ticket;
/*
* The atomic operation is 'atomic64_cmpxchg()' in
* '__tfw_wq_push()' above.
*/

Contributor: Why? atomic64_cmpxchg() is called very rarely, only on the slow path. On the fast path there are just atomic reads and writes: in the good case there is a write barrier, but in the bad case there are no barriers at all.

Contributor (author): The fast path takes place only when the queue is full and tfw_wq_push() exits with a ticket, before IPI generation. When a new element is inserted into the queue, atomic64_cmpxchg() must be called.
smp_mb__after_atomic();

tfw_raise_softirq(cpu, work, local_cpu_cb);
if (test_bit(TFW_QUEUE_IPI, &q->flags))
tfw_raise_softirq(cpu, work, local_cpu_cb);

return 0;
}
@@ -74,13 +104,4 @@ tfw_wq_pop(TfwRBQueue *wq, void *buf)
return tfw_wq_pop_ticket(wq, buf, NULL);
}

static inline int
tfw_wq_size(TfwRBQueue *q)
{
long t = atomic64_read(&q->tail);
long h = atomic64_read(&q->head);

return t > h ? 0 : h - t;
}

#endif /* __TFW_WORK_QUEUE_H__ */
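Putting the producer side together: the sketch below restates tfw_wq_push() above with extra comments on the ordering (the name tfw_wq_push_annotated is invented; this is not new code). The pushed item must be visible before the TFW_QUEUE_IPI test, so that either the producer sees IPIs enabled and raises one, or the consumer's re-check in TFW_WQ_IPI_SYNC() sees the freshly pushed item. Whether smp_mb__after_atomic() is strong enough on every path is exactly what the review thread above debates.

static inline int
tfw_wq_push_annotated(TfwRBQueue *q, void *ptr, int cpu, struct irq_work *work,
		      void (*local_cpu_cb)(struct irq_work *))
{
	/* Publish the new item; per the patch comment, __tfw_wq_push() uses atomic64_cmpxchg(). */
	long ticket = __tfw_wq_push(q, ptr);
	if (unlikely(ticket))
		return ticket;	/* queue overrun, the caller handles it */

	/*
	 * Order the item publication before the flag read. This pairs with
	 * set_bit() + smp_mb__after_atomic() + the size re-check on the
	 * consumer side (TFW_WQ_IPI_SYNC).
	 */
	smp_mb__after_atomic();

	if (test_bit(TFW_QUEUE_IPI, &q->flags))
		tfw_raise_softirq(cpu, work, local_cpu_cb);

	return 0;
}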