Java并发编程入门这一篇就够了

  • 一、进程与线程
    • 1. 进程
    • 2. 线程
    • 3. 二者对比
  • 二、并行与并发
  • 三、Java线程
    • 1. 创建和运行线程
    • 2. 线程运行原理
    • 3. 常见方法
    • 4. 常用方法详解及异同区分
    • 5. 两阶段终止模式(使得线程优雅的退出)
    • 6.主线程与守护线程
    • 7. 线程五种状态
    • 8. 线程六种状态
  • 四、共享模型之锁(管程/Monitor)
    • 4.1 synchronized原理
      • 4.1.1. Java 对象头
      • 4.1.2. Monitor 原理
      • 4.1.3. synchronized 原理
      • 4.1.4. synchronized 原理进阶
        • 1. 轻量级锁
        • 2. 锁膨胀(重量级锁)
        • 3. 重量级锁的自旋优化
        • 4. 偏向锁
        • 5. 锁消除
        • 6. 锁粗化
    • 4.2 wait notify 原理
      • 4.2.1. wait notify使用的正确姿势
      • 4.2.2. 保护性暂停模式
      • 4.2.3. 生产者消费者模式
    • 4.3 Park & Unpark使用及原理
      • 4.3.1. 基本使用
      • 4.3.2. 原理
    • 4.4 线程状态转换再理解
    • 4.5 多把锁
    • 4.6 活跃性
      • 4.6.1. 死锁
      • 4.6.2. 活锁
      • 4.6.3. 饥饿锁
    • 4.7 ReentrantLock
      • 4.7.1. 可重入
      • 4.7.2. 可打断
      • 4.7.3. 锁超时
      • 4.7.4. 公平锁
      • 4.7.5. 条件变量
  • 五、同步模式之顺序控制
    • 5.1. 固定运行顺序
    • 5.2. 交替输出
  • 六、共享模型之内存
    • 6.0. Java 内存模型(JMM)
    • 6.1. 可见性
    • 6.2. CPU 缓存结构原理
      • 6.2.1. CPU 缓存结构
      • 6.2.2. CPU 缓存读
      • 6.2.3. CPU 缓存一致性
      • 6.2.4. 内存屏障
    • 6.3. 有序性
      • 6.3.1 指令级并行原理
    • 6.4. volatile原理
      • 6.4.1 double-checked locking 问题
      • 6.4.2 double-checked locking 解决
    • 6.5. 同步模式之 Balking
    • 6.6. 线程安全单例实现
  • 七、共享模型之无锁(无锁并发)
    • 7.1. CAS 与 volatile
    • 7.2. 为什么无锁效率高
    • 7.3. CAS 的特点
    • 7.4. 原子整数
    • 7.5. 原子引用
    • 7.6. 原子数组
    • 7.7. 字段更新器
    • 7.8. 原子累加器
    • 7.9. Unsafe(自定义原子类)
  • 八、变量的线程安全分析
    • 8.1. 成员变量和静态变量是否线程安全?
    • 8.2. 局部变量是否线程安全?
    • 8.3. 常见线程安全类
  • 九、共享模型之不可变
    • 9.1. 不可变设计
    • 9.2. 保护性拷贝
    • 9.3. 享元模式
      • 9.3.1 定义: 英文名称:Flyweight pattern. ==当需要重用数量有限的同一类对象时==
      • 9.3.2 应用
    • 9.4. final 原理
      • 9.4.1 设置 final 变量的原理
      • 9.4.2 获取 final 变量的原理
    • 9.5. 无状态
  • 十、共享模型之工具
    • 10.1. 自定义线程池
    • 10.2. ThreadPoolExecutor
      • 10.2.1. 线程池状态
      • 10.2.2.构造方法
      • 10.2.3. newFixedThreadPool(固定个数线程池)
      • 10.2.4.newCachedThreadPool(缓存线程池,线程个数不限)
      • 10.2.5.newSingleThreadExecutor(单个线程的线程池)
      • 10.2.6.线程池提交任务方法
      • 10.2.7. 线程池关闭方法
      • 10.2.8.ScheduledExecutorService(任务调度线程池)
      • 10.2.9. 正确处理执行任务异常
    • 10.3. Worker Thread(工作线程)模式
      • 10.3.1. 定义
      • 10.3.2. 饥饿(线程分工不明确导致的)
      • 10.3.3. 创建多少线程合适之CPU 密集型运算
      • 10.3.4. 创建多少线程合适之I/O 密集型运算
    • 10.4. AQS 原理
      • 10.4.1. 概述
      • 10.4.2. 基于AQS(抽象队列同步框架)实现不可重入锁
      • 10.4.3. AQS总结
      • 10.4.4. 主要用到 AQS 的并发工具类
    • 10.5. ReentrantLock 原理
      • 10.5.1. 非公平锁实现原理
        • 1 构造函数分析
        • 2 分析一下非公平锁的加锁实现
        • 3 没有竞争时非公平锁的状态
        • 4 有竞争时非公平锁的状态
        • 5 分析一下解锁过程
      • 10.5.2. 非公平锁可重入原理
      • 10.5.3. 可打断原理
      • 10.5.4. 公平锁实现原理
      • 10.5.5. 条件变量实现原理
    • 10.6. 读写锁(ReentrantReadWriteLock)
      • 10.6.1. 读写锁的应用之缓存
      • 10.6.2. 读写锁原理
    • 10.7. stampedlock介绍
    • 10.8. Semaphore

一、进程与线程

1. 进程

  • 程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至 CPU,数据加载至内存。在指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理 IO 的。
  • 当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程
  • 进程就可以视为程序的一个实例。大部分程序可以同时运行多个实例进程(例如记事本、画图、浏览器等),也有的程序只能启动一个实例进程(例如网易云音乐、360 安全卫士等)

2. 线程

  • 一个进程之内可以分为一到多个线程。
  • 一个线程就是一个指令流,将指令流中的一条条指令以一定的顺序交给 CPU 执行
  • Java 中,线程作为最小调度单位进程作为资源分配的最小单位。 在 windows 中进程是不活动的,只是作为线程的容器。

