学无先后,达者为师

网站首页 编程语言 正文

并发编程之CAS和Atomic原子操作类

作者:bingtanghulu_6 更新时间: 2022-05-11 编程语言

目录

1. 线程安全问题 

1.1 CAS工具类代码 

2. 什么是CAS?

2. CAS的java实现

2.1  创建一个工厂类获取unsafe对象

2.2 测试unsafe的三个方法

3.ABA问题

3.1 什么是ABA问题?

 3.1 ABA解决方案

4.Atomic原子操作类详解

 4.1 AtomicInteger

4.2 AtomicIntegerArray 

4.3 AtomicReference

4.4 AtomicIntegerFieldUpdater

4.5 LongAdder

4.6 LongAccumulator 


1. 线程安全问题 

 sum++不能保证原子性,volatile只能保证可见性和有序性,为了解决线程安全问题,有以下三种解决方案:

1. synchronized

2. reentrantLock

3. CAS

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class testCASTest {

    private volatile static int sum = 0;

    private static Object obj = "";

    private static ReentrantLock lock = new ReentrantLock();

    private static CASLock casLock = new CASLock();

    private static AtomicInteger atomicInteger = new AtomicInteger(0);

    public static void main(String[] args) {

        //模拟10个线程计算10000的数,出现计算不等于100000的场景
//        for(int i =0;i<10;i++){
//            new Thread(()->{
//                for(int j =0;j<10000;j++){
//                    sum++;
//                }
//            }).start();
//        }

        //1.第一种方法,使用synchronized解决问题
//        for(int i =0;i<10;i++){
//            Thread thread = new Thread(()->{
//                synchronized (obj){
//                    for(int j =0;j<10000;j++){
//                        sum++;
//                    }
//                }
//            });
//            thread.start();
//        }

        //第二种方法,使用ReenTrantLock解决问题
//        for(int i =0;i<10;i++){
//            Thread thread = new Thread(()->{
//
//                lock.lock();
//                try{
//                    for(int j =0;j<10000;j++){
//                        sum++;
//                    }
//                }finally {
//                    lock.unlock();
//                }
//            });
//            thread.start();
//        }

        //第三种方法,使用CAS空转方式
//        for(int i =0;i<10;i++){
//            Thread thread = new Thread(()->{
//
//                //CAS空转的方式
//                for(;;){
//                    //当state为0时并且做一次CAS赋值操作时可以处理业务逻辑
//                    if(casLock.getState() == 0 && casLock.cas()){
//                        try{
//                            for(int j =0;j<10000;j++){
//                                sum++;
//                            }
//                        }finally {
//                            casLock.setState(0);
//                        }
//                        break;
//                    }
//                }
//            });
//            thread.start();
//        }
        System.out.println(sum);
        //第四种方法,使用CAS自带的工具类atomicInteger
        for(int i =0;i<10;i++){
            Thread thread = new Thread(()->{
                for(int j =0;j<10000;j++){
                    atomicInteger.incrementAndGet();
                }
            });
            thread.start();
        }

        //休眠3秒,模拟业务操作,等待10个线程计算完毕再打印最终值
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(atomicInteger.get());
    }
}

1.1 CAS工具类代码 

 CAS工具类代码如下:

import sun.misc.Unsafe;

public class CASLock {

    //加锁标记
    private volatile int state;
    private static final Unsafe UNSAFE;
    private static final long OFFSET;

    static {
        try {
            UNSAFE = UnsafeFactory.getUnsafe();
            OFFSET = UnsafeFactory.getFieldOffset(
                    UNSAFE, CASLock.class, "state");
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    public boolean cas() {
        return UNSAFE.compareAndSwapInt(this, OFFSET, 0, 1);
    }

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
    }

}

import java.lang.reflect.Field;

import sun.misc.Unsafe;

public class UnsafeFactory {

    /**
     * 获取 Unsafe 对象
     * @return
     */
    public static Unsafe getUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 获取字段的内存偏移量
     * @param unsafe
     * @param clazz
     * @param fieldName
     * @return
     */
    public static long getFieldOffset(Unsafe unsafe, Class clazz, String fieldName) {
        try {
            return unsafe.objectFieldOffset(clazz.getDeclaredField(fieldName));
        } catch (NoSuchFieldException e) {
            throw new Error(e);
        }
    }


}

 

2. 什么是CAS?

CAS是一组原子操作通过跟某一时刻内存中的值进行比较,如果相等就赋予新值。

cas硬件上实现了并发三大特性的原子性,通过自旋锁的机制实现了自增操作。

CAS的三个问题:

