diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index e7e327d43fa59..9ccf7346834aa 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -96,7 +96,6 @@ struct dlm_cluster {
 	unsigned int cl_tcp_port;
 	unsigned int cl_buffer_size;
 	unsigned int cl_rsbtbl_size;
-	unsigned int cl_dirtbl_size;
 	unsigned int cl_recover_timer;
 	unsigned int cl_toss_secs;
 	unsigned int cl_scan_secs;
@@ -113,7 +112,6 @@ enum {
 	CLUSTER_ATTR_TCP_PORT = 0,
 	CLUSTER_ATTR_BUFFER_SIZE,
 	CLUSTER_ATTR_RSBTBL_SIZE,
-	CLUSTER_ATTR_DIRTBL_SIZE,
 	CLUSTER_ATTR_RECOVER_TIMER,
 	CLUSTER_ATTR_TOSS_SECS,
 	CLUSTER_ATTR_SCAN_SECS,
@@ -189,7 +187,6 @@ __CONFIGFS_ATTR(name, 0644, name##_read, name##_write)
 CLUSTER_ATTR(tcp_port, 1);
 CLUSTER_ATTR(buffer_size, 1);
 CLUSTER_ATTR(rsbtbl_size, 1);
-CLUSTER_ATTR(dirtbl_size, 1);
 CLUSTER_ATTR(recover_timer, 1);
 CLUSTER_ATTR(toss_secs, 1);
 CLUSTER_ATTR(scan_secs, 1);
@@ -204,7 +201,6 @@ static struct configfs_attribute *cluster_attrs[] = {
 	[CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
 	[CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr,
 	[CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr,
-	[CLUSTER_ATTR_DIRTBL_SIZE] = &cluster_attr_dirtbl_size.attr,
 	[CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr,
 	[CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr,
 	[CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr,
@@ -478,7 +474,6 @@ static struct config_group *make_cluster(struct config_group *g,
 	cl->cl_tcp_port = dlm_config.ci_tcp_port;
 	cl->cl_buffer_size = dlm_config.ci_buffer_size;
 	cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size;
-	cl->cl_dirtbl_size = dlm_config.ci_dirtbl_size;
 	cl->cl_recover_timer = dlm_config.ci_recover_timer;
 	cl->cl_toss_secs = dlm_config.ci_toss_secs;
 	cl->cl_scan_secs = dlm_config.ci_scan_secs;
@@ -1050,7 +1045,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
 #define DEFAULT_TCP_PORT       21064
 #define DEFAULT_BUFFER_SIZE     4096
 #define DEFAULT_RSBTBL_SIZE     1024
-#define DEFAULT_DIRTBL_SIZE     1024
 #define DEFAULT_RECOVER_TIMER      5
 #define DEFAULT_TOSS_SECS         10
 #define DEFAULT_SCAN_SECS          5
@@ -1066,7 +1060,6 @@ struct dlm_config_info dlm_config = {
 	.ci_tcp_port = DEFAULT_TCP_PORT,
 	.ci_buffer_size = DEFAULT_BUFFER_SIZE,
 	.ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE,
-	.ci_dirtbl_size = DEFAULT_DIRTBL_SIZE,
 	.ci_recover_timer = DEFAULT_RECOVER_TIMER,
 	.ci_toss_secs = DEFAULT_TOSS_SECS,
 	.ci_scan_secs = DEFAULT_SCAN_SECS,
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index 9f5e3663bb0c9..dbd35a08f3a5f 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -27,7 +27,6 @@ struct dlm_config_info {
 	int ci_tcp_port;
 	int ci_buffer_size;
 	int ci_rsbtbl_size;
-	int ci_dirtbl_size;
 	int ci_recover_timer;
 	int ci_toss_secs;
 	int ci_scan_secs;
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 1c9b08095f987..b969deef9ebbe 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -344,6 +344,45 @@ static int print_format3(struct dlm_rsb *r, struct seq_file *s)
 	return rv;
 }
 
+static int print_format4(struct dlm_rsb *r, struct seq_file *s)
+{
+	int our_nodeid = dlm_our_nodeid();
+	int print_name = 1;
+	int i, rv;
+
+	lock_rsb(r);
+
+	rv = seq_printf(s, "rsb %p %d %d %d %d %lu %lx %d ",
+			r,
+			r->res_nodeid,
+			r->res_master_nodeid,
+			r->res_dir_nodeid,
+			our_nodeid,
+			r->res_toss_time,
+			r->res_flags,
+			r->res_length);
+	if (rv)
+		goto out;
+
+	for (i = 0; i < r->res_length; i++) {
+		if (!isascii(r->res_name[i]) || !isprint(r->res_name[i]))
+			print_name = 0;
+	}
+
+	seq_printf(s, "%s", print_name ? "str " : "hex");
+
+	for (i = 0; i < r->res_length; i++) {
+		if (print_name)
+			seq_printf(s, "%c", r->res_name[i]);
+		else
+			seq_printf(s, " %02x", (unsigned char)r->res_name[i]);
+	}
+	rv = seq_printf(s, "\n");
+ out:
+	unlock_rsb(r);
+	return rv;
+}
+
 struct rsbtbl_iter {
 	struct dlm_rsb *rsb;
 	unsigned bucket;
@@ -382,6 +421,13 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr)
 		}
 		rv = print_format3(ri->rsb, seq);
 		break;
+	case 4:
+		if (ri->header) {
+			seq_printf(seq, "version 4 rsb 2\n");
+			ri->header = 0;
+		}
+		rv = print_format4(ri->rsb, seq);
+		break;
 	}
 
 	return rv;
@@ -390,15 +436,18 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr)
 static const struct seq_operations format1_seq_ops;
 static const struct seq_operations format2_seq_ops;
 static const struct seq_operations format3_seq_ops;
+static const struct seq_operations format4_seq_ops;
 
 static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 {
+	struct rb_root *tree;
 	struct rb_node *node;
 	struct dlm_ls *ls = seq->private;
 	struct rsbtbl_iter *ri;
 	struct dlm_rsb *r;
 	loff_t n = *pos;
 	unsigned bucket, entry;
+	int toss = (seq->op == &format4_seq_ops);
 
 	bucket = n >> 32;
 	entry = n & ((1LL << 32) - 1);
@@ -417,11 +466,14 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 		ri->format = 2;
 	if (seq->op == &format3_seq_ops)
 		ri->format = 3;
+	if (seq->op == &format4_seq_ops)
+		ri->format = 4;
+
+	tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
-		for (node = rb_first(&ls->ls_rsbtbl[bucket].keep); node;
-		     node = rb_next(node)) {
+	if (!RB_EMPTY_ROOT(tree)) {
+		for (node = rb_first(tree); node; node = rb_next(node)) {
 			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			if (!entry--) {
 				dlm_hold_rsb(r);
@@ -449,10 +501,11 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 			kfree(ri);
 			return NULL;
 		}
+		tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
 
 		spin_lock(&ls->ls_rsbtbl[bucket].lock);
-		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
-			node = rb_first(&ls->ls_rsbtbl[bucket].keep);
+		if (!RB_EMPTY_ROOT(tree)) {
+			node = rb_first(tree);
 			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
@@ -469,10 +522,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 {
 	struct dlm_ls *ls = seq->private;
 	struct rsbtbl_iter *ri = iter_ptr;
+	struct rb_root *tree;
 	struct rb_node *next;
 	struct dlm_rsb *r, *rp;
 	loff_t n = *pos;
 	unsigned bucket;
+	int toss = (seq->op == &format4_seq_ops);
 
 	bucket = n >> 32;
 
@@ -511,10 +566,11 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
 			kfree(ri);
 			return NULL;
 		}
+		tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
 
 		spin_lock(&ls->ls_rsbtbl[bucket].lock);
-		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
-			next = rb_first(&ls->ls_rsbtbl[bucket].keep);
+		if (!RB_EMPTY_ROOT(tree)) {
+			next = rb_first(tree);
 			r = rb_entry(next, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
@@ -558,9 +614,17 @@ static const struct seq_operations format3_seq_ops = {
 	.show  = table_seq_show,
 };
 
+static const struct seq_operations format4_seq_ops = {
+	.start = table_seq_start,
+	.next  = table_seq_next,
+	.stop  = table_seq_stop,
+	.show  = table_seq_show,
+};
+
 static const struct file_operations format1_fops;
 static const struct file_operations format2_fops;
 static const struct file_operations format3_fops;
+static const struct file_operations format4_fops;
 
 static int table_open(struct inode *inode, struct file *file)
 {
@@ -573,6 +637,8 @@ static int table_open(struct inode *inode, struct file *file)
 		ret = seq_open(file, &format2_seq_ops);
 	else if (file->f_op == &format3_fops)
 		ret = seq_open(file, &format3_seq_ops);
+	else if (file->f_op == &format4_fops)
+		ret = seq_open(file, &format4_seq_ops);
 
 	if (ret)
 		return ret;
@@ -606,6 +672,14 @@ static const struct file_operations format3_fops = {
 	.release = seq_release
 };
 
+static const struct file_operations format4_fops = {
+	.owner   = THIS_MODULE,
+	.open    = table_open,
+	.read    = seq_read,
+	.llseek  = seq_lseek,
+	.release = seq_release
+};
+
 /*
  * dump lkb's on the ls_waiters list
  */
@@ -652,6 +726,8 @@ void dlm_delete_debug_file(struct dlm_ls *ls)
 		debugfs_remove(ls->ls_debug_locks_dentry);
 	if (ls->ls_debug_all_dentry)
 		debugfs_remove(ls->ls_debug_all_dentry);
+	if (ls->ls_debug_toss_dentry)
+		debugfs_remove(ls->ls_debug_toss_dentry);
 }
 
 int dlm_create_debug_file(struct dlm_ls *ls)
@@ -694,6 +770,19 @@ int dlm_create_debug_file(struct dlm_ls *ls)
 	if (!ls->ls_debug_all_dentry)
 		goto fail;
 
+	/* format 4 */
+
+	memset(name, 0, sizeof(name));
+	snprintf(name, DLM_LOCKSPACE_LEN+8, "%s_toss", ls->ls_name);
+
+	ls->ls_debug_toss_dentry = debugfs_create_file(name,
+						       S_IFREG | S_IRUGO,
+						       dlm_root,
+						       ls,
+						       &format4_fops);
+	if (!ls->ls_debug_toss_dentry)
+		goto fail;
+
 	memset(name, 0, sizeof(name));
 	snprintf(name, DLM_LOCKSPACE_LEN+8, "%s_waiters", ls->ls_name);
 
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index dc5eb598b81f2..278a75cda4463 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -23,50 +23,6 @@
 #include "lock.h"
 #include "dir.h"
 
-
-static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
-{
-	spin_lock(&ls->ls_recover_list_lock);
-	list_add(&de->list, &ls->ls_recover_list);
-	spin_unlock(&ls->ls_recover_list_lock);
-}
-
-static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
-{
-	int found = 0;
-	struct dlm_direntry *de;
-
-	spin_lock(&ls->ls_recover_list_lock);
-	list_for_each_entry(de, &ls->ls_recover_list, list) {
-		if (de->length == len) {
-			list_del(&de->list);
-			de->master_nodeid = 0;
-			memset(de->name, 0, len);
-			found = 1;
-			break;
-		}
-	}
-	spin_unlock(&ls->ls_recover_list_lock);
-
-	if (!found)
-		de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS);
-	return de;
-}
-
-void dlm_clear_free_entries(struct dlm_ls *ls)
-{
-	struct dlm_direntry *de;
-
-	spin_lock(&ls->ls_recover_list_lock);
-	while (!list_empty(&ls->ls_recover_list)) {
-		de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
-				list);
-		list_del(&de->list);
-		kfree(de);
-	}
-	spin_unlock(&ls->ls_recover_list_lock);
-}
-
 /*
  * We use the upper 16 bits of the hash value to select the directory node.
  * Low bits are used for distribution of rsb's among hash buckets on each node.
@@ -78,144 +34,53 @@ void dlm_clear_free_entries(struct dlm_ls *ls)
 
 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
 {
-	struct list_head *tmp;
-	struct dlm_member *memb = NULL;
-	uint32_t node, n = 0;
-	int nodeid;
-
-	if (ls->ls_num_nodes == 1) {
-		nodeid = dlm_our_nodeid();
-		goto out;
-	}
+	uint32_t node;
 
-	if (ls->ls_node_array) {
+	if (ls->ls_num_nodes == 1)
+		return dlm_our_nodeid();
+	else {
 		node = (hash >> 16) % ls->ls_total_weight;
-		nodeid = ls->ls_node_array[node];
-		goto out;
-	}
-
-	/* make_member_array() failed to kmalloc ls_node_array... */
-
-	node = (hash >> 16) % ls->ls_num_nodes;
-
-	list_for_each(tmp, &ls->ls_nodes) {
-		if (n++ != node)
-			continue;
-		memb = list_entry(tmp, struct dlm_member, list);
-		break;
+		return ls->ls_node_array[node];
 	}
-
-	DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
-				 ls->ls_num_nodes, n, node););
-	nodeid = memb->nodeid;
- out:
-	return nodeid;
 }
 
 int dlm_dir_nodeid(struct dlm_rsb *r)
 {
-	return dlm_hash2nodeid(r->res_ls, r->res_hash);
-}
-
-static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
-{
-	uint32_t val;
-
-	val = jhash(name, len, 0);
-	val &= (ls->ls_dirtbl_size - 1);
-
-	return val;
-}
-
-static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
-{
-	uint32_t bucket;
-
-	bucket = dir_hash(ls, de->name, de->length);
-	list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
+	return r->res_dir_nodeid;
 }
 
-static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
-					  int namelen, uint32_t bucket)
+void dlm_recover_dir_nodeid(struct dlm_ls *ls)
 {
-	struct dlm_direntry *de;
-
-	list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
-		if (de->length == namelen && !memcmp(name, de->name, namelen))
-			goto out;
-	}
-	de = NULL;
- out:
-	return de;
-}
-
-void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
-{
-	struct dlm_direntry *de;
-	uint32_t bucket;
-
-	bucket = dir_hash(ls, name, namelen);
-
-	spin_lock(&ls->ls_dirtbl[bucket].lock);
-
-	de = search_bucket(ls, name, namelen, bucket);
-
-	if (!de) {
-		log_error(ls, "remove fr %u none", nodeid);
-		goto out;
-	}
-
-	if (de->master_nodeid != nodeid) {
-		log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
-		goto out;
-	}
-
-	list_del(&de->list);
-	kfree(de);
- out:
-	spin_unlock(&ls->ls_dirtbl[bucket].lock);
-}
+	struct dlm_rsb *r;
 
-void dlm_dir_clear(struct dlm_ls *ls)
-{
-	struct list_head *head;
-	struct dlm_direntry *de;
-	int i;
-
-	DLM_ASSERT(list_empty(&ls->ls_recover_list), );
-
-	for (i = 0; i < ls->ls_dirtbl_size; i++) {
-		spin_lock(&ls->ls_dirtbl[i].lock);
-		head = &ls->ls_dirtbl[i].list;
-		while (!list_empty(head)) {
-			de = list_entry(head->next, struct dlm_direntry, list);
-			list_del(&de->list);
-			put_free_de(ls, de);
-		}
-		spin_unlock(&ls->ls_dirtbl[i].lock);
+	down_read(&ls->ls_root_sem);
+	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+		r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
 	}
+	up_read(&ls->ls_root_sem);
 }
 
 int dlm_recover_directory(struct dlm_ls *ls)
 {
 	struct dlm_member *memb;
-	struct dlm_direntry *de;
 	char *b, *last_name = NULL;
-	int error = -ENOMEM, last_len, count = 0;
+	int error = -ENOMEM, last_len, nodeid, result;
 	uint16_t namelen;
+	unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
 
 	log_debug(ls, "dlm_recover_directory");
 
 	if (dlm_no_directory(ls))
 		goto out_status;
 
-	dlm_dir_clear(ls);
-
 	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
 	if (!last_name)
 		goto out;
 
 	list_for_each_entry(memb, &ls->ls_nodes, list) {
+		if (memb->nodeid == dlm_our_nodeid())
+			continue;
+
 		memset(last_name, 0, DLM_RESNAME_MAXLEN);
 		last_len = 0;
 
@@ -230,7 +95,7 @@ int dlm_recover_directory(struct dlm_ls *ls)
 			if (error)
 				goto out_free;
 
-			schedule();
+			cond_resched();
 
 			/*
 			 * pick namelen/name pairs out of received buffer
@@ -267,87 +132,71 @@ int dlm_recover_directory(struct dlm_ls *ls)
 				if (namelen > DLM_RESNAME_MAXLEN)
 					goto out_free;
 
-				error = -ENOMEM;
-				de = get_free_de(ls, namelen);
-				if (!de)
+				error = dlm_master_lookup(ls, memb->nodeid,
+							  b, namelen,
+							  DLM_LU_RECOVER_DIR,
+							  &nodeid, &result);
+				if (error) {
+					log_error(ls, "recover_dir lookup %d",
+						  error);
 					goto out_free;
+				}
+
+				/* The name was found in rsbtbl, but the
+				 * master nodeid is different from
+				 * memb->nodeid which says it is the master.
+				 * This should not happen. */
+
+				if (result == DLM_LU_MATCH &&
+				    nodeid != memb->nodeid) {
+					count_bad++;
+					log_error(ls, "recover_dir lookup %d "
+						  "nodeid %d memb %d bad %u",
+						  result, nodeid, memb->nodeid,
+						  count_bad);
+					print_hex_dump_bytes("dlm_recover_dir ",
+							     DUMP_PREFIX_NONE,
+							     b, namelen);
+				}
+
+				/* The name was found in rsbtbl, and the
+				 * master nodeid matches memb->nodeid. */
+
+				if (result == DLM_LU_MATCH &&
+				    nodeid == memb->nodeid) {
+					count_match++;
+				}
+
+				/* The name was not found in rsbtbl and was
+				 * added with memb->nodeid as the master. */
+
+				if (result == DLM_LU_ADD) {
+					count_add++;
+				}
 
-				de->master_nodeid = memb->nodeid;
-				de->length = namelen;
 				last_len = namelen;
-				memcpy(de->name, b, namelen);
 				memcpy(last_name, b, namelen);
 				b += namelen;
 				left -= namelen;
-
-				add_entry_to_hash(ls, de);
 				count++;
 			}
 		}
-         done:
+	 done:
 		;
 	}
 
  out_status:
 	error = 0;
-	log_debug(ls, "dlm_recover_directory %d entries", count);
+	dlm_set_recover_status(ls, DLM_RS_DIR);
+
+	log_debug(ls, "dlm_recover_directory %u in %u new",
+		  count, count_add);
  out_free:
 	kfree(last_name);
  out:
-	dlm_clear_free_entries(ls);
 	return error;
 }
 
-static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
-		     int namelen, int *r_nodeid)
-{
-	struct dlm_direntry *de, *tmp;
-	uint32_t bucket;
-
-	bucket = dir_hash(ls, name, namelen);
-
-	spin_lock(&ls->ls_dirtbl[bucket].lock);
-	de = search_bucket(ls, name, namelen, bucket);
-	if (de) {
-		*r_nodeid = de->master_nodeid;
-		spin_unlock(&ls->ls_dirtbl[bucket].lock);
-		if (*r_nodeid == nodeid)
-			return -EEXIST;
-		return 0;
-	}
-
-	spin_unlock(&ls->ls_dirtbl[bucket].lock);
-
-	if (namelen > DLM_RESNAME_MAXLEN)
-		return -EINVAL;
-
-	de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS);
-	if (!de)
-		return -ENOMEM;
-
-	de->master_nodeid = nodeid;
-	de->length = namelen;
-	memcpy(de->name, name, namelen);
-
-	spin_lock(&ls->ls_dirtbl[bucket].lock);
-	tmp = search_bucket(ls, name, namelen, bucket);
-	if (tmp) {
-		kfree(de);
-		de = tmp;
-	} else {
-		list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
-	}
-	*r_nodeid = de->master_nodeid;
-	spin_unlock(&ls->ls_dirtbl[bucket].lock);
-	return 0;
-}
-
-int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
-		   int *r_nodeid)
-{
-	return get_entry(ls, nodeid, name, namelen, r_nodeid);
-}
-
 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
 {
 	struct dlm_rsb *r;
@@ -358,10 +207,10 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
 	bucket = hash & (ls->ls_rsbtbl_size - 1);
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, 0, &r);
+	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
 	if (rv)
 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
-					 name, len, 0, &r);
+					 name, len, &r);
 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 
 	if (!rv)
@@ -371,7 +220,7 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
 			up_read(&ls->ls_root_sem);
-			log_error(ls, "find_rsb_root revert to root_list %s",
+			log_debug(ls, "find_rsb_root revert to root_list %s",
 				  r->res_name);
 			return r;
 		}
@@ -429,6 +278,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
 			be_namelen = cpu_to_be16(0);
 			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 			offset += sizeof(__be16);
+			ls->ls_recover_dir_sent_msg++;
 			goto out;
 		}
 
@@ -437,6 +287,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
 		offset += sizeof(__be16);
 		memcpy(outbuf + offset, r->res_name, r->res_length);
 		offset += r->res_length;
+		ls->ls_recover_dir_sent_res++;
 	}
 
 	/*
@@ -449,8 +300,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
 		be_namelen = cpu_to_be16(0xFFFF);
 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
 		offset += sizeof(__be16);
+		ls->ls_recover_dir_sent_msg++;
 	}
-
  out:
 	up_read(&ls->ls_root_sem);
 }
diff --git a/fs/dlm/dir.h b/fs/dlm/dir.h
index 0b0eb1267b6eb..4175063444562 100644
--- a/fs/dlm/dir.h
+++ b/fs/dlm/dir.h
@@ -14,15 +14,10 @@
 #ifndef __DIR_DOT_H__
 #define __DIR_DOT_H__
 
-
 int dlm_dir_nodeid(struct dlm_rsb *rsb);
 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash);
-void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int len);
-void dlm_dir_clear(struct dlm_ls *ls);
-void dlm_clear_free_entries(struct dlm_ls *ls);
+void dlm_recover_dir_nodeid(struct dlm_ls *ls);
 int dlm_recover_directory(struct dlm_ls *ls);
-int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
-	int *r_nodeid);
 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
 	char *outbuf, int outlen, int nodeid);
 
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index bc342f7ac3afe..3093207a76840 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -55,8 +55,6 @@ struct dlm_lkb;
 struct dlm_rsb;
 struct dlm_member;
 struct dlm_rsbtable;
-struct dlm_dirtable;
-struct dlm_direntry;
 struct dlm_recover;
 struct dlm_header;
 struct dlm_message;
@@ -98,18 +96,6 @@ do { \
 }
 
 
-struct dlm_direntry {
-	struct list_head	list;
-	uint32_t		master_nodeid;
-	uint16_t		length;
-	char			name[1];
-};
-
-struct dlm_dirtable {
-	struct list_head	list;
-	spinlock_t		lock;
-};
-
 struct dlm_rsbtable {
 	struct rb_root		keep;
 	struct rb_root		toss;
@@ -283,6 +269,15 @@ struct dlm_lkb {
 	};
 };
 
+/*
+ * res_master_nodeid is "normal": 0 is unset/invalid, non-zero is the real
+ * nodeid, even when nodeid is our_nodeid.
+ *
+ * res_nodeid is "odd": -1 is unset/invalid, zero means our_nodeid,
+ * greater than zero when another nodeid.
+ *
+ * (TODO: remove res_nodeid and only use res_master_nodeid)
+ */
 
 struct dlm_rsb {
 	struct dlm_ls		*res_ls;	/* the lockspace */
@@ -291,6 +286,8 @@ struct dlm_rsb {
 	unsigned long		res_flags;
 	int			res_length;	/* length of rsb name */
 	int			res_nodeid;
+	int			res_master_nodeid;
+	int			res_dir_nodeid;
 	uint32_t                res_lvbseq;
 	uint32_t		res_hash;
 	uint32_t		res_bucket;	/* rsbtbl */
@@ -313,10 +310,21 @@ struct dlm_rsb {
 	char			res_name[DLM_RESNAME_MAXLEN+1];
 };
 
+/* dlm_master_lookup() flags */
+
+#define DLM_LU_RECOVER_DIR	1
+#define DLM_LU_RECOVER_MASTER	2
+
+/* dlm_master_lookup() results */
+
+#define DLM_LU_MATCH		1
+#define DLM_LU_ADD		2
+
 /* find_rsb() flags */
 
-#define R_MASTER		1	/* only return rsb if it's a master */
-#define R_CREATE		2	/* create/add rsb if not found */
+#define R_REQUEST		0x00000001
+#define R_RECEIVE_REQUEST	0x00000002
+#define R_RECEIVE_RECOVER	0x00000004
 
 /* rsb_flags */
 
@@ -509,9 +517,6 @@ struct dlm_ls {
 	struct dlm_rsbtable	*ls_rsbtbl;
 	uint32_t		ls_rsbtbl_size;
 
-	struct dlm_dirtable	*ls_dirtbl;
-	uint32_t		ls_dirtbl_size;
-
 	struct mutex		ls_waiters_mutex;
 	struct list_head	ls_waiters;	/* lkbs needing a reply */
 
@@ -545,6 +550,7 @@ struct dlm_ls {
 	struct dentry		*ls_debug_waiters_dentry; /* debugfs */
 	struct dentry		*ls_debug_locks_dentry; /* debugfs */
 	struct dentry		*ls_debug_all_dentry; /* debugfs */
+	struct dentry		*ls_debug_toss_dentry; /* debugfs */
 
 	wait_queue_head_t	ls_uevent_wait;	/* user part of join/leave */
 	int			ls_uevent_result;
@@ -573,6 +579,8 @@ struct dlm_ls {
 	struct mutex		ls_requestqueue_mutex;
 	struct dlm_rcom		*ls_recover_buf;
 	int			ls_recover_nodeid; /* for debugging */
+	unsigned int		ls_recover_dir_sent_res; /* for log info */
+	unsigned int		ls_recover_dir_sent_msg; /* for log info */
 	unsigned int		ls_recover_locks_in; /* for log info */
 	uint64_t		ls_rcom_seq;
 	spinlock_t		ls_rcom_spin;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index bdafb65a52345..d9ee1b96549ab 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -90,6 +90,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
 static int receive_extralen(struct dlm_message *ms);
 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
 static void del_timeout(struct dlm_lkb *lkb);
+static void toss_rsb(struct kref *kref);
 
 /*
  * Lock compatibilty matrix - thanks Steve
@@ -170,9 +171,11 @@ void dlm_print_lkb(struct dlm_lkb *lkb)
 
 static void dlm_print_rsb(struct dlm_rsb *r)
 {
-	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
-	       r->res_nodeid, r->res_flags, r->res_first_lkid,
-	       r->res_recover_locks_count, r->res_name);
+	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
+	       "rlc %d name %s\n",
+	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
+	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
+	       r->res_name);
 }
 
 void dlm_dump_rsb(struct dlm_rsb *r)
@@ -327,6 +330,37 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
  * Basic operations on rsb's and lkb's
  */
 
+/* This is only called to add a reference when the code already holds
+   a valid reference to the rsb, so there's no need for locking. */
+
+static inline void hold_rsb(struct dlm_rsb *r)
+{
+	kref_get(&r->res_ref);
+}
+
+void dlm_hold_rsb(struct dlm_rsb *r)
+{
+	hold_rsb(r);
+}
+
+/* When all references to the rsb are gone it's transferred to
+   the tossed list for later disposal. */
+
+static void put_rsb(struct dlm_rsb *r)
+{
+	struct dlm_ls *ls = r->res_ls;
+	uint32_t bucket = r->res_bucket;
+
+	spin_lock(&ls->ls_rsbtbl[bucket].lock);
+	kref_put(&r->res_ref, toss_rsb);
+	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+}
+
+void dlm_put_rsb(struct dlm_rsb *r)
+{
+	put_rsb(r);
+}
+
 static int pre_rsb_struct(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r1, *r2;
@@ -411,11 +445,10 @@ static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
 }
 
 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
-			unsigned int flags, struct dlm_rsb **r_ret)
+			struct dlm_rsb **r_ret)
 {
 	struct rb_node *node = tree->rb_node;
 	struct dlm_rsb *r;
-	int error = 0;
 	int rc;
 
 	while (node) {
@@ -432,10 +465,8 @@ int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
 	return -EBADR;
 
  found:
-	if (r->res_nodeid && (flags & R_MASTER))
-		error = -ENOTBLK;
 	*r_ret = r;
-	return error;
+	return 0;
 }
 
 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
@@ -467,124 +498,587 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
 	return 0;
 }
 
-static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
-		       unsigned int flags, struct dlm_rsb **r_ret)
+/*
+ * Find rsb in rsbtbl and potentially create/add one
+ *
+ * Delaying the release of rsb's has a similar benefit to applications keeping
+ * NL locks on an rsb, but without the guarantee that the cached master value
+ * will still be valid when the rsb is reused.  Apps aren't always smart enough
+ * to keep NL locks on an rsb that they may lock again shortly; this can lead
+ * to excessive master lookups and removals if we don't delay the release.
+ *
+ * Searching for an rsb means looking through both the normal list and toss
+ * list.  When found on the toss list the rsb is moved to the normal list with
+ * ref count of 1; when found on normal list the ref count is incremented.
+ *
+ * rsb's on the keep list are being used locally and refcounted.
+ * rsb's on the toss list are not being used locally, and are not refcounted.
+ *
+ * The toss list rsb's were either
+ * - previously used locally but not any more (were on keep list, then
+ *   moved to toss list when last refcount dropped)
+ * - created and put on toss list as a directory record for a lookup
+ *   (we are the dir node for the res, but are not using the res right now,
+ *   but some other node is)
+ *
+ * The purpose of find_rsb() is to return a refcounted rsb for local use.
+ * So, if the given rsb is on the toss list, it is moved to the keep list
+ * before being returned.
+ *
+ * toss_rsb() happens when all local usage of the rsb is done, i.e. no
+ * more refcounts exist, so the rsb is moved from the keep list to the
+ * toss list.
+ *
+ * rsb's on both keep and toss lists are used for doing a name to master
+ * lookups.  rsb's that are in use locally (and being refcounted) are on
+ * the keep list, rsb's that are not in use locally (not refcounted) and
+ * only exist for name/master lookups are on the toss list.
+ *
+ * rsb's on the toss list who's dir_nodeid is not local can have stale
+ * name/master mappings.  So, remote requests on such rsb's can potentially
+ * return with an error, which means the mapping is stale and needs to
+ * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
+ * first_lkid is to keep only a single outstanding request on an rsb
+ * while that rsb has a potentially stale master.)
+ */
+
+static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
+			uint32_t hash, uint32_t b,
+			int dir_nodeid, int from_nodeid,
+			unsigned int flags, struct dlm_rsb **r_ret)
 {
-	struct dlm_rsb *r;
+	struct dlm_rsb *r = NULL;
+	int our_nodeid = dlm_our_nodeid();
+	int from_local = 0;
+	int from_other = 0;
+	int from_dir = 0;
+	int create = 0;
 	int error;
 
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
-	if (!error) {
-		kref_get(&r->res_ref);
-		goto out;
+	if (flags & R_RECEIVE_REQUEST) {
+		if (from_nodeid == dir_nodeid)
+			from_dir = 1;
+		else
+			from_other = 1;
+	} else if (flags & R_REQUEST) {
+		from_local = 1;
+	}
+
+	/*
+	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
+	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
+	 * we're the new master.  Our local recovery may not have set
+	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
+	 * create the rsb; dlm_recover_process_copy() will handle EBADR
+	 * by resending.
+	 *
+	 * If someone sends us a request, we are the dir node, and we do
+	 * not find the rsb anywhere, then recreate it.  This happens if
+	 * someone sends us a request after we have removed/freed an rsb
+	 * from our toss list.  (They sent a request instead of lookup
+	 * because they are using an rsb from their toss list.)
+	 */
+
+	if (from_local || from_dir ||
+	    (from_other && (dir_nodeid == our_nodeid))) {
+		create = 1;
 	}
-	if (error == -ENOTBLK)
-		goto out;
 
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
+ retry:
+	if (create) {
+		error = pre_rsb_struct(ls);
+		if (error < 0)
+			goto out;
+	}
+
+	spin_lock(&ls->ls_rsbtbl[b].lock);
+
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 	if (error)
-		goto out;
+		goto do_toss;
+	
+	/*
+	 * rsb is active, so we can't check master_nodeid without lock_rsb.
+	 */
 
-	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+	kref_get(&r->res_ref);
+	error = 0;
+	goto out_unlock;
+
+
+ do_toss:
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
 	if (error)
-		return error;
+		goto do_new;
 
-	if (dlm_no_directory(ls))
-		goto out;
+	/*
+	 * rsb found inactive (master_nodeid may be out of date unless
+	 * we are the dir_nodeid or were the master)  No other thread
+	 * is using this rsb because it's on the toss list, so we can
+	 * look at or update res_master_nodeid without lock_rsb.
+	 */
 
-	if (r->res_nodeid == -1) {
+	if ((r->res_master_nodeid != our_nodeid) && from_other) {
+		/* our rsb was not master, and another node (not the dir node)
+		   has sent us a request */
+		log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
+			  from_nodeid, r->res_master_nodeid, dir_nodeid,
+			  r->res_name);
+		error = -ENOTBLK;
+		goto out_unlock;
+	}
+
+	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
+		/* don't think this should ever happen */
+		log_error(ls, "find_rsb toss from_dir %d master %d",
+			  from_nodeid, r->res_master_nodeid);
+		dlm_print_rsb(r);
+		/* fix it and go on */
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
 		r->res_first_lkid = 0;
-	} else if (r->res_nodeid > 0) {
+	}
+
+	if (from_local && (r->res_master_nodeid != our_nodeid)) {
+		/* Because we have held no locks on this rsb,
+		   res_master_nodeid could have become stale. */
 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
 		r->res_first_lkid = 0;
+	}
+
+	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+	goto out_unlock;
+
+
+ do_new:
+	/*
+	 * rsb not found
+	 */
+
+	if (error == -EBADR && !create)
+		goto out_unlock;
+
+	error = get_rsb_struct(ls, name, len, &r);
+	if (error == -EAGAIN) {
+		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		goto retry;
+	}
+	if (error)
+		goto out_unlock;
+
+	r->res_hash = hash;
+	r->res_bucket = b;
+	r->res_dir_nodeid = dir_nodeid;
+	kref_init(&r->res_ref);
+
+	if (from_dir) {
+		/* want to see how often this happens */
+		log_debug(ls, "find_rsb new from_dir %d recreate %s",
+			  from_nodeid, r->res_name);
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
+		goto out_add;
+	}
+
+	if (from_other && (dir_nodeid != our_nodeid)) {
+		/* should never happen */
+		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
+			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
+		dlm_free_rsb(r);
+		error = -ENOTBLK;
+		goto out_unlock;
+	}
+
+	if (from_other) {
+		log_debug(ls, "find_rsb new from_other %d dir %d %s",
+			  from_nodeid, dir_nodeid, r->res_name);
+	}
+
+	if (dir_nodeid == our_nodeid) {
+		/* When we are the dir nodeid, we can set the master
+		   node immediately */
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
 	} else {
-		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
-		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
+		/* set_master will send_lookup to dir_nodeid */
+		r->res_master_nodeid = 0;
+		r->res_nodeid = -1;
 	}
+
+ out_add:
+	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+ out_unlock:
+	spin_unlock(&ls->ls_rsbtbl[b].lock);
  out:
 	*r_ret = r;
 	return error;
 }
 
+/* During recovery, other nodes can send us new MSTCPY locks (from
+   dlm_recover_locks) before we've made ourself master (in
+   dlm_recover_masters). */
+
+static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
+			  uint32_t hash, uint32_t b,
+			  int dir_nodeid, int from_nodeid,
+			  unsigned int flags, struct dlm_rsb **r_ret)
+{
+	struct dlm_rsb *r = NULL;
+	int our_nodeid = dlm_our_nodeid();
+	int recover = (flags & R_RECEIVE_RECOVER);
+	int error;
+
+ retry:
+	error = pre_rsb_struct(ls);
+	if (error < 0)
+		goto out;
+
+	spin_lock(&ls->ls_rsbtbl[b].lock);
+
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+	if (error)
+		goto do_toss;
+
+	/*
+	 * rsb is active, so we can't check master_nodeid without lock_rsb.
+	 */
+
+	kref_get(&r->res_ref);
+	goto out_unlock;
+
+
+ do_toss:
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+	if (error)
+		goto do_new;
+
+	/*
+	 * rsb found inactive. No other thread is using this rsb because
+	 * it's on the toss list, so we can look at or update
+	 * res_master_nodeid without lock_rsb.
+	 */
+
+	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
+		/* our rsb is not master, and another node has sent us a
+		   request; this should never happen */
+		log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
+			  from_nodeid, r->res_master_nodeid, dir_nodeid);
+		dlm_print_rsb(r);
+		error = -ENOTBLK;
+		goto out_unlock;
+	}
+
+	if (!recover && (r->res_master_nodeid != our_nodeid) &&
+	    (dir_nodeid == our_nodeid)) {
+		/* our rsb is not master, and we are dir; may as well fix it;
+		   this should never happen */
+		log_error(ls, "find_rsb toss our %d master %d dir %d",
+			  our_nodeid, r->res_master_nodeid, dir_nodeid);
+		dlm_print_rsb(r);
+		r->res_master_nodeid = our_nodeid;
+		r->res_nodeid = 0;
+	}
+
+	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+	goto out_unlock;
+
+
+ do_new:
+	/*
+	 * rsb not found
+	 */
+
+	error = get_rsb_struct(ls, name, len, &r);
+	if (error == -EAGAIN) {
+		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		goto retry;
+	}
+	if (error)
+		goto out_unlock;
+
+	r->res_hash = hash;
+	r->res_bucket = b;
+	r->res_dir_nodeid = dir_nodeid;
+	r->res_master_nodeid = dir_nodeid;
+	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
+	kref_init(&r->res_ref);
+
+	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+ out_unlock:
+	spin_unlock(&ls->ls_rsbtbl[b].lock);
+ out:
+	*r_ret = r;
+	return error;
+}
+
+static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
+		    unsigned int flags, struct dlm_rsb **r_ret)
+{
+	uint32_t hash, b;
+	int dir_nodeid;
+
+	if (len > DLM_RESNAME_MAXLEN)
+		return -EINVAL;
+
+	hash = jhash(name, len, 0);
+	b = hash & (ls->ls_rsbtbl_size - 1);
+
+	dir_nodeid = dlm_hash2nodeid(ls, hash);
+
+	if (dlm_no_directory(ls))
+		return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
+				      from_nodeid, flags, r_ret);
+	else
+		return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
+				      from_nodeid, flags, r_ret);
+}
+
+/* we have received a request and found that res_master_nodeid != our_nodeid,
+   so we need to return an error or make ourself the master */
+
+static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
+				  int from_nodeid)
+{
+	if (dlm_no_directory(ls)) {
+		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
+			  from_nodeid, r->res_master_nodeid,
+			  r->res_dir_nodeid);
+		dlm_print_rsb(r);
+		return -ENOTBLK;
+	}
+
+	if (from_nodeid != r->res_dir_nodeid) {
+		/* our rsb is not master, and another node (not the dir node)
+	   	   has sent us a request.  this is much more common when our
+	   	   master_nodeid is zero, so limit debug to non-zero.  */
+
+		if (r->res_master_nodeid) {
+			log_debug(ls, "validate master from_other %d master %d "
+				  "dir %d first %x %s", from_nodeid,
+				  r->res_master_nodeid, r->res_dir_nodeid,
+				  r->res_first_lkid, r->res_name);
+		}
+		return -ENOTBLK;
+	} else {
+		/* our rsb is not master, but the dir nodeid has sent us a
+	   	   request; this could happen with master 0 / res_nodeid -1 */
+
+		if (r->res_master_nodeid) {
+			log_error(ls, "validate master from_dir %d master %d "
+				  "first %x %s",
+				  from_nodeid, r->res_master_nodeid,
+				  r->res_first_lkid, r->res_name);
+		}
+
+		r->res_master_nodeid = dlm_our_nodeid();
+		r->res_nodeid = 0;
+		return 0;
+	}
+}
+
 /*
- * Find rsb in rsbtbl and potentially create/add one
+ * We're the dir node for this res and another node wants to know the
+ * master nodeid.  During normal operation (non recovery) this is only
+ * called from receive_lookup(); master lookups when the local node is
+ * the dir node are done by find_rsb().
  *
- * Delaying the release of rsb's has a similar benefit to applications keeping
- * NL locks on an rsb, but without the guarantee that the cached master value
- * will still be valid when the rsb is reused.  Apps aren't always smart enough
- * to keep NL locks on an rsb that they may lock again shortly; this can lead
- * to excessive master lookups and removals if we don't delay the release.
+ * normal operation, we are the dir node for a resource
+ * . _request_lock
+ * . set_master
+ * . send_lookup
+ * . receive_lookup
+ * . dlm_master_lookup flags 0
  *
- * Searching for an rsb means looking through both the normal list and toss
- * list.  When found on the toss list the rsb is moved to the normal list with
- * ref count of 1; when found on normal list the ref count is incremented.
+ * recover directory, we are rebuilding dir for all resources
+ * . dlm_recover_directory
+ * . dlm_rcom_names
+ *   remote node sends back the rsb names it is master of and we are dir of
+ * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
+ *   we either create new rsb setting remote node as master, or find existing
+ *   rsb and set master to be the remote node.
+ *
+ * recover masters, we are finding the new master for resources
+ * . dlm_recover_masters
+ * . recover_master
+ * . dlm_send_rcom_lookup
+ * . receive_rcom_lookup
+ * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
  */
 
