[toc]

kernel/rcu/tree.c Tree RCU - 应对大规模多核系统的RCU扩展性实现

历史与背景

这项技術是爲了解决什么特定问题而诞生的?

kernel/rcu/tree.c 并不是一种新的RCU“类型”(像SRCU那样),而是标准RCU(包括Preemptible RCU)的一种高性能、高可扩展性的实现方式。它的诞生是为了解决随着多核处理器(SMP)系统中CPU核心数量急剧增加,最初的“扁平化”RCU实现所面临的严重扩展性瓶颈问题。

RCU的核心是等待一个“宽限期”(Grace Period)结束,而宽限期的结束依赖于确认系统中所有CPU都已经经历过至少一次静止状态(Quiescent State)。在早期的、核心数较少的系统中,RCU协调器可以通过检查一个全局的数据结构来轮询或追踪每个CPU的状态。

然而,当CPU核心数从个位数增长到几十、几百甚至上千时,这种“扁平化”的管理方式导致了:

  1. 全局锁/数据争用:所有CPU都需要向一个中心点报告自己的状态,或者由一个中心任务去检查所有CPU的状态。这会导致对同一个全局锁或缓存行的激烈争用,形成一个巨大的性能瓶颈,完全违背了RCU追求高性能的初衷。
  2. 唤醒空闲CPU:为了检查一个处于深度睡眠(tickless idle)的CPU的状态,可能需要向它发送一个处理器间中断(IPI),将其不必要地唤醒,这会增加功耗和延迟。

Tree RCU就是为了解决这个扩展性问题而设计的,它用一种分层、树状的结构取代了原有的扁平化管理模型。

它的发展经历了哪些重要的里程碑或版本迭代?

Tree RCU是RCU发展史上为了适应硬件趋势而进行的一次关键架构演进。

  • 经典RCU的瓶颈:内核开发者们(尤其是Paul E. McKenney)在高性能计算(HPC)环境中预见并实际遇到了经典RCU实现的扩展性问题。
  • Tree RCU的设计与实现:Paul E. McKenney设计了Tree RCU算法,将CPU组织成一个树状结构。状态的聚合和分发沿着这棵树进行,将全局操作分解为一系列局部操作。
  • 成为默认实现:由于其出色的扩展性,Tree RCU很快就取代了旧的实现,成为现代Linux内核中用于多核系统(CONFIG_SMP=y)的默认RCU实现kernel/rcu/tree.c 就是这一实现的核心代码。
  • NUMA亲和性优化:后续的优化使其能够感知NUMA(非统一内存访问)架构,构建的树会尽可能地让节点内的通信限制在同一个NUMA节点内,进一步减少了跨节点的内存访问延迟。

目前该技术的社区活跃度和主流应用情况如何?

kernel/rcu/tree.c 是当前Linux内核RCU子系统的主力实现,极其核心和稳定。它不是一个可选项,而是所有运行在多核服务器、桌面和高端嵌入式设备上的Linux系统的标准组成部分。对它的维护和优化仍在继续,以适应未来更多核心、更复杂层级的处理器架构。

核心原理与设计

它的核心工作原理是什么?

Tree RCU的核心思想是将对所有CPU状态的全局检查,分散到一个层级结构中去完成。

  1. rcu_node 树的构建

    • 内核在启动时,会根据系统中CPU的数量和拓扑结构,动态地构建一棵由 struct rcu_node 组成的树。
    • 系统中的CPU是这棵树的叶子节点
    • 多个叶子节点(或低层rcu_node)汇聚成一个父rcu_node,最终形成一个单一的树根rcu_node
  2. 宽限期的检测流程

    • 开始:当一个宽限期需要开始时,信息从树根开始,向下传播到所有rcu_node和CPU叶子节点。
    • 静止状态上报(核心)
      a. 每个CPU(叶子节点)在经历了一次静止状态后,会向其父rcu_node报告。
      b. 每个父rcu_node会等待其所有子节点(无论是CPU还是其他rcu_node)都报告了静止状态。
      c. 当一个rcu_node收到了其所有子节点的报告后,它就认为其代表的整个子树都已经“静止”了,于是它再向它自己的父节点报告。
    • 结束:这个“上报”过程沿着树逐级向上传递。当树的根节点最终收到了其所有子节点的报告后,就意味着系统中的每一个CPU都已经经历过静止状态。此时,整个宽限期宣告结束。

通过这种方式,原本需要对一个全局数据结构进行 N次(N为CPU数)访问的操作,被分解为一系列对树中不同节点的局部访问,极大地降低了锁争用和缓存行弹跳。

它的主要优势体现在哪些方面?

  • 极佳的扩展性:这是其最核心的优势。无论CPU核心数如何增长,同步开销都只呈对数级(log(N))增长,而不是线性或更差。
  • NUMA亲和性:树的构建可以与硬件的NUMA拓扑对齐,使得大部分状态上报都发生在同一个CPU插槽(socket)内部,减少了昂贵的跨插槽内存访问。
  • 对Tickless Idle友好:它有一套机制可以处理处于深度睡眠的CPU,而无需为了检查状态而频繁地唤醒它们。

它存在哪些已知的劣势、局限性或在特定场景下的不适用性?

  • 实现复杂性:相比于扁平化的模型,Tree RCU的实现逻辑要复杂得多。
  • 在极小系统上的微小开销:对于只有2或4个核心的系统,树状结构的额外管理开销理论上可能比一个极其简单的扁平模型要高一点点。但在实际中,由于现代内核已经为Tree RCU高度优化,这种差异可以忽略不计。
  • 内存占用rcu_node 结构本身会占用一定的内核内存。

使用场景

在哪些具体的业务或技术场景下,它是首选解决方案?

tree.c 中的Tree RCU不是一个像mutexSRCU那样由开发者直接“选择”的API,而是内核配置选择的底层实现。因此,它的使用场景就是所有标准RCU(rcu_read_lock, synchronize_rcu等)在现代多核硬件上的应用场景

  • 当你在一个多核Linux系统上使用任何依赖标准RCU的功能时,背后默默提供高性能支撑的就是Tree RCU。这些功能包括但不限于:
    • 网络路由和邻居缓存查询。
    • VFS的dcache路径查找。
    • 通过PID查找进程。
    • 内核模块列表遍历。

是否有不推荐使用该技术的场景?为什么?

这个问题应该理解为“在什么情况下内核不会使用Tree RCU实现?”

  • 单处理器系统(UP):如果内核被编译为非SMP(CONFIG_SMP=n),那么就不需要这种复杂的同步机制,会使用一个非常简单的、针对单处理器的RCU实现(rcu/tiny.c)。
  • 某些极简的嵌入式场景:在一些核心数很少且资源极其受限的实时或嵌入式配置下,可能会选择更轻量级的RCU实现。

对比分析

请将其 与 其他相似技术 进行详细对比。

最合适的对比对象是它所取代的**“经典”或“扁平化”RCU实现**。

特性 Tree RCU (kernel/rcu/tree.c) “扁平化” RCU (旧的/理论上的实现)
数据结构 树状层级结构 (struct rcu_node)。 单一的、全局的数据结构(如位图或数组)。
可扩展性 非常好 (O(log N))。 (O(N) 或更糟),受限于全局锁争用。
全局争用 。争用被分散到树的各个局部节点。 。所有CPU都争用同一个全局数据。
NUMA亲和性 支持。可以构建与硬件拓扑匹配的树。 不支持。无法避免跨NUMA节点的访问。
实现复杂性
适用硬件 大规模多核系统(现代服务器、桌面等)。 仅适用于核心数很少的早期SMP系统。

kernel/rcu/tree_exp.h

poll_state_synchronize_rcu 指定的 RCU 宽限期是否已完成?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* poll_state_synchronize_rcu - 指定的 RCU 宽限期是否已完成?
* @oldstate: value from get_state_synchronize_rcu() or start_poll_synchronize_rcu()
*
函数功能概述
输入参数:

oldstate 是一个无符号长整型值,通常由 get_state_synchronize_rcu() 或 start_poll_synchronize_rcu() 提供,表示某一时刻的 RCU 状态快照。
功能逻辑:

如果 oldstate 等于 RCU_GET_STATE_COMPLETED,或者通过 rcu_seq_done_exact 检查发现宽限期序列号(rcu_state.gp_seq_polled)已经完成了 oldstate 所表示的宽限期,则返回 true,表示宽限期已完成。
否则,返回 false,表示宽限期尚未完成。
内存屏障:

在宽限期完成的情况下,调用 smp_mb() 设置内存屏障,确保宽限期的结束在后续的内存访问之前。这提供了与 synchronize_rcu() 相同的内存排序保证。
返回值:

返回布尔值 true 或 false,分别表示宽限期是否已经完成。
导出符号:

使用 EXPORT_SYMBOL_GPL 导出该函数,使其可以被其他内核模块使用。
*/
bool poll_state_synchronize_rcu(unsigned long oldstate)
{
if (oldstate == RCU_GET_STATE_COMPLETED ||
rcu_seq_done_exact(&rcu_state.gp_seq_polled, oldstate)) {
smp_mb(); /* Ensure GP ends before subsequent accesses. */
return true;
}
return false;
}
EXPORT_SYMBOL_GPL(poll_state_synchronize_rcu);

