CAS详解

starlin 741 2018-07-05

CAS,Compare And Swap,即比较并交换。Doug lea大神在同步组件中大量使用CAS技术鬼斧神工地实现了Java多线程的并发操作。整个AQS同步组件、Atomic原子类操作等等都是以CAS实现的,甚至ConcurrentHashMap在1.8的版本中也调整为了CAS+Synchronized。可以说CAS是整个JUC的基石。

CAS分析

在CAS中有三个参数:内存值V,预期值E,要写入的新值N,当且仅当内存值V等于旧的预期值E时,才会将内存值V修改为N,否则什么都不干,

示意图如下:

我们以AutomicInteger为例来阐述CAS的实现,源码如下:

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

从上面的代码可以看出Unsafe类是CAS的核心,Java无法直接访问底层系统,而是通过本地native方法来访问。不过尽管如此,JVM还是提供了访问方式就是UnSafe,它提供了硬件级别的原子操作。

valueOffset为变量值在内存中的偏移地址,unsafe就是通过偏移地址来得到数据的原值的。

value当前值,使用volatile修饰,保证多线程环境下看见的是同一个。

我们就以AtomicInteger的addAndGet()方法来做说明,先看源代码:

/**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the updated value
     */
    public final int addAndGet(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
    }


    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

内部调用unsafe的getAndAddInt方法,在getAndAddInt方法中主要是看compareAndSwapInt方法:

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

该方法为本地方法,有四个参数,分别代表:对象、对象的地址、预期值、修改值

CAS缺陷

CAS虽然高效地解决了原子操作,但是还是存在一些缺陷的,主要表现在三个方法:循环时间太长、只能保证一个共享变量原子操作、ABA问题。

循环时间太长
如果CAS一直不成功呢?这种情况绝对有可能发生,如果自旋CAS长时间地不成功,则会给CPU带来非常大的开销。在JUC中有些地方就限制了CAS自旋的次数,例如BlockingQueue的SynchronousQueue。
JDK 1.8 中新增的 LongAdder,通过把原值进行拆分,最后再以 sum 的方式,减少 CAS 操作冲突的概率,性能要比 AtomicLong 高出 10 倍左右。

只能保证一个共享变量原子操作
看了CAS的实现就知道这只能针对一个共享变量,如果是多个共享变量就只能使用锁了,当然如果你有办法把多个变量整成一个变量,利用CAS也不错。例如读写锁中state的高地位

ABA问题
CAS需要检查操作值有没有发生改变,如果没有发生改变则更新。但是存在这样一种情况:ABA 问题指的是,线程拿到了最初的预期原值 A,然而在将要进行 CAS 的时候,被其他线程抢占了执行权,把此值从 A 变成了 B,然后其他线程又把此值从 B 变成了 A,然而此时的 A 值已经并非原来的 A 值了,但最初的线程并不知道这个情况,在它进行 CAS 的时候,只对比了预期原值为 A 就进行了修改,这就造成了 ABA 的问题。

以警匪剧为例,假如某人把装了 100W 现金的箱子放在了家里,几分钟之后要拿它去赎人,然而在趁他不注意的时候,进来了一个小偷,用空箱子换走了装满钱的箱子,当某人进来之后看到箱子还是一模一样的,他会以为这就是原来的箱子,就拿着它去赎人了,这种情况肯定有问题,因为箱子已经是空的了,这就是 ABA 的问题。

对于ABA问题其解决方案是加上版本号,即在每个变量都加上一个版本号,每次改变时加1,即A —> B —> A,变成1A —> 2B —> 3A。

实际上java提供了AtomicStampedReference来解决ABA问题,此类维护了一个“版本号” Stamp,每次在比较时不止比较当前值还比较版本号,这样就解决了 ABA 的问题
AtomicStampedReference的compareAndSet()方法定义如下:该方法有4个参数,预期引用、更新后的引用、预期标志、更新后的标志。

public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
        //预期的引用等于当前引用,预期标识等于当前标识,更新后的引用和标志等于当前引用和标志则返回true,
        //否则通过Pair生成一个新的当前pair CAS替换
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }

Pair为AtomicStampedReference的内部类,主要用于记录引用和版本戳信息(标识),定义如下:

private static class Pair<T> {
        final T reference;
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }

Pair记录着对象的引用和版本戳,版本戳为int型,保持自增。同时Pair是一个不可变对象,其所有属性全部定义为final,对外提供一个of方法,该方法返回一个新建的Pari对象。pair对象定义为volatile,保证多线程环境下的可见性。在AtomicStampedReference中,大多方法都是通过调用Pair的of方法来产生一个新的Pair对象,然后赋值给变量pair。

下面来通过一个例子来说明AtomicInteger和AtomicStampedReference对于ABA问题执行结果

public class ABATest {
    private static AtomicInteger atomicInteger = new AtomicInteger(100);
    private static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<Integer>(100,0);

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                atomicInteger.compareAndSet(100, 110);
                atomicInteger.compareAndSet(110, 100);
            }
        });


        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("AtomicInteger----->" + atomicInteger.compareAndSet(100,110));
            }
        });

        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();

        Thread thread3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                atomicStampedReference.compareAndSet(100, 110, atomicStampedReference.getStamp(),
                        atomicStampedReference.getStamp() + 1);
                atomicStampedReference.compareAndSet(110, 100, atomicStampedReference.getStamp(),
                        atomicStampedReference.getStamp() + 1);
            }
        });

        Thread thread4 = new Thread(new Runnable() {
            @Override
            public void run() {
                int stamp = atomicStampedReference.getStamp();
                System.out.println("before stamp---->" + stamp);
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("after stamp----->" + atomicStampedReference.getStamp());
                System.out.println("AtomicStampedReference----->" + atomicStampedReference.compareAndSet(100,110,stamp,stamp+1));
            }
        });

        thread3.start();
        thread4.start();
    }
}

运行结果如下:

AtomicInteger----->true
before stamp---->0
after stamp----->2
AtomicStampedReference----->false

明显可以看出AtomicInteger执行CAS是成功了,加了stamp的AtomicStampedReference执行CAS是失败的

end,感谢阅读!!!