Volatile可见性原理
volatile关键字的作用是保证变量在多线程之间的可见性,volatile的可见性实现基于lock前缀指令和MESI缓存一致性协议。
lock前缀指令
volatile修饰的变量,执行写操作前会使用lock前缀指令。
1
volatile Singleton instance = new Singleton();
汇编代码:
1
2
3
0x01a3de1d: movb $0x0,0x1104800(%esi);
0x01a3de24: **lock** addl $0x0,(%esp);
有 volatile 变量修饰的共享变量进行写操作的时候会多第二行汇编代码,lock 前缀的指令在多核处理器下会引发了两件事情。
- 将当前处理器缓存行的数据会写回到系统内存;
- 这个写回内存的操作会引起在其他 CPU 里缓存了该内存地址的数据无效;
Lock 前缀指令会引起处理器缓存回写到内存。Lock 前缀指令导致在执行指令期间,声言处理器的 LOCK# 信号。在多处理器环境中,LOCK# 信号确保在声言该信号期间,处理器可以独占使用任何共享内存。(因为它会锁住总线,导致其他 CPU 不能访问总线,不能访问总线就意味着不能访问系统内存),但是在最近的处理器里,LOCK#信号一般不锁总线,而是锁缓存,毕竟锁总线开销比较大。对于 Intel486 和 Pentium 处理器,在锁操作时,总是在总线上声言 LOCK#信号。但在 P6 和最近的处理器中,如果访问的内存区域已经缓存在处理器内部,则不会声言 LOCK#信号。相反地,它会锁定这块内存区域的缓存并回写到内存,并使用缓存一致性机制来确保修改的原子性,此操作被称为“缓存锁定”,缓存一致性机制会阻止同时修改被两个以上处理器缓存的内存区域数据。
一个处理器的缓存回写到内存会导致其他处理器的缓存无效。IA-32 处理器和 Intel 64 处理器使用 MESI(修改,独占,共享,无效)控制协议去维护内部缓存和其他处理器缓存的一致性。在多核处理器系统中进行操作的时候,IA-32 和 Intel 64 处理器能嗅探其他处理器访问系统内存和它们的内部缓存。它们使用嗅探技术保证它的内部缓存,系统内存和其他处理器的缓存的数据在总线上保持一致。例如在 Pentium 和 P6 family 处理器中,如果通过嗅探一个处理器来检测其他处理器打算写内存地址,而这个地址当前处理共享状态,那么正在嗅探的处理器将无效它的缓存行,在下次访问相同内存地址时,强制执行缓存行填充。
LOCK 前缀指令的改进
Lock 前缀指令会引起处理器缓存回写到内存。 Lock 前缀指令导致在执行指令期间,声言处理器的 LOCK# 信号,在该信号期间,会独占使用任何共享内存
- 第一阶段:
- LOCK指令会锁住总线,导致其他的处理器不能访问总线,也就不能访问系统内存。将多线程的并发变成了串行执行。
- 优化后
- LOCK指令不再锁总线,而是锁缓存行。并将数据会写到该缓存,使用缓存一致性来保证原子性。
由lock指令回看volatile变量读写
工作内存Work Memory其实就是对CPU寄存器和高速缓存的抽象,或者说每个线程的工作内存也可以简单理解为CPU寄存器和高速缓存。那么当写两条线程Thread-A与Threab-B同时操作主存中的一个volatile变量i时,Thread-A写了变量i,那么:
- Thread-A发出LOCK#指令
- 发出的LOCK#指令锁总线(或锁缓存行),同时让Thread-B高速缓存中的缓存行内容失效
- Thread-A向主存回写最新修改的i
Thread-B读取变量i,那么:
- Thread-B发现对应地址的缓存行被锁了,等待锁的释放,缓存一致性协议会保证它读取到最新的值;
缓存一致性协议
为了解决这个缓存不致的问题,有两种机制,来同步两个不同核心里面的缓存数据。
-
写传播(Write Propagation)。写传播是说,在一个 CPU 核心里,Cache 数据更新,必须能够传播到其他的对应节点的 Cache Line 里。
-
事务的串行化(Transaction Serialization),事务串行化是说,在一个 CPU核心里面的读取和写入,在其他的节点看起来,顺序是一样的。
总线嗅探机制
要解决缓存一致性问题,首先要解决的是多个 CPU 核心之间的数据传播问题。最常见的一种解决方案呢,叫作总线嗅探(Bus Snooping)。这个策略,本质上就是把所有的读写请求都通过总线(Bus)广播给所有的 CPU 核心,然后让各个核心去“嗅探”这些请求,再根据本地的情况进行响应。总线嗅探这个办法也是 Intel CPU 进行缓存一致性处理的解决方案。
它的基本思想是: 所有内存的传输都发生在一条共享的总线上,而所有的处理器都能看到这条总线:缓存本身是独立的,但是内存是共享资源,所有的内存访问都要经过仲裁(同一个指令周期中,只有一个CPU缓存可以读写内存)。 CPU缓存不仅仅在做内存传输的时候才与总线打交道,而是不停在嗅探总线上发生的数据交换,跟踪其他缓存在做什么。所以当一个缓存代表它所属的处理器去读写内存时,其它处理器都会得到通知,它们以此来使自己的缓存保持同步。只要某个处理器一写内存,其它处理器马上知道这块内存在它们的缓存段中已失效。
MESI协议
MESI 协议,是一种叫作写失效(Write Invalidate)的协议。在写失效协议里,只有一个CPU 核心负责写入数据,其他的核心,只是同步读取到这个写入。在这个 CPU 核心写入Cache 之后,它会去广播一个“失效”请求告诉所有其他的 CPU 核心。其他的 CPU 核心,只是去判断自己是否也有一个“失效”版本的 Cache Block,然后把这个也标记成失效的就好了。
MESI 协议的由来呢,来自于对 Cache Line 的四个不同的标记,分别是:
-
M:代表已修改(Modified):Cache Block 里面的内容已经更新过了,但是还没有写回到主内存里面;
-
E:代表独占(Exclusive):在独占状态下,对应的 Cache Line 只加载到了当前 CPU 核所拥有的 Cache 里。其他的 CPU 核,并没有加载对应的数据到自己的 Cache 里。这个时候,如果要向独占的 Cache Block 写入数据,我们可以自由地写入数据,而不需要告知其他 CPU 核;
-
S:代表共享(Shared):共享状态下,因为同样的数据在多个 CPU 核心的 Cache 里都有。所以,想要更新 Cache 里面的数据的时候,不能直接修改,而是要先向所有的其他 CPU 核心广播一个请求,要求先把其他 CPU 核心里面的 Cache,都变成无效的状态,然后再更新当前Cache 里面的数据。这个广播操作,一般叫作 RFO(Request For Ownership),也就是获取当前对应 Cache Block 数据的所有权;
-
I:代表已失效(Invalidated): Cache Block 里面的数据已经失效了;
总线风暴
总线风暴,在java中使用unsafe实现cas,而其底层由cpp调用汇编指令实现的,如果是多核cpu是使用lock cmpxchg指令,单核cpu 使用compxch指令。如果在短时间内产生大量的cas操作在加上 volatile的嗅探机制则会不断地占用总线带宽,导致总线流量激增,就会产生总线风暴。 总之,就是因为volatile 和CAS 的操作导致BUS总线缓存一致性流量激增所造成的影响。
volatile修饰数组
volatile的数组只针对数组的引用具有volatile的语义,而不是它的元素。
在ConcurrentHashMap(1.8)中,内部使用一个volatile的数组table保存数据,细心的同学可以发现,Doug Lea每次在获取数组的元素时,采用Unsafe类的getObjectVolatile方法,在设置数组元素时,采用compareAndSwapObject方法,而不是直接通过下标去操作,这是为什么?
今天得到R大的确认:这个是因为Java数组在元素层面的元数据设计上的缺失,无法表达元素是final、volatile等语义,所以开了后门,使用getObjectVolatile用来补上无法表达元素是volatile的坑,@Stable用来补上final的坑,数组元素就跟没有标volatile的成员字段一样,无法保证线程之间可见性。
只有触发happens before关系的操作,才能保证线程之间的可见性,比如使用table[0] = new Object()直接赋值,这个赋值不会触发任何happens before关系的操作,相当于对一个无volatile变量进行赋值一样。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable { public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
//get没有使用锁同步,而是使用轻量级同步volatile原语sun.misc.Unsafe.getObjectVolatile(Object, long),保证读到的是最新的对象。
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
}
数组元素可见性
Unsafe.getObjectVolatile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class VolatileArray {
private volatile Object[] nums = new Object[]{null};
private static Unsafe U;
private static long NUMS;
static {
try {
Class<?> clazz = Class.forName("sun.misc.Unsafe");
Field theUnsafe = clazz.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
U = (Unsafe) theUnsafe.get(null);
NUMS = U.objectFieldOffset(VolatileArray.class.getDeclaredField("nums"));
} catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
VolatileArray example = new VolatileArray();
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
example.nums[0] = new Object();
}).start();
new Thread(() -> {
while (true) {
//使用Unsafe.getObjectVolatile来
Object[] objects = (Object[]) U.getObjectVolatile(example, NUMS);
if (objects[0] != null) {
System.out.println(objects[0]);
System.out.println("index updated");
break;
}
}
}).start();
}
}
AtomicReferenceArray
JDK提供了AtomicReferenceArray
用来原子更新数组里的元素,基于Unsafe.getObjectVolatile
实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class AtomicReferenceArrays {
private AtomicReferenceArray<Object> atomicReferenceArray = new AtomicReferenceArray<>(1);
public static void main(String[] args) throws InterruptedException {
AtomicReferenceArrays example = new AtomicReferenceArrays();
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
example.atomicReferenceArray.set(0, new Object());
}).start();
new Thread(() -> {
while (true) {
if (example.atomicReferenceArray.get(0) != null) {
System.out.println("index updated");
break;
}
}
}).start();
}
}