tipc: smooth change between replicast and broadcast

Currently, a multicast stream may start out using replicast, because
there are few destinations, and then it should ideally switch to
L2/broadcast IGMP/multicast when the number of destinations grows beyond
a certain limit. The opposite should happen when the number decreases
below the limit.

To eliminate the risk of message reordering caused by method change,
a sending socket must stick to a previously selected method until it
enters an idle period of 5 seconds. Means there is a 5 seconds pause
in the traffic from the sender socket.

If the sender never makes such a pause, the method will never change,
and transmission may become very inefficient as the cluster grows.

With this commit, we allow such a switch between replicast and
broadcast without any need for a traffic pause.

Solution is to send a dummy message with only the header, also with
the SYN bit set, via broadcast or replicast. For the data message,
the SYN bit is set and sending via replicast or broadcast (inverse
method with dummy).

Then, at receiving side any messages follow first SYN bit message
(data or dummy message), they will be held in deferred queue until
another pair (dummy or data message) arrived in other link.

v2: reverse christmas tree declaration

Acked-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Hoang Le <hoang.h.le@dektech.com.au>
Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
Hoang Le 2019-03-19 18:49:50 +07:00 committed by David S. Miller
parent ff2ebbfba6
commit c55c8edafa
4 changed files with 184 additions and 1 deletions

View File

@ -220,9 +220,24 @@ static void tipc_bcast_select_xmit_method(struct net *net, int dests,
} }
/* Can current method be changed ? */ /* Can current method be changed ? */
method->expires = jiffies + TIPC_METHOD_EXPIRE; method->expires = jiffies + TIPC_METHOD_EXPIRE;
if (method->mandatory || time_before(jiffies, exp)) if (method->mandatory)
return; return;
if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
time_before(jiffies, exp))
return;
/* Configuration as force 'broadcast' method */
if (bb->force_bcast) {
method->rcast = false;
return;
}
/* Configuration as force 'replicast' method */
if (bb->force_rcast) {
method->rcast = true;
return;
}
/* Configuration as 'autoselect' or default method */
/* Determine method to use now */ /* Determine method to use now */
method->rcast = dests <= bb->bc_threshold; method->rcast = dests <= bb->bc_threshold;
} }
@ -285,6 +300,63 @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
return 0; return 0;
} }
/* tipc_mcast_send_sync - deliver a dummy message with SYN bit
* @net: the applicable net namespace
* @skb: socket buffer to copy
* @method: send method to be used
* @dests: destination nodes for message.
* @cong_link_cnt: returns number of encountered congested destination links
* Returns 0 if success, otherwise errno
*/
static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
struct tipc_mc_method *method,
struct tipc_nlist *dests,
u16 *cong_link_cnt)
{
struct tipc_msg *hdr, *_hdr;
struct sk_buff_head tmpq;
struct sk_buff *_skb;
/* Is a cluster supporting with new capabilities ? */
if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL))
return 0;
hdr = buf_msg(skb);
if (msg_user(hdr) == MSG_FRAGMENTER)
hdr = msg_get_wrapped(hdr);
if (msg_type(hdr) != TIPC_MCAST_MSG)
return 0;
/* Allocate dummy message */
_skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL);
if (!skb)
return -ENOMEM;
/* Preparing for 'synching' header */
msg_set_syn(hdr, 1);
/* Copy skb's header into a dummy header */
skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE);
skb_orphan(_skb);
/* Reverse method for dummy message */
_hdr = buf_msg(_skb);
msg_set_size(_hdr, MCAST_H_SIZE);
msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
skb_queue_head_init(&tmpq);
__skb_queue_tail(&tmpq, _skb);
if (method->rcast)
tipc_bcast_xmit(net, &tmpq, cong_link_cnt);
else
tipc_rcast_xmit(net, &tmpq, dests, cong_link_cnt);
/* This queue should normally be empty by now */
__skb_queue_purge(&tmpq);
return 0;
}
/* tipc_mcast_xmit - deliver message to indicated destination nodes /* tipc_mcast_xmit - deliver message to indicated destination nodes
* and to identified node local sockets * and to identified node local sockets
* @net: the applicable net namespace * @net: the applicable net namespace
@ -300,6 +372,9 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
u16 *cong_link_cnt) u16 *cong_link_cnt)
{ {
struct sk_buff_head inputq, localq; struct sk_buff_head inputq, localq;
bool rcast = method->rcast;
struct tipc_msg *hdr;
struct sk_buff *skb;
int rc = 0; int rc = 0;
skb_queue_head_init(&inputq); skb_queue_head_init(&inputq);
@ -313,6 +388,18 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
/* Send according to determined transmit method */ /* Send according to determined transmit method */
if (dests->remote) { if (dests->remote) {
tipc_bcast_select_xmit_method(net, dests->remote, method); tipc_bcast_select_xmit_method(net, dests->remote, method);
skb = skb_peek(pkts);
hdr = buf_msg(skb);
if (msg_user(hdr) == MSG_FRAGMENTER)
hdr = msg_get_wrapped(hdr);
msg_set_is_rcast(hdr, method->rcast);
/* Switch method ? */
if (rcast != method->rcast)
tipc_mcast_send_sync(net, skb, method,
dests, cong_link_cnt);
if (method->rcast) if (method->rcast)
rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
else else
@ -672,3 +759,79 @@ u32 tipc_bcast_get_broadcast_ratio(struct net *net)
return bb->rc_ratio; return bb->rc_ratio;
} }
void tipc_mcast_filter_msg(struct sk_buff_head *defq,
struct sk_buff_head *inputq)
{
struct sk_buff *skb, *_skb, *tmp;
struct tipc_msg *hdr, *_hdr;
bool match = false;
u32 node, port;
skb = skb_peek(inputq);
hdr = buf_msg(skb);
if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
return;
node = msg_orignode(hdr);
port = msg_origport(hdr);
/* Has the twin SYN message already arrived ? */
skb_queue_walk(defq, _skb) {
_hdr = buf_msg(_skb);
if (msg_orignode(_hdr) != node)
continue;
if (msg_origport(_hdr) != port)
continue;
match = true;
break;
}
if (!match) {
if (!msg_is_syn(hdr))
return;
__skb_dequeue(inputq);
__skb_queue_tail(defq, skb);
return;
}
/* Deliver non-SYN message from other link, otherwise queue it */
if (!msg_is_syn(hdr)) {
if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
return;
__skb_dequeue(inputq);
__skb_queue_tail(defq, skb);
return;
}
/* Queue non-SYN/SYN message from same link */
if (msg_is_rcast(hdr) == msg_is_rcast(_hdr)) {
__skb_dequeue(inputq);
__skb_queue_tail(defq, skb);
return;
}
/* Matching SYN messages => return the one with data, if any */
__skb_unlink(_skb, defq);
if (msg_data_sz(hdr)) {
kfree_skb(_skb);
} else {
__skb_dequeue(inputq);
kfree_skb(skb);
__skb_queue_tail(inputq, _skb);
}
/* Deliver subsequent non-SYN messages from same peer */
skb_queue_walk_safe(defq, _skb, tmp) {
_hdr = buf_msg(_skb);
if (msg_orignode(_hdr) != node)
continue;
if (msg_origport(_hdr) != port)
continue;
if (msg_is_syn(_hdr))
break;
__skb_unlink(_skb, defq);
__skb_queue_tail(inputq, _skb);
}
}

