Skip to content

Commit

Permalink
---
Browse files Browse the repository at this point in the history
yaml
---
r: 23762
b: refs/heads/master
c: c03872f
h: refs/heads/master
v: v3
  • Loading branch information
Kurt Hackel authored and Mark Fasheh committed Mar 24, 2006
1 parent 33790b5 commit 6de667e
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 20 deletions.
2 changes: 1 addition & 1 deletion [refs]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
---
refs/heads/master: 9c6510a5bfe2f1c5f5b93386c06954be02e974e4
refs/heads/master: c03872f5f50bc10f2a1a485f08879a8d01bcfe49
6 changes: 6 additions & 0 deletions trunk/fs/ocfs2/dlm/dlmcommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ void dlm_complete_thread(struct dlm_ctxt *dlm);
int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout);

Expand Down Expand Up @@ -762,6 +763,11 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
u8 nodenum, u8 *real_master);
int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res, u8 *real_master);


int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
Expand Down
14 changes: 12 additions & 2 deletions trunk/fs/ocfs2/dlm/dlmlock.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,23 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
res->lockname.len)) {
kick_thread = 1;
call_ast = 1;
} else {
mlog(0, "%s: returning DLM_NORMAL to "
"node %u for reco lock\n", dlm->name,
lock->ml.node);
}
} else {
/* for NOQUEUE request, unless we get the
* lock right away, return DLM_NOTQUEUED */
if (flags & LKM_NOQUEUE)
if (flags & LKM_NOQUEUE) {
status = DLM_NOTQUEUED;
else {
if (dlm_is_recovery_lock(res->lockname.name,
res->lockname.len)) {
mlog(0, "%s: returning NOTQUEUED to "
"node %u for reco lock\n", dlm->name,
lock->ml.node);
}
} else {
dlm_lock_get(lock);
list_add_tail(&lock->list, &res->blocked);
kick_thread = 1;
Expand Down
103 changes: 103 additions & 0 deletions trunk/fs/ocfs2/dlm/dlmmaster.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 target);
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);


int dlm_is_host_down(int errno)
Expand Down Expand Up @@ -677,6 +679,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
struct dlm_node_iter iter;
unsigned int namelen;
int tries = 0;
int bit, wait_on_recovery = 0;

BUG_ON(!lockid);

Expand Down Expand Up @@ -762,6 +765,18 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
set_bit(dlm->node_num, mle->maybe_map);
list_add(&mle->list, &dlm->master_list);

/* still holding the dlm spinlock, check the recovery map
* to see if there are any nodes that still need to be
* considered. these will not appear in the mle nodemap
* but they might own this lockres. wait on them. */
bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
if (bit < O2NM_MAX_NODES) {
mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
"recover before lock mastery can begin\n",
dlm->name, namelen, (char *)lockid, bit);
wait_on_recovery = 1;
}
}

/* at this point there is either a DLM_MLE_BLOCK or a
Expand All @@ -779,6 +794,39 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
spin_unlock(&dlm->master_lock);
spin_unlock(&dlm->spinlock);

while (wait_on_recovery) {
/* any cluster changes that occurred after dropping the
* dlm spinlock would be detectable be a change on the mle,
* so we only need to clear out the recovery map once. */
if (dlm_is_recovery_lock(lockid, namelen)) {
mlog(ML_NOTICE, "%s: recovery map is not empty, but "
"must master $RECOVERY lock now\n", dlm->name);
if (!dlm_pre_master_reco_lockres(dlm, res))
wait_on_recovery = 0;
else {
mlog(0, "%s: waiting 500ms for heartbeat state "
"change\n", dlm->name);
msleep(500);
}
continue;
}

dlm_kick_recovery_thread(dlm);
msleep(100);
dlm_wait_for_recovery(dlm);

spin_lock(&dlm->spinlock);
bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
if (bit < O2NM_MAX_NODES) {
mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
"recover before lock mastery can begin\n",
dlm->name, namelen, (char *)lockid, bit);
wait_on_recovery = 1;
} else
wait_on_recovery = 0;
spin_unlock(&dlm->spinlock);
}

/* must wait for lock to be mastered elsewhere */
if (blocked)
goto wait;
Expand Down Expand Up @@ -1835,6 +1883,61 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
mlog(0, "finished with dlm_assert_master_worker\n");
}

