Commit eaff393

---
r: 214446
b: refs/heads/master
c: 0f4b1c7
h: refs/heads/master
v: v3
Zach Brown authored and Andy Grover committed Sep 9, 2010
1 parent 5b3f694 commit eaff393
Showing 6 changed files with 53 additions and 59 deletions.
2 changes: 1 addition & 1 deletion [refs]
@@ -1,2 +1,2 @@
---
refs/heads/master: 501dcccdb7a2335cde07d4acb56e636182d62944
refs/heads/master: 0f4b1c7e89e699f588807a914ec6e6396c851a72
19 changes: 5 additions & 14 deletions trunk/net/rds/connection.c
@@ -148,9 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
spin_lock_init(&conn->c_lock);
conn->c_next_tx_seq = 1;

spin_lock_init(&conn->c_send_lock);
atomic_set(&conn->c_send_generation, 1);
atomic_set(&conn->c_senders, 0);
init_waitqueue_head(&conn->c_waitq);
INIT_LIST_HEAD(&conn->c_send_queue);
INIT_LIST_HEAD(&conn->c_retrans);

@@ -275,15 +273,8 @@ void rds_conn_shutdown(struct rds_connection *conn)
}
mutex_unlock(&conn->c_cm_lock);

/* verify everybody's out of rds_send_xmit() */
spin_lock_irq(&conn->c_send_lock);
spin_unlock_irq(&conn->c_send_lock);

while(atomic_read(&conn->c_senders)) {
schedule_timeout(1);
spin_lock_irq(&conn->c_send_lock);
spin_unlock_irq(&conn->c_send_lock);
}
wait_event(conn->c_waitq,
!test_bit(RDS_IN_XMIT, &conn->c_flags));

conn->c_trans->conn_shutdown(conn);
rds_conn_reset(conn);
@@ -477,8 +468,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
sizeof(cinfo->transport));
cinfo->flags = 0;

rds_conn_info_set(cinfo->flags,
spin_is_locked(&conn->c_send_lock), SENDING);
rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
SENDING);
/* XXX Future: return the state rather than these funky bits */
rds_conn_info_set(cinfo->flags,
atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
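The shutdown hunk above swaps the old lock-bouncing busy-wait for a single sleep: rds_conn_shutdown() now blocks on c_waitq until RDS_IN_XMIT is clear, and the send path wakes it when it drops the bit. A minimal kernel-style sketch of that handshake, with a pared-down structure and hypothetical names (xmit_gate, xmit_gate_wait, xmit_gate_release) standing in for rds_connection, the wait_event() call above, and release_in_xmit() in send.c:

#include <linux/wait.h>
#include <linux/bitops.h>

#define RDS_IN_XMIT     2                       /* same bit defined in rds.h below */

struct xmit_gate {                              /* pared-down stand-in for rds_connection */
        unsigned long           c_flags;
        wait_queue_head_t       c_waitq;
};

/* shutdown side: sleep until no thread is inside the send path */
static void xmit_gate_wait(struct xmit_gate *g)
{
        wait_event(g->c_waitq, !test_bit(RDS_IN_XMIT, &g->c_flags));
}

/* send side: drop the bit, then wake any shutdown waiter */
static void xmit_gate_release(struct xmit_gate *g)
{
        clear_bit(RDS_IN_XMIT, &g->c_flags);
        smp_mb__after_clear_bit();              /* publish the clear before checking for waiters */
        if (waitqueue_active(&g->c_waitq))
                wake_up_all(&g->c_waitq);
}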
2 changes: 1 addition & 1 deletion trunk/net/rds/ib_send.c
@@ -321,7 +321,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
* credits (see rds_ib_send_add_credits below).
*
* The RDS send code is essentially single-threaded; rds_send_xmit
* grabs c_send_lock to ensure exclusive access to the send ring.
* sets RDS_IN_XMIT to ensure exclusive access to the send ring.
* However, the ACK sending code is independent and can race with
* message SENDs.
*
5 changes: 2 additions & 3 deletions trunk/net/rds/rds.h
@@ -80,6 +80,7 @@ enum {
/* Bits for c_flags */
#define RDS_LL_SEND_FULL 0
#define RDS_RECONNECT_PENDING 1
#define RDS_IN_XMIT 2

struct rds_connection {
struct hlist_node c_hash_node;
@@ -91,9 +92,6 @@ struct rds_connection {
struct rds_cong_map *c_lcong;
struct rds_cong_map *c_fcong;

spinlock_t c_send_lock; /* protect send ring */
atomic_t c_send_generation;
atomic_t c_senders;
struct rds_message *c_xmit_rm;
unsigned long c_xmit_sg;
unsigned int c_xmit_hdr_off;
@@ -120,6 +118,7 @@ struct rds_connection {
struct delayed_work c_conn_w;
struct work_struct c_down_w;
struct mutex c_cm_lock; /* protect conn state & cm */
wait_queue_head_t c_waitq;

struct list_head c_map_item;
unsigned long c_map_queued;
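The three removed members (c_send_lock, c_send_generation, c_senders) collapse into one bit of the existing c_flags word plus the new c_waitq. The exclusivity comes from test_and_set_bit() being an atomic read-modify-write: of any number of CPUs racing into the send path, exactly one sees the 0 -> 1 transition, which is the same guarantee the old spin_trylock(&conn->c_send_lock) provided. A rough illustration with a hypothetical try_xmit() wrapper (not part of the patch):

#include <linux/bitops.h>

#define RDS_IN_XMIT     2

static int try_xmit(unsigned long *c_flags)
{
        /* atomic RMW: only one caller ever observes the bit going 0 -> 1 */
        if (test_and_set_bit(RDS_IN_XMIT, c_flags))
                return 0;       /* someone else already owns the send ring */
        return 1;               /* we own it until clear_bit(RDS_IN_XMIT, ...) */
}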
82 changes: 43 additions & 39 deletions trunk/net/rds/send.c
@@ -53,14 +53,14 @@ module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");

/*
* Reset the send state. Caller must hold c_send_lock when calling here.
* Reset the send state. Callers must ensure that this doesn't race with
* rds_send_xmit().
*/
void rds_send_reset(struct rds_connection *conn)
{
struct rds_message *rm, *tmp;
unsigned long flags;

spin_lock_irqsave(&conn->c_send_lock, flags);
if (conn->c_xmit_rm) {
rm = conn->c_xmit_rm;
conn->c_xmit_rm = NULL;
@@ -69,11 +69,7 @@ void rds_send_reset(struct rds_connection *conn)
* independently) but as the connection is down, there's
* no ongoing RDMA to/from that memory */
rds_message_unmapped(rm);
spin_unlock_irqrestore(&conn->c_send_lock, flags);

rds_message_put(rm);
} else {
spin_unlock_irqrestore(&conn->c_send_lock, flags);
}

conn->c_xmit_sg = 0;
@@ -98,6 +94,25 @@ void rds_send_reset(struct rds_connection *conn)
spin_unlock_irqrestore(&conn->c_lock, flags);
}

static int acquire_in_xmit(struct rds_connection *conn)
{
return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
}

static void release_in_xmit(struct rds_connection *conn)
{
clear_bit(RDS_IN_XMIT, &conn->c_flags);
smp_mb__after_clear_bit();
/*
* We don't use wait_on_bit()/wake_up_bit() because our waking is in a
* hot path and finding waiters is very rare. We don't want to walk
* the system-wide hashed waitqueue buckets in the fast path only to
* almost never find waiters.
*/
if (waitqueue_active(&conn->c_waitq))
wake_up_all(&conn->c_waitq);
}

/*
* We're making the concious trade-off here to only send one message
* down the connection at a time.
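With acquire_in_xmit()/release_in_xmit() above, rds_send_xmit() keeps its old trylock shape, just without a spinlock held across the (potentially long) transmit loop. A condensed sketch of how the rest of this file uses the pair; the real body follows in the hunks below, the transmit loop itself is elided, and the sketch relies on the declarations in net/rds/rds.h:

static int rds_send_xmit_sketch(struct rds_connection *conn)    /* illustrative only */
{
        int ret = 0;

        if (!acquire_in_xmit(conn))
                return -ENOMEM;                 /* another CPU is already draining the queue */

        /*
         * rds_conn_shutdown() sets the connection state and then waits for
         * RDS_IN_XMIT to clear; here the bit is set first and the state is
         * tested second, so one side is guaranteed to notice the other.
         */
        if (!rds_conn_up(conn)) {
                release_in_xmit(conn);
                return 0;
        }

        /* ... push messages from conn->c_send_queue into the transport ... */

        release_in_xmit(conn);
        return ret;
}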
@@ -119,12 +134,9 @@ int rds_send_xmit(struct rds_connection *conn)
unsigned int tmp;
struct scatterlist *sg;
int ret = 0;
int gen = 0;
LIST_HEAD(to_be_dropped);

restart:
if (!rds_conn_up(conn))
goto out;

/*
* sendmsg calls here after having queued its message on the send
@@ -133,18 +145,25 @@ int rds_send_xmit(struct rds_connection *conn)
* avoids blocking the caller and trading per-connection data between
* caches per message.
*/
if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
if (!acquire_in_xmit(conn)) {
rds_stats_inc(s_send_lock_contention);
ret = -ENOMEM;
goto out;
}
atomic_inc(&conn->c_senders);

/*
* rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
* we do the opposite to avoid races.
*/
if (!rds_conn_up(conn)) {
release_in_xmit(conn);
ret = 0;
goto out;
}

if (conn->c_trans->xmit_prepare)
conn->c_trans->xmit_prepare(conn);

gen = atomic_inc_return(&conn->c_send_generation);

/*
* spin trying to push headers and data down the connection until
* the connection doesn't make forward progress.
@@ -178,7 +197,7 @@ int rds_send_xmit(struct rds_connection *conn)
if (!rm) {
unsigned int len;

spin_lock(&conn->c_lock);
spin_lock_irqsave(&conn->c_lock, flags);

if (!list_empty(&conn->c_send_queue)) {
rm = list_entry(conn->c_send_queue.next,
@@ -193,7 +212,7 @@ int rds_send_xmit(struct rds_connection *conn)
list_move_tail(&rm->m_conn_item, &conn->c_retrans);
}

spin_unlock(&conn->c_lock);
spin_unlock_irqrestore(&conn->c_lock, flags);

if (!rm)
break;
@@ -207,10 +226,10 @@ int rds_send_xmit(struct rds_connection *conn)
*/
if (rm->rdma.op_active &&
test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
spin_lock(&conn->c_lock);
spin_lock_irqsave(&conn->c_lock, flags);
if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
list_move(&rm->m_conn_item, &to_be_dropped);
spin_unlock(&conn->c_lock);
spin_unlock_irqrestore(&conn->c_lock, flags);
continue;
}

@@ -336,19 +355,7 @@ int rds_send_xmit(struct rds_connection *conn)
if (conn->c_trans->xmit_complete)
conn->c_trans->xmit_complete(conn);

/*
* We might be racing with another sender who queued a message but
* backed off on noticing that we held the c_send_lock. If we check
* for queued messages after dropping the sem then either we'll
* see the queued message or the queuer will get the sem. If we
* notice the queued message then we trigger an immediate retry.
*
* We need to be careful only to do this when we stopped processing
* the send queue because it was empty. It's the only way we
* stop processing the loop when the transport hasn't taken
* responsibility for forward progress.
*/
spin_unlock_irqrestore(&conn->c_send_lock, flags);
release_in_xmit(conn);

/* Nuke any messages we decided not to retransmit. */
if (!list_empty(&to_be_dropped)) {
@@ -358,13 +365,12 @@ int rds_send_xmit(struct rds_connection *conn)
rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
}

atomic_dec(&conn->c_senders);

/*
* Other senders will see we have c_send_lock and exit. We
* need to recheck the send queue and race again for c_send_lock
* to make sure messages don't just sit on the send queue, if
* somebody hasn't already beat us into the loop.
* Other senders can queue a message after we last test the send queue
* but before we clear RDS_IN_XMIT. In that case they'd back off and
* not try and send their newly queued message. We need to check the
* send queue after having cleared RDS_IN_XMIT so that their message
* doesn't get stuck on the send queue.
*
* If the transport cannot continue (i.e ret != 0), then it must
* call us when more room is available, such as from the tx
@@ -374,9 +380,7 @@ int rds_send_xmit(struct rds_connection *conn)
smp_mb();
if (!list_empty(&conn->c_send_queue)) {
rds_stats_inc(s_send_lock_queue_raced);
if (gen == atomic_read(&conn->c_send_generation)) {
goto restart;
}
goto restart;
}
}
out:
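The final hunks above are what keep a message from stranding on c_send_queue: a sender that loses the acquire_in_xmit() race simply backs off, so the thread that owned RDS_IN_XMIT must look at the queue once more after clearing the bit. A schematic of the two sides, using hypothetical helper names (queue_and_kick(), finish_and_recheck()) in place of the real sendmsg-side queuing and the tail of rds_send_xmit(); it relies on the declarations in net/rds/rds.h and send.c:

/* producer: queue the message, then try to become the transmitter */
static void queue_and_kick(struct rds_connection *conn, struct rds_message *rm)
{
        unsigned long flags;

        spin_lock_irqsave(&conn->c_lock, flags);
        list_add_tail(&rm->m_conn_item, &conn->c_send_queue);
        spin_unlock_irqrestore(&conn->c_lock, flags);

        rds_send_xmit(conn);            /* returns early if RDS_IN_XMIT is already set */
}

/* consumer: after dropping RDS_IN_XMIT, check the queue one last time */
static void finish_and_recheck(struct rds_connection *conn)
{
        release_in_xmit(conn);          /* clear_bit() + wake_up, defined earlier in send.c */

        smp_mb();                       /* order the clear against the final queue check */
        if (!list_empty(&conn->c_send_queue))
                rds_send_xmit(conn);    /* the real code does "goto restart" instead */
}

If the producer enqueues after the consumer's last check, the producer's own acquire_in_xmit() succeeds because the bit is already clear; if it enqueues before, the consumer's recheck sees the message. Either way, someone transmits it.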
2 changes: 1 addition & 1 deletion trunk/net/rds/threads.c
@@ -61,7 +61,7 @@
*
* Transition to state DISCONNECTING/DOWN:
* - Inside the shutdown worker; synchronizes with xmit path
* through c_send_lock, and with connection management callbacks
* through RDS_IN_XMIT, and with connection management callbacks
* via c_cm_lock.
*
* For receive callbacks, we rely on the underlying transport
