Skip to content

Commit

Permalink
[DLM] block dlm_recv in recovery transition
Browse files Browse the repository at this point in the history
Introduce a per-lockspace rwsem that's held in read mode by dlm_recv
threads while working in the dlm.  This allows dlm_recv activity to be
suspended when the lockspace transitions to, from and between recovery
cycles.

The specific bug prompting this change is one where an in-progress
recovery cycle is aborted by a new recovery cycle.  While dlm_recv was
processing a recovery message, the recovery cycle was aborted and
dlm_recoverd began cleaning up.  dlm_recv decremented recover_locks_count
on an rsb after dlm_recoverd had reset it to zero.  This is fixed by
suspending dlm_recv (taking write lock on the rwsem) before aborting the
current recovery.

The transitions to/from normal and recovery modes are simplified by using
this new ability to block dlm_recv.  The switch from normal to recovery
mode means dlm_recv goes from processing locking messages, to saving them
for later, and vice versa.  Races are avoided by blocking dlm_recv when
setting the flag that switches between modes.

Signed-off-by: David Teigland <teigland@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
  • Loading branch information
David Teigland authored and Steven Whitehouse committed Oct 10, 2007
1 parent b434eda commit c36258b
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 152 deletions.
1 change: 1 addition & 0 deletions fs/dlm/dlm_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ struct dlm_ls {
uint64_t ls_recover_seq;
struct dlm_recover *ls_recover_args;
struct rw_semaphore ls_in_recovery; /* block local requests */
struct rw_semaphore ls_recv_active; /* block dlm_recv */
struct list_head ls_requestqueue;/* queue remote requests */
struct mutex ls_requestqueue_mutex;
char *ls_recover_buf;
Expand Down
136 changes: 81 additions & 55 deletions fs/dlm/lock.c
Original file line number Diff line number Diff line change
Expand Up @@ -3638,55 +3638,8 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
dlm_put_lkb(lkb);
}

int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_message *ms = (struct dlm_message *) hd;
struct dlm_ls *ls;
int error = 0;

if (!recovery)
dlm_message_in(ms);

ls = dlm_find_lockspace_global(hd->h_lockspace);
if (!ls) {
log_print("drop message %d from %d for unknown lockspace %d",
ms->m_type, nodeid, hd->h_lockspace);
return -EINVAL;
}

/* recovery may have just ended leaving a bunch of backed-up requests
in the requestqueue; wait while dlm_recoverd clears them */

if (!recovery)
dlm_wait_requestqueue(ls);

/* recovery may have just started while there were a bunch of
in-flight requests -- save them in requestqueue to be processed
after recovery. we can't let dlm_recvd block on the recovery
lock. if dlm_recoverd is calling this function to clear the
requestqueue, it needs to be interrupted (-EINTR) if another
recovery operation is starting. */

while (1) {
if (dlm_locking_stopped(ls)) {
if (recovery) {
error = -EINTR;
goto out;
}
error = dlm_add_requestqueue(ls, nodeid, hd);
if (error == -EAGAIN)
continue;
else {
error = -EINTR;
goto out;
}
}

if (dlm_lock_recovery_try(ls))
break;
schedule();
}

switch (ms->m_type) {

/* messages sent to a master node */
Expand Down Expand Up @@ -3761,17 +3714,90 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
log_error(ls, "unknown message type %d", ms->m_type);
}

dlm_unlock_recovery(ls);
out:
dlm_put_lockspace(ls);
dlm_astd_wake();
return error;
}

/* If the lockspace is in recovery mode (locking stopped), then normal
messages are saved on the requestqueue for processing after recovery is
done. When not in recovery mode, we wait for dlm_recoverd to drain saved
messages off the requestqueue before we process new ones. This occurs right
after recovery completes when we transition from saving all messages on
requestqueue, to processing all the saved messages, to processing new
messages as they arrive. */

/*
* Recovery related
*/
static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
int nodeid)
{
if (dlm_locking_stopped(ls)) {
dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms);
} else {
dlm_wait_requestqueue(ls);
_receive_message(ls, ms);
}
}

