Skip to content

Commit

Permalink
[PATCH] ocfs2: fix hang in dlm lock resource mastery
Browse files (browse the repository at this point in the history)
This patch fixes hangs in lock mastery that are related to refcounting on the mle structure.

Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
  • Loading branch information
Kurt Hackel authored and Mark Fasheh committed Mar 24, 2006
1 parent a74e1f0 commit 9c6510a
Showing 1 changed file with 92 additions and 32 deletions.
124 changes: 92 additions & 32 deletions fs/ocfs2/dlm/dlmmaster.c
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,15 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
mlog_errno(ret);
if (mle->master != O2NM_MAX_NODES) {
/* found a master ! */
break;
if (mle->master <= nodenum)
break;
/* if our master request has not reached the master
* yet, keep going until it does. this is how the
* master will know that asserts are needed back to
* the lower nodes. */
mlog(0, "%s:%.*s: requests only up to %u but master "
"is %u, keep going\n", dlm->name, namelen,
lockid, nodenum, mle->master);
}
}

Expand Down Expand Up @@ -860,7 +868,19 @@ static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
/* check if another node has already become the owner */
spin_lock(&res->spinlock);
if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
res->lockname.len, res->lockname.name, res->owner);
spin_unlock(&res->spinlock);
/* this will cause the master to re-assert across
* the whole cluster, freeing up mles */
ret = dlm_do_master_request(mle, res->owner);
if (ret < 0) {
/* give recovery a chance to run */
mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
msleep(500);
goto recheck;
}
ret = 0;
goto leave;
}
spin_unlock(&res->spinlock);
Expand Down Expand Up @@ -1244,13 +1264,14 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
{
u8 response = DLM_MASTER_RESP_MAYBE;
struct dlm_ctxt *dlm = data;
struct dlm_lock_resource *res;
struct dlm_lock_resource *res = NULL;
struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
char *name;
unsigned int namelen;
int found, ret;
int set_maybe;
int dispatch_assert = 0;

if (!dlm_grab(dlm))
return DLM_MASTER_RESP_NO;
Expand Down Expand Up @@ -1287,7 +1308,6 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
}

if (res->owner == dlm->node_num) {
u32 flags = DLM_ASSERT_MASTER_MLE_CLEANUP;
spin_unlock(&res->spinlock);
// mlog(0, "this node is the master\n");
response = DLM_MASTER_RESP_YES;
Expand All @@ -1300,16 +1320,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
* caused all nodes up to this one to
* create mles. this node now needs to
* go back and clean those up. */
mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
dlm->node_num, res->lockname.len, res->lockname.name);
ret = dlm_dispatch_assert_master(dlm, res, 1,
request->node_idx,
flags);
if (ret < 0) {
mlog(ML_ERROR, "failed to dispatch assert "
"master work\n");
response = DLM_MASTER_RESP_ERROR;
}
dispatch_assert = 1;
goto send_response;
} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
spin_unlock(&res->spinlock);
Expand Down Expand Up @@ -1357,9 +1368,13 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
}
} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
set_maybe = 0;
if (tmpmle->master == dlm->node_num)
if (tmpmle->master == dlm->node_num) {
response = DLM_MASTER_RESP_YES;
else
/* this node will be the owner.
* go back and clean the mles on any
* other nodes */
dispatch_assert = 1;
} else
response = DLM_MASTER_RESP_NO;
} else {
// mlog(0, "this node is attempting to "
Expand Down Expand Up @@ -1398,8 +1413,8 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
mle = (struct dlm_master_list_entry *)
kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
if (!mle) {
// bad bad bad... this sucks.
response = DLM_MASTER_RESP_ERROR;
mlog_errno(-ENOMEM);
goto send_response;
}
spin_lock(&dlm->spinlock);
Expand All @@ -1418,25 +1433,19 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
// mlog(0, "mle was found\n");
set_maybe = 1;
spin_lock(&tmpmle->spinlock);
if (tmpmle->master == dlm->node_num) {
mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
BUG();
}
if (tmpmle->type == DLM_MLE_BLOCK)
response = DLM_MASTER_RESP_NO;
else if (tmpmle->type == DLM_MLE_MIGRATION) {
mlog(0, "migration mle was found (%u->%u)\n",
tmpmle->master, tmpmle->new_master);
if (tmpmle->master == dlm->node_num) {
mlog(ML_ERROR, "no lockres, but migration mle "
"says that this node is master!\n");
BUG();
}
/* real master can respond on its own */
response = DLM_MASTER_RESP_NO;
} else {
if (tmpmle->master == dlm->node_num) {
response = DLM_MASTER_RESP_YES;
set_maybe = 0;
} else
response = DLM_MASTER_RESP_MAYBE;
}
} else
response = DLM_MASTER_RESP_MAYBE;
if (set_maybe)
set_bit(request->node_idx, tmpmle->maybe_map);
spin_unlock(&tmpmle->spinlock);
Expand All @@ -1449,6 +1458,24 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
dlm_put_mle(tmpmle);
}
send_response:

