[toc]
在这里插入图片描述

include/linux/irq_work.h

__IRQ_WORK_INIT

1
2
3
4
5
6
7
#define __IRQ_WORK_INIT(_func, _flags) (struct irq_work){	\
.node = { .u_flags = (_flags), }, \
.func = (_func), \
.irqwait = __RCUWAIT_INITIALIZER(irqwait), \
}

#define IRQ_WORK_INIT(_func) __IRQ_WORK_INIT(_func, 0)

kernel/irq_work.c IRQ上下文延迟工作(IRQ Context Deferred Work) 在安全时机执行来自中断的紧急任务

历史与背景

这项技术是为了解决什么特定问题而诞生的?

kernel/irq_work.c 实现的irq_work机制是为了解决一个非常特殊且棘手的内核同步问题:如何从一个“硬中断(Hard IRQ)”上下文中,安全地执行一个需要“中断已打开”环境的、但又必须在当前CPU上尽快执行的任务

在标准的硬中断处理程序(Top-Half)中,执行环境是极其受限的:

  • 当前CPU上的中断通常是被屏蔽的。
  • 绝对不能睡眠。
  • 执行时间必须极短。

然而,有时硬中断处理程序需要执行一个操作,这个操作本身很简短,但它可能需要获取一个要求中断打开的锁(例如,某些spinlock_t的变体或需要与其他CPU通信的锁),或者需要执行一个依赖于中断使能状态的架构相关指令。

在这种情况下,标准的下半部机制(softirq, tasklet)并不完全适用,因为:

  • 它们虽然也在中断上下文执行,但其调度时机可能相对“较晚”,无法保证在当前中断返回到用户空间或空闲状态之前执行。
  • softirq可能会在其他CPU上执行,无法保证在当前CPU上完成任务。

irq_work的诞生就是为了填补这个空白:它提供了一种机制,能将一个工作项(work item)从硬中断上下文“延迟”到一个非常近的未来,在这个时间点,硬件中断已经重新打开,但我们仍然在当前CPU的中断上下文中,并且可以保证这个工作会在CPU做任何其他事情(如返回用户空间或进入idle)之前被执行。

它的发展经历了哪些重要的里程碑或版本迭代?

irq_work是一个相对现代的内核机制,它的出现本身就是一个重要的里程碑,标志着内核对SMP(对称多处理)系统中复杂竞态条件和性能优化的理解达到了新的深度。

  • 作为特定问题的解决方案被引入:它最初是为了解决一些特定的、与体系结构相关的同步问题而被引入的,例如ARM架构中的惰性TLB(Translation Lookaside Buffer)刷新管理。
  • 被核心子系统采用:由于其独特的执行时机保证,它很快被内核中其他对延迟和上下文敏感的子系统所采用,例如硬件性能计数器(perf events)、跟踪(tracing)等。

目前该技术的社区活跃度和主流应用情况如何?

irq_work是一个高度特化的、底层的内核工具,其代码非常稳定。它不是给普通设备驱动开发者使用的通用接口。它的用户主要是内核核心开发者和体系结构维护者。
其主流应用包括:

  • 体系结构相关的维护工作:如TLB管理、缓存维护等。
  • 性能分析与跟踪perfftrace等工具在处理来自中断上下文的事件时,会使用irq_work来安全地更新数据结构。
  • 虚拟化:在某些虚拟化场景中,用于处理来自Guest的、需要特殊上下文的请求。

核心原理与设计

它的核心工作原理是什么?