/* This is called by dlm_recoverd to process messages that were saved on
the requestqueue. */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
_receive_message(ls, ms);
}

/* This is called by the midcomms layer when something is received for
the lockspace. It could be either a MSG (normal message sent as part of
standard locking activity) or an RCOM (recovery message sent as part of
lockspace recovery). */

void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
{
struct dlm_message *ms = (struct dlm_message *) hd;
struct dlm_rcom *rc = (struct dlm_rcom *) hd;
struct dlm_ls *ls;
int type = 0;

switch (hd->h_cmd) {
case DLM_MSG:
dlm_message_in(ms);
type = ms->m_type;
break;
case DLM_RCOM:
dlm_rcom_in(rc);
type = rc->rc_type;
break;
default:
log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
return;
}

if (hd->h_nodeid != nodeid) {
log_print("invalid h_nodeid %d from %d lockspace %x",
hd->h_nodeid, nodeid, hd->h_lockspace);
return;
}

ls = dlm_find_lockspace_global(hd->h_lockspace);
if (!ls) {
log_print("invalid h_lockspace %x from %d cmd %d type %d",
hd->h_lockspace, nodeid, hd->h_cmd, type);

if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
dlm_send_ls_not_ready(nodeid, rc);
return;
}

/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
be inactive (in this ls) before transitioning to recovery mode */

down_read(&ls->ls_recv_active);
if (hd->h_cmd == DLM_MSG)
dlm_receive_message(ls, ms, nodeid);
else
dlm_receive_rcom(ls, rc, nodeid);
up_read(&ls->ls_recv_active);

dlm_put_lockspace(ls);
}

static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
Expand Down
3 changes: 2 additions & 1 deletion fs/dlm/lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
void dlm_print_rsb(struct dlm_rsb *r);
void dlm_dump_rsb(struct dlm_rsb *r);
void dlm_print_lkb(struct dlm_lkb *lkb);
int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery);
void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms);
void dlm_receive_buffer(struct dlm_header *hd, int nodeid);
int dlm_modes_compat(int mode1, int mode2);
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
unsigned int flags, struct dlm_rsb **r_ret);
Expand Down
1 change: 1 addition & 0 deletions fs/dlm/lockspace.c
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
ls->ls_recover_seq = 0;
ls->ls_recover_args = NULL;
init_rwsem(&ls->ls_in_recovery);
init_rwsem(&ls->ls_recv_active);
INIT_LIST_HEAD(&ls->ls_requestqueue);
mutex_init(&ls->ls_requestqueue_mutex);
mutex_init(&ls->ls_clear_proc_locks);
Expand Down
41 changes: 27 additions & 14 deletions fs/dlm/member.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@
#include "rcom.h"
#include "config.h"

/*
* Following called by dlm_recoverd thread
*/

static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
{
struct dlm_member *memb = NULL;
Expand Down Expand Up @@ -250,18 +246,30 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
return error;
}

/*
* Following called from lockspace.c
*/
/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
dlm_ls_start() is called on any of them to start the new recovery. */

