CAS与AQS介绍
目录
CAS介绍
CAS是什么?全称是Compare And Swap比较并交换,这是一种无锁解决线程安全的方案,我们都知道原子类都是线程安全的,他们都是基于CAS实现的(例如AtomicInteger 等)
原理
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值V与预期原值A相匹配,那么处理器会自动将该位置值V更新为新值B,否则,处理器不做任何操作,整个操作保证了原子性,即在对比V==A后、设置V=B之前不会有其他线程修改V的值。
在Java中则是通过Unsafe类实现,该类中提供了底层的调用方法:
/** 拿对象o在内存偏移offset处的对象与expected比较,如果相等,则设置o.offset=x并返回true,否则返回false */
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);
/** 拿对象o在内存偏移offset处的long值与expected比较,如果相等则设置o.offset=x */
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long x);
/** 拿对象o在内存偏移offset处的int值与expected比较,如果相等则设置o.offset=x */
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
/** 获取字段f的偏移量 */
public native long objectFieldOffset(Field f);
/** 获取静态field在对象中的偏移量 */
public native long staticFieldOffset(Field f);
存在的问题
1.ABA问题:正如上所说对比原值和内存的值一致才会更新新值,那么就会存在ABA问题,一个线程先把值从A改成了B,又从B改成了A,那其他线程再更新判断的时候就会觉得值没变没被修改过,于是会执行修改操作,但事实上已经被修改了,虽然对结果没影响但是在一定的场景中却存在着问题,解决方案就是添加版本号,每次修改后版本号+1
2.失败场景:既然值一致才会更新,那么不一致就会更新失败,在原子类中采用了自旋的做法,失败了就一直CAS重试,这或许会导致长时间的阻塞,实际业务场景中需要考虑失败的场景
AQS介绍
AQS是什么?全称是AbstractQueuedSynchronizer翻译过来就是抽象的队列式同步器,是一个抽象类,可以说是JUC包里面的核心,为啥平时开发没用过呢?因为这个是一个多线程访问共享资源的同步器框架,当然不能直接用了,但他的其他实现类你们一定很熟,ReentrantLock、CountDownLatch、Semaphore、ReentrantReadWriteLock等等
一、框架结构以及核心方法
先看看内部变量:
// 头节点
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 共享资源
private volatile int state;
// 节点定义如下(部分代码,关键的定义):
static final class Node {
// 代表节点是共享节点
static final Node SHARED = new Node();
// 代表节点是独占节点
static final Node EXCLUSIVE = null;
// 节点状态
volatile int waitStatus;
// 上一个节点
volatile Node prev;
// 下一个节点
volatile Node next;
// 线程信息
volatile Thread thread;
}
从上面看底层数据结构是双向链表,根据名称结合可以猜测底层是一个双向链表为基础的双向队列,节点里面保存着线程信息、节点类型、节点状态,先不管这些是干嘛的,大家可以先想一个问题
你会怎么解决多线程里面共享资源的占用问题(保证线程安全)?
最简单的是不是加一个标记变量,为0时代表可以操作资源;为1时代表资源正在被其他线程操作,此时操作不了要等待吧,要管理正在等待中的线程,那就用队列嘛;那这样我是不是只需要保证标记变量的操作是原子性的就可以解决多线程共享资源占用的安全问题了
以上只是个思路,并不代表AQS是这样做的,AQS设计的更加全面、更加细致化,但是你可以把上面的思路代入到AQS的理解学习中,你会更加轻松
AQS结构
如上所说可以把state当做标记变量,队列则是由装载等待线程信息的节点构成
state资源变量的访问方式有三种(变量的操作是安全的):
- getState():因为变量被volatile修饰所以此操作具有原子性
- setState():因为变量被volatile修饰所以此操作具有原子性
- compareAndSetState():CAS保证对state的操作具有原子性
线程节点有两种类型,说明AQS有两种资源共享方式:
- Exclusive:独占型,只有一个线程能执行,如ReentrantLock
- Share:共享型,可以多个线程同时执行,如CountDownLatch
节点的五种状态
线程节点有5种状态,用来表示当前线程的等待状态(一个节点就是一个线程嘛):
- CANCELLED(1):表示当前节点已取消调度。当timeout或被中断时,会触发变更为此状态
- SIGNAL(-1):表示后继节点在等待当前节点唤醒。后继结点入队时,会将前继节点的状态更新为SIGNAL。
- CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
- PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
- 0:新结点入队时的默认状态。
从状态上看,负值代表有效的等待状态,正值就代表节点已经无效了
核心方法
AQS是同步器框架,所以很多东西都已经实现好了,我们如果需要用的话只需要实现以下几个方法就可以了:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
因为有两种资源模式嘛,所以以上的4个方法需要两两组合来使用tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared,为什么一定要两两组合,因为获取资源就有释放资源嘛
下面我们随着这几个方法一起去看看到底是怎么实现的
二、独占模式
我们找到获取资源和释放资源的顶层入口acquire(int)和release(int)
获取资源acquire(int)
- tryAcquire():尝试获取资源,成功:true 失败:false (成功就不会有下面的操作了)
- addWaiter():加入等待队列尾部,因为获取失败了嘛;传参代表生成独占节点类型
- acquireQueued():让线程阻塞在等待队列中,直到获取资源,然后返回是否有被中断的状态;被中断返回true,否则就是false
- selfInterrupt():线程中断;从上面能看的出来能到这只有获取了资源并且被中断过才行,所以线程在等待过程被中断的时候并不是立马响应的,而且在获取资源后才进行的自我中断
源代码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
// 过程有被中断过就会自我中断一次
selfInterrupt();
}
}
// 自我中断
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
tryAcquire()
尝试获取资源,AQS只是一个框架,具体资源的获取/释放是需要自己去实现的,但不能违反原则,即获取资源返回true ,没获取到返回false (要是返回状态错了就尴尬了)
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
addWaiter()
- 先根据传参选择创建独占节点 or 共享节点
- 尝试直接在队尾加入
- 死循环加入队尾直至成功
这里面节点的更新都是CAS操作,变量也是volatile ,循环其实就是自旋
private Node addWaiter(Node mode) {
// 根据传参 选择创建独占节点 还是 共享节点
Node node = new Node(Thread.currentThread(), mode);
// 尝试直接在队尾加入节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 上面加入失败了 就死循环 直至加入成功
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 尾节点为空说明队列 为空 需要先初始化头节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued()
- 到了这节点已经被加入到队列中了,所以会先判断一次是否能获取到资源
- 没获取到则会检查前驱状态、设置前驱状态,并进入等待状态,等待被唤醒(unpark or interrupt)
- 被唤醒后又会判断一次是否能获取到资源,获取到了就把自己设置成头节点返回,没拿到继续步骤2
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //标记是否成功拿到资源
try {
boolean interrupted = false; //标记等待过程中是否被中断过
// 死循环
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 前驱节点就是头结点 则尝试获取资源
if (p == head && tryAcquire(arg)) {
// 获取资源成功了 就把自己设置成头节点 返回中断状态
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 先是判断前驱节点状态以及设置设置前驱节点状态
// 状态都设置完了 就使线程进入等待状态,唤醒后返回是否被中断标记
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
interrupted = true;
}
}
} finally {
// 获取资源失败了,就要取消节点在队列中的等待
if (failed)
cancelAcquire(node);
}
}
// 该方法主要是设置前驱节点的状态 以及判断前驱节点的状态是否失效
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 前驱节点状态已经失效了 就一直往前找,找到一个最近的正常状态节点,并排它后面
// 失效的节点会被移除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前驱节点正常,就把前驱的状态设置成SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//调用park()使线程进入waiting状态
LockSupport.park(this);
// 返回是否中断过
return Thread.interrupted();
}
总结
- 先尝试获取资源,成功则直接返回
- 没获取到资源就要进队等待,于是要加入队尾
- 然后就进行自旋,进入等待状态前先尝试获取一下资源,获取不到则检查及设置前驱状态,然后再进入等待状态休息
- 等待时被unpark()唤醒或者被interrupt()中断,就继续步骤3,直到获取到资源
- 获取到资源后根据是否有被中断过返回,根据状态判断是否自我中断一次
释放资源release(int)
释放资源逻辑挺简单的,就是尝试释放资源,释放成功就返回true ,释放失败就返回false,释放成功后多了一步唤醒节点的操作
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 判断头节点是否存在 以及状态是否等于0
if (h != null && h.waitStatus != 0)
// 头节点存在状态不等于0 说明可能存在后继等待节点 需要去唤醒它
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease()
这一步与尝试获取资源是对应的,对于独占模式来说,获取多少资源就必要释放多少资源,这一定要对应好
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
unparkSuccessor()
这一步总体逻辑也很简单,就是找到队列里在排队的下一个有效节点然后唤醒它
为什么是唤醒头节点的下一个有效节点唤醒,而不是唤醒头节点呢?
在上面有认真看的就会知道,队列里的节点获取到资源后会自己设置成头节点,所以需要唤醒的是头节点的下一个有效节点
为什么下一个有效节点是从后往前找而不是从前往后找?
因为双向链表分next链和prev链,我们在进行操作的时候prev链是强一致的,因为是尾部入队,每一个加入的节点prev都不会为空,但next会存在某些情况为空,因为入队后,还需要把上一个节点的next设置成最新的尾部节点,这整个过程其实分了两步了,这就导致了间隙可能存在并发问题导致next链并不完整
private void unparkSuccessor(Node node) {
// 获取头节点状态
int ws = node.waitStatus;
// 把状态置为 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取后继节点
Node s = node.next;
// 如果后继节点为空或者后继节点状态无效
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找 找到一个离自己最近的一个状态有效的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 将此节点唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
总结
很简单就两步 1.释放资源 2.唤醒队列中下一个有效节点
三、共享模式
与上面一样我们直接看获取资源和释放资源的顶层入口 acquireShared(int)、releaseShared()
acquireShared(int)
共享模式其实整体流程和独占模式差不多:
- tryAcquireShared()尝试获取资源,成功则返回
- 资源获取失败则进去等待队列直至获取成功
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared()
这个与独占模式一样,都是需要自己实现的,但原则变了,返回的不是资源获取的成功与否,而且返回存留的资源数,返回<0 则代表资源获取失败,≥0则代表资源获取成功 (可以看做是一个资源池子,不能透支获取)
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
doAcquireShared()
整个过程与独占模式基本一致,但是因为是共享模式所以同一时间自然不可能只有一个线程能获取到资源,所以每个线程获取完资源后,都会尝试唤醒等待队列里面的其他节点,万一存留的资源够其他节点获取资源的需求呢,但是前提条件只有满足当前线程的资源需求才会尝试唤醒其他节点
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); //共享节点 加入队列尾部
boolean failed = true; //是否成功标志
try {
boolean interrupted = false; //等待过程中是否被中断过的标志
for (;;) {
final Node p = node.predecessor();//获取前驱节点
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获取资源
if (r >= 0) {
// 如果存留资源有多余的还可以尝试唤醒下一个有效节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 先是判断前驱节点状态以及设置设置前驱节点状态
// 状态都设置完了 就使线程进入等待状态,唤醒后返回是否被中断标记
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 获取资源失败了,就要取消节点在队列中的等待
cancelAcquire(node);
}
}
setHeadAndPropagate()如下:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 将自己设置为头节点
setHead(node);
// 资源如果还有多余的 就尝试唤醒下一个共享节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 唤醒后继结点 (方法见下面释放资源)
doReleaseShared();
}
}
releaseShared()
与独占模式一样,尝试释放资源,然后再唤醒下一个节点,释放资源这个就不再多说了需要自己实现的
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
doReleaseShared()
private void doReleaseShared() {
for (;;) {
Node h = head;
// 头节点不为空 且队列还存在其他节点
if (h != null && h != tail) {
int ws = h.waitStatus;
// 代表需要唤醒后一个有效节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 以上操作直至没有改变 则会退出循环
// 为啥要死循环 因为共享模式可以唤醒多个节点 没节点唤醒了,无事可做了就退出循环呗
if (h == head) // loop if head changed
break;
}
}
总结
共享模式总体上与独占模式流程一致,但多了一步节点的循环唤醒,在尝试获取资源的语义上也有所区别
四、可中断独占和共享
上面我们分别看了独占模式和共享模式整体工作模式,但是不知道大家发现没有两种模式下的线程都不可被中断,就是发生中断后依旧还是会排队等待获取资源,直至获取成功后才会补上一个自我中断
AQS作为一个同步器框架自然不会考虑不到这一点,所以还提供了可被中断的独占和共享,与不可被中断相比只是换了两个获取资源的入口,acquireInterruptibly() 和acquireSharedInterruptibly()
我们以可中断独占模式为例acquireInterruptibly():
依旧还是会尝试获取资源这个没变,但没获取资源后 进入了可中断的处理方法
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly:
该方法基本与独占模式那里一致,唯一的区别就是没用中断标识位了,而且在发生中断后直接抛出了中断异常,抛出异常后结果就是该节点将会变成失效节点被移除队列
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 区别所在 直接抛出中断异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可中断的共享模式与之一样就不在阐述了
五、利用AQS实现一把锁
到这AQS的核心点基本都完了,或许还有点懵逼吧,没事,我们来做个例子加强理解,利用AQS实现一把简单的锁,这里要记住两个核心的语义:
- 独占模式:获取到资源返回True,没获取到返回false 然后进入队列排队
- 共享模式:获取到资源返回剩下的资源数,≥0代表获取资源成功,<0代表资源不够你获取了(获取失败)
好了,接下来我们通过独占模式实现一把小锁:
- 实现Lock接口
- 内部类实现AQS,重写tryAcquire和tryRelease方法
package com.example.spkie;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @program: spkie
* @description:
* @author: colins
* @create: 2022-11-04 21:43
**/
public class MyLock implements Lock {
private final MyAqs myAqs;
public MyLock() {
myAqs = new MyAqs();
}
class MyAqs extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
// 获取当前资源标记
int state = getState();
if (state == 1) {
// 标记等于1代表资源已经被占用了,所以返回false 表示获取失败 需要排队
return false;
}else {
// 不等于1我就尝试获取资源 用CAS
if(compareAndSetState(state,1)){
// 修改成功代表我获取资源成功 返回true
return true;
}
// 修改失败代表我获取资源失败 返回false 需要排队
return false;
}
}
@Override
protected boolean tryRelease(int arg) {
// 获取资源
int state = getState();
// 资源等于 1 表示被占用 所以我才需要释放
if(state==1){
// 释放就是把资源变成 0 嘛
if(compareAndSetState(state,0)){
// 修改成功代表释放资源成功
return true;
}
//修改失败代表释放资源失败
return false;
}else {
// 资源不等于1 说明不需要释放直接返回
return false;
}
}
}
@Override
public void lock() {
// 加锁就是获取资源 尝试获取资源我们已经重写了 这里直接调用AQS独占模式
// 因为我们写死了获取和释放资源都是 1 ,所以这里传什么无所谓
myAqs.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
// 可中断的这里就不演示了
}
@Override
public boolean tryLock() {
// 尝试获取锁 就是尝试获取资源 也就是我们重写的那个AQS方法
// 因为我们写死了获取和释放资源都是 1 ,所以这里传什么无所谓
return myAqs.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// 一段时间内尝试获取资源 到时间超时 AQS里面也有方法直接调用即可
// 这里不演示了
// myAqs.tryAcquireNanos();
return false;
}
@Override
public void unlock() {
// 解锁 就是释放资源
// 因为我们写死了获取和释放资源都是 1 ,所以这里传什么无所谓
myAqs.release(1);
}
@Override
public Condition newCondition() {
return null;
}
}
测试:
public static int aa=0;
public static void main(String[] args) throws InterruptedException {
MyLock lock=new MyLock();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 1000; j++) {
lock.lock();
try{
aa++;
}finally {
lock.unlock();
}
}
}
}).start();
}
Thread.sleep(3000);
System.out.println(aa);
}
// 结果是10000 代表锁OK
// 把锁拿掉 结果 <=10000 代表出现了线程安全问题
通过上述例子可以知道,AQS最最关键的就是那几个可以重写的方法,怎么利用那几个方法变得尤其重要,而要懂得用那几个方法,最关键的是搞清楚两种模式的语义,也就是重写的方法代表了什么,返回代表了什么
案例:
- ReentrantLock:利用AQS实现了公平锁、非公平锁、可重入锁
- ReentrantReadWriteLock:利用AQS使用读锁、写锁、公平、非公平
- CountDownLatch:利用AQS反向实现了计数同步器
还有Cyclicbarrier、Semaphore等都是利用AQS