[toc]

include/linux/seqlock.h 顺序锁(Sequence Lock) 针对读多写少场景的无锁读者优化

历史与背景

这项技术是为了解决什么特定问题而诞生的?

seqlock(顺序锁)是一种高度特化的同步原语,它的诞生是为了解决一个在传统读写锁(rw_lock)中存在的经典性能问题:写者饥饿(Writer Starvation)

在标准的读写锁模型中,当一个或多个读者持有读锁时,任何试图获取写锁的写者都必须等待。如果读者频繁地、接连不断地到来,那么写者可能会被无限期地延迟,即“饿死”。这在某些读操作极其频繁、写操作较少的场景下会成为严重的性能瓶颈。

seqlock通过颠覆这个模型来解决问题:它赋予写者绝对的优先权。一个写者永远不会被读者阻塞,它可以随时进入临界区并修改数据,即使当时有读者正在读取。为了在这种“不加保护”的读取中保证数据的一致性,它引入了一个序列计数器,让读者自己来检测在读取期间是否有写者介入,并在检测到冲突时进行重试。

它的发展经历了哪些重要的里程碑或版本迭代?

  • 作为一种优化模式被引入seqlock不是一个通用的锁,而是作为一种针对特定场景的性能优化被引入内核的。其核心思想——使用序列计数器来验证读取的原子性——是其最重要的设计里程碑。
  • 在时间子系统中的应用seqlock最经典、最成功的应用是在内核的时间管理子系统中。系统中有大量的代码需要读取当前时间(如jiffiesxtime),而只有一个地方(时钟中断)会更新它。使用seqlock保护时间变量,使得无数的读者可以极快地、无锁地获取时间,而唯一的写者(时钟中断)也几乎不受影响。
  • seqcount_t的拆分:后来,内核将seqlock的核心机制——序列计数器——单独拆分出来,形成了一个更底层的原语seqcount_t。这允许开发者在更复杂的锁结构中只使用其计数器和验证逻辑,而将写者之间的互斥逻辑交由其他锁(如自旋锁或互斥锁)来管理,提供了更大的灵活性。

目前该技术的社区活跃度和主流应用情况如何?

seqlock是一个非常成熟和稳定的内核同步机制。由于其高度特化,它的代码本身不经常变动,但它在内核中一些对性能极其敏感的核心路径上被广泛使用。

  • 内核时间管理:保护jiffies_64xtime_lock等。
  • 调度器:用于保护某些调度器统计信息。
  • 其他读多写少的场景:例如,系统V IPC的某些状态变量。

它被认为是内核工具箱中一个重要的、但需要谨慎使用的专家级工具。

核心原理与设计

它的核心工作原理是什么?

seqlock的核心是一个序列计数器(sequence counter)和一个用于保护写者之间互斥的自旋锁(spinlock)

  1. 数据结构 (seqlock_t)

    • 一个seqcount_t类型的序列计数器。
    • 一个spinlock_t类型的自旋锁。
  2. 写者(Writer)的流程 (write_seqlock / write_sequnlock)
    a. 获取自旋锁,防止其他写者进入。
    b. 将序列计数器加一。此时,计数器变为奇数,这是一个明确的“正在写入”信号。
    c. 修改被保护的数据。
    d. 再次将序列计数器加一。此时,计数器恢复为偶数
    e. 释放自旋锁。

  3. 读者(Reader)的流程 (read_seqbegin / read_seqretry)
    a. 进入一个循环
    b. 调用read_seqbegin(),读取并保存当前序列号(我们称之为start_seq)。
    c. 检查奇偶性:如果start_seq是奇数,说明此时有写者正在修改数据,读者必须自旋等待,直到它变为偶数。(这是通过read_seqbegin_or_lock等更现代的接口实现的优化)。
    d. 读取被保护的数据副本。
    e. 调用read_seqretry(),它会再次读取序列号(我们称之为end_seq)。
    f. 验证:比较start_seqend_seq。如果start_seq != end_seq或者start_seq是奇数,说明在读者读取数据的过程中,一个写者进入并完成了修改。读者刚才读取的数据是无效的、可能被部分修改过的
    g. 如果验证失败,循环从头开始,重新读取。如果验证成功,说明读取过程没有被打扰,数据是一致的,循环结束。

