侧边栏壁纸
  • 累计撰写 98 篇文章
  • 累计创建 20 个标签
  • 累计收到 3 条评论

线程池ThreadPoolExecutor

林贤钦
2020-05-25 / 0 评论 / 12 点赞 / 930 阅读 / 0 字
温馨提示:
本文最后更新于 2020-06-03,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

线程池ThreadPoolExecutor

1、线程池介绍

使用线程池的原因

  • 降低资源消耗。 创建/销毁线程需要消耗系统资源,线程池可以复⽤已创建的线程。

  • 提升系统响应速度。并发数量过多,可能会导致资源消耗过多,从⽽造成服务器崩溃。

    通过复用线程,省去创建线程的过程,因此整体上提升了系统的响应速度;

  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此,需要使用线程池来管理线程。

  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。

    比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

什么时候使用线程池?

  • 单个任务处理时间比较短
  • 需要处理的任务数量很大

线程池的类图

Executor框架接口

Executor框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。

J.U.C中有三个Executor接口

  • Executor:一个运行新任务的简单接口;

  • ExecutorService:扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法;

    提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。如果需要支持即时关闭,也就是shutDownNow()方法,则任务需要正确处理中断。

  • ScheduledExecutorService:扩展了ExecutorService。支持Future和定期执行任务。

    调用schedule方法可以在指定的延时后执行一个Runnable或者Callable任务。ScheduledExecutorService接口还定义了按照指定时间间隔定期执行任务的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。

2、线程池的工作原理

  1. 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  2. 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。
  3. 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
  4. 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现
  5. 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime 和 unit 来控制。

3、ThreadPoolExecutor分析

ThreadPoolExecutor的成员变量方法

//这些信息存储在一个原子变量 ctl 中,目的是将线程池状态
//与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// runState存储在高位
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
//runStateOf:获取运行状态 
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//
private static int workerCountOf(int c)  { return c & CAPACITY; }
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }

从这可以看出ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

状态名高3位接收新任处理阻塞队列任务说明
RUNNING111YY
SHUTDOWN000NY不会接收新任务,但会处理阻塞队列剩余任务
STOP001NN会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING010--任务全执行完毕,活动线程为 0 即将进入终结
TERMINATED011--终结状态

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING

状态变化

  • 线程池创建后处于RUNNING状态。
  • 调⽤shutdown()⽅法后处于SHUTDOWN状态,线程池不能接受新的任务,清除⼀些空闲worker,会等待阻塞队列的任务完成。
  • 调⽤shutdownNow()⽅法后处于STOP状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执⾏的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。
  • 当所有的任务已终⽌,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。 接着会执⾏terminated()函数。
  • 线程池处在TIDYING状态时,执⾏完**terminated()**⽅法之后,就会由 TIDYING-> TERMINATED, 线程池被设置为TERMINATED状态

ThreadPoolExecutor的构造方法

// 五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                         int maximumPoolSize,
                         long keepAliveTime,
                         TimeUnit unit,
                         BlockingQueue<Runnable> workQueue)
// 六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,
                         int maximumPoolSize,
                         long keepAliveTime,
                         TimeUnit unit,
                         BlockingQueue<Runnable> workQueue,
                         ThreadFactory threadFactory)
// 六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,
                         int maximumPoolSize,
                         long keepAliveTime,
                         TimeUnit unit,
                         BlockingQueue<Runnable> workQueue,
                         RejectedExecutionHandler handler)
// 七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • int corePoolSize

    该线程池中核⼼线程数最⼤值。核⼼线程:线程池中有两类线程,核⼼线程和⾮核⼼线程。

    核⼼线程默认情况下会⼀直存在于线程池中,即使这个核⼼线程什么都不⼲(铁饭碗),⽽⾮核⼼线程如果⻓时间的闲置,就会被销毁(临时⼯)。

  • int maximumPoolSize

    该线程池中线程总数最⼤值 。

    该值等于核⼼线程数量 + ⾮核⼼线程数量。

  • long keepAliveTime

    ⾮核⼼线程闲置超时时⻓。

    ⾮核⼼线程如果处于闲置状态超过该值,就会被销毁。如果设置allowCoreThreadTimeOut(true),则会也作⽤于核⼼线程。

  • TimeUnit unit

    keepAliveTime的单位。TimeUnit是⼀个枚举类型 ,包括以下属性:

    NANOSECONDS : 1微毫秒 = 1微秒 / 1000 ,MICROSECONDS : 1微秒 =1毫秒 / 1000, MILLISECONDS : 1毫秒 = 1秒 /1000 SECONDS : 秒,MINUTES : 分 ,HOURS : ⼩时 ,DAYS : 天

  • BlockingQueue workQueue

    阻塞队列,维护着等待执⾏的Runnable任务 对象。

  • ThreadFactory threadFactory

    创建线程的⼯⼚ ,⽤于批量创建线程,统⼀在创建线程时设置⼀些参数,如是否守护线程、线程的优先级等。如果不指定,会新建⼀个默认的线程⼯⼚。

  • RejectedExecutionHandler handler

    拒绝处理策略,线程数量⼤于最⼤线程数就会采⽤拒绝处理策略


