[TOC]

kernel/locking/rwsem.c 读写信号量(Read-Write Semaphore) 内核中可睡眠的读写锁

历史与背景

这项技术是为了解决什么特定问题而诞生的?

kernel/locking/rwsem.c 实现的读写信号量(Read-Write Semaphore, rwsem)是为了解决rwlock_t(读写自旋锁)无法解决的一类“读多写少”场景的并发控制问题:当临界区的执行时间较长,或者在临界区内需要睡眠时,如何高效地进行读写并发控制

rwlock_t是一个自旋锁,它要求临界区必须极短且绝对不能睡眠。然而,在内核的许多路径中(特别是与用户空间交互的系统调用),临界区可能很长,并且包含可能导致睡眠的操作(如copy_from_userkmalloc(GFP_KERNEL)或等待I/O)。

rwsem正是为了这种可睡眠的、读多写少的场景而设计的。它是一个睡眠锁,提供了与rwlock_t相同的逻辑:

  • 允许多个读者并发进入
  • 只允许一个写者独占进入
  • 读者和写者互斥

当一个任务试图获取一个被占用的rwsem时,它不会自旋,而是会被加入到一个等待队列中并进入睡眠,将CPU让给其他任务。这使得它非常适合用于保护那些在进程上下文中被访问的、读多写少的数据结构。

它的发展经历了哪些重要的里程碑或版本迭代?

rwsem的实现为了追求性能和公平性,经历了从简单到复杂的演进。

  • 早期的简单实现:最初的rwsem实现比较简单,可能存在性能瓶颈,特别是在高竞争和多核环境下。
  • 引入优化的、架构相关的实现:一个重要的里程碑是为不同的CPU架构(特别是x86)提供了高度优化的汇编实现。这些实现利用了架构特定的原子指令来构建高效的快速路径。
  • 现代通用C语言实现的优化:内核引入了一个通用的、经过优化的C语言版rwsem实现。这个实现的核心是一个原子计数器,它通过精巧的位操作来同时编码读者数量、写者状态以及是否有等待者等信息。它也区分了快速路径慢速路径
    • 快速路径:在无竞争或低竞争时,通过一次原子操作就能成功获取锁。
    • 慢速路径:当锁存在竞争时,进入慢速路径,处理等待队列和任务睡眠。
  • 写者优先策略的调整:为了防止写者饥饿,rwsem的实现通常会给予等待中的写者一定的优先权。当一个写者在等待时,后续到来的新读者也会被阻塞,以确保写者能有机会获得锁。

目前该技术的社区活跃度和主流应用情况如何?

rwsem是内核中最基础、最重要的同步原语之一,与mutexspinlock并驾齐驱。其代码非常核心和稳定。社区的活跃度主要体现在对其性能的持续分析和微调上。
它被广泛应用于内核中几乎所有符合**“读多写少”且“可睡眠”**条件的场景:

  • 内存管理:保护mm_struct结构体。当多个线程在同一个进程中读取内存映射(如缺页处理)时,它们可以共享读锁。当mmap()munmap()修改地址空间时,需要获取写锁。这是rwsem最经典的应用之一。
  • -文件系统:保护inodesuperblock中的某些字段。
  • 内核核心数据结构:保护模块列表、可执行文件格式列表等。

核心原理与设计

它的核心工作原理是什么?

现代rwsem的核心是一个原子计数器和一个等待队列的结合。

  1. 数据结构 (struct rw_semaphore)

    • count: 一个原子变量(atomic_long_t),用于编码锁的状态。
    • wait_lock: 一个raw_spinlock_t,用于保护等待队列。
    • wait_list: 等待队列的链表头。
  2. 状态编码 (count字段)

    • count > 0: 表示有count个读者持有锁,且没有写者。
    • count == 0: 锁未被持有。
    • count < 0: 通常表示有一个写者持有或正在等待锁。
  3. 获取读锁 (down_read)

    • 快速路径:尝试原子地给count加一。如果结果仍然是正数,说明之前没有写者,成功获取读锁。
    • 慢速路径:如果原子加一后的结果是0或负数(意味着有写者),则回退操作,然后进入慢速路径。在慢速路径中,任务会被加入等待队列并睡眠。
  4. 获取写锁 (down_write)

    • 快速路径:通过一次原子的比较并交换操作,尝试将count从0(未锁定)设置为一个表示“写入”状态的负值。
    • 慢速路径:如果count不为0(有读者或另一个写者),则进入慢速路径,加入等待队列并睡眠。
  5. 释放读锁 (up_read)

    • 快速路径:原子地给count减一。
    • 慢速路径:如果减一后count变为0或某个特定的负值(表明有等待的写者),则需要进入慢速路径来唤醒等待者。
  6. 释放写锁 (up_write)

    • 快速路径/慢速路径:原子地将count设置回0,然后检查等待队列。如果有等待者,则进入慢速路径唤醒它们。

它的主要优势体现在哪些方面?

  • 提高可睡眠场景下的并发性:完美地结合了rwlock的读共享特性和mutex的可睡眠特性,是“读多写少且可睡眠”场景下的最佳选择。
  • 高效性:与mutex类似,其快速路径实现开销极低。
  • 清晰的语义down_read/down_write清晰地表达了开发者的意图。

它存在哪些已知的劣势、局限性或在特定场景下的不适用性?

  • 不能在中断上下文中使用:作为睡眠锁,它绝对不能在任何原子上下文(硬中断、软中断、tasklet)中使用。
  • 开销高于mutex:其内部逻辑比简单的mutex更复杂,因此在纯互斥(没有读者)的场景下,其开销略高于mutex
  • 写者饥饿风险:虽然实现中已尽力避免,但在极端的读密集型负载下,写者仍可能面临比在mutex中更长的等待时间。

使用场景

在哪些具体的业务或技术场景下,它是首选解决方案?

rwsem是保护那些在进程上下文中被访问、且读多写少的数据结构的首选。

  • 保护mm_struct:如前所述,一个进程的地址空间被其所有线程频繁地读取(缺页、访问数据),但只在mmap/munmap等少数情况下被写入。这是rwsem最教科书式的应用。
  • 文件系统中的inode->i_rwsem:保护inode,允许多个进程同时读取一个文件的元数据,但当需要修改元数据(如改变文件大小、权限)时,需要获取写锁。
  • 模块列表保护:当lsmod命令运行时,它需要读取内核的模块列表。当insmodrmmod时,需要修改该列表。读取远比修改频繁。

是否有不推荐使用该技术的场景?为什么?

  • 中断上下文:绝对禁止。必须使用rwlock_t(读写自旋锁)。
  • 读写均衡或写操作频繁:在这种场景下,允许多个读者并发带来的好处会被更复杂的锁操作开销所抵消。一个简单的mutex通常是更好、更简单的选择。
  • 临界区极短且不睡眠:应使用rwlock_t

对比分析

请将其 与 其他相似技术 进行详细对比。

特性 读写信号量 (rw_semaphore) 读写自旋锁 (rwlock_t) 互斥锁 (Mutex)
基本行为 睡眠 (放弃CPU) 自旋 (忙等待) 睡眠 (放弃CPU)
使用上下文 仅进程上下文 任何上下文 仅进程上下文
并发模型 读者共享 / 写者互斥 读者共享 / 写者互斥 完全互斥
临界区约束 可以睡眠,可以较长。 绝对不能睡眠,必须极短。 可以睡眠,可以较长。
性能(无竞争) 非常高 (快速路径) 非常高 (原子操作) 非常高 (快速路径)
性能(有竞争) 中等 (上下文切换开销) 低 (CPU空转) 中等 (上下文切换开销)
典型场景 保护mm_struct,文件系统inode 保护tasklist_lock,网络路由表。 驱动ioctl,大多数进程上下文的互斥需求。

kernel/locking/rwsem.c

