Skip to content

Commit

Permalink
---
Browse files Browse the repository at this point in the history
yaml
---
r: 89554
b: refs/heads/master
c: de55124
h: refs/heads/master
v: v3
  • Loading branch information
Joel Becker authored and Mark Fasheh committed Apr 18, 2008
1 parent badfec3 commit c7294c7
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 56 deletions.
2 changes: 1 addition & 1 deletion [refs]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
---
refs/heads/master: 0abd6d1803b01c741430af270026d1d95a103d9c
refs/heads/master: de551246e7bc5558371c3427889a8db1b8cc60f4
199 changes: 171 additions & 28 deletions trunk/fs/ocfs2/dlmglue.c
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,13 @@ static int ocfs2_inode_lock_update(struct inode *inode,
struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
int new_level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
int new_level,
int lvb);
int lvb,
unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
Expand Down Expand Up @@ -736,6 +737,113 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
return needs_downconvert;
}

/*
* OCFS2_LOCK_PENDING and l_pending_gen.
*
* Why does OCFS2_LOCK_PENDING exist? To close a race between setting
* OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock()
* for more details on the race.
*
* OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces
* a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock()
* returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear
* OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns,
* the caller is going to try to clear PENDING again. If nothing else is
* happening, __lockres_clear_pending() sees PENDING is unset and does
* nothing.
*
* But what if another path (eg downconvert thread) has just started a
* new locking action? The other path has re-set PENDING. Our path
* cannot clear PENDING, because that will re-open the original race
* window.
*
* [Example]
*
* ocfs2_meta_lock()
* ocfs2_cluster_lock()
* set BUSY
* set PENDING
* drop l_lock
* ocfs2_dlm_lock()
* ocfs2_locking_ast() ocfs2_downconvert_thread()
* clear PENDING ocfs2_unblock_lock()
* take_l_lock
* !BUSY
* ocfs2_prepare_downconvert()
* set BUSY
* set PENDING
* drop l_lock
* take l_lock
* clear PENDING
* drop l_lock
* <window>
* ocfs2_dlm_lock()
*
* So as you can see, we now have a window where l_lock is not held,
* PENDING is not set, and ocfs2_dlm_lock() has not been called.
*
* The core problem is that ocfs2_cluster_lock() has cleared the PENDING
* set by ocfs2_prepare_downconvert(). That wasn't nice.
*
* To solve this we introduce l_pending_gen. A call to
* lockres_clear_pending() will only do so when it is passed a generation
* number that matches the lockres. lockres_set_pending() will return the
* current generation number. When ocfs2_cluster_lock() goes to clear
* PENDING, it passes the generation it got from set_pending(). In our
* example above, the generation numbers will *not* match. Thus,
* ocfs2_cluster_lock() will not clear the PENDING set by
* ocfs2_prepare_downconvert().
*/

/* Unlocked version for ocfs2_locking_ast() */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
unsigned int generation,
struct ocfs2_super *osb)
{
assert_spin_locked(&lockres->l_lock);

/*
* The ast and locking functions can race us here. The winner
* will clear pending, the loser will not.
*/
if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
(lockres->l_pending_gen != generation))
return;

lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
lockres->l_pending_gen++;

/*
* The downconvert thread may have skipped us because we
* were PENDING. Wake it up.
*/
if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
ocfs2_wake_downconvert_thread(osb);
}

/* Locked version for callers of ocfs2_dlm_lock() */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
unsigned int generation,
struct ocfs2_super *osb)
{
unsigned long flags;

spin_lock_irqsave(&lockres->l_lock, flags);
__lockres_clear_pending(lockres, generation, osb);
spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
assert_spin_locked(&lockres->l_lock);
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

return lockres->l_pending_gen;
}


