[TOC]

kernel/rcu Read-Copy Update (RCU) 高性能的内核同步机制

历史与背景

这项技术是为了解决什么特定问题而诞生的?

Read-Copy Update (RCU) 技术的诞生,是为了解决在多核处理器系统中,对**读多写少(Read-mostly)**类型共享数据进行同步时遇到的性能瓶颈问题。

传统的同步机制,如读写锁(Reader-Writer Locks),虽然允许多个读者同时访问数据,但存在以下问题:

  1. 读者开销:即使是读取操作,读者也必须获取一个锁(或至少执行原子操作来增加引用计数)。在多核环境下,这会导致缓存行在CPU之间频繁“弹跳”(cache line bouncing),造成严重的性能下降和扩展性问题。
  2. 锁争用:当写者需要获取锁时,它可能会被已有的读者阻塞,或者需要等待新的读者完成。反之,写者持有锁时,所有读者都必须等待。这种争用在高并发读取场景下尤为突出。

RCU 的核心目标就是让读者几乎零开销(wait-free)。它允许读者在访问数据时完全不需要获取任何锁、自旋锁或执行原子指令,从而消除了读者端的同步开销和扩展性瓶颈,极大地提升了读取性能。

它的发展经历了哪些重要的里程碑或版本迭代?

RCU 最初在DYNIX/ptx操作系统中被提出,由Paul E. McKenney等人引入并推广到Linux内核。

  • 经典RCU (Classic RCU):这是最早被合并到Linux内核的RCU实现。它要求RCU读侧临界区(read-side critical section)必须是不可抢占的,并且不能在其中睡眠。
  • 可抢占RCU (Preemptible RCU):为了解决经典RCU中读侧临界区禁止抢占对系统延迟造成的影响,内核引入了可抢占RCU。它允许在RCU读侧临界区中被抢占,通过更复杂的追踪机制来确定宽限期(Grace Period)。这对于桌面和实时系统至关重要。
  • 可睡眠RCU (Sleepable RCU, SRCU):经典RCU的读侧临界区不允许睡眠。为了满足某些需要在持有“锁”期间睡眠的场景(例如,需要从用户空间拷贝数据),SRCU被发明出来。它有自己独立的API和宽限期管理,代价是比常规RCU稍慢。
  • Tree RCU:随着CPU核心数量的爆炸式增长(成百上千个核心),原始RCU中用于检测宽限期的全局状态机成为了新的瓶颈。Tree RCU将CPU组织成一棵树,状态查询和更新在树的层级中进行,极大地提高了在超大规模多核系统上的扩展性。

目前该技术的社区活跃度和主流应用情况如何?

RCU是现代Linux内核中最核心、最基础的同步机制之一,其重要性与自旋锁、互斥锁相当。

  • 主流应用:它被内核中对性能要求最高的、读多写少的子系统广泛使用,包括:
    • 网络子系统:路由表的查找(读远多于更新)。
    • 虚拟文件系统 (VFS):目录项缓存(dcache)的查找。
    • 进程管理:通过PID查找task_struct
    • 模块管理:内核模块列表的遍历。
    • 安全子系统:如SELinux的访问向量缓存(AVC)。

核心原理与设计

它的核心工作原理是什么?

RCU的原理可以概括为:通过延迟销毁来换取无锁读取。它将更新过程分解为**移除(Removal)回收(Reclamation)**两个阶段。

  1. 读者 (Readers)

    • 读者通过 rcu_read_lock()rcu_read_unlock() 来界定一个读侧临界区
    • 这两个函数在大多数RCU变体中不会产生任何锁操作。它们通常只是禁用/启用抢占或做一些标记,以告知RCU的回收机制:“我正在读取,请不要释放我可能正在引用的数据”。
    • 在临界区内,读者通过 rcu_dereference() 安全地获取一个指向共享数据的指针,并可以自由地访问该数据。
  2. 更新者 (Updaters / Writers)

    • 当需要修改数据时,更新者不会在原地修改,而是:
      a. 复制 (Copy):创建一个要修改数据的新副本。
      b. 更新 (Update):在新副本上进行所有修改。
      c. 发布 (Publish):使用 rcu_assign_pointer() 等原子操作,将指向共享数据的全局指针切换到指向新的、修改后的副本。这一步是原子的,保证了读者要么看到旧数据,要么看到新数据,绝不会看到修改到一半的数据。
  3. 内存回收 (Reclamation) - RCU的精髓

    • 在发布新数据后,旧数据不能被立即释放,因为可能还有读者正在其上操作。
    • 更新者需要等待一个宽限期 (Grace Period) 过去。
    • 宽限期被定义为:从更新者发起回收请求开始,到系统中所有CPU都至少经历过一次静止状态 (Quiescent State) 的时间段。
    • 静止状态是指CPU可以保证它不再持有任何在宽限期开始之前获取的RCU保护的引用的状态(例如,发生了上下文切换、进入了idle状态或执行到用户空间)。
    • 一旦一个宽限期结束,RCU机制就可以保证,所有在更新发生之前进入读侧临界区的读者,现在肯定已经退出了。因此,此时释放旧数据是安全的。
    • 更新者通过调用 synchronize_rcu() (阻塞等待宽限期结束) 或 call_rcu() (注册一个在宽限期结束后执行的回调函数来释放内存) 来触发这个回收过程。

它的主要优势体现在哪些方面?

  • 极高的读取性能:读侧几乎没有同步开销,提供了极佳的性能和可扩展性。
  • 无死锁:读者不会被阻塞,因此不会与更新者或其他读者产生死锁。
  • 实时性友好:读侧的无锁特性使其非常适合实时系统(特别是可抢占RCU)。

它存在哪些已知的劣势、局限性或在特定场景下的不适用性?

  • 写者开销:更新操作相对复杂,并且 synchronize_rcu() 会阻塞,可能引入较长的延迟。
  • 内存延迟回收:内存的释放被推迟到宽限期之后,这可能会在写操作频繁的场景下导致瞬时内存占用增加。
  • 使用限制
    • 经典RCU的读侧临界区内不允许睡眠。
    • RCU主要保护的是通过指针访问的动态分配数据,对于需要原地修改的大块数据结构(如数组)不适用。
  • 编程复杂性:正确使用RCU比使用传统的锁要复杂,需要开发者理解其背后的原理。

使用场景

在哪些具体的业务或技术场景下,它是首选解决方案?

