1. 概述
本文主要分享 SkyWalking Agent Remote 远程通信服务。该服务用于 Agent 和Collector 集群的通信。
在《SkyWalking 源码分析 —— Collector Naming Server 命名服务》 一文中,我们已经看到,Agent 使用定时轮询,从 Collector Naming Server 中,获得 Collector 集群的 Collector Agent gRPC Server 的所有地址。
- 红框部分,即为 Agent 和Collector 集群的通信部分。
- 另外,Collector 也提供 Collector Agent Jetty Server ,目前暂不使用。相比来说,Collector Agent gRPC Server 性能更优。
2. GRPCChannelManager
org.skywalking.apm.agent.core.remote.GRPCChannelManager
,实现 BootService 、Runnable 接口,gRPC Channel 管理器。GRPCChannelManager 负责管理与 Collector Agent gRPC Server 集群的连接的管理,提供给其他服务使用。
- managedChannel 属性,连接 gRPC Server 的 Channel 。同一时间,GRPCChannelManager 只连接一个 Collector Agent gRPC Server 节点,并且在 Channel 不因为各种网络问题断开的情况下,持续保持。
- connectCheckFuture 属性,定时重连 gRPC Server 的定时任务。
- reconnect 属性,是否重连。当 Channel 未连接需要连接,或者 Channel 断开需要重连时,标记 reconnect = true 。后台线程会根据该标识进行连接( 重连 )。
- listeners 属性,监听器( org.skywalking.apm.agent.core.remote.GRPCChannelListener ) 数组。使用 Channel 的其他服务,注册监听器到 GRPCChannelManager 上,从而根据连接状态( org.skywalking.apm.agent.core.remote.GRPCChannelStatus ),实现自定义逻辑。
#boot() 实现方法,调用 ScheduledExecutorService#scheduleAtFixedRate(...) 方法,创建定时任务。该定时任务无初始化延迟,每 Config.GRPC_CHANNEL_CHECK_INTERVAL ( 默认:30 s ) 执行一次 #run() 方法。
#run() 实现方法,执行 Channel 的连接( 重连 )逻辑。代码如下:
- 第 99 行:当 reconnect = true 时,才执行连接( 重连 )。
- 第 100 行:当本地已经获取到 Collector Agent gRPC Server 集群地址时,参见 《SkyWalking 源码分析 —— Collector Naming Server 命名服务》 。
- 第 101 至 106 行:随机选择准备链接的 Collector Agent gRPC Server 。
- 第 107 至 113 行:创建 Channel 并进行连接。此处主要是 gRPC 的 API 使用,不熟悉的胖友,请 Google 下进行了解了解。
- 第 115 至 117 行:连接成功,标记 reconnect = false ,这样,下次执行 #run() 方法不会重连。而后,调用 #notify(GRPCChannelStatus.CONNECTED) 方法,通知监听器连接成功。
- 第 118 至 121 行:连接成功,不标记 reconnect ,这样,下次执行 #run() 方法会继续重连。而后,调用 #notify(GRPCChannelStatus.DISCONNECT) 方法,通知监听器连接处于断开状态。
- 第 124 至 126 行:连接异常,不标记 reconnect ,这样,下次执行 #run() 方法会继续重连。而后,调用 #notify(GRPCChannelStatus.DISCONNECT) 方法,通知监听器连接处于断开状态。
实际使用中,Channel 可能因为各种原因断开,那么 GRPCChannelManager 是怎么检测的呢?在使用 Channel 的其他服务,当使用 Channel 时发生异常,调用 #reportError(Throwable) 方法,判断是否为网络异常( #isNetworkError(Throwable) ) 。若是,标记 reconnect = true ,等待后台进行重连。
3. GRPCChannelListener
org.skywalking.apm.agent.core.remote.GRPCChannelListener ,gRPC Channel 的监听器接口,定义了 #statusChanged(GRPCChannelStatus) ,通知 gRPC Channel 状态变更。
GRPCChannelListener 实现类如下图,后续文章会详细解析。