[TOC]

mm/percpu.c Per-CPU Variables Management Per-CPU数据管理的核心实现

历史与背景

这项技术是为了解决什么特定问题而诞生的?

这项技术是为了从根本上解决在多核(SMP)系统中并发访问共享数据所带来的性能瓶颈而诞生的。

  • 锁争用(Lock Contention):在多核系统中,如果多个CPU核心频繁地更新同一个全局变量(例如,一个网络数据包统计计数器),它们必须使用锁(如自旋锁)来保护这个变量,以避免数据竞争。当核心数量增加时,对这个锁的争夺会变得非常激烈,导致CPU花费大量时间在等待锁上,而不是执行实际工作,从而严重限制了系统的扩展性。
  • 缓存一致性开销(Cache Coherency Overhead):即使不使用锁(例如使用原子操作),也会有性能问题。当一个CPU核心修改了共享变量,它所在的缓存行(Cache Line)会被标记为“脏”(Modified)。根据缓存一致性协议(如MESI),其他CPU核心上该缓存行的副本必须被置为“无效”(Invalidated)。当其他CPU也想访问这个变量时,就必须从修改过的那个核心的缓存或主存中重新加载数据。这种缓存行在多个CPU核心之间来回传递的现象被称为“缓存行弹跳”(Cache Bouncing)或“伪共享”(False Sharing),会消耗大量的总线带宽和时间。

percpu机制通过为系统中的每个CPU核心提供一个变量的私有副本来解决上述所有问题。每个CPU只访问自己的副本,因此完全不需要锁,也杜绝了缓存行弹跳,实现了真正的无锁并发。

它的发展经历了哪些重要的里程碑或版本迭代?

Linux的Per-CPU实现经历了几个阶段的演进,变得越来越高效和灵活。

  • 早期宏定义阶段:最原始的方法是使用一个简单的数组,并通过一个宏来访问,如 DEFINE_PER_CPU(type, name) 实际上定义了 type name[NR_CPUS],访问时使用 per_cpu(name, smp_processor_id())。这种方式的主要问题是无法解决伪共享,因为数组中相邻CPU的元素很可能位于同一个缓存行内。
  • 对齐与填充:为了解决伪共享,后续的实现为每个CPU的副本增加了填充(Padding),确保每个副本都独占一个或多个缓存行。
  • 专有分配器(Chunk Allocator):现代的 percpu 机制实现了一个高度优化的专有内存分配器,位于 mm/percpu.c。它不再是为每个per-cpu变量单独分配内存,而是将许多per-cpu变量组织到大的**内存块(Chunks)**中。每个块被划分为多个单元(Units),每个CPU一个单元。这种方式:
    1. 内存效率更高:减少了因对齐和填充造成的内存碎片。
    2. 寻址更快:通过巧妙的地址计算,特别是利用段寄存器(如x86上的gsfs),可以在一条指令内完成对当前CPU私有数据的访问,几乎没有运行时开销。
  • 动态分配的支持:除了编译时静态定义的per-cpu变量(DEFINE_PER_CPU),该框架也加入了对动态分配的支持(alloc_percpu, free_percpu),使得驱动程序和模块可以在运行时根据需要创建和销毁per-cpu变量。

目前该技术的社区活跃度和主流应用情况如何?

percpu机制是Linux内核实现高性能和高扩展性的基石之一,是一项极其成熟、稳定且被广泛应用的核心技术。

  • 社区活跃度:作为内核的核心基础设施,其代码非常稳定。相关的改动通常是为了支持新的体系结构、进行性能微调或内存优化。
  • 主流应用:它被内核的各个子系统广泛使用,特别是性能敏感的领域:
    • 调度器:每个CPU的运行队列(struct rq)就是per-cpu变量。
    • 网络栈:用于统计收发包数量、错误等。
    • 文件系统:用于缓存和计数器。
    • 内存管理:用于管理per-cpu的页面缓存。
    • 中断和定时器:每个CPU都有自己的本地定时器和中断处理数据。

核心原理与设计

它的核心工作原理是什么?

mm/percpu.c的核心是实现了一个特殊的内存分配器,它以“块(Chunk)”为单位进行管理。

  1. 静态变量布局:对于静态定义的per-cpu变量(使用DEFINE_PER_CPU),链接器会将它们收集到一个特殊的.data..percpu段中。内核启动时,会为这个段分配第一个per-cpu内存块(first chunk),并将所有静态变量的副本按CPU依次拷贝到每个CPU对应的单元中。
  2. 动态变量分配:当调用alloc_percpu()时,分配器会尝试在现有的内存块中寻找足够的空间。如果找不到,它会从页分配器申请新的物理页来创建一个新的内存块。
  3. 快速寻址:这是percpu机制性能的关键。在x86-64等架构上,内核会将段寄存器(如%gs)指向当前CPU的per-cpu区域的基地址。因此,访问一个静态per-cpu变量(例如 my_var)的汇编指令可以是 movq %gs:my_var_offset, %rax。这只是一条普通的内存访问指令,速度极快,完全避免了查找当前CPU ID再做数组索引的开销。
  4. 安全访问API:内核提供了get_cpu_var()put_cpu_var()等宏。它们的作用是在访问per-cpu变量之前禁用抢占,在访问结束后再恢复抢占。这是至关重要的,因为如果一个任务在访问per-cpu变量的过程中被抢占,并被调度到另一个CPU上运行,那么它后续的访问就会访问到新CPU的错误数据。禁用抢占确保了在访问期间,任务不会被迁移到其他CPU。

它的主要优势体现在哪些方面?

  • 极致的性能:无锁、无缓存争用,是解决SMP扩展性问题的最佳方案。
  • 高扩展性:系统性能不会因为CPU核心数量的增加而在线性更新场景下出现下降。
  • API简洁:对于使用者来说,其复杂性被很好地隐藏了,使用起来就像访问普通变量一样简单。

它存在哪些已知的劣势、局限性或在特定场景下的不适用性?

  • 内存开销:为了避免伪共享,percpu分配器会进行缓存行对齐,这会消耗比实际数据更多的内存。总内存消耗是 (数据大小 + 填充) * CPU核心数,对于大型数据结构,这可能是一个巨大的开销。
  • 访问模式限制:它被设计为“一个CPU主要访问自己的数据”。虽然一个CPU也可以访问另一个CPU的数据副本(通过per_cpu(var, cpu_id)),但这通常效率较低,并且需要额外的同步机制,违背了其设计的初衷。
  • 数据聚合开销:如果需要获取一个per-cpu变量的全局总和(例如,所有CPU上的数据包计数的总和),必须遍历所有可能的CPU核心,读取每个副本的值并相加。这个过程比读取一个单一的全局变量要慢。
  • 抢占上下文要求:访问per-cpu变量必须在禁用抢占的上下文中进行,否则可能导致数据不一致。

使用场景

在哪些具体的业务或技术场景下,它是首选解决方案?请举例说明。

当一个数据项被频繁地、独立地由每个CPU核心进行更新时,percpu唯一的、正确的、首选的解决方案。

  • 统计计数器:网络设备驱动中,每个CPU都需要统计自己处理的收发包数量和字节数。使用per-cpu计数器,每个CPU可以无锁地递增自己的计数器,性能极高。当需要读取总数时(例如,用户通过ifconfig查看),再遍历所有CPU的计数器求和。
  • Per-CPU数据结构:Linux调度器的运行队列(runqueue)是每个CPU私有的。当一个CPU需要选择下一个要运行的任务时,它只在自己的运行队列中查找,完全避免了与其他CPU的竞争。
  • 资源池/缓存:为了避免对全局内存池的锁争用,可以为每个CPU创建一个per-cpu的对象缓存。当一个CPU需要一个对象时,它首先尝试从自己的本地缓存中获取,失败时才去访问全局池。

是否有不推荐使用该技术的场景?为什么?

  • 真正的全局共享数据:如果一个数据项代表的是一个全局状态,需要被所有CPU看到一个统一的、一致的视图(例如,一个全局的只读配置、一个系统状态标志),那么不应该使用per-cpu变量。
  • 写操作稀疏的数据:如果一个共享变量的写操作非常少,那么使用自旋锁或原子操作的开销可以忽略不计。在这种情况下,使用per-cpu变量所带来的内存开销可能得不偿失。
  • 需要频繁跨CPU访问的数据:如果一个CPU经常需要读取或修改另一个CPU的数据副本,那么percpu的设计优势就不存在了,反而会因为复杂的访问逻辑和潜在的同步需求而使代码更复杂。

对比分析

请将其 与 其他相似技术 进行详细对比。

percpu变量的主要对比对象是其他处理并发访问共享数据的技术。

特性 Per-CPU变量 (percpu) 全局变量 + 自旋锁 (spinlock) 全局原子变量 (atomic_t) 简单Per-CPU数组 (无保护)
实现方式 为每个CPU创建独立的、缓存行对齐的数据副本。 单一的全局数据,通过锁机制保证互斥访问。 单一的全局数据,通过CPU的原子指令(如lock add)进行修改。 var[smp_processor_id()]
锁机制 无锁 有锁,存在锁争用和阻塞。 无锁,但硬件层面仍有总线锁或缓存锁。 无锁
缓存性能 极佳。无缓存行弹跳。 。持有锁的CPU会使其他CPU的缓存副本失效,导致缓存行弹跳。 较差。原子操作同样会导致缓存行在多核间弹跳。 极差。极易发生伪共享(False Sharing)。
扩展性 极佳。性能与CPU核心数无关。 。性能随CPU核心数增加而急剧下降。 较差。在高争用下,性能也会随核心数增加而下降。 。伪共享问题随核心数增加而恶化。
内存开销 较高size * NR_CPUS + 填充。 。单一实例。 。单一实例。 中等size * NR_CPUS,但无填充。
聚合成本 。需要遍历所有CPU求和。 。获取锁后直接读取。 。直接读取。 。需要遍历所有CPU求和。
适用场景 各CPU频繁、独立地进行写操作。如计数器、本地队列。 写操作不频繁,或临界区非常短。 简单的整数计数或标志位,争用不极端。 不应使用。是典型的反模式。

mm/percpu-internal.h

pcpu_chunk_map_bits 计算 chunk 映射位数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* pcpu_nr_pages_to_map_bits - 将页面转换为位图的大小
* @pages:物理页数
*
* 此转换是从物理页到位图中所需的位数。
*/
static inline int pcpu_nr_pages_to_map_bits(int pages)
{
// 8 * 4096 / (1 << 2 ) = 8192
return pages * PAGE_SIZE / PCPU_MIN_ALLOC_SIZE;
}

/**
* pcpu_chunk_map_bits - 将 nr_pages 转换为位图大小的助手
* @chunk:感兴趣的块
*
* 此转换是从块提供的物理页数到位图中的位数。
*/
static inline int pcpu_chunk_map_bits(struct pcpu_chunk *chunk)
{
return pcpu_nr_pages_to_map_bits(chunk->nr_pages);
}

pcpu_chunk_nr_blocks

1
2
3
4
5
6
7
8
9
10
11
/**
* pcpu_chunk_nr_blocks - 将 nr_pages 转换为 # of md_blocks
* @chunk:感兴趣的块
*
* 此转换是从块提供的物理页面数到使用的位图块数。
*/
static inline int pcpu_chunk_nr_blocks(struct pcpu_chunk *chunk)
{
// 8 * PAGE_SIZE / PAGE_SIZE = 8
return chunk->nr_pages * PAGE_SIZE / PCPU_BITMAP_BLOCK_SIZE;
}

include/asm-generic/percpu.h

this_cpu_generic_to_op

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#define per_cpu_ptr(ptr, cpu)						\
({ \
(void)(cpu); \
__verify_pcpu_ptr(ptr); \
PERCPU_PTR(ptr); \
})

#define raw_cpu_ptr(ptr) per_cpu_ptr(ptr, 0)

#define raw_cpu_generic_to_op(pcp, val, op) \
do { \
*raw_cpu_ptr(&(pcp)) op val; \
} while (0)

#define this_cpu_generic_to_op(pcp, val, op) \
do { \
unsigned long __flags; \
raw_local_irq_save(__flags); \
raw_cpu_generic_to_op(pcp, val, op); \
raw_local_irq_restore(__flags); \
} while (0)

this_cpu_write

1
#define this_cpu_write_1(pcp, val)	this_cpu_generic_to_op(pcp, val, =)

include/linux/percpu.h

alloc_percpu 分配每 CPU 的内存

1
2
3
4
5
6
#define __alloc_percpu(_size, _align)					\
alloc_hooks(pcpu_alloc_noprof(_size, _align, false, GFP_KERNEL))

#define alloc_percpu(type) \
(typeof(type) __percpu *)__alloc_percpu(sizeof(type), \
__alignof__(type))

include/linux/percpu-defs.h

preempt_count_add preempt_count_sub 增加和减少抢占计数

1
2
3
4
5
6
7
8
9
10
#if defined(CONFIG_DEBUG_PREEMPT) || defined(CONFIG_TRACE_PREEMPT_TOGGLE)
extern void preempt_count_add(int val);
extern void preempt_count_sub(int val);
#define preempt_count_dec_and_test() \
({ preempt_count_sub(1); should_resched(0); })
#else
#define preempt_count_add(val) __preempt_count_add(val)
#define preempt_count_sub(val) __preempt_count_sub(val)
#define preempt_count_dec_and_test() __preempt_count_dec_and_test()
#endif

__preempt_count_inc __preempt_count_dec 增加和减少抢占计数

1
2
#define __preempt_count_inc() __preempt_count_add(1)
#define __preempt_count_dec() __preempt_count_sub(1)

get_cpu_ptr 获取当前CPU指针

1
2
3
4
5
#define get_cpu_ptr(var)						\
({ \
preempt_disable(); \
this_cpu_ptr(var); \
})

__PCPU_ATTRS