sync_rcu_do_polled_gp 启动加速宽限期的轮询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/*
* 用于确保 start_poll_synchronize_rcu_expedited 函数所需的加速宽限期(expedited grace periods)能够正确启动并完成。
* 该函数通过工作队列(work_struct)异步执行,主要负责处理与加速宽限期相关的轮询和同步操作
*/
static void sync_rcu_do_polled_gp(struct work_struct *wp)
{
unsigned long flags;
int i = 0;
struct rcu_node *rnp = container_of(wp, struct rcu_node, exp_poll_wq);
unsigned long s;

raw_spin_lock_irqsave(&rnp->exp_poll_lock, flags);
/* 读取并保存当前的宽限期请求序列号 */
s = rnp->exp_seq_poll_rq;
/* 设置为 RCU_GET_STATE_COMPLETED,表示当前没有未处理的宽限期请求 */
rnp->exp_seq_poll_rq = RCU_GET_STATE_COMPLETED;
raw_spin_unlock_irqrestore(&rnp->exp_poll_lock, flags);
if (s == RCU_GET_STATE_COMPLETED)
return;
/* 检查宽限期状态是否已经完成 */
while (!poll_state_synchronize_rcu(s)) {
/* 宽限期未完成启动一个新的加速宽限期 */
synchronize_rcu_expedited();
if (i == 10 || i == 20)
pr_info("%s: i = %d s = %lx gp_seq_polled = %lx\n", __func__, i, s, READ_ONCE(rcu_state.gp_seq_polled));
i++;
}
raw_spin_lock_irqsave(&rnp->exp_poll_lock, flags);
s = rnp->exp_seq_poll_rq;
if (poll_state_synchronize_rcu(s))
rnp->exp_seq_poll_rq = RCU_GET_STATE_COMPLETED;
raw_spin_unlock_irqrestore(&rnp->exp_poll_lock, flags);
}

start_poll_synchronize_rcu_expedited 快照当前RCU状态并开始加速宽限期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* start_poll_synchronize_rcu_expedited - 快照当前RCU状态并开始加速宽限期
*
* 返回一个cookie,以传递给调用cond_synchronize_rcu()、cond_synchronize_rcu_expedited()或poll_state_synchronize_rcu(),
* 使它们能够确定在此期间是否已经过去了任何形式的宽限期。如果所需的加快宽限期尚未计划开始,则启动该宽限期。
*/
unsigned long start_poll_synchronize_rcu_expedited(void)
{
unsigned long flags;
struct rcu_data *rdp;
struct rcu_node *rnp;
unsigned long s;

s = get_state_synchronize_rcu();/* 获取当前 RCU 的状态快照 */
rdp = per_cpu_ptr(&rcu_data, raw_smp_processor_id());
rnp = rdp->mynode;
if (rcu_init_invoked())
raw_spin_lock_irqsave(&rnp->exp_poll_lock, flags);
if (!poll_state_synchronize_rcu(s)) {
/* 检查当前状态 s 是否已经处于宽限期。如果尚未进入宽限期,则需要启动一个新的加速宽限期 */
if (rcu_init_invoked()) {
/* 请求一个新的加速宽限期 */
rnp->exp_seq_poll_rq = s;
/* rnp->exp_poll_wq 工作队列添加到全局 RCU 工作队列(rcu_gp_wq)中,以异步方式启动宽限期 */
queue_work(rcu_gp_wq, &rnp->exp_poll_wq);
}
}
if (rcu_init_invoked())
raw_spin_unlock_irqrestore(&rnp->exp_poll_lock, flags);

return s;
}
EXPORT_SYMBOL_GPL(start_poll_synchronize_rcu_expedited);

kernel/rcu/tree_plugin.h

rcu_preempt_ctxt_queue 将当前任务添加到 RCU 预处理上下文队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/* Flags for rcu_preempt_ctxt_queue() decision table. */
#define RCU_GP_TASKS 0x8
#define RCU_EXP_TASKS 0x4
#define RCU_GP_BLKD 0x2
#define RCU_EXP_BLKD 0x1

/*
当任务在 RCU 读侧临界区被抢占时,CPU 无法继续记录其状态,因此需要将任务加入 blkd_tasks 队列。
根据任务是否阻塞宽限期,将任务添加到队列的头部或尾部,以优化宽限期的处理。
宽限期的处理逻辑:

正常宽限期(GP)和加速宽限期(Expedited GP)可能同时存在。
如果任务阻塞了宽限期,则宽限期必须等待任务完成其 RCU 读侧临界区。
*/
static void rcu_preempt_ctxt_queue(struct rcu_node *rnp, struct rcu_data *rdp)
__releases(rnp->lock) /* But leaves rrupts disabled. */
{
/* 根据任务是否阻塞正常宽限期或加速宽限期,计算任务的阻塞状态 */
int blkd_state = (rnp->gp_tasks ? RCU_GP_TASKS : 0) +
(rnp->exp_tasks ? RCU_EXP_TASKS : 0) +
(rnp->qsmask & rdp->grpmask ? RCU_GP_BLKD : 0) +
(rnp->expmask & rdp->grpmask ? RCU_EXP_BLKD : 0);
struct task_struct *t = current;

raw_lockdep_assert_held_rcu_node(rnp);
WARN_ON_ONCE(rdp->mynode != rnp);
WARN_ON_ONCE(!rcu_is_leaf_node(rnp));
/* RCU better not be waiting on newly onlined CPUs! */
WARN_ON_ONCE(rnp->qsmaskinitnext & ~rnp->qsmaskinit & rnp->qsmask &
rdp->grpmask);

/*
* 决定将新阻止的任务排队的位置。 理论上,
* 这可以是 if 语句。 在实践中,当我尝试
* 那,它相当混乱。
*/
switch (blkd_state) {
case 0:
case RCU_EXP_TASKS:
case RCU_EXP_TASKS | RCU_GP_BLKD:
case RCU_GP_TASKS:
case RCU_GP_TASKS | RCU_EXP_TASKS:

/*
* Blocking neither GP, or first task blocking the normal
* GP but not blocking the already-waiting expedited GP.
* Queue at the head of the list to avoid unnecessarily
* blocking the already-waiting GPs.
*/
list_add(&t->rcu_node_entry, &rnp->blkd_tasks);
break;

case RCU_EXP_BLKD:
case RCU_GP_BLKD:
case RCU_GP_BLKD | RCU_EXP_BLKD:
case RCU_GP_TASKS | RCU_EXP_BLKD:
case RCU_GP_TASKS | RCU_GP_BLKD | RCU_EXP_BLKD:
case RCU_GP_TASKS | RCU_EXP_TASKS | RCU_GP_BLKD | RCU_EXP_BLKD:

/*
* First task arriving that blocks either GP, or first task
* arriving that blocks the expedited GP (with the normal
* GP already waiting), or a task arriving that blocks
* both GPs with both GPs already waiting. Queue at the
* tail of the list to avoid any GP waiting on any of the
* already queued tasks that are not blocking it.
*/
list_add_tail(&t->rcu_node_entry, &rnp->blkd_tasks);
break;

case RCU_EXP_TASKS | RCU_EXP_BLKD:
case RCU_EXP_TASKS | RCU_GP_BLKD | RCU_EXP_BLKD:
case RCU_GP_TASKS | RCU_EXP_TASKS | RCU_EXP_BLKD:

/*
* Second or subsequent task blocking the expedited GP.
* The task either does not block the normal GP, or is the
* first task blocking the normal GP. Queue just after
* the first task blocking the expedited GP.
*/
list_add(&t->rcu_node_entry, rnp->exp_tasks);
break;

case RCU_GP_TASKS | RCU_GP_BLKD:
case RCU_GP_TASKS | RCU_EXP_TASKS | RCU_GP_BLKD:

/*
* Second or subsequent task blocking the normal GP.
* The task does not block the expedited GP. Queue just
* after the first task blocking the normal GP.
*/
list_add(&t->rcu_node_entry, rnp->gp_tasks);
break;

default:

/* Yet another exercise in excessive paranoia. */
WARN_ON_ONCE(1);
break;
}

/*
* We have now queued the task. If it was the first one to
* block either grace period, update the ->gp_tasks and/or
* ->exp_tasks pointers, respectively, to reference the newly
* blocked tasks.
*/
if (!rnp->gp_tasks && (blkd_state & RCU_GP_BLKD)) {
WRITE_ONCE(rnp->gp_tasks, &t->rcu_node_entry);
WARN_ON_ONCE(rnp->completedqs == rnp->gp_seq);
}
if (!rnp->exp_tasks && (blkd_state & RCU_EXP_BLKD))
WRITE_ONCE(rnp->exp_tasks, &t->rcu_node_entry);
WARN_ON_ONCE(!(blkd_state & RCU_GP_BLKD) !=
!(rnp->qsmask & rdp->grpmask));
WARN_ON_ONCE(!(blkd_state & RCU_EXP_BLKD) !=
!(rnp->expmask & rdp->grpmask));
raw_spin_unlock_rcu_node(rnp); /* interrupts remain disabled. */

/*
* Report the quiescent state for the expedited GP. This expedited
* GP should not be able to end until we report, so there should be
* no need to check for a subsequent expedited GP. (Though we are
* still in a quiescent state in any case.)
*
* Interrupts are disabled, so ->cpu_no_qs.b.exp cannot change.
*/
if (blkd_state & RCU_EXP_BLKD && rdp->cpu_no_qs.b.exp)
rcu_report_exp_rdp(rdp);
else
WARN_ON_ONCE(rdp->cpu_no_qs.b.exp);
ASSERT_EXCLUSIVE_WRITER_SCOPED(rdp->cpu_no_qs.b.exp);
}

