# Java拾遗-并发-ThreadPool

一个个小例子深入了解知识点

# 12. ThreadPool

ThreadPool 的例子加学习

# 12.1. Executors

/**
 * 线程池 - Executor接口,ExecutorService接口,Executors类
 *
 * Executors类里默认提供了一些线程池,不过都不推荐使用,推荐自定义
 *
 * isShutDown: 当调用shutdown()或shutdownNow()方法后返回为true
 * isTerminated: 当调用shutdown()方法后,并且所有提交的任务完成后返回为true
 *
 * newSingleThreadExecutor和newFixedThreadPool: 默认提供的线程池使用的任务队列是LinkedBlockingQueue,
 * 构造方法默认大小是Integer.MAX_VALUE,堆积的请求处理队列可能会耗费非常大的内存
 *
 * newCachedThreadPool: 核心线程数是0,最大线程数是Integer.MAX_VALUE,可能会创建数量非常多的线程,
 * 任务队列使用的SynchronousQueue
 *
 * newScheduledThreadPool: 支持定时以及周期性执行任务,核心线程数需要指定,
 * 最大线程数是Integer.MAX_VALUE,可能会创建数量非常多的线程,任务队列使用的DelayedWorkQueue
 *
 * SingleThreadPool和FixedThreadPool允许的请求队列长度为Integer.MAX_VALUE,
 * 可能会堆积大量的请求,从而导致OOM
 *
 * CachedThreadPool允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/4/26 17:17
 */
public class T12_ThreadPool_1_Executors {

    public static class CustomExecutor implements Executor {

        @Override
        public void execute(Runnable command) {
            // new Thread(command).run();
            command.run();
        }
    }

    /**
     * Executor接口和ExecutorService接口
     *
     * @param
     * @return void
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2020/4/27 17:14
     */
    public static void executorAndExecutorService() {
        // Executor接口
        new CustomExecutor().execute(() -> System.out.println("Hello Executor"));
        // 官方默认提供的线程池newFixedThreadPool
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // 线程池启动6个线程
        for (int i = 0; i < 6; i++) {
            executorService.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(555);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(executorService);
        // 停止线程池
        executorService.shutdown();
        System.out.println(executorService.isTerminated());
        System.out.println(executorService.isShutdown());
        System.out.println(executorService);
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(executorService.isTerminated());
        System.out.println(executorService.isShutdown());
        System.out.println(executorService);
    }

    /**
     * 单线程的线程池newSingleThreadExecutor
     * 任务队列LinkedBlockingQueue不赋值,构造方法默认大小Integer.MAX_VALUE
     * 无法自定义拒绝策略
     * 堆积的请求处理队列可能会耗费非常大的内存
     *
     * @param
     * @return void
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2020/4/27 17:04
     */
    public static void newSingleThreadExecutor() {
        System.out.println("-----newSingleThreadExecutor-----");
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int j = i;
            singleThreadExecutor.execute(() -> {
                System.out.println(j + " " + Thread.currentThread().getName());
            });
        }
        singleThreadExecutor.shutdown();
    }