static void ocfs2_blocking_ast(void *opaque, int level)
{
struct ocfs2_lock_res *lockres = opaque;
Expand Down Expand Up @@ -770,6 +878,7 @@ static void ocfs2_blocking_ast(void *opaque, int level)
static void ocfs2_locking_ast(void *opaque)
{
struct ocfs2_lock_res *lockres = opaque;
struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
unsigned long flags;

spin_lock_irqsave(&lockres->l_lock, flags);
Expand Down Expand Up @@ -805,6 +914,18 @@ static void ocfs2_locking_ast(void *opaque)
* can catch it. */
lockres->l_action = OCFS2_AST_INVALID;

/* Did we try to cancel this lock? Clear that state */
if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

/*
* We may have beaten the locking functions here. We certainly
* know that dlm_lock() has been called :-)
* Because we can't have two lock calls in flight at once, we
* can use lockres->l_pending_gen.
*/
__lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

wake_up(&lockres->l_event);
spin_unlock_irqrestore(&lockres->l_lock, flags);
}
Expand Down Expand Up @@ -838,6 +959,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
{
int ret = 0;
unsigned long flags;
unsigned int gen;

mlog_entry_void();

Expand All @@ -854,6 +976,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
lockres->l_action = OCFS2_AST_ATTACH;
lockres->l_requested = level;
lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
gen = lockres_set_pending(lockres);
spin_unlock_irqrestore(&lockres->l_lock, flags);

ret = ocfs2_dlm_lock(osb->cconn,
Expand All @@ -863,6 +986,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
lockres->l_name,
OCFS2_LOCK_ID_MAX_LEN - 1,
lockres);
lockres_clear_pending(lockres, gen, osb);
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
ocfs2_recover_from_dlm_error(lockres, 1);
Expand Down Expand Up @@ -988,6 +1112,7 @@ static int ocfs2_cluster_lock(struct ocfs2_super *osb,
int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
unsigned long flags;
unsigned int gen;

mlog_entry_void();

Expand Down Expand Up @@ -1046,6 +1171,7 @@ static int ocfs2_cluster_lock(struct ocfs2_super *osb,

lockres->l_requested = level;
lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
gen = lockres_set_pending(lockres);
spin_unlock_irqrestore(&lockres->l_lock, flags);

BUG_ON(level == DLM_LOCK_IV);
Expand All @@ -1062,6 +1188,7 @@ static int ocfs2_cluster_lock(struct ocfs2_super *osb,
lockres->l_name,
OCFS2_LOCK_ID_MAX_LEN - 1,
lockres);
lockres_clear_pending(lockres, gen, osb);
if (ret) {
if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
(ret != -EAGAIN)) {
Expand Down Expand Up @@ -1506,6 +1633,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
void ocfs2_file_unlock(struct file *file)
{
int ret;
unsigned int gen;
unsigned long flags;
struct ocfs2_file_private *fp = file->private_data;
struct ocfs2_lock_res *lockres = &fp->fp_flock;
Expand All @@ -1531,11 +1659,11 @@ void ocfs2_file_unlock(struct file *file)
lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
lockres->l_blocking = DLM_LOCK_EX;

ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
gen = ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
spin_unlock_irqrestore(&lockres->l_lock, flags);

ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0);
ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0, gen);
if (ret) {
mlog_errno(ret);
return;
Expand Down Expand Up @@ -2555,23 +2683,7 @@ static void ocfs2_unlock_ast(void *opaque, int error)
lockres->l_unlock_action);

spin_lock_irqsave(&lockres->l_lock, flags);
/* We tried to cancel a convert request, but it was already
* granted. All we want to do here is clear our unlock
* state. The wake_up call done at the bottom is redundant
* (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
* hurt anything anyway */
if (error == -DLM_ECANCEL &&
lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
mlog(0, "Got cancelgrant for %s\n", lockres->l_name);

/* We don't clear the busy flag in this case as it
* should have been cleared by the ast which the dlm
* has called. */
goto complete_unlock;
}

/* DLM_EUNLOCK is the success code for unlock */
if (error != -DLM_EUNLOCK) {
if (error) {
mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
"unlock_action %d\n", error, lockres->l_name,
lockres->l_unlock_action);
Expand All @@ -2592,7 +2704,6 @@ static void ocfs2_unlock_ast(void *opaque, int error)
}

lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
complete_unlock:
lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
spin_unlock_irqrestore(&lockres->l_lock, flags);

Expand Down Expand Up @@ -2768,8 +2879,8 @@ int ocfs2_drop_inode_locks(struct inode *inode)
return status;
}

static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
int new_level)
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
int new_level)
{
assert_spin_locked(&lockres->l_lock);

Expand All @@ -2787,12 +2898,14 @@ static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
lockres->l_action = OCFS2_AST_DOWNCONVERT;
lockres->l_requested = new_level;
lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
return lockres_set_pending(lockres);
}

static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
int new_level,
int lvb)
int lvb,
unsigned int generation)
{
int ret;
u32 dlm_flags = DLM_LKF_CONVERT;
Expand All @@ -2809,6 +2922,7 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
lockres->l_name,
OCFS2_LOCK_ID_MAX_LEN - 1,
lockres);
lockres_clear_pending(lockres, generation, osb);
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
ocfs2_recover_from_dlm_error(lockres, 1);
Expand Down Expand Up @@ -2883,6 +2997,7 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
int new_level;
int ret = 0;
int set_lvb = 0;
unsigned int gen;