RCU是读多写少场景下无可争议的性能王者。

  • 网络路由表:数据包转发(读取路由表)的频率远高于路由更新(写入路由表)。使用RCU可以确保数据包转发路径几乎没有同步开销。
  • VFS目录项缓存:文件路径查找(lookup)是极其频繁的读取操作,而文件重命名或删除是相对较少的写入操作。
  • IDR/IDA机制:内核中用于将一个整数ID映射到一个指针的机制,其查找操作由RCU保护。

是否有不推荐使用该技术的场景?为什么?

  • 写密集型负载:如果写入操作和读取操作一样频繁,甚至更频繁,那么RCU的写者开销和内存延迟回收的缺点会变得非常突出。此时,使用自旋锁或读写锁可能更合适。
  • 需要原地修改:当共享数据是一个大数组,且更新操作只是修改其中几个元素时,为整个数组创建副本的开销是不可接受的。
  • 读者需要长时间睡眠:如果读者在临界区内需要长时间睡眠,即使是SRCU,也可能导致宽限期被过度延长,从而使待释放的内存大量堆积。

对比分析

请将其 与 其他相似技术 进行详细对比。

特性 RCU (Read-Copy Update) 读写锁 (rwlock_t) Seqlocks (顺序锁)
读者开销 几乎为零。通常只是禁用抢占或内存屏障。 中等。需要执行原子操作来获取读锁。 。读取一个序列号,但可能需要重试。
写者开销 。需要复制数据并等待宽限期。 中等。需要获取排他锁。 。只需要获取一个自旋锁并更新序列号。
读-写并发 允许。读者和写者可以完全并发执行。 不允许。写者会阻塞所有读者,反之亦然。 允许 (乐观地)。但如果读期间有写,读者必须重试。
写-写并发 不允许。写者之间需要外部锁来同步。 不允许 不允许
饥饿问题 读者永远不会饿死写者。 写者可能饿死读者,反之亦然。 写者可能饿死读者 (读者不断重试)。
适用场景 读远多于写的指针链接数据结构。 读多于写,但对写性能和内存占用有一定要求。 读非常多,写非常少,且数据结构简单(通常是些数值)。

include/linux/rcupdate.h

rcu_read_lock 标记 RCU(Read-Copy-Update)读侧临界区的开始

这段代码定义了 rcu_read_lock() 函数,并通过详细的注释解释了其功能和使用规则。rcu_read_lock() 是一种内核同步机制,用于标记 RCU(Read-Copy-Update)读侧临界区的开始。RCU 是 Linux 内核中一种高效的同步机制,允许读操作与写操作并发进行,同时确保数据一致性。


功能概述

rcu_read_lock() 的主要作用是保护 RCU 读侧临界区,使得在该区域内的代码可以安全地访问共享数据,而无需担心数据被其他线程修改或释放。它与 synchronize_rcu()call_rcu() 配合使用,确保写操作不会在读侧临界区未完成时破坏数据一致性。例如:

  • synchronize_rcu() 被调用时,它会阻塞,直到所有 CPU 都退出其 RCU 读侧临界区。
  • call_rcu() 被调用时,RCU 回调函数的执行会被延迟,直到所有 CPU 的 RCU 读侧临界区结束。

并发行为

RCU 的一个重要特性是,RCU 回调函数可以与新的 RCU 读侧临界区并发运行。这种行为通过以下场景说明:

  1. CPU 0 进入 RCU 读侧临界区。
  2. CPU 1 调用 call_rcu() 注册一个 RCU 回调。
  3. CPU 0 退出 RCU 读侧临界区。
  4. CPU 2 进入新的 RCU 读侧临界区。
  5. RCU 回调被执行。

这种设计确保了高效的并发性,同时保证了在回调执行前,所有可能引用被释放资源的读侧临界区都已完成。


嵌套支持

RCU 读侧临界区可以嵌套使用。只有当最外层的 RCU 读侧临界区结束时,任何延迟的操作(如回调执行)才会被处理。这种嵌套支持为复杂的代码路径提供了灵活性。


使用限制

注释中强调了在不同类型的 RCU 实现中,RCU 读侧临界区的限制:

  1. 在非抢占式 RCU 实现(如 TREE_RCU 和 TINY_RCU)中,禁止在 RCU 读侧临界区内阻塞。
  2. 在支持抢占的 RCU 实现(PREEMPT_RCU)中,读侧临界区可以被抢占,但显式阻塞仍然是非法的。
  3. 在实时内核(带有 -rt 补丁)的 RCU 实现中,读侧临界区可以被抢占并阻塞,但仅限于获取支持优先级继承的自旋锁时。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static void rcu_preempt_read_enter(void)
{
WRITE_ONCE(current->rcu_read_lock_nesting, READ_ONCE(current->rcu_read_lock_nesting) + 1);
}

/*
* rcu_read_lock() 的抢占式 RCU 实现。
* 只需增加 ->rcu_read_lock_nesting,如果我们阻止,共享状态将更新。
*/
void __rcu_read_lock(void)
{
rcu_preempt_read_enter();
if (IS_ENABLED(CONFIG_PROVE_LOCKING))
WARN_ON_ONCE(rcu_preempt_depth() > RCU_NEST_PMAX);
if (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD) && rcu_state.gp_kthread)
WRITE_ONCE(current->rcu_read_unlock_special.b.need_qs, true);
barrier(); /* critical section after entry code. */
}

static __always_inline void rcu_read_lock(void)
{
__rcu_read_lock();
__acquire(RCU);
rcu_lock_acquire(&rcu_lock_map);
RCU_LOCKDEP_WARN(!rcu_is_watching(),
"rcu_read_lock() used illegally while idle");
}

rcu_read_unlock 标记 RCU 读侧临界区的结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
static int rcu_preempt_read_exit(void)
{
int ret = READ_ONCE(current->rcu_read_lock_nesting) - 1;

WRITE_ONCE(current->rcu_read_lock_nesting, ret);
return ret;
}


/*
* rcu_read_unlock() 的抢占式 RCU 实现。递减 ->rcu_read_lock_nesting。
* 如果结果为零(最外层的 rcu_read_unlock())且 ->rcu_read_unlock_special 为非零,则调用 rcu_read_unlock_special() 在 RCU 读取端关键部分和其他特殊情况下进行上下文切换后进行清理。
*/
void __rcu_read_unlock(void)
{
struct task_struct *t = current;

barrier(); // critical section before exit code.
if (rcu_preempt_read_exit() == 0) {
barrier(); // critical-section exit before .s check.
/* 此时存在特殊标志(rcu_read_unlock_special),则调用 rcu_read_unlock_special() 进行清理操作 */
if (unlikely(READ_ONCE(t->rcu_read_unlock_special.s)))
rcu_read_unlock_special(t);
}
if (IS_ENABLED(CONFIG_PROVE_LOCKING)) {
int rrln = rcu_preempt_depth();

WARN_ON_ONCE(rrln < 0 || rrln > RCU_NEST_PMAX);
}
}