1
2
3
4
5
6
7
8
9
10
11
12
/*
* 每个 CPU 变量声明和定义的基本实现,其中要放置变量的部分由 'sec' 参数提供。 这可用于影响控制变量存储的参数。
*
* 注意! DECLARE 和 DEFINE 的节必须匹配,否则由于编译器生成了访问该节的错误代码而发生链接错误。
*/
#define __PCPU_ATTRS(sec) \
__percpu __attribute__((section(PER_CPU_BASE_SECTION sec))) \
PER_CPU_ATTRIBUTES

#define __PCPU_DUMMY_ATTRS \
__section(".discard") __attribute__((unused))

DEFINE_PER_CPU DECLARE_PER_CPU 定义每个 CPU 变量 声明每个 CPU 变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/*
* s390 和 alpha 模块要求将 perCPU 变量定义为 weak,以强制编译器为它们生成基于 GOT 的外部引用。 这是必要的,因为 percpu 部分将位于通常可寻址区域之外。
*
* 此定义在定义 percpu 变量时施加了以下两个额外限制。
*
* 1.该元件必须是全局唯一的,即使是静态的也是如此。
* 2.静态 percpu 变量不能在函数内部定义。
*
* 需要弱 percpu 定义的 Arch 应在必要时在 asm/percpu.h 中定义 ARCH_NEEDS_WEAK_PER_CPU。
*
* 为确保通用代码遵守上述两个限制,如果设置了 CONFIG_DEBUG_FORCE_WEAK_PER_CPU,则所有情况都使用弱定义。
*/
#if defined(ARCH_NEEDS_WEAK_PER_CPU) || defined(CONFIG_DEBUG_FORCE_WEAK_PER_CPU)
#else
#define DECLARE_PER_CPU_SECTION(type, name, sec) \
extern __PCPU_ATTRS(sec) __typeof__(type) name

#define DEFINE_PER_CPU_SECTION(type, name, sec) \
__PCPU_ATTRS(sec) __typeof__(type) name
#endif

/*
* 用于普通 CPU 变量的 per-CPU 变量声明/定义主题的变体。
*/
#define DECLARE_PER_CPU(type, name) \
DECLARE_PER_CPU_SECTION(type, name, "")

#define DEFINE_PER_CPU(type, name) \
DEFINE_PER_CPU_SECTION(type, name, "")

__verify_pcpu_ptr 验证@ptr是否为 percpu 指针

1
2
3
4
5
6
7
8
9
10
11
12
/*
* __verify_pcpu_ptr() 验证@ptr是否为 percpu 指针而不计算@ptr并且在所有访问器和作访问 percpu 区域之前调用一次。
* 这是在 percpu 的泛型部分执行的,arch 覆盖不需要担心它;
* 但是,如果 Arch 想要实现特定于 Arch 的 perCPU 访问器或作,则可以使用 __verify_pcpu_ptr() 来验证参数。
*
* 需要 0 才能将指针类型从潜在的数组类型转换为指向数组中单个项目的指针。
*/
#define __verify_pcpu_ptr(ptr) \
do { \
const void __percpu *__vpp_verify = (typeof((ptr) + 0))NULL; \
(void)__vpp_verify; \
} while (0)

__pcpu_size_call

1
2
3
4
5
6
7
8
9
10
11
12
#define __pcpu_size_call(stem, variable, ...)				\
do { \
__verify_pcpu_ptr(&(variable)); \
switch(sizeof(variable)) { \
case 1: stem##1(variable, __VA_ARGS__);break; \
case 2: stem##2(variable, __VA_ARGS__);break; \
case 4: stem##4(variable, __VA_ARGS__);break; \
case 8: stem##8(variable, __VA_ARGS__);break; \
default: \
__bad_size_call_parameter();break; \
} \
} while (0)

this_cpu_write 写入每个 CPU 变量

1
2
3
4
/*
* 具有隐含抢占/中断保护的作。 这些操作无需担心抢占或中断即可使用。
*/
#define this_cpu_write(pcp, val) __pcpu_size_call(this_cpu_write_, pcp, val)

PERCPU_PTR

1
2
3
#define PERCPU_PTR(__p)							\
(TYPEOF_UNQUAL(*(__p)) __force __kernel *)((__force unsigned long)(__p))

include/linux/percpu_counter.h

percpu_counter_init 初始化每 CPU 计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static inline int percpu_counter_init_many(struct percpu_counter *fbc,
s64 amount, gfp_t gfp,
u32 nr_counters)
{
u32 i;

for (i = 0; i < nr_counters; i++)
fbc[i].count = amount;

return 0;
}

static inline int percpu_counter_init(struct percpu_counter *fbc, s64 amount,
gfp_t gfp)
{
return percpu_counter_init_many(fbc, amount, gfp, 1);
}

mm/percpu-km.c percpu 的 kmalloc 风格分配器 一种用于大型或特殊对齐每 CPU 数据的后备分配器

历史与背景

这项技术是为了解决什么特定问题而诞生的?

这项技术以及 mm/percpu-km.c 文件的存在,是为了解决主 Per-CPU 分配器(在 mm/percpu.c 中实现)的内在局限性。主分配器(chunk allocator)为了极致的性能和内存密度,做了一些设计上的取舍,这使得它不适合所有场景。percpu-km.c 提供了一种**备用或回退(fallback)**的分配策略,专门用于处理主分配器不擅长的情况:

  • 大型Per-CPU分配:主 Per-CPU 分配器基于“块(Chunk)”来管理内存,它将多个 Per-CPU 变量紧凑地排列在为每个CPU划分的“单元(Unit)”中。这种设计对于大量的小型、中型变量非常高效。但是,如果要为一个非常大的数据结构(例如,一个几KB甚至更大的缓冲区)分配Per-CPU实例,将其放入主分配器的块中会非常低效,甚至可能因为超过单元大小而无法分配。这会造成巨大的内部碎片,并破坏主分配器的紧凑布局。
  • 特殊的内存对齐要求:主分配器有其自己的内部对齐保证(通常是缓存行对齐),但它无法满足任意的、更严格的对齐要求。例如,某些DMA引擎可能要求其缓冲区必须是页对齐(PAGE_ALIGNED)的。主分配器无法提供这种保证。

percpu-km.c 实现的分配器,通过一种更简单、更直接的方式解决了这两个问题,它为每个CPU的副本进行独立的内存分配。

它的发展经历了哪些重要的里程碑或版本迭代?

percpu-km.c 的发展与主 Per-CPU 框架的演进紧密相连,它不是一个独立演进的功能,而是作为整个Per-CPU基础设施的补充而存在的。

  • 其概念的出现源于对主分配器局限性的认识。当内核开发者需要在驱动中创建大型的、需要特殊对齐的Per-CPU数据结构时,就需要一种不同于标准alloc_percpu()的机制。
  • 最初,这种备用分配可能是通过一些临时的宏或函数实现的。后来,为了统一和规范化,内核将这种“为每个CPU单独调用kmalloc”的模式抽象出来,形成了mm/percpu-km.c中的实现。
  • 最重要的里程碑是 pcpu_alloc() API 的引入。这个API充当了一个前端或调度器:它会检查请求的分配大小。如果大小小于一个阈值(PCPU_MIN_UNIT_SIZE),它就调用mm/percpu.c中的高性能块分配器;如果大于该阈值,它就自动调用mm/percpu-km.c中的备用分配器。这使得整个过程对大多数内核开发者来说是透明的。

目前该技术的社区活跃度和主流应用情况如何?

percpu-km.c是Per-CPU基础设施中一个稳定且必要的组成部分。

  • 社区活跃度:代码非常稳定,几乎没有大的变动。相关的修改通常是随着底层内存分配器(如slab, page allocator)的API变化而进行的适应性调整。
  • 主流应用:它虽然是“备用”方案,但在很多关键场景下被间接使用。任何时候内核代码需要一个尺寸较大的动态Per-CPU内存区域时,pcpu_alloc()就会自动路由到percpu-km.c的实现。例如,某些性能分析工具、复杂的网络驱动或存储驱动中需要的大型Per-CPU缓冲区,都可能使用此机制。

核心原理与设计

它的核心工作原理是什么?

percpu-km.c 的工作原理与主分配器截然不同,它更加简单直接:

  1. 独立分配:当percpu-km.c中的分配函数被调用时,它不会去操作复杂的“块”和“单元”。相反,它会进入一个循环,为系统中的每一个CPU核心,单独调用底层的内存分配器(如 kmalloc__get_free_pages)来分配一块独立的内存。
  2. 指针数组管理:它会额外分配一个小的数组,这个数组的大小与CPU核心数相同。在循环中,每次为一个CPU成功分配了内存块后,就将这块内存的地址存入这个指针数组中对应的CPU索引位置。
  3. 地址翻译:当需要访问特定CPU的副本时(例如,通过 per_cpu_ptr(ptr, cpu)),访问逻辑会首先获取这个指针数组的基地址,然后根据传入的cpu ID,从数组中取出对应CPU的独立内存块的地址并返回。
  4. 释放:释放时,过程相反。它会遍历指针数组,对其中的每一个指针调用 kfreefree_pages,最后再释放这个指针数组本身。

它的主要优势体现在哪些方面?

  • 大小和对齐的灵活性:可以分配任意大小的Per-CPU数据(只要底层kmalloc能满足),并且可以轻易地支持任意的对齐要求(通过调用kmalloc的对齐变体或直接从页分配器获取对齐的页)。
  • 隔离性:将大型分配与主Per-CPU分配器的精密数据布局分离开来,避免了对主分配器造成碎片化污染。
  • 简单性:其实现逻辑比主分配器的块管理算法要简单得多。

它存在哪些已知的劣势、局限性或在特定场景下的不适用性?

  • 访问性能较低:这是其最主要的缺点。与主分配器可以通过单条指令(利用段寄存器)访问Per-CPU数据不同,通过percpu-km.c分配的内存,其访问过程需要“获取CPU ID -> 访问指针数组 -> 间接寻址到最终内存”这几个步骤,速度明显慢于主分配器。
  • 内存开销:由于为每个CPU进行单独分配,可能会产生更多的内存碎片(取决于底层分配器的效率)。此外,还需要额外的空间来存储那个管理所有副本指针的数组。
  • 缓存不友好:虽然每个CPU的副本是独立的,但管理这些副本的指针数组本身是可能被所有CPU共享访问的,可能会带来一些微小的缓存一致性开销。

使用场景

在哪些具体的业务或技术场景下,它是首选解决方案?请举例说明。

percpu-km.c中的机制是作为pcpu_alloc()的后端,在以下场景中被自动选择为首选解决方案:

  • 大型Per-CPU数据结构:一个驱动程序需要为每个CPU维护一个大的I/O环形缓冲区(ring buffer),大小可能是几个内存页。在这种情况下,调用pcpu_alloc(PAGE_SIZE * 4, ...)时,内核会检测到请求的大小超过了阈值,并自动使用percpu-km.c的逻辑,为每个CPU分配独立的4个页。
  • 需要特定对齐的Per-CPU数据:一个存储驱动需要为每个CPU准备一个用于DMA的元数据区域,并且硬件要求这个区域必须对齐到1024字节。通过调用pcpu_alloc()并传入相应的对齐参数,最终会由percpu-km.c的逻辑来满足这个特殊的对齐要求。

是否有不推荐使用该技术的场景?为什么?

绝对不应该将此机制用于小型、性能极其敏感的Per-CPU变量,例如统计计数器或调度器中的小标志。

  • 原因:对于这类变量,访问速度是第一位的。主Per-CPU分配器(mm/percpu.c)提供的单指令快速访问是专门为此优化的。如果强行使用percpu-km.c的模式来分配一个4字节的计数器,不仅会浪费大量内存(每个CPU都会分配一个独立的、至少为SLAB最小尺寸的内存块),而且每次访问计数器都会有显著的性能惩罚,这完全违背了使用Per-CPU变量的初衷。

对比分析

请将其 与 其他相似技术 进行详细对比。

percpu-km.c所实现的分配器,其唯一的、也是最重要的对比对象,就是mm/percpu.c中实现的主Per-CPU块分配器。

特性 主分配器 (mm/percpu.c) 后备分配器 (mm/percpu-km.c)
别名 Chunk Allocator, First-chunk Allocator Page-based Allocator, kmalloc-based Allocator
核心机制 将多个变量打包进预先分配的大块内存(Chunks),每个Chunk为所有CPU划分了单元(Units)。 为每个CPU的副本单独调用 kmalloc 或页分配器进行分配,并用一个指针数组来管理。
访问性能 极高。通常可通过一条指令(利用段寄存器如%gs)直接访问,无额外开销。 较低。访问需要通过指针数组进行间接寻址,涉及多次内存读取,速度较慢。
适用大小 优化用于小型到中型的变量(小于 PCPU_MIN_UNIT_SIZE)。 设计用于大型变量(大于 PCPU_MIN_UNIT_SIZE)。
对齐支持 提供基本的缓存行对齐,但不支持任意的、严格的对齐要求。 灵活。可以支持由底层分配器提供的任意对齐要求(如页对齐)。
内存效率 对于大量小变量,内存密度非常高,碎片少。 对于大变量,避免了污染主分配器。但自身可能因多次独立分配而产生碎片。
主要API入口 alloc_percpu() (动态), DEFINE_PER_CPU (静态) pcpu_alloc() 在检测到大尺寸请求时自动调用

pcpu_mem_zalloc pcpu_mem_free

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* pcpu_mem_zalloc - 分配内存
* @size:要分配的字节数
* @gfp:分配标志
*
* 分配 @size 字节。 如果 @size 小于 PAGE_SIZE,则使用 kzalloc() ;否则,使用 vzalloc() 的等效项。这是为了方便传递列入白名单的标志。 返回的内存始终归零。
*
*返回:
* 成功时指向已分配区域的指针,失败时指向 NULL。
*/
static void *pcpu_mem_zalloc(size_t size, gfp_t gfp)
{
if (WARN_ON_ONCE(!slab_is_available()))
return NULL;

if (size <= PAGE_SIZE) //分配小于 PAGE_SIZE 的内存时使用 kzalloc
return kzalloc(size, gfp);
else //__vmalloc 通常用于分配较大的内存块
return __vmalloc(size, gfp | __GFP_ZERO);
}

