Java 并发编程总结

核心理论

  • 共享性
  • 互斥性
  • 原子性
  • 可见性
  • 有序性

锁种类:

  • 偏向锁 是一种编译解释锁。如果代码中不可能出现多线程并发争抢同一个锁的时候,JVM 编译代码,解释执行的时候,会自动的放弃同步信息。消除 synchronized 的同步代码结果。使用锁标记的形式记录锁状态。在 Monitor 中有变量 ACC_SYNCHRONIZED。当变量值使用的时候, 代表偏向锁锁定。可以避免锁的争抢和锁池状态的维护。提高效率。轻量级锁是为了在线程交替执行同步块时提高性能,而偏向锁则是在只有一个线程执行同步块时进一步提高性能
  • 轻量级锁 过渡锁。当偏向锁不满足,也就是有多线程并发访问,锁定同一个对象的时候,先提升为轻量级锁。也是使用标记 ACC_SYNCHRONIZED 标记记录的ACC_UNSYNCHRONIZED 标记记录未获取到锁信息的线程。就是只有两个线程争抢锁标记的时候,优先使用轻量级锁。 “轻量级”是相对于使用操作系统互斥量来实现的传统锁而言的
  • 重量级锁 重量级锁依赖于操作系统的互斥量(mutex) 实现。 会导致进程从用户态与内核态之间的切换, 是一个开销较大的操作
  • 自旋锁 是一个过渡锁,是偏向锁和轻量级锁的过渡。当获取锁的过程中,未获取到。为了提高效率,JVM 自动执行若干次空循环,再次申请锁,而不是进入阻塞状态的情况。称为自旋锁。自旋锁提高效率就是避免线程状态的变更

其他优化:

  • 适应性自旋(Adaptive Spinning) 简单来说就是线程如果自旋成功了,则下次自旋的次数会更多,如果自旋失败了,则自旋的次数就会减少
  • 锁粗化(Lock Coarsening):就是将多次连接在一起的加锁、解锁操作合并为一次,将多个连续的锁扩展成一个范围更大的锁
  • 锁消除(Lock Elimination)锁消除即删除不必要的加锁操作。根据代码逃逸技术,如果判断到一段代码中,堆上的数据不会逃逸出当前线程,那么可以认为这段代码是线程安全的,不必要加锁。
优点 缺点 适用场景
偏向锁 加锁和解锁不需要额外的消耗,和执行非同步方法比仅存在纳秒级的差距。 如果线程间存在锁竞争,会带来额外的锁撤销的消耗。 适用于只有一个线程访问同步块场景
轻量级锁 竞争的线程不会阻塞,提高了程序的响应速度 如果始终得不到锁竞争的线程使用自旋会消耗CPU 追求响应时间。同步块执行速度非常快
重量级锁 线程竞争不使用自旋,不会消耗CPU 线程阻塞,响应时间缓慢 追求吞吐量。同步块执行速度较长

线程状态:

  • New
  • Runnable
  • Running
  • Blocked
  • Dead

sleep yield join

sleep

sleep方法只是暂时让出CPU的执行权,并不释放锁。而wait方法则需要释放锁。

yield

yield方法的作用是暂停当前线程,以便其他线程有机会执行,不过不能指定暂停的时间,并且也不能保证当前线程马上停止

join

join方法的作用是父线程等待子线程执行完成后再执行,换句话说就是将异步执行的线程合并为同步的线程。对创建的线程都进行join()所有子线程从并发执行变成顺序执行

