1. 概述
本文主要分享 SkyWalking JVM 指标的收集与存储。大体流程如下:
- Agent 每秒定时收集 JVM 指标到缓冲队列。
- Agent 每秒定时将缓冲队列的 JVM 指标发送到 Collector 。
- Collector 接收到 JVM 指标,异步批量存储到存储器( 例如,ES )。
目前JVM 指标包括四个维度:
- CPU
- Memory
- MemoryPool
- GC
SkyWalking UI 界面如下:
2. Agent 收集 JVM 指标
2.1 JVMService
org.skywalking.apm.agent.core.jvm.JVMService
,实现 BootService 、Runnable 接口,JVM 指标服务,负责将 JVM 指标收集并发送给 Collector 。代码如下:
- queue 属性,收集指标队列。
- collectMetricFuture 属性,收集指标定时任务。
- sendMetricFuture 属性,发送指标定时任务。
- sender 属性,发送器。
#beforeBoot() 方法,初始化 queue ,sender 属性,并将自己添加到 GRPCChannelManager ,从而监听与 Collector 的连接状态。
boot
方法,创建两个定时任务:
- 第 88 至 90 行:创建收集指标定时任务,0 秒延迟,1 秒间隔,调用 JVMService#run() 方法。
- 第 92 至 94 行:创建发送指标定时任务,0 秒延迟,1 秒间隔,调用 Sender#run() 方法。
2.1.1 定时收集
JVMService#run() 方法,代码如下:
- 第 110 至 111 行:应用实例注册后,才收集 JVM 指标。
-
第 116 至 122 行:创建 JVMMetric 对象。
- 第 118 行:调用 CPUProvider#getCpuMetric() 方法,获得 GC 指标。
- 第 119 行:调用 MemoryProvider#getMemoryMetricList() 方法,获得 Memory 指标。
- 第 120 行:调用 MemoryPoolProvider#getMemoryPoolMetricList() 方法,获得 MemoryPool 指标。
- 第 121 行:调用 GCProvider#getGCList() 方法,获得 GC 指标。
- 第 125 至 128 行:提交 JVMMetric 对象到收集指标队列。
2.1.2 定时发送
JVMService.Sender
,实现 Runnable 、GRPCChannelListener 接口,JVM 指标发送器。代码如下:
-
status 属性,连接状态。
-
stub 属性,阻塞 Stub 。
-
#statusChanged(GRPCChannelStatus) 方法,当连接成功时,创建阻塞 Stub 。
-
#run() 方法,代码如下:
-
第 148 至 151 行:应用实例注册后,并且连接中,才发送 JVM 指标。
- 第 153 至 155 行:调用 #drainTo(Collection) 方法,从队列移除所有 JVMMetric 到 buffer 数组。
-
第 157 至 162 行:使用 Stub ,批量发送到 Collector 。
2.2 CPU
org.skywalking.apm.agent.core.jvm.cpu.CPUProvider ,CPU 提供者,提供 #getCpuMetric() 方法,采集 CPU 指标,如下图所示:
- usagePercent :JVM 进程占用 CPU 百分比。
- 第 51 行:调用 CPUMetricAccessor#getCPUMetric() 方法,获得 CPU 指标。
在CPUProvider 构造方法 中,初始化 cpuMetricAccessor
数量,代码如下:
- 第 37 行:调用 ProcessorUtil#getNumberOfProcessors() 方法,获得 CPU 数量。
- 第 40 至 42 行:创建 SunCpuAccessor 对象。
- 第 44 至 46 行:发生异常,说明不支持,创建 NoSupportedCPUAccessor 对象。
- 为什么需要使用 ClassLoader#loadClass(className) 方法呢?因为 SkyWalking Agent 是通过 JavaAgent 机制,实际未引入,所以通过该方式加载类。
2.2.1 CPUMetricAccessor
org.skywalking.apm.agent.core.jvm.cpu.CPUMetricAccessor
,CPU 指标访问器抽象类。代码如下:
- lastCPUTimeNs 属性,获得进程占用 CPU 时长,单位:纳秒。
- lastSampleTimeNs 属性,最后采样时间,单位:纳秒。
- cpuCoreNum 属性,CPU 数量。
- #init() 方法,初始化 lastCPUTimeNs 、lastSampleTimeNs 。
- #getCpuTime() 抽象方法,获得 CPU 占用时间,由子类完成。
- #getCPUMetric() 方法,获得 CPU 指标。放在和 SunCpuAccessor 一起分享。这里我先记得,JVM 进程占用 CPU 率的计算公式:进程 CPU 占用总时间 / ( 进程启动总时间 * CPU 数量) 。
CPUMetricAccessor 有两个子类,实际上文我们已经看到它的创建:
- SunCpuAccessor ,基于 SUN 提供的方法,获取 CPU 指标访问器。
- NoSupportedCPUAccessor ,不支持的 CPU 指标访问器。因此,使用该类的情况下,获取不到具体的进程 CPU 占用率。
SunCpuAccessor 构造方法 ,代码如下:
- 第 32 行:设置 CPU 数量。
- 第 33 行:获得 OperatingSystemMXBean 对象。通过该对象,在 #getCpuTime() 实现方法,调用 OperatingSystemMXBean#getProcessCpuTime()) 方法,获得 JVM 进程占用 CPU 总时长。
long getProcessCpuTime()
Returns the CPU time used by the process on which the Java virtual machine is running in nanoseconds. The returned value is of nanoseconds precision but not necessarily nanoseconds accuracy. This method returns -1 if the the platform does not support this operation.
Returns:
the CPU time used by the process in nanoseconds, or -1 if this operation is not supported.
- 第 34 行:调用 #init() 方法,初始化 lastCPUTimeNs 、lastSampleTimeNs 。
#getCPUMetric() 方法,获得 CPU 指标。代码如下:
- 第 58 至 59 行:获得 JVM 进程占用 CPU 总时长。
- 第 64 行:now - lastSampleTimeNs ,获得 JVM 进程启动总时长。
- 这里为什么相减呢?因为 CPUMetricAccessor 不是在 JVM 启动时就进行计算,通过相减,解决偏差。
- 第 63 至 64 行:计算 JVM 进程占用 CPU 率。
2.3 Memory
org.skywalking.apm.agent.core.jvm.memory.MemoryProvider ,Memory 提供者,提供 #getMemoryMetricList() 方法,采集 Memory 指标,如下图所示:
-
推荐阅读文章:
-
isHeap :是否堆内内存。
-
init :初始化的内存数量。
-
max :最大的内存数量。
-
used :已使用的内存数量。
-
committed :可以使用的内存数量。
-
第 44 至 51 行:使用 MemoryMXBean 对象,获得堆内( Heap )内存。
-
第 54 至 61 行:使用 MemoryMXBean 对象,获得非堆内( None-Heap )内存。
2.4 MemoryPool
org.skywalking.apm.agent.core.jvm.memorypool.MemoryPoolProvider ,MemoryPool 提供者,提供 #getMemoryPoolMetricList() 方法,采集 MemoryPool 指标数组,如下图:
-
推荐阅读文章:
-
type :内存区域类型。MemoryPool 和 Memory 的差别在于拆分的维度不同,如下图:
- init :初始化的内存数量。
- max :最大的内存数量。
- used :已使用的内存数量。
- committed :可以使用的内存数量。
MemoryPoolProvider 构造方法,代码如下:
- 第 38 行:获得 MemoryPoolMXBean 数组。每个 MemoryPoolMXBean 对象,代表上面的一个区域类型。
- 第 39 至 46 行:循环 MemoryPoolMXBean 数组,调用 #findByBeanName(name) 方法,找到对应的 GC 算法,创建对应的 MemoryPoolMetricAccessor 对象。
- 第 47 至 49 行:未找到匹配的 GC 算法,创建 UnknownMemoryPool 对象。
2.4.1 MemoryPoolMetricAccessor
org.skywalking.apm.agent.core.jvm.memorypool.MemoryPoolMetricAccessor
,MemoryPool 指标访问器接口。
- 定义了 #getMemoryPoolMetricList() 接口,获得 MemoryPool 指标数组。
MemoryPoolMetricAccessor 子类如下图:
- UnknownMemoryPool ,未知的 MemoryPool 指标访问器实现类。每次 #getMemoryPoolMetricList() 方法,返回 MemoryPool 指标数组,但是每个指标元素是无具体数据的。
2.4.2 MemoryPoolModule
org.skywalking.apm.agent.core.jvm.memorypool.MemoryPoolModule
,实现 MemoryPoolMetricAccessor 接口,MemoryPool 指标访问器抽象类。不同 GC 算法之间,内存区域命名不同,通过如下六个方法抽象,分别对应不同内存区域,形成映射关系,屏蔽差异:
-
胖友可以看看 MemoryPoolModule 子类的实现:
#getMemoryPoolMetricList() 实现方法,代码如下:
- 第 44 行:循环每个内存区域,收集每个 MemoryPool 指标。
- 第 47 至 62 行:调用 #contains(possibleNames, name) 方法,逐个内存区域名字判断,获得对应的内存区域类型。
- 第 65 至 71 行:创建 MemoryUsage 对象,并添加到结果数组。
2.5 GC
整体实现类似 「2.4 MemoryPool」 。
org.skywalking.apm.agent.core.jvm.memorypool.GCProvider ,GC 提供者,提供 #getGCList() 方法,采集 GC 指标数组,如下图:
- phrase :生代类型,包括新生代、老生代。
- count :总回收次数。
- time :总回收占用时间。
GCProvider 构造方法,代码如下:
- 第 38 行:获得 GarbageCollectorMXBean 数组。
- 第 39 至 46 行:循环 MemoryPoolMXBean 数组,调用 #findByBeanName(name) 方法,找到对应的 GC 算法,创建对应的 GCMetricAccessor 对象。
- 第 47 至 49 行:未找到匹配的 GC 算法,创建 UnknowGC 对象。
2.5.1 GCMetricAccessor
org.skywalking.apm.agent.core.jvm.gc.GCMetricAccessor
,GC 指标访问器接口。
- 定义了 #getGCList() 接口,获得 GC 指标数组。
GCMetricAccessor 子类如下图:
- UnknowGC ,未知的 GC 指标访问器实现类。每次 #getGCList() 方法,返回 GC 指标数组,但是每个指标元素是无具体数据的。
2.5.2 GCModule
org.skywalking.apm.agent.core.jvm.gc.GCModule
,实现 GCMetricAccessor 接口,GC 指标访问器抽象类。不同 GC 算法之间,生代命名不同,通过如下两个方法抽象,分别对应两个生代,形成映射关系,屏蔽差异:
-
胖友可以看看 GCModule 子类的实现:
#getGCList() 实现方法,代码如下:
- 第 44 行:循环 GarbageCollectorMXBean 数组,收集每个 GC 指标。
- 第 47 至 62 行:获得生代类型。
- 第 65 至 71 行:创建 GC 对象,并添加到结果数组。
3. Collector 存储 JVM 指标
3.1 JVMMetricsServiceHandler
我们先来看看 API 的定义,JVMMetricsService.proto
,如下图所示:
JVMMetricsServiceHandler#collect(JVMMetrics, StreamObserver<Downstream>
), 代码如下:
- 第 60 行:循环接收到的 JVMMetric 数组。
- 第 62 行:调用 #sendToInstanceHeartBeatService(...) 方法,发送心跳,记录应用实例的最后心跳时间。因为目前 SkyWaling 主要用于 JVM 平台,通过每秒的 JVM 指标收集的同时,记录应用实例的最后心跳时间。
- 第 62 行:调用 #sendToCpuMetricService(...) 方法,处理 CPU 数据。
- 第 64 行:调用 #sendToMemoryMetricService(...) 方法,处理 Memory 数据。
- 第 66 行:调用 #sendToMemoryPoolMetricService(...) 方法,处理 Memory Pool 数据。
- 第 68 行:调用 #sendToGCMetricService(...) 方法,处理 GC 数据。
- 第 73 至 74 行:全部处理完成,返回成功。
上述的 #sendToXXX() 方法,内部每个对应调用一个如下图 Service 提供的方法:
- 每个 Service 的实现,对应一个数据实体和一个 Graph 对象,通过流式处理,最终存储到存储器( 例如 ES ) ,流程如下图:
- 具体的实现代码,我们放在下面的数据实体一起分享。
3.2 CPU
org.skywalking.apm.collector.storage.table.jvm.CpuMetric
,CPU 指标。
-
org.skywalking.apm.collector.storage.table.jvm.CpuMetricTable , CpuMetric 表( cpu_metric )。字段如下:
-
instance_id
:应用实例编号。 -
usage_percent
:CPU 占用率。 -
time_bucket
:时间。 -
org.skywalking.apm.collector.storage.es.dao.CpuMetricEsPersistenceDAO ,CpuMetric 的 EsDAO 。
-
在 ES 存储例子如下图:
- org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricService ,CPU 指标服务,调用 CPUMetric 对应的 Graph
<CPUMetric>
对象,流式处理,最终 CPUMetric 保存到存储器。 - org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricPersistenceWorker , CPU 指标批量存储 Worker 。
3.3 Memory
org.skywalking.apm.collector.storage.table.jvm.MemoryMetric
,Memory 指标。
-
org.skywalking.apm.collector.storage.table.jvm.MemoryMetricTable , MemoryMetric 表( memory_metric )。字段如下:
-
instance_id
:应用实例编号。 -
isHeap
:是否堆内内存。 -
init
:初始化的内存数量。 -
max
:最大的内存数量。 -
used
:已使用的内存数量。 -
committed
:可以使用的内存数量。 -
time_bucket
:时间。 -
org.skywalking.apm.collector.storage.es.dao.MemoryMetricEsPersistenceDAO ,MemoryMetric 的 EsDAO 。
-
在 ES 存储例子如下图:
- org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricService ,Memory 指标服务,调用 MemoryMetric 对应的 Graph
<MemoryMetric>
对象,流式处理,最终 MemoryMetric 保存到存储器。 - org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricPersistenceWorker , Memory 指标批量存储 Worker 。
3.4 MemoryPool
org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric
,MemoryPool 指标。
-
org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable , MemoryPool 表( memory_pool_metric )。字段如下:
-
instance_id
:应用实例编号。 -
pool_type
:内存区域类型。 -
init
:初始化的内存数量。 -
max
:最大的内存数量。 -
used
:已使用的内存数量。 -
committed
:可以使用的内存数量。 -
time_bucket
:时间。 -
org.skywalking.apm.collector.storage.es.dao.MemoryPoolMetricEsPersistenceDAO ,MemoryPoolMetric 的 EsDAO 。
-
在 ES 存储例子如下图:
- org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricService ,MemoryPoolMetric 指标服务,调用 MemoryPoolMetric 对应的 Graph
<MemoryPoolMetric>
对象,流式处理,最终 MemoryPoolMetric 保存到存储器。 - org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricPersistenceWorker , MemoryPool 指标批量存储 Worker 。
3.5 GC
org.skywalking.apm.collector.storage.table.jvm.GCMetric
,GC 指标。
-
org.skywalking.apm.collector.storage.table.jvm.GCMetricTable , GCMetric 表( gc_metric )。字段如下:
-
instance_id
:应用实例编号。 -
phrase
:生代类型,包括新生代、老生代。 -
count
:总次数。 -
time
:总时间。 -
time_bucket
:时间。 -
org.skywalking.apm.collector.storage.es.dao.GCMetricEsPersistenceDAO ,GCMetric 的 EsDAO 。
-
在 ES 存储例子如下图:
- org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricService ,GCMetric 指标服务,调用 GCMetric 对应的 Graph
<GCMetric>
对象,流式处理,最终 GCMetric 保存到存储器。 - org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricPersistenceWorker , MemoryPool 指标批量存储 Worker 。
4. 心跳
Collector 在接收到 GC 指标上传后,调用 JVMMetricsServiceHandler#sendToInstanceHeartBeatService(...) 方法,发送心跳,记录应用实例的最后心跳时间。因为目前 SkyWaling 主要用于 JVM 平台,通过每秒的 JVM 指标收集的同时,记录应用实例的最后心跳时间。
- org.skywalking.apm.collector.agent.stream.worker.jvm.InstanceHeartBeatService ,应用实例心跳服务,调用 Instance 对应的 Graph
<Instance>
对象,流式处理,最终更新 Instance 的最后心跳时间( heartbeat_time )到存储器。