Skip to content

Commit

Permalink
dlm: keep lkbs in idr
Browse files Browse the repository at this point in the history
This is simpler and quicker than the hash table, and
avoids needing to search the hash list for every new
lkid to check if it's used.

Signed-off-by: David Teigland <teigland@redhat.com>
  • Loading branch information
David Teigland committed Jul 11, 2011
1 parent a22ca48 commit 3d6aa67
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 125 deletions.
7 changes: 0 additions & 7 deletions fs/dlm/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ struct dlm_cluster {
unsigned int cl_tcp_port;
unsigned int cl_buffer_size;
unsigned int cl_rsbtbl_size;
unsigned int cl_lkbtbl_size;
unsigned int cl_dirtbl_size;
unsigned int cl_recover_timer;
unsigned int cl_toss_secs;
Expand All @@ -109,7 +108,6 @@ enum {
CLUSTER_ATTR_TCP_PORT = 0,
CLUSTER_ATTR_BUFFER_SIZE,
CLUSTER_ATTR_RSBTBL_SIZE,
CLUSTER_ATTR_LKBTBL_SIZE,
CLUSTER_ATTR_DIRTBL_SIZE,
CLUSTER_ATTR_RECOVER_TIMER,
CLUSTER_ATTR_TOSS_SECS,
Expand Down Expand Up @@ -162,7 +160,6 @@ __CONFIGFS_ATTR(name, 0644, name##_read, name##_write)
CLUSTER_ATTR(tcp_port, 1);
CLUSTER_ATTR(buffer_size, 1);
CLUSTER_ATTR(rsbtbl_size, 1);
CLUSTER_ATTR(lkbtbl_size, 1);
CLUSTER_ATTR(dirtbl_size, 1);
CLUSTER_ATTR(recover_timer, 1);
CLUSTER_ATTR(toss_secs, 1);
Expand All @@ -176,7 +173,6 @@ static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
[CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr,
[CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr,
[CLUSTER_ATTR_LKBTBL_SIZE] = &cluster_attr_lkbtbl_size.attr,
[CLUSTER_ATTR_DIRTBL_SIZE] = &cluster_attr_dirtbl_size.attr,
[CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr,
[CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr,
Expand Down Expand Up @@ -446,7 +442,6 @@ static struct config_group *make_cluster(struct config_group *g,
cl->cl_tcp_port = dlm_config.ci_tcp_port;
cl->cl_buffer_size = dlm_config.ci_buffer_size;
cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size;
cl->cl_lkbtbl_size = dlm_config.ci_lkbtbl_size;
cl->cl_dirtbl_size = dlm_config.ci_dirtbl_size;
cl->cl_recover_timer = dlm_config.ci_recover_timer;
cl->cl_toss_secs = dlm_config.ci_toss_secs;
Expand Down Expand Up @@ -1038,7 +1033,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
#define DEFAULT_TCP_PORT 21064
#define DEFAULT_BUFFER_SIZE 4096
#define DEFAULT_RSBTBL_SIZE 1024
#define DEFAULT_LKBTBL_SIZE 1024
#define DEFAULT_DIRTBL_SIZE 1024
#define DEFAULT_RECOVER_TIMER 5
#define DEFAULT_TOSS_SECS 10
Expand All @@ -1052,7 +1046,6 @@ struct dlm_config_info dlm_config = {
.ci_tcp_port = DEFAULT_TCP_PORT,
.ci_buffer_size = DEFAULT_BUFFER_SIZE,
.ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE,
.ci_lkbtbl_size = DEFAULT_LKBTBL_SIZE,
.ci_dirtbl_size = DEFAULT_DIRTBL_SIZE,
.ci_recover_timer = DEFAULT_RECOVER_TIMER,
.ci_toss_secs = DEFAULT_TOSS_SECS,
Expand Down
1 change: 0 additions & 1 deletion fs/dlm/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ struct dlm_config_info {
int ci_tcp_port;
int ci_buffer_size;
int ci_rsbtbl_size;
int ci_lkbtbl_size;
int ci_dirtbl_size;
int ci_recover_timer;
int ci_toss_secs;
Expand Down
14 changes: 4 additions & 10 deletions fs/dlm/dlm_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <linux/jhash.h>
#include <linux/miscdevice.h>
#include <linux/mutex.h>
#include <linux/idr.h>
#include <asm/uaccess.h>

#include <linux/dlm.h>
Expand All @@ -52,7 +53,6 @@ struct dlm_ls;
struct dlm_lkb;
struct dlm_rsb;
struct dlm_member;
struct dlm_lkbtable;
struct dlm_rsbtable;
struct dlm_dirtable;
struct dlm_direntry;
Expand Down Expand Up @@ -108,11 +108,6 @@ struct dlm_rsbtable {
spinlock_t lock;
};

struct dlm_lkbtable {
struct list_head list;
rwlock_t lock;
uint16_t counter;
};

/*
* Lockspace member (per node in a ls)
Expand Down Expand Up @@ -248,7 +243,6 @@ struct dlm_lkb {
int8_t lkb_wait_count;
int lkb_wait_nodeid; /* for debugging */

struct list_head lkb_idtbl_list; /* lockspace lkbtbl */
struct list_head lkb_statequeue; /* rsb g/c/w list */
struct list_head lkb_rsb_lookup; /* waiting for rsb lookup */
struct list_head lkb_wait_reply; /* waiting for remote reply */
Expand Down Expand Up @@ -465,12 +459,12 @@ struct dlm_ls {
unsigned long ls_scan_time;
struct kobject ls_kobj;

struct idr ls_lkbidr;
spinlock_t ls_lkbidr_spin;

struct dlm_rsbtable *ls_rsbtbl;
uint32_t ls_rsbtbl_size;

struct dlm_lkbtable *ls_lkbtbl;
uint32_t ls_lkbtbl_size;

struct dlm_dirtable *ls_dirtbl;
uint32_t ls_dirtbl_size;

Expand Down
69 changes: 24 additions & 45 deletions fs/dlm/lock.c
Original file line number Diff line number Diff line change
Expand Up @@ -580,9 +580,8 @@ static void detach_lkb(struct dlm_lkb *lkb)

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
struct dlm_lkb *lkb, *tmp;
uint32_t lkid = 0;
uint16_t bucket;
struct dlm_lkb *lkb;
int rv, id;

lkb = dlm_allocate_lkb(ls);
if (!lkb)
Expand All @@ -596,58 +595,38 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
INIT_LIST_HEAD(&lkb->lkb_time_list);
INIT_LIST_HEAD(&lkb->lkb_astqueue);

get_random_bytes(&bucket, sizeof(bucket));
bucket &= (ls->ls_lkbtbl_size - 1);

write_lock(&ls->ls_lkbtbl[bucket].lock);
retry:
rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
if (!rv)
return -ENOMEM;

/* counter can roll over so we must verify lkid is not in use */
spin_lock(&ls->ls_lkbidr_spin);
rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
if (!rv)
lkb->lkb_id = id;
spin_unlock(&ls->ls_lkbidr_spin);

while (lkid == 0) {
lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
if (rv == -EAGAIN)
goto retry;

list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
lkb_idtbl_list) {
if (tmp->lkb_id != lkid)
continue;
lkid = 0;
break;
}
if (rv < 0) {
log_error(ls, "create_lkb idr error %d", rv);
return rv;
}

lkb->lkb_id = lkid;
list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
write_unlock(&ls->ls_lkbtbl[bucket].lock);

*lkb_ret = lkb;
return 0;
}

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
struct dlm_lkb *lkb;
uint16_t bucket = (lkid >> 16);

list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
if (lkb->lkb_id == lkid)
return lkb;
}
return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
struct dlm_lkb *lkb;
uint16_t bucket = (lkid >> 16);

if (bucket >= ls->ls_lkbtbl_size)
return -EBADSLT;

read_lock(&ls->ls_lkbtbl[bucket].lock);
lkb = __find_lkb(ls, lkid);
spin_lock(&ls->ls_lkbidr_spin);
lkb = idr_find(&ls->ls_lkbidr, lkid);
if (lkb)
kref_get(&lkb->lkb_ref);
read_unlock(&ls->ls_lkbtbl[bucket].lock);
spin_unlock(&ls->ls_lkbidr_spin);

*lkb_ret = lkb;
return lkb ? 0 : -ENOENT;
Expand All @@ -668,12 +647,12 @@ static void kill_lkb(struct kref *kref)

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
uint16_t bucket = (lkb->lkb_id >> 16);
uint32_t lkid = lkb->lkb_id;

write_lock(&ls->ls_lkbtbl[bucket].lock);
spin_lock(&ls->ls_lkbidr_spin);
if (kref_put(&lkb->lkb_ref, kill_lkb)) {
list_del(&lkb->lkb_idtbl_list);
write_unlock(&ls->ls_lkbtbl[bucket].lock);
idr_remove(&ls->ls_lkbidr, lkid);
spin_unlock(&ls->ls_lkbidr_spin);

detach_lkb(lkb);

Expand All @@ -683,7 +662,7 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
dlm_free_lkb(lkb);
return 1;
} else {
write_unlock(&ls->ls_lkbtbl[bucket].lock);
spin_unlock(&ls->ls_lkbidr_spin);
return 0;
}
}
Expand Down
116 changes: 54 additions & 62 deletions fs/dlm/lockspace.c
Original file line number Diff line number Diff line change
Expand Up @@ -472,17 +472,8 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
spin_lock_init(&ls->ls_rsbtbl[i].lock);
}

size = dlm_config.ci_lkbtbl_size;
ls->ls_lkbtbl_size = size;

ls->ls_lkbtbl = vmalloc(sizeof(struct dlm_lkbtable) * size);
if (!ls->ls_lkbtbl)
goto out_rsbfree;
for (i = 0; i < size; i++) {
INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
rwlock_init(&ls->ls_lkbtbl[i].lock);
ls->ls_lkbtbl[i].counter = 1;
}
idr_init(&ls->ls_lkbidr);
spin_lock_init(&ls->ls_lkbidr_spin);

size = dlm_config.ci_dirtbl_size;
ls->ls_dirtbl_size = size;
Expand Down Expand Up @@ -605,8 +596,7 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
out_dirfree:
vfree(ls->ls_dirtbl);
out_lkbfree:
vfree(ls->ls_lkbtbl);
out_rsbfree:
idr_destroy(&ls->ls_lkbidr);
vfree(ls->ls_rsbtbl);
out_lsfree:
if (do_unreg)
Expand Down Expand Up @@ -641,50 +631,66 @@ int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
return error;
}

/* Return 1 if the lockspace still has active remote locks,
* 2 if the lockspace still has active local locks.
*/
static int lockspace_busy(struct dlm_ls *ls)
{
int i, lkb_found = 0;
struct dlm_lkb *lkb;

/* NOTE: We check the lockidtbl here rather than the resource table.
This is because there may be LKBs queued as ASTs that have been
unlinked from their RSBs and are pending deletion once the AST has
been delivered */

for (i = 0; i < ls->ls_lkbtbl_size; i++) {
read_lock(&ls->ls_lkbtbl[i].lock);
if (!list_empty(&ls->ls_lkbtbl[i].list)) {
lkb_found = 1;
list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
lkb_idtbl_list) {
if (!lkb->lkb_nodeid) {
read_unlock(&ls->ls_lkbtbl[i].lock);
return 2;
}
}
}
read_unlock(&ls->ls_lkbtbl[i].lock);
static int lkb_idr_is_local(int id, void *p, void *data)
{
struct dlm_lkb *lkb = p;

if (!lkb->lkb_nodeid)
return 1;
return 0;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
struct dlm_lkb *lkb = p;

dlm_del_ast(lkb);

if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
dlm_free_lvb(lkb->lkb_lvbptr);

dlm_free_lkb(lkb);
return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
This is because there may be LKBs queued as ASTs that have been unlinked
from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
int rv;

spin_lock(&ls->ls_lkbidr_spin);
if (force == 0) {
rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
} else if (force == 1) {
rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
} else {
rv = 0;
}
return lkb_found;
spin_unlock(&ls->ls_lkbidr_spin);
return rv;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
struct dlm_lkb *lkb;
struct dlm_rsb *rsb;
struct list_head *head;
int i, busy, rv;

busy = lockspace_busy(ls);
busy = lockspace_busy(ls, force);

spin_lock(&lslist_lock);
if (ls->ls_create_count == 1) {
if (busy > force)
if (busy) {
rv = -EBUSY;
else {
} else {
/* remove_lockspace takes ls off lslist */
ls->ls_create_count = 0;
rv = 0;
Expand Down Expand Up @@ -724,29 +730,15 @@ static int release_lockspace(struct dlm_ls *ls, int force)
vfree(ls->ls_dirtbl);

/*
* Free all lkb's on lkbtbl[] lists.
* Free all lkb's in idr
*/

for (i = 0; i < ls->ls_lkbtbl_size; i++) {
head = &ls->ls_lkbtbl[i].list;
while (!list_empty(head)) {
lkb = list_entry(head->next, struct dlm_lkb,
lkb_idtbl_list);

list_del(&lkb->lkb_idtbl_list);

dlm_del_ast(lkb);
idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
idr_remove_all(&ls->ls_lkbidr);
idr_destroy(&ls->ls_lkbidr);

if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
dlm_free_lvb(lkb->lkb_lvbptr);

dlm_free_lkb(lkb);
}
}
dlm_astd_resume();

vfree(ls->ls_lkbtbl);

/*
* Free all rsb's on rsbtbl[] lists
*/
Expand Down

0 comments on commit 3d6aa67

Please sign in to comment.