用户您好!请先登录!

线程上下文在全链路跟踪上的运用

线程上下文在全链路跟踪上的运用

ThreadLocal是JDK默认提供的本地线程变量,用来存储在整个调用链中都需要访问的数据,并且是线程安全的。由于本文的写作背景是笔者需要在公司落地全链路跟踪平台,一个基本并核心的功能需求是用户的每个操作动作需要在整个调用链中进行埋点传递,线程上下文环境成为解决这个问题最合适的技术。

ThreadLocal解决什么问题?

ThreadLocal是在Thread类之外实现的一个功能(java.lang.ThreadLocal), 但它会为每个线程分别存储一份唯一的数据。正如它的名字所说的,它为线程提供了本地存储,也就是说你所创建出来变量对每个线程实例来说都是唯一的。和线程 名,线程优先级类似,你可以自定义出一些属性,就好像它们是存储在Thread线程内部一样。

就以数据库事务这一常用场景来举例说明,比如每个线程需要访问数据库,就需要获取数据库的连接Connection对象,在实际中,我们会用数据库连接池来重复利用Connection,首先线程池,这里是一个共享变量,线程池的实现必须保证多个线程同时从线程池中获取Connection不会重复,然后每个线程使用单独的Connection,并且该Connection被一个线程占用后,其他线程压根就不会使用到,也不会试图去使用一个已经被其他线程占用的Connection对象。由于一个线程在执行过程中,可能需要多次操作数据库,所以我们的设计就是一个线程在执行过程中,只与一个Connection打交道,也就是整个线程的执行过程(执行环境)需要保存刚获取的Connection,最简单有效的办法,就是把这个Connection保存在线程对象的某个属性中,ThreadLocal就是干这事的。ThreadLocal并不是为这个Connection复制一份,多个线程都使用这个副本,不是这样的,一个Connection对象在任意时刻,没有被复制多份。

核心思想:ThreadLocal 提供了线程本地的实例。它与普通变量的区别在于,每个使用该变量的线程都会初始化一个完全独立的实例副本。ThreadLocal 变量通常被private static修饰。当一个线程结束时,它所使用的所有 ThreadLocal 相对的实例副本都可被回收。总的来说,ThreadLocal 适用于每个线程需要自己独立的实例且该实例需要在多个方法中被使用,也即变量在线程间隔离而在方法或类间共享的场景。

ThreadLocal原理

ThreadLocal相关的类的类图结构

全链路跟踪之线程上下文Thread Local实战(完整源码)

如上类图可知Thread类中有一个threadLocals和inheritableThreadLocals都是ThreadLocalMap类型的变量,而ThreadLocalMap是一个定制化的Hashmap,默认每个线程中这两个变量都为null,只有当前线程第一次调用了ThreadLocal的set或者get方法时候才会进行创建。其实每个线程的本地变量不是存放到ThreadLocal实例里面的,而是存放到调用线程的threadLocals变量里面。也就是说ThreadLocal类型的本地变量是存放到具体的线程内存空间的。

ThreadLocal就是一个工具壳,它通过set方法把value值放入调用线程的threadLocals里面存放起来,当调用线程调用它的get方法时候再从当前线程的threadLocals变量里面拿出来使用。如果调用线程一直不终止那么这个本地变量会一直存放到调用线程的threadLocals变量里面,所以当不需要使用本地变量时候可以通过调用ThreadLocal变量的remove方法从当前线程的threadLocals里面删除该本地变量。另外Thread里面的threadLocals为何设计为map结构呢?很明显是因为每个线程里面可以关联多个ThreadLocal变量。

源码分析

ThreadLocal对外提供的API如下:

  • public T get()

从线程上下文环境中获取设置的值。

  • public void set(T value)

将值存储到线程上下文环境中,供后续使用。

  • public void remove()

清除线程本地上下文环境。