/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
* We cannot wait for node recovery to complete to begin mastering this
* lockres because this lockres is used to kick off recovery! ;-)
* So, do a pre-check on all living nodes to see if any of those nodes
* think that $RECOVERY is currently mastered by a dead node. If so,
* we wait a short time to allow that node to get notified by its own
* heartbeat stack, then check again. All $RECOVERY lock resources
* mastered by dead nodes are purged when the hearbeat callback is
* fired, so we can know for sure that it is safe to continue once
* the node returns a live node or no node. */
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res)
{
struct dlm_node_iter iter;
int nodenum;
int ret = 0;
u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;

spin_lock(&dlm->spinlock);
dlm_node_iter_init(dlm->domain_map, &iter);
spin_unlock(&dlm->spinlock);

while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
/* do not send to self */
if (nodenum == dlm->node_num)
continue;
ret = dlm_do_master_requery(dlm, res, nodenum, &master);
if (ret < 0) {
mlog_errno(ret);
if (!dlm_is_host_down(ret))
BUG();
/* host is down, so answer for that node would be
* DLM_LOCK_RES_OWNER_UNKNOWN. continue. */
}

if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
/* check to see if this master is in the recovery map */
spin_lock(&dlm->spinlock);
if (test_bit(master, dlm->recovery_map)) {
mlog(ML_NOTICE, "%s: node %u has not seen "
"node %u go down yet, and thinks the "
"dead node is mastering the recovery "
"lock. must wait.\n", dlm->name,
nodenum, master);
ret = -EAGAIN;
}
spin_unlock(&dlm->spinlock);
mlog(0, "%s: reco lock master is %u\n", dlm->name,
master);
break;
}
}
return ret;
}


/*
* DLM_MIGRATE_LOCKRES
Expand Down
38 changes: 21 additions & 17 deletions trunk/fs/ocfs2/dlm/dlmrecovery.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node);
static int dlm_recovery_thread(void *data);
void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
static int dlm_do_recovery(struct dlm_ctxt *dlm);

static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
Expand All @@ -78,15 +78,9 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
u8 send_to,
struct dlm_lock_resource *res,
int total_locks);
static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 *real_master);
static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
struct dlm_migratable_lockres *mres);
static int dlm_do_master_requery(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 nodenum, u8 *real_master);
static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
u8 dead_node, u8 send_to);
Expand Down Expand Up @@ -165,7 +159,7 @@ void dlm_dispatch_work(void *data)
* RECOVERY THREAD
*/

static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
{
/* wake the recovery thread
* this will wake the reco thread in one of three places
Expand Down Expand Up @@ -1316,9 +1310,8 @@ static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data)



static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 *real_master)
int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res, u8 *real_master)
{
struct dlm_node_iter iter;
int nodenum;
Expand Down Expand Up @@ -1360,8 +1353,10 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
if (ret < 0) {
mlog_errno(ret);
BUG();
/* TODO: need to figure a way to restart this */
if (!dlm_is_host_down(ret))
BUG();
/* host is down, so answer for that node would be
* DLM_LOCK_RES_OWNER_UNKNOWN. continue. */
}
if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
mlog(0, "lock master is %u\n", *real_master);
Expand All @@ -1372,9 +1367,8 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
}


static int dlm_do_master_requery(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 nodenum, u8 *real_master)
int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
u8 nodenum, u8 *real_master)
{
int ret = -EINVAL;
struct dlm_master_requery req;
Expand Down Expand Up @@ -1739,6 +1733,13 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
} else
continue;

if (!list_empty(&res->recovering)) {
mlog(0, "%s:%.*s: lockres was "
"marked RECOVERING, owner=%u\n",
dlm->name, res->lockname.len,
res->lockname.name, res->owner);
list_del_init(&res->recovering);
}
spin_lock(&res->spinlock);
dlm_change_lockres_owner(dlm, res, new_master);
res->state &= ~DLM_LOCK_RES_RECOVERING;
Expand Down Expand Up @@ -2258,7 +2259,10 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
mlog(0, "%u not in domain/live_nodes map "
"so setting it in reco map manually\n",
br->dead_node);
set_bit(br->dead_node, dlm->recovery_map);
/* force the recovery cleanup in __dlm_hb_node_down
* both of these will be cleared in a moment */
set_bit(br->dead_node, dlm->domain_map);
set_bit(br->dead_node, dlm->live_nodes_map);
__dlm_hb_node_down(dlm, br->dead_node);
}
spin_unlock(&dlm->spinlock);
Expand Down

0 comments on commit 6de667e

Please sign in to comment.