Skip to content

Commit

Permalink
Merge branch 'ipc-scalability'
Browse files Browse the repository at this point in the history
Merge IPC cleanup and scalability patches from Andrew Morton.

This cleans up many of the oddities in the IPC code, uses the list
iterator helpers, splits out locking and adds per-semaphore locks for
greater scalability of the IPC semaphore code.

Most normal user-level locking by now uses futexes (i.e. pthreads, but
also a lot of specialized locks), but SysV IPC semaphores are apparently
still used in some big applications, either for portability reasons, or
because they offer tracking and undo (and you don't need to have a
special shared memory area for them).

Our IPC semaphore scalability was pitiful.  We used to lock ranges that
were much too big, and we used to have a single IPC lock per IPC
semaphore array.  Most loads never cared, but some did.  There are some
numbers in the individual commits.

* ipc-scalability:
  ipc: sysv shared memory limited to 8TiB
  ipc/msg.c: use list_for_each_entry_[safe] for list traversing
  ipc,sem: fine grained locking for semtimedop
  ipc,sem: have only one list in struct sem_queue
  ipc,sem: open code and rename sem_lock
  ipc,sem: do not hold ipc lock more than necessary
  ipc: introduce lockless pre_down ipcctl
  ipc: introduce obtaining a lockless ipc object
  ipc: remove bogus lock comment for ipc_checkid
  ipc/msgutil.c: use linux/uaccess.h
  ipc: refactor msg list search into separate function
  ipc: simplify msg list search
  ipc: implement MSG_COPY as a new receive mode
  ipc: remove msg handling from queue scan
  ipc: set EFAULT as default error in load_msg()
  ipc: tighten msg copy loops
  ipc: separate msg allocation from userspace copy
  ipc: clamp with min()
  • Loading branch information
