并发编程系列之自定义线程池

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 并发编程系列之自定义线程池

并发编程系列之自定义线程池

前言

前面我们在讲并发工具类的时候,多次提到线程池,今天我们就来走进线程池的旅地,首先我们先不讲线程池框架Executors,我们今天先来介绍如何自己定义一个线程池,是不是已经迫不及待了,那么就让我们开启今天的旅途吧。

并发编程系列之自定义线程池

什么是线程池?

线程池可以理解为一个专门管理线程生命周期的池子,里面的线程都可以由这个池子本身来调度,使用线程池有哪些好处呢?

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  • 提高响应速度:当任务到达时,任务可以不需要的等到线程创建就能立即执行
  • 提高线程的可管理性:使用线程池可以对线程进行统一的分配,调优和监控

  • 提高响应速度:当任务到达时,任务可以不需要的等到线程创建就能立即执行

    并发编程系列之自定义线程池

    线程池的实现原理

    首先我们看下面这张图,对着图进行分析:

    并发编程系列之自定义线程池

    过程分析:当任务到达时,首先会判断核心线程池是否还有空闲线程,如果有则创建一个新的工作线程执行任务,如果没有空闲线程,则说明核心线程池已满,进行工作队列是否满的判断,如果没有满,则将任务存放在等待队列中,如果工作队列也满了,则再去判断线程池是否满,如果没有满,则新建一个线程来执行任务,否则采取拒绝策略;

    下面我们来对核心线程池提交任务过程进行分析,这是线程池的核心角色,首先我们看下源码:

    
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            // 如果工作线程数量小于核心线程数,则创建一个新的工作线程执行任务
            if (workerCountOf(c)  corePoolSize) {
                // 创建工作线程成功,则直接返回
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            // 工作线程核心线程数或者工作线程创建失败时,将任务放入等待队列中
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            // 如果等待队列满了,并且工作线程数量大于线程池总数量,则采取拒绝策略
            else if (!addWorker(command, false))
                reject(command);
        }
    

    我们会发现,当可以分配线程来执行任务时,我们总是新建一个工作线程Worker来执行任务,我们来了解下Worker特殊的地方,工作线程不仅会执行当前的任务,而且当前任务执行完毕之后,还会去等待队列中获取任务来执行,通过工作线程的源码可以得知这一点:

    
    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    上面我们讲到的工作队列,我们之前学到过队列有很多种,例如阻塞队列和非阻塞队列,有界队列和无界队列,那么在我们线程池中当使用不同的工作工作队列又会有什么区别呢?

  • 使用有界队列时:有新的任务需要执行时,如果线程池实际线程数小于corePoolSize,则优先创建线程,如果大于corePoolSize,则会将任务先加入到队列,等待执行,如果队列也满了,则在总线程数不大于maximumPoolSize时,先创建新的线程,如果线程数大于了maximumPoolSize则执行拒绝策略,或者其他自己自定义的处理策略;
  • 使用无界队列时:LinkedBlockingQueue,与有界队列相比,除非系统资源被耗尽,否则无界队列的任务队列不存在任务入队列失败的情况,当有新任务到来,系统的线程数小于corePoolSize时,则新建线程执行任务,当线程数量达到corePoolSize值后,则线程不会继续新建,如果此时还持续有任务进来,而没有空闲的线程资源,则任务会进入队列排队等待,若任务创建和处理的速度差异很大,则无界队列会保持快速增长,直到资源耗尽内存,任务会一直堆积,直到内存满了,这种情况永远不会有有界队列中工作线程和线程池总数的比较过程;
  • 使用无界队列时:LinkedBlockingQueue,与有界队列相比,除非系统资源被耗尽,否则无界队列的任务队列不存在任务入队列失败的情况,当有新任务到来,系统的线程数小于corePoolSize时,则新建线程执行任务,当线程数量达到corePoolSize值后,则线程不会继续新建,如果此时还持续有任务进来,而没有空闲的线程资源,则任务会进入队列排队等待,若任务创建和处理的速度差异很大,则无界队列会保持快速增长,直到资源耗尽内存,任务会一直堆积,直到内存满了,这种情况永远不会有有界队列中工作线程和线程池总数的比较过程;

    并发编程系列之自定义线程池

    创建线程池:自定义线程池也是通过ThreadPoolExecutor(线程池执行器)来实现,构造方法如下

    
    public ThreadPoolExecutor(int corePoolSize, //核心线程数--线程池初始化创建的线程数量  
                      int maximumPoolSize, // 最大线程数,线程池中能创建的最大线程数
                      long keepAliveTime, // 线程空闲等待时间
                     TimeUnit unit, // 线程空闲等待时间的单位
                     BlockingQueueRunnable workQueue, // 存放待执行任务的等待队列
                     RejectedExecutionHandler handler // 拒绝任务的处理策略) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
    

    特别说明,拒绝策略有如下几种:

  • AbortPolicy策略:该策略直接抛出异常,阻止系统工作
  • CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中运行当前被丢弃的任务。显然这样不会真的丢弃任务,但是,调用者线程性能可能急剧下降
  • DiscardOledestPolicy策略:丢弃最老的一个请求任务,也就是丢弃一个即将被执行的任务,并尝试再次提交当前任务
  • DiscardPolicy策略:不处理,直接丢弃掉
  • CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中运行当前被丢弃的任务。显然这样不会真的丢弃任务,但是,调用者线程性能可能急剧下降

    DiscardPolicy策略:不处理,直接丢弃掉

    向线程池提交任务:execute方法在上面已经介绍过了,这里就不重复介绍了

    关闭线程池:关闭线程池有下面2个方法

    
    // 将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程
    public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                advanceRunState(SHUTDOWN);
               // 调用线程中断方法
                interruptIdleWorkers();
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
        }
    
    
    // 遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。
    // shutdownNow会首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,
    // 并返回等待执行任务的列表,如果任务不一定要执行完,可以使用此方法   
    public ListRunnable shutdownNow() {
            ListRunnable tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                advanceRunState(STOP);
                interruptWorkers();
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
            return tasks;
        }
    
    并发编程系列之自定义线程池

    原文始发于微信公众号(Justin的后端书架):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 并发编程系列之自定义线程池


     上一篇
    并发编程系列之Semaphore 并发编程系列之Semaphore
    前言 上节我们介绍了Java中的并发工具类CountDownLatch和Cyclicbarrier,今天我们再来说说另外两个并发工具类:Semaphore(信号量)和Exchanger(交换者),首先我们先来说说信号量这个东西,结合我
    2021-04-05
    下一篇 
    数据库中间件 MyCAT 源码分析 —— 【单库单表】查询 数据库中间件 MyCAT 源码分析 —— 【单库单表】查询
    本文主要基于 MyCAT 1.6.5 正式版 1. 概述 2. 接收请求,解析 SQL 3. 获得路由结果 4. 获得 MySQL 连接,执行 SQL 5. 响应执行 SQL 结果 6. 其他 :更新 / 删除 友
    2021-04-05