- 精尽 Dubbo 原理与源码专栏( 已经完成 69+ 篇,预计总共 75+ 篇 )
- 中文详细注释的开源项目
- Java 并发源码合集
- RocketMQ 源码合集
- Sharding-JDBC 源码解析合集
- Spring MVC 和 Security 源码合集
- MyCAT 源码解析合集
**本文主要基于 Eureka 1.8.X 版本**
- 概述
- 整体流程
- 任务处理器
- 4. 创建任务分发器
4.1 批量任务执行分发器
4.2 单任务执行分发器
6.1 创建批量任务执行器
6.2 创建单任务执行器
6.3 工作线程抽象类
10.1 批量任务工作线程
10.2 单任务工作线程
1. 概述
本文主要分享 任务批处理。Eureka-Server 集群通过任务批处理同步应用实例注册实例,所以本文也是为 Eureka-Server 集群同步的分享做铺垫。本文涉及类在
com.netflix.eureka.util.batcher
包下,涉及到主体类的类图如下( 打开大图 ):
- 紫色部分 —— 任务分发器
- 蓝色部分 —— 任务接收器
- 红色部分 —— 任务执行器
- 绿色部分 —— 任务处理器
- 黄色部分 —— 任务持有者( 任务 )
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版,等于主动编写低级 BUG 。
- 程序猿DD —— 《Spring Cloud微服务实战》
- 周立 —— 《Spring Cloud与Docker微服务架构实战》
- 两书齐买,京东包邮。
推荐 Spring Cloud 视频:
- Java 微服务实践 - Spring Boot
- Java 微服务实践 - Spring Cloud
- Java 微服务实践 - Spring Boot / Spring Cloud
2. 整体流程
任务执行的整体流程如下( 打开大图 ):
细箭头 —— 任务执行经历的操作
粗箭头 —— 任务队列流转的方向
- **不同于**一般情况下,任务提交了**立即**同步或异步执行,任务的执行拆分了**三层队列**:
蓝线:分发器在收到任务执行请求后,提交到接收队列,任务实际未执行。
黄线:执行器的工作线程处理任务失败,将符合条件( 见 「3. 任务处理器」 )的失败任务提交到重新执行队列。
第一层,接收队列(
acceptorQueue
),重新处理队列(reprocessQueue
)。接收队列,避免处理任务的阻塞等待。
接收线程( Runner )合并任务,将相同任务编号( 是的,任务是带有编号的 )的任务合并,只执行一次。
Eureka-Server 为集群同步提供批量操作多个应用实例的接口,一个批量任务可以一次调度接口完成,避免多次调用的开销。当然,这样做的前提是合并任务,这也导致 Eureka-Server 集群之间对应用实例的注册和下线带来更大的延迟。毕竟,Eureka 是在 CAP 之间,选择了 AP。
3. 任务处理器
com.netflix.eureka.util.batcher.TaskProcessor
,任务处理器接口。接口代码如下:
// ... 省略代码,超过微信文章上限
-
- `Success` ,成功。
- `Congestion` ,拥挤错误,**任务将会被重试**。例如,请求被限流。
- `TransientError` ,瞬时错误,**任务将会被重试**。例如,网络请求超时。
- `PermanentError` ,永久错误,**任务将会被丢弃**。例如,执行时发生程序异常。
4. 创建任务分发器
com.netflix.eureka.util.batcher.TaskDispatcher
,任务分发器接口。接口代码如下:
// ... 省略代码,超过微信文章上限
#process(…)
方法,提交任务编号,任务,任务过期时间给任务分发器处理。
com.netflix.eureka.util.batcher.TaskDispatchers
,任务分发器工厂类,用于创建任务分发器。其内部提供两种任务分发器的实现:
- 批量任务执行的分发器,用于 Eureka-Server 集群注册信息的同步任务。
- 单任务执行的分发器,用于 Eureka-Server 向亚马逊 AWS 的 ASG ( Autoscaling Group ) 同步状态。虽然本系列暂时对 AWS 相关的不做解析,从工具类的角度来说,本文会对该分发器进行分享。
com.netflix.eureka.cluster.ReplicationTaskProcessor
,实现 TaskDispatcher ,Eureka-Server 集群任务处理器。感兴趣的同学,可以点击链接自己研究,我们将在 《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。
4.1 批量任务执行分发器
调用
TaskDispatchers#createBatchingTaskDispatcher(...)
方法,创建批量任务执行的分发器,实现代码如下:
// TaskDispatchers.java
1: /**
2: * 创建批量任务执行的分发器
3: *
4: * @param id 任务执行器编号
5: * @param maxBufferSize 待执行队列最大数量
6: * @param workloadSize 单个批量任务包含任务最大数量
7: * @param workerCount 任务执行器工作线程数
8: * @param maxBatchingDelay 批量任务等待最大延迟时长,单位:毫秒
9: * @param congestionRetryDelayMs 请求限流延迟重试时间,单位:毫秒
10: * @param networkFailureRetryMs 网络失败延迟重试时长,单位:毫秒
11: * @param taskProcessor 任务处理器
12: * @param ID 任务编号泛型
13: * @param 任务泛型
14: * @return 批量任务执行的分发器
15: */
// ... 省略代码,超过微信文章上限
-
- `workloadSize` 参数,单个批量任务包含任务最大数量。
- `taskProcessor` 参数,**自定义任务执行器实现**。
- 第 32 至 35 行 :
#process()
方法的实现,调用AcceptorExecutor#process(…)
方法,提交 [ 任务编号 , 任务 , 任务过期时间 ] 给任务分发器处理。
4.2 单任务执行分发器
调用
TaskDispatchers#createNonBatchingTaskDispatcher(...)
方法,创建单任务执行的分发器,实现代码如下:
1: /**
2: * 创建单任务执行的分发器
3: *
4: * @param id 任务执行器编号
5: * @param maxBufferSize 待执行队列最大数量
6: * @param workerCount 任务执行器工作线程数
7: * @param maxBatchingDelay 批量任务等待最大延迟时长,单位:毫秒
8: * @param congestionRetryDelayMs 请求限流延迟重试时间,单位:毫秒
9: * @param networkFailureRetryMs 网络失败延迟重试时长,单位:毫秒
10: * @param taskProcessor 任务处理器
11: * @param ID 任务编号泛型
12: * @param 任务泛型
13: * @return 单任务执行的分发器
14: */
15: public static ID, TaskDispatcherID, createNonBatchingTaskDispatcher(String id,
16: int maxBufferSize,
17: int workerCount,
18: long maxBatchingDelay,
19: long congestionRetryDelayMs,
20: long networkFailureRetryMs,
21: TaskProcessor taskProcessor) {
22: // 创建 任务接收执行器
23: final AcceptorExecutorID, acceptorExecutor = new AcceptorExecutor(
24: id, maxBufferSize, /* workloadSize = 1 */1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
25: );
26: final TaskExecutorsID, taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
27: return new TaskDispatcherID, () {
28: @Override
29: public void process(ID id, T task, long expiryTime) {
30: acceptorExecutor.process(id, task, expiryTime);
31: }
32:
33: @Override
34: public void shutdown() {
35: acceptorExecutor.shutdown();
36: taskExecutor.shutdown();
37: }
38: };
39: }
-
- `workloadSize` 参数,相比 `#createBatchingTaskDispatcher(…)` 少这个参数。**在第 24 行,你会发现该参数传递给 AcceptorExecutor 使用 1 噢**。
- `taskProcessor` 参数,**自定义任务执行器实现**。
5. 创建任务接收执行器
com.netflix.eureka.util.batcher.AcceptorExecutor
,任务接收执行器。创建构造方法代码如下:
// ... 省略代码,超过微信文章上限
-
- 眼尖如你,会发现 AcceptorExecutor 即存在单任务工作队列( `singleItemWorkQueue` ),又存在批量任务工作队列( `batchWorkQueue` ) ,在 「9. 任务接收线程【调度任务】」 会解答这个疑惑。
6. 创建任务执行器
com.netflix.eureka.util.batcher.TaskExecutors
,任务执行器。其内部提供创建单任务和批量任务执行器的两种方法。TaskExecutors 构造方法如下:
// ... 省略代码,超过微信文章上限
workerThreads
属性,工作线程池。工作任务队列会被工作线程池并发拉取,并发执行。com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnableFactory
,创建工作线程工厂接口。单任务和批量任务执行器的工作线程实现不同,通过自定义工厂实现类创建。
6.1 创建批量任务执行器
调用
TaskExecutors#batchExecutors(...)
方法,创建批量任务执行器。实现代码如下:
/**
* 创建批量任务执行器
*
* @param name 任务执行器名
* @param workerCount 任务执行器工作线程数
* @param processor 任务处理器
* @param acceptorExecutor 接收任务执行器
* @param ID 任务编号泛型
* @param 任务泛型
* @return 批量任务执行器
*/
// ... 省略代码,超过微信文章上限
com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.BatchWorkerRunnable
,批量任务工作线程。
6.2 创建单任务执行器
调用
TaskExecutors#singleItemExecutors(...)
方法,创建批量任务执行器。实现代码如下:
/**
* 创建单任务执行器
*
* @param name 任务执行器名
* @param workerCount 任务执行器工作线程数
* @param processor 任务处理器
* @param acceptorExecutor 接收任务执行器
* @param ID 任务编号泛型
* @param 任务泛型
* @return 单任务执行器
*/
// ... 省略代码,超过微信文章上限
com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.SingleTaskWorkerRunnable
,单任务工作线程。
6.3 工作线程抽象类
com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable
,任务工作线程抽象类。BatchWorkerRunnable 和 SingleTaskWorkerRunnable 都实现该类,差异在
#run()
的自定义实现。WorkerRunnable 实现代码如下:
// ... 省略代码,超过微信文章上限
7. 网络通信整形器
com.netflix.eureka.util.batcher.TrafficShaper
,网络通信整形器。当任务执行发生请求限流,或是请求网络失败的情况,则延时 AcceptorRunner 将任务提交到工作任务队列,从而避免任务很快去执行,再次发生上述情况。TrafficShaper 实现代码如下:
// ... 省略代码,超过微信文章上限
#registerFailure(…)
,在任务执行失败时,提交任务结果给 TrafficShaper ,记录发生时间。在 「10. 任务执行器【执行任务】」 会看到调用该方法。#transmissionDelay(…)
,计算提交延迟,单位:毫秒。「9. 任务接收线程【调度任务】」 会看到调用该方法。
8. 任务接收执行器【处理任务】
调用
AcceptorExecutor#process(...)
方法,添加任务到接收任务队列。实现代码如下:
// AcceptorExecutor.java
// ... 省略代码,超过微信文章上限
`// ... 省略代码,超过微信文章上限`
9. 任务接收线程【调度任务】
后台线程执行
AcceptorRunner#run(...)
方法,调度任务。实现代码如下:
// ... 省略代码,超过微信文章上限
- 第 4 行 :无限循环执行调度,直到关闭。
- 第 6 至 7 行 :调用 `#drainInputQueues()` 方法,**循环**处理完输入队列( 接收队列 + 重新执行队列 ),**直到**有待执行的任务。实现代码如下:
// ... 省略代码,超过微信文章上限
- 第 4 行 :优先从重新执行任务的队尾拿较新的任务,从而实现保留更新的任务在待执行任务映射(
pendingTasks
) 里。 - 第 12 行 :添加任务编号到待执行队列( `processingOrder` ) 的头部。效果如下图:
`// ... 省略代码,超过微信文章上限`
`// ... 省略代码,超过微信文章上限`
// ... 省略代码,超过微信文章上限
- 当
scheduleTime
小于当前时间,不重新计算,即此时需要延迟等待调度。 - 当
scheduleTime
大于等于当前时间,配合TrafficShaper#transmissionDelay(…)
重新计算。
// ... 省略代码,超过微信文章上限
- x
- 第 2 行 :调用 `#hasEnoughTasksForNextBatch()` 方法,判断是否有足够任务进行下一次批量任务调度:1)待执行任务( `processingOrder` )映射已满;或者 2)到达批量任务处理最大等待延迟。实现代码如下:
`// ... 省略代码,超过微信文章上限`
- 第 5 至 17 行 :获取批量任务(
holders
)。😈 你会发现,本文说了半天的批量任务,实际是Listtaskholderid, t="" style="font-size: inherit;color: inherit;line-height: inherit;"/id,>/taskholderid,/id,
哈。 - 第 4 行 :获取批量任务工作请求信号量(
batchWorkRequests
) 。在任务执行器的批量任务执行器,每次执行时,发出batchWorkRequests
。每一个信号量需要保证获取到一个批量任务。 - 第 19 至 20 行 :未调度到批量任务,释放请求信号量,代表请求实际未完成,每一个信号量需要保证获取到一个批量任务。
- 第 21 至 24 行 :添加批量任务到批量任务工作队列。
- 第 23 行 :调用
#assignSingleItemWork()
方法,调度单任务。
// ... 省略代码,超过微信文章上限
- x
10. 任务执行器【执行任务】
10.1 批量任务工作线程
批量任务工作后台线程( BatchWorkerRunnable )执行
#run(...)
方法,调度任务。实现代码如下:
//
// ... 省略代码,超过微信文章上限
- 第 4 行 :无限循环执行调度,直到关闭。
- 第 6 行 :调用 `getWork()` 方法,获取**一个**批量任务直到成功。实现代码如下:
`// ... 省略代码,超过微信文章上限`
- 注意,批量任务工作队列(
batchWorkQueue
) 和单任务工作队列(singleItemWorkQueue
) 是不同的队列。 - 第 3 行 :调用 `TaskDispatcher#requestWorkItems()` 方法,发起请求信号量,并获得批量任务的工作队列。实现代码如下:
// TaskDispatcher.java // ... 省略代码,超过微信文章上限
- 第 5 至 8 行 :循环获取一个批量任务,直到成功。
// TaskDispatcher.java
// ... 省略代码,超过微信文章上限
// ... 省略代码,超过微信文章上限
- x
// AcceptorExecutor.java
// ... 省略代码,超过微信文章上限
10.2 单任务工作线程
单任务工作后台线程( SingleTaskWorkerRunnable )执行
#run(...)
方法,调度任务,和
BatchWorkerRunnable#run(...)
基本类似,就不啰嗦了。实现代码如下:
@Override
// SingleTaskWorkerRunnable.java
// ... 省略代码,超过微信文章上限
666. 彩蛋
😈 又是一篇长文。建议边看代码,边对照着整体流程图,理解实际不难。当然,欢迎你有任何疑问,在我的公众号( 芋道源码 ) 留言。胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?
如果你对 Dubbo 感兴趣,欢迎加入我的知识星球一起交流。
目前在知识星球(https://t.zsxq.com/2VbiaEu)更新了如下 Dubbo 源码解析如下:01. 调试环境搭建
02. 项目结构一览
- 配置 Configuration
04. 核心流程一览05. 拓展机制 SPI06. 线程池07. 服务暴露 Export08. 服务引用 Refer09. 注册中心 Registry10. 动态编译 Compile11. 动态代理 Proxy12. 服务调用 Invoke13. 调用特性 14. 过滤器 Filter15. NIO 服务器16. P2P 服务器17. HTTP 服务器18. 序列化 Serialization19. 集群容错 Cluster20. 优雅停机21. 日志适配22. 状态检查23. 监控中心 Monitor24. 管理中心 Admin25. 运维命令 QOS26. 链路追踪 Tracing…
一共 60 篇++