mlog_entry_void();

Expand All @@ -2892,6 +3007,32 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,

recheck:
if (lockres->l_flags & OCFS2_LOCK_BUSY) {
/* XXX
* This is a *big* race. The OCFS2_LOCK_PENDING flag
* exists entirely for one reason - another thread has set
* OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
*
* If we do ocfs2_cancel_convert() before the other thread
* calls dlm_lock(), our cancel will do nothing. We will
* get no ast, and we will have no way of knowing the
* cancel failed. Meanwhile, the other thread will call
* into dlm_lock() and wait...forever.
*
* Why forever? Because another node has asked for the
* lock first; that's why we're here in unblock_lock().
*
* The solution is OCFS2_LOCK_PENDING. When PENDING is
* set, we just requeue the unblock. Only when the other
* thread has called dlm_lock() and cleared PENDING will
* we then cancel their request.
*
* All callers of dlm_lock() must set OCFS2_DLM_PENDING
* at the same time they set OCFS2_DLM_BUSY. They must
* clear OCFS2_DLM_PENDING after dlm_lock() returns.
*/
if (lockres->l_flags & OCFS2_LOCK_PENDING)
goto leave_requeue;

ctl->requeue = 1;
ret = ocfs2_prepare_cancel_convert(osb, lockres);
spin_unlock_irqrestore(&lockres->l_lock, flags);
Expand Down Expand Up @@ -2971,9 +3112,11 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
lockres->l_ops->set_lvb(lockres);
}

ocfs2_prepare_downconvert(lockres, new_level);
gen = ocfs2_prepare_downconvert(lockres, new_level);
spin_unlock_irqrestore(&lockres->l_lock, flags);
ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
gen);

leave:
mlog_exit(ret);
return ret;
Expand Down
4 changes: 4 additions & 0 deletions trunk/fs/ocfs2/ocfs2.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ enum ocfs2_unlock_action {
* dropped. */
#define OCFS2_LOCK_QUEUED (0x00000100) /* queued for downconvert */
#define OCFS2_LOCK_NOCACHE (0x00000200) /* don't use a holder count */
#define OCFS2_LOCK_PENDING (0x00000400) /* This lockres is pending a
call to dlm_lock. Only
exists with BUSY set. */

struct ocfs2_lock_res_ops;

Expand All @@ -124,6 +127,7 @@ struct ocfs2_lock_res {
enum ocfs2_unlock_action l_unlock_action;
int l_requested;
int l_blocking;
unsigned int l_pending_gen;

wait_queue_head_t l_event;

Expand Down
Loading

0 comments on commit c7294c7

Please sign in to comment.