通过源码可以看出join方法就是通过wait方法来将线程的阻塞.注意:这里的join只调用了wait方法,却没有对应的notify方法,原因是Thread的start方法中做了相应的处理,所以当join的线程执行完成以后,会自动唤醒主线程继续往下执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* Waits for this thread to die.
*
* <p> An invocation of this method behaves in exactly the same
* way as the invocation
*
* <blockquote>
* {@linkplain #join(long) join}{@code (0)}
* </blockquote>
*
* @throws InterruptedException
* if any thread has interrupted the current thread. The
* <i>interrupted status</i> of the current thread is
* cleared when this exception is thrown.
*/
public final void join() throws InterruptedException {
join(0);
}
/**
* Waits at most {@code millis} milliseconds for this thread to
* die. A timeout of {@code 0} means to wait forever.
*
* <p> This implementation uses a loop of {@code this.wait} calls
* conditioned on {@code this.isAlive}. As a thread terminates the
* {@code this.notifyAll} method is invoked. It is recommended that
* applications not use {@code wait}, {@code notify}, or
* {@code notifyAll} on {@code Thread} instances.
*
* @param millis
* the time to wait in milliseconds
*
* @throws IllegalArgumentException
* if the value of {@code millis} is negative
*
* @throws InterruptedException
* if any thread has interrupted the current thread. The
* <i>interrupted status</i> of the current thread is
* cleared when this exception is thrown.
*/
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}

synchronized

特性:

  1. 非公平锁
  2. 可重入锁

锁对象:

  1. 锁当前对象this
  2. 锁定临界对象
  3. 锁类

Java 虚拟机中的同步(Synchronization)基于进入和退出管程(Monitor)对象实现。同步方法 并不是由 monitor enter 和 monitor exit 指令来实现同步的,而是由方法调用指令读取运行时常量池中方法的 ACC_SYNCHRONIZED 标志来隐式实现的。

  • 对象头:存储对象的 hashCode、锁信息或分代年龄或 GC 标志,类型指针指向对象的类元数据,JVM 通过这个指针确定该对象是哪个类的实例等信息。
  • 实例变量:存放类的属性数据信息,包括父类的属性信息
  • 填充数据:由于虚拟机要求对象起始地址必须是 8 字节的整数倍。填充数据不是必须存在的,仅仅是为了字节对齐

当在对象上加锁时,数据是记录在对象头中。当执行 synchronized 同步方法或同步代码块时,会在对象头中记录锁标记,锁标记指向的是 monitor 对象(也称为管程或监视器锁)的起始地址。每个对象都存在着一个 monitor 与之关联,对象与其 monitor 之间的关系有存在多种实现方式,如 monitor 可以与对象一起创建销毁或当线程试图获取对象锁时自动生成,但当一个 monitor 被某个线程持有后,它便处于锁定状态。

在 Java 虚拟机(HotSpot)中,monitor 是由 ObjectMonitor 实现的。

当多线程并发访问同一个同步代码时,首先会进入_EntryList,当线程获取锁标记后, monitor 中的_Owner 记录此线程,并在 monitor 中的计数器执行递增计算(+1),代表锁定,其他线程在_EntryList 中继续阻塞。若执行线程调用 wait 方法,则 monitor 中的计数器执行赋值为 0 计算,并将_Owner 标记赋值为 null,代表放弃锁,执行线程进如_WaitSet 中阻塞。若执行线程调用 notify/notifyAll 方法,_WaitSet 中的线程被唤醒,进入_EntryList 中阻塞,等待获取锁标记。若执行线程的同步代码执行结束,同样会释放锁标记,monitor 中的_Owner 标记赋值为 null,且计数器赋值为 0 计算。

  • synchronized修饰的非static方法其实是在执行方法前,先获取this对象的锁,如果对象锁被其他线程获取了,则进入阻塞状态;
  • synchronized(mutex)修饰的代码段是先获取mutex对象的锁,然后再进入代码段,否则进入阻塞;
  • 对于synchronized修饰static方法,只不过现在是获取该类的class对象,由于一个类只有一个class对象,这时可以真正做到只有一个线程能够执行该代码段;
  • 设计同步时,尽量避免直接使用synchronized修饰的非static方法,尽量缩小核心的同步代码段,最大化利用CPU;
  • 实例化多个含有synchronized代码段的对象时,可能由于设置的对象锁是非全局的,依然可以有多个线程同时执行同一个代码段
  • 是在JVM层面上实现的,如果代码执行出现异常,JVM会自动释放锁,但是Lock不行,要保证锁一定会被释放,就必须将unLock放到finally{}中(手动释放);

