用户您好!请先登录!

Skywalking 源码分析 – Agent启动分析

Skywalking 源码分析 – Agent启动分析

1. 简介

前2篇介绍了 agent 如何去修改我们的业务代码,以及如何去收集、发送 trace 数据,但是agent的作用不仅如此,那么他还对我们的代码做了一些什么事情,会对我们的程序造成什么样的性能影响,本文将为你揭晓。

2 . BootService 接口

方法 SkyWalkingAgent#premain 之前讲解过这是 agent 的入口方法,这个方法的 前半部分都是在 加载配置文件、加载插件以及拦截了类加载器以便后面去修改字节码,在此之外agent还有最后一个步骤,就是去加载所有BootService接口的实现类。

在这里插入图片描述
在这里插入图片描述

 

如上图所示,ServiceManager.INSTANCE.boot() 就是去加载所有的BootService 实现类,以及启动这些实现类,这个方法也非常的简单

   public void boot() {
        //去加载 BootService接口的实现类
        bootedServices = loadAllServices();
        //调用 BootService接口 的prepare 方法
        prepare();
        //调用 BootService接口 boot 方法
        startup();
        //调用 BootService接口 onComplete 方法
        onComplete();
    }

比起如何去加载BootService实现类 这种繁琐的无价值的操作,我更关心的 BootService 的实现类有什么,他们都干了什么事,这里可以看到类的实现有如下所示

在这里插入图片描述

 

下面会挑选几个重要的实现类做重点讲解。

2.1 GRPC 连接管理 : GRPCChannelManager

skywalking 的agent和 采集器用的 GRPC 作为通讯手段, 那么我们的agent是如何知道我们的skywalking采集器 所在服务器的IP和端口是什么,又或者当 服务器失联了,又如何重新连接,答案都在GRPCChannelManager

上面讲过,在agent启动的时候会自动调用 BootService接口prepare,boot,onComplete 三个方法,从名字可以看出,大概是干什么的,这里就不赘述了,我们直接找到 GRPCChannelManager#boot() 方法