-static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
-		    unsigned int flags, struct dlm_rsb **r_ret)
+int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
+		      unsigned int flags, int *r_nodeid, int *result)
 {
 	struct dlm_rsb *r = NULL;
-	uint32_t hash, bucket;
-	int error;
+	uint32_t hash, b;
+	int from_master = (flags & DLM_LU_RECOVER_DIR);
+	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
+	int our_nodeid = dlm_our_nodeid();
+	int dir_nodeid, error, toss_list = 0;
 
-	if (namelen > DLM_RESNAME_MAXLEN) {
-		error = -EINVAL;
-		goto out;
+	if (len > DLM_RESNAME_MAXLEN)
+		return -EINVAL;
+
+	if (from_nodeid == our_nodeid) {
+		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
+			  our_nodeid, flags);
+		return -EINVAL;
 	}
 
-	if (dlm_no_directory(ls))
-		flags |= R_CREATE;
+	hash = jhash(name, len, 0);
+	b = hash & (ls->ls_rsbtbl_size - 1);
 
-	hash = jhash(name, namelen, 0);
-	bucket = hash & (ls->ls_rsbtbl_size - 1);
+	dir_nodeid = dlm_hash2nodeid(ls, hash);
+	if (dir_nodeid != our_nodeid) {
+		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
+			  from_nodeid, dir_nodeid, our_nodeid, hash,
+			  ls->ls_num_nodes);
+		*r_nodeid = -1;
+		return -EINVAL;
+	}
 
  retry:
