Skip to content

Commit

Permalink
Merge git://git.kernel.org/pub/scm/linux/kernel/git/steve/gfs2-2.6-nmw
Browse files Browse the repository at this point in the history
* git://git.kernel.org/pub/scm/linux/kernel/git/steve/gfs2-2.6-nmw: (51 commits)
  [DLM] block dlm_recv in recovery transition
  [DLM] don't overwrite castparam if it's NULL
  [GFS2] Get superblock a different way
  [GFS2] Don't try to remove buffers that don't exist
  [GFS2] Alternate gfs2_iget to avoid looking up inodes being freed
  [GFS2] Data corruption fix
  [GFS2] Clean up journaled data writing
  [GFS2] GFS2: chmod hung - fix race in thread creation
  [DLM] Make dlm_sendd cond_resched more
  [GFS2] Move inode deletion out of blocking_cb
  [GFS2] flocks from same process trip kernel BUG at fs/gfs2/glock.c:1118!
  [GFS2] Clean up gfs2_trans_add_revoke()
  [GFS2] Use slab operations for all gfs2_bufdata allocations
  [GFS2] Replace revoke structure with bufdata structure
  [GFS2] Fix ordering of dirty/journal for ordered buffer unstuffing
  [GFS2] Clean up ordered write code
  [GFS2] Move pin/unpin into lops.c, clean up locking
  [GFS2] Don't mark jdata dirty in gfs2_unstuffer_page()
  [GFS2] Introduce gfs2_remove_from_ail
  [GFS2] Correct lock ordering in unlink
  ...
  • Loading branch information
Linus Torvalds committed Oct 12, 2007
2 parents 1462222 + c36258b commit f26e51f
Show file tree
Hide file tree
Showing 49 changed files with 1,139 additions and 961 deletions.
1 change: 1 addition & 0 deletions fs/dlm/dlm_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ struct dlm_ls {
uint64_t ls_recover_seq;
struct dlm_recover *ls_recover_args;
struct rw_semaphore ls_in_recovery; /* block local requests */
struct rw_semaphore ls_recv_active; /* block dlm_recv */
struct list_head ls_requestqueue;/* queue remote requests */
struct mutex ls_requestqueue_mutex;
char *ls_recover_buf;
Expand Down
142 changes: 85 additions & 57 deletions fs/dlm/lock.c
Original file line number Diff line number Diff line change
Expand Up @@ -3638,55 +3638,8 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
dlm_put_lkb(lkb);
}

int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_message *ms = (struct dlm_message *) hd;
struct dlm_ls *ls;
int error = 0;

if (!recovery)
dlm_message_in(ms);

ls = dlm_find_lockspace_global(hd->h_lockspace);
if (!ls) {
log_print("drop message %d from %d for unknown lockspace %d",
ms->m_type, nodeid, hd->h_lockspace);
return -EINVAL;
}

/* recovery may have just ended leaving a bunch of backed-up requests
in the requestqueue; wait while dlm_recoverd clears them */

if (!recovery)
dlm_wait_requestqueue(ls);

/* recovery may have just started while there were a bunch of
in-flight requests -- save them in requestqueue to be processed
after recovery. we can't let dlm_recvd block on the recovery
lock. if dlm_recoverd is calling this function to clear the
requestqueue, it needs to be interrupted (-EINTR) if another
recovery operation is starting. */

while (1) {
if (dlm_locking_stopped(ls)) {
if (recovery) {
error = -EINTR;
goto out;
}
error = dlm_add_requestqueue(ls, nodeid, hd);
if (error == -EAGAIN)
continue;
else {
error = -EINTR;
goto out;
}
}

if (dlm_lock_recovery_try(ls))
break;
schedule();
}

switch (ms->m_type) {

/* messages sent to a master node */
Expand Down Expand Up @@ -3761,17 +3714,90 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
log_error(ls, "unknown message type %d", ms->m_type);
}

dlm_unlock_recovery(ls);
out:
dlm_put_lockspace(ls);
dlm_astd_wake();
return error;
}

/* If the lockspace is in recovery mode (locking stopped), then normal
messages are saved on the requestqueue for processing after recovery is
done. When not in recovery mode, we wait for dlm_recoverd to drain saved
messages off the requestqueue before we process new ones. This occurs right
after recovery completes when we transition from saving all messages on
requestqueue, to processing all the saved messages, to processing new
messages as they arrive. */

