需求
在访问量较大的分布式系统中,时时刻刻在打印着巨量的日志,当我们需要排查问题时,需要从巨量的日志信息中找到本次排查内容的日志是相对复杂的,那么,如何才能使日志看起来逻辑清晰呢?如果每一次请求都有一个全局唯一的id,当我们需要排查时,根据其他日志打印关键字定位到对应请求的全局唯一id,再根据id去搜索、筛选即可找到对应请求全流程的日志信息。接下来就是需要找一种方案,可以生成全局唯一id和在不同的线程中存储这个id。
解决方案
LogBack这个日志框架提供了MDC( Mapped Diagnostic Context,映射调试上下文 ) 这个功能,MDC可以理解为与线程绑定的数据存储器。数据可以被当前线程访问,当前线程的子线程会继承其父线程中MDC的内容。MDC 在 Spring Boot 中的作用是为日志事件提供上下文信息,并将其与特定的请求、线程或操作关联起来。通过使用 MDC,可以更好地理解和分析日志,并在多线程环境中确保日志的准确性和一致性。此外,MDC 还可以用于日志审计、故障排查和跟踪特定操作的执行路径。
代码
实现日志打印全局链路唯一id的功能,需要三个信息:
- 全局唯一ID生成器
- 请求拦截器
- 自定义线程池(可选)
- 日志配置
全局唯一ID生成器
生成器可选方案有:
- UUID,快速随机生成、极小概率重复
- Snowflake,有序递增
- 时间戳
雪花算法(Snowflake)更适用于需要自增的业务场景,如数据库主键、订单号、消息队列的消息ID等, 时间戳一般是微秒级别,极限情况下,一微秒内可能同时多个请求进来导致重复。系统时钟回拨时,UUID可能会重复,但是一般不会出现该情况,因此UUID这种方案的缺点可以接受,本案例使用UUID方案。
/**
* 全局链路id生成工具类
*
* @author Ltx
* @version 1.0
*/
public class RequestIdUtil {
public RequestIdUtil() {
}
public static void setRequestId() {
//往MDC中存入UUID唯一标识
MDC.put(Constant.TRACE_ID, UUID.randomUUID().toString());
}
public static void setRequestId(String requestId) {
MDC.put(Constant.TRACE_ID, requestId);
}
public static String getRequestId() {
return MDC.get(Constant.TRACE_ID);
}
public static void clear() {
//需要释放,避免OOM
MDC.clear();
}
}
/**
* Author: liu_pc
* Date: 2023/8/8
* Description: 常量定义类
* Version: 1.0
*/
public class Constant {
/**
* 全局唯一链路id
*/
public final static String TRACE_ID = "traceId";
}
自定义全局唯一拦截器
Filter是Java Servlet 规范定义的一种过滤器接口,它的主要作用是在 Servlet 容器中对请求和响应进行拦截和处理,实现对请求和响应的预处理、后处理和转换等功能。通过实现 Filter 接口,开发人员可以自定义一些过滤器来实现各种功能,如身份验证、日志记录、字符编码转换、防止 XSS 攻击、防止 CSRF 攻击等。那么这里我们使用它对请求做MDC赋值处理。
@Component
public class RequestIdFilter implements Filter {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException{
try {
HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
String requestId = httpServletRequest.getHeader("requestId");
if (StringUtils.isBlank(requestId)) {
RequestIdUtil.setRequestId();
} else {
RequestIdUtil.setRequestId(requestId);
}
// 继续将请求传递给下一个过滤器或目标资源(比如Controller)
filterChain.doFilter(servletRequest, servletResponse);
} finally {
RequestIdUtil.clear();
}
}
@Override
public void init(FilterConfig filterConfig) throws ServletException {
Filter.super.init(filterConfig);
}
@Override
public void destroy() {
Filter.super.destroy();
}
}
/**
* 测试MDC异步任务全局链路
*
* @param param 请求参数
* @return new String Info
*/
public String test(String param) {
logger.info("测试MDC test 接口开始,请求参数:{}", param);
String requestId = RequestIdUtil.getRequestId();
logger.info("MDC RequestId :{}", requestId);
return "hello";
}
日志配置
输出到控制台:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- 配置输出到控制台(可选输出到文件) -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<!-- 配置日志格式 -->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %mdc %msg%n</pattern>
</encoder>
</appender>
<!-- 配置根日志记录器 -->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
</root>
<!-- 配置MDC -->
<contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
<resetJUL>true</resetJUL>
</contextListener>
<!-- 配置MDC插件 -->
<conversionRule conversionWord="%mdc" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
</configuration>
输出到文件:
<configuration>
<!-- 配置输出到文件 -->
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<!-- 指定日志文件路径和文件名 -->
<file>/Users/liu_pc/Documents/code/mdc_logback/logs/app.log</file>
<encoder>
<!-- 配置日志格式 -->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %mdc %msg%n</pattern>
</encoder>
</appender>
<!-- 配置根日志记录器 -->
<root level="INFO">
<appender-ref ref="FILE"/>
</root>
<!-- 其他配置... -->
</configuration>
功能实现。
子线程获取traceId问题
使用多线程时,子线程打印日志拿不到traceId。如果在子线程中获取traceId,那么就相当于往各自线程中的MDC赋值了traceId,会导致子线程traceId不一致的问题。
public void wrongHelloAsync(String param) {
logger.info("helloAsync 开始执行异步操作,请求参数:{}", param);
List<Integer> simulateThreadList = new ArrayList<>(5);
for (int i = 0; i <= 5; i++) {
simulateThreadList.add(i);
}
for (Integer thread : simulateThreadList) {
CompletableFuture.runAsync(() -> {
//在子线程中赋值
String requestId = RequestIdUtil.getRequestId();
logger.info("子线程信息:{},traceId:{} ", thread, requestId);
}, executor);
}
}
}
子线程获取traceId方案
使用子线程时,可以使用自定义线程池重写部分方法,在重写的方法中获取当前MDC数据副本,再将副本信息赋值给子线程的方案。
/**
* Author: liu_pc
* Date: 2023/8/8
* Description: 自定义线程池配置
* Version: 1.0
*/
@Configuration
public class ThreadPoolConfig {
@Bean(name = "ownThreadPoolExecutor")
public ThreadPoolExecutor customThreadPool() {
int corePoolSize = 5;
int maximumPoolSize = 10;
long keepAliveTime = 60;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(50);
ThreadFactory threadFactory = Executors.defaultThreadFactory();
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
handler
);
}
}
线程包装类
/**
* Author: liu_pc
* Date: 2023/8/8
* Description: 线程池包装类
* Version: 1.0
*/
public class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {
public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
public void execute(@NotNull Runnable task) {
super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public <T> Future<T> submit(@NotNull Runnable task, T result) {
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), result);
}
@Override
public <T> Future<T> submit(@NotNull Callable<T> task) {
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public Future<?> submit(@NotNull Runnable task) {
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
}
/**
* Author: liu_pc
* Date: 2023/8/8
* Description: 线程池包装类
* Version: 1.0
*/
public class ThreadMdcUtil {
/**
* 当线程中的traceId为空时,赋值新的TraceId
*/
public static void setTraceIdIfAbsent() {
if (MDC.get(Constant.TRACE_ID) == null) {
RequestIdUtil.setRequestId();
}
}
/**
* 将原始Callable对象封装成新的Callable对象,增加日志上下文信息
* @param callable 任务线程对象
* @param context 上下文
* @return Callable返回值的类型
* @param <T> 封装后的新的Callable对象
*/
public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
return () -> {
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
setTraceIdIfAbsent();
try {
return callable.call();
} finally {
MDC.clear();
}
};
}
/**
* 将原始Runnable对象封装成新的Runnable对象,增加日志上下文信息
* @param runnable 无返回结果的任务线程对象
* @param context 上下文
* @return Callable返回值的类型
*/
public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
return new Runnable() {
@Override
public void run() {
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
setTraceIdIfAbsent();
try {
runnable.run();
} finally {
MDC.clear();
}
}
};
}
}
测试代码:
/**
* 测试MDC异步任务全局链路
*
* @param param 请求参数
* @return new String Info
*/
public String test(String param) {
logger.info("测试MDC test 接口开始,请求参数:{}", param);
String requestId = RequestIdUtil.getRequestId();
logger.info("MDC RequestId :{}", requestId);
helloAsyncService.helloAsync(param, requestId);
return "hello";
}
private final Executor executor;
public HelloAsyncService(@Qualifier("ownThreadPoolExecutor") Executor executor) {
this.executor = executor;
}
/**
* 使用异步数据测试打印日志
*
* @param param 请求参数
*/
public void helloAsync(String param) {
logger.info("helloAsync 开始执行异步操作,请求参数:{}", param);
List<Integer> simulateThreadList = new ArrayList<>(5);
for (int i = 0; i <= 5; i++) {
simulateThreadList.add(i);
}
for (Integer thread : simulateThreadList) {
Map<String, String> mainMdcContext = MDC.getCopyOfContextMap();
CompletableFuture.runAsync(() -> {
try {
MDC.setContextMap(mainMdcContext);
//模拟其他业务逻辑处理耗时
Thread.sleep(1000);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("threadIndex:{} Another thread running", thread);
}, executor);
}
}
HTTP请求接口无法获取到TraceId问题
解决该问题的方法是通过添加适当的HTTP请求工具的拦截器来实现。你可以在文章的末尾附上整个项目的Gitee地址,这样小伙伴们就可以参考我以OKHttp工具为例的代码来实现相同的效果。
代码地址
Gitee代码地址