rcu_note_context_switch 在任务上下文切换时记录与 RCU(Read-Copy-Update)相关的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/*
* 当任务进入调度器并可能被上下文切换时,如果该任务处于 RCU 读侧临界区,CPU 将无法继续记录该状态。
* 为了确保 RCU 的正确性,任务会被加入 blkd_tasks 列表,并在退出 RCU 读侧临界区时自行移除。
*
* 调用此函数的上下文必须禁用中断,以确保操作的安全性。
*/
void rcu_note_context_switch(bool preempt)
{
struct task_struct *t = current;
struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
struct rcu_node *rnp;

trace_rcu_utilization(TPS("Start context switch"));
lockdep_assert_irqs_disabled();
WARN_ONCE(!preempt && rcu_preempt_depth() > 0, "Voluntary context switch within RCU read-side critical section!");
if (rcu_preempt_depth() > 0 /* 果任务处于 RCU 读侧临界区 */
&& !t->rcu_read_unlock_special.b.blocked) { /* 且尚未被标记为阻塞 */

/* Possibly blocking in an RCU read-side critical section. */
rnp = rdp->mynode;
raw_spin_lock_rcu_node(rnp);
/* 将其标记为阻塞状态 */
t->rcu_read_unlock_special.b.blocked = true;
/* 记录其所属的 RCU 节点 */
t->rcu_blocked_node = rnp;

/*
* 验证 CPU 的健全性,跟踪抢占,以及
* 然后根据状态根据需要对任务进行排队
* 任何正在进行的和加急的宽限期。
*/
WARN_ON_ONCE(!rcu_rdp_cpu_online(rdp));
WARN_ON_ONCE(!list_empty(&t->rcu_node_entry));
trace_rcu_preempt_task(rcu_state.name,
t->pid,
(rnp->qsmask & rdp->grpmask)
? rnp->gp_seq
: rcu_seq_snap(&rnp->gp_seq));
/* 将其加入 RCU 节点的队列 */
rcu_preempt_ctxt_queue(rnp, rdp);
} else {
/* 任务未处于 RCU 读侧临界区,则处理延迟的任务状态 */
rcu_preempt_deferred_qs(t);
}

/*
* 要么我们没有在 RCU 读取端关键部分
* 从该关键部分开始,或者我们现在已记录该关键部分
*全球。 无论哪种方式,我们现在可以注意到一个静止状态
* 对于此 CPU。 同样,如果我们在 RCU 读取端 Critical 中
* 部分,如果该关键部分阻止了当前的
* 宽限期,则任务已入队
* 表示我们继续阻止当前的宽限期。
*/
/* 记录 CPU 的静止状态(quiescent state),表示当前 CPU 不再参与 RCU 读侧临界区 */
rcu_qs();
if (rdp->cpu_no_qs.b.exp)
/* 如果 CPU 阻塞了加速的 RCU宽限期,则报告其状态 */
rcu_report_exp_rdp(rdp);
rcu_tasks_qs(current, preempt);
trace_rcu_utilization(TPS("End context switch"));
}
EXPORT_SYMBOL_GPL(rcu_note_context_switch);

rcu_qs 记录 CPU 的静止状态(quiescent state)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/*
* 为指定的CPU记录一个可抢占RCU的静止状态。
* 注意,这并不一定意味着当前在该CPU上运行的任务处于静止状态:
* 相反,它意味着当前的宽限期无需再等待任何*之后*在此CPU上开始的
* RCU读端临界区。它也意味着,如果当前任务正处于一个RCU读端临界区中,
* 它已经将自己添加到了某个叶子rcu_node结构体的->blkd_tasks链表中。
* 除了当前任务,可能还有任意数量的其他任务在RCU读端临界区中被阻塞。
*
* 与非可抢占RCU不同,用于加速宽限期(expedited grace periods)的
* 静止状态报告,是通过延迟静止状态和上下文切换事件来单独处理的。
*
* 此函数的调用者必须禁用抢占。
*/
static void rcu_qs(void)
{
/*
* 这是一个锁依赖检测器的警告宏。它检查当前上下文是否是可抢占的。
* 如果是,则说明调用者违反了“必须禁用抢占”的契约,会打印一条
* 包含“!!!”的严重内核警告。
*/
RCU_LOCKDEP_WARN(preemptible(), "rcu_qs() invoked with preemption enabled!!!\n");

/*
* 读取当前CPU的per-cpu变量rcu_data.cpu_no_qs.b.norm。
* 这个标志位如果为true,表示RCU的宽限期状态机正在等待当前CPU
* 报告一个静止状态。
*/
if (__this_cpu_read(rcu_data.cpu_no_qs.b.norm)) {
/*
* 如果需要报告QS,则首先向追踪系统发出一个事件,记录下
* RCU类型("rcu_preempt")、当前的宽限期序列号(gp_seq),
* 以及事件类型("cpuqs")。
*/
trace_rcu_grace_period(TPS("rcu_preempt"),
__this_cpu_read(rcu_data.gp_seq),
TPS("cpuqs"));
/*
* 核心操作:将当前CPU的“需要QS”标志置为false。
* 这就向RCU状态机宣告:“本CPU已静止”。
*/
__this_cpu_write(rcu_data.cpu_no_qs.b.norm, false);
/*
* 插入一个编译器屏障。用于与rcu_flavor_sched_clock_irq()中的
* 读操作进行协调,防止编译器进行有害的指令重排。
*/
barrier();
/*
* 使用WRITE_ONCE原子地将当前任务的“需要QS”请求标志也置为false。
* 因为QS已经在此被报告,所以这个任务不再需要去触发另一个QS了。
*/
WRITE_ONCE(current->rcu_read_unlock_special.b.need_qs, false);
}
}

rcu_preempt_need_deferred_qs 检查是否需要报告延迟的静止状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/*
* 这是一个静态的、不被追踪的内联函数,用于判断是否需要报告一个延迟的静止状态。
*
* 注释:是否存在一个待处理的延迟静止状态,并且我们当前也“不”在
* 一个RCU读端临界区内?调用者有责任确保在其他方面(如中断、
* 抢占上下文)报告任何延迟的静止状态是安全的。原因是,
* 即使抢占被禁用,在上下文切换期间报告静止状态也是安全的。
* 不能期望本函数去理解这些细微差别,所以调用者必须处理它们。
*
* @t: 指向需要被检查的任务。
*
* 返回值: 如果需要且可以安全地报告一个延迟QS,则返回true;否则返回false。
*/
static notrace bool rcu_preempt_need_deferred_qs(struct task_struct *t)
{
/*
* 整个表达式由一个逻辑与(&&)连接,必须同时满足两个条件。
*/
return (
/*
* 第一个条件:检查是否有任何待处理的QS请求。
* 使用逻辑或(||)连接两个可能的请求来源。
*/
__this_cpu_read(rcu_data.cpu_no_qs.b.exp) || /* 条件1a: RCU加速宽限期是否正在等待本CPU? */
READ_ONCE(t->rcu_read_unlock_special.s) /* 条件1b: 任务t自身是否有一个挂起的延迟QS请求? */
) &&
/*
* 第二个条件:检查当前上下文是否可以安全地报告QS。
* rcu_preempt_depth()返回当前任务的RCU读锁嵌套深度。
* 只有当深度为0时,才表示任务已完全退出所有RCU读端临界区。
*/
rcu_preempt_depth() == 0;
}

rcu_preempt_deferred_qs_irqrestore 延迟报告静止状态并恢复中断(未分析)

  • 它的核心作用是:处理所有与“延迟静止状态(deferred QS)”相关的事务。这主要包括两个方面:1. 报告一个简单的延迟QS。 2. 处理一个更复杂的情况——当一个任务在RCU读端临界区内被阻塞(block)时,将它从RCU的“阻塞任务链表”中安全地移除,并在此过程中判断是否可以因此报告一个QS。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/*
* 这是一个静态的、不被追踪的函数,在中断禁用的情况下执行。
* 它报告延迟的静止状态。延迟的时间可能非常短,例如,
* 在从rcu_read_unlock_special()调用过来的情况下。
*
* @t: 指向需要处理延迟QS的任务。
* @flags: 需要在函数退出时恢复的中断状态。
*/
static notrace void
rcu_preempt_deferred_qs_irqrestore(struct task_struct *t, unsigned long flags)
{
/* empty_*: 布尔标志,用于记录各种链表是否为空。*/
bool empty_exp, empty_norm, empty_exp_now;
/* np: 链表指针,用于在删除节点时,获取下一个节点的地址。*/
struct list_head *np;
/* drop_boost_mutex: 布尔标志,记录是否需要为任务进行优先级“降级”。*/
bool drop_boost_mutex = false;
/* rdp: 指向当前CPU的RCU数据结构。*/
struct rcu_data *rdp;
/* rnp: 指向任务t曾阻塞于其上的那个rcu_node。*/
struct rcu_node *rnp;
/* special: 临时存储任务t的rcu_read_unlock_special状态字的快照。*/
union rcu_special special;

/*
* 如果RCU核心正在等待此CPU退出其临界区,则报告它已经退出的事实。
* 因为中断是禁用的,t->rcu_read_unlock_special的值不会改变。
*/
/* 读取状态快照。*/
special = t->rcu_read_unlock_special;
rdp = this_cpu_ptr(&rcu_data);
/* 快速路径检查:如果没有任何延迟QS或加速QS请求,则直接恢复中断并返回。*/
if (!special.s && !rdp->cpu_no_qs.b.exp) {
local_irq_restore(flags);
return;
}
/* 原子地将任务的延迟QS请求清零,表示我们即将处理它。*/
t->rcu_read_unlock_special.s = 0;
/* 如果快照中的need_qs位被设置,说明这是一个简单的延迟QS请求。*/
if (special.b.need_qs) {
/* 在严格的宽限期检查模式下,执行更复杂的操作并引入延迟。*/
if (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD)) {
rdp->cpu_no_qs.b.norm = false;
rcu_report_qs_rdp(rdp);
udelay(rcu_unlock_delay);
} else {
/* 在常规模式下,直接调用rcu_qs()报告静止状态。*/
rcu_qs();
}
}

