[TOC]

include/asm-generic/bitops/generic-non-atomic.h

  1. generic_test_bit 和 const_test_bit 是两个用于测试位图中某个位是否被设置的函数。它们的实现方式略有不同,主要体现在对 volatile 关键字的使用上。generic_test_bit 函数使用了 volatile 关键字,这意味着它会读取内存中的值,而不是使用寄存器中的缓存值。这对于多线程或中断上下文中的位操作非常重要,因为它确保了读取的是最新的值。const_test_bit 函数则不使用 volatile 关键字,这意味着它可以在编译时进行优化,适用于编译时常量的测试。两者的实现都使用了位操作来确定指定的位是否被设置。这里仅进行了读取操作,没有进行写入操作.所以不需要进行原子操作

generic_test_bit

  • 这里具有volatile会在每次读取时都从内存中读取值,而不是进行优化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* generic_test_bit - 确定是否设置了位
* @nr:要测试的位号
* @addr:开始计数的地址
*/
static __always_inline bool
generic_test_bit(unsigned long nr, const volatile unsigned long *addr)
{
/*
* 与上面带有 '__' 前缀的 bitops 不同,这个 bits 是 atomic 的,所以 'volatile' 必须始终留在这里,没有丢弃。
* 有关详细信息,请参阅 'Documentation/atomic_bitops.txt' 。
*/
return 1UL & (addr[BIT_WORD(nr)] >> (nr & (BITS_PER_LONG-1)));
}

const_test_bit

  • 编译器可能会优化掉对 addr 的多次访问,例如将其缓存到寄存器中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* const_test_bit - 确定是否设置了位
* @nr:要测试的位号
* @addr:开始计数的地址
*
* generic_test_bit() 的一个版本,它将 'volatile' 限定符丢弃为
* 允许编译器更努力地优化代码。非原子且仅被调用
* 用于测试编译时常量,例如通过相应的宏,而不是
* 直接从 “常规” 代码。
*/
static __always_inline bool
const_test_bit(unsigned long nr, const volatile unsigned long *addr)
{
const unsigned long *p = (const unsigned long *)addr + BIT_WORD(nr);
unsigned long mask = BIT_MASK(nr);
unsigned long val = *p;

return !!(val & mask);
}

include/linux/atomic/atomic-long.h

raw_atomic_long_read

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* raw_atomic_long_read() - atomic load with relaxed ordering
* @v: pointer to atomic_long_t
*
* Atomically loads the value of @v with relaxed ordering.
*
* Safe to use in noinstr code; prefer atomic_long_read() elsewhere.
*
* Return: The value loaded from @v.
*/
static __always_inline long
raw_atomic_long_read(const atomic_long_t *v)
{
#ifdef CONFIG_64BIT
return raw_atomic64_read(v);
#else
return raw_atomic_read(v);
#endif
}

include/linux/atomic/atomic-instrumented.h

xchg

  • xchg 的主要功能是以原子方式交换两个变量的值。原子操作意味着该操作在执行过程中不会被中断,因此在多线程环境中是线程安全的。
1
2
3
4
5
6
7
8
9
10
#define xchg(ptr, ...) \
({ \
typeof(ptr) __ai_ptr = (ptr); \
//扫描内存屏障
kcsan_mb(); \
//检查是否是原子操作读写
instrument_atomic_read_write(__ai_ptr, sizeof(*__ai_ptr)); \
//原始的操作
raw_xchg(__ai_ptr, __VA_ARGS__); \
})

