BNX2X: Transmit synchronization
authorBarret Rhoden <brho@cs.berkeley.edu>
Fri, 27 Feb 2015 23:05:11 +0000 (18:05 -0500)
committerBarret Rhoden <brho@cs.berkeley.edu>
Mon, 2 Mar 2015 16:59:11 +0000 (11:59 -0500)
The transmit function is called serially, per txdata, meaning there is only
ever one caller of it.  Instead of a netif lock, we use the poker, which
guarantees mutual exclusion (per poke_tracker).

The transmitter stops when the device is full.  Concurrently, the tx_int
could have freed up space in the tx ring.  When tx_int calls poke, poke
will make sure the transmit function runs at least once after poke is
called.  This ensures we don't stall on transmits.

If you're concerned about the network stack calling transmit over and
over while the TX ring is full, we can optimize slightly.  See the notes
above __bnx2x_tx_queue() for details.  In short, poke() is powerful, but
it won't solve all of your problems.

kern/drivers/net/bnx2x/bnx2x.h
kern/drivers/net/bnx2x/bnx2x_cmn.c
kern/drivers/net/bnx2x/bnx2x_cmn.h
kern/drivers/net/bnx2x/bnx2x_dev.c

index ba58476..1aec4b4 100644 (file)
@@ -502,6 +502,9 @@ struct bnx2x_fp_txdata {
        int                     txq_index;
        struct bnx2x_fastpath   *parent_fp;
        int                     tx_ring_size;
+
+       struct poke_tracker                     poker;
+       struct queue                            *oq;
 };
 
 enum bnx2x_tpa_mode_t {
index 4c5f0d5..ec5cb0c 100644 (file)
@@ -267,7 +267,6 @@ static uint16_t bnx2x_free_tx_pkt(struct bnx2x *bp, struct bnx2x_fp_txdata *txda
 
 int bnx2x_tx_int(struct bnx2x *bp, struct bnx2x_fp_txdata *txdata)
 {
-       //struct netdev_queue *txq; // AKAROS_PORT
        uint16_t hw_cons, sw_cons, bd_cons = txdata->tx_bd_cons;
        unsigned int pkts_compl = 0, bytes_compl = 0;
 
@@ -276,7 +275,6 @@ int bnx2x_tx_int(struct bnx2x *bp, struct bnx2x_fp_txdata *txdata)
                return -1;
 #endif
 
-       //txq = netdev_get_tx_queue(bp->dev, txdata->txq_index); // AKAROS_PORT
        hw_cons = le16_to_cpu(*txdata->tx_cons_sb);
        sw_cons = txdata->tx_pkt_cons;
 
@@ -295,48 +293,11 @@ int bnx2x_tx_int(struct bnx2x *bp, struct bnx2x_fp_txdata *txdata)
                sw_cons++;
        }
 
-       //netdev_tx_completed_queue(txq, pkts_compl, bytes_compl); // AKAROS_PORT
-
        txdata->tx_pkt_cons = sw_cons;
        txdata->tx_bd_cons = bd_cons;
 
-       /* Need to make the tx_bd_cons update visible to start_xmit()
-        * before checking for netif_tx_queue_stopped().  Without the
-        * memory barrier, there is a small possibility that
-        * start_xmit() will miss it and cause the queue to be stopped
-        * forever.
-        * On the other hand we need an rmb() here to ensure the proper
-        * ordering of bit testing in the following
-        * netif_tx_queue_stopped(txq) call.
-        */
-       mb();
-
-       /* TODO: AKAROS_PORT XME restart transmit */
-
-       return 0;
-#if 0 // AKAROS_PORT netif queue stuff on tx_int
-       if (unlikely(netif_tx_queue_stopped(txq))) {
-               /* Taking tx_lock() is needed to prevent re-enabling the queue
-                * while it's empty. This could have happen if rx_action() gets
-                * suspended in bnx2x_tx_int() after the condition before
-                * netif_tx_wake_queue(), while tx_action (bnx2x_start_xmit()):
-                *
-                * stops the queue->sees fresh tx_bd_cons->releases the queue->
-                * sends some packets consuming the whole queue again->
-                * stops the queue
-                */
-
-               __netif_tx_lock(txq, core_id());
-
-               if ((netif_tx_queue_stopped(txq)) &&
-                   (bp->state == BNX2X_STATE_OPEN) &&
-                   (bnx2x_tx_avail(bp, txdata) >= MAX_DESC_PER_TX_PKT))
-                       netif_tx_wake_queue(txq);
-
-               __netif_tx_unlock(txq);
-       }
+       poke(&txdata->poker, txdata);
        return 0;
-#endif
 }
 
 static inline void bnx2x_update_last_max_sge(struct bnx2x_fastpath *fp,
@@ -3829,11 +3790,11 @@ panic("Not implemented");
  * bnx2x_tx_int() runs without netif_tx_lock unless it needs to call
  * netif_wake_queue()
  */
-netdev_tx_t bnx2x_start_xmit(struct block *block, struct ether *dev)
+netdev_tx_t bnx2x_start_xmit(struct block *block,
+                             struct bnx2x_fp_txdata *txdata)
 {
-       struct bnx2x *bp = netdev_priv(dev);
+       struct bnx2x *bp = txdata->parent_fp->bp;
 
-       struct bnx2x_fp_txdata *txdata;
        struct sw_tx_bd *tx_buf;
        struct eth_tx_start_bd *tx_start_bd, *first_bd;
        struct eth_tx_bd *tx_data_bd, *total_pkt_bd = NULL;
@@ -3856,16 +3817,12 @@ netdev_tx_t bnx2x_start_xmit(struct block *block, struct ether *dev)
                return NETDEV_TX_BUSY;
 #endif
 
-#if 0 // AKAROS_PORT TODO: pick a queue
-       txq_index = skb_get_queue_mapping(skb);
-       txq = netdev_get_tx_queue(dev, txq_index);
-#else
-       txq_index = 0;
-#endif
+       txq_index = txdata->txq_index;
+       assert(txq_index == 0); // AKAROS_PORT til we get multi-queue working
+       assert(txdata == &bp->bnx2x_txq[txq_index]);
 
        assert(!(txq_index >= MAX_ETH_TXQ_IDX(bp) + (CNIC_LOADED(bp) ? 1 : 0)));
 
-       txdata = &bp->bnx2x_txq[txq_index];
 
        /* enable this debug print to view the transmission queue being used
        DP(NETIF_MSG_TX_QUEUED, "indices: txq %d, fp %d, txdata %d\n",
@@ -3911,7 +3868,7 @@ netdev_tx_t bnx2x_start_xmit(struct block *block, struct ether *dev)
 
        /* set flag according to packet type (UNICAST_ADDRESS is default)*/
        if (unlikely(is_multicast_ether_addr(eth->d))) {
-               if (eaddrcmp(eth->d, dev->bcast))
+               if (eaddrcmp(eth->d, bp->edev->bcast))
                        mac_type = BROADCAST_ADDRESS;
                else
                        mac_type = MULTICAST_ADDRESS;
@@ -4273,18 +4230,6 @@ netdev_tx_t bnx2x_start_xmit(struct block *block, struct ether *dev)
 
        txdata->tx_bd_prod += nbd;
 
-       if (unlikely(bnx2x_tx_avail(bp, txdata) < MAX_DESC_PER_TX_PKT)) {
-               netif_tx_stop_queue(txq);
-
-               /* paired memory barrier is in bnx2x_tx_int(), we have to keep
-                * ordering of set_bit() in netif_tx_stop_queue() and read of
-                * fp->bd_tx_cons */
-               mb();
-
-               bnx2x_fp_qstats(bp, txdata->parent_fp)->driver_xoff++;
-               if (bnx2x_tx_avail(bp, txdata) >= MAX_DESC_PER_TX_PKT)
-                       netif_tx_wake_queue(txq);
-       }
        txdata->tx_pkt++;
 
        return NETDEV_TX_OK;
index 4d928e6..6bb1051 100644 (file)
@@ -1123,6 +1123,12 @@ static inline void bnx2x_init_txdata(struct bnx2x *bp,
        txdata->parent_fp = fp;
        txdata->tx_ring_size = IS_FCOE_FP(fp) ? MAX_TX_AVAIL : bp->tx_ring_size;
 
+       /* Poke function - ghetto extern from bnx2x_dev.c */
+       extern void __bnx2x_tx_queue(void *txdata_arg);
+       poke_init(&txdata->poker, __bnx2x_tx_queue);
+       /* TODO: AKAROS_PORT multi queue: assign proper oq */
+       txdata->oq = bp->edev->oq;
+
        DP(NETIF_MSG_IFUP, "created tx data cid %d, txq %d\n",
           txdata->cid, txdata->txq_index);
 }
index 96bc8e1..8a3c8f9 100644 (file)
@@ -34,7 +34,8 @@ extern int bnx2x_init_one(struct ether *dev, struct bnx2x *bp,
                           const struct pci_device_id *ent);
 extern int bnx2x_open(struct ether *dev);
 extern void bnx2x_set_rx_mode(struct ether *dev);
-extern netdev_tx_t bnx2x_start_xmit(struct block *block, struct ether *dev);
+extern netdev_tx_t bnx2x_start_xmit(struct block *block,
+                                    struct bnx2x_fp_txdata *txdata);
 
 spinlock_t bnx2x_tq_lock = SPINLOCK_INITIALIZER;
 TAILQ_HEAD(bnx2x_tq, bnx2x);
@@ -137,40 +138,83 @@ static void bnx2x_multicast(void *arg, uint8_t * addr, int add)
        /* TODO: add or remove a multicast addr */
 }
 
-/* Transmit initialization.  Not mandatory for 9ns, but a good idea */
-static void bnx2x_txinit(struct bnx2x *ctlr)
+/* The poke function: we are guaranteed that only one copy of this func runs
+ * per poke tracker (per queue).  Both transmit and tx_int will poke, and after
+ * any pokes, the func will run at least once.
+ *
+ * Some notes for optimizing and synchronization:
+ *
+ * If we want a flag or something to keep us from checking the oq and attempting
+ * the xmit, all that will do is speed up xmit when the tx rings are full.
+ * You'd need to be careful.  The post/poke makes sure that this'll run after
+ * work was posted, but if this function sets an abort flag and later checks it,
+ * you need to check tx_avail *after* setting the flag (check, signal, check
+ * again).  Consider this:
+ *
+ * this func:
+ *             calls start_xmit, fails with BUSY.  wants to set ABORT flag
+ *
+ *      PAUSE - meanwhile:
+ *
+ * tx_int clears the ABORT flag, then pokes:
+ *             drain so there is room;
+ *             clear flag (combo of these is "post work");
+ *             poke;.  guaranteed that poke will happen after we cleared flag.
+ *                     but it is concurrent with this function
+ *
+ *             RESUME this func:
+ *
+ *             sets ABORT flag
+ *             returns.
+ *             tx_int's poke ensures we run again
+ *             we run again and see ABORT, then return
+ *             never try again til the next tx_int, if ever
+ *
+ * Instead, in this func, we must set ABORT flag, then check tx_avail.  Or
+ * have two flags, one set by us, another set by tx_int, where this func only
+ * clears the tx_int flag when it will attempt a start_xmit.
+ *
+ * It's probably easier to just check tx_avail before entering the while loop,
+ * if you're really concerned.  If you want to do the flag thing, probably use
+ * two flags (atomically), and be careful. */
+void __bnx2x_tx_queue(void *txdata_arg)
 {
+       struct bnx2x_fp_txdata *txdata = txdata_arg;
+       struct block *block;
+       struct queue *oq = txdata->oq;
+
+       /* TODO: avoid bugs til multi-queue is working */
+       assert(oq);
+       assert(txdata->txq_index == 0);
+
+       while ((block = qget(oq))) {
+               if ((bnx2x_start_xmit(block, txdata) != NETDEV_TX_OK)) {
+                       /* all queue readers are sync'd by the poke, so we can putback
+                        * without fear of going out of order. */
+
+                       /* TODO: q code has methods that should be called with the spinlock
+                        * held, but no methods to do the locking... */
+                       //spin_unlock_irqsave(&oq->lock);
+                       qputback(oq, block);
+                       //spin_lock_irqsave(&oq->lock);
+
+                       /* device can't handle any more, we're done for now.  tx_int will
+                        * poke when space frees up.  it may be poking concurrently, and in
+                        * which case, we'll run again immediately. */
+                       break;
+               }
+       }
 }
 
 static void bnx2x_transmit(struct ether *edev)
 {
-       struct block *bp;
-       struct bnx2x *ctlr;
-
-       ctlr = edev->ctlr;
+       struct bnx2x *ctlr = edev->ctlr;
+       struct bnx2x_fp_txdata *txdata;
+       /* TODO: determine the tx queue we're supposed to work on */
+       int txq_index = 0;
 
-       /* Don't forget to spin_lock_irqsave */
-
-       /* TODO: Free any completed packets */
-
-       /* Try to fill the ring back up.  While there is space, yank from the output
-        * queue (oq) and put them in the Tx desc. */
-       while (1) {
-       //while (NEXT_RING(tdt, ctlr->ntd) != tdh) {
-               if ((bp = qget(edev->oq)) == NULL)
-                       break;
-               bnx2x_start_xmit(bp, edev);
-               //td = &ctlr->tdba[tdt];
-               //td->addr[0] = paddr_low32(bp->rp);
-               //td->addr[1] = paddr_high32(bp->rp);
-               /* if we're breaking out, make sure to set the IRQ mask */
-               //if (NEXT_RING(tdt, ctlr->ntd) == tdh) {
-               //      // other stuff removed
-               //      csr32w(ctlr, Tdt, tdt);
-               //      igbeim(ctlr, Txdw);
-               //      break;
-               //}
-       }
+       txdata = &ctlr->bnx2x_txq[txq_index];
+       poke(&txdata->poker, txdata);
 }
 
 /* Not mandatory.  Called to make sure there are free blocks available for
@@ -288,8 +332,6 @@ static void bnx2x_attach(struct ether *edev)
        snprintf(name, KNAMELEN, "#l%d-bnx2x_rproc", edev->ctlrno);
        ktask(name, bnx2x_rproc, edev);
 
-       bnx2x_txinit(ctlr);
-
        qunlock(&ctlr->alock);
 }