Lock相关

相关概念

1. 可重入锁

如果锁具备可重入性,则称作为可重入锁。像synchronized和ReentrantLock都是可重入锁,可重入性在我看来实际上表明了锁的分配机制:基于线程的分配,而不是基于方法调用的分配。举个简单的例子,当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2,此时线程不必重新去申请锁,而是可以直接执行方法method2。

2. 可中断锁

就是可以响应中断的锁 在Java中,synchronized就不是可中断锁,而Lock是可中断锁。如果某一线程A正在执行锁中的代码,另一线程B正在等待获取该锁,可能由于等待时间过长,线程B不想等待了,想先处理其他事情,我们可以让它中断自己或者在别的线程中中断它,这种就是可中断锁。lockInterruptibly()

3. 公平锁/非公平锁

公平锁即尽量以请求锁的顺序来获取锁。比如同是有多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该锁,这种就是公平锁。
非公平锁即无法保证锁的获取是按照请求锁的顺序进行的。这样就可能导致某个或者一些线程永远获取不到锁。
在Java中,synchronized 就是非公平锁,它无法保证等待的线程获取锁的顺序。
而对于ReentrantLock和ReentrantReadWriteLock,它默认情况下是非公平锁,但是可以设置为公平锁。

jdk1.5并发包中ReentrantLock的创建可以指定构造函数的boolean类型来得到公平锁或非公平锁

  • 公平锁,就是很公平,在并发环境中,每个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有锁,否则就会加入到等待队列中,以后会按照FIFO的规则从队列中取到自己
  • 非公平锁一上来就直接尝试占有锁,如果尝试失败,就再采用类似公平锁那种方式

4.读写锁

读写锁将对一个资源(比如文件)的访问分成了2个锁,一个读锁和一个写锁。

正因为有了读写锁,才使得多个线程之间的读操作不会发生冲突。

ReadWriteLock就是读写锁,它是一个接口,ReentrantReadWriteLock实现了这个接口。

可以通过readLock()获取读锁,通过writeLock()获取写锁。

CountDownLatch

门闩是 concurrent 包中定义的一个类型,是用于多线程通讯的一个辅助类型。
门闩相当于在一个门上加多个锁,当线程调用 await 方法时,会检查门闩数量,如果门闩数量大于 0,线程会阻塞等待。当线程调用 countDown 时,会递减门闩的数量,当门闩数量为 0 时,await 阻塞线程可执行。

Lock

Lock是一个接口

ReentrantLock

特性:

  1. 创建公平锁和非公平锁(默认)
  2. 可重入锁
  3. 实现了Lock接口

重入锁,建议应用的同步方式。相对效率比 synchronized 高。量级较轻。synchronized 在 JDK1.5 版本开始,尝试优化。到 JDK1.7 版本后,优化效率已经非常好了。在绝对效率上,不比 reentrantLock 差多少。 使用重入锁,必须必须必须手工释放锁标记。一般都是在 finally 代码块中定义释放锁标记的 unlock 方法。
在 Java 中,同步锁是可以重入的。只有同一线程调用同步方法或执行同步代码块,对同一个对象加锁时才可重入。
当线程持有锁时,会在 monitor 的计数器中执行递增计算,若当前线程调用其他同步代码,且同步代码的锁对象相同时,monitor 中的计数器继续递增。每个同步代码执行结束,
monitor 中的计数器都会递减,直至所有同步代码执行结束,monitor 中的计数器为 0 时,释放锁标记,_Owner 标记赋值为 null。

ReadWriteLock

ReadWriteLock是Java 5 中新增的一个接口,一个ReadWriteLock维护一对关联的锁,一个用于只读操作一个用于写。在没有写线程的情况下一个读锁可能会同时被多个读线程持有。写锁是独占的,你可以使用JDK中的ReentrantReadWriteLock来实现这个规则,它最多支持65535个写锁和65535个读锁。

