背景
Skywalking默认场景下,Tracing对于消息队列的发送场景,无法将TraceId传递到下游消费者,但对于微服务场景下,是有大量消息队列的业务场景的,这显然无法满足业务预期。
解决方案
Skywalking的官方社区中,有用户提出了该场景问题,Skywalking在补充工具包中,提供了对Kafka的tracing支持。
代码实现:
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-kafka</artifactId>
<version>${skywalking.version}</version>
</dependency>
对于该工具包,默认情况下,是针对KafkaTemplate进行trace,即如果使用KafkaTemplate发送消息,代码层面无需做任何改动。
如果没有使用KafkaTemplate的场景,toolkit也提供的了注解的支持:
public class ConsumerThread2 extends Thread {
@Override
public void run() {
Properties consumerProperties = new Properties();
//...consumerProperties.put()
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(topicPattern, new NoOpConsumerRebalanceListener());
while (true) {
if (pollAndInvoke(consumer)) break;
}
consumer.close();
}
@KafkaPollAndInvoke
private boolean pollAndInvoke(KafkaConsumer<String, String> consumer) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
ConsumerRecords<String, String> records = consumer.poll(100);
if (!records.isEmpty()) {
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url("http://localhost:8080/kafka-scenario/case/kafka-thread2-ping").build();
Response response = null;
try {
response = client.newCall(request).execute();
} catch (IOException e) {
}
response.body().close();
return true;
}
return false;
}
}
异步线程Tracing
对于Kafka消息的发送,经常会配合异步线程池的场景使用,Tracing的基本原理是基于ThreadLocal进行实现的,那么对于异步场景,是会丢失TraceId,通常的解决方式,是需要手动将主线程的TraceId手动赋值给子线程,但这种方式需要手动代码侵入,并不友好。
幸运的是,Skywalking的toolkit中提供了对于异步线程tracing的支持。
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId>
<version>${skywalking.version}</version>
</dependency>
推荐用法:
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(RunnableWrapper.of(new Runnable() {
@Override public void run() {
//your code
}
}));
或者:
@TraceCrossThread
public static class MyCallable<String> implements Callable<String> {
@Override
public String call() throws Exception {
return null;
}
}
...
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(new MyCallable());
PS:事实上,RunnableWrapper也是基于@TraceCrossThread实现。
相关文档:
https://skywalking.apache.org/docs/skywalking-java/v8.16.0/en/setup/service-agent/java-agent/application-toolkit-kafka/
https://skyapm.github.io/document-cn-translation-of-skywalking/zh/6.1.0/setup/service-agent/java-agent/Application-toolkit-trace-cross-thread.html
https://blog.51cto.com/knifeedge/5268667
https://blog.csdn.net/lijunwyf/article/details/107954543