        1.自旋时间过长造成CPU开销过大

        2.只能对一个共享变量进行原子操作。

        3.ABA问题(基本用不到,在CAS更新以后中间可能有线程把值改回原来的值比较过后仍然可以更改,造成数据紊乱)

//伪代码
if(value==expectValue){
    value == newValue;
}
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerTest {

    static AtomicInteger sum = new AtomicInteger(0);

    public static void main(String[] args) {

        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    // 原子自增  CAS
                    sum.incrementAndGet();
                    //count++;

                }
            });
            thread.start();
        }

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(sum.get());

    }

}

 

2. CAS的java实现

cas中提供了Unsafe类的实现。常用的几种操作不同类型的方法如下截图。

compareAndSwapObject compareAndSwapInt compareAndSwapLong

2.1  创建一个工厂类获取unsafe对象

import java.lang.reflect.Field;

import sun.misc.Unsafe;

//获取unsafe对象和获取字段内存偏移量
public class UnsafeFactory {

    /**
     * 获取 Unsafe 对象
     * @return
     */
    public static Unsafe getUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 获取字段的内存偏移量
     * @param unsafe
     * @param clazz
     * @param fieldName
     * @return
     */
    public static long getFieldOffset(Unsafe unsafe, Class clazz, String fieldName) {
        try {
            return unsafe.objectFieldOffset(clazz.getDeclaredField(fieldName));
        } catch (NoSuchFieldException e) {
            throw new Error(e);
        }
    }


}

2.2 测试unsafe的三个方法

import *.UnsafeFactory;
import sun.misc.Unsafe;


public class CASTest {

    public static void main(String[] args) {
        Entity entity = new Entity();

        Unsafe unsafe = UnsafeFactory.getUnsafe();

        //计算偏移量对象头8+对象指针4
        long offset = UnsafeFactory.getFieldOffset(unsafe, Entity.class, "x");
        System.out.println(offset);
        boolean successful;

        // 4个参数分别是:对象实例、字段的内存偏移量、字段期望值、字段更新值
        successful = unsafe.compareAndSwapInt(entity, offset, 0, 3);
        System.out.println(successful + "\t" + entity.x);

        successful = unsafe.compareAndSwapInt(entity, offset, 3, 5);
        System.out.println(successful + "\t" + entity.x);

        successful = unsafe.compareAndSwapInt(entity, offset, 3, 8);
        System.out.println(successful + "\t" + entity.x);

    }


}


class Entity{
    int x;
}

3.ABA问题

3.1 什么是ABA问题?

ABA问题就是CAS获取内存值进行比较再进行赋值的过程存在时间差,在这个时间差内可能有线程将其更改为初始值而不被下一个线程察觉,仍然会更改成功。

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

import lombok.extern.slf4j.Slf4j;

//ABA问题复现
@Slf4j
public class ABATest {

    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(1);

        new Thread(()->{
            int value = atomicInteger.get();
            log.debug("Thread1 read value: " + value);

            // 阻塞1s
            LockSupport.parkNanos(1000000000L);

            // Thread1通过CAS修改value值为3
            if (atomicInteger.compareAndSet(value, 3)) {
                log.debug("Thread1 update from " + value + " to 3");
            } else {
                log.debug("Thread1 update fail!");
            }
        },"Thread1").start();

        new Thread(()->{
            int value = atomicInteger.get();
            log.debug("Thread2 read value: " + value);
            // Thread2通过CAS修改value值为2
            if (atomicInteger.compareAndSet(value, 2)) {
                log.debug("Thread2 update from " + value + " to 2");

                // do something
                value = atomicInteger.get();
                log.debug("Thread2 read value: " + value);
                // Thread2通过CAS修改value值为1
                if (atomicInteger.compareAndSet(value, 1)) {
                    log.debug("Thread2 update from " + value + " to 1");
                }
            }
        },"Thread2").start();
    }
}

 3.1 ABA解决方案

import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.locks.LockSupport;

import lombok.extern.slf4j.Slf4j;


@Slf4j
public class AtomicStampedReferenceTest {

