总结学习了JUC的相关内容,包括线程池,AQS,内存模型。
线程状态转换
- 新建:创建后尚未启动
- 可运行:包含运行和就绪,也就是可能在运行也可能在等待CPU时间片
- 阻塞:多线程同步操作的场景,比如等待另一个线程的释放锁
- 无限期等待:等待其它线程显式地唤醒,否则不会被分配 CPU 时间片
- 限期等待:无需等待其它线程显式地唤醒,在一定时间之后会被系统自动唤醒。
- 死亡:run方法执行完毕正常死亡;没有捕获异常而意外死亡
Java线程的实现方式
- 实现Runnable接口,实现run方法
- 实现Callable接口,实现call方法
- 继承Thread类,重写run方法
其中Callable可以有返回值,通过FutureTask封装。
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> ft = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
return 1;
}
});
Thread thread = new Thread(ft);
thread.start();
System.out.println(ft.get());
}
线程池
jdk提供的线程池包含四种,通过Executors工厂类创建
- newFixedThreadPool:指定线程数的线程池,corePoolSize == maxiPoolSize,阻塞队列为LinkedBlockingQuene
- newCachedThreadPool:可以缓存线程的线程池,线程数可达2147483647,阻塞队列为SynchronousQueue
- newSingleThreadExecutor:只有一个线程的线程池,阻塞队列为LinkedBlockingQuene
- newScheduledThreadPool:在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据
参数意义:
- corePoolSize:核心线程数
- maxPoolSize:最大线程数
- keepAliveTime:线程存活时间(在corePore与maxPoolSize之间的情况下有用)
- timeUnit:存活时间的时间单位
- workQueue:阻塞队列(用来保存等待被执行的任务)
- threadFactory:线程工厂,主要用来创建线程;
- handler:表示当拒绝处理任务时的策略
阻塞队列:
- ArrayBlockingQueue:有界队列,基于数组结构的有界阻塞队列,按FIFO排序任务;
- inkedBlockingQuene:无界队列,基于链表结构的阻塞队列,按FIFO排序任务
- SynchronousQuene:直接提交,一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
- PriorityBlockingQuene:具有优先级的无界阻塞队列;
饱和策略,当没有空闲工作线程,阻塞队列满了,处理新任务的策略。
- 丢弃任务并抛出RejectedExecutionException异常
- 丢弃任务,但是不抛出异常
- 丢弃队列最前面的任务,然后重新尝试执行任务
- 由调用线程处理该任务
线程池状态:
- RUNNING:会接收新任务,并处理阻塞队列中的任务
- SHUTDOWN:不会接收新任务,但会处理阻塞队列中的任务
- STOP:不会接收新任务,不处理阻塞队列的任务,会中断正在运行的任务
- TIDYING:线程池对线程进行整理优化
- TERMINATED:线程池停止工作
向线程池提交任务:
- Executor.execute(Runnable command);
- ExecutorService.submit(Callable
task);
execute实现:
- 通过workCountof获得当前线程池中的线程数,如果小于corePoolSize,则通过addWorker创建线程并执行该任务;否则放入阻塞队列
- 加入到阻塞队列后,如果线程池是非RUNNING状态,则将该任务从队列溢出,然后reject;如果是RUNNING状态,则检查线程池是否有空闲线程,如果有则执行
- 如果不能加入到阻塞队列,说明阻塞队列已满;通过addWorker(第二个参数为false)尝试创建新线程执行该任务,如果addWorker失败说明,线程池已满,reject该任务。
sumbit实现:
会将提交的Callable任务封装成一个FutureTask对象,FutureTask类实现了Runnable接口,这样就可以通过Executor.execute()提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法。submit方法可以返回持有计算结果的Future对象。
总结:
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果阻塞队列满了,那就创建新的线程执行当前任务;直到线程池中的线程数达到maxPoolSize,之后的任务通过reject()处理。
中断
可以通过调用一个线程的interrupt来中断线程,但是并不会保证成功:
- 对于通过sleep或者wait阻塞的线程,会抛出InterruptedException,从而中断线程
- 对于IO阻塞和synchronized锁阻塞不会对interrupt做出反应
- 对于正常的无限循环的线程也不会中断,但是可以通过interrupted()判断当前线程是否被中断,从而做出判断结束线程
对于线程池,可以通过以下方法中断:
- Executor的shutdown方法会在线程执行完毕后再关闭
- Executor的shutdownNow则会调用每个线程的interrupt方法
- 如果像只中断一个线程,可以通过submit提交线程,会返回Futrue对象,通过Future的cancel(true)中断线程
线程间协作
join
一个线程中调用另一个线程的join(),当前线程会挂起,知道目标线程结束。
wait/notify/notifyAll
是Object的方法。 调用wait方法使线程挂起,当其他线程满足条件使,会调用notify方法唤醒挂起的方法。其中notify会随机唤醒一个挂起的线程,notifyAll会唤醒所有线程。 这些方法必须在synchronized方法或控制块中,并且调用wait的对象与synchronized锁的对象要是同一个,因为调用wait方法后,线程会释放锁,所以之前一定要有改锁。
await/signal/signalAll
是JUC提供的实现线程之间的协调方法,使用上与wait类似。通过Lock获得Condition类,通过condition对象调用await以及singal
sleep和wait的区别
- wait是Object方法,sleep是Thread的静态方法
- wait会释放锁,sleep不会
互斥同步
Java提供了两种锁机制控制多线程对资源的共享访问,第一个是JVM实现的synchronized,第二个是JDK中JUC包的ReetrantLock。
synchronized
作为关键字可以同步一个代码块,也可以写在函数名前同步一个函数。
- 对象锁
synchronized后接一个对象实例或者synchronized作用于普通函数都属于对象锁,只能作用于同一个对象。
synchronized (this) {
}
public synchronized void foo(){
}
- 类锁
synchronized后接一个class类或者synchronized作用于静态函数属于类锁,同一个类的不同对象也可以同步。
synchronized (Example.class) {
}
public synchronized static void foo(){
}
ReentrantLock
通过Lock对象的lock()和unlock()方法加锁或解锁
Lock lock = new ReentrantLock();
lock.lock();
lock.unlock();
比较
- 实现
synchronized是JVM实现的,而ReentrantLock是JDK实现的。
synchronized同步代码块是通过使用monitorenter和monitorexit指令实现,同步方法则是在Class文件的方法表中将该方法的access_flags字段中的synchronized标志位置。 synchronized用的锁是存在Java对象头里的,对象头主要包括两部分数据:Mark Word(标记字段)、Klass Pointer(类型指针),其中Mark包含偏向锁和锁标志。monitor则是线程私有的数据结构,每一个线程都有一个可用monitor record列表,每一个被锁住的对象都会和一个monitor关联。
ReentrantLock是通过CAS实现的。
- 性能
经过JVM优化后,两种锁的性能相当
- ReentrantLock其他高级功能
- 正在等待锁的线程可以放弃等待,ReentrantLock可中断
- 公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁。synchronized和ReentrantLock都是非公平锁,但是ReentrantLock可以设置为公平锁。
- ReentrantLock 可以同时绑定多个 Condition 对象
使用场景
除非需要使用ReentrantLock多个条件等高级功能,应该使用synchronized,synchronized由JVM实现,经过优化后性能相当,并且不用担心没有释放锁而导致死锁。
锁优化
JVM对synchronized做了一系列的优化。
- 自旋锁:自旋锁的思想是在请求共享数据时执行忙循环(自旋),以求在较短的时间中获得锁,并且避免进入阻塞状态。但是这会占用CPU时间,所以使用与锁定状态很短的场景,并且应该限定自旋次数。
- 锁消除:JVM检测出不可能存在竞争的共享数据的锁进行消除。一般通过逃逸分析来支持,也就是如果堆上的数据不会逃逸被其他线程访问,就可以当成线程私有数据,并消除锁
- 锁粗化:一系列操作对同一个对象反复加锁是浪费,可以把锁的范围扩展到整个操作序列的外部,只需要加解锁一次即可
- 轻量级锁:在对象头的mark word中标记锁的状态,分别为无锁状态,偏向锁状态,轻量级锁状态和重量级锁状态。轻量级锁使用CAS操作进行同步,CAS失败了再使用互斥同步。
- 偏向锁:偏向锁的思想是偏向第一个获取锁对象的线程,并且之后获取锁不再需要同步。当锁第一次被线程获取时,进入偏向状态,该线程后续不需要同步,直到有其他线程尝试获取锁的时候,结束偏向锁状态。
JUC组件
AQS基础
AQS是指AbstractQueuedSynchronizer,AQS是JUC的核心。
AQS维护了一个volatile int state状态(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。 state的方法方式有三种:
- getState()
- setState()
- compareAndSetState()
AQS定义两种资源共享方式:Exclusive独占和Share共享,不同的自定义同步器只需要实现共享资源state的获取与释放方式即可,等待队列的维护,AQS已经实现:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败则返回false。
AQS实现
acquire(int)
是独占模式下线程获取共享资源的顶层入口,是lock()语义:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回
- 否则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 获取资源后才再进行自我中断selfInterrupt(),将中断补上。
release(int)
与acquire是相反的操作。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
CountdownLatch
用来控制一个线程等待多个线程。维护了一个计数器 cnt,每次调用 countDown() 方法会让计数器的值减 1,减到 0 的时候,那些因为调用 await() 方法而在等待的线程就会被唤醒。
public static void main(String[] args) throws InterruptedException {
final int totalThread = 2;
CountDownLatch countDownLatch = new CountDownLatch(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.out.print("run..");
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println("end");
executorService.shutdown();
}
run..run..end
CyclicBarrier
用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。 通过维护计数器来实现的。线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 awati() 方法而在等待的线程才能继续执行。 CyclicBarrier 的计数器通过调用 reset() 方法可以循环使用,所以它才叫做循环屏障。
public static void main(String[] args) {
final int totalThread = 2;
CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.out.print("run..");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.print("end ");
});
}
executorService.shutdown();
}
run..run..end end
Semaphore
Semaphore 就是操作系统中的信号量,可以控制对互斥资源的访问线程数。
并发编程三个概念与Java内存模型
内存模型
主内存和工作内存
处理器上的寄存器的读写的速度比内存快几个数量级,为了解决这种速度矛盾,在它们之间加入了高速缓存。 加入高速缓存带来了一个新的问题:缓存一致性。如果多个缓存共享同一块主内存区域,那么多个缓存的数据可能会不一致,需要一些协议来解决这个问题。 所有的变量都存储在主内存中,每个线程还有自己的工作内存,工作内存存储在高速缓存或者寄存器中,保存了该线程使用的变量的主内存副本拷贝。 线程只能直接操作工作内存中的变量,不同线程之间的变量值传递需要通过主内存来完成。
内存间相互操作
Java内存模型定义了8个原子操作来完成内存和工作内存的交互操作。
- read:把一个变量的值从主内存传输到工作内存中
- load:在 read 之后执行,把 read 得到的值放入工作内存的变量副本中
- use:把工作内存中一个变量的值传递给执行引擎
- assign:把一个从执行引擎接收到的值赋给工作内存的变量
- store:把工作内存的一个变量的值传送到主内存中
- write:在 store 之后执行,把 store 得到的值放入主内存的变量中
- lock:作用于主内存的变量,把一个变量标识为一条线程独占状态
- unlock:作用于主内存变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
原子性
即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。
x = 10; //语句1
y = x; //语句2
x++; //语句3
x = x + 1; //语句4
在Java中只有对基本数据类型的读取和赋值是原子性操作,也就是只有语句1是原子性的,要想保证其他语句的原子性,需要synchronized和Lock实现。
可见性
可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。 Java提供了volatile关键字来保证可见性。当一个共享变量被volatile修饰时,它会保证修改的值会立即被更新到主存,当有其他线程需要读取时,它会去内存中读取新值。通过synchronized和Lock也可以实现可见性。
volatile详解
- volatile关键字的两层语义
- 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的
- 禁止进行指令重排序
- volatile保证原子性? 不可以,比如多线程自增操作,可能就会出现线程1读取了值,但是之后被阻塞,线程2读取原值自增(由于线程1还没有修改值,不会导致线程2的工作内存缓存无效),最后两次自增操作只增加了1。如下所述jdk提供了原子操作类通过volatile和cas机制实现原子性操作。
- volatile保证有序性? 因为禁止了指令重排,也就是不能把访问volatile变量的语句放在其前面或后面的语句执行,一定程度上保证有序性。
- volatile的原理和实现机制
加入volatile关键字时,会多一个lock前缀指令,lock前缀指令实际相当于一个内存屏障:
- 它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面
- 它会强制将对缓存的修改操作立即写入主存
- 如果是写操作,它会导致其他CPU中对应的缓存行无效
- 使用volatile的场景
synchronized关键字是防止多个线程同时执行一段代码,那么就会很影响程序执行效率,而volatile关键字在某些情况下性能要优于synchronized,但是要注意volatile关键字是无法替代synchronized关键字的,因为volatile关键字无法保证操作的原子性。通常来说,使用volatile必须具备以下2个条件:1)对变量的写操作不依赖于当前值。2)该变量没有包含在具有其他变量的不变式中。也就是保证操作是原子性的。
- 状态标记量:
``` java
volatile boolean inited = false;
//线程1:
context = loadContext();
inited = true;
//线程2: while(!inited ){ sleep() } doSomethingwithconfig(context);
* double check: ``` java class Singleton{ private volatile static Singleton instance = null; private Singleton() { } public static Singleton getInstance() { if(instance==null) { synchronized (Singleton.class) { if(instance==null) instance = new Singleton(); } } return instance; } }
- 状态标记量:
``` java
volatile boolean inited = false;
//线程1:
context = loadContext();
有序性
即程序执行的顺序按照代码的先后顺序执行。处理器为了提高程序运行效率,可能会对输入代码进行优化,它不保证程序中各个语句的执行先后顺序同代码中的顺序一致,但是它会保证程序最终执行结果和代码顺序执行的结果是一致的。但是在多线程并发执行的发生指令重排可能影响执行的正确性。
Java中可以通过volatile保证一定的有序性
,也可以通过加锁来保证有序性。
Java内存模型具备一些先天的有序性
,即先行发生原则(happens-before):
- 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作
- 锁定规则:一个unLock操作先行发生于后面对同一个锁额lock操作
- volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作
- 传递规则:如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C
- 线程启动规则:Thread对象的start()方法先行发生于此线程的每个一个动作
- 线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生
- 线程终结规则:线程中所有的操作都先行发生于线程的终止检测,我们可以通过Thread.join()方法结束、Thread.isAlive()的返回值手段检测到线程已经终止执行
- 对象终结规则:一个对象的初始化完成先行发生于他的finalize()方法的开始
CAS
CAS的全称为Compare-And-Swap,它是一条CPU并发原语。它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。CAS并发原语体现在JAVA语言中就是sun.misc.Unsafe类中的各个方法。调用UnSafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令。
当需要同步的操作粒度很细时,使用synchronized是不高效的,这时就有CAS存在的意义了。比如对于i++这种并发计数功能,使用synchronized就大材小用了,而使用CAS来实现就会更加的轻量级,性能更好。因此可以看到java.util.concurrent.atomic包中有类似AtomicInteger这种类,类中包含如下方法。
private volatile int value;
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
在上面的代码中Unsafe类负责执行CAS并发原语,由JVM转化为汇编。在代码中使用CAS自旋volatile变量的形式实现非阻塞并发,volatile保证了变量了线程之间的可见性。这种方式是CAS的主要使用方式。CAS操作包含三个操作数——内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在CAS指令之前返回该位置的值。CAS有效地说明了“我认为位置V应该包含值A;如果包含该值,则将B放到这个位置;否则,不要更改该位置。无论如何,告诉我原值。
CAS是乐观锁,是一种冲突重试的机制,在并发竞争不是很激烈的情况下(也是大多数情况),他的性能要好于基于锁的并发性能。因为并发竞争激烈的话,冲突重试的过程很多。而synchronized是一种独占锁,是一种悲观锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁
CAS的典型步骤:
while(true){
int A=V.get();//mark1
if(A==V.compareAndSwap(A,newA))//mark2
return A;
}
在上面代码的mark1行,首先获取V的值为A,然后在mark2行会重新判断V的值是否还是A。现实中有这样的情况:在mark1行之后mark2行之前,V的值可能从A变成B又变成A。这个时候,现有的代码仍然认为V的值没有改变,而有些情况下,我们需要识别V的这种改变。这就是ABA问题。
解决ABA问题的一种思路就是在CAS操作的时候更新两个值,包括引用和该值的版本号,JUC中提供了这个类AtomicStampedReferance以及AtomicMarkedReference。这两个类支持在两个变量上执行CAS操作,用于解决ABA问题。