常⽤的⼏个阻塞队列:

  1. LinkedBlockingQueue

    链式阻塞队列,底层数据结构是链表,默认⼤⼩是 Integer.MAX_VALUE , 也可以指定⼤⼩。

  2. ArrayBlockingQueue

    数组阻塞队列,底层数据结构是数组,需要指定队列的⼤⼩。

  3. SynchronousQueue

    同步队列,内部容量为0,每个put操作必须等待⼀个take操作,反之亦

  4. DelayQueue

    延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列 中获取到该元素 。

四种拒绝处理的策略

  1. ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。

  2. ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。

  3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执⾏程序

    (如果再次失败,重复此过程)。

  4. ThreadPoolExecutor.CallerRunsPolicy:由调⽤线程处理该任务

execute方法

处理任务的核⼼⽅法是 execute

 // JDK 1.8 
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 1.当前线程数⼩于corePoolSize,则调⽤addWorker创建核⼼线程执⾏任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2.如果不⼩于corePoolSize,则将任务添加到workQueue队列。
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 2.1 如果isRunning返回false(状态检查),则remove这个任务,然后执⾏拒绝策略。
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 2.2 线程池处于running状态,但是没有线程,则创建线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3.如果放⼊workQueue失败,则创建⾮核⼼线程执⾏任务,
    // 如果这时创建⾮核⼼线程失败(当前线程总数不⼩于maximumPoolSize时),就会执⾏拒绝策略。
    else if (!addWorker(command, false))
        reject(command);
}

为什么要⼆次检查线程池的状态?

在多线程的环境下,线程池的状态是时刻发⽣变化的。很有可能刚获取线程池状态 后线程池状态就改变了。判断是否将 command 加⼊ workqueue 是线程池之前的状态。

倘若没有⼆次检查,万⼀线程池处于⾮RUNNING状态(在多线程环境下很有 可能发⽣),那么command 永远不会执⾏。

总结⼀下处理流程

  1. 线程总数量 < corePoolSize,⽆论线程是否空闲,都会新建⼀个核⼼线程执⾏ 任务(让核⼼线程数量快速达到corePoolSize,在核⼼线程数量 <corePoolSize时)。注意,这⼀步需要获得全局锁。
  2. 线程总数量 >= corePoolSize时,新来的线程任务会进⼊任务队列中等待,然 后空闲的核⼼线程会依次去缓存队列中取任务来执⾏(体现了线程复⽤)。
  3. 当缓存队列满了,说明这个时候任务已经多到爆棚,需要⼀些“临时⼯”来执⾏ 这些任务了。于是会创建⾮核⼼线程去执⾏这个任务。注意,这⼀步需要获得 全局锁
  4. 缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取上⾯提到的拒绝策略进⾏处理。

四种常⻅的线程池

如果看懂了前⾯讲的 ThreadPoolExecutor 构造⽅法中各种参数的意义,那么⼀看到 Executors 类中提供的线程池的源码就应该知道这个线程池是⼲嘛的。

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}

核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着

  • 全部都是救急线程(60s 后可以回收)

  • 救急线程可以无限创建

队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的

(一手交钱、一手交货)

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
}

核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间,阻塞队列是无界的,可以放任意数量的任务

评价 适用于任务量已知,相对耗时的任务

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>()));
    }

使用场景:

希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

区别:

自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作

  • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改

    FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用ThreadPoolExecutor 中特有的方法

  • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改

    对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

newScheduledThreadPool

创建⼀个定⻓线程池,⽀持定时及周期性任务执⾏。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize
	 return new ScheduledThreadPoolExecutor(corePoolSize);
}
//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
     super(corePoolSize, Integer.MAX_VALUE,
     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
     new DelayedWorkQueue());
}