atomic_long_try_cmpxchg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* atomic_long_try_cmpxchg() - 具有完全排序的原子比较和交换
* @v:指向 atomic_long_t 的指针
* @old:指向要比较的 long 值的指针
* @new:要分配的长值
*
* If (@v == @old),则以完全排序原子方式将 @v 更新为 @new。
* 否则,@v 不会被修改,@old 将更新为 @v 的当前值,
* 并提供宽松的排序。
*
* 在 noinstr 代码中使用不安全;在此处使用 raw_atomic_long_try_cmpxchg()。
*
* 退货:@true是否发生交换,否则@false。
*/
static __always_inline bool
atomic_long_try_cmpxchg(atomic_long_t *v, long *old, long new)
{
kcsan_mb();
instrument_atomic_read_write(v, sizeof(*v));
instrument_atomic_read_write(old, sizeof(*old));
return raw_atomic_long_try_cmpxchg(v, old, new);
}

atomic_long_read

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* atomic_long_read() - atomic load with relaxed ordering
* @v: pointer to atomic_long_t
*
* Atomically loads the value of @v with relaxed ordering.
*
* Unsafe to use in noinstr code; use raw_atomic_long_read() there.
*
* Return: The value loaded from @v.
*/
static __always_inline long
atomic_long_read(const atomic_long_t *v)
{
instrument_atomic_read(v, sizeof(*v));//检查
return raw_atomic_long_read(v);
}

atomic_long_read_acquire

raw_atomic_long_read_acquire 是一个与原子操作相关的函数或宏,通常用于在多线程或多核环境中安全地读取一个 atomic_long_t 类型的变量,同时确保读取操作具有“获取语义”(acquire semantics)。以下是对其的详细解释:

原子操作的背景

在多线程编程中,原子操作是指不可分割的操作,能够在不使用锁的情况下保证线程安全。atomic_long_t 是 Linux 内核中常用的原子类型,用于表示一个可以被多个线程安全访问的长整型变量。

获取语义(Acquire Semantics)

“获取语义”是一种内存屏障(memory barrier)的概念,用于确保在读取操作完成后,所有后续的内存操作都可以看到该读取操作的结果。换句话说,raw_atomic_long_read_acquire 不仅读取了变量的值,还确保了读取操作的顺序性,防止编译器或 CPU 对指令进行重排序。

raw_atomic_long_read_acquire 的作用

  1. 读取原子变量

    • 它读取一个 atomic_long_t 类型的变量的值。
    • 读取操作是线程安全的,确保不会因为其他线程的并发修改而导致数据不一致。
  2. 获取语义

    • 通过获取语义,确保读取操作的结果对后续的内存操作可见。
    • 这在需要严格的内存访问顺序时非常重要,例如在实现锁、信号量或其他同步原语时。

使用场景

  1. 同步原语

    • 在实现锁或条件变量时,确保在读取某个状态变量后,后续操作能够正确地感知到该状态的变化。
  2. 多线程数据共享

    • 在多线程环境中,确保一个线程读取的共享变量值是最新的,并且后续操作能够基于该值进行正确的逻辑处理。
  3. 性能优化

    • 与传统的锁机制相比,原子操作通常具有更高的性能,因为它们避免了上下文切换和锁竞争。

注意事项

  1. 底层实现

    • raw_atomic_long_read_acquire 的具体实现可能依赖于硬件架构和编译器支持。它通常通过内联汇编或内存屏障指令实现。
  2. 与普通读取的区别

    • 普通的读取操作可能会被编译器或 CPU 重排序,而 raw_atomic_long_read_acquire 通过获取语义防止了这种重排序。
  3. 仅限读取

    • 该操作仅用于读取变量的值。如果需要修改变量的值,应该使用其他原子操作(如 atomic_long_setatomic_long_add)。

总结

raw_atomic_long_read_acquire 是一个用于读取 atomic_long_t 类型变量的线程安全操作,同时提供获取语义以确保内存访问的顺序性。它在多线程编程中非常重要,尤其是在需要严格同步的场景中。通过这种操作,可以在不使用锁的情况下实现高效的线程间数据共享和同步。

include/linux/atomic/atomic-arch-fallback.h

raw_xchg

  • 根据使用选择对应的实现