ReentrantReadWriteLock

可重入的读写锁

Lock和synchronized的选择

总结来说,Lock和synchronized有以下几点不同:

  • Lock是一个接口,而synchronized是Java中的关键字,synchronized是内置的语言实现;
  • synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁;
  • Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;
  • 通过Lock可以知道有没有成功获取锁,而synchronized却无法办到。
  • Lock可以提高多个线程进行读操作的效率。

同步容器

线程安全的容器对象: Vector, Hashtable。线程安全容器对象,都是使用 synchronized 。保证单操作线程安全,组合操作还是要自己考虑安全问题。
concurrent 包中的同步容器,大多数是使用系统底层技术实现的线程安全。类似 native
Java8 中使用 CAS

Map/Set

ConcurrentHashMap/ConcurrentHashSet

底层哈希实现的同步 Map(Set)。效率高,线程安全。使用系统底层技术实现线程安全。量级较 synchronized 低。key 和 value 不能为 null。

ConcurrentSkipListMap/ConcurrentSkipListSet

底层跳表(SkipList)实现的同步 Map(Set)。有序,效率比 ConcurrentHashMap 稍低。

CopyOnWriteArraySet

集合元素不重复。线程安全。因为通常需要复制整个基础数组,所以可变操作(add()、set()、remove()等)的开销大
它最适合于具有以下特征的应用程序:Set 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突。
CopyOnWriteArraySet包含CopyOnWriteArrayList对象,它是通过CopyOnWriteArrayList实现的,和CopyOnWriteArrayList一样,是通过volatile和互斥锁来实现的

List

CopyOnWriteArrayList

是通过volatile数组和互斥锁来实现的,写时复制集合。写入效率低,读取效率高。每次写入数据,都会创建一个新的底层数组。在并发访问的情景下,当需要修改元素时,不直接修改该容器,而是先复制一份副本,在副本上进行修改。修改完成之后,将指向原来容器的引用指向新的容器(副本容器)。

  1. 由于不会修改原始容器,只修改副本容器。因此,可以对原始容器进行并发地读。其次,实现了读操作与写操作的分离,读操作发生在原始容器上,写操作发生在副本容器上。
  2. 数据一致性问题:读操作的线程可能不会立即读取到新修改的数据(仍然指向旧数组),因为修改操作发生在副本上。但最终修改操作会完成并更新容器,因此这是最终一致性

查询是没有加锁的获取volatile数组中数据,对于业务上需要对查询进行加锁的场景需要自行加锁,比如读写顺序的控制,多线程下可能先进行了读后进行了写需要自行等待写入进程完毕才能进行读取线程的读取操作,类似于事务(比如列表中账号资金+1,多个线程先读取金额,+1,保存这整个过程必须加锁,不然就会2个线程读取到的金额都是100,都进行了加1,结果保存是101,实际应该要102才对,这种需要在业务上加锁保证业务原子性,纯粹在查询或者更新单个操作上加锁是没用的)。CopyOnWriteArrayList新增、修改、删除都加锁了可以保证数据的准确性。多线程并发读写的时候每次获取的都是副本,所以过程中打印出来的内容是不一样的,只有等到后续不进行写入了,读取出来的副本才是最终副本

适用场景:List大小保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突

CopyOnWriteArrayList中ListIterator是对snapshot进行遍历,不支持CopyOnWriteArrayList进行迭代删除的。一边遍历一边删除ArrayList要注意ConcurrentModifiedException,Iterator当每次删除操作之后,会重新将modCount赋值给expectedModCount。CopyOnWriteArrayList如果遍历期间,有其他线程对该 list 进行了增删改,那么 snapshot 就是快照,因为增删改后 list 中的数组会被新数组替换,这时候旧数组被 snapshot 所引用,使用迭代器遍历变量元素时,其它线程对该 list 进行的增删改是不可见的,这也就是弱一致性的