-	if (flags & R_CREATE) {
-		error = pre_rsb_struct(ls);
-		if (error < 0)
-			goto out;
+	error = pre_rsb_struct(ls);
+	if (error < 0)
+		return error;
+
+	spin_lock(&ls->ls_rsbtbl[b].lock);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+	if (!error) {
+		/* because the rsb is active, we need to lock_rsb before
+		   checking/changing re_master_nodeid */
+
+		hold_rsb(r);
+		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		lock_rsb(r);
+		goto found;
 	}
 
-	spin_lock(&ls->ls_rsbtbl[bucket].lock);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+	if (error)
+		goto not_found;
 
-	error = _search_rsb(ls, name, namelen, bucket, flags, &r);
-	if (!error)
-		goto out_unlock;
+	/* because the rsb is inactive (on toss list), it's not refcounted
+	   and lock_rsb is not used, but is protected by the rsbtbl lock */
 
-	if (error == -EBADR && !(flags & R_CREATE))
-		goto out_unlock;
+	toss_list = 1;
+ found:
+	if (r->res_dir_nodeid != our_nodeid) {
+		/* should not happen, but may as well fix it and carry on */
+		log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
+			  r->res_dir_nodeid, our_nodeid, r->res_name);
+		r->res_dir_nodeid = our_nodeid;
+	}
+
+	if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
+		/* Recovery uses this function to set a new master when
+		   the previous master failed.  Setting NEW_MASTER will
+		   force dlm_recover_masters to call recover_master on this
+		   rsb even though the res_nodeid is no longer removed. */
+
+		r->res_master_nodeid = from_nodeid;
+		r->res_nodeid = from_nodeid;
+		rsb_set_flag(r, RSB_NEW_MASTER);
+
+		if (toss_list) {
+			/* I don't think we should ever find it on toss list. */
+			log_error(ls, "dlm_master_lookup fix_master on toss");
+			dlm_dump_rsb(r);
+		}
+	}
 
