07、SkyWalking 源码分析 Agent Remote 远程通信服务

1. 概述

本文主要分享 SkyWalking Agent Remote 远程通信服务。该服务用于 Agent 和Collector 集群的通信。

 

《SkyWalking 源码分析 —— Collector Naming Server 命名服务》 一文中,我们已经看到,Agent 使用定时轮询,从 Collector Naming Server 中,获得 Collector 集群的 Collector Agent gRPC Server 的所有地址

 

2. GRPCChannelManager

org.skywalking.apm.agent.core.remote.GRPCChannelManager ,实现 BootService 、Runnable 接口,gRPC Channel 管理器。GRPCChannelManager 负责管理与 Collector Agent gRPC Server 集群的连接的管理提供给其他服务使用

  • managedChannel 属性,连接 gRPC Server 的 Channel 。同一时间,GRPCChannelManager 只连接一个 Collector Agent gRPC Server 节点,并且在 Channel 不因为各种网络问题断开的情况下,持续保持
  • connectCheckFuture 属性,定时重连 gRPC Server 的定时任务
  • reconnect 属性,是否重连。当 Channel 未连接需要连接,或者 Channel 断开需要重连时,标记 reconnect = true 。后台线程会根据该标识进行连接( 重连 )。
  • listeners 属性,监听器( org.skywalking.apm.agent.core.remote.GRPCChannelListener ) 数组。使用 Channel 的其他服务,注册监听器到 GRPCChannelManager 上,从而根据连接状态( org.skywalking.apm.agent.core.remote.GRPCChannelStatus ),实现自定义逻辑。

#boot() 实现方法,调用 ScheduledExecutorService#scheduleAtFixedRate(...) 方法,创建定时任务。该定时任务无初始化延迟,每 Config.GRPC_CHANNEL_CHECK_INTERVAL ( 默认:30 s ) 执行一次 #run() 方法。

#run() 实现方法,执行 Channel 的连接( 重连 )逻辑。代码如下:

  • 第 99 行:当 reconnect = true 时,才执行连接( 重连 )。
  • 第 100 行:当本地已经获取到 Collector Agent gRPC Server 集群地址时,参见 《SkyWalking 源码分析 —— Collector Naming Server 命名服务》
  • 第 101 至 106 行:随机选择准备链接的 Collector Agent gRPC Server 。
  • 第 107 至 113 行:创建 Channel 并进行连接。此处主要是 gRPC 的 API 使用,不熟悉的胖友,请 Google 下进行了解了解。
  • 第 115 至 117 行:连接成功,标记 reconnect = false ,这样,下次执行 #run() 方法不会重连。而后,调用 #notify(GRPCChannelStatus.CONNECTED) 方法,通知监听器连接成功。
  • 第 118 至 121 行:连接成功,不标记 reconnect ,这样,下次执行 #run() 方法会继续重连。而后,调用 #notify(GRPCChannelStatus.DISCONNECT) 方法,通知监听器连接处于断开状态。
  • 第 124 至 126 行:连接异常,不标记 reconnect ,这样,下次执行 #run() 方法会继续重连。而后,调用 #notify(GRPCChannelStatus.DISCONNECT) 方法,通知监听器连接处于断开状态。

实际使用中,Channel 可能因为各种原因断开,那么 GRPCChannelManager 是怎么检测的呢?在使用 Channel 的其他服务,当使用 Channel 时发生异常,调用 #reportError(Throwable) 方法,判断是否为网络异常( #isNetworkError(Throwable) ) 。若是,标记 reconnect = true ,等待后台进行重连。

3. GRPCChannelListener

org.skywalking.apm.agent.core.remote.GRPCChannelListener ,gRPC Channel 的监听器接口,定义了 #statusChanged(GRPCChannelStatus) ,通知 gRPC Channel 状态变更。

GRPCChannelListener 实现类如下图,后续文章会详细解析。