注意:所有文章除特别说明外,转载请注明出处.
Java 并发编程
1 并发任务
1.1 运行任务
在Java中,Runnable接口描述一个想要运行的任务,通常与其它任务并行运行。
public interface Runnable {
//该方法在线程中执行
void run();
}
//两个并行任务同时进行
public static void main(String[] args){
Runnable hellos = () -> {
for(int i = 0; i <= 100; i++){
System.out.println("hello" + i);
}
};
Runnable goodbyes = () -> {
for(int i = 1; i <= 1000; i++){
System.out.println("goodbye" + i);
}
};
//调用该对象会产生一个针对很多短暂任务或者任务大多数时间处于等待状态的程序优化过的executor。
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(hellos);
executor.execute(goodbyes);
}
注意:在线程池的线程空闲一段时间之后,executor会终止这些线程,然后程序才终止。
1.2 Future
虽然Runnable执行任务,但是其没有返回值。如果任务需要计算结果,则使用Callable
public interface Callable<V> {
v call() throws Exception();
}
提示:这里要执行Callable,将其提交到ExecutorService。当提交任务时,会得到一个future,代表计算的对象,将来会用到的计算结果。
1.2.1 Future 接口的方法
//get()方法会被阻塞,直到有可用的结果或者到达超时
1.V get()
2.V get(long timeout, TimeUnit unit)
//该方法试图取消任务,如果任务还没有运行则不会将任务加入运行计划
3.boolean cancel(boolean mayInterruptIfRunning)
4.boolean isCancelled();
5.boolean isDone();
任务有时候可能需要等待多个子任务的完成结果,可以将Callable实例的一个集合传递给一个invokeAll()方法,实现一次提交多个任务。
String word = ...;
String paths = ...;
List<Callable<Long>> tasks = new ArrayList<>();
for(Path p : paths){
task.add(
() -> {return number of .. word in p};
List<Future<Long>> results = executor.invokeAll(tasks);
//该调用会阻塞,直到所有任务tasks都完成
long total = 0;
for(Future<Long> result : results){
total += result.get();
}
)
}
提示:invokeAny()方法和invokeAll()方法很像,但只要提交的所有任务中的任何一个完成了并且没有抛出异常,它就返回。它返回Future值,其它任务被取消。
2 异步计算
2.1 可完成的Future
在拥有Future对象时,就需要调用get()方法获取值,get()方法会被阻塞,直到有可用的值。CompletableFuture类实现Future接口,并且提供获取结果的第二种机制。注册回调函数,一旦结果可用,会立刻处理。
如果想要异步运行任务并获取CompletableFuture时,不要直接将任务提交到ExecutorService,调用CompletableFuture.supplyAsync。
CompletableFuture<String> f = CompletableFuture.supplyAsync(
() -> {String result; Compute the result; return result;},
executor
);
提示:这里如果省略executor,任务会在默认的executor上运行。
注意:在多线程中调用同一个future的complete或者completeExceptionally()方法是安全的。如果future已经完成,这些调用没有影响。
2.2 组合可完成的Future
CompletableFuture类通过提供一种将异步任务组合到一个处理流水线中的机制。
2.3 UI回调中的长时间运行任务
在程序需要做一些耗时的事情时,不能在UI线程中执行这样的任务,否则UI就会冻结。所以在这种时候可以启动另外一个工作线程。
Button read = new Button("Read");
read.setOnAction(event -> {
Scanner in = new Scanner(url.openStream());
while(in.hasNextLine()){
String line = in.nextLine();
..
}
});
//在另外一个线程做 长时间运行的任务在单一的线程中
read.setOnAction(event -> {
Runnable task = () -> {
Scanner in = new Scanner(url.openStream());
while(in.hasNextLine()){
String line = in.nextLine();
...
}
}
executor.execute(task);
})
3 线程安全
3.1 可见性
private static boolean done = false;
public static void main(String[] args){
Runnable hellos = () -> {
for(int i = 1; i <= 1000; i++){
System.out.println("hello" + i);
done = true;
}
};
Runnable goodbye = () -> {
int i = 1;
while(!done){
i++;
}
System.out.println("goodbye" + i);
};
Executor executor = Executors.newCachedThreadPool();
executor.execute(hellos);
executor.execute(goodbye);
}
提示:这一程序中的done对第二个任务线程是不可见的。
注意:总结集中方式可以确保对变量的更新是可见的:1.final变量的值在初始化后是可见的。2.static变量的初始值在静态初始化后是可见的。3.对volatie变量的改变是可见的。
//所以上面的done变量声明时带上volatile修饰符
private static volatile boolean done;
3.2 竞争条件
多个并发任务更新一个整数共享计数器。这时候就会发生一种因为共享变量变化不一致的问题。这种错误称之为:竞争条件,因为其依赖于哪个线程赢得更新共享变量的竞争。
提示:竞争条件是修改共享变量时的一个问题。如果指定的复杂序列在不合适的时候被暂停,而其它任务获得对序列的控制,此时序列会处于不一致的状态,这样很多事情都会出错。此时如果我们需要确保操作的整个序列是一起执行的。这样的执行序列是:临界区。可以使用锁来保护临界区,但是锁很难恰当的使用。
3.3 安全并发策略
1.限制:在任务中间不共享数据,每个任务都有自己私有计数器,如果后面需要统一的结果再将任务的结果切换到另一任务中执行合并。
2.不变性:共享不可修改的对象是安全的。
3.锁
3.4 不可变类
如果一个类的实例一旦构造完毕,就不能再次改变,那么此类是不可变类。
4 并行算法
4.1 并行流
long result = coll.parallelStream().filter(s -> s.startsWith("A")).count();
parallelStream()方法产生一个并行流。
4.2 并行数组操作
Arrays类有很多并行化操作。这些操作将数组分解为片,在片上并发工作,最后合并结果。静态Arrays.parallelSetAll()方法以函数的计算值填充数组。
Arrays.parallelSetAll(values, i -> i % 10);
Arrays.parallelSort(words, Comparator.comparing(String::length));
5 线程安全的数据结构
java.util.concurrent包中的集合都是巧妙实现的,因此多个线程可以在不阻塞的情况下实现对数据的访问。
注意:这些集合产生 弱一致性 的迭代器。
5.1 ConcurrentHashMap
ConcurrentHashMap 首先是哈希映射,其操作时线程安全的。不管多少线程同时在map上操作,内部结构都不会损坏。一些线程可能会临时被阻塞,但map可以高效支持大量并发操作以及一定数量的并发写操作。
5.2 阻塞队列
阻塞队列 是在任务中间协调工作的一个常用工具。生产者任务在队列中插入项,消费者在队列中获取项。队列让我们安全的将数据从一个任务转移到另外一个任务。
java.util.concurrent包提供几个阻塞队列的变体:
1.LinkedBlockingQueue 基于链表
2.ArrayBlockingQueue 使用循环数组
5.3 其它线程安全的数据结构
6 原子计数器和累加器
如果多个线程更新一个共享计数器,那么我们需要保证更新操作时按照线程安全方式进行的。java.util.concurrent.atomic包中有很多类,它们使用 安全并且高效 的机器级指令确保对整数和long和boolean值、对象引用和数组操作的原子性。
public static AtomicLong nextNumber = new AtomicLong();
//某些线程中..
long id = nextNumber.incrementAndGet();
提示:这里的incrementAndGet()方法会自动将AtomicLong的值加1,返回加之后的值。
public static AtomicLong largest = new AtomicLong();
//某些线程中..
largest.updateAndGet(x -> Math.max(x, observed));
//或者
largest.accumulateAndGet(observed, Math::max);
7 锁和条件
7.1 锁
7.2 synchronized关键字
7.3 条件等待
这里的synchronized方法确保这些操作是原子的。】
public class Queue{
class Node{
Object value;
Node next;
}
private Node head;
private Node tail;
public synchronized void add(Object newValue){
Node n = new Node();
if(head == null){
head = n;
}else{
tail.next = n;
}
tail = n;
tail.value = newValue;
}
public synchronized Object remove(){
if(head == null){
return null;
}
Node n = head;
head = n.next;
return n.value;
}
}
8 线程
8.1 启动线程
//等待一个线程完成
thread.join(millis);