irq_work的核心原理是利用一个**自定义的、低优先级的处理器间中断(Inter-Processor Interrupt, IPI)**作为“延迟执行”的触发器。

  1. 数据结构:每个CPU都有一个私有的irq_work链表,用于存放待处理的工作项(struct irq_work)。

  2. 排队 (irq_work_queue)

    • 当一个硬中断处理程序需要调度一个irq_work时,它会调用irq_work_queue()
    • 这个函数非常快。它首先会尝试将工作项原子地添加到当前CPU的私有链表中。
    • 如果添加成功(即链表之前是空的),它会向当前CPU发送一个特设的IPI(IRQ_WORK_VECTOR)。
  3. IPI处理程序

    • 这个特设IPI的处理程序是irq_work机制的“魔力”所在。当CPU响应这个IPI时,它会执行irq_work_run()函数。
    • 关键点:与硬中断处理程序不同,这个IPI处理程序在执行时,硬件中断是重新打开的
    • irq_work_run()会遍历当前CPU的irq_work链表,并依次执行每个工作项注册的回调函数。
  4. 执行时机保证

    • 由于IPI是在硬中断处理程序即将结束时发送的,并且IPI本身也是一种中断,它通常会在CPU从原始中断返回、准备恢复执行被中断的任务(如用户进程或idle线程)之前被处理。
    • 这就提供了irq_work最独特的保证:工作将在当前CPU上,以中断打开的方式,在下一次调度发生之前被执行

它的主要优势体现在哪些方面?

  • 强执行时机保证:能够确保任务在极低的延迟内,在当前CPU上,在上下文切换之前被执行。这是其他下半部机制无法提供的。
  • 提供了更宽松的执行上下文:它将任务从“硬中断关闭”的环境转移到了“硬中断打开”的环境,允许执行更复杂的操作,特别是涉及需要中断使能的锁。
  • 低排队开销irq_work_queue()本身是一个非常轻量级的操作。

它存在哪些已知的劣势、局限性或在特定场景下的不适用性?

  • 绝对不能睡眠:尽管中断是打开的,但irq_work的回调函数仍然在中断上下文中执行,因此绝对不能调用任何可能导致睡眠的函数
  • 高度特化:它不是一个通用的延迟执行工具,其使用场景非常狭窄,误用可能会导致难以调试的系统问题。
  • Per-CPU特性irq_work总是被调度在发起它的那个CPU上,无法跨CPU调度。

使用场景

在哪些具体的业务或技术场景下,它是首选解决方案?

irq_work是解决那些对执行时机、执行CPU和执行上下文有严格、苛刻要求的场景的首选。

  • 跨CPU的缓存/TLB一致性维护:当CPU A上的一个操作(如修改页表)需要CPU B也执行一个动作(如刷新TLB)时,CPU A可以向CPU B发送一个IPI。在CPU B的IPI处理程序中,直接刷新TLB可能因为锁等原因不安全,此时它就可以调度一个irq_work来在一个安全的环境中完成刷新。
  • 硬件性能事件处理:当一个perf事件(如CPU周期计数器溢出)在硬中断上下文中触发时,其处理程序可能需要更新一些由自旋锁保护的数据结构。使用irq_work可以确保这个更新在当前CPU上立即发生,同时又能安全地获取锁。

是否有不推荐使用该技术的场景?为什么?

  • 普通设备驱动:一个典型的设备驱动(如网络、存储、USB)几乎永远不需要使用irq_work。对于它们的延迟任务,tasklet(如果任务不睡眠)或workqueue(如果任务需要睡眠)是正确且安全的选择。
  • 任何可能睡眠的任务:如果延迟任务需要分配内存(GFP_KERNEL)、与用户空间交互或获取互斥锁/信号量,必须使用workqueue,因为它提供了一个可以安全睡眠的进程上下文。

对比分析

请将其 与 其他相似技术 进行详细对比。

irq_work填补了内核异步执行机制中的一个独特生态位。

特性 硬中断处理程序 (Top-Half) irq_work Tasklet / Softirq Workqueue
执行上下文 硬中断 (中断关闭) 中断 (中断打开) 中断 (中断打开) 进程
能否睡眠 绝对不能 绝对不能 绝对不能 可以
执行CPU 当前CPU 保证在当前CPU 通常在当前CPU (Tasklet);可在任意CPU (Softirq) 可在任意CPU
执行时机 立即 极快。保证在当前中断返回到调度点之前。 。在中断返回后等安全点执行,但无irq_work的强保证。 。由内核调度器决定,作为普通线程运行。
主要用途 紧急硬件交互。 需要中断打开的、紧急的、与CPU相关的维护任务。 高性能数据处理(Softirq),驱动延迟任务(Tasklet)。 任何可能睡眠或耗时长的延迟任务。
编程模型 注册中断处理函数。 irq_work_queue() tasklet_schedule() queue_work()
独特优势 硬件级最低延迟。 独特的时机+地点+上下文保证。 高吞吐量(Softirq),简单易用(Tasklet)。 可以睡眠

