Skip to content

Commit

Permalink
ocfs2_dlm: fix cluster-wide refcounting of lock resources
Browse files Browse the repository at this point in the history
This was previously broken and migration of some locks had to be temporarily
disabled. We use a new (and backward-incompatible) set of network messages
to account for all references to a lock resources held across the cluster.
once these are all freed, the master node may then free the lock resource
memory once its local references are dropped.

Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
  • Loading branch information
Kurt Hackel authored and Mark Fasheh committed Feb 7, 2007
1 parent 5331be0 commit ba2bf21
Show file tree
Hide file tree
Showing 8 changed files with 729 additions and 174 deletions.
5 changes: 4 additions & 1 deletion fs/ocfs2/cluster/tcp_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
* locking semantics of the file system using the protocol. It should
* be somewhere else, I'm sure, but right now it isn't.
*
* New in version 6:
* - DLM lockres remote refcount fixes.
*
* New in version 5:
* - Network timeout checking protocol
*
Expand All @@ -51,7 +54,7 @@
* - full 64 bit i_size in the metadata lock lvbs
* - introduction of "rw" lock and pushing meta/data locking down
*/
#define O2NET_PROTOCOL_VERSION 5ULL
#define O2NET_PROTOCOL_VERSION 6ULL
struct o2net_handshake {
__be64 protocol_version;
__be64 connector_id;
Expand Down
75 changes: 71 additions & 4 deletions fs/ocfs2/dlm/dlmcommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ static inline void __dlm_set_joining_node(struct dlm_ctxt *dlm,
#define DLM_LOCK_RES_DIRTY 0x00000008
#define DLM_LOCK_RES_IN_PROGRESS 0x00000010
#define DLM_LOCK_RES_MIGRATING 0x00000020
#define DLM_LOCK_RES_DROPPING_REF 0x00000040

/* max milliseconds to wait to sync up a network failure with a node death */
#define DLM_NODE_DEATH_WAIT_MAX (5 * 1000)
Expand Down Expand Up @@ -265,6 +266,8 @@ struct dlm_lock_resource
u8 owner; //node which owns the lock resource, or unknown
u16 state;
char lvb[DLM_LVB_LEN];
unsigned int inflight_locks;
unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
};

struct dlm_migratable_lock
Expand Down Expand Up @@ -367,7 +370,7 @@ enum {
DLM_CONVERT_LOCK_MSG, /* 504 */
DLM_PROXY_AST_MSG, /* 505 */
DLM_UNLOCK_LOCK_MSG, /* 506 */
DLM_UNUSED_MSG2, /* 507 */
DLM_DEREF_LOCKRES_MSG, /* 507 */
DLM_MIGRATE_REQUEST_MSG, /* 508 */
DLM_MIG_LOCKRES_MSG, /* 509 */
DLM_QUERY_JOIN_MSG, /* 510 */
Expand Down Expand Up @@ -417,6 +420,9 @@ struct dlm_master_request
u8 name[O2NM_MAX_NAME_LEN];
};

#define DLM_ASSERT_RESPONSE_REASSERT 0x00000001
#define DLM_ASSERT_RESPONSE_MASTERY_REF 0x00000002

#define DLM_ASSERT_MASTER_MLE_CLEANUP 0x00000001
#define DLM_ASSERT_MASTER_REQUERY 0x00000002
#define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004
Expand All @@ -430,6 +436,8 @@ struct dlm_assert_master
u8 name[O2NM_MAX_NAME_LEN];
};

#define DLM_MIGRATE_RESPONSE_MASTERY_REF 0x00000001

struct dlm_migrate_request
{
u8 master;
Expand Down Expand Up @@ -648,6 +656,16 @@ struct dlm_finalize_reco
__be32 pad2;
};

struct dlm_deref_lockres
{
u32 pad1;
u16 pad2;
u8 node_idx;
u8 namelen;

u8 name[O2NM_MAX_NAME_LEN];
};