@Override
    public void boot() {
        //解析用户配置的 skywalking.collector.backend_service 参数,如果写了多个IP地址用逗号分隔
        grpcServers = Arrays.asList(Config.Collector.BACKEND_SERVICE.split(","));
        //这里开了一个 定时任务,去连接 skywalking ,这里开了定时器去重连接 默认30s 连接一次
        connectCheckFuture = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager"))
                .scheduleAtFixedRate(new RunnableWithExceptionProtection(this,t -> logger.error("unexpected exception.", t)),
                0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
    }

代码很简单,就是把我们传进去的 服务器的IP地址 转成数组,以及开了一个 默认30S(可以使用配置collector.grpc_channel_check_interval修改)间隔的 定时器,我们来看看这个定时器干什么什么,因为定时器传入的执行对象是this所以,还是这个类,找到他的 run 方法

public void run() {
        if (reconnect) {
            if (grpcServers.size() > 0) {
                String server = "";
                try {
                    //如果是设置了集群,那么服务端可能会有多个 Ip
                    //这里随机在这些服务器里面选出一个倒霉蛋
                    int index = Math.abs(random.nextInt()) % grpcServers.size();
                    //如果这次选到的IP和上次选到的不同就生成 GRPCChannel 对象
                    if (index != selectedIdx) {
                        selectedIdx = index;
                        server = grpcServers.get(index);
                        String[] ipAndPort = server.split(":");
                        if (managedChannel != null) {
                            managedChannel.shutdownNow();
                        }

                        //用上面选出来的 ip地址转成 managedChannel
                        managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
                                                    .addManagedChannelBuilder(new StandardChannelBuilder())
                                                    .addManagedChannelBuilder(new TLSChannelBuilder())
                                                    .addChannelDecorator(new AgentIDDecorator())
                                                    .addChannelDecorator(new AuthenticationDecorator())
                                                    .build();
                        //这边构建完成就 就当做连接成功了,但是实际上这里并没有真正连接到服务器去检查是否连接
                        //这里改变状态后,所有注册到 GRPCChannelManager 的listener 都会收到 事件变更的事件
                        //其中有一个 Listener ServiceAndEndpointRegisterClient 在收到 status转成CONNECTED 后,他会有一个线程去真正检查服务连接是否通。
                        notify(GRPCChannelStatus.CONNECTED);
                        reconnectCount = 0;
                        reconnect = false;
                     //
                    } else if (managedChannel.isConnected(++reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD)) {
                        // Reconnect to the same server is automatically done by GRPC,
                        // therefore we are responsible to check the connectivity and
                        // set the state and notify listeners
                        reconnectCount = 0;
                        notify(GRPCChannelStatus.CONNECTED);
                        reconnect = false;
                    }

                    return;
                } catch (Throwable t) {
                    logger.error(t, "Create channel to {} fail.", server);
                }
            }
        }
    }

代码上我也写了注释,他会随机从用户写的 服务器地址里面挑选一个,然后去生成一个 ManagedChannel 对象,需要注意的是,这里ManagedChannel 创建成功不代表 已经和 skywalking 成功连接 , 然后调用了 notify(GRPCChannelStatus.CONNECTED); 去通知实现了监听接口GRPCChannelListener 的类

这里只要记住,他使用ip/端口生成了ManagedChannel 对象,以及通知了所有的 监控接口 , skywalking已经连接成功(其实可能并没有连接成功),当做完这些所有事情后 把reconnect设置为false,也就是说 下一次定时器再次执行的时候就不会再去生成 ManagedChannel了。

流程如下
在这里插入图片描述

那么看到这里是不是很困惑,他并没有去判断连接到底 能不能真正连接到skywalking ,ManagedChannel 也只是生成一次就结束了,那么要这个定时器有何用?

其实在所有使用到 ManagedChannel 对象的地方,其实也就是发送GRPC的地方,如果发送失败了,都会去调用一个叫做 reportError() 的方法

 public void reportError(Throwable throwable) {
        if (isNetworkError(throwable)) {
            reconnect = true;
            notify(GRPCChannelStatus.DISCONNECT);
        }
    }

这个方法很简单,就是重新设置reconnect = true 这样这个定时器就又能再次去创建 连接对象ManagedChannel 了(这里可能会选择到其他的IP从而复活),然后去通知所有的监听器 ,skywalking失联了。

2.2 服务注册器 : ServiceAndEndpointRegisterClient

假设 agent和 skywalking 第一时间连通了,那么第一件事情是干什么嗯? 不是迫切的去发送 trace 等数据信息,而是要告诉skywalking 我是谁,这样后续发送过去的数据才能有迹可循,所以这就是 ServiceAndEndpointRegisterClient 的功能,把自己给注册进入skywalking

和前面一样还是 进入 ServiceAndEndpointRegisterClient#boot() 方法

 @Override
    public void boot() {
        applicationRegisterFuture = Executors.newSingleThreadScheduledExecutor(
            new DefaultNamedThreadFactory("ServiceAndEndpointRegisterClient")
        ).scheduleAtFixedRate(
            new RunnableWithExceptionProtection(
                this,
                t -> logger.error("unexpected exception.", t)
            ), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL,
            TimeUnit.SECONDS
        );
    }

还是去开启了一个定时器,执行周期默认是 3S , 执行类还是他自己,所以继续看 ServiceAndEndpointRegisterClient#run() 方法

@Override
    public void run() {

        if (coolDownStartTime > 0) {
            final long coolDownDurationInMillis = TimeUnit.MINUTES.toMillis(Config.Agent.COOL_DOWN_THRESHOLD);
            if (System.currentTimeMillis() - coolDownStartTime < coolDownDurationInMillis) {
                return;
            } else {
            }
        }
        coolDownStartTime = -1;

        boolean shouldTry = true;

        //这个 while 里 用if 分隔了 3个任务,每次进入 while 只会执行其中一个,这里吧里面的任务分成 A B C三个
        //A:去注册应用信息 B:去注册实例信息 C:执行ping
        // 如果是 第一次 状态变为 connect  (A -> B) -> (C) -> (C) -> (C) ...
        // 如果一直都是 connect 状态 那么  (C) -> (C) -> (C) ...
        while (GRPCChannelStatus.CONNECTED.equals(status) && shouldTry) {
            shouldTry = false;
            try {
                if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()) {
                    if (registerBlockingStub != null) {
                        //生成一个只有 30S 生命周期的 客户端
                        RegisterGrpc.RegisterBlockingStub registerBlockingStub =
                                this.registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS);
                        //发送消息给skywalking 告诉他本大爷的服务名字是什么 也就是 skywalking.agent.service_name 配置的名字
                        ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.doServiceRegister(...);
                        if (serviceRegisterMapping != null) {
                            // skywalking 把所有注册进去的服务的名称都返回回来了,这里会去遍历如果发现有自己就算注册成功了
                            for (KeyIntValuePair registered : serviceRegisterMapping.getServicesList()) {
                                if (Config.Agent.SERVICE_NAME.equals(registered.getKey())) {
                                    RemoteDownstreamConfig.Agent.SERVICE_ID = registered.getValue();
                                    //这个设置成 true 那么这个 while 将会再执行一次,下一次将会去注册实例信息
                                    shouldTry = true;
                                }
                            }
                        }
                    }
                } else {
                    if (registerBlockingStub != null) {
                        if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) {
                            // 把实例名称给注册进去了
                            ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(
                                GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
                            ).doServiceInstanceRegister(...);
                            //同样如果注册成功设置一下值
                            for (KeyIntValuePair serviceInstance : instanceMapping.getServiceInstancesList()) {
                                if (INSTANCE_UUID.equals(serviceInstance.getKey())) {
                                    int serviceInstanceId = serviceInstance.getValue();
                                    if (serviceInstanceId != DictionaryUtil.nullValue()) {
                                        RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID = serviceInstanceId;
                                        RemoteDownstreamConfig.Agent.INSTANCE_REGISTERED_TIME = System.currentTimeMillis();
                                    }
                                }
                            }
                        } else {
                            //ping 一下告诉 skywalking 自己还活着,算一个心跳机制
                            final Commands commands = serviceInstancePingStub.withDeadlineAfter(
                                GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
                            ).doPing(...);

                            //把 GRPC 的句柄给了 NetworkAddressDictionary 以及 EndpointNameDictionary ,都是只有30S的使用时间
                            NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(
                                registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS));
                            EndpointNameDictionary.INSTANCE.syncRemoteDictionary(
                                registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS));
                            ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                        }
                    }
                }
            } catch (Throwable t) {
                //这里会把连接状态改成 DISCONNECT
                ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
            }
        }
    }