Linus Torvalds committed May 1, 2013
2 parents 149b306 + d69f3ba commit 823e75f
Show file tree
Hide file tree
Showing 7 changed files with 540 additions and 341 deletions.
2 changes: 1 addition & 1 deletion include/linux/ipc_namespace.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ struct ipc_namespace {

size_t shm_ctlmax;
size_t shm_ctlall;
unsigned long shm_tot;
int shm_ctlmni;
int shm_tot;
/*
* Defines whether IPC_RMID is forced for _all_ shm segments regardless
* of shmctl()
Expand Down
124 changes: 50 additions & 74 deletions ipc/msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct msg_sender {
#define SEARCH_EQUAL 2
#define SEARCH_NOTEQUAL 3
#define SEARCH_LESSEQUAL 4
#define SEARCH_NUMBER 5

#define msg_ids(ns) ((ns)->ids[IPC_MSG_IDS])

Expand Down Expand Up @@ -237,14 +238,9 @@ static inline void ss_del(struct msg_sender *mss)

static void ss_wakeup(struct list_head *h, int kill)
{
struct list_head *tmp;
struct msg_sender *mss, *t;

tmp = h->next;
while (tmp != h) {
struct msg_sender *mss;

mss = list_entry(tmp, struct msg_sender, list);
tmp = tmp->next;
list_for_each_entry_safe(mss, t, h, list) {
if (kill)
mss->list.next = NULL;
wake_up_process(mss->tsk);
Expand All @@ -253,14 +249,9 @@ static void ss_wakeup(struct list_head *h, int kill)

static void expunge_all(struct msg_queue *msq, int res)
{
struct list_head *tmp;

tmp = msq->q_receivers.next;
while (tmp != &msq->q_receivers) {
struct msg_receiver *msr;
struct msg_receiver *msr, *t;

msr = list_entry(tmp, struct msg_receiver, r_list);
tmp = tmp->next;
list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
msr->r_msg = NULL;
wake_up_process(msr->r_tsk);
smp_mb();
Expand All @@ -278,19 +269,15 @@ static void expunge_all(struct msg_queue *msq, int res)
*/
static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
{
struct list_head *tmp;
struct msg_msg *msg, *t;
struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);

expunge_all(msq, -EIDRM);
ss_wakeup(&msq->q_senders, 1);
msg_rmid(ns, msq);
msg_unlock(msq);

tmp = msq->q_messages.next;
while (tmp != &msq->q_messages) {
struct msg_msg *msg = list_entry(tmp, struct msg_msg, m_list);

tmp = tmp->next;
list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) {
atomic_dec(&ns->msg_hdrs);
free_msg(msg);
}
Expand Down Expand Up @@ -583,6 +570,7 @@ static int testmsg(struct msg_msg *msg, long type, int mode)
switch(mode)
{
case SEARCH_ANY:
case SEARCH_NUMBER:
return 1;
case SEARCH_LESSEQUAL:
if (msg->m_type <=type)
Expand All @@ -602,14 +590,9 @@ static int testmsg(struct msg_msg *msg, long type, int mode)

static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
{
struct list_head *tmp;
struct msg_receiver *msr, *t;

tmp = msq->q_receivers.next;
while (tmp != &msq->q_receivers) {
struct msg_receiver *msr;

msr = list_entry(tmp, struct msg_receiver, r_list);
tmp = tmp->next;
list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
if (testmsg(msg, msr->r_msgtype, msr->r_mode) &&
!security_msg_queue_msgrcv(msq, msg, msr->r_tsk,
msr->r_msgtype, msr->r_mode)) {
Expand Down Expand Up @@ -685,7 +668,12 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
goto out_unlock_free;
}
ss_add(msq, &s);
ipc_rcu_getref(msq);

if (!ipc_rcu_getref(msq)) {
err = -EIDRM;
goto out_unlock_free;
}

msg_unlock(msq);
schedule();

Expand Down Expand Up @@ -738,6 +726,8 @@ SYSCALL_DEFINE4(msgsnd, int, msqid, struct msgbuf __user *, msgp, size_t, msgsz,

static inline int convert_mode(long *msgtyp, int msgflg)
{
if (msgflg & MSG_COPY)
return SEARCH_NUMBER;
/*
* find message of correct type.
* msgtyp = 0 => get first.
Expand Down Expand Up @@ -774,14 +764,10 @@ static long do_msg_fill(void __user *dest, struct msg_msg *msg, size_t bufsz)
* This function creates new kernel message structure, large enough to store
* bufsz message bytes.
*/
static inline struct msg_msg *prepare_copy(void __user *buf, size_t bufsz,
int msgflg, long *msgtyp,
unsigned long *copy_number)
static inline struct msg_msg *prepare_copy(void __user *buf, size_t bufsz)
{
struct msg_msg *copy;

*copy_number = *msgtyp;
*msgtyp = 0;
/*
* Create dummy message to copy real message to.
*/
Expand All @@ -797,9 +783,7 @@ static inline void free_copy(struct msg_msg *copy)
free_msg(copy);
}
#else
static inline struct msg_msg *prepare_copy(void __user *buf, size_t bufsz,
int msgflg, long *msgtyp,
unsigned long *copy_number)
static inline struct msg_msg *prepare_copy(void __user *buf, size_t bufsz)
{
return ERR_PTR(-ENOSYS);
}
Expand All @@ -809,6 +793,30 @@ static inline void free_copy(struct msg_msg *copy)
}
#endif

static struct msg_msg *find_msg(struct msg_queue *msq, long *msgtyp, int mode)
{
struct msg_msg *msg;
long count = 0;

list_for_each_entry(msg, &msq->q_messages, m_list) {
if (testmsg(msg, *msgtyp, mode) &&
!security_msg_queue_msgrcv(msq, msg, current,
*msgtyp, mode)) {
if (mode == SEARCH_LESSEQUAL && msg->m_type != 1) {
*msgtyp = msg->m_type - 1;
} else if (mode == SEARCH_NUMBER) {
if (*msgtyp == count)
return msg;
} else
return msg;
count++;
}
}

return ERR_PTR(-EAGAIN);
}


long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp,
int msgflg,
long (*msg_handler)(void __user *, struct msg_msg *, size_t))
Expand All @@ -818,15 +826,13 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp,
int mode;
struct ipc_namespace *ns;
struct msg_msg *copy = NULL;
unsigned long copy_number = 0;

ns = current->nsproxy->ipc_ns;

if (msqid < 0 || (long) bufsz < 0)
return -EINVAL;
if (msgflg & MSG_COPY) {
copy = prepare_copy(buf, min_t(size_t, bufsz, ns->msg_ctlmax),
msgflg, &msgtyp, &copy_number);
copy = prepare_copy(buf, min_t(size_t, bufsz, ns->msg_ctlmax));
if (IS_ERR(copy))
return PTR_ERR(copy);
}
Expand All @@ -840,45 +846,13 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp,

for (;;) {
struct msg_receiver msr_d;
struct list_head *tmp;
long msg_counter = 0;

msg = ERR_PTR(-EACCES);
if (ipcperms(ns, &msq->q_perm, S_IRUGO))
goto out_unlock;

msg = ERR_PTR(-EAGAIN);
tmp = msq->q_messages.next;
while (tmp != &msq->q_messages) {
struct msg_msg *walk_msg;

walk_msg = list_entry(tmp, struct msg_msg, m_list);
if (testmsg(walk_msg, msgtyp, mode) &&
!security_msg_queue_msgrcv(msq, walk_msg, current,
msgtyp, mode)) {

msg = walk_msg;
if (mode == SEARCH_LESSEQUAL &&
walk_msg->m_type != 1) {
msgtyp = walk_msg->m_type - 1;
} else if (msgflg & MSG_COPY) {
if (copy_number == msg_counter) {
/*
* Found requested message.
* Copy it.
*/
msg = copy_msg(msg, copy);
if (IS_ERR(msg))
goto out_unlock;
break;
}
msg = ERR_PTR(-EAGAIN);
} else
break;
msg_counter++;
}
tmp = tmp->next;
}
msg = find_msg(msq, &msgtyp, mode);

if (!IS_ERR(msg)) {
/*
* Found a suitable message.
Expand All @@ -892,8 +866,10 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp,
* If we are copying, then do not unlink message and do
* not update queue parameters.
*/
if (msgflg & MSG_COPY)
if (msgflg & MSG_COPY) {
msg = copy_msg(msg, copy);
goto out_unlock;
}
list_del(&msg->m_list);
msq->q_qnum--;
msq->q_rtime = get_seconds();
Expand Down
Loading

0 comments on commit 823e75f

Please sign in to comment.