public T get() {
   Thread t = Thread.currentThread();  // @1
   ThreadLocalMap map = getMap(t);  // @2
   if (map != null) {                                // @3
      ThreadLocalMap.Entry e = map.getEntry(this);
      if (e != null) {
         @SuppressWarnings("unchecked")
         T result = (T)e.value;
         return result;
      }
   }
   return setInitialValue();  // @4
}

代码@1:获取当前线程。

代码@2:获取线程的threadLocals属性。

代码@3:如果线程对象的threadLocals属性不为空,则从该Map结构中,用threadLocal对象为键去查找值,如果能找到,则返回其value值,否则执行代码@4。

代码@4:如果线程对象的threadLocals属性为空,或未从threadLocals中找到对应的键值对,则调用该方法执行初始化。

private T setInitialValue() {
   T value = initialValue();    // @1
   Thread t = Thread.currentThread();    // @2
   ThreadLocalMap map = getMap(t);    // @3
   if (map != null)           //@4
      map.set(this, value);
   else
      createMap(t, value);     // @5
   return value;
}

代码@1:调用initialValue()获取默认初始化值,该方法默认返回null,子类可以重写,实现线程本地变量的初始化。

代码@2:获取当前线程。

代码@3:获取该线程对象的threadLocals属性。

代码@4:如果不为空,则将threadLocal:value存入线程对象的threadLocals属性中。

代码@5:否则初始化线程对象的threadLocals,然后将threadLocal:value键值对存入线程对象的threadLocals属性中。

public void set(T value) {
   Thread t = Thread.currentThread();
   ThreadLocalMap map = getMap(t);
   if (map != null)
      map.set(this, value);
   else
      createMap(t, value);
}

在掌握了get方法实现细节,set方法、remove其实现的逻辑基本一样,就是对线程对象的threadLocals属性进行操作(Map结构)。

适用场景

ThreadLocal 适用于如下两种场景:

  • 每个线程需要有自己单独的实例
  • 实例需要在多个方法中共享,但不希望被多线程共享

对于第一点,每个线程拥有自己实例,实现它的方式很多。例如可以在线程内部构建一个单独的实例。ThreadLoca 可以以非常方便的形式满足该需求。

对于第二点,可以在满足第一点(每个线程有自己的实例)的条件下,通过方法间引用传递的形式实现。ThreadLocal 使得代码耦合度更低,且实现更优雅。

0、自定义注解

  • 请求URL方法名称
  • 操作名称
  • 前端所属菜单
  • 请求行为描述
/**
 * 链路追踪0、自定义注解
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface RequestMonitor {

    /**
     * 请求URL方法名称
     * @return
     */
    String urlMethodName();

    /**
     * 操作名称
     * @return
     */
    String opType();

    /**
     * 前端所属菜单
     * @return
     */
    String menuName() default "";

    /**
     * 请求行为描述<br></br>
     * 使用占位符 ${} 替换变量
     * @return
     */
    String behaveMark() default "";

}

0、定义全局上下文工具类(用于储存线程上下文的值)

  • 首先定义一个 私有、静态 ConcurrentHashMap
  • 给线程上下文赋值,KEY为线程ID
  • 根据当前线程ID获取线程上下文存储的值
  • 根据当前线程ID获取链路追踪日志对象值
  • 清空key为当前线程ID的值,防止内存泄漏
/**
 * 链路追踪0、全局上下文工具类,用于储存线程上下文的值
 */
public class ThreadLocalContext {

   /**
    * 定义Map
    */
    private static ConcurrentHashMap<Long, ThreadLocal<ShopNbMonitorLog>> currentThreadLocals = new ConcurrentHashMap<>();

    private ThreadLocalContext(){}

   public final static ThreadLocalContext INSTANCE = new ThreadLocalContext();

   /**
    * 给线程上下文赋值,KEY为线程ID
    * @param threadId
    * @param threadLocal
    */
   public void put(Long threadId, ThreadLocal<ShopNbMonitorLog> threadLocal){
        if(threadId != null && threadLocal != null) {
           //全链路跟踪日志记录对象
            ShopNbMonitorLog ShopNbMonitorLog = new ShopNbMonitorLog();
            //链路追踪ID
            String trackingId = UUID.randomUUID().toString().replaceAll("-","");
            ShopNbMonitorLog.setTrackingId(trackingId);
            threadLocal.set(ShopNbMonitorLog);
            currentThreadLocals.put(threadId, threadLocal);
        }
    }