Queue

ConcurrentLinkedQueue

基础链表同步队列

LinkedBlockingQueue

阻塞队列,队列容量不足自动阻塞,队列容量为 0 自动阻塞。

ArrayBlockingQueue

底层数组实现的有界队列。自动阻塞。根据调用 API(add/put/offer)不同,有不同特性。
当容量不足的时候,有阻塞能力。
add 方法在容量不足的时候,抛出异常。
put 方法在容量不足的时候,阻塞等待。
offer 方法 单参数 offer 方法,不阻塞。容量不足的时候,返回 false。当前新增数据操作放弃。三参数 offer 方法(offer(value,times,timeunit)),容量不足的时候,阻塞 times 时长(单位为 timeunit),如果在阻塞时长内,有容量空闲,新增数据返回 true。如果阻塞时长范围内,无容量空闲,放弃新增数据,返回 false。

DelayQueue

延时队列。根据比较机制,实现自定义处理顺序的队列。常用于定时任务。如:定时关机。

LinkedTransferQueue

转移队列,使用 transfer 方法,实现数据的即时处理。没有消费者,就阻塞。

SynchronusQueue

同步队列,是一个容量为 0 的队列。是一个特殊的 TransferQueue。必须现有消费线程等待,才能使用的队列。
add 方法,无阻塞。若没有消费线程阻塞等待数据,则抛出异常。
put 方法,有阻塞。若没有消费线程阻塞等待数据,则阻塞。

线程池

如果在程序中需要有大量的线程执行,对于每一个线程都调用native去创建并运行,势必会造成很大的资源消耗,更多的计算资源集中在创建、开始、销毁线程的工作上,而这时线程池的出现可以解决这方面的问题。

多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
一个线程池包括以下四个基本组成部分:

  1. 线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
  2. 工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
  3. 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
  4. 任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。

注意:

  1. 线程池的大小
  2. 关闭线程池

如何自己实现线程池?

也就是实现上面提到的那几个部分。

Executors创建的常见线程池:
newCachedThreadPool() 在有任务时才创建新线程
newFixedThreadPool(int nThreads) 线程池中包含固定数目的线程
newSingleThreadExecutor() 只有一个工作线程
newScheduledThreadPool(int corePoolSize) 按时间计划执行任务
newSingleThreadScheduledExecutor() 一个工作线程,并能够按时间计划来执行任务

Executor

线程池顶级接口。
常用方法: void execute(Runnable command);
作用是: 启动线程任务的。

ExecutorService

Executor 接口的子接口。
常见方法: <T> Future<T> submit(Callable<T> task)Future<?> submit(Runnable task)

Future

未来结果,代表线程任务执行结束后的结果。

Callable

可执行接口。
接口方法: V call() throws Exception;相当于 Runnable 接口中的 run 方法。区别为此方法有返回值。不能抛出已检查异常。和 Runnable 接口的选择,需要返回值或需要抛出异常时,使用 Callable,其他情况可任意选择。

Executors

工具类型。为 Executor 线程池提供工具方法。类似 Arrays,Collections 等工具类型的功用。

FixedThreadPool

容量固定的线程池
queued tasks - 任务队列
completed tasks - 结束任务队列

CachedThreadPool

缓存的线程池。容量不限(Integer.MAX_VALUE)。自动扩容。默认线程空闲 60 秒,自动销毁。

ScheduledThreadPool

计划任务线程池。可以根据计划自动执行任务的线程池。

SingleThreadExceutor

单一容量的线程池。

ForkJoinPool

分支合并线程池(MapReduce 类似的设计思想)。适合用于处理复杂任务。初始化线程容量与 CPU 核心数相关。
线程池中运行的内容必须是 ForkJoinTask 的子类型(RecursiveTask,RecursiveAction)。

WorkStealingPool

JDK1.8 新增的线程池。工作窃取线程池。当线程池中有空闲连接时,自动到等待队列中窃取未完成任务,自动执行。
初始化线程容量与 CPU 核心数相关。使用分治法来解决问题。
ExecutorService.newWorkStealingPool();