1
2
3
4
5
6
7
8
9
#if defined(arch_xchg)
#define raw_xchg arch_xchg
#elif defined(arch_xchg_relaxed)
#define raw_xchg(...) \
__atomic_op_fence(arch_xchg, __VA_ARGS__)
#else
extern void raw_xchg_not_implemented(void);
#define raw_xchg(...) raw_xchg_not_implemented()
#endif

raw_atomic_try_cmpxchg 尝试比较和交换 true是发生交换,否则@false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* raw_atomic_try_cmpxchg() - 具有完整排序的原子比较和交换
* @v:指向 atomic_t 的指针
* @old:指向要与之比较的 int 值的指针
* @new:要分配的 int 值
*
* If (@v == @old),则以完全排序原子方式将 @v 更新为 @new。
* 否则,@v 不会被修改,@old 将更新为 @v 的当前值,
* 并提供宽松的排序。
*
* 在 noinstr 代码中安全使用;在其他地方首选 atomic_try_cmpxchg()。
*
* 退货:@true是发生交换,否则@false。
*/
static __always_inline bool
raw_atomic_try_cmpxchg(atomic_t *v, int *old, int new)
{
#if defined(arch_atomic_try_cmpxchg)
return arch_atomic_try_cmpxchg(v, old, new);
#elif defined(arch_atomic_try_cmpxchg_relaxed)
bool ret;
__atomic_pre_full_fence();
ret = arch_atomic_try_cmpxchg_relaxed(v, old, new);
__atomic_post_full_fence();
return ret;
#else
int r, o = *old;
r = raw_atomic_cmpxchg(v, o, new);
if (unlikely(r != o))
*old = r;
return likely(r == o);
#endif
}

include/linux/atomic.h

__atomic_op_fence 原子操作围栏

  1. 原子操作前后设置内存屏障等操作保证顺序
  2. 执行对应的op##_relaxe操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#ifndef __atomic_pre_full_fence
#define __atomic_pre_full_fence smp_mb__before_atomic
#endif

#ifndef __atomic_post_full_fence
#define __atomic_post_full_fence smp_mb__after_atomic
#endif

#define __atomic_op_fence(op, args...) \
({ \
typeof(op##_relaxed(args)) __ret; \
__atomic_pre_full_fence(); \
__ret = op##_relaxed(args); \
__atomic_post_full_fence(); \
__ret; \
})

arch/arm/include/asm/cmpxchg.h

arch_xchg_relaxed __arch_xchg

  1. 交换数据的逻辑都一样
  2. 使用ldrexb加载数据,它还会设置一个标志,表示该地址被标记为“独占访问”,以便后续的存储操作可以检查是否有其他处理器访问了该地址。
  3. 使用strexb存储数据,如果成功,返回0;如果失败,返回1,并且会清除“独占访问”标志。
  4. 如果strexb失败,循环会重新尝试加载和存储数据,直到成功为止。
  5. 该函数的返回值是交换前的值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#define arch_xchg_relaxed(ptr, x) ({					\
(__typeof__(*(ptr)))__arch_xchg((unsigned long)(x), (ptr), \
sizeof(*(ptr))); \
})