/*
* 响应一个加速宽限期对此CPU的静止状态请求。
* 注意,来自任务的请求在下面从阻塞任务列表中移除任务时处理。
*/
/* 如果有系统级的加速QS请求,则调用rcu_report_exp_rdp报告。*/
if (rdp->cpu_no_qs.b.exp)
rcu_report_exp_rdp(rdp);

/* 如果在RCU读端临界区期间被阻塞了,则进行清理。*/
if (special.b.blocked) {

/*
* 将此任务从它所阻塞的链表中移除。该任务现在仍然排队在
* 它首次阻塞时对应的那个rcu_node上,所以不再需要循环。
* 出于纯粹的偏执,保留一个WARN_ON_ONCE()。
*/
/* 获取任务被阻塞时所在的rcu_node。*/
rnp = t->rcu_blocked_node;
/* 获取该rcu_node的自旋锁(中断已被禁用)。*/
raw_spin_lock_rcu_node(rnp);
/* 健壮性检查:确保在我们操作期间,任务没有被移动到别的node上。*/
WARN_ON_ONCE(rnp != t->rcu_blocked_node);
WARN_ON_ONCE(!rcu_is_leaf_node(rnp));
/* 检查移除前,普通宽限期的阻塞读者链表是否为空。*/
empty_norm = !rcu_preempt_blocked_readers_cgp(rnp);
WARN_ON_ONCE(rnp->completedqs == rnp->gp_seq &&
(!empty_norm || rnp->qsmask));
/* 检查移除前,加速宽限期的阻塞读者链表是否为空。*/
empty_exp = sync_rcu_exp_done(rnp);
/* 内存屏障,确保在删除链表节点前,所有状态更新都对其他CPU可见。*/
smp_mb();
/* 获取t在链表中的下一个节点,用于后续可能的链表头更新。*/
np = rcu_next_node_entry(t, rnp);
/* 将t的链表节点从链表中物理地移除。*/
list_del_init(&t->rcu_node_entry);
/* 将t的阻塞节点指针清零。*/
t->rcu_blocked_node = NULL;
/* 记录一次“解除被抢占任务锁定”的追踪事件。*/
trace_rcu_unlock_preempted_task(TPS("rcu_preempt"),
rnp->gp_seq, t->pid);
/* 如果t恰好是普通链表的头,则更新链表头指针。*/
if (&t->rcu_node_entry == rnp->gp_tasks)
WRITE_ONCE(rnp->gp_tasks, np);
/* 如果t恰好是加速链表的头,则更新链表头指针。*/
if (&t->rcu_node_entry == rnp->exp_tasks)
WRITE_ONCE(rnp->exp_tasks, np);
/* 如果配置了RCU优先级提升... */
if (IS_ENABLED(CONFIG_RCU_BOOST)) {
/* 在持有rnp->lock的情况下,快照boost_mtx的持有者是否是t。*/
drop_boost_mutex = rt_mutex_owner(&rnp->boost_mtx.rtmutex) == t;
/* 如果t恰好是boost链表的头,则更新链表头指针。*/
if (&t->rcu_node_entry == rnp->boost_tasks)
WRITE_ONCE(rnp->boost_tasks, np);
}

/*
* 如果这是当前链表上的最后一个任务,并且我们没有在等待任何CPU,
* 就报告静止状态。注意rcu_report_unblock_qs_rnp()会释放
* rnp->lock,所以我们必须快照加速的状态。
*/
/* 再次检查加速链表是否为空。*/
empty_exp_now = sync_rcu_exp_done(rnp);
/* 如果之前普通链表非空,但现在因为移除了t而变为空... */
if (!empty_norm && !rcu_preempt_blocked_readers_cgp(rnp)) {
/* ...则说明可以为这个rcu_node报告一个普通宽限期的QS了。*/
trace_rcu_quiescent_state_report(/* ... */);
/* 调用此函数报告QS,它内部会释放锁并恢复中断。*/
rcu_report_unblock_qs_rnp(rnp, flags);
} else {
/* 如果链表没有变空,则只需释放锁并恢复中断。*/
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
}

/*
* 如果这是加速链表上的最后一个任务,我们需要向rcu_node层级树报告。
*/
if (!empty_exp && empty_exp_now)
rcu_report_exp_rnp(rnp, true);

/* 如果我们之前被提升了优先级,现在进行降级。*/
if (IS_ENABLED(CONFIG_RCU_BOOST) && drop_boost_mutex)
rt_mutex_futex_unlock(&rnp->boost_mtx.rtmutex);
} else {
/* 如果special.b.blocked为假,说明没有阻塞QS要处理,直接恢复中断。*/
local_irq_restore(flags);
}
}

rcu_preempt_deferred_qs 延迟报告静止状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/*
* notrace: GCC属性,告诉内核追踪器(ftrace),不要追踪这个函数的入口和出口。
* 因为RCU本身是追踪系统的一部分,追踪它可能导致无限递归。
*/
/*
* 如果需要并且安全,则报告一个延迟的静止状态。
* 与rcu_preempt_need_deferred_qs()一样,“安全”仅涉及到
* 不在RCU读端临界区内。调用者必须从中断、软中断和抢占
* 禁用的角度来评估安全性。
*
* @t: 指向需要为其报告延迟QS的任务。
*/
notrace void rcu_preempt_deferred_qs(struct task_struct *t)
{
/* flags: 用于保存local_irq_save获取锁前的中断状态。*/
unsigned long flags;

/*
* 调用辅助函数rcu_preempt_need_deferred_qs(),检查任务t的内部RCU状态,
* 判断是否真的有一个被延迟的静止状态需要报告。
*/
if (!rcu_preempt_need_deferred_qs(t))
/* 如果不需要,则直接返回,这是高效的快速路径。*/
return;

/*
* 如果需要报告,则首先调用local_irq_save,保存当前的中断状态,
* 然后禁用本地中断。这是为了给接下来的底层RCU函数提供一个
* 安全的、无中断的执行环境。
*/
local_irq_save(flags);

/*
* 将实际的工作委托给rcu_preempt_deferred_qs_irqrestore函数。
* 这个函数会在完成工作后,使用传入的flags来恢复之前的中断状态。
* 这种将“加锁”和“解锁”分离的模式在内核中很常见,
* 使得核心逻辑可以被复用。
*/
rcu_preempt_deferred_qs_irqrestore(t, flags);
}

kernel/rcu/tree.c

call_rcu 回调rcu处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/*
* Enqueue the specified callback onto the specified rcu_segcblist
* structure, updating accounting as needed. Note that the ->len
* field may be accessed locklessly, hence the WRITE_ONCE().
* The ->len field is used by rcu_barrier() and friends to determine
* if it must post a callback on this structure, and it is OK
* for rcu_barrier() to sometimes post callbacks needlessly, but
* absolutely not OK for it to ever miss posting a callback.
*/
void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp,
struct rcu_head *rhp)
{
/* 增加队列的总长度计数 */
rcu_segcblist_inc_len(rsclp);
/* 将指定的回调(rhp)添加到 rcu_segcblist 的尾部(RCU_NEXT_TAIL 段) */
rcu_segcblist_inc_seglen(rsclp, RCU_NEXT_TAIL);
rhp->next = NULL;
WRITE_ONCE(*rsclp->tails[RCU_NEXT_TAIL], rhp);
WRITE_ONCE(rsclp->tails[RCU_NEXT_TAIL], &rhp->next);
}

static void rcutree_enqueue(struct rcu_data *rdp, struct rcu_head *head, rcu_callback_t func)
{
rcu_segcblist_enqueue(&rdp->cblist, head);
trace_rcu_callback(rcu_state.name, head,
rcu_segcblist_n_cbs(&rdp->cblist));
trace_rcu_segcb_stats(&rdp->cblist, TPS("SegCBQueued"));
}

