
Merge branch 'tipc_fragmentation'
Erik Hugne says:

====================
tipc: message reassembly using fragment chain

We introduce a new reassembly algorithm that improves performance
and eliminates the risk of causing out-of-memory situations.

v3: -Use skb_try_coalesce, and revert to fraglist if this does not succeed.
    -Make sure reassembly list head is uncloned.

v2: -Rebased on Ying's indentation fix.
    -Node unlock call in msg_fragmenter case moved from patch #2 to #1.
     ('continue' with this lock held would cause spinlock recursion if only
      patch #1 is used)
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
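
To make the idea above concrete without wading through the diff, here is a minimal, userspace-only C sketch of the "coalesce if possible, otherwise chain the fragment" pattern the series adopts. All names here (struct reasm_head, reasm_add, and so on) are invented for the illustration, and plain heap buffers stand in for sk_buffs; in the actual patch the coalesce step is skb_try_coalesce() and the fallback is the skb frag list.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* One chained fragment; stands in for an skb on the frag list. */
struct frag {
	struct frag *next;
	size_t len;
	char *data;
};

/* Reassembly state; stands in for the reasm_head/reasm_tail skb pair. */
struct reasm_head {
	char linear[128];        /* room to coalesce small fragments into */
	size_t linear_len;
	struct frag *frag_list;  /* fallback chain when coalescing fails */
	struct frag *frag_tail;
};

/* Add one fragment: 1 = message complete, 0 = more expected, -1 = error. */
static int reasm_add(struct reasm_head *head, const char *data, size_t len,
		     int is_last)
{
	if (head->linear_len + len <= sizeof(head->linear)) {
		/* "Coalesce": append straight into the head's linear area. */
		memcpy(head->linear + head->linear_len, data, len);
		head->linear_len += len;
	} else {
		/* No room: chain the fragment instead of copying it. */
		struct frag *f = malloc(sizeof(*f));

		if (!f)
			return -1;               /* cf. LINK_REASM_ERROR */
		f->data = malloc(len);
		if (!f->data) {
			free(f);
			return -1;
		}
		f->next = NULL;
		f->len = len;
		memcpy(f->data, data, len);
		if (!head->frag_list)
			head->frag_list = f;
		else
			head->frag_tail->next = f;
		head->frag_tail = f;
	}
	return is_last ? 1 : 0;                  /* cf. LINK_REASM_COMPLETE */
}

int main(void)
{
	struct reasm_head head = { .linear_len = 0 };
	const char *frags[] = { "first ", "second ", "last" };
	int ret = 0;

	for (int i = 0; i < 3 && ret == 0; i++)
		ret = reasm_add(&head, frags[i], strlen(frags[i]), i == 2);

	if (ret == 1)
		printf("reassembled %zu bytes: %.*s\n",
		       head.linear_len, (int)head.linear_len, head.linear);
	return ret == 1 ? 0 : 1;
}

The point of the fallback step is the same as in the real code: fragments that cannot be merged into the head are linked rather than copied into one large pre-allocated buffer, which is what removes the up-front tipc_buf_acquire(msg_size(imsg)) allocation the old implementation needed.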
davem330 committed Nov 7, 2013
2 parents b0db7b0 + a715b49 commit 95ed401
Showing 6 changed files with 80 additions and 145 deletions.
16 changes: 11 additions & 5 deletions net/tipc/bcast.c
@@ -480,18 +480,24 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
tipc_node_unlock(node);
tipc_link_recv_bundle(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
int ret = tipc_link_recv_fragment(&node->bclink.defragm,
&buf, &msg);
if (ret < 0)
int ret;
ret = tipc_link_recv_fragment(&node->bclink.reasm_head,
&node->bclink.reasm_tail,
&buf);
if (ret == LINK_REASM_ERROR)
goto unlock;
spin_lock_bh(&bc_lock);
bclink_accept_pkt(node, seqno);
bcl->stats.recv_fragments++;
if (ret > 0)
if (ret == LINK_REASM_COMPLETE) {
bcl->stats.recv_fragmented++;
/* Point msg to inner header */
msg = buf_msg(buf);
spin_unlock_bh(&bc_lock);
goto receive;
}
spin_unlock_bh(&bc_lock);
tipc_node_unlock(node);
tipc_net_route_msg(buf);
} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
spin_lock_bh(&bc_lock);
bclink_accept_pkt(node, seqno);
164 changes: 47 additions & 117 deletions net/tipc/link.c
@@ -404,15 +404,9 @@ static void link_release_outqueue(struct tipc_link *l_ptr)
*/
void tipc_link_reset_fragments(struct tipc_link *l_ptr)
{
struct sk_buff *buf = l_ptr->defragm_buf;
struct sk_buff *next;

while (buf) {
next = buf->next;
kfree_skb(buf);
buf = next;
}
l_ptr->defragm_buf = NULL;
kfree_skb(l_ptr->reasm_head);
l_ptr->reasm_head = NULL;
l_ptr->reasm_tail = NULL;
}

/**
@@ -1649,15 +1643,18 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
continue;
case MSG_FRAGMENTER:
l_ptr->stats.recv_fragments++;
ret = tipc_link_recv_fragment(&l_ptr->defragm_buf,
&buf, &msg);
if (ret == 1) {
ret = tipc_link_recv_fragment(&l_ptr->reasm_head,
&l_ptr->reasm_tail,
&buf);
if (ret == LINK_REASM_COMPLETE) {
l_ptr->stats.recv_fragmented++;
msg = buf_msg(buf);
goto deliver;
}
if (ret == -1)
l_ptr->next_in_no--;
break;
if (ret == LINK_REASM_ERROR)
tipc_link_reset(l_ptr);
tipc_node_unlock(n_ptr);
continue;
case CHANGEOVER_PROTOCOL:
type = msg_type(msg);
if (link_recv_changeover_msg(&l_ptr, &buf)) {
@@ -2341,115 +2338,48 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
return dsz;
}

/*
* A pending message being re-assembled must store certain values
* to handle subsequent fragments correctly. The following functions
* help storing these values in unused, available fields in the
* pending message. This makes dynamic memory allocation unnecessary.
*/
static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
{
msg_set_seqno(buf_msg(buf), seqno);
}

static u32 get_fragm_size(struct sk_buff *buf)
{
return msg_ack(buf_msg(buf));
}

static void set_fragm_size(struct sk_buff *buf, u32 sz)
{
msg_set_ack(buf_msg(buf), sz);
}

static u32 get_expected_frags(struct sk_buff *buf)
{
return msg_bcast_ack(buf_msg(buf));
}

static void set_expected_frags(struct sk_buff *buf, u32 exp)
{
msg_set_bcast_ack(buf_msg(buf), exp);
}

/*
* tipc_link_recv_fragment(): Called with node lock on. Returns
* the reassembled buffer if message is complete.
*/
int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
struct tipc_msg **m)
{
struct sk_buff *prev = NULL;
struct sk_buff *fbuf = *fb;
struct tipc_msg *fragm = buf_msg(fbuf);
struct sk_buff *pbuf = *pending;
u32 long_msg_seq_no = msg_long_msgno(fragm);

*fb = NULL;

/* Is there an incomplete message waiting for this fragment? */
while (pbuf && ((buf_seqno(pbuf) != long_msg_seq_no) ||
(msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
prev = pbuf;
pbuf = pbuf->next;
}

if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
u32 msg_sz = msg_size(imsg);
u32 fragm_sz = msg_data_sz(fragm);
u32 exp_fragm_cnt;
u32 max = TIPC_MAX_USER_MSG_SIZE + NAMED_H_SIZE;

if (msg_type(imsg) == TIPC_MCAST_MSG)
max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
if (fragm_sz == 0 || msg_size(imsg) > max) {
kfree_skb(fbuf);
return 0;
}
exp_fragm_cnt = msg_sz / fragm_sz + !!(msg_sz % fragm_sz);
pbuf = tipc_buf_acquire(msg_size(imsg));
if (pbuf != NULL) {
pbuf->next = *pending;
*pending = pbuf;
skb_copy_to_linear_data(pbuf, imsg,
msg_data_sz(fragm));
/* Prepare buffer for subsequent fragments. */
set_long_msg_seqno(pbuf, long_msg_seq_no);
set_fragm_size(pbuf, fragm_sz);
set_expected_frags(pbuf, exp_fragm_cnt - 1);
} else {
pr_debug("Link unable to reassemble fragmented message\n");
kfree_skb(fbuf);
return -1;
}
kfree_skb(fbuf);
return 0;
} else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
u32 dsz = msg_data_sz(fragm);
u32 fsz = get_fragm_size(pbuf);
u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
u32 exp_frags = get_expected_frags(pbuf) - 1;
skb_copy_to_linear_data_offset(pbuf, crs,
msg_data(fragm), dsz);
kfree_skb(fbuf);

/* Is message complete? */
if (exp_frags == 0) {
if (prev)
prev->next = pbuf->next;
else
*pending = pbuf->next;
msg_reset_reroute_cnt(buf_msg(pbuf));
*fb = pbuf;
*m = buf_msg(pbuf);
return 1;
}
set_expected_frags(pbuf, exp_frags);
int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail,
struct sk_buff **fbuf)
{
struct sk_buff *frag = *fbuf;
struct tipc_msg *msg = buf_msg(frag);
u32 fragid = msg_type(msg);
bool headstolen;
int delta;

skb_pull(frag, msg_hdr_sz(msg));
if (fragid == FIRST_FRAGMENT) {
if (*head || skb_unclone(frag, GFP_ATOMIC))
goto out_free;
*head = frag;
skb_frag_list_init(*head);
return 0;
} else if (skb_try_coalesce(*head, frag, &headstolen, &delta)) {
kfree_skb_partial(frag, headstolen);
} else {
if (!*head)
goto out_free;
if (!skb_has_frag_list(*head))
skb_shinfo(*head)->frag_list = frag;
else
(*tail)->next = frag;
*tail = frag;
(*head)->truesize += frag->truesize;
}
if (fragid == LAST_FRAGMENT) {
*fbuf = *head;
*tail = *head = NULL;
return LINK_REASM_COMPLETE;
}
kfree_skb(fbuf);
return 0;
out_free:
pr_warn_ratelimited("Link unable to reassemble fragmented message\n");
kfree_skb(*fbuf);
return LINK_REASM_ERROR;
}

static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
20 changes: 14 additions & 6 deletions net/tipc/link.h
@@ -40,6 +40,12 @@
#include "msg.h"
#include "node.h"

/*
* Link reassembly status codes
*/
#define LINK_REASM_ERROR -1
#define LINK_REASM_COMPLETE 1

/*
* Out-of-range value for link sequence numbers
*/
@@ -134,7 +140,8 @@ struct tipc_stats {
* @next_out: ptr to first unsent outbound message in queue
* @waiting_ports: linked list of ports waiting for link congestion to abate
* @long_msg_seq_no: next identifier to use for outbound fragmented messages
* @defragm_buf: list of partially reassembled inbound message fragments
* @reasm_head: list head of partially reassembled inbound message fragments
* @reasm_tail: last fragment received
* @stats: collects statistics regarding link activity
*/
struct tipc_link {
@@ -196,9 +203,10 @@ struct tipc_link {
struct sk_buff *next_out;
struct list_head waiting_ports;

/* Fragmentation/defragmentation */
/* Fragmentation/reassembly */
u32 long_msg_seq_no;
struct sk_buff *defragm_buf;
struct sk_buff *reasm_head;
struct sk_buff *reasm_tail;

/* Statistics */
struct tipc_stats stats;
@@ -229,9 +237,9 @@ int tipc_link_send_sections_fast(struct tipc_port *sender,
struct iovec const *msg_sect,
unsigned int len, u32 destnode);
void tipc_link_recv_bundle(struct sk_buff *buf);
int tipc_link_recv_fragment(struct sk_buff **pending,
struct sk_buff **fb,
struct tipc_msg **msg);
int tipc_link_recv_fragment(struct sk_buff **reasm_head,
struct sk_buff **reasm_tail,
struct sk_buff **fbuf);
void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, int prob,
u32 gap, u32 tolerance, u32 priority,
u32 acked_mtu);
12 changes: 0 additions & 12 deletions net/tipc/msg.h
@@ -554,12 +554,6 @@ static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n)
msg_set_bits(m, 4, 16, 0xffff, n);
}


static inline u32 msg_fragm_no(struct tipc_msg *m)
{
return msg_bits(m, 4, 16, 0xffff);
}

static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n)
{
msg_set_bits(m, 4, 16, 0xffff, n);
@@ -576,12 +570,6 @@ static inline void msg_set_next_sent(struct tipc_msg *m, u32 n)
msg_set_bits(m, 4, 0, 0xffff, n);
}


static inline u32 msg_long_msgno(struct tipc_msg *m)
{
return msg_bits(m, 4, 0, 0xffff);
}

static inline void msg_set_long_msgno(struct tipc_msg *m, u32 n)
{
msg_set_bits(m, 4, 0, 0xffff, n);
7 changes: 4 additions & 3 deletions net/tipc/node.c
@@ -298,9 +298,10 @@ static void node_lost_contact(struct tipc_node *n_ptr)
}
n_ptr->bclink.deferred_size = 0;

if (n_ptr->bclink.defragm) {
kfree_skb(n_ptr->bclink.defragm);
n_ptr->bclink.defragm = NULL;
if (n_ptr->bclink.reasm_head) {
kfree_skb(n_ptr->bclink.reasm_head);
n_ptr->bclink.reasm_head = NULL;
n_ptr->bclink.reasm_tail = NULL;
}

tipc_bclink_remove_node(n_ptr->addr);
6 changes: 4 additions & 2 deletions net/tipc/node.h
@@ -74,7 +74,8 @@
* @deferred_size: number of OOS b'cast messages in deferred queue
* @deferred_head: oldest OOS b'cast message received from node
* @deferred_tail: newest OOS b'cast message received from node
* @defragm: list of partially reassembled b'cast message fragments from node
* @reasm_head: broadcast reassembly queue head from node
* @reasm_tail: last broadcast fragment received from node
* @recv_permitted: true if node is allowed to receive b'cast messages
*/
struct tipc_node {
@@ -98,7 +99,8 @@ struct tipc_node {
u32 deferred_size;
struct sk_buff *deferred_head;
struct sk_buff *deferred_tail;
struct sk_buff *defragm;
struct sk_buff *reasm_head;
struct sk_buff *reasm_tail;
bool recv_permitted;
} bclink;
};
