diff --git a/tempesta_fw/cache.c b/tempesta_fw/cache.c
index 5efb103ce7..9fb9cfdfcb 100644
--- a/tempesta_fw/cache.c
+++ b/tempesta_fw/cache.c
@@ -1673,7 +1673,7 @@ static void
 tfw_cache_ipi(struct irq_work *work)
 {
 	TfwWorkTasklet *ct = container_of(work, TfwWorkTasklet, ipi_work);
-
+	clear_bit(TFW_QUEUE_IPI, &ct->wq.flags);
 	tasklet_schedule(&ct->tasklet);
 }
 
@@ -1726,7 +1726,6 @@ tfw_cache_process(TfwHttpMsg *msg, tfw_http_cache_cb_t action)
 	TFW_DBG2("Cache: schedule tasklet w/ work: to_cpu=%d from_cpu=%d"
 		 " msg=%p key=%lx\n",
 		 cpu, smp_processor_id(), cw.msg, key);
-
 	if (tfw_wq_push(&ct->wq, &cw, cpu, &ct->ipi_work, tfw_cache_ipi)) {
 		TFW_WARN("Cache work queue overrun: [%s]\n",
 			 resp ? "response" : "request");
@@ -1743,10 +1742,15 @@ static void
 tfw_wq_tasklet(unsigned long data)
 {
 	TfwWorkTasklet *ct = (TfwWorkTasklet *)data;
+	TfwRBQueue *wq = &ct->wq;
 	TfwCWork cw;
 
-	while (!tfw_wq_pop(&ct->wq, &cw))
+	while (!tfw_wq_pop(wq, &cw))
 		tfw_cache_do_action(cw.msg, cw.action);
+
+	TFW_WQ_IPI_SYNC(tfw_wq_size, wq);
+
+	tasklet_schedule(&ct->tasklet);
 }
 
 /**
diff --git a/tempesta_fw/http_sched_ratio.c b/tempesta_fw/http_sched_ratio.c
index b5228407e1..8991d36efe 100644
--- a/tempesta_fw/http_sched_ratio.c
+++ b/tempesta_fw/http_sched_ratio.c
@@ -1011,8 +1011,14 @@ tfw_sched_ratio_del_grp(TfwSrvGroup *sg)
 	if (!ratio)
 		return;
 	/*
-	 * Make sure the timer doesn't re-arms itself. This
-	 * also ensures that no more RCU callbacks are created.
+	 * Make sure the timer doesn't re-arm itself. This also ensures
+	 * that no more RCU callbacks are created.
+	 *
+	 * TODO: check whether the memory barriers are redundant here (and
+	 * in several similar places, as well as in the corresponding timer
+	 * callbacks); it also seems that 'del_timer_sync' correctly handles
+	 * a timer that is re-armed from its own callback, so perhaps we
+	 * don't need the 'rearm' flag at all.
 	 */
 	if (sg->flags & (TFW_SG_F_SCHED_RATIO_DYNAMIC
 			 | TFW_SG_F_SCHED_RATIO_PREDICT))
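Note: for context on the TODO above, the pattern in question is a timer that re-arms itself from its own callback under a 'rearm' flag and is stopped by clearing the flag and then calling del_timer_sync(). A minimal sketch of that pattern follows; the identifiers (my_ctx, my_tmfn, my_timer_stop) and the pre-4.15 timer API are illustrative only, not the scheduler's actual code, and the smp_mb() mirrors the kind of barrier whose necessity the TODO questions.

#include <linux/atomic.h>
#include <linux/timer.h>

/* Hypothetical context for a self-rearming timer. */
struct my_ctx {
	struct timer_list	tmr;
	atomic_t		rearm;	/* 1 - keep re-arming, 0 - shutting down */
};

static void
my_tmfn(unsigned long data)
{
	struct my_ctx *ctx = (struct my_ctx *)data;

	/* ... periodic work, possibly posting RCU callbacks ... */

	/* Re-arm only while shutdown has not been requested. */
	if (atomic_read(&ctx->rearm))
		mod_timer(&ctx->tmr, jiffies + HZ);
}

static void
my_timer_stop(struct my_ctx *ctx)
{
	atomic_set(&ctx->rearm, 0);
	smp_mb();	/* the barrier the TODO suspects may be redundant */
	/*
	 * Wait for a callback that may be running right now; with the
	 * flag already clear it will not re-arm the timer afterwards.
	 */
	del_timer_sync(&ctx->tmr);
}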
diff --git a/tempesta_fw/sock.c b/tempesta_fw/sock.c
index 8e89c84ee7..41b6b50b9f 100644
--- a/tempesta_fw/sock.c
+++ b/tempesta_fw/sock.c
@@ -187,6 +187,8 @@ do {						\
 static void
 ss_ipi(struct irq_work *work)
 {
+	TfwRBQueue *wq = &per_cpu(si_wq, smp_processor_id());
+	clear_bit(TFW_QUEUE_IPI, &wq->flags);
 	raise_softirq(NET_TX_SOFTIRQ);
 }
 
@@ -200,6 +202,7 @@ ss_turnstile_push(long ticket, SsWork *sw, int cpu)
 {
 	struct irq_work *iw = &per_cpu(ipi_work, cpu);
 	SsCloseBacklog *cb = &per_cpu(close_backlog, cpu);
+	TfwRBQueue *wq = &per_cpu(si_wq, cpu);
 	SsCblNode *cn;
 
 	cn = kmem_cache_alloc(ss_cbacklog_cache, GFP_ATOMIC);
@@ -208,13 +211,18 @@ ss_turnstile_push(long ticket, SsWork *sw, int cpu)
 	cn->ticket = ticket;
 	memcpy(&cn->sw, sw, sizeof(*sw));
 	spin_lock_bh(&cb->lock);
-	list_add(&cn->list, &cb->head);
+	list_add_tail(&cn->list, &cb->head);
 	cb->size++;
 	if (cb->turn > ticket)
 		cb->turn = ticket;
 	spin_unlock_bh(&cb->lock);
 
-	tfw_raise_softirq(cpu, iw, ss_ipi);
+	/*
+	 * We do not need an explicit memory barrier after the
+	 * spinlock operation above.
+	 */
+	if (test_bit(TFW_QUEUE_IPI, &wq->flags))
+		tfw_raise_softirq(cpu, iw, ss_ipi);
 
 	return 0;
 }
@@ -254,9 +262,8 @@ ss_wq_push(SsWork *sw, int cpu)
 }
 
 static int
-ss_wq_pop(SsWork *sw, long *ticket)
+ss_wq_pop(TfwRBQueue *wq, SsWork *sw, long *ticket)
 {
-	TfwRBQueue *wq = this_cpu_ptr(&si_wq);
 	SsCloseBacklog *cb = this_cpu_ptr(&close_backlog);
 
 	/*
@@ -296,7 +303,7 @@ ss_wq_pop(SsWork *sw, long *ticket)
 }
 
 static size_t
-__ss_close_q_sz(int cpu)
+ss_wq_size(int cpu)
 {
 	TfwRBQueue *wq = &per_cpu(si_wq, cpu);
 	SsCloseBacklog *cb = &per_cpu(close_backlog, cpu);
@@ -305,9 +312,11 @@ __ss_close_q_sz(int cpu)
 }
 
 static size_t
-ss_close_q_sz(void)
+ss_wq_local_size(TfwRBQueue *wq)
 {
-	return __ss_close_q_sz(smp_processor_id());
+	SsCloseBacklog *cb = this_cpu_ptr(&close_backlog);
+
+	return tfw_wq_size(wq) + cb->size;
 }
 
 /*
@@ -1308,17 +1317,18 @@ static void
 ss_tx_action(void)
 {
 	SsWork sw;
+	int budget;
 	struct sk_buff *skb;
+	TfwRBQueue *wq = this_cpu_ptr(&si_wq);
 	long ticket = 0;
-	int budget;
 
 	/*
 	 * @budget limits the loop to prevent live lock on constantly arriving
 	 * new items. We use some small integer as a lower bound to catch just
 	 * ariving items.
 	 */
-	budget = max(10UL, ss_close_q_sz());
-	while ((!ss_active() || budget--) && !ss_wq_pop(&sw, &ticket)) {
+	budget = max(10UL, ss_wq_local_size(wq));
+	while ((!ss_active() || budget--) && !ss_wq_pop(wq, &sw, &ticket)) {
 		struct sock *sk = sw.sk;
 
 		bh_lock_sock(sk);
@@ -1358,11 +1368,16 @@ ss_tx_action(void)
 
 	/*
 	 * Rearm softirq for local CPU if there are more jobs to do.
-	 * ss_synchronize() is responsible for raising the softirq if there are
-	 * more jobs in the work queue or the backlog.
+	 * If all jobs are finished and both the work queue and the
+	 * backlog are empty, then re-enable IPI generation by producers
+	 * (it was disabled in the 'ss_ipi()' handler).
+	 * ss_synchronize() is responsible for raising the softirq
+	 * if there are more jobs in the work queue or the backlog.
 	 */
-	if (!budget)
-		raise_softirq(NET_TX_SOFTIRQ);
+	if (budget)
+		TFW_WQ_IPI_SYNC(ss_wq_local_size, wq);
+
+	raise_softirq(NET_TX_SOFTIRQ);
 }
 
 /*
@@ -1439,7 +1454,7 @@ ss_synchronize(void)
 	while (1) {
 		for_each_online_cpu(cpu) {
 			int n_conn = atomic64_read(&per_cpu(__ss_act_cnt, cpu));
-			int n_q = __ss_close_q_sz(cpu);
+			int n_q = ss_wq_size(cpu);
 			if (n_conn + n_q) {
 				irq_work_sync(&per_cpu(ipi_work, cpu));
 				schedule(); /* let softirq finish works */
diff --git a/tempesta_fw/work_queue.c b/tempesta_fw/work_queue.c
index ba40344d18..67b5752f7b 100644
--- a/tempesta_fw/work_queue.c
+++ b/tempesta_fw/work_queue.c
@@ -49,6 +49,7 @@ tfw_wq_init(TfwRBQueue *q, int node)
 	q->last_head = 0;
 	atomic64_set(&q->head, 0);
 	atomic64_set(&q->tail, 0);
+	set_bit(TFW_QUEUE_IPI, &q->flags);
 
 	q->array = kmalloc_node(QSZ * WQ_ITEM_SZ, GFP_KERNEL, node);
 	if (!q->array) {
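Note: the conditional IPI above is a classic lost-wakeup handshake around the TFW_QUEUE_IPI bit, and the ordering on both sides is what makes it safe. A producer always completes its push (which ends with an atomic64_cmpxchg(), hence the smp_mb__after_atomic() in tfw_wq_push()) before testing the bit; the consumer, in TFW_WQ_IPI_SYNC(), always sets the bit before re-reading the queue size. For any racing push, either the consumer's final size check sees the new item, or the producer sees the re-enabled bit and raises the IPI itself, so an item can never be stranded in a seemingly idle queue. A stripped-down model of the two sides is sketched below; enqueue_item(), dequeue_item(), queue_size(), handle_item() and send_ipi() are placeholders for the real calls in sock.c and work_queue.h, not actual API.

/* Simplified model of the IPI-avoidance handshake (placeholder helpers). */
static void
producer(TfwRBQueue *wq)
{
	enqueue_item(wq);		/* __tfw_wq_push(): ends with atomic64_cmpxchg() */
	smp_mb__after_atomic();		/* order the push before reading the flag */
	if (test_bit(TFW_QUEUE_IPI, &wq->flags))
		send_ipi();		/* tfw_raise_softirq() */
}

static void
consumer(TfwRBQueue *wq)
{
	clear_bit(TFW_QUEUE_IPI, &wq->flags);	/* done in ss_ipi()/tfw_cache_ipi() */

	while (dequeue_item(wq))		/* ss_wq_pop()/tfw_wq_pop() */
		handle_item();

	/* TFW_WQ_IPI_SYNC(): re-enable IPIs, then re-check for racing pushes. */
	set_bit(TFW_QUEUE_IPI, &wq->flags);
	smp_mb__after_atomic();
	if (queue_size(wq))
		clear_bit(TFW_QUEUE_IPI, &wq->flags);	/* keep polling via softirq/tasklet */
}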
diff --git a/tempesta_fw/work_queue.h b/tempesta_fw/work_queue.h
index f5a073af8f..df072c9a71 100644
--- a/tempesta_fw/work_queue.h
+++ b/tempesta_fw/work_queue.h
@@ -38,13 +38,37 @@ typedef struct {
 	long			last_head;
 	atomic64_t		head ____cacheline_aligned;
 	atomic64_t		tail ____cacheline_aligned;
+	unsigned long		flags;
 } TfwRBQueue;
 
+enum {
+	/* Enable IPI generation. */
+	TFW_QUEUE_IPI = 0
+};
+
+#define TFW_WQ_IPI_SYNC(size_cb, wq)					\
+do {									\
+	set_bit(TFW_QUEUE_IPI, &(wq)->flags);				\
+	smp_mb__after_atomic();						\
+	if (!size_cb(wq))						\
+		return;							\
+	clear_bit(TFW_QUEUE_IPI, &(wq)->flags);				\
+} while (0)
+
 int tfw_wq_init(TfwRBQueue *wq, int node);
 void tfw_wq_destroy(TfwRBQueue *wq);
 long __tfw_wq_push(TfwRBQueue *wq, void *ptr);
 int tfw_wq_pop_ticket(TfwRBQueue *wq, void *buf, long *ticket);
 
+static inline int
+tfw_wq_size(TfwRBQueue *q)
+{
+	long t = atomic64_read(&q->tail);
+	long h = atomic64_read(&q->head);
+
+	return t > h ? 0 : h - t;
+}
+
 static inline void
 tfw_raise_softirq(int cpu, struct irq_work *work,
 		  void (*local_cpu_cb)(struct irq_work *))
@@ -62,8 +86,14 @@ tfw_wq_push(TfwRBQueue *q, void *ptr, int cpu, struct irq_work *work,
 	long ticket = __tfw_wq_push(q, ptr);
 	if (unlikely(ticket))
 		return ticket;
+	/*
+	 * The atomic operation is 'atomic64_cmpxchg()' in
+	 * '__tfw_wq_push()' above.
+	 */
+	smp_mb__after_atomic();
 
-	tfw_raise_softirq(cpu, work, local_cpu_cb);
+	if (test_bit(TFW_QUEUE_IPI, &q->flags))
+		tfw_raise_softirq(cpu, work, local_cpu_cb);
 
 	return 0;
 }
@@ -74,13 +104,4 @@ tfw_wq_pop(TfwRBQueue *wq, void *buf)
 	return tfw_wq_pop_ticket(wq, buf, NULL);
 }
 
-static inline int
-tfw_wq_size(TfwRBQueue *q)
-{
-	long t = atomic64_read(&q->tail);
-	long h = atomic64_read(&q->head);
-
-	return t > h ? 0 : h - t;
-}
-
 #endif /* __TFW_WORK_QUEUE_H__ */
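Note: with tfw_wq_size() moved above tfw_wq_push() so the header's own helpers can use it, a consumer built on this API is expected to follow the pattern tfw_wq_tasklet() uses in cache.c: drain the queue, then invoke TFW_WQ_IPI_SYNC(), which either returns from the enclosing function (queue empty, producer IPIs re-enabled) or clears the bit again and falls through so the caller keeps polling. A minimal hypothetical consumer is sketched below; MyWork, my_handle(), my_wq and my_tasklet are illustrative names, not part of the patch, and the work item must fit the queue's fixed slot size (WQ_ITEM_SZ).

typedef struct {
	void	*data;	/* sized to match the queue's fixed item size, WQ_ITEM_SZ */
} MyWork;

static TfwRBQueue my_wq;			/* initialized with tfw_wq_init() */
static struct tasklet_struct my_tasklet;	/* bound to my_consumer() via tasklet_init() */

static void
my_handle(MyWork *w)
{
	/* ... process one work item ... */
}

static void
my_consumer(unsigned long unused)
{
	MyWork w;

	while (!tfw_wq_pop(&my_wq, &w))
		my_handle(&w);

	/*
	 * Queue looks empty: re-enable producer IPIs and, if it is still
	 * empty, return right here (the 'return' is inside the macro).
	 * Otherwise a push raced with us: disable IPIs again and poll on.
	 */
	TFW_WQ_IPI_SYNC(tfw_wq_size, &my_wq);

	tasklet_schedule(&my_tasklet);
}

Producers simply call tfw_wq_push() as tfw_cache_process() does; the push raises an IPI only when TFW_QUEUE_IPI is set, that is, when the consumer has gone idle.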