3. 二者对比

  • 进程基本上相互独立的,而线程存在于进程内,是进程的一个子集
  • 进程拥有共享的资源,如内存空间等,供其内部的线程共享
  • 进程间通信较为复杂
    同一台计算机的进程通信称为 IPC(Inter-process communication)
    不同计算机之间的进程通信,需要通过网络,并遵守共同的协议,例如 HTTP
  • 线程通信相对简单,因为它们共享进程内的内存,一个例子是多个线程可以访问同一个共享变量
  • 线程更轻量,线程上下文切换成本一般上要比进程上下文切换低

二、并行与并发

  • 单核 cpu 下,线程实际还是 串行执行的。操作系统中有一个组件叫做任务调度器,将 cpu 的时间片(windows下时间片最小约为 15 毫秒)分给不同的程序使用,只是由于 cpu 在线程间(时间片很短)的切换非常快,人类感觉是 同时运行的 。总结为一句话就是: 微观串行,宏观并行
  • 一般会将这种线程轮流使用 CPU 的做法称为并发, concurrent
  • 多核 cpu下,每个 核(core) 都可以调度运行线程,这时候线程可以是并行的。
  • 应用
  1. 实现异步调用
  2. 提高效率
  • 总结
  1. 单核 cpu 下,多线程不能实际提高程序运行效率,只是为了能够在不同的任务之间切换,不同线程轮流使用cpu ,不至于一个线程总占用 cpu,别的线程没法干活
  2. 多核 cpu 可以并行跑多个线程,但能否提高程序运行效率还是要分情况的
    有些任务,经过精心设计,将任务拆分,并行执行,当然可以提高程序的运行效率。但不是所有计算任务都能拆分(参考后文的【阿姆达尔定律】)。
    也不是所有任务都需要拆分,任务的目的如果不同,谈拆分和效率没啥意义。
  3. IO 操作不占用 cpu,只是我们一般拷贝文件使用的是【阻塞 IO】,这时相当于线程虽然不用 cpu,但需要一直等待 IO 结束,没能充分利用线程。所以才有后面的【非阻塞 IO】和【异步 IO】优化

三、Java线程

1. 创建和运行线程

  • 方法一,直接使用 Thread
// 创建线程对象
Thread t = new Thread() {
public void run() {
// 要执行的任务
}
};
// 启动线程
t.start();
  • 方法二,使用 Runnable 配合 Thread
    把【线程 Thread】和【任务 Runnable】(要执行的代码)分开
Runnable runnable = new Runnable() {
public void run(){
// 要执行的任务
	}
};
// 创建线程对象
Thread t = new Thread( runnable );
// 启动线程
t.start();
  • 方法三,FutureTask 配合 Thread
    FutureTask 能够接收 Callable 类型的参数,用来处理有返回结果的情况。
// 创建任务对象
FutureTask<Integer> task3 = new FutureTask<>(() -> {
	log.debug("hello");
	return 100;
});
// 参数1 是任务对象; 参数2 是线程名字,推荐
new Thread(task3, "t3").start();
// 主线程阻塞,同步等待 task 执行完毕的结果
Integer result = task3.get();
log.debug("结果是:{}", result);

FutureTask类的继承关系

public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V> 
public interface Runnable
public interface Future<V> 

2. 线程运行原理

  • 我们都知道 JVM 中由堆、栈、方法区所组成,其中栈内存是给谁用的呢?其实就是线程,每个线程启动后,虚拟机就会为其分配一块栈内存。
    每个栈由多个栈帧(Frame)组成,对应着每次方法调用时所占用的内存
    每个线程只能有一个活动栈帧,对应着当前正在执行的那个方法
  • 线程上下文切换(Thread Context Switch)
    因为以下一些原因导致 cpu 不再执行当前的线程,转而执行另一个线程的代码
    1.线程的 cpu 时间片用完
    2.垃圾回收
    3.有更高优先级的线程需要运行
    4.线程自己调用了 sleep、yield、wait、join、park、synchronized、lock 等方法
    当 Context Switch 发生时,需要由操作系统保存当前线程的状态,并恢复另一个线程的状态,Java 中对应的概念就是程序计数器(Program Counter Register),它的作用是记住下一条 jvm 指令的执行地址,是线程私有的
    1.状态包括程序计数器、虚拟机栈中每个栈帧的信息,如局部变量、操作数栈、返回地址等
    2.Context Switch 频繁发生会影响性能

3. 常见方法