-	/* the rsb was found but wasn't a master copy */
-	if (error == -ENOTBLK)
-		goto out_unlock;
+	if (from_master && (r->res_master_nodeid != from_nodeid)) {
+		/* this will happen if from_nodeid became master during
+		   a previous recovery cycle, and we aborted the previous
+		   cycle before recovering this master value */
 
-	error = get_rsb_struct(ls, name, namelen, &r);
+		log_limit(ls, "dlm_master_lookup from_master %d "
+			  "master_nodeid %d res_nodeid %d first %x %s",
+			  from_nodeid, r->res_master_nodeid, r->res_nodeid,
+			  r->res_first_lkid, r->res_name);
+
+		if (r->res_master_nodeid == our_nodeid) {
+			log_error(ls, "from_master %d our_master", from_nodeid);
+			dlm_dump_rsb(r);
+			dlm_send_rcom_lookup_dump(r, from_nodeid);
+			goto out_found;
+		}
+
+		r->res_master_nodeid = from_nodeid;
+		r->res_nodeid = from_nodeid;
+		rsb_set_flag(r, RSB_NEW_MASTER);
+	}
+
+	if (!r->res_master_nodeid) {
+		/* this will happen if recovery happens while we're looking
+		   up the master for this rsb */
+
+		log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
+			  from_nodeid, r->res_first_lkid, r->res_name);
+		r->res_master_nodeid = from_nodeid;
+		r->res_nodeid = from_nodeid;
+	}
+
+	if (!from_master && !fix_master &&
+	    (r->res_master_nodeid == from_nodeid)) {
+		/* this can happen when the master sends remove, the dir node
+		   finds the rsb on the keep list and ignores the remove,
+		   and the former master sends a lookup */
+
+		log_limit(ls, "dlm_master_lookup from master %d flags %x "
+			  "first %x %s", from_nodeid, flags,
+			  r->res_first_lkid, r->res_name);
+	}
+
+ out_found:
+	*r_nodeid = r->res_master_nodeid;
+	if (result)
+		*result = DLM_LU_MATCH;
+
+	if (toss_list) {
+		r->res_toss_time = jiffies;
+		/* the rsb was inactive (on toss list) */
+		spin_unlock(&ls->ls_rsbtbl[b].lock);
+	} else {
+		/* the rsb was active */
+		unlock_rsb(r);
+		put_rsb(r);
+	}
+	return 0;
+
+ not_found:
+	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+		spin_unlock(&ls->ls_rsbtbl[b].lock);
 		goto retry;
 	}
 	if (error)
 		goto out_unlock;
 
 	r->res_hash = hash;