它的主要优势体现在哪些方面?

  • 写者无饥饿:写者永远不会等待读者,这是其核心设计目标。
  • 极快的读者路径:在没有写者竞争的理想情况下,读者路径是完全无锁的。它只涉及几次内存读取操作(读计数器和数据),不会导致不同CPU之间的缓存行争用,性能极高。
  • 低开销:锁本身的数据结构非常小。

它存在哪些已知的劣势、局限性或在特定场景下的不适用性?

  • 读者可能重试:如果写操作频繁或耗时较长,读者可能会多次重试,导致读者路径的性能急剧下降。因此,它只适用于写操作非常少且非常快的场景。
  • 被保护的数据不能包含指针:这是seqlock最重要、最危险的限制。因为读者读取数据和验证数据有效性是分离的。读者可能读取到一个指针,然后被抢占;此时写者介入,修改了数据并释放了该指针指向的内存;读者恢复执行,它通过序列号检查发现数据无效,但它手中已经持有一个悬空指针,如果后续代码不小心使用了这个指针,将导致Use-After-Free漏洞。因此,seqlock只能用于保护那些可以安全地按值复制的、不含指针的简单数据(如整数、不含指针的结构体)。

使用场景

在哪些具体的业务或技术场景下,它是首选解决方案?

seqlock是**“读多、写极少、写极快、数据为纯值”**场景下的性能优化利器。

  • 内核时间变量:如上所述,这是最完美的例子。只有一个写者(时钟中断),写操作极快(更新一个64位整数),有无数的读者,且数据是纯值。
  • 系统状态标志或计数器:某个子系统有一组状态变量,由该子系统内部的少数几个点进行快速更新,而内核的其他很多地方都需要读取这些状态。

是否有不推荐使用该技术的场景?为什么?

  • 写操作频繁或耗时:会造成读者性能严重下降,此时标准的rw_lockmutex是更好的选择。
  • 保护含有指针的数据结构:绝对禁止,原因如上所述。应使用rw_lock或RCU(Read-Copy-Update)。
  • 通用的驱动程序锁:对于大多数设备驱动程序,其临界区保护逻辑不符合seqlock的苛刻前提。开发者应默认使用更安全、更通用的mutexspinlock

对比分析

请将其 与 其他相似技术 进行详细对比。

特性 Seqlock RCU (Read-Copy-Update) 读写锁 (RW_Lock) 自旋锁/互斥锁 (Spinlock/Mutex)
写者饥饿 不会 不会 (读者优先版本) 不适用 (只有一个持有者)
读者开销(无争用) 极低 (无锁,几次内存读) 极低 (无锁,无原子操作) 中等 (有原子操作) 中等/高 (有原子操作/可能睡眠)
读者行为(有争用) 自旋重试 (可能性能差) 无阻塞 (可看到旧数据) 阻塞 (自旋或睡眠) 阻塞 (自旋或睡眠)
数据约束 严格 (不能含指针) 严格 (指针管理复杂,需优雅期)
写者开销 低 (一个自旋锁) 高 (复制数据结构,管理回调) 中等 (原子操作,可能阻塞) 中等/高
能否在读者中睡眠 否 (spinlock版) / 是 (semaphore版) 否 (spinlock) / 是 (mutex)
核心思想 读者乐观读取,事后验证 写者创建新副本,延迟释放旧副本 读者之间共享,读者与写者互斥 完全互斥