static inline enum dlm_status
__dlm_lockres_state_to_status(struct dlm_lock_resource *res)
{
Expand Down Expand Up @@ -721,8 +739,8 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
void dlm_purge_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *lockres);
int dlm_purge_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *lockres);
static inline void dlm_lockres_get(struct dlm_lock_resource *res)
{
/* This is called on every lookup, so it might be worth
Expand All @@ -733,6 +751,10 @@ void dlm_lockres_put(struct dlm_lock_resource *res);
void __dlm_unhash_lockres(struct dlm_lock_resource *res);
void __dlm_insert_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
const char *name,
unsigned int len,
unsigned int hash);
struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
const char *name,
unsigned int len,
Expand All @@ -753,6 +775,47 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
const char *name,
unsigned int namelen);

#define dlm_lockres_set_refmap_bit(bit,res) \
__dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__)
#define dlm_lockres_clear_refmap_bit(bit,res) \
__dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__)

static inline void __dlm_lockres_set_refmap_bit(int bit,
struct dlm_lock_resource *res,
const char *file,
int line)
{
//printk("%s:%d:%.*s: setting bit %d\n", file, line,
// res->lockname.len, res->lockname.name, bit);
set_bit(bit, res->refmap);
}

static inline void __dlm_lockres_clear_refmap_bit(int bit,
struct dlm_lock_resource *res,
const char *file,
int line)
{
//printk("%s:%d:%.*s: clearing bit %d\n", file, line,
// res->lockname.len, res->lockname.name, bit);
clear_bit(bit, res->refmap);
}

void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
const char *file,
int line);
void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
int new_lockres,
const char *file,
int line);
#define dlm_lockres_drop_inflight_ref(d,r) \
__dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__)
#define dlm_lockres_grab_inflight_ref(d,r) \
__dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__)
#define dlm_lockres_grab_inflight_ref_new(d,r) \
__dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__)

void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
void dlm_do_local_ast(struct dlm_ctxt *dlm,
Expand Down Expand Up @@ -805,6 +868,7 @@ int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
int dlm_migrate_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 target);
int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
int dlm_finish_migration(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 old_master);
Expand All @@ -814,6 +878,7 @@ void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res);

int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data);
Expand Down Expand Up @@ -856,10 +921,12 @@ static inline void __dlm_wait_on_lockres(struct dlm_lock_resource *res)
int dlm_init_mle_cache(void);
void dlm_destroy_mle_cache(void);
void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up);
int dlm_drop_lockres_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
void dlm_clean_master_list(struct dlm_ctxt *dlm,
u8 dead_node);
int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock);

int __dlm_lockres_has_locks(struct dlm_lock_resource *res);
int __dlm_lockres_unused(struct dlm_lock_resource *res);

static inline const char * dlm_lock_mode_name(int mode)
Expand Down
18 changes: 18 additions & 0 deletions fs/ocfs2/dlm/dlmdebug.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,23 @@ void dlm_print_one_lock_resource(struct dlm_lock_resource *res)
spin_unlock(&res->spinlock);
}

static void dlm_print_lockres_refmap(struct dlm_lock_resource *res)
{
int bit;
assert_spin_locked(&res->spinlock);

mlog(ML_NOTICE, " refmap nodes: [ ");
bit = 0;
while (1) {
bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
if (bit >= O2NM_MAX_NODES)
break;
printk("%u ", bit);
bit++;
}
printk("], inflight=%u\n", res->inflight_locks);
}

void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
{
struct list_head *iter2;
Expand All @@ -65,6 +82,7 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
res->owner, res->state);
mlog(ML_NOTICE, " last used: %lu, on purge list: %s\n",
res->last_used, list_empty(&res->purge) ? "no" : "yes");
dlm_print_lockres_refmap(res);
mlog(ML_NOTICE, " granted queue: \n");
list_for_each(iter2, &res->granted) {
lock = list_entry(iter2, struct dlm_lock, list);
Expand Down
117 changes: 87 additions & 30 deletions fs/ocfs2/dlm/dlmdomain.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm,
hlist_add_head(&res->hash_node, bucket);
}

struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
const char *name,
unsigned int len,
unsigned int hash)
struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
const char *name,
unsigned int len,
unsigned int hash)
{
struct hlist_head *bucket;
struct hlist_node *list;
Expand All @@ -154,6 +154,37 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
return NULL;
}

/* intended to be called by functions which do not care about lock
* resources which are being purged (most net _handler functions).
* this will return NULL for any lock resource which is found but
* currently in the process of dropping its mastery reference.
* use __dlm_lookup_lockres_full when you need the lock resource
* regardless (e.g. dlm_get_lock_resource) */
struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
const char *name,
unsigned int len,
unsigned int hash)
{
struct dlm_lock_resource *res = NULL;

mlog_entry("%.*s\n", len, name);

assert_spin_locked(&dlm->spinlock);

res = __dlm_lookup_lockres_full(dlm, name, len, hash);
if (res) {
spin_lock(&res->spinlock);
if (res->state & DLM_LOCK_RES_DROPPING_REF) {
spin_unlock(&res->spinlock);
dlm_lockres_put(res);
return NULL;
}
spin_unlock(&res->spinlock);
}

return res;
}

struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
const char *name,
unsigned int len)
Expand Down Expand Up @@ -330,43 +361,60 @@ static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
wake_up(&dlm_domain_events);
}

static void dlm_migrate_all_locks(struct dlm_ctxt *dlm)
static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
{
int i;
int i, num, n, ret = 0;
struct dlm_lock_resource *res;
struct hlist_node *iter;
struct hlist_head *bucket;
int dropped;

mlog(0, "Migrating locks from domain %s\n", dlm->name);
restart:

num = 0;
spin_lock(&dlm->spinlock);
for (i = 0; i < DLM_HASH_BUCKETS; i++) {
while (!hlist_empty(dlm_lockres_hash(dlm, i))) {
res = hlist_entry(dlm_lockres_hash(dlm, i)->first,
struct dlm_lock_resource, hash_node);
/* need reference when manually grabbing lockres */
redo_bucket:
n = 0;
bucket = dlm_lockres_hash(dlm, i);
iter = bucket->first;
while (iter) {
n++;
res = hlist_entry(iter, struct dlm_lock_resource,
hash_node);
dlm_lockres_get(res);
/* this should unhash the lockres
* and exit with dlm->spinlock */
mlog(0, "purging res=%p\n", res);
if (dlm_lockres_is_dirty(dlm, res)) {
/* HACK! this should absolutely go.
* need to figure out why some empty
* lockreses are still marked dirty */
mlog(ML_ERROR, "lockres %.*s dirty!\n",
res->lockname.len, res->lockname.name);

spin_unlock(&dlm->spinlock);
dlm_kick_thread(dlm, res);
wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
dlm_lockres_put(res);
goto restart;
}
dlm_purge_lockres(dlm, res);
/* migrate, if necessary. this will drop the dlm
* spinlock and retake it if it does migration. */
dropped = dlm_empty_lockres(dlm, res);

spin_lock(&res->spinlock);
__dlm_lockres_calc_usage(dlm, res);
iter = res->hash_node.next;
spin_unlock(&res->spinlock);

dlm_lockres_put(res);

cond_resched_lock(&dlm->spinlock);

if (dropped)
goto redo_bucket;
}
num += n;
mlog(0, "%s: touched %d lockreses in bucket %d "
"(tot=%d)\n", dlm->name, n, i, num);
}
spin_unlock(&dlm->spinlock);

wake_up(&dlm->dlm_thread_wq);

/* let the dlm thread take care of purging, keep scanning until
* nothing remains in the hash */
if (num) {
mlog(0, "%s: %d lock resources in hash last pass\n",
dlm->name, num);
ret = -EAGAIN;
}
mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
return ret;
}

static int dlm_no_joining_node(struct dlm_ctxt *dlm)
Expand Down Expand Up @@ -571,7 +619,9 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
/* We changed dlm state, notify the thread */
dlm_kick_thread(dlm, NULL);

dlm_migrate_all_locks(dlm);
while (dlm_migrate_all_locks(dlm)) {
mlog(0, "%s: more migration to do\n", dlm->name);
}
dlm_mark_domain_leaving(dlm);
dlm_leave_domain(dlm);
dlm_complete_dlm_shutdown(dlm);
Expand Down Expand Up @@ -1082,6 +1132,13 @@ static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
if (status)
goto bail;

status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
sizeof(struct dlm_deref_lockres),
dlm_deref_lockres_handler,
dlm, &dlm->dlm_domain_handlers);
if (status)
goto bail;

status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
sizeof(struct dlm_migrate_request),
dlm_migrate_request_handler,
Expand Down
4 changes: 4 additions & 0 deletions fs/ocfs2/dlm/dlmlock.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
kick_thread = 1;
}
}
/* reduce the inflight count, this may result in the lockres
* being purged below during calc_usage */
if (lock->ml.node == dlm->node_num)
dlm_lockres_drop_inflight_ref(dlm, res);

spin_unlock(&res->spinlock);
wake_up(&res->wq);
Expand Down
Loading

0 comments on commit ba2bf21

Please sign in to comment.