/*
* RCU 并没有提供 rcu_write_lock(),因为 RCU 的设计目标是允许读操作和写操作并发运行,而不需要写操作阻塞读操作。
* 这种设计是 RCU 高性能的核心原因之一。
* 虽然 RCU 不负责协调写操作之间的冲突,但开发者可以使用自旋锁(spinlock)或其他同步机制来确保写操作的互斥性。R
* RCU 的重点是提供高效的读操作,而写操作的同步由开发者自行管理。
*/

/**
* rcu_read_unlock() 的主要作用是标记 RCU 读侧临界区的结束。它通知 RCU 机制,当前线程已完成对共享数据的访问。与 rcu_read_lock() 一样,rcu_read_unlock() 的设计也考虑了高效性和安全性:

它几乎不会引发死锁,因为它的实现避免了与调度器运行队列和优先级继承自旋锁的冲突。
即使在中断被禁用的情况下调用,rcu_read_unlock() 也能通过延迟进入静止状态(quiescent state)来避免潜在问题。
*/
static inline void rcu_read_unlock(void)
{
RCU_LOCKDEP_WARN(!rcu_is_watching(),
"rcu_read_unlock() used illegally while idle");
rcu_lock_release(&rcu_lock_map); /* Keep acq info for rls diags. */
__release(RCU);
__rcu_read_unlock();
}

rcu_assign_pointer RCU写入更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* RCU_INITIALIZER() - statically initialize an RCU-protected global variable
* @v: The value to statically initialize with.
*/
#define RCU_INITIALIZER(v) (typeof(*(v)) __force __rcu *)(v)

/**
* rcu_assign_pointer() - 分配给受 RCU 保护的指针
* @p:要分配给的指针
* @v:要分配的值(发布)
*
* 将指定的值分配给指定的受 RCU 保护的指针,确保任何并发 RCU 读取器都可以看到任何先前的初始化。
*
* 在需要内存屏障的体系结构(大多数体系结构)上插入内存屏障,并且还阻止编译器在指针赋值后对初始化结构的代码重新排序。 更重要的是,此调用记录了 RCU 读取端代码将取消引用哪些指针。
*
* 在某些特殊情况下,您可以使用 RCU_INIT_POINTER() 而不是 rcu_assign_pointer()。
RCU_INIT_POINTER() 的速度要快一些,因为它既不约束 CPU 也不约束编译器。也就是说,在你应该使用 RCU_INIT_POINTER() 的情况下使用 rcu_assign_pointer() 是一件非常糟糕的事情,会导致无法诊断的内存损坏。 所以请小心。有关详细信息,请参阅 RCU_INIT_POINTER() 注释标头。
*
* 请注意,rcu_assign_pointer() 只计算其每个参数一次,尽管表面上如此。 其中一个 “额外” 评估在 typeof() 中,另一个仅对 sparse (__CHECKER__) 可见,这两个值实际上都不执行参数。 与大多数 cpp 宏一样,此 execute-arguments-only-once 属性很重要,因此在更改 rcu_assign_pointer() 及其调用的其他宏时请小心。
*/
#define rcu_assign_pointer(p, v) \
do { \
uintptr_t _r_a_p__v = (uintptr_t)(v); \
rcu_check_sparse(p, __rcu); \
\
if (__builtin_constant_p(v) && (_r_a_p__v) == (uintptr_t)NULL) \
WRITE_ONCE((p), (typeof(p))(_r_a_p__v)); \
else \
/* smp_store_release 是一个内存屏障操作,用于确保在赋值之前的所有内存操作都已完成 */
smp_store_release(&p, RCU_INITIALIZER((typeof(p))_r_a_p__v)); \
} while (0)

synchronize_rcu RCU同步

  • 用于等待一个 RCU 宽限期(grace period)结束。宽限期的结束意味着所有当前正在执行的 RCU 读侧临界区(RCU read-side critical sections)都已经完成
  • 函数目的: synchronize_rcu 的主要作用是确保在调用它之前开始的所有 RCU 读侧临界区都已完成。这对于需要安全地释放或修改被 RCU 保护的数据结构的写侧代码至关重要。调用该函数后,调用者可以确信没有任何 RCU 读侧代码仍在访问相关数据。
  • RCU 读侧临界区由 rcu_read_lock() 和 rcu_read_unlock() 包围,或者在某些情况下由禁用中断、软中断或抢占的代码段隐式定义。宽限期的结束意味着所有这些读侧临界区都已退出。
  • synchronize_rcu 提供了强大的内存排序保证:在多 CPU 系统中,当 synchronize_rcu 返回时,每个 CPU 都会在其最后一个 RCU 读侧临界区结束后执行一个完整的内存屏障。
    如果某个 CPU 的 RCU 读侧临界区在 synchronize_rcu 返回后仍在运行,则该 CPU 必须在 synchronize_rcu 开始和该读侧临界区开始之间执行一个完整的内存屏障。
    即使在单 CPU 系统中,调用 synchronize_rcu 的 CPU 也会在函数执行期间执行一个完整的内存屏障。
    这些内存屏障确保了数据的正确性和一致性,特别是在并发环境中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void synchronize_rcu(void)
{
unsigned long flags;
struct rcu_node *rnp;

RCU_LOCKDEP_WARN(lock_is_held(&rcu_bh_lock_map) ||
lock_is_held(&rcu_lock_map) ||
lock_is_held(&rcu_sched_lock_map),
"Illegal synchronize_rcu() in RCU read-side critical section");
if (!rcu_blocking_is_gp()) {
if (rcu_gp_is_expedited())
synchronize_rcu_expedited();
else
synchronize_rcu_normal();
return;
}

/* 上下文允许空洞的宽限期。
请注意,此代码使用 !抢占 & & !SMP 的。
此外,所有推进宽限期的代码都在进程级别运行。 因此,这个正常的 GP 与其他正常的 GP 重叠,只是因为它们完全嵌套在它们中,这允许重复使用 ->gp_seq_polled_snap。 */
rcu_poll_gp_seq_start_unlocked(&rcu_state.gp_seq_polled_snap);
rcu_poll_gp_seq_end_unlocked(&rcu_state.gp_seq_polled_snap);

/* 更新正常的 grace-period 计数器以记录此宽限期,但仅记录引导 CPU 使用的宽限期。
rcu_scheduler_starting() 将处理其余的这些计数器。 */
local_irq_save(flags);
WARN_ON_ONCE(num_online_cpus() > 1);
/* 记录了一个新的宽限期的开始。这种设计允许宽限期序列号以高效的方式递增,同时为其他 RCU 操作提供一致性保证 */
rcu_state.gp_seq += (1 << RCU_SEQ_CTR_SHIFT);
/* 遍历当前 CPU 的 RCU 数据结构(rcu_data)中的节点链表(mynode),
将每个节点的 gp_seq 和 gp_seq_needed 更新为全局宽限期序列号。
这些计数器用于跟踪每个节点的宽限期状态,确保 RCU 的同步机制能够正确地覆盖所有节点
this_cpu_ptr(&rcu_data): 获取当前 CPU 的 RCU 数据指针。
mynode: 指向当前 CPU 所属的 RCU 节点。
rnp->parent: 遍历节点的父节点,直到根节点。
*/
for (rnp = this_cpu_ptr(&rcu_data)->mynode; rnp; rnp = rnp->parent)
rnp->gp_seq_needed = rnp->gp_seq = rcu_state.gp_seq;
local_irq_restore(flags);
}
EXPORT_SYMBOL_GPL(synchronize_rcu);

