用户您好!请先登录!

Skywalking 源码分析 – Agent采集跟踪数据

Skywalking 源码分析 – Agent采集跟踪数据

1. 简介

本文源码解析使用的版本是 skywalking 7.0 , 不同版本实现上可能由一定差异,但是思想上大致相同

上篇文章介绍了skywalking-agent的整体架构以及插件的加载原理。

但是仅仅知道了他如何去加载插件,那至于在他使用agent去修改业务代码后如何去收集trace数据 那么请继续往下看

2. trace数据结构

在看源码之前,我们先来了解一下 在skywalkingtrace数据 是以什么样的数据结构去保存的,这里为了更清晰展示,这里选择从前端查看trace数据的结构

首先先来一个最简单的例子
在这里插入图片描述其实就是一个

请求,访问的三次数据库,对应方法大概如下

请求A(){
    访问数据库1()
    访问数据库2()
    访问数据库3()
}

然后我们来看看要显示这样一条 调用链 需要多少数据,这里把断点打在 trace-detail-chart-tree.vue 文件的 changeTree()方法上面
在这里插入图片描述

然后我们来查看 this.segmentId 中的数据
在这里插入图片描述

这里需要关注的数据我已经标注出来,但名字也大概能猜出这几个参数是干什么的,children 这个属性是用JS算出来挂载上去的对象,在实际的skywalking中并不是这么存储的,那么他是如何计算出children是谁,这里就要去关注一下那几个ID

这里为了结构更清晰 我精简了一下
在这里插入图片描述

从上图可以看出 traceId / segmentId 都是相同的,不同的只是 spanIdparentSpanId 这里 可以看出 childrenparentSpanId 属性指向了 父节点的 spanId属性。

嗯 这里可以看出 spanId 和 parentSpanId 的作用,就上面这个 例子来说可以认为 每一个记录点就是一个 span 他的唯一标识是 spanId , 然后父子关系通过parentSpanId 去关联。

然而真的是这样的吗?

我们看下面一个例子

在这里插入图片描述

这个调用链比较复杂 和上面最大的不同在于 SwCallableWapper 这是我自己封装的跨线程的插件(skywalking线程池插件,解决lamdba使用问题),也就是说 我这里的调用链上是跨了线程,同时 蓝色的点是远程调用了另外一个服务,也就是说我这边还跨了进程,
如果转成对应的方法就是

链路方法(){
    多线程异步方法(){
        访问数据库()
        远程调用服务() --> 在远程服务这边执行了数据库
    }
    ...后面同样的 异步方法 调用了 3次 
}

那我们来看看他有什么不同

下面我只截取了 从链路入口到 SwCallableWapper 也就是跨线程 的一段数据
在这里插入图片描述

可以看出 segmentId 不同了,但是 traceId 父子还是相同的 , 同时 spanId 2个都是从 0 开始的,而parentSpandId 指向都是 -1 ,那么最开始那个例子的 使用spanId 来关联 父子 关系的逻辑失效了,同时为了标识出 children 的父亲到底是谁,新增加了一个属性 refs,这个属性里关联上了 父节点的所有 ID 信息。 同样跨进程 的远程调用也是一样的。

其实可以大概看出来 spanId 就像一个 链条一样把 一条链路 给串起来,但是如果是 跨线程/进程 又会创建出一个新的链条,然后用 refs 把这几段链条给接连接起来

那么最后调用实际上的链路结构是这样的

在这里插入图片描述

相同颜色的小球可以任何 是在同一个线程中 , 在同一个线程中 的 segmentId 都是相同的,并且在同一个线程中 , 使用 spanId以及parentSpanId去连接链路,线程、进程中不同的链路使用refs去连接,同时多个 线程链路 组成一个 trace,他们的 traceID 都是相同的

至于为什么 不同线程的链路 需要用refs连接而不是 spanId一直传递下去的方式连接,后面会给出答案

下面就总结一下 上面出现的 几个 ID 都是什么含义

  • spanId : 同一个线程中唯一, 从0始,按照调用链路从0递增
  • parentSpanId : 在同一个线程的链路中,用来连接 span
  • segmentId : 同一个 线程链路中,这个值都是相同的,不同线程链路中 这个值不同
  • traceId : 在一个 链路 中 traceId 唯一

3. trace 数据采集

上面讲了这么多,那么agent 到底怎么去采集数据嗯, 接下来将会已 spring-mvc-plugin 插件为例子 来讲解

以下 源码来源于 agent 插件 mvc-annotation-commons,这个插件是官方自带的

