1.Java线程池实现原理及其在美团业务中的源码实践
2.Java中Timer类的schedule方法开始计时时间是不是创建timer对象的时刻
3.å¦ä½ç¨mavenå°java8åç代ç ç¼è¯ä¸ºjava6å¹³å°ç
4.OpenJDK17-JVM 源码阅读 - ZGC - 并发标记 | 京东物流技术团队
5.如何实现定时任务- Java Timer/TimerTask 源码解析
6.Timer & TimerTask 源码分析
Java线程池实现原理及其在美团业务中的实践
随着计算机行业的飞速发展,摩尔定律逐渐失效,源码多核CPU成为主流。源码使用多线程并行计算逐渐成为开发人员提升服务器性能的源码基本武器。J.U.C提供的源码线程池ThreadPoolExecutor类,帮助开发人员管理线程并方便地执行并行任务。源码机构通道源码了解并合理使用线程池,源码是源码一个开发人员必修的基本功。本文开篇简述了线程池概念和用途,源码接着结合线程池的源码源码,帮助大家领略线程池的源码设计思路,最后回归实践,源码通过案例讲述使用线程池遇到的源码问题,并给出了一种动态化线程池解决方案。源码一、源码写在前面
1.1 线程池是什么
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。线程过多会带来额外的开销,包括创建销毁线程的开销、调度线程的开销等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。本文描述的线程池是JDK中提供的ThreadPoolExecutor类。
1.2 线程池解决的问题是什么
线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能确定在任意时刻有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下问题:资源分配问题、线程调度问题等。线程池采用了“池化”思想来解决这些问题。Pooling是将资源统一管理的一种思想,不仅能应用在计算机领域,还在金融、设备、人员管理、工作管理等领域有相关应用。在计算机领域,表现为统一管理IT资源,包括服务器、存储、网络等,通过共享资源在低投入中获益。
二、线程池核心设计与实现
Java中的线程池核心实现类是ThreadPoolExecutor,本文基于JDK 1.8的源码来分析线程池的核心设计与实现。首先,我们通过ThreadPoolExecutor的UML类图了解其继承关系,然后深入探讨其设计与实现。
2.1 总体设计
ThreadPoolExecutor实现的顶层接口是Executor,提供了一种思想:将任务提交和任务执行进行解耦。用户只需提供Runnable对象,fastboot 源码将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行。ExecutorService接口增加了能力,如补充可以为一个或一批异步任务生成Future的方法以及提供管控线程池的方法,如停止线程池运行。
AbstractExecutorService是上层的抽象类,将执行任务的流程串联起来,保证下层实现只需关注执行任务的方法。ThreadPoolExecutor作为最下层的实现类,实现最复杂的运行部分,负责维护自身的生命周期和管理线程与任务,使两者结合执行并行任务。
ThreadPoolExecutor运行机制分为任务管理和线程管理两部分。任务管理充当生产者的角色,线程池会根据任务的流转决定执行流程。线程管理是消费者,维护线程池内的线程,根据任务请求进行线程分配。
2.2 生命周期管理
线程池运行状态由内部维护,使用变量控制线程池的运行状态和有效线程数量。线程池内部使用AtomicInteger存储关键参数,实现线程池运行状态和线程数量的高效管理。线程池提供方法供用户获取当前运行状态和线程数量,通过位运算实现快速计算。
ThreadPoolExecutor的运行状态有五种,包含生命周期转换。
2.3 任务执行机制
2.3.1 任务调度
任务调度是线程池核心入口,用户提交任务后,决定任务执行流程。通过execute方法完成检查线程池状态、运行线程数和运行策略,决定执行流程,如直接申请线程执行或缓冲到队列执行,或直接拒绝任务。执行流程如下。
2.3.2 任务缓冲
任务缓冲模块实现任务和线程的管理,通过生产者消费者模式和阻塞队列实现。阻塞队列缓存任务,工作线程从队列中获取任务。
2.3.3 任务申请
任务执行有两种可能:直接由新创建的线程执行或从队列中获取任务执行。线程从任务缓存模块不断获取任务,通过getTask方法实现线程管理和任务管理之间的通信。
2.3.4 任务拒绝
任务拒绝策略保护线程池,实现拒绝策略接口定制策略或选择JDK提供的四种已有策略。拒绝策略特点如下。
2.4 Worker线程管理
2.4.1 Worker线程
Worker线程实现Runnable接口,持有线程和任务,通过构造方法创建。Worker线程执行任务模型如下,线程池通过AQS实现独占锁,控制线程生命周期,回收线程。
2.4.2 Worker线程增加
Worker线程增加通过addWorker方法实现,增加线程时考虑线程池状态,策略在上一步完成,opendaylight 源码仅完成增加线程并运行,最后返回成功结果。方法参数包括firstTask和core,用于指定任务和线程策略。
2.4.3 Worker线程回收
Worker线程回收依赖JVM自动回收,线程池维护线程引用,通过添加和移除引用控制线程生命周期。Worker被创建后,不断获取任务执行,核心线程无限等待,非核心线程限时获取。当无法获取任务时,循环结束,Worker主动移除自身引用。
2.4.4 Worker线程执行任务
Worker线程执行任务通过runWorker方法实现,执行流程如下。
三、线程池在业务中的实践
业务实践中,线程池用于获取并发性,提供典型场景和问题解决方案。
3.1 业务背景
互联网业界追求CPU多核性能,通过线程池管理线程获取并发性。常见场景包括快速响应用户请求和快速处理批量任务。
3.2 实际问题及方案思考
线程池使用面临核心问题:参数配置困难。调研替代方案、参数设置合理性以及线程池参数动态化,动态化线程池提供简单有效的方法解决参数修改成本问题。
3.3 动态化线程池
动态化线程池设计包括整体设计、功能架构,提供参数动态化、监控和告警能力。动态化线程池允许用户在管理平台上修改参数,实时生效,并监控线程池负载、任务执行情况,提供任务级别监控和运行时状态查看。
3.4 实践总结
面对使用线程池的实际问题,动态化线程池提供成本效益平衡的解决方案,降低故障发生的概率,适用于业务需求。
四、参考资料
1. JDK 1.8 源码
2. 维基百科-线程池
3. 更好的使用Java线程池
4. 维基百科Pooling(Resource Management)
5. 深入理解Java线程池:ThreadPoolExecutor
6. 《Java并发编程实践》
Java中Timer类的schedule方法开始计时时间是不是创建timer对象的时刻
通过查看JDK源码可以知道如下:
public void schedule(TimerTask task, long delay) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
sched(task, System.currentTimeMillis()+delay, 0);
}
实际上调用的是如下方法:
private void sched(TimerTask task, long time, long period)
//task:安排的任务,the scheduled task;
//time:指定的时间去执行
* at the specified time with the specified period, in milliseconds.
//period:如果period是正数,则会重复执行任务,如果是零则只执行任务一次
If period is
* positive, the task is scheduled for repeated execution;
* zero, the task is scheduled for one-time execution.
因此可以分析到 中间书写的语句不会算在延迟时间中,程序的开始时刻就是执行到timer.schedule(new Task(),);//语句时,开始计时。而Timer timer = new Timer() 只是创建了一个Timer类对象。
只有程序执行到timer.schedule(new Task(),)时,才会调用
sched(task, System.currentTimeMillis()+delay, 0)方法,而这时该方法
才去执行System.currentTimeMillis取得当前时间,并将该任务加到TaskQueue队列中
(自带任务队列),经过System.currentTimeMillis+毫秒后根据指定状态执行指定任务.
å¦ä½ç¨mavenå°java8åç代ç ç¼è¯ä¸ºjava6å¹³å°ç
ããå¨ä¸è¬çJavaåºç¨å¼åè¿ç¨ä¸ï¼å¼å人å使ç¨Javaçæ¹å¼æ¯è¾ç®åãæå¼æ¯ç¨çIDEï¼ç¼åJavaæºä»£ç ï¼åå©ç¨IDEæä¾çåè½ç´æ¥è¿è¡Java ç¨åºå°±å¯ä»¥äºãè¿ç§å¼å模å¼èåçè¿ç¨æ¯ï¼å¼å人åç¼åçæ¯Javaæºä»£ç æ件ï¼.javaï¼ï¼IDEä¼è´è´£è°ç¨Javaçç¼è¯å¨æJavaæºä»£ç ç¼è¯æå¹³å°æ å ³çåè代ç ï¼byte codeï¼ï¼ä»¥ç±»æ件çå½¢å¼ä¿åå¨ç£çä¸ï¼.classï¼ãJavaèææºï¼JVMï¼ä¼è´è´£æJavaåè代ç å 载并æ§è¡ãJavaéè¿è¿ç§æ¹å¼æ¥å®ç°å ¶âç¼åä¸æ¬¡ï¼å°å¤è¿è¡ï¼Write once, run anywhereï¼â çç®æ ãJavaç±»æ件ä¸å å«çåè代ç å¯ä»¥è¢«ä¸åå¹³å°ä¸çJVMæ使ç¨ãJavaåè代ç ä¸ä» å¯ä»¥ä»¥æ件形å¼åå¨äºç£çä¸ï¼ä¹å¯ä»¥éè¿ç½ç»æ¹å¼æ¥ä¸è½½ï¼è¿å¯ä»¥åªåå¨äºå åä¸ãJVMä¸çç±»å è½½å¨ä¼è´è´£ä»å å«åè代ç çåèæ°ç»ï¼byte[]ï¼ä¸å®ä¹åºJavaç±»ãå¨æäºæ åµä¸ï¼å¯è½ä¼éè¦å¨æççæ Javaåè代ç ï¼ææ¯å¯¹å·²æçJavaåè代ç è¿è¡ä¿®æ¹ãè¿ä¸ªæ¶åå°±éè¦ç¨å°æ¬æä¸å°è¦ä»ç»çç¸å ³ææ¯ãé¦å ä»ç»ä¸ä¸å¦ä½å¨æç¼è¯Javaæºæ件ã
ããå¨æç¼è¯Javaæºæ件
ããå¨ä¸è¬æ åµä¸ï¼å¼å人åé½æ¯å¨ç¨åºè¿è¡ä¹åå°±ç¼åå®æäºå ¨é¨çJavaæºä»£ç 并ä¸æåç¼è¯ã对æäºåºç¨æ¥è¯´ï¼Javaæºä»£ç çå 容å¨è¿è¡æ¶å»æè½ç¡®å®ãè¿ä¸ªæ¶åå°±éè¦å¨æç¼è¯æºä»£ç æ¥çæJavaåè代ç ï¼åç±JVMæ¥å è½½æ§è¡ãå ¸åçåºæ¯æ¯å¾å¤ç®æ³ç«èµçå¨çº¿è¯æµç³»ç»ï¼å¦PKU JudgeOnlineï¼ï¼å 许ç¨æ·ä¸ä¼ Java代ç ï¼ç±ç³»ç»å¨åå°ç¼è¯ãè¿è¡å¹¶è¿è¡å¤å®ãå¨å¨æç¼è¯Javaæºæ件æ¶ï¼ä½¿ç¨çåæ³æ¯ç´æ¥å¨ç¨åºä¸è°ç¨Javaç¼è¯å¨ã
ããJSR å¼å ¥äºJavaç¼è¯å¨APIãå¦æ使ç¨JDK 6çè¯ï¼å¯ä»¥éè¿æ¤APIæ¥å¨æç¼è¯Java代ç ãæ¯å¦ä¸é¢ç代ç ç¨æ¥å¨æç¼è¯æç®åçHello Worldç±»ã该Javaç±»ç代ç æ¯ä¿åå¨ä¸ä¸ªå符串ä¸çã
ãã public class CompilerTest {
ãã public static void main(String[] args) throws Exception {
ãã String source = "public class Main { public static void main(String[] args) { System.out.println(\"Hello World!\");} }";
ãã JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
ãã StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null);
ãã StringSourceJavaObject sourceObject = newCompilerTest.StringSourceJavaObject("Main", source);
ãã Iterable< extends JavaFileObject> fileObjects = Arrays.asList(sourceObject);
ãã CompilationTask task = compiler.getTask(null, fileManager, null,null, null, fileObjects);
ãã boolean result = task.call();
ãã if (result) {
ãã System.out.println("ç¼è¯æåã");
ãã }
ãã }
ãã
ãã static class StringSourceJavaObject extends SimpleJavaFileObject {
ãã
ãã private String content = null;
ãã public StringSourceJavaObject(String name, String content) ?throwsURISyntaxException {
ãã super(URI.create("string:///" + name.replace('.','/') + Kind.SOURCE.extension), Kind.SOURCE);
ãã this.content = content;
ãã }
ãã
ãã public CharSequence getCharContent(boolean ignoreEncodingErrors) ?throws IOException {
ãã return content;
ãã }
ãã }
ãã }
ããå¦æä¸è½ä½¿ç¨JDK 6æä¾çJavaç¼è¯å¨APIçè¯ï¼å¯ä»¥ä½¿ç¨JDKä¸çå·¥å ·ç±»com.sun.tools.javac.Mainï¼ä¸è¿è¯¥å·¥å ·ç±»åªè½ç¼è¯åæ¾å¨ç£çä¸çæ件ï¼ç±»ä¼¼äºç´æ¥ä½¿ç¨javacå½ä»¤ã
ããå¦å¤ä¸ä¸ªå¯ç¨çå·¥å ·æ¯Eclipse JDT Coreæä¾çç¼è¯å¨ãè¿æ¯Eclipse Javaå¼åç¯å¢ä½¿ç¨çå¢éå¼Javaç¼è¯å¨ï¼æ¯æè¿è¡åè°è¯æé误ç代ç ã该ç¼è¯å¨ä¹å¯ä»¥åç¬ä½¿ç¨ãPlayæ¡æ¶å¨å é¨ä½¿ç¨äºJDTçç¼è¯å¨æ¥å¨æç¼è¯Javaæºä»£ç ãå¨å¼å模å¼ä¸ï¼Playæ¡æ¶ä¼å®ææ«æ项ç®ä¸çJavaæºä»£ç æ件ï¼ä¸æ¦åç°æä¿®æ¹ï¼ä¼èªå¨ç¼è¯ Javaæºä»£ç ãå æ¤å¨ä¿®æ¹ä»£ç ä¹åï¼å·æ°é¡µé¢å°±å¯ä»¥çå°ååã使ç¨è¿äºå¨æç¼è¯çæ¹å¼çæ¶åï¼éè¦ç¡®ä¿JDKä¸çtools.jarå¨åºç¨ç CLASSPATHä¸ã
ããä¸é¢ä»ç»ä¸ä¸ªä¾åï¼æ¯å ³äºå¦ä½å¨Javaéé¢åååè¿ç®ï¼æ¯å¦æ±åºæ¥(3+4)*7-çå¼ãä¸è¬çåæ³æ¯åæè¾å ¥çè¿ç®è¡¨è¾¾å¼ï¼èªå·±æ¥æ¨¡æ计ç®è¿ç¨ãèèå°æ¬å·çåå¨åè¿ç®ç¬¦çä¼å 级çé®é¢ï¼è¿æ ·ç计ç®è¿ç¨ä¼æ¯è¾å¤æï¼èä¸å®¹æåºéãå¦å¤ä¸ç§åæ³æ¯å¯ä»¥ç¨JSR å¼å ¥çèæ¬è¯è¨æ¯æï¼ç´æ¥æè¾å ¥ç表达å¼å½åJavaScriptææ¯JavaFXèæ¬æ¥æ§è¡ï¼å¾å°ç»æãä¸é¢ç代ç 使ç¨çåæ³æ¯å¨æçæJavaæºä»£ç 并ç¼è¯ï¼æ¥çå è½½Javaç±»æ¥æ§è¡å¹¶è·åç»æãè¿ç§åæ³å®å ¨ä½¿ç¨Javaæ¥å®ç°ã
ãã private static double calculate(String expr) throws CalculationException {
ãã String className = "CalculatorMain";
ãã String methodName = "calculate";
ãã String source = "public class " + className
ãã + " { public static double " + methodName + "() { return " + expr +"; } }";
ãã //çç¥å¨æç¼è¯Javaæºä»£ç çç¸å ³ä»£ç ï¼åè§ä¸ä¸è
ãã boolean result = task.call();
ãã if (result) {
ãã ClassLoader loader = Calculator.class.getClassLoader();
ãã try {
ãã Class<?> clazz = loader.loadClass(className);
ãã Method method = clazz.getMethod(methodName, new Class<?>[] { });
ãã Object value = method.invoke(null, new Object[] { });
ãã return (Double) value;
ãã } catch (Exception e) {
ãã throw new CalculationException("å é¨é误ã");
ãã }
ãã } else {
ãã throw new CalculationException("é误ç表达å¼ã");
ãã }
ãã }
ããä¸é¢ç代ç ç»åºäºä½¿ç¨å¨æçæçJavaåè代ç çåºæ¬æ¨¡å¼ï¼å³éè¿ç±»å è½½å¨æ¥å è½½åè代ç ï¼å建Javaç±»ç对象çå®ä¾ï¼åéè¿Javaåå°APIæ¥è°ç¨å¯¹è±¡ä¸çæ¹æ³ã
ããJavaåè代ç å¢å¼º
ããJava åè代ç å¢å¼ºæçæ¯å¨Javaåè代ç çæä¹åï¼å¯¹å ¶è¿è¡ä¿®æ¹ï¼å¢å¼ºå ¶åè½ãè¿ç§åæ³ç¸å½äºå¯¹åºç¨ç¨åºçäºè¿å¶æ件è¿è¡ä¿®æ¹ãå¨å¾å¤Javaæ¡æ¶ä¸é½å¯ä»¥è§å°è¿ç§å®ç°æ¹å¼ãJavaåè代ç å¢å¼ºé常ä¸Javaæºæ件ä¸ç注解ï¼annotationï¼ä¸å使ç¨ã注解å¨Javaæºä»£ç ä¸å£°æäºéè¦å¢å¼ºçè¡ä¸ºåç¸å ³çå æ°æ®ï¼ç±æ¡æ¶å¨è¿è¡æ¶å»å®æ对åè代ç çå¢å¼ºãJavaåè代ç å¢å¼ºåºç¨çåºæ¯æ¯è¾å¤ï¼ä¸è¬é½éä¸å¨åå°åä½ä»£ç å对å¼å人åå±è½åºå±çå®ç°ç»èä¸ãç¨è¿JavaBeansç人å¯è½å¯¹å ¶ä¸é£äºå¿ 须添å çgetter/setteræ¹æ³æå°å¾ç¹çï¼å¹¶ä¸é¾ä»¥ç»´æ¤ãèéè¿åè代ç å¢å¼ºï¼å¼å人ååªéè¦å£°æBeanä¸çå±æ§å³å¯ï¼getter/setteræ¹æ³å¯ä»¥éè¿ä¿®æ¹åè代ç æ¥èªå¨æ·»å ãç¨è¿JPAç人ï¼å¨è°è¯ç¨åºçæ¶åï¼ä¼åç°å®ä½ç±»ä¸è¢«æ·»å äºä¸äºé¢å¤ç ååæ¹æ³ãè¿äºååæ¹æ³æ¯å¨è¿è¡æ¶å»ç±JPAçå®ç°å¨ææ·»å çãåè代ç å¢å¼ºå¨é¢åæ¹é¢ç¼ç¨ï¼AOPï¼çä¸äºå®ç°ä¸ä¹æ使ç¨ã
OpenJDK-JVM 源码阅读 - ZGC - 并发标记 | 京东物流技术团队
ZGC简介:
ZGC是Java垃圾回收器的前沿技术,支持低延迟、大容量堆、染色指针、读屏障等特性,myie 源码自JDK起作为试验特性,JDK起支持Windows,JDK正式投入生产使用。在JDK中已实现分代收集,预计不久将发布,性能将更优秀。
ZGC特征:
1. 低延迟
2. 大容量堆
3. 染色指针
4. 读屏障
并发标记过程:
ZGC并发标记主要分为三个阶段:初始标记、并发标记/重映射、重分配。本篇主要分析并发标记/重映射部分源代码。
入口与并发标记:
整个ZGC源码入口是ZDriver::gc函数,其中concurrent()是一个宏定义。并发标记函数是concurrent_mark。
并发标记流程:
从ZHeap::heap()进入mark函数,使用任务框架执行任务逻辑在ZMarkTask里,具体执行函数是work。工作逻辑循环从标记条带中取出数据,直到取完或时间到。此循环即为ZGC三色标记主循环。之后进入drain函数,从栈中取出指针进行标记,直到栈排空。标记过程包括从栈取数据,标记和递归标记。
标记与迭代:
标记过程涉及对象迭代遍历。标记流程中,ZGC通过map存储对象地址的finalizable和inc_live信息。map大小约为堆中对象对齐大小的二分之一。接着通过oop_iterate函数对对象中的指针进行迭代,使用ZMarkBarrierOopClosure作为读屏障,实现了指针自愈和防止漏标。
读屏障细节:
ZMarkBarrierOopClosure函数在标记非静态成员变量的指针时触发读屏障。慢路径处理和指针自愈是核心逻辑,慢路径标记指针,快速路径通过cas操作修复坏指针,并重新标记。
重映射过程:
读屏障触发标记后,对象被推入栈中,下次标记循环时取出。ZGC并发标记流程至此结束。
问题回顾:
本文解答了ZGC如何标记指针、三色标记过程、如何防止漏标、指针自愈和并发重映射过程的问题。
扩展思考:
ZGC在指针上标记,当回收某个region时,如何得知对象是否存活?答案需要结合标记阶段和重分配阶段的代码。
结束语:
本文深入分析了ZGC并发标记的源码细节,对您有启发或帮助的话,请多多点赞支持。作者:京东物流 刘家存,来源:京东云开发者社区 自猿其说 Tech。转载请注明来源。
如何实现定时任务- Java Timer/TimerTask 源码解析
日常实现各种服务端系统时,我们一定会有一些定时任务的pchunter源码需求。比如会议提前半小时自动提醒,异步任务定时/周期执行等。那么如何去实现这样的一个定时任务系统呢? Java JDK提供的Timer类就是一个很好的工具,通过简单的API调用,我们就可以实现定时任务。
现在就来看一下java.util.Timer是如何实现这样的定时功能的。
首先,我们来看一下一个使用demo
基本的使用方法:
加入任务的API如下:
可以看到API方法内部都是调用sched方法,其中time参数下一次任务执行时间点,是通过计算得到。period参数为0的话则表示为一次性任务。
那么我们来看一下Timer内部是如何实现调度的。
内部结构
先看一下Timer的组成部分:
Timer有3个重要的模块,分别是 TimerTask, TaskQueue, TimerThread
那么,在加入任务之后,整个Timer是怎么样运行的呢?可以看下面的示意图:
图中所示是简化的逻辑,多个任务加入到TaskQueue中,会自动排序,队首任务一定是当前执行时间最早的任务。TimerThread会有一个一直执行的循环,从TaskQueue取队首任务,判断当前时间是否已经到了任务执行时间点,如果是则执行任务。
工作线程
流程中加了一些锁,用来避免同时加入TimerTask的并发问题。可以看到sched方法的逻辑比较简单,task赋值之后入队,队列会自动按照nextExecutionTime排序(升序,排序的实现原理后面会提到)。
从mainLoop的源码中可以看出,基本的流程如下所示
当发现是周期任务时,会计算下一次任务执行的时间,这个时候有两种计算方式,即前面API中的
优先队列
当从队列中移除任务,或者是修改任务执行时间之后,队列会自动排序。始终保持执行时间最早的任务在队首。 那么这是如何实现的呢?
看一下TaskQueue的源码就清楚了
可以看到其实TaskQueue内部就是基于数组实现了一个最小堆 (balanced binary heap), 堆中元素根据 执行时间nextExecutionTime排序,执行时间最早的任务始终会排在堆顶。这样工作线程每次检查的任务就是当前最早需要执行的任务。堆的初始大小为,有简单的倍增扩容机制。
TimerTask 任务有四种状态:
Timer 还提供了cancel和purge方法
常见应用
Java的Timer广泛被用于实现异步任务系统,在一些开源项目中也很常见, 例如消息队列RocketMQ的 延时消息/消费重试 中的异步逻辑。
上面这段代码是RocketMQ的延时消息投递任务 ScheduleMessageService 的核心逻辑,就是使用了Timer实现的异步定时任务。
不管是实现简单的异步逻辑,还是构建复杂的任务系统,Java的Timer确实是一个方便实用,而且又稳定的工具类。从Timer的实现原理,我们也可以窥见定时系统的一个基础实现:线程循环 + 优先队列。这对于我们自己去设计相关的系统,也会有一定的启发。
Timer & TimerTask 源码分析
尽管 Timer 已经在现代 Java 开发中鲜少使用,但其内部结构对理解和实现自动化流程有着重要参考价值。这篇源码分析着重于 Timer 和 TimerTask 的工作原理,它们通过维护一个 TaskQueue,确保任务按照预设时间执行,其中的并发处理策略对初学者极具启发性。
在 Timer 类中,每个 Timer 实例对应一个单独的线程,这可能导致任务执行顺序受阻。Timer 的生命周期不确定,任务完成后可能不会立即回收,而 ScheduledThreadPoolExecutor 是推荐的替代方案。Timer 是线程安全的,但不保证任务执行的实时性,而是依赖于 wait() 等待机制。TaskQueue 是 TimerThread 的核心,它负责调度任务的执行。
TimerThread 是负责执行任务的线程,继承自 Thread,其简洁的实现表明了其功能的专注。Timer 的构造器和 schedule 方法提供多种重载形式,而 sched 方法是它们的最终调用者。TimerTask 是一个抽象类,实现了 Runnable,用户需创建其子类并覆盖 run 方法,定义了任务的状态标识和执行时间属性。
尽管 Timer 已经过时,但理解其内部机制有助于在需要定时任务的场景中找到更高效、可靠的解决方案。
企业在线考试系统源码
企业在线考试系统的源码设计是提升培训效果的关键。本文主要探讨了基于Java技术和中间件构建高效、防作弊的在线考试平台的过程。首先,丰富的试题库通过Java实现批量导入和管理;试卷组卷功能支持固定和随机选项,同样由Java代码驱动。考试任务的设置,如考试次数限制,由Java实体类和Repository接口来设定,如ExamTask实体。
为了保证公平,系统内置防作弊措施,通过Java实现复杂的数据验证和监控。考试结束后,自动成绩评估和报告生成功能为管理者提供详尽信息。系统能够与HRM和LMS等其他系统无缝集成,实现数据同步,且支持二次开发以满足个性化需求。
Java中间件在这个过程中扮演重要角色,如Spring Boot简化了开发,Apache Kafka处理实时数据,RabbitMQ负责异步任务。例如,Spring Boot的内置服务器简化应用部署,Kafka确保日志处理高效,RabbitMQ则用于处理消息传递。通过这些技术,企业可以构建出稳定且可扩展的在线考试平台,如内训宝企业在线培训平台,助力企业提高培训效率和公平性。
这篇文章希望能为企业构建在线考试系统提供实用指导。如有任何疑问或需求,欢迎随时咨询。
java 线程池 工作队列是如何工作的
使用线程池的好处1、降低资源消耗
可以重复利用已创建的线程降低线程创建和销毁造成的消耗。
2、提高响应速度
当任务到达时,任务可以不需要等到线程创建就能立即执行。
3、提高线程的可管理性
线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控
线程池的工作原理
首先我们看下当一个新的任务提交到线程池之后,线程池是如何处理的
1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。
2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步
3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务
线程池饱和策略
这里提到了线程池的饱和策略,那我们就简单介绍下有哪些饱和策略:
AbortPolicy
为Java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。
DiscardPolicy
直接抛弃,任务不执行,空方法
DiscardOldestPolicy
从队列里面抛弃head的一个任务,并再次execute 此task。
CallerRunsPolicy
在调用execute的线程里面执行此command,会阻塞入口
用户自定义拒绝策略(最常用)
实现RejectedExecutionHandler,并自己定义策略模式
下我们以ThreadPoolExecutor为例展示下线程池的工作流程图
1.jpg
2.jpg
1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
2、如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3、如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。
线程池只是并发编程中的一小部分,下图是史上最全面的Java的并发编程学习技术总汇
3.jpg
关键方法源码分析
我们看看核心方法添加到线程池方法execute的源码如下:
// //Executes the given task sometime in the future. The task //may execute in a new thread or in an existing pooled thread. // // If the task cannot be submitted for execution, either because this // executor has been shutdown or because its capacity has been reached, // the task is handled by the current { @code RejectedExecutionHandler}. // // @param command the task to execute // @throws RejectedExecutionException at discretion of // { @code RejectedExecutionHandler}, if the task // cannot be accepted for execution // @throws NullPointerException if { @code command} is null // public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // // Proceed in 3 steps: // // 1. If fewer than corePoolSize threads are running, try to // start a new thread with the given command as its first // task. The call to addWorker atomically checks runState and // workerCount, and so prevents false alarms that would add // threads when it shouldn't, by returning false. // 翻译如下: // 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程, // 如果能完成新线程创建exexute方法结束,成功提交任务 // 2. If a task can be successfully queued, then we still need // to double-check whether we should have added a thread // (because existing ones died since last checking) or that // the pool shut down since entry into this method. So we // recheck state and if necessary roll back the enqueuing if // stopped, or start a new thread if there are none. // 翻译如下: // 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态 // 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要 // reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程; // 3. If we cannot queue task, then we try to add a new // thread. If it fails, we know we are shut down or saturated // and so reject the task. // 翻译如下: // 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池 // 已经达到饱和状态,所以reject这个他任务 // int c = ctl.get(); // 工作线程数小于核心线程数 if (workerCountOf(c) < corePoolSize) { // 直接启动新线程,true表示会再次检查workerCount是否小于corePoolSize if (addWorker(command, true)) return; c = ctl.get(); } // 如果工作线程数大于等于核心线程数 // 线程的的状态未RUNNING并且队列notfull if (isRunning(c) && workQueue.offer(command)) { // 再次检查线程的运行状态,如果不是RUNNING直接从队列中移除 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 移除成功,拒绝该非运行的任务 reject(command); else if (workerCountOf(recheck) == 0) // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。 // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务 addWorker(null, false); } // 如果队列满了或者是非运行的任务都拒绝执行 else if (!addWorker(command, false)) reject(command); }
深入理解 RxJava2:Scheduler(2)
欢迎来到深入理解 RxJava2 系列第二篇,本文基于 RxJava 2.2.0 正式版源码,将探讨 Scheduler 与 Worker 的概念及其实现原理。
Scheduler 与 Worker 在 RxJava2 中扮演着至关重要的角色,它们是线程调度的核心与基石。虽然 Scheduler 的作用较为熟悉,但 Worker 的概念了解的人可能较少。为何在已有 Scheduler 的情况下,还要引入 Worker 的概念呢?让我们继续探讨。
首先,Scheduler 的核心定义是调度 Runnable,支持立即、延时和周期性调用。而 Worker 是任务的最小单元的载体。在 RxJava2 内部实现中,通常一个或多个 Worker 对应一个 ScheduledThreadPoolExecutor 对象,这里暂不深入探讨。
在 RxJava 1.x 中,Scheduler 没有 scheduleDirect/schedulePeriodicallyDirect 方法,只能先创建 Worker,再通过 Worker 来调度任务。这些方法是对 Worker 调度的简化,可以理解为创建一个只能调度一次任务的 Worker 并立即调度该任务。在 Scheduler 基类的源码中,默认实现是直接创建 Worker 并创建对应的 Task(虽然在部分 Scheduler 的覆盖实现上并没有创建 Worker,但可以认为存在虚拟的 Worker)。
一个 Scheduler 可以创建多个 Worker,这两者是一对多的关系,而 Worker 与 Task 也是一对多的关系。Worker 的存在旨在确保两件事:统一调度 Runnable 和统一取消任务。例如,在 observeOn 操作符中,可以通过 Worker 来统一调度和取消一系列的 Runnable。
RxJava2 默认内置了多种 Scheduler 实现,适用于不同场景,这些 Scheduler 都可以在 Schedulers 类中直接获得。以下是两个常用 Scheduler 的源码分析:computation 和 io。
NewThreadWorker 在 computation、io 和 newThread 中都有涉及,下面简单了解一下这个类。NewThreadWorker 与 ScheduledThreadPoolExecutor 之间是一对一的关系,在构造函数中通过工厂方法创建一个 corePoolSize 为 1 的 ScheduledThreadPoolExecutor 对象并持有。
ScheduledThreadPoolExecutor 从 JDK1.5 开始存在,这个类继承于 ThreadPoolExecutor,支持立即、延时和周期性任务。但是注意,在 ScheduledThreadPoolExecutor 中,maximumPoolSize 参数是无效的,corePoolSize 表示最大线程数,且它的队列是无界的。这里不再深入探讨该类,否则会涉及太多内容。
有了这个类,RxJava2 在实现 Worker 时就站在了巨人的肩膀上,线程调度可以直接使用该类解决,唯一的麻烦之处就是封装一层 Disposable 的逻辑。
ComputationScheduler 是计算密集型的 Scheduler,其线程数与 CPU 核心数密切相关。当线程数远超过 CPU 核心数目时,CPU 的时间更多地损耗在了线程的上下文切换。因此,保持最大线程数与 CPU 核心数一致是比较通用的方式。
FixedSchedulerPool 可以看作是固定数量的真正 Worker 的缓存池。确定了 MAX_THREADS 后,在 ComputationScheduler 的构造函数中会创建 FixedSchedulerPool 对象,FixedSchedulerPool 内部会直接创建一个长度为 MAX_THREADS 的 PoolWorker 数组。PoolWorker 继承自 NewThreadWorker,但没有任何额外的代码。
PoolWorker 的使用方法是从池子里取一个 PoolWorker 并返回。但是需要注意,每个 Worker 是独立的,每个 Worker 内部的任务是绑定在这个 Worker 中的。如果按照上述方法暴露 PoolWorker,会出现两个问题:
为了解决上述问题,需要在 PoolWorker 外再包一层 EventLoopWorker。EventLoopWorker 是一个代理对象,它会将 Runnable 代理给 FixedSchedulerPool 中取到的 PoolWorker 来调度,并负责管理通过它创建的任务。当自身被取消时,会将创建的任务全部取消。
与 ComputationScheduler 恰恰相反,IoScheduler 的线程数是无上限的。这是因为 IO 设备的速度远低于 CPU 速度,在等待 IO 操作时,CPU 往往是闲置的。因此,应该创建更多的线程让 CPU 尽可能地利用。当然,并不是线程越多越好,线程数目膨胀到一定程度会影响 CPU 的效率,也会消耗大量的内存。在 IoScheduler 中,每个 Worker 在空置一段时间后就会被清除以控制线程的数目。
CachedWorkerPool 是一个变长并定期清理的 ThreadWorker 的缓存池,内部通过一个 ConcurrentLinkedQueue 维护。和 PoolWorker 类似,ThreadWorker 也是继承自 NewThreadWorker。仅仅是增加了一个 expirationTime 字段,用来标识这个 ThreadWorker 的超时时间。
在 CachedWorkerPool 初始化时,会传入 Worker 的超时时间,目前是写死的 秒。这个超时时间表示 ThreadWorker 闲置后最大存活时间(实际中不保证 秒时被回收)。
IoScheduler 中也存在一个 EventLoopWorker 类,它和 ComputationScheduler 中的作用类似。因为 CachedWorkerPool 是每隔 秒清理一次队列的,所以 ThreadWorker 的存活时间取决于入队的时机。如果一直没有被再次取出,其被实际清理的延迟在 - 秒之间。
熟悉线程的读者会发现,ComputationScheduler 与 IoScheduler 很像某些参数下的 ThreadPoolExecutor。它们对线程的控制外在表现很相似,但实际的线程执行对象不一样。这两者的对比有助于我们更深刻地理解 Scheduler 设计的内在逻辑。
Scheduler 是 RxJava 线程的核心概念,RxJava 基于此屏蔽了 Thread 相关的概念,只与 Scheduler/Worker/Runnable 打交道。
本来计划继续基于 Scheduler 和大家一起探讨 subscribeOn 与 observeOn,但考虑到篇幅问题,这些留待下篇分享。
感谢大家的阅读,欢迎关注笔者的公众号,可以第一时间获取更新,同时欢迎留言沟通。