Skip to content

Commit

Permalink
ocfs2_dlm: Ensure correct ordering of set/clear refmap bit on lockres
Browse files Browse the repository at this point in the history
Even though the set refmap bit message is sent before the clear refmap
message, currently there is no guarantee that the set message will be
handled before the clear. This patch prevents the clear refmap message
from being processed while the node is sending assert master messages
to other nodes. (The set refmap message is sent as a response to the
assert master request.)

Signed-off-by: Sunil Mushran <sunil.mushran@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
  • Loading branch information
Sunil Mushran authored and Mark Fasheh committed Feb 7, 2007
1 parent ab81afd commit f3f8546
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 11 deletions.
6 changes: 6 additions & 0 deletions fs/ocfs2/dlm/dlmcommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ struct dlm_assert_master_priv
unsigned ignore_higher:1;
};

/*
 * Payload for a deferred deref (clear-refmap) request.  When a
 * DEREF message arrives while DLM_LOCK_RES_SETREF_INPROG is set on
 * the lockres, the handler queues a work item carrying this data so
 * the refmap bit is cleared only after the in-progress set completes.
 */
struct dlm_deref_lockres_priv
{
struct dlm_lock_resource *deref_res;	/* lockres to deref; the handler's ref is transferred to the worker, which drops it */
u8 deref_node;				/* node number whose refmap bit should be cleared */
};

struct dlm_work_item
{
Expand All @@ -191,6 +196,7 @@ struct dlm_work_item
struct dlm_request_all_locks_priv ral;
struct dlm_mig_lockres_priv ml;
struct dlm_assert_master_priv am;
struct dlm_deref_lockres_priv dl;
} u;
};

Expand Down
99 changes: 88 additions & 11 deletions fs/ocfs2/dlm/dlmmaster.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
static int dlm_do_assert_master(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
void *nodemap, u32 flags);
static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);

static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
struct dlm_master_list_entry *mle,
Expand Down Expand Up @@ -1717,6 +1718,11 @@ int dlm_do_assert_master(struct dlm_ctxt *dlm,
unsigned int namelen = res->lockname.len;

BUG_ON(namelen > O2NM_MAX_NAME_LEN);

spin_lock(&res->spinlock);
res->state |= DLM_LOCK_RES_SETREF_INPROG;
spin_unlock(&res->spinlock);

again:
reassert = 0;

Expand Down Expand Up @@ -1789,6 +1795,11 @@ int dlm_do_assert_master(struct dlm_ctxt *dlm,
if (reassert)
goto again;

spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
spin_unlock(&res->spinlock);
wake_up(&res->wq);

return ret;
}

Expand Down Expand Up @@ -2296,6 +2307,9 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
int ret = -EINVAL;
u8 node;
unsigned int hash;
struct dlm_work_item *item;
int cleared = 0;
int dispatch = 0;

if (!dlm_grab(dlm))
return 0;
Expand Down Expand Up @@ -2326,27 +2340,90 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
spin_unlock(&dlm->spinlock);

spin_lock(&res->spinlock);
BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
if (test_bit(node, res->refmap)) {
ret = 0;
dlm_lockres_clear_refmap_bit(node, res);
} else {
mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
"but it is already dropped!\n", dlm->name, namelen,
name, node);
__dlm_print_one_lock_resource(res);
if (res->state & DLM_LOCK_RES_SETREF_INPROG)
dispatch = 1;
else {
BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
if (test_bit(node, res->refmap)) {
dlm_lockres_clear_refmap_bit(node, res);
cleared = 1;
}
}
spin_unlock(&res->spinlock);

if (!ret)
dlm_lockres_calc_usage(dlm, res);
if (!dispatch) {
if (cleared)
dlm_lockres_calc_usage(dlm, res);
else {
mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
"but it is already dropped!\n", dlm->name,
res->lockname.len, res->lockname.name, node);
__dlm_print_one_lock_resource(res);
}
ret = 0;
goto done;
}

item = kzalloc(sizeof(*item), GFP_NOFS);
if (!item) {
ret = -ENOMEM;
mlog_errno(ret);
goto done;
}

dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
item->u.dl.deref_res = res;
item->u.dl.deref_node = node;

spin_lock(&dlm->work_lock);
list_add_tail(&item->list, &dlm->work_list);
spin_unlock(&dlm->work_lock);

queue_work(dlm->dlm_worker, &dlm->dispatched_work);
return 0;

done:
if (res)
dlm_lockres_put(res);
dlm_put(dlm);

return ret;
}

/*
 * Deferred deref worker: clear @deref_node's bit in @deref_res->refmap
 * once any in-flight assert_master (DLM_LOCK_RES_SETREF_INPROG) has
 * finished.  Queued by dlm_deref_lockres_handler when the DEREF message
 * raced with an assert master; ensures the "set refmap bit" side effect
 * of assert_master is ordered before the clear.
 *
 * Consumes the lockres reference taken by the handler when the work
 * item was queued.
 */
static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
{
struct dlm_ctxt *dlm;
struct dlm_lock_resource *res;
u8 node;
u8 cleared = 0;

dlm = item->dlm;
res = item->u.dl.deref_res;
node = item->u.dl.deref_node;

spin_lock(&res->spinlock);
/* A lockres being dropped must never reach the deferred deref path. */
BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
if (test_bit(node, res->refmap)) {
/*
 * Block until SETREF_INPROG clears so the pending "set" is
 * fully processed before we clear the bit.  NOTE(review):
 * presumably this helper drops and retakes res->spinlock
 * while sleeping (it pairs with the wake_up() in
 * dlm_do_assert_master) -- confirm against its definition.
 */
__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
dlm_lockres_clear_refmap_bit(node, res);
cleared = 1;
}
spin_unlock(&res->spinlock);

if (cleared) {
mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
dlm->name, res->lockname.len, res->lockname.name, node);
/* Refmap changed: re-evaluate whether the lockres can be purged. */
dlm_lockres_calc_usage(dlm, res);
} else {
/* Remote node asked to drop a ref it no longer holds. */
mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
"but it is already dropped!\n", dlm->name,
res->lockname.len, res->lockname.name, node);
__dlm_print_one_lock_resource(res);
}

/* Drop the reference the handler took when dispatching this work. */
dlm_lockres_put(res);
}


/*
* DLM_MIGRATE_LOCKRES
Expand Down

0 comments on commit f3f8546

Please sign in to comment.