   /**
    * 根据当前线程ID获取线程上下文存储的值
    * @param threadId
    * @return
    */
   public ThreadLocal<ShopNbMonitorLog> get(Long threadId){
        return currentThreadLocals.get(threadId);
    }

   /**
    * 根据当前线程ID获取链路追踪日志对象值
    * @param threadId
    * @return
    */
   public ShopNbMonitorLog getThreadLocalValue(Long threadId){
        return currentThreadLocals.get(threadId).get();
    }

   /**
    * 清空key为当前线程ID的值
    * @param threadId
    */
   public void remove(Long threadId){
        currentThreadLocals.remove(threadId);
    }

}

0、全链路跟踪日志记录操作类型

/**
 * 链路追踪0、全链路跟踪日志记录操作类型
 */
public class MonitorOpType {

   public static final String BASE_OP_100 = "100"; //新增
   public static final String BASE_OP_101 = "101"; //修改
   public static final String BASE_OP_102 = "102"; //删除
   public static final String BASE_OP_103 = "103"; //查询
   public static final String BASE_OP_104 = "104"; //导出
   public static final String BASE_OP_105 = "105"; //确认
   public static final String BASE_OP_106 = "106"; //导入
   public static final String BASE_OP_107 = "107"; //列表
   public static final String BASE_OP_108 = "108"; //关闭
   public static final String BASE_OP_109 = "109"; //批量更新
   public static final String BASE_OP_110 = "110"; //导出列表

   public static final String BIZ_OP_200 = "200"; //登陆
   …………
}

0、拦截器配置

/**
 * 链路追踪0、拦截器配置
 **/
@Configuration
public class MonitorConfiguration implements WebMvcConfigurer {
   @Override
   public void addInterceptors(InterceptorRegistry registry){ //拦截器
      //增加一个拦截器
      registry.addInterceptor(new LogHandlerInterceptor()).addPathPatterns("/**");
   }
}

1、请求方法拦截日志类

一个controller方式的执行过程中,拦截器最先开始执行,最后执行完毕。

对于请求参数的获取,AOP方式更加的完整、准确。

AOP方式对于请求信息获取的更加详细,可以获取到方法名称和所在的类。

preHandle:Controller方法处理之前执行

  • 给线程上下文赋值,KEY为线程ID
  • 性能耗时日志封装

postHandle:调用前提为preHandle返回true

  • 控制台打印日志
  • 记录访问的URL 日志
/**
 * 链路追踪1、请求方法拦截日志
 * 一个controler方式的执行过程中,拦截器最先开始执行,最后执行完毕。
 * 对于请求参数的获取,AOP方式更加的完整、准确。
 * AOP方式对于请求信息获取的更加详细,可以获取到方法名称和所在的类。
 */
@Slf4j
public class LogHandlerInterceptor implements HandlerInterceptor {

   //性能耗时本地线程
   private ThreadLocal<Map<String,Long>> takeTimeThreadLocal;

   @Override
   public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
         Object handler) throws Exception {
      takeTimeThreadLocal = new ThreadLocal();
      /**
       * 给线程上下文赋值,KEY为线程ID
       */
      ThreadLocalContext.INSTANCE.put(Thread.currentThread().getId(), new ThreadLocal());
      /**
       * 性能耗时日志
       */
      try {
         MethodTaketimeLogHandler.Builder builder = MethodTaketimeLogHandler.build();
         builder.setHttpServletRequest(request)
               .setTrackingId(takeTimeThreadLocal)
               .handleFilter(handler);
      }catch (Exception e){}
      return true;
   }
   
   @Override
   public void postHandle(HttpServletRequest request, HttpServletResponse response,
         Object handler, ModelAndView modelAndView) throws Exception {
      //控制台打印日志
      if(takeTimeThreadLocal != null){
         MethodTaketimeLogHandler.printExecFinishLog(takeTimeThreadLocal.get());
      }
      //记录访问的URL 日志
      try {
         MethodMonitorLogHandler.build().setHttpServletRequest(request).handleFilter(handler);
      }catch (Exception e){
         log.error("记录访问的URL日志异常",e);
      }
   }
}

