Skip to content

Commit

Permalink
receive: use ring buffer for incoming handshakes
Browse files Browse the repository at this point in the history
Apparently the spinlock on incoming_handshake's skb_queue is highly
contended, and a torrent of handshake or cookie packets can bring the
data plane to its knees, simply by virtue of enqueueing the handshake
packets to be processed asynchronously. So, we try switching this to a
ring buffer to hopefully have less lock contention. This alleviates the
problem somewhat, though it still isn't perfect, so future patches will
have to improve this further. However, it at least doesn't completely
diminish the data plane.

Reported-by: Streun Fabio <fstreun@student.ethz.ch>
Reported-by: Joel Wanner <joel.wanner@inf.ethz.ch>
Signed-off-by: Jason A. Donenfeld <Jason@zx2c4.com>
Change-Id: I00ce0cdd8205925d3ca035e584730566c76ba558
(cherry picked from commit 9c2b7161d6de25e664eaf0a0e5a186e662535aed)
Signed-off-by: TogoFire <togofire@mailfence.com>
  • Loading branch information
zx2c4 authored and TogoFire committed Jun 9, 2023
1 parent 3992373 commit 93d3bdc
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 43 deletions.
36 changes: 18 additions & 18 deletions drivers/net/wireguard/device.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ static int wg_stop(struct net_device *dev)
{
struct wg_device *wg = netdev_priv(dev);
struct wg_peer *peer;
struct sk_buff *skb;

mutex_lock(&wg->device_update_lock);
list_for_each_entry(peer, &wg->peer_list, peer_list) {
Expand All @@ -116,7 +117,9 @@ static int wg_stop(struct net_device *dev)
wg_noise_reset_last_sent_handshake(&peer->last_sent_handshake);
}
mutex_unlock(&wg->device_update_lock);
skb_queue_purge(&wg->incoming_handshakes);
while ((skb = ptr_ring_consume(&wg->handshake_queue.ring)) != NULL)
kfree_skb(skb);
atomic_set(&wg->handshake_queue_len, 0);
wg_socket_reinit(wg, NULL, NULL);
return 0;
}
Expand Down Expand Up @@ -243,14 +246,13 @@ static void wg_destruct(struct net_device *dev)
destroy_workqueue(wg->handshake_receive_wq);
destroy_workqueue(wg->handshake_send_wq);
destroy_workqueue(wg->packet_crypt_wq);
wg_packet_queue_free(&wg->decrypt_queue);
wg_packet_queue_free(&wg->encrypt_queue);
wg_packet_queue_free(&wg->handshake_queue, true);
wg_packet_queue_free(&wg->decrypt_queue, false);
wg_packet_queue_free(&wg->encrypt_queue, false);
rcu_barrier(); /* Wait for all the peers to be actually freed. */
wg_ratelimiter_uninit();
memzero_explicit(&wg->static_identity, sizeof(wg->static_identity));
skb_queue_purge(&wg->incoming_handshakes);
free_percpu(dev->tstats);
free_percpu(wg->incoming_handshakes_worker);
kvfree(wg->index_hashtable);
kvfree(wg->peer_hashtable);
mutex_unlock(&wg->device_update_lock);
Expand Down Expand Up @@ -312,7 +314,6 @@ static int wg_newlink(struct net *src_net, struct net_device *dev,
init_rwsem(&wg->static_identity.lock);
mutex_init(&wg->socket_update_lock);
mutex_init(&wg->device_update_lock);
skb_queue_head_init(&wg->incoming_handshakes);
wg_allowedips_init(&wg->peer_allowedips);
wg_cookie_checker_init(&wg->cookie_checker, wg);
INIT_LIST_HEAD(&wg->peer_list);
Expand All @@ -330,16 +331,10 @@ static int wg_newlink(struct net *src_net, struct net_device *dev,
if (!dev->tstats)
goto err_free_index_hashtable;

wg->incoming_handshakes_worker =
wg_packet_percpu_multicore_worker_alloc(
wg_packet_handshake_receive_worker, wg);
if (!wg->incoming_handshakes_worker)
goto err_free_tstats;

wg->handshake_receive_wq = alloc_workqueue("wg-kex-%s",
WQ_CPU_INTENSIVE | WQ_FREEZABLE, 0, dev->name);
if (!wg->handshake_receive_wq)
goto err_free_incoming_handshakes;
goto err_free_tstats;

wg->handshake_send_wq = alloc_workqueue("wg-kex-%s",
WQ_UNBOUND | WQ_FREEZABLE, 0, dev->name);
Expand All @@ -361,10 +356,15 @@ static int wg_newlink(struct net *src_net, struct net_device *dev,
if (ret < 0)
goto err_free_encrypt_queue;

ret = wg_ratelimiter_init();
ret = wg_packet_queue_init(&wg->handshake_queue, wg_packet_handshake_receive_worker,
MAX_QUEUED_INCOMING_HANDSHAKES);
if (ret < 0)
goto err_free_decrypt_queue;

ret = wg_ratelimiter_init();
if (ret < 0)
goto err_free_handshake_queue;

ret = register_netdevice(dev);
if (ret < 0)
goto err_uninit_ratelimiter;
Expand All @@ -381,18 +381,18 @@ static int wg_newlink(struct net *src_net, struct net_device *dev,

err_uninit_ratelimiter:
wg_ratelimiter_uninit();
err_free_handshake_queue:
wg_packet_queue_free(&wg->handshake_queue, false);
err_free_decrypt_queue:
wg_packet_queue_free(&wg->decrypt_queue);
wg_packet_queue_free(&wg->decrypt_queue, false);
err_free_encrypt_queue:
wg_packet_queue_free(&wg->encrypt_queue);
wg_packet_queue_free(&wg->encrypt_queue, false);
err_destroy_packet_crypt:
destroy_workqueue(wg->packet_crypt_wq);
err_destroy_handshake_send:
destroy_workqueue(wg->handshake_send_wq);
err_destroy_handshake_receive:
destroy_workqueue(wg->handshake_receive_wq);
err_free_incoming_handshakes:
free_percpu(wg->incoming_handshakes_worker);
err_free_tstats:
free_percpu(dev->tstats);
err_free_index_hashtable:
Expand Down
9 changes: 3 additions & 6 deletions drivers/net/wireguard/device.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,18 @@ struct prev_queue {

struct wg_device {
struct net_device *dev;
struct crypt_queue encrypt_queue, decrypt_queue;
struct crypt_queue encrypt_queue, decrypt_queue, handshake_queue;
struct sock __rcu *sock4, *sock6;
struct net __rcu *creating_net;
struct noise_static_identity static_identity;
struct workqueue_struct *handshake_receive_wq, *handshake_send_wq;
struct workqueue_struct *packet_crypt_wq;
struct sk_buff_head incoming_handshakes;
int incoming_handshake_cpu;
struct multicore_worker __percpu *incoming_handshakes_worker;
struct workqueue_struct *packet_crypt_wq,*handshake_receive_wq, *handshake_send_wq;
struct cookie_checker cookie_checker;
struct pubkey_hashtable *peer_hashtable;
struct index_hashtable *index_hashtable;
struct allowedips peer_allowedips;
struct mutex device_update_lock, socket_update_lock;
struct list_head device_list, peer_list;
atomic_t handshake_queue_len;
unsigned int num_peers, device_update_gen;
u32 fwmark;
u16 incoming_port;
Expand Down
6 changes: 3 additions & 3 deletions drivers/net/wireguard/queueing.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ int wg_packet_queue_init(struct crypt_queue *queue, work_func_t function,
return 0;
}

void wg_packet_queue_free(struct crypt_queue *queue)
void wg_packet_queue_free(struct crypt_queue *queue, bool purge)
{
free_percpu(queue->worker);
WARN_ON(!__ptr_ring_empty(&queue->ring));
ptr_ring_cleanup(&queue->ring, NULL);
WARN_ON(!purge && !__ptr_ring_empty(&queue->ring));
ptr_ring_cleanup(&queue->ring, purge ? (void(*)(void*))kfree_skb : NULL);
}

#define NEXT(skb) ((skb)->prev)
Expand Down
2 changes: 1 addition & 1 deletion drivers/net/wireguard/queueing.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ struct sk_buff;
/* queueing.c APIs: */
int wg_packet_queue_init(struct crypt_queue *queue, work_func_t function,
unsigned int len);
void wg_packet_queue_free(struct crypt_queue *queue);
void wg_packet_queue_free(struct crypt_queue *queue, bool purge);
struct multicore_worker __percpu *
wg_packet_percpu_multicore_worker_alloc(work_func_t function, void *ptr);

Expand Down
27 changes: 12 additions & 15 deletions drivers/net/wireguard/receive.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ static void wg_receive_handshake_packet(struct wg_device *wg,
return;
}

under_load = skb_queue_len(&wg->incoming_handshakes) >=
MAX_QUEUED_INCOMING_HANDSHAKES / 8;
under_load = atomic_read(&wg->handshake_queue_len) >=
MAX_QUEUED_INCOMING_HANDSHAKES / 8;
if (under_load) {
last_under_load = ktime_get_coarse_boottime_ns();
} else if (last_under_load) {
Expand Down Expand Up @@ -213,13 +213,14 @@ static void wg_receive_handshake_packet(struct wg_device *wg,

void wg_packet_handshake_receive_worker(struct work_struct *work)
{
struct wg_device *wg = container_of(work, struct multicore_worker,
work)->ptr;
struct crypt_queue *queue = container_of(work, struct multicore_worker, work)->ptr;
struct wg_device *wg = container_of(queue, struct wg_device, handshake_queue);
struct sk_buff *skb;

while ((skb = skb_dequeue(&wg->incoming_handshakes)) != NULL) {
while ((skb = ptr_ring_consume_bh(&queue->ring)) != NULL) {
wg_receive_handshake_packet(wg, skb);
dev_kfree_skb(skb);
atomic_dec(&wg->handshake_queue_len);
cond_resched();
}
}
Expand Down Expand Up @@ -563,21 +564,17 @@ void wg_packet_receive(struct wg_device *wg, struct sk_buff *skb)
case cpu_to_le32(MESSAGE_HANDSHAKE_RESPONSE):
case cpu_to_le32(MESSAGE_HANDSHAKE_COOKIE): {
int cpu;

if (skb_queue_len(&wg->incoming_handshakes) >
MAX_QUEUED_INCOMING_HANDSHAKES ||
unlikely(!rng_is_initialized())) {
if (unlikely(!rng_is_initialized() ||
ptr_ring_produce_bh(&wg->handshake_queue.ring, skb))) {
net_dbg_skb_ratelimited("%s: Dropping handshake packet from %pISpfsc\n",
wg->dev->name, skb);
goto err;
}
skb_queue_tail(&wg->incoming_handshakes, skb);
/* Queues up a call to packet_process_queued_handshake_
* packets(skb):
*/
cpu = wg_cpumask_next_online(&wg->incoming_handshake_cpu);
atomic_inc(&wg->handshake_queue_len);
cpu = wg_cpumask_next_online(&wg->handshake_queue.last_cpu);
/* Queues up a call to packet_process_queued_handshake_packets(skb): */
queue_work_on(cpu, wg->handshake_receive_wq,
&per_cpu_ptr(wg->incoming_handshakes_worker, cpu)->work);
&per_cpu_ptr(wg->handshake_queue.worker, cpu)->work);
break;
}
case cpu_to_le32(MESSAGE_DATA):
Expand Down

0 comments on commit 93d3bdc

Please sign in to comment.