/*
* Handle any core-RCU processing required by a call_rcu() invocation.
*/
static void call_rcu_core(struct rcu_data *rdp, struct rcu_head *head,
rcu_callback_t func, unsigned long flags)
{
/* 将回调加入队列 */
rcutree_enqueue(rdp, head, func);
/*
* If called from an extended quiescent state, invoke the RCU
* core in order to force a re-evaluation of RCU's idleness.
*/
if (!rcu_is_watching())
invoke_rcu_core();

/* If interrupts were disabled or CPU offline, don't invoke RCU core. */
if (irqs_disabled_flags(flags) || cpu_is_offline(smp_processor_id()))
return;

/*
* 如果回调队列的长度超过了阈值(qhimark),或者等待时间过长,则需要强制触发宽限期
*/
if (unlikely(rcu_segcblist_n_cbs(&rdp->cblist) >
rdp->qlen_last_fqs_check + qhimark)) {

/* Are we ignoring a completed grace period? */
note_gp_changes(rdp);

/* Start a new grace period if one not already started. */
if (!rcu_gp_in_progress()) {
rcu_accelerate_cbs_unlocked(rdp->mynode, rdp);
} else {
/* Give the grace period a kick. */
rdp->blimit = DEFAULT_MAX_RCU_BLIMIT;
if (READ_ONCE(rcu_state.n_force_qs) == rdp->n_force_qs_snap &&
rcu_segcblist_first_pend_cb(&rdp->cblist) != head)
rcu_force_quiescent_state();
rdp->n_force_qs_snap = READ_ONCE(rcu_state.n_force_qs);
rdp->qlen_last_fqs_check = rcu_segcblist_n_cbs(&rdp->cblist);
}
}
}

static void
__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy_in)
{
static atomic_t doublefrees;
unsigned long flags;
bool lazy;
struct rcu_data *rdp;

/* Misaligned rcu_head! */
/* 回调头对齐检查 */
WARN_ON_ONCE((unsigned long)head & (sizeof(void *) - 1));
/* 查是否存在重复的 call_rcu 调用 */
if (debug_rcu_head_queue(head)) {
/*
* Probable double call_rcu(), so leak the callback.
* Use rcu:rcu_callback trace event to find the previous
* time callback was passed to call_rcu().
*/
if (atomic_inc_return(&doublefrees) < 4) {
pr_err("%s(): Double-freed CB %p->%pS()!!! ", __func__, head, head->func);
mem_dump_obj(head);
}
WRITE_ONCE(head->func, rcu_leak_callback);
return;
}
head->func = func;
head->next = NULL;
kasan_record_aux_stack(head);

local_irq_save(flags);
rdp = this_cpu_ptr(&rcu_data);
/* 检查回调队列过载 */
check_cb_ovld(rdp);

if (unlikely(rcu_rdp_is_offloaded(rdp)))
call_rcu_nocb(rdp, head, func, flags, lazy);
else
call_rcu_core(rdp, head, func, flags);
local_irq_restore(flags);
}

/**
* call_rcu() - Queue an RCU callback for invocation after a grace period.
* By default the callbacks are 'lazy' and are kept hidden from the main
* ->cblist to prevent starting of grace periods too soon.
* If you desire grace periods to start very soon, use call_rcu_hurry().
*
* @head: structure to be used for queueing the RCU updates.
* @func: actual callback function to be invoked after the grace period
*
* The callback function will be invoked some time after a full grace
* period elapses, in other words after all pre-existing RCU read-side
* critical sections have completed. However, the callback function
* might well execute concurrently with RCU read-side critical sections
* that started after call_rcu() was invoked.
*
* It is perfectly legal to repost an RCU callback, potentially with
* a different callback function, from within its callback function.
* The specified function will be invoked after another full grace period
* has elapsed. This use case is similar in form to the common practice
* of reposting a timer from within its own handler.
*
* RCU read-side critical sections are delimited by rcu_read_lock()
* and rcu_read_unlock(), and may be nested. In addition, but only in
* v5.0 and later, regions of code across which interrupts, preemption,
* or softirqs have been disabled also serve as RCU read-side critical
* sections. This includes hardware interrupt handlers, softirq handlers,
* and NMI handlers.
*
* Note that all CPUs must agree that the grace period extended beyond
* all pre-existing RCU read-side critical section. On systems with more
* than one CPU, this means that when "func()" is invoked, each CPU is
* guaranteed to have executed a full memory barrier since the end of its
* last RCU read-side critical section whose beginning preceded the call
* to call_rcu(). It also means that each CPU executing an RCU read-side
* critical section that continues beyond the start of "func()" must have
* executed a memory barrier after the call_rcu() but before the beginning
* of that RCU read-side critical section. Note that these guarantees
* include CPUs that are offline, idle, or executing in user mode, as
* well as CPUs that are executing in the kernel.
*
* Furthermore, if CPU A invoked call_rcu() and CPU B invoked the
* resulting RCU callback function "func()", then both CPU A and CPU B are
* guaranteed to execute a full memory barrier during the time interval
* between the call to call_rcu() and the invocation of "func()" -- even
* if CPU A and CPU B are the same CPU (but again only if the system has
* more than one CPU).
*
* Implementation of these memory-ordering guarantees is described here:
* Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
*
* Specific to call_rcu() (as opposed to the other call_rcu*() functions),
* in kernels built with CONFIG_RCU_LAZY=y, call_rcu() might delay for many
* seconds before starting the grace period needed by the corresponding
* callback. This delay can significantly improve energy-efficiency
* on low-utilization battery-powered devices. To avoid this delay,
* in latency-sensitive kernel code, use call_rcu_hurry().
*/
void call_rcu(struct rcu_head *head, rcu_callback_t func)
{
__call_rcu_common(head, func, enable_rcu_lazy);
}
EXPORT_SYMBOL_GPL(call_rcu);

这段代码实现了 call_rcu 函数,它是 Linux 内核 RCU(Read-Copy-Update)机制的一个核心接口,用于在一个“宽限期”(grace period)结束后执行回调函数。RCU 是一种高效的同步机制,广泛用于内核中需要读写并发的场景。


函数功能概述

call_rcu 的主要作用是将一个回调函数(func)与一个回调头(head)关联,并将其排队等待执行。回调函数会在所有当前的 RCU 读侧临界区(RCU read-side critical sections)完成后被调用。RCU 读侧临界区通常由 rcu_read_lock()rcu_read_unlock() 包围。

默认情况下,call_rcu 会以“惰性”(lazy)的方式处理回调,这意味着它可能会延迟触发宽限期的开始,从而提高能效,特别是在低利用率的电池供电设备上。如果需要更快地触发宽限期,可以使用 call_rcu_hurry


参数说明

  • head:一个 rcu_head 结构,用于将回调函数排队。它是回调函数的上下文。
  • func:回调函数指针,在宽限期结束后被调用。

关键点与行为

  1. 宽限期的定义
    宽限期是指所有当前的 RCU 读侧临界区都已完成的时间段。回调函数会在宽限期结束后执行,但可能与新的 RCU 读侧临界区并发运行。

  2. 回调的重新排队
    在回调函数中,可以再次调用 call_rcu 将另一个回调函数排队。这种行为类似于定时器在其处理函数中重新设置自身。

  3. RCU 读侧临界区
    RCU 读侧临界区由 rcu_read_lock()rcu_read_unlock() 包围,并且可以嵌套。在 Linux 5.0 及之后的版本中,禁用中断、抢占或软中断的代码区域也被视为 RCU 读侧临界区,包括硬件中断处理程序、软中断处理程序和 NMI(非屏蔽中断)处理程序。

  4. 内存屏障保证

    • 当回调函数被调用时,所有 CPU 都保证已经完成了其在调用 call_rcu 之前开始的 RCU 读侧临界区。
    • 如果一个 CPU 调用了 call_rcu,另一个 CPU 执行了对应的回调函数,则这两个 CPU 在此期间都会执行一个完整的内存屏障。
  5. 惰性回调
    如果内核启用了 CONFIG_RCU_LAZY 配置选项,call_rcu 可能会延迟宽限期的触发,从而延迟回调函数的执行。这种行为适用于能效优先的场景。如果需要更低的延迟,可以使用 call_rcu_hurry


实现细节

call_rcu 的实现非常简单,它调用了内部的 __call_rcu_common 函数,并传递了一个惰性标志(enable_rcu_lazy)。__call_rcu_common 是一个通用的内部函数,负责处理回调的排队和宽限期的触发逻辑。


总结

call_rcu 是 RCU 机制的一个核心接口,用于异步地在宽限期结束后执行回调函数。它的设计兼顾了性能和能效,通过惰性处理和内存屏障保证了系统的一致性和高效性。开发者可以根据需求选择默认的惰性行为或使用更低延迟的接口(如 call_rcu_hurry)。这种灵活性使得 RCU 成为 Linux 内核中处理并发和同步的强大工具。

rcu_spawn_gp_kthread(待分析)

  • 检测与触发rcu回调的线程
1
2

early_initcall(rcu_spawn_gp_kthread);

rcu_init_geometry 根据内核参数计算rcu_node树形结构的几何形状

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/*
* 确保我们给予 grace-period kthread 时间,以检测任何闲置 CPU,然后再采取积极措施强制进入静态状态。
* 然而,对于非常大的系统,不要低于100毫秒,调整为更高。
*/
static void adjust_jiffies_till_sched_qs(void)
{
unsigned long j;

/* 如果指定了 jiffies_till_sched_qs,则遵守该请求。*/
if (jiffies_till_sched_qs != ULONG_MAX) {
WRITE_ONCE(jiffies_to_sched_qs, jiffies_till_sched_qs);
return;
}
/* 否则,设置为第三个fqs扫描,但在大型系统上上下限。 */
j = READ_ONCE(jiffies_till_first_fqs) +
2 * READ_ONCE(jiffies_till_next_fqs);
if (j < HZ / 10 + nr_cpu_ids / RCU_JIFFIES_FQS_DIV)
j = HZ / 10 + nr_cpu_ids / RCU_JIFFIES_FQS_DIV;
pr_info("RCU calculated value of scheduler-enlistment delay is %ld jiffies.\n", j);
WRITE_ONCE(jiffies_to_sched_qs, j);
}