irq_work_single 执行单个irq_work项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void irq_work_single(void *arg)
{
struct irq_work *work = arg;
int flags;

/*
* Clear the PENDING bit, after this point the @work can be re-used.
* The PENDING bit acts as a lock, and we own it, so we can clear it
* without atomic ops.
*/
flags = atomic_read(&work->node.a_flags);
flags &= ~IRQ_WORK_PENDING;
atomic_set(&work->node.a_flags, flags);

/*
* See irq_work_claim().
*/
smp_mb();

lockdep_irq_work_enter(flags);
work->func(work);
lockdep_irq_work_exit(flags);

/*
* Clear the BUSY bit, if set, and return to the free state if no-one
* else claimed it meanwhile.
*/
(void)atomic_cmpxchg(&work->node.a_flags, flags, flags & ~IRQ_WORK_BUSY);

if ((IS_ENABLED(CONFIG_PREEMPT_RT) && !irq_work_is_hard(work)) ||
!arch_irq_work_has_interrupt())
rcuwait_wake_up(&work->irqwait);
}

irq_work_run_list 遍历并执行链表中的所有irq_work项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static void irq_work_run_list(struct llist_head *list)
{
struct irq_work *work, *tmp;
struct llist_node *llnode;

/*
* On PREEMPT_RT IRQ-work which is not marked as HARD will be processed
* in a per-CPU thread in preemptible context. Only the items which are
* marked as IRQ_WORK_HARD_IRQ will be processed in hardirq context.
*/
BUG_ON(!irqs_disabled() && !IS_ENABLED(CONFIG_PREEMPT_RT));

if (llist_empty(list))
return;

llnode = llist_del_all(list);
llist_for_each_entry_safe(work, tmp, llnode, node.llist)
irq_work_single(work);
}

irq_work_tick 在每个tick上,检查并执行当前CPU上所有待处理的“IRQ work”

  • 它的核心作用是:在每个tick上,检查并执行当前CPU上所有待处理的“IRQ work”。
  • irq_work_tick的工作原理是一个周期性的轮询和分发流程:
    • 处理紧急工作 (raised_list):
    • 处理懒惰工作 (lazy_list):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/*
* 这个函数在每个CPU的周期性时钟tick中断中被调用,用于处理irq_work。
*/
void irq_work_tick(void)
{
/* raised: 指向当前CPU私有的、用于存放“被请求执行的”(通常是跨CPU)紧急irq_work的链表头。*/
struct llist_head *raised = this_cpu_ptr(&raised_list);

/*
* 检查raised_list是否不为空,并且当前硬件架构没有专门的、用于处理irq_work的
* 自触发中断机制(如IPI)。
*/
/* arch_irq_work_has_interrupt() = false */
if (!llist_empty(raised) && !arch_irq_work_has_interrupt())
/*
* 如果满足条件,说明我们必须依赖tick中断来轮询和处理这些紧急工作。
* 调用irq_work_run_list来遍历并执行raised_list上的所有工作项。
*/
irq_work_run_list(raised);

/* 检查内核是否不是PREEMPT_RT(实时)内核。*/
if (!IS_ENABLED(CONFIG_PREEMPT_RT))
/*
* 在非实时内核中,直接调用irq_work_run_list来处理当前CPU的
* “懒惰”irq_work链表(lazy_list)。
*/
irq_work_run_list(this_cpu_ptr(&lazy_list));
else
/*
* 在实时内核中,为了降低硬中断处理的延迟,不直接执行irq_work。
* 而是调用wake_irq_workd()来唤醒一个专门的irq_workd内核线程,
* 将处理工作委托给该线程。
*/
wake_irq_workd();
}

irq_work_queue: irq_work的入队与触发

本代码片段展示了Linux内核中irq_work子系统的核心入队逻辑。其主要功能是提供一个名为irq_work_queue的API,允许内核的其他部分将一个irq_work项(一个待在中断上下文中执行的函数)安全、高效地加入到当前CPU的待处理队列中。这个过程是无锁的,并且通过一个状态机来防止重复入队,同时决定是否需要立即触发一个中断来处理该工作。