方法有点长,这边在总结一下,实际上 他做了 3件事 注册服务注册实例心跳

  • 服务 : 使用参数 skywalking.agent.service_name 来指定服务的名称
  • 实例 : 同一个服务,可能会开不同的节点,每个节点就是一个实例

在这里插入图片描述

2.3 指令服务:CommandService

对于上面的服务注册,有的同学可能会有点困惑 ,他使用全局静态变量RemoteDownstreamConfig.Agent.SERVICE_ID 和 RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID 是否有值来判断 skywalking 是否成功注册了服务信息,这2个全局静态变量 的值都是在成功注册信息后 设置上去的,但是如果在成功设置 这2个静态变量后,skywalking服务重启了怎么办? 要解决这个问题就有请我们的CommandExecutorService

同样来看他的CommandService#boot() 方法

 private ExecutorService executorService = Executors.newSingleThreadExecutor();
  
 @Override
    public void boot() throws Throwable {
        executorService.submit(new RunnableWithExceptionProtection(this, t -> LOGGER.error(t, "CommandService failed to execute commands")));
    }

同样也是启动了一个 单线程线程池, 然后把自己给搭进去了,继续看他的 run() 方法

@Override
    public void run() {
        final CommandExecutorService commandExecutorService = ServiceManager.INSTANCE.findService(CommandExecutorService.class);

        while (isRunning) {
               BaseCommand command = commands.take();
                if (isCommandExecuted(command)) {
                    continue;
                }
              commandExecutorService.execute(command);
        }
    }

简单的不行,就是阻塞去管道里拿东西,拿到了就去 执行CommandExecutorService#execute 方法,方法如下