2、性能耗时日志输出工具类

  • 性能耗时日志输出打印
  • 设置HttpServletRequest
  • 获取链路追踪ID
  • 设置链路追踪ID
  • 设置能耗时日志值
/**
 * 链路追踪2、性能耗时日志输出
 */
public class MethodTaketimeLogHandler {

    private static final Logger logger = LoggerFactory.getLogger(MethodTaketimeLogHandler.class);

    private MethodTaketimeLogHandler(){
    }

    public static MethodTaketimeLogHandler.Builder build(){
        return new Builder();
    }

   /**
    * 性能耗时日志输出打印
    * @param data
    */
   public static void printExecFinishLog(Map<String, Long> data){
        if(data == null){
            return;
        }
        StringBuilder sbf = new StringBuilder();
        Iterator<Map.Entry<String, Long>> iterator = data.entrySet().iterator();
        while (iterator.hasNext()){
            Map.Entry<String, Long> entry = iterator.next();
            long methodStartTime = entry.getValue();
            long methodEndTime = System.currentTimeMillis();
            sbf.append("TrackingId:").append(entry.getKey()).append(",执行耗时:")
            .append(methodEndTime - methodStartTime).append("ms");
        }
        logger.info(sbf.toString());
    }

    public static class Builder{

        private HttpServletRequest request;
        private String trackingId;

      /**
       * 设置HttpServletRequest
       * @param request
       * @return
       */
      public Builder setHttpServletRequest(HttpServletRequest request){
            this.request = request;
            return this;
        }

      /**
       * 获取链路追踪ID
       */
      public Builder(){
            trackingId = UUID.randomUUID().toString().replaceAll("-","");
        }

      /**
       * 设置链路追踪ID
       * @param threadLocal
       * @return
       */
      public Builder setTrackingId(ThreadLocal<Map<String,Long>> threadLocal){
            Map<String,Long> trackingIdMap = new HashMap<>();
            trackingIdMap.put(trackingId, System.currentTimeMillis());
            threadLocal.set(trackingIdMap);
            return this;
        }

      /**
       * 设置能耗时日志值
       * @param handler
       * @return
       */
      public Builder handleFilter(Object handler){
            if (handler instanceof HandlerMethod) {
                StringBuilder sbf = new StringBuilder();
                sbf.append("TrackingId:").append(trackingId).append(",");
                sbf.append("Request URL:").append(this.request.getRequestURL()).append(",");
                HandlerMethod handlerMethod = (HandlerMethod) handler;
                sbf.append("Controller:").append(handlerMethod.getBean().getClass().getName()).append(",");
                sbf.append("Method:").append(handlerMethod.getMethod().getName()).append(",");
                sbf.append("Params:").append(getParamString(request.getParameterMap())).append(",");
                logger.info(sbf.toString());
            }
            return this;
        }

        //参数封装
        private String getParamString(Map<String, String[]> map) {
            StringBuilder sbf = new StringBuilder();
            for(Map.Entry<String,String[]> e:map.entrySet()){
                sbf.append(e.getKey()).append("=");
                String[] value = e.getValue();
                if(value != null && value.length == 1){
                    sbf.append(value[0]).append("\t");
                }else{
                    sbf.append(Arrays.toString(value)).append("\t");
                }
            }
            return sbf.toString();
        }

    }
}

3、AOP切面类

实现步骤:

  • 增强环绕,定义切点(根据实际业务进行配置)
  • 获取当前线程ID
  • 根据当前线程ID获取取线程上下文存储的值
  • 获取链路追踪日志对象值
  • 获取当前类有 RequestMonitor 注解的方法值
  • 执行真实请求