    public static void main(String[] args) {
        // 定义AtomicStampedReference    Pair.reference值为1, Pair.stamp为1, stamp是版本
        AtomicStampedReference atomicStampedReference = new AtomicStampedReference(1,1);

        new Thread(()->{
            int[] stampHolder = new int[1];
            int value = (int) atomicStampedReference.get(stampHolder);
            int stamp = stampHolder[0];
            log.debug("Thread1 read value: " + value + ", stamp: " + stamp);

            // 阻塞1s
            LockSupport.parkNanos(1000000000L);
            // Thread1通过CAS修改value值为3   stamp是版本,每次修改可以通过+1保证版本唯一性
            if (atomicStampedReference.compareAndSet(value, 3,stamp,stamp+1)) {
                log.debug("Thread1 update from " + value + " to 3");
            } else {
                log.debug("Thread1 update fail!");
            }
        },"Thread1").start();

        new Thread(()->{
            int[] stampHolder = new int[1];
            int value = (int)atomicStampedReference.get(stampHolder);
            int stamp = stampHolder[0];
            log.debug("Thread2 read value: " + value+ ", stamp: " + stamp);
            // Thread2通过CAS修改value值为2
            if (atomicStampedReference.compareAndSet(value, 2,stamp,stamp+1)) {
                log.debug("Thread2 update from " + value + " to 2");

                // do something

                value = (int) atomicStampedReference.get(stampHolder);
                stamp = stampHolder[0];
                log.debug("Thread2 read value: " + value+ ", stamp: " + stamp);
                // Thread2通过CAS修改value值为1
                if (atomicStampedReference.compareAndSet(value, 1,stamp,stamp+1)) {
                    log.debug("Thread2 update from " + value + " to 1");
                }
            }
        },"Thread2").start();
    }
}

4.Atomic原子操作类详解

atomic是java提供的java.util.concurrent.atomic包下的用于解决线程问题的操作类。我们常用的一般都是synchronized关键字,但是用此包下面的类会更加高效。

在java.util.concurrent.atomic包里提供了一组原子操作类:

基本类型:AtomicInteger、AtomicLong、AtomicBoolean;

引用类型:AtomicReference、AtomicStampedRerence、AtomicMarkableReference;

数组类型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

对象属性原子修改器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater

原子类型累加器(jdk1.8增加的类):DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder、Striped64

 4.1 AtomicInteger

常用方法总结:

// getAndIncrement() 以原子的方式将实例中的原值加1,返回的是自增前的旧值;

//getAndSet(int newValue):将实例中的值更新为新值,并返回旧值;

//incrementAndGet() :以原子的方式将实例中的原值进行加1操作,并返回最终相加后的结果;

//addAndGet(int delta) :以原子方式将输入的数值与实例中原本的值相加,并返回最后的结果;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerTest {

    static AtomicInteger sum = new AtomicInteger(0);

    public static void main(String[] args) {

        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    // 原子自增  CAS
                    sum.incrementAndGet();
                    //count++;

                }
            });
            thread.start();
        }

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(sum.get());

    }

}

4.2 AtomicIntegerArray 

//addAndGet(int i, int delta):以原子更新的方式将数组中索引为i的元素与输入值相加;

//getAndIncrement(int i):以原子更新的方式将数组中索引为i的元素自增加1;

//compareAndSet(int i, int expect, int update):将数组中索引为i的位置的元素进行更新

import java.util.concurrent.atomic.AtomicIntegerArray;


public class AtomicIntegerArrayTest {

    static int[] value = new int[]{ 1, 2, 3, 4, 5 };
    static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(value);


    public static void main(String[] args) throws InterruptedException {

        //设置索引0的元素为100
        atomicIntegerArray.set(0, 100);
        System.out.println(atomicIntegerArray.get(0));
        //以原子更新的方式将数组中索引为1的元素与输入值相加
        atomicIntegerArray.getAndAdd(1,5);

        System.out.println(atomicIntegerArray);
    }
}

4.3 AtomicReference

AtomicReference作用是对普通对象的封装,它可以保证你在修改对象引用时的线程安全性。

import java.util.concurrent.atomic.AtomicReference;

import lombok.AllArgsConstructor;
import lombok.Data;

public class AtomicReferenceTest {

    public static void main( String[] args ) {
        User user1 = new User("张三", 23);
        User user2 = new User("李四", 25);
        User user3 = new User("王五", 20);

        //初始化为 user1
        AtomicReference atomicReference = new AtomicReference<>();
        atomicReference.set(user1);

        //把 user2 赋给 atomicReference
        atomicReference.compareAndSet(user1, user2);
        System.out.println(atomicReference.get());

        //把 user3 赋给 atomicReference
        atomicReference.compareAndSet(user1, user3);
        System.out.println(atomicReference.get());

    }

}