int dlm_ls_stop(struct dlm_ls *ls)
{
int new;

/*
* A stop cancels any recovery that's in progress (see RECOVERY_STOP,
* dlm_recovery_stopped()) and prevents any new locks from being
* processed (see RUNNING, dlm_locking_stopped()).
* Prevent dlm_recv from being in the middle of something when we do
* the stop. This includes ensuring dlm_recv isn't processing a
* recovery message (rcom), while dlm_recoverd is aborting and
* resetting things from an in-progress recovery. i.e. we want
* dlm_recoverd to abort its recovery without worrying about dlm_recv
* processing an rcom at the same time. Stopping dlm_recv also makes
* it easy for dlm_receive_message() to check locking stopped and add a
* message to the requestqueue without races.
*/

down_write(&ls->ls_recv_active);

/*
* Abort any recovery that's in progress (see RECOVERY_STOP,
* dlm_recovery_stopped()) and tell any other threads running in the
* dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
*/

spin_lock(&ls->ls_recover_lock);
Expand All @@ -270,9 +278,15 @@ int dlm_ls_stop(struct dlm_ls *ls)
ls->ls_recover_seq++;
spin_unlock(&ls->ls_recover_lock);

/*
* Let dlm_recv run again, now any normal messages will be saved on the
* requestqueue for later.
*/

up_write(&ls->ls_recv_active);

/*
* This in_recovery lock does two things:
*
* 1) Keeps this function from returning until all threads are out
* of locking routines and locking is truely stopped.
* 2) Keeps any new requests from being processed until it's unlocked
Expand All @@ -284,9 +298,8 @@ int dlm_ls_stop(struct dlm_ls *ls)

/*
* The recoverd suspend/resume makes sure that dlm_recoverd (if
* running) has noticed the clearing of RUNNING above and quit
* processing the previous recovery. This will be true for all nodes
* before any nodes start the new recovery.
* running) has noticed RECOVERY_STOP above and quit processing the
* previous recovery.
*/

dlm_recoverd_suspend(ls);
Expand Down
17 changes: 2 additions & 15 deletions fs/dlm/midcomms.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
Expand All @@ -27,7 +27,6 @@
#include "dlm_internal.h"
#include "lowcomms.h"
#include "config.h"
#include "rcom.h"
#include "lock.h"
#include "midcomms.h"

Expand Down Expand Up @@ -117,19 +116,7 @@ int dlm_process_incoming_buffer(int nodeid, const void *base,
offset &= (limit - 1);
len -= msglen;

switch (msg->h_cmd) {
case DLM_MSG:
dlm_receive_message(msg, nodeid, 0);
break;

case DLM_RCOM:
dlm_receive_rcom(msg, nodeid);
break;

default:
log_print("unknown msg type %x from %u: %u %u %u %u",
msg->h_cmd, nodeid, msglen, len, offset, ret);
}
dlm_receive_buffer(msg, nodeid);
}

if (msg != (struct dlm_header *) __tmp)
Expand Down
36 changes: 8 additions & 28 deletions fs/dlm/rcom.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
Expand Down Expand Up @@ -386,7 +386,10 @@ static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
dlm_recover_process_copy(ls, rc_in);
}

static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
/* If the lockspace doesn't exist then still send a status message
back; it's possible that it just doesn't have its global_id yet. */

int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
{
struct dlm_rcom *rc;
struct rcom_config *rf;
Expand Down Expand Up @@ -446,28 +449,11 @@ static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
return rv;
}

/* Called by dlm_recvd; corresponds to dlm_receive_message() but special
/* Called by dlm_recv; corresponds to dlm_receive_message() but special
recovery-only comms are sent through here. */

void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
{
struct dlm_rcom *rc = (struct dlm_rcom *) hd;
struct dlm_ls *ls;

dlm_rcom_in(rc);

/* If the lockspace doesn't exist then still send a status message
back; it's possible that it just doesn't have its global_id yet. */

ls = dlm_find_lockspace_global(hd->h_lockspace);
if (!ls) {
log_print("lockspace %x from %d type %x not found",
hd->h_lockspace, nodeid, rc->rc_type);
if (rc->rc_type == DLM_RCOM_STATUS)
send_ls_not_ready(nodeid, rc);
return;
}

if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
log_debug(ls, "ignoring recovery message %x from %d",
rc->rc_type, nodeid);
Expand All @@ -477,12 +463,6 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
if (is_old_reply(ls, rc))
goto out;

if (nodeid != rc->rc_header.h_nodeid) {
log_error(ls, "bad rcom nodeid %d from %d",
rc->rc_header.h_nodeid, nodeid);
goto out;
}

switch (rc->rc_type) {
case DLM_RCOM_STATUS:
receive_rcom_status(ls, rc);
Expand Down Expand Up @@ -520,6 +500,6 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
DLM_ASSERT(0, printk("rc_type=%x\n", rc->rc_type););
}
out:
dlm_put_lockspace(ls);
return;
}

5 changes: 3 additions & 2 deletions fs/dlm/rcom.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
Expand All @@ -18,7 +18,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid);
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
void dlm_receive_rcom(struct dlm_header *hd, int nodeid);
void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid);
int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in);

#endif

Loading

0 comments on commit c36258b

Please sign in to comment.