注意:所有文章除特别说明外,转载请注明出处.
第六章 Java并发容器和框架
[TOC]
6.1 ConcurrentHashMap的实现原理和使用
ConcurrentHashMap是线程安全且高效的HashMap。
6.1.1 为何使用ConcurrentHashMap
在并发编程中使用HashMap可能会导致死循环,而使用线程安全的HashTable效率非常低下,基于上面两个原因,所以有了ConcurrentHashMap。
1.线程不安全的HashMap
多线程环境下,使用HashMap进行put操作会导致死循环,导致CPU利用率接近1。
2.效率低下的HashTable
HashTable容器使用synchronize来保证线程安全,但是在竞争激励的情况下HashTable效率低下。当一个线程访问HashTable的同步方法,其它线程访问将会进入阻塞或者是轮询状态。
3.ConcurrentHashMap的锁分段技术可有效提升并发访问率
提示:HashTable在并发环境下表现效率低下的原因是因为所有访问HashTable的线程都必须要竞争同一把锁,
如果容器中存在多把锁,每一把锁用于锁定容器中的一部分数据
,那么多线程访问容器里不同数据段的数据时,线程间就不存在锁竞争(锁分段技术)。
锁分段技术:将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据时,其它段的数据也能被其它线程访问。
6.1.2 ConcurrentHashMap的结构
ConcurrentHashMap是由 Segment数组结构 和 HashEntry数组结构 组成。Segment数组结构 是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色。HashEntry 则用于存储键值数据。
一个ConcurrentHashMap里包含一个 Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构。一个Segment包含一个HashEntry数组,每个HashEntry是一个链表结构的元素。
6.1.3 ConcurrentHashMap初始化
ConcurrentHashMap初始化方法是通过 initialCapacity loadFactor concurrencyLevel等几个参数来初始化segment数组、段偏移量segmentshift、段掩码segmentMask和每个segment里的HashEntry数组实现。
6.1.4 定位Segment
因为ConcurrentHashMap是通过分段锁Segment来保护不同段的数据,那么在插入和获取元素
的时候,必须先通过散列算法定位到Segment。
6.1.5 ConcurrentHashMap的操作
1.get操作
Segment的get操作非常高效和简单。先通过一次再散列
,然后使用这个散列值
通过散列运算定位到Segment,再通过散列算法定位到元素。
2.put操作
由于put方法需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁。
3.size操作
如果需要统计整个ConcurrentHashMap中元素的大小,需要统计所有Segment里元素的大小后求和。
6.2 ConcurrentLinkedQueue
在并发编程环境下,需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:1.使用阻塞算法。2.使用非阻塞算法。
1.使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或者两把锁。
2.非阻塞实现方式则可以使用循环CAS的方式来实现。
ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,采用先进先出的规则对节点进行排序,当添加一个元素的时候,它会添加到队列的尾部。当获取一个元素时,会返回队列的头部。采用CAS算法实现。
6.2.1 ConcurrentLinkedQueue的结构
ConcurrentLinkedQueue 由head节点和tail节点组成,每个节点Node由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过next关联起来,从而组成一张链表结构的队列。
6.2.2 入队列
入队列就是将入队节点添加到队列的尾部。
6.2.3 出队列
出队列就是从队列中返回一个结点元素,并清空该节点对元素的引用。
6.3 Java阻塞队列
6.3.1 什么是阻塞队列
阻塞队列,指一个支持两个附加操作的队列。这两个附件操作支持阻塞的插入和移除操作。
1.支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。
2.支持阻塞的移除方法:对队列为空时,获取元素的线程会等待队列变为非空。
提示:阻塞队列常用于生产者和消费者的场景。
注意:如果是无界阻塞队列,队列不可能会出现满的情况,所以使用put或offer方法永远不会被阻塞,而且使用offer方法时,该方法永远返回true。
6.3.2 Java里的阻塞队列
JDK 7 提供7个阻塞队列:
1.ArrayBlockingQueue 数组组成的有界阻塞队列,按照先进先出的原则对元素进行排序。默认情况不保证线程公平的访问队列。
//在初始化时指定阻塞队列的容量和公平性
ArrayBlockingQueue(int capacity, boolean fair);
提示:公平访问队列是指,阻塞的线程可以按照阻塞的先后顺序访问队列,先阻塞的线程先访问队列。非公平性是指,阻塞的线程可以争夺访问队列的资格。
2.LinkedBlockingQueue 链表实现的有界阻塞队列
3.PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况采取自然升序排列。
4.DelayQueue 支持延时获取元素的无界阻塞队列
5.SynchronousQueue 不存储元素的阻塞队列,每一个put操作都必须等待一个take操作,否则不能继续添加元素。
//可以指定公平性策略
public SynchronousQueue(boolean fair);
6.LinkedTransferQueue
7.LinkedBlockingDeque
6.3.3 阻塞队列的实现原理
通知模式
当生产者往满的队列里添加元素时会阻塞生产者,当消费者消费队列中的一元素之后会通知生产者当前队列可用。
6.4 Fork | Join框架
6.4.1 Fork/Join框架简介
并行执行任务框架,将一个大任务分割成若干个小任务,最终汇总每个任务结果后得到大任务结果的框架。
Fork是将大任务切分成若干小任务并行的执行,Join是合并这些子任务的执行结果,最后得到大任务的结果。
6.4.2 工作窃取算法
该算法表示某个线程从其它队列里窃取任务来执行。此时会发生窃取任务线程和被窃取任务线程之间的竞争,为了解决这个问题,使用双端队列,被窃取的任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
1.优点:充分利用线程进行并行运算,减少线程间的竞争。
2.缺点:某些情况下还是存在竞争。如:双端队列中只有一个任务时。
6.4.3 Fork/Join框架的设计
1.分割任务
2.执行任务并合并结果
那么这里Fork/Join使用两个类来完成上面两件事情:
1.ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。
提示:ForkJoinTask与一般任务的主要区别在于它需要实现compute()方法。在此方法中首先需要判断任务足够小,才执行当前子任务并返回结果。
在这里不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供以下两个子类:
1.RecursiveAction:用于没有返回结果的任务。
2.RecursiveTask:用于有返回结果的任务。
2.ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。
6.4.4 使用Fork/Join框架
package Java.ChapterSeven.ForkJoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class ForkJoinExample extends RecursiveTask<Integer> {
private final int threshold = 5;
private int firstOne;
private int lastOne;
public ForkJoinExample(int firstOne, int lastOne) {
this.firstOne = firstOne;
this.lastOne = lastOne;
}
@Override
protected Integer compute() {
int result = 0;
//任务小可以直接计算
if (lastOne - firstOne <= threshold){
for (int i = firstOne; i <= lastOne; i++){
result += i;
}
}else {
//拆分成小任务
int middle = firstOne + (lastOne - firstOne) / 2;
ForkJoinExample leftTask = new ForkJoinExample(firstOne, lastOne);
ForkJoinExample rightTask = new ForkJoinExample(middle + 1, lastOne);
leftTask.fork();
rightTask.fork();
result = leftTask.join() + rightTask.join();
}
return result;
}
public static void main(String[] args) {
ForkJoinExample example = new ForkJoinExample(1, 1000);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future<Integer> result = forkJoinPool.submit(example);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
6.4.5 Fork/Join框架的异常处理和实现原理
ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供isCompletedAbnormally()方法检查任务是否已经抛出异常或者被取消了,并且可以通过ForkJoinTask的getException()方法获取异常。
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
6.4.6 Fork/Join框架的实现原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务。ForkJoinWorkerThread负责执行这些任务。