rcu/kvfree: Split ready for reclaim objects from a batch
This patch splits the lists of objects so as to avoid sending any
through RCU that have already been queued for more than one grace
period.  These long-term-resident objects are immediately freed.
The remaining short-term-resident objects are queued for later freeing
using queue_rcu_work().

This change avoids delaying workqueue handlers with synchronize_rcu()
invocations.  Yes, workqueue handlers are designed to handle blocking,
but avoiding blocking when unnecessary improves performance during
low-memory situations.

Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>
Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
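
For illustration only, here is a minimal sketch of the underlying idea in plain kernel C. It is not the patch itself, and every my_* identifier is hypothetical: each queued object records a grace-period cookie from get_state_synchronize_rcu(), and at drain time anything whose cookie already satisfies poll_state_synchronize_rcu() is freed on the spot, leaving only the remainder to wait behind queue_rcu_work() (see the rcu_work sketch after the diff).

#include <linux/list.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>
#include <linux/spinlock.h>

/* Hypothetical deferred-free object: a payload plus a GP cookie. */
struct my_obj {
    struct list_head list;
    unsigned long gp_snap;      /* cookie from get_state_synchronize_rcu() */
    /* payload ... */
};

static LIST_HEAD(my_pending);           /* objects still waiting for a GP */
static DEFINE_SPINLOCK(my_lock);

/* Queue one already-unpublished object and snapshot the current RCU state. */
static void my_queue_free(struct my_obj *obj)
{
    obj->gp_snap = get_state_synchronize_rcu();

    spin_lock(&my_lock);
    list_add_tail(&obj->list, &my_pending);
    spin_unlock(&my_lock);
}

/* Split the pending list: long-term residents are reclaimed immediately. */
static void my_drain_ready(void)
{
    struct my_obj *obj, *n;
    LIST_HEAD(ready);

    spin_lock(&my_lock);
    list_for_each_entry_safe(obj, n, &my_pending, list) {
        /* A full grace period has already elapsed for this object. */
        if (poll_state_synchronize_rcu(obj->gp_snap))
            list_move(&obj->list, &ready);
    }
    spin_unlock(&my_lock);

    list_for_each_entry_safe(obj, n, &ready, list) {
        list_del(&obj->list);
        kfree(obj);
    }
    /* Whatever is left on my_pending still needs queue_rcu_work(). */
}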
Uladzislau Rezki (Sony) authored and Paul E. McKenney committed Jan 4, 2023
1 parent 4c33464 commit 2ca836b
Showing 1 changed file with 54 additions and 33 deletions.
87 changes: 54 additions & 33 deletions kernel/rcu/tree.c
@@ -2900,22 +2900,21 @@ struct kvfree_rcu_bulk_data {
  * struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
  * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
  * @head_free: List of kfree_rcu() objects waiting for a grace period
- * @head_free_gp_snap: Snapshot of RCU state for objects placed to "@head_free"
  * @bulk_head_free: Bulk-List of kvfree_rcu() objects waiting for a grace period
  * @krcp: Pointer to @kfree_rcu_cpu structure
  */
 
 struct kfree_rcu_cpu_work {
-    struct work_struct rcu_work;
+    struct rcu_work rcu_work;
     struct rcu_head *head_free;
-    unsigned long head_free_gp_snap;
     struct list_head bulk_head_free[FREE_N_CHANNELS];
     struct kfree_rcu_cpu *krcp;
 };
 
 /**
  * struct kfree_rcu_cpu - batch up kfree_rcu() requests for RCU grace period
  * @head: List of kfree_rcu() objects not yet waiting for a grace period
+ * @head_gp_snap: Snapshot of RCU state for objects placed to "@head"
  * @bulk_head: Bulk-List of kvfree_rcu() objects not yet waiting for a grace period
  * @krw_arr: Array of batches of kfree_rcu() objects waiting for a grace period
  * @lock: Synchronize access to this structure
@@ -2943,6 +2942,7 @@ struct kfree_rcu_cpu {
     // Objects queued on a linked list
     // through their rcu_head structures.
     struct rcu_head *head;
+    unsigned long head_gp_snap;
     atomic_t head_count;
 
     // Objects queued on a bulk-list.
@@ -3111,10 +3111,9 @@ static void kfree_rcu_work(struct work_struct *work)
     struct rcu_head *head;
     struct kfree_rcu_cpu *krcp;
     struct kfree_rcu_cpu_work *krwp;
-    unsigned long head_free_gp_snap;
     int i;
 
-    krwp = container_of(work,
+    krwp = container_of(to_rcu_work(work),
                 struct kfree_rcu_cpu_work, rcu_work);
     krcp = krwp->krcp;
 
@@ -3126,26 +3125,11 @@ static void kfree_rcu_work(struct work_struct *work)
     // Channel 3.
     head = krwp->head_free;
     krwp->head_free = NULL;
-    head_free_gp_snap = krwp->head_free_gp_snap;
     raw_spin_unlock_irqrestore(&krcp->lock, flags);
 
     // Handle the first two channels.
     for (i = 0; i < FREE_N_CHANNELS; i++) {
         // Start from the tail page, so a GP is likely passed for it.
-        list_for_each_entry_safe_reverse(bnode, n, &bulk_head[i], list) {
-            // Not yet ready? Bail out since we need one more GP.
-            if (!poll_state_synchronize_rcu(bnode->gp_snap))
-                break;
-
-            list_del_init(&bnode->list);
-            kvfree_rcu_bulk(krcp, bnode, i);
-        }
-
-        // Please note a request for one more extra GP can
-        // occur only once for all objects in this batch.
-        if (!list_empty(&bulk_head[i]))
-            synchronize_rcu();
-
         list_for_each_entry_safe(bnode, n, &bulk_head[i], list)
             kvfree_rcu_bulk(krcp, bnode, i);
     }
@@ -3157,10 +3141,7 @@ static void kfree_rcu_work(struct work_struct *work)
      * queued on a linked list through their rcu_head structures.
      * This list is named "Channel 3".
      */
-    if (head) {
-        cond_synchronize_rcu(head_free_gp_snap);
-        kvfree_rcu_list(head);
-    }
+    kvfree_rcu_list(head);
 }
 
 static bool
@@ -3201,6 +3182,44 @@ schedule_delayed_monitor_work(struct kfree_rcu_cpu *krcp)
     queue_delayed_work(system_wq, &krcp->monitor_work, delay);
 }
 
+static void
+kvfree_rcu_drain_ready(struct kfree_rcu_cpu *krcp)
+{
+    struct list_head bulk_ready[FREE_N_CHANNELS];
+    struct kvfree_rcu_bulk_data *bnode, *n;
+    struct rcu_head *head_ready = NULL;
+    unsigned long flags;
+    int i;
+
+    raw_spin_lock_irqsave(&krcp->lock, flags);
+    for (i = 0; i < FREE_N_CHANNELS; i++) {
+        INIT_LIST_HEAD(&bulk_ready[i]);
+
+        list_for_each_entry_safe_reverse(bnode, n, &krcp->bulk_head[i], list) {
+            if (!poll_state_synchronize_rcu(bnode->gp_snap))
+                break;
+
+            atomic_sub(bnode->nr_records, &krcp->bulk_count[i]);
+            list_move(&bnode->list, &bulk_ready[i]);
+        }
+    }
+
+    if (krcp->head && poll_state_synchronize_rcu(krcp->head_gp_snap)) {
+        head_ready = krcp->head;
+        atomic_set(&krcp->head_count, 0);
+        WRITE_ONCE(krcp->head, NULL);
+    }
+    raw_spin_unlock_irqrestore(&krcp->lock, flags);
+
+    for (i = 0; i < FREE_N_CHANNELS; i++) {
+        list_for_each_entry_safe(bnode, n, &bulk_ready[i], list)
+            kvfree_rcu_bulk(krcp, bnode, i);
+    }
+
+    if (head_ready)
+        kvfree_rcu_list(head_ready);
+}
+
 /*
  * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
  */
@@ -3211,6 +3230,9 @@ static void kfree_rcu_monitor(struct work_struct *work)
     unsigned long flags;
     int i, j;
 
+    // Drain ready for reclaim.
+    kvfree_rcu_drain_ready(krcp);
+
     raw_spin_lock_irqsave(&krcp->lock, flags);
 
     // Attempt to start a new batch.
@@ -3230,30 +3252,26 @@ static void kfree_rcu_monitor(struct work_struct *work)
             // Channel 2 corresponds to vmalloc-pointer bulk path.
             for (j = 0; j < FREE_N_CHANNELS; j++) {
                 if (list_empty(&krwp->bulk_head_free[j])) {
-                    list_replace_init(&krcp->bulk_head[j], &krwp->bulk_head_free[j]);
                     atomic_set(&krcp->bulk_count[j], 0);
+                    list_replace_init(&krcp->bulk_head[j],
+                        &krwp->bulk_head_free[j]);
                 }
             }
 
             // Channel 3 corresponds to both SLAB and vmalloc
             // objects queued on the linked list.
             if (!krwp->head_free) {
                 krwp->head_free = krcp->head;
-                WRITE_ONCE(krcp->head, NULL);
                 atomic_set(&krcp->head_count, 0);
-
-                // Take a snapshot for this krwp. Please note no more
-                // any objects can be added to attached head_free channel
-                // therefore fixate a GP for it here.
-                krwp->head_free_gp_snap = get_state_synchronize_rcu();
+                WRITE_ONCE(krcp->head, NULL);
             }
 
             // One work is per one batch, so there are three
             // "free channels", the batch can handle. It can
             // be that the work is in the pending state when
             // channels have been detached following by each
             // other.
-            queue_work(system_wq, &krwp->rcu_work);
+            queue_rcu_work(system_wq, &krwp->rcu_work);
         }
     }
 
@@ -3440,6 +3458,9 @@ void kvfree_call_rcu(struct rcu_head *head, void *ptr)
         head->next = krcp->head;
         WRITE_ONCE(krcp->head, head);
         atomic_inc(&krcp->head_count);
+
+        // Take a snapshot for this krcp.
+        krcp->head_gp_snap = get_state_synchronize_rcu();
         success = true;
     }
 
@@ -4834,7 +4855,7 @@ static void __init kfree_rcu_batch_init(void)
         struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
 
         for (i = 0; i < KFREE_N_BATCHES; i++) {
-            INIT_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
+            INIT_RCU_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
             krcp->krw_arr[i].krcp = krcp;
 
             for (j = 0; j < FREE_N_CHANNELS; j++)
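
As a usage note: the patch moves the batch handler onto the rcu_work machinery, so the grace-period wait happens before the workqueue handler runs instead of inside it. Below is a minimal, hypothetical sketch of that API pattern (my_* names are invented); queue_rcu_work() queues the handler only after a full grace period has elapsed, and the handler recovers its wrapper with to_rcu_work().

#include <linux/slab.h>
#include <linux/workqueue.h>

/* Hypothetical wrapper carrying a payload to be freed after a GP. */
struct my_deferred {
    struct rcu_work rwork;
    void *payload;
};

static void my_handler(struct work_struct *work)
{
    /* Runs only after a grace period; no synchronize_rcu() needed here. */
    struct my_deferred *d = container_of(to_rcu_work(work),
                                         struct my_deferred, rwork);

    kfree(d->payload);
}

static void my_defer(struct my_deferred *d)
{
    INIT_RCU_WORK(&d->rwork, my_handler);
    queue_rcu_work(system_wq, &d->rwork);
}

This is the same plumbing the diff installs via INIT_RCU_WORK() in kfree_rcu_batch_init(), queue_rcu_work() in kfree_rcu_monitor(), and to_rcu_work() in kfree_rcu_work().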