init_rwsem 初始化读写信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/*
* rw_semaphore的 count 字段指南。
*
*写锁状态:
当 count 字段的 RWSEM_WRITER_LOCKED 位被设置时,表示锁由写入器(写线程)拥有。
写锁是独占的,其他线程无法同时读取或写入。
* 读锁状态:
当以下条件同时满足时,锁由读取器(读线程)拥有:
RWSEM_WRITER_LOCKED 位未设置。
count 字段中设置了一些读取器位。
owner 字段设置了 RWSEM_READ_OWNED 位。
仅设置读取器位不足以保证锁由读取器拥有,因为读取器可能正在释放锁,而写入器可能刚刚释放锁并允许另一个写入器抢占锁。
*/

/*
* Initialize an rwsem:
*/
void __init_rwsem(struct rw_semaphore *sem, const char *name,
struct lock_class_key *key)
{
atomic_long_set(&sem->count, RWSEM_UNLOCKED_VALUE);
raw_spin_lock_init(&sem->wait_lock);
INIT_LIST_HEAD(&sem->wait_list);
atomic_long_set(&sem->owner, 0L);
}
EXPORT_SYMBOL(__init_rwsem);

#define init_rwsem(sem) \
do { \
static struct lock_class_key __key; \
\
__init_rwsem((sem), #sem, &__key); \
} while (0)

rwsem_add_waiter 添加等待者到等待队列

1
2
3
4
5
6
7
static inline void
rwsem_add_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
{
lockdep_assert_held(&sem->wait_lock);
list_add_tail(&waiter->list, &sem->wait_list);
/* caller will set RWSEM_FLAG_WAITERS */
}

rwsem_first_waiter 获取等待队列中的第一个等待者

1
2
#define rwsem_first_waiter(sem) \
list_first_entry(&sem->wait_list, struct rwsem_waiter, list)

__rwsem_set_reader_owned 设置读者拥有的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
* 最后一个拥有的读者的task_struct指针将保留在所有者字段中。
*
* 请注意,owner 值仅表示任务之前拥有 rwsem,
* 当检查该字段时,它可能不再是真正的 owner 或真正的 owner 之一,因此请持保留态度。
*
* 保留读取器不可旋转的位。
*/
static inline void __rwsem_set_reader_owned(struct rw_semaphore *sem,
struct task_struct *owner)
{
/* 表示信号量当前由读者持有 */
unsigned long val = (unsigned long)owner | RWSEM_READER_OWNED |
/* 留 RWSEM_NONSPINNABLE 位,表示信号量的非自旋状态 */
(atomic_long_read(&sem->owner) & RWSEM_NONSPINNABLE);

atomic_long_set(&sem->owner, val);
}

rwsem_mark_wake 处理锁释放时的唤醒逻辑,当有阻塞进程可以被唤醒时调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
```c
/*
* 处理锁释放时的唤醒逻辑,当有阻塞进程可以被唤醒时调用
* - 如果是从 up_xxxx() 进入此函数,RWSEM_FLAG_WAITERS 位必须已设置
* - 等待队列中必须有任务
* - 调用者必须持有 wait_lock
* - 被标记唤醒的任务会被加入 wake_q,调用者稍后需调用 wake_up_q() 实际唤醒这些任务,并释放引用计数,最好在释放 wait_lock 后进行
* - 被唤醒的进程块在 task 被清零后会从队列中移除
* - 仅当 downgrading 为 false 时,writer 才会被标记为唤醒
*
* 对所有被唤醒的 reader,隐含调用 rwsem_del_waiter()
*/
static void rwsem_mark_wake(struct rw_semaphore *sem,
enum rwsem_wake_type wake_type,
struct wake_q_head *wake_q)
{
struct rwsem_waiter *waiter, *tmp;
long oldcount, woken = 0, adjustment = 0;
struct list_head wlist;

lockdep_assert_held(&sem->wait_lock);

/*
* 查看队列头部的 waiter,以决定唤醒哪些任务
*/
waiter = rwsem_first_waiter(sem);
/* 队列头部的任务是写锁等待者 */
if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
if (wake_type == RWSEM_WAKE_ANY) {
/*
* 标记队列头部的 writer 可被唤醒。
* 在调用者实际唤醒任务前,其他 writer 仍可“偷走”锁。
* 而 reader 会因检测到队列中有 writer 而阻塞。
*/
wake_q_add(wake_q, waiter->task);
lockevent_inc(rwsem_wake_writer);
}

return;
}

/*
* 如果 reader 数量过多,则不唤醒新的 reader。
*/
if (unlikely(atomic_long_read(&sem->count) < 0))
return;

/*
* writer 可能会在我们授予下一个 reader 锁之前“偷走”锁。
* 优先授予第一个 reader 锁,如果 writer 抢锁则提前退出。
*/
/* RWSEM_WAKE_READ_OWNED(指示当前信号量的锁状态已经被读者持有) */
if (wake_type != RWSEM_WAKE_READ_OWNED) {
struct task_struct *owner;
/* 当读锁被获取时,信号量的计数会增加 RWSEM_READER_BIAS,表示读锁的存在 */
adjustment = RWSEM_READER_BIAS;
oldcount = atomic_long_fetch_add(adjustment, &sem->count);
if (unlikely(oldcount & RWSEM_WRITER_MASK)) {
/*
* 如果等待 writer 释放锁时间过长,则请求 HANDOFF 强制交接。
*/
if (time_after(jiffies, waiter->timeout)) {
if (!(oldcount & RWSEM_FLAG_HANDOFF)) {
adjustment -= RWSEM_FLAG_HANDOFF;
lockevent_inc(rwsem_rlock_handoff);
}
waiter->handoff_set = true;
}

atomic_long_add(-adjustment, &sem->count);
return;
}
/*
* 设置为 reader-owned,便于自旋 reader 早期检测到锁已被 reader 持有。
* reader 的 nonspinnable 位会被复制。
*/
owner = waiter->task;
__rwsem_set_reader_owned(sem, owner);
}

/*
* 最多唤醒 MAX_READERS_WAKEUP 个 reader。
* 采用两步法防止 reader 计数先减后增导致的竞态。
* 1) 收集所有可唤醒的 reader 到新链表,计数并一次性增加 reader 计数
* 2) 遍历新链表,将 waiter->task 置空并加入 wake_q,稍后唤醒
*/
INIT_LIST_HEAD(&wlist);
list_for_each_entry_safe(waiter, tmp, &sem->wait_list, list) {
/* 表示等待队列中的任务正在等待获取写锁 */
if (waiter->type == RWSEM_WAITING_FOR_WRITE)
continue;

woken++;
list_move_tail(&waiter->list, &wlist);

/*
* 限制每次唤醒的 reader 数量
*/
if (unlikely(woken >= MAX_READERS_WAKEUP))
break;
}

/* 根据唤醒的读锁数量调整信号量的计数 */
adjustment = woken * RWSEM_READER_BIAS - adjustment;
lockevent_cond_inc(rwsem_wake_reader, woken);

oldcount = atomic_long_read(&sem->count);
if (list_empty(&sem->wait_list)) {
/*
* 配合 list_move_tail(),等价于 rwsem_del_waiter()
*/
/* 清除 RWSEM_FLAG_WAITERS 标志,表示没有任务在等待锁 */
adjustment -= RWSEM_FLAG_WAITERS;
/* 包含 HANDOFF 标志,进一步调整计数以清除该标志 */
if (oldcount & RWSEM_FLAG_HANDOFF)
adjustment -= RWSEM_FLAG_HANDOFF;
} else if (woken) {
/*
* 唤醒 reader 后,不再强制 writer 让出锁,可清除 HANDOFF
*/
if (oldcount & RWSEM_FLAG_HANDOFF)
adjustment -= RWSEM_FLAG_HANDOFF;
}

if (adjustment)
atomic_long_add(adjustment, &sem->count);

/* 第二步,实际唤醒 */
list_for_each_entry_safe(waiter, tmp, &wlist, list) {
struct task_struct *tsk;

tsk = waiter->task;
/* 增加任务的引用计数,确保任务不会在唤醒前被释放 */
get_task_struct(tsk);

/*
* 先 get_task_struct,再将 waiter->task 置空,防止与 do_exit() 竞态
*/
/* 表示任务已经被处理 */
smp_store_release(&waiter->task, NULL);
/*
* 确保 waiter->task 置空后再唤醒
* 将任务安全地加入唤醒队列(wake_q),稍后由调用者实际唤醒任务
*/
wake_q_add_safe(wake_q, tsk);
}
}

rwsem_cond_wake_waiter 根据读写信号量(rw_semaphore)的当前状态,决定是否唤醒等待队列中的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/*
* 准备唤醒等待队列中的等待者:
* 如果 rwsem 锁的拥有者不是写者,则将等待者加入给定的 wake_q。
* 如果 rwsem 可能被读者持有,则唤醒队列前面的读锁等待者;
* 否则唤醒队列前面的任意等待者。
*
* 该函数会在读者和写者的慢路径中被调用。
*/
static inline void rwsem_cond_wake_waiter(struct rw_semaphore *sem, long count,
struct wake_q_head *wake_q)
{
enum rwsem_wake_type wake_type;
/* 号量的状态表明当前锁的持有者是写者(通过 RWSEM_WRITER_MASK 检查),函数直接返回,不执行唤醒操作 */
if (count & RWSEM_WRITER_MASK)
return;
/* 信号量的状态表明当前锁的持有者是读者(通过 RWSEM_READER_MASK 检查),设置唤醒类型为 RWSEM_WAKE_READERS */
if (count & RWSEM_READER_MASK) {
wake_type = RWSEM_WAKE_READERS;
} else {
/* 设置唤醒类型为 RWSEM_WAKE_ANY,表示可以唤醒等待队列中的任何任务。 */
wake_type = RWSEM_WAKE_ANY;
/* 清除信号量的非自旋状态,优化后续锁的获取 */
clear_nonspinnable(sem);
}
/* 根据唤醒类型将任务标记为可唤醒,并将其加入唤醒队列(wake_q) */
rwsem_mark_wake(sem, wake_type, wake_q);
}

rwsem_try_write_lock 尝试获取写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/*
* 必须在持有 sem->wait_lock 的情况下调用此函数,以防止在检查 rwsem 等待队列和相应设置 sem->count 之间发生竞争条件。
*
* 成功时隐含调用 rwsem_del_waiter()。
*/
static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
struct rwsem_waiter *waiter)
{
struct rwsem_waiter *first = rwsem_first_waiter(sem);
long count, new;

lockdep_assert_held(&sem->wait_lock);

count = atomic_long_read(&sem->count);
do {
bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);

if (has_handoff) {
/*
* 如果设置了 handoff 位,仅当第一个等待者是设置该位的等待者时才让出锁。
* 否则,仍然尝试获取 rwsem。
*/
if (first->handoff_set && (waiter != first))
return false;
}

new = count;
/* 信号量已被锁定(RWSEM_LOCK_MASK) */
if (count & RWSEM_LOCK_MASK) {
/*
* 如果是 RT 任务或在等待队列中等待时间过长,等待者(无论是否为第一个)都可以设置 handoff 位。
*/
if (has_handoff || (!rt_or_dl_task(waiter->task) &&
!time_after(jiffies, waiter->timeout)))
return false;

new |= RWSEM_FLAG_HANDOFF;
} else {
new |= RWSEM_WRITER_LOCKED;
new &= ~RWSEM_FLAG_HANDOFF;

if (list_is_singular(&sem->wait_list))
new &= ~RWSEM_FLAG_WAITERS;
}
} while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));

/*
* 已经获取了锁(handoff 位被清除)或设置了 handoff 位。只有第一个等待者可以在这里设置 handoff_set,
* 以便在慢路径循环中启用乐观自旋。
*/
if (new & RWSEM_FLAG_HANDOFF) {
first->handoff_set = true;
lockevent_inc(rwsem_wlock_handoff);
return false;
}

/*
* rwsem_try_write_lock() 成功时完全隐含 rwsem_del_waiter()。
*/
list_del(&waiter->list);
rwsem_set_owner(sem);
return true;
}

rwsem_spin_on_owner Rwsem 对所有者的旋转

1
2
3
4
5
static inline enum owner_state
rwsem_spin_on_owner(struct rw_semaphore *sem)
{
return OWNER_NONSPINNABLE;
}

rwsem_down_write_slowpath 通过阻塞等待直到成功获取写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/*
* 等待我们成功获取写锁
*/
static struct rw_semaphore __sched *
rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
{
struct rwsem_waiter waiter;
DEFINE_WAKE_Q(wake_q);

/* 首先尝试乐观自旋(optimistic spinning),通过检查锁的所有者状态并尝试获取锁
如果自旋成功,直接返回信号量,避免进入慢路径。
乐观自旋适用于锁竞争较低的场景,可以减少上下文切换的开销 */
/* rwsem_can_spin_on_owner {return false} */
if (rwsem_can_spin_on_owner(sem) && rwsem_optimistic_spin(sem)) {
/* rwsem_optimistic_spin() 表示成功时 ACQUIRE */
return sem;
}

/*
* 乐观旋转失败,继续 slowpath 并阻塞,直到我们可以获取 sem。
*/
/* 初始化等待者(rwsem_waiter)结构,记录当前任务、等待类型和超时时间 */
waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_WRITE;
waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
waiter.handoff_set = false;

raw_spin_lock_irq(&sem->wait_lock);
/* 将当前任务添加到信号量的等待队列中 */
rwsem_add_waiter(sem, &waiter);

/* 如果当前任务不是队列中的第一个等待者,尝试唤醒其他等待者以减少锁竞争 */
if (rwsem_first_waiter(sem) != &waiter) {
/* 根据信号量的当前状态决定是否唤醒等待队列中的任务 */
rwsem_cond_wake_waiter(sem, atomic_long_read(&sem->count),
&wake_q);
/* 如果唤醒队列中有任务需要唤醒,进入后续处理逻辑 */
if (!wake_q_empty(&wake_q)) {
/*
* 我们希望最大限度地减少wait_lock等待时间,尤其是当需要唤醒大量读者时。
*/
raw_spin_unlock_irq(&sem->wait_lock);
/* 唤醒唤醒队列中的任务 */
wake_up_q(&wake_q);
raw_spin_lock_irq(&sem->wait_lock);
}
} else {
/* 如果当前任务是第一个等待者,设置 RWSEM_FLAG_WAITERS 标志,表示有任务在等待锁 */
atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
}

/* 等待我们成功获取锁 */
set_current_state(state);
trace_contention_begin(sem, LCB_F_WRITE);

for (;;) {
/* 尝试获取写锁。如果成功,退出循环 */
if (rwsem_try_write_lock(sem, &waiter)) {
/* rwsem_try_write_lock() implies ACQUIRE on success */
break;
}

raw_spin_unlock_irq(&sem->wait_lock);
/* 如果收到中断,跳转到 out_nolock 处理逻辑 */
if (signal_pending_state(state, current))
goto out_nolock;

/*
* 设置切换位但无法获取锁后,尝试对所有者进行旋转以加速锁转移。如果前一个所有者是 on-cpu 写入器,并且它刚刚释放了锁,则将返回 OWNER_NULL。在这种情况下,我们尝试在不休眠的情况下再次获取锁。
*/
/* 如果设置了交接位(handoff bit),尝试通过自旋加速锁的交接 */
if (waiter.handoff_set) {
enum owner_state owner_state;

owner_state = rwsem_spin_on_owner(sem);
/* 如果锁的前一个持有者刚刚释放锁,返回 OWNER_NULL,
直接跳转到 trylock_again 再次尝试获取锁 */
if (owner_state == OWNER_NULL)
goto trylock_again;
}
/* 进入调度等待,允许其他任务运行 */
schedule_preempt_disabled();
/* 更新任务状态为等待状态,并记录锁事件 */
lockevent_inc(rwsem_sleep_writer);
set_current_state(state);
trylock_again:
raw_spin_lock_irq(&sem->wait_lock);
}
__set_current_state(TASK_RUNNING);
raw_spin_unlock_irq(&sem->wait_lock);
lockevent_inc(rwsem_wlock);
trace_contention_end(sem, 0);
return sem;

out_nolock:
/* 如果任务在等待期间收到信号中断,移除等待队列中的任务,并记录锁失败事 */
__set_current_state(TASK_RUNNING);
raw_spin_lock_irq(&sem->wait_lock);
rwsem_del_wake_waiter(sem, &waiter, &wake_q);
lockevent_inc(rwsem_wlock_fail);
trace_contention_end(sem, -EINTR);
/* 返回错误指针(ERR_PTR(-EINTR)),表示操作被中断 */
return ERR_PTR(-EINTR);
}

down_write 获取读写信号量(rw_semaphore)的写锁获取读写信号量(rw_semaphore)的写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/*
* 所有对 owner 的写入都受到 WRITE_ONCE() 的保护,以确保 store 撕裂不会发生,因为乐观微调器可能会在没有锁的情况下并发读取和使用 owner 值。但是,从 owner 读取可能不需要 READ_ONCE(),只要指针值仅用于比较并且没有被取消引用。
*
* 两个 rwsem_{set,clear}_owner() 函数都应该与更改 sem->count 的原子作位于同一个 preempt disable 部分。
*/
static inline void rwsem_set_owner(struct rw_semaphore *sem)
{
lockdep_assert_preemption_disabled();
atomic_long_set(&sem->owner, (long)current);
}

static inline bool rwsem_write_trylock(struct rw_semaphore *sem)
{
/* 表示信号量当前处于未锁定状态 */
long tmp = RWSEM_UNLOCKED_VALUE;
/* 如果 sem->count 的当前值与 tmp 相等(即信号量未锁定),将其更新为 RWSEM_WRITER_LOCKED,表示写锁已获取 */
if (atomic_long_try_cmpxchg_acquire(&sem->count, &tmp, RWSEM_WRITER_LOCKED)) {
rwsem_set_owner(sem);
return true;
}
/* 如果信号量已被其他线程锁定,函数返回 false,表示写锁获取失败 */
return false;
}

/*
* lock for writing
*/
static __always_inline int __down_write_common(struct rw_semaphore *sem, int state)
{
int ret = 0;

preempt_disable();
if (unlikely(!rwsem_write_trylock(sem))) {
/* 如果锁已被其他线程持有,进入慢路径处理 */
if (IS_ERR(rwsem_down_write_slowpath(sem, state)))
ret = -EINTR;
}
preempt_enable();
return ret;
}

static __always_inline void __down_write(struct rw_semaphore *sem)
{
__down_write_common(sem, TASK_UNINTERRUPTIBLE);
}

/*
* lock for writing
*/
void __sched down_write(struct rw_semaphore *sem)
{
might_sleep();
rwsem_acquire(&sem->dep_map, 0, 0, _RET_IP_);
/* __down_write(sem) */
LOCK_CONTENDED(sem, __down_write_trylock, __down_write);
}
EXPORT_SYMBOL(down_write);

rwsem_wake 处理唤醒信号量上的等待者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
* 处理唤醒信号量上的等待者
* - 如果进入此处,up_read/up_write 已经减少了 count 的活动部分
*/
static struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem)
{
unsigned long flags;
DEFINE_WAKE_Q(wake_q);

raw_spin_lock_irqsave(&sem->wait_lock, flags);

if (!list_empty(&sem->wait_list))
rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);

raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
wake_up_q(&wake_q);

return sem;
}

up_write 释放写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/*
* unlock after writing
*/
static inline void __up_write(struct rw_semaphore *sem)
{
long tmp;
/* 信号量的 magic 字段是否与信号量本身匹配,确保信号量的完整性 */
DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem);
/*
* 如果通过设置 RWSEM_NONSPINNABLE 位将所有权转移给匿名写者,
* 此时 sem->owner 可能与 current 不同。
*/
DEBUG_RWSEMS_WARN_ON((rwsem_owner(sem) != current) &&
!rwsem_test_oflags(sem, RWSEM_NONSPINNABLE), sem);

preempt_disable();
/* 清除信号量的所有者字段,表示写锁已被释放 */
rwsem_clear_owner(sem);
/* 使用原子操作减少信号量的计数,清除写锁标志(RWSEM_WRITER_LOCKED) */
tmp = atomic_long_fetch_add_release(-RWSEM_WRITER_LOCKED, &sem->count);
/* RWSEM_FLAG_WAITERS 标志,表示有任务在等待锁 */
if (unlikely(tmp & RWSEM_FLAG_WAITERS))
/* 唤等待队列中的任务 */
rwsem_wake(sem);
preempt_enable();
}

/*
* release a write lock
*/
void up_write(struct rw_semaphore *sem)
{
rwsem_release(&sem->dep_map, _RET_IP_);
__up_write(sem);
}
EXPORT_SYMBOL(up_write);

include/linux/percpu-rwsem.h

Per-CPU读写信号量:高性能的冻结保护锁实现

本代码片段揭示了sb_start_writesb_end_write背后的核心实现——一个高度优化的、基于每CPU变量的读写信号量。其核心功能是提供一个允许多个“读者”(在此场景中是文件系统写入者)并发进入,但只允许一个“写者”(文件系统冻结者)独占访问的同步原语。它的设计目标是极致的性能,尤其是在无争用的“快速路径”上,通过避免昂贵的原子操作和多核间的缓存同步,来最小化对文件系统写入路径的性能影响。

实现原理分析

该实现的精妙之处在于它将锁的状态分散到每个CPU上,并结合RCU(Read-Copy-Update)的思想来同步读者和写者。

  1. 核心数据结构: struct percpu_rw_semaphore包含两个关键部分:
    • read_count: 一个每CPU(per-cpu)的整型变量。每个CPU都有自己独立的计数器。
    • rss (RCU Sync Structure) 和 writer (wait queue): 一个由“写者”(冻结者)用来与所有“读者”同步的RCU和等待队列结构。
  2. 获取锁 (percpu_down_read_internal) - 读者/写入者路径:
    • 快速路径 (Fast Path): 这是绝大多数情况下(即没有文件系统冻结操作时)的执行路径。
      • 代码首先通过rcu_sync_is_idle()检查是否有“写者”(冻结者)正在活动。
      • 如果没有,它会进入快速路径,只执行this_cpu_inc(*sem->read_count)。这是一个非原子的、针对当前CPU本地变量的增量操作。因为它只修改本地CPU的数据,所以不会导致其他CPU的缓存行失效,速度极快。
      • preempt_disable()/preempt_enable()对将这个操作包裹起来,形成了一个RCU-sched的读侧临界区。这可以防止当前任务被抢占到其他CPU,并确保“写者”可以通过RCU机制等待所有读者完成。
    • 慢速路径 (Slow Path): 当rcu_sync_is_idle()返回false时,表示有一个“写者”(冻结者)正在尝试获取独占访问权。
      • 此时,代码会调用一个更复杂的函数__percpu_down_read(未在此处显示)。
      • 这个慢速路径函数会执行必要的内存屏障,并可能使当前任务睡眠,直到“写者”完成其工作并解冻文件系统。
      • freezable参数是关键,它告知慢速路径函数在等待时,需要检查当前任务是否应被冻结(这是内核任务冻结机制的一部分),如果是,则进入冻结睡眠状态。
  3. 释放锁 (percpu_up_read) - 读者/写入者路径:
    • 快速路径: 同样,它首先检查rcu_sync_is_idle()。如果没有等待的“写者”,它只需执行this_cpu_dec(*sem->read_count),这是一个极快的本地CPU操作。
    • 慢速路径: 如果有“写者”正在等待,情况就变得复杂。
      • 它必须首先放置一个smp_mb()内存屏障。这确保了读者临界区内的所有内存操作,对于“写者”来说,都发生在读计数值减少之前。
      • 在递减了本地CPU的计数值后,它会调用rcuwait_wake_up(&sem->writer)唤醒可能正在睡眠的“写者”。“写者”在被唤醒后,会检查所有CPU上的read_count之和是否为0,如果是,它就知道所有读者都已离开,自己可以安全地进入临界区。

代码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// percpu_up_read: 释放一个per-cpu读锁(对应文件系统写入者结束写操作)。
static inline void percpu_up_read(struct percpu_rw_semaphore *sem)
{
// 为锁调试器(lockdep)标记锁被释放。
rwsem_release(&sem->dep_map, _RET_IP_);

// 禁用抢占,创建一个RCU-sched读侧临界区的结束点。
preempt_disable();
/*
* 与percpu_down_read()中的逻辑类似。
*/
// 快速路径:检查是否有写者(冻结者)正在等待。
if (likely(rcu_sync_is_idle(&sem->rss))) {
// 如果没有,只需递减当前CPU的本地读计数器。这是一个非常快的操作。
this_cpu_dec(*sem->read_count);
} else {
/*
* 慢速路径:有写者正在等待。
*/
smp_mb(); // 放置一个完整的内存屏障,确保临界区内的操作对写者可见。
/*
* 换句话说,如果写者看到了我们的计数值减少(特别是减少到0时),
* 它也必须能看到我们临界区内执行的所有操作。
*/
this_cpu_dec(*sem->read_count);
// 唤醒可能正在等待的写者。
rcuwait_wake_up(&sem->writer);
}
// 重新启用抢占。
preempt_enable();
}

// percpu_down_read_internal: 获取一个per-cpu读锁的核心实现。
// @freezable: 布尔值,指示在等待时是否应该响应内核的任务冻结请求。
static inline void percpu_down_read_internal(struct percpu_rw_semaphore *sem,
bool freezable)
{
// 静态检查,提示此函数可能会睡眠(在慢速路径中)。
might_sleep();

// 为锁调试器(lockdep)标记正在获取读锁。
rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);

// 禁用抢占,创建一个RCU-sched读侧临界区的起始点。
preempt_disable();
/*
* 我们正处于一个RCU-sched读侧临界区中,所以写者无法在我们执行期间
* 改变sem->state并开始检查计数器。因此,如果我们看到!sem->state,
* 我们就知道写者在我们调用preempt_enable()之前不会检查计数器。
*/
// 快速路径:检查是否有写者(冻结者)正在活动。
if (likely(rcu_sync_is_idle(&sem->rss)))
// 如果没有,只需递增当前CPU的本地读计数器。
this_cpu_inc(*sem->read_count);
else
// 慢速路径:调用更复杂的函数,它会处理等待和冻结逻辑。
__percpu_down_read(sem, false, freezable);
/*
* preempt_enable()会阻止编译器将临界区之外的代码优化进来。
*/
// 重新启用抢占,结束临界区。
preempt_enable();
}

// percpu_down_read: percpu_down_read_internal的不可冻结版本封装。
static inline void percpu_down_read(struct percpu_rw_semaphore *sem)
{
percpu_down_read_internal(sem, false);
}

// percpu_down_read_freezable: percpu_down_read_internal的可冻结版本封装。
// 这就是sb_start_write最终调用的函数。
static inline void percpu_down_read_freezable(struct percpu_rw_semaphore *sem,
bool freeze)
{
percpu_down_read_internal(sem, freeze);
}

kernel/locking/percpu-rwsem.c Per-CPU读写信号量(Per-CPU Read-Write Semaphore) 优化读多写少场景的锁机制

历史与背景

这项技术是为了解决什么特定问题而诞生的?

这项技术是为了解决在高度并行的多核CPU系统上,标准读写信号量(rw_semaphore)在**“读多写少”(Read-mostly)场景下存在的读侧伸缩性(Read-side scalability)**瓶颈问题。

具体来说,标准rw_semaphore存在以下问题:

  • 缓存行伪共享(Cache Line False Sharing)rw_semaphore内部有一个共享的原子计数器,用于追踪活跃的读写者数量。当多个CPU上的线程同时获取读锁时,它们都需要去修改这个位于同一内存地址的计数器。这会导致该计数器所在的缓存行在不同CPU的缓存之间疯狂传递(称为“缓存行弹跳”或Cache Bouncing)。这种跨CPU的缓存同步开销巨大,严重限制了读操作的并行性能。即使读操作本身是并行的,对锁的获取操作也变成了串行的瓶颈。

percpu-rwsem 的诞生就是为了消除这个读侧瓶颈,使得在拥有大量CPU核心的系统上,读者的数量可以几乎无限制地扩展而不会相互干扰。

它的发展经历了哪些重要的里程碑或版本迭代?

percpu-rwsem 是由内核锁子系统的维护者们(如Peter Zijlstra)为了解决实际性能问题而设计的一种专用锁。

  • 引入:该机制作为一种优化的锁原语被引入内核,旨在为那些被证明存在读侧扩展性问题的关键数据结构提供一个通用的解决方案。
  • 应用:它被引入后,最重要的一个应用就是替换了进程内存管理中极其关键的 mmap_sem 锁。进程的内存映射区(VMA列表)是典型的读多写少场景:大量的缺页异常、内存访问会读取这个数据结构,而mmap()munmap()等修改操作则相对较少。将mmap_sem从标准rw_semaphore替换为percpu-rwsem,显著提升了多线程应用程序的性能。
  • 完善:后续的内核版本对该实现进行了一些微调和优化,以处理更复杂的CPU热插拔和调度器场景。

目前该技术的社区活跃度和主流应用情况如何?

percpu-rwsem 是Linux内核中一个成熟且标准的同步原语,但它是一个专用锁,而不是通用锁。它被用于内核中性能极其关键、且明确符合“读多写少”模式的少数几个地方。它的代码库非常稳定,社区的“活跃度”主要体现在当发现新的符合其应用场景的性能瓶颈时,会考虑使用它进行替换。

核心原理与设计

它的核心工作原理是什么?

percpu-rwsem 的核心思想是将读者的状态分散到每个CPU上,从而避免全局争用

  • 读者路径 (Read Path):

    1. 当一个线程需要获取读锁时,它不会去修改一个全局的共享计数器
    2. 它首先检查一个全局的“写者标志位”,确认当前没有写者正在或准备写入。
    3. 如果无写者,它会原子地增加当前CPU专属的一个per-CPU计数器。这个操作是CPU本地的,几乎不会产生跨CPU的缓存同步,因此速度极快。
    4. 为了防止在持有读锁期间,线程被抢占并迁移到其他CPU(这会导致其在原CPU上的计数无法被写者正确观察到),读者在获取锁时会禁止抢占
    5. 释放读锁时,读者只需原子地递减其所在CPU的per-CPU计数器,并重新允许抢占。
  • 写者路径 (Write Path):

    1. 写者路径是“慢路径”,它承担了协调所有CPU的代价。
    2. 当一个线程需要获取写锁时,它首先获取一个全局的互斥锁(mutex),确保同一时间只有一个写者可以进入。
    3. 然后,它设置那个全局的“写者标志位”,阻止任何新的读者进入。
    4. 接下来是关键步骤:写者必须等待所有已经存在的读者退出。它通过遍历系统中每一个CPU的per-CPU读者计数器,并等待该计数器的值变为0来实现。
    5. 一旦所有CPU的读者计数器都清零,写者就获得了独占访问权限,可以进行写操作。
    6. 释放写锁时,写者清除全局“写者标志位”,并释放全局互斥锁。

它的主要优势体现在哪些方面?

  • 极致的读侧伸缩性:读者之间几乎没有争用,读锁的获取和释放非常快。系统中的CPU核心越多,这个优势越明显。
  • 无伪共享:从根本上消除了读操作对共享锁计数器的缓存行弹跳问题。

它存在哪些已知的劣势、局限性或在特定场景下的不适用性?

  • 写者路径缓慢:写者获取锁的成本与系统中的CPU数量成正比,因为它需要轮询所有CPU的计数器。在CPU核心数非常多的系统上,写者获取锁的延迟会很高。
  • 可能导致写者饥饿:如果系统中有源源不断的读请求,写者可能需要等待很长时间才能等到所有CPU的读者计数都清零的时刻。
  • 读者禁止抢占:读者在持有锁的期间是不可抢占的。这意味着读侧的临界区必须非常短小且确定。如果在读锁保护的区域内进行任何可能导致睡眠或长时间运行的操作,将会严重影响系统的调度延迟和响应性。
  • 内存占用更大:需要为每个CPU都分配一个计数器,因此比标准的rw_semaphore占用更多的内存。

使用场景

在哪些具体的业务或技术场景下,它是首选解决方案?请举例说明。

它只适用于同时满足以下所有条件的极端场景:

  1. 读操作的频率远高于写操作
  2. 该数据结构是系统级的性能瓶颈点
  3. 读侧临界区非常短,代码执行路径清晰,绝不会睡眠
  4. 运行在拥有大量CPU核心的系统上,标准锁的伸缩性问题已经凸显
  • mm/mmap.c中的mmap_sem:这是percpu-rwsem最经典、最完美的用例。一个进程的虚拟内存区域(VMA)链表被频繁地读取(例如,在处理缺页中断时遍历VMA),而修改操作(mmap(), munmap())则少得多。读临界区也非常短。

是否有不推荐使用该技术的场景?为什么?

  • 写操作频繁或与读操作频率相当:在这种场景下,其缓慢的写者路径将成为性能瓶颈,性能会比标准的rw_semaphore甚至简单的mutex更差。
  • 读侧临界区较长或可能睡眠:由于读者禁止抢占,长时间持有该锁会严重损害系统实时性和调度公平性。
  • 单核或少核系统:在这种环境下,标准rw_semaphore的缓存争用问题不严重,使用percpu-rwsem只会带来不必要的复杂性和内存开销。

对比分析

请将其 与 其他相似技术 进行详细对比。

特性 percpu_rw_semaphore rw_semaphore (标准读写信号量)
实现方式 读者状态分散在Per-CPU变量中,写者协调所有CPU。 读者和写者状态都集中在一个全局共享的原子计数器中。
读侧获取成本 极低。仅修改本地CPU变量,无跨CPU争用。 中等。需要原子地修改一个全局共享变量,可能导致缓存行弹跳。
写侧获取成本 。与CPU核心数成正比,需要轮询所有CPU的读者计数。 中等。需要原子地修改全局变量,并等待所有读者退出。
读侧伸缩性 极高 。在多核环境下,读侧性能会因缓存争用而迅速饱和。
写者饥饿风险 相对较高。 较低(现代实现有反饥饿机制)。
读者临界区限制 必须禁止抢占 (non-preemptible),且必须非常短。 无此限制,读者可以被抢占。
内存占用 较高(需要Per-CPU数据结构)。 低(一个单一的结构体)。
最佳适用场景 极端读多写少、读临界区极短、且已成为多核系统性能瓶颈的关键数据结构。 通用的读多写少场景,是大部分情况下的默认选择。

__percpu_init_rwsem 初始化每 CPU 读写信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int __percpu_init_rwsem(struct percpu_rw_semaphore *sem,
const char *name, struct lock_class_key *key)
{
sem->read_count = alloc_percpu(int);
if (unlikely(!sem->read_count))
return -ENOMEM;

rcu_sync_init(&sem->rss);
rcuwait_init(&sem->writer);
init_waitqueue_head(&sem->waiters);
atomic_set(&sem->block, 0);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
debug_check_no_locks_freed((void *)sem, sizeof(*sem));
lockdep_init_map(&sem->dep_map, name, key, 0);
#endif
return 0;
}
EXPORT_SYMBOL_GPL(__percpu_init_rwsem);

Per-CPU Read-Write Semaphore: 极致优化读性能的并发锁

本代码片段定义了 percpu_rw_semaphore 的写者(writer)路径函数:percpu_down_write(获取写锁)和 percpu_up_write(释放写锁)。这是一种高度优化的读写锁,其核心设计目标是为读密集型(read-heavy)的工作负载提供极致的性能和可伸缩性。它通过允许读者(reader)在绝大多数情况下只访问其本地CPU的数据,几乎完全消除了读者之间的锁竞争和缓存争用。然而,这种极致的读性能是以写者路径的复杂性和较高开销为代价的。

实现原理分析

percpu_rw_semaphore 的实现借鉴了RCU(Read-Copy-Update)的核心思想。它没有一个全局的读计数器,而是为每个CPU都分配了一个本地的读计数器。

  • 读者(Reader)的快速路径(Fast Path) (未在代码中显示,但可以推断):

    1. 读者检查一个全局的sem->block标志。
    2. 如果标志为0,读者原子地递增其当前CPU的本地读计数器
    3. 进入临界区。
    4. 退出临界区后,原子地递减其本地CPU的读计数器
    • 这个过程极快,因为读者只修改本地CPU的缓存行,不会与其他CPU上的读者发生任何冲突。
  • 写者(Writer)的路径(percpu_down_write:
    写者的任务要复杂得多,它必须确保在进入临界区之前,所有CPU上所有已经存在的读者都已经退出了,并且要阻止任何新的读者进入。

    1. 宣告意图 (rcu_sync_enter): 写者首先进入一个RCU同步阶段。这相当于“拉响警报”,通知读者快速路径即将关闭。
    2. 获取写者锁 (__percpu_down_write_trylock): 写者尝试原子地设置全局的sem->block标志。这是一个互斥操作,确保了同一时间只有一个写者可以进行此过程(写者-写者互斥)。如果失败,说明已有其他写者,当前写者必须睡眠等待(percpu_rwsem_wait)。
    3. 等待现有读者完成 (rcuwait_wait_event): 成功设置sem->block标志后,新的读者会被阻塞在慢速路径上。但此时,可能还有一些“老”的读者(在block标志被设置前就进入了临界区)仍在运行。写者现在必须等待所有这些老读者全部完成。它通过rcuwait_wait_event睡眠,等待readers_active_check(sem)条件满足。readers_active_check函数会遍历所有CPU的本地读计数器,只有当所有计数器都变为0时,它才返回true,唤醒写者。
    4. 进入临界区: 一旦写者被唤醒,就意味着整个系统不再有任何读者,写者现在独占了访问权。
  • 释放写锁(percpu_up_write:

    1. 清除写者锁 (atomic_set_release): 写者原子地将sem->block标志清零。_release后缀是一个内存屏障,确保写者在临界区内的所有内存修改,对于其他CPU来说,都在block标志被清零之前变得可见。
    2. 唤醒等待者 (__wake_up): 唤醒所有因sem->block而睡眠等待的写者或读者(慢速路径)。
    3. 重新开放快速路径 (rcu_sync_exit): 写者退出RCU同步阶段。这会启动一个RCU宽限期(grace period)。只有在这个宽限期结束之后,读者才能再次安全地使用快速路径。这确保了所有CPU都观察到了block标志的清零。

特定场景分析:单核、无MMU的STM32H750平台

硬件交互与MMU

此机制是纯粹的内核同步原语,与硬件或MMU无关。

单核环境下的行为

虽然percpu_rw_semaphore是为多核系统设计的,但它在单核系统上的行为依然正确,并且其设计思想仍然适用,只是对抗的目标从“多核竞争”变为了“内核抢占”。

  • Per-CPU 退化: “Per-CPU数据”退化为“这一个CPU的数据”。所有操作都只针对CPU 0的本地计数器。
  • 写者路径 (percpu_down_write):
    1. 写者任务运行,调用percpu_down_write
    2. 它设置sem->block标志。
    3. 此时,一个读者任务可能会抢占写者。如果读者在写者设置block标志前已经进入临界区(即增加了CPU 0的本地计数器),那么当写者恢复执行并调用rcuwait_wait_event时,它会发现CPU 0的计数器不为0,于是会睡眠
    4. 调度器会切换回读者任务。读者任务完成后,递减计数器为0。
    5. 最终,写者任务被唤醒,检查到计数器为0,成功进入临界区。
  • 锁的必要性: rcuwait, atomic_*等操作在单核系统上依然通过禁止/使能中断禁止/使能抢占来实现原子性,这对于防止因任务抢占或中断服务程序导致的数据竞争是绝对必要的。

实际意义

file_rwsem(用于保护全局文件锁链表)使用这种锁,是因为读取/proc/locks是一个非常频繁的“读”操作,而文件锁的增删(例如,模块加载/卸载时)是相对稀少的“写”操作。在STM32H750上:

  • 优势: 如果有多个任务或线程可能同时读取/proc/lockspercpu_rw_semaphore可以确保它们之间几乎没有干扰,即使在单核上(因为它们不会因锁而睡眠,除非有写者)。
  • 代价: 代价是写者路径非常慢。它需要等待一个完整的RCU宽限期。在实时性要求高的场景中,写者可能会引入不可预测的延迟。
  • 结论: 即使在单核系统上,percpu_rw_semaphore也是一个合理的选择,前提是系统的访问模式是压倒性的读多写少,并且可以容忍较高的写延迟。

代码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// percpu_down_write: 获取per-CPU读写信号量的写锁。
void __sched percpu_down_write(struct percpu_rw_semaphore *sem)
{
bool contended = false;

// 标记此函数可能会导致睡眠。
might_sleep();
// lockdep注解:获取锁依赖映射。
rwsem_acquire(&sem->dep_map, 0, 0, _RET_IP_);

// 步骤1: 进入RCU同步状态,通知读者快速路径即将关闭。
rcu_sync_enter(&sem->rss);

// 步骤2: 尝试获取写者-写者互斥锁(设置sem->block)。
if (!__percpu_down_write_trylock(sem)) {
// 如果失败,说明有其他写者,进入等待。
trace_contention_begin(sem, LCB_F_PERCPU | LCB_F_WRITE);
percpu_rwsem_wait(sem, /* .reader = */ false, false);
contended = true;
}

// 步骤3: 等待所有已存在的读者完成。
// rcuwait_wait_event会睡眠,直到readers_active_check(sem)返回true
// (即所有per-CPU读计数器都为0)。
rcuwait_wait_event(&sem->writer, readers_active_check(sem), TASK_UNINTERRUPTIBLE);
if (contended)
trace_contention_end(sem, 0);
}
EXPORT_SYMBOL_GPL(percpu_down_write);

// percpu_up_write: 释放per-CPU读写信号量的写锁。
void percpu_up_write(struct percpu_rw_semaphore *sem)
{
// lockdep注解:释放锁依赖映射。
rwsem_release(&sem->dep_map, _RET_IP_);

// 步骤1: 原子地清除sem->block标志,并使用release内存屏障
// 确保临界区内的修改对其他CPU可见。
atomic_set_release(&sem->block, 0);

// 步骤2: 唤醒任何正在等待的读者或写者。
__wake_up(&sem->waiters, TASK_NORMAL, 1, sem);

// 步骤3: 退出RCU同步状态。
// 在一个RCU宽限期之后,读者将能够再次使用快速路径。
rcu_sync_exit(&sem->rss);
}
EXPORT_SYMBOL_GPL(percpu_up_write);

Per-CPU RW Semaphore: 读者快速路径与公平慢速路径的实现

本代码片段揭示了 percpu_rw_semaphore 如何实现其核心的性能优势——读者的无锁快速路径,以及当快速路径不可用时,读者和写者如何通过一个**公平的等待队列(慢速路径)**进行仲裁。这部分代码是理解这种高级锁机制如何在实践中运作的关键。

实现原理分析

1. 读者的快速路径 (__percpu_down_read_trylock)

这是整个机制中性能最关键的部分。它的目标是让读者尽可能快地进入临界区,避免任何跨CPU的同步开销。

  1. 乐观递增: 函数上来就乐观地递增当前CPU的本地读计数器 (this_cpu_inc)。这是一个极快的操作,因为它只修改当前CPU的私有数据。
  2. 内存屏障 (smp_mb()): 这是一个至关重要的配对屏障。它与percpu_up_write中的atomic_set_release(&sem->block, 0)配对,与__percpu_down_write_trylock中的atomic_xchg配对。这个屏障确保了:
    • 读者视角: 在检查sem->block标志之前,本地读计数器的递增操作必须完成。
    • 写者视角: 在设置sem->block标志之后,它才能去检查所有per-CPU的读计数器。
    • 保证: 这就形成了一个完美的逻辑互锁:如果读者没有看到写者设置的block标志(即!sem->block为真),那么写者必定能看到读者递增后的计数器,从而会等待该读者完成。反之,如果写者没有看到读者的计数器,那么读者必定会看到block标志,从而进入慢速路径。
  3. 检查写者标志: atomic_read_acquire(&sem->block)检查全局的block标志。likely()告诉编译器,绝大多数情况下这个标志都是0。_acquire屏障确保在临界区内的任何读操作都不会被乱序执行到检查block标志之前。
  4. 成功路径: 如果block标志为0,trylock成功,函数返回true。读者已成功进入临界区。
  5. 失败路径(慢速路径入口): 如果block标志不为0,说明有写者正在活动。trylock失败,读者必须撤销之前的乐观递增(this_cpu_dec),然后返回falsercuwait_wake_up(&sem->writer)用于唤醒可能正在等待读者计数器清零的写者,以防万一。

2. 写者的“尝试”锁 (__percpu_down_write_trylock)

这个函数非常简单,它通过atomic_xchg原子地将sem->block的值从0交换为1。如果交换前的值是0,则说明成功获取了写者-写者互斥锁,返回true

3. 公平的慢速路径 (percpu_rwsem_waitpercpu_rwsem_wake_function)

当读者的快速路径或写者的trylock失败时,它们都会进入percpu_rwsem_wait,排队等待。

  1. 入队 (percpu_rwsem_wait):

    • 任务创建一个等待队列项wq_entry,并将其唤醒函数指定为percpu_rwsem_wake_function
    • sem->waiters.lock自旋锁的保护下,它再次尝试获取锁 (__percpu_rwsem_trylock)。这是为了防止在它准备睡眠和锁被释放之间发生竞态条件。
    • 如果仍然失败,它将自己添加到sem->waiters等待队列的尾部,然后进入循环睡眠(schedule())。
  2. 唤醒与仲裁 (percpu_rwsem_wake_function):

    • 当写者调用percpu_up_write时,它会调用__wake_up,这会遍历等待队列,并对每个等待项调用percpu_rwsem_wake_function
    • 尝试获取锁: 唤醒函数做的第一件事就是代表被唤醒的任务尝试获取锁
    • 公平FIFO与读者优先: wake_function的返回值有特殊含义。通过返回!reader,它实现了一种读者优先的FIFO(先进先出)策略__wake_up会持续唤醒队列中的任务,直到wake_function返回一个真值(即>0)。
      • 如果被唤醒的是一个读者!readerfalse(0),__wake_up会继续唤醒下一个等待者。
      • 如果被唤醒的是一个写者!readertrue(1),__wake_up会停止唤醒。
      • 效果: percpu_up_write的一次调用会唤醒队列头部所有连续的读者,直到遇到第一个写者为止,然后只唤醒那个写者。这既保证了FIFO顺序(不会饿死写者),又允许多个读者并发进入,提高了吞吐量。

特定场景分析:单核、无MMU的STM32H750平台

硬件交互与MMU

这部分代码是纯粹的内核同步原语,与硬件或MMU无关。

单核环境下的行为

  • __percpu_down_read_trylock:
    • preempt_disable()/preempt_enable()(在__percpu_rwsem_trylock中)是关键。它确保了“乐观递增”和“检查block标志”这两个步骤之间不会发生任务切换。这就防止了在单核系统上因抢占而导致的数据竞争。
    • 内存屏障smp_mb()在单核系统上退化为编译器屏障barrier(),仍然是必需的,以防止编译器进行有害的指令重排。
  • 等待队列:
    • 等待队列和schedule()机制在单核系统上是标准的核心调度器功能,其行为与多核系统一致。当一个任务等待锁时,它会被置为睡眠状态,调度器会选择另一个可运行的任务来执行。

实际意义

这部分代码展示了percpu_rw_semaphore如何在保证高性能快速路径的同时,提供一个健壮、公平的慢速路径。在STM32H750上:

  • 当没有文件锁操作时,多个任务读取/proc/locks的开销极低,因为它们都走的快速路径。
  • 当有任务在修改文件锁(例如,flock()一个文件)时,sem->block被设置。此时,其他尝试读取/proc/locks的任务会发现快速路径关闭,然后通过percpu_rwsem_wait进入睡眠,礼让写者。
  • 写者完成后,它会公平地唤醒正在等待的读者。

这种机制使得即使在单核嵌入式系统上,也能在读多写少的场景下获得良好的并发性能,同时保证了写者不会被饿死。

代码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// __percpu_down_read_trylock: 读者的快速路径尝试加锁。
static bool __percpu_down_read_trylock(struct percpu_rw_semaphore *sem)
{
// 1. 乐观地递增本地CPU的读计数器。
this_cpu_inc(*sem->read_count);

// 2. 内存屏障,与写者路径的屏障配对。
smp_mb();

// 3. 检查写者是否活动。acquire屏障确保临界区代码不会被重排到此之前。
if (likely(!atomic_read_acquire(&sem->block)))
return true; // 成功进入临界区。

// 4. 失败路径:写者活动,撤销之前的递增。
this_cpu_dec(*sem->read_count);
// 唤醒可能正在等待的写者。
rcuwait_wake_up(&sem->writer);

return false;
}

// __percpu_down_write_trylock: 写者的尝试加锁。
static inline bool __percpu_down_write_trylock(struct percpu_rw_semaphore *sem)
{
// 检查block标志是否已被设置。
if (atomic_read(&sem->block))
return false;
// 原子地将block标志从0交换为1。如果交换前是0,则成功。
return atomic_xchg(&sem->block, 1) == 0;
}

static bool __percpu_rwsem_trylock(struct percpu_rw_semaphore *sem, bool reader)
{
if (reader) {
bool ret;

preempt_disable();
ret = __percpu_down_read_trylock(sem);
preempt_enable();

return ret;
}
return __percpu_down_write_trylock(sem);
}

// percpu_rwsem_wake_function: 等待队列的自定义唤醒函数。
static int percpu_rwsem_wake_function(struct wait_queue_entry *wq_entry,
unsigned int mode, int wake_flags,
void *key)
{
// ... (获取任务指针等) ...
bool reader = wq_entry->flags & WQ_FLAG_CUSTOM;
struct percpu_rw_semaphore *sem = key;
struct task_struct *p;

// 代表被唤醒的任务尝试获取锁。
if (!__percpu_rwsem_trylock(sem, reader))
return 1; // 尝试失败,但仍然返回1,让唤醒者停止。

// ... (成功获取锁,将任务从等待队列移除并唤醒) ...
p = get_task_struct(wq_entry->private);
list_del_init(&wq_entry->entry);
smp_store_release(&wq_entry->private, NULL);

// 关键的返回逻辑:如果是读者,返回0,让唤醒者继续唤醒下一个。
wake_up_process(p);
put_task_struct(p);

// 如果是写者,返回1,让唤醒者停止。
return !reader;
}

// percpu_rwsem_wait: 慢速路径,用于排队等待。
static void percpu_rwsem_wait(struct percpu_rw_semaphore *sem, bool reader,
bool freeze)
{
DEFINE_WAIT_FUNC(wq_entry, percpu_rwsem_wake_function);
bool wait;

spin_lock_irq(&sem->waiters.lock);
// 在锁保护下,再次尝试获取锁,防止竞态。
wait = !__percpu_rwsem_trylock(sem, reader);
if (wait) {
// 如果仍然失败,将自己添加到等待队列尾部。
__add_wait_queue_entry_tail(&sem->waiters, &wq_entry);
}
spin_unlock_irq(&sem->waiters.lock);

// 循环睡眠,直到被唤醒并成功获取锁。
while (wait) {
set_current_state(TASK_UNINTERRUPTIBLE |
(freeze ? TASK_FREEZABLE : 0));
// 检查wq_entry.private是否为NULL,这是被唤醒的信号。
if (!smp_load_acquire(&wq_entry.private))
break;
schedule();
}
__set_current_state(TASK_RUNNING);
}

bool __sched __percpu_down_read(struct percpu_rw_semaphore *sem, bool try,
bool freeze)
{
if (__percpu_down_read_trylock(sem))
return true;

if (try)
return false;

trace_contention_begin(sem, LCB_F_PERCPU | LCB_F_READ);
preempt_enable();
percpu_rwsem_wait(sem, /* .reader = */ true, freeze);
preempt_disable();
trace_contention_end(sem, 0);

return true;
}
EXPORT_SYMBOL_GPL(__percpu_down_read);

Per-CPU读写信号量销毁:释放其占用的Per-CPU内存

本代码片段定义了 percpu_free_rwsem 函数,它是 percpu_rw_semaphore最终销毁函数。当一个 percpu_rw_semaphore 对象不再被需要时(例如,当一个文件系统被卸载时,其 super_block 中的锁被销毁),内核会调用此函数。其核心功能是安全地释放与该信号量关联的所有动态分配的资源,其中最主要的就是为每个CPU核心分配的读者计数器(reader counters)

实现原理分析

percpu_free_rwsem 的销毁过程必须小心谨慎,因为它要清理的是一个用于复杂并发控制的结构体。

  1. 安全检查 (Kludge):

    • 函数的第一行是 if (!sem->read_count) return;
    • sem->read_count 是一个指向 per-CPU 读者计数器内存的指针。这个指针是在信号量初始化时通过 alloc_percpu 分配的。
    • 代码注释中提到,这是一个“临时的权宜之计(temporary kludge)”。这是因为在某些错误处理路径中(如 alloc_super 分配超级块失败),一个 super_block 结构体可能已经被 kzalloc 分配并清零,但其内部的 percpu_rw_semaphore 尚未被正确初始化。在这种情况下,sem->read_count 将是 NULL。这个检查确保了在这种“半初始化”状态下调用 percpu_free_rwsem 不会导致对 NULL 指针的解引用,从而避免了内核崩溃。
  2. RCU 同步销毁 (rcu_sync_dtor):

    • percpu_rw_semaphore 依赖一个 rcu_sync 状态机来安全地切换读者的快慢路径。这个状态机可能会通过 call_rcu 注册一些延迟回调函数。
    • rcu_sync_dtor(&sem->rss) 的作用是确保所有与 sem->rss 相关的、可能还在等待执行的RCU回调都被安全地取消或等待其完成
    • 这是一个至关重要的步骤。如果没有这一步,free_percpu 可能会释放掉 read_count 的内存,但之后一个“延迟”的RCU回调被触发,它可能会尝试访问这个已经被释放的 sem 结构体,导致典型的 use-after-free 错误。
  3. 释放Per-CPU内存 (free_percpu):

    • 这是函数的核心操作。free_percpualloc_percpu 的配对函数。
    • 它会遍历所有可能的CPU,并释放当初为每个CPU分配的读者计数器所占用的内存。最终,它会释放整个 sem->read_count 指针所指向的内存区域。
  4. 指针投毒 (Poisoning):

    • sem->read_count = NULL;。在释放了指针指向的内存后,立即将指针本身设置为NULL。这是一种防御性编程的最佳实践,被称为“指针投毒”。
    • 这样做的好处是,如果内核中存在错误的代码,在 percpu_free_rwsem 执行后仍然尝试通过 sem->read_count 访问内存,它不会访问到一块随机的、可能已被重新分配的内存(这会导致难以追踪的数据损坏),而是会立即触发一个空指针解引用,导致一个明确的、容易调试的内核崩溃。

代码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// percpu_free_rwsem: 释放一个per-CPU读写信号量所占用的资源。
void percpu_free_rwsem(struct percpu_rw_semaphore *sem)
{
/*
* XXX: 临时的权宜之计。alloc_super()中的错误处理路径
* 假定在kzalloc()之后直接调用percpu_free_rwsem()是安全的。
*/
// 安全检查:如果read_count指针是NULL(意味着信号量从未被成功初始化),
// 则直接返回,避免对空指针进行操作。
if (!sem->read_count)
return;

// 步骤1: 销毁并同步RCU同步结构体。
// 确保所有与此信号量相关的延迟RCU回调都已完成或被取消。
rcu_sync_dtor(&sem->rss);
// 步骤2: 释放为读者计数器分配的per-CPU内存。
free_percpu(sem->read_count);
// 步骤3: 指针投毒。将指针设为NULL,以帮助捕获use-after-free类型的bug。
sem->read_count = NULL;
}
// 导出符号,使得其他GPL许可的内核模块可以调用此函数。
EXPORT_SYMBOL_GPL(percpu_free_rwsem);