这个插件会去 代理 所有打了 @requestMapping 注解的方法,让其在进入对应前以及方法结束后做一些事件。

在类 AbstractMethodInterceptor#beforeMethod 方法里可以看到 当执行 @requestMapping 标注的的方法前将会做些什么 , 下面的代码是我删除过一些 代码,一遍能更清晰体现流程

@Override
   public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
       MethodInterceptResult result) throws Throwable {
       String operationName;
         //在之前提到过,在agent修改字节码的时候,会在对应的业务类里去 写入一个属性,然后让业务类实现接口 EnhancedInstance ,从此能有能力去获取这个属性
           EnhanceRequireObjectCache pathMappingCache = (EnhanceRequireObjectCache) objInst.getSkyWalkingDynamicField();
           //pathMappingCache 这个属性里存放是的是 method -> url 的数据,如果没有对应的缓存,就重新获取一下,然后存入缓存里
           String requestURL = pathMappingCache.findPathMapping(method);
           if (requestURL == null) {
               requestURL = getRequestURL(method);
               pathMappingCache.addPathMapping(method, requestURL);
               requestURL = getAcceptedMethodTypes(method) + pathMappingCache.findPathMapping(method);
           }
           operationName = requestURL;
       

       //获取了当前请求的 request
       HttpServletRequest request = (HttpServletRequest) ContextManager.getRuntimeContext()
                                                                       .get(REQUEST_KEY_IN_RUNTIME_CONTEXT);
       if (request != null) {
           // 获取 stackDepth, 用来记录本次调用链的深度
           //比如 如果这个请求 第一站进入了这个 interceptor 那么他的深度就是0/null , 如果在此之前有其他的 interceptor提前执行了,那么深度 +1, 例如: tomcat 的插件
           StackDepth stackDepth = (StackDepth) ContextManager.getRuntimeContext().get(CONTROLLER_METHOD_STACK_DEPTH);
           //等于null 说明是入口 span
           if (stackDepth == null) {
               //获取了跨进程的 context
               ContextCarrier contextCarrier = new ContextCarrier();
               CarrierItem next = contextCarrier.items();

               while (next.hasNext()) {
                   next = next.next();
                   //从header 里获取了远程调用时候传送过来的数据,塞入contextCarrier
                   //setHeadValue 的时候会自动反序列化
                   next.setHeadValue(request.getHeader(next.getHeadKey()));
               }
               //创建一个入口 span
               //下面都是在给这个span去塞入一些信息
               AbstractSpan span = ContextManager.createEntrySpan(operationName, contextCarrier);
               Tags.URL.set(span, request.getRequestURL().toString());
               Tags.HTTP.METHOD.set(span, request.getMethod());
               span.setComponent(ComponentsDefine.SPRING_MVC_ANNOTATION);
               SpanLayer.asHttp(span);
               
               //如果这个是入口类,那么创建一个新的 stackDepth
               stackDepth = new StackDepth();
               ContextManager.getRuntimeContext().put(CONTROLLER_METHOD_STACK_DEPTH, stackDepth);
           } else {
               AbstractSpan span = ContextManager.createLocalSpan(buildOperationName(objInst, method));
               span.setComponent(ComponentsDefine.SPRING_MVC_ANNOTATION);
           }
           //深度 +1,在afterMethod 的时候会去 -1
           stackDepth.increment();
       }
   }
   

上面代码其实要关注的是 span 的创建上面,从上面代码可以看出 如果 进入这个方法之前没有创建过任何一个 span那么就将会使用一个 ContextManager.createEntrySpan() 去创建span,如果在之前就已经创建过 span(比如如果使用了 tomcat 的插件,那么 tomcat 才将是 这次调用链的第一站),那么使用
ContextManager.createLocalSpan() 去创建 span,那么看名字是否猜到还有一个 ContextManager.createExitSpan() , 从这这三者 从名字上看 也就是标识 了 入口 -> 本地 -> 出口 上面,这也 暗示了 如果我们写插件,也需要 这么去定义 span

  • ContextManager.createEntrySpan() : 如果在一个进程内 这是第一个生成的 span 那么使用 createEntrySpan() 方法去创建,他除了会生成 span还会帮你把之前短接的调用链给连接起来(比如 远程调用 A – > B 在 B服务 调用 createEntrySpan 才能和A 关联起来)
  • ContextManager.createLocalSpan() : 本地 span, 最普通的创建 span 的方法
  • ContextManager.createExitSpan() : 如果在一个进程内 发现这已经是这个进程最后一个调用 span, 使用createExitSpan 去创建对应的 span , 比如 使用 okhttp 去调用别的服务,那么在 okhttp 发送之前就已经是最后一个 span了,方法 createExitSpan 除了 会帮你创建一个 span,还会帮你把 一些 id 信息带给 被调用方(okhttp 是把ID信息给序列化放在 header里),被调用方使用 createEntrySpan() 就能把整个请求给 连接起来,这里需要留意的是 这里带给的 id 其实就是上问提到的 traceId/segmentId/spanId ,这三者组成了一个 完成的 refs 属性,刚好对应上上文所讲,如果看不懂可以再回过去看一遍