View File

@ -67,11 +67,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
/* Cookie to be used between socket and broadcast layer /* Cookie to be used between socket and broadcast layer
* @rcast: replicast (instead of broadcast) was used at previous xmit * @rcast: replicast (instead of broadcast) was used at previous xmit
* @mandatory: broadcast/replicast indication was set by user * @mandatory: broadcast/replicast indication was set by user
* @deferredq: defer queue to make message in order
* @expires: re-evaluate non-mandatory transmit method if we are past this * @expires: re-evaluate non-mandatory transmit method if we are past this
*/ */
struct tipc_mc_method { struct tipc_mc_method {
bool rcast; bool rcast;
bool mandatory; bool mandatory;
struct sk_buff_head deferredq;
unsigned long expires; unsigned long expires;
}; };
@ -99,6 +101,9 @@ int tipc_bclink_reset_stats(struct net *net);
u32 tipc_bcast_get_broadcast_mode(struct net *net); u32 tipc_bcast_get_broadcast_mode(struct net *net);
u32 tipc_bcast_get_broadcast_ratio(struct net *net); u32 tipc_bcast_get_broadcast_ratio(struct net *net);
void tipc_mcast_filter_msg(struct sk_buff_head *defq,
struct sk_buff_head *inputq);
static inline void tipc_bcast_lock(struct net *net) static inline void tipc_bcast_lock(struct net *net)
{ {
spin_lock_bh(&tipc_net(net)->bclock); spin_lock_bh(&tipc_net(net)->bclock);

View File

@ -257,6 +257,16 @@ static inline void msg_set_src_droppable(struct tipc_msg *m, u32 d)
msg_set_bits(m, 0, 18, 1, d); msg_set_bits(m, 0, 18, 1, d);
} }
static inline bool msg_is_rcast(struct tipc_msg *m)
{
return msg_bits(m, 0, 18, 0x1);
}
static inline void msg_set_is_rcast(struct tipc_msg *m, bool d)
{
msg_set_bits(m, 0, 18, 0x1, d);
}
static inline void msg_set_size(struct tipc_msg *m, u32 sz) static inline void msg_set_size(struct tipc_msg *m, u32 sz)
{ {
m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz); m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz);

View File

@ -485,6 +485,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
tsk_set_unreturnable(tsk, true); tsk_set_unreturnable(tsk, true);
if (sock->type == SOCK_DGRAM) if (sock->type == SOCK_DGRAM)
tsk_set_unreliable(tsk, true); tsk_set_unreliable(tsk, true);
__skb_queue_head_init(&tsk->mc_method.deferredq);
} }
trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " "); trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
@ -582,6 +583,7 @@ static int tipc_release(struct socket *sock)
sk->sk_shutdown = SHUTDOWN_MASK; sk->sk_shutdown = SHUTDOWN_MASK;
tipc_sk_leave(tsk); tipc_sk_leave(tsk);
tipc_sk_withdraw(tsk, 0, NULL); tipc_sk_withdraw(tsk, 0, NULL);
__skb_queue_purge(&tsk->mc_method.deferredq);
sk_stop_timer(sk, &sk->sk_timer); sk_stop_timer(sk, &sk->sk_timer);
tipc_sk_remove(tsk); tipc_sk_remove(tsk);
@ -2162,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
if (unlikely(grp)) if (unlikely(grp))
tipc_group_filter_msg(grp, &inputq, xmitq); tipc_group_filter_msg(grp, &inputq, xmitq);
if (msg_type(hdr) == TIPC_MCAST_MSG)
tipc_mcast_filter_msg(&tsk->mc_method.deferredq, &inputq);
/* Validate and add to receive buffer if there is space */ /* Validate and add to receive buffer if there is space */
while ((skb = __skb_dequeue(&inputq))) { while ((skb = __skb_dequeue(&inputq))) {
hdr = buf_msg(skb); hdr = buf_msg(skb);