static inline unsigned long
__arch_xchg(unsigned long x, volatile void *ptr, int size)
{
extern void __bad_xchg(volatile void *, int);
unsigned long ret;
#ifdef swp_is_buggy //仅有(CONFIG_CPU_SA1100) || defined(CONFIG_CPU_SA110)才需要修复
unsigned long flags;
#endif
#if __LINUX_ARM_ARCH__ >= 6 //armV6架构及以上
unsigned int tmp;
#endif

prefetchw((const void *)ptr); //写预取
//需要交换的数据大小
switch (size) {
#if __LINUX_ARM_ARCH__ >= 6
#ifndef CONFIG_CPU_V6 /* MIN ARCH >= V6K */
case 1:
asm volatile("@ __xchg1\n"
"1: ldrexb %0, [%3]\n"
" strexb %1, %2, [%3]\n"
" teq %1, #0\n"
" bne 1b"
: "=&r" (ret), "=&r" (tmp)
: "r" (x), "r" (ptr)
: "memory", "cc");
break;
case 2:
asm volatile("@ __xchg2\n"
"1: ldrexh %0, [%3]\n"
" strexh %1, %2, [%3]\n"
" teq %1, #0\n"
" bne 1b"
: "=&r" (ret), "=&r" (tmp)
: "r" (x), "r" (ptr)
: "memory", "cc");
break;
#endif
case 4:
asm volatile("@ __xchg4\n"
"1: ldrex %0, [%3]\n"
" strex %1, %2, [%3]\n"
" teq %1, #0\n"
" bne 1b"
: "=&r" (ret), "=&r" (tmp)
: "r" (x), "r" (ptr)
: "memory", "cc");
break;
#elif defined(swp_is_buggy)
#ifdef CONFIG_SMP
#error SMP is not supported on this platform
#endif
case 1:
raw_local_irq_save(flags);
ret = *(volatile unsigned char *)ptr;
*(volatile unsigned char *)ptr = x;
raw_local_irq_restore(flags);
break;

case 4:
raw_local_irq_save(flags);
ret = *(volatile unsigned long *)ptr;
*(volatile unsigned long *)ptr = x;
raw_local_irq_restore(flags);
break;
#else
case 1:
asm volatile("@ __xchg1\n"
" swpb %0, %1, [%2]"
: "=&r" (ret)
: "r" (x), "r" (ptr)
: "memory", "cc");
break;
case 4:
asm volatile("@ __xchg4\n"
" swp %0, %1, [%2]"
: "=&r" (ret)
: "r" (x), "r" (ptr)
: "memory", "cc");
break;
#endif
default:
/* Cause a link-time error, the xchg() size is not supported */
__bad_xchg(ptr, size), ret = 0;
break;
}

return ret;
}

arch/arm/include/asm/bitops.h

