基于Skywalking开发分布式监控(三)

回顾上期的问题,当我们搭建完成Skywalking的搭建,顺利完成应用监控之后,就会面临一类问题,怎么利用获取的监控数据,包括三方面:
1 应用的Trace和SW收集Service/Endpoint不一定完全一致,可能定位不到,更无法在UI展示
2 按Trace-Span进行下钻分析,SW并不支持,更别说,对于按Trace不同Span特征(可以理解为一项业务在不同阶段的特征数据)进行分析
3 业务本身要求监控展示统一技术标准

好在SkyWalking提供了GraphQL数据接口,并配合OAL观察查询语句,使得用户可以直接通过简单的GraphQL查询语言获得数据,
其原理和提供原生查询介绍可以参考官方文档:SW 官方文档
或者网络材料Skywalking-11:Skywalking查询协议——案例分析

我们监控使用grafana8, 因此选择 ,在grafana8通过GraphQL数据接口,接入SW监控数据,接入的过程参考
Linux环境安装开发grafana插件
以及grafana结合Skywalking追踪Trace

但是grafana本身的数据处理能力太弱,于是可以选择在grafana和Skywalking之间增加一个java开发的数据处理模块TraceProcessor,通过TraceProcessor获取SW的trace和Span数据,然后进行加工处理后在ES进行持久化,然后由grafana直接展示ES的数据。

TraceProcessor的主要架构是基于多线程多任务的定时任务,定时获取,计算Trace数据,并支持Graphql,ES接口,以及按配置定制任务的能力,架构如下
在这里插入图片描述
我们先从配置工具 config tools入手,希望通过配置文件完成配置数据源(graqhQL)和持久化工具(ES)以及各类定时任务的配置关联,运行时通过反射方式加载各个操作类和定时任务(参考java以SSL方式连ES),以满足敏捷灵活的开发需求

{
 "datasource" : {
    "name": "datasource.GraphQLServiceImp",
    "para": {
        "url":"http://127.0.0.1:8090/graphql"       
    }
 },
 "targetdb" : {
    "name": "target.EsServiceImp",
    "para": {
        "url":"http://127.0.0.1:9200"
    }
 },
 "tasks" : [
    {
      "name": "task.QueryTraces",
      "para" : {
          "serviceName" : "TradeService",
          "endpointName" : "OrderSend",
          "businessTag" : { "key": "businessTag", "value": "Auto"},
          "tags" : {},
          "traces_index" :  "traces_-"
      },
      "switch" : "on",
      "interval" : "60"
    },
  ...
  {
     "name": "task.Caculator",
     "para" : {
       "businessTags" :[{ "key": "businessTag", "value": "Auto"},{"key": "systemFlag","value": "RealTime"}],
       "traces_index" :  "traces-",
       "stat_index" : "traces_index-"
     },
     "switch" : "on",
     "interval" : "60",
     "delay" : 10
   },
  ...
 ]
}

config.json的总体风格定义 执行类(包括路径)和参数,例如数据源graphql,连接,查询执行类就是datasource.GraphQLServiceImp,参数只有一个url http://127.0.0.1:8090/graphql 具体连接参考备忘:python和 java graphql client连Sky walking Server查询数据的联通性中的java部分

除了联通性,datasource.GraphQLServiceImp包含方法还有(以接口形式罗列)

//GraphQLServiceImp对应的interface
public interface DatasourceService {
    //联通及初始化方法
    public void initConnect(String url);
    //按ServiceName查询ServiceId
    public String queryServiceId(String ServiceName);
    //按ServiceName和EndpointName查询EndpointId
    public String queryEndPointId(String endpointName,String serviceName);
    //单页查询,按ServiceId,EndpointId,start_time和End_time以及tags查询trace (page=1)
    public ArrayNode getTotalTraces(String serviceId,String endpointId,String start_time, String end_time,JsonNode tags);
    //多页查询,按ServiceId,EndpointId,start_time和End_time以及tags,和pages 查询trace 
    public ArrayNode getTotalTraces2(String serviceId,String endpointId,String start_time,
                                            String end_time,JsonNode tags,int pageNum);
     //按TraceId查span
    public ArrayNode getTraceSpans(String traceId);

}