rcu_preempt_depth 获取 RCU 预处理嵌套深度

1
2
3
4
5
6
7
/*
* Defined as a macro as it is a very low level header included from
* areas that don't even know about current. This gives the rcu_read_lock()
* nesting depth, but makes sense only if CONFIG_PREEMPT_RCU -- in other
* types of kernel builds, the rcu_read_lock() nesting depth is unknowable.
*/
#define rcu_preempt_depth() READ_ONCE(current->rcu_read_lock_nesting)

include/linux/rculist.h

__list_add_rcu 插入新条目到 RCU 保护的链表中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/*
* Insert a new entry between two known consecutive entries.
*
* This is only for internal list manipulation where we know
* the prev/next entries already!
*/
static inline void __list_add_rcu(struct list_head *new,
struct list_head *prev, struct list_head *next)
{
if (!__list_add_valid(new, prev, next))
return;

new->next = next;
new->prev = prev;
rcu_assign_pointer(list_next_rcu(prev), new);
next->prev = new;
}

/**
* list_add_rcu - add a new entry to rcu-protected list
* @new: new entry to be added
* @head: list head to add it after
*
* Insert a new entry after the specified head.
* This is good for implementing stacks.
*
* The caller must take whatever precautions are necessary
* (such as holding appropriate locks) to avoid racing
* with another list-mutation primitive, such as list_add_rcu()
* or list_del_rcu(), running on this same list.
* However, it is perfectly legal to run concurrently with
* the _rcu list-traversal primitives, such as
* list_for_each_entry_rcu().
*/
static inline void list_add_rcu(struct list_head *new, struct list_head *head)
{
__list_add_rcu(new, head, head->next);
}

/**
* list_add_tail_rcu - add a new entry to rcu-protected list
* @new: new entry to be added
* @head: list head to add it before
*
* Insert a new entry before the specified head.
* This is useful for implementing queues.
*
* The caller must take whatever precautions are necessary
* (such as holding appropriate locks) to avoid racing
* with another list-mutation primitive, such as list_add_tail_rcu()
* or list_del_rcu(), running on this same list.
* However, it is perfectly legal to run concurrently with
* the _rcu list-traversal primitives, such as
* list_for_each_entry_rcu().
*/
static inline void list_add_tail_rcu(struct list_head *new,
struct list_head *head)
{
__list_add_rcu(new, head->prev, head);
}

include/linux/rcuwait.h

___rcuwait_wait_event 阻塞等待条件满足与状态满足

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void finish_rcuwait(struct rcuwait *w)
{
rcu_assign_pointer(w->task, NULL);
__set_current_state(TASK_RUNNING);
}

/*
* The caller is responsible for locking around rcuwait_wait_event(),
* and [prepare_to/finish]_rcuwait() such that writes to @task are
* properly serialized.
*/

static inline void prepare_to_rcuwait(struct rcuwait *w)
{
rcu_assign_pointer(w->task, current);
}

#define ___rcuwait_wait_event(w, condition, state, ret, cmd) \
({ \
long __ret = ret; \
prepare_to_rcuwait(w); \
for (;;) { \
/* \
* Implicit barrier (A) pairs with (B) in \
* rcuwait_wake_up(). \
*/ \
set_current_state(state); \
if (condition) \
break; \
\
if (signal_pending_state(state, current)) { \
__ret = -EINTR; \
break; \
} \
\
cmd; \
} \
finish_rcuwait(w); \
__ret; \
})

rcuwait_wait_event 阻塞等待条件满足与状态满足

1
2
#define rcuwait_wait_event(w, condition, state)				\
___rcuwait_wait_event(w, condition, state, 0, schedule())

rcuwait_init 初始化 rcuwait 结构体

1
2
3
4
static inline void rcuwait_init(struct rcuwait *w)
{
w->task = NULL;
}

kernel/rcu/rcu.h

rcu_init_levelspread 计算每层的扇出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/*
* Compute the per-level fanout, either using the exact fanout specified
* or balancing the tree, depending on the rcu_fanout_exact boot parameter.
*/
static inline void rcu_init_levelspread(int *levelspread, const int *levelcnt)
{
int i;

for (i = 0; i < RCU_NUM_LVLS; i++)
levelspread[i] = INT_MIN;
if (rcu_fanout_exact) {
levelspread[rcu_num_lvls - 1] = rcu_fanout_leaf;
for (i = rcu_num_lvls - 2; i >= 0; i--)
levelspread[i] = RCU_FANOUT;
} else {
/* 如果 rcu_fanout_exact 未设置,则根据 CPU 数量和每层的节点数量动态平衡树的结构 */
int ccur;
int cprv;
/* 从最底层开始,逐层向上计算每层的扇出值。
使用公式 (cprv + ccur - 1) / ccur 计算扇出值,其中:
cprv 表示上一层的节点数量。
ccur 表示当前层的节点数量。
该公式确保扇出值向上取整,避免节点不足。 */
cprv = nr_cpu_ids;
for (i = rcu_num_lvls - 1; i >= 0; i--) {
ccur = levelcnt[i];
levelspread[i] = (cprv + ccur - 1) / ccur;
cprv = ccur;
}
}
}

rcu_seq_done_exact 精确判断 RCU(Read-Copy-Update)序列号是否已经完成更新操作