/*
* Recovery related
*/
static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
int nodeid)
{
if (dlm_locking_stopped(ls)) {
dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms);
} else {
dlm_wait_requestqueue(ls);
_receive_message(ls, ms);
}
}

/* This is called by dlm_recoverd to process messages that were saved on
the requestqueue. */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
_receive_message(ls, ms);
}

/* This is called by the midcomms layer when something is received for
the lockspace. It could be either a MSG (normal message sent as part of
standard locking activity) or an RCOM (recovery message sent as part of
lockspace recovery). */

void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
{
struct dlm_message *ms = (struct dlm_message *) hd;
struct dlm_rcom *rc = (struct dlm_rcom *) hd;
struct dlm_ls *ls;
int type = 0;

switch (hd->h_cmd) {
case DLM_MSG:
dlm_message_in(ms);
type = ms->m_type;
break;
case DLM_RCOM:
dlm_rcom_in(rc);
type = rc->rc_type;
break;
default:
log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
return;
}

if (hd->h_nodeid != nodeid) {
log_print("invalid h_nodeid %d from %d lockspace %x",
hd->h_nodeid, nodeid, hd->h_lockspace);
return;
}

ls = dlm_find_lockspace_global(hd->h_lockspace);
if (!ls) {
log_print("invalid h_lockspace %x from %d cmd %d type %d",
hd->h_lockspace, nodeid, hd->h_cmd, type);

if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
dlm_send_ls_not_ready(nodeid, rc);
return;
}

/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
be inactive (in this ls) before transitioning to recovery mode */

down_read(&ls->ls_recv_active);
if (hd->h_cmd == DLM_MSG)
dlm_receive_message(ls, ms, nodeid);
else
dlm_receive_rcom(ls, rc, nodeid);
up_read(&ls->ls_recv_active);

dlm_put_lockspace(ls);
}

static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
Expand Down Expand Up @@ -4429,7 +4455,8 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,

if (lvb_in && ua->lksb.sb_lvbptr)
memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
ua->castparam = ua_tmp->castparam;
if (ua_tmp->castparam)
ua->castparam = ua_tmp->castparam;
ua->user_lksb = ua_tmp->user_lksb;

error = set_unlock_args(flags, ua, &args);
Expand Down Expand Up @@ -4474,7 +4501,8 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
goto out;

ua = (struct dlm_user_args *)lkb->lkb_astparam;
ua->castparam = ua_tmp->castparam;
if (ua_tmp->castparam)
ua->castparam = ua_tmp->castparam;
ua->user_lksb = ua_tmp->user_lksb;

error = set_unlock_args(flags, ua, &args);
Expand Down
3 changes: 2 additions & 1 deletion fs/dlm/lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
void dlm_print_rsb(struct dlm_rsb *r);
void dlm_dump_rsb(struct dlm_rsb *r);
void dlm_print_lkb(struct dlm_lkb *lkb);
int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery);
void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms);
void dlm_receive_buffer(struct dlm_header *hd, int nodeid);
int dlm_modes_compat(int mode1, int mode2);
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
unsigned int flags, struct dlm_rsb **r_ret);
Expand Down
1 change: 1 addition & 0 deletions fs/dlm/lockspace.c
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
ls->ls_recover_seq = 0;
ls->ls_recover_args = NULL;
init_rwsem(&ls->ls_in_recovery);
init_rwsem(&ls->ls_recv_active);
INIT_LIST_HEAD(&ls->ls_requestqueue);
mutex_init(&ls->ls_requestqueue_mutex);
mutex_init(&ls->ls_clear_proc_locks);
Expand Down
23 changes: 8 additions & 15 deletions fs/dlm/lowcomms.c
Original file line number Diff line number Diff line change
Expand Up @@ -334,18 +334,8 @@ static void close_connection(struct connection *con, bool and_other)
con->rx_page = NULL;
}

/* If we are an 'othercon' then NULL the pointer to us
from the parent and tidy ourself up */
if (test_bit(CF_IS_OTHERCON, &con->flags)) {
struct connection *parent = __nodeid2con(con->nodeid, 0);
parent->othercon = NULL;
kmem_cache_free(con_cache, con);
}
else {
/* Parent connections get reused */
con->retries = 0;
mutex_unlock(&con->sock_mutex);
}
con->retries = 0;
mutex_unlock(&con->sock_mutex);
}