4. 常用方法详解及异同区分

  1. start 与 run
    直接调用 run 是在主线程中执行了 run,没有启动新的线程
    使用 start 是启动新的线程,通过新的线程间接执行 run 中的代码
  2. sleep 与 yield

    sleep会使得线程让出CPU处于阻塞状态,调度器不会给它分配时间片。
    yield是让出CPU处于就绪状态,此时CPU仍让会分配给它时间片,只是当前这个时间片被用于处理其它线程的任务了。
  3. join 方法可以传入一个时间,实现有时效的等待(注意,如果线程已经结束,但是等待时间还没到,那也会结束等待,所以等待时间只是一个上线值而已,并不是非要等待这么久
  4. interrupt 方法详解
    打断 sleep,wait,join 的线程,会清空打断状态,即isInterrupted为false
    打断正常运行的线程,不会清空打断状态,即isInterrupted为true
    打断 park 线程, 也不会清空打断状态。

5. 两阶段终止模式(使得线程优雅的退出)



模式实现一

class TPTInterrupt {
        private Thread thread;

        public void start() {
            thread = new Thread(() -> {
                while (true) {
                    Thread current = Thread.currentThread();
                    if (current.isInterrupted()) {
                        log.debug("料理后事");
                        break;
                    }
                    try {
                        Thread.sleep(1000);
                        log.debug("将结果保存");
                    } catch (InterruptedException e) {
                        current.interrupt();//因为打断睡眠中的线程,会清楚打断标记,此处再打断一下,添加打断标记
                    }
// 执行监控操作
                }
            }, "监控线程");
            thread.start();
        }

        public void stop() {
            thread.interrupt();
        }
    }

模式实现二

class TPTVolatile {
        private Thread thread;
        private volatile boolean stop = false;

        public void start() {
            thread = new Thread(() -> {
                while (true) {
                    Thread current = Thread.currentThread();
                    if (stop) {
                        log.debug("料理后事");
                        break;
                    }
                    try {
                        Thread.sleep(1000);
                        log.debug("将结果保存");
                    } catch (InterruptedException e) {
                    }
// 执行监控操作
                }
            }, "监控线程");
            thread.start();
        }

        public void stop() {
            stop = true;
            thread.interrupt();
        }
    }

6.主线程与守护线程

默认情况下,Java 进程需要等待所有线程都运行结束,才会结束。有一种特殊的线程叫做守护线程,只要其它非守护线程运行结束了,即使守护线程的代码没有执行完,也会强制结束。

垃圾回收器线程就是一种守护线程

7. 线程五种状态


8. 线程六种状态

这是从 Java API 层面来描述的
根据 Thread.State 枚举,分为六种状态

四、共享模型之锁(管程/Monitor)

4.1 synchronized原理

4.1.1. Java 对象头

以 32 位虚拟机为例


4.1.2. Monitor 原理

Monitor 被翻译为监视器或管程
每个 Java 对象都可以关联一个 Monitor 对象,如果使用 synchronized 给对象上锁(重量级)之后,该对象头的Mark Word中就被设置指向 Monitor 对象的指针

Monitor 结构如下

4.1.3. synchronized 原理



4.1.4. synchronized 原理进阶

1. 轻量级锁

轻量级锁的使用场景:如果一个对象虽然有多线程要加锁加锁的时间是错开的(也就是没有竞争),那么可以使用轻量级锁来优化。
轻量级锁对使用者是透明的,即语法仍然是 synchronized,即由synchronized内部优化。

两个重要概念:1. 锁记录:它存在每个线程的栈帧中,它记录了指向锁对象的地址(用于找到锁对象),并且记录了锁记录(本身)的地址和状态码(用于找到锁记录);2. 每个线程的某个方法给对象上锁的时候,会尝试用cas替换锁对象头中的Mark Word,将其存入锁记录。

假设有两个方法同步块,利用同一个对象加锁

static final Object obj = new Object();
public static void method1() {
	synchronized( obj ) {
		// 同步块 A
		method2();
	}
}
public static void method2() {
	synchronized( obj ) {
		// 同步块 B
	}
}






2. 锁膨胀(重量级锁)

如果在尝试加轻量级锁的过程中,CAS 操作无法成功,这时一种情况就是有其它线程为此对象加上了轻量级锁(有竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁。

3. 重量级锁的自旋优化

自旋需要使用CPU,因此自旋只有在多核的情况下才有意义。

重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即这时候持锁线程已经退出了同步块,释放了锁),这时当前线程就可以避免阻塞。

  • 自旋重试成功的情况(等了一会儿等到锁了)
  • 自旋重试失败的情况(等了很久还是等不到锁)

4. 偏向锁


每次cas操作,都需要将锁记录中的地址信息与对象头中的信息进行比较,尝试交换,这会耗费一定的性能。因此要避免这种同一个线程自己重复加锁,即锁重入的情况。

static final Object obj = new Object();
public static void m1() {
	synchronized( obj ) {
		// 同步块 A
		m2();
	}
}
public static void m2() {
	synchronized( obj ) {
		// 同步块 B
		m3();
	}
}
public static void m3() {
	synchronized( obj ) {
		// 同步块 C
	}
}
  • 轻量级锁处理流程
  • 偏向锁处理流程
  • 偏向状态
    64位系统对象头

  • 撤销偏向锁的情况
    1.调用对象 hashCode
    调用了对象的 hashCode,但偏向锁的对象 MarkWord 中存储的是线程 id,如果调用 hashCode 会导致偏向锁被撤销(因为线程id占了54位,hashcode占了31位,一共85位大于对象头的64位空间,导致装不下,只能撤销)。
    轻量级锁会在锁记录中记录 hashCode,所以不会出现装不下的情况
    重量级锁会在 Monitor 中记录 hashCode,所以不会出现装不下的情况
    2.其它线程使用对象
    当有其它线程使用偏向锁对象时,会将偏向锁升级为轻量级锁
    3.调用 wait/notify
    重力级锁有的概念,因此肯定会升级为重量级锁
  • 批量重偏向
  • 批量撤销
    当撤销偏向锁阈值超过 40 次后,jvm 会这样觉得,自己确实偏向错了,根本就不该偏向。于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的

5. 锁消除

jvm没有检查到锁的对象会共享,那么就不必加锁,将锁消除

6. 锁粗化

对相同对象多次加锁,导致线程发生多次重入,可以使用锁粗化方式来优化,这不同于之前讲的细分锁的粒度。

4.2 wait notify 原理

4.2.1. wait notify使用的正确姿势

synchronized(lock) {
	while(条件不成立) {
	lock.wait();
	}
	// 干活
}
//另一个线程
synchronized(lock) {
	lock.notifyAll();
}

4.2.2. 保护性暂停模式

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果

要点
1.有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
2.如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
3.JDK 中,join 的实现、Future 的实现,采用的就是此模式
4.因为要等待另一方的结果,因此归类到同步模式

  • 不带超时的实现
class GuardedObject {
	private Object response;
	private final Object lock = new Object();
	public Object get() {
		synchronized (lock) {
		// 条件不满足则等待
		while (response == null) {
			try {
			lock.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		return response;
	}
}
public void complete(Object response) {
	synchronized (lock) {
		// 条件满足,通知等待线程
		this.response = response;
		lock.notifyAll();
	}
}
  • 应用
 public static void main(String[] args) {
        GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
            try {
                List<String> response = download();
                log.debug("download complete...");
                guardedObject.complete(response);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        log.debug("waiting...");
        Object response = guardedObject.get();
        log.debug("get response: [{}] lines", ((List<String>) response).size());

    }
  • 带超时的实现
class GuardedObjectV2 {

    private Object response;
    private final Object lock = new Object();

    public Object get(long millis) {
        synchronized (lock) {
            // 1) 记录最初时间
            long last = System.currentTimeMillis();
            // 2) 已经经历的时间
            long timePassed = 0;
            while (response == null) {
                // 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
                long waitTime = millis - timePassed;
                log.debug("waitTime: {}", waitTime);
                if (waitTime <= 0) {
                    log.debug("break...");
                    break;
                }
                try {
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 3) 如果提前被唤醒,这时已经经历的时间假设为 400
                timePassed = System.currentTimeMillis() - last;
                log.debug("timePassed: {}, object is null {}", timePassed, response == null);
            }
            return response;
        }
    }

    public void complete(Object response) {
        synchronized (lock) {
            // 条件满足,通知等待线程
            this.response = response;
            log.debug("notify...");
            lock.notifyAll();
        }
    }
}
  • 应用
 public static void main(String[] args) {
        GuardedObjectV2 v2 = new GuardedObjectV2();
        new Thread(() -> {
            sleep(1);
            v2.complete(null);
            sleep(1);
            v2.complete(Arrays.asList("a", "b", "c"));
        }).start();

        Object response = v2.get(2500);
        if (response != null) {
            log.debug("get response: [{}] lines", ((List<String>) response).size());
        } else {
            log.debug("can't get response");
        }
    }

4.2.3. 生产者消费者模式





public static void main(String[] args) {
        MessageQueue messageQueue = new MessageQueue(2);
        for (int i = 0; i < 4; i++) {
            int id = i;
            new Thread(() -> {
                try {
                    log.debug("download...");
                    List<String> response = Downloader.download();
                    log.debug("try put message({})", id);
                    messageQueue.put(new Message(id, response));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }, "生产者" + i).start();
        }

        new Thread(() -> {
            while (true) {
                Message message = messageQueue.take();
                List<String> response = (List<String>) message.getMessage();
                log.debug("take message({}): [{}] lines", message.getId(), response.size());
            }

        }, "消费者").start();
    }

4.3 Park & Unpark使用及原理

4.3.1. 基本使用




可以精确的唤醒某个线程
park & unpark 可以先 unpark,而wait & notify 不能先 notify

4.3.2. 原理




4.4 线程状态转换再理解


同一状态多条线,代表不同的方法达到的状态转换效果




4.5 多把锁

即将锁的粒度细分

  • 好处,是可以增强并发度
  • 坏处,如果一个线程需要同时获得多把锁,就容易发生死锁

4.6 活跃性

线程没有按预期结束,执行不下去的情况,归类为【活跃性】问题,除了死锁以外,还有活锁和饥饿锁两种情况

4.6.1. 死锁

有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁。
如:t1 线程 获得 A对象 锁,接下来想获取 B对象的锁;而 t2 线程 获得 B对象 锁,接下来想获取 A对象的锁,这就会发生死锁。

检测死锁可以使用 jconsole工具,或者使用 jps 定位进程 id,再用 jstack 定位死锁

4.6.2. 活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束

4.6.3. 饥饿锁

很多教程中把饥饿定义为,一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束。

4.7 ReentrantLock

4.7.1. 可重入

可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。
示例:

4.7.2. 可打断

拥有锁的线程,可以主动打断尝试获取该锁的线程,让他不要再等下去了,等不到的。一定程度上也可以防止死锁的发生。(舔狗恋爱者)



注意如果是不可中断模式(即不是方法lockInterruptibly()),那么即使使用了 interrupt 也不会让等待中断

4.7.3. 锁超时

自己主动放弃等一个锁,而不是被动的由其它线程执行打断操作(非舔狗恋爱者)。

  • 立刻放弃
    即尝试拿锁,一旦拿不到,就立刻放弃。


  • 等待一定的时间再放弃
    即,尝试拿锁,如果拿不到,那我就等一定的时间,如果超过了等待的时间,那我就不等了,返回。

4.7.4. 公平锁

ReentrantLock 默认是不公平的。
ReentrantLock lock = new ReentrantLock(false);启动非公平锁
ReentrantLock lock = new ReentrantLock(true);启动公平锁
公平锁一般没有必要,会降低并发度。

4.7.5. 条件变量





五、同步模式之顺序控制

5.1. 固定运行顺序

比如,必须先 2 后 1 打印

  • wait notify 版

  • Park Unpark 版

5.2. 交替输出

线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现?

  • wait notify 版

  • ReentrantLock 条件变量版
public static void main(String[] args) {
        AwaitSignal as=new AwaitSignal(5);
        Condition aWaitSet = as.newCondition();
        Condition bWaitSet = as.newCondition();
        Condition cWaitSet = as.newCondition();
        new Thread(()->{
            as.print("a",aWaitSet,bWaitSet);
        }).start();
        new Thread(()->{
            as.print("b",bWaitSet,cWaitSet);
        }).start();
        new Thread(()->{
            as.print("c",cWaitSet,aWaitSet);
        }).start();
        as.start(aWaitSet);
    }
static class AwaitSignal extends ReentrantLock{
        // 循环次数
        private int loopNumber;
        public AwaitSignal(int loopNumber) {
            this.loopNumber = loopNumber;
        }

        /**
         *
         * @param str 待打印内容
         * @param current 待唤醒的休息区
         * @param next 下一个待唤醒的休息区
         */
        public void print(String str, Condition current,Condition next)
        {
            for (int i = 0; i < loopNumber; i++) {
                this.lock();//上锁
                try {
                    current.await();//当前线程去休息区等待(等待的时候,释放锁)
                    System.out.print(str);
                    next.signal();//唤醒下一个休息区中的线程
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                finally {
                    this.unlock();//解锁
                }
            }
        }

        //唤醒第一个休息区中的线程
        public void start(Condition first)
        {
            this.lock();
            try {
                first.signal();
            }
            finally {
                this.unlock();
            }
        }
    }
  • Park Unpark 版
 public static void main(String[] args) {
        SyncPark syncPark = new SyncPark(5);
        Thread t1 = new Thread(() -> {
            syncPark.print("a");
        });
        Thread t2 = new Thread(() -> {
            syncPark.print("b");
        });
        Thread t3 = new Thread(() -> {
            syncPark.print("c\n");
        });
        syncPark.setThreads(t1, t2, t3);
        syncPark.start();
    }
static class SyncPark {
        private int loopNumber;
        private Thread[] threads;
        public SyncPark(int loopNumber) {
            this.loopNumber = loopNumber;
        }
        public void setThreads(Thread... threads) {
            this.threads = threads;
        }
        public void print(String str) {
            for (int i = 0; i < loopNumber; i++) {
                LockSupport.park();
                System.out.print(str);
                LockSupport.unpark(nextThread());
            }
        }
        private Thread nextThread() {
            Thread current = Thread.currentThread();
            int index = 0;
            for (int i = 0; i < threads.length; i++) {
                if(threads[i] == current) {
                    index = i;
                    break;
                }
            }
            if(index < threads.length - 1) {
                return threads[index+1];
            } else {
                return threads[0];
            }
        }
        public void start() {
            for (Thread thread : threads) {
                thread.start();//线程全部休息
            }
            LockSupport.unpark(threads[0]);//指定唤醒线程
        }
    }

六、共享模型之内存

6.0. Java 内存模型(JMM)

6.1. 可见性






注意:前面例子体现的实际就是可见性,它保证的是在多个线程之间,一个线程对 volatile 变量的修改对另一个线程可见, 不能保证原子性仅用在一个写线程,多个读线程的情况

synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性。但缺点是synchronized 是属于重量级操作,性能相对更低

6.2. CPU 缓存结构原理

6.2.1. CPU 缓存结构


各级缓存速度比较

6.2.2. CPU 缓存读


6.2.3. CPU 缓存一致性


6.2.4. 内存屏障

Memory Barrier(Memory Fence)

6.3. 有序性




解决方法
volatile 修饰的变量,可以禁用指令重排

6.3.1 指令级并行原理

  • 为什么需要 指令级并行?
    举个例子,加工一条鱼需要 50 分钟,只能一条鱼、一条鱼顺序加工…。
    但是,如果我们把加工流程细分为 5 个步骤:
    去鳞清洗 10分钟
    蒸煮沥水 10分钟
    加注汤料 10分钟
    杀菌出锅 10分钟
    真空封罐 10分钟
  • 这就引出了指令重排序优化
    事实上,现代处理器会设计为一个时钟周期完成一条执行时间最长的 CPU 指令。为什么这么做呢?可以想到指令还可以再划分成一个个更小的阶段,例如,每条指令都可以分为: 取指令 - 指令译码 - 执行指令 - 内存访问 - 数据写回 这 5 个阶段。
    在不改变程序结果的前提下,这些指令的各个阶段可以通过重排序和组合来实现指令级并行,这一技术在 80’s 中叶到 90’s 中叶占据了计算架构的重要地位。

6.4. volatile原理



6.4.1 double-checked locking 问题





6.4.2 double-checked locking 解决



6.5. 同步模式之 Balking


6.6. 线程安全单例实现

  • 饿汉式实现一

    问题1:防止类被继承,破坏单例
    问题3:构造函数设置私有是防止别的对象无限创建,破坏单例;但并不能防止反射创建新的实例
    问题4:没有线程安全问题,因为静态成员变量的初始化工作是在类加载阶段完成,类加载阶段由JVM保证其线程安全。
    问题5:1. 用方法可以更好的封装性2. 创建单例时有更多的控制3. 还可以支持泛型等等
  • 饿汉式实现二

    问题2:没有。因为反编译可以发现,枚举的成员就是Singleton类中的一个静态成员变量而已,而静态成员变量的初始化在类加载阶段完成,由JVM保证其线程安全。
    问题3:不能
    问题4:不能
    问题5:饿汉式,也是在类加载阶段完成初始化
    问题6:和普通类一样操作即可
  • 懒汉式实现三
  • 懒汉式实现四
    (double-checked locking)对实现3的改进,将锁的内容变少了,不需要重复加锁。

    问题3:为了防止首次多个线程同时竞争锁,然后竞争锁成功的线程完成对象创建后,释放了锁,这时EntryList中阻塞的线程就会获得锁,然后再尝试去创建对象了,但添加了判断,就可以防止重复创建对象。
  • 懒汉式实五

    问题1:属于懒汉式的,因为类加载本身就是懒惰式的,只有第一次用到时才会触发类加载操作。如果只是用了外面的Singleton,并没有调用getInstance方法,那么并不会触发内部类LazyHolder的类加载操作的,因此内部的初始化操作也没有执行。
    问题2:不会有并发问题,因为静态成员变量,是在类加载阶段初始化,由JVM保证它的线程安全

七、共享模型之无锁(无锁并发)

7.1. CAS 与 volatile

private AtomicInteger balance;


其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。

注意:其实 CAS 的底层是lock cmpxchg指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交换】的原子性。

在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。

需要用volatile的解释:

  1. 获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。
  2. 它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。
  3. volatile 仅仅保证了共享变量的可见性,让其它线程能够看到最新值,但不能解决指令交错问题(不能保证原子性)
  4. CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果

7.2. 为什么无锁效率高

7.3. CAS 的特点

7.4. 原子整数

J.U.C (java.util.concurrent)并发包提供了:
AtomicBoolean
AtomicInteger
AtomicLong

7.5. 原子引用

AtomicReference;不考虑版本号。会出现ABA问题
AtomicStampedReference;考虑版本号,也可记录中间修改过程。
AtomicMarkableReference;只关心是否更改过,不关系引用变量更改了几次

  • 例如:使用 CAS安全实现balance的减法操作

  • ABA 问题及解决 AtomicStampedReference




输出结果:

  • AtomicMarkableReference
    如下案例,主人只关心垃圾袋空不空,只要是空那就不还垃圾袋,而不管它是不是之前被倒空过。




7.6. 原子数组

AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray

7.7. 字段更新器

AtomicReferenceFieldUpdater // 域 字段
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常

7.8. 原子累加器

。。。

7.9. Unsafe(自定义原子类)

Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得。

CAS和LockSupport的park,unpark方法底层都是基于Unsafe

  • 获得Unsafe对象
  • Unsafe CAS 操作
  • 自定义原子类

八、变量的线程安全分析

8.1. 成员变量和静态变量是否线程安全?

8.2. 局部变量是否线程安全?


局部变量引用对象线程不安全示例:




将 list 修改为局部变量解决以上代码的线程不安全问题



因此方法修饰符使用不当,也会造成线程不安全问题。

8.3. 常见线程安全类

  • 线程安全类方法的组合
    分析下面代码是否线程安全?
Hashtable table = new Hashtable();
	// 线程1,线程2
	if( table.get("key") == null) {
		table.put("key", value);
	}


很明显单个方法get和put都是线程安全的,但是两个方法组合并不能保证其线程安全。

  • 不可变类线程安全性
    String、Integer 等都是不可变类,因为其内部的状态不可以改变,因此它们的方法都是线程安全的
    有同学或许有疑问,String 有 replace,substring 等方法【可以】改变值啊,那么这些方法又是如何保证线程安全的呢?
    因为这些修改的方法并没有改变原始字符串的属性,而是又创建了一个新的字符串去改变的。

九、共享模型之不可变

如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改啊!
不可变对象,实际是另一种避免竞争的方式。

9.1. 不可变设计

另一个大家更为熟悉的 String 类也是不可变的,以它为例,说明一下不可变设计的要素

9.2. 保护性拷贝

但有同学会说,使用字符串时,也有一些跟修改相关的方法啊,比如 substring 等,那么下面就看一看这些方法是如何实现的,就以 substring 为例:

发现其内部是调用 String 的构造方法创建了一个新字符串,再进入这个构造看看,是否对 final char[] value 做出了修改

结果发现也没有,构造新字符串对象时,会生成新的 char[] value,对内容进行复制 。这种通过创建副本对象来避免共享的手段称之为【保护性拷贝(defensive copy)】

9.3. 享元模式

9.3.1 定义: 英文名称:Flyweight pattern. 当需要重用数量有限的同一类对象时

9.3.2 应用

  • 包装类

  • String 串池
  • BigDecimal BigInteger
  • 利用享元模式自定义可重复使用的数据库连接池


9.4. final 原理

9.4.1 设置 final 变量的原理

9.4.2 获取 final 变量的原理

加了final,会把变量的值复制到本地方法栈中,然后从栈中读取数据,而如果不加final,就会去堆中读取数据,这个效率比不上直接在栈中读取数据的效率。

9.5. 无状态

在 web 阶段学习时,设计 Servlet 时为了保证其线程安全,都会有这样的建议,不要为 Servlet 设置成员变量,这种没有任何成员变量的类是线程安全的

因为成员变量保存的数据也可以称为状态信息,因此没有成员变量就称之为【无状态】

十、共享模型之工具

10.1. 自定义线程池

  • 思想

    线程池中新建指定核心数的线程个数,如果任务个数大于线程池中的线程个数,那么新来的任务到阻塞队列中去排队等待,如果阻塞队列也满了,那就要采取一定的“拒绝策略”如死等阻塞队列中有空位、带超时的等待、放弃这个任务、把这个任务交给自己执行等等,因为有很多中选择,我们可以把拒绝策略的选择权交给调用者,因此定义一个拒绝策略的函数式(只有一个待实现的函数)接口,由用户传入拒绝策略。

  • 实现

  1. 自定义阻塞队列类
@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {
    // 1. 任务队列
    private Deque<T> queue = new ArrayDeque<>();

    // 2. 锁
    private ReentrantLock lock = new ReentrantLock();

    // 3. 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();

    // 4. 消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    // 5. 容量
    private int capcity;

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 带超时阻塞获取
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    // 返回值是剩余时间
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞获取
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞添加
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capcity) {
                try {
                    log.debug("等待加入任务队列 {} ...", task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capcity) {
                try {
                    if(nanos <= 0) {
                        return false;
                    }
                    log.debug("等待加入任务队列 {} ...", task);
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 判断队列是否满
            if(queue.size() == capcity) {
                rejectPolicy.reject(this, task);
            } else {  // 有空闲
                log.debug("加入任务队列 {}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}
  1. 自定义线程池类
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;

    // 线程集合
    private HashSet<Worker> workers = new HashSet<>();

    // 核心线程数
    private int coreSize;

    // 获取任务时的超时时间
    private long timeout;

    private TimeUnit timeUnit;

    private RejectPolicy<Runnable> rejectPolicy;

    // 执行任务
    public void execute(Runnable task) {
        // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
        // 如果任务数超过 coreSize 时,加入任务队列暂存
        synchronized (workers) {
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                log.debug("新增 worker{}, {}", worker, task);
                workers.add(worker);
                worker.start();
            } else {
//                taskQueue.put(task);
                // 1) 死等
                // 2) 带超时等待
                // 3) 让调用者放弃任务执行
                // 4) 让调用者抛出异常
                // 5) 让调用者自己执行任务
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    }
	//线程单元内部类
    class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1) 当 task 不为空,执行任务
            // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
//            while(task != null || (task = taskQueue.take()) != null) {
            while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                try {
                    log.debug("正在执行...{}", task);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                log.debug("worker 被移除{}", this);
                workers.remove(this);
            }
        }
    }
  1. 拒绝策略
@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}
  • 应用
 public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1,
                1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{
            // 1. 死等
//            queue.put(task);
            // 2) 带超时等待
//            queue.offer(task, 1500, TimeUnit.MILLISECONDS);
            // 3) 让调用者放弃任务执行
//            log.debug("放弃{}", task);
            // 4) 让调用者抛出异常
//            throw new RuntimeException("任务执行失败 " + task);
            // 5) 让调用者自己执行任务
            task.run();
        });
        for (int i = 0; i < 4; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("{}", j);
            });
        }
    }

10.2. ThreadPoolExecutor

10.2.1. 线程池状态


状态相关变量

	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

从源码也可以看出,ThreadPoolExecutor的实现思路和我们上面的自定义线程池差不多,也是由阻塞队列以及ReentrantLock锁来实现。(面试问题:讲讲lock在JDK中的应用?而这个线程池实现类就是Lock在JDK中的一个应用哦)

10.2.2.构造方法

 public ThreadPoolExecutor(int corePoolSize,//线程池中存在的最多线程,之所以叫核心,我理解核心线程数目应该是和CPU的核数相对应。
                              int maximumPoolSize,//最大线程数目=核心线程数+救急线程数
                              long keepAliveTime,//控制救急线程的生存时间,如果达到这个时间,救急线程还是用不上,那就关闭这个线程,需要用的时候再开启。
                              TimeUnit unit,//将上面存活时间转换为统一单位的工具
                              BlockingQueue<Runnable> workQueue,//如果核心线程都在执行任务,那么再来任务,暂时没空执行,就放到阻塞队列中
                              ThreadFactory threadFactory,//创建线程,并为线程起一个名字
                              RejectedExecutionHandler handler//如果所有线程都没空处理这个任务,并且阻塞队列也满了,救急线程也在忙,那么这个拒绝策略就是用户选择以何种方式去处理这个任务(比如死等其它线程空闲,或者阻塞队列有位置;放弃这个任务;自己执行等等)) 


工作流程




10.2.3. newFixedThreadPool(固定个数线程池)

10.2.4.newCachedThreadPool(缓存线程池,线程个数不限)



SynchronousQueue特点验证:

@Slf4j(topic = "c.TestSynchronousQueue")
public class TestSynchronousQueue {
    public static void main(String[] args) {
        SynchronousQueue<Integer> integers = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                log.debug("putting {} ", 1);
                integers.put(1);
                log.debug("{} putted...", 1);

                log.debug("putting...{} ", 2);
                integers.put(2);
                log.debug("{} putted...", 2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t1").start();

        sleep(1);

        new Thread(() -> {
            try {
                log.debug("taking {}", 1);
                integers.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t2").start();

        sleep(1);

        new Thread(() -> {
            try {
                log.debug("taking {}", 2);
                integers.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t3").start();
    }
}

10.2.5.newSingleThreadExecutor(单个线程的线程池)


10.2.6.线程池提交任务方法



10.2.7. 线程池关闭方法



其它相关方法

10.2.8.ScheduledExecutorService(任务调度线程池)

  • Timer实现


  • ScheduledExecutorService实现
    schedule方法【延时执行任务】
    scheduleAtFixedRate方法【延时,定时执行任务】


scheduleWithFixedDelay方法【延时,定时执行任务】

10.2.9. 正确处理执行任务异常

  • 方法1:主动捉异常(try catch)
  • 方法2:使用 Future

10.3. Worker Thread(工作线程)模式

10.3.1. 定义


10.3.2. 饥饿(线程分工不明确导致的)


示例:



如果只有一位客人,那没问题

但如果有两位客人,就会出现“饥饿”问题

解决办法:

@Slf4j(topic = "c.TestDeadLock")
public class TestStarvation {

    static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
    static Random RANDOM = new Random();
    static String cooking() {
        return MENU.get(RANDOM.nextInt(MENU.size()));
    }
    public static void main(String[] args) {
        ExecutorService waiterPool = Executors.newFixedThreadPool(1);
        ExecutorService cookPool = Executors.newFixedThreadPool(1);

        waiterPool.execute(() -> {
            log.debug("处理点餐...");
            Future<String> f = cookPool.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        waiterPool.execute(() -> {
            log.debug("处理点餐...");
            Future<String> f = cookPool.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });

    }
}

10.3.3. 创建多少线程合适之CPU 密集型运算


10.3.4. 创建多少线程合适之I/O 密集型运算

10.4. AQS 原理

10.4.1. 概述


10.4.2. 基于AQS(抽象队列同步框架)实现不可重入锁

1.继承AbstractQueuedSynchronizer(AQS)实现自定义的同步器

// 独占锁  同步器类
    class MySync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if(compareAndSetState(0, 1)) {
                // 加上了锁,并设置 owner 为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override // 是否持有独占锁
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        public Condition newCondition() {
            return new ConditionObject();
        }
    }

2.基于自定义同步器,集成Lock接口实现自定义锁对象(Lock接口中各个方法由AQS实现)

// 自定义锁(不可重入锁)
class MyLock implements Lock {
    //自定义锁
    private MySync sync = new MySync();

    @Override // 加锁(不成功会进入等待队列)
    public void lock() {
        sync.acquire(1);
    }

    @Override // 加锁,可打断
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override // 尝试加锁(一次)
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override // 尝试加锁,带超时
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override // 解锁
    public void unlock() {
        sync.release(1);
    }

    @Override // 创建条件变量
    public Condition newCondition() {
        return sync.newCondition();
    }
}

10.4.3. AQS总结







CLH 好处:
1.无锁,使用自旋
2.快速,无阻塞

10.4.4. 主要用到 AQS 的并发工具类

10.5. ReentrantLock 原理

10.5.1. 非公平锁实现原理

1 构造函数分析

public ReentrantLock() {
        sync = new NonfairSync();//非公平同步器
    }
  • 默认是非公平锁的实现
  • 分析同步器类的关系
static final class NonfairSync extends Sync 
static final class FairSync extends Sync 
abstract static class Sync extends AbstractQueuedSynchronizer 
  • 可以看到Sync同步器和我们上面的自定义不可重入锁的实现一致,继承自AQS,只是这个Sync是一个抽象实现,后续NonfairSync 和FairSync 分别继承实现了Sync,NonfairSync 是非公平同步器,对应于非公平锁的实现,FairSync 是公平同步器,对应于公平锁的实现。

2 分析一下非公平锁的加锁实现

   public void lock() {
        sync.lock();
    }
  final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//addWaiter创建节点对象
            selfInterrupt();
    }

可以看到加锁方式和我们上面自定义不可重入锁逻辑一样,先利用cas尝试将status改为1,如果成功则将当前线程设置为Owner线程,否则,会再去尝试一次,如果尝试成功,则获得锁,否则创建节点加入等待队列。

3 没有竞争时非公平锁的状态

4 有竞争时非公平锁的状态

首次尝试加锁失败,进入acquire逻辑

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//addWaiter创建节点对象
            selfInterrupt();
    }



 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }



private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

5 分析一下解锁过程

  public void unlock() {
        sync.release(1);
    }
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
 protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);//释放锁
            }
            setState(c);//设置状态
            return free;
        }



private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//解锁
    }


10.5.2. 非公平锁可重入原理

整体思路:
加锁时,如果当前线程已经获得锁,那么让状态自增acquires(一般取值为1)
解锁时,首先将状态自减acquires,如果状态不等于0,则解锁失败,否则解锁成功

  final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 支持锁重入, 只有 state 减为 0, 才释放成功
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

10.5.3. 可打断原理

  • 不可打断模式
    整体思路:
    在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了

源码:

final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();//执行打断
    }
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    //还是需要获得锁后, 才能返回打断状态
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //虽然打断了,但是并没有返回,仍然在循环中
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private final boolean parkAndCheckInterrupt() {
        // 如果打断标记已经是 true, 则 park 会失效
        LockSupport.park(this);
        // interrupted 会清除打断标记
        return Thread.interrupted();
    }
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
  • 可打断模式
    整体思路
    一旦检测到线程被打断,那么立即抛出异常,终止循环,执行打断,不必等到获得锁之后再打断

源码

  public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 在 park 过程中如果被 interrupt 会进入此
					// 这时候抛出异常, 而不会再次进入 for (;;)
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

10.5.4. 公平锁实现原理

源码分析:公平锁会在一个线程加锁前判断队列是否有前驱节点,如果有前驱节点,且当前线程并不是前驱节点,则不能去竞争锁。

 protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                /// 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            // (s = h.next) == null 表示队列中还有没有老二
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

10.5.5. 条件变量实现原理

每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject

  • await 流程
 public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();//向 ConditionObject类的等候队列中添加节点
            int savedState = fullyRelease(node);//释放锁,并唤醒AQS队列中的线程
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);//自己park住,处于阻塞状态
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
  private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);//Node.CONDITION=2
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {//解决锁重入的问题,把重入锁的状态直接减到0
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

fullyRelease的同时,unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//释放锁
            return true;
        }
        return false;
    }


  • signal 流程
 public final void signal() {
            if (!isHeldExclusively())//检查当前是否是锁的持有者
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);//唤醒阻塞队列中的第一个线程节点
        }
  private void doSignal(Node first) {
            do {
                //将待唤醒的线程从阻塞队列中移除
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&//将待唤醒的线程节点转移到AQS队列中去竞争锁,有可能失败(比如超时,或是被打断,也就是这个线程不去竞争锁了,那就没必要放到AQS队列中了)
                     (first = firstWaiter) != null);
        }


 final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//设置待唤醒线程节点的状态为0
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);//将待唤醒线程的节点加入AQS队列
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//将待唤醒节点的前驱节点的状态改为-1,Node.SIGNAL=-1
            LockSupport.unpark(node.thread);//待唤醒线程节点解锁
        return true;
    }

10.6. 读写锁(ReentrantReadWriteLock)

当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。 类似于数据库中的 select …from … lock in share mode
即加读锁能使得多线程并发读取共享资源(不互斥);加写锁使得只能有一个线程写这个资源(互斥)
读锁-读锁 并发;
读锁-写锁 相互阻塞;
写锁-写锁 相互阻塞;


10.6.1. 读写锁的应用之缓存

  1. 场景:查询数据库的时候,将sql语句的查询结果存入缓存,下次再查询相同sql语句时,直接从缓存中取数据即可。
  2. 初步实现:分析可知多线程情况下,先更新数据库再清缓存较好


  • 基于读写锁的多线程安全实现


10.6.2. 读写锁原理

读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个

  • 场景一:t1线程加写锁,t2线程加读锁
  • 源码解读
public void lock() {
            sync.acquire(1);
        }
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            //c!=0代表此时有其它线程上锁了(但不能判断是加了读锁还是写锁)
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                //w=0代表不是写锁,那说明加的是读锁,否则判断当前线程是不是获得锁的线程,若不是,加锁失败
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                //异常处理
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                //加写锁
                setState(c + acquires);
                return true;
            }
            //cas加写锁,非公平锁情况下,writerShouldBlock()一直为false
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }




  • 源码解读