1
2
3
4
5
6
7
8
9
10
11
12
13
/*
* 根据来自 rcu_seq_snap() 的快照,确定是否发生了完整的更新操作,但不允许使用 (ULONG_MAX / 2) 的安全因子/防护带。
* 该函数通过比较当前序列号和给定的快照序列号,确定是否已经发生了完整的更新操作
*/
static inline bool rcu_seq_done_exact(unsigned long *sp, unsigned long s)
{
unsigned long cur_s = READ_ONCE(*sp);
/* 如果当前序列号 cur_s 大于或等于快照序列号 s
* 如果当前序列号 cur_s 小于 s - (3 * RCU_SEQ_STATE_MASK + 1)(ULONG_CMP_LT(cur_s, s - (3 * RCU_SEQ_STATE_MASK + 1))),
* 则也认为更新操作已完成。这种情况通常用于处理序列号的回绕(wraparound)
*/
return ULONG_CMP_GE(cur_s, s) || ULONG_CMP_LT(cur_s, s - (3 * RCU_SEQ_STATE_MASK + 1));
}

rcu_for_each_node_breadth_first 对 rcu_node 结构进行广度优先遍历

1
2
3
4
5
6
7
8
9
10
11
/*
* 对指定状态结构(用于SRCU)或唯一的rcu_state结构(用于RCU)
* 的{s,}rcu_node结构进行全面的广度优先扫描。
*/
#define _rcu_for_each_node_breadth_first(sp, rnp) \
/* 从数组的第一个节点 */
for ((rnp) = &(sp)->node[0]; \
/* 直到最后一个节点 sp->node[rcu_num_nodes - 1] */
(rnp) < &(sp)->node[rcu_num_nodes]; (rnp)++)
#define rcu_for_each_node_breadth_first(rnp) \
_rcu_for_each_node_breadth_first(&rcu_state, rnp)

kernel/rcu/update.c

rcu_set_runtime_mode 设置运行模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
* Test each non-SRCU synchronous grace-period wait API. This is
* useful just after a change in mode for these primitives, and
* during early boot.
*/
void rcu_test_sync_prims(void)
{
if (!IS_ENABLED(CONFIG_PROVE_RCU))
return;
pr_info("Running RCU synchronous self tests\n");
synchronize_rcu();
synchronize_rcu_expedited();
}

#if !defined(CONFIG_TINY_RCU)

/*
* Switch to run-time mode once RCU has fully initialized.
*/
static int __init rcu_set_runtime_mode(void)
{
rcu_test_sync_prims();
rcu_scheduler_active = RCU_SCHEDULER_RUNNING;
kfree_rcu_scheduler_running();
rcu_test_sync_prims();
return 0;
}
core_initcall(rcu_set_runtime_mode);

rcu_test_sync_prims 测试 RCU 同步原语

1
2
3
4
5
6
7
8
9
10
11
12
/*
* 测试每个非SRCU同步宽限期等待API。
* 这在这些原语的模式更改后以及早期启动期间非常有用。
*/
void rcu_test_sync_prims(void)
{
if (!IS_ENABLED(CONFIG_PROVE_RCU))
return;
pr_info("Running RCU synchronous self tests\n");
synchronize_rcu();
synchronize_rcu_expedited();
}

kernel/rcu/sync.c RCU同步等待(RCU Synchronous Waiting) 实现优雅周期完成的阻塞机制

历史与背景

这项技术是为了解决什么特定问题而诞生的?

这项技术是RCU(Read-Copy Update)机制的核心组成部分,它为了解决一个根本性的问题:当一个写者(Updater)用新数据替换了旧数据后,它必须等待多长时间才能安全地释放(free)旧数据的内存?

RCU的核心思想是允许读者在访问共享数据时无需获取任何锁,从而实现极高的读性能和伸缩性。但这也带来了内存回收的难题:当一个写者通过原子操作(如指针替换)更新了数据后,可能仍有读者正在引用旧版本的数据。如果写者立即释放旧数据,这些读者就会遇到“使用已释放内存”(Use-After-Free)的严重错误。

kernel/rcu/sync.c 中实现的 synchronize_rcu() 及其变体,就是为了提供一个同步的、阻塞式的等待机制,来解决这个问题。它能保证当函数返回时,所有在更新操作之前就已经存在的读者(pre-existing readers)都已经完成了它们的读侧临界区。这个等待的“时间段”被称为优雅周期(Grace Period)

它的发展经历了哪些重要的里程碑或版本迭代?

synchronize_rcu() 的发展与RCU机制本身的演进紧密相连,由Paul E. McKenney主导。

  • 经典RCU (Classic RCU):早期的RCU实现中,优雅周期的检测相对简单,主要依赖于观察所有CPU是否都经历了一次上下文切换。
  • 抢占式RCU (Preemptible RCU):为了满足实时性需求,内核引入了CONFIG_PREEMPT_RCU。这使得在RCU读侧临界区内也可以被抢占。这极大地复杂化了优雅周期的检测,因为一个持有旧数据引用的线程可能被抢占并长时间睡眠。sync.c的内部逻辑需要与更复杂的RCU状态机(如dytick-idle处理、CPU热插拔等)进行交互来正确地检测优雅周期的结束。
  • 性能优化synchronize_rcu() 本身可能成为性能瓶颈,如果被频繁调用,会引发所谓的“GP-storm”(优雅周期风暴)。内核开发者们对RCU的批处理(batching)机制进行了大量优化,使得多个synchronize_rcu()的请求可以被合并到同一个优雅周期中,从而降低开销。

目前该技术的社区活跃度和主流应用情况如何?

synchronize_rcu() 是Linux内核中一个基础、稳定且被广泛使用的同步原语。RCU是内核数据结构保护的基石之一,而synchronize_rcu()是RCU提供的两种核心等待机制之一(另一种是异步的call_rcu())。它的代码库非常稳定,社区的活跃度主要体现在对RCU整体性能的优化、以及对新硬件架构和内核特性的适应性调整上。它被内核中大量需要在可睡眠上下文中进行RCU同步的子系统所使用。

核心原理与设计

它的核心工作原理是什么?

synchronize_rcu() 的核心原理是发起并等待一个完整的RCU优雅周期的结束

一个RCU优雅周期的定义是:从某个时间点开始,直到系统中所有CPU都至少经历了一次**静止状态(Quiescent State)**为止的这段时间。静止状态是指CPU保证不再持有任何在该优雅周期开始前获取的RCU读侧临界区引用的状态。常见的静止状态包括:

  • 上下文切换
  • 进入idle循环
  • 从内核态返回用户态

