[TOC]
lib/xarray.c eXtensible Array (XArray) 可扩展数组 历史与背景 这项技术是为了解决什么特定问题而诞生的? XArray(eXtensible Array,可扩展数组)的诞生是为了替代并改进 Linux内核中一个非常重要但使用起来十分复杂的数据结构——Radix Tree 。Radix Tree长期以来是内核(尤其是页缓存 page cache)用于管理稀疏长整型索引到指针映射的核心组件,但它存在诸多问题:
复杂的API和使用模式 :Radix Tree的API不够直观,开发者需要自行处理很多棘手的边界情况,比如存储NULL指针、处理特殊的“异常条目”(exceptional entries),以及管理并发访问的锁。这使得代码容易出错且难以维护。
锁机制外部化 :Radix Tree本身不提供锁机制,使用者必须在外部实现自己的锁来保护数据结构,这增加了驱动和子系统开发者的负担。
内存预加载问题 :为了在持有锁时避免睡眠(因分配内存可能导致睡眠),Radix Tree的使用者通常需要“预加载”节点内存,这种机制复杂且可能浪费内存。
功能局限 :Radix Tree API缺少一些常见的功能,如“比较并交换”(compare-and-swap)操作,导致多个使用者需要自己实现类似的逻辑。此外,它只能存储指针或特殊的“异常条目”,灵活性不足。
XArray被设计为一个全新的抽象层,它保留了Radix Tree高效的底层实现,但提供了一个更安全、更易用、功能更丰富的API,从根本上解决了上述问题。
它的发展经历了哪些重要的里程碑或版本迭代?
首次提出 :XArray由Matthew Wilcox提出,旨在通过提供一个更现代化的API来简化内核开发并减少错误。
合入内核主线 :经过多次迭代和审查,XArray最终在Linux 4.20版本左右被合入主线内核。最初的合入包含了XArray的实现,并立刻转换了内核中最复杂、最重要的Radix Tree用户——页缓存(page cache) 。
持续替代 :在被合入之后,内核社区持续将其他使用Radix Tree和IDR(一种基于Radix Tree的ID分配机制)的子系统迁移到XArray上。
目前该技术的社区活跃度和主流应用情况如何? XArray现在是Linux内核中用于管理稀疏索引到指针映射的标准和首选 数据结构。它已经完全取代了Radix Tree在内核新代码中的地位,并且大量旧代码也已被迁移。
核心应用 :页缓存(page cache)是其最重要和最复杂的用户。
广泛使用 :它也被用于IDR(ID Allocator)的替代实现、I/O子系统(如io_uring)、内存管理等多个关键领域。
API演进 :它的API被认为是后续新数据结构(如Maple Tree)设计的范本。
核心原理与设计 它的核心工作原理是什么? XArray在概念上表现为一个可以自动扩容的、巨大的指针数组 ,其索引是unsigned long类型。 尽管其外部表现为数组,但其内部实现仍然基于经过优化的Radix Tree 结构。
树状结构 :与Radix Tree类似,XArray使用多级树来存储数据。索引(一个长整型)被拆分成多个比特块,每个比特块用于在树的一层中选择一个槽(slot)。通过逐层向下遍历,最终可以定位到与完整索引对应的指针。
动态分配 :XArray初始时不占用任何内存。当第一次向某个索引存储数据时,它会按需分配树节点。这使得它对于存储稀疏索引(索引之间间隔很大)非常高效。
内置锁 :XArray结构体中内嵌了一个自旋锁(spinlock),其API(如 xa_store()、xa_erase())会自动处理加锁和解锁,极大地简化了并发编程。
RCU安全查找 :查找操作(xa_load())是无锁的,并且利用了RCU(Read-Copy-Update)机制,允许读操作与写操作并行,提高了并发性能。
它的主要优势体现在哪些方面?
API简洁安全 :提供了类似数组的xa_load()和xa_store()接口,隐藏了树操作的复杂性。内置的锁机制避免了大量并发相关的bug。
功能丰富 :
支持多种数据类型 :除了指针,还可以直接存储整数值(通过xa_mk_value())和带标签的指针。
处理NULL值 :可以清晰地区分“从未使用的索引”和“存储了NULL指针的索引”,后者可以用于预留位置。
多索引条目 :支持将一个条目存储到一段连续的索引范围内,对这段范围内的任何索引进行查找都会返回同一个条目。
高性能 :查找操作无锁且RCU安全。对于密集索引的场景,其缓存效率很高。
高效的内存使用 :按需分配节点,对于稀疏数据非常节省内存。
它存在哪些已知的劣势、局限性或在特定场景下的不适用性?
不适合哈希索引 :XArray对连续或聚集的索引表现最优。如果将对象的哈希值作为索引,会导致索引分布稀疏且随机,这会降低其性能和内存效率,因为可能需要创建很深的树来存储单个条目。
索引范围限制 :索引必须是unsigned long类型,无法支持更大的索引范围。
非最优的范围操作 :虽然支持存储多索引条目,但对于需要高效进行大量范围操作(如查找所有与某个范围重叠的条目)的场景,新的Maple Tree数据结构可能更优。
使用场景 在哪些具体的业务或技术场景下,它是首选解决方案?请举例说明。 XArray是内核中将长整型ID映射到指针 的通用首选方案,特别适用于ID稀疏分布的场景。
例一:文件页缓存(Page Cache) 这是XArray最经典的应用。当内核需要读写文件时,它将文件的页偏移量(一个pgoff_t,本质上是unsigned long)作为索引,将指向内存中对应struct page的指针存储在文件的i_pages XArray中。由于用户可能只访问了大文件中的某几部分,这些页偏移量是稀疏的,XArray能高效地管理这种映射。
例二:ID分配器(IDR) 内核需要为各种对象(如IPC对象、网络设备等)分配唯一的32位整数ID。新的IDR实现就基于XArray,它使用XArray来查找一个未被使用的ID(即XArray中为NULL的索引),并将该ID与指向内核对象的指针关联起来。
例-三:I/O环形缓冲区(io_uring) io_uring接口允许用户空间和内核通过一个共享的环形缓冲区高效地提交I/O请求。内核端使用XArray来管理注册的文件描述符,将用户提供的fd作为索引,映射到内部的struct file指针。
是否有不推荐使用该技术的场景?为什么?
密集的小范围整数ID :如果需要管理的ID是密集的、从0开始且范围不大(例如,小于几千),那么使用普通的数组(void *array[])会比XArray更简单、更快,因为它没有树遍历的开销。
基于字符串或复杂键的查找 :XArray是基于整数索引的。如果需要通过字符串或其他复杂结构作为键来查找对象,应该使用哈希表(hashtable)。
需要高效范围扫描的场景 :如前所述,对于以范围为核心操作的内存管理场景(例如,管理一个进程的虚拟内存区域VMA),红黑树(rbtree)或新的Maple Tree通常是更好的选择。
对比分析 请将其 与 其他相似技术 进行详细对比。
特性
XArray
Radix Tree (已被取代)
IDR (旧实现)
普通数组
哈希表
功能概述
用于稀疏长整型索引到指针映射的通用、安全且功能丰富的数据结构。
Radix Tree的底层实现,API复杂且不安全。
基于Radix Tree的32位整数ID分配和查找机制。
用于密集、小范围索引的简单指针数组。
用于任意类型键到值的映射,通过哈希函数定位。
实现方式
优化的多级Radix Tree。
多级Radix Tree。
Radix Tree + 位图进行ID分配。
连续的内存块。
哈希函数 + 桶数组(通常是链表或树)。
API与安全性
非常高 。API简洁,内置锁,RCU安全查找。
低 。API复杂,需要外部锁,易用错。
中等 。API专用,比Radix Tree简单,但不如XArray通用。
非常高 。就是原生数组访问。
中等 。需要提供好的哈希函数和处理冲突。
稀疏索引支持
优秀 。专为此设计,内存效率高。
优秀 。
优秀 。
差 。会浪费大量空间存储空指针。
优秀 。
性能开销
查找为对数时间复杂度,但常数因子小。插入/删除有锁开销。
与XArray底层类似,但API使用不当会引入额外开销。
与Radix Tree类似。
常数时间 O(1) ,非常快。
平均为常数时间 O(1) ,最差为O(n)。
资源占用
与存储的条目数量成正比,非常节省。
与XArray类似。
与Radix Tree类似。
与索引范围最大值成正比,可能非常浪费。
需要预分配桶数组,占用固定内存。
典型用途
页缓存 、新IDR 、io_uring 、文件描述符管理。
(历史) 页缓存、旧IDR。
(历史) IPC对象ID、设备ID分配。
进程的文件描述符表(files_struct中的fd_array)。
dentry缓存、网络连接跟踪。
include/linux/xarray.h XARRAY_INIT 1 2 3 4 5 #define XARRAY_INIT(name, flags) { \ .xa_lock = __SPIN_LOCK_UNLOCKED(name.xa_lock), \ .xa_flags = flags, \ .xa_head = NULL, \ }
xa_init_flags 初始化带有标志的空 XArray 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 static inline void xa_init_flags (struct xarray *xa, gfp_t flags) { spin_lock_init(&xa->xa_lock); xa->xa_flags = flags; xa->xa_head = NULL ; }
xa_init 初始化一个空的 XArray 1 2 3 4 5 6 7 8 9 10 11 12 static inline void xa_init (struct xarray *xa) { xa_init_flags(xa, 0 ); }
xa_marked 检查 XArray 中是否有任何条目设置了特定标记 1 2 3 4 5 6 7 8 9 10 11 12 static inline bool xa_marked (const struct xarray *xa, xa_mark_t mark) { return xa->xa_flags & XA_FLAGS_MARK(mark); }
xas_retry 用于判断是否需要重试 XArray 操作,并在必要时重置操作状态(xa_state) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 static inline bool xas_retry (struct xa_state *xas, const void *entry) { if (xa_is_zero(entry)) return true ; if (!xa_is_retry(entry)) return false ; xas_reset(xas); return true ; }
xas_reload 重新获取 XArray 中指定位置的条目
主要用途是验证之前加载的条目是否仍然保持相同的值,特别是在无锁的页面缓存查找场景中。通过重新加载条目,函数确保操作的正确性和一致性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 static inline void *xas_reload (struct xa_state *xas) { struct xa_node *node = xas->xa_node; void *entry; char offset; if (!node) return xa_head(xas->xa); if (IS_ENABLED(CONFIG_XARRAY_MULTI)) { offset = (xas->xa_index >> node->shift) & XA_CHUNK_MASK; entry = xa_entry(xas->xa, node, offset); if (!xa_is_sibling(entry)) return entry; offset = xa_to_sibling(entry); } else { offset = xas->xa_offset; } return xa_entry(xas->xa, node, offset); }
XArray 遍历宏:xa_for_each 系列 本代码片段定义了一组 C 语言的预处理器宏,它们为 Linux 内核提供了一套便捷、安全的接口,用于遍历 XArray 数据结构中存储的条目 。这些宏将复杂的、基于底层查找函数的循环逻辑封装成一个标准的 for 循环语法,极大地提高了代码的可读性和易用性。它们支持多种遍历模式:全量遍历、从指定位置开始遍历、范围遍历以及按标记过滤遍历。
实现原理分析 此代码是 C 语言宏编程技巧的优秀范例,通过层层封装,构建出一套功能丰富的 API。
核心宏 (xa_for_each_range) :
xa_for_each_range 是所有 for-each 宏的基础。它被定义为一个 for 循环结构。
初始化部分 : index = start, entry = xa_find(xa, &index, last, XA_PRESENT) a. 将循环变量 index 初始化为指定的 start 值。 b. 调用 xa_find 函数。这个函数会从 xa 数组的 start 索引开始,查找第一个“存在”(XA_PRESENT)的条目。如果找到,它会返回该条目的指针并更新 index 为该条目的实际索引。xa_find 内部处理了遍历 XArray 树的复杂逻辑。
条件部分 : entry 循环会一直持续,直到 xa_find 或 xa_find_after 返回 NULL,表示没有更多符合条件的条目了。
增量部分 : entry = xa_find_after(xa, &index, last, XA_PRESENT) 调用 xa_find_after 函数,它从 index + 1 的位置开始,查找下一个存在的条目。
封装与派生 :
xa_for_each_start: 简单地调用 xa_for_each_range,并将范围的结束位置 last 硬编码为 ULONG_MAX,表示“直到数组末尾”。
xa_for_each: 进一步封装 xa_for_each_start,并将起始位置 start 硬编码为 0,实现了从头到尾的完整遍历。
xa_for_each_marked: 这个宏与 xa_for_each_range 结构类似,但它在调用 xa_find 和 xa_find_after 时,传入的是一个由调用者指定的 filter(标记),而不是 XA_PRESENT。这使得它可以只遍历那些被特定标记(如 XA_MARK_0)的条目,这是 XArray 的一个高级功能,常用于实现不同状态条目的分组管理。
安全性和并发 :
RCU 安全 : 注释明确指出,底层的 xa_find 和 xa_find_after 函数会自己处理 RCU 锁。这意味着遍历过程是 RCU 安全的,它可以在不阻塞写操作的情况下,安全地读取 XArray 的快照。
修改安全 : 注释还提到,在遍历过程中修改数组是安全的。这是因为 xa_find_after 是基于 index 来重新开始查找的,而不是依赖于一个固定的内部迭代器状态。如果在循环体内删除了当前条目 entry,下一次迭代时,xa_find_after 会从 index + 1 继续查找,不会受到影响。
代码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 #define xa_for_each_range(xa, index, entry, start, last) \ \ for (index = start, \ entry = xa_find(xa, &index, last, XA_PRESENT); \ \ entry; \ \ entry = xa_find_after(xa, &index, last, XA_PRESENT)) #define xa_for_each_start(xa, index, entry, start) \ xa_for_each_range(xa, index, entry, start, ULONG_MAX) #define xa_for_each(xa, index, entry) \ xa_for_each_start(xa, index, entry, 0) #define xa_for_each_marked(xa, index, entry, filter) \ \ for (index = 0, entry = xa_find(xa, &index, ULONG_MAX, filter); \ entry; entry = xa_find_after(xa, &index, ULONG_MAX, filter))
lib/xarray.c xa_entry 获取xa_entry条目 1 2 3 4 5 6 7 8 static inline void *xa_entry (const struct xarray *xa, const struct xa_node *node, unsigned int offset) { XA_NODE_BUG_ON(node, offset >= XA_CHUNK_SIZE); return rcu_dereference_check(node->slots[offset], lockdep_is_held(&xa->xa_lock)); }
xas_descend 在 XArray 数据结构中从当前节点向下遍历到子节点或获取目标条目 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static __always_inline void *xas_descend (struct xa_state *xas, struct xa_node *node) { unsigned int offset = get_offset(xas->xa_index, node); void *entry = xa_entry(xas->xa, node, offset); xas->xa_node = node; while (xa_is_sibling(entry)) { offset = xa_to_sibling(entry); entry = xa_entry(xas->xa, node, offset); if (node->shift && xa_is_node(entry)) entry = XA_RETRY_ENTRY; } xas->xa_offset = offset; return entry; }
xas_reload 从xarray中重新获取一个条目 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 static inline void *xas_reload (struct xa_state *xas) { struct xa_node *node = xas->xa_node; void *entry; char offset; if (!node) return xa_head(xas->xa); if (IS_ENABLED(CONFIG_XARRAY_MULTI)) { offset = (xas->xa_index >> node->shift) & XA_CHUNK_MASK; entry = xa_entry(xas->xa, node, offset); if (!xa_is_sibling(entry)) return entry; offset = xa_to_sibling(entry); } else { offset = xas->xa_offset; } return xa_entry(xas->xa, node, offset); }
xas_start 初始化或重新加载 xa_state(XArray 操作状态)以开始遍历 XArray 数据结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 static void *xas_start (struct xa_state *xas) { void *entry; if (xas_valid(xas)) return xas_reload(xas); if (xas_error(xas)) return NULL ; entry = xa_head(xas->xa); if (!xa_is_node(entry)) { if (xas->xa_index) return set_bounds(xas); } else { if ((xas->xa_index >> xa_to_node(entry)->shift) > XA_CHUNK_MASK) return set_bounds(xas); } xas->xa_node = NULL ; return entry; }
xa_load 加载XArray 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 void *xas_load (struct xa_state *xas) { void *entry = xas_start(xas); while (xa_is_node(entry)) { struct xa_node *node = xa_to_node(entry); if (xas->xa_shift > node->shift) break ; entry = xas_descend(xas, node); if (node->shift == 0 ) break ; } return entry; } EXPORT_SYMBOL_GPL(xas_load);
xas_store 将此条目存储在 XArray 中 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 void *xas_store (struct xa_state *xas, void *entry) { struct xa_node *node ; void __rcu **slot = &xas->xa->xa_head; unsigned int offset, max; int count = 0 ; int values = 0 ; void *first, *next; bool value = xa_is_value(entry); if (entry) { bool allow_root = !xa_is_node(entry) && !xa_is_zero(entry); first = xas_create(xas, allow_root); } else { first = xas_load(xas); }-+ if (xas_invalid(xas)) return first; node = xas->xa_node; if (node && (xas->xa_shift < node->shift)) xas->xa_sibs = 0 ; if ((first == entry) && !xas->xa_sibs) return first; next = first; offset = xas->xa_offset; max = xas->xa_offset + xas->xa_sibs; if (node) { slot = &node->slots[offset]; if (xas->xa_sibs) xas_squash_marks(xas); } if (!entry) xas_init_marks(xas); for (;;) { rcu_assign_pointer(*slot, entry); if (xa_is_node(next) && (!node || node->shift)) xas_free_nodes(xas, xa_to_node(next)); if (!node) break ; count += !next - !entry; values += !xa_is_value(first) - !value; if (entry) { if (offset == max) break ; if (!xa_is_sibling(entry)) entry = xa_mk_sibling(xas->xa_offset); } else { if (offset == XA_CHUNK_MASK) break ; } next = xa_entry_locked(xas->xa, node, ++offset); if (!xa_is_sibling(next)) { if (!entry && (offset > max)) break ; first = next; } slot++; } update_node(xas, node, count, values); return first; } EXPORT_SYMBOL_GPL(xas_store);
xas_find_marked 在 XArray 数据结构中查找具有特定标记(mark)的下一个条目 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 void *xas_find_marked (struct xa_state *xas, unsigned long max, xa_mark_t mark) { bool advance = true ; unsigned int offset; void *entry; if (xas_error(xas)) return NULL ; if (xas->xa_index > max) goto max; if (!xas->xa_node) { xas->xa_index = 1 ; goto out; } else if (xas_top(xas->xa_node)) { advance = false ; entry = xa_head(xas->xa); xas->xa_node = NULL ; if (xas->xa_index > max_index(entry)) goto out; if (!xa_is_node(entry)) { if (xa_marked(xas->xa, mark)) return entry; xas->xa_index = 1 ; goto out; } xas->xa_node = xa_to_node(entry); xas->xa_offset = xas->xa_index >> xas->xa_node->shift; } while (xas->xa_index <= max) { if (unlikely(xas->xa_offset == XA_CHUNK_SIZE)) { xas->xa_offset = xas->xa_node->offset + 1 ; xas->xa_node = xa_parent(xas->xa, xas->xa_node); if (!xas->xa_node) break ; advance = false ; continue ; } if (!advance) { entry = xa_entry(xas->xa, xas->xa_node, xas->xa_offset); if (xa_is_sibling(entry)) { xas->xa_offset = xa_to_sibling(entry); xas_move_index(xas, xas->xa_offset); } } offset = xas_find_chunk(xas, advance, mark); if (offset > xas->xa_offset) { advance = false ; xas_move_index(xas, offset); if ((xas->xa_index - 1 ) >= max) goto max; xas->xa_offset = offset; if (offset == XA_CHUNK_SIZE) continue ; } entry = xa_entry(xas->xa, xas->xa_node, xas->xa_offset); if (!entry && !(xa_track_free(xas->xa) && mark == XA_FREE_MARK)) continue ; if (xa_is_sibling(entry)) continue ; if (!xa_is_node(entry)) return entry; xas->xa_node = xa_to_node(entry); xas_set_offset(xas); } out: if (xas->xa_index > max) goto max; return set_bounds(xas); max: xas->xa_node = XAS_RESTART; return NULL ; } EXPORT_SYMBOL_GPL(xas_find_marked);
xas_find_marked 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 void *xas_find_marked (struct xa_state *xas, unsigned long max, xa_mark_t mark) { bool advance = true ; unsigned int offset; void *entry; if (xas_error(xas)) return NULL ; if (xas->xa_index > max) goto max; if (!xas->xa_node) { xas->xa_index = 1 ; goto out; } else if (xas_top(xas->xa_node)) { advance = false ; entry = xa_head(xas->xa); xas->xa_node = NULL ; if (xas->xa_index > max_index(entry)) goto out; if (!xa_is_node(entry)) { if (xa_marked(xas->xa, mark)) return entry; xas->xa_index = 1 ; goto out; } xas->xa_node = xa_to_node(entry); xas->xa_offset = xas->xa_index >> xas->xa_node->shift; } while (xas->xa_index <= max) { if (unlikely(xas->xa_offset == XA_CHUNK_SIZE)) { xas->xa_offset = xas->xa_node->offset + 1 ; xas->xa_node = xa_parent(xas->xa, xas->xa_node); if (!xas->xa_node) break ; advance = false ; continue ; } if (!advance) { entry = xa_entry(xas->xa, xas->xa_node, xas->xa_offset); if (xa_is_sibling(entry)) { xas->xa_offset = xa_to_sibling(entry); xas_move_index(xas, xas->xa_offset); } } offset = xas_find_chunk(xas, advance, mark); if (offset > xas->xa_offset) { advance = false ; xas_move_index(xas, offset); if ((xas->xa_index - 1 ) >= max) goto max; xas->xa_offset = offset; if (offset == XA_CHUNK_SIZE) continue ; } entry = xa_entry(xas->xa, xas->xa_node, xas->xa_offset); if (!entry && !(xa_track_free(xas->xa) && mark == XA_FREE_MARK)) continue ; if (xa_is_sibling(entry)) continue ; if (!xa_is_node(entry)) return entry; xas->xa_node = xa_to_node(entry); xas_set_offset(xas); } out: if (xas->xa_index > max) goto max; return set_bounds(xas); max: xas->xa_node = XAS_RESTART; return NULL ; } EXPORT_SYMBOL_GPL(xas_find_marked);
__xa_alloc 在 XArray 中分配一个新的条目 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 int __xa_alloc(struct xarray *xa, u32 *id, void *entry, struct xa_limit limit, gfp_t gfp) { XA_STATE(xas, xa, 0 ); if (WARN_ON_ONCE(xa_is_advanced(entry))) return -EINVAL; if (WARN_ON_ONCE(!xa_track_free(xa))) return -EINVAL; if (!entry) entry = XA_ZERO_ENTRY; do { xas.xa_index = limit.min; xas_find_marked(&xas, limit.max, XA_FREE_MARK); if (xas.xa_node == XAS_RESTART) xas_set_err(&xas, -EBUSY); else *id = xas.xa_index; xas_store(&xas, entry); xas_clear_mark(&xas, XA_FREE_MARK); } while (__xas_nomem(&xas, gfp)); return xas_error(&xas); } EXPORT_SYMBOL(__xa_alloc);
XArray 节点删除: xa_delete_node 与 xa_parent_locked 本代码片段定义了一个 XArray (一种高级的基数树实现) 的内部接口 xa_delete_node,其核心功能是从 XArray 树中完整地删除一个指定的 xa_node 节点 。这是一个专门为工作集(workingset)代码提供的底层接口,它通过精心构造一个 xa_state (XArray 状态机) 并调用通用的 xas_store 函数,以一种间接但高效的方式触发节点的级联删除和内存回收。
实现原理分析 此函数的实现并非直接操作链表或指针,而是巧妙地利用了 XArray 子系统自身的状态机驱动接口,将一个“删除节点”的特定请求,转化为一个“在父节点中存储一个 NULL 指针”的通用操作。
状态机初始化 (xa_state) : xa_delete_node 的主体是初始化一个 xa_state 结构体,这个结构体在 XArray 中扮演着“游标”或“上下文”的角色。
.xa: 指向节点所属的 XArray 实例。
.xa_node: 使用 xa_parent_locked 辅助函数安全地获取当前节点的父节点。
.xa_offset: 设置为 node->offset,这代表当前节点在其父节点的槽位数组中的索引。
.xa_shift 和 .xa_index: 这两个成员被设置为描述父节点 的层级和它所覆盖的索引范围。这精确地将 xa_state 的上下文定位到了父节点中那个指向当前 node 的指针槽位上。
.xa_update: 传入一个回调函数 update (即 workingset_update_node)。这个回调函数是实现级联更新的关键。
触发删除 (xas_store(&xas, NULL)) : 这是整个操作的核心。xas_store 是 XArray 的通用写入函数。通过向之前定位好的槽位写入 NULL,它实现了以下效果:
删除引用 : 父节点中指向 node 的指针被清空。
级联回收 : xas_store 的内部逻辑会检查,如果这次 NULL 写入导致父节点变空,它会自动地、递归地尝试删除这个父节点。这个过程会一直向上回溯,直到找到一个非空的祖先节点或到达树的根部。所有被清空的节点都会被释放回内存。
回调通知 : 在这个向上回溯的过程中,每当 xas_store 修改一个祖先节点时,它都会调用 xa_state 中注册的 .xa_update 回调函数。这使得工作集代码能够得到通知,并相应地更新这些祖先节点在 shadow_nodes LRU 链表中的状态。
安全的父指针解引用 (xa_parent_locked) :
这是一个内联辅助函数,用于安全地获取节点的父指针。XArray 节点的 parent 字段是受 RCU (Read-Copy-Update) 保护的。
它使用 rcu_dereference_protected,这是一个 RCU API,它允许在持有特定锁(此处为 xa->xa_lock)的情况下解引用 RCU 指针。这向内核断言,xa_lock足以保护这次访问,避免了更昂贵的 RCU 状态检查,是一种性能优化。
代码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 static inline struct xa_node *xa_parent_locked (const struct xarray *xa, const struct xa_node *node) { return rcu_dereference_protected(node->parent, lockdep_is_held(&xa->xa_lock)); } void xa_delete_node (struct xa_node *node, xa_update_node_t update) { struct xa_state xas = { .xa = node->array , .xa_index = (unsigned long )node->offset << (node->shift + XA_CHUNK_SHIFT), .xa_shift = node->shift + XA_CHUNK_SHIFT, .xa_offset = node->offset, .xa_node = xa_parent_locked(node->array , node), .xa_update = update, }; xas_store(&xas, NULL ); } EXPORT_SYMBOL_GPL(xa_delete_node);
XArray 核心查找原语:xa_find 与 xa_find_after 本代码片段展示了 XArray 数据结构中两个最核心的底层查找函数。它们是上一节分析的 xa_for_each 系列遍历宏的实际执行引擎 。其核心功能是:在 RCU (Read-Copy-Update) 读锁的保护下,使用 XArray 的状态机接口(xas),在数组中查找符合特定过滤条件(存在或被标记)的条目。xa_find 从指定位置开始查找(包含该位置),而 xa_find_after 则严格从指定位置之后开始查找,两者都内置了对 XArray 内部“重试”条目的透明处理逻辑。
实现原理分析 这两个函数是将 XArray 强大但复杂的状态机(xas)接口,封装成一个更简单、无状态的函数调用的关键。其实现原理精妙地结合了 RCU、状态机和重试循环。
XArray 状态机 (XA_STATE) :
两个函数都始于 XA_STATE(xas, xa, ...)。这个宏在栈上创建并初始化一个 xa_state 结构。xa_state (简称 xas) 扮演着一个“游标”或“上下文”的角色,它记录了当前在 XArray 树中的位置(节点、槽位偏移量等)。这是执行任何底层 XArray 操作的基础。
RCU 读侧保护 :
所有的查找逻辑都被 rcu_read_lock() 和 rcu_read_unlock() 紧密包裹。这提供了至关重要的并发安全保证: a. 无锁读取 : 读取方(调用 xa_find 的任务)不需要获取任何会与写入方冲突的锁,极大地提高了并发性能。 b. 防止悬挂指针 : 即使在遍历 XArray 树的过程中,有另一个 CPU 上的任务正在修改数组(例如,删除节点),RCU 机制也能保证在 rcu_read_lock 临界区内,所有被访问到的节点指针都是有效的,不会被提前释放。
透明的重试处理 (xas_retry) :
XArray 内部有一种特殊的条目类型 XA_RETRY_ENTRY。当查找过程遇到它时,表示树的结构可能正在发生变化,当前路径已失效,需要从一个更新后的位置重新开始查找。
xa_find 使用 do-while (xas_retry(...)) 循环,而 xa_find_after 在其 for (;;) 循环中也包含了对 xas_retry 的检查。这个 xas_retry 函数会检查返回的 entry 是否是重试条目。如果是,它会自动更新 xas 状态机 以指向新的、正确的搜索起点,并返回 true,从而触发循环,重新执行查找。这个机制使得复杂的并发冲突处理对上层调用者完全透明。
查找逻辑 (xa_find vs xa_find_after) :
xa_find : XA_STATE(xas, xa, *indexp),从 *indexp 本身 开始搜索。
xa_find_after : XA_STATE(xas, xa, *indexp + 1),从 *indexp 的下一个 位置开始搜索。它还包含一个 if (xas.xa_index == 0) 的溢出检查,防止当 *indexp 为 ULONG_MAX 时导致的无限循环。
高级优化 (xas_sibling) :
xa_find_after 中的 xas_sibling 检查是一个内部优化。XArray 的节点被划分为多个槽位(slots)。当从一个索引开始向后查找时,如果下一个条目仍在同一个节点(并且是同一个多槽位组)内,xas_find 可能会很快找到它。但有时为了确保能正确跳到下一个“逻辑”条目(可能在不同的节点),需要强制状态机前进一步。xas_sibling 就是用于判断是否需要进行这种强制推进的内部检查。
代码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 void *xa_find (struct xarray *xa, unsigned long *indexp, unsigned long max, xa_mark_t filter) { XA_STATE(xas, xa, *indexp); void *entry; rcu_read_lock(); do { if ((__force unsigned int )filter < XA_MAX_MARKS) entry = xas_find_marked(&xas, max, filter); else entry = xas_find(&xas, max); } while (xas_retry(&xas, entry)); rcu_read_unlock(); if (entry) *indexp = xas.xa_index; return entry; } EXPORT_SYMBOL(xa_find); static bool xas_sibling (struct xa_state *xas) { struct xa_node *node = xas->xa_node; unsigned long mask; if (!IS_ENABLED(CONFIG_XARRAY_MULTI) || !node) return false ; mask = (XA_CHUNK_SIZE << node->shift) - 1 ; return (xas->xa_index & mask) > ((unsigned long )xas->xa_offset << node->shift); } void *xa_find_after (struct xarray *xa, unsigned long *indexp, unsigned long max, xa_mark_t filter) { XA_STATE(xas, xa, *indexp + 1 ); void *entry; if (xas.xa_index == 0 ) return NULL ; rcu_read_lock(); for (;;) { if ((__force unsigned int )filter < XA_MAX_MARKS) entry = xas_find_marked(&xas, max, filter); else entry = xas_find(&xas, max); if (xas_invalid(&xas)) break ; if (xas_sibling(&xas)) continue ; if (!xas_retry(&xas, entry)) break ; } rcu_read_unlock(); if (entry) *indexp = xas.xa_index; return entry; } EXPORT_SYMBOL(xa_find_after);