s390/qeth: make queue lock a proper spinlock
queue->state is a ternary spinlock in disguise, used by
OSA's TX completion path to lock the Output Queue and flush any pending
packets on it to the device. If the Queue is already locked by our TX
code, setting the lock word to QETH_OUT_Q_LOCKED_FLUSH lets the TX
completion code move on - the TX path will later take care of things
when it unlocks the Queue.
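
Condensed from the code being removed below (a sketch, not a literal quote), the old scheme looked roughly like this:

    /* TX path: spin until the queue is ours. */
    while (atomic_cmpxchg(&queue->state, QETH_OUT_Q_UNLOCKED,
                          QETH_OUT_Q_LOCKED) != QETH_OUT_Q_UNLOCKED);

    /* TX completion path: if the TX path already holds the "lock",
     * flag it as LOCKED_FLUSH and move on - the TX path then flushes
     * the queue when it "unlocks".
     */
    if (atomic_xchg(&queue->state, QETH_OUT_Q_LOCKED_FLUSH) ==
        QETH_OUT_Q_UNLOCKED) {
            /* nobody held it - do the flush ourselves */
    }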

This sort of DIY locking is a non-starter, of course; just let the TX
completion path block on the spinlock when necessary. If that ends up
causing additional latency due to lock contention, then converting
the OSA path to use xmit_more is the right way to go forward.

Also slightly expand the locked section so that it covers all of
qeth_do_send_packet(), making the update of the 'bufs_pack' statistics
race-free.
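
With the rework, both sides simply serialize on the new queue->lock. In
condensed form (taken from the diff below):

    /* TX path, qeth_xmit(): */
    spin_lock(&queue->lock);
    rc = qeth_do_send_packet(card, queue, skb, hdr, data_offset,
                             hd_len, elements);
    spin_unlock(&queue->lock);

    /* TX completion path, qeth_check_outbound_queue(): */
    spin_lock(&queue->lock);
    /* switch packing mode / flush a pending packing buffer, and update
     * the 'bufs_pack' statistics while still holding the lock
     */
    spin_unlock(&queue->lock);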

While reworking the TX completion path's code, remove a barrier() that
doesn't make any sense.

Signed-off-by: Julian Wiedmann <jwi@linux.ibm.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Julian Wiedmann authored and David S. Miller committed Aug 27, 2020
1 parent beaadcc commit a166847
Showing 2 changed files with 23 additions and 65 deletions.
8 changes: 1 addition & 7 deletions drivers/s390/net/qeth_core.h
@@ -420,12 +420,6 @@ struct qeth_qdio_out_buffer {
 
 struct qeth_card;
 
-enum qeth_out_q_states {
-        QETH_OUT_Q_UNLOCKED,
-        QETH_OUT_Q_LOCKED,
-        QETH_OUT_Q_LOCKED_FLUSH,
-};
-
 #define QETH_CARD_STAT_ADD(_c, _stat, _val) ((_c)->stats._stat += (_val))
 #define QETH_CARD_STAT_INC(_c, _stat) QETH_CARD_STAT_ADD(_c, _stat, 1)
 
@@ -486,12 +480,12 @@ struct qeth_qdio_out_q {
         struct qeth_qdio_out_buffer *bufs[QDIO_MAX_BUFFERS_PER_Q];
         struct qdio_outbuf_state *bufstates; /* convenience pointer */
         struct qeth_out_q_stats stats;
+        spinlock_t lock;
         u8 next_buf_to_fill;
         u8 max_elements;
         u8 queue_no;
         u8 do_pack;
         struct qeth_card *card;
-        atomic_t state;
         /*
          * number of buffers that are currently filled (PRIMED)
          * -> these buffers are hardware-owned
80 changes: 22 additions & 58 deletions drivers/s390/net/qeth_core_main.c
@@ -2702,6 +2702,7 @@ static int qeth_alloc_qdio_queues(struct qeth_card *card)
                 card->qdio.out_qs[i] = queue;
                 queue->card = card;
                 queue->queue_no = i;
+                spin_lock_init(&queue->lock);
                 timer_setup(&queue->timer, qeth_tx_completion_timer, 0);
                 queue->coalesce_usecs = QETH_TX_COALESCE_USECS;
                 queue->max_coalesced_frames = QETH_TX_MAX_COALESCED_FRAMES;
@@ -3068,7 +3069,6 @@ static int qeth_init_qdio_queues(struct qeth_card *card)
                 queue->bulk_max = qeth_tx_select_bulk_max(card, queue);
                 atomic_set(&queue->used_buffers, 0);
                 atomic_set(&queue->set_pci_flags_count, 0);
-                atomic_set(&queue->state, QETH_OUT_Q_UNLOCKED);
                 netdev_tx_reset_queue(netdev_get_tx_queue(card->dev, i));
         }
         return 0;
@@ -3741,37 +3741,31 @@ static void qeth_flush_queue(struct qeth_qdio_out_q *queue)
 
 static void qeth_check_outbound_queue(struct qeth_qdio_out_q *queue)
 {
-        int index;
-        int flush_cnt = 0;
-        int q_was_packing = 0;
-
         /*
          * check if weed have to switch to non-packing mode or if
          * we have to get a pci flag out on the queue
          */
         if ((atomic_read(&queue->used_buffers) <= QETH_LOW_WATERMARK_PACK) ||
             !atomic_read(&queue->set_pci_flags_count)) {
-                if (atomic_xchg(&queue->state, QETH_OUT_Q_LOCKED_FLUSH) ==
-                    QETH_OUT_Q_UNLOCKED) {
-                        /*
-                         * If we get in here, there was no action in
-                         * do_send_packet. So, we check if there is a
-                         * packing buffer to be flushed here.
-                         */
-                        index = queue->next_buf_to_fill;
-                        q_was_packing = queue->do_pack;
-                        /* queue->do_pack may change */
-                        barrier();
-                        flush_cnt += qeth_switch_to_nonpacking_if_needed(queue);
-                        if (!flush_cnt &&
-                            !atomic_read(&queue->set_pci_flags_count))
-                                flush_cnt += qeth_prep_flush_pack_buffer(queue);
+                unsigned int index, flush_cnt;
+                bool q_was_packing;
+
+                spin_lock(&queue->lock);
+
+                index = queue->next_buf_to_fill;
+                q_was_packing = queue->do_pack;
+
+                flush_cnt = qeth_switch_to_nonpacking_if_needed(queue);
+                if (!flush_cnt && !atomic_read(&queue->set_pci_flags_count))
+                        flush_cnt = qeth_prep_flush_pack_buffer(queue);
+
+                if (flush_cnt) {
+                        qeth_flush_buffers(queue, index, flush_cnt);
                         if (q_was_packing)
                                 QETH_TXQ_STAT_ADD(queue, bufs_pack, flush_cnt);
-                        if (flush_cnt)
-                                qeth_flush_buffers(queue, index, flush_cnt);
-                        atomic_set(&queue->state, QETH_OUT_Q_UNLOCKED);
                 }
+
+                spin_unlock(&queue->lock);
         }
 }

@@ -4283,29 +4277,22 @@ int qeth_do_send_packet(struct qeth_card *card, struct qeth_qdio_out_q *queue,
                         unsigned int offset, unsigned int hd_len,
                         int elements_needed)
 {
+        unsigned int start_index = queue->next_buf_to_fill;
         struct qeth_qdio_out_buffer *buffer;
         unsigned int next_element;
         struct netdev_queue *txq;
         bool stopped = false;
-        int start_index;
         int flush_count = 0;
         int do_pack = 0;
-        int tmp;
         int rc = 0;
 
-        /* spin until we get the queue ... */
-        while (atomic_cmpxchg(&queue->state, QETH_OUT_Q_UNLOCKED,
-                              QETH_OUT_Q_LOCKED) != QETH_OUT_Q_UNLOCKED);
-        start_index = queue->next_buf_to_fill;
         buffer = queue->bufs[queue->next_buf_to_fill];
 
         /* Just a sanity check, the wake/stop logic should ensure that we always
          * get a free buffer.
          */
-        if (atomic_read(&buffer->state) != QETH_QDIO_BUF_EMPTY) {
-                atomic_set(&queue->state, QETH_OUT_Q_UNLOCKED);
+        if (atomic_read(&buffer->state) != QETH_QDIO_BUF_EMPTY)
                 return -EBUSY;
-        }
 
         txq = netdev_get_tx_queue(card->dev, skb_get_queue_mapping(skb));

@@ -4328,8 +4315,6 @@ int qeth_do_send_packet(struct qeth_card *card, struct qeth_qdio_out_q *queue,
                             QETH_QDIO_BUF_EMPTY) {
                                 qeth_flush_buffers(queue, start_index,
                                                    flush_count);
-                                atomic_set(&queue->state,
-                                           QETH_OUT_Q_UNLOCKED);
                                 rc = -EBUSY;
                                 goto out;
                         }
@@ -4361,31 +4346,8 @@ int qeth_do_send_packet(struct qeth_card *card, struct qeth_qdio_out_q *queue,
 
         if (flush_count)
                 qeth_flush_buffers(queue, start_index, flush_count);
-        else if (!atomic_read(&queue->set_pci_flags_count))
-                atomic_xchg(&queue->state, QETH_OUT_Q_LOCKED_FLUSH);
-        /*
-         * queue->state will go from LOCKED -> UNLOCKED or from
-         * LOCKED_FLUSH -> LOCKED if output_handler wanted to 'notify' us
-         * (switch packing state or flush buffer to get another pci flag out).
-         * In that case we will enter this loop
-         */
-        while (atomic_dec_return(&queue->state)) {
-                start_index = queue->next_buf_to_fill;
-                /* check if we can go back to non-packing state */
-                tmp = qeth_switch_to_nonpacking_if_needed(queue);
-                /*
-                 * check if we need to flush a packing buffer to get a pci
-                 * flag out on the queue
-                 */
-                if (!tmp && !atomic_read(&queue->set_pci_flags_count))
-                        tmp = qeth_prep_flush_pack_buffer(queue);
-                if (tmp) {
-                        qeth_flush_buffers(queue, start_index, tmp);
-                        flush_count += tmp;
-                }
-        }
+
 out:
-        /* at this point the queue is UNLOCKED again */
         if (do_pack)
                 QETH_TXQ_STAT_ADD(queue, bufs_pack, flush_count);

@@ -4459,8 +4421,10 @@ int qeth_xmit(struct qeth_card *card, struct sk_buff *skb,
         } else {
                 /* TODO: drop skb_orphan() once TX completion is fast enough */
                 skb_orphan(skb);
+                spin_lock(&queue->lock);
                 rc = qeth_do_send_packet(card, queue, skb, hdr, data_offset,
                                          hd_len, elements);
+                spin_unlock(&queue->lock);
         }

if (rc && !push_len)
