回顾上期的问题,当我们搭建完成Skywalking的搭建,顺利完成应用监控之后,就会面临一类问题,怎么利用获取的监控数据,包括三方面:
1 应用的Trace和SW收集Service/Endpoint不一定完全一致,可能定位不到,更无法在UI展示
2 按Trace-Span进行下钻分析,SW并不支持,更别说,对于按Trace不同Span特征(可以理解为一项业务在不同阶段的特征数据)进行分析
3 业务本身要求监控展示统一技术标准
好在SkyWalking提供了GraphQL数据接口,并配合OAL观察查询语句,使得用户可以直接通过简单的GraphQL查询语言获得数据,
其原理和提供原生查询介绍可以参考官方文档:SW 官方文档
或者网络材料Skywalking-11:Skywalking查询协议——案例分析
我们监控使用grafana8, 因此选择 ,在grafana8通过GraphQL数据接口,接入SW监控数据,接入的过程参考
Linux环境安装开发grafana插件
以及grafana结合Skywalking追踪Trace
但是grafana本身的数据处理能力太弱,于是可以选择在grafana和Skywalking之间增加一个java开发的数据处理模块TraceProcessor,通过TraceProcessor获取SW的trace和Span数据,然后进行加工处理后在ES进行持久化,然后由grafana直接展示ES的数据。
TraceProcessor的主要架构是基于多线程多任务的定时任务,定时获取,计算Trace数据,并支持Graphql,ES接口,以及按配置定制任务的能力,架构如下
我们先从配置工具 config tools入手,希望通过配置文件完成配置数据源(graqhQL)和持久化工具(ES)以及各类定时任务的配置关联,运行时通过反射方式加载各个操作类和定时任务(参考java以SSL方式连ES),以满足敏捷灵活的开发需求
{
"datasource" : {
"name": "datasource.GraphQLServiceImp",
"para": {
"url":"http://127.0.0.1:8090/graphql"
}
},
"targetdb" : {
"name": "target.EsServiceImp",
"para": {
"url":"http://127.0.0.1:9200"
}
},
"tasks" : [
{
"name": "task.QueryTraces",
"para" : {
"serviceName" : "TradeService",
"endpointName" : "OrderSend",
"businessTag" : { "key": "businessTag", "value": "Auto"},
"tags" : {},
"traces_index" : "traces_-"
},
"switch" : "on",
"interval" : "60"
},
...
{
"name": "task.Caculator",
"para" : {
"businessTags" :[{ "key": "businessTag", "value": "Auto"},{"key": "systemFlag","value": "RealTime"}],
"traces_index" : "traces-",
"stat_index" : "traces_index-"
},
"switch" : "on",
"interval" : "60",
"delay" : 10
},
...
]
}
config.json的总体风格定义 执行类(包括路径)和参数,例如数据源graphql,连接,查询执行类就是datasource.GraphQLServiceImp,参数只有一个url http://127.0.0.1:8090/graphql 具体连接参考备忘:python和 java graphql client连Sky walking Server查询数据的联通性中的java部分
除了联通性,datasource.GraphQLServiceImp包含方法还有(以接口形式罗列)
//GraphQLServiceImp对应的interface
public interface DatasourceService {
//联通及初始化方法
public void initConnect(String url);
//按ServiceName查询ServiceId
public String queryServiceId(String ServiceName);
//按ServiceName和EndpointName查询EndpointId
public String queryEndPointId(String endpointName,String serviceName);
//单页查询,按ServiceId,EndpointId,start_time和End_time以及tags查询trace (page=1)
public ArrayNode getTotalTraces(String serviceId,String endpointId,String start_time, String end_time,JsonNode tags);
//多页查询,按ServiceId,EndpointId,start_time和End_time以及tags,和pages 查询trace
public ArrayNode getTotalTraces2(String serviceId,String endpointId,String start_time,
String end_time,JsonNode tags,int pageNum);
//按TraceId查span
public ArrayNode getTraceSpans(String traceId);
}
同理,ES执行类是target.EsServiceImp,参数url为http://127.0.0.1:9200,同样按接口方式罗列操作方法
具体连接可以参考java以SSL方式连ES
public interface TargetdbService {
//初始化连接
public void initConnect(String url,String userName,String password);
//按索引名判断是否存在
public boolean isExisted(String indexName);
//按key value判断健值对是否在指定索引中存在
public boolean isNotInTheIndex(String indexName,String key,String value);
//按索引名和mapping创建索引
public boolean createForm(String indexName, XContentBuilder mapping);
//按索引名删除索引
public boolean deleteForm(String indexName);
//按索引名和关键值seqNo,插入Map
public boolean insertDate(String indexName, String seqNo,Map dataMap);
//按索引名,批量更新map(List)
public boolean updateDate(String indexName,Map<String, List<Map<String,Object>>> resultMap);
//按关键字(startTime,endTime和tag标签)查询索引
public Map<String, Object> queryData(String indexName, ArrayNode businessTags, String startTime,
String endTime, String resultTag);
}
对于定时任务,分为两类:
1) tarce查询任务:按需求和Skywalking查询条件,定时查询并筛选Trace,加上业务标签,并持久化
2) trace的指标计算任务:根据查询数据,按业务标签定时计算指标,例如每分钟请求数,平均/最大/百分位数延时、并持久化
这些任务可以根据定时框架调用,分为定时任务类TaskManager
public class TaskManager {
private ScheduledExecutorService executorService;
public TaskManager() {
executorService = Executors.newScheduledThreadPool(10);
}
public void addTask(Runnable task, long delay, long period, TimeUnit timeUnit) {
executorService.scheduleAtFixedRate(task, delay, period, timeUnit);
}
public void shutdown() {
executorService.shutdown();
}
}
和调度类MyTaskProcess
public class MyTaskProcess {
private final static Logger logger = LoggerFactory.getLogger(MyTaskProcess.class);
public static void main(String[] args) {
TaskManager taskManager = new TaskManager(); //任务管理器
try{
// 读入配置文件
ConfigParser config=new ConfigParser("config.json");
// 连接SW Server 数据接口
DatasourceService datasourceInstance=config.getDatasource();
String datasourceUrl= config.getGraphqlUrl();
datasourceInstance.initConnect(datasourceUrl);
// 连接ES,获得可用的数据库
TargetdbService targetdbInstance=config.getTargetdb();
String targetdbUrl=config.getTargetDBUrl();
targetdbInstance.initConnect(targetdbUrl);
logger.info("start:: {} ...",new Date());
//读入任务列表,并且遍历
ArrayNode taskList=config.getTaskList();
taskList.forEach(JsonNode->{
String taskName=JsonNode.get("name").asText();
String switch_on=JsonNode.get("switch").asText();
logger.info("taskName:: {} switch_on:: {}",taskName,switch_on);
if(switch_on.equals("on")){
//判断开关是否打开
int interval=JsonNode.get("interval").asInt();
int delay=1; // 默认延迟
if(null!=JsonNode.get("delay"))
if(JsonNode.get("delay").asInt()>0){
delay=JsonNode.get("delay").asInt();
logger.info("delay:: {}",delay);
}
try {
TaskService task=(TaskService)config.getClass(taskName);
task.init(JsonNode.get("para"),datasourceInstance,targetdbInstance);
taskManager.addTask((Runnable) task, delay, interval, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}catch (Exception e){
e.printStackTrace();
}finally {
// 注册钩子线程
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutting down SWTraceProcessor...");
taskManager.shutdown();
}));
}
//taskManager.shutdown();
}
}
后续我们会给出一个例子,探讨对trace数据深加工的目标和具体实现