if (dispatch_assert) {
if (response != DLM_MASTER_RESP_YES)
mlog(ML_ERROR, "invalid response %d\n", response);
if (!res) {
mlog(ML_ERROR, "bad lockres while trying to assert!\n");
BUG();
}
mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
dlm->node_num, res->lockname.len, res->lockname.name);
ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
DLM_ASSERT_MASTER_MLE_CLEANUP);
if (ret < 0) {
mlog(ML_ERROR, "failed to dispatch assert master work\n");
response = DLM_MASTER_RESP_ERROR;
}
}

dlm_put(dlm);
return response;
}
Expand All @@ -1471,8 +1498,11 @@ static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
int to, tmpret;
struct dlm_node_iter iter;
int ret = 0;
int reassert;

BUG_ON(namelen > O2NM_MAX_NAME_LEN);
again:
reassert = 0;

/* note that if this nodemap is empty, it returns 0 */
dlm_node_iter_init(nodemap, &iter);
Expand Down Expand Up @@ -1504,9 +1534,17 @@ static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
"got %d.\n", namelen, lockname, to, r);
dlm_dump_lock_resources(dlm);
BUG();
} else if (r == EAGAIN) {
mlog(0, "%.*s: node %u create mles on other "
"nodes and requests a re-assert\n",
namelen, lockname, to);
reassert = 1;
}
}

if (reassert)
goto again;

return ret;
}

Expand All @@ -1528,6 +1566,8 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
char *name;
unsigned int namelen;
u32 flags;
int master_request = 0;
int ret = 0;

if (!dlm_grab(dlm))
return 0;
Expand Down Expand Up @@ -1642,11 +1682,22 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
// mlog(0, "woo! got an assert_master from node %u!\n",
// assert->node_idx);
if (mle) {
int extra_ref;
int extra_ref = 0;
int nn = -1;

spin_lock(&mle->spinlock);
extra_ref = !!(mle->type == DLM_MLE_BLOCK
|| mle->type == DLM_MLE_MIGRATION);
if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
extra_ref = 1;
else {
/* MASTER mle: if any bits set in the response map
* then the calling node needs to re-assert to clear
* up nodes that this node contacted */
while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
nn+1)) < O2NM_MAX_NODES) {
if (nn != dlm->node_num && nn != assert->node_idx)
master_request = 1;
}
}
mle->master = assert->node_idx;
atomic_set(&mle->woken, 1);
wake_up(&mle->wq);
Expand Down Expand Up @@ -1677,10 +1728,15 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
}

done:
ret = 0;
if (res)
dlm_lockres_put(res);
dlm_put(dlm);
return 0;
if (master_request) {
mlog(0, "need to tell master to reassert\n");
ret = EAGAIN; // positive. negative would shoot down the node.
}
return ret;

kill:
/* kill the caller! */
Expand Down Expand Up @@ -1713,6 +1769,10 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
item->u.am.request_from = request_from;
item->u.am.flags = flags;

if (ignore_higher)
mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
res->lockname.name);

spin_lock(&dlm->work_lock);
list_add_tail(&item->list, &dlm->work_list);
spin_unlock(&dlm->work_lock);
Expand Down

0 comments on commit 9c6510a

Please sign in to comment.