Native endian atomic definitions

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#ifndef CONFIG_SMP
/*
* The __* form of bitops are non-atomic and may be reordered.
*/
//传入常量使用__atomic_*函数,传入变量使用_*
#define ATOMIC_BITOP(name,nr,p) \
(__builtin_constant_p(nr) ? ____atomic_##name(nr, p) : _##name(nr,p))
#else
#define ATOMIC_BITOP(name,nr,p) _##name(nr,p)
#endif

#define set_bit(nr,p) ATOMIC_BITOP(set_bit,nr,p)
#define clear_bit(nr,p) ATOMIC_BITOP(clear_bit,nr,p)
#define change_bit(nr,p) ATOMIC_BITOP(change_bit,nr,p)
#define test_and_set_bit(nr,p) ATOMIC_BITOP(test_and_set_bit,nr,p)
#define test_and_clear_bit(nr,p) ATOMIC_BITOP(test_and_clear_bit,nr,p)
#define test_and_change_bit(nr,p) ATOMIC_BITOP(test_and_change_bit,nr,p)

为什么常量位号使用禁用中断的方式?

  1. 编译时优化

    • 如果 nr 是常量,编译器可以在编译时确定具体的位号,从而生成更高效的代码。
    • 使用禁用中断的方式可以避免引入额外的汇编代码,同时保持代码的可读性和可移植性。
  2. 简单性

    • 对于常量位号,禁用中断的方式已经足够确保原子性,尤其是在单核环境或不需要跨核同步的场景中。
    • 这种方式避免了引入复杂的汇编代码,同时利用了 C 语言的灵活性。
  3. 适合单核环境

    • 在单核环境中,禁用中断可以确保当前 CPU 不会被中断打断,从而保证操作的原子性。
    • 这种方式在单核系统中性能开销较小,且实现简单。

为什么动态位号使用汇编实现?

  1. 动态位号的复杂性

    • 如果 nr 不是常量,编译器无法在编译时确定具体的位号,因此需要在运行时处理。
    • 汇编代码可以直接操作寄存器或内存,灵活处理动态位号。
  2. 性能优化

    • 汇编实现通常比禁用中断的方式更高效,尤其是在多核环境中。
    • 汇编代码可以利用硬件提供的原子指令(如 ldrexstrex),避免禁用中断带来的性能开销。
  3. 多核环境支持

    • 在多核系统中,禁用中断无法解决跨核的竞争问题,而汇编实现可以利用硬件的原子操作指令来确保跨核的同步。

总结

  • 常量位号:使用禁用中断的方式,因为这种方式简单、高效,适合处理固定位号的操作。
  • 动态位号:使用汇编实现,因为汇编代码更灵活,可以处理运行时确定的位号,同时在多核环境中性能更优。

这种设计在性能和灵活性之间取得了平衡:对于简单的常量位号操作,使用禁用中断的方式;对于复杂的动态位号操作,使用汇编代码以确保高效和正确性。

____atomic_test_and_set_bit 测试某个位是否已设置,如果该位未设置,则将其设置为 1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static inline int
____atomic_test_and_set_bit(unsigned int bit, volatile unsigned long *p)
{
unsigned long flags;
unsigned int res;
unsigned long mask = BIT_MASK(bit);

p += BIT_WORD(bit);

raw_local_irq_save(flags);
res = *p;
*p = res | mask;
raw_local_irq_restore(flags);

return (res & mask) != 0;
}

arch/arm/lib/testsetbit.S

_test_and_set_bit

1
2
3
4
5
6
7
//orreq:在条件为“等于”(equal)时执行 OR 操作。
//streq:在条件为“等于”时执行存储操作。
testop _test_and_set_bit, orreq, streq

#if __LINUX_ARM_ARCH__ >= 6
sync_testop _sync_test_and_set_bit, orreq, streq
#endif

find_first_zero_bit find_next_zero_bit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#ifndef __ARMEB__
/*
* These are the little endian, atomic definitions.
*/
#define find_first_zero_bit(p,sz) _find_first_zero_bit_le(p,sz)
#define find_next_zero_bit(p,sz,off) _find_next_zero_bit_le(p,sz,off)
#define find_first_bit(p,sz) _find_first_bit_le(p,sz)
#define find_next_bit(p,sz,off) _find_next_bit_le(p,sz,off)

#else
/*
* These are the big endian, atomic definitions.
*/
#define find_first_zero_bit(p,sz) _find_first_zero_bit_be(p,sz)
#define find_next_zero_bit(p,sz,off) _find_next_zero_bit_be(p,sz,off)
#define find_first_bit(p,sz) _find_first_bit_be(p,sz)
#define find_next_bit(p,sz,off) _find_next_bit_be(p,sz,off)

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#ifdef __ARMEB__
#define SWAB_ENDIAN le
#else
#define SWAB_ENDIAN be
#endif

.macro find_first, endian, set, name
ENTRY(_find_first_\name\()bit_\endian)
UNWIND( .fnstart)
teq r1, #0 //size == 0?
beq 3f //goto 3
mov r2, #0 //r2 = 0
1: ldr r3, [r0], #4 //r3 = *r0++
//根据 set 参数决定是否反转位
.ifeq \set //set 为 1
mvns r3, r3 @ 反转所有位
.else
movs r3, r3 @ 直接测试位值
.endif
//检查字节序
.ifc \endian, SWAB_ENDIAN
bne .L_found_swab
.else
bne .L_found @ found the bit?
.endif
//如果未找到目标位,增加位索引,继续循环
//r2 = r2 + 1
add r2, r2, #32 @ next index
2: cmp r2, r1 @ any more?
blo 1b //if(r2 < r1) goto 1
3: mov r0, r1 @ no more bits
ret lr
UNWIND( .fnend)
ENDPROC(_find_first_\name\()bit_\endian)
.endm

.macro find_next, endian, set, name
ENTRY(_find_next_\name\()bit_\endian)
UNWIND( .fnstart)
cmp r2, r1
bhs 3b
mov ip, r2, lsr #5 @ word index
add r0, r0, ip, lsl #2
ands ip, r2, #31 @ bit position
beq 1b
ldr r3, [r0], #4
.ifeq \set
mvn r3, r3 @ invert bits
.endif
.ifc \endian, SWAB_ENDIAN
rev_l r3, ip
.if .Lrev_l_uses_tmp
@ we need to recompute ip because rev_l will have overwritten
@ it.
and ip, r2, #31 @ bit position
.endif
.endif
movs r3, r3, lsr ip @ shift off unused bits
bne .L_found
orr r2, r2, #31 @ no zero bits
add r2, r2, #1 @ align bit pointer
b 2b @ loop for next bit
UNWIND( .fnend)
ENDPROC(_find_next_\name\()bit_\endian)
.endm

.macro find_bit, endian, set, name
find_first \endian, \set, \name
find_next \endian, \set, \name
.endm

/* _find_first_zero_bit_le and _find_next_zero_bit_le */
find_bit le, 0, zero_

/* _find_first_bit_le and _find_next_bit_le */
find_bit le, 1

#ifdef __ARMEB__

/* _find_first_zero_bit_be and _find_next_zero_bit_be */
find_bit be, 0, zero_

/* _find_first_bit_be and _find_next_bit_be */
find_bit be, 1

#endif

/*
* 假设 r3 的 LSB 中的一个或多个位被设置。
* 高效地查找位图中最低有效位中第一个被设置的位的位置
*/
.L_found_swab:
UNWIND( .fnstart)
//如果需要调整字节序(例如从大端到小端),会使用 rev_l 指令对寄存器 r3 的位进行字节顺序反转
rev_l r3, ip
.L_found:
#if __LINUX_ARM_ARCH__ >= 7
rbit r3, r3 @ 将寄存器 r3 的位顺序反转
clz r3, r3 @ 计算寄存器 r3 中高位连续零的数量,这相当于找到最低有效位的偏移量
add r0, r2, r3 @ 将偏移量加到当前基地址 r2,得到第一个被设置位的全局偏移量
#elif __LINUX_ARM_ARCH__ >= 5
rsb r0, r3, #0
and r3, r3, r0 @ mask out lowest bit set
clz r3, r3 @ count high zero bits
rsb r3, r3, #31 @ offset of first set bit
add r0, r2, r3 @ add offset of first set bit
#else
mov ip, #~0
tst r3, ip, lsr #16 @ test bits 0-15
addeq r2, r2, #16
moveq r3, r3, lsr #16
tst r3, #0x00ff
addeq r2, r2, #8
moveq r3, r3, lsr #8
tst r3, #0x000f
addeq r2, r2, #4
moveq r3, r3, lsr #4
tst r3, #0x0003
addeq r2, r2, #2
moveq r3, r3, lsr #2
tst r3, #0x0001
addeq r2, r2, #1
mov r0, r2
#endif
//比较计算出的偏移量 r0 和最大允许值 r1
cmp r1, r0 @ Clamp to maxbit
movlo r0, r1 //如果 r0 超过 r1,将 r0 设置为 r1
ret lr
UNWIND( .fnend)

arch/arm/include/asm/atomic.h

arch_atomic_read arch_atomic_set

1
2
3
4
/* 在 ARM 上,普通赋值(str 指令)不会清除某些实现上的本地 strex/ldrex 监视器。我们可以将它用于 atomic_set() 的原因是在每个异常返回时完成的 clrex 或 dummy strex。
*/
#define arch_atomic_read(v) READ_ONCE((v)->counter)
#define arch_atomic_set(v,i) WRITE_ONCE(((v)->counter), (i))