/*
* 根据内核参数计算rcu_node树形结构的几何形状。这不能替代tree.h中的定义,因为这些定义用于确定rcu_state结构中的->node数组的大小。
*/
void rcu_init_geometry(void)
{
ulong d;
int i;
static unsigned long old_nr_cpu_ids;
int rcu_capacity[RCU_NUM_LVLS];
static bool initialized;

if (initialized) {
/*
* Warn if setup_nr_cpu_ids() had not yet been invoked,
* unless nr_cpus_ids == NR_CPUS, in which case who cares?
*/
WARN_ON_ONCE(old_nr_cpu_ids != nr_cpu_ids);
return;
}

old_nr_cpu_ids = nr_cpu_ids;
initialized = true;

/*
* 初始化任何未指定的启动参数.
* jiffies_till_first_fqs 和 jiffies_till_next_fqs 的默认值设置为 RCU_JIFFIES_TILL_FORCE_QS 值,该值是 HZ 的函数,然后对于系统中可能存在的每个 RCU_JIFFIES_FQS_DIV CPU 加一。.
*/
d = RCU_JIFFIES_TILL_FORCE_QS + nr_cpu_ids / RCU_JIFFIES_FQS_DIV;
if (jiffies_till_first_fqs == ULONG_MAX)
jiffies_till_first_fqs = d;
if (jiffies_till_next_fqs == ULONG_MAX)
jiffies_till_next_fqs = d;
adjust_jiffies_till_sched_qs();

/*如果编译时的值是准确的,那就离开吧。 */
if (rcu_fanout_leaf == RCU_FANOUT_LEAF &&
nr_cpu_ids == NR_CPUS)
return;
}

rcu_boot_init_percpu_data 初始化每个 CPU 的 RCU 数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/*
* 对每个 CPU 的 RCU 数据进行启动时初始化。
*/
static void __init
rcu_boot_init_percpu_data(int cpu)
{
struct context_tracking *ct = this_cpu_ptr(&context_tracking);
struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);

/* 计算并设置当前 CPU 在其所属叶节点中的掩码(grpmask) */
rdp->grpmask = leaf_node_cpu_bit(rdp->mynode, cpu);
/* 用于处理严格模式下的 RCU 操作 */
INIT_WORK(&rdp->strict_work, strict_work_handler);
/* 检查上下文跟踪的嵌套级别是否为 1(ct->nesting != 1),确保上下文状态正确 */
WARN_ON_ONCE(ct->nesting != 1);
/* 检查当前 CPU 是否处于 RCU 观察状态(rcu_watching_snap_in_eqs),以验证 RCU 的运行环境 */
WARN_ON_ONCE(rcu_watching_snap_in_eqs(ct_rcu_watching_cpu(cpu)));
rdp->barrier_seq_snap = rcu_state.barrier_sequence;
rdp->rcu_ofl_gp_seq = rcu_state.gp_seq;
rdp->rcu_ofl_gp_state = RCU_GP_CLEANED;
rdp->rcu_onl_gp_seq = rcu_state.gp_seq;
rdp->rcu_onl_gp_state = RCU_GP_CLEANED;
rdp->last_sched_clock = jiffies;
rdp->cpu = cpu;
rcu_boot_init_nocb_percpu_data(rdp);
}

rcu_init_one 用于初始化 rcu_state 结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/*
* rcu_init() 的帮助函数,用于初始化 rcu_state 结构。
*/
static void __init rcu_init_one(void)
{
static const char * const buf[] = RCU_NODE_NAME_INIT;
static const char * const fqs[] = RCU_FQS_NAME_INIT;
static struct lock_class_key rcu_node_class[RCU_NUM_LVLS];
static struct lock_class_key rcu_fqs_class[RCU_NUM_LVLS];

int levelspread[RCU_NUM_LVLS]; /* kids/node in each level. */
int cpustride = 1;
int i;
int j;
struct rcu_node *rnp;

BUILD_BUG_ON(RCU_NUM_LVLS > ARRAY_SIZE(buf)); /* Fix buf[] init! */

/* Silence gcc 4.8 false positive about array index out of range. */
if (rcu_num_lvls <= 0 || rcu_num_lvls > RCU_NUM_LVLS)
panic("rcu_init_one: rcu_num_lvls out of range");

/* 初始化水平跟踪数组。 */
/* 层次结构初始化: 初始化 rcu_state.level 数组,该数组记录了每层 RCU 节点的起始地址*/
for (i = 1; i < rcu_num_lvls; i++)
rcu_state.level[i] =
rcu_state.level[i - 1] + num_rcu_lvl[i - 1];
/* 计算每层的分布信息(levelspread),用于确定每层节点的跨度 */
rcu_init_levelspread(levelspread, num_rcu_lvl);

/* 从叶子开始,初始化元素本身. */
/* RCU 节点初始化 从叶节点开始,逐层初始化所有 RCU 节点*/
for (i = rcu_num_lvls - 1; i >= 0; i--) {
cpustride *= levelspread[i];
rnp = rcu_state.level[i];
for (j = 0; j < num_rcu_lvl[i]; j++, rnp++) {
raw_spin_lock_init(&ACCESS_PRIVATE(rnp, lock));
lockdep_set_class_and_name(&ACCESS_PRIVATE(rnp, lock),
&rcu_node_class[i], buf[i]);
raw_spin_lock_init(&rnp->fqslock);
lockdep_set_class_and_name(&rnp->fqslock,
&rcu_fqs_class[i], fqs[i]);
rnp->gp_seq = rcu_state.gp_seq;
rnp->gp_seq_needed = rcu_state.gp_seq;
rnp->completedqs = rcu_state.gp_seq;
rnp->qsmask = 0;
rnp->qsmaskinit = 0;
/* 设置节点的范围(grplo 和 grphi),表示该节点管理的 CPU 范围。 */
rnp->grplo = j * cpustride;
rnp->grphi = (j + 1) * cpustride - 1;
if (rnp->grphi >= nr_cpu_ids)
rnp->grphi = nr_cpu_ids - 1;
if (i == 0) {
/* 如果是叶节点,设置 grpnum 和 grpmask 为 0,并将 parent 指针置为 NULL */
rnp->grpnum = 0;
rnp->grpmask = 0;
rnp->parent = NULL;
} else {
/* 如果是非叶节点,计算 grpnum 和 grpmask,并设置 parent 指针指向上一层的父节点 */
rnp->grpnum = j % levelspread[i - 1];
rnp->grpmask = BIT(rnp->grpnum);
rnp->parent = rcu_state.level[i - 1] +
j / levelspread[i - 1];
}
rnp->level = i;
INIT_LIST_HEAD(&rnp->blkd_tasks);
rcu_init_one_nocb(rnp);
init_waitqueue_head(&rnp->exp_wq[0]);
init_waitqueue_head(&rnp->exp_wq[1]);
init_waitqueue_head(&rnp->exp_wq[2]);
init_waitqueue_head(&rnp->exp_wq[3]);
spin_lock_init(&rnp->exp_lock);
mutex_init(&rnp->kthread_mutex);
raw_spin_lock_init(&rnp->exp_poll_lock);
rnp->exp_seq_poll_rq = RCU_GET_STATE_COMPLETED;
INIT_WORK(&rnp->exp_poll_wq, sync_rcu_do_polled_gp);
}
}

/* 初始化全局的等待队列(gp_wq 和 expedited_wq),用于管理 RCU 的宽限期和加速操作 */
init_swait_queue_head(&rcu_state.gp_wq);
init_swait_queue_head(&rcu_state.expedited_wq);

rnp = rcu_first_leaf_node();
/* 为每个 CPU 分配对应的 RCU 节点(mynode) */
for_each_possible_cpu(i) {
while (i > rnp->grphi)
rnp++;
per_cpu_ptr(&rcu_data, i)->mynode = rnp;
per_cpu_ptr(&rcu_data, i)->barrier_head.next =
&per_cpu_ptr(&rcu_data, i)->barrier_head;
rcu_boot_init_percpu_data(i);
}
}