ThreadPoolExecutor

线程池底层实现。除 ForkJoinPool 外,其他常用线程池底层都是使用 ThreadPoolExecutor
实现的。

1
2
3
4
5
6
7
8
9
10
public ThreadPoolExecutor (
int corePoolSize, // 核心容量
int maximumPoolSize, // 最大容量
long keepAliveTime, // 生命周期,0 为永久
TimeUnit unit, // 生命周期单位
BlockingQueue workQueue // 任务队列,阻塞队列
);
...
public void execute(Runnable command)
...
1
2
3
4
5
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 默认工厂:Executors.defaultThreadFactory()
}

如何实现线程复用不被销毁的主要看runWorker()中的while()与getTask()方法中for(;;) 部分。
执行runWorker方法中的while循环里的任务的run方法,执行完成后,又继续进入getTask从任务队列中获取下一个任务。然而在我们开始分析execute的时候,这个方法中的三个部分都会调用addWorker去执行任务,在addWorker方法中都会去新建一个线程来执行任务,这样的话是不是每次execute都是去创建线程了?事实上,复用机制跟线程池的阻塞队列有很大关系,我们可以看到,在execute在核心线程满了,但是队列不满的时候会把任务加入到队列中,一旦加入成功,之前被阻塞的线程就会被唤醒去执行新的任务,这样就不会重新创建线程了。

线程管理总结:
当有新任务来的时候,先看看当前的线程数有没有超过核心线程数,如果没超过就直接新建一个线程来执行新的任务,如果超过了就看看缓存队列有没有满,没满就将新任务放进缓存队列中,满了就新建一个线程来执行新的任务,如果线程池中的线程数已经达到了指定的最大线程数了,那就根据相应的策略拒绝任务。
当缓存队列中的任务都执行完了的时候,线程池中的线程数如果大于核心线程数,就销毁多出来的线程,直到线程池中的线程数等于核心线程数。此时这些线程就不会被销毁了,它们一直处于阻塞状态,等待新的任务到来。

扩展:
线程池执行汇总各个线程计算结果?可以结果放在list中,进行二次计算
线程池一个线程执行过程中用到另一个线程的执行结果?拆分成两个线程池,一个线程池获取另一个线程池的执行结果

Condition

  1. Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的这些方法是和同步锁捆绑使用的;而Condition是需要与互斥锁/共享锁捆绑使用的。
  2. Condition它更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。

例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒”读线程”;当从缓冲区读出数据之后,唤醒”写线程”;并且当缓冲区满的时候,”写线程”需要等待;当缓冲区为空时,”读线程”需要等待。

Fork Join

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。

Fork/Join工作步骤

  • 第一步分割任务
  • 第二步执行任务并合并结果

Fork/Join使用两个类来完成以上两件事情

  • ForkJoinTask 我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务
  • ForkJoinPool ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务

CAS 原子操作

CAS(Compare and swap)比较和替换是设计并发算法时用到的一种技术。简单来说,比较和替换是使用一个期望值和一个变量的当前值进行比较,如果当前变量的值与我们期望的值相等,就使用一个新值替换当前变量的值。
volatile变量的读写和CAS是concurrent包得以实现的基础。CAS表示如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值,此操作具有volatile读和写的内存语义。AQS通过volatile的读/写和CAS所具有的volatile读和写的内存语义来实现线程之间的通信。

CAS是由操作系统提供的原子操作,CAS适合对简单数据进行操作,不适合复杂结构比如连表,比如链表删除了两个节点又把第一个加点新增回上去CAS会认为链表没有进行过操作,改进方案是加上version版本号。

synchronized是悲观锁每次都锁定资源,阻塞激活也是不小开销,CAS算乐观锁了。

AtomicBoolean
AtomicInteger
AtomicLong
AtomicIntegerArray
AtomicLongArray
AtomicStampedReference

AQS 并发同步框架