synchronize_rcu() 的工作流程如下:

  1. 调用:当一个内核线程调用 synchronize_rcu() 时,它向RCU核心子系统注册一个“我需要等待一个优雅周期”的请求。
  2. 阻塞:该线程随后被置于睡眠(TASK_UNINTERRUPTIBLE)状态并从CPU上调度出去。
  3. 优雅周期检测:RCU核心的状态机在后台持续工作。它会检测并记录每个CPU是否已经通过了一个静止状态。
  4. 优雅周期结束:当RCU核心确认所有在线CPU都已经历过至少一次静止状态后,它就认为一个优雅周期已经结束。
  5. 唤醒:RCU核心会唤醒所有正在等待该优雅周期结束的线程,包括那个调用 synchronize_rcu() 的线程。
  6. 返回synchronize_rcu() 函数返回,此时调用者可以确信,在它调用该函数之前进入RCU读临界区的所有读者都已经退出了,因此现在释放旧数据是安全的。

它的主要优势体现在哪些方面?

  • 接口简单:提供了一个简单明了的阻塞式API。开发者只需像调用mutex_lock()一样调用它,无需处理复杂的回调逻辑。
  • 编程模型直接:代码流程是线性的。等待完成后,代码继续向下执行,这符合大多数同步场景的直观逻辑。
  • 健壮性:由RCU核心保证了其正确性,为内核开发者提供了一个强大而可靠的内存回收屏障。

它存在哪些已知的劣势、局限性或在特定场景下的不适用性?

  • 阻塞行为:这是其最主要的限制。调用synchronize_rcu()的线程会睡眠,等待时间可能很长(通常是毫秒级,但可能更长),具体取决于系统中其他任务的行为。
  • 上下文限制:由于会引起睡眠,synchronize_rcu() 绝对不能在原子上下文(atomic context)中使用,例如:
    • 中断处理程序 (Interrupt handlers)
    • 软中断 (Softirqs)
    • 持有自旋锁 (spinlock) 的临界区
  • 性能开销:频繁地调用synchronize_rcu()会给RCU子系统带来压力,可能导致系统吞吐量下降。

使用场景

在哪些具体的业务或技术场景下,它是首选解决方案?请举例说明。

它适用于那些需要在可睡眠的(preemptible)内核上下文中,进行RCU保护的数据结构更新后,必须同步等待直到可以安全释放旧资源才能继续下一步操作的场景。

  • 内核模块卸载:这是synchronize_rcu()最经典的用例。当一个内核模块被卸载时(rmmod),其module_exit函数必须确保没有其他内核线程仍在使用该模块提供的回调函数或数据结构(这些通常是用RCU保护的)。因此,module_exit函数在释放模块内存之前,通常会调用synchronize_rcu(),以确保所有之前的“用户”都已经离开。
  • 网络协议栈中的路由表更新:当一个路由条目被删除时,内核需要确保所有正在使用该旧路由条目进行数据包转发的CPU都已经完成操作,然后才能释放该路由条目的内存。如果删除操作是在一个可以睡眠的进程上下文中进行的,就可以使用synchronize_rcu()
  • 某些配置的同步更新:当需要更改一个全局配置,并且后续的初始化步骤依赖于旧配置完全不再被使用时,可以使用synchronize_rcu()来等待。

是否有不推荐使用该技术的场景?为什么?

  • 原子上下文:如上所述,在任何不能睡眠的上下文中使用synchronize_rcu()都是一个严重的编程错误,会导致系统崩溃或死锁。
  • 高性能、高并发的更新路径:在需要频繁更新RCU保护数据的热点路径上,使用synchronize_rcu()会导致大量的阻塞和上下文切换,严重影响性能。在这种场景下,异步的call_rcu()是唯一的正确选择。
  • 只需要释放内存的简单场景:如果更新操作之后唯一要做的就是释放旧数据的内存,而没有其他必须同步等待才能执行的逻辑,那么使用call_rcu()通常是更高效的选择,因为它不会阻塞当前线程。

对比分析

请将其 与 其他相似技术 进行详细对比。

synchronize_rcu() vs. call_rcu()

这是RCU更新者最重要的选择,也是fs/rcu/sync.c与RCU回调机制的核心对比。

特性 synchronize_rcu() call_rcu()
工作模式 同步 (Synchronous) / 阻塞 (Blocking) 异步 (Asynchronous) / 非阻塞 (Non-blocking)
实现方式 调用者线程进入睡眠,等待一个优雅周期结束后被唤醒。 调用者注册一个回调函数和数据指针,然后立即返回。RCU核心会在一个优雅周期结束后,在某个软中断上下文中调用该回调函数。
编程模型 简单、线性的代码流。 基于回调,逻辑被分割。
上下文限制 只能在可睡眠的上下文中使用。 可以在任何上下文中使用,包括原子上下文。
性能影响 可能阻塞关键路径,引起上下文切换,对吞吐量有影响。 不会阻塞调用者,对调用路径的性能影响极小。
适用场景 模块卸载等需要同步等待才能继续清理工作的场景。 高性能数据路径、原子上下文中的更新,以及所有简单的“更新后释放内存”场景。

synchronize_rcu() vs. mutex_lock() / rwsem

特性 synchronize_rcu() Mutex / Read-Write Semaphore
保护对象 保护的是写者和读者之间的关系,确保内存回收安全。它不提供写者之间的互斥。 提供读者之间、写者之间、以及读写者之间的互斥
读者开销 几乎为零。读者无需写内存,是RCU的核心优势。 读者需要修改锁的共享状态,存在缓存争用,有性能开销。
写者模型 写者通常需要配合自旋锁或互斥锁来保证写操作的原子性,然后调用synchronize_rcu()来等待读者。 一个锁同时完成了写者互斥和等待读者的功能。
适用场景 读多写少的场景,读性能至关重要。 读写平衡或写操作频繁的场景,或者读者需要阻塞写者的场景。

rcu_sync_init 初始化一个 rcu_sync 结构

1
2
3
4
5
6
7
8
9
/**
* rcu_sync_init() - 初始化一个 rcu_sync 结构
* @rsp: 指向要初始化的 rcu_sync 结构的指针
*/
void rcu_sync_init(struct rcu_sync *rsp)
{
memset(rsp, 0, sizeof(*rsp));
init_waitqueue_head(&rsp->gp_wait);
}

RCU_SYNC 状态机:协调读者快慢路径转换的同步原语