    /**
     * 固定线程池newFixedThreadPool
     * 核心线程数和最大线程数必须赋值,而且一样
     * 任务队列LinkedBlockingQueue不赋值,构造方法默认大小Integer.MAX_VALUE
     * 无法自定义拒绝策略
     * 堆积的请求处理队列可能会耗费非常大的内存
     *
     * @param
     * @return void
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2020/4/27 17:04
     */
    public static void newFixedThreadPool() {
        System.out.println("-----newFixedThreadPool-----");
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 5; i++) {
            final int j = i;
            fixedThreadPool.execute(() -> {
                System.out.println(j + " " + Thread.currentThread().getName());
            });
        }
        fixedThreadPool.shutdown();
    }

    /**
     * 缓存线程池newCachedThreadPool
     * 核心线程数是0,最大线程数是Integer.MAX_VALUE,可能会创建数量非常多的线程
     * 任务队列使用的SynchronousQueue
     * 无法自定义拒绝策略
     *
     * @param
     * @return void
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2020/4/27 17:04
     */
    public static void newCachedThreadPool() {
        System.out.println("-----newCachedThreadPool-----");
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        System.out.println(cachedThreadPool);
        for (int i = 0; i < 2; i++) {
            cachedThreadPool.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(499);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(cachedThreadPool);
        try {
            TimeUnit.SECONDS.sleep(80);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(cachedThreadPool);
    }

    /**
     * newScheduledThreadPool线程池支持定时以及周期性执行任务
     * 核心线程数需要指定,最大线程数是Integer.MAX_VALUE,可能会创建数量非常多的线程
     * 任务队列使用的DelayedWorkQueue
     * 无法自定义拒绝策略
     *
     * @param
     * @return void
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2020/4/27 18:06
     */
    public static void newScheduledThreadPool() {
        System.out.println("-----newScheduledThreadPool-----");
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
        scheduledThreadPool.scheduleAtFixedRate(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        }, 0, 500, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // executorAndExecutorService();
        // newSingleThreadExecutor();
        // newCachedThreadPool();
        newFixedThreadPool();
        // newScheduledThreadPool();
    }

}

# 12.2. ThreadPoolExecutor

/**
 * 自定义线程池 - ThreadPoolExecutor
 *
 * 自定义线程池很重要,如果线程池中的线程数量过多,就会竞争稀缺的处理器和内存资源,浪费大量时间在上下文切换,
 * 反之,线程数量过少,处理器的一个核就无法充分利用到(Java并发编程实战)
 *
 * N-Thread(线程数) = N-CPU * U-CPU * (1 + W/C)
 *
 * N-CPU: 处理器核的数目,可以通过Runtime.getRuntime().availableProcessors()获取
 * U-CPU: 期望的CPU利用率,0-1之前,CPU稳定在百分之多少,一般不可能为1(百分之百)
 * W/C: 等待时间与计算时间的比率
 *
 * 线程池6个参数
 * int corePoolSize: 核心线程数
 * int maximumPoolSize: 最大线程数
 * long keepAliveTime: 空闲线程存活时间
 * TimeUnit unit: 存活时间单位
 * BlockingQueue<Runnable> workQueue: 任务队列
 * ThreadFactory threadFactory: 线程工厂
 * RejectedExecutionHandler handler: 拒绝策略
 *
 * 1. Running的线程小于corePoolSize,直接创建新的线程在Pool执行
 * 2. Running的线程等于corePoolSize,则任务加入工作队列
 * 3. Running的线程等于corePoolSize,工作队列已满,则加入大于corePoolSize小于maximumPoolSize线程
 * 4. 全部满,执行拒绝策略
 *
 * 核心线程数: 线程池中会维护一个最小的线程数量,即使这些线程是空闲状态,他们也不会被销毁,
 * 除非设置了allowCoreThreadTimeOut
 *
 * 空闲线程存活时间: 这个只作用于核心线程之外的线程,除非设置了allowCoreThreadTimeOut
 *
 * 默认提供线程工厂
 * Executors.defaultThreadFactory()
 * Executors.privilegedThreadFactory()
 *
 * 默认提供拒绝策略
 * new ThreadPoolExecutor.AbortPolicy(): 直接丢弃任务,并抛出RejectedExecutionException异常
 * new ThreadPoolExecutor.DiscardPolicy(): 直接丢弃任务,什么都不做
 * new ThreadPoolExecutor.DiscardOldestPolicy(): 抛弃队列最早的那个任务,然后尝试把这次拒绝的任务放入队列
 * new ThreadPoolExecutor.CallerRunsPolicy(): 不在新线程中执行任务,而是由调用者所在的线程来执行,
 * 除非线程池已经isShutDown,则直接抛弃任务
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/4/26 17:17
 */
public class T12_ThreadPool_2_ThreadPoolExecutor {

    public static class Task implements Runnable {
        private String name;

        public Task(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " Task " + name);
            try {
                // 每个线程阻塞
                System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "Task{" + "no=" + name + '}';
        }
    }

    /**
     * 自定义拒绝策略
     *
     * @author wliduo[i@dolyw.com]
     * @date 2020/4/27 18:18
     */
    public static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // log("r rejected")
            // save r kafka mysql redis
            // try 3 times
            if(executor.getQueue().size() < 10000) {
                // try put again();
            }
        }
    }

    public static void main(String[] args) {
        // 创建线程池
        /*ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(4),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());*/
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(4),
                Executors.privilegedThreadFactory(),
                new CustomRejectedExecutionHandler());
        // 启动8个任务
        for (Integer i = 0; i < 8; i++) {
            threadPoolExecutor.execute(new Task(i.toString()));
        }
        // 输出当前线程池任务队列
        System.out.println(threadPoolExecutor.getQueue());
        // 再启动一个任务,超过了最大核心线程数+队列数(4+4=8),第9个任务将执行拒绝策略
        threadPoolExecutor.execute(new Task("M"));
        // 关闭线程池
        threadPoolExecutor.shutdown();
    }

}

# 12.3. Callable_Future