既然有前置代理方法,那就肯定有 后置 代理方法,在后置代理方法上面,聪明的小伙伴都能想到会把方法执行的结果,异常 等信息存入 span, 那么如果我没有对应的 放回数据 , 也不想记录异常,那么是否可以不写后置处理方法嗯? 答案是 不可以

因为可以 把在同一线程中的 调用链路 看做是一个  , 执行createSpan() 是入栈的过程,那么执行ContextManager.stopSpan() 就是出栈的过程

在这里插入图片描述

 

入上图所示, 在这个情况下 如果我直接 ContextManager.stopSpan() 那么 停止的就是远程调用B创建出来的 span,所以如果自定义插件,一定要确保自己的 span成功出栈

4. trace 数据采集

上面讲了如何去创建 span 那么 这些数据会如何发送到 skywalking ?

首先,要发送这些span 数据不能够阻塞 我们的业务线程,而然后有一定的数据量,需要批量发送等功能,所以这边 skywalking 使用了 生产-消费 的模型

4.1 生产者

上面提到,每当一个方法结束后 都需要调用一下 ContextManager.stopSpan() 方法,没错这个方法就是 将 span塞入队列的 方法,但是 并不是 每次调用 ContextManager.stopSpan() 都会把出栈的 span扔入队列的

在 TracingContext#finish() 中可以看到
在这里插入图片描述

这里其实他就是去检查了一下 上面的 span栈空了 才会执行后面的方法 . 换句话说就是,需要等同一个线程里面所有的span 都出栈了,才会去把这整个 segment 给放入 消费队列中。

同样在 TracingContext#finish() 的方法中可以看到他如何塞入消费队列的

 //这里会去通知所有注册了的 listener,本TraceSegment成功结束了
 // 这里会有一个叫做 TraceSegmentServiceClient 的listener 收到这个事件后,会把 TraceSegment 放入队列 等待消费
 TracingContext.ListenerManager.notifyFinish(finishedSegment);
4.2 消费者

agent 启动的时候会去 加载 模块apm-agent-core中一个 叫做 TraceSegmentServiceClient( 至于他是如何加载的 下篇文章会讲,这里就不赘述了) 的类
在这个类初始化的时候,执行了boot() 方法。

@Override
    public void boot() {
        lastLogTime = System.currentTimeMillis();
        segmentUplinkedCounter = 0;
        segmentAbandonedCounter = 0;
        //这里会去创建 消费队列 可以使用 buffer.channel_size 来指定 消费队列的长度,以及 buffer.buffer_size 来指定消费队列的大小
        carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE);
        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
        //定义了消费者,Trace 的数据将会由这个消费者去发送给 skywalking,这里传了 this 消费者就是自己
        //参数2 定义了有几个消费线程,每个线程会持有不同的队列
        carrier.consume(this, 1);
    }
    

从上面可以看出 他可以配置 多个 队列以及多个线程,但实际上他写死了 使用 1个线程去发送数据,其实我觉得多个队列也没多少意义

但是如果有多个 多线的话 每个线程有自己的所属队列 去发送数据
在这里插入图片描述

 

消费者的 远程调用 直接逻辑 在 TraceSegmentServiceClient#consume(List<TraceSegment> data)

 @Override
    public void consume(List<TraceSegment> data) {
        if (CONNECTED.equals(status)) {
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(
                Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
            ).collect(new StreamObserver<Commands>() {..GRPC 的一些回调..});

            for (TraceSegment segment : data) {
                    //转换一下 segment 成 proto 数据
                    UpstreamSegment upstreamSegment = segment.transform();
                    //GRPC 发送
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);
            }
            //告诉 GRPC 流已经完全写入进去了,等待他全部把数据发送后会回调上面的 StreamObserver定义的回调方法
            upstreamSegmentStreamObserver.onCompleted();

            status.wait4Finish();
            segmentUplinkedCounter += data.size();
        } else {
            segmentAbandonedCounter += data.size();
        }
    }

直接用的 GRPC 批量发送了所有的 span 数据

行走的code
行走的code

要发表评论,您必须先登录