public void lock() {
            sync.acquireShared(1);
        }
   public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
 protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            //加了写锁,并且不是本线程加的写锁,则加锁失败
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }
private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);//读锁节点
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
   private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
  • 场景二:t3加读锁,t4加写锁
  • 场景三:t1线程释放写锁
public void unlock() {
            sync.release(1);
        }
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
 protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
 private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);//尝试加锁,设置读锁状态加1
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);//t2原本所在节点设置为头结点
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//此处之前park线程2,unpark之后从这开始执行
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
 protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            //判断是否写锁占有锁,此场景下写锁t1线程已释放,故跳过
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);//拿到共享锁的状态
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {//cas尝试将共享锁的状态加1(状态变量高位加1)
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }


private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())//判断下一个节点(此场景下是t3线程),若是共享锁(读锁),则继续释放该共享锁
                doReleaseShared();
        }
    }

读锁不会阻塞的原因来了:被唤醒的负责继续唤醒下一个读锁线程

private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//首先尝试将头结点的唤醒状态设置为0,如果是-1,有可能会导致其它线程抢先换线t3,因此设置为0,避免其它线程的影响
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);//唤醒该线程(本场景是t3线程)
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

  • 场景四:线程t2和t3释放读锁

  public void unlock() {
            sync.releaseShared(1);
        }
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
 protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;//让共享锁的状态减1
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;//释放t2线程时,还有一个t3线程,此时返回false
            }
        }

 private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);//唤醒头结点,此场景下是t4线程
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

  final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {//获取写锁
                    setHead(node);//修改头结点
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//park线程位置,t4线程唤醒后,从这个位置继续运行
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))//设置锁状态
                return false;
            setExclusiveOwnerThread(current);//设置owner线程为当前线程
            return true;
        }

10.7. stampedlock介绍

是对于读写锁的进一步优化来提升性能?如何对读写锁再优化呢?答案是读写锁还是加锁了的,那么加锁的效率怎么都比不上不加锁的效率。因此 Semaphore锁就是一种先尝试不加锁的方法实现,若失败再升级锁(类似Syncronized锁优化的那一套)。 == stampedlock锁用“时间戳”的概念代替锁==

stampedlock锁的缺点:
1.不支持条件变量
1.不支持锁重入
所以 stampedlock锁并不能完全取代ReentrantReadWriteLock

10.8. Semaphore

概念:信号量,用来限制能同时访问共享资源的线程上限。


以上输出可以看出Semaphore可以多共享资源的线程个数进行限制。

更多推荐

Java并发编程入门这一篇就够了(文章很长,但很好哦)