seqcount_latch_t seqcount_t

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/*
* 序列计数器 (seqcount_t)
*
* 这是原始计数机制,没有任何写入器保护。
*
* 写入端关键部分必须是序列化的且不可抢占的。
*
* 如果可以从 hardirq 或 softirq 上下文调用 reader,则在进入写入部分之前,还必须分别禁用中断或下半部分。
*
* 如果受保护的数据包含指针,则不能使用此机制,因为写入器可以使读取器正在遵循的指针失效。
*
* 如果写序列化机制是常见的内核锁定原语之一,请改用带有关联锁 (seqcount_LOCKNAME_t) 的序列计数器。
*
* 如果需要自动处理 sequence counter writer 序列化和非抢占性要求,请改用 sequential lock (seqlock_t)。
*
* 参见 Documentation/locking/seqlock.rst
*/
typedef struct seqcount {
unsigned sequence;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
} seqcount_t;
/*
* 锁存序列计数器 (seqcount_latch_t)
*
* 一种序列计数器变体,其中计数器偶数/奇数值用于在受保护数据的两个副本之间切换。这允许读取路径(通常是 NMI)安全地中断写入端关键部分。
*
* 由于 write 部分是完全抢占的,因此不需要特殊处理forPREEMPT_RT。
*/
typedef struct {
seqcount_t seqcount;
} seqcount_latch_t;

write_seqlock 写入序列锁加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static inline void do_raw_write_seqcount_begin(seqcount_t *s)
{
kcsan_nestable_atomic_begin();
s->sequence++;
smp_wmb();
}

static inline void do_write_seqcount_begin_nested(seqcount_t *s, int subclass)
{
seqcount_acquire(&s->dep_map, subclass, 0, _RET_IP_); //锁定检测
do_raw_write_seqcount_begin(s);
}

static inline void do_write_seqcount_begin(seqcount_t *s)
{
do_write_seqcount_begin_nested(s, 0);
}

/**
* write_seqlock() - 开始一个 seqlock_t 写入端关键部分
* @sl:指向 seqlock_t 的指针
*
* write_seqlock 打开给定seqlock_t的写入端关键部分。 它还隐式获取嵌入在该 Sequential 锁中的 spinlock_t。因此,所有 seqlock_t 写入侧段都会自动序列化且不可抢占。
*
* 上下文:如果 seqlock_t 读取部分或其他写入端关键部分可以从 hardirq 或 softirq 上下文中调用,请改用此函数的 _irqsave 或 _bh 变体。
*/
static inline void write_seqlock(seqlock_t *sl)
{
spin_lock(&sl->lock);
do_write_seqcount_begin(&sl->seqcount.seqcount);
}

write_sequnlock 写入序列锁解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static inline void do_raw_write_seqcount_end(seqcount_t *s)
{
smp_wmb();
s->sequence++;
kcsan_nestable_atomic_end();
}

static inline void do_write_seqcount_end(seqcount_t *s)
{
seqcount_release(&s->dep_map, _RET_IP_);
do_raw_write_seqcount_end(s);
}

/**
* write_sequnlock() - 结束 seqlock_t 写入端关键部分
* @sl:指向 seqlock_t 的指针
*
* write_sequnlock 关闭给定seqlock_t的 (序列化和非抢占式) write 端 critical 部分。
*/
static inline void write_sequnlock(seqlock_t *sl)
{
do_write_seqcount_end(&sl->seqcount.seqcount);
spin_unlock(&sl->lock);
}

__seqprop

  • 利用了 C11 标准中的 _Generic 关键字来实现类型泛型编程。类型泛型允许根据表达式的类型选择不同的代码路径,从而实现类似于函数重载的功能。
  • _Generic 是 C11 中引入的一个关键字,用于实现类型分发。它的语法类似于一个类型匹配的 switch 语句。_Generic(*(s), …) 会根据 *(s) 的类型(即 s 所指向的对象的类型)选择对应的分支。
1
2
3
4
5
6
7
8
9
#define __seqprop_case(s, lockname, prop)				\
seqcount_##lockname##_t: __seqprop_##lockname##_##prop

#define __seqprop(s, prop) _Generic(*(s), \
seqcount_t: __seqprop_##prop, \
__seqprop_case((s), raw_spinlock, prop), \
__seqprop_case((s), spinlock, prop), \
__seqprop_case((s), rwlock, prop), \
__seqprop_case((s), mutex, prop))

read_seqbegin 读取序列锁开始

  • 计数为奇数,证明写入端正在进行.读者需要重新读取.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static inline unsigned __seqprop_sequence(const seqcount_t *s)
{
return smp_load_acquire(&s->sequence);
}

#define seqprop_sequence(s) __seqprop(s, sequence)(s)