/**
* pcpu_mem_free - free memory
* @ptr: memory to free
*
* Free @ptr. @ptr should have been allocated using pcpu_mem_zalloc().
*/
static void pcpu_mem_free(void *ptr)
{
kvfree(ptr);
}

pcpu_alloc_chunk 分配 chunk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
static struct pcpu_chunk *pcpu_alloc_chunk(gfp_t gfp)
{
struct pcpu_chunk *chunk;
int region_bits;

chunk = pcpu_mem_zalloc(pcpu_chunk_struct_size, gfp);
if (!chunk)
return NULL;

INIT_LIST_HEAD(&chunk->list);
chunk->nr_pages = pcpu_unit_pages;
region_bits = pcpu_chunk_map_bits(chunk);

chunk->alloc_map = pcpu_mem_zalloc(BITS_TO_LONGS(region_bits) *
sizeof(chunk->alloc_map[0]), gfp);
if (!chunk->alloc_map)
goto alloc_map_fail;

chunk->bound_map = pcpu_mem_zalloc(BITS_TO_LONGS(region_bits + 1) *
sizeof(chunk->bound_map[0]), gfp);
if (!chunk->bound_map)
goto bound_map_fail;

chunk->md_blocks = pcpu_mem_zalloc(pcpu_chunk_nr_blocks(chunk) *
sizeof(chunk->md_blocks[0]), gfp);
if (!chunk->md_blocks)
goto md_blocks_fail;

#ifdef NEED_PCPUOBJ_EXT
if (need_pcpuobj_ext()) {
chunk->obj_exts =
pcpu_mem_zalloc(pcpu_chunk_map_bits(chunk) *
sizeof(struct pcpuobj_ext), gfp);
if (!chunk->obj_exts)
goto objcg_fail;
}
#endif

pcpu_init_md_blocks(chunk);

/* init metadata */
chunk->free_bytes = chunk->nr_pages * PAGE_SIZE;

return chunk;

#ifdef NEED_PCPUOBJ_EXT
objcg_fail:
pcpu_mem_free(chunk->md_blocks);
#endif
md_blocks_fail:
pcpu_mem_free(chunk->bound_map);
bound_map_fail:
pcpu_mem_free(chunk->alloc_map);
alloc_map_fail:
pcpu_mem_free(chunk);

return NULL;
}

pcpu_create_chunk 创建 chunk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static struct pcpu_chunk *pcpu_create_chunk(gfp_t gfp)
{
//计算页数
const int nr_pages = pcpu_group_sizes[0] >> PAGE_SHIFT;
struct pcpu_chunk *chunk;
struct page *pages;
unsigned long flags;
int i;

chunk = pcpu_alloc_chunk(gfp);
if (!chunk)
return NULL;
//分配内存页
pages = alloc_pages(gfp, order_base_2(nr_pages)); //计算分配所需的页阶(order),确保分配的内存是 2 的幂次对齐
if (!pages) {
pcpu_free_chunk(chunk);
return NULL;
}
//初始化页与块的关联
for (i = 0; i < nr_pages; i++)
pcpu_set_page_chunk(nth_page(pages, i), chunk); //page->private = (unsigned long)pcpu;
//初始化块的元数据
chunk->data = pages;
chunk->base_addr = page_address(pages);

spin_lock_irqsave(&pcpu_lock, flags);
pcpu_chunk_populated(chunk, 0, nr_pages); //标记块为已填充
spin_unlock_irqrestore(&pcpu_lock, flags);
//更新统计信息并记录跟踪
pcpu_stats_chunk_alloc();
trace_percpu_create_chunk(chunk->base_addr);

return chunk;
}

mm/percpu.c

pcpu_alloc_alloc_info 分配 perCPU 分配信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* pcpu_alloc_alloc_info - 分配 perCPU 分配信息
* @nr_groups:组数
* @nr_units:单位数
*
* 分配足够大的 ai,用于包含 @nr_units 个单元的 @nr_groups 组。
* 返回的 ai's groups[0].cpu_map 指向 cpu_map 数组,该数组足够长,可以@nr_units并填充NR_CPUS。
* 调用方负责初始化其他组cpu_map指针。
*
* 返回:
* 成功时指向已分配pcpu_alloc_info的指针,失败时指向 NULL。
*/
struct pcpu_alloc_info * __init pcpu_alloc_alloc_info(int nr_groups,
int nr_units)
{
struct pcpu_alloc_info *ai;
size_t base_size, ai_size;
void *ptr;
int unit;

base_size = ALIGN(
struct_size(ai, groups, nr_groups), //根据nr_groups计算结构体最后的灵活数组的大小
__alignof__(ai->groups[0].cpu_map[0]));
ai_size = base_size + nr_units * sizeof(ai->groups[0].cpu_map[0]);

ptr = memblock_alloc(PFN_ALIGN(ai_size), PAGE_SIZE);
if (!ptr)
return NULL;
ai = ptr;
ptr += base_size;

ai->groups[0].cpu_map = ptr;

for (unit = 0; unit < nr_units; unit++)
ai->groups[0].cpu_map[unit] = NR_CPUS;

ai->nr_groups = nr_groups;
ai->__ai_size = PFN_ALIGN(ai_size);

return ai;
}

pcpu_init_md_blocks 初始化元数据块结构体块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*
* PCPU_BITMAP_BLOCK_SIZE 的大小必须与 PAGE_SIZE 的大小相同,
* 因为 hint 的更新用于管理 chunk 和全局nr_empty_pop_pages。
*/
#define PCPU_BITMAP_BLOCK_SIZE PAGE_SIZE
// 4096 >> 2 = 1024
#define PCPU_BITMAP_BLOCK_BITS (PCPU_BITMAP_BLOCK_SIZE >> PCPU_MIN_ALLOC_SHIFT)

static void pcpu_init_md_block(struct pcpu_block_md *block, int nr_bits)
{
block->scan_hint = 0;
block->contig_hint = nr_bits;
block->left_free = nr_bits;
block->right_free = nr_bits;
block->first_free = 0;
block->nr_bits = nr_bits;
}

static void pcpu_init_md_blocks(struct pcpu_chunk *chunk)
{
struct pcpu_block_md *md_block;

/* init the chunk's block 8192*/
pcpu_init_md_block(&chunk->chunk_md, pcpu_chunk_map_bits(chunk));
//有8个块,执行每个的初始化
for (md_block = chunk->md_blocks;
md_block != chunk->md_blocks + pcpu_chunk_nr_blocks(chunk);
md_block++)
pcpu_init_md_block(md_block, PCPU_BITMAP_BLOCK_BITS);
}

pcpu_alloc_first_chunk 分配第一个 chunk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
* pcpu_alloc_first_chunk - 创建为第一个数据块提供服务的数据块
* @tmp_addr:所服务区域的开始时间
* @map_size:所服务区域的大小
*
* 这负责创建为第一个 chunk 提供服务的 chunk。
* base_addr是页面向下对齐@tmp_addr而区域末端是页面向上对齐。
* 跟踪偏移量以确定服务的区域。所有这些都是为了安抚位图分配器以避免部分块。
*
* 返回:
* 以 @map_size @tmp_addr 为区域提供服务的区块。
*/
static struct pcpu_chunk * __init pcpu_alloc_first_chunk(unsigned long tmp_addr,
int map_size)
{
struct pcpu_chunk *chunk;
unsigned long aligned_addr;
int start_offset, offset_bits, region_size, region_bits;
size_t alloc_size;

/*区域计算*/
aligned_addr = tmp_addr & PAGE_MASK;

start_offset = tmp_addr - aligned_addr;
region_size = ALIGN(start_offset + map_size, PAGE_SIZE);

/* allocate chunk */
//PAGE_SHIFT=12 BITS_TO_LONGS(region_size >> PAGE_SHIFT) =1 alloc_size= 88 + 4
alloc_size = struct_size(chunk, populated, //92
BITS_TO_LONGS(region_size >> PAGE_SHIFT));
chunk = memblock_alloc_or_panic(alloc_size, SMP_CACHE_BYTES);

INIT_LIST_HEAD(&chunk->list);

chunk->base_addr = (void *)aligned_addr;
chunk->start_offset = start_offset;
chunk->end_offset = region_size - chunk->start_offset - map_size;

chunk->nr_pages = region_size >> PAGE_SHIFT; //32768 >> 12 = 8
region_bits = pcpu_chunk_map_bits(chunk); //8192

//对于 alloc_map,只需要 region_bits 位即可,因为它只需要标记每个分配单元的使用情况。
alloc_size = BITS_TO_LONGS(region_bits) * sizeof(chunk->alloc_map[0]); //1024
chunk->alloc_map = memblock_alloc_or_panic(alloc_size, SMP_CACHE_BYTES);

//对于 bound_map,需要额外的 1 位来标记最后一个分配区域的结束位置,因此需要 region_bits + 1。
alloc_size =
BITS_TO_LONGS(region_bits + 1) * sizeof(chunk->bound_map[0]); //1028
chunk->bound_map = memblock_alloc_or_panic(alloc_size, SMP_CACHE_BYTES);

// = 8 * 32 = 256
alloc_size = pcpu_chunk_nr_blocks(chunk) * sizeof(chunk->md_blocks[0]); //256
chunk->md_blocks = memblock_alloc_or_panic(alloc_size, SMP_CACHE_BYTES);
#ifdef NEED_PCPUOBJ_EXT
/*第一个块可以免费使用 */
chunk->obj_exts = NULL;
#endif
pcpu_init_md_blocks(chunk);

/* 管理填充的页面位图*/
chunk->immutable = true;
bitmap_fill(chunk->populated, chunk->nr_pages);
chunk->nr_populated = chunk->nr_pages; //8
chunk->nr_empty_pop_pages = chunk->nr_pages; //8

chunk->free_bytes = map_size; //32768

if (chunk->start_offset) {
/* 隐藏位图的开头*/
offset_bits = chunk->start_offset / PCPU_MIN_ALLOC_SIZE;
bitmap_set(chunk->alloc_map, 0, offset_bits);
set_bit(0, chunk->bound_map);
set_bit(offset_bits, chunk->bound_map);

chunk->chunk_md.first_free = offset_bits;

pcpu_block_update_hint_alloc(chunk, 0, offset_bits);
}

if (chunk->end_offset) {
/* 隐藏位图的结尾 */
offset_bits = chunk->end_offset / PCPU_MIN_ALLOC_SIZE;
bitmap_set(chunk->alloc_map,
pcpu_chunk_map_bits(chunk) - offset_bits,
offset_bits);
set_bit((start_offset + map_size) / PCPU_MIN_ALLOC_SIZE,
chunk->bound_map);
set_bit(region_bits, chunk->bound_map);

pcpu_block_update_hint_alloc(chunk, pcpu_chunk_map_bits(chunk)
- offset_bits, offset_bits);
}

return chunk;
}

pcpu_chunk_move

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static void __pcpu_chunk_move(struct pcpu_chunk *chunk, int slot,
bool move_front)
{
if (chunk != pcpu_reserved_chunk) {
if (move_front)
list_move(&chunk->list, &pcpu_chunk_lists[slot]);
else
list_move_tail(&chunk->list, &pcpu_chunk_lists[slot]);
}
}

static void pcpu_chunk_move(struct pcpu_chunk *chunk, int slot)
{
__pcpu_chunk_move(chunk, slot, true);
}

pcpu_chunk_relocate 重新定位 chunk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static int pcpu_size_to_slot(int size)
{
// == 32768
if (size == pcpu_unit_size)
return pcpu_free_slot; //15
return __pcpu_size_to_slot(size);
}

static int pcpu_chunk_slot(const struct pcpu_chunk *chunk)
{
const struct pcpu_block_md *chunk_md = &chunk->chunk_md;

if (chunk->free_bytes < PCPU_MIN_ALLOC_SIZE ||
chunk_md->contig_hint == 0)
return 0;
//8192 * 4
return pcpu_size_to_slot(chunk_md->contig_hint * PCPU_MIN_ALLOC_SIZE);
}

/**
* pcpu_chunk_relocate - 将 chunk 放入适当的 chunk 插槽中
* @chunk:感兴趣的块
* @oslot:上一个插槽
*
* 此函数在分配或自由更改@chunk后调用。根据更改的状态确定新插槽,并将@chunk移动到插槽中。 请注意,reserved chunk 永远不会放在 chunk 插槽上。
*
*上下文:
* pcpu_lock。
*/
static void pcpu_chunk_relocate(struct pcpu_chunk *chunk, int oslot)
{
int nslot = pcpu_chunk_slot(chunk); //15

/*将隔离的块保留在原地 */
if (chunk->isolated)
return;

if (oslot != nslot)
__pcpu_chunk_move(chunk, nslot, oslot < nslot);
}

