[版权申明] 非商业目的注明出处可自由转载
出自:shusheng007
概述
每个Java程序员都有一颗搞高并发的心,所以线程池几乎也是面试必考题。讲线程池的文章网上也特别多特别好,所以本文只是聊一下如何在SpringBoot中使用线程池。
异步初探
在SpringBoot中简单使用异步编程非常简单,只需要两步
- 使用
@EnableAsync
开启异步支持
@EnableAsync
@Configuration
public class ConcurrencyConfig {
...
}
- 使用
@Async
注解相关方法
@Async
public void runAsync(Integer id){
...
}
注意,使用@Async
标记的方法必须是public
的,而且返回值必须是void
或者Future
。
so easy,有没有不?面试要是这么回答差不多也该回家等消息了。对于稍微有些并发并发量的服务就需要自定义线程池,而不使用Spring默认的SimpleAsyncTaskExecutor
,因为其不够灵活。
线程池
线程池相对来说还是比较复杂的,下面是其类图。其中以ThreadPoolExecutor
最为重要,面试也基本考这个。
ThreadPoolExecutor
既然是线程池就会存在各种配置,下面是ThreadPoolExecutor最复查的一个构造函数
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
我们一般会使用下面这个重载版本。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
对于每个参数的含义注释已经写的很清楚了,只是没有实践过的话理解可能不到位。所以一会我们会结合实际来看一下,下面我们简单的解释一下这些配置参数。
- corePoolSize
线程池中核心线程数目,会一直驻留在线程池中(除非设置allowCoreThreadTimeOut为true,默认为false)。
- maximumPoolSize
整个线程池允许创建的最大线程数,这个数目包含核心线程数。例如其设置为5,corePoolSize设置为3,那么最多可以再创建2个线程。
- workQueue:
当新任务到来时,如果没有闲着的核心线程,任务首先会被存放在队列中。
- keepAliveTime:
那些闲着的非核心线程的存活时间
-
unit
keepAliveTime 参数的时间单位。
-
handler
饱和策略,当线程池没有能力再接收新任务时的处理策略,平台为我们预定义了4种
AbortPolicy:直接抛RejectedExecutionException
异常,告知程序线程池已经满负荷了,无法接收新任务
CallerRunsPolicy:让调用线程池的那个线程执行新任务。其实就是因为线程池满负荷了没法执行,它自己把任务执行了。
DiscardOldestPolicy:将任务队列队首第一个任务给丢弃掉,腾出个位置给新任务。
DiscardPolicy:默默的把新任务扔了,连个水花都没有...
从上面的解释我们可以得出,一个线程池最大负荷为 maximumPoolSize + workQueue
个任务。
前三个参数最为重要,配置的时候需要一定的考量,需要根据自己的业务和执行环境来调节。下面是广为流传的配置线程池最大线程个数的一个公式,但是这个只做参考,具体还是要根据自己的实际情况来调节
- CPU 密集型任务(N+1)
系统大部分时间都在占用CPU 资源,例如内存排序,计算公式等工作,可以将最大线程数设置为 CPU 核心数+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
- I/O 密集型任务(2N)
系统大部分时间都在处理I/O交互,例如读取网络文件等工作,而此时是不占用CPU处理时间的。所以我们将线程池的最大线程配置为CUP核数的2倍
ThreadPoolTaskExecutor
上面我们了解了一ThreadPoolExecutor
,它是java提供的类。Spring提供了一个它的包装类ThreadPoolTaskExecutor
,使得其更容易在spring中使用。我们在Spring程序中一般使用这个类,各个参数含义与ThreadPoolExecutor几乎一样。
了解了线程池的一些概念,让我们来完成配置自定义线程池的任务吧。
- 在配置文件中申明一个TaskExecutor类型的Bean
@EnableAsync
@Configuration
public class ConcurrencyConfig {
@Bean
public TaskExecutor threadPoolExecutorCpu(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(3);
executor.setQueueCapacity(2);
executor.setKeepAliveSeconds(1);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setThreadNamePrefix("task-thread-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.initialize();
return executor;
}
}
其中大部分参数我们已经在上面讲过了,我这里设置核心线程数目为2,最大线程数目为3,任务队列容量为2,非核心线程闲暇时存活时间为1秒,线程前缀为"task-thread-",无法接收新任务时的策略为AbortPolicy。
- 将线程池配置给
@Async
,如果只有一个线程池的话是可选的
@Async("threadPoolExecutorCpu")
public void runAsync(Integer id){
log.info("start:{},num:{}",Thread.currentThread().getId(),id);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("end:{},num:{}",Thread.currentThread().getId(),id);
}
验证线程池配置
我们来实际验证一下,加深印象。
下面是一个controller方法,使用postman调用时传入一个count参数来可以产生count
个线程调用,模拟并发,每个线程启动时间隔200毫秒,这样线程就有了顺序。
@GetMapping("/run-async")
public String runAsync(@RequestParam("count") Integer count) {
List<Integer> collect = IntStream.rangeClosed(1, count).boxed().collect(Collectors.toList());
for (int i : collect) {
new Thread(() -> concurrencyService.runAsync(i)).start();
try {
Thread.sleep(200);
} catch (InterruptedException e) {
log.error("error", e);
}
}
return "ok";
}
拒绝策略为AbortPolicy
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
- 5个并发输出:
2023-03-07 16:59:36.944 INFO 17512 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : start:42,num:1
2023-03-07 16:59:37.144 INFO 17512 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : start:44,num:2
2023-03-07 16:59:37.819 INFO 17512 --- [ task-thread-3] t.s.c.concurrency.ConcurrencyService : start:48,num:5
2023-03-07 16:59:39.975 INFO 17512 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : end:42,num:1
2023-03-07 16:59:39.976 INFO 17512 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : start:42,num:3
2023-03-07 16:59:40.158 INFO 17512 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : end:44,num:2
2023-03-07 16:59:40.158 INFO 17512 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : start:44,num:4
2023-03-07 16:59:40.820 INFO 17512 --- [ task-thread-3] t.s.c.concurrency.ConcurrencyService : end:48,num:5
2023-03-07 16:59:42.988 INFO 17512 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : end:42,num:3
2023-03-07 16:59:43.169 INFO 17512 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : end:44,num:4
来分析一下结果:
第一:线程都是以task-thread
开头的,所以都是线程池的线程
第二:
核心线程42执行任务1,核心线程44执行任务2,两个核心线程都被占用。
任务3和4进入队列,因为队列容量为2,所以队列满了,因为最大线程数为3,于是启动一个新线程48执行任务5。
线程42完成任务1,然后从队列头部将任务3取出执行。
线程44完成任务2,然后从队列头部将任务4取出执行。
线程48完成任务5
线程42完成任务3
线程44完成任务4
执行结果和我们预想的一样,且这个线程池最多可以同时执行5个任务,再多就要触发饱和策略了。
- 6个并发输出:
2023-03-07 17:16:31.478 INFO 18636 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : start:42,num:1
2023-03-07 17:16:31.683 INFO 18636 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : start:44,num:2
2023-03-07 17:16:32.313 INFO 18636 --- [ task-thread-3] t.s.c.concurrency.ConcurrencyService : start:48,num:5
Exception in thread "Thread-12" org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@68e7e518[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$743/0x00000008004c3840@29daef23
...
2023-03-07 17:16:34.491 INFO 18636 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : end:42,num:1
2023-03-07 17:16:34.491 INFO 18636 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : start:42,num:3
2023-03-07 17:16:34.689 INFO 18636 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : end:44,num:2
2023-03-07 17:16:34.690 INFO 18636 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : start:44,num:4
2023-03-07 17:16:35.324 INFO 18636 --- [ task-thread-3] t.s.c.concurrency.ConcurrencyService : end:48,num:5
2023-03-07 17:16:37.503 INFO 18636 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : end:42,num:3
2023-03-07 17:16:37.700 INFO 18636 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : end:44,num:4
从输出结果可以看到,任务6抛了RejectedExecutionException异常。
拒绝策略为DiscardOldestPolicy
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
6个并发输出:
2023-03-07 17:39:59.405 INFO 3344 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : start:56,num:1
2023-03-07 17:39:59.600 INFO 3344 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : start:58,num:2
2023-03-07 17:40:00.214 INFO 3344 --- [ task-thread-3] t.s.c.concurrency.ConcurrencyService : start:62,num:5
2023-03-07 17:40:02.414 INFO 3344 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : end:56,num:1
2023-03-07 17:40:02.414 INFO 3344 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : start:56,num:4
2023-03-07 17:40:02.610 INFO 3344 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : end:58,num:2
2023-03-07 17:40:02.611 INFO 3344 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : start:58,num:6
2023-03-07 17:40:03.226 INFO 3344 --- [ task-thread-3] t.s.c.concurrency.ConcurrencyService : end:62,num:5
2023-03-07 17:40:05.421 INFO 3344 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : end:56,num:4
2023-03-07 17:40:05.616 INFO 3344 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : end:58,num:6
从上面的输出可以发现,任务3没有被执行。因为任务3最先入队,所以当任务6来的时候饱和按照策略将其删除了。
拒绝策略为CallerRunsPolicy
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
6个并发输出:
2023-03-07 17:40:58.578 INFO 19116 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : start:53,num:1
2023-03-07 17:40:58.739 INFO 19116 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : start:55,num:2
2023-03-07 17:40:59.366 INFO 19116 --- [ task-thread-3] t.s.c.concurrency.ConcurrencyService : start:59,num:5
2023-03-07 17:40:59.580 INFO 19116 --- [ Thread-12] t.s.c.concurrency.ConcurrencyService : start:60,num:6
2023-03-07 17:41:01.587 INFO 19116 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : end:53,num:1
2023-03-07 17:41:01.587 INFO 19116 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : start:53,num:3
2023-03-07 17:41:01.753 INFO 19116 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : end:55,num:2
2023-03-07 17:41:01.753 INFO 19116 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : start:55,num:4
2023-03-07 17:41:02.375 INFO 19116 --- [ task-thread-3] t.s.c.concurrency.ConcurrencyService : end:59,num:5
2023-03-07 17:41:02.587 INFO 19116 --- [ Thread-12] t.s.c.concurrency.ConcurrencyService : end:60,num:6
2023-03-07 17:41:04.589 INFO 19116 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : end:53,num:3
2023-03-07 17:41:04.756 INFO 19116 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : end:55,num:4
从输出可以看到,任务6运行在线程 Thread-12上,这个不是线程池的线程,线程池的线程都是以task-thread开头的。因为线程池的并发是5,所以第6个并发任务按照饱和策略就在调用线程执行了。
拒绝策略为DiscardPolicy
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
6个并发输出:
2023-03-07 16:59:36.944 INFO 17512 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : start:42,num:1
2023-03-07 16:59:37.144 INFO 17512 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : start:44,num:2
2023-03-07 16:59:37.819 INFO 17512 --- [ task-thread-3] t.s.c.concurrency.ConcurrencyService : start:48,num:5
2023-03-07 16:59:39.975 INFO 17512 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : end:42,num:1
2023-03-07 16:59:39.976 INFO 17512 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : start:42,num:3
2023-03-07 16:59:40.158 INFO 17512 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : end:44,num:2
2023-03-07 16:59:40.158 INFO 17512 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : start:44,num:4
2023-03-07 16:59:40.820 INFO 17512 --- [ task-thread-3] t.s.c.concurrency.ConcurrencyService : end:48,num:5
2023-03-07 16:59:42.988 INFO 17512 --- [ task-thread-1] t.s.c.concurrency.ConcurrencyService : end:42,num:3
2023-03-07 16:59:43.169 INFO 17512 --- [ task-thread-2] t.s.c.concurrency.ConcurrencyService : end:44,num:4
第6个任务被默默的拒绝了,没有被执行。
@Async 使用
我们知道,Spring 使用动态代理来使@Async
其作用,所以要求其修饰的方法必须为public
级别,且不能在同一个类调用。其修饰的方法返回值必须是void
或者Future
。所以在必要的时候,我们可以返回CompletableFuture
,然后使用其强大的功能完成异步工作。
@Async
public CompletableFuture<String> getFirstName() {
log.info("start get first name");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return CompletableFuture.completedFuture("shusheng");
}
@Async
public CompletableFuture<String> getLastName() {
log.info("start get last name");
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return CompletableFuture.completedFuture("007");
}
CompletableFuture
是处理异步编程非常强大的工具,我们应该在合适的时机优先使用。
总结
当前由于框架的广泛使用,程序员并发编程的机会其实没有那么多,但是掌握其知识却是基本功,所以努力吧,少年。。。
源码
一如既往,你可以从GitHub上找到本文的源码:spring-learn
文章评论
大佬多多更新,我爱看