#define __read_seqcount_begin(s) \
({ \
unsigned __seq; \
\
while (unlikely((__seq = seqprop_sequence(s)) & 1)) \
cpu_relax(); \ //barrier
\
kcsan_atomic_next(KCSAN_SEQLOCK_REGION_MAX); \
__seq; \
})

#define raw_read_seqcount_begin(s) __read_seqcount_begin(s)

#define read_seqcount_begin(s) \
({ \
seqcount_lockdep_reader_access(seqprop_const_ptr(s)); \
raw_read_seqcount_begin(s); \
})

/**
* read_seqbegin() - 开始一个 seqlock_t 读取端关键部分
* @sl:指向 seqlock_t 的指针
*
* 返回: count,传递给 read_seqretry()
*/
static inline unsigned read_seqbegin(const seqlock_t *sl)
{
return read_seqcount_begin(&sl->seqcount);
}

read_seqretry 读取序列锁结束

  • 返回读取时的计数.如果读取时的计数和当前计数不一致,则返回 true,需要重新读取.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static inline int do___read_seqcount_retry(const seqcount_t *s, unsigned start)
{
kcsan_atomic_next(0);
return unlikely(READ_ONCE(s->sequence) != start);
}

static inline int do_read_seqcount_retry(const seqcount_t *s, unsigned start)
{
smp_rmb();
return do___read_seqcount_retry(s, start);
}

#define read_seqcount_retry(s, start) \
do_read_seqcount_retry(seqprop_const_ptr(s), start)

/**
* read_seqretry() - 结束 seqlock_t 读取侧部分
* @sl:指向 seqlock_t 的指针
* @start:计数,从 read_seqbegin()
*
* read_seqretry 关闭给定seqlock_t的读取端临界部分。如果 critical 部分无效,则必须忽略它 (并且通常会重试)。
*
* 返回:如果需要重试读取部分,则为 true,否则为 false
*/
static inline unsigned read_seqretry(const seqlock_t *sl, unsigned start)
{
return read_seqcount_retry(&sl->seqcount, start);
}

seqcount_raw_spinlock_init Seqcount 原始自旋锁初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static inline void __seqcount_init(seqcount_t *s, const char *name,
struct lock_class_key *key)
{
/*
* Make sure we are not reinitializing a held lock:
*/
lockdep_init_map(&s->dep_map, name, key, 0);
s->sequence = 0;
}

# define seqcount_init(s) __seqcount_init(s, NULL, NULL)

/*
* seqcount_LOCKNAME_init() - runtime initializer for seqcount_LOCKNAME_t
* @s: Pointer to the seqcount_LOCKNAME_t instance
* @lock: Pointer to the associated lock
*/

#define seqcount_LOCKNAME_init(s, _lock, lockname) \
do { \
seqcount_##lockname##_t *____s = (s); \
seqcount_init(&____s->seqcount); \
__SEQ_LOCK(____s->lock = (_lock)); \
} while (0)

#define seqcount_raw_spinlock_init(s, lock) seqcount_LOCKNAME_init(s, lock, raw_spinlock)

read_seqcount_latch_retry 结束 seqcount_latch_t 阅读部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* raw_read_seqcount_latch_retry() - 结束 seqcount_latch_t 阅读部分
* @s:指向 seqcount_latch_t 的指针
* @start:计数,从 raw_read_seqcount_latch()
*
* 返回:如果需要重试读取部分,则为 true,否则为 false
*/
static __always_inline int
raw_read_seqcount_latch_retry(const seqcount_latch_t *s, unsigned start)
{
smp_rmb();
return unlikely(READ_ONCE(s->seqcount.sequence) != start);
}
/**
* read_seqcount_latch_retry() - 结束 seqcount_latch_t 阅读部分
* @s:指向 seqcount_latch_t 的指针
* @start:计数,从 read_seqcount_latch()
*
* 返回:如果需要重试读取部分,则为 true,否则为 false
*/
static __always_inline int
read_seqcount_latch_retry(const seqcount_latch_t *s, unsigned start)
{
kcsan_atomic_next(0);
return raw_read_seqcount_latch_retry(s, start);
}

raw_write_seqcount_latch 将锁存读取器重定向到偶数/奇数副本

1
2
3
4
5
6
7
8
9
10
/**
* raw_write_seqcount_latch() - 将锁存读取器重定向到偶数/奇数副本
* @s:指向 seqcount_latch_t 的指针
*/
static __always_inline void raw_write_seqcount_latch(seqcount_latch_t *s)
{
smp_wmb(); /* 在递增 “sequence” 之前先存储 */
s->seqcount.sequence++;
smp_wmb(); /* 在跟随 stores 之前增加 “sequence” */
}

write_seqcount_latch_begin 将 Latch 读取器重定向到 奇数副本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/**
* write_seqcount_latch_begin() - 将 Latch 读取器重定向到 奇数副本
* @s: 指向 seqcount_latch_t 的指针
*
* latch 技术是一种多版本并发控制方法,允许在非原子修改期间进行查询。
* 如果您可以保证查询永远不会中断修改 —— 例如,并发严格在 CPU 之间 —— 您很可能不需要这样做。
*
* 传统的 RCU/无锁数据结构依赖于原子修改来确保查询遵守旧状态或新状态,而锁存器允许非原子更新。
* 权衡是存储成本翻倍;我们必须维护整个数据结构的两个副本。
*
* 简单来说:我们先修改一个副本,然后再修改另一个副本。这确保了始终有一个副本处于稳定状态,随时准备给我们答案。
*
* 基本形式是这样的数据结构:
*
* struct latch_struct {
* seqcount_latch_t seq;
* struct data_struct data[2];
* };
*
* 其中,假定为外部序列化的修改执行以下作:
*
* void latch_modify(struct latch_struct *latch, ...)
* {
* write_seqcount_latch_begin(&latch->seq);
* modify(latch->data[0], ...);
* write_seqcount_latch(&latch->seq);
* modify(latch->data[1], ...);
* write_seqcount_latch_end(&latch->seq);
* }
*
* 查询将具有如下所示的形式:
*
* struct entry *latch_query(struct latch_struct *latch, ...)
* {
* struct entry *entry;
* unsigned seq, idx;
*
* do {
* seq = read_seqcount_latch(&latch->seq);
*
* idx = seq & 0x01;
* entry = data_query(latch->data[idx], ...);
*
* // This includes needed smp_rmb()
* } while (read_seqcount_latch_retry(&latch->seq, seq));
*
* return entry;
* }
*
* 因此,在修改过程中,查询首先重定向到 data[1]。然后 wemodify data[0]。
* 完成后,我们将查询重定向回 data[0],我们可以修改 data[1].
*
* NOTE:
*
* 对原子修改的非要求_不包括在 data 是动态数据结构的情况下发布新条目。
*
* 迭代可能会从 data[0] 开始并暂停足够长的时间以错过整个修改序列,一旦恢复,它可能会观察到新的条目。
*
* NOTE 2:
*
* 当数据是动态数据结构时;应该使用常规的 RCU 模式来管理其中对象的生命周期。
*/
static __always_inline void write_seqcount_latch_begin(seqcount_latch_t *s)
{
kcsan_nestable_atomic_begin();
raw_write_seqcount_latch(s);
}

write_seqcount_latch 重定向 Latch Reader 以均匀复制

1
2
3
4
5
6
7
8
/**
* write_seqcount_latch() - 重定向 Latch Reader 以均匀复制
* @s:指向 seqcount_latch_t 的指针
*/
static __always_inline void write_seqcount_latch(seqcount_latch_t *s)
{
raw_write_seqcount_latch(s);
}

write_seqcount_latch_end 结束 seqcount_latch_t 写入部分

1
2
3
4
5
6
7
8
9
10
/**
* write_seqcount_latch_end() - 结束 seqcount_latch_t 写入部分
* @s:指向 seqcount_latch_t 的指针
*
* 在更新闩锁保护数据的所有副本之后,标记 seqcount_latch_t writer 部分的结束。
*/
static __always_inline void write_seqcount_latch_end(seqcount_latch_t *s)
{
kcsan_nestable_atomic_end();
}