pcpu_setup_first_chunk 设置第一个 chunk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
/**
* pcpu_setup_first_chunk - 初始化第一个 percpu 块
* @ai:pcpu_alloc_info描述如何调整 CPU 区域
* @base_addr:映射地址
*
* 初始化第一个 percpu 块,其中包含内核静态 percpu 区域。 该函数从 arch percpu area 设置路径调用。
*
* @ai 包含初始化第一个 chunk 和 prime 动态 percpu 分配器所需的所有信息。
*
* @ai->static_size 是静态 percpu 区域的大小。
*
* @ai->reserved_size 如果为非零,则指定在第一个块的静态区域之后要保留的字节数。 这将保留第一个块,以便它只能通过保留的 percpu 分配使用。 这主要用于在寻址模型对符号重定位的偏移范围有限的体系结构上提供模块 percpu 静态区域,以保证模块 percpu 符号位于可重定位范围内。
*
* @ai->dyn_size 确定第一个块中可用于动态分配的字节数。 @ai->static_size @ai->reserved_size @ai->dyn_size 和 @ai->unit_size 之间的区域未使用。
*
* @ai->unit_size 指定商品大小,并且必须与 PAGE_SIZE 对齐,且等于或大于 @ai->static_size @ai->reserved_size @ai->dyn_size。
*
* @ai->atom_size 是分配原子大小,用作 VM 区域的对齐方式。
*
* @ai->alloc_size 是分配大小,始终是 的倍数
* @ai->atom_size。 >atom_size@ai如果
* @ai->unit_size大于 @ai->atom_size。
*
* @ai->nr_groups 和 @ai->groups 描述了 percpu 区域的虚拟内存布局。 应归置的单元将归入同一组。 动态 VM 区域将根据这些分组进行分配。 如果 @ai->nr_groups 为零,则假定包含所有单位的单个组。
*
* 调用方应已映射 @base_addr 的第一个块,并将静态数据复制到每个单元。
*
* 第一个数据块将始终包含一个静态区域和一个动态区域。但是,静态区域不受任何块管理。 如果第一个数据块还包含预留区域,则它由两个数据块提供服务 - 一个用于预留区域,一个用于动态区域。 它们共享同一个 VM,但在区域分配映射中使用偏移区域。为动态区域提供服务的 chunk 在 chunk 插槽中循环,并且可以像任何其他 chunk 一样进行动态分配。
*/
void __init pcpu_setup_first_chunk(const struct pcpu_alloc_info *ai,
void *base_addr)
{
size_t size_sum = ai->static_size + ai->reserved_size + ai->dyn_size;
size_t static_size, dyn_size;
unsigned long *group_offsets;
size_t *group_sizes;
unsigned long *unit_off;
unsigned int cpu;
int *unit_map;
int group, unit, i;
unsigned long tmp_addr;
size_t alloc_size;

#define PCPU_SETUP_BUG_ON(cond) do { \
if (unlikely(cond)) { \
pr_emerg("failed to initialize, %s\n", #cond); \
pr_emerg("cpu_possible_mask=%*pb\n", \
cpumask_pr_args(cpu_possible_mask)); \
pcpu_dump_alloc_info(KERN_EMERG, ai); \
BUG(); \
} \
} while (0)

/* sanity checks */
PCPU_SETUP_BUG_ON(ai->nr_groups <= 0);
#ifdef CONFIG_SMP
PCPU_SETUP_BUG_ON(!ai->static_size);
PCPU_SETUP_BUG_ON(offset_in_page(__per_cpu_start));
#endif
PCPU_SETUP_BUG_ON(!base_addr);
PCPU_SETUP_BUG_ON(offset_in_page(base_addr));
PCPU_SETUP_BUG_ON(ai->unit_size < size_sum);
PCPU_SETUP_BUG_ON(offset_in_page(ai->unit_size));
PCPU_SETUP_BUG_ON(ai->unit_size < PCPU_MIN_UNIT_SIZE);
PCPU_SETUP_BUG_ON(!IS_ALIGNED(ai->unit_size, PCPU_BITMAP_BLOCK_SIZE));
PCPU_SETUP_BUG_ON(ai->dyn_size < PERCPU_DYNAMIC_EARLY_SIZE);
PCPU_SETUP_BUG_ON(!IS_ALIGNED(ai->reserved_size, PCPU_MIN_ALLOC_SIZE));
PCPU_SETUP_BUG_ON(!(IS_ALIGNED(PCPU_BITMAP_BLOCK_SIZE, PAGE_SIZE) ||
IS_ALIGNED(PAGE_SIZE, PCPU_BITMAP_BLOCK_SIZE)));
PCPU_SETUP_BUG_ON(pcpu_verify_alloc_info(ai) < 0);

/* 处理组信息并相应地构建配置表 */
alloc_size = ai->nr_groups * sizeof(group_offsets[0]);
group_offsets = memblock_alloc_or_panic(alloc_size, SMP_CACHE_BYTES);

alloc_size = ai->nr_groups * sizeof(group_sizes[0]);
group_sizes = memblock_alloc_or_panic(alloc_size, SMP_CACHE_BYTES);

alloc_size = nr_cpu_ids * sizeof(unit_map[0]);
unit_map = memblock_alloc_or_panic(alloc_size, SMP_CACHE_BYTES);

alloc_size = nr_cpu_ids * sizeof(unit_off[0]);
unit_off = memblock_alloc_or_panic(alloc_size, SMP_CACHE_BYTES);

for (cpu = 0; cpu < nr_cpu_ids; cpu++)
unit_map[cpu] = UINT_MAX;

pcpu_low_unit_cpu = NR_CPUS;
pcpu_high_unit_cpu = NR_CPUS;
//单核这段无效果
for (group = 0, unit = 0; group < ai->nr_groups; group++, unit += i) {
const struct pcpu_group_info *gi = &ai->groups[group];

group_offsets[group] = gi->base_offset;
group_sizes[group] = gi->nr_units * ai->unit_size;

for (i = 0; i < gi->nr_units; i++) {
cpu = gi->cpu_map[i];
if (cpu == NR_CPUS)
continue;

PCPU_SETUP_BUG_ON(cpu >= nr_cpu_ids);
PCPU_SETUP_BUG_ON(!cpu_possible(cpu));
PCPU_SETUP_BUG_ON(unit_map[cpu] != UINT_MAX);

unit_map[cpu] = unit + i;
unit_off[cpu] = gi->base_offset + i * ai->unit_size;

/* determine low/high unit_cpu */
if (pcpu_low_unit_cpu == NR_CPUS ||
unit_off[cpu] < unit_off[pcpu_low_unit_cpu])
pcpu_low_unit_cpu = cpu;
if (pcpu_high_unit_cpu == NR_CPUS ||
unit_off[cpu] > unit_off[pcpu_high_unit_cpu])
pcpu_high_unit_cpu = cpu;
}
}
pcpu_nr_units = unit;

for_each_possible_cpu(cpu)
PCPU_SETUP_BUG_ON(unit_map[cpu] == UINT_MAX);

/* 我们已经完成了对输入的解析,undefine BUG 宏和转储配置 */
#undef PCPU_SETUP_BUG_ON
/* pcpu-alloc: s0 r0 d32768 u32768 alloc=1*32768
pcpu-alloc: [0] 0 */
pcpu_dump_alloc_info(KERN_DEBUG, ai);

pcpu_nr_groups = ai->nr_groups;
pcpu_group_offsets = group_offsets;
pcpu_group_sizes = group_sizes;
pcpu_unit_map = unit_map;
pcpu_unit_offsets = unit_off;

/* 确定基本参数 */
pcpu_unit_pages = ai->unit_size >> PAGE_SHIFT; //8
pcpu_unit_size = pcpu_unit_pages << PAGE_SHIFT; //32768
pcpu_atom_size = ai->atom_size;
pcpu_chunk_struct_size = struct_size((struct pcpu_chunk *)0, populated,
BITS_TO_LONGS(pcpu_unit_pages));

pcpu_stats_save_ai(ai);

/*
* 分配区块槽。 活动插槽之后的插槽是:
* sidelined_slot - 隔离的、减少填充的块
* free_slot - 完全免费的块
* to_depopulate_slot - 隔离,要取消填充的块
*/
pcpu_sidelined_slot = __pcpu_size_to_slot(pcpu_unit_size) + 1; //14
pcpu_free_slot = pcpu_sidelined_slot + 1; //15
pcpu_to_depopulate_slot = pcpu_free_slot + 1; //16
pcpu_nr_slots = pcpu_to_depopulate_slot + 1; //17
pcpu_chunk_lists = memblock_alloc_or_panic(pcpu_nr_slots *
sizeof(pcpu_chunk_lists[0]),
SMP_CACHE_BYTES);

for (i = 0; i < pcpu_nr_slots; i++)
INIT_LIST_HEAD(&pcpu_chunk_lists[i]);

/*
* 静态区域的末尾需要与最小分配大小对齐,因为这会偏移保留区域和动态区域。
* 第一个 chunk 通过扩展动态区域结束页面对齐,因此可以缩小动态区域以进行补偿,同时仍保持在配置的大小之上。
*/
//ai->static_size = 0,对齐结果为0
static_size = ALIGN(ai->static_size, PCPU_MIN_ALLOC_SIZE);
dyn_size = ai->dyn_size - (static_size - ai->static_size);

/*
* 初始化第一个 chunk:
* 此块分为 3 个部分:
* <static | [reserved] | dynamic>
* - static - 没有后备块,因为这些分配永远无法释放。
* - reserved (pcpu_reserved_chunk) - 主要用于提供模块负载的分配。
* - dynamic (pcpu_first_chunk) - 提供第一个块的动态部分。
*/
tmp_addr = (unsigned long)base_addr + static_size;
if (ai->reserved_size)
pcpu_reserved_chunk = pcpu_alloc_first_chunk(tmp_addr,
ai->reserved_size);
tmp_addr = (unsigned long)base_addr + static_size + ai->reserved_size;
pcpu_first_chunk = pcpu_alloc_first_chunk(tmp_addr, dyn_size);

pcpu_nr_empty_pop_pages = pcpu_first_chunk->nr_empty_pop_pages; //8
pcpu_chunk_relocate(pcpu_first_chunk, -1);

/* 包括第一个数据块的所有区域*/
pcpu_nr_populated += PFN_DOWN(size_sum); //32768

pcpu_stats_chunk_alloc();
trace_percpu_create_chunk(base_addr);

/*我们完成了 */
pcpu_base_addr = base_addr;
}

释放的 perCPU 分配信息

1
2
3
4
5
6
7
8
9
10
/**
* pcpu_free_alloc_info - 释放的 perCPU 分配信息
* @ai:pcpu_alloc_info释放
*
* 由 pcpu_alloc_alloc_info() 分配的 Free @ai。
*/
void __init pcpu_free_alloc_info(struct pcpu_alloc_info *ai)
{
memblock_free(ai, ai->__ai_size);
}

setup_per_cpu_areas UP percpu 区域设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/*
* UP percpu 区域设置。
*
* UP 始终使用基于 km 的 percpu 分配器与身份映射。静态 percpu 变量与通常的静态变量没有区别,不需要任何特殊准备。
*/
void __init setup_per_cpu_areas(void)
{
//32768
const size_t unit_size =
roundup_pow_of_two(max_t(size_t, PCPU_MIN_UNIT_SIZE,
PERCPU_DYNAMIC_RESERVE));
struct pcpu_alloc_info *ai;
void *fc;

ai = pcpu_alloc_alloc_info(1, 1); //分配 perCPU 分配信息 ,单核只需要一个组和一个单位
fc = memblock_alloc_from(unit_size, PAGE_SIZE, __pa(MAX_DMA_ADDRESS));
if (!ai || !fc)
panic("Failed to allocate memory for percpu areas.");
/* kmemleak tracks the percpu allocations separately */
kmemleak_ignore_phys(__pa(fc));

ai->dyn_size = unit_size;
ai->unit_size = unit_size;
ai->atom_size = unit_size;
ai->alloc_size = unit_size;
ai->groups[0].nr_units = 1;
ai->groups[0].cpu_map[0] = 0;

pcpu_setup_first_chunk(ai, fc);
pcpu_free_alloc_info(ai);
}

pcpu_size_to_slot 计算 chunk 的大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//该函数将内存大小映射到一个离散的槽位编号,便于分配器根据槽位快速找到合适的内存块
static int __pcpu_size_to_slot(int size)
{
//返回 size 的最高有效位的位置(从 1 开始计数)
int highbit = fls(size); /* size is in bytes */
return max(highbit - PCPU_SLOT_BASE_SHIFT + 2, 1);
}

static int pcpu_size_to_slot(int size)
{
//pcpu_setup_first_chunk() pcpu_unit_size = pcpu_unit_pages << PAGE_SHIFT; //32768
if (size == pcpu_unit_size)
return pcpu_free_slot; //pcpu_free_slot = pcpu_sidelined_slot + 1; //15
return __pcpu_size_to_slot(size);
}

pcpu_check_block_hint 检查块提示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* pcpu_check_block_hint - 检查 contig 提示
* @block:兴趣块
* @bits:分配大小
* @align:区域对齐(最大PAGE_SIZE)
*
* 检查 allocation 是否适合块的 contig 提示。请注意,一个 chunk 使用与块相同的 hint,因此这也可以检查 chunk 的 contig hint。
*/
static bool pcpu_check_block_hint(struct pcpu_block_md *block, int bits,
size_t align)
{
int bit_off = ALIGN(block->contig_hint_start, align) -
block->contig_hint_start;

return bit_off + bits <= block->contig_hint;
}

pcpu_for_each_fit_region pcpu_for_each_md_free_region

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/**
* pcpu_next_md_free_region - 查找下一个无提示区域
* @chunk:感兴趣的块
* @bit_off:块偏移量
* @bits:自由区大小
*
* pcpu_for_each_md_free_region的辅助函数。 它检查 block->contig_hint 并跨块执行聚合以查找下一个提示。 它就地修改 bit_off 和 bits 以在循环中使用。
*/
static void pcpu_next_md_free_region(struct pcpu_chunk *chunk, int *bit_off,
int *bits)
{
int i = pcpu_off_to_block_index(*bit_off); //当前偏移量 bit_off 所在的元数据块索引
int block_off = pcpu_off_to_block_off(*bit_off); //当前偏移量在块内的偏移
struct pcpu_block_md *block;

*bits = 0;
//遍历从当前块索引 i 开始的所有元数据块,直到块的总数(pcpu_chunk_nr_blocks(chunk))
for (block = chunk->md_blocks + i; i < pcpu_chunk_nr_blocks(chunk);
block++, i++) {
/* 处理跨块的重叠区域 */
if (*bits) { //如果 *bits 不为 0,表示之前已经找到了一部分空闲区域
*bits += block->left_free; //将当前块的左侧空闲位数(block->left_free)累加到 *bits。
if (block->left_free == PCPU_BITMAP_BLOCK_BITS) //如果当前块完全空闲 继续处理下一个块。
continue;
return; //如果当前块不完全空闲,返回当前的空闲区域信息
}

/*
* 这将检查三件事。 首先是要检查的contig_hint。 其次,我们之前是否通过比较 block_off 来检查过这个提示。 第三,这与正确的重叠群提示相同吗? 在最后一种情况下,它会溢出到下一个块中,并且应该由跨块代码的 contig 区域处理。
*/
*bits = block->contig_hint;
if (*bits && block->contig_hint_start >= block_off &&
*bits + block->contig_hint_start < PCPU_BITMAP_BLOCK_BITS) {
*bit_off = pcpu_block_off_to_off(i,
block->contig_hint_start);
return;
}
/* 处理右侧空闲区域 */
block_off = 0;

*bits = block->right_free; //检查当前块的右侧空闲区域
*bit_off = (i + 1) * PCPU_BITMAP_BLOCK_BITS - block->right_free; //更新 *bit_off 为右侧空闲区域的起始位置
}
}

