04、SkyWalking 源码分析 Agent 发送 Trace 数据

1. 概述

分布式链路追踪系统,链路的追踪大体流程如下:

1、 Agent收集Trace数据;
2、 Agent发送Trace数据给Collector
3、 Collector接收Trace数据;
4、 Collector存储Trace数据到存储器,例如,数据库;

本文主要分享【第二部分】 SkyWalking Agent 发送 Trace 数据

考虑到减少外部组件的依赖,Agent 收集到 Trace 数据后,不是写入外部消息队列( 例如,Kafka )或者日志文件,而是 Agent 写入内存消息队列后台线程异步】发送给 Collector 。

本文涉及的类非常少,如下图所示:

 

2. TraceSegmentServiceClient

org.skywalking.apm.agent.core.remote.TraceSegmentServiceClient ,TraceSegment 发送服务客户端。它是一个服务,也是一个客户端,负责将 TraceSegment 异步发送到 Collector 。

我们先来看看 TraceSegmentServiceClient 的属性

  • TIMEOUT 静态属性,发送等待超时时长,单位:毫秒。
  • lastLogTime 属性,最后打印日志时间。该属性主要用于开发调试。
  • segmentUplinkedCounter 属性,TraceSegment 发送数量。
  • segmentAbandonedCounter 属性,TraceSegment 被丢弃数量。在 Agent 未连接上 Collector 时,产生的 TraceSegment 将被丢弃。
  • carrier 属性,内存队列。在 《SkyWalking 源码分析 —— DataCarrier 异步处理库》 有对 DataCarrier 的详细解析。
  • serviceStub 属性,非阻塞 Stub 。
  • status 属性,连接状态。

下面,我们来介绍 TraceSegmentServiceClient 实现的接口以及对应的方法。

2.1 实现 BootService 接口

#beforeBoot() 方法,代码如下:

  • 第 86 行:调用 GRPCChannelManager#addChannelListener(this) 方法,将自己添加到 GRPCChannelManager 中,作为一个监听器,从而调用 #statusChanged(GRPCChannelStatus) 方法,实现对连接状态( status )的监听处理。

#boot() 方法,代码如下:

  • 第 95 至 97 行:创建 DataCarrier 对象,作为内存队列,并设置自己作为消费者,从而调用 #consume(List<TraceSegment> ) 方法,实现异步发送 TraceSegment 到 Collector 。

#afterBoot() 方法,代码如下:

  • 第 102 行:调用 TracingContext.ListenerManager#add(this) 方法,将自己添加到 ListenerManager 中,作为一个监听器,从而调用 #afterFinished(TraceSegment) 方法,实现收集到新的 TraceSegment ,添加到内存队列。

#shutdown() 方法,代码如下:

  • 第 107 行:调用 DataCarrier#shutdownConsumers() 方法,停止消费。

2.2 实现 GRPCChannelListener 接口

#statusChanged(GRPCChannelStatus) 方法,代码如下:

  • 第 211 至 214 行:连接成功,创建 Stub 对象。
  • 第 215 行:记录连接状态。

2.3 实现 TracingContextListener 接口

#afterFinished(TraceSegment) 方法,代码如下:

  • 第 197 至 199 行:当 TraceSegment.ignore = true 时,忽略该 TraceSegment 。
  • 第 201 行:提交 TraceSegment 到内存队列。

2.4 实现 IConsumer 接口

#consume(List<TraceSegment>) 方法,代码如下:

ps:目前 DataCarrier 最长每 20 秒消费一次。

#onError(List<TraceSegment>, Throwable) 方法,当消费发生异常时,打印日志。