抽象队列同步器(AbstractQueuedSynchronizer,简称AQS)是用来构建锁或者其他同步组件的基础框架,它使用一个整型的volatile变量(命名为state)来维护同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

类型 内容
高层类  Lock 同步器 阻塞队列 Executor 并发容器
基础类 AQS 非阻塞数据结构 原子变量类
底层类 volatile的读/写 CAS

concurrent包的实现结构如上所示,AQS、非阻塞数据结构和原子变量类等基础类都是基于volatile变量的读/写和CAS实现,而像Lock、同步器、阻塞队列、Executor和并发容器等高层类又是基于基础类实现。

主要方法:

  • getState()
  • setState()
  • compareAndSetState()
  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

ReentrantLock,Semaphore就是基于AQS

并发队列

ConcurrentLinkedQueue

CAS实现的非阻塞队列

LinkedBlockingQueue

独占锁实现的阻塞队列

1
2
3
4
5
6
7
8
9
10
// 传说中的无界队列
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 传说中的有界队列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 队列容量
private final int capacity;
// 队列中的元素数量
private final AtomicInteger count = new AtomicInteger(0);
// 队头
private transient Node<E> head;
// 队尾
private transient Node<E> last;
// take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();
// 如果读操作的时候队列是空的,那么等待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();
// put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();
// 如果写操作的时候队列是满的,那么等待 notFull 条件
private final Condition notFull = putLock.newCondition();

ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
// 用于存放元素的数组
final Object[] items;
// 下一次读取操作的位置
int takeIndex;
// 下一次写入操作的位置
int putIndex;
// 队列中的元素数量
int count;

// 以下几个就是控制并发用的同步器
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

SynchronousQueue

这里说的并不是多线程的并发问题,而是因为当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作
比较少使用到 SynchronousQueue 这个类,不过它在线程池的实现类 ScheduledThreadPoolExecutor 中得到了应用

PriorityBlockingQueue

带排序的 BlockingQueue 实现,其并发控制采用的是 ReentrantLock,队列为无界队列,基于数组,数据结构为二叉堆树

其他相关技术

  • volatile 线程可见性, 是一个特殊的修饰符,只有成员变量才能使用它。在Java并发程序缺少同步类的情况下,多线程对成员变量的操作对其它线程是透明的。volatile变量可以保证下一个读取操作会在前一个写操作之后发生,就是上一题的volatile变量规则
  • ThreadLocal 是Java里一种特殊的变量。每个线程都有一个ThreadLocal就是每个线程都拥有了自己独立的一个变量,竞争条件被彻底消除了。
  • FutureTask FutureTask表示一个可以取消的异步运算。它有启动和取消运算、查询运算是否完成和取回运算结果等方法
  • java堆\栈 每个线程都有自己的栈内存,用于存储本地变量,方法参数和栈调用,一个线程中存储的变量对其它线程是不可见的。而堆是所有线程共享的一片公用内存区域
    检测线程是否有锁 java.lang.Thread中有一个方法叫holdsLock()
  • ConcurrentHashMap map划分成若干部分来实现它的可扩展性和线程安全
  • Semaphore Semaphore是一个计数信号量,它的本质是一个”共享锁”。(要理解共享锁,就是多个线程可以同时取获取得到锁)信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可。信号量值可以自己定义比如10,20等
  • volatile atomic volatile 变量和 atomic 变量看起来很像,但功能却不一样。Volatile变量可以确保先行关系,即写操作会发生在后续的读操作之前, 但它并不能保证原子性。例如用volatile修饰count变量那么 count++ 操作就不是原子性的。而AtomicInteger类提供的atomic方法可以让这种操作具有原子性如getAndIncrement()方法会原子性的进行增量操作把当前值加一,其它数据类型和引用变量也可以进行相似操作.
  • wait() join() wait()方法用于线程间通信,如果等待条件为真且其它线程被唤醒时它会释放锁,而sleep()方法仅仅释放CPU资源或者让当前线程停止执行一段时间,但不会释放锁
  • AtomicXxx 类型 原子类型 使用CAS实现 在 concurrent.atomic 包中定义了若干原子类型,这些类型中的每个方法都是保证了原子操作的。多线程并发访问原子类型对象中的方法,不会出现数据错误。在多线程开发中,如果某数据需要多个线程同时操作,且要求计算原子性,可以考虑使用原子类型对象.注意:原子类型中的方法是保证了原子操作,但多个方法之间是没有原子性的
  • mutex 互斥锁
  • monitor 管程