本代码片段定义了一个名为 rcu_sync 的同步原语。其核心功能是实现一个复杂的状态机,用于协调写者(updater)和大量读者(reader)之间的同步。写者通过调用rcu_sync_enter来“宣告”一个更新周期的开始,强制所有后续读者进入“慢速路径”;通过调用rcu_sync_exit来宣告更新周期的结束,允许读者在经过一个RCU宽限期后返回“快速路径”。这个机制的设计目标是:既要保证正确性(即在更新期间读者必须走慢路径),又要通过状态压缩和回调重放等技巧,最大限度地减少因频繁的快慢路径切换而导致的宽限期等待,从而提高性能。

实现原理分析

rcu_sync的核心是一个五状态的状态机(GP_IDLE, GP_ENTER, GP_PASSED, GP_EXIT, GP_REPLAY)和一个引用计数器 gp_count

  • gp_count: 这是一个嵌套计数器。每次调用rcu_sync_enter时它加一,每次调用rcu_sync_exit时它减一。这允许多个写者可以嵌套地进入“写模式”。只有当最后一个写者退出时(gp_count变为0),才真正开始让读者返回快速路径的流程。
  • 状态机 (gp_state):
    • GP_IDLE: 空闲状态。所有读者都可以走快速路径。
    • GP_ENTER: 第一个写者进入。已启动一个宽限期等待,以确保所有CPU都观察到状态变化,从而阻止新的读者走快速路径。
    • GP_PASSED: 宽限期已过。现在可以保证,系统中的所有读者都已经观察到了GP_IDLE -> GP_ENTER的转变,因此要么已经退出,要么正在走慢速路径。
    • GP_EXIT: 最后一个写者已退出。启动一个新的宽限期,以确保所有CPU都能观察到写者临界区的所有内存修改。
    • GP_REPLAY: 一个优化状态。如果在GP_EXIT状态的宽限期内,又有一个新的写者进入并退出了,就进入此状态,表示需要“重放”一次GP_EXIT的宽限期回调。

工作流程剖析:

  1. 第一个写者进入 (rcu_sync_enter):

    • 系统处于GP_IDLE
    • gp_count从0变为1。
    • 状态变为GP_ENTER
    • 关键rcu_sync_enter会调用synchronize_rcu(),这是一个阻塞操作,它会立即等待一个完整的RCU宽限期结束。
    • 宽限期结束后,调用rcu_sync_func,状态变为GP_PASSED
    • rcu_sync_enter返回。此时,写者可以确信,所有读者都已经看到了状态变化。
  2. 嵌套的写者进入:

    • 系统已处于GP_PASSED状态。
    • 新的写者调用rcu_sync_enter
    • gp_count递增。
    • 由于状态已是GP_PASSEDwait_event条件立即满足,函数几乎无开销地立即返回。这是对嵌套写者的重要优化。
  3. 最后一个写者退出 (rcu_sync_exit):

    • gp_count从1减为0。
    • 系统处于GP_PASSED状态。
    • 状态变为GP_EXIT
    • 调用rcu_sync_call,它会异步地在未来的一个宽限期结束后,调用rcu_sync_func
  4. RCU回调 (rcu_sync_func):

    • 这是状态机的驱动核心,它总是在一个宽限期之后被调用。
    • 场景1 (来自GP_EXIT): 如果当前状态是GP_EXIT,说明自最后一个写者退出以来,至少经过了一个宽限期。现在可以安全地让读者返回快速路径了。它将状态设置回GP_IDLE
    • 场景2 (优化 - 来自GP_REPLAY): 如果在GP_EXIT的宽限期内,有新的写者进入又退出,状态会变为GP_REPLAY。当回调发生时,它发现状态是GP_REPLAY,就明白需要再启动一个新的宽限期来服务那个新的退出者。它将状态变回GP_EXIT并重新调用rcu_sync_call。这避免了为每个退出都启动一个独立的、可能重叠的宽限期。

特定场景分析:单核、无MMU的STM32H750平台

硬件交互与MMU

此机制是纯软件的同步原语,与硬件或MMU无关。

单核环境下的RCU

RCU (Read-Copy-Update) 机制在单核系统上依然有效,但其实现会退化。

  • 宽限期 (Grace Period): 在多核系统中,一个RCU宽限期是“等待所有CPU都经过一次上下文切换”的时间。在单核可抢占内核中,一个宽限期就是“等待所有已存在的读端临界区都完成执行”的时间,这通常通过等待一个调度器时钟节拍来近似。在单核非抢占内核中,RCU的读端临界区不会被抢占,因此宽限期几乎是瞬时的。
  • synchronize_rcu(): 在单核系统上,此函数仍然会阻塞,以确保所有在调用前已经开始的RCU读端临-临界区都已完成。
  • call_rcu(): 仍然会注册一个回调,该回调会在一个宽限期之后被执行。

实际意义

percpu_rw_semaphore依赖rcu_sync来保证其正确性。因此,即使在STM32H750上,rcu_sync的这套复杂逻辑也是必需的

  • rcu_sync_enter中的synchronize_rcu()保证了当写者开始等待读者计数器清零时,不会有新的读者再进入快速路径。
  • rcu_sync_exit中的call_rcu()保证了在读者被允许返回快速路径之前,写者临界区的所有内存修改对系统中的所有任务都已可见。

结论rcu_syncpercpu_rw_semaphore能够安全工作的理论基石。它通过RCU的宽限期保证,将写者对“快慢路径开关”(即sem->block标志)的修改,安全地广播给所有潜在的读者。在单核STM32H750上,这套机制依然通过管理抢占和调度,正确地实现了其同步语义。

代码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// rcu_sync_enter: 强制读者进入慢速路径。
void rcu_sync_enter(struct rcu_sync *rsp)
{
int gp_state;

spin_lock_irq(&rsp->rss_lock);
gp_state = rsp->gp_state;
// 关键路径:第一个写者进入。
if (gp_state == GP_IDLE) {
WRITE_ONCE(rsp->gp_state, GP_ENTER);
// ... (一些优化相关的注释) ...
}
// 引用计数加一,支持嵌套。
rsp->gp_count++;
spin_unlock_irq(&rsp->rss_lock);

// 仅当是第一个写者时,才需要阻塞等待一个宽限期。
if (gp_state == GP_IDLE) {
// 阻塞等待一个RCU宽限期。
synchronize_rcu();
// 手动调用回调函数来将状态推进到GP_PASSED。
rcu_sync_func(&rsp->cb_head);
return;
}

// 对于嵌套的写者,只需等待状态变为GP_PASSED即可,通常会立即满足。
wait_event(rsp->gp_wait, READ_ONCE(rsp->gp_state) >= GP_PASSED);
}