四种常⻅的线程池基本够我们使⽤了,但是《阿⾥把把开发⼿册》不建议我们直接使⽤Executors类中的线程池,⽽是通过 ThreadPoolExecutor 的⽅式,这样的处理⽅式需要更加明确线程池的运⾏规则,规避资源耗尽的⻛险

提交任务

执行任务

void execute(Runnable command);

ExecutorService pool = Executors.newFixedThreadPool(3);
pool.execute(() -> {
    System.out.println("执行任务");
});

结果

执行任务

提交任务 task

用返回值 Future 获得任务执行结果

Future submit(Callable task);

ExecutorService pool = Executors.newFixedThreadPool(3);
Future<String> future = pool.submit(() -> {
    return "提交任务 task";
});
System.out.println(future.get());

结果

提交任务 task

提交 tasks 中所有任务

List<Future> invokeAll(Collection<? extends Callable> tasks) throws InterruptedException;

ExecutorService pool = Executors.newFixedThreadPool(3);
      List<Future<String>> futures = pool.invokeAll(
              Arrays.asList(
                      () -> {
                          return "1";
                      },
                      () -> {
                          return "2";
                      },
                      () -> {
                          return "3";
                      }));
    futures.forEach(f -> {
        try {
            System.out.println(f.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    });

结果

1
2
3

  • 提交 tasks 中所有任务,带超时时间

List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException;

提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消

T invokeAny(Collection<? extends Callable> tasks)

throws InterruptedException, ExecutionException;

String str =
       	pool.invokeAny(
            Arrays.asList(
                () -> {
                  log.debug("begin");
                  Thread.sleep(1000);
                  return "1";
                },
                () -> {
                  log.debug("begin");
                   Thread.sleep(500);
                  return "2";
                },
                () -> {
                  log.debug("begin");
                  Thread.sleep(2000);
                  return "3";
                }));
  • 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间

T invokeAny(Collection<? extends Callable> tasks,

long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException;

关闭线程池

  1. shutdown()

    shutdown只是将线程池的状态设置为SHUTDOWN状态,然后中断所有没有正在执行任务的线程

    • 不会接收新任务
    • 但已提交任务会执行完
    • 此方法不会阻塞调用线程的执行

    源码

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 修改线程池状态
            advanceRunState(SHUTDOWN);
            // 仅会打断空闲线程
            interruptIdleWorkers();
            onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
        tryTerminate();
    }
    
  2. shutdownNow

    shutdownNow首先将线程池的状态设置为STOP,然后尝试停止所有的正在执行和未执行任务的线程,并返回等待执行任务的列表;

    • 不会接收新任务
    • 会将队列中的任务返回
    • 并用 interrupt 的方式中断正在执行的任务

    源码

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 修改线程池状态
            advanceRunState(STOP);
            // 打断所有线程
            interruptWorkers();
            // 获取队列中剩余任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 尝试终结
        tryTerminate();
        return tasks; 
    }
    
  3. 其它方法

    • 不在 RUNNING 状态的线程池,此方法就返回 true

    boolean isShutdown();

    • 线程池状态是否是 TERMINATED

      boolean isTerminated();

    • 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待

      boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

4、线程池定时任务

如何让每周四 18:00:00 定时执行任务?

//获取当前时间
LocalDateTime now = LocalDateTime.now();
//获取周四时间
LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
//如果当前时间小于本周周四,必须找下一周周四
if(now.compareTo(time) >= 0) {
    time = time.plusWeeks(1);
}
//initTailDelay当前时间和周四时间的查
long initTailDelay = Duration.between(now, time).toMillis();
//period 一周的时间间隔
long period = 1000*60*60*24*7;
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
pool.scheduleAtFixedRate(()->{
    System.out.println("running");
},initTailDelay,period, TimeUnit.SECONDS);

5、合理配置线程池参数

创建多少线程池合适

  • 过小会导致程序不能充分地利用系统资源、容易导致饥饿

  • 过大会导致更多的线程上下文切换,占用更多内存

CPU 密集型运算

通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费

3.2 I/O 密集型运算

CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。

经验公式如下

线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间

  • 例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式4 * 100% * 100% / 50% = 8

  • 例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式4 * 100% * 100% / 10% = 40

6、线程池的监控

通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用

  • getTaskCount:线程池已经执行的和未执行的任务总数;
  • getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;
  • getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
  • getPoolSize:线程池当前的线程数量;
  • getActiveCount:当前线程池中正在执行任务的线程数量。

通过这些方法,可以对线程池进行监控,在ThreadPoolExecutor类中提供了几个空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自ThreadPoolExecutor来进行扩展。

12

评论区