@Override
    public void execute(final BaseCommand command) throws CommandExecutionException {
        executorForCommand(command).execute(command);
    }

一个很典型的策略模式,也就是根据 command 的类型是什么去执行对应的执行器

我们来看看他都有什么样的执行器,这里可以从CommandExecutorService#prepare() 方法可以看到,注意的是prepare() 方法也是BootService 的接口方法,在我们老朋友boot()之前执行

 @Override
    public void prepare() throws Throwable {
        commandExecutorMap = new HashMap<String, CommandExecutor>();

        // Register all the supported commands with their executors here
        commandExecutorMap.put(ServiceResetCommand.NAME, new ServiceResetCommandExecutor());

        // Profile task executor
        commandExecutorMap.put(ProfileTaskCommand.NAME, new ProfileTaskCommandExecutor());
    }

从名字可以看到一个叫做 ServiceResetCommandExecutor() 的执行器 ,点进去一看

 /**
     * 如果 oap 给重启了,那么就会执行这个方法,在这个方法里最主要的就是把各种 参数给置空了,回到了初始状态
     * @param command the command that is to be executed
     * @throws CommandExecutionException
     */
    @Override
    public void execute(final BaseCommand command) throws CommandExecutionException {
        LOGGER.warn("Received ServiceResetCommand, a re-register task is scheduled.");

        //这里把 心跳检查都暂时关闭了,因为 oap那边可能正在重启中
        ServiceManager.INSTANCE.findService(ServiceAndEndpointRegisterClient.class).coolDown();

        //置空各种数据
        RemoteDownstreamConfig.Agent.SERVICE_ID = DictionaryUtil.nullValue();
        RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID = DictionaryUtil.nullValue();
        RemoteDownstreamConfig.Agent.INSTANCE_REGISTERED_TIME = DictionaryUtil.nullValue();

        NetworkAddressDictionary.INSTANCE.clear();
        EndpointNameDictionary.INSTANCE.clear();
    }

是不是到此为止,上面的疑惑就解开了,在skywalking重启的时候会向我们的agent发送一个重启命令,告诉agent 爷我重启了,快去清除以前的那些数据

如果杠精同学看到这里,可能又会问,skywalking 都重启了,如果GRPC用的是长连接,那么连接句柄早已经失效了,他如何通知agent的,如果GRPC用的是短连接,那更没理由能够通知agent了,其实要解决这个问题很简单,不需要skywalking来主动通知我们,只要agent发送的数据的时候,skywalking 发现发送过来的数据是一个不存在的服务名称发送过来的,就会返回一个ResetCommand,回到上一章将的心跳机制上面,在执行ping的时候他是不是返回了一个 commands 对象,其实不仅仅是ping,还有很多GRPC的接口都返回的commands

在这里插入图片描述

 

如果细心的同学 可以看到除了 ServiceResetCommand还有一个叫做 ProfileTaskCommand 的命令,这个具体干什么我们看一下个BootService

2.4 性能剖析 : ProfileTaskChannelService

先说个结论:这个玩意在笔者看来比较鸡肋,还占用3个线程,如果没需要可以把他关闭(默认是打开的)

skywalking的页面上可以新建一个性能剖析 的任务,然后你可以看到对应线程现在的调用栈信息
在这里插入图片描述

 

还是进去 ProfileTaskChannelService#boot() 方法

if (Config.Profile.ACTIVE) {
            // query task list
            getTaskListFuture = Executors.newSingleThreadScheduledExecutor(
                new DefaultNamedThreadFactory("ProfileGetTaskService")
            ).scheduleWithFixedDelay(
                new RunnableWithExceptionProtection(
                    this,
                    t -> logger.error("Query profile task list failure.", t)
                ), 0, Config.Collector.GET_PROFILE_TASK_INTERVAL, TimeUnit.SECONDS
            );

            sendSnapshotFuture = Executors.newSingleThreadScheduledExecutor(
                new DefaultNamedThreadFactory("ProfileSendSnapshotService")
            ).scheduleWithFixedDelay(
                new RunnableWithExceptionProtection(
                    new SnapshotSender(),
                    t -> logger.error("Profile segment snapshot upload failure.", t)
                ), 0, 500, TimeUnit.MILLISECONDS
            );
        }