@Data
@AllArgsConstructor
class User {
    private String name;
    private Integer age;
}

4.4 AtomicIntegerFieldUpdater

AtomicIntegerFieldUpdater可以线程安全地更新对象中的整型变量。

对于AtomicIntegerFieldUpdater 的使用稍微有一些限制和约束,约束如下:

(1)字段必须是volatile类型的,在线程之间共享变量时保证立即可见.eg:volatile int value = 3

(2)字段的描述类型(修饰符public/protected/default/private)与调用者与操作对象字段的关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。

(3)只能是实例变量,不能是类变量,也就是说不能加static关键字。

(4)只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和volatile是有冲突的,这两个关键字不能同时存在。

(5)对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用AtomicReferenceFieldUpdater。

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;


public class AtomicIntegerFieldUpdaterTest {


    public static class Candidate {
        //字段必须是volatile类型
        volatile int score = 0;

        AtomicInteger score2 = new AtomicInteger();
    }

    public static final AtomicIntegerFieldUpdater scoreUpdater =
            AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");

    public static AtomicInteger realScore = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {

        final Candidate candidate = new Candidate();

        Thread[] t = new Thread[10000];
        for (int i = 0; i < 10000; i++) {
            t[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    if (Math.random() > 0.4) {
                        candidate.score2.incrementAndGet();
                        scoreUpdater.incrementAndGet(candidate);
                        realScore.incrementAndGet();
                    }
                }
            });
            t[i].start();
        }
        for (int i = 0; i < 10000; i++) {
            t[i].join();
        }
        System.out.println("AtomicIntegerFieldUpdater Score=" + candidate.score);
        System.out.println("AtomicInteger Score=" + candidate.score2.get());
        System.out.println("realScore=" + realScore.get());

    }
}

4.5 LongAdder

为了解决atomicInteger等自旋失败的问题,创建了longadder等类,原理是通过对atomicInteger的数据进行分散槽位的形式,如果要获取真实的值进行相加操作即可。

在低并发情况下使用atomicInteger即可。在大量并发情况下使用longadder等类。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class LongAdderTest {

    public static void main(String[] args) {
        testAtomicLongVSLongAdder(10, 10000);
        System.out.println("==================");
        testAtomicLongVSLongAdder(10, 200000);
        System.out.println("==================");
        testAtomicLongVSLongAdder(100, 200000);
    }

    static void testAtomicLongVSLongAdder(final int threadCount, final int times) {
        try {
            long start = System.currentTimeMillis();
            testLongAdder(threadCount, times);
            long end = System.currentTimeMillis() - start;
            System.out.println("条件>>>>>>线程数:" + threadCount + ", 单线程操作计数" + times);
            System.out.println("结果>>>>>>LongAdder方式增加计数" + (threadCount * times) + "次,共计耗时:" + end);

            long start2 = System.currentTimeMillis();
            testAtomicLong(threadCount, times);
            long end2 = System.currentTimeMillis() - start2;
            System.out.println("条件>>>>>>线程数:" + threadCount + ", 单线程操作计数" + times);
            System.out.println("结果>>>>>>AtomicLong方式增加计数" + (threadCount * times) + "次,共计耗时:" + end2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        AtomicLong atomicLong = new AtomicLong();
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) {
                        atomicLong.incrementAndGet();
                    }
                    countDownLatch.countDown();
                }
            }, "my-thread" + i).start();
        }
        countDownLatch.await();
    }

    static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        LongAdder longAdder = new LongAdder();
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) {
                        longAdder.add(1);
                    }
                    countDownLatch.countDown();
                }
            }, "my-thread" + i).start();
        }

        countDownLatch.await();
    }
}

4.6 LongAccumulator 

LongAccumulator跟longadder原理相似,但是比longadder厉害的一点是可以实现对入参的任意操作。longadder只能实现加减操作。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.stream.IntStream;

public class LongAccumulatorTest {

    public static void main(String[] args) throws InterruptedException {
        // 累加 x+y
        LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 0);

        ExecutorService executor = Executors.newFixedThreadPool(8);
        // 1到9累加
        IntStream.range(1, 10).forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

        Thread.sleep(2000);
        System.out.println(accumulator.getThenReset());

    }
}

原文链接:https://blog.csdn.net/qq_21575929/article/details/122849263

栏目分类
最近更新