Skip to content

Commit

Permalink
drbd: fix potential distributed deadlock
Browse files Browse the repository at this point in the history
We limit ourselves to a configurable maximum number of pages used as
temporary bio pages.

If the configured "max_buffers" is not big enough to match the bandwidth
of the respective deployment, a distributed deadlock could be triggered
by e.g. fast online verify and heavy application IO.

Both TCP connections would then block on congestion, because the receivers
on both nodes would be waiting (in drbd_pp_alloc) for pages to become available.

Fortunately, the respective senders in this case are able to give back
some pages before they send. So do that: free the epoch entry and its
pages before sending, rather than after.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
  • Loading branch information
Lars Ellenberg authored and Philipp Reisner committed May 24, 2011
1 parent 600942e commit 53ea433
Showing 1 changed file with 59 additions and 35 deletions.
94 changes: 59 additions & 35 deletions drivers/block/drbd/drbd_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -297,42 +297,48 @@ void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *
crypto_hash_final(&desc, digest);
}

static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
/* TODO merge common code with w_e_end_ov_req */
int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
int digest_size;
void *digest;
int ok;
int ok = 1;

D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);

if (unlikely(cancel)) {
drbd_free_ee(mdev, e);
return 1;
}
if (unlikely(cancel))
goto out;

if (likely((e->flags & EE_WAS_ERROR) == 0)) {
digest_size = crypto_hash_digestsize(mdev->csums_tfm);
digest = kmalloc(digest_size, GFP_NOIO);
if (digest) {
drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
if (likely((e->flags & EE_WAS_ERROR) != 0))
goto out;

inc_rs_pending(mdev);
ok = drbd_send_drequest_csum(mdev,
e->sector,
e->size,
digest,
digest_size,
P_CSUM_RS_REQUEST);
kfree(digest);
} else {
dev_err(DEV, "kmalloc() of digest failed.\n");
ok = 0;
}
} else
ok = 1;
digest_size = crypto_hash_digestsize(mdev->csums_tfm);
digest = kmalloc(digest_size, GFP_NOIO);
if (digest) {
sector_t sector = e->sector;
unsigned int size = e->size;
drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
/* Free e and pages before send.
* In case we block on congestion, we could otherwise run into
* some distributed deadlock, if the other side blocks on
* congestion as well, because our receiver blocks in
* drbd_pp_alloc due to pp_in_use > max_buffers. */
drbd_free_ee(mdev, e);
e = NULL;
inc_rs_pending(mdev);
ok = drbd_send_drequest_csum(mdev, sector, size,
digest, digest_size,
P_CSUM_RS_REQUEST);
kfree(digest);
} else {
dev_err(DEV, "kmalloc() of digest failed.\n");
ok = 0;
}

drbd_free_ee(mdev, e);
out:
if (e)
drbd_free_ee(mdev, e);

if (unlikely(!ok))
dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
Expand Down Expand Up @@ -1071,9 +1077,12 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
return ok;
}

/* TODO merge common code with w_e_send_csum */
int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
sector_t sector = e->sector;
unsigned int size = e->size;
int digest_size;
void *digest;
int ok = 1;
Expand All @@ -1093,17 +1102,25 @@ int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
else
memset(digest, 0, digest_size);

/* Free e and pages before send.
* In case we block on congestion, we could otherwise run into
* some distributed deadlock, if the other side blocks on
* congestion as well, because our receiver blocks in
* drbd_pp_alloc due to pp_in_use > max_buffers. */
drbd_free_ee(mdev, e);
e = NULL;
inc_rs_pending(mdev);
ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
digest, digest_size, P_OV_REPLY);
ok = drbd_send_drequest_csum(mdev, sector, size,
digest, digest_size,
P_OV_REPLY);
if (!ok)
dec_rs_pending(mdev);
kfree(digest);

out:
drbd_free_ee(mdev, e);
if (e)
drbd_free_ee(mdev, e);
dec_unacked(mdev);

return ok;
}

Expand All @@ -1122,8 +1139,10 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
struct digest_info *di;
int digest_size;
void *digest;
sector_t sector = e->sector;
unsigned int size = e->size;
int digest_size;
int ok, eq = 0;

if (unlikely(cancel)) {
Expand Down Expand Up @@ -1153,16 +1172,21 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
}
}

dec_unacked(mdev);
/* Free e and pages before send.
* In case we block on congestion, we could otherwise run into
* some distributed deadlock, if the other side blocks on
* congestion as well, because our receiver blocks in
* drbd_pp_alloc due to pp_in_use > max_buffers. */
drbd_free_ee(mdev, e);
if (!eq)
drbd_ov_oos_found(mdev, e->sector, e->size);
drbd_ov_oos_found(mdev, sector, size);
else
ov_oos_print(mdev);

ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

drbd_free_ee(mdev, e);
dec_unacked(mdev);

--mdev->ov_left;

Expand Down

0 comments on commit 53ea433

Please sign in to comment.