/**
 * 带返回值的异步线程 - Callable
 *
 * 使用submit提交Future进行接收返回值,future.get()阻塞获取返回结果
 *
 * future.get()还可以设置阻塞时间,超过了直接抛出TimeoutException异常
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/4/26 17:17
 */
public class T12_ThreadPool_3_Callable_Future {

    public static class Task implements Callable<String> {
        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            return "Hello Callable";
        }
    }

    public static void main(String[] args) {
        // 创建线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(4),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        // 异步提交任务
        Future<String> future = threadPoolExecutor.submit(new Task());
        System.out.println("Start");
        try {
            // 阻塞
            // System.out.println(future.get());
            // 阻塞多久超时
            System.out.println(future.get(500, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("End");
        // 关闭线程池
        threadPoolExecutor.shutdown();
    }

}

# 12.4. FutureTask

/**
 * 带返回值的异步线程 - FutureTask - 继承Runnable, Future
 * 使用FutureTask声明结果值,可以直接用execute()执行
 *
 * 还有一种使用,Guava提供了FutureCallback接口,可以在成功或失败时回调处理,但是代码不太好维护
 * 还不如直接拿到结果过进行处理就是
 * https://github.com/bjmashibing/JUC/blob/master/src/main/java/com/mashibing/juc/c_027_future_to_loom/T02_ListenableFuture.java
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/4/26 17:17
 */
public class T12_ThreadPool_4_FutureTask {

    public static class Task implements Callable<String> {
        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            return "Hello Callable";
        }
    }

    public static void main(String[] args) {
        // 线程执行
        FutureTask<String> futureTaskTemp = new FutureTask<String>(() -> {
            return "Hello FutureTask";
        });
        new Thread(futureTaskTemp).start();
        try {
            System.out.println(futureTaskTemp.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 创建线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(4),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        // FutureTask声明任务
        FutureTask<String> futureTask = new FutureTask<>(new Task());
        // 异步提交任务execute也行
        threadPoolExecutor.execute(futureTask);
        System.out.println("Start");
        try {
            // 阻塞
            System.out.println(futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("End");
        // 关闭线程池
        threadPoolExecutor.shutdown();
    }

}

# 12.5. CompletableFuture

/**
 * CompletableFuture - 线程异步结果汇总操作
 *
 * 下面代码,priceOfTM(),priceOfTB(),priceOfJD()三个方法代码去不同地方查询价格
 * delay()睡眠500ms
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/4/26 17:17
 */
public class T12_ThreadPool_5_CompletableFuture {

    private static double priceOfTM() {
        delay();
        return 1.00;
    }

    private static double priceOfTB() {
        delay();
        return 2.00;
    }

    private static double priceOfJD() {
        delay();
        return 3.00;
    }

    /*private static double priceOfAmazon() {
        delay();
        throw new RuntimeException("product not exist!");
    }*/

    private static void delay() {
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        long start, end;
        // 按顺序执行
        start = System.currentTimeMillis();
        priceOfTM();
        priceOfTB();
        priceOfJD();
        end = System.currentTimeMillis();
        System.out.println("use serial method call! " + (end - start));
        // 线程异步执行
        start = System.currentTimeMillis();
        CompletableFuture<Double> futureTM = CompletableFuture.supplyAsync(() -> priceOfTM());
        CompletableFuture<Double> futureTB = CompletableFuture.supplyAsync(() -> priceOfTB());
        CompletableFuture<Double> futureJD = CompletableFuture.supplyAsync(() -> priceOfJD());
        // join等三个任务执行完成
        CompletableFuture.allOf(futureTM, futureTB, futureJD).join();
        // 做一些操作
        /*CompletableFuture.supplyAsync(() -> priceOfTM())
                .thenApply(String::valueOf)
                .thenApply(str -> "price " + str)
                .thenAccept(System.out::println);*/
        end = System.currentTimeMillis();
        System.out.println("use completable future! " + (end - start));
        try {
            // Main程序不停止
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

# 12.6. WorkStealingPool

/**
 * WorkStealingPool - JDK1.8新增newWorkStealingPool默认线程池
 * https://blog.csdn.net/tjbsl/article/details/98480843
 *
 * 使用多个Work Queue,采用Work Stealing算法,多个线程在执行的时候,线程1执行完了,
 * 会自动去拿一些别的线程的任务来执行,分担别的线程的任务
 *
 * 底层使用的是ForkJoinPool
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/4/26 17:17
 */
public class T12_ThreadPool_6_WorkStealingPool {

    public static class R implements Runnable {
        int time;

        R(int t) {
            this.time = t;
        }

        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(time + " " + Thread.currentThread().getName());
        }
    }

    public static void main(String[] args) throws IOException {
        ExecutorService service = Executors.newWorkStealingPool();
        System.out.println(Runtime.getRuntime().availableProcessors());
        service.execute(new R(1000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        // daemon
        service.execute(new R(2000));
        service.execute(new R(2000));
        // 由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
        System.in.read();
    }

}

# 12.7. ForkJoinPool

/**
 * ForkJoinPool - JDK1.8新增newWorkStealingPool的底层实现
 * https://blog.csdn.net/tjbsl/article/details/98480843
 *
 * 分解汇总的任务
 * 用很少的线程可以执行很多的任务(子任务),ThreadPoolExecutor做不到先执行子任务
 * CPU密集型
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/4/26 17:17
 */
public class T12_ThreadPool_7_ForkJoinPool {

    public static class TestForkJoinPool {
        static int[] nums = new int[1000000];
        static final int MAX_NUM = 50000;
        static Random r = new Random();

        static {
            for (int i = 0; i < nums.length; i++) {
                nums[i] = r.nextInt(100);
            }
            // stream api
            System.out.println("---" + Arrays.stream(nums).sum());
        }

        public static class AddTask extends RecursiveAction {
            int start, end;

            AddTask(int s, int e) {
                start = s;
                end = e;
            }

            @Override
            protected void compute() {
                if (end - start <= MAX_NUM) {
                    long sum = 0L;
                    for (int i = start; i < end; i++) sum += nums[i];
                    System.out.println("from:" + start + " to:" + end + " = " + sum);
                } else {
                    int middle = start + (end - start) / 2;

                    AddTask subTask1 = new AddTask(start, middle);
                    AddTask subTask2 = new AddTask(middle, end);
                    subTask1.fork();
                    subTask2.fork();
                }
            }
        }


        public static class AddTaskRet extends RecursiveTask<Long> {

            private static final long serialVersionUID = 1L;
            int start, end;

            AddTaskRet(int s, int e) {
                start = s;
                end = e;
            }

            @Override
            protected Long compute() {
                if (end - start <= MAX_NUM) {
                    long sum = 0L;
                    for (int i = start; i < end; i++) sum += nums[i];
                    return sum;
                }
                int middle = start + (end - start) / 2;
                AddTaskRet subTask1 = new AddTaskRet(start, middle);
                AddTaskRet subTask2 = new AddTaskRet(middle, end);
                subTask1.fork();
                subTask2.fork();
                return subTask1.join() + subTask2.join();
            }
        }

        public static void main(String[] args) throws IOException {
            /*ForkJoinPool fjp = new ForkJoinPool();
            AddTask task = new AddTask(0, nums.length);
            fjp.execute(task);*/
            TestForkJoinPool temp = new TestForkJoinPool();
            ForkJoinPool fjp = new ForkJoinPool();
            AddTaskRet task = new AddTaskRet(0, nums.length);
            fjp.execute(task);
            long result = task.join();
            System.out.println(result);
            // System.in.read();
        }
    }

}

# 12.8. ParallelStream

/**
 * ParallelStream - 并行流 - 底层用的ForkJoinPool
 *
 * 把任务都拆成子任务,不用保证线程同步安全可以使用加快效率
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/4/27 18:36
 */
public class T12_ThreadPool_7_ParallelStream {

    public static class ParallelStream {

        /**
         * 是不是质数
         *
         * @param num
         * @return boolean
         * @throws
         * @author wliduo[i@dolyw.com]
         * @date 2020/4/27 18:42
         */
        public static boolean isPrime(int num) {
            for (int i = 2; i <= num / 2; i++) {
                if (num % i == 0) return false;
            }
            return true;
        }

        public static void main(String[] args) {
            List<Integer> integerList = new ArrayList<>();
            Random r = new Random();
            for (int i = 0; i < 10000; i++) {
                integerList.add(1000000 + r.nextInt(1000000));
            }

            // System.out.println(integerList);

            long start = System.currentTimeMillis();
            // Stream流
            integerList.forEach(v -> isPrime(v));
            long end = System.currentTimeMillis();
            System.out.println(end - start);
            // ParallelStream并行流,把任务都拆成子任务
            // 不用保证线程同步安全可以使用加快效率
            start = System.currentTimeMillis();
            integerList.parallelStream().forEach(ParallelStream::isPrime);
            end = System.currentTimeMillis();
            System.out.println(end - start);
        }
    }

}
上次更新时间: 2023-12-15 03:14:55