-
1、Java ExecutorService
ExecutorService 是 Java java.util.concurrent 包的重要组成部分,是 Java JDK 提供的框架,用于简化异步模式下任务的执行。
一般来说,ExecutorService 会自动提供一个线程池和相关 API,用于为其分配任务。
实例化 ExecutorService
实例化ExecutorService 的方式有两种:一种是工厂方法,另一种是直接创建。
Executors.newFixedThreadPool() 工厂方法创建 ExecutorService 实例
创建ExecutorService 实例的最简单方法是使用 Executors 类的提供的工厂方法。比如
当然还有其它很多工厂方法,每种工厂方法都可以创建满足特定用例的预定义 ExecutorService 实例。你所需要做的就是找到自己想要
的合适的方法。这些方法都在 Oracle 的 JDK 官方文档中有列出
直接创建 ExecutorService 的实例
因为ExecutorService 是只是一个接口,因此可以使用其任何实现类的实例。Java java.util.concurrent 包已经预定义了几种实现可供我
们选择,或者你也可以创建自己的实现。
例如,ThreadPoolExecutor 类实现了 ExecutorService 接口并提供了一些构造函数用于配置执行程序服务及其内部池。
你可能会注意到,上面的代码与工厂方法 newSingleThreadExecutor() 的 源代码 非常相似。对于大多数情况,不需要详细的手动配置。
将任务分配给 ExecutorService
ExecutorService 可以执行 Runnable 和 Callable 任务。为了使本文简单易懂。我们将使用两个两个原始任务,如下面的代码所示。
注意: 上面的代码使用了 lambda 表达式而不是匿名内部类。
创建完了任务之后,就可以使用多种方法将任务分配给 ExecutorService ,比如 execute() 方法,还有 submit()、invokeAny() 和
invokeAll() 等方法。这些方法都继承自 Executor 接口。
1、 首先来看看execute()方法;
该方法返回值为空 ( void )。因此使用该方法没有任何可能获得任务执行结果或检查任务的状态( 是正在运行 ( running )
还是执行完毕 ( executed ) )。
2、 其次看看submit()方法;
submit() 方法会将一个 Callable 或 Runnable 任务提交给 ExecutorService 并返回 Future 类型的结果。
3、 然后是invokeAny()方法;
invokeAny() 方法将一组任务分配给 ExecutorService,使每个任务执行,并返回任意一个成功执行的任务的结果 ( 如果成功执行 )
4、 最后是invokeAll()方法;
invokeAll() 方法将一组任务分配给 ExecutorService ,使每个任务执行,并以 Future 类型的对象列表的形式返回所有任务执行的结果。
在继续深入理解 ExecutorService 之前,我们必须先讲解下另外两件事:关闭 ExecutorService 和处理 Future 返回类型。
关闭 ExecutorService
一般情况下,ExecutorService 并不会自动关闭,即使所有任务都执行完毕,或者没有要处理的任务,也不会自动销毁 ExecutorService 。
它会一直出于等待状态,等待我们给它分配新的工作。
这种机制,在某些情况下是非常有用的,比如,如果应用程序需要处理不定期出现的任务,或者在编译时不知道这些任务的数量。
但另一方面,这也带来了副作用:即使应用程序可能已经到达它的终点,但并不会被停止,因为等待的 ExecutorService 将导致 JVM
继续运行。这样,我们就需要主动关闭 ExecutorService。
要正确的关闭 ExecutorService,可以调用实例的 shutdown() 或 shutdownNow() 方法。
1、 shutdown()方法:
shutdown() 方法并不会立即销毁 ExecutorService 实例,而是首先让 ExecutorService 停止接受新任务,并在所有正在运行的线程完成
当前工作后关闭。
2、 shutdownNow()方法:shutdownNow() 方法会尝试立即销毁 ExecutorService 实例,所以并不能保证所有正在运行的线程将同时停止。该方法会返回等待处
理的任务列表,由开发人员自行决定如何处理这些任务。
因为提供了两个方法,因此关闭 ExecutorService 实例的最佳实战 ( 也是 Oracle 所推荐的 )就是同时使用这两种方法并结合
awaitTermination() 方法。
使用这种方式,ExecutorService 首先停止执行新任务,等待指定的时间段完成所有任务。如果该时间到期,则立即停止执行。
Future 接口
submit() 方法和 invokeAll() 方法返回一个 Future 接口的对象或 Future 类型的对象集合。这些 Future 接口的对象允许我们获取任务
执行的结果或检查任务的状态 ( 是正在运行还是执行完毕 )。
Future 接口 get() 方法
Future 接口提供了一个特殊的阻塞方法 get(),它返回 Callable 任务执行的实际结果,但如果是 Runnable 任务,则只会返回 null。
因为get() 方法是阻塞的。如果调用 get() 方法时任务仍在运行,那么调用将会一直被执阻塞,直到任务正确执行完毕并且结果可用
时才返回。
而且更重要的是,正在被执行的任务随时都可能抛出异常或中断执行。因此我们要将 get() 调用放在 try catch 语句
块中,并捕捉 InterruptedException 或 ExecutionException 异常。
因为get() 方法是阻塞的,而且并不知道要阻塞多长时间。因此可能导致应用程序的性能降低。如果结果数据并不重要,那么我们可
以使用超时机制来避免长时间阻塞。
这个get() 的重载,第一个参数为超时的时间,第二个参数为时间的单位。上面的实例所表示就的就是等待 200 毫秒。
注意,这个 get() 重载方法,如果在超时时间内正常结束,那么返回的是 Future 类型的结果,如果超时了还没结束,
那么将抛出 TimeoutException 异常。
除了get() 方法之外,Future 还提供了其它很多方法,我们将几个重要的方法罗列在此
方法 说明 isDone() 检查已分配的任务是否已处理 cancel() 取消任务执行 isCancelled() 检查任务是否已取消 这些方法的使用方式如下
boolean isDone = future.isDone(); boolean canceled = future.cancel(true); boolean isCancelled = future.isCancelled();
ScheduledExecutorService 接口
ScheduledExecutorService 接口用于在一些预定义的延迟之后运行任务和( 或 )定期运行任务。
同样的,实例化 ScheduledExecutorService 的最佳方式是使用 Executors 类的工厂方法。
Executors 类为很多类都提供了工厂方法,简直就是工厂方法的集大成者。
本章节为了简单起见,我们只使用只有一个线程的 ScheduledExecutorService 实例
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
有了实例之后,事情就好办了,比如,要在固定延迟后安排单个任务的执行,可以使用 ScheduledExecutorService 实例的
scheduled() 方法
Future<String> resultFuture = executorService.schedule(callableTask, 1, TimeUnit.SECONDS);
上面这个实例中的代码在执行 callableTask 之前延迟了一秒钟。
scheduled() 方法有两个重载,分别用于执行 Runnable 任务或 Callable 任务。
另外,ScheduledExecutorService 实例还提供了另一个重要方法 scheduleAtFixedRate() ,它允许在固定延迟后定期执行任务。
Future<String> resultFuture = service.scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);
上面的代码块将在 100 毫秒的初始延迟后执行任务,之后,它将每 450 毫秒执行相同的任务。
如果处理器需要更多时间来执行分配的任务,那么可以使用 scheduleAtFixedRate() 方法的 period 参数,ScheduledExecutorService
将等到当前任务完成后再开始下一个任务。
如果任务迭代之间必须具有固定长度的延迟,那么可以使用 scheduleWithFixedDelay() 方法 。例如,以下代码将保证当前执行结束
与另一个执行结束之间的暂停时间为 150 毫秒。
service.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);
根据scheduleAtFixedRate() 和 scheduleWithFixedDelay() 方法契约,在任务执行期间,如果 ExecutorService 终止了或任务抛出了异
常,那么任务将自动结束。
ExecutorService 或 Fork/Join
Fork/Join 是 Java 7 提供的新框架,在 Java 7 发布之后,许多开发人员都作出了将 ExecutorService 框架替换为 fork/join 框架的决定。
但,这并不总是正确的决定。尽管 fork/join 使用起来更加简单且频繁使用时更带来更快的性能,但开发人员对并发执行的控制量也
有所减少。
使用ExecutorService ,开发人员能够控制生成的线程数以及应由不同线程执行的任务粒度。ExecutorService 的最佳用例是处理独立
任务,例如根据 「 一个任务的一个线程 」 方案的事务或请求。
而相比之下,根据 Oracle 文档,fork/join 旨在简化和加速工作,可以将任务递归地分成更小的部分。
后记
尽管ExecutorService 相对简单,但仍有一些常见的陷阱。我们罗列于此
1、 保持未使用的ExecutorService存活;
本文中对如何关闭 ExecutorService 已经做出了详细解释。
2、 使用固定长度的线程池时设置了错误的线程池容量;使用ExecutorService 最重要的一件事,就是确定应用程序有效执行任务所需的线程数
太大的线程池只会产生不必要的开销,只会创建大多数处于等待模式的线程。
太少的线程池会让应用程序看起来没有响应,因为队列中的任务等待时间很长。
3、在取消任务后调用Future的get()方法;
尝试获取已取消任务的结果将触发 CancellationException 异常。
4、 使用Future的get()方法意外地阻塞了很长时间;应该使用超时来避免意外的等待。
-
2、Java Fork-Join
fork/join 框架是 Java 7 中引入的 ,它是一个工具,通过 「 分而治之 」 的方法尝试将所有可用的处理器内核使用起来帮助
加速并行处理。
在实际使用过程中,这种 「 分而治之 」的方法意味着框架首先要 fork ,递归地将任务分解为较小的独立子任务,直到它们
足够简单以便异步执行。然后,join 部分开始工作,将所有子任务的结果递归地连接成单个结果,或者在返回 void 的任务的
情况下,程序只是等待每个子任务执行完毕。
为了提供有效的并行执行,fork/join 框架使用了一个名为 ForkJoinPool 的线程池,用于管理 ForkJoinWorkerThread 类型的工
作线程。
ForkJoinPool 线程池
ForkJoinPool 是 fork/join 框架的核心,是 ExecutorService 的一个实现,用于管理工作线程,并提供了一些工具来帮助获取
有关线程池状态和性能的信息。
工作线程一次只能执行一个任务。
ForkJoinPool 线程池并不会为每个子任务创建一个单独的线程,相反,池中的每个线程都有自己的双端队列用于存储任务
( double-ended queue )( 或 deque,发音 deck )。
这种架构使用了一种名为工作窃取( work-stealing )算法来平衡线程的工作负载。
工作窃取( work-stealing )算法
要怎么解释 「 工作窃取算法 」 呢 ?
简单来说,就是 空闲的线程试图从繁忙线程的 deques 中 窃取 工作。
默认情况下,每个工作线程从其自己的双端队列中获取任务。但如果自己的双端队列中的任务已经执行完毕,双端队列为空时,
工作线程就会从另一个忙线程的双端队列尾部或全局入口队列中获取任务,因为这是最大概率可能找到工作的地方。
这种方法最大限度地减少了线程竞争任务的可能性。它还减少了工作线程寻找任务的次数,因为它首先在最大可用的工作块上工作。
ForkJoinPool 线程池的实例化
Java 8
在Java 8 中,创建 ForkJoinPool 实例的最简单的方式就是使用其静态方法 commonPool()。
commonPool() 静态方法,见名思义,就是提供了对公共池的引用,公共池是每个 ForkJoinTask 的默认线程池。
根据Oracle 的官方文档,使用预定义的公共池可以减少资源消耗,因为它会阻止每个任务创建一个单独的线程池。
ForkJoinPool commonPool = ForkJoinPool.commonPool();
Java 7
如果要在 Java 7 中实现相同的行为,则需要通过创建 ForkJoinPool 的实例并将其赋值给实用程序类的公共静态字段。
public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);
使用构造函数实例化 ForkJoinPool 时,可以创建具有指定级别的并行性,线程工厂和异常处理程序的自定义线程池。在上面的
示例中,线程池的并行度级别为 2 ,意味着线程池将使用 2 个处理器核心。
然后就可以通过这个公共静态字段轻松的访问 ForkJoinPool 的实例
ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;
ForkJoinTask
ForkJoinTask 是 ForkJoinPool 线程之中执行的任务的基本类型。我们日常使用时,一般不直接使用 ForkJoinTask ,而是扩展它的
两个子类中的任意一个
1、 任务不返回结果(返回void)的RecursiveAction;
2、 返回值的任务的RecursiveTask<V>
;这两个类都有一个抽象方法 compute() ,用于定义任务的逻辑。
我们所要做的,就是继承任意一个类,然后实现 compute() 方法。
RecursiveAction 使用示例
出于演示目的,示例当然是尽可能的简单,因此,我们的示例,执行了一个比较荒谬的任务:将输入转为大写并记录。
所有的代码如下所示
public class CustomRecursiveAction extends RecursiveAction { private String workload = ""; private static final int THRESHOLD = 4; private static Logger logger = Logger.getAnonymousLogger(); public CustomRecursiveAction(String workload) { this.workload = workload; } @Override protected void compute() { if (workload.length() > THRESHOLD) { ForkJoinTask.invokeAll(createSubtasks()); } else { processing(workload); } } private List<CustomRecursiveAction> createSubtasks() { List<CustomRecursiveAction> subtasks = new ArrayList<>(); String partOne = workload.substring(0, workload.length() / 2); String partTwo = workload.substring(workload.length() / 2, workload.length()); subtasks.add(new CustomRecursiveAction(partOne)); subtasks.add(new CustomRecursiveAction(partTwo)); return subtasks; } private void processing(String work) { String result = work.toUpperCase(); logger.info("This result - (" + result + ") - was processed by " + Thread.currentThread().getName()); }}
在这个示例中,我们使用了一个字符串类型 ( String ) 的名为 workload 属性来表示要处理的工作单元。
同时,为了演示 fork/join 框架的 fork 行为,在该示例中,如果 workload.length() 大于指定的阈值,那么就使用 createSubtask()
方法拆分任务。
在createSubtasks() 方法中,输入的字符串被递归地划分为子串,然后创建基于这些子串的 CustomRecursiveTask 实例。
当递归分割字符串完毕时,createSubtasks() 方法返回 List
<CustomRecursiveAction>
作为结果。然后在compute() 方法中使用 invokeAll() 方法将任务列表提交给 ForkJoinPool 线程池。
我们来总结下创建 RecursiveAction 的步骤:
1、 创建一个表示工作总量的对象;
2、 选择合适的阈值;
3、 定义分割工作的方法;
4、 定义执行工作的方法;类似的,我们可以使用相同的方式开发自己的 RecursiveAction 类。
RecursiveTask 使用示例
对于有返回值的任务,除了将每个子任务的结果在一个结果中合并,其它逻辑和 RecursiveAction 都差不多。
public class CustomRecursiveTask extends RecursiveTask<Integer> { private int[] arr; private static final int THRESHOLD = 20; public CustomRecursiveTask(int[] arr) { this.arr = arr; } @Override protected Integer compute() { if (arr.length > THRESHOLD) { return ForkJoinTask.invokeAll(createSubtasks()) .stream() .mapToInt(ForkJoinTask::join) .sum(); } else { return processing(arr); } } private Collection<CustomRecursiveTask> createSubtasks() { List<CustomRecursiveTask> dividedTasks = new ArrayList<>(); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, 0, arr.length / 2))); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, arr.length / 2, arr.length))); return dividedTasks; } private Integer processing(int[] arr) { return Arrays.stream(arr) .filter(a -> a > 10 && a < 27) .map(a -> a * 10) .sum(); }}
在上面这个示例中,任务由存储在 CustomRecursiveTask 类的 arr 字段中的数组表示。
createSubtask() 方法递归地将任务划分为较小的工作,直到每个部分小于阈值。然后,invokeAll()方法将子任务提交给公共拉取
并返回 Future 列表。
要触发执行,需要为每个子任务调用 join() 方法。
上面这个示例中,我们使用了 Java 8 的流 ( Stream ) API , sum() 方法用于将子结果组合到最终结果中。
将任务提交到 ForkJoinPool 线程池中
只要使用很少的方法,就可以把任务提交到 ForkJoinPool 线程池中。
1、 submit()或execute()方法;
这两个方法的调用方式是相同的
forkJoinPool.execute(customRecursiveTask);int result = customRecursiveTask.join();
2、 使用invoke()方法fork任务并等待结果,不需要任何手动连接(join);
int result = forkJoinPool.invoke(customRecursiveTask);
3、 invokeAll()方法是将ForkJoinTasks序列提交给ForkJoinPool的最方便的方法它将任务作为参数(两个任务,varargs或集合),
fork它们,并按照生成它们的顺序返回Future对象的集合;
4、 或者,我们还可以使用单独的fork()和join()方法;fork() 方法将任务提交给线程池,但不会触发任务的执行。
join() 方法则用于触发任务的执行。在 RecursiveAction 的情况下,join() 返回 null,但对于 RecursiveTask
<V>
,它返回任务执行的结果。
customRecursiveTaskFirst.fork(); result = customRecursiveTaskLast.join();
上面的RecursiveTask
<V>
示例中,我们使用 invokeAll() 方法向线程池提交一系列子任务。同样的工作,也可以使用 fork() 和join() 来完成,但这可能会对结果的排序产生影响。
为了避免混淆,当涉及到多个任务且要保证任务的顺序时,通常都是使用 ForkJoinPool.invokeAll() 。
结束语
使用fork/join 框架可以加速处理大型任务,但要实现这一结果,应遵循一些指导原则:
使用尽可能少的线程池。绝大多数情况下,最好的决定是每个应用程序或系统只使用一个线程池。 (是线程池而不是线程)。
当不需要任何调整时,使用默认的公共线程池。
使用合理的阈值。将 ForkJoingTask 任务拆分为子任务。
避免在 ForkJoingTasks 中出现任何阻塞
-
3、Java 线程池 ( Thread Pool )
本文我们将讲解 Java 中的线程池 ( Thread Pool ),从 Java 标准库中的线程池的不同实现开始,到 Google 开发的 Guava 库的前世今生。
本章节涉及到很多前几个章节中阐述的知识点。我们希望你是按照顺序阅读下来的,不然有些知识会一头雾水。
Java 语言的实现中,把 Java 线程一一映射到操作系统级的线程,而后者是操作系统的资源,这意味着,如果开发者毫无节制地
创建线程,那么线程资源就会被快速的耗尽。
在Windows 操作系统上,每个线程要预留出 1m 的内存空间,意味着 2G 的内存理论上做多只能创建 2048 个线程。而在 Linux
上,最大线程数由常量 PTHREAD_THREADS_MAX 决定,一般为 1024。
出于模拟并行性的目的,Java 线程之间的上下文切换也由操作系统完成。因为线程上下文切换需要消耗时间,所以,一个简单的观点
是:产生的线程越多,每个线程花在实际工作上的时间就越少。
为什么会有线程上下文切换?
一台电脑,运行起来后,它的 CPU 是固定的,05 年之前,还是单核时代,也就是一次只能运行一个线程,虽然随着时间的推
移,现在的 CPU 已经有很多个核心,比如 8 核 16 核之类的。但相比于一个应用程序能够创建的线程数,那真的是太少了。而
每个核心一次只能运行一个线程,所以多个线程需要运行时就需要来回不停的在多个线程间切换,这就是线程之间的上下文切换。
为了节制创建线程的数量,也为了节省创建线程的开销,因此提出了线程池的概念。线程池模式有助于节省多线程应用程序中的资源
,还可以在某些预定义的限制内包含并行性。
当我们使用线程池时,我们可以以并行任务的形式编写并发代码并将其提交到线程池的实例中执行。
这个线程池实例控制了多个重用线程以执行这些任务。
这种线程池模式,允许我们控制应用程序创建的线程数,生命周期,以及计划任务的执行并将传入的任务保留在队列中。
Java 中的线程池
Executors、Executor 和 ExecutorService
Executors 是一个帮助类,提供了创建几种预配置线程池实例的方法。如果你不需要应用任何自定义的微调,可以调用这些方法创
建默认配置的线程池,因为它能节省很多时间和代码。
Executor 和 ExecutorService 接口则用于与 Java 中不同线程池的实现协同工作。通常,你应该将代码与线程池的实际实现分离,并
在整个应用程序中使用这些接口。
Executor 接口提供了一个 execute() 方法将 Runnable 实例提交到线程池中执行。
下面的代码是一个快速示例,演示了如何使用 Executors API 获取包含了单个线程池和无限队列支持的 Executor 实例,以便按顺序
执行任务。
Executor executor = Executors.newSingleThreadExecutor();
获取了Executor 示例后,我们就可以使用 execute() 方法将一个只在屏幕上打印 Hello World 的任务提交到队列中执行。
executor.execute(() -> System.out.println("Hello World"));
上面这个示例使用了 lambda ( Java 8特性 )提交任务,JVM 会自动推断该任务为 Runnable
我们在Java Shell 演示下上面的代码
jshell> import java.util.concurrent.*jshell> Executor executor = Executors.newSingleThreadExecutor(); executor ==> java.util.concurrent.Executors$FinalizableDelegatedExecutorService@1e127982jshell> executor.execute(() -> System.out.println("Hello World")); jshell> Hello Worldjshell>
ExecutorService 接口则包含大量用于控制任务进度和管理服务终止的方法。我们可以使用此接口来提交要执行的任务,还可以使用
此接口返回的 Future 实例控制任务的执行。
下面的示例中,我们创建了一个 ExecutorService 的实例,提交了一个任务,然后使用返回的 Future 的 get() 方法等待提交的任务完
成并返回值。
ExecutorService executorService = Executors.newFixedThreadPool(10); Future<String> future = executorService.submit(() -> "Hello World"); // 一些其它操作String result = future.get();
在实际使用时,我们并不会立即调用 future.get() 方法,可能会等待一些时间,推迟调用它直到我们需要它的值用于计算等目的。
ExecutorService 中的 submit() 方法被重载为支持 Runnable 或 Callable ,它们都是功能接口,可以接收一个 lambdas 作为参数(
从 Java 8 开始 ):
使用 Runnable 作为参数的方法不会抛出异常也不会返回任何值 ( 返回 void )
使用 Callable 作为参数的方法则可以抛出异常也可以返回值。
如果想让编译器将参数推断为 Callable 类型,只需要 lambda 返回一个值即可。
ExecutorService 接口的更多使用范例和特性,你可以访问前面的章节 一文秒懂 Java ExecutorService。
-
4、Java 线程池之 ThreadPoolExecutor
因为上一章节篇幅有限,所以我决定把 一文秒懂 Java 线程池 拆分为三篇文章单独介绍。本章节,我们就来看看 ThreadPoolExecutor 。
ThreadPoolExecutor
ThreadPoolExecutor 是一个可被继承 ( extends ) 的线程池实现,包含了用于微调的许多参数和钩子。
我们并不会讨论 ThreadPoolExecutor 类中的所有的参数和钩子,只会讨论几个主要的配置参数:
1、 corePoolSize;
2、 maximumPoolSize;
3、 keepAliveTime;ThreadPoolExecutor 创建的线程池由固定数量的核心线程组成,这些线程在 ThreadPoolExecutor 生命周期内始终存在,除此之外
还有一些额外的线程可能会被创建,并会在不需要时主动销毁。corePoolSize 参数用于指定在线程池中实例化并保留的核心线程
数。如果所有核心线程都忙,并且提交了更多任务,则允许线程池增长到 maximumPoolSize 。
keepAliveTime 参数是额外的线程( 即,实例化超过 corePoolSize 的线程 )在空闲状态下的存活时间。
这三个参数涵盖了广泛的使用场景,但最典型的配置是在 Executors 静态方法中预定义的。
Executors.newFixedThreadPool()
例如,Executors.newFixedThreadPool() 静态方法创建了一个 ThreadPoolExecutor ,它的参数 corePoolSize 和 maximumPoolSize
都是相等的,且参数 keepAliveTime 始终为 0 ,也就意味着此线程池中的线程数始终相同。
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); executor.submit(() -> { Thread.sleep(1000); return null;}); executor.submit(() -> { Thread.sleep(1000); return null;}); executor.submit(() -> { Thread.sleep(1000); return null;}); assertEquals(2, executor.getPoolSize()); assertEquals(1, executor.getQueue().size());
上面这个示例中,我们实例化了一个固定线程数为 2 的 ThreadPoolExecutor。这意味着如果同时运行的任务的数量始终小于或等
于 2 ,那么这些任务会立即执行。否则,其中一些任务可能会被放入队列中等待轮到它们。
上面这个示例中,我们创建了三个 Callable 任务,通过睡眠模拟 1000 毫秒的繁重工作。前两个任务将立即执行,第三个任务
必须在队列中等待。我们可以通过在提交任务后立即调用 getPoolSize() 和 getQueue().size() 来方法来验证。
Executors.newCachedThreadPool()
Executors 还提供了 Executors.newCachedThreadPool() 静态方法创建另一个预配置的 ThreadPoolExecutor。该方法创建的线程池没
有任何核心线程,因为它将 corePoolSize 属性设置为 0,但同时有可以创建最大数量的额外线程,因为它将 maximumPo
olSize 设置为 Integer.MAX_VALUE ,且将 keepAliveTime 的值设置为 60 秒。
这些参数值意味着缓存的线程池可以无限制地增长以容纳任何数量的已提交任务。但是,当不再需要线程时,它们将在 60秒不活动
后被销毁。这种线程池的使用场景一般是你的应用程序中有很多短期任务。
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();executor.submit(() -> { Thread.sleep(1000); return null;});executor.submit(() -> { Thread.sleep(1000); return null;});executor.submit(() -> { Thread.sleep(1000); return null;});assertEquals(3, executor.getPoolSize());assertEquals(0, executor.getQueue().size());
上面这个示例中的队列大小始终为 0 ,因为在内部使用了 SynchronousQueue 的实例。在 SynchronousQueue 中,插入和删除操作
总是成对出现且同时发生。因此队列实际上从不包含任何内容。
Executors.newSingleThreadExecutor()
Executors.newSingleThreadExecutor() 静态方法则创建另一种典型的只包含单个线程的 ThreadPoolExecutor 实例。
这种单线程执行程序是创建事件循环的理想选择。
在这个单线程 ThreadPoolExecutor 实例中,属性 corePoolSize 和属性 maximumPoolSize 的值都为 1,而属性 keepAliveTime 的
值为 0 。
在单线程 ThreadPoolExecutor 实例中,所有的任务都按顺序执行。因此,下面的示例中,任务完成后标志的值是 2。
AtomicInteger counter = new AtomicInteger();ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> { counter.set(1);});executor.submit(() -> { counter.compareAndSet(1, 2);});
此外,单线程 ThreadPoolExecutor 实例使用了不可变包装器进行修饰,因此在创建后无法重新配置。当然了,这也是我们无法将该
示例强制转换为 ThreadPoolExecutor 的原因。
-
5、ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 扩展自 一文秒懂 Java 线程池之 ThreadPoolExecutor 讲解的 了ThreadPoolExecutor 类,并且添加
了其它方法实现了 ScheduledExecutorService 接口。
schedule() 方法允许在指定的延迟后执行一次任务
scheduleAtFixedRate() 方法允许在指定的初始延迟后执行任务,然后以一定的周期重复执行,其中 period 参数用于指定两个任
务的开始时间之间的间隔时间,因此任务执行的频率是固定的。
scheduleWithFixedDelay() 方法类似于 scheduleAtFixedRate() ,它也重复执行给定的任务,但period 参数用于指定前一个任务的
结束和下一个任务的开始之间的间隔时间。也就是指定下一个任务延时多久后才执行。执行频率可能会有所不同,具体取决于执
行任何给定任务所需的时间。
静态方法 Executors.newScheduledThreadPool() 方法用于创建包含了指定 corePoolSize,无上限 maximumPoolSize 和 0 存活时间
keepAliveTime 的 ScheduledThreadPoolExecutor 实例。
例如下面的示例创建了一个包含了 5 个核心线程的 ScheduledThreadPoolExecutor 实例,且每隔 500 毫秒运行一个输出 Hello World
的任务
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);executor.schedule(() -> { System.out.println("Hello World");}, 500, TimeUnit.MILLISECONDS);
范例 2
下面的代码则演示了如何在 500 毫秒延迟后执行任务,然后每 100 毫秒重复执行一次。
CountDownLatch lock = new CountDownLatch(3); ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> { System.out.println("Hello World"); lock.countDown();}, 500, 100, TimeUnit.MILLISECONDS); lock.await(1000, TimeUnit.MILLISECONDS); future.cancel(true);
-
6、Java ForkJoinPool
ForkJoinPool 是Java 7 中引入的 fork/join 框架的核心之一。它解决了一个常见的问题: 如何在递归中生成多个任务。因为,
即使是使用一个简单的 ThreadPoolExecutor ,也会在不断的递归中快速耗尽线程。因为每个任务或子任务都需要自己的线程来运行。
在fork/join 框架中,任何任务都可以生成 ( fork ) 多个子任务并使用 join() 方法等待它们的完成。fork/join 框架的好处是它不会为
每个任务或子任务创建新线程,而是实现了 工作窃取 ( Work Stealing ) 算法。关于 fork/join 框架的详细信息,你可以访问我们的
一文秒懂 Java Fork/Join。
接下来,我们看一个使用 ForkJoinPool 遍历节点树并计算所有叶值之和的简单示例。在这个示例中,树是一个由节点,int 值和
一组子节点组成。
static class TreeNode { int value; Set<TreeNode> children; TreeNode(int value, TreeNode... children) { this.value = value; this.children = Sets.newHashSet(children); }}
创建了树 TreeNode 之后,如果我们想要并行地对树中的所有值求和,我们需要实现一个 RecursiveTask
<Integer>
接口。每个任务都接收自己的节点,并将其值添加到其子节点的值之和上。
要计算子节点值的总和,任务实现执行以下操作
1、 将子节点集合转换为流(stream);
2、 映射前面操作中创建的流,为每个元素创建一个新的CountingTask;
3、 通过fork执行每个子任务;
4、 通过在每个fork任务上调用join()方法来收集结果;
5、 使用·Collectors.summingInt收集器对结果求和;public static class CountingTask extends RecursiveTask<Integer> { private final TreeNode node; public CountingTask(TreeNode node) { this.node = node; } @Override protected Integer compute() { return node.value + node.children.stream() .map(childNode -> new CountingTask(childNode).fork()) .collect(Collectors.summingInt(ForkJoinTask::join)); }}
在树上运行计算的代码非常简单:
TreeNode tree = new TreeNode(5, new TreeNode(3), new TreeNode(2, new TreeNode(2), new TreeNode(8))); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); int sum = forkJoinPool.invoke(new CountingTask(tree));
-
7、Java Google Guava 实现
Guava 是托管在 Github.com 上的流行的 Google 开源的 Java 线程池库。
Guava 包含了许多有用的并发类,同时还包含了几个方便的 ExecutorService 实现,但这些实现类都无法通过直接实例化或子类化
来创建实例。取而代之的是提供了 MoreExecutors 助手类来创建它们的实例。
给 Maven 添加 Guava 依赖
为了将Google Guava 库包含进当前的项目中,需要将下面的依赖项添加到 Maven pom 文件中。
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>26.0</version> </dependency>
你可以在 Maven 中央仓库 中找到最新版本的 Guava 库
直接执行者和直接执行者服务
有时候,我们希望在当前线程或线程池中执行任务,具体在哪里取决于某些条件。这种情况下,你应该会更喜欢使用单个
Executor 接口,且只需切换实现即可。
虽然将当前线程中的任务的 Executor 或 ExecutorService 的提取出来单独实现并不困难,但它仍然需要编写一些样板代码。
值得庆幸的是,Guava 为我们提供了预定义的实例。
下面的范例演示了如何在同一个线程中执行任务。简单起见,提交的任务会将当前线程休眠 500 毫秒并阻塞当前线程,并在执
行的调用完成后让结果立即可用
Executor executor = MoreExecutors.directExecutor(); AtomicBoolean executed = new AtomicBoolean(); executor.execute(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } executed.set(true);});assertTrue(executed.get());
directExecutor() 方法返回的实例实际上是一个静态单例,因此使用此方法根本不会在对象创建上带来任何开销。
你应该更喜欢使用此方法来访问 MoreExecutors.newDirectExecutorService(),因为该 API 会在每次调用时创建完整的执行程序服务
实现。
退出 Executor 服务
另一个常见问题是: 在线程池仍在运行其任务时关闭虚拟机。即使采用了取消机制,也无法保证任务执行良好,并在执行程序服
务 ( Executor )关闭时停止工作。这可能会导致 JVM 在任务继续工作时无限期挂起。
为了解决这个问题,Guava 引入了一系列已经实例化好的执行器 ( Executor ) 服务。它们是守护线程模式,但会与 JVM 一起终止。
这些执行器服务还提供了 Runtime.getRuntime().addShutdownHook() 方法用于添加一个关闭钩子,用于设置 VM 在放弃挂起的任
务之前等待一段预配置的超时时间。
下面的示例中,我们提交了一个无限循环的任务,我们使用了包含 100 毫秒超时时间的已经存在的执行程序服务来运行任务,并在
超过配置的超时时间之后终止 VM 。如果没有 exitingExecutorService ,此任务将导致 VM 无限期挂起。
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); ExecutorService executorService = MoreExecutors.getExitingExecutorService(executor, 100, TimeUnit.MILLISECONDS);executorService.submit(() -> { while (true) { }});
监听装饰器
监听装饰器允许我们封装 ExecutorService 并在提交任务时返回 ListenableFuture 实例而不是简简单单的 Future 实例。
ListenableFuture 接口扩展自 Future 接口,并添加了一个新方法 addListener(),该方法用于添加在将来完成时调用的侦听器。
一般情况下,我们很少直接使用 ListenableFuture.addListener() 方法,而是使用 Futures 类提供的许多辅助方法。例如,通过
Futures.allAsList() 方法,我们可以在单个 ListenableFuture 中组合多个 ListenableFuture 实例,并会在这些实例在成功完成后
将所有的 futures 合并并返回结果。
ExecutorService executorService = Executors.newCachedThreadPool(); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService);ListenableFuture<String> future1 = listeningExecutorService.submit(() -> "简单");ListenableFuture<String> future2 = listeningExecutorService.submit(() -> "教程");String greeting = Futures.allAsList(future1, future2).get() .stream() .collect(Collectors.joining(""));assertEquals("DDKK.COM 弟弟快看,程序员编程资料站", greeting);
-
8、并发编程实战
应聘Java 岗,总是免不了几个 Java 并发编程的面试题,不过大多数都局限在 java.util.concurrent 包下的知识和实现问题。
本文针对 Java 并发相关的常见的面试题做一些解释。
Q1: 进程和线程的区别?
这是一个非常基础的面试题,如果这道题没有回答的比较满意,一般情况下,面试官会认为应聘者在并发方面的基础知
识不牢固,就不会继续深入询问其它并发问题了。
1、 进程和线程都是并发单元,但它们有一个根本区别:** 进程不共享公共内存,而线程则共享**;
2、 从操作系统的角度来看,进程是一个独立的软件,在其自己的虚拟内存空间中运行任何一个多任务操作系统(这几乎意味着任何现代操作系统)都必须将内存中的进程分开,这样一个失败的进程就不会通过加扰公共内存来拖累所有其它进程因此,进程通常
是隔离的,它们通过进程间通信进行协作,进程间通信由操作系统定义为一种中间API;
3、 相反,线程是应用程序的一部分,它与同一应用程序的其他线程共享公共内存使用公共内存可以减少大量开销,因此使用线程可以更快的交换数据和进行线程间协作;
关于进程间通讯那一块可以不用回答,如果你不懂的话,不必然会导致接下来的某个问题是 进程间通讯的的原理.
Q2: 如何创建一个线程实例并且运行它?
这道题考察的是对 Runnable 的理解。
创建一个线程的实例,有两种方法可供选择:
1、 把Runnable的实例传递给Thread的构造函数并调用start()方法;
Thread thread1 = new Thread(() -> System.out.println("Hello World from Runnable!"));thread1.start();
Runnable是一个函数接口,因此可以作为 lambda 表达式传递
2、 因为线程本身也实现了Runnable接口,所以另一种创建线程的方法是创建一个匿名子类,覆写它的run()方法,然后调用start();Thread thread2 = new Thread() { @Override public void run() { System.out.println("Hello World from subclass!"); }};thread2.start();
Q3: 描述线程的不同状态以及何时发生状态转换 ?
这道题考察的是对线程生命周期的理解。
1、 一般情况下,我们会使用Thread.getState()方法检查线程(Thread)的状态;
2、 线程的不同状态都定义在Thread.State枚举中;
3、 线程的所有状态如下所示;1、 NEW:一个尚未调用Thread.start()方法启动的新Thread实例;
2、 RUNNABLE:一个正在运行的线程它被称为runnable,因为在任何给定时间,它要么正在运行要么在等待线程调度当调用Thread.start()方法时,会将一个NEW线程进入RUNNABLE状态;
3、 BLOCKED:如果正在运行的线程需要进入同步部分但由于另一个线程持有此部分的监视器而无法执行此操作,则该线程将被阻塞;
4、 WAITING:如果线程等待另一个线程执行特定操作,则该线程进入此状态例如,一个线程在它持有的监视器上调用Object.wait()法时进入此状态,或者在另一个线程上调用Thread.join()方法也会进入此状态;
5、 IMED_WAITING:跟WAITING状态差不多但线程在调用Thread.sleep()、Object.wait()、或Thread.join()和其他一些方法的定时版本后进入此状态;
6、 TERMINATED:当一个线程已经完成它的Runnable.run()方法的执行并终止时进入此状态;Q4: Runnable 和 Callable 接口有什么区别?它们是如何使用的?
1、 Runnable接口表示必须在单独的线程中运行的计算单位,它只有一个run()方法Runnable接口不允许此方法返回值或抛出未经
检查的异常;
2、 Callable接口表示具有返回值的任务,它只有一个call()方法call()方法可以返回一个值(可以是Void),也可以抛出一个异常Callable通常在ExecutorService实例中用于启动异步任务,然后调用返回的Future实例以获取其值;
Q5: 什么是守护线程,它的使用场景是什么?如何创建守护线程 ?
1、 守护线程是一个不阻止Java虚拟机(JVM)退出的线程当所有非守护线程终止时,JVM只是放弃所有剩余的守护线程;
2、 守护线程通常用于为其他线程执行一些支持或服务任务,但我们应该考虑到它们可能随时被放弃;
3、 要将一个线程作为守护线程启动,应该在调用start()之前使用setDaemon()方法设置为守护线程如下所示;Thread daemon = new Thread(() -> System.out.println("Hello from daemon!"));daemon.setDaemon(true);daemon.start();
奇怪的是,如果将上面的代码放在 main() 内运行,则可能无法打印该消息。而发生这种情况的原因,是因为 main() 线程在守护
线程运行到打印消息之前就已经终止。
我们不应该在守护线程中执行任何 I/O 操作,因为它们甚至无法执行其 finally 块并在被放弃时关闭资源。
Q6: 什么是 Thread 的中断标志?怎么设置和检查它?它与 InterruptedException
有什么关系?
1、 中断(interrupt)标志或中断状态是线程中断时设置的内部线程标志(flag属性);
2、 要设置一个线程的中断标志,只需要简单的在线程对象上调用thread.interrupt()方法;
3、 如果在某个方法内部的一个线程抛出了InterruptedException(wait、join、sleep等),那么此方法会立即抛出InterruptedException线程可以根据自己的逻辑自由处理此异常如果一个线程不在这样的方法中并且调用了thread.interrupt(),则不会发生任何
特殊情况;
4、 线程的中断状态可以通过使用静态Thread.interrupted()方法或实例的isInterrupted()方法定期检查这两个方法的区别是静态Thread.interrupt()会清除了中断标志,而isInterrupted()则不会;
Q7: 什么是 Executor 和 ExecutorService ?这两个接口有什么区别?
1、 Executor和ExecutorService是java.util.concurrent框架提供的两个相关接口;
2、 Executor是一个非常简单的接口,只有一个execute()方法接受Runnable实例来执行在大多数情况下,这是我们的任务执行代码应该依赖的接口;
3、 ExecutorService扩展了Executor接口,并且添加了许多其它方法以处理和检查并发任务执行服务的生命周期(在关闭时终止任务)和更复杂的异步任务处理,包括Futures;
更多 Executor 和 ExecutorService 的知识,可以访问 一文秒懂 Java ExecutorService。
Q8: java.util.concurrent 标准库中 ExecutorService 的可用实现是什么 ?
这是一个非常变 tai 的问题。问这个问题的面试官,你想咋样啊 ?
ExecutorService 接口有三个标准实现
1、 ThreadPoolExecutor:使用线程池执行任务一旦某个线程完成执行任务,它就会回到线程池中如果池中的所有线程都忙,则任
务必须等待轮到它;
2、 ScheduledThreadPoolExecutor:允许安排任务执行,而不是简单的在线程可用时立即运行任务它还可以按固定频率或固定延迟安排任务;
3、 ForkJoinPool:是一个特殊的ExecutorService,用于处理递归算法任务如果你使用常规ThreadPoolExecutor进行递归算法,那么你很快发现所有线程都在忙着等待较低级别的递归完成ForkJoinPool实现了所谓的工作窃取算法,允许它更有效地使用可用线程;
Q9: 什么是 Java 内存模型( JMM )?描述下其目的和基本思想
Java 内存模式是 Java 语言规范的一部分,在 [第 17.4 章][17.4] 中描述。
JMM规定了多个线程如何访问并发 Java 应用程序中的公共内存,以及一个线程的数据更改如何对其他线程可见。
是不是很简单,虽然简短又简洁,但如果没有强大的数学背景,JMM 可能很难掌握。
对内存模型的需求源于这样一个事实:** Java 代码访问数据的方式并不像它在底层实际发生的那样**。
在保证内存读写的可观察结果是相同的情况下,Java 编译器,JIT 编译器甚至 CPU 都可以对内存读写进行重新排序或优化。
当我们的应用程序扩展到多个线程时,这会导致反直觉的结果,因为大多数这些优化只会考虑单个执行线程( 跨线程优化器仍
然非常难以实现 )。
另一个可怕的问题是现代系统中的内存是多层的:** 处理器的多个内核可能会在其缓存或读/写缓冲区中保留一些非刷新数据,这
也会影响从其它内核观察到的内存状态**。
更糟糕的是,不同内存访问架构的存在将打破Java 「 一次编写,随处运行 」 的承诺。
但另所有 Java 程序员高兴的是,JMM 指定了在设计多线程应用程序时可能依赖的一些保证。坚持这些保证有助于程序员编写在
各种体系结构之间稳定且可移植的多线程代码。
JMM的主要概念是:
动作 ( Action ) : 这些是线程间的动作,可以由一个线程执行并由另一个线程检测,如读取或写入变量,锁定/解锁监视器等等。
同步动作 ( Synchronization actions ) : 某个动作子集,例如读取/写入易失性变量,或锁定/解锁监视器。
程序顺序 ( Program Order ) : 俗称 PO,单个线程内可观察的动作总顺序。
同步顺序 ( Synchronization Order ) : 俗称 SO,所有同步操作之间的总顺序 – 它必须与程序顺序一致,也就是说,如果两个
同步操作在PO 中一个接一个地出现,它们在 SO 中以相同的顺序出现 。
同步与( synchronizes-with) : 俗称 SW ,某些同步操作之间的关系,例如解锁监视器和锁定同一监视器( 在另一个或同一
个线程中 )。
发生在顺序之前 ( Happens-before Order ) : 将 PO 与 SW 结合( 在集合论中称为传递闭包 ),以创建线程之间所有动作的部
分排序。如果一个动作发生在另一个动作之前,则第二个动作可以观察到第一个动作的结果( 例如,在一个线程中写入变量并
在另一个线程中读取 )。
发生在一致性之前 ( Happens-before consistency ) : 如果每次读取都遵循先前发生的顺序中对该位置的最后一次写入,或者
通过数据竞争进行其他一些写入操作,则一组操作是 HB 一致的。
执行 ( Execution ) : 它们之间有一组有序的动作和一致性规则
对于给定的程序,我们可以观察到具有各种结果的多个不同的执行.但是如果一个程序正确同步,那么它的所有执行似乎都是顺序一
致的,这意味着我们可以将多线程程序推断为一系列按顺序发生的动作。这样可以省去考虑引擎盖下重新排序,优化或数据缓存的麻烦。
如果你了解协程,相关的概念和协程很相像的。
Q10: 什么是易失 ( volatile ) 字段,JMM 对这样的领域有什么保证?
根据Java 内存模型 ( 参见 Q9 ) ,volatile 字段具有特殊属性。volatile 变量的读取和写入是同步操作,这意味着它们具有总排序
( 所有线程将遵循这些操作的一致顺序 )。根据此顺序,保证读取 volatile 变量可以观察到对此变量的最后一次写入。
如果你有一个从多个线程访问的字段,且至少有一个线程写入它,那么你应该考虑使它变得 volatile ,否则某个线程从这个字段读
取的内容并不会得到一丝的保证。
volatile 的另一个保证是写入和读取 64 位值( long 类型和 double 类型 )的原子性。如果没有 volatile 修饰符,读取此类字段可能
会观察到另一个线程部分写入的值。
Q11: 以下哪项操作是原子操作 ?
写入一个非 volatile int 类型
写入一个 volatile int 类型
写入一个非 volatile long 类型
写入一个 volatile long 类型
递增一个 volatile long 类型
是不是瞬间蒙了?我们来解释一下
1、 对int类型(32位)变量的写入保证是原子的,无论它是否是易失性的;
2、 long类型(64位)变量可能需要在两个单独的步骤中写入,例如,在32位体系结构上,因此默认情况下,没有原子性保证但是,如果添加了volatile修饰符,则保证以原子方式访问long变量;
3、 递增操作通常由多个步骤完成(检索值,更改它并写回),因此它永远不会保证是原子的,变量是易变的如果要实现值的原子增量,则应使用类AtomicInteger,AtomicLong等;
Q12: JMM 对添加了 final 修饰符的类的字段有什么特殊保证 ?
JVM基本上会保证在任何线程获取对象之前初始化类的 final 字段。
如果没有这种保证,由于重新排序或其他优化,在初始化该对象的所有字段之前,可以向另一个线程发布对象的引用,即变得可见。
这可能会导致对这些字段的访问。
这就是为什么在创建不可变对象时,应始终将其所有字段设为 final,即使它们不能通过 getter 方法访问。
Q13: 方法定义中 synchronized 关键字的含义是什么?静态方法?在一个块之前 ?
块(block ) 之前的 synchronized 关键字表示进入该块的任何线程都必须获取监视器( 括号中的对象 )。
synchronized(object) { // ...}如果监视器已被另一个线程获取,则前一个线程将进入 BLOCKED 状态并等待监视器被释放。 同步实例方法具有相同的语义,但会使用实例本身充当监视器。 ```javasynchronized void instanceMethod() { // ...}
对于静态同步方法,监视器是表示声明类的 Class 对象。
static synchronized void staticMethod() { // ...}
Q14: 如果两个线程同时在不同的对象实例上调用 synchronized 方法,这些
线程中的一个是否会阻塞?如果该方法是静态的,该怎么办?
如果方法是实例方法,则实例充当方法的监视器。在不同实例上调用该方法的两个线程获取不同的监视器,因此它们都不会
被阻塞。
如果方法是静态的,则监视器是 Class 对象。对于两个线程,监视器是相同的,因此其中一个可能会阻塞并等待另一个退出
synchronized 方法。
Q15: Object类的 wait,notify 和 notifyAll 方法的目的是什么 ?
拥有对象监视器的线程( 例如,已进入由对象保护的同步部分的线程 )可以调用 object.wait() 来临时释放监视器并为其他线程提
供获取监视器的机会。例如,这可以在等待某个条件的情况下完成。
当另一个获取监视器的线程满足条件时,它可以调用 object.notify() 或 object.notifyAll() 并释放监视器。notify() 方法唤醒处于等
待状态的单个线程,notifyAll() 方法唤醒等待此监视器的所有线程,并且它们都竞争重新获取锁定。
下面的BlockingQueue 实现演示了多个线程如何通过 wait-notify 模式一起工作。如果我们将一个元素放入一个空队列,那么在
take() 方法中等待的所有线程都会唤醒并尝试接收该值。如果我们将一个元素放入一个已经满了的队列,put() 方法将等待对
get() 方法的调用。get() 方法删除一个元素,并通知在 put() 方法中等待队列对新项目有空位置的线程。
public class BlockingQueue<T> { private List<T> queue = new LinkedList<T>(); private int limit = 10; public synchronized void put(T item) { while (queue.size() == limit) { try { wait(); } catch (InterruptedException e) {} } if (queue.isEmpty()) { notifyAll(); } queue.add(item); } public synchronized T take() throws InterruptedException { while (queue.isEmpty()) { try { wait(); } catch (InterruptedException e) {} } if (queue.size() == limit) { notifyAll(); } return queue.remove(0); }}
Q16: 描述死锁,存活锁和饥饿的条件。描述这些情况的可能原因 ?
死锁 ( DeadLock ) 是一组无法进行的线程中的条件,因为组中的每个线程都必须获取已由组中的另一个线程获取的
某些资源。最简单的情况是两个线程需要锁定两个资源才能进行,第一个资源已被一个线程锁定,第二个资源已被另一
个线程锁定。因为这些线程永远不会获得对两个资源的锁定,因此永远不会进展。
存活锁 ( LiveLock ) 是多线程对自己生成的条件或事件做出反应的一种情况。事件发生在一个线程中,必须由另一个线
程处理。在此处理期间,发生的新事件必须在第一个线程中处理,依此类推。这样的线程是活着的并且没有被阻挡,但
是仍然没有取得任何进展,因为它们用无用的工作压倒了对方
饥饿锁 ( Starvation ) 是线程无法获取资源的情况,因为其他线程(或多个线程)占用它太长时间或具有更高的优先级。
线程无法取得进展,因此无法完成有用的工作。
Q17: 描述 fork/join 框架的用途和用例
fork/join 框架允许并行化递归算法。使用 ThreadPoolExecutor 之类的并行递归的主要问题是,可能会快速耗尽线程,因为每个递
归步骤都需要自己的线程,而堆栈中的线程将处于空闲状态并等待。
fork/join 框架入口点是 ForkJoinPool 类,它是 ExecutorService 的一个实现。它实现了工作窃取算法,空闲线程会试图从忙线程中
「 窃取 」 工作。这允许在不同线程之间传播计算并在使用比通常的线程池所需的更少的线程时取得进展
-
9、Java CountDownLatch
本章节我们来讨论下 java.util.concurrent.CountDownLatch 这个类,顺带演示下如何在一些实际例子中使用它。
CountDownLatch 类的作用呢? 怎么说呢? 简单来说,我们可以使用它来阻塞线程,直到其他线程完成给定任务。
并发编程中使用 CountDownLatch
简而言之,CountDownLatch 有一个计数器字段,我们可以根据需要减少它,因此,我们可以使用它来阻止调用线程,直到它被计
数到零。
如果我们正在进行一些并行处理,我们可以使用与计数器相同的值来实例化 CountDownLatch,因为我们想要处理多个线程。然后,
我们可以在每个线程完成后调用 countdown(),保证调用 await() 的依赖线程将阻塞,直到工作线程完成。
使用 CountDownLatch 等待线程池完成
我们通过创建一个 Worker 来尝试这个模式,并使用 CountDownLatch 字段来指示它何时完成
public class Worker implements Runnable { private List<String> outputScraper; private CountDownLatch countDownLatch; public Worker(List<String> outputScraper, CountDownLatch countDownLatch) { this.outputScraper = outputScraper; this.countDownLatch = countDownLatch; } @Override public void run() { doSomeWork(); outputScraper.add("Counted down"); countDownLatch.countDown(); }}
然后,我们创建一个测试,以证明我们可以让 CountDownLatch 等待 Worker 实例完成
@Testpublic void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() throws InterruptedException { List<String> outputScraper = Collections.synchronizedList(new ArrayList<>()); CountDownLatch countDownLatch = new CountDownLatch(5); List<Thread> workers = Stream .generate(() -> new Thread(new Worker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); outputScraper.add("Latch released"); assertThat(outputScraper) .containsExactly( "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Latch released" );}
上面这个示例中,"Latch release" 将始终是最后一个输出 – 因为它取决于 CountDownLatch 的释放。
注意,如果我们没有调用 await() 方法,我们将无法保证线程执行的顺序,因此测试会随机失败。
在等待开始的线程池中使用 CountDownLatch
我们重用前面的示例,但是这次开启了了数千个线程而不是 5 个线程,很可能许多早期的线程在后面的线程上调用 start() 之前已经
完成了处理。这可能会使尝试重现并发问题变得困难,因为我们无法让所有线程并行运行。
为了解决这个问题,我们让 CountdownLatch 的工作方式与上一个示例有所不同。在某些子线程完成之前,我们可以阻止每个子线
程直到所有其他子线程都启动,而不是阻塞父线程。
我们把上一个示例的 run() 方法修改下,使其在处理之前阻塞
public class WaitingWorker implements Runnable { private List<String> outputScraper; private CountDownLatch readyThreadCounter; private CountDownLatch callingThreadBlocker; private CountDownLatch completedThreadCounter; public WaitingWorker( List<String> outputScraper, CountDownLatch readyThreadCounter, CountDownLatch callingThreadBlocker, CountDownLatch completedThreadCounter) { this.outputScraper = outputScraper; this.readyThreadCounter = readyThreadCounter; this.callingThreadBlocker = callingThreadBlocker; this.completedThreadCounter = completedThreadCounter; } @Override public void run() { readyThreadCounter.countDown(); try { callingThreadBlocker.await(); doSomeWork(); outputScraper.add("Counted down"); } catch (InterruptedException e) { e.printStackTrace(); } finally { completedThreadCounter.countDown(); } }}
接下来,我们修改下测试,直到所有工人都已启动,解锁工人,然后阻止,直到工人完成
@Testpublic void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime() throws InterruptedException { List<String> outputScraper = Collections.synchronizedList(new ArrayList<>()); CountDownLatch readyThreadCounter = new CountDownLatch(5); CountDownLatch callingThreadBlocker = new CountDownLatch(1); CountDownLatch completedThreadCounter = new CountDownLatch(5); List<Thread> workers = Stream .generate(() -> new Thread(new WaitingWorker( outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter))) .limit(5) .collect(toList()); workers.forEach(Thread::start); readyThreadCounter.await(); outputScraper.add("Workers ready"); callingThreadBlocker.countDown(); completedThreadCounter.await(); outputScraper.add("Workers complete"); assertThat(outputScraper) .containsExactly( "Workers ready", "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Workers complete" );}
这种模式对于尝试重现并发错误非常有用,可以用来强制数千个线程尝试并行执行某些逻辑。
让 CountdownLatch 尽早结束
有时,我们可能会遇到一个情况,即在 CountdownLatch 倒计时之前,Workers 已经终止了错误。这可能导致它永远不会达到零并
且 await() 永远不会终止。
@Overridepublic void run() { if (true) { throw new RuntimeException("Oh dear, I'm a BrokenWorker"); } countDownLatch.countDown(); outputScraper.add("Counted down");}
我们修改下之前的测试以使用 BrokenWorker,来演示 await() 将如何永久阻塞
@Testpublic void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck() throws InterruptedException { List<String> outputScraper = Collections.synchronizedList(new ArrayList<>()); CountDownLatch countDownLatch = new CountDownLatch(5); List<Thread> workers = Stream .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await();}
显然,这不是我们想要的行为 – 应用程序继续比无限阻塞要好得多。
为了解决这个问题,我们在调用 await() 时添加一个超时参数。
boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS);assertThat(completed).isFalse();
然后,我们可以看到,测试最终会超时,await() 将返回 false
-
10、Java BlockingQueue
本文中,我们将介绍一个 java.util.concurrent 包提供的用于解决并发生产者 – 消费者问题的最有用的类 – BlockQueue。
我们将介绍BlockingQueue 接口的 API 以及如何使用该接口的方法使编写并发程序更容易。
在本文的后面,我们将展示一个具有多个生产者线程和多个消费者线程的简单程序的示例。
BlockingQueue 的队列类型
java.util.concurrent 提供了两种类型的 BlockingQueue:
1、 无限队列(unboundedqueue)–几乎可以无限增长;
2、 有限队列(boundedqueue)–定义了最大容量;无限队列
创建一个无限队列的方法很简单
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();
上面这段代码中,blockingQueue 的容量将设置为 Integer.MAX_VALUE 。
向无限队列添加元素的所有操作都将永远不会阻塞,因此它可以增长到非常大的容量。
使用无限 BlockingQueue 设计生产者 – 消费者模型时最重要的是 消费者应该能够像生产者向队列添加消息一样快地消费消息 。
否则,内存可能会填满,然后就会得到一个 OutOfMemory 异常。
有限队列
第二种类型的队列是有限队列。我们可以通过将容量作为参数传递给构造函数来创建这样的队列
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);
上面这句代码中,我们设置了 blockingQueue 的容量为 10 。这意味着当消费者尝试将元素添加到已经满了的队列时,结果取决
于添加元素的方法( offer() 、add() 、put() ) ,它将阻塞,直到有足够的空间可以插入元素。否则,添加操作将会失败。
使用有限队列是设计并发程序的好方法,因为当我们将元素插入到已经满了的队列时,这些操作需要等到消费者赶上并在队列中
提供一些空间。这种机制可以让那个我们不做任何其它更改就可以实现节流。
BlockingQueue API
BlockingQueue 接口的所有方法可以分为两大类:负责向队列添加元素的方法和检索这些元素的方法。
在队列满/空的情况下,来自这两个组的每个方法的行为都不同。
添加元素
BlockingQueue 提供了以下方法用于添加元素
方法 说明 add() 如果插入成功则返回 true,否则抛出 IllegalStateException 异常 put() 将指定的元素插入队列,如果队列满了,那么会阻塞直到有空间插入 offer() 如果插入成功则返回 true,否则返回 false offer(E e, long timeout, TimeUnit unit) 尝试将元素插入队列,如果队列已满,那么会阻塞直到有空间插入 检索元素
BlockingQueue 提供了以下方法用于检索元素
方法 说明 take() 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用 poll(long timeout, TimeUnit unit) 检索并删除队列的头部,如有必要,等待指定的等待时间以使元素可用,如果超时,则返回 null 在构建生产者 – 消费者程序时,这些方法是 BlockingQueue 接口中最重要的构建块。
多线程生产者 – 消费者示例
接下来我们创建一个由两部分组成的程序 – 生产者 ( Producer ) 和消费者 ( Consumer ) 。
生产者将生成一个 0 到 100 的随机数,并将该数字放在 BlockingQueue 中。我们将创建 4 个线程用于生成随机数并使用 put() 方法
阻塞,直到队列中有可用空间。
需要记住的重要一点是,我们需要阻止我们的消费者线程无限期地等待元素出现在队列中。
从生产者向消费者发出信号的好方法是,不需要处理消息,而是发送称为毒 ( poison ) 丸 ( pill ) 的特殊消息。 我们需要发送尽
可能多的毒 ( poison ) 丸 ( pill ) ,因为我们有消费者。然后当消费者从队列中获取特殊的毒 ( poison ) 丸 ( pill )消息时,
它将优雅地完成执行。
我们来看以下生产者的代码
public class NumbersProducer implements Runnable { private BlockingQueue<Integer> numbersQueue; private final int poisonPill; private final int poisonPillPerProducer; public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) { this.numbersQueue = numbersQueue; this.poisonPill = poisonPill; this.poisonPillPerProducer = poisonPillPerProducer; } public void run() { try { generateNumbers(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void generateNumbers() throws InterruptedException { for (int i = 0; i < 100; i++) { numbersQueue.put(ThreadLocalRandom.current().nextInt(100)); } for (int j = 0; j < poisonPillPerProducer; j++) { numbersQueue.put(poisonPill); } }}
我们的生成器构造函数将 BlockingQueue 作为参数,用于协调生产者和使用者之间的处理。我们看到方法 generateNumbers() 将
100 个元素放入队列中。它还需要有毒 ( poison ) 丸 ( pill ) 消息,以便知道在执行完成时放入队列的消息类型。
该消息需要将 poisonPillPerProducer 次放入队列中。
每个消费者将使用 take() 方法从 BlockingQueue 获取一个元素,因此它将阻塞,直到队列中有一个元素。从队列中取出一个 Integer
后,它会检查该消息是否是毒 ( poison ) 丸 ( pill ) ,如果是,则完成一个线程的执行。否则,它将在标准输出上打印出结果以
及当前线程的名称。
这将使我们深入了解消费者的内部运作机制
public class NumbersConsumer implements Runnable { private BlockingQueue<Integer> queue; private final int poisonPill; public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) { this.queue = queue; this.poisonPill = poisonPill; } public void run() { try { while (true) { Integer number = queue.take(); if (number.equals(poisonPill)) { return; } System.out.println(Thread.currentThread().getName() + " result: " + number); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }}
需要注意的重要事项是队列的使用。与生成器构造函数中的相同,队列作为参数传递。我们可以这样做,是因为 BlockingQueue
可以在线程之间共享而无需任何显式同步。
既然我们有生产者和消费者,我们就可以开始我们的计划。我们需要定义队列的容量,并将其设置为 100 个元素。
我们希望有 4 个生产者线程,并且有许多消费者线程将等于可用处理器的数量
int BOUND = 10; int N_PRODUCERS = 4; int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); int poisonPill = Integer.MAX_VALUE; int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; int mod = N_CONSUMERS % N_PRODUCERS; BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND); for (int i = 1; i < N_PRODUCERS; i++) { new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start(); } for (int j = 0; j < N_CONSUMERS; j++) { new Thread(new NumbersConsumer(queue, poisonPill)).start(); } new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
BlockingQueue 是使用具有容量的构造创建的。我们正在创造 4 个生产者和 N 个消费者。我们将我们的毒 ( poison ) 丸 ( pill )
消息指定为 Integer.MAX_VALUE,因为我们的生产者在正常工作条件下永远不会发送这样的值。这里要注意的最重要的事情是
BlockingQueue 用于协调它们之间的工作。
当我们运行程序时,4 个生产者线程将随机整数放入 BlockingQueue 中,消费者将从队列中获取这些元素。每个线程将打印到标准
输出线程的名称和结果。
-
11、Java java.util.concurrent.Locks
对于Java 来讲,锁 ( Lock ) 是一种比标准同步块 ( synchronized block ) 更灵活,更复杂的线程同步机制。
其实,Java 1.5 就已经存在 Lock 接口了。这个 Lock 接口在 java.util.concurrent.lock 包中定义,提供了大量的锁操作。
本文中,我们将讲解 Lock 接口的不同实现并介绍如何在应用程序中使用锁。
锁 ( lock ) 和同步块 ( synchronized block ) 之间的差异
使用synchronized 块和使用 Lock API 之间几乎没有区别:
同步块完全包含在方法中 : 在独立的方法中,我们可以使用 Lock 提供的 lock() 和 unlock() 实现锁和解锁操作。
同步块不支持公平竞争,任何线程都可以获取释放的锁定,且不能指定优先级。但锁 ( Lock ) 就不一样了,可以通过指定公
平属性来实现 Lock 中的公平性。这可以确保最长的等待线程被授予锁定权限。
如果线程无法访问同步块,则会阻塞该线程。Lock 则提供了 tryLock() 方法。线程只有在可用且不被任何其他线程保持时
才获取锁定。这减少了线程等待锁定的阻塞时间。
处于 「 等待 」 状态以获取对同步块的访问的线程不能被中断。Lock 提供了一个 lockInterruptibly() 方法,可用于在等待锁
定时中断线程。
从上面的对比来看,同步块的所有机制,锁 ( Lock ) 都有相应的 API 对应。
Lock API
我们来看看 Lock 接口提供了哪些方法:
方法 说明 void lock() 尝试获取锁(如果可用),如果锁不可用,则线程会被阻塞,直到锁被释放 oid lockInterruptibly() 类似于 lock()
,但它允许被阻塞的线程被中断并通过抛出的java.lang.InterruptedException
恢复执行boolean tryLock() lock()
方法的非阻塞版本,它会立即尝试获取锁定,如果锁定成功则返回 trueboolean tryLock(long timeout, TimeUnit timeUnit) 类似于 tryLock()
,但它可以指定超时,达到超时之后就会自动放弃获取锁void unlock() 解锁 Lock 实例 锁定的实例应该始终被解锁以避免死锁情况。
锁的推荐使用方式是将锁相关的代码块放在 try/catch 和 finally 块中。
Lock lock = ...; lock.lock();try { // access to the shared resource} finally { lock.unlock();}
除了Lock 接口之外,java.util.concurrent.lock 包还提供了一个 ReadWriteLock 接口,俗称 「读写锁」,它维护一对锁,一个用于
只读操作,一个用于写操作。
对于读写锁,只要没有写入,读锁定可以由多个线程同时保持。
ReadWriteLock 声明了两个方法用于获取读取或写入锁
方法 说明 Lock readLock() 返回一个用于读取的锁 Lock writeLock() 返回一个用于写的锁 锁的实现
ReentrantLock 锁
ReentrantLock 类实现了 Lock 接口。它提供了相同的并发和内存语义,如使用 synchronized 方法和语句访问的隐式监视器锁,而
且可以被子类化。
我们写一个范例演示下如何使用 ReenrtantLock 来实现同步
public class SharedObject { //... ReentrantLock lock = new ReentrantLock(); int counter = 0; public void perform() { lock.lock(); try { // Critical section here count++; } finally { lock.unlock(); } } //...}
正如上面的示例所示,我们需要确保在 try-finally 块中包装 lock() 和 unlock() 调用以避免死锁情况。
现在,让我们来看看 tryLock() 的工作原理
public void performTryLock(){ //... boolean isLockAcquired = lock.tryLock(1, TimeUnit.SECONDS); if(isLockAcquired) { try { //Critical section here } finally { lock.unlock(); } } //...}
上面这个范例中,调用 tryLock() 的线程将等待一秒钟,如果锁定不可用则放弃等待。
ReentrantReadWriteLock
ReentrantReadWriteLock 类实现了 ReadWriteLock 接口。
我们来看一下线程获取 ReadLock 或 WriteLock 的规则:
读锁 : 如果没有线程获得写锁定或请求它,则多个线程可以获取读锁定。
写锁 : 如果没有线程正在读或写,则只有一个线程可以获取写锁。
我们写一个范例演示下如何使用 ReadWriteLock
public class SynchronizedHashMapWithReadWriteLock { Map<String,String> syncHashMap = new HashMap<>(); ReadWriteLock lock = new ReentrantReadWriteLock(); //... Lock writeLock = lock.writeLock(); public void put(String key, String value) { try { writeLock.lock(); syncHashMap.put(key, value); } finally { writeLock.unlock(); } } ... public String remove(String key){ try { writeLock.lock(); return syncHashMap.remove(key); } finally { writeLock.unlock(); } } //...}
对于这两种 “写” 操作,我们需要使用写锁定来包围临界区,只有一个线程可以访问它
Lock readLock = lock.readLock();//...public String get(String key){ try { readLock.lock(); return syncHashMap.get(key); } finally { readLock.unlock(); }}public boolean containsKey(String key) { try { readLock.lock(); return syncHashMap.containsKey(key); } finally { readLock.unlock(); }}
对于这两种 “读” 操作,我们需要使用读锁定来包围临界区。如果没有正在进行的写操作,多个线程可以访问此部分。
StampedLock
StampedLock 是 Java 8 中引入的。它支持读写锁定。不同的是,锁的获取方法返回的戳记 ( stamp ) 可以用于释放锁定或检查锁
定是否仍然有效。
public class StampedLockDemo { Map<String,String> map = new HashMap<>(); private StampedLock lock = new StampedLock(); public void put(String key, String value){ long stamp = lock.writeLock(); try { map.put(key, value); } finally { lock.unlockWrite(stamp); } } public String get(String key) throws InterruptedException { long stamp = lock.readLock(); try { return map.get(key); } finally { lock.unlockRead(stamp); } }}
StampedLock 提供的另一个功能是 「 乐观锁 」 。大多数时候,读操作不需要等待写操作完成,因此不需要完全成熟的读锁。相反,
我们可以升级到读锁。
public String readWithOptimisticLock(String key) { long stamp = lock.tryOptimisticRead(); String value = map.get(key); if(!lock.validate(stamp)) { stamp = lock.readLock(); try { return map.get(key); } finally { lock.unlock(stamp); } } return value;}
使用条件
Condition 类让线程能够在执行临界区时等待某些条件发生。当线程获得对临界区的访问但没有执行其操作的必要条件时,可能
会发生这种情况。
例如,读线程可以访问共享队列的锁,该队列仍然没有任何数据可供使用。
传统上,Java 为线程互通提供了 wait()、notify() 和 notifyAll() 方法。
Condition 类有类似的机制,而且,还允许我们指定多个条件。
public class ReentrantLockWithCondition { Stack<String> stack = new Stack<>(); int CAPACITY = 5; ReentrantLock lock = new ReentrantLock(); Condition stackEmptyCondition = lock.newCondition(); Condition stackFullCondition = lock.newCondition(); public void pushToStack(String item){ try { lock.lock(); while(stack.size() == CAPACITY){ stackFullCondition.await(); } stack.push(item); stackEmptyCondition.signalAll(); } finally { lock.unlock(); } } public String popFromStack() { try { lock.lock(); while(stack.size() == 0){ stackEmptyCondition.await(); } return stack.pop(); } finally { stackFullCondition.signalAll(); lock.unlock(); } }}
-
12、Java 守护线程 ( Daemon Thread )
在这篇简短的文章中,我们将讲解下 Java 中的守护线程,看看它们可以做什么。我们还将解释守护线程和用户线程之间的区别。
守护线程和用户线程的区别
Java 提供了两种类型的线程:** 守护线程** 和 用户线程
用户线程 是高优先级线程。JVM 会在终止之前等待任何用户线程完成其任务。
守护线程 是低优先级线程。其唯一作用是为用户线程提供服务。
由于守护线程的作用是为用户线程提供服务,并且仅在用户线程运行时才需要,因此一旦所有用户线程完成执行,JVM 就会终止。
也就是说 守护线程不会阻止 JVM 退出。
这也是为什么通常存在于守护线程中的无限循环不会导致问题,因为任何代码(包括 finally 块 )都不会在所有用户线程完成执行
后执行。
这也是为什么我们并不推荐 在守护线程中执行 I/O 任务 。因为可能导致无法正确关闭资源。
但是,守护线程并不是 100% 不能阻止 JVM 退出的。守护线程中设计不良的代码可能会阻止 JVM 退出。例如,在正在运行的守
护线程上调用Thread.join() 可以阻止应用程序的关闭。
守护线程能用来做什么?
常见的做法,就是将守护线程用于后台支持任务,比如垃圾回收、释放未使用对象的内存、从缓存中删除不需要的条目。
咦,按照这个解释,那么大多数 JVM 线程都是守护线程。
如何创建守护线程 ?
守护线程也是一个线程,因此它的创建和启动其实和普通线程没什么区别?
要将普通线程设置为守护线程,方法很简单,只需要调用 Thread.setDaemon() 方法即可。
例如下面这段代码,假设我们继承 Thread 类创建了一个新类 NewThread 。那么我们就可以创建这个类的实例并设置为守护线程
NewThread daemonThread = new NewThread();daemonThread.setDaemon(true);daemonThread.start();
在Java 语言中,线程的状态是自动继承的。任何线程都会继承创建它的线程的守护程序状态。怎么理解呢?
1、 如果一个线程是普通线程(用户线程),那么它创建的子线程默认也是普通线程(用户线程);
2、 如果一个线程是守护线程,那么它创建的子线程默认也是守护线程;因此,我们可以推演出: 由于主线程是用户线程,因此在 main() 方法内创建的任何线程默认为用户线程。
需要注意的是调用 setDaemon() 方法的时机,该方法只能在创建 Thread 对象并且在启动线程前调用。在线程运行时尝试调用
setDaemon() 将抛出 IllegalThreadStateException 异常。
@Test(expected = IllegalThreadStateException.class) public void whenSetDaemonWhileRunning_thenIllegalThreadStateException() { NewThread daemonThread = new NewThread(); daemonThread.start(); daemonThread.setDaemon(true);}
如何检查一个线程是守护线程还是用户线程?
检查一个线程是否是守护线程,可以简单地调用方法 isDaemon() ,如下代码所示
@Testpublic void whenCallIsDaemon_thenCorrect() { NewThread daemonThread = new NewThread(); NewThread userThread = new NewThread(); daemonThread.setDaemon(true); daemonThread.start(); userThread.start(); assertTrue(daemonThread.isDaemon()); assertFalse(userThread.isDaemon());}
后记
守护线程的概念是不是很简单?
-
13、Java java.util.concurrent.Future
写了几篇 Java 一文秒懂 XXX 系列的文章后,对 Java 并发编程的设计思想真的是竖然起敬。
Java 在并发方面引入了 「 将来 」( Future ) 这个概念。把所有不在主线程执行的代码都附加了将来这个灵魂。主线程只负责其它
并发线程的创建、启动、监视和处理并发线程完成任务或发生异常时的回调。其它情况,则交给并发线程自己去处理。而双方之间
的沟通,就是通过一个个被称之为 「 将来 」 的类出处理。
Future 定义在 java.util.concurrent 包中,这是一个接口,自 Java 1.5 以来一直存在的接口,用于处理异步调用和处理并发编程。
创建 Future
简单地说,Future 类表示异步计算的未来结果 – 在处理完成后最终将出现在 Future 中的结果。
是不是又很难理解,文字越少,内容越多。上面这句话的意思,就是主线程会创建一个 Future 接口的对象,然后启动并发线程,
并告诉并发线程,一旦你执行完毕,就把结果存储在这个 Future 对象里。
因此,理解 Future 的第一步,就是要知道如何创建和返回 Future 实例。
一般情况下,我们会把长时间运行的逻辑放在异步线程中进行处理,这是使用 Future 接口最理想的场景。主线程只要简单的将异
步任务封装在 Future 里,然后开始等待 Future 的完成,在这段等待的时间内,可以处理一些其它逻辑,一旦 Future 执行完毕,就
可以从中获取执行的结果并进一步处理。
针对上面这种表述,我们来看看具体哪些场景可以使用 Future :
计算密集型( 数学和科学计算 )
操纵大数据结构( 大数据 )
远程方法调用(下载文件,HTML 爬取,Web 服务)
实现了 Future 的 FutureTask
我们先来看一段代码:
public class SquareCalculator { private ExecutorService executor = Executors.newSingleThreadExecutor(); public Future<Integer> calculate(Integer input) { return executor.submit(() -> { Thread.sleep(1000); return input * input; }); }}
如果你认真读过前几个章节,想必对这段代码不陌生了。
在上面这段代码中,我们创建了一个简单的类用于计算一个整型 ( Integer ) 的平方。当然了,计算平方这个任务肯定不能划到
「 长时间运行 」 这个类别里,所以我们在它之前又添加了 Thread.sleep(1000)。
不要小看 1s。这已经是相当长的任务了。
在上面这段代码中,实际执行的计算是作为 Lambda 表达式参数传递给 call() 方法。当然了,这个实际执行的代码,除了 Thread.sleep()
之外好像也没有什么特别之处。
好了,现在,我们应该将注意力转移到 Callable 和 ExecutorService 的使用,因为它们才是最有趣的。
Callable 是一个接口,用于表示一个任务,这个任务可以返回值。Callable 接口只有一个方法 call()。上面的示例中。那个 Lambda
其实就是一个 Callable 实例。
啥? 不会看不懂吧? 好吧,我找个时间好好写一些 Java Lambda 方面的文章。
Callable 实例创建完成后并不会立即执行,我们仍然需要将它传递给一个 「 执行器 」( Executor , 执行程序 ) ,这个执行器将负责
在新线程中启动该任务并返回一个包含了值的 Future 对象。
这个执行器,是 Executor 的实例,通常,它是一个 ExecutorService 类的实例。
Java 其实提供了很多方法创建 ExecutorService 的实例,但最常用的,也是最推荐的做法是使用 Executors 的静态工厂方法。上面的
示例中,我们就使用了 Executors.newSingleThreadExecutor() 方法创建了一个能够处理单个线程的 ExecutorService。
一旦我们有了一个 ExecutorService 对象,我们只需要调用它的 submit() 并传递我们的 Callable 作为参数即可。 submit() 会启动任
务并返回一个 FutureTask 对象。
FutureTask 是一个类,实现了 Future 接口, 在 java.util.concurrent 包中定义。
消费( 使用 ) Future
用了相当长的篇幅,我们终于讲完了如何创建一个 Future 实例,接下来,我们将进入如何消费(使用) 刚刚创建的 Future 实例。
使用 isDone() 和 get() 方法来获取结果
现在,是时候调用 calculate() 方法获取返回的 Future 实例了,通过 Future 实例,我们就能进一步获取计算的整型结果。
要从Future 实例中获取结果,我们需要用到两个方法:isDone() 和 get()。
1、 Future.isDone()方法用于获取我们的执行器是否已完成任务处理如果任务完成,则返回true,否则返回false;
2、 从计算中返回实际结果的方法是Future.get()但要注意的是,Future.get()方法是一个阻塞方法如果任务还没执行完毕,那么会一直阻塞直到直到任务完成,;
为了防止调用 Future.get() 方法阻塞当前线程,推荐的做法是先调用 Future.isDone() 判断任务是否完成,然后再调用 Future.get() 从
完成的任务中获取任务执行的结果。
因为Future.isDone() 和 Future.get() 的存在,我们就可以在等待任务完成时运行其它一些代码,就像下面示例中所演示的那样
Future<Integer> future = new SquareCalculator().calculate(10);while(!future.isDone()) { System.out.println("Calculating..."); Thread.sleep(300);}Integer result = future.get();
上面这段代码,我们在等待计算任务完成的同时执行了一条输出语句,用于提醒用户当前程序还是在运行的,并没有僵死。
我们使用了一个 while 循环,使用 future.isDone() 来检查任务是否完成,一旦完成,就会立即终止循环,并调用 future.get() 方法获
取计算的结果。
因为实现使用了 isDone() 判断任务是否完成,所以 future.get() 并不会发生阻塞,想法,简直就是立即返回。
使用isDone() 和 get() 方法来获取结果,这应该是消费 Future 最常见的方式。
当然了,值得一提的是,Future.get() 方法有一个可以超时等待的重载版本,这个重载版本接收两个参数,一个是超时的时间,另一个
是超时时间的单位。方法原型如下
V get(long timeout, TimeUnit unit) throws InterruptedException,ExecutionException,TimeoutException
而使用方式如下
Integer result = future.get(500, TimeUnit.MILLISECONDS);
get(long, TimeUnit) 和 get() 的不同之处,是前者在经过指定的超时时间后任务仍未返回,那么就会抛出一个 TimeoutException 异常,
表示执行超时。
使用 Future.cancel() 方法取消 Future
假设我们已经触发了一项任务,但由于某种原因,我们不再关心结果了。我们可以使用 Future.cancel(boolean) 告诉执行器停止操作
并中断其底层线程。该方法很简单,使用演示如下
Future<Integer> future = new SquareCalculator().calculate(4);boolean canceled = future.cancel(true);
上面这两行代码,我们的 Future 实例永远不会完成它的操作。实际上,如果我们尝试在调用了 cancel() 方法之后立即调用 get() 方法,
将会获得一个 CancellationException 异常。
为了防止 Future.get() 抛出一个 CancellationException 异常,我们可以使用 Future.isCancelled() 检查 Future 是否已被取消。
注意
1、 对cancel()的调用可能会失败如果调用失败,那么它会返回false;
2、 cancel()方法接受一个布尔值作为参数,该参数用于控制执行此任务的线程是否应该被中断;多线程 vs 线程池
上面的示例中,我们的 ExecutorService 实例是单线程的,因为它是使用 Executors.newSingleThreadExecutor() 方法获得的。
为了突出演示它是 「 单线程 」,我们改一下代码同时触发两个计算
SquareCalculator squareCalculator = new SquareCalculator(); Future<Integer> future1 = squareCalculator.calculate(10); Future<Integer> future2 = squareCalculator.calculate(100); while (!(future1.isDone() && future2.isDone())) { System.out.println( String.format( "future1 is %s and future2 is %s", future1.isDone() ? "done" : "not done", future2.isDone() ? "done" : "not done" ) ); Thread.sleep(300);}Integer result1 = future1.get(); Integer result2 = future2.get(); System.out.println(result1 + " and " + result2); squareCalculator.shutdown();
然后我们就会获得类似下面的输出
calculating square for: 10future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done calculating square for: 100future1 is done and future2 is not done future1 is done and future2 is not done future1 is done and future2 is not done100 and 10000
很明显,整个过程并不是并行执行的。因为第二个任务仅在第一个任务完成后才开始,所以,整个过程大约需要 2 秒钟才能完成。
为了使我们的程序真正具有多线程,我们应该使用不同风格的 ExecutorService 。例如下面这段代码,我们
使用工厂方法Executors.newFixedThreadPool() 创建一个固定大小的线程池,并观察输出的结果有何变化。
public class SquareCalculator { private ExecutorService executor = Executors.newFixedThreadPool(2); //...}
我比较懒,你把相应的代码替换下即可,省略号那段就不用替换了。
这段代码,对 SquareCalculator 类的做了一处简单的更改,使得我们的执行器拥有了 2 个同步线程。
如果我们再次运行完全相同的客户端代码,我们获得的输出可能如下
calculating square for: 10calculating square for: 100future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done100 and 10000
现在看起来心情是否愉快多了,你应该留意到了, 2 个任务是如何同时开始和结束运行的,整个过程大约需要 1 秒钟就能完成。
-
14、Java ThreadLocalRandom
随机数生成是一个非常常见的操作,而且 Java 也提供了 java.util.Random 类用于生成随机数,而且呢,这个类也是线程安全的,
就是有一点不好,在多线程下,它的性能不佳。
为什么多线程下,Random 的性能不佳?
因为,它采用了多个线程共享一个 Random 实例。这样就会导致多个线程争用。
为了解决这个问题,Java 7 引入了 java.util.concurrent.ThreadLocalRandom 类,用于在多线程环境中生成随机数。
本文接下来的部分,就来看看如何 ThreadLocalRandom 如何执行以及如何在实际应用程序中使用它。
ThreadLocalRandom Via Random
ThreadLocalRandom 是 ThreadLocal 类和 Random 类的组合,它与当前线程隔离,通过简单地避免对 Random 对象的任何并发访
问,在多线程环境中实现了更好的性能。
也就是说,相比于 java.util.Random 类全局的提供随机数生成, 使用 ThreadLocalRandom,一个线程获得的随机数不受另一个线程
的影响。
另一个与 Random 类不同的是,ThreadLocalRandom 不支持显式设置种子。因为它重写了从 Random 继承的 setSeed(long seed)
方法,会在调用时始终抛出 UnsupportedOperationException。
接下来我们看看如何使用 ThreadLocalRandom 生成随机 int、long 和 double 值。
使用 ThreadLocalRandom 生成随机数
根据Oracle 文档,我们只需要调用 ThreadLocalRandom.current() 方法,就能返回当前线程的 ThreadLocalRandom 实例。然后,
我们可以通过实例的相关方法来生成随机值。
比如下面的代码,生成一个没有任何边界的随机 int 值
int unboundedRandomValue = ThreadLocalRandom.current().nextInt());
其实是有边界的,它的边界就是 int 的边界。
接下来,我们看看如何生成有边界的随机 int 值,这意味着我们需要传递边界下限和边界上限作为参数
int boundedRandomValue = ThreadLocalRandom.current().nextInt(0, 100);
请注意,这是一个左闭右开区间,也就是说,上面的实例生成的随机数在 [0,100) 之间,包含了 0 但不包含 100。
同样的,我们可以通过调用 nextLong() 和 nextDouble() 方法生成 long 和 double 类型的随机值,调用方式与上面示例中 nextInt()
类似。
Java 8 还添加了 nextGaussian() 方法从生成器序列中生成下一个正态分布的值,其值范围在 0.0 和 1.0 之间。
与Random 方法类似,ThreadLocalRandom 也提供了 doubles() 、ints() 和 longs() 方法生成一序列流式 ( stream ) 的随机值。
使用 JMH 比较 ThreadLocalRandom 和 Random
记下来,我们看看如何在多线程环境中分别使用这两个类生成随机值,然后再使用 JMH 比较它们的性能。
首先,我们创建一个示例,其中所有线程共享一个 Random 实例。
ExecutorService executor = Executors.newWorkStealingPool(); List<Callable<Integer>> callables = new ArrayList<>(); Random random = new Random(); for (int i = 0; i < 1000; i++) { callables.add(() -> { return random.nextInt(); });}executor.invokeAll(callables);
上面的代码中,我们把使用 Random 实例生成随机值的任务提交给 ExecutorService 。
然后,我们使用 JMH 基准测试来检查上面代码的性能
# Run complete. Total time: 00:00:36Benchmark Mode Cnt Score Error UnitsThreadLocalRandomBenchMarker.randomValuesUsingRandom avgt 20 771.613 ± 222.220 us/op
接着,类似地,我们使用 ThreadLocalRandom 而不是 Random 实例
ExecutorService executor = Executors.newWorkStealingPool(); List<Callable<Integer>> callables = new ArrayList<>(); for (int i = 0; i < 1000; i++) { callables.add(() -> { return ThreadLocalRandom.current().nextInt(); });}executor.invokeAll(callables);
上面的代码,为线程池中的每个线程单独使用了一个 ThreadLocalRandom 实例。
下面是使用 JMH 对 ThreadLocalRandom 的测试结果
# Run complete. Total time: 00:00:36Benchmark Mode Cnt Score Error UnitsThreadLocalRandomBenchMarker.randomValuesUsingThreadLocalRandom avgt 20 624.911 ± 113.268 us/op
通过JMH 的测试结果中可以看出,使用 Random 生成 1000 个随机值所花费的平均时间是 772 微秒,但使用 ThreadLocalRandom
只花了 625 微秒。嗯,差距不是很大,但好歹也是有差距的,因为生成 1000 个随机数是瞬间的事情。
因此,我们可以得出结论,ThreadLocalRandom 在高度并发的环境中更有效。
-
15、Java Thread 生命周期
本文中,我想详细的讨论下 Java 中的核心概念 – 线程的生命周期。我会使用一张我自制的图片加上实用的代码片段,一步一步的
详细剖析线程的各个状态和各个状态之间如何转换。
Java 中的多线程
Java 语言中, 多线程是由 Thread 的核心概念驱动的。因为多线程中的每一个线程都相互独立,有着自己的生命周期和状态转换。
Java 线程中的生命周期
Java 中,每一个线程都是 java.lang.Thread 类的实例。而且,Java 个线程生命周期中的各个状态都定义在 Thread 类的一个静态
的 State 枚举中。
State 枚举定义了线程的所有潜在状态。总共有 6 个,分别对应者上图中的 6 个绿色背景的矩形和椭圆型。
NEW : 新创建的,且未调用 start() 方法开始执行的线程。
RUNNABLE : 已经在运行中的线程或正在等待资源分配的准备运行的线程。
BLOCKED : 等待获取进入或重新进入同步块或方法的监视器锁的线程。
WAITING : 等待其他一些线程执行特定操作,没有任何时间限制。
TIMED_WAITING: 等待某个其他线程在指定时间段内执行特定操作
TERMINATED : 线程完成了它的任务。
需要注意的是: 在任何给定的时间点,线程只能处于这些状态之一。
NEW 状态,应该很好理解,比如,车,厂家生产出来,只要还没被卖出过,那么它就是新的 ( NEW )
RUNNABLE 只要线程不出于其它状态,它就是 RUNNABLE 状态。怎么理解呢? 车买来了,只要它没坏没出什么毛病没借给
别人,那么它就出于可开状态,不管是呆在家里吃灰还是已经在上路运行。
WAITING : 无时间显示的等待其它线程完成任务时就处于这个状态,怎么理解呢?比如长假告诉公路大堵车,要等待别人前
进了几个蜗牛步我们才能往前几个蜗牛步,有时候一等就是昏天暗地,可能长达几天,也可能,一辈子吧。
TIMED_WAITING : 一直处于 WAITING 总不是办法,所以可能会设置一个超时时间,如果过了时间,就不等待了。同样的,
如果可以后退,那么我们在堵车的时候可能会等待那么十几分钟,发现确实走不了,就等了呗。
TERMINATED : 当一个线程结束了它的任务(可能完成了,也可能没完成)就会处于这个状态。如果拿车做比喻,那么当
车彻底报废,已经再也不能上路了,就处于这个状态。
其实拿车作比喻感觉有点怪,我觉得拿追女朋友来做比喻比较恰当些。
NEW 状态
NEW状态的线程(或已经创建的新线程)是已创建但尚未启动的线程。线程会一直保持这个 NEW 状态,直到在该线程上调用了
start() 方法启动它。
下面的代码,我们创建了一个 NEW 状态的线程
Runnable runnable = new NewState();Thread t = new Thread(runnable);Log.info(t.getState());
由于我们没有启动线程,因此 t.getState() 方法将打印输出
NEW
RUNNABLE 状态
当在一个 NEW 状态的线程上调用 start() 方法时,该线程的状态会从 NEW 转换为 RUNNABLE。处于该状态的线程要么是已经在
运行中,那么是在处于正在等待系统的资源分配(准备运行)。
在多线程环境中,线程调度器 ( Thread-Scheduler,它是 JVM 的一部分)会为每个线程分配固定的时间。线程并不是一直都在
执行的,调度器会把暂时空闲的线程的 CPU ( 还是在 RUNNABLE 状态 )让出来,让其它需要的线程去运行。因此它会运行一段
特定的时间,然后将控制权放弃给其他 RUNNABLE 线程。
注意: 这里的等待资源,不是等待其它线程,而是等待 CPU 排队。打个比方,新车上路。要等待的是有没有路,如果没有
路,就开不了,这是本质的问题。
例如,让我们将 t.start() 方法添加到我们之前的代码中并尝试访问其当前状态
Runnable runnable = new NewState();Thread t = new Thread(runnable);t.start();Log.info(t.getState());
此代码最有可能返回输出
RUNNABLE
为什么说是最有可能呢?如果是一个空转线程,除了 CPU 不需要其它资源,那么很大概率就是 RUNNABLE ,但如果需要其它资源,
可能会因为竞争资源而处于其它状态。还有一种情况,可能还没运行到 t.getState() ,线程任务就执行完毕了,那么也不会是
RUNNABLE 状态。
BLOCKED 状态
当一个线程当前没有资格运行时,它处于 BLOCKED 状态。如果线程在尝试访问由某个其他线程锁定的代码段时,那它会因为需要等
待获取监视器锁进入此状态。
我们使用一小段代码来重现下这个状态
public class BlockedState { public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(new DemoThreadB()); Thread t2 = new Thread(new DemoThreadB()); t1.start(); t2.start(); Thread.sleep(1000); Log.info(t2.getState()); System.exit(0); }}class DemoThreadB implements Runnable { @Override public void run() { commonResource(); } public static synchronized void commonResource() { while(true) { // Infinite loop to mimic heavy processing // 't1' won't leave this method // when 't2' try to enters this } }}
在上面这段代码中
我们创建了两个不同的线程– t1 和 t2 。
t1 启动后就进入了同步的 commonResource()方法,同步方法意味着一次只能有一个线程可以访问它。尝试访问此方法的所有其
他后续线程将被阻止进一步执行,直到当前线程完成处理。
当 t1 进入这个方法时,它保持了无限循环,这只是为了模仿繁重的处理,以便所有其他线程都无法进入此方法。
接着我们开启 t2 ,它尝试输入已经被 t1 访问的 commonResource() 方法,这时,因为 commonResource() 被 t1 锁定,所以 t2
将保持在 BLOCKED 状态
在这个状态上,当我们使用 t.getState() 时将输出
BLOCKED
WAITTING 状态
线程在等待某个其他线程执行特定操作时处于 WAITING 状态。根据 Oracle 官方文档,任何线程都可以通过调用以下三种方法中的任
何一种来进入此状态:
1、 object.wait();
2、 thread.join();
3、 LockSupport.park();请注意,我们没有为 wait() 和 join() 定义任何超时时间,因为下一节将介绍该方案。
我们以后会写一个单独的教程,详细讨论了 wait()、notify() 和 notifyAll() 的使用。
下面,我们写一段代码尝试重现这种状态
public class WaitingState implements Runnable { public static Thread t1; public static void main(String[] args) { t1 = new Thread(new WaitingState()); t1.start(); } public void run() { Thread t2 = new Thread(new DemoThreadWS()); t2.start(); try { t2.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); Log.error("Thread interrupted", e); } }}class DemoThreadWS implements Runnable { public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); Log.error("Thread interrupted", e); } Log.info(WaitingState.t1.getState()); }}
我们来讨论一下上面的代码做的事情
1、 首先,我们创建并启动了t1;
2、 其次,t1创建了t2并启动它;
3、 当t2的处理继续时,我们调用t2.join(),这使t1处于WAITING状态,直到t2完成执行;
4、 由于t1正在等待t2完成,我们从t2调用t1.getState();输出结果一般为
WAITING
请留意在哪里调用 t1.getState() 。
所以,WAITING 和 BLOCKED 两个状态的区别是什么?
BLOCKED 是因为线程竞争不到资源而处于 BLOCKED 状态。这个是被动的。因为别无选择。
WAITING 是因为线程主动等待别人完成而处于 WAITING 状态。这个是主动的。因为它可以不调用那三个方法,不用等待其它人
完成。它可以选择挥一挥衣袖,不不带走一片云彩
TIMED_WAITING 状态
线程在等待另一个线程在规定的时间内执行特定操作时处于 TIMED_WAITING 状态。根据 Java Docs 文档,有五种方法可以
将线程置于TIMED_WAITING 状态:
1、 thread.sleep(longmillis);
2、 wait(inttimeout)orwait(inttimeout,intnanos);
3、 thread.join(longmillis);
4、 LockSupport.parkNanos;
5、 LockSupport.parkUntil;下面,我们写一段代码尝试重现这种状态
public class TimedWaitingState { public static void main(String[] args) throws InterruptedException { DemoThread obj1 = new DemoThread(); Thread t1 = new Thread(obj1); t1.start(); // The following sleep will give enough time for ThreadScheduler // to start processing of thread t1 Thread.sleep(1000); Log.info(t1.getState()); }}class DemoThread implements Runnable { @Override public void run() { try { Thread.sleep(5000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); Log.error("Thread interrupted", e); } }}
整体代码和 WAITING 状态的差不多,我们创建并启动了一个线程 t1,并它进入睡眠状态,超时时间为 5 秒。
输出结果为
TIMED_WAITING
TERMINATED 状态
这是一个 「 已死 」 线程的状态。当一个线程已经完成执行或异常终止时,它处于 TERMINATED 状态。我们在 怎么关闭一个 Java
线程 ( Thread ) ? 一文中讨论过杀死线程的不同方法。
这个状态没什么好讨论的,我们写一段代码尝试重现这种状态
public class TerminatedState implements Runnable { public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(new TerminatedState()); t1.start(); // The following sleep method will give enough time for // thread t1 to complete Thread.sleep(1000); Log.info(t1.getState()); } @Override public void run() { // No processing in this block }}
上面这段代码中,我们启动线程 t1 时,下一个语句 Thread.sleep(1000) 为 t1 提供了足够的时间来完成。
因此,上面这个示例输出结果为
TERMINATED
后记
在本教程中,我们了解了 Java 中线程的生命周期。我们详细介绍了 Thread.State 枚举定义的所有七个状态,并使用一些示例来演示
他们。
虽然代码片段几乎可以在每台机器上提供相同的输出,但在某些特殊情况下,我们可能会得到一些不同的输出,因为线程调度程序的
确切行为无法确定。
所以,有任何问题,欢迎回帖咨询
-
16、Java 之 Runnable 还是 Thread ?
写Java 代码的时候,我们经常会有这样的疑问:我到底是实现一个 Runnable 呢,还是扩展一个 Thread 类?
你的答案是什么呢? 那有没有标准答案呢?
答案是什么呢?
我们先来分析下,看看哪种方法在实践中更有意义以及为什么?
扩展一个线程 ( Thread 类 )
简单起见,我们就来定义一个扩展自 Thread 的 SimpleThread 类
public class SimpleThread extends Thread { private String message; // standard logger, constructor @Override public void run() { log.info(message); }}
代码也真是简单了,然后我们看看如何运行这个 SimpleThread 类
@Testpublic void givenAThread_whenRunIt_thenResult() throws Exception { Thread thread = new SimpleThread( "SimpleThread executed using Thread"); thread.start(); thread.join();}
我们也可以把这个 SimpleThread 放到前面章节 一文秒懂 Java ExecutorService 中提到的 ExecutorService 中运行。
@Testpublic void givenAThread_whenSubmitToES_thenResult() throws Exception { executorService.submit(new SimpleThread( "SimpleThread executed using ExecutorService")).get();}
看起来感觉是不是有点复杂,我们只想在单独的线程中运行单个日志操作而已,使用 Thread 的方式看起来有点复杂化了,要么是
start() 和 join() ,要么是 ExecutorService。
当然,这不是最糟糕的,更糟糕的是,SimpleThread 再也不能扩展任何其它类,因为 Java 不支持多重继承。
实现 ( implements) 一个 Runnable
同样的简单起见,我们创建一个实现了 java.lang.Runnable 接口的简单任务。
class SimpleRunnable implements Runnable { private String message; // standard logger, constructor @Override public void run() { log.info(message); }}
这段代码是不是和上面的 SimpleThread 很相似?
因为这个 SimpleRunnable 只是一个任务,一个在一个单独的线程中运行的任务。
为了运行这个任务,有多种方式可供选择,其中之一,就是使用一个 Thread 类。
@Testpublic void givenRunnable_whenRunIt_thenResult() throws Exception { Thread thread = new Thread(new SimpleRunnable( "SimpleRunnable executed using Thread")); thread.start(); thread.join();}
同样的,还可以使用 ExecutorService:
@Testpublic void givenARunnable_whenSubmitToES_thenResult() throws Exception { executorService.submit(new SimpleRunnable( "SimpleRunnable executed using ExecutorService")).get();}
看到这里,你是不是很疑惑?Runnable 和继承一个 Thread 没有什么区别啊 ?同样多的代码,同样多的步骤。
别急,哈哈,重点来了。
由于我们的 SimpleRunnable 实现了一个接口,因此,如果需要,我们可以自由扩展自另一个基类。
更简单的是,一个几行代码的 Runnable 还可以写成一个简单的 Lambda 表达式
@Testpublic void givenARunnableLambda_whenSubmitToES_thenResult() throws Exception { executorService.submit( () -> log.info("Lambda runnable executed!"));}
这才是Runnable 的杀手锏。真的是简单的不要太多。
Runnable or Thread?
看到这里,你想要的是 Runnable 还是 Thread ?
看我上文的描述,肯定是倾向使用 Runnable 多过 Thread:
在扩展 Thread 类时,我们并没有被要求覆盖它的任何方法。相反,我们需要覆盖 Runnable 的 run() 方法( Thread 类已经实现了 )。这显然违反了 IS-A Thread 原则。
我们可以创建一个 Runnable 的实现并将其传递给 Thread 类。这利用的是组合而不是继承。这更灵活。
在扩展了 Thread 类之后,我们无法扩展任何其他类。
从 Java 8 开始,Runnables 可以重写为 lambda 表达式。
-
17、Java wait() 和 notify() 方法
大家有没有发现,其实 「 一文秒懂 」 系列讲述的都是多线程并发开发的问题。这个话题太大了,估计没有上百篇文章都解释不清楚。
本文,我们来讲解下 Java 并发中的基础的基础,核心的核心,Java 并发编程中的最基本的机制之一 – 「 线程同步 」
为了方便你理解并发编程中的各种概念和术语,我们首先会来一阵扫盲,讨论一些基本的并发相关术语和方法。接着,我们将开发一
个简单的应用程序,并在合格应用程序里处理并发问题,以方便大家理解和巩固 wait() 和 notify()。
Java 中的线程同步 ( Thread Synchronization )
在并发编程中,在多线程环境下,多个线程可能会尝试修改同一资源。如果线程管理不当,这显然会导致一致性问题。
Java 中的哨兵块 ( guarded block )
Java 中,可以用来协调多个线程操作的一个工具是 「 哨兵块 」。这个哨兵块会在恢复执行前检查特定条件。
基于这种哨兵检查的思想,Java 在所有类的基类 Object 中提供了两个方法
方法 说明 Object.wait()
暂停一个线程 Object.notify()
唤醒一个线程 是不是有点难以理解,别担心,看下面这个图,这个图描绘了线程的的生命周期。
wait() 方法
对照上图,简单的说,当我们调用 wait() 时会强制当前线程等待,直到某个其它线程在同一个对象上调用 notify() 或 notifyAll() 方法。
因此,当前线程必须拥有对象的监视器。根据 Java docs 的说法,这可能发生在
我们已经为给定对象执行了同步实例方法
我们已经在给定对象上执行了 synchronized 块的主体
通过为 Class 类型的对象执行同步静态方法
请注意,一次只有一个活动线程可以拥有对象的监视器。
除了无参数 wait() 方法外,Java 还重载了另一个 wait() 方法
wait() 方法
wait() 方法导致当前线程无限期地等待,直到另一个线程调用此对象的 notify() 或 notifyAll() 方法
wait(long timeout) 方法
使用此方法,我们可以指定一个超时,在此之后将自动唤醒线程。
当然了,我们可以在到达超时之前使用 notify() 或 notifyAll() 提前唤醒线程。
请注意,调用 wait(0) 与调用 wait() 相同
wait(long timeout, int nanos)
这是与wait(long timeout) 提供相同功能的签名,唯一的区别是我们可以提供更高的精度。
该方法计算超时之间的方式为:
总超时时间(以纳秒为单位)= 1_000_000 * 超时 + nanos
notify() 或 notifyAll() 方法
notify() 和 notifyAll() 方法用于唤醒等待访问此对象监视器的线程。
它们以不同的方式通知等待线程。
notify() 方法
对于在此对象的监视器上等待的所有线程(通过使用任何一个重载 wait() 方法 ),notify() 通知将会随机唤醒任何一个线程。
也就是说,我们并不能确切知道唤醒了哪个线程,这取决于实现。
因为notify() 提供了唤醒一个随机线程的机制,因此它可用于实现线程执行类似任务的互斥锁定。
但在大多数情况下,使用 notifyAll() 会是一个更可行的方案。
notifyAll() 方法
notifyAll() 方法用于唤醒正在此对象的监视器上等待的所有线程。唤醒的线程将以常规的方式完成 – 就像任何其他线程一样。
但,有一点要注意的是,对于任意一个线程,但在我们允许其继续执行之前,请始终快速检查继续执行该线程所需的条件。因为在
某些情况下线程被唤醒而没有收到通知(这个场景将在后面的例子中讨论 )
发送者 – 接收者同步问题
线程同步的问题,我们已经有了个大概的了解,接下来,我们看一个简单的 Sender-Receiver ( 发送者 – 接收者 ) 应用程序,这个应
用程序将利用wait() 和 notify() 方法建立它们之间的同步。
发送者应该向接收者发送数据包
在发送方完成发送之前,接收方无法处理数据包
同样,发送方不得尝试发送另一个数据包,除非接收方已处理过上一个数据包
我们首先创建一个 Data 类,用于包含将从 Sender 发送到 Receiver 的数据包,同时,我们将使用 wait() 和 notifyAll() 来设置它们之
间的同步。
public class Data { private String packet; // True if receiver should wait // False if sender should wait private boolean transfer = true; public synchronized void send(String packet) { while (!transfer) { try { wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); Log.error("Thread interrupted", e); } } transfer = false; this.packet = packet; notifyAll(); } public synchronized String receive() { while (transfer) { try { wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); Log.error("Thread interrupted", e); } } transfer = true; notifyAll(); return packet; }}
范例有点小长,我们一步一步分析下代码
1、 私有属性packet用于表示通过网络传输的数据;
2、 布尔类型的私有属性transfer用于Sender和Receiver之间的同步;* 如果此变量为 true,则 Receiver 应等待 Sender 发送消息 * 如果它是 false ,那么 Sender 应该等待 Receiver 接收消息
3、 Sender使用send()方法将数据发送给Receiver:
* 如果 transfer 为 false ,我们将在此线程上调用 wait() * 但如果它为 true ,我们需要切换状态,设置我们的消息并调用 notifyAll() 来唤醒其他线程以指定发生了重大事件, 然后这些线程它们自己可以自查是否可以继续执行。
4、 同样的,Receiver将使用receive()方法接收数据;
* 如果 Sender 将传输设置为 false,那么继续,否则将在此线程上调用 wait() * 满足条件时,我们切换状态,通知所有等待的线程唤醒并返回 Receiver 的数据包
为什么在 while 循环中包含 wait()
由于notify() 和 notifyAll() 随机唤醒正在此对象监视器上等待的线程,因此满足条件并不总是很重要。有时可能会发生线程被唤醒,
但实际上并没有满足条件。
当然了,跟进一步说,我们还可以定义一个检查来避免虚假唤醒 – 线程可以从等待中醒来而不会收到通知。
我们为什么需要同步 send() 和 receive() 方法
我们将这些方法放在 synchronized 方法是为了提供内部锁。
如果调用 wait() 方法的线程不拥有固有锁,则会抛出错误。
现在,是时候创建 Sender 和 Receiver 并在两者上实现 Runnable 接口,以便它们的实例可以由线程执行。
我们先来看看 Sender 将如何工作
public class Sender implements Runnable { private Data data; // standard constructors public void run() { String packets[] = { "First packet", "Second packet", "Third packet", "Fourth packet", "End" }; for (String packet : packets) { data.send(packet); // Thread.sleep() to mimic heavy server-side processing try { Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); Log.error("Thread interrupted", e); } } }}
对于这个 Sender :
我们正在创建一些随机数据包,这些数据包将通过网络以 packet[] 数组的形式发送
对于每个数据包,我们只是调用 send() 而不做其它动作
然后我们用随机时间间隔调用 Thread.sleep() 来模仿繁重的服务器端处理
接下来,我们来看看如何实现 Receiver
public class Receiver implements Runnable { private Data load; // standard constructors public void run() { for(String receivedMessage = load.receive(); !"End".equals(receivedMessage); receivedMessage = load.receive()) { System.out.println(receivedMessage); // ... try { Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); Log.error("Thread interrupted", e); } } }}
上面这段代码很简单,只是在循环中调用 load.receive() ,直到我们得到最后一个 “End” 数据包。
最后,我们就可以写一个 main() 方法来运行它们了
public static void main(String[] args) { Data data = new Data(); Thread sender = new Thread(new Sender(data)); Thread receiver = new Thread(new Receiver(data)); sender.start(); receiver.start();}
运行范例,输出结果如下
First packetSecond packetThird packetFourth packet
完美!
我们在这里 – 我们以正确的顺序接收所有数据包,并成功建立了发送方和接收方之间的正确通信。
关闭