/**
* pcpu_next_fit_region - 为给定的分配请求查找合适的区域
* @chunk:感兴趣的块
* @alloc_bits:分配大小
* @align:区域对齐(最大PAGE_SIZE)
* @bit_off:块偏移量
* @bits:自由区大小
*
* 查找下一个可用于给定大小和对齐方式的空闲区域。 仅当存在用于此分配的有效区域时,才会返回此 URL。 如果分配请求适合该块,则返回 block->first_free 以查看是否可以在 contig 提示之前完成该请求。
*/
static void pcpu_next_fit_region(struct pcpu_chunk *chunk, int alloc_bits,
int align, int *bit_off, int *bits)
{
int i = pcpu_off_to_block_index(*bit_off);
int block_off = pcpu_off_to_block_off(*bit_off);
struct pcpu_block_md *block;

*bits = 0;
for (block = chunk->md_blocks + i; i < pcpu_chunk_nr_blocks(chunk);
block++, i++) {
/* handles contig area across blocks */
if (*bits) {
*bits += block->left_free;
if (*bits >= alloc_bits)
return;
if (block->left_free == PCPU_BITMAP_BLOCK_BITS)
continue;
}

/* check block->contig_hint */
*bits = ALIGN(block->contig_hint_start, align) -
block->contig_hint_start;
/*
* This uses the block offset to determine if this has been
* checked in the prior iteration.
*/
if (block->contig_hint &&
block->contig_hint_start >= block_off &&
block->contig_hint >= *bits + alloc_bits) {
int start = pcpu_next_hint(block, alloc_bits);

*bits += alloc_bits + block->contig_hint_start -
start;
*bit_off = pcpu_block_off_to_off(i, start);
return;
}
/* reset to satisfy the second predicate above */
block_off = 0;

*bit_off = ALIGN(PCPU_BITMAP_BLOCK_BITS - block->right_free,
align);
*bits = PCPU_BITMAP_BLOCK_BITS - *bit_off;
*bit_off = pcpu_block_off_to_off(i, *bit_off);
if (*bits >= alloc_bits)
return;
}

/* no valid offsets were found - fail condition */
*bit_off = pcpu_chunk_map_bits(chunk);
}

/*
* 基于元数据的迭代器,用于遍历内存块中的空闲区域或满足特定条件的区域
*/
/* 遍历指定 chunk 中的所有空闲区域
每次迭代返回一个空闲区域的起始偏移量 bit_off 和大小(以位为单位)bits */
#define pcpu_for_each_md_free_region(chunk, bit_off, bits) \
//获取下一个空闲区域的起始位置和大小
for (pcpu_next_md_free_region((chunk), &(bit_off), &(bits)); \
//确保当前偏移量未超出块的总位数
(bit_off) < pcpu_chunk_map_bits((chunk)); \
(bit_off) += (bits) + 1, \
//获取下一个空闲区域
pcpu_next_md_free_region((chunk), &(bit_off), &(bits)))
/* 遍历指定 chunk 中满足分配需求的区域。
每次迭代返回一个符合条件的区域的起始偏移量 bit_off 和大小 bits */
#define pcpu_for_each_fit_region(chunk, alloc_bits, align, bit_off, bits) \
//获取满足分配需求(alloc_bits 和 align)的下一个区域
for (pcpu_next_fit_region((chunk), (alloc_bits), (align), &(bit_off), \
&(bits)); \
//确保未超出块的总位数
(bit_off) < pcpu_chunk_map_bits((chunk)); \
(bit_off) += (bits), \
pcpu_next_fit_region((chunk), (alloc_bits), (align), &(bit_off), \
&(bits)))

pcpu_next_hint 确定要使用的提示

  • 决定扫描的起始位置,是从 scan_hint 开始还是从 first_free 开始。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
* pcpu_next_hint - 确定要使用的提示
* @block:兴趣块
* @alloc_bits:分配大小
*
* 这决定了我们应该根据 scan_hint 还是 first_free 进行扫描。
- 通常,我们希望从 first_free 进行扫描,以通过 First Fit 完成分配。
- 但是,如果我们知道某个scan_hint位于位置scan_hint_start无法完成分配,我们可以从那里开始扫描,因为知道该contig_hint将是我们的后备。
*/
static int pcpu_next_hint(struct pcpu_block_md *block, int alloc_bits)
{
/*
* 以下三个条件决定了我们是否可以跳过 thescan_hint。
* 首先,扫描提示是否存在。
* 其次,thecontig_hint scan_hint 之后(可能不是真的 iffcontig_hint == scan_hint)。
* 第三,分配请求是否大于scan_hint。
*/
if (block->scan_hint &&
block->contig_hint_start > block->scan_hint_start &&
alloc_bits > block->scan_hint)
return block->scan_hint_start + block->scan_hint;

return block->first_free;
}

pcpu_is_populated 检查是否填充区域

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* pcpu_is_populated - 确定是否填充区域
* @chunk:感兴趣的块
* @bit_off:块偏移量
* @bits:面积大小
* @next_off:返回下一个偏移量的值以开始搜索
*
* 对于原子分配,请检查是否已填充后备页面。
*
*返回:
* 如果填充了后台页面,则为 Bool。next_index 是跳过 pcpu_find_block_fit 中未填充的块。
*/
static bool pcpu_is_populated(struct pcpu_chunk *chunk, int bit_off, int bits,
int *next_off)
{
unsigned int start, end;
//计算起始和结束页帧号
start = PFN_DOWN(bit_off * PCPU_MIN_ALLOC_SIZE);
end = PFN_UP((bit_off + bits) * PCPU_MIN_ALLOC_SIZE);
//查找第一个未填充的位
start = find_next_zero_bit(chunk->populated, end, start);
if (start >= end)
return true;
//查找下一个已填充的位
end = find_next_bit(chunk->populated, end, start + 1);
//计算下一个偏移量
*next_off = end * PAGE_SIZE / PCPU_MIN_ALLOC_SIZE;
return false;
}

pcpu_find_block_fit 查找块适合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* pcpu_find_block_fit - 查找区块索引以开始搜索
* @chunk:感兴趣的块
* @alloc_bits:以分配单位为单位的请求大小
* @align:区域对齐(最大 PAGE_SIZE 字节)
* @pop_only:指示是否仅使用已填充的区域
*
* 给定一个 chunk 和一个 allocation spec,找到偏移量以开始搜索空闲区域。 这将迭代位图元数据块,以找到保证满足要求的偏移量。 它不是第一次拟合,如果分配不适合块或 chunk 的 contig 提示,它就会被跳过。 这在谨慎的一面是错误的,以防止过多的迭代。 对齐不良会导致分配器跳过具有有效空闲区域的块和块。
*
*返回:
* 要开始搜索的位图中的偏移量。
* -1 如果未找到偏移量。
*/
static int pcpu_find_block_fit(struct pcpu_chunk *chunk, int alloc_bits,
size_t align, bool pop_only)
{
struct pcpu_block_md *chunk_md = &chunk->chunk_md;
int bit_off, bits, next_off;

/*
* 这是一项优化,通过假设分配无法适应全局提示,则存在内存压力并且很快就会创建新块来防止扫描。
pcpu_init_md_block() 在 chunk 初始化时设置了 contig_hint。
*/
if (!pcpu_check_block_hint(chunk_md, alloc_bits, align))
return -1;

bit_off = pcpu_next_hint(chunk_md, alloc_bits); //取下一个分配提示位置
bits = 0;
//扫描寻找合适的块
pcpu_for_each_fit_region(chunk, alloc_bits, align, bit_off, bits) {
if (!pop_only || pcpu_is_populated(chunk, bit_off, bits, //当前块已填充
&next_off))
break;

bit_off = next_off;
bits = 0;
}
//检查是否超出块范围
if (bit_off == pcpu_chunk_map_bits(chunk))
return -1;

return bit_off;
}

pcpu_find_zero_area 在位图中查找一段连续的零位区域

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/*
* pcpu_find_zero_area - 修改自 bitmap_find_next_zero_area_off()
* @map:搜索所依据的地址
* @size:位图大小(以位为单位)
* @start:开始搜索的 bitnumber
* @nr:我们正在寻找的 0 位数
* @align_mask:零区域的对准掩码
* @largest_off:跳过的最大区域的偏移量
* @largest_bits:跳过的最大区域的大小
*
* @align_mask应小于 2 的幂 1。
*
* 这是 bitmap_find_next_zero_area_off() 的修改版本,用于记住跳过的最大区域。
* 这并不完美,但总的来说已经足够好了。
* 最大的记住区域是看到的最大失败区域。
* 这不包括我们可能因对齐而跳过的任何内容。
* pcpu_block_update_scan() 会向后扫描以尝试恢复因对齐而丢失的内容。
* 虽然这可能会导致扫描错过早期可能的空闲区域,但较小的分配最终会填补这些漏洞。
*/
static unsigned long pcpu_find_zero_area(unsigned long *map,
unsigned long size,
unsigned long start,
unsigned long nr,
unsigned long align_mask,
unsigned long *largest_off,
unsigned long *largest_bits)
{
unsigned long index, end, i, area_off, area_bits;
again:
index = find_next_zero_bit(map, size, start);

/* Align allocation */
index = __ALIGN_MASK(index, align_mask);
area_off = index;

end = index + nr;
if (end > size)
return end;
//检查从 index 到 end 的区域是否完全由零位组成。如果在范围内找到非零位,则说明区域不连续
i = find_next_bit(map, end, index);
if (i < end) {
area_bits = i - area_off;
/* 记住最大未使用区域,最佳对齐*/
if (area_bits > *largest_bits || //当前跳过的空闲区域比之前记录的最大空闲区域更大
(area_bits == *largest_bits && *largest_off &&
(!area_off || __ffs(area_off) > __ffs(*largest_off)))) {
*largest_off = area_off;
*largest_bits = area_bits;
}

start = i + 1;
goto again;
}
return index;
}

pcpu_block_update 更新块的元数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* pcpu_block_update - 更新给定 free area 的块
* @block:兴趣块
* @start:块中的起始偏移
* @end:块中的末端偏移
*
* 在给定已知空闲区域的情况下更新区块。 区域 [start, end] 应为块内的整个可用区域。 如果 contig 提示相等,则选择最佳起始偏移量。
*/
static void pcpu_block_update(struct pcpu_block_md *block, int start, int end)
{
int contig = end - start;
//更新 first_free 和边界空闲信息
block->first_free = min(block->first_free, start);
if (start == 0)
block->left_free = contig;

if (end == block->nr_bits)
block->right_free = contig;
//更新 contig_hint 和 scan_hint

//空闲区域大于当前的 contig_hint
if (contig > block->contig_hint) {
/*将旧的 contig_hint 提升为新的 scan_hint */
if (start > block->contig_hint_start) {
if (block->contig_hint > block->scan_hint) {
block->scan_hint_start =
block->contig_hint_start;
block->scan_hint = block->contig_hint;
} else if (start < block->scan_hint_start) {
/*
* The old contig_hint == scan_hint. But, the
* new contig is larger so hold the invariant
* scan_hint_start < contig_hint_start.
*/
block->scan_hint = 0;
}
} else {
block->scan_hint = 0;
}
block->contig_hint_start = start;
block->contig_hint = contig;
} else if (contig == block->contig_hint) { //空闲区域等于当前的 contig_hint
//选择更优的起始位置
if (block->contig_hint_start &&
(!start ||
__ffs(start) > __ffs(block->contig_hint_start))) {
/* start has a better alignment so use it */
block->contig_hint_start = start;
if (start < block->scan_hint_start &&
block->contig_hint > block->scan_hint)
block->scan_hint = 0;
} else if (start > block->scan_hint_start ||
block->contig_hint > block->scan_hint) {
/*
* Knowing contig == contig_hint, update the scan_hint
* if it is farther than or larger than the current
* scan_hint.
*/
block->scan_hint_start = start;
block->scan_hint = contig;
}
} else { //空闲区域小于当前的 contig_hint
/*
* The region is smaller than the contig_hint. So only update
* the scan_hint if it is larger than or equal and farther than
* the current scan_hint.
*/
if ((start < block->contig_hint_start &&
(contig > block->scan_hint ||
(contig == block->scan_hint &&
start > block->scan_hint_start)))) {
block->scan_hint_start = start;
block->scan_hint = contig;
}
}
}