/**
 * 链路追踪3、AOP切面类
 */
@Slf4j
@Aspect //标注增强处理类(切面类)
@Component //交由Spring容器管理
public class RequestMonitorAspect  {

   /**
    * 增强环绕
    * @param joinPoint
    * @return
    * @throws Throwable
    */
    @Around("@annotation(cn.wonhigh.shop.ms.infrastructure.annotation.RequestMonitor)")
    public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
       //获取当前线程ID
        long currentThreadId = Thread.currentThread().getId();
        //根据当前线程ID获取取线程上下文存储的值
        ThreadLocal<ShopNbMonitorLog> currentThread = ThreadLocalContext.INSTANCE.get(currentThreadId);
        //获取链路追踪日志对象值
        ShopNbMonitorLog ShopNbMonitorLog = currentThread.get();
        log.info("RequestMonitorAspect aroundMethod start - currentThreadId:{}, trackingId:{}"
                ,currentThreadId, ShopNbMonitorLog.getTrackingId());

        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        RequestMonitor operate = null;
      try {
         //获取抽象方法
         Method method = signature.getMethod();
         //获取当前类的对象
         Class<?> clazz = joinPoint.getTarget().getClass();
         //获取当前类有 RequestMonitor 注解的方法
         method = clazz.getMethod(method.getName(), method.getParameterTypes());
         operate = method.getAnnotation(RequestMonitor.class);
      } catch (Exception e) {
         log.error("获取当前类 RequestMonitor 注解的方法 异常",e);
      }
        if (operate != null) {
            ShopNbMonitorLog.setMenuName(operate.menuName());
            ShopNbMonitorLog.setRequestOpType(operate.opType());
            ShopNbMonitorLog.setRequestUrlName(operate.urlMethodName());

            //封装请求参数
            try {
                StringBuilder params = new StringBuilder();
                Object[] args = joinPoint.getArgs();
                if (args != null && args.length > 0) {
                    int i = 0;
                    for (Object arg : args) {
                        if (arg instanceof HttpServletRequest || arg instanceof HttpServletResponse) {
                            continue;
                        }
                        params.append(JSON.toJSONString(arg));
                        if (i < args.length - 1) {
                            params.append("|");
                        }
                        i++;
                    }
                    ShopNbMonitorLog.setRequestParams(params.toString());
                }
            }catch (Exception e){
                log.error("Request Monitor Aspect exception has happend! trackingId:{}",ShopNbMonitorLog.getTrackingId(), e);
            }
        }
        // 执行真实请求
        final Object proceed = joinPoint.proceed();
        try {
            String opTime = DateUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss");
            ShopNbMonitorLog.setCreateTime(new Date());
            ShopNbMonitorLog.setRequestResult(JSON.toJSONString(proceed));
            ShopNbMonitorLog.setRequestResultDesc(opTime + " " + ShopNbMonitorLog.getRequestUrlName());
        }catch (Exception e){
            log.error("Request Monitor Aspect exception has happend! trackingId:{}", ShopNbMonitorLog.getTrackingId(), e);
        }

        log.info("RequestMonitorAspect aroundMethod end - currentThreadId:{}, trackingId:{}"
                ,currentThreadId, ShopNbMonitorLog.getTrackingId());
        return proceed;
    }
    
}

4、访问数据/操作记录/写入库

实现步骤:

  • 获取当前线程ID
  • 根据当前线程ID获取线程上下文存储的值
  • 判断是否加自定义链路跟踪注解
  • 填充业务逻辑信息
  • 写入数据库
  • 清空key为当前线程ID的值
/**
 * 链路追踪4、访问数据/操作记录/写入库
 */
@Slf4j
public class MethodMonitorLogHandler {

    private MethodMonitorLogHandler(){}

   public static MethodMonitorLogHandler.Builder build(){
        return new Builder();
    }

    public static class Builder{

        private HttpServletRequest request;