rcutree_prepare_cpu CPU 上线过程的早期阶段被调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/*
* 在CPU上线过程的早期调用,当几乎所有服务都可用时。传入的CPU不存在。
*
* 初始化CPU的每个CPU RCU数据。 注意,在给定时间只能发生一个在线或离线事件。 还要注意,由于该CPU不可能有任何未卸载的RCU回调正在进行,因此我们可以接受在rsp->gp_seq访问中存在一些误差。 而任何已卸载的回调则在其他地方被编号。
*/
int rcutree_prepare_cpu(unsigned int cpu)
{
unsigned long flags;
struct context_tracking *ct = per_cpu_ptr(&context_tracking, cpu);
struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
struct rcu_node *rnp = rcu_get_root();

/* 设置本地状态,确保全局状态的一致视图. */
raw_spin_lock_irqsave_rcu_node(rnp, flags);
rdp->qlen_last_fqs_check = 0;
rdp->n_force_qs_snap = READ_ONCE(rcu_state.n_force_qs);
rdp->blimit = blimit;
ct->nesting = 1; /* CPU未开启,没有撕裂。 */
raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */

/*
* 只有没有早期启动回调的非NOCB CPU需要(重新)初始化。
* 如果目标 CPU 的回调队列尚未启用
*/
if (!rcu_segcblist_is_enabled(&rdp->cblist))
rcu_segcblist_init(&rdp->cblist); /* 重新启用回调队列. */

/*
* 将 CPU 添加到 leaf rcu_node 待在线位掩码。 任何需要向上传播到 rcu_node 树的操作将在下一个宽限期开始时进行。.
*/
rnp = rdp->mynode;
raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */
rdp->gp_seq = READ_ONCE(rnp->gp_seq);
rdp->gp_seq_needed = rdp->gp_seq;
rdp->cpu_no_qs.b.norm = true;
rdp->core_needs_qs = false;
rdp->rcu_iw_pending = false;
rdp->rcu_iw = IRQ_WORK_INIT_HARD(rcu_iw_handler);
rdp->rcu_iw_gp_seq = rdp->gp_seq - 1;
trace_rcu_grace_period(rcu_state.name, rdp->gp_seq, TPS("cpuonl"));
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
/* 为目标 CPU 启动 RCU 相关的内核线程 */
rcu_spawn_rnp_kthreads(rnp);
rcu_spawn_cpu_nocb_kthread(cpu);

ASSERT_EXCLUSIVE_WRITER(rcu_state.n_online_cpus);
WRITE_ONCE(rcu_state.n_online_cpus, rcu_state.n_online_cpus + 1);

return 0;
}

rcutree_report_cpu_starting 报告 CPU 正在启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*
* 用于在 CPU 上线过程中将指定的 CPU 标记为在线状态,
* 使得后续的 RCU(Read-Copy-Update)宽限期(grace periods)能够正确地等待该 CPU 的参与。
* 该函数的调用位置非常关键,必须在目标 CPU 尚未启用中断的情况下执行。
*/
void rcutree_report_cpu_starting(unsigned int cpu)
{
unsigned long mask;
struct rcu_data *rdp;
struct rcu_node *rnp;
bool newcpu;

lockdep_assert_irqs_disabled();
rdp = per_cpu_ptr(&rcu_data, cpu);
if (rdp->cpu_started)
return;
rdp->cpu_started = true;

rnp = rdp->mynode;
mask = rdp->grpmask;
arch_spin_lock(&rcu_state.ofl_lock);
rcu_watching_online();
raw_spin_lock(&rcu_state.barrier_lock);
raw_spin_lock_rcu_node(rnp);
WRITE_ONCE(rnp->qsmaskinitnext, rnp->qsmaskinitnext | mask);
raw_spin_unlock(&rcu_state.barrier_lock);
newcpu = !(rnp->expmaskinitnext & mask);
rnp->expmaskinitnext |= mask;
/* Allow lockless access for expedited grace periods. */
smp_store_release(&rcu_state.ncpus, rcu_state.ncpus + newcpu); /* ^^^ */
ASSERT_EXCLUSIVE_WRITER(rcu_state.ncpus);
rcu_gpnum_ovf(rnp, rdp); /* Offline-induced counter wrap? */
rdp->rcu_onl_gp_seq = READ_ONCE(rcu_state.gp_seq);
rdp->rcu_onl_gp_state = READ_ONCE(rcu_state.gp_state);

/* An incoming CPU should never be blocking a grace period. */
if (WARN_ON_ONCE(rnp->qsmask & mask)) { /* RCU waiting on incoming CPU? */
/* rcu_report_qs_rnp() *really* wants some flags to restore */
unsigned long flags;

local_irq_save(flags);
rcu_disable_urgency_upon_qs(rdp);
/* Report QS -after- changing ->qsmaskinitnext! */
rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags);
} else {
raw_spin_unlock_rcu_node(rnp);
}
arch_spin_unlock(&rcu_state.ofl_lock);
smp_store_release(&rdp->beenonline, true);
smp_mb(); /* Ensure RCU read-side usage follows above initialization. */
}

rcu_init rcu初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/* Dump rcu_node combining tree at boot to verify correct setup. */
static bool dump_tree;
module_param(dump_tree, bool, 0444);

/* By default, use RCU_SOFTIRQ instead of rcuc kthreads. */
static bool use_softirq = !IS_ENABLED(CONFIG_PREEMPT_RT);

void __init rcu_init(void)
{
int cpu = smp_processor_id();

rcu_early_boot_tests();
/* rcu: Preemptible hierarchical RCU implementation.
rcu: RCU event tracing is enabled.
rcu: RCU calculated value of scheduler-enlistment delay is 10 jiffies. */
rcu_bootup_announce();
/* 清理和设置 RCU 内核线程的优先级 */
sanitize_kthread_prio();
/* 初始化 RCU 的几何配置(如层次结构和 fanout 参数)
确认jiffies_to_sched_qs的参数*/
rcu_init_geometry();
/* 初始化 RCU 的核心数据结构 */
rcu_init_one();
if (dump_tree)
rcu_dump_rcu_node_tree();
if (use_softirq)
/*启用了软中断模式, 注册 RCU 的软中断处理函数 rcu_core_si */
open_softirq(RCU_SOFTIRQ, rcu_core_si);

/*
* 注册一个电源管理通知器(pm_notifier),用于在电源状态变化时通知 RCU
*/
pm_notifier(rcu_pm_notify, 0);
WARN_ON(num_online_cpus() > 1); // Only one CPU this early in boot.

/* 完成当前 CPU 的 RCU 初始化。这些函数确保 RCU 数据结构与 CPU 的状态一致 */
rcutree_prepare_cpu(cpu);
rcutree_report_cpu_starting(cpu);
rcutree_online_cpu(cpu);

/* 用于处理 RCU 的加速宽限期(expedited grace periods */
rcu_gp_wq = alloc_workqueue("rcu_gp", WQ_MEM_RECLAIM, 0);
WARN_ON(!rcu_gp_wq);
/* 用于同步操作 */
sync_wq = alloc_workqueue("sync_wq", WQ_MEM_RECLAIM, 0);
WARN_ON(!sync_wq);

/*根据启动参数 qovld 初始化回调队列的过载阈值(qovld_calc)。如果未指定,则使用默认值 */
if (qovld < 0)
qovld_calc = DEFAULT_RCU_QOVLD_MULT * qhimark;
else
qovld_calc = qovld;

/* 启动加速宽限期的轮询机制 */
(void)start_poll_synchronize_rcu_expedited();

/* 测试 RCU 的同步原语 */
rcu_test_sync_prims();

/* 初始化任务回调列表 */
tasks_cblist_init_generic();
}

rcu_scheduler_active 用于跟踪 RCU 调度器的状态

1
2
3
4
5
6
7
8
9
10
11
/*
* rcu_scheduler_active变量初始化为RCU_SCHEDULER_INACTIVE值,并在
* 第一个任务生成之前转换为RCU_SCHEDULER_INIT。因此,当该变量为
* RCU_SCHEDULER_INACTIVE时,RCU可以假设只有一个任务,从而允许
* RCU(例如)将synchronize_rcu()优化为一个简单的屏障()。当该变量为
* RCU_SCHEDULER_INIT时,RCU必须实际完成检测真实宽限期所需的所有
* 繁重工作。该变量还用于抑制来自lockdep-RCU错误检查的引导时
* 误报。最后,在RCU完全初始化后,包括所有k线程已生成后,
* 它从RCU_SCHEDULER_INIT转换为RCU_SCHEDULER_RUNNING。
*/
int rcu_scheduler_active __read_mostly;

rcu_scheduler_starting 在调度器初始化的后期阶段切换 RCU(Read-Copy-Update)子系统的模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/*
* 此函数在调度器初始化过程的末尾被调用。在调用此函数之前,
* 空闲任务可能包含同步宽限期原语(在此期间,此空闲任务正在
* 启动系统,而这些原语是无操作的)。调用此函数之后,任何同步
* 宽限期原语都将作为加速运行,由请求任务推动宽限期向前推进。
* 稍后的 core_initcall() rcu_set_runtime_mode() 将切换到完整的
* 运行时 RCU 功能。
*/
void rcu_scheduler_starting(void)
{
unsigned long flags;
struct rcu_node *rnp;

/* 确保只有一个 CPU 处于在线状态 */
WARN_ON(num_online_cpus() != 1);
/* 确保没有发生上下文切换 */
WARN_ON(nr_context_switches() > 0);
/* 测试 RCU 的同步原语,确保其在当前模式下正常工作 */
rcu_test_sync_prims();

// 修复 ->gp_seq 计数器。
local_irq_save(flags);
/* 以广度优先的方式遍历 RCU 节点树中的所有节点。
广度优先遍历确保先处理树的较低层级节点,再处理较高层级节点 */
rcu_for_each_node_breadth_first(rnp)
/* gp_seq 是 RCU 节点的 "grace period sequence"(宽限期序列号)。
它表示当前节点的宽限期序列号,用于跟踪 RCU 的宽限期进度
被设置为全局 RCU 状态的 gp_seq 值(rcu_state.gp_seq),
确保所有节点的宽限期序列号与全局状态一致 */
/* gp_seq_needed 表示当前节点所需的宽限期序列号。
它用于记录该节点是否需要参与某个宽限期的处理
被设置为与 gp_seq 相同的值,表示当前节点不需要额外的宽限期处理 */
rnp->gp_seq_needed = rnp->gp_seq = rcu_state.gp_seq;
local_irq_restore(flags);

// 切换出早期启动模式, 表示 RCU 已切换到调度器模式
rcu_scheduler_active = RCU_SCHEDULER_INIT;
/* 测试同步原语,确保切换后的模式正常工作 */
rcu_test_sync_prims();
}