可以看出 可以用配置 profile.active 来控制是否开启(默认开启),创建了2个定时器,一个 20S执行一次一个 500ms 执行一次

我们先看 20S执行一次的,他的名字叫做 getTaskListFuture,从名字上看是获取任务列表 , 我来看他的 run 方法,下面是精简过的代码

@Override
    public void run() {
        if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue()
            && RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue()) {
            if (status == GRPCChannelStatus.CONNECTED) {
                ProfileTaskCommandQuery.Builder builder = ProfileTaskCommandQuery.newBuilder();

                    // 把服务ID和实例ID传给skywalking 看看有没对应的 ProfileTask
                    builder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID)
                           .setInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);

                   builder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class)
                                                                      .getLastCommandCreateTime());

                    Commands commands = profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                                                               .getProfileTaskCommands(builder.build());

                    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
            }
        }
    }

代码非常的简单,就是把 服务ID实例ID传给skywalking 看看有没对应的 ProfileTask , 如果有的话就让 CommandService 去执行对应的 commands,这里的 commands 其实也就是上面提到的 ProfileTaskCommand , 也就是说这是一个 20S周期的轮询器

这里我们继续往下看,如果收到了 ProfileTaskCommand 指令 他会去干什么

ProfileTaskCommandExecutor#execute 方法

@Override
    public void execute(BaseCommand command) throws CommandExecutionException {
        final ProfileTaskCommand profileTaskCommand = (ProfileTaskCommand) command;

        // build profile task
        final ProfileTask profileTask = new ProfileTask();
        profileTask.setTaskId(profileTaskCommand.getTaskId());
        profileTask.setFistSpanOPName(profileTaskCommand.getEndpointName());
        profileTask.setDuration(profileTaskCommand.getDuration());
        profileTask.setMinDurationThreshold(profileTaskCommand.getMinDurationThreshold());
        profileTask.setThreadDumpPeriod(profileTaskCommand.getDumpPeriod());
        profileTask.setMaxSamplingCount(profileTaskCommand.getMaxSamplingCount());
        profileTask.setStartTime(profileTaskCommand.getStartTime());
        profileTask.setCreateTime(profileTaskCommand.getCreateTime());

        // send to executor
        ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class).addProfileTask(profileTask);
    }

这里又把任务饶了一个圈 到了 ProfileTaskExecutionService#addProfileTask 里面,这个 addProfileTask很简单,就是把任务加入队列,然后开启一个定时器,在指定时间后去执行任务(可以看上面页面,用户是可以指定开始时间的)

public void addProfileTask(ProfileTask task) {
        
        // 把ProfileTaskCommand 任务存入队列
        profileTaskList.add(task);

        //到达用户设置的时间后开启任务
        long timeToProcessMills = task.getStartTime() - System.currentTimeMillis();
        PROFILE_TASK_SCHEDULE.schedule(() -> processProfileTask(task), timeToProcessMills, TimeUnit.MILLISECONDS);
    }

如果时间到了,就会执行下面的方法

private synchronized void processProfileTask(ProfileTask task) {
        // 确保上一个 任务已经停止
        stopCurrentProfileTask(taskExecutionContext.get());

        // 创建一个新的任务
        final ProfileTaskExecutionContext currentStartedTaskContext = new ProfileTaskExecutionContext(task);
        taskExecutionContext.set(currentStartedTaskContext);

        // 开了一个线程去执行对应的性能剖析任务
        currentStartedTaskContext.startProfiling(PROFILE_EXECUTOR);

        PROFILE_TASK_SCHEDULE.schedule(
            () -> stopCurrentProfileTask(currentStartedTaskContext), task.getDuration(), TimeUnit.MINUTES);
    }

他在这个定时器中去开启了一个线程去执行这个性能 剖析任务,最后又开启了一个定时器到达指定时间后关闭这个性能剖析任务(这线程一层套一层这个套娃行为也是醉了),同时这里还有一个重要的信息,就是同一个agent只能开启一个 性能剖析任务,因为开启新的任务前,都会去关闭老的任务

