diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/tipc/bcast.c | 53 | ||||
-rw-r--r-- | net/tipc/discover.c | 3 | ||||
-rw-r--r-- | net/tipc/link.c | 352 | ||||
-rw-r--r-- | net/tipc/link.h | 17 | ||||
-rw-r--r-- | net/tipc/msg.c | 119 | ||||
-rw-r--r-- | net/tipc/msg.h | 87 | ||||
-rw-r--r-- | net/tipc/node.c | 4 | ||||
-rw-r--r-- | net/tipc/node.h | 6 |
8 files changed, 300 insertions, 341 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 3e41704832de..5aff0844d4d3 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -135,9 +135,10 @@ static void bclink_set_last_sent(struct net *net) { struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_link *bcl = tn->bcl; + struct sk_buff *skb = skb_peek(&bcl->backlogq); - if (bcl->next_out) - bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1); + if (skb) + bcl->fsm_msg_cnt = mod(buf_seqno(skb) - 1); else bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1); } @@ -180,7 +181,7 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to) struct sk_buff *skb; struct tipc_link *bcl = tn->bcl; - skb_queue_walk(&bcl->outqueue, skb) { + skb_queue_walk(&bcl->transmq, skb) { if (more(buf_seqno(skb), after)) { tipc_link_retransmit(bcl, skb, mod(to - after)); break; @@ -210,14 +211,17 @@ void tipc_bclink_wakeup_users(struct net *net) void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) { struct sk_buff *skb, *tmp; - struct sk_buff *next; unsigned int released = 0; struct net *net = n_ptr->net; struct tipc_net *tn = net_generic(net, tipc_net_id); + if (unlikely(!n_ptr->bclink.recv_permitted)) + return; + tipc_bclink_lock(net); + /* Bail out if tx queue is empty (no clean up is required) */ - skb = skb_peek(&tn->bcl->outqueue); + skb = skb_peek(&tn->bcl->transmq); if (!skb) goto exit; @@ -244,27 +248,19 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) } /* Skip over packets that node has previously acknowledged */ - skb_queue_walk(&tn->bcl->outqueue, skb) { + skb_queue_walk(&tn->bcl->transmq, skb) { if (more(buf_seqno(skb), n_ptr->bclink.acked)) break; } /* Update packets that node is now acknowledging */ - skb_queue_walk_from_safe(&tn->bcl->outqueue, skb, tmp) { + skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) { if (more(buf_seqno(skb), acked)) break; - - next = tipc_skb_queue_next(&tn->bcl->outqueue, skb); - if (skb != tn->bcl->next_out) { - bcbuf_decr_acks(skb); - } else { - bcbuf_set_acks(skb, 0); - tn->bcl->next_out = next; - bclink_set_last_sent(net); - } - + bcbuf_decr_acks(skb); + bclink_set_last_sent(net); if (bcbuf_acks(skb) == 0) { - __skb_unlink(skb, &tn->bcl->outqueue); + __skb_unlink(skb, &tn->bcl->transmq); kfree_skb(skb); released = 1; } @@ -272,7 +268,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) n_ptr->bclink.acked = acked; /* Try resolving broadcast link congestion, if necessary */ - if (unlikely(tn->bcl->next_out)) { + if (unlikely(skb_peek(&tn->bcl->backlogq))) { tipc_link_push_packets(tn->bcl); bclink_set_last_sent(net); } @@ -319,7 +315,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, buf = tipc_buf_acquire(INT_H_SIZE); if (buf) { struct tipc_msg *msg = buf_msg(buf); - struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue); + struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq); u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent; tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG, @@ -387,14 +383,13 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list) __skb_queue_purge(list); return -EHOSTUNREACH; } - /* Broadcast to all nodes */ if (likely(bclink)) { tipc_bclink_lock(net); if (likely(bclink->bcast_nodes.count)) { rc = __tipc_link_xmit(net, bcl, list); if (likely(!rc)) { - u32 len = skb_queue_len(&bcl->outqueue); + u32 len = skb_queue_len(&bcl->transmq); bclink_set_last_sent(net); bcl->stats.queue_sz_counts++; @@ -559,25 +554,25 @@ receive: if (node->bclink.last_in == node->bclink.last_sent) goto unlock; - if (skb_queue_empty(&node->bclink.deferred_queue)) { + if (skb_queue_empty(&node->bclink.deferdq)) { node->bclink.oos_state = 1; goto unlock; } - msg = buf_msg(skb_peek(&node->bclink.deferred_queue)); + msg = buf_msg(skb_peek(&node->bclink.deferdq)); seqno = msg_seqno(msg); next_in = mod(next_in + 1); if (seqno != next_in) goto unlock; /* Take in-sequence message from deferred queue & deliver it */ - buf = __skb_dequeue(&node->bclink.deferred_queue); + buf = __skb_dequeue(&node->bclink.deferdq); goto receive; } /* Handle out-of-sequence broadcast message */ if (less(next_in, seqno)) { - deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue, + deferred = tipc_link_defer_pkt(&node->bclink.deferdq, buf); bclink_update_last_sent(node, seqno); buf = NULL; @@ -634,7 +629,6 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf, msg_set_non_seq(msg, 1); msg_set_mc_netid(msg, tn->net_id); tn->bcl->stats.sent_info++; - if (WARN_ON(!bclink->bcast_nodes.count)) { dump_stack(); return 0; @@ -913,8 +907,9 @@ int tipc_bclink_init(struct net *net) sprintf(bcbearer->media.name, "tipc-broadcast"); spin_lock_init(&bclink->lock); - __skb_queue_head_init(&bcl->outqueue); - __skb_queue_head_init(&bcl->deferred_queue); + __skb_queue_head_init(&bcl->transmq); + __skb_queue_head_init(&bcl->backlogq); + __skb_queue_head_init(&bcl->deferdq); skb_queue_head_init(&bcl->wakeupq); bcl->next_out_no = 1; spin_lock_init(&bclink->node.lock); diff --git a/net/tipc/discover.c b/net/tipc/discover.c index 5967506833ce..169f3dd038b9 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -89,6 +89,7 @@ static void tipc_disc_init_msg(struct net *net, struct sk_buff *buf, u32 type, MAX_H_SIZE, dest_domain); msg_set_non_seq(msg, 1); msg_set_node_sig(msg, tn->random); + msg_set_node_capabilities(msg, 0); msg_set_dest_domain(msg, dest_domain); msg_set_bc_netid(msg, tn->net_id); b_ptr->media->addr2msg(msg_media_addr(msg), &b_ptr->addr); @@ -133,6 +134,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf, u32 net_id = msg_bc_netid(msg); u32 mtyp = msg_type(msg); u32 signature = msg_node_sig(msg); + u16 caps = msg_node_capabilities(msg); bool addr_match = false; bool sign_match = false; bool link_up = false; @@ -167,6 +169,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf, if (!node) return; tipc_node_lock(node); + node->capabilities = caps; link = node->links[bearer->identity]; /* Prepare to validate requesting node's signature and media address */ diff --git a/net/tipc/link.c b/net/tipc/link.c index 98609fdfb06a..bc49120bfb44 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1,7 +1,7 @@ /* * net/tipc/link.c: TIPC link code * - * Copyright (c) 1996-2007, 2012-2014, Ericsson AB + * Copyright (c) 1996-2007, 2012-2015, Ericsson AB * Copyright (c) 2004-2007, 2010-2013, Wind River Systems * All rights reserved. * @@ -35,6 +35,7 @@ */ #include "core.h" +#include "subscr.h" #include "link.h" #include "bcast.h" #include "socket.h" @@ -194,10 +195,10 @@ static void link_timeout(unsigned long data) tipc_node_lock(l_ptr->owner); /* update counters used in statistical profiling of send traffic */ - l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue); + l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq); l_ptr->stats.queue_sz_counts++; - skb = skb_peek(&l_ptr->outqueue); + skb = skb_peek(&l_ptr->transmq); if (skb) { struct tipc_msg *msg = buf_msg(skb); u32 length = msg_size(msg); @@ -229,7 +230,7 @@ static void link_timeout(unsigned long data) /* do all other link processing performed on a periodic basis */ link_state_event(l_ptr, TIMEOUT_EVT); - if (l_ptr->next_out) + if (skb_queue_len(&l_ptr->backlogq)) tipc_link_push_packets(l_ptr); tipc_node_unlock(l_ptr->owner); @@ -305,16 +306,15 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, msg_set_session(msg, (tn->random & 0xffff)); msg_set_bearer_id(msg, b_ptr->identity); strcpy((char *)msg_data(msg), if_name); - - l_ptr->priority = b_ptr->priority; - tipc_link_set_queue_limits(l_ptr, b_ptr->window); - l_ptr->net_plane = b_ptr->net_plane; link_init_max_pkt(l_ptr); + l_ptr->priority = b_ptr->priority; + tipc_link_set_queue_limits(l_ptr, b_ptr->window); l_ptr->next_out_no = 1; - __skb_queue_head_init(&l_ptr->outqueue); - __skb_queue_head_init(&l_ptr->deferred_queue); + __skb_queue_head_init(&l_ptr->transmq); + __skb_queue_head_init(&l_ptr->backlogq); + __skb_queue_head_init(&l_ptr->deferdq); skb_queue_head_init(&l_ptr->wakeupq); skb_queue_head_init(&l_ptr->inputq); skb_queue_head_init(&l_ptr->namedq); @@ -400,7 +400,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport, */ void link_prepare_wakeup(struct tipc_link *link) { - uint pend_qsz = skb_queue_len(&link->outqueue); + uint pend_qsz = skb_queue_len(&link->backlogq); struct sk_buff *skb, *tmp; skb_queue_walk_safe(&link->wakeupq, skb, tmp) { @@ -430,8 +430,9 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr) */ void tipc_link_purge_queues(struct tipc_link *l_ptr) { - __skb_queue_purge(&l_ptr->deferred_queue); - __skb_queue_purge(&l_ptr->outqueue); + __skb_queue_purge(&l_ptr->deferdq); + __skb_queue_purge(&l_ptr->transmq); + __skb_queue_purge(&l_ptr->backlogq); tipc_link_reset_fragments(l_ptr); } @@ -464,15 +465,15 @@ void tipc_link_reset(struct tipc_link *l_ptr) } /* Clean up all queues, except inputq: */ - __skb_queue_purge(&l_ptr->outqueue); - __skb_queue_purge(&l_ptr->deferred_queue); + __skb_queue_purge(&l_ptr->transmq); + __skb_queue_purge(&l_ptr->backlogq); + __skb_queue_purge(&l_ptr->deferdq); if (!owner->inputq) owner->inputq = &l_ptr->inputq; skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq); if (!skb_queue_empty(owner->inputq)) owner->action_flags |= TIPC_MSG_EVT; - l_ptr->next_out = NULL; - l_ptr->unacked_window = 0; + l_ptr->rcv_unacked = 0; l_ptr->checkpoint = 1; l_ptr->next_out_no = 1; l_ptr->fsm_msg_cnt = 0; @@ -706,7 +707,7 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list) { struct sk_buff *skb = skb_peek(list); struct tipc_msg *msg = buf_msg(skb); - uint imp = tipc_msg_tot_importance(msg); + int imp = msg_importance(msg); u32 oport = msg_tot_origport(msg); if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) { @@ -742,54 +743,51 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link, struct sk_buff_head *list) { struct tipc_msg *msg = buf_msg(skb_peek(list)); - uint psz = msg_size(msg); - uint sndlim = link->queue_limit[0]; - uint imp = tipc_msg_tot_importance(msg); + unsigned int maxwin = link->window; + unsigned int imp = msg_importance(msg); uint mtu = link->max_pkt; uint ack = mod(link->next_in_no - 1); uint seqno = link->next_out_no; uint bc_last_in = link->owner->bclink.last_in; struct tipc_media_addr *addr = &link->media_addr; - struct sk_buff_head *outqueue = &link->outqueue; + struct sk_buff_head *transmq = &link->transmq; + struct sk_buff_head *backlogq = &link->backlogq; struct sk_buff *skb, *tmp; - /* Match queue limits against msg importance: */ - if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp])) + /* Match queue limit against msg importance: */ + if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp])) return tipc_link_cong(link, list); /* Has valid packet limit been used ? */ - if (unlikely(psz > mtu)) { + if (unlikely(msg_size(msg) > mtu)) { __skb_queue_purge(list); return -EMSGSIZE; } - /* Prepare each packet for sending, and add to outqueue: */ + /* Prepare each packet for sending, and add to relevant queue: */ skb_queue_walk_safe(list, skb, tmp) { __skb_unlink(skb, list); msg = buf_msg(skb); - msg_set_word(msg, 2, ((ack << 16) | mod(seqno))); + msg_set_seqno(msg, seqno); + msg_set_ack(msg, ack); msg_set_bcast_ack(msg, bc_last_in); - if (skb_queue_len(outqueue) < sndlim) { - __skb_queue_tail(outqueue, skb); - tipc_bearer_send(net, link->bearer_id, - skb, addr); - link->next_out = NULL; - link->unacked_window = 0; - } else if (tipc_msg_bundle(outqueue, skb, mtu)) { + if (likely(skb_queue_len(transmq) < maxwin)) { + __skb_queue_tail(transmq, skb); + tipc_bearer_send(net, link->bearer_id, skb, addr); + link->rcv_unacked = 0; + seqno++; + continue; + } + if (tipc_msg_bundle(skb_peek_tail(backlogq), skb, mtu)) { link->stats.sent_bundled++; continue; - } else if (tipc_msg_make_bundle(outqueue, skb, mtu, - link->addr)) { + } + if (tipc_msg_make_bundle(&skb, mtu, link->addr)) { link->stats.sent_bundled++; link->stats.sent_bundles++; - if (!link->next_out) - link->next_out = skb_peek_tail(outqueue); - } else { - __skb_queue_tail(outqueue, skb); - if (!link->next_out) - link->next_out = skb; } + __skb_queue_tail(backlogq, skb); seqno++; } link->next_out_no = seqno; @@ -895,14 +893,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf) kfree_skb(buf); } -struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list, - const struct sk_buff *skb) -{ - if (skb_queue_is_last(list, skb)) - return NULL; - return skb->next; -} - /* * tipc_link_push_packets - push unsent packets to bearer * @@ -911,30 +901,23 @@ struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list, * * Called with node locked */ -void tipc_link_push_packets(struct tipc_link *l_ptr) +void tipc_link_push_packets(struct tipc_link *link) { - struct sk_buff_head *outqueue = &l_ptr->outqueue; - struct sk_buff *skb = l_ptr->next_out; + struct sk_buff *skb; struct tipc_msg *msg; - u32 next, first; + unsigned int ack = mod(link->next_in_no - 1); - skb_queue_walk_from(outqueue, skb) { - msg = buf_msg(skb); - next = msg_seqno(msg); - first = buf_seqno(skb_peek(outqueue)); - - if (mod(next - first) < l_ptr->queue_limit[0]) { - msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); - msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - if (msg_user(msg) == MSG_BUNDLER) - TIPC_SKB_CB(skb)->bundling = false; - tipc_bearer_send(l_ptr->owner->net, - l_ptr->bearer_id, skb, - &l_ptr->media_addr); - l_ptr->next_out = tipc_skb_queue_next(outqueue, skb); - } else { + while (skb_queue_len(&link->transmq) < link->window) { + skb = __skb_dequeue(&link->backlogq); + if (!skb) break; - } + msg = buf_msg(skb); + msg_set_ack(msg, ack); + msg_set_bcast_ack(msg, link->owner->bclink.last_in); + link->rcv_unacked = 0; + __skb_queue_tail(&link->transmq, skb); + tipc_bearer_send(link->owner->net, link->bearer_id, + skb, &link->media_addr); } } @@ -1021,8 +1004,8 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, l_ptr->stale_count = 1; } - skb_queue_walk_from(&l_ptr->outqueue, skb) { - if (!retransmits || skb == l_ptr->next_out) + skb_queue_walk_from(&l_ptr->transmq, skb) { + if (!retransmits) break; msg = buf_msg(skb); msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); @@ -1039,67 +1022,12 @@ static void link_retrieve_defq(struct tipc_link *link, { u32 seq_no; - if (skb_queue_empty(&link->deferred_queue)) + if (skb_queue_empty(&link->deferdq)) return; - seq_no = buf_seqno(skb_peek(&link->deferred_queue)); + seq_no = buf_seqno(skb_peek(&link->deferdq)); if (seq_no == mod(link->next_in_no)) - skb_queue_splice_tail_init(&link->deferred_queue, list); -} - -/** - * link_recv_buf_validate - validate basic format of received message - * - * This routine ensures a TIPC message has an acceptable header, and at least - * as much data as the header indicates it should. The routine also ensures - * that the entire message header is stored in the main fragment of the message - * buffer, to simplify future access to message header fields. - * - * Note: Having extra info present in the message header or data areas is OK. - * TIPC will ignore the excess, under the assumption that it is optional info - * introduced by a later release of the protocol. - */ -static int link_recv_buf_validate(struct sk_buff *buf) -{ - static u32 min_data_hdr_size[8] = { - SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE, - MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE - }; - - struct tipc_msg *msg; - u32 tipc_hdr[2]; - u32 size; - u32 hdr_size; - u32 min_hdr_size; - - /* If this packet comes from the defer queue, the skb has already - * been validated - */ - if (unlikely(TIPC_SKB_CB(buf)->deferred)) - return 1; - - if (unlikely(buf->len < MIN_H_SIZE)) - return 0; - - msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr); - if (msg == NULL) - return 0; - - if (unlikely(msg_version(msg) != TIPC_VERSION)) - return 0; - - size = msg_size(msg); - hdr_size = msg_hdr_sz(msg); - min_hdr_size = msg_isdata(msg) ? - min_data_hdr_size[msg_type(msg)] : INT_H_SIZE; - - if (unlikely((hdr_size < min_hdr_size) || - (size < hdr_size) || - (buf->len < size) || - (size - hdr_size > TIPC_MAX_USER_MSG_SIZE))) - return 0; - - return pskb_may_pull(buf, hdr_size); + skb_queue_splice_tail_init(&link->deferdq, list); } /** @@ -1127,16 +1055,11 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr) while ((skb = __skb_dequeue(&head))) { /* Ensure message is well-formed */ - if (unlikely(!link_recv_buf_validate(skb))) - goto discard; - - /* Ensure message data is a single contiguous unit */ - if (unlikely(skb_linearize(skb))) + if (unlikely(!tipc_msg_validate(skb))) goto discard; /* Handle arrival of a non-unicast link message */ msg = buf_msg(skb); - if (unlikely(msg_non_seq(msg))) { if (msg_user(msg) == LINK_CONFIG) tipc_disc_rcv(net, skb, b_ptr); @@ -1177,21 +1100,20 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr) ackd = msg_ack(msg); /* Release acked messages */ - if (n_ptr->bclink.recv_permitted) + if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg))) tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); released = 0; - skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) { - if (skb1 == l_ptr->next_out || - more(buf_seqno(skb1), ackd)) + skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) { + if (more(buf_seqno(skb1), ackd)) break; - __skb_unlink(skb1, &l_ptr->outqueue); + __skb_unlink(skb1, &l_ptr->transmq); kfree_skb(skb1); released = 1; } /* Try sending any messages link endpoint has pending */ - if (unlikely(l_ptr->next_out)) + if (unlikely(skb_queue_len(&l_ptr->backlogq))) tipc_link_push_packets(l_ptr); if (released && !skb_queue_empty(&l_ptr->wakeupq)) @@ -1226,10 +1148,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr) goto unlock; } l_ptr->next_in_no++; - if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue))) + if (unlikely(!skb_queue_empty(&l_ptr->deferdq))) link_retrieve_defq(l_ptr, &head); - - if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { + if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) { l_ptr->stats.sent_acks++; tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); } @@ -1396,10 +1317,9 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, return; } - if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) { + if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) { l_ptr->stats.deferred_recv++; - TIPC_SKB_CB(buf)->deferred = true; - if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1) + if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1) tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); } else { l_ptr->stats.duplicates++; @@ -1436,11 +1356,11 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, if (!tipc_link_is_up(l_ptr)) return; - if (l_ptr->next_out) - next_sent = buf_seqno(l_ptr->next_out); + if (skb_queue_len(&l_ptr->backlogq)) + next_sent = buf_seqno(skb_peek(&l_ptr->backlogq)); msg_set_next_sent(msg, next_sent); - if (!skb_queue_empty(&l_ptr->deferred_queue)) { - u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue)); + if (!skb_queue_empty(&l_ptr->deferdq)) { + u32 rec = buf_seqno(skb_peek(&l_ptr->deferdq)); gap = mod(rec - mod(l_ptr->next_in_no)); } msg_set_seq_gap(msg, gap); @@ -1492,10 +1412,9 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); buf->priority = TC_PRIO_CONTROL; - tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf, &l_ptr->media_addr); - l_ptr->unacked_window = 0; + l_ptr->rcv_unacked = 0; kfree_skb(buf); } @@ -1630,7 +1549,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, } if (msg_seq_gap(msg)) { l_ptr->stats.recv_nacks++; - tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue), + tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq), msg_seq_gap(msg)); } break; @@ -1677,7 +1596,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, */ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) { - u32 msgcount = skb_queue_len(&l_ptr->outqueue); + int msgcount; struct tipc_link *tunnel = l_ptr->owner->active_links[0]; struct tipc_msg tunnel_hdr; struct sk_buff *skb; @@ -1688,10 +1607,12 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL, ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr); + skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq); + msgcount = skb_queue_len(&l_ptr->transmq); msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); msg_set_msgcnt(&tunnel_hdr, msgcount); - if (skb_queue_empty(&l_ptr->outqueue)) { + if (skb_queue_empty(&l_ptr->transmq)) { skb = tipc_buf_acquire(INT_H_SIZE); if (skb) { skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE); @@ -1707,7 +1628,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) split_bundles = (l_ptr->owner->active_links[0] != l_ptr->owner->active_links[1]); - skb_queue_walk(&l_ptr->outqueue, skb) { + skb_queue_walk(&l_ptr->transmq, skb) { struct tipc_msg *msg = buf_msg(skb); if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) { @@ -1738,80 +1659,66 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) * and sequence order is preserved per sender/receiver socket pair. * Owner node is locked. */ -void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, - struct tipc_link *tunnel) +void tipc_link_dup_queue_xmit(struct tipc_link *link, + struct tipc_link *tnl) { struct sk_buff *skb; - struct tipc_msg tunnel_hdr; - - tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL, - DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr); - msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue)); - msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); - skb_queue_walk(&l_ptr->outqueue, skb) { + struct tipc_msg tnl_hdr; + struct sk_buff_head *queue = &link->transmq; + int mcnt; + + tipc_msg_init(link_own_addr(link), &tnl_hdr, CHANGEOVER_PROTOCOL, + DUPLICATE_MSG, INT_H_SIZE, link->addr); + mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq); + msg_set_msgcnt(&tnl_hdr, mcnt); + msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id); + +tunnel_queue: + skb_queue_walk(queue, skb) { struct sk_buff *outskb; struct tipc_msg *msg = buf_msg(skb); - u32 length = msg_size(msg); + u32 len = msg_size(msg); - if (msg_user(msg) == MSG_BUNDLER) - msg_set_type(msg, CLOSED_MSG); - msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); /* Update */ - msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - msg_set_size(&tunnel_hdr, length + INT_H_SIZE); - outskb = tipc_buf_acquire(length + INT_H_SIZE); + msg_set_ack(msg, mod(link->next_in_no - 1)); + msg_set_bcast_ack(msg, link->owner->bclink.last_in); + msg_set_size(&tnl_hdr, len + INT_H_SIZE); + outskb = tipc_buf_acquire(len + INT_H_SIZE); if (outskb == NULL) { pr_warn("%sunable to send duplicate msg\n", link_co_err); return; } - skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE); - skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data, - length); - __tipc_link_xmit_skb(tunnel, outskb); - if (!tipc_link_is_up(l_ptr)) + skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE); + skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, + skb->data, len); + __tipc_link_xmit_skb(tnl, outskb); + if (!tipc_link_is_up(link)) return; } -} - -/** - * buf_extract - extracts embedded TIPC message from another message - * @skb: encapsulating message buffer - * @from_pos: offset to extract from - * - * Returns a new message buffer containing an embedded message. The - * encapsulating buffer is left unchanged. - */ -static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos) -{ - struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos); - u32 size = msg_size(msg); - struct sk_buff *eb; - - eb = tipc_buf_acquire(size); - if (eb) - skb_copy_to_linear_data(eb, msg, size); - return eb; + if (queue == &link->backlogq) + return; + queue = &link->backlogq; + goto tunnel_queue; } /* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet. * Owner node is locked. */ -static void tipc_link_dup_rcv(struct tipc_link *l_ptr, - struct sk_buff *t_buf) +static void tipc_link_dup_rcv(struct tipc_link *link, + struct sk_buff *skb) { - struct sk_buff *buf; + struct sk_buff *iskb; + int pos = 0; - if (!tipc_link_is_up(l_ptr)) + if (!tipc_link_is_up(link)) return; - buf = buf_extract(t_buf, INT_H_SIZE); - if (buf == NULL) { + if (!tipc_msg_extract(skb, &iskb, &pos)) { pr_warn("%sfailed to extract inner dup pkt\n", link_co_err); return; } - - /* Add buffer to deferred queue, if applicable: */ - link_handle_out_of_seq_msg(l_ptr, buf); + /* Append buffer to deferred queue, if applicable: */ + link_handle_out_of_seq_msg(link, iskb); } /* tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet @@ -1823,6 +1730,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr, struct tipc_msg *t_msg = buf_msg(t_buf); struct sk_buff *buf = NULL; struct tipc_msg *msg; + int pos = 0; if (tipc_link_is_up(l_ptr)) tipc_link_reset(l_ptr); @@ -1834,8 +1742,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr, /* Should there be an inner packet? */ if (l_ptr->exp_msg_count) { l_ptr->exp_msg_count--; - buf = buf_extract(t_buf, INT_H_SIZE); - if (buf == NULL) { + if (!tipc_msg_extract(t_buf, &buf, &pos)) { pr_warn("%sno inner failover pkt\n", link_co_err); goto exit; } @@ -1903,23 +1810,16 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol) l_ptr->abort_limit = tol / (jiffies_to_msecs(l_ptr->cont_intv) / 4); } -void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window) +void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) { - /* Data messages from this node, inclusive FIRST_FRAGM */ - l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window; - l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4; - l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5; - l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6; - /* Transiting data messages,inclusive FIRST_FRAGM */ - l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300; - l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600; - l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900; - l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200; - l_ptr->queue_limit[CONN_MANAGER] = 1200; - l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500; - l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000; - /* FRAGMENT and LAST_FRAGMENT packets */ - l_ptr->queue_limit[MSG_FRAGMENTER] = 4000; + int max_bulk = TIPC_MAX_PUBLICATIONS / (l->max_pkt / ITEM_SIZE); + + l->window = win; + l->queue_limit[TIPC_LOW_IMPORTANCE] = win / 2; + l->queue_limit[TIPC_MEDIUM_IMPORTANCE] = win; + l->queue_limit[TIPC_HIGH_IMPORTANCE] = win / 2 * 3; + l->queue_limit[TIPC_CRITICAL_IMPORTANCE] = win * 2; + l->queue_limit[TIPC_SYSTEM_IMPORTANCE] = max_bulk; } /* tipc_link_find_owner - locate owner node of link by link's name diff --git a/net/tipc/link.h b/net/tipc/link.h index 7aeb52092bf3..eec3ecf2d450 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -124,7 +124,8 @@ struct tipc_stats { * @max_pkt: current maximum packet size for this link * @max_pkt_target: desired maximum packet size for this link * @max_pkt_probes: # of probes based on current (max_pkt, max_pkt_target) - * @outqueue: outbound message queue + * @transmitq: queue for sent, non-acked messages + * @backlogq: queue for messages waiting to be sent * @next_out_no: next sequence number to use for outbound messages * @last_retransmitted: sequence number of most recently retransmitted message * @stale_count: # of identical retransmit requests made by peer @@ -177,20 +178,21 @@ struct tipc_link { u32 max_pkt_probes; /* Sending */ - struct sk_buff_head outqueue; + struct sk_buff_head transmq; + struct sk_buff_head backlogq; u32 next_out_no; + u32 window; u32 last_retransmitted; u32 stale_count; /* Reception */ u32 next_in_no; - struct sk_buff_head deferred_queue; - u32 unacked_window; + u32 rcv_unacked; + struct sk_buff_head deferdq; struct sk_buff_head inputq; struct sk_buff_head namedq; /* Congestion handling */ - struct sk_buff *next_out; struct sk_buff_head wakeupq; /* Fragmentation/reassembly */ @@ -302,9 +304,4 @@ static inline int link_reset_reset(struct tipc_link *l_ptr) return l_ptr->state == RESET_RESET; } -static inline int link_congested(struct tipc_link *l_ptr) -{ - return skb_queue_len(&l_ptr->outqueue) >= l_ptr->queue_limit[0]; -} - #endif diff --git a/net/tipc/msg.c b/net/tipc/msg.c index b6eb90cd3ef7..0c6dad8180a0 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -1,7 +1,7 @@ /* * net/tipc/msg.c: TIPC message header routines * - * Copyright (c) 2000-2006, 2014, Ericsson AB + * Copyright (c) 2000-2006, 2014-2015, Ericsson AB * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. * @@ -165,6 +165,9 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) } if (fragid == LAST_FRAGMENT) { + TIPC_SKB_CB(head)->validated = false; + if (unlikely(!tipc_msg_validate(head))) + goto err; *buf = head; TIPC_SKB_CB(head)->tail = NULL; *headbuf = NULL; @@ -172,7 +175,6 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) } *buf = NULL; return 0; - err: pr_warn_ratelimited("Unable to build fragment list\n"); kfree_skb(*buf); @@ -181,6 +183,48 @@ err: return 0; } +/* tipc_msg_validate - validate basic format of received message + * + * This routine ensures a TIPC message has an acceptable header, and at least + * as much data as the header indicates it should. The routine also ensures + * that the entire message header is stored in the main fragment of the message + * buffer, to simplify future access to message header fields. + * + * Note: Having extra info present in the message header or data areas is OK. + * TIPC will ignore the excess, under the assumption that it is optional info + * introduced by a later release of the protocol. + */ +bool tipc_msg_validate(struct sk_buff *skb) +{ + struct tipc_msg *msg; + int msz, hsz; + + if (unlikely(TIPC_SKB_CB(skb)->validated)) + return true; + if (unlikely(!pskb_may_pull(skb, MIN_H_SIZE))) + return false; + + hsz = msg_hdr_sz(buf_msg(skb)); + if (unlikely(hsz < MIN_H_SIZE) || (hsz > MAX_H_SIZE)) + return false; + if (unlikely(!pskb_may_pull(skb, hsz))) + return false; + + msg = buf_msg(skb); + if (unlikely(msg_version(msg) != TIPC_VERSION)) + return false; + + msz = msg_size(msg); + if (unlikely(msz < hsz)) + return false; + if (unlikely((msz - hsz) > TIPC_MAX_USER_MSG_SIZE)) + return false; + if (unlikely(skb->len < msz)) + return false; + + TIPC_SKB_CB(skb)->validated = true; + return true; +} /** * tipc_msg_build - create buffer chain containing specified header and data @@ -228,6 +272,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(mhdr)); msg_set_size(&pkthdr, pktmax); msg_set_fragm_no(&pkthdr, pktno); + msg_set_importance(&pkthdr, msg_importance(mhdr)); /* Prepare first fragment */ skb = tipc_buf_acquire(pktmax); @@ -286,33 +331,36 @@ error: /** * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one - * @list: the buffer chain of the existing buffer ("bundle") + * @bskb: the buffer to append to ("bundle") * @skb: buffer to be appended * @mtu: max allowable size for the bundle buffer * Consumes buffer if successful * Returns true if bundling could be performed, otherwise false */ -bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu) +bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu) { - struct sk_buff *bskb = skb_peek_tail(list); - struct tipc_msg *bmsg = buf_msg(bskb); + struct tipc_msg *bmsg; struct tipc_msg *msg = buf_msg(skb); - unsigned int bsz = msg_size(bmsg); + unsigned int bsz; unsigned int msz = msg_size(msg); - u32 start = align(bsz); + u32 start, pad; u32 max = mtu - INT_H_SIZE; - u32 pad = start - bsz; if (likely(msg_user(msg) == MSG_FRAGMENTER)) return false; + if (!bskb) + return false; + bmsg = buf_msg(bskb); + bsz = msg_size(bmsg); + start = align(bsz); + pad = start - bsz; + if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) return false; if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) return false; if (likely(msg_user(bmsg) != MSG_BUNDLER)) return false; - if (likely(!TIPC_SKB_CB(bskb)->bundling)) - return false; if (unlikely(skb_tailroom(bskb) < (pad + msz))) return false; if (unlikely(max < (start + msz))) @@ -328,34 +376,40 @@ bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu) /** * tipc_msg_extract(): extract bundled inner packet from buffer - * @skb: linear outer buffer, to be extracted from. + * @skb: buffer to be extracted from. * @iskb: extracted inner buffer, to be returned - * @pos: position of msg to be extracted. Returns with pointer of next msg + * @pos: position in outer message of msg to be extracted. + * Returns position of next msg * Consumes outer buffer when last packet extracted * Returns true when when there is an extracted buffer, otherwise false */ bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos) { - struct tipc_msg *msg = buf_msg(skb); - int imsz; - struct tipc_msg *imsg = (struct tipc_msg *)(msg_data(msg) + *pos); + struct tipc_msg *msg; + int imsz, offset; + + *iskb = NULL; + if (unlikely(skb_linearize(skb))) + goto none; - /* Is there space left for shortest possible message? */ - if (*pos > (msg_data_sz(msg) - SHORT_H_SIZE)) + msg = buf_msg(skb); + offset = msg_hdr_sz(msg) + *pos; + if (unlikely(offset > (msg_size(msg) - MIN_H_SIZE))) goto none; - imsz = msg_size(imsg); - /* Is there space left for current message ? */ - if ((*pos + imsz) > msg_data_sz(msg)) + *iskb = skb_clone(skb, GFP_ATOMIC); + if (unlikely(!*iskb)) goto none; - *iskb = tipc_buf_acquire(imsz); - if (!*iskb) + skb_pull(*iskb, offset); + imsz = msg_size(buf_msg(*iskb)); + skb_trim(*iskb, imsz); + if (unlikely(!tipc_msg_validate(*iskb))) goto none; - skb_copy_to_linear_data(*iskb, imsg, imsz); *pos += align(imsz); return true; none: kfree_skb(skb); + kfree_skb(*iskb); *iskb = NULL; return false; } @@ -369,12 +423,11 @@ none: * Replaces buffer if successful * Returns true if success, otherwise false */ -bool tipc_msg_make_bundle(struct sk_buff_head *list, - struct sk_buff *skb, u32 mtu, u32 dnode) +bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode) { struct sk_buff *bskb; struct tipc_msg *bmsg; - struct tipc_msg *msg = buf_msg(skb); + struct tipc_msg *msg = buf_msg(*skb); u32 msz = msg_size(msg); u32 max = mtu - INT_H_SIZE; @@ -398,9 +451,9 @@ bool tipc_msg_make_bundle(struct sk_buff_head *list, msg_set_seqno(bmsg, msg_seqno(msg)); msg_set_ack(bmsg, msg_ack(msg)); msg_set_bcast_ack(bmsg, msg_bcast_ack(msg)); - TIPC_SKB_CB(bskb)->bundling = true; - __skb_queue_tail(list, bskb); - return tipc_msg_bundle(list, skb, mtu); + tipc_msg_bundle(bskb, *skb, mtu); + *skb = bskb; + return true; } /** @@ -415,21 +468,17 @@ bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode, int err) { struct tipc_msg *msg = buf_msg(buf); - uint imp = msg_importance(msg); struct tipc_msg ohdr; uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE); if (skb_linearize(buf)) goto exit; + msg = buf_msg(buf); if (msg_dest_droppable(msg)) goto exit; if (msg_errcode(msg)) goto exit; - memcpy(&ohdr, msg, msg_hdr_sz(msg)); - imp = min_t(uint, imp + 1, TIPC_CRITICAL_IMPORTANCE); - if (msg_isdata(msg)) - msg_set_importance(msg, imp); msg_set_errcode(msg, err); msg_set_origport(msg, msg_destport(&ohdr)); msg_set_destport(msg, msg_origport(&ohdr)); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index fa167846d1ab..bd3969a80dd4 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -1,7 +1,7 @@ /* * net/tipc/msg.h: Include file for TIPC message header routines * - * Copyright (c) 2000-2007, 2014, Ericsson AB + * Copyright (c) 2000-2007, 2014-2015 Ericsson AB * Copyright (c) 2005-2008, 2010-2011, Wind River Systems * All rights reserved. * @@ -54,6 +54,8 @@ struct plist; * - TIPC_HIGH_IMPORTANCE * - TIPC_CRITICAL_IMPORTANCE */ +#define TIPC_SYSTEM_IMPORTANCE 4 + /* * Payload message types @@ -64,6 +66,19 @@ struct plist; #define TIPC_DIRECT_MSG 3 /* + * Internal message users + */ +#define BCAST_PROTOCOL 5 +#define MSG_BUNDLER 6 +#define LINK_PROTOCOL 7 +#define CONN_MANAGER 8 +#define CHANGEOVER_PROTOCOL 10 +#define NAME_DISTRIBUTOR 11 +#define MSG_FRAGMENTER 12 +#define LINK_CONFIG 13 +#define SOCK_WAKEUP 14 /* pseudo user */ + +/* * Message header sizes */ #define SHORT_H_SIZE 24 /* In-cluster basic payload message */ @@ -92,7 +107,7 @@ struct plist; struct tipc_skb_cb { void *handle; struct sk_buff *tail; - bool deferred; + bool validated; bool wakeup_pending; bool bundling; u16 chain_sz; @@ -170,16 +185,6 @@ static inline void msg_set_user(struct tipc_msg *m, u32 n) msg_set_bits(m, 0, 25, 0xf, n); } -static inline u32 msg_importance(struct tipc_msg *m) -{ - return msg_bits(m, 0, 25, 0xf); -} - -static inline void msg_set_importance(struct tipc_msg *m, u32 i) -{ - msg_set_user(m, i); -} - static inline u32 msg_hdr_sz(struct tipc_msg *m) { return msg_bits(m, 0, 21, 0xf) << 2; @@ -336,6 +341,25 @@ static inline void msg_set_seqno(struct tipc_msg *m, u32 n) /* * Words 3-10 */ +static inline u32 msg_importance(struct tipc_msg *m) +{ + if (unlikely(msg_user(m) == MSG_FRAGMENTER)) + return msg_bits(m, 5, 13, 0x7); + if (likely(msg_isdata(m) && !msg_errcode(m))) + return msg_user(m); + return TIPC_SYSTEM_IMPORTANCE; +} + +static inline void msg_set_importance(struct tipc_msg *m, u32 i) +{ + if (unlikely(msg_user(m) == MSG_FRAGMENTER)) + msg_set_bits(m, 5, 13, 0x7, i); + else if (likely(i < TIPC_SYSTEM_IMPORTANCE)) + msg_set_user(m, i); + else + pr_warn("Trying to set illegal importance in message\n"); +} + static inline u32 msg_prevnode(struct tipc_msg *m) { return msg_word(m, 3); @@ -458,20 +482,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m) */ /* - * Internal message users - */ -#define BCAST_PROTOCOL 5 -#define MSG_BUNDLER 6 -#define LINK_PROTOCOL 7 -#define CONN_MANAGER 8 -#define ROUTE_DISTRIBUTOR 9 /* obsoleted */ -#define CHANGEOVER_PROTOCOL 10 -#define NAME_DISTRIBUTOR 11 -#define MSG_FRAGMENTER 12 -#define LINK_CONFIG 13 -#define SOCK_WAKEUP 14 /* pseudo user */ - -/* * Connection management protocol message types */ #define CONN_PROBE 0 @@ -510,7 +520,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m) #define DSC_REQ_MSG 0 #define DSC_RESP_MSG 1 - /* * Word 1 */ @@ -534,6 +543,16 @@ static inline void msg_set_node_sig(struct tipc_msg *m, u32 n) msg_set_bits(m, 1, 0, 0xffff, n); } +static inline u32 msg_node_capabilities(struct tipc_msg *m) +{ + return msg_bits(m, 1, 15, 0x1fff); +} + +static inline void msg_set_node_capabilities(struct tipc_msg *m, u32 n) +{ + msg_set_bits(m, 1, 15, 0x1fff, n); +} + /* * Word 2 @@ -734,13 +753,6 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n) msg_set_bits(m, 9, 0, 0xffff, n); } -static inline u32 tipc_msg_tot_importance(struct tipc_msg *m) -{ - if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT)) - return msg_importance(msg_get_wrapped(m)); - return msg_importance(m); -} - static inline u32 msg_tot_origport(struct tipc_msg *m) { if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT)) @@ -749,6 +761,7 @@ static inline u32 msg_tot_origport(struct tipc_msg *m) } struct sk_buff *tipc_buf_acquire(u32 size); +bool tipc_msg_validate(struct sk_buff *skb); bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode, int err); void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type, @@ -757,9 +770,9 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz, uint data_sz, u32 dnode, u32 onode, u32 dport, u32 oport, int errcode); int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); -bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu); -bool tipc_msg_make_bundle(struct sk_buff_head *list, - struct sk_buff *skb, u32 mtu, u32 dnode); +bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu); + +bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode); bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos); int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, int dsz, int mtu, struct sk_buff_head *list); diff --git a/net/tipc/node.c b/net/tipc/node.c index 86152de8248d..26d1de1bf34d 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -111,7 +111,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr) INIT_LIST_HEAD(&n_ptr->list); INIT_LIST_HEAD(&n_ptr->publ_list); INIT_LIST_HEAD(&n_ptr->conn_sks); - __skb_queue_head_init(&n_ptr->bclink.deferred_queue); + __skb_queue_head_init(&n_ptr->bclink.deferdq); hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); list_for_each_entry_rcu(temp_node, &tn->node_list, list) { if (n_ptr->addr < temp_node->addr) @@ -354,7 +354,7 @@ static void node_lost_contact(struct tipc_node *n_ptr) /* Flush broadcast link info associated with lost node */ if (n_ptr->bclink.recv_permitted) { - __skb_queue_purge(&n_ptr->bclink.deferred_queue); + __skb_queue_purge(&n_ptr->bclink.deferdq); if (n_ptr->bclink.reasm_buf) { kfree_skb(n_ptr->bclink.reasm_buf); diff --git a/net/tipc/node.h b/net/tipc/node.h index 3d18c66b7f78..e89ac04ec2c3 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -84,7 +84,7 @@ struct tipc_node_bclink { u32 last_sent; u32 oos_state; u32 deferred_size; - struct sk_buff_head deferred_queue; + struct sk_buff_head deferdq; struct sk_buff *reasm_buf; int inputq_map; bool recv_permitted; @@ -106,6 +106,7 @@ struct tipc_node_bclink { * @list: links to adjacent nodes in sorted list of cluster's nodes * @working_links: number of working links to node (both active and standby) * @link_cnt: number of links to node + * @capabilities: bitmap, indicating peer node's functional capabilities * @signature: node instance identifier * @link_id: local and remote bearer ids of changing link, if any * @publ_list: list of publications @@ -125,7 +126,8 @@ struct tipc_node { struct tipc_node_bclink bclink; struct list_head list; int link_cnt; - int working_links; + u16 working_links; + u16 capabilities; u32 signature; u32 link_id; struct list_head publ_list; |