rcu_softirq_qs 在一段禁止抢占或禁止软中断的代码区域内,手动、显式地向RCU子系统报告一个“静止状态(Quiescent State, QS)

  • 它的核心作用是:在一段禁止抢占或禁止软中断的代码区域内,手动、显式地向RCU子系统报告一个“静止状态(Quiescent State, QS)”。

其工作原理可以概括为**“主动宣告空闲”**:

  1. RCU的等待机制: RCU的核心是等待所有CPU都经历过至少一次“静止状态”,然后才能安全地释放被延迟的内存。一个静止状态,本质上是内核能够向RCU保证“当前CPU上已经没有任何正在进行的、旧的RCU读端临界区了”的一个时间点。
  2. 静止状态的通常来源: 在常规代码中,静止状态通常是隐式发生的。例如:
    • 当一个CPU的上下文切换到另一个任务时。
    • 当一个CPU从内核态返回到用户态时。
    • 当一个CPU进入idle状态时。
    • local_bh_disable/enablepreempt_disable/enable配对调用完成时。
  3. 软中断处理的问题: handle_softirqs函数是一个长时间运行的循环,并且在它的主处理循环中,中断是开启的,但软中断是被逻辑上“占用”的(通过local_bh_disable的效果)。ksoftirqd线程更是一个可能长时间运行且不被抢占的内核线程。如果一个软中断处理程序耗时很长,或者有大量的软中断需要处理,那么这个CPU可能会很长一段时间都不经历上述的任何一种常规静止状态。
  4. RCU“停滞”风险: 如果一个CPU长时间不报告静止状态,那么等待这个CPU的RCU回调函数就永远无法被执行,导致内存泄漏或系统功能异常。
  5. rcu_softirq_qs的解决方案: 这个函数就是为了解决上述问题而生的。它允许软中断处理代码在处理完一批工作后,主动地、手动地告诉RCU系统:“嘿,我现在虽然还在软中断上下文中,但我可以向你保证,到目前为止,所有旧的RCU读操作都已经结束了。你可以认为我这里出现了一个静止状态。” 这就打破了RCU的等待,让RCU的回调处理流程可以继续向前推进。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* rcu_softirq_qs - 在软中断处理中提供一组RCU静止状态
*
* 为RCU、任务RCU(Tasks RCU)和任务追踪RCU(Tasks Trace RCU)标记一个静止状态。
* 这是一个特殊用途的函数,用于软中断基础设施,也可能用于偶尔的
* 长时间运行的软中断处理程序。
*
* 注意,从RCU的视角来看,一次对rcu_softirq_qs()的调用,等价于
* 瞬间地、完全地使能了一次抢占。例如,给定以下代码::
*
* local_bh_disable();
* do_something();
* rcu_softirq_qs(); // A点
* do_something_else();
* local_bh_enable(); // B点
*
* 一个与do_something()调用并发开始的synchronize_rcu()调用,
* 将被保证只需等待到执行到达A点。如果没有那次rcu_softirq_qs()调用,
* 同一个synchronize_rcu()将不得不等待直到执行到达B点。
*/
void rcu_softirq_qs(void)
{
/*
* 这是一个用于锁依赖检测器(lockdep)的警告。它断言在调用本函数时,
* 当前上下文不应该持有任何RCU的读端锁(如rcu_read_lock())。
* 因为报告一个静止状态意味着读端临界区已经结束,如果此时还持有锁,
* 就是一个严重的逻辑错误。
*/
RCU_LOCKDEP_WARN(lock_is_held(&rcu_bh_lock_map) ||
lock_is_held(&rcu_lock_map) ||
lock_is_held(&rcu_sched_lock_map),
"Illegal rcu_softirq_qs() in RCU read-side critical section");

/*
* 为经典的、基于抢占的RCU(Preempt RCU)报告一个静止状态。
* rcu_qs()是“RCU quiescent state”的缩写。
*/
rcu_qs();
/*
* 为可抢占RCU的“延迟静止状态”机制报告一个静止状态。
* 这与一个任务被自愿上下文切换出去有关。
*/
rcu_preempt_deferred_qs(current);
/*
* 为专门用于追踪任务生命周期的Tasks RCU和Tasks Trace RCU
* 报告一个静止状态。
*/
// rcu_tasks_qs(current, false);
}

rcu_core_si 为当前CPU执行RCU核心处理工作(未分析)

  • rcu_core是RCU(Read-Copy Update)子系统的核心周期性处理函数。它在每个CPU上,通常由RCU_SOFTIRQ软中断触发执行。
    它的核心作用是:驱动RCU状态机向前演进。这包括检查静止状态(Quiescent States)、推进宽限期(Grace Periods),以及在宽限期结束后,调用被延迟的RCU回调函数以释放资源。
    可以将其理解为RCU的**“主引擎”或“调度员”**。它不处理单个的RCU事件,而是周期性地审视全局和per-cpu的RCU状态,并据此执行所有必要的推进工作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/*
* 为当前CPU执行RCU核心处理工作。
* __latent_entropy: GCC属性,表示此函数执行时间的微小变化可以为内核随机数熵池贡献熵。
*/
static __latent_entropy void rcu_core(void)
{
/* flags: 用于在关闭中断时保存中断状态。*/
unsigned long flags;
/* rdp: 指向当前CPU的RCU私有数据结构(rcu_data)。raw_cpu_ptr用于在特殊上下文中安全访问。*/
struct rcu_data *rdp = raw_cpu_ptr(&rcu_data);
/* rnp: 指向rdp所属的RCU层级节点(rcu_node)。*/
struct rcu_node *rnp = rdp->mynode;

/* 如果当前CPU已经离线,则不进行任何RCU处理。*/
if (cpu_is_offline(smp_processor_id()))
return;
/* 记录一条RCU利用率的追踪信息,标记RCU核心处理开始。*/
trace_rcu_utilization(TPS("Start RCU core"));
/* 健壮性检查:确保当前CPU已经被标记为在线过,防止在未初始化状态下被调用。*/
WARN_ON_ONCE(!rdp->beenonline);

/* 如果配置了可抢占内核,并且当前是可抢占的(抢占计数为0)。*/
if (IS_ENABLED(CONFIG_PREEMPT_COUNT) && (!(preempt_count() & PREEMPT_MASK))) {
/* 则直接调用rcu_preempt_deferred_qs处理任何待处理的延迟静止状态。*/
rcu_preempt_deferred_qs(current);
/* 否则,如果虽然不可抢占,但确实有延迟静止状态需要处理。*/
} else if (rcu_preempt_need_deferred_qs(current)) {
/* 则设置重新调度标志,请求一次调度。在调度(上下文切换)的路径上,这个延迟QS会被安全地处理。*/
set_tsk_need_resched(current);
set_preempt_need_resched();
}

/* 基于任何最近的静止状态来更新RCU状态。这是推进宽限期状态机的核心调用。*/
rcu_check_quiescent_state(rdp);

/* 当前没有宽限期在进行,并且有已注册的回调函数,并且回调处理没有被卸载到kthread?*/
if (!rcu_gp_in_progress() &&
rcu_segcblist_is_enabled(&rdp->cblist) && !rcu_rdp_is_offloaded(rdp)) {
/* 在中断关闭的情况下,安全地检查回调链表。*/
local_irq_save(flags);
/* 如果链表中“下一个待处理”段不为空,说明有回调在等待。*/
if (!rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL))
/* 调用此函数,尝试加速启动一个新的宽限期。*/
rcu_accelerate_cbs_unlocked(rnp, rdp);
local_irq_restore(flags);
}

/* 检查宽限期的启动是否被卡住(stall)了太长时间。*/
rcu_check_gp_start_stall(rnp, rdp, rcu_jiffies_till_stall_check());

/* 如果有就绪的回调函数,并且RCU调度器已完全激活,则调用它们。*/
if (!rcu_rdp_is_offloaded(rdp) && rcu_segcblist_ready_cbs(&rdp->cblist) &&
likely(READ_ONCE(rcu_scheduler_fully_active))) {
/* 调用rcu_do_batch,以批处理的方式执行所有已到期的回调函数。*/
rcu_do_batch(rdp);
/* 如果执行完一批后,仍然有就绪的回调函数。*/
if (rcu_segcblist_ready_cbs(&rdp->cblist))
/* 再次调用invoke_rcu_core(),这会再次触发RCU_SOFTIRQ,以尽快处理下一批。*/
invoke_rcu_core();
}

/* 为无回调(nocb)模式,执行任何需要的延迟唤醒rcuo kthread的操作。*/
do_nocb_deferred_wakeup(rdp);
/* 记录RCU核心处理结束的追踪信息。*/
trace_rcu_utilization(TPS("End RCU core"));

/* 如果配置了严格的宽限期模式,则将一个工作项排队,在一个干净的环境中调度一个RCU读者。*/
if (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD))
queue_work_on(rdp->cpu, rcu_gp_wq, &rdp->strict_work);
}

static void rcu_core_si(void)
{
rcu_core();
}

rcu_sched_clock_irq (未分析)