-	r->res_bucket = bucket;
-	r->res_nodeid = -1;
+	r->res_bucket = b;
+	r->res_dir_nodeid = our_nodeid;
+	r->res_master_nodeid = from_nodeid;
+	r->res_nodeid = from_nodeid;
 	kref_init(&r->res_ref);
+	r->res_toss_time = jiffies;
 
-	/* With no directory, the master can be set immediately */
-	if (dlm_no_directory(ls)) {
-		int nodeid = dlm_dir_nodeid(r);
-		if (nodeid == dlm_our_nodeid())
-			nodeid = 0;
-		r->res_nodeid = nodeid;
+	error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
+	if (error) {
+		/* should never happen */
+		dlm_free_rsb(r);
+		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		goto retry;
 	}
-	error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
+
+	if (result)
+		*result = DLM_LU_ADD;
+	*r_nodeid = from_nodeid;
+	error = 0;
  out_unlock:
-	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
- out:
-	*r_ret = r;
+	spin_unlock(&ls->ls_rsbtbl[b].lock);
 	return error;
 }
 
@@ -605,17 +1099,27 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 	}
 }
 
-/* This is only called to add a reference when the code already holds
-   a valid reference to the rsb, so there's no need for locking. */
-
-static inline void hold_rsb(struct dlm_rsb *r)
+void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
 {
-	kref_get(&r->res_ref);
-}
+	struct dlm_rsb *r = NULL;
+	uint32_t hash, b;
+	int error;
 
-void dlm_hold_rsb(struct dlm_rsb *r)
-{
-	hold_rsb(r);
+	hash = jhash(name, len, 0);
+	b = hash & (ls->ls_rsbtbl_size - 1);
+
+	spin_lock(&ls->ls_rsbtbl[b].lock);
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+	if (!error)
+		goto out_dump;
+
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+	if (error)
+		goto out;
+ out_dump:
+	dlm_dump_rsb(r);
+ out:
+	spin_unlock(&ls->ls_rsbtbl[b].lock);
 }
 
 static void toss_rsb(struct kref *kref)
@@ -634,24 +1138,6 @@ static void toss_rsb(struct kref *kref)
 	}
 }
 
-/* When all references to the rsb are gone it's transferred to
-   the tossed list for later disposal. */
-
-static void put_rsb(struct dlm_rsb *r)
-{
-	struct dlm_ls *ls = r->res_ls;
-	uint32_t bucket = r->res_bucket;
-
-	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	kref_put(&r->res_ref, toss_rsb);
-	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
-}
-
-void dlm_put_rsb(struct dlm_rsb *r)
-{
-	put_rsb(r);
-}
-
 /* See comment for unhold_lkb */
 
 static void unhold_rsb(struct dlm_rsb *r)
@@ -1138,27 +1624,13 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
 	return error;
 }
 
-static void dir_remove(struct dlm_rsb *r)
-{
-	int to_nodeid;
-
-	if (dlm_no_directory(r->res_ls))
-		return;
-
-	to_nodeid = dlm_dir_nodeid(r);
-	if (to_nodeid != dlm_our_nodeid())
-		send_remove(r);
-	else
-		dlm_dir_remove_entry(r->res_ls, to_nodeid,
-				     r->res_name, r->res_length);
-}
-
 /* FIXME: make this more efficient */
 
 static int shrink_bucket(struct dlm_ls *ls, int b)
 {
 	struct rb_node *n;
 	struct dlm_rsb *r;
+	int our_nodeid = dlm_our_nodeid();
 	int count = 0, found;
 
 	for (;;) {
@@ -1166,6 +1638,17 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
 		spin_lock(&ls->ls_rsbtbl[b].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+
+			/* If we're the directory record for this rsb, and
+			   we're not the master of it, then we need to wait
+			   for the master node to send us a dir remove for
+			   before removing the dir record. */
+
+			if (!dlm_no_directory(ls) && !is_master(r) &&
+			    (dlm_dir_nodeid(r) == our_nodeid)) {
+				continue;
+			}
+
 			if (!time_after_eq(jiffies, r->res_toss_time +
 					   dlm_config.ci_toss_secs * HZ))
 				continue;
@@ -1182,8 +1665,15 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
 			rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
 			spin_unlock(&ls->ls_rsbtbl[b].lock);
 
-			if (is_master(r))
-				dir_remove(r);
+			/* We're the master of this rsb but we're not
+			   the directory record, so we need to tell the
+			   dir node to remove the dir record. */
+
+			if (!dlm_no_directory(ls) && is_master(r) &&
+			    (dlm_dir_nodeid(r) != our_nodeid)) {
+				send_remove(r);
+			}
+
 			dlm_free_rsb(r);
 			count++;
 		} else {
@@ -2078,8 +2568,7 @@ static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
-	struct dlm_ls *ls = r->res_ls;
-	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
+	int our_nodeid = dlm_our_nodeid();
 
 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
@@ -2093,53 +2582,35 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
 		return 1;
 	}
 
-	if (r->res_nodeid == 0) {
+	if (r->res_master_nodeid == our_nodeid) {
 		lkb->lkb_nodeid = 0;
 		return 0;
 	}
 
-	if (r->res_nodeid > 0) {
-		lkb->lkb_nodeid = r->res_nodeid;
+	if (r->res_master_nodeid) {
+		lkb->lkb_nodeid = r->res_master_nodeid;
 		return 0;
 	}
 
-	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
-
-	dir_nodeid = dlm_dir_nodeid(r);
-
-	if (dir_nodeid != our_nodeid) {
-		r->res_first_lkid = lkb->lkb_id;
-		send_lookup(r, lkb);
-		return 1;
-	}
-
-	for (i = 0; i < 2; i++) {
-		/* It's possible for dlm_scand to remove an old rsb for
-		   this same resource from the toss list, us to create
-		   a new one, look up the master locally, and find it
-		   already exists just before dlm_scand does the
-		   dir_remove() on the previous rsb. */
-
-		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
-				       r->res_length, &ret_nodeid);
-		if (!error)
-			break;
-		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
-		schedule();
-	}
-	if (error && error != -EEXIST)
-		return error;
-
-	if (ret_nodeid == our_nodeid) {
-		r->res_first_lkid = 0;
+	if (dlm_dir_nodeid(r) == our_nodeid) {
+		/* This is a somewhat unusual case; find_rsb will usually
+		   have set res_master_nodeid when dir nodeid is local, but
+		   there are cases where we become the dir node after we've
+		   past find_rsb and go through _request_lock again.
+		   confirm_master() or process_lookup_list() needs to be
+		   called after this. */
+		log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
+			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
+			  r->res_name);
+		r->res_master_nodeid = our_nodeid;
 		r->res_nodeid = 0;
 		lkb->lkb_nodeid = 0;
-	} else {
-		r->res_first_lkid = lkb->lkb_id;
-		r->res_nodeid = ret_nodeid;
-		lkb->lkb_nodeid = ret_nodeid;
+		return 0;
 	}
-	return 0;
+
+	r->res_first_lkid = lkb->lkb_id;
+	send_lookup(r, lkb);
+	return 1;
 }
 
 static void process_lookup_list(struct dlm_rsb *r)
@@ -2584,7 +3055,7 @@ static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
 }
 
 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
- 
+
 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
 	int error;
@@ -2708,11 +3179,11 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
 
 	error = validate_lock_args(ls, lkb, args);
 	if (error)
-		goto out;
+		return error;
 
-	error = find_rsb(ls, name, len, R_CREATE, &r);
+	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
 	if (error)
-		goto out;
+		return error;
 
 	lock_rsb(r);
 
@@ -2723,8 +3194,6 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
 
 	unlock_rsb(r);
 	put_rsb(r);
-
- out:
 	return error;
 }
 