pcpu_block_update_scan 更新块的元数据(metadata block

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*
* pcpu_block_update_scan - 从扫描中更新给定空闲区域的块
* @chunk:感兴趣的块
* @bit_off:块偏移量
* @bits:自由区大小
*
* 找到最终的分配点,首先通过 pcpu_find_block_fit() 找到一个可以保存分配的块,然后是 pcpu_alloc_area() 使用扫描。 当 allocation 需要特定的对齐方式时,我们可能会无意中创建在 alloc 或 free paths 中看不到的漏洞。
*
* 这将采用给定的自由区域孔并更新一个块,因为它可能会改变scan_hint。 我们需要向后扫描以确保我们不会错过对齐中的空闲位。
*/
static void pcpu_block_update_scan(struct pcpu_chunk *chunk, int bit_off,
int bits)
{
int s_off = pcpu_off_to_block_off(bit_off);
int e_off = s_off + bits;
int s_index, l_bit;
struct pcpu_block_md *block;

if (e_off > PCPU_BITMAP_BLOCK_BITS)
return;

s_index = pcpu_off_to_block_index(bit_off);
block = chunk->md_blocks + s_index;

/* 在对齐跳过空闲位的情况下向后扫描 找到最后一个已分配的位*/
l_bit = find_last_bit(pcpu_index_alloc_map(chunk, s_index), s_off);
s_off = (s_off == l_bit) ? 0 : l_bit + 1;

pcpu_block_update(block, s_off, e_off);
}

pcpu_block_update_hint_alloc 更新块的元数据(分配路径)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/**
* pcpu_block_update_hint_alloc - 在分配路径上更新提示
* @chunk:感兴趣的块
* @bit_off:块偏移量
* @bits:请求大小
*
* 更新分配路径的元数据。 只有在 chunk 的 contig 提示损坏时,才需要通过完全扫描刷新元数据。 如果块的 contig 提示被破坏,则需要进行块级扫描。
*/
static void pcpu_block_update_hint_alloc(struct pcpu_chunk *chunk, int bit_off,
int bits)
{
struct pcpu_block_md *chunk_md = &chunk->chunk_md;
int nr_empty_pages = 0;
struct pcpu_block_md *s_block, *e_block, *block;
int s_index, e_index; /* block indexes of the freed allocation */
int s_off, e_off; /* block offsets of the freed allocation */

/*
* 计算每个区块偏移量。计算使用包含范围,但生成的偏移量为 [start, end)。
* e_index始终指向范围中的最后一个块。
*/
s_index = pcpu_off_to_block_index(bit_off);
e_index = pcpu_off_to_block_index(bit_off + bits - 1);
s_off = pcpu_off_to_block_off(bit_off);
e_off = pcpu_off_to_block_off(bit_off + bits - 1) + 1;

s_block = chunk->md_blocks + s_index;
e_block = chunk->md_blocks + e_index;

/*
* 更新起始块 (s_block) 的元数据
*/
if (s_block->contig_hint == PCPU_BITMAP_BLOCK_BITS)
nr_empty_pages++;

/*
* 如果分配取代了 block->first_free 必须更新。
* 如果分配中断了contig_hint,则需要扫描以恢复此提示。
*/
if (s_off == s_block->first_free)
s_block->first_free = find_next_zero_bit(
pcpu_index_alloc_map(chunk, s_index),
PCPU_BITMAP_BLOCK_BITS,
s_off + bits);
//分配区域与 scan_hint 或 contig_hint 重叠,清除对应的提示信息
if (pcpu_region_overlap(s_block->scan_hint_start,
s_block->scan_hint_start + s_block->scan_hint,
s_off,
s_off + bits))
s_block->scan_hint = 0;

if (pcpu_region_overlap(s_block->contig_hint_start,
s_block->contig_hint_start +
s_block->contig_hint,
s_off,
s_off + bits)) {
/* block contig hint is broken - scan to fix it */
if (!s_off)
s_block->left_free = 0;
pcpu_block_refresh_hint(chunk, s_index);
} else {
/* 如果分配区域未破坏 contig_hint,手动更新 left_free 和 right_free */
s_block->left_free = min(s_block->left_free, s_off);
if (s_index == e_index)
s_block->right_free = min_t(int, s_block->right_free,
PCPU_BITMAP_BLOCK_BITS - e_off);
else
s_block->right_free = 0;
}

/*
* Update e_block.
*/
if (s_index != e_index) {
if (e_block->contig_hint == PCPU_BITMAP_BLOCK_BITS)
nr_empty_pages++;

/*
* When the allocation is across blocks, the end is along
* the left part of the e_block.
*/
e_block->first_free = find_next_zero_bit(
pcpu_index_alloc_map(chunk, e_index),
PCPU_BITMAP_BLOCK_BITS, e_off);

if (e_off == PCPU_BITMAP_BLOCK_BITS) {
/* reset the block */
e_block++;
} else {
if (e_off > e_block->scan_hint_start)
e_block->scan_hint = 0;

e_block->left_free = 0;
if (e_off > e_block->contig_hint_start) {
/* contig hint is broken - scan to fix it */
pcpu_block_refresh_hint(chunk, e_index);
} else {
e_block->right_free =
min_t(int, e_block->right_free,
PCPU_BITMAP_BLOCK_BITS - e_off);
}
}

/* 更新中间块的元数据 */
nr_empty_pages += (e_index - s_index - 1);
for (block = s_block + 1; block < e_block; block++) {
block->scan_hint = 0;
block->contig_hint = 0;
block->left_free = 0;
block->right_free = 0;
}
}

//更新块组的元数据
/*
* 如果分配不是原子的,则某些块可能不会填充页面,而我们在此处将其考虑在内。 填充页面时,将使用 pcpu_chunk_populated() 重新添加页面数。
*/
//更新空闲页计数
if (nr_empty_pages)
pcpu_update_empty_pages(chunk, -nr_empty_pages);

if (pcpu_region_overlap(chunk_md->scan_hint_start,
chunk_md->scan_hint_start +
chunk_md->scan_hint,
bit_off,
bit_off + bits))
chunk_md->scan_hint = 0;

/*
* The only time a full chunk scan is required is if the chunk
* contig hint is broken. Otherwise, it means a smaller space
* was used and therefore the chunk contig hint is still correct.
*/
if (pcpu_region_overlap(chunk_md->contig_hint_start,
chunk_md->contig_hint_start +
chunk_md->contig_hint,
bit_off,
bit_off + bits))
pcpu_chunk_refresh_hint(chunk, false);
}

pcpu_alloc_area 从 pcpu_chunk 中分配一段内存区域

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* pcpu_alloc_area - 从pcpu_chunk分配区域
* @chunk:感兴趣的块
* @alloc_bits:以分配单位为单位的请求大小
* @align:区域对齐(最大PAGE_SIZE)
* @start:bit_off开始搜索
*
* 函数从 chunk 的分配位图中搜索一个满足大小和对齐要求的空闲区域。如果找到合适的区域,它会更新分配位图和边界位图,并返回分配区域的偏移量;如果找不到,则返回 -1。
*
*返回:
* 成功时以 @chunk 分配 addr 偏移量。
* -1 如果未找到匹配的区域。
*/
static int pcpu_alloc_area(struct pcpu_chunk *chunk, int alloc_bits,
size_t align, int start)
{
struct pcpu_block_md *chunk_md = &chunk->chunk_md;
size_t align_mask = (align) ? (align - 1) : 0;
unsigned long area_off = 0, area_bits = 0;
int bit_off, end, oslot;

lockdep_assert_held(&pcpu_lock); //确保调用此函数时持有 pcpu_lock 锁,以保证线程安全

oslot = pcpu_chunk_slot(chunk);

/*
* Search to find a fit.
*/
end = min_t(int, start + alloc_bits + PCPU_BITMAP_BLOCK_BITS,
pcpu_chunk_map_bits(chunk)); //确定搜索的结束位置
bit_off = pcpu_find_zero_area(chunk->alloc_map, end, start, alloc_bits,
align_mask, &area_off, &area_bits); //在分配位图中搜索满足条件的空闲区域
if (bit_off >= end)
return -1;

if (area_bits)
//如果找到的区域跨越多个块(area_bits 非零),调用 pcpu_block_update_scan 更新扫描信息
pcpu_block_update_scan(chunk, area_off, area_bits);

/* update alloc map */
bitmap_set(chunk->alloc_map, bit_off, alloc_bits);

/* update 边界位图 */
set_bit(bit_off, chunk->bound_map);
bitmap_clear(chunk->bound_map, bit_off + 1, alloc_bits - 1);
set_bit(bit_off + alloc_bits, chunk->bound_map);

chunk->free_bytes -= alloc_bits * PCPU_MIN_ALLOC_SIZE;

/* 更新第一个免费位 */
if (bit_off == chunk_md->first_free)
chunk_md->first_free = find_next_zero_bit(
chunk->alloc_map,
pcpu_chunk_map_bits(chunk),
bit_off + alloc_bits);
//更新块的分配提示信息
pcpu_block_update_hint_alloc(chunk, bit_off, alloc_bits);
//根据分配后的状态重新调整块的位置
pcpu_chunk_relocate(chunk, oslot);

return bit_off * PCPU_MIN_ALLOC_SIZE;
}

pcpu_alloc_noprof 分配 perCPU 内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
/**
* pcpu_alloc - percpu 分配器
* @size:要分配的区域大小(以字节为单位)
* @align:区域对齐(最大PAGE_SIZE)
* @reserved:从保留的 chunk 分配(如果可用)
* @gfp:分配标志
*
* 分配 @size 字节的 percpu 区域,@align对齐。 如果 @gfp 不包含 %GFP_KERNEL,则分配是原子的。如果 @gfp 已__GFP_NOWARN则不会在无效或失败的分配请求上触发警告。
*
*返回:
* 成功时指向已分配区域的 Percpu 指针,失败时为 NULL。
*/
void __percpu *pcpu_alloc_noprof(size_t size, size_t align, bool reserved,
gfp_t gfp)
{
gfp_t pcpu_gfp;
bool is_atomic;
bool do_warn;
struct obj_cgroup *objcg = NULL;
static int warn_limit = 10;
struct pcpu_chunk *chunk, *next;
const char *err;
int slot, off, cpu, ret;
unsigned long flags;
void __percpu *ptr;
size_t bits, bit_align;

gfp = current_gfp_context(gfp);
/* 可以传递给后备分配器的白名单标志 */
pcpu_gfp = gfp & (GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN);
is_atomic = !gfpflags_allow_blocking(gfp);
do_warn = !(gfp & __GFP_NOWARN);

/*
* 现在的最小分配大小为 PCPU_MIN_ALLOC_SIZE,因此对齐必须至少为该字节数。
* 分配可能具有内部碎片,最多可四舍五入 PCPU_MIN_ALLOC_SIZE - 1 个字节。
*/
if (unlikely(align < PCPU_MIN_ALLOC_SIZE))
align = PCPU_MIN_ALLOC_SIZE;

size = ALIGN(size, PCPU_MIN_ALLOC_SIZE);
bits = size >> PCPU_MIN_ALLOC_SHIFT;
bit_align = align >> PCPU_MIN_ALLOC_SHIFT;

if (unlikely(!size || size > PCPU_MIN_UNIT_SIZE || align > PAGE_SIZE ||
!is_power_of_2(align))) {
WARN(do_warn, "illegal size (%zu) or align (%zu) for percpu allocation\n",
size, align);
return NULL;
}

if (unlikely(!pcpu_memcg_pre_alloc_hook(size, gfp, &objcg)))
return NULL;

if (!is_atomic) {
/*
* pcpu_balance_workfn() 在此互斥锁下分配内存,它可能会等待内存回收。
允许当前任务在内存压力下成为 OOM 受害者。
*/
if (gfp & __GFP_NOFAIL) {
mutex_lock(&pcpu_alloc_mutex);
} else if (mutex_lock_killable(&pcpu_alloc_mutex)) { //可中断的锁获取函数。如果线程在等待锁时被信号中断,函数会返回非零值
pcpu_memcg_post_alloc_hook(objcg, NULL, 0, size); //如果锁获取失败(被中断),调用 pcpu_memcg_post_alloc_hook 释放与内存控制组相关的资源
return NULL;
}
}

spin_lock_irqsave(&pcpu_lock, flags);

/*从 reserved chunk 提供 reserved allocations(如果可用) */
if (reserved && pcpu_reserved_chunk) {
chunk = pcpu_reserved_chunk;

off = pcpu_find_block_fit(chunk, bits, bit_align, is_atomic);
if (off < 0) {
err = "alloc from reserved chunk failed";
goto fail_unlock;
}

off = pcpu_alloc_area(chunk, bits, bit_align, off);
if (off >= 0)
goto area_found;

err = "alloc from reserved chunk failed";
goto fail_unlock;
}

restart:
/* 搜索普通块 */
/* search through normal chunks */
for (slot = pcpu_size_to_slot(size); slot <= pcpu_free_slot; slot++) {
//pcpu_chunk_lists 是一个按大小分类的内存块链表数组,每个链表存储特定大小范围的内存块
//pcpu_chunk_lists的第一个对象由pcpu_chunk_relocate(pcpu_first_chunk, -1);设置
list_for_each_entry_safe(chunk, next, &pcpu_chunk_lists[slot],
list) {
//检查当前内存块是否有足够的空间容纳请求的内存
off = pcpu_find_block_fit(chunk, bits, bit_align,
is_atomic);
if (off < 0) { //当前块无法满足分配需求
if (slot < PCPU_SLOT_FAIL_THRESHOLD)
pcpu_chunk_move(chunk, 0); //用 pcpu_chunk_move 将块移动到更合适的分类
continue;
}

off = pcpu_alloc_area(chunk, bits, bit_align, off); //在块中分配内存
if (off >= 0) {
pcpu_reintegrate_chunk(chunk); //更新块的状态
goto area_found;
}
}
}

spin_unlock_irqrestore(&pcpu_lock, flags);

if (is_atomic) {
err = "atomic alloc failed, no space left";
goto fail;
}

/* 没有剩余空间。 创建一个新数据块。 */
if (list_empty(&pcpu_chunk_lists[pcpu_free_slot])) {
chunk = pcpu_create_chunk(pcpu_gfp);
if (!chunk) {
err = "failed to allocate new chunk";
goto fail;
}

spin_lock_irqsave(&pcpu_lock, flags);
pcpu_chunk_relocate(chunk, -1);
} else {
spin_lock_irqsave(&pcpu_lock, flags);
}

goto restart;

area_found:
pcpu_stats_area_alloc(chunk, size);
//空闲的已填充页面数低于阈值
if (pcpu_nr_empty_pop_pages < PCPU_EMPTY_POP_PAGES_LOW)
pcpu_schedule_balance_work(); //调度平衡工作

spin_unlock_irqrestore(&pcpu_lock, flags);

/* 如果不是所有页面都已经存在,则填充 */
if (!is_atomic) {
unsigned int page_end, rs, re;

rs = PFN_DOWN(off);
page_end = PFN_UP(off + size);
//遍历 chunk->populated 位图中未填充的页面范围
for_each_clear_bitrange_from(rs, re, chunk->populated, page_end) {
WARN_ON(chunk->immutable);
//填充页面
ret = pcpu_populate_chunk(chunk, rs, re, pcpu_gfp);

spin_lock_irqsave(&pcpu_lock, flags);
if (ret) {
pcpu_free_area(chunk, off);
err = "failed to populate";
goto fail_unlock;
}
pcpu_chunk_populated(chunk, rs, re);
spin_unlock_irqrestore(&pcpu_lock, flags);
}

mutex_unlock(&pcpu_alloc_mutex);
}

/* 遍历所有可能的 CPU,将分配区域初始化为零 */
for_each_possible_cpu(cpu)
memset((void *)pcpu_chunk_addr(chunk, cpu, 0) + off, 0, size);

ptr = __addr_to_pcpu_ptr(chunk->base_addr + off);
kmemleak_alloc_percpu(ptr, size, gfp);

trace_percpu_alloc_percpu(_RET_IP_, reserved, is_atomic, size, align,
chunk->base_addr, off, ptr,
pcpu_obj_full_size(size), gfp);

pcpu_memcg_post_alloc_hook(objcg, chunk, off, size); //执行分配后的钩子操作

pcpu_alloc_tag_alloc_hook(chunk, off, size);

return ptr;

fail_unlock:
spin_unlock_irqrestore(&pcpu_lock, flags);
fail:
trace_percpu_alloc_percpu_fail(reserved, is_atomic, size, align);

if (do_warn && warn_limit) {
pr_warn("allocation failed, size=%zu align=%zu atomic=%d, %s\n",
size, align, is_atomic, err);
if (!is_atomic)
dump_stack();
if (!--warn_limit)
pr_info("limit reached, disable warning\n");
}

if (is_atomic) {
/* see the flag handling in pcpu_balance_workfn() */
pcpu_atomic_alloc_failed = true;
pcpu_schedule_balance_work();
} else {
mutex_unlock(&pcpu_alloc_mutex);
}

pcpu_memcg_post_alloc_hook(objcg, NULL, 0, size);

return NULL;
}

pcpu_chunk_addr_search 确定包含指定地址的块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* pcpu_addr_in_chunk - 检查地址是否由此块提供
* @chunk:感兴趣的块
* @addr:percpu 地址
*
*返回:
* 如果地址从此 chunk 提供,则为 True。
*/
static bool pcpu_addr_in_chunk(struct pcpu_chunk *chunk, void *addr)
{
void *start_addr, *end_addr;

if (!chunk)
return false;

start_addr = chunk->base_addr + chunk->start_offset;
end_addr = chunk->base_addr + chunk->nr_pages * PAGE_SIZE -
chunk->end_offset;

return addr >= start_addr && addr < end_addr;
}

/**
* pcpu_chunk_addr_search - 确定包含指定地址的块
* @addr:需要确定 chunk 的地址。
*
* 这是一个内部函数,用于处理除静态分配之外的所有分配。
* 静态 percpu 地址值绝不应传递到分配器中。
*
*返回:
* 找到的 chunk 的地址。
*/
static struct pcpu_chunk *pcpu_chunk_addr_search(void *addr)
{
/* i在动态区域中 (First Chunk)? */
if (pcpu_addr_in_chunk(pcpu_first_chunk, addr))
return pcpu_first_chunk;

/* 是否在预留区域中?*/
if (pcpu_addr_in_chunk(pcpu_reserved_chunk, addr))
return pcpu_reserved_chunk;

/*
* 地址是相对于 unit0 的,它可能未使用,因此未映射。
* 先将地址偏移到当前处理器的单位空间,然后再在 vmalloc 空间中查找它。
* 请注意,这里可以使用任何可能的 cpu id,因此无需担心抢占或 cpu 热插拔。
*/
addr += pcpu_unit_offsets[raw_smp_processor_id()];
return pcpu_get_page_chunk(pcpu_addr_to_page(addr)); //(struct pcpu_chunk *)page->private;
}

pcpu_free_area 释放指定偏移量的内存区域

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* pcpu_free_area - 释放相应的偏移量
* @chunk:感兴趣的块
* @off:addr offset 到 chunk
*
* 此函数使用边界位图确定要释放的分配的大小,并清除分配映射。
*
*返回:
* 释放的字节数。
*/
static int pcpu_free_area(struct pcpu_chunk *chunk, int off)
{
struct pcpu_block_md *chunk_md = &chunk->chunk_md;
int bit_off, bits, end, oslot, freed;

lockdep_assert_held(&pcpu_lock);
pcpu_stats_area_dealloc(chunk);

oslot = pcpu_chunk_slot(chunk);

bit_off = off / PCPU_MIN_ALLOC_SIZE;

/* find end index */
end = find_next_bit(chunk->bound_map, pcpu_chunk_map_bits(chunk),
bit_off + 1);
bits = end - bit_off;
bitmap_clear(chunk->alloc_map, bit_off, bits);

freed = bits * PCPU_MIN_ALLOC_SIZE;

/* update metadata */
chunk->free_bytes += freed;

/* update first free bit */
chunk_md->first_free = min(chunk_md->first_free, bit_off);

pcpu_block_update_hint_free(chunk, bit_off, bits);

pcpu_chunk_relocate(chunk, oslot);

return freed;
}

free_percpu 释放分配的内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* free_percpu - 空闲 perCPU 区域
* @ptr:指向要释放的区域的指针
*
* @ptr 的 percpu 区域。
*
*上下文:
* 可以从原子上下文中调用。
*/
void free_percpu(void __percpu *ptr)
{
void *addr;
struct pcpu_chunk *chunk;
unsigned long flags;
int size, off;
bool need_balance = false;

if (!ptr)
return;

kmemleak_free_percpu(ptr);

addr = __pcpu_ptr_to_addr(ptr);
chunk = pcpu_chunk_addr_search(addr); //查找该地址所属的 pcpu_chunk(per-CPU 内存块
off = addr - chunk->base_addr; //计算该地址在内存块中的偏移量(off)

spin_lock_irqsave(&pcpu_lock, flags);
size = pcpu_free_area(chunk, off); //释放指定偏移量的内存区域,并返回释放的大小

pcpu_alloc_tag_free_hook(chunk, off, size);

pcpu_memcg_free_hook(chunk, off, size);

//检查是否需要平衡或回收
/*
* 如果有多个完全空闲的区块,请唤醒 grim reaper。
* 如果块是隔离的,它可能正在被回收。 让 reclaim 管理该块的清理。
*/
if (!chunk->isolated && chunk->free_bytes == pcpu_unit_size) {
struct pcpu_chunk *pos;

list_for_each_entry(pos, &pcpu_chunk_lists[pcpu_free_slot], list)
if (pos != chunk) {
need_balance = true;
break;
}
} else if (pcpu_should_reclaim_chunk(chunk)) {
pcpu_isolate_chunk(chunk);
need_balance = true;
}

trace_percpu_free_percpu(chunk->base_addr, off, ptr);

spin_unlock_irqrestore(&pcpu_lock, flags);

if (need_balance)
pcpu_schedule_balance_work();
}

pcpu_balance_free: 异步回收空闲的Per-CPU内存块

此函数是内核Per-CPU分配器后台平衡工作(pcpu_balance_workfn)的一个核心组成部分。它的根本作用是扮演一个垃圾回收器的角色, 负责识别并销毁系统中多余的、完全空闲的per-cpu内存块(chunks), 以便将它们占用的物理内存和虚拟地址空间返还给主系统

由于销毁一个内存块(chunk)是一个相对耗时且可能导致睡眠的操作(因为它需要归还多个物理页面给伙伴系统), 这个任务必须在可以安全睡眠的后台工作线程中异步执行, 而不能在对性能和延迟要求极高的per-cpu变量分配/释放的”快路径”上进行。

该函数最核心的实现原理是一个两阶段的”锁-释放-重锁”(lock-unlock-relock)模式, 以兼顾安全性和性能:

  1. 阶段一: 快速选择 (在自旋锁保护下)
    • 函数首先获取保护per-cpu核心数据结构的自旋锁pcpu_lock
    • 它快速地遍历全局的”空闲块列表”(pcpu_free_slot)。
    • 它的策略是永远保留至少一个空闲块作为备用, 以便快速响应未来的分配请求。因此, 它会跳过列表中的第一个块。
    • 对于其余的空闲块, 它根据empty_only标志来决定是否要回收它们, 并将选中的块从全局列表移动到一个临时的、函数本地的to_free链表中。这个移动操作非常快。
  2. 阶段二: 慢速销毁 (在释放自旋锁后)
    • 在完成了快速的选择阶段后, 函数会释放pcpu_lock自旋锁。这是至关重要的一步, 因为接下来的操作很慢, 且可能需要睡眠。释放锁可以避免长时间持有锁而导致系统其他部分停顿。
    • 现在, 它安全地遍历临时的to_free链表。对于链表中的每一个块:
      • 它会先”去填充”(depopulate)该块, 即将该块所持有的所有物理内存页面逐一返还给内核的伙伴系统(buddy allocator)。
      • 然后, 它调用pcpu_destroy_chunk来释放pcpu_chunk结构体本身, 以及它所占用的虚拟地址空间(vmalloc area)。
      • 在循环中, 它会调用cond_resched(), 主动检查是否需要进行任务调度。这可以防止在销毁大量块时长时间独占CPU, 从而保持系统的响应性。
    • 在完成了所有销毁操作后, 函数重新获取pcpu_lock自旋锁, 因为调用它的pcpu_balance_workfn函数期望在它返回时锁仍然是被持有的。

empty_only标志的意义:
pcpu_balance_workfn会调用此函数两次, 每次使用不同的empty_only值, 这是一种精细的平衡策略。

  • pcpu_balance_free(false) (首次调用, “激进模式”): 此时, 它会回收所有多余的空闲块, 不管这些块是否还预留着已填充但未使用的页面
  • pcpu_balance_free(true) (末次调用, “保守模式”): 在经过了中间的”回收”和”填充”步骤后, 可能会产生一些新的、不仅自身空闲、连预留页面也都没有的”纯空”块。这次调用会清理掉这些纯空块, 但会保留那些虽然空闲但仍持有预留页面的块, 因为这些页面可能马上就会被用到。

代码逐行解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* pcpu_balance_free - 管理空闲块的数量
* @empty_only: 仅当没有已填充页面时才释放块
*
* 如果 empty_only 为 %false, 则回收所有完全空闲的块, 不论已填充页面的数量如何.
* 否则, 仅回收那些没有已填充页面的块.
*
* 上下文:
* 必须持有 pcpu_lock (但可以被临时释放)
*/
static void pcpu_balance_free(bool empty_only)
{
/* 在栈上创建一个临时的链表头, 用于存放待释放的块. */
LIST_HEAD(to_free);
/* 获取指向全局"完全空闲"块链表的指针. */
struct list_head *free_head = &pcpu_chunk_lists[pcpu_free_slot];
struct pcpu_chunk *chunk, *next;

/* 静态断言, 确保调用者已经持有了 pcpu_lock. */
lockdep_assert_held(&pcpu_lock);

/*
* 阶段一: 在锁保护下, 快速选择要释放的块.
* 策略: 保留多个未使用的块没有意义, 且虚拟内存区域可能稀缺, 因此销毁除第一个之外的所有空闲块.
*/
list_for_each_entry_safe(chunk, next, free_head, list) {
/* 这是一个不应该被销毁的特殊块, 如果遇到, 说明有bug. */
WARN_ON(chunk->immutable);

/* 保留第一个块作为备用, 跳过它. */
if (chunk == list_first_entry(free_head, struct pcpu_chunk, list))
continue;

/*
* 根据 empty_only 标志决定是否要移动这个块.
* 如果是激进模式(!empty_only), 或者如果是保守模式且块中已无预留页面, 则满足条件.
*/
if (!empty_only || chunk->nr_empty_pop_pages == 0)
/* 将块从全局链表移动到临时的 to_free 链表. 这是一个快速的指针操作. */
list_move(&chunk->list, &to_free);
}

/* 如果没有找到需要释放的块, 直接返回, 避免不必要的锁释放和重获取. */
if (list_empty(&to_free))
return;

/*
* 阶段二: 释放锁, 执行慢速的销毁操作.
*/
spin_unlock_irq(&pcpu_lock);
list_for_each_entry_safe(chunk, next, &to_free, list) {
unsigned int rs, re;

/* 遍历块中所有被标记为"已填充"的页面位图. */
for_each_set_bitrange(rs, re, chunk->populated, chunk->nr_pages) {
/* "去填充"块, 即将物理页面返还给伙伴系统. 这是个慢操作. */
pcpu_depopulate_chunk(chunk, rs, re);
/* 临时重获取锁, 以原子方式更新全局的页面计数器. */
spin_lock_irq(&pcpu_lock);
pcpu_chunk_depopulated(chunk, rs, re);
spin_unlock_irq(&pcpu_lock);
}
/* 销毁块结构体本身和其虚拟地址空间. 这是个慢操作. */
pcpu_destroy_chunk(chunk);
/* 主动让出CPU, 允许其他任务运行, 保持系统响应. */
cond_resched();
}
/* 在函数返回前, 重新获取锁, 以满足调用者的上下文要求. */
spin_lock_irq(&pcpu_lock);
}

Per-CPU 内存池填充与回收

这两个函数, pcpu_balance_populatedpcpu_reclaim_populated, 是Linux内核Per-CPU分配器后台平衡机制中相互协作、互为补充的两个核心组件。它们共同的目标是动态维护一个”水位线”——即预先分配并映射好、随时可以被原子分配器使用的”热”页面池, 以确保无锁、高速的原子分配请求总能快速得到满足, 同时在系统空闲时回收多余的页面以节约内存


pcpu_balance_populated: 主动填充页面池

此函数扮演着主动防御的角色。它的核心原理是检查当前”热”页面的数量是否低于预设的最低水位线 (PCPU_EMPTY_POP_PAGES_LOW), 如果是, 它就会主动从主内存分配器申请新的物理页面来”填充”(populate)per-cpu内存块, 直到页面数量达到最高水位线(PCPU_EMPTY_POP_PAGES_HIGH)

工作流程与原理:

  1. 确定目标: 函数首先计算需要填充多少页面(nr_to_pop)才能达到最高水位线。它有一个特殊的”应急模式”: 如果之前有原子分配失败过(pcpu_atomic_alloc_failed), 它会无条件地尝试填充到最高水位, 以尽快恢复服务能力。
  2. 选择填充位置: 它会优先选择那些已经被部分使用的块(chunk)来进行填充。这样做是为了减少内存碎片, 尽量将分配集中在少数几个块中, 而不是让很多块都处于半满状态。
  3. “锁-释放-重锁”执行: 与pcpu_balance_free类似, 它在循环中执行耗时的页面分配操作(pcpu_populate_chunk)时, 会临时释放pcpu_lock自旋锁, 以避免长时间阻塞快路径。分配完成后, 它会重新获取锁, 以原子方式更新块的populated位图和全局计数器。
  4. 创建新块: 如果遍历完所有现存的块都无法满足填充目标(例如, 所有块都已满), 函数会尝试创建一个全新的块(pcpu_create_chunk), 然后通过goto retry_pop重新开始整个填充过程。
  5. 无IO分配: 它使用GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN作为内存分配标志。__GFP_NORETRY__GFP_NOWARN是很重要的优化: 它告诉主内存分配器, “请尽力而为, 但如果现在没有可用内存, 不要进入耗时的内存回收流程, 也不要打印OOM警告, 直接失败返回即可”。这是因为后台平衡工作是一种”尽力而为”的预取机制, 它不应该在系统内存紧张时通过触发内存回收来加剧系统压力。真正的内存压力应该由实际的分配请求来触发。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/* pcpu_balance_populated - 管理已填充页面的数量 */
static void pcpu_balance_populated(void)
{
/* 使用特殊的gfp标志, 尽力而为, 不触发内存回收, 不打印警告. */
const gfp_t gfp = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
struct pcpu_chunk *chunk;
int slot, nr_to_pop, ret;

lockdep_assert_held(&pcpu_lock);

retry_pop:
/* 如果之前有原子分配失败, 则进入应急模式, 尝试填充到最高水位. */
if (pcpu_atomic_alloc_failed) {
nr_to_pop = PCPU_EMPTY_POP_PAGES_HIGH;
pcpu_atomic_alloc_failed = false;
} else {
/* 否则, 计算需要填充的页面数, 以达到最高水位. */
nr_to_pop = clamp(PCPU_EMPTY_POP_PAGES_HIGH -
pcpu_nr_empty_pop_pages,
0, PCPU_EMPTY_POP_PAGES_HIGH);
}

/* 遍历所有可能包含空闲空间的块列表. */
for (slot = pcpu_size_to_slot(PAGE_SIZE); slot <= pcpu_free_slot; slot++) {
if (!nr_to_pop)
break;

/* 找到第一个有未填充页面的块. */
list_for_each_entry(chunk, &pcpu_chunk_lists[slot], list) {
nr_unpop = chunk->nr_pages - chunk->nr_populated;
if (nr_unpop)
break;
}

if (!nr_unpop)
continue;

/* 遍历块中所有未填充的页面范围. */
for_each_clear_bitrange(rs, re, chunk->populated, chunk->nr_pages) {
int nr = min_t(int, re - rs, nr_to_pop); // 计算本次要填充的数量.

spin_unlock_irq(&pcpu_lock); // 释放锁.
ret = pcpu_populate_chunk(chunk, rs, rs + nr, gfp); // 执行填充(慢操作).
cond_resched(); // 主动调度.
spin_lock_irq(&pcpu_lock); // 重获取锁.

if (!ret) { /* 成功 */
nr_to_pop -= nr;
pcpu_chunk_populated(chunk, rs, rs + nr); // 更新元数据.
} else { /* 失败 */
nr_to_pop = 0;
}
if (!nr_to_pop)
break;
}
}

/* 如果现有块都无法满足填充需求. */
if (nr_to_pop) {
spin_unlock_irq(&pcpu_lock);
chunk = pcpu_create_chunk(gfp); // 创建一个新块.
cond_resched();
spin_lock_irq(&pcpu_lock);
if (chunk) {
pcpu_chunk_relocate(chunk, -1); // 将新块放入合适的列表.
goto retry_pop; // 重新开始填充过程.
}
}
}

pcpu_reclaim_populated: 回收空闲的已填充页面

此函数扮演着被动清理的角色。当per-cpu的free操作导致某个块中的某些页面变为空闲时, 这些页面仍然占据着物理内存。此函数的原理是扫描那些被标记为”待回收”(to_depopulate)的块, 识别出其中完全空闲的页面, 并将这些页面返还给主内存系统(“去填充” - depopulate)

工作流程与原理:

  1. 处理”待回收”列表: 函数只处理pcpu_to_depopulate_slot列表中的块。一个块在何种条件下被放入这个列表是由free路径的逻辑决定的(通常是当块的空闲空间超过某个阈值时)。
  2. 反向扫描: 它以反向顺序扫描块中的页面。这是一个重要的优化, 目的是将所有仍在使用的分配”压缩”到块的前端, 从而让块的后端更容易形成连续的大片空闲页面, 使得整页回收的可能性最大化。
  3. “锁-释放-重锁”执行: 与填充操作类似, 它在执行pcpu_depopulate_chunk这个慢速操作时也会释放pcpu_lock
  4. 批量TLB刷新: “去填充”操作(本质上是unmap)会导致TLB(翻译后备缓冲器)中的条目失效。为了摊销昂贵的TLB刷新操作的成本, 此函数会记录下在一个块中所有被释放的页面范围, 然后在处理完整个块后, 调用pcpu_post_unmap_tlb_flush来执行一次批量的TLB刷新
  5. 块的最终归宿: 在扫描完一个块后, 会根据其最终状态决定其去向:
    • 重新整合(Reintegrate): 如果块变为了完全空闲, 或者全局的”热”页面水位线过低需要紧急补充, 这个块会被pcpu_reintegrate_chunk重新放回到活跃的分配列表中。
    • 搁置(Sideline): 如果块仍然包含部分分配, 但已经被清理过, 它会被移动到pcpu_sidelined_slot搁置列表中, 暂时不参与新的分配, 除非系统内存非常紧张。

在STM32H750单核系统上, pcpu_lock的保护作用、”锁-释放-重锁”模式避免阻塞快路径的原理、以及cond_resched维持系统响应性的机制都与之前分析的函数完全相同。这两个函数共同构成了一个精巧的后台内存池管理器, 确保了per-cpu分配器在提供极高性能的同时, 也能高效地利用系统内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/* pcpu_reclaim_populated - 扫描待回收块并释放空闲页面 */
static void pcpu_reclaim_populated(void)
{
/* ... 变量声明 ... */
struct pcpu_chunk *chunk;
struct pcpu_block_md *block;
int freed_page_start, freed_page_end;
int i, end;
bool reintegrate;

lockdep_assert_held(&pcpu_lock);

/* 循环处理所有在"待回收"列表中的块. */
while ((chunk = list_first_entry_or_null(
&pcpu_chunk_lists[pcpu_to_depopulate_slot],
struct pcpu_chunk, list))) {
WARN_ON(chunk->immutable);

/* 反向扫描块中的页面, i 从最后一页开始. */
for (i = chunk->nr_pages - 1, end = -1; i >= 0; i--) {
/* 如果全局"热"页面水位过低, 立即停止回收并重新整合此块以应急. */
if (pcpu_nr_empty_pop_pages < PCPU_EMPTY_POP_PAGES_HIGH) {
reintegrate = true;
break;
}

/* 查找连续的、已填充且完全空闲的页面范围 (i, end). */
block = chunk->md_blocks + i;
if (block->contig_hint == PCPU_BITMAP_BLOCK_BITS &&
test_bit(i, chunk->populated)) {
if (end == -1)
end = i;
if (i > 0)
continue;
i--;
}

/* 如果存在活动范围,则取消填充 */
if (end == -1)
continue;

spin_unlock_irq(&pcpu_lock); // 释放锁.
pcpu_depopulate_chunk(chunk, i + 1, end + 1); // "去填充"(慢操作).
cond_resched(); // 主动调度.
spin_lock_irq(&pcpu_lock); // 重获取锁.
/* ... 更新元数据 ... */
pcpu_chunk_depopulated(chunk, i + 1, end + 1);
freed_page_start = min(freed_page_start, i + 1);
freed_page_end = max(freed_page_end, end + 1);

/* 重置范围并继续*/
end = -1;
}

/* 如果释放了任何页面, 执行一次批量的TLB刷新. */
if (freed_page_start < freed_page_end) {
spin_unlock_irq(&pcpu_lock);
pcpu_post_unmap_tlb_flush(chunk, freed_page_start, freed_page_end);
cond_resched();
spin_lock_irq(&pcpu_lock);
}

/* 根据最终状态决定块的去向: 重新整合或搁置. */
if (reintegrate || chunk->free_bytes == pcpu_unit_size)
pcpu_reintegrate_chunk(chunk);
else
list_move_tail(&chunk->list,
&pcpu_chunk_lists[pcpu_sidelined_slot]);
}
}

Per-CPU 异步平衡工作

此代码片段展示了Linux内核中per-cpu变量分配器的后台维护机制。per-cpu分配器是一种高度优化的内存分配方案, 它为系统中的每个CPU核心都预留了独立的内存区域, 用于存储同一变量的不同副本。当内核代码需要访问一个per-cpu变量时, 它可以直接访问其当前CPU核心的私有副本, 从而完全避免了昂贵的锁操作和缓存行伪共享(cache line false sharing), 极大地提升了性能。

然而, 这种高性能的分配/释放”快路径”(fast-path)背后, 需要一个”慢路径”(slow-path)的维护机制来管理其内部的内存池。这个维护机制就是由本代码片段中的**异步平衡工作(asynchronous balance work)**来实现的。

核心原理:

  1. 问题: per-cpu分配器将其内存池划分为多个”块”(chunks), 每个块由一或多页内存组成。在使用过程中, 某些块可能会被完全释放变为空闲, 而当空闲内存不足时, 又需要从主内存分配器申请新的页面来”填充”(populate)块。这些分配/释放整页内存的操作相对耗时, 且可能需要睡眠(例如, 等待内存回收), 因此绝对不能在要求高速、原子性的”快路径”(即alloc_percpu/free_percpu调用)中直接执行。
  2. 解决方案: 将这些耗时的维护工作(平衡内存池)异步化。当快路径代码检测到内存池状态失衡时(例如, 可用空间低于某个阈值, 或空闲块过多), 它不会自己去处理, 而是仅仅调用pcpu_schedule_balance_work调度一个后台工作
  3. 延迟启动: per-cpu分配器本身在内核启动的极早期阶段就需要被初始化, 此时工作队列(workqueue)子系统甚至还不可用。因此, 内核采用了一种延迟启动策略:
    • pcpu_async_enabled全局标志在启动时默认为false
    • 当内核初始化进行到subsys_initcall阶段时, 工作队列已经可用, percpu_enable_async函数会被调用, 将pcpu_async_enabled设置为true。从此, 异步平衡机制才正式启用。
  4. 后台执行: 内核的工作队列线程会在稍后的某个时间点, 从队列中取出pcpu_balance_work这个工作项, 并执行其处理函数pcpu_balance_workfn。这个函数运行在标准的内核线程上下文中, 可以安全地执行加锁、内存分配等可能导致睡眠的操作。

代码逐行解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/* pcpu_balance_workfn - 管理空闲块和已填充页面的数量 */
static void pcpu_balance_workfn(struct work_struct *work)
{
/*
* 保存当前的内存分配上下文, 并进入"不允许I/O"模式.
* 这是为了防止在平衡工作中分配内存时, 触发文件系统回写或块设备I/O, 从而导致死锁.
*/
unsigned int flags = memalloc_noio_save();
/* 获取互斥锁, 保护整个平衡操作的逻辑流程不被并发执行. */
mutex_lock(&pcpu_alloc_mutex);
/* 获取自旋锁并禁用中断, 保护核心数据结构不被快路径的 alloc/free 操作或其他CPU访问. */
spin_lock_irq(&pcpu_lock);

/*
* 注释解释了为什么调用两次 pcpu_balance_free.
* 这是一个多阶段的平衡策略:
* 1. pcpu_balance_free(false): 第一次尝试释放完全空闲的块.
* 2. pcpu_reclaim_populated(): 回收并整理已填充的页面, 可能会产生新的空闲块.
* 3. pcpu_balance_populated(): 根据需要, 填充新的页面以满足预留水位.
* 4. pcpu_balance_free(true): 第二次释放, 清理在第2、3步中可能新产生的空闲块.
*/
pcpu_balance_free(false);
pcpu_reclaim_populated();
pcpu_balance_populated();
pcpu_balance_free(true);

/* 按相反顺序释放锁和恢复上下文. */
spin_unlock_irq(&pcpu_lock);
mutex_unlock(&pcpu_alloc_mutex);
memalloc_noio_restore(flags);
}

/* 静态声明处理函数. */
static void pcpu_balance_workfn(struct work_struct *work);
/*
* 使用 DECLARE_WORK 静态地定义一个工作项.
* @name: pcpu_balance_work, 工作项的变量名.
* @func: pcpu_balance_workfn, 与此工作项关联的处理函数.
*/
static DECLARE_WORK(pcpu_balance_work, pcpu_balance_workfn);
/*
* __read_mostly 告诉编译器, 这个变量绝大多数情况下是只读的,
* 有助于优化, 例如将其放入只读内存节.
*/
static bool pcpu_async_enabled __read_mostly;
static bool pcpu_atomic_alloc_failed;

/* 调度平衡工作的函数. */
static void pcpu_schedule_balance_work(void)
{
/* 只有在异步模式被启用后, 才真正地调度工作. */
if (pcpu_async_enabled)
schedule_work(&pcpu_balance_work);
}
/*
* percpu_enable_async: 在内核启动后期启用异步管理.
*/
static int __init percpu_enable_async(void)
{
pcpu_async_enabled = true;
return 0;
}
/*
* 使用 subsys_initcall 注册.
* 这确保了此函数在工作队列子系统初始化之后, 但在大多数驱动程序初始化之前被调用.
*/
subsys_initcall(percpu_enable_async);