接下来就是去看看那个 性能剖析任务 的任务都干了什么,这里就不贴代码,我贴累了 如果想看的去
ProfileThread#profiling, 稍稍简述一下就是,里面有个死循环,不断去一个队列里获取currentProfiler (这个算业务线程的包装类,里面是保存了业务线程的句柄), 如果获取到了 判断一下状态,如果状态是 PROFILING 就 从currentProfiler获取业务线程对象,然后把他的 dump给保存下来 ,

那么 currentProfiler 对象又是从什么地方保存进去的嗯?

他在 创建 TracingContext 对象的构造器的时候就会生成 currentProfiler 塞入队列里, TracingContext 对象的生成其实就是在 createSpan() 的时候(具体详情可以看 skywalking源码解析系列二 : agent采集trace数据),也就是在 创建 链路开始的时候生成了 currentProfiler, 同时也会在 链路 结束的时候把队列给清除掉,并且第一次生成的 currentProfiler 状态是 READY,在下次循环的时候才会升级成PROFILING , 那么如果 在升级到PROFILING 之前链路就结束了,那么你就无法看到 对应的调用栈

在这里插入图片描述

从上来看,开启一个 ProfileTask 最差可能会额外启动 4个线程

这里需要补充的是,并不是这个功能没有价值了,这个功能的本身就是用来去定位那些特别慢的方法,可以通过调用栈来定位到底这个方法卡在什么地方,但是如果是一个正常的调用,别乱加任务即可

2.4 采样收集 : SamplingService

正常的情况下,每一次的 调用信息都会发送给 skywalking,但是这样可能会对系统性能造成一定的影响,SamplingService 的作用就是可以限定 3S内发送多少 trace 数据,超过的数据将会被丢弃

老规矩,入口先看 SamplingService#boot()

@Override
    public void boot() {
        if (Config.Agent.SAMPLE_N_PER_3_SECS > 0) {
            on = true;
            this.resetSamplingFactor();
            ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
                new DefaultNamedThreadFactory("SamplingService"));
            scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(
                this::resetSamplingFactor, t -> logger.error("unexpected exception.", t)), 0, 3, TimeUnit.SECONDS);
        }
    }

如果设置了属性 sample_n_per_3_secs(默认是-1 也就是不开启) 开启了一个 周期是 3S的定时器,需要注意的是这个定时器是用了 清空记录的

SamplingService内使用了一个AtomicInteger 来计数,保证了并发问题,在每次 调用链 结束,准备发送数据到 skywalking的时候,就会去调用一下 SamplingService#trySampling 方法来看看超过上限没有,如果没有就能够正常发送,同时计数器+1 , 如果已经达到上限了,那么就会把这个Segment 设置成 Ignore,也自然不会发送给 skywalking

在这里插入图片描述
然后如果到达了 3s 的周期,定时器会把计数器里的值给清空

在这里插入图片描述

2.5 JVM数据采集 : JVMService

skywalking 在页面上也能看到可以看到 一些JVM的信息,这些信息都是 JVMService 采集的
还是看 boot() 方法

@Override
    public void boot() throws Throwable {
        //间隔1秒 搜集一次 jvm的信息
        collectMetricFuture = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-produce"))
                                       .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, t -> logger.error("JVMService produces metrics failure.", t)), 0, 1, TimeUnit.SECONDS);
        //间隔1秒,发送一次 jvm的信息
        sendMetricFuture = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-consume"))
                                    .scheduleAtFixedRate(new RunnableWithExceptionProtection(sender, t -> logger.error("JVMService consumes and upload failure.", t)), 0, 1, TimeUnit.SECONDS);
    }

同样开了2个线程 一个采集一个发送,典型的生产消费模式,这里就不赘述了,套路和上面那些都是一样的。

3. 总结

agent 在我们应用程序 最坏的情况下 开启了 10 个左右的线程去监控/发送 数据,对性能还是有一定影响,如果有的功能基本用不到的,建议可以关闭对应的线程,来节约对应的资源,没错说的就是你ProfileTaskChannelService

行走的code

要发表评论,您必须先登录