@@ -3406,8 +3875,11 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
 {
 	struct dlm_lkb *lkb;
 	struct dlm_rsb *r;
+	int from_nodeid;
 	int error, namelen;
 
+	from_nodeid = ms->m_header.h_nodeid;
+
 	error = create_lkb(ls, &lkb);
 	if (error)
 		goto fail;
@@ -3420,9 +3892,16 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
 		goto fail;
 	}
 
+	/* The dir node is the authority on whether we are the master
+	   for this rsb or not, so if the master sends us a request, we should
+	   recreate the rsb if we've destroyed it.   This race happens when we
+	   send a remove message to the dir node at the same time that the dir
+	   node sends us a request for the rsb. */
+
 	namelen = receive_extralen(ms);
 
-	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
+	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
+			 R_RECEIVE_REQUEST, &r);
 	if (error) {
 		__put_lkb(ls, lkb);
 		goto fail;
@@ -3430,6 +3909,16 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
 
 	lock_rsb(r);
 
+	if (r->res_master_nodeid != dlm_our_nodeid()) {
+		error = validate_master_nodeid(ls, r, from_nodeid);
+		if (error) {
+			unlock_rsb(r);
+			put_rsb(r);
+			__put_lkb(ls, lkb);
+			goto fail;
+		}
+	}
+
 	attach_lkb(r, lkb);
 	error = do_request(r, lkb);
 	send_request_reply(r, lkb, error);
@@ -3445,6 +3934,23 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
 	return 0;
 
  fail:
+	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
+	   and do this receive_request again from process_lookup_list once
+	   we get the lookup reply.  This would avoid a many repeated
+	   ENOTBLK request failures when the lookup reply designating us
+	   as master is delayed. */
+
+	/* We could repeatedly return -EBADR here if our send_remove() is
+	   delayed in being sent/arriving/being processed on the dir node.
+	   Another node would repeatedly lookup up the master, and the dir
+	   node would continue returning our nodeid until our send_remove
+	   took effect. */
+
+	if (error != -ENOTBLK) {
+		log_limit(ls, "receive_request %x from %d %d",
+			  ms->m_lkid, from_nodeid, error);
+	}
+
 	setup_stub_lkb(ls, ms);
 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
 	return error;
@@ -3651,49 +4157,110 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
 
 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
 {
-	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
+	int len, error, ret_nodeid, from_nodeid, our_nodeid;
 
 	from_nodeid = ms->m_header.h_nodeid;
 	our_nodeid = dlm_our_nodeid();
 
 	len = receive_extralen(ms);
 
-	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
-	if (dir_nodeid != our_nodeid) {
-		log_error(ls, "lookup dir_nodeid %d from %d",
-			  dir_nodeid, from_nodeid);
-		error = -EINVAL;
-		ret_nodeid = -1;
-		goto out;
-	}
-
-	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
+	error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
+				  &ret_nodeid, NULL);
 
 	/* Optimization: we're master so treat lookup as a request */
 	if (!error && ret_nodeid == our_nodeid) {
 		receive_request(ls, ms);
 		return;
 	}
- out:
 	send_lookup_reply(ls, ms, ret_nodeid, error);
 }
 
 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
 {
-	int len, dir_nodeid, from_nodeid;
+	char name[DLM_RESNAME_MAXLEN+1];
+	struct dlm_rsb *r;
+	uint32_t hash, b;
+	int rv, len, dir_nodeid, from_nodeid;
 
 	from_nodeid = ms->m_header.h_nodeid;
 
 	len = receive_extralen(ms);
 
+	if (len > DLM_RESNAME_MAXLEN) {
+		log_error(ls, "receive_remove from %d bad len %d",
+			  from_nodeid, len);
+		return;
+	}
+
 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
 	if (dir_nodeid != dlm_our_nodeid()) {
-		log_error(ls, "remove dir entry dir_nodeid %d from %d",
-			  dir_nodeid, from_nodeid);
+		log_error(ls, "receive_remove from %d bad nodeid %d",
+			  from_nodeid, dir_nodeid);
 		return;
 	}
 
-	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
+	/* Look for name on rsbtbl.toss, if it's there, kill it.
+	   If it's on rsbtbl.keep, it's being used, and we should ignore this
+	   message.  This is an expected race between the dir node sending a
+	   request to the master node at the same time as the master node sends
+	   a remove to the dir node.  The resolution to that race is for the
+	   dir node to ignore the remove message, and the master node to
+	   recreate the master rsb when it gets a request from the dir node for
+	   an rsb it doesn't have. */
+
+	memset(name, 0, sizeof(name));
+	memcpy(name, ms->m_extra, len);
+
+	hash = jhash(name, len, 0);
+	b = hash & (ls->ls_rsbtbl_size - 1);
+
+	spin_lock(&ls->ls_rsbtbl[b].lock);
+
+	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+	if (rv) {
+		/* verify the rsb is on keep list per comment above */
+		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+		if (rv) {
+			/* should not happen */
+			log_error(ls, "receive_remove from %d not found %s",
+				  from_nodeid, name);
+			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			return;
+		}
+		if (r->res_master_nodeid != from_nodeid) {
+			/* should not happen */
+			log_error(ls, "receive_remove keep from %d master %d",
+				  from_nodeid, r->res_master_nodeid);
+			dlm_print_rsb(r);
+			spin_unlock(&ls->ls_rsbtbl[b].lock);
+			return;
+		}
+
+		log_debug(ls, "receive_remove from %d master %d first %x %s",
+			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
+			  name);
+		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		return;
+	}
+
+	if (r->res_master_nodeid != from_nodeid) {
+		log_error(ls, "receive_remove toss from %d master %d",
+			  from_nodeid, r->res_master_nodeid);
+		dlm_print_rsb(r);
+		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		return;
+	}
+
+	if (kref_put(&r->res_ref, kill_rsb)) {
+		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+		spin_unlock(&ls->ls_rsbtbl[b].lock);
+		dlm_free_rsb(r);
+	} else {
+		log_error(ls, "receive_remove from %d rsb ref error",
+			  from_nodeid);
+		dlm_print_rsb(r);
+		spin_unlock(&ls->ls_rsbtbl[b].lock);
+	}
 }
 
 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
@@ -3706,6 +4273,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 	struct dlm_lkb *lkb;
 	struct dlm_rsb *r;
 	int error, mstype, result;
+	int from_nodeid = ms->m_header.h_nodeid;
 
 	error = find_lkb(ls, ms->m_remid, &lkb);
 	if (error)
@@ -3723,8 +4291,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
 	if (error) {
 		log_error(ls, "receive_request_reply %x remote %d %x result %d",
-			  lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
-			  ms->m_result);
+			  lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
 		dlm_dump_rsb(r);
 		goto out;
 	}
@@ -3732,8 +4299,9 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 	/* Optimization: the dir node was also the master, so it took our
 	   lookup as a request and sent request reply instead of lookup reply */
 	if (mstype == DLM_MSG_LOOKUP) {
-		r->res_nodeid = ms->m_header.h_nodeid;
-		lkb->lkb_nodeid = r->res_nodeid;
+		r->res_master_nodeid = from_nodeid;
+		r->res_nodeid = from_nodeid;
+		lkb->lkb_nodeid = from_nodeid;
 	}
 
 	/* this is the value returned from do_request() on the master */
@@ -3767,18 +4335,30 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 	case -EBADR:
 	case -ENOTBLK:
 		/* find_rsb failed to find rsb or rsb wasn't master */
-		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
-			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
-		r->res_nodeid = -1;
-		lkb->lkb_nodeid = -1;
+		log_limit(ls, "receive_request_reply %x from %d %d "
+			  "master %d dir %d first %x %s", lkb->lkb_id,
+			  from_nodeid, result, r->res_master_nodeid,
+			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
+
+		if (r->res_dir_nodeid != dlm_our_nodeid() &&
+		    r->res_master_nodeid != dlm_our_nodeid()) {
+			/* cause _request_lock->set_master->send_lookup */
+			r->res_master_nodeid = 0;
+			r->res_nodeid = -1;
+			lkb->lkb_nodeid = -1;
+		}
 
 		if (is_overlap(lkb)) {
 			/* we'll ignore error in cancel/unlock reply */
 			queue_cast_overlap(r, lkb);
 			confirm_master(r, result);
 			unhold_lkb(lkb); /* undoes create_lkb() */
-		} else
+		} else {
 			_request_lock(r, lkb);
+
+			if (r->res_master_nodeid == dlm_our_nodeid())
+				confirm_master(r, 0);
+		}
 		break;
 
 	default:
@@ -3994,6 +4574,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
 	struct dlm_lkb *lkb;
 	struct dlm_rsb *r;
 	int error, ret_nodeid;
+	int do_lookup_list = 0;
 
 	error = find_lkb(ls, ms->m_lkid, &lkb);
 	if (error) {
@@ -4001,7 +4582,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
 		return;
 	}
 
-	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
+	/* ms->m_result is the value returned by dlm_master_lookup on dir node
 	   FIXME: will a non-zero error ever be returned? */
 
 	r = lkb->lkb_resource;
@@ -4013,12 +4594,37 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
 		goto out;
 
 	ret_nodeid = ms->m_nodeid;
+
+	/* We sometimes receive a request from the dir node for this
+	   rsb before we've received the dir node's loookup_reply for it.
+	   The request from the dir node implies we're the master, so we set
+	   ourself as master in receive_request_reply, and verify here that
+	   we are indeed the master. */
+
+	if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
+		/* This should never happen */
+		log_error(ls, "receive_lookup_reply %x from %d ret %d "
+			  "master %d dir %d our %d first %x %s",
+			  lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
+			  r->res_master_nodeid, r->res_dir_nodeid,
+			  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
+	}
+
 	if (ret_nodeid == dlm_our_nodeid()) {
+		r->res_master_nodeid = ret_nodeid;
 		r->res_nodeid = 0;
-		ret_nodeid = 0;
+		do_lookup_list = 1;
 		r->res_first_lkid = 0;
+	} else if (ret_nodeid == -1) {
+		/* the remote node doesn't believe it's the dir node */
+		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
+			  lkb->lkb_id, ms->m_header.h_nodeid);
+		r->res_master_nodeid = 0;
+		r->res_nodeid = -1;
+		lkb->lkb_nodeid = -1;
 	} else {
-		/* set_master() will copy res_nodeid to lkb_nodeid */
+		/* set_master() will set lkb_nodeid from r */
+		r->res_master_nodeid = ret_nodeid;
 		r->res_nodeid = ret_nodeid;
 	}
 
