16、SkyWalking 源码分析 Collector Streaming Computing 流式处理(一)

1. 概述

本文主要分享 Collector Streaming 流式处理。主要包含如下部分:

  • apm-collector-core 模块的 graph 包,提供最精简单节点的流式处理的封装。如下图所示:

![ ][nbsp]

  • apm-collector-stream 模块,在 graph 包的基础上,提供异步跨节点等等的流式处理的封装。如下图所示:

![ ][nbsp 1]

免打脸大保健:笔者对流式处理非常不了解,本文可能是一本正经的胡说八道。考虑到笔者是靠脸吃饭(颜值我只服我红雷哥),所以读者老爷请爱护下笔者。

Collector Streaming 在 SkyWalking 架构图处于如下位置( 红框 ) :

FROM [https://github.com/apache/incubating-skywalking][https_github.com_apache_incubating-skywalking]

![ ][nbsp 2]

OK,下面来一本正经的代码走起!

2. apm-collector-core/graph

整体类图如下:

![ ][nbsp 3]

看起来略复杂,不要方,我们先来看一个流式大数据处理框架 Apache Storm 的说明:

FROM [《流式大数据处理的三种框架:Storm,Spark和Samza》][Storm_Spark_Samza]
在[Storm][] 中,先要设计一个用于实时计算的图状结构,我们称之为拓扑(topology)。这个拓扑将会被提交给集群,由集群中的主控节点(master node)分发代码,将任务分配给工作节点(worker node)执行。

  • Graph :定义了一个数据各个 Node 的处理拓扑图。

  • WayToNode :提交数据给 Node 的方式

  • Node :节点,包含一个 NodeProcessor 和 一个 Next 。

  • NodeProcessor :Node 处理器,处理数据

  • Next :包含 WayToNode 数组,即 Node 提交数据给 Next 的 Node 数组方式

整体交互流程如下:

![ ][nbsp 4]

  • 粉色箭头:当数据进来时,提交给 Grpah 。按照定义的拓扑图,使用 NodeWay 提交给 Node ,NodeProcessor 进行处理。

  • 蓝色箭头:当 NodeProcessor 处理完成后,Next 逐个使用 NodeWay 数组提交给下面的 Node ,继续处理。

  • ps :注意,这块流程,根据不同的 NodeProcessor 的实现类会有不同,蓝色箭头的过程,只是其中的一种,下面会详细解析。

整体顺序图如下:

![ ][nbsp 5]

  • DirectWay 是 WayToNode 接口的一种实现,正如其名,直接提交数据给 Node 。在 [「3. apm-collector-stream」][3. apm-collector-stream] 会看到其他实现,例如提交到其他服务器节点的 Node,从而实现跨服务器节点的流式处理。
  • AbstractWorker 在 apm-collector-stream 模块,是 NodeProcessor 接口的一种实现,处理提交给 Node 的数据。在 #onWork(message) 抽象方法里,子类可以实现该方法,根据自身需求,是否调用 #onNext(message) 方法,Next 逐个使用 NodeWay 数组提交给下面的 Node ,继续处理。

下面,我们来详细分别看看如下逻辑的详细代码实现:

  • Graph 创建
  • Graph 启动

2.1 Graph 创建

创建Graph 的顺序图如下:

![ ][nbsp 6]

  • 第一步,调用 GraphManager#createIfAbsent(graphId, input) 方法( input 参数没用 ),创建一个 Graph 对象。
  • 第二步,调用 Graph#addNode(WayToNode) 方法,创建该 Graph 的首个 Node 对象。
  • 第三步,调用 Node#addNext(WayToNode) 方法,创建该 Node 的下一个 Node 对象。

如下是 collector-agent-stream-provider 模块,TraceStreamGraph#createServiceReferenceGraph() 方法的代码:


<table> 
<tbody> 
<tr> 
   <td> <pre><br><p>```java</p><br><code>public void createServiceReferenceGraph() {
QueueCreatorService<ServiceReference> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

Graph<ServiceReference> graph = GraphManager.INSTANCE.createIfAbsent(SERVICE_REFERENCE_GRAPH_ID, ServiceReference.class);
graph.addNode(new ServiceReferenceAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
.addNext(new ServiceReferenceRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_REFERENCE_GRAPH_ID).create(workerCreateListener))
.addNext(new ServiceReferencePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
}


让我们来看看每个方法的具体代码实现。

--------------------

**第一步**

[GraphManager#createIfAbsent(graphId, input)][GraphManager_createIfAbsent_graphId_ input] 方法,创建一个 Graph 对象,并添加到 Graph 映射。代码如下:

 *  INSTANCE 属性,单例。
 *  allGraphs 属性,Graph 映射。其中映射的 KEY 为**每个** Graph 全局唯一编号。在 [JvmMetricStreamGraph][] 、[RegisterStreamGraph][] 、[TraceStreamGraph][] 类中,枚举了实际使用的 Graph 编号们。
 *  第 50 至 58 行:当 Graph 映射里不存在指定 Graph 编号时,创建 Graph 对象,并返回。

--------------------

**第二步**

[Graph#addNode(WayToNode)][Graph_addNode_WayToNode] 方法,创建该 Graph 的**首个** Node 对象。代码如下:

 *  id 属性,Graph 编号。
 *  entryWay,**首个**提交数据给 Node 的方式。
 *  第 58 行 :将方法参数 entryWay 赋值给 this.entryWay 属性。在下分享的 Graph#start(input) 方法里,我们会看到这是 Graph 启动的入口,**首个**提交给 Node 的方式。
 *  第 60 至 62 行 :调用 WayToNode#buildDestination(Graph) 方法,创建 Node 对象,并**返回该 Node** 。在上文中,我们已经说过创建的 Node 对象,为该 Graph 的**首个** Node 。

[WayToNode#buildDestination()][WayToNode_buildDestination] 方法,创建该 **WayToNode** 的 Node 对象。代码如下:

 *  destination 属性,目标 Node 。即该 WayToNode 提交**数据**到的 Node 。
 *  destinationHandler 属性,目标 Node 的处理器。见 [#out(INPUT)][out_INPUT] 方法。
 *  第 42 行:创建 Node 对象。

 *  目前,destinationHandler 属性,除了用于创建 Node 对象,无其他用途。

[Node 构造方法][Node] 方法,代码如下:

 *  nodeProcessor 属性,节点处理器。
 *  next 属性,包含 WayToNode 数组,即 Node 提交数据给 Next 的 Node 数组的方式。
 *  第 44 行:调用 Graph#checkForNewNode(Node) 方法,校验 Node 的 NodeProcessor 在其 Graph 里,**编号唯一**。

[Graph#checkForNewNode(Node)][Graph_checkForNewNode_Node] 方法,校验 Node 的 NodeProcessor 在 Graph 里,**编号唯一**,代码如下:

 *  nodeIndex 属性,处理器编号与 Node 的映射。其中映射的 KEY 为 NodeProcessor#id() 。
 *  第 72 至 78 行:校验 Node 的 NodeProcessor 在 Graph 里,**编号唯一**。

--------------------

**第三步**

[Node#addNext(WayToNode)][Node_addNext_WayToNode] 方法,创建该 Node 的下一个 Node 对象。代码如下:

 *  第 54 行:调用 WayToNode#buildDestination(Graph) 方法,创建该 Node 的下面的 Node 对象。
 *  第 56 行:添加创建的 Node 对象到 next 属性。
 *  第 58 行:返回创建的 Node 对象。

### 2.2 Graph 启动 ###

创建Graph 的顺序图如下:

![ ][nbsp 7]

数据流向 FROM TO 逻辑
第一步 Graph WayToNode
第二步 WayToNode Node
第三步 Node NodeProcessor
第四步 NodeProcessor Next 根据具体实现,若到 Next ,重复第一步
![ ][nbsp 8] -------------------- **第一步** [Graph#start(input)][Graph_start_input] 方法,启动 Graph ,处理数据。代码如下: * 第 49 行:调用 WayToNode#in(input) 方法,输入数据给 WayToNode 。 [WayToNode#in(input)][WayToNode_in_input] **抽象**方法,以 [DirectWay#in(input)][DirectWay_in_input] **实现**方法举例子,代码如下: * 第 30 行:调用 super#out(input) 方法,**直接**输出数据,调用 Node#execute(input) 方法,提交数据给 Node ,进行处理。 -------------------- **第二步** [Node#execute][Node_execute] 方法,调用 NodeProcessor#process(input, next) 方法,处理数据。 -------------------- **第三步** [NodeProcessor#process(input, next)][NodeProcessor_process_input_ next] **接口**方法,以 [AbstractWorker#process(input, next)][AbstractWorker_process_input_ next] **实现**方法举例子,代码如下: * 第 64 行:将方法参数 next 赋值给 this.next 属性。this.next 属性,用于封装的 #onNext(OUTPUT) 方法,提交数据给当前 Node 的 Next ( 下面的 Node 们 )继续处理数据。 * 第 67 行:调用 [#onWork][onWork] **抽象**方法,处理数据。当 AbstractWorker **抽象类**的实现类需要继续讲数据提交给 Next 时,需要在 #onWork 方法里,调用 #onNext(OUTPUT) 方法,例如 [ApplicationRegisterRemoteWorker#onWork(Application)][ApplicationRegisterRemoteWorker_onWork_Application] 。 -------------------- **第四步** [Next#execute(INPUT)][Next_execute_INPUT] 方法,**循环** WayToNode 数组,输入数据给 WayNode ,相当于”**重回**“【第一步】。 ## 3. apm-collector-stream ## 在文章的开头,我们提到了 apm-collector-stream 模块,在 graph 包的基础上,提供**异步**、**跨节点**等等的流式处理的封装。主要在 WayToNode 、NodeProcessor 的实现类上做文章。 ### 3.1 WayToNode 实现类 ### 整体类图如下: ![ ][nbsp 9] #### 3.1.1 WorkerRef #### [org.skywalking.apm.collector.stream.worker.base.WorkerRef][org.skywalking.apm.collector.stream.worker.base.WorkerRef] ,Worker 引用**抽象类**。 在apm-collector-stream 模块里,我们会发现类的命名从 Node / NodeProcessor 转向了 Worker ?**这是为什么呢**?关于这一点,我们特意采访( 请教 )了官方大佬。 >Worker 更具业务含义 >Node / Processor 更偏技术含义 目前,WorkerRef 无具体的方法。 #### 3.1.2 LocalAsyncWorkerRef #### org.skywalking.apm.collector.stream.worker.base.LocalAsyncWorkerRef ,异步 Worker 引用**实现类**,提供了**异步**的流式处理封装。 我们回到 [「2.2 Graph 创建」][3. apm-collector-stream] 的【第一步】。 [LocalAsyncWorkerRef#in(INPUT)][LocalAsyncWorkerRef_in_INPUT] 方法,代码如下: * [queueEventHandler][queueEventHandler] 属性,队列事件处理器。在 [《SkyWalking 源码分析 —— Collector Queue 队列组件》][SkyWalking _ _ Collector Queue] 我们会详细解析它的代码实现,这里只简单介绍下。 * 第 47 行:将输入的数据,作为”**事件**“,提交到队列事件处理器中,不再执行后续逻辑。此后,队列事件处理器,会在**后台**处理到该”**事件**“( 数据 ),回调 [LocalAsyncWorkerRef#execute][LocalAsyncWorkerRef_execute] 方法,从而提交数据到 Worker ( Node )。详细参见 [DisruptorEventHandler\#onEvent(…)][DisruptorEventHandler_onEvent] 方法。 **那么为什么会回调呢**?LocalAsyncWorkerRef 实现了 [org.skywalking.apm.collector.queue.base.QueueExecutor][org.skywalking.apm.collector.queue.base.QueueExecutor] 接口,它自身被设置到 QueueEventHandler 中, 作为”**事件**“的执行器。 整体流程如下: ![ ][nbsp 10] #### 3.1.3 RemoteWorkerRef #### org.skywalking.apm.collector.stream.worker.base.RemoteWorkerRef ,远程 Worker 引用**实现类**,提供了**远程跨节点**的流式处理的封装。 我们再回到 [「2.2 Graph 创建」][3. apm-collector-stream] 的【第一步】。 [RemoteWorkerRef#in(INPUT)][RemoteWorkerRef_in_INPUT] 方法,代码如下: * remoteSenderService 属性,远程发送服务。在 [《SkyWalking 源码分析 —— Collector Remote 远程通信服务》「3.2 GRPCRemoteSenderService」][SkyWalking _ _ Collector Remote _3.2 GRPCRemoteSenderService] 我们会详细解析它的代码实现,这里只简单介绍下。 * remoteWorker 属性,远程 Worker 。在下文会详细分享它的实现。 * 第 56 行:调用 RemoteSenderService#send(...) 方法,根据远程 Worker 的 [Selector 选择器][Selector],选择一个 Worker 进行发送。 * 第 58 至 60 行:当选择的 Worker 为本地模式( [Mode][] )时,调用 #out(INPUT) 方法,提交数据到本地的 Worker ( Node )。 ### 3.2 NodeProcessor 实现类 ### 整体类图如下: ![ ][nbsp 11] * [org.skywalking.apm.collector.stream.worker.base.Provider][org.skywalking.apm.collector.stream.worker.base.Provider] ,Worker 供应者**接口**,用于创建 Worker 和 WorkerRef 对象的**工厂**。 #### 3.2.1 AbstractWorker #### AbstractWorker 的代码实现,在 [「2.2 Graph 启动」][3. apm-collector-stream] 已经详细解析。 [org.skywalking.apm.collector.stream.worker.base.AbstractWorkerProvider][org.skywalking.apm.collector.stream.worker.base.AbstractWorkerProvider] ,Worker 供应者**抽象类**,定义了 [#workerInstance(ModuleManager)][workerInstance_ModuleManager] **抽象**方法,用于创建 Worker 对象。 #### 3.2.2 AbstractLocalAsyncWorker #### [org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker][org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker] ,异步 Worker **抽象类**。 目前,AbstractLocalAsyncWorker 无具体的方法。 实际使用时,继承 AbstractLocalAsyncWorker 类,实现 #work(INPUT) 方法,例如:[ApplicationRegisterSerialWorker][] 。 -------------------- org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider ,LocalAsyncWorker 供应者**抽象类**。 * [queueCreatorService][queueCreatorService] 属性,队列创建服务,用于创建 QueueEventHandler 对象。 * [#queueSize()][queueSize] **抽象**方法,声明队列大小。 * [#create(WorkerCreateListener)][create_WorkerCreateListener] **实现**方法,创建 AbstractLocalAsyncWorker 和 LocalAsyncWorkerRef 对象。 * 第 51 行:创建 AbstractLocalAsyncWorker **实现类**的对象。参见 [ApplicationRegisterSerialWorker.Factory\#workerInstance(ModuleManager)][ApplicationRegisterSerialWorker.Factory_workerInstance_ModuleManager] 方法。 * 第 54 行:添加 AbstractLocalAsyncWorker 到 WorkerCreateListener ( Worker 创建监听器 )。WorkerCreateListener 在 [《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(二)》「4.1 WorkerCreateListener」][SkyWalking _ _ Collector Streaming Computing _4.1 WorkerCreateListener] 详细解析。 * 第 57 行:创建 LocalAsyncWorkerRef 对象。 * 第 60 行:调用 QueueCreatorService#create(...) 方法,创建 QueueEventHandler 对象,**并设置 LocalAsyncWorkerRef 作为它的执行器**。 * 第 63 行:设置 LocalAsyncWorkerRef 的 QueueEventHandler 属性。 #### 3.2.3 AbstractRemoteWorker #### [org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker][org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker] ,远程 Worker **抽象类**,定义了 [#selector()][selector] **抽象**方法,获得选择器。RemoteSenderService 根据选择器,调用 [RemoteClientSelector#select(...)][RemoteClientSelector_select_...] 方法,选择好远程节点,而后进行发送数据。 实际使用时,继承 AbstractLocalAsyncWorker 类,实现 #work(INPUT) 方法,例如:[ApplicationRegisterRemoteWorker][] 。 -------------------- org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorkerProvider ,AbstractRemoteWorker 供应者**抽象类**。 * [remoteSenderService][remoteSenderService] 属性,远程发送服务。 * [#create(WorkerCreateListener)][create_WorkerCreateListener 1] **实现**方法,创建 AbstractRemoteWorker 和 RemoteWorkerRef 对象。 * 第 58 行:创建 AbstractRemoteWorker **实现类**的对象。参见 [ApplicationRegisterRemoteWorker.Factory\#workerInstance(ModuleManager)][ApplicationRegisterRemoteWorker.Factory_workerInstance_ModuleManager] 方法。 * 第 61 行:添加 AbstractLocalAsyncWorker 到 WorkerCreateListener ( Worker 创建监听器 )。WorkerCreateListener 在 [《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(二)》「4.1 WorkerCreateListener」][SkyWalking _ _ Collector Streaming Computing _4.1 WorkerCreateListener] 详细解析。 * 第 64 行:创建 RemoteWorkerRef 对象。 [nbsp]: https://www.feiz.vip/images/pachong/2024/3/3/2124/1709472262450.png [nbsp 1]: https://www.feiz.vip/images/pachong/2024/3/3/2124/1709472262481.png [https_github.com_apache_incubating-skywalking]: https://github.com/apache/incubating-skywalking [nbsp 2]: https://www.feiz.vip/images/pachong/2024/3/3/2124/1709472262520.png [nbsp 3]: https://www.feiz.vip/images/pachong/2024/3/3/2124/1709472262560.png [Storm_Spark_Samza]: http://www.csdn.net/article/2015-03-09/2824135 [Storm]: https://storm.apache.org/ [nbsp 4]: https://www.feiz.vip/images/pachong/2024/3/3/2124/1709472262596.png [nbsp 5]: https://www.feiz.vip/images/pachong/2024/3/3/2124/1709472262634.png [3. apm-collector-stream]: https://www.iocoder.cn/SkyWalking/collector-streaming-first/# [nbsp 6]: https://www.feiz.vip/images/pachong/2024/3/3/2124/1709472262679.png [GraphManager_createIfAbsent_graphId_ input]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/GraphManager.java#L50 [JvmMetricStreamGraph]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java [RegisterStreamGraph]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java [TraceStreamGraph]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java [Graph_addNode_WayToNode]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Graph.java#L56 [WayToNode_buildDestination]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/WayToNode.java#L41 [out_INPUT]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/WayToNode.java#L47 [Node]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Node.java#L40 [Graph_checkForNewNode_Node]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Graph.java#L71 [Node_addNext_WayToNode]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Node.java#L51 [nbsp 7]: https://www.feiz.vip/images/pachong/2024/3/3/2124/1709472262707.png [nbsp 8]: https://www.feiz.vip/images/pachong/2024/3/3/2124/1709472262750.png [Graph_start_input]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Graph.java#L48 [WayToNode_in_input]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/WayToNode.java#L45 [DirectWay_in_input]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/DirectWay.java#L29 [Node_execute]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Node.java#L62 [NodeProcessor_process_input_ next]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/NodeProcessor.java#L34 [AbstractWorker_process_input_ next]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorker.java#L62 [onWork]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorker.java#L60 [ApplicationRegisterRemoteWorker_onWork_Application]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterRemoteWorker.java#L46 [Next_execute_INPUT]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/graph/Next.java#L50 [nbsp 9]: https://www.feiz.vip/images/pachong/2024/3/3/2124/1709472262782.png [org.skywalking.apm.collector.stream.worker.base.WorkerRef]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerRef.java [LocalAsyncWorkerRef_in_INPUT]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java#L50 [queueEventHandler]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java#L36 [SkyWalking _ _ Collector Queue]: http://www.iocoder.cn/SkyWalking/collector-queue-module/?self [LocalAsyncWorkerRef_execute]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java#L46 [DisruptorEventHandler_onEvent]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorEventHandler.java#L54 [org.skywalking.apm.collector.queue.base.QueueExecutor]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java [nbsp 10]: https://www.feiz.vip/images/pachong/2024/3/3/2124/1709472262849.png [RemoteWorkerRef_in_INPUT]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/RemoteWorkerRef.java#L53 [SkyWalking _ _ Collector Remote _3.2 GRPCRemoteSenderService]: http://www.iocoder.cn/SkyWalking/collector-remote-module?self [Selector]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/Selector.java [Mode]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/service/RemoteSenderService.java#L36 [nbsp 11]: https://www.feiz.vip/images/pachong/2024/3/3/2124/1709472262878.png [org.skywalking.apm.collector.stream.worker.base.Provider]: https://github.com/YunaiV/skywalking/blob/23c2146c134e0ef0a37a43758a1e04727de7697a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/Provider.java [org.skywalking.apm.collector.stream.worker.base.AbstractWorkerProvider]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorkerProvider.java [workerInstance_ModuleManager]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorkerProvider.java#L46 [org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java [ApplicationRegisterSerialWorker]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterSerialWorker.java#L56 [queueCreatorService]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java#L35 [queueSize]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java#L40 [create_WorkerCreateListener]: https://www.iocoder.cn/SkyWalking/collector-streaming-first/ [ApplicationRegisterSerialWorker.Factory_workerInstance_ModuleManager]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterSerialWorker.java#L90 [SkyWalking _ _ Collector Streaming Computing _4.1 WorkerCreateListener]: http://www.iocoder.cn/SkyWalking/collector-streaming-second/?self [org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorker.java [selector]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorker.java#L45 [RemoteClientSelector_select_...]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/skywalking/apm/collector/remote/grpc/service/selector/RemoteClientSelector.java#L31 [ApplicationRegisterRemoteWorker]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterRemoteWorker.java [remoteSenderService]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorkerProvider.java#L40 [create_WorkerCreateListener 1]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorkerProvider.java#L56 [ApplicationRegisterRemoteWorker.Factory_workerInstance_ModuleManager]: https://github.com/YunaiV/skywalking/blob/a0d559d08e87879a08bd7269b9651188083ce05e/apm-collector/apm-collector-agent-stream/collector-agent-stream-provider/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ApplicationRegisterRemoteWorker.java#L61