/* We only send shutdown messages to nodes that are not part of the cluster */
Expand Down Expand Up @@ -731,6 +721,8 @@ static int tcp_accept_from_sock(struct connection *con)
INIT_WORK(&othercon->swork, process_send_sockets);
INIT_WORK(&othercon->rwork, process_recv_sockets);
set_bit(CF_IS_OTHERCON, &othercon->flags);
}
if (!othercon->sock) {
newcon->othercon = othercon;
othercon->sock = newsock;
newsock->sk->sk_user_data = othercon;
Expand Down Expand Up @@ -1272,14 +1264,15 @@ static void send_to_sock(struct connection *con)
if (len) {
ret = sendpage(con->sock, e->page, offset, len,
msg_flags);
if (ret == -EAGAIN || ret == 0)
if (ret == -EAGAIN || ret == 0) {
cond_resched();
goto out;
}
if (ret <= 0)
goto send_error;
} else {
}
/* Don't starve people filling buffers */
cond_resched();
}

spin_lock(&con->writequeue_lock);
e->offset += ret;
Expand Down
41 changes: 27 additions & 14 deletions fs/dlm/member.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@
#include "rcom.h"
#include "config.h"

/*
* Following called by dlm_recoverd thread
*/

static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
{
struct dlm_member *memb = NULL;
Expand Down Expand Up @@ -250,18 +246,30 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
return error;
}

/*
* Following called from lockspace.c
*/
/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
dlm_ls_start() is called on any of them to start the new recovery. */

int dlm_ls_stop(struct dlm_ls *ls)
{
int new;

/*
* A stop cancels any recovery that's in progress (see RECOVERY_STOP,
* dlm_recovery_stopped()) and prevents any new locks from being
* processed (see RUNNING, dlm_locking_stopped()).
* Prevent dlm_recv from being in the middle of something when we do
* the stop. This includes ensuring dlm_recv isn't processing a
* recovery message (rcom), while dlm_recoverd is aborting and
* resetting things from an in-progress recovery. i.e. we want
* dlm_recoverd to abort its recovery without worrying about dlm_recv
* processing an rcom at the same time. Stopping dlm_recv also makes
* it easy for dlm_receive_message() to check locking stopped and add a
* message to the requestqueue without races.
*/

down_write(&ls->ls_recv_active);

/*
* Abort any recovery that's in progress (see RECOVERY_STOP,
* dlm_recovery_stopped()) and tell any other threads running in the
* dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
*/

spin_lock(&ls->ls_recover_lock);
Expand All @@ -270,9 +278,15 @@ int dlm_ls_stop(struct dlm_ls *ls)
ls->ls_recover_seq++;
spin_unlock(&ls->ls_recover_lock);

/*
* Let dlm_recv run again, now any normal messages will be saved on the
* requestqueue for later.
*/

up_write(&ls->ls_recv_active);

/*
* This in_recovery lock does two things:
*
* 1) Keeps this function from returning until all threads are out
* of locking routines and locking is truely stopped.
* 2) Keeps any new requests from being processed until it's unlocked
Expand All @@ -284,9 +298,8 @@ int dlm_ls_stop(struct dlm_ls *ls)

/*
* The recoverd suspend/resume makes sure that dlm_recoverd (if
* running) has noticed the clearing of RUNNING above and quit
* processing the previous recovery. This will be true for all nodes
* before any nodes start the new recovery.
* running) has noticed RECOVERY_STOP above and quit processing the
* previous recovery.
*/

dlm_recoverd_suspend(ls);
Expand Down
17 changes: 2 additions & 15 deletions fs/dlm/midcomms.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
Expand All @@ -27,7 +27,6 @@
#include "dlm_internal.h"
#include "lowcomms.h"
#include "config.h"
#include "rcom.h"
#include "lock.h"
#include "midcomms.h"

Expand Down Expand Up @@ -117,19 +116,7 @@ int dlm_process_incoming_buffer(int nodeid, const void *base,
offset &= (limit - 1);
len -= msglen;

switch (msg->h_cmd) {
case DLM_MSG:
dlm_receive_message(msg, nodeid, 0);
break;

case DLM_RCOM:
dlm_receive_rcom(msg, nodeid);
break;

default:
log_print("unknown msg type %x from %u: %u %u %u %u",
msg->h_cmd, nodeid, msglen, len, offset, ret);
}
dlm_receive_buffer(msg, nodeid);
}

if (msg != (struct dlm_header *) __tmp)
Expand Down
Loading

0 comments on commit f26e51f

Please sign in to comment.