@@ -4033,7 +4639,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
 	_request_lock(r, lkb);
 
  out_list:
-	if (!ret_nodeid)
+	if (do_lookup_list)
 		process_lookup_list(r);
  out:
 	unlock_rsb(r);
@@ -4047,7 +4653,7 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
 	int error = 0, noent = 0;
 
 	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
-		log_debug(ls, "ignore non-member message %d from %d %x %x %d",
+		log_limit(ls, "receive %d from non-member %d %x %x %d",
 			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
 			  ms->m_remid, ms->m_result);
 		return;
@@ -4174,6 +4780,15 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
 				int nodeid)
 {
 	if (dlm_locking_stopped(ls)) {
+		/* If we were a member of this lockspace, left, and rejoined,
+		   other nodes may still be sending us messages from the
+		   lockspace generation before we left. */
+		if (!ls->ls_generation) {
+			log_limit(ls, "receive %d from %d ignore old gen",
+				  ms->m_type, nodeid);
+			return;
+		}
+
 		dlm_add_requestqueue(ls, nodeid, ms);
 	} else {
 		dlm_wait_requestqueue(ls);
@@ -4798,6 +5413,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
 	struct dlm_rsb *r;
 	struct dlm_lkb *lkb;
 	uint32_t remid = 0;
+	int from_nodeid = rc->rc_header.h_nodeid;
 	int error;
 
 	if (rl->rl_parent_lkid) {
@@ -4815,21 +5431,21 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
 	   we make ourselves master, dlm_recover_masters() won't touch the
 	   MSTCPY locks we've received early. */
 
-	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r);
+	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
+			 from_nodeid, R_RECEIVE_RECOVER, &r);
 	if (error)
 		goto out;
 
+	lock_rsb(r);
+
 	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
 		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
-			  rc->rc_header.h_nodeid, remid);
+			  from_nodeid, remid);
 		error = -EBADR;
-		put_rsb(r);
-		goto out;
+		goto out_unlock;
 	}
 
-	lock_rsb(r);
-
-	lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
+	lkb = search_remid(r, from_nodeid, remid);
 	if (lkb) {
 		error = -EEXIST;
 		goto out_remid;
@@ -4866,7 +5482,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
  out:
 	if (error && error != -EEXIST)
 		log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
-			  rc->rc_header.h_nodeid, remid, error);
+			  from_nodeid, remid, error);
 	rl->rl_result = cpu_to_le32(error);
 	return error;
 }
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index c8b226c62807a..5e0c72e36a9b3 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -14,6 +14,7 @@
 #define __LOCK_DOT_H__
 
 void dlm_dump_rsb(struct dlm_rsb *r);
+void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len);
 void dlm_print_lkb(struct dlm_lkb *lkb);
 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
 			       uint32_t saved_seq);
@@ -28,9 +29,11 @@ void dlm_unlock_recovery(struct dlm_ls *ls);
 void dlm_scan_waiters(struct dlm_ls *ls);
 void dlm_scan_timeout(struct dlm_ls *ls);
 void dlm_adjust_timeouts(struct dlm_ls *ls);
+int dlm_master_lookup(struct dlm_ls *ls, int nodeid, char *name, int len,
+		      unsigned int flags, int *r_nodeid, int *result);
 
 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
-			unsigned int flags, struct dlm_rsb **r_ret);
+			struct dlm_rsb **r_ret);
 
 void dlm_recover_purge(struct dlm_ls *ls);
 void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index ca506abbdd3b8..065bb75ed6095 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -509,17 +509,6 @@ static int new_lockspace(const char *name, const char *cluster,
 	idr_init(&ls->ls_lkbidr);
 	spin_lock_init(&ls->ls_lkbidr_spin);
 
-	size = dlm_config.ci_dirtbl_size;
-	ls->ls_dirtbl_size = size;
-
-	ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size);
-	if (!ls->ls_dirtbl)
-		goto out_lkbfree;
-	for (i = 0; i < size; i++) {
-		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
-		spin_lock_init(&ls->ls_dirtbl[i].lock);
-	}
-
 	INIT_LIST_HEAD(&ls->ls_waiters);
 	mutex_init(&ls->ls_waiters_mutex);
 	INIT_LIST_HEAD(&ls->ls_orphans);
@@ -567,7 +556,7 @@ static int new_lockspace(const char *name, const char *cluster,
 
 	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
 	if (!ls->ls_recover_buf)
-		goto out_dirfree;
+		goto out_lkbfree;
 
 	ls->ls_slot = 0;
 	ls->ls_num_slots = 0;
@@ -648,8 +637,6 @@ static int new_lockspace(const char *name, const char *cluster,
 	list_del(&ls->ls_list);
 	spin_unlock(&lslist_lock);
 	kfree(ls->ls_recover_buf);
- out_dirfree:
-	vfree(ls->ls_dirtbl);
  out_lkbfree:
 	idr_destroy(&ls->ls_lkbidr);
 	vfree(ls->ls_rsbtbl);
@@ -778,13 +765,6 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 
 	kfree(ls->ls_recover_buf);
 
-	/*
-	 * Free direntry structs.
-	 */
-
-	dlm_dir_clear(ls);
-	vfree(ls->ls_dirtbl);
-
 	/*
 	 * Free all lkb's in idr
 	 */
@@ -826,7 +806,6 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 
 	dlm_purge_requestqueue(ls);
 	kfree(ls->ls_recover_args);
-	dlm_clear_free_entries(ls);
 	dlm_clear_members(ls);
 	dlm_clear_members_gone(ls);
 	kfree(ls->ls_node_array);
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 64d3e2b958c78..c8c298d814639 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -23,8 +23,6 @@
 #include "memory.h"
 #include "lock.h"
 #include "util.h"
-#include "member.h"
-
 
 static int rcom_response(struct dlm_ls *ls)
 {
@@ -275,19 +273,9 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
 	struct dlm_rcom *rc;
 	struct dlm_mhandle *mh;
 	int error = 0;
-	int max_size = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
 
 	ls->ls_recover_nodeid = nodeid;
 
-	if (nodeid == dlm_our_nodeid()) {
-		ls->ls_recover_buf->rc_header.h_length =
-			dlm_config.ci_buffer_size;
-		dlm_copy_master_names(ls, last_name, last_len,
-		                      ls->ls_recover_buf->rc_buf,
-		                      max_size, nodeid);
-		goto out;
-	}
-
 	error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
 	if (error)
 		goto out;
@@ -344,6 +332,25 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
 	return error;
 }
 
+int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid)
+{
+	struct dlm_rcom *rc;
+	struct dlm_mhandle *mh;
+	struct dlm_ls *ls = r->res_ls;
+	int error;
+
+	error = create_rcom(ls, to_nodeid, DLM_RCOM_LOOKUP, r->res_length,
+			    &rc, &mh);
+	if (error)
+		goto out;
+	memcpy(rc->rc_buf, r->res_name, r->res_length);
+	rc->rc_id = 0xFFFFFFFF;
+
+	send_rcom(ls, mh, rc);
+ out:
+	return error;
+}
+
 static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 {
 	struct dlm_rcom *rc;
@@ -355,7 +362,14 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 	if (error)
 		return;
 
-	error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid);
+	if (rc_in->rc_id == 0xFFFFFFFF) {
+		log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
+		dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
+		return;
+	}
+
+	error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
+				  DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
 	if (error)
 		ret_nodeid = error;
 	rc->rc_result = ret_nodeid;
@@ -486,17 +500,76 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
 	return 0;
 }
 
+/*
+ * Ignore messages for stage Y before we set
+ * recover_status bit for stage X:
+ *
+ * recover_status = 0
+ *
+ * dlm_recover_members()
+ * - send nothing
+ * - recv nothing
+ * - ignore NAMES, NAMES_REPLY
+ * - ignore LOOKUP, LOOKUP_REPLY
+ * - ignore LOCK, LOCK_REPLY
+ *
+ * recover_status |= NODES
+ *
+ * dlm_recover_members_wait()
+ *
+ * dlm_recover_directory()
+ * - send NAMES
+ * - recv NAMES_REPLY
+ * - ignore LOOKUP, LOOKUP_REPLY
+ * - ignore LOCK, LOCK_REPLY
+ *
+ * recover_status |= DIR
+ *
+ * dlm_recover_directory_wait()
+ *
+ * dlm_recover_masters()
+ * - send LOOKUP
+ * - recv LOOKUP_REPLY
+ *
+ * dlm_recover_locks()
+ * - send LOCKS
+ * - recv LOCKS_REPLY
+ *
+ * recover_status |= LOCKS
+ *
+ * dlm_recover_locks_wait()
+ *
+ * recover_status |= DONE
+ */
+
 /* Called by dlm_recv; corresponds to dlm_receive_message() but special
    recovery-only comms are sent through here. */
 
 void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 {
 	int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
-	int stop, reply = 0, lock = 0;
+	int stop, reply = 0, names = 0, lookup = 0, lock = 0;
 	uint32_t status;
 	uint64_t seq;
 
 	switch (rc->rc_type) {
+	case DLM_RCOM_STATUS_REPLY:
+		reply = 1;
+		break;
+	case DLM_RCOM_NAMES:
+		names = 1;
+		break;
+	case DLM_RCOM_NAMES_REPLY:
+		names = 1;
+		reply = 1;
+		break;
+	case DLM_RCOM_LOOKUP:
+		lookup = 1;
+		break;
+	case DLM_RCOM_LOOKUP_REPLY:
+		lookup = 1;
+		reply = 1;
+		break;
 	case DLM_RCOM_LOCK:
 		lock = 1;
 		break;
@@ -504,10 +577,6 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 		lock = 1;
 		reply = 1;
 		break;
-	case DLM_RCOM_STATUS_REPLY:
-	case DLM_RCOM_NAMES_REPLY:
-	case DLM_RCOM_LOOKUP_REPLY:
-		reply = 1;
 	};
 
 	spin_lock(&ls->ls_recover_lock);
@@ -516,19 +585,17 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 	seq = ls->ls_recover_seq;
 	spin_unlock(&ls->ls_recover_lock);
 
-	if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) ||
-	    (reply && (rc->rc_seq_reply != seq)) ||
-	    (lock && !(status & DLM_RS_DIR))) {
-		log_limit(ls, "dlm_receive_rcom ignore msg %d "
-			  "from %d %llu %llu recover seq %llu sts %x gen %u",
-			   rc->rc_type,
-			   nodeid,
-			   (unsigned long long)rc->rc_seq,
-			   (unsigned long long)rc->rc_seq_reply,
-			   (unsigned long long)seq,
-			   status, ls->ls_generation);
-		goto out;
-	}
+	if (stop && (rc->rc_type != DLM_RCOM_STATUS))
+		goto ignore;
+
+	if (reply && (rc->rc_seq_reply != seq))
+		goto ignore;
+
+	if (!(status & DLM_RS_NODES) && (names || lookup || lock))
+		goto ignore;
+
+	if (!(status & DLM_RS_DIR) && (lookup || lock))
+		goto ignore;
 
 	switch (rc->rc_type) {
 	case DLM_RCOM_STATUS:
@@ -570,10 +637,20 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 	default:
 		log_error(ls, "receive_rcom bad type %d", rc->rc_type);
 	}