高并发开发框架

Disruptor

并发框架,高效无锁内存队列,但是没有分布式的功效,只适合单独的一台服务器,一个轻量化的消息队列,可以用来代替queue 线程队列。

使用场景

  • 生产者-消费者
  • 线程之间交换数据
  • Disruptor就是一个轻量化的消息队列。在并发量比较大的情况下,提前给客户端或不同线程返回执行结果的时候可以使用Disruptor
  • Disruptor的速度比LinkedBlockingQueue快

执行策略

  • BlockingWaitStrategy:默认等待策略。和BlockingQueue的实现很类似,通过使用锁和条件(Condition)进行线程同步和唤醒。此策略对于线程切换来说,最节约CPU资源,但在高并发场景下性能有限。
  • SleepingWaitStrategy:CPU友好型策略。会在循环中不断等待数据。首先进行自旋等待,若不成功,则使用Thread.yield()让出CPU,并使用LockSupport.parkNanos(1)进行线程睡眠。所以,此策略数据处理数据可能会有较高的延迟,适合用于对延迟不敏感的场景。优点是对生产者线程影响小,典型应用场景是异步日志。
  • YieldingWaitStrategy:低延时策略。消费者线程会不断循环监控RingBuffer的变化,在循环内部使用Thread.yield()让出CPU给其他线程。
  • BusySpinWaitStrategy:死循环策略。消费者线程会尽最大可能监控缓冲区的变化,会占用所有CPU资源。

解决CPU Cache伪共享问题

为了解决CPU和内存速度不匹配的问题,CPU中有多个高速缓存Cache。在Cache中,读写数据的基本单位是缓存行,缓存行是内存复制到缓存的最小单位。解决伪共享问题,可以在变量的前后都占据一定的填充位置,尽量让变量占用一个完整的缓存行。在Sequence的实现中,主要使用的是Value,但通过LhsPaddingRhsPadding在Value的前后填充了一些空间,使Value无冲突的存在于缓存行中。

linux进程间通讯

  1. 信号
  2. 信号量
  3. 管道
  4. 消息队列
  5. 共享内存
  6. 套接字

局部变量,全局变量,成员变量

存储区域:
全局变量(全局静态变量)是放在方法区中。

成员变量如果没有实例化那么变量是放在栈中;实例化了对象放在堆中,栈中放的是指向堆中对象的引用地址。

局部变量放在栈中,new的对象放在堆中,8中基本数据类型变量放在栈中,变量所对应的值是放在栈帧中。

生命周期:
全局变量:当类加载的时候,就开始被创建,在类中只有一份; 会跟着类的消失而消失,生存时间叫长。

成员变量:在对象被创建时而存在,当对象被GC回收的同时,他也会消失,生存时间适中。

局部变量:当方法被调用时而存在,当方法调用结束而消失,生存时间短。

作用域:
全局变量:作用整个类中,直接被类调用。

成员变量:作用在整个类中(除静态方法不能使用,静态方法没有隐式的this),被对象调用。

局部变量:作用在一个局部区域,比如说在一个方法中,方法调用。

线程安全相关:
volatile修饰成员变量保持内存可见性和防止指令重排序
有些情况会把成员变量的值赋值给局部变量,这样在多线程情况下局部变量指向的跟成员变量是同一块堆空间比如List集合。如果有方法对成员变量的List集合删除元素,局部变量进行for循环就会出现并发修改异常。

参考