同理,ES执行类是target.EsServiceImp,参数url为http://127.0.0.1:9200,同样按接口方式罗列操作方法
具体连接可以参考java以SSL方式连ES

public interface TargetdbService {
    //初始化连接
    public void initConnect(String url,String userName,String password);
    //按索引名判断是否存在
    public boolean isExisted(String indexName);
    //按key value判断健值对是否在指定索引中存在
    public boolean isNotInTheIndex(String indexName,String key,String value);
    //按索引名和mapping创建索引
    public boolean createForm(String indexName, XContentBuilder mapping);
    //按索引名删除索引
    public boolean deleteForm(String indexName);
    //按索引名和关键值seqNo,插入Map
    public boolean insertDate(String indexName, String seqNo,Map dataMap);
     //按索引名,批量更新map(List)
    public boolean updateDate(String indexName,Map<String, List<Map<String,Object>>> resultMap);
    //按关键字(startTime,endTime和tag标签)查询索引
    public Map<String, Object> queryData(String indexName, ArrayNode businessTags, String startTime,
                                         String endTime, String resultTag);


}

对于定时任务,分为两类:
1) tarce查询任务:按需求和Skywalking查询条件,定时查询并筛选Trace,加上业务标签,并持久化

2) trace的指标计算任务:根据查询数据,按业务标签定时计算指标,例如每分钟请求数,平均/最大/百分位数延时、并持久化

这些任务可以根据定时框架调用,分为定时任务类TaskManager

public class TaskManager {
    private ScheduledExecutorService executorService;

    public TaskManager() {
        executorService = Executors.newScheduledThreadPool(10);
    }

    public void addTask(Runnable task, long delay, long period, TimeUnit timeUnit) {
        executorService.scheduleAtFixedRate(task, delay, period, timeUnit);
    }

    public void shutdown() {
        executorService.shutdown();
    }

}

和调度类MyTaskProcess

public class MyTaskProcess {
    private final static Logger logger = LoggerFactory.getLogger(MyTaskProcess.class);
    public static void main(String[] args) {
        TaskManager taskManager = new TaskManager();   //任务管理器
        try{
            // 读入配置文件
            ConfigParser config=new ConfigParser("config.json");

            // 连接SW Server 数据接口
            DatasourceService datasourceInstance=config.getDatasource();
            String datasourceUrl= config.getGraphqlUrl();
            datasourceInstance.initConnect(datasourceUrl);

            // 连接ES,获得可用的数据库
            TargetdbService targetdbInstance=config.getTargetdb();
            String targetdbUrl=config.getTargetDBUrl();
            targetdbInstance.initConnect(targetdbUrl);

            
            logger.info("start:: {} ...",new Date());
            //读入任务列表,并且遍历
            ArrayNode taskList=config.getTaskList();

            taskList.forEach(JsonNode->{
                String taskName=JsonNode.get("name").asText();
                String switch_on=JsonNode.get("switch").asText();

                
                logger.info("taskName:: {} switch_on:: {}",taskName,switch_on);

                if(switch_on.equals("on")){
                    //判断开关是否打开
                    
                    int interval=JsonNode.get("interval").asInt();
                    int delay=1; // 默认延迟
                    if(null!=JsonNode.get("delay"))
                        if(JsonNode.get("delay").asInt()>0){
                            delay=JsonNode.get("delay").asInt();
                            logger.info("delay:: {}",delay);
                        }
                    try {
                        TaskService task=(TaskService)config.getClass(taskName);
                        task.init(JsonNode.get("para"),datasourceInstance,targetdbInstance);
                        taskManager.addTask((Runnable) task, delay, interval, TimeUnit.SECONDS);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }


                }

            });
          

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            // 注册钩子线程
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                System.out.println("Shutting down SWTraceProcessor...");
                taskManager.shutdown();
            }));
        }

        //taskManager.shutdown();
    }
}

