# Java拾遗-并发-ThreadPool
一个个小例子深入了解知识点
代码地址
# 12. ThreadPool
ThreadPool 的例子加学习
# 12.1. Executors
/**
* 线程池 - Executor接口,ExecutorService接口,Executors类
*
* Executors类里默认提供了一些线程池,不过都不推荐使用,推荐自定义
*
* isShutDown: 当调用shutdown()或shutdownNow()方法后返回为true
* isTerminated: 当调用shutdown()方法后,并且所有提交的任务完成后返回为true
*
* newSingleThreadExecutor和newFixedThreadPool: 默认提供的线程池使用的任务队列是LinkedBlockingQueue,
* 构造方法默认大小是Integer.MAX_VALUE,堆积的请求处理队列可能会耗费非常大的内存
*
* newCachedThreadPool: 核心线程数是0,最大线程数是Integer.MAX_VALUE,可能会创建数量非常多的线程,
* 任务队列使用的SynchronousQueue
*
* newScheduledThreadPool: 支持定时以及周期性执行任务,核心线程数需要指定,
* 最大线程数是Integer.MAX_VALUE,可能会创建数量非常多的线程,任务队列使用的DelayedWorkQueue
*
* SingleThreadPool和FixedThreadPool允许的请求队列长度为Integer.MAX_VALUE,
* 可能会堆积大量的请求,从而导致OOM
*
* CachedThreadPool允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/26 17:17
*/
public class T12_ThreadPool_1_Executors {
public static class CustomExecutor implements Executor {
@Override
public void execute(Runnable command) {
// new Thread(command).run();
command.run();
}
}
/**
* Executor接口和ExecutorService接口
*
* @param
* @return void
* @throws
* @author wliduo[i@dolyw.com]
* @date 2020/4/27 17:14
*/
public static void executorAndExecutorService() {
// Executor接口
new CustomExecutor().execute(() -> System.out.println("Hello Executor"));
// 官方默认提供的线程池newFixedThreadPool
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 线程池启动6个线程
for (int i = 0; i < 6; i++) {
executorService.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(555);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println(executorService);
// 停止线程池
executorService.shutdown();
System.out.println(executorService.isTerminated());
System.out.println(executorService.isShutdown());
System.out.println(executorService);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(executorService.isTerminated());
System.out.println(executorService.isShutdown());
System.out.println(executorService);
}
/**
* 单线程的线程池newSingleThreadExecutor
* 任务队列LinkedBlockingQueue不赋值,构造方法默认大小Integer.MAX_VALUE
* 无法自定义拒绝策略
* 堆积的请求处理队列可能会耗费非常大的内存
*
* @param
* @return void
* @throws
* @author wliduo[i@dolyw.com]
* @date 2020/4/27 17:04
*/
public static void newSingleThreadExecutor() {
System.out.println("-----newSingleThreadExecutor-----");
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int j = i;
singleThreadExecutor.execute(() -> {
System.out.println(j + " " + Thread.currentThread().getName());
});
}
singleThreadExecutor.shutdown();
}
/**
* 固定线程池newFixedThreadPool
* 核心线程数和最大线程数必须赋值,而且一样
* 任务队列LinkedBlockingQueue不赋值,构造方法默认大小Integer.MAX_VALUE
* 无法自定义拒绝策略
* 堆积的请求处理队列可能会耗费非常大的内存
*
* @param
* @return void
* @throws
* @author wliduo[i@dolyw.com]
* @date 2020/4/27 17:04
*/
public static void newFixedThreadPool() {
System.out.println("-----newFixedThreadPool-----");
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 5; i++) {
final int j = i;
fixedThreadPool.execute(() -> {
System.out.println(j + " " + Thread.currentThread().getName());
});
}
fixedThreadPool.shutdown();
}
/**
* 缓存线程池newCachedThreadPool
* 核心线程数是0,最大线程数是Integer.MAX_VALUE,可能会创建数量非常多的线程
* 任务队列使用的SynchronousQueue
* 无法自定义拒绝策略
*
* @param
* @return void
* @throws
* @author wliduo[i@dolyw.com]
* @date 2020/4/27 17:04
*/
public static void newCachedThreadPool() {
System.out.println("-----newCachedThreadPool-----");
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
System.out.println(cachedThreadPool);
for (int i = 0; i < 2; i++) {
cachedThreadPool.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(499);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println(cachedThreadPool);
try {
TimeUnit.SECONDS.sleep(80);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(cachedThreadPool);
}
/**
* newScheduledThreadPool线程池支持定时以及周期性执行任务
* 核心线程数需要指定,最大线程数是Integer.MAX_VALUE,可能会创建数量非常多的线程
* 任务队列使用的DelayedWorkQueue
* 无法自定义拒绝策略
*
* @param
* @return void
* @throws
* @author wliduo[i@dolyw.com]
* @date 2020/4/27 18:06
*/
public static void newScheduledThreadPool() {
System.out.println("-----newScheduledThreadPool-----");
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
scheduledThreadPool.scheduleAtFixedRate(() -> {
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}, 0, 500, TimeUnit.MILLISECONDS);
}
public static void main(String[] args) {
// executorAndExecutorService();
// newSingleThreadExecutor();
// newCachedThreadPool();
newFixedThreadPool();
// newScheduledThreadPool();
}
}
# 12.2. ThreadPoolExecutor
/**
* 自定义线程池 - ThreadPoolExecutor
*
* 自定义线程池很重要,如果线程池中的线程数量过多,就会竞争稀缺的处理器和内存资源,浪费大量时间在上下文切换,
* 反之,线程数量过少,处理器的一个核就无法充分利用到(Java并发编程实战)
*
* N-Thread(线程数) = N-CPU * U-CPU * (1 + W/C)
*
* N-CPU: 处理器核的数目,可以通过Runtime.getRuntime().availableProcessors()获取
* U-CPU: 期望的CPU利用率,0-1之前,CPU稳定在百分之多少,一般不可能为1(百分之百)
* W/C: 等待时间与计算时间的比率
*
* 线程池6个参数
* int corePoolSize: 核心线程数
* int maximumPoolSize: 最大线程数
* long keepAliveTime: 空闲线程存活时间
* TimeUnit unit: 存活时间单位
* BlockingQueue<Runnable> workQueue: 任务队列
* ThreadFactory threadFactory: 线程工厂
* RejectedExecutionHandler handler: 拒绝策略
*
* 1. Running的线程小于corePoolSize,直接创建新的线程在Pool执行
* 2. Running的线程等于corePoolSize,则任务加入工作队列
* 3. Running的线程等于corePoolSize,工作队列已满,则加入大于corePoolSize小于maximumPoolSize线程
* 4. 全部满,执行拒绝策略
*
* 核心线程数: 线程池中会维护一个最小的线程数量,即使这些线程是空闲状态,他们也不会被销毁,
* 除非设置了allowCoreThreadTimeOut
*
* 空闲线程存活时间: 这个只作用于核心线程之外的线程,除非设置了allowCoreThreadTimeOut
*
* 默认提供线程工厂
* Executors.defaultThreadFactory()
* Executors.privilegedThreadFactory()
*
* 默认提供拒绝策略
* new ThreadPoolExecutor.AbortPolicy(): 直接丢弃任务,并抛出RejectedExecutionException异常
* new ThreadPoolExecutor.DiscardPolicy(): 直接丢弃任务,什么都不做
* new ThreadPoolExecutor.DiscardOldestPolicy(): 抛弃队列最早的那个任务,然后尝试把这次拒绝的任务放入队列
* new ThreadPoolExecutor.CallerRunsPolicy(): 不在新线程中执行任务,而是由调用者所在的线程来执行,
* 除非线程池已经isShutDown,则直接抛弃任务
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/26 17:17
*/
public class T12_ThreadPool_2_ThreadPoolExecutor {
public static class Task implements Runnable {
private String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Task " + name);
try {
// 每个线程阻塞
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Task{" + "no=" + name + '}';
}
}
/**
* 自定义拒绝策略
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/27 18:18
*/
public static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// log("r rejected")
// save r kafka mysql redis
// try 3 times
if(executor.getQueue().size() < 10000) {
// try put again();
}
}
}
public static void main(String[] args) {
// 创建线程池
/*ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4),
Executors.privilegedThreadFactory(),
new CustomRejectedExecutionHandler());
// 启动8个任务
for (Integer i = 0; i < 8; i++) {
threadPoolExecutor.execute(new Task(i.toString()));
}
// 输出当前线程池任务队列
System.out.println(threadPoolExecutor.getQueue());
// 再启动一个任务,超过了最大核心线程数+队列数(4+4=8),第9个任务将执行拒绝策略
threadPoolExecutor.execute(new Task("M"));
// 关闭线程池
threadPoolExecutor.shutdown();
}
}
# 12.3. Callable_Future
/**
* 带返回值的异步线程 - Callable
*
* 使用submit提交Future进行接收返回值,future.get()阻塞获取返回结果
*
* future.get()还可以设置阻塞时间,超过了直接抛出TimeoutException异常
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/26 17:17
*/
public class T12_ThreadPool_3_Callable_Future {
public static class Task implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "Hello Callable";
}
}
public static void main(String[] args) {
// 创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
// 异步提交任务
Future<String> future = threadPoolExecutor.submit(new Task());
System.out.println("Start");
try {
// 阻塞
// System.out.println(future.get());
// 阻塞多久超时
System.out.println(future.get(500, TimeUnit.MILLISECONDS));
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("End");
// 关闭线程池
threadPoolExecutor.shutdown();
}
}
# 12.4. FutureTask
/**
* 带返回值的异步线程 - FutureTask - 继承Runnable, Future
* 使用FutureTask声明结果值,可以直接用execute()执行
*
* 还有一种使用,Guava提供了FutureCallback接口,可以在成功或失败时回调处理,但是代码不太好维护
* 还不如直接拿到结果过进行处理就是
* https://github.com/bjmashibing/JUC/blob/master/src/main/java/com/mashibing/juc/c_027_future_to_loom/T02_ListenableFuture.java
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/26 17:17
*/
public class T12_ThreadPool_4_FutureTask {
public static class Task implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "Hello Callable";
}
}
public static void main(String[] args) {
// 线程执行
FutureTask<String> futureTaskTemp = new FutureTask<String>(() -> {
return "Hello FutureTask";
});
new Thread(futureTaskTemp).start();
try {
System.out.println(futureTaskTemp.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// 创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
// FutureTask声明任务
FutureTask<String> futureTask = new FutureTask<>(new Task());
// 异步提交任务execute也行
threadPoolExecutor.execute(futureTask);
System.out.println("Start");
try {
// 阻塞
System.out.println(futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("End");
// 关闭线程池
threadPoolExecutor.shutdown();
}
}
# 12.5. CompletableFuture
/**
* CompletableFuture - 线程异步结果汇总操作
*
* 下面代码,priceOfTM(),priceOfTB(),priceOfJD()三个方法代码去不同地方查询价格
* delay()睡眠500ms
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/26 17:17
*/
public class T12_ThreadPool_5_CompletableFuture {
private static double priceOfTM() {
delay();
return 1.00;
}
private static double priceOfTB() {
delay();
return 2.00;
}
private static double priceOfJD() {
delay();
return 3.00;
}
/*private static double priceOfAmazon() {
delay();
throw new RuntimeException("product not exist!");
}*/
private static void delay() {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
long start, end;
// 按顺序执行
start = System.currentTimeMillis();
priceOfTM();
priceOfTB();
priceOfJD();
end = System.currentTimeMillis();
System.out.println("use serial method call! " + (end - start));
// 线程异步执行
start = System.currentTimeMillis();
CompletableFuture<Double> futureTM = CompletableFuture.supplyAsync(() -> priceOfTM());
CompletableFuture<Double> futureTB = CompletableFuture.supplyAsync(() -> priceOfTB());
CompletableFuture<Double> futureJD = CompletableFuture.supplyAsync(() -> priceOfJD());
// join等三个任务执行完成
CompletableFuture.allOf(futureTM, futureTB, futureJD).join();
// 做一些操作
/*CompletableFuture.supplyAsync(() -> priceOfTM())
.thenApply(String::valueOf)
.thenApply(str -> "price " + str)
.thenAccept(System.out::println);*/
end = System.currentTimeMillis();
System.out.println("use completable future! " + (end - start));
try {
// Main程序不停止
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
# 12.6. WorkStealingPool
/**
* WorkStealingPool - JDK1.8新增newWorkStealingPool默认线程池
* https://blog.csdn.net/tjbsl/article/details/98480843
*
* 使用多个Work Queue,采用Work Stealing算法,多个线程在执行的时候,线程1执行完了,
* 会自动去拿一些别的线程的任务来执行,分担别的线程的任务
*
* 底层使用的是ForkJoinPool
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/26 17:17
*/
public class T12_ThreadPool_6_WorkStealingPool {
public static class R implements Runnable {
int time;
R(int t) {
this.time = t;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(time + " " + Thread.currentThread().getName());
}
}
public static void main(String[] args) throws IOException {
ExecutorService service = Executors.newWorkStealingPool();
System.out.println(Runtime.getRuntime().availableProcessors());
service.execute(new R(1000));
service.execute(new R(2000));
service.execute(new R(2000));
// daemon
service.execute(new R(2000));
service.execute(new R(2000));
// 由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
System.in.read();
}
}
# 12.7. ForkJoinPool
/**
* ForkJoinPool - JDK1.8新增newWorkStealingPool的底层实现
* https://blog.csdn.net/tjbsl/article/details/98480843
*
* 分解汇总的任务
* 用很少的线程可以执行很多的任务(子任务),ThreadPoolExecutor做不到先执行子任务
* CPU密集型
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/26 17:17
*/
public class T12_ThreadPool_7_ForkJoinPool {
public static class TestForkJoinPool {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random r = new Random();
static {
for (int i = 0; i < nums.length; i++) {
nums[i] = r.nextInt(100);
}
// stream api
System.out.println("---" + Arrays.stream(nums).sum());
}
public static class AddTask extends RecursiveAction {
int start, end;
AddTask(int s, int e) {
start = s;
end = e;
}
@Override
protected void compute() {
if (end - start <= MAX_NUM) {
long sum = 0L;
for (int i = start; i < end; i++) sum += nums[i];
System.out.println("from:" + start + " to:" + end + " = " + sum);
} else {
int middle = start + (end - start) / 2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
}
}
}
public static class AddTaskRet extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
int start, end;
AddTaskRet(int s, int e) {
start = s;
end = e;
}
@Override
protected Long compute() {
if (end - start <= MAX_NUM) {
long sum = 0L;
for (int i = start; i < end; i++) sum += nums[i];
return sum;
}
int middle = start + (end - start) / 2;
AddTaskRet subTask1 = new AddTaskRet(start, middle);
AddTaskRet subTask2 = new AddTaskRet(middle, end);
subTask1.fork();
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
public static void main(String[] args) throws IOException {
/*ForkJoinPool fjp = new ForkJoinPool();
AddTask task = new AddTask(0, nums.length);
fjp.execute(task);*/
TestForkJoinPool temp = new TestForkJoinPool();
ForkJoinPool fjp = new ForkJoinPool();
AddTaskRet task = new AddTaskRet(0, nums.length);
fjp.execute(task);
long result = task.join();
System.out.println(result);
// System.in.read();
}
}
}
# 12.8. ParallelStream
/**
* ParallelStream - 并行流 - 底层用的ForkJoinPool
*
* 把任务都拆成子任务,不用保证线程同步安全可以使用加快效率
*
* @author wliduo[i@dolyw.com]
* @date 2020/4/27 18:36
*/
public class T12_ThreadPool_7_ParallelStream {
public static class ParallelStream {
/**
* 是不是质数
*
* @param num
* @return boolean
* @throws
* @author wliduo[i@dolyw.com]
* @date 2020/4/27 18:42
*/
public static boolean isPrime(int num) {
for (int i = 2; i <= num / 2; i++) {
if (num % i == 0) return false;
}
return true;
}
public static void main(String[] args) {
List<Integer> integerList = new ArrayList<>();
Random r = new Random();
for (int i = 0; i < 10000; i++) {
integerList.add(1000000 + r.nextInt(1000000));
}
// System.out.println(integerList);
long start = System.currentTimeMillis();
// Stream流
integerList.forEach(v -> isPrime(v));
long end = System.currentTimeMillis();
System.out.println(end - start);
// ParallelStream并行流,把任务都拆成子任务
// 不用保证线程同步安全可以使用加快效率
start = System.currentTimeMillis();
integerList.parallelStream().forEach(ParallelStream::isPrime);
end = System.currentTimeMillis();
System.out.println(end - start);
}
}
}