// rcu_sync_exit: 允许读者在宽限期后返回快速路径。
void rcu_sync_exit(struct rcu_sync *rsp)
{
// ... (一些警告检查) ...

spin_lock_irq(&rsp->rss_lock);
// 引用计数减一。
if (!--rsp->gp_count) {
// 关键路径:最后一个写者退出。
if (rsp->gp_state == GP_PASSED) {
// 状态变为GP_EXIT,并异步注册回调。
WRITE_ONCE(rsp->gp_state, GP_EXIT);
rcu_sync_call(rsp);
} else if (rsp->gp_state == GP_EXIT) {
// 优化:如果在GP_EXIT的宽限期内又有写者进入并退出,
// 将状态设为GP_REPLAY,以便在回调中“重放”一次宽限期。
WRITE_ONCE(rsp->gp_state, GP_REPLAY);
}
}
spin_unlock_irq(&rsp->rss_lock);
}

// rcu_sync_func: RCU回调,状态机的核心驱动。
static void rcu_sync_func(struct rcu_head *rhp)
{
// ... (获取rsp指针和锁) ...

// 情况1: 来自第一个写者的synchronize_rcu()
if (rsp->gp_count) {
// 宽限期已过,写者可以安全地继续。
WRITE_ONCE(rsp->gp_state, GP_PASSED);
wake_up_locked(&rsp->gp_wait); // 唤醒enter函数。
}
// 情况3 (优化): 来自GP_EXIT,但期间有新的退出。
else if (rsp->gp_state == GP_REPLAY) {
// 重置状态为GP_EXIT并重新注册回调,相当于合并了宽限期。
WRITE_ONCE(rsp->gp_state, GP_EXIT);
rcu_sync_call(rsp);
}
// 情况2: 来自最后一个写者的rcu_sync_exit()
else {
// 宽限期已过,写者的修改已对所有CPU可见。
// 安全地将状态重置为IDLE,允许读者返回快速路径。
WRITE_ONCE(rsp->gp_state, GP_IDLE);
}
// ... (释放锁) ...
}

RCU_SYNC 析构函数:确保所有RCU回调完成

本代码片段定义了 rcu_sync_dtor 函数,它是 rcu_sync 状态机的析构函数(Destructor)。当一个包含 rcu_sync 结构体的对象(例如 percpu_rw_semaphore)即将被销毁时,必须调用此函数。其核心功能是确保所有由该 rcu_sync 实例注册的、可能还在等待执行的RCU回调都已经全部完成。这是一个强力的同步屏障,用于防止在对象被释放后,仍有“游荡”的RCU回调尝试访问该对象的内存,从而避免 use-after-free 错误。

实现原理分析

rcu_sync_dtor 的实现依赖于一个非常强力的RCU同步原语:rcu_barrier()

  1. 状态检查与警告:

    • WARN_ON_ONCE(READ_ONCE(rsp->gp_state) == GP_PASSED): 函数首先检查状态是否为 GP_PASSED。这个状态意味着有写者正处于其临界区中(rcu_sync_enter已完成,但rcu_sync_exit尚未调用)。在这种状态下销毁 rcu_sync 结构是严重的逻辑错误,因为它破坏了锁的协议,因此会触发一个内核警告。
    • WARN_ON_ONCE(rsp->gp_count): 在锁的保护下,它再次检查引用计数 gp_count。如果计数不为零,同样意味着有未配对的 enter/exit 调用,也是一个逻辑错误。
  2. 处理 GP_REPLAY 状态:

    • if (rsp->gp_state == GP_REPLAY) WRITE_ONCE(rsp->gp_state, GP_EXIT);
    • 这是一个微妙的清理步骤。GP_REPLAY 是一个瞬态,表示“需要重放一次 GP_EXIT 回调”。如果在销毁时恰好处于这个状态,直接将其强制拨回 GP_EXIT 状态。这样做简化了 rcu_barrier 的处理逻辑,因为 rcu_barrier 只需要关心是否有处于 GP_EXIT 状态的回调即可。
  3. 核心同步屏障 (rcu_barrier):

    • if (gp_state != GP_IDLE) { rcu_barrier(); ... }
    • 这是整个函数的关键。如果 rcu_sync 状态机不处于 GP_IDLE(空闲)状态,就意味着它至少处于 GP_EXIT 状态,即有一个 rcu_sync_call 注册的回调函数正在等待未来的某个RCU宽限期结束后被执行。
    • rcu_barrier() 是一个阻塞函数。它会一直等待,直到系统中所有已注册的RCU回调都已经执行完毕
    • 通过调用 rcu_barrier()rcu_sync_dtor 强制等待那个挂起的 rcu_sync_func 回调执行。当 rcu_sync_func 执行时,它会将状态最终转换回 GP_IDLE
    • 因此,当 rcu_barrier() 返回时,可以百分之百地保证 rsp->gp_state 已经被回调函数设置为 GP_IDLE
  4. 最终状态断言:

    • WARN_ON_ONCE(rsp->gp_state != GP_IDLE);
    • rcu_barrier() 之后,再次检查状态。如果此时状态不是 GP_IDLE,说明 rcu_sync 的状态机逻辑存在严重缺陷,会触发一个内核警告。这是一个用于验证正确性的断言。

代码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* rcu_sync_dtor() - 清理一个rcu_sync结构体
* @rsp: 指向需要被清理的rcu_sync结构体的指针
*/
void rcu_sync_dtor(struct rcu_sync *rsp)
{
int gp_state;

// 警告:如果在有写者处于临界区时(GP_PASSED状态)销毁,是逻辑错误。
WARN_ON_ONCE(READ_ONCE(rsp->gp_state) == GP_PASSED);

spin_lock_irq(&rsp->rss_lock);
// 警告:如果引用计数不为0,说明有未配对的enter/exit调用,是逻辑错误。
WARN_ON_ONCE(rsp->gp_count);
// 如果处于瞬态的GP_REPLAY,将其强制拨回GP_EXIT,以简化后续处理。
if (rsp->gp_state == GP_REPLAY)
WRITE_ONCE(rsp->gp_state, GP_EXIT);
gp_state = rsp->gp_state;
spin_unlock_irq(&rsp->rss_lock);

// 如果状态不是IDLE,说明有挂起的RCU回调。
if (gp_state != GP_IDLE) {
// rcu_barrier()是一个强同步点:它会阻塞,直到所有已注册的
// RCU回调都执行完毕。
rcu_barrier();
// 在rcu_barrier()返回后,我们断言状态机必须已经回到了IDLE状态。
// 如果不是,说明状态机逻辑有bug。
WARN_ON_ONCE(rsp->gp_state != GP_IDLE);
}
}