-out:
+	return;
+
+ignore:
+	log_limit(ls, "dlm_receive_rcom ignore msg %d "
+		  "from %d %llu %llu recover seq %llu sts %x gen %u",
+		   rc->rc_type,
+		   nodeid,
+		   (unsigned long long)rc->rc_seq,
+		   (unsigned long long)rc->rc_seq_reply,
+		   (unsigned long long)seq,
+		   status, ls->ls_generation);
 	return;
 Eshort:
-	log_error(ls, "recovery message %x from %d is too short",
-			  rc->rc_type, nodeid);
+	log_error(ls, "recovery message %d from %d is too short",
+		  rc->rc_type, nodeid);
 }
 
diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h
index 206723ab744dd..f8e243463c151 100644
--- a/fs/dlm/rcom.h
+++ b/fs/dlm/rcom.h
@@ -17,6 +17,7 @@
 int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags);
 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
 int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
+int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid);
 int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
 void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid);
 int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in);
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 7554e4dac6bbb..3c025fe49ad3c 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -361,9 +361,8 @@ static void set_master_lkbs(struct dlm_rsb *r)
  * rsb's to consider.
  */
 
-static void set_new_master(struct dlm_rsb *r, int nodeid)
+static void set_new_master(struct dlm_rsb *r)
 {
-	r->res_nodeid = nodeid;
 	set_master_lkbs(r);
 	rsb_set_flag(r, RSB_NEW_MASTER);
 	rsb_set_flag(r, RSB_NEW_MASTER2);
@@ -372,31 +371,48 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
 /*
  * We do async lookups on rsb's that need new masters.  The rsb's
  * waiting for a lookup reply are kept on the recover_list.
+ *
+ * Another node recovering the master may have sent us a rcom lookup,
+ * and our dlm_master_lookup() set it as the new master, along with
+ * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
+ * equals our_nodeid below).
  */
 
-static int recover_master(struct dlm_rsb *r)
+static int recover_master(struct dlm_rsb *r, unsigned int *count)
 {
 	struct dlm_ls *ls = r->res_ls;
-	int error, ret_nodeid;
-	int our_nodeid = dlm_our_nodeid();
-	int dir_nodeid = dlm_dir_nodeid(r);
+	int our_nodeid, dir_nodeid;
+	int is_removed = 0;
+	int error;
+
+	if (is_master(r))
+		return 0;
+
+	is_removed = dlm_is_removed(ls, r->res_nodeid);
+
+	if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
+		return 0;
+
+	our_nodeid = dlm_our_nodeid();
+	dir_nodeid = dlm_dir_nodeid(r);
 
 	if (dir_nodeid == our_nodeid) {
-		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
-				       r->res_length, &ret_nodeid);
-		if (error)
-			log_error(ls, "recover dir lookup error %d", error);
+		if (is_removed) {
+			r->res_master_nodeid = our_nodeid;
+			r->res_nodeid = 0;
+		}
 
-		if (ret_nodeid == our_nodeid)
-			ret_nodeid = 0;
-		lock_rsb(r);
-		set_new_master(r, ret_nodeid);
-		unlock_rsb(r);
+		/* set master of lkbs to ourself when is_removed, or to
+		   another new master which we set along with NEW_MASTER
+		   in dlm_master_lookup */
+		set_new_master(r);
+		error = 0;
 	} else {
 		recover_list_add(r);
 		error = dlm_send_rcom_lookup(r, dir_nodeid);
 	}
 
+	(*count)++;
 	return error;
 }
 
@@ -415,7 +431,7 @@ static int recover_master(struct dlm_rsb *r)
  * resent.
  */
 
-static int recover_master_static(struct dlm_rsb *r)
+static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
 {
 	int dir_nodeid = dlm_dir_nodeid(r);
 	int new_master = dir_nodeid;
@@ -423,11 +439,12 @@ static int recover_master_static(struct dlm_rsb *r)
 	if (dir_nodeid == dlm_our_nodeid())
 		new_master = 0;
 
-	lock_rsb(r);
 	dlm_purge_mstcpy_locks(r);
-	set_new_master(r, new_master);
-	unlock_rsb(r);
-	return 1;
+	r->res_master_nodeid = dir_nodeid;
+	r->res_nodeid = new_master;
+	set_new_master(r);
+	(*count)++;
+	return 0;
 }
 
 /*
@@ -443,7 +460,10 @@ static int recover_master_static(struct dlm_rsb *r)
 int dlm_recover_masters(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
-	int error = 0, count = 0;
+	unsigned int total = 0;
+	unsigned int count = 0;
+	int nodir = dlm_no_directory(ls);
+	int error;
 
 	log_debug(ls, "dlm_recover_masters");
 
@@ -455,20 +475,23 @@ int dlm_recover_masters(struct dlm_ls *ls)
 			goto out;
 		}
 
-		if (dlm_no_directory(ls))
-			count += recover_master_static(r);
-		else if (!is_master(r) &&
-			 (dlm_is_removed(ls, r->res_nodeid) ||
-			  rsb_flag(r, RSB_NEW_MASTER))) {
-			recover_master(r);
-			count++;
-		}
+		lock_rsb(r);
+		if (nodir)
+			error = recover_master_static(r, &count);
+		else
+			error = recover_master(r, &count);
+		unlock_rsb(r);
+		cond_resched();
+		total++;
 
-		schedule();
+		if (error) {
+			up_read(&ls->ls_root_sem);
+			goto out;
+		}
 	}
 	up_read(&ls->ls_root_sem);
 
-	log_debug(ls, "dlm_recover_masters %d resources", count);
+	log_debug(ls, "dlm_recover_masters %u of %u", count, total);
 
 	error = dlm_wait_function(ls, &recover_list_empty);
  out:
@@ -480,7 +503,7 @@ int dlm_recover_masters(struct dlm_ls *ls)
 int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
 {
 	struct dlm_rsb *r;
-	int nodeid;
+	int ret_nodeid, new_master;
 
 	r = recover_list_find(ls, rc->rc_id);
 	if (!r) {
@@ -489,12 +512,17 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
 		goto out;
 	}
 
-	nodeid = rc->rc_result;
-	if (nodeid == dlm_our_nodeid())
-		nodeid = 0;
+	ret_nodeid = rc->rc_result;
+
+	if (ret_nodeid == dlm_our_nodeid())
+		new_master = 0;
+	else
+		new_master = ret_nodeid;
 
 	lock_rsb(r);
-	set_new_master(r, nodeid);
+	r->res_master_nodeid = ret_nodeid;
+	r->res_nodeid = new_master;
+	set_new_master(r);
 	unlock_rsb(r);
 	recover_list_del(r);
 
@@ -791,20 +819,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
 			dlm_hold_rsb(r);
 		}
 
-		/* If we're using a directory, add tossed rsbs to the root
-		   list; they'll have entries created in the new directory,
-		   but no other recovery steps should do anything with them. */
-
-		if (dlm_no_directory(ls)) {
-			spin_unlock(&ls->ls_rsbtbl[i].lock);
-			continue;
-		}
-
-		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
-			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			list_add(&r->res_root_list, &ls->ls_root_list);
-			dlm_hold_rsb(r);
-		}
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
+			log_error(ls, "dlm_create_root_list toss not empty");
 		spin_unlock(&ls->ls_rsbtbl[i].lock);
 	}
  out:
@@ -824,28 +840,26 @@ void dlm_release_root_list(struct dlm_ls *ls)
 	up_write(&ls->ls_root_sem);
 }
 
-/* If not using a directory, clear the entire toss list, there's no benefit to
-   caching the master value since it's fixed.  If we are using a dir, keep the
-   rsb's we're the master of.  Recovery will add them to the root list and from
-   there they'll be entered in the rebuilt directory. */
-
-void dlm_clear_toss_list(struct dlm_ls *ls)
+void dlm_clear_toss(struct dlm_ls *ls)
 {
 	struct rb_node *n, *next;
-	struct dlm_rsb *rsb;
+	struct dlm_rsb *r;
+	unsigned int count = 0;
 	int i;
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
-			next = rb_next(n);;
-			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
-			if (dlm_no_directory(ls) || !is_master(rsb)) {
-				rb_erase(n, &ls->ls_rsbtbl[i].toss);
-				dlm_free_rsb(rsb);
-			}
+			next = rb_next(n);
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+			rb_erase(n, &ls->ls_rsbtbl[i].toss);
+			dlm_free_rsb(r);
+			count++;
 		}
 		spin_unlock(&ls->ls_rsbtbl[i].lock);
 	}
+
+	if (count)
+		log_debug(ls, "dlm_clear_toss %u done", count);
 }
 
diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h
index ebd0363f1e08f..d8c8738c70eb7 100644
--- a/fs/dlm/recover.h
+++ b/fs/dlm/recover.h
@@ -27,7 +27,7 @@ int dlm_recover_locks(struct dlm_ls *ls);
 void dlm_recovered_lock(struct dlm_rsb *r);
 int dlm_create_root_list(struct dlm_ls *ls);
 void dlm_release_root_list(struct dlm_ls *ls);
-void dlm_clear_toss_list(struct dlm_ls *ls);
+void dlm_clear_toss(struct dlm_ls *ls);
 void dlm_recover_rsbs(struct dlm_ls *ls);
 
 #endif				/* __RECOVER_DOT_H__ */
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index f1a9073c0835e..88ce65ff021ed 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -60,12 +60,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
 	dlm_callback_suspend(ls);
 
-	/*
-	 * Free non-master tossed rsb's.  Master rsb's are kept on toss
-	 * list and put on root list to be included in resdir recovery.
-	 */
-
-	dlm_clear_toss_list(ls);
+	dlm_clear_toss(ls);
 
 	/*
 	 * This list of root rsb's will be the basis of most of the recovery
@@ -84,6 +79,10 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 		goto fail;
 	}
 
+	dlm_recover_dir_nodeid(ls);
+
+	ls->ls_recover_dir_sent_res = 0;
+	ls->ls_recover_dir_sent_msg = 0;
 	ls->ls_recover_locks_in = 0;
 
 	dlm_set_recover_status(ls, DLM_RS_NODES);
@@ -115,6 +114,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 		goto fail;
 	}
 
+	log_debug(ls, "dlm_recover_directory %u out %u messages",
+		  ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
+
 	/*
 	 * We may have outstanding operations that are waiting for a reply from
 	 * a failed node.  Mark these to be resent after recovery.  Unlock and