多线程 为什么要创建线程池 1 如果系统要运行多个线程,大量反复的启动创建和回收线程会非常占用系统资源,导致性能下降.
创建线程池,可以: 1.降低资源消耗 2.提升响应速度 3.提高
线程池原理
线程池一般由两种角色构成:多个工作线程 和 一个阻塞队列。
1 2 3 4 提交一个任务到线程池中,线程池的处理流程如下: 1.判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者核心线程没有被创建)则创建一个新的工作线程来执行任务.如果核心线程都在执行任务,则进入下个流程. 2.线程池判断工作队列是否已满,如果工作路径没有满,则新提交的任务储存在这个工作队列里.如果工作队列满了,则进入下个流程. 3.判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务.如果已经满了,则交给饱和策略来处理这个任务.
线程池的分类 1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) ;
ThreadPoolExecutor是线程池的真正实现, 他通过构造方法的一系列参数,来构成不同配置的线程池。 corePoolSize: 核心池的大小。 当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中 maximumPoolSize: 线程池最大线程数,它表示在线程池中最多能创建多少个线程; keepAliveTime: 表示线程没有任务执行时最多保持多久时间会终止。 unit: 参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性。 workQueue:一个阻塞队列,提交的任务将会被放到这个队列里。 threadFactory:线程工厂,用来创建线程,主要是为了给线程起名字,默认工厂的线程名字:pool-1-thread-3。 handler:拒绝策略,当线程池里线程被耗尽,且队列也满了的时候会调用。
线程池的创建方法 Java通过Executors(jdk1.5并发包)提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 案例演示:
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程, 若无可回收,则新建线程。 package cn.qbz.thread;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class Test111907 { public static void main (String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0 ; i < 10 ; i++) { final int temp = i; executorService.execute(new Runnable () { @Override public void run () { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " i=" + temp); } }); } } } public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 package cn.qbz.thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test111907 { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { final int temp = i; executorService.execute(new Runnable () { @Override public void run () { try { Thread.sleep (100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " i=" + temp); } }); } } } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。 package cn.qbz.thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class Test111907 { public static void main(String[] args) { final long begin = System.currentTimeMillis(); ExecutorService executorService = Executors.newScheduledThreadPool(3); for (int i = 0; i < 10; i++) { final int temp = i; final long time = begin; executorService.schedule(new Runnable () { @Override public void run () { try { Thread.sleep (100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " i=" + temp + " time=" + (System.currentTimeMillis() - time )); } }, 5, TimeUnit.SECONDS); } } } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务, 保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 public class Test111907 { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int temp = i; executorService.execute(new Runnable () { @Override public void run () { try { Thread.sleep (100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " i=" + temp); } }); } } }
关闭线程池 关闭线程池有两种方式:shutdown和shutdownNow,关闭时,会遍历所有的线程,调用它们的interrupt函数中断线程。但这两种方式对于正在执行的线程处理方式不同。
shutdown() 仅停止阻塞队列中等待的线程,那些正在执行的线程就会让他们执行结束。
shutdownNow() 不仅会停止阻塞队列中的线程,而且会停止正在执行的线程。
ThreadPoolExecutor运行机制 当有请求到来时:
若当前实际线程数量 少于 corePoolSize,即使有空闲线程,也会创建一个新的工作线程;
若当前实际线程数量处于corePoolSize和maximumPoolSize之间,并且阻塞队列没满,则任务将被放入阻塞队列中等待执行;
若当前实际线程数量 小于 maximumPoolSize,但阻塞队列已满,则直接创建新线程处理任务;
若当前实际线程数量已经达到maximumPoolSize,并且阻塞队列已满,则使用饱和策略。
设置合理的线程池大小 任务一般可分为:CPU密集型、IO密集型、混合型,对于不同类型的任务需要分配不同大小的线程池。
CPU密集型任务
尽量使用较小的线程池,一般为CPU核心数+1。 因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,只能增加上下文切换的次数,因此会带来额外的开销。
IO密集型任务 可以使用稍大的线程池,一般为2*CPU核心数。 IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候去处理别的任务,充分利用CPU时间。
混合型任务 可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。 因为如果划分之后两个任务执行时间相差甚远,那么先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。
Executor两级调度模型
在HotSpot虚拟机中,Java中的线程将会被一一映射为操作系统的线程。 在Java虚拟机层面,用户将多个任务提交给Executor框架,Executor负责分配线程执行它们; 在操作系统层面,操作系统再将这些线程分配给处理器执行。
Executor结构
Executor框架中的所有类可以分成三类:
任务 任务有两种类型:Runnable和Callable。
任务执行器 Executor框架最核心的接口是Executor,它表示任务的执行器。 Executor的子接口为ExecutorService。 ExecutorService有两大实现类:ThreadPoolExecutor和ScheduledThreadPoolExecutor。
执行结果 Future接口表示异步的执行结果,它的实现类为FutureTask。
线程池 Executors工厂类可以创建四种类型的线程池,通过Executors.newXXX即可创建。
1. FixedThreadPool
1 2 3 public static ExecutorService newFixedThreadPool(int nThreads){ return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }
它是一种固定大小的线程池;
corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效;
阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;
由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;
由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。
2. CachedThreadPool
1 2 3 4 public static ExecutorService newCachedThreadPool(){ return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>()); } 123
它是一个可以无限扩大的线程池;
它比较适合处理执行时间比较小的任务;
corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;
keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;
采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
3. SingleThreadExecutor
1 2 3 4 public static ExecutorService newSingleThreadExecutor(){ return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } 123
它只会创建一条工作线程处理任务;
采用的阻塞队列为LinkedBlockingQueue;
4. ScheduledThreadPool
它用来处理延时任务或定时任务。
它接收SchduledFutureTask类型的任务,有两种提交任务的方式:
scheduledAtFixedRate
scheduledWithFixedDelay
SchduledFutureTask接收的参数:
time:任务开始的时间
sequenceNumber:任务的序号
period:任务执行的时间间隔
它采用DelayQueue存储等待的任务
DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;
DelayQueue也是一个无界队列;
工作线程的执行过程:
工作线程会从DelayQueue取已经到期的任务去执行;
执行结束后重新设置任务的到期时间,再次放回DelayQueue
微信公众号:每日学习干货