后续我们会给出一个例子,探讨对trace数据深加工的目标和具体实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/401570.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【快速上手QT】05-绘画Paint

我们写一个QT程序&#xff0c;说实话&#xff0c;很难昧着良心说这个QT界面很好看&#xff08;技术高超的小伙伴请忽略我这句话&#xff09;。但是我们可以使用绘画事件来弥补一下“相貌丑陋”的这个缺点。 paintEvent 我们可以对主界面进行绘图&#xff0c;从而达到美化界面…

js设计模式:计算属性模式

作用: 将对象中的某些值与其他值进行关联,根据其他值来计算该值的结果 vue中的计算属性就是很经典的例子 示例: let nowDate 2023const wjtInfo {brithDate:1995,get age(){return nowDate-this.brithDate}}console.log(wjtInfo.age,wjt年龄)nowDate 1console.log(wjtInf…

【UI自动化】使用poco框架进行元素唯一定位

直接选择&#xff1a; 1.poco(text买入).click() 2.poco("android.widget.ImageView").click()相对选择、空间选择&#xff1a; 3.poco(text/name).parent().child()[0].click()正则表达式&#xff1a; 4.listpoco(textMatches".*ETF")今天主要想记录下…

操作系统导论-课后作业-ch19

1. 本书在第6章中有过介绍&#xff0c;gettimeofday函数最多精确到us&#xff0c;并且大致精确&#xff08;并不完全精确&#xff09;&#xff0c;需要多迭代几次减少误差&#xff0c;循环次数太多也会导致结束时间小于开始时间&#xff08;即回滚&#xff09;的现象&#xff…

Kaggle实践之《Home Credit Default Risk》的逐步优化

记录下每一次的改进及其score。 1、只用训练集的特征简单处理 特征只用训练集的特征&#xff0c;把string型的特征全部进行one-hot转化&#xff0c;然后随机1:4分成测试集训练集&#xff0c;模型也调参直接出结果。 最终的score是训练集80.13%、验证集76.33%、线上74.28%。 …

Java 注解机制解密并发编程的时间之谜:揭开Happens-Before的神秘面纱

优质博文&#xff1a;IT-BLOG-CN 一、简介 为什么需要happens-before原则&#xff1a; 主要是因为Java内存模型 &#xff0c; 为了提高CPU效率&#xff0c;通过工作内存Cache代替了主内存。修改这个临界资源会更新work memory但并不一定立刻刷到主存中。通常JMM会将编写的代码…

⭐北邮复试刷题LCR 052. 递增顺序搜索树__DFS (力扣119经典题变种挑战)

LCR 052. 递增顺序搜索树 给你一棵二叉搜索树&#xff0c;请 按中序遍历 将其重新排列为一棵递增顺序搜索树&#xff0c;使树中最左边的节点成为树的根节点&#xff0c;并且每个节点没有左子节点&#xff0c;只有一个右子节点。 示例 1&#xff1a; 输入&#xff1a;root [5,…

零基础学习8051单片机(十六)

继续学习8051单片机&#xff0c;本次通过观看视频在此学习8051单片机的中断系统 要掌握单片机中断系统的硬件结构和工作原理 掌握&#xff1a;中断系统的初始化编程以及中断服务子程序设计。 一、中断的基本过程图&#xff1a; 中断的响应和处理过程&#xff1a; 当中断提出…

Redis篇----第十三篇

系列文章目录 文章目录 系列文章目录前言一、假如 Redis 里面有 1 亿个 key,其中有 10w 个 key是以某个固定的已知的前缀开头的,如果将它们全部找出来?二、如果有大量的 key 需要设置同一时间过期,一般需要注意什么?三、使用过 Redis 做异步队列么,你是怎么用的?四、使用…

Apache Doris 发展历程、技术特性及云原生时代的未来规划

文章目录 作者介绍 Apache Doris特性极简结构高效自运维高并发场景支持MPP 执行引擎明细与聚合模型的统一便捷数据接入 Apache Doris 极速 1.0 时代极速 关于 Apache Doris 开源社区基于云原生向量数据库 Milvus 的云平台设计实践作者介绍图书推荐 本文节选自《基础软件之路&am…

第六篇【传奇开心果系列】Python文本和语音相互转换库技术点案例示例:深度解读Kaldi库个性化定制语音搜索引擎

传奇开心果短博文系列 系列短博文目录Python文本和语音相互转换库技术点案例示例系列 短博文目录前言一、雏形示例代码二、扩展思路介绍三、数据准备示例代码四、特征提取示例代码五、声学模型训练示例代码六、语言模型训练示例代码七、解码示例代码八、评估和调优示例代码九、…

LeetCode | 寻找两个正序数组的中位数 Python C语言

Problem: 4. 寻找两个正序数组的中位数 文章目录 思路解题方法Code结果结果一些思考 思路 先合并&#xff0c;后排序&#xff0c;最后找中间轴。 解题方法 由解题思路可知 Code 这是python3的代码。 class Solution(object):def findMedianSortedArrays(self, nums1, num…

分享Video.js观看Web视频流

界面效果 HTML结构 <div class"homePopup" ><div class"search_box animate__animated animate__fadeInDown" style"display: none;"><div class"van-search" style"background: rgba(0, 0, 0, 0);">&…

【云原生】Docker consul的容器服务更新与发现

目录 什么是服务注册与发现 什么是consul consul提供的一些关键特性&#xff1a; consul 部署 consul服务器 1. 建立 Consul 服务 设置代理&#xff0c;在后台启动 consul 服务端 2. 查看集群信息 查看members状态 查看集群状态 3. 通过 http api 获取集群信息 regi…

SpringBoot指定外部环境配置

nohup java -Xms256m -Xmx512m -Dfile.encodingUTF-8 -jar /usr/local/xxxx.jar --spring.profiles.activeprod > system.log 2>&1 & --spring.profiles.activeprod修改的是多环境配置中内部application.properties里的spring.profiles.active值 -Dspring.config…

新手初期交易是盈利的,等熟练了却开始亏损了?

许多个人投资者涌入市场&#xff0c;初衷是期望能实现暴利并创造奇迹。梦想是美好的&#xff0c;追求暴利也无可非议&#xff0c;但最担心的是一开始就设定了不切实际的目标。例如&#xff0c;他们可能认为&#xff0c;若一年内不能赚取五倍或十倍的收益&#xff0c;就等于失败…

【AI大语言模型】ChatGPT在地学、GIS、气象、农业、生态、环境等领域中的应用

以ChatGPT、LLaMA、Gemini、DALLE、Midjourney、Stable Diffusion、星火大模型、文心一言、千问为代表AI大语言模型带来了新一波人工智能浪潮&#xff0c;可以面向科研选题、思维导图、数据清洗、统计分析、高级编程、代码调试、算法学习、论文检索、写作、翻译、润色、文献辅助…

Android13 编译ninja failed with: exit status 137

描述 现象很奇怪&#xff0c;主机是ubuntu 18.04&#xff0c; 内存有32G&#xff0c;并且系统中有两份Android13代码&#xff0c; 有一份编译正常&#xff0c;另外一份编译不正常&#xff0c;一度以为是因为下载源码不齐全导致&#xff0c;后面仔细看日志&#xff0c;原来是内…

同步系统时间chrony

安装 yum install chrony 启动并启用chronyd服务&#xff1a; systemctl start chronyd systemctl enable chronyd 查看chrony的状态&#xff0c;确认是否已经与NTP服务器同步 chronyc tracking

ElasticSearch DSL查询、排序 、分页的原理及语法

1. DSL查询分类和基本语法 ElasticSearch提供了基于Json的DSL来定义查询&#xff0c;常见的查询类型包括&#xff1a; • 查询所有&#xff1a;查询出所有数据&#xff0c;一般测试用&#xff0c;一般不是查出所有&#xff0c;一次性查询20条。例如 match_all • 全文检索(ful…