        public Builder setHttpServletRequest(HttpServletRequest request){
            this.request = request;
            return this;
        }

        public Builder handleFilter(Object handler){
            //获取当前线程ID
            long currentThreadId = Thread.currentThread().getId();
            //根据当前线程ID获取线程上下文存储的值
            ThreadLocal<ShopNbMonitorLog> currentThread = ThreadLocalContext.INSTANCE.get(currentThreadId);
            ShopNbMonitorLog ShopNbMonitorLog = currentThread.get();
            log.info("MethodMonitorLogHandler handleFilter start - currentThreadId:{}, trackingId:{},URL:{}"
                    ,currentThreadId, ShopNbMonitorLog.getTrackingId(), request.getRequestURI());

            //判断是否加自定义链路跟踪注解
            if (handler instanceof HandlerMethod) {
                HandlerMethod method = (HandlerMethod) handler;
                final RequestMonitor operate = method.getMethodAnnotation(RequestMonitor.class);
                if(operate == null){
                    log.info("method has`nt RequestMonitor annotation ,request skip! trackingId:{}", ShopNbMonitorLog.getTrackingId());
                    return this;
                }
            }else {
                return this;
            }

            //填充业务逻辑信息
         HttpSession session = request.getSession();
         SystemUser systemUser = (SystemUser)session.getAttribute("session_user");
            if(systemUser == null){
                return this;
            }

            ShopNbMonitorLog.setOrganTypeNo(systemUser.getOrganTypeNo());
            ShopNbMonitorLog.setUserNo(systemUser.getUserNo());
            ShopNbMonitorLog.setUserName(systemUser.getUsername());
            ShopNbMonitorLog.setRequestUrl(this.request.getRequestURI());
            ShopNbMonitorLog.setCreateUser(systemUser.getUsername());

            //写入数据库
         try {
            new Thread(new Runnable() {
               @Override
               public void run() {
                  SpringContextHolder.getBean(ShopNbMonitorLogMapper.class).insert(ShopNbMonitorLog);
               }
            }).start();
         } catch (Exception e) {
            log.error("MethodMonitorLogHandler insert mysql Erroe",e);
         }
         //清空key为当前线程ID的值
            ThreadLocalContext.INSTANCE.remove(currentThreadId);
            log.info("MethodMonitorLogHandler handleFilter end - currentThreadId:{}, trackingId:{},URL:{}"
                    ,currentThreadId, ShopNbMonitorLog.getTrackingId(), request.getRequestURI());
            return this;
        }
    }
}

5、在链路跟踪方法上添加注解RequestMonitor(根据实际业务配置注解)

/**
     * UC配置路径跳转
     * @param model
     * @param request
     * @return
     */
    @RequestMonitor(urlMethodName = "列表", opType = MonitorOpType.BASE_OP_107, menuName = "链路跟踪日志")
    @RequestMapping(value = "/list")
    public String viewMonitorLog(Model model, HttpServletRequest request) {
        return "monitor/monitor_log_info";
    }

6、链路跟踪操作数据库数据

具体业务数据库字段可自行设计,页面展现可根据系统实际需要实现。

总结

  • ThreadLocal 并不解决线程间共享数据的问题。
  • ThreadLocal 通过隐式的在不同线程内创建独立实例副本避免了实例线程安全的问题。
  • 每个线程持有一个 Map 并维护了 ThreadLocal 对象与具体实例的映射,该 Map 由于只被持有它的线程访问,故不存在线程安全以及锁的问题。
  • ThreadLocalMap 的 Entry 对 ThreadLocal 的引用为弱引用,避免了 ThreadLocal 对象无法被回收的问题。
  • ThreadLocalMap 的 set 方法通过调用 replaceStaleEntry 方法回收键为 null 的 Entry 对象的值(即为具体实例)以及 Entry 对象本身从而防止内存泄漏。
  • ThreadLocal 适用于变量在线程间隔离且在方法间共享的场景。
行走的code
行走的code

要发表评论,您必须先登录