实现原理分析

该机制是一个为高性能和低延迟设计的、基于原子操作的无锁队列系统。它的核心在于如何协调“入队”操作和“触发中断”这两个步骤,同时处理“紧急”与“懒加载”两种工作模式。

  1. 状态机与所有权 (irq_work_claim):

    • 一个irq_work结构体不能被同时多次排入队列。为了管理这一点,其状态由work->node.a_flags中的原子标志位控制。
    • irq_work_claim是入队的第一步,它的目标是原子地“声明所有权”。
    • 它使用atomic_fetch_or,这是一个原子的“读-修改-写”操作。它会尝试将IRQ_WORK_CLAIMED标志位置位,并返回之前的标志值。
    • 关键逻辑: 通过检查返回的旧标志值中是否包含IRQ_WORK_PENDING,可以判断出这个irq_work在本次操作之前是否已经在队列中了。
      • 如果已在队列中 (oflags & IRQ_WORK_PENDING),意味着另一个上下文已经把它入队并且它正在等待被处理。此时,当前调用者无需再做任何事,irq_work_claim返回false
      • 如果不在队列中,那么当前调用者就成功地“声明”了这个工作项,并有责任将它真正地放入队列中。irq_work_claim返回true
    • 这个机制完美地解决了重复入队的竞态问题,而且完全无锁。
  2. 双队列与懒加载 (__irq_work_queue_local):

    • 每个CPU都有两个irq_work队列:raised_list用于紧急工作,lazy_list用于非紧急工作。
    • 一个irq_work可以通过设置IRQ_WORK_LAZY标志来表明自己是“懒”的。
    • 紧急工作: 如果工作不是懒的,它会被加入到raised_list中,并且必须立即通过irq_work_raise()触发一个中断来尽快处理它。
    • 懒工作: 如果工作是懒的,它会被加入lazy_list。系统不会为它专门触发一个中断,而是期望它能在下一次自然发生的中断(如时钟中断)中被顺便处理掉。这节省了触发一次中断的开销。
    • llist(Lock-Less List)是一种专为单生产者-单消费者场景设计的无锁链表,非常适合用于CPU自身(生产者)和该CPU的中断处理程序(消费者)之间的通信。
  3. 中断触发 (irq_work_raise):

    • 这是将“有工作待处理”这一信息通知给CPU中断处理程序的机制。
    • arch_irq_work_raise()是一个架构相关的函数。在多核系统上,它通常会向当前CPU发送一个“处理器间中断”(Inter-Processor Interrupt, IPI)。这个IPI会中断当前CPU正在执行的任何代码,并跳转到预设的irq_work中断处理函数,该函数会负责处理raised_list中的工作。
    • __weak属性表示这是一个弱符号,如果特定架构没有提供自己的强实现,就会链接到这个空的默认版本。

代码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/*
* An entry can be in one of four states:
*
* free NULL, 0 -> {claimed} : free to be used
* claimed NULL, 3 -> {pending} : claimed to be enqueued
* pending next, 3 -> {busy} : queued, pending callback
* busy NULL, 2 -> {free, claimed} : callback in progress, can be claimed
*/

struct irq_work {
struct __call_single_node node;
void (*func)(struct irq_work *);
struct rcuwait irqwait;
};

#define __IRQ_WORK_INIT(_func, _flags) (struct irq_work){ \
.node = { .u_flags = (_flags), }, \
.func = (_func), \
.irqwait = __RCUWAIT_INITIALIZER(irqwait), \
}

#define IRQ_WORK_INIT(_func) __IRQ_WORK_INIT(_func, 0)
#define IRQ_WORK_INIT_LAZY(_func) __IRQ_WORK_INIT(_func, IRQ_WORK_LAZY)
#define IRQ_WORK_INIT_HARD(_func) __IRQ_WORK_INIT(_func, IRQ_WORK_HARD_IRQ)


/**
* @brief 声明一个irq_work的所有权,以准备将其入队。
* @param work 指向要声明的irq_work结构的指针。
* @return bool 如果成功声明(即之前未入队),返回true;否则返回false。
*/
static bool irq_work_claim(struct irq_work *work)
{
int oflags;

/*
* 原子地将 IRQ_WORK_CLAIMED 和 CSD_TYPE_IRQ_WORK 标志位置位,
* 并返回原来的标志值。这是一个原子的“读-修改-写”操作。
*/
oflags = atomic_fetch_or(IRQ_WORK_CLAIMED | CSD_TYPE_IRQ_WORK, &work->node.a_flags);

/*
* 如果工作项之前已经处于“待处理”(PENDING)状态,
* 意味着它已经在队列里了,我们无需再做任何事。
* 这里有一个隐式的内存屏障与irq_work_single()配对,
* 确保我们之前的操作对处理该工作的中断上下文可见。
*/
if (oflags & IRQ_WORK_PENDING)
return false;

/* 之前未入队,我们成功声明了所有权。 */
return true;
}

/**
* @brief 架构相关的触发irq_work中断的函数(弱符号默认实现)。
* @details 具体的CPU架构应该提供一个强实现来触发一个中断。
* 对于没有特殊中断机制的“简陋”架构,
* irq_work将依赖于下一个时钟节拍中断来处理。
*/
void __weak arch_irq_work_raise(void)
{
/* 空实现 */
}

/**
* @brief 触发一个中断来处理irq_work。
* @param work 指向被入队的irq_work的指针,主要用于追踪。
*/
static __always_inline void irq_work_raise(struct irq_work *work)
{
/* 如果启用了IPI追踪,则记录一次“发送IPI”事件。 */
if (trace_ipi_send_cpu_enabled() && arch_irq_work_has_interrupt())
trace_ipi_send_cpu(smp_processor_id(), _RET_IP_, work->func);

/* 调用架构相关的函数来实际触发中断。 */
arch_irq_work_raise();
}

/**
* @brief 将一个irq_work项加入当前CPU的本地队列。
* @param work 已经被声明所有权的irq_work项。
* @pre 调用此函数前必须已禁用抢占。
*/
static void __irq_work_queue_local(struct irq_work *work)
{
struct llist_head *list;
bool rt_lazy_work = false;
bool lazy_work = false;
int work_flags;

/* 读取工作项的标志。 */
work_flags = atomic_read(&work->node.a_flags);
/* 检查是否是“懒”工作。 */
if (work_flags & IRQ_WORK_LAZY)
lazy_work = true;
/* 在实时(PREEMPT_RT)内核中,非硬中断工作也被当作懒工作处理。 */
else if (IS_ENABLED(CONFIG_PREEMPT_RT) &&
!(work_flags & IRQ_WORK_HARD_IRQ))
rt_lazy_work = true;

/* 根据是否为懒工作,选择对应的每CPU队列。 */
if (lazy_work || rt_lazy_work)
list = this_cpu_ptr(&lazy_list);
else
list = this_cpu_ptr(&raised_list);

/* 使用无锁链表(llist)将工作项添加到队列头部。如果链表非空,返回true。*/
if (!llist_add(&work->node.llist, list))
return;

/* 如果工作是“懒”的,只有在NOHZ模式下tick停止时才需要强制触发中断。 */
/* 否则,对于非懒工作,总是触发中断。 */
/*
bool tick_nohz_tick_stopped(void)
{
struct tick_sched *ts = this_cpu_ptr(&tick_cpu_sched);

return tick_sched_flag_test(ts, TS_FLAG_STOPPED);
}
*/
if (!lazy_work || tick_nohz_tick_stopped())
irq_work_raise(work);
}

/**
* @brief 将一个irq_work项入队到当前CPU。
* @param work 指向要入队的irq_work结构的指针。
* @return bool 如果成功入队,返回true;如果已在队列中,返回false。
*/
bool irq_work_queue(struct irq_work *work)
{
/* 只有在之前未入队的情况下才进行入队操作。 */
if (!irq_work_claim(work))
return false;

/* 禁用抢占,以保护对每CPU队列的访问。 */
preempt_disable();
/* 将工作项加入本地队列,并根据需要触发中断。 */
__irq_work_queue_local(work);
/* 恢复抢占。 */
preempt_enable();

return true;
}
EXPORT_SYMBOL_GPL(irq_work_queue);