Neo4j实现表字段级血缘关系

需求背景

需要在前端页面展示当前表字段的所有上下游血缘关系,以进一步做数据诊断治理。大致效果图如下:
在这里插入图片描述首先这里解释什么是表字段血缘关系,SQL 示例:

CREATE TABLE IF NOT EXISTS table_b
AS SELECT order_id, order_status FROM table_a;

如上 DDL 语句中,创建的 table_b 的 order_id 和 order_status 字段来源于 table_a,代表table_a 就是 table_b 的来源表,也叫上游表,table_b 就是 table_a 下游表,另外 table_a.order_id 就是 table_b.order_id 的上游字段,它们之间就存在血缘关系。

INSERT INTO table_c
SELECT a.order_id, b.order_status
FROM table_a a JOIN table_b b ON a.order_id = b.order_id;

如上 DML 语句中,table_c 的 order_id 字段来源于 table_a,而 order_status 来源于 table_b,表示 table_c 和 table_a、table_b 之间也存在血缘关系。

由上也可看出想要存储血缘关系,还需要先解析 sql,这块儿主要使用了开源项目 calcite 的解析器,这篇文章不再展开,本篇主要讲如何存储和如何展示

环境配置

参考另一篇:SpringBoot 配置内嵌式 Neo4j

Node 数据结构定义

因为要展示表的字段之间的血缘关系,所以直接将表字段作为图节点存储,表字段之间的血缘关系就用图节点之间的关系表示,具体 node 定义如下:

public class ColumnVertex {
  // 唯一键
  private String name;

  public ColumnVertex(String catalogName, String databaseName, String tableName, String columnName) {
    this.name = catalogName + "." + databaseName + "." + tableName + "." + columnName;
  }

  public String getCatalogName() {
    return Long.parseLong(name.split("\\.")[0]);
  }

  public String getDatabaseName() {
    return name.split("\\.")[1];
  }

  public String getTableName() {
    return name.split("\\.")[2];
  }

  public String getColumnName() {
    return name.split("\\.")[3];
  }
}

通用 Service 定义

public interface EmbeddedGraphService {
    // 添加图节点以及与上游节点之间的关系
    void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex);
    // 寻找上游节点
    List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex);
    // 寻找下游节点
    List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex);
}

Service 实现

import javax.annotation.Resource;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.springframework.stereotype.Service;

@Service
public class EmbeddedGraphServiceImpl implements EmbeddedGraphService {

  @Resource private GraphDatabaseService graphDb;

  @Override
  public void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex) {
    try (Transaction tx = graphDb.beginTx()) {
      tx.execute(
          "MERGE (c:ColumnVertex {name: $currentName}) MERGE (u:ColumnVertex {name: $upstreamName})"
              + " MERGE (u)-[:UPSTREAM]->(c)",
          Map.of("currentName", currentVertex.getName(), "upstreamName", upstreamVertex.getName()));
      tx.commit();
    }
  }

  @Override
  public List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex) {
    List<ColumnVertex> result = new ArrayList<>();
    try (Transaction tx = graphDb.beginTx()) {
      Result queryResult =
          tx.execute(
              "MATCH (u:ColumnVertex)-[:UPSTREAM]->(c:ColumnVertex) WHERE c.name = $name RETURN"
                  + " u.name AS name",
              Map.of("name", currentVertex.getName()));
      while (queryResult.hasNext()) {
        Map<String, Object> row = queryResult.next();
        result.add(new ColumnVertex().setName((String) row.get("name")));
      }
      tx.commit();
    }
    return result;
  }

  @Override
  public List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex) {
    List<ColumnVertex> result = new ArrayList<>();
    try (Transaction tx = graphDb.beginTx()) {
      Result queryResult =
          tx.execute(
              "MATCH (c:ColumnVertex)-[:UPSTREAM]->(d:ColumnVertex) WHERE c.name = $name RETURN"
                  + " d.name AS name",
              Map.of("name", currentVertex.getName()));
      while (queryResult.hasNext()) {
        Map<String, Object> row = queryResult.next();
        result.add(new ColumnVertex().setName((String) row.get("name")));
      }
      tx.commit();
    }
    return result;
  }
}

遍历图节点

实现逻辑:

  1. restful 接口入参:当前表(catalogName, databaseName, tableName)
  2. 定义返回给前端的数据结构,采用 nodes 和 edges 方式返回,然后前端再根据节点与边关系渲染出完整的血缘关系图;
public class ColumnLineageVO {
  List<ColumnLineageNode> nodes;
  List<ColumnLineageEdge> edges;
}

public class ColumnLineageNode {
  private String databaseName;
  private String tableName;
  private List<String> columnNames;
}

public class ColumnLineageEdge {
  private ColumnLineageEdgePoint source;
  private ColumnLineageEdgePoint target;
}

public class ColumnLineageEdgePoint {
  private String databaseName;
  private String tableName;
  private String columnName;
}
  1. 查询表字段;
  2. 采用递归的方式,利用当前表字段遍历与当前表字段关联的所有上下游图节点;
  3. 将所有节点封装成 List ColumnLineageVO 返回给前端 。
public ColumnLineageVO getColumnLineage(Table table) {
    ColumnLineageVO columnLineageVO = new ColumnLineageVO();
    List<ColumnLineageNode> nodes = new ArrayList<>();
    List<ColumnLineageEdge> edges = new ArrayList<>();
    // Deduplication
    Set<String> visitedNodes = new HashSet<>();
    Set<String> visitedEdges = new HashSet<>();
    Map<String, List<ColumnVertex>> upstreamCache = new HashMap<>();
    Map<String, List<ColumnVertex>> downstreamCache = new HashMap<>();

    ColumnLineageNode currentNode =
        ColumnLineageNode.builder()
            .databaseName(table.getDatabaseName())
            .tableName(table.getTableName())
            .type(TableType.EXTERNAL_TABLE.getDesc())
            .build();
    nodes.add(currentNode);
    visitedNodes.add(currentNode.getDatabaseName() + "." + currentNode.getTableName());

    for (String columnName : table.getColumnNames()) {
      ColumnVertex currentVertex =
          new ColumnVertex(
              table.getScriptId(), table.getDatabaseName(), table.getTableName(), columnName);
      traverseUpstreamColumnVertex(
          currentVertex, nodes, edges, visitedNodes, visitedEdges, upstreamCache);
      traverseDownstreamColumnVertex(
          currentVertex, nodes, edges, visitedNodes, visitedEdges, downstreamCache);
    }

    columnLineageVO.setNodes(nodes);
    columnLineageVO.setEdges(edges);
    return columnLineageVO;
  }

private void traverseUpstreamColumnVertex(
      ColumnVertex currentVertex,
      List<ColumnLineageNode> nodes,
      List<ColumnLineageEdge> edges,
      Set<String> visitedNodes,
      Set<String> visitedEdges,
      Map<String, List<ColumnVertex>> cache) {
    List<ColumnVertex> upstreamVertices;
    if (cache.containsKey(currentVertex.getName())) {
      upstreamVertices = cache.get(currentVertex.getName());
    } else {
      upstreamVertices = embeddedGraphService.findUpstreamColumnVertex(currentVertex);
      cache.put(currentVertex.getName(), upstreamVertices);
    }
    for (ColumnVertex upstreamVertex : upstreamVertices) {
      String nodeKey = upstreamVertex.getDatabaseName() + "." + upstreamVertex.getTableName();
      if (!visitedNodes.contains(nodeKey)) {
        ColumnLineageNode upstreamNode =
            ColumnLineageNode.builder()
                .databaseName(upstreamVertex.getDatabaseName())
                .tableName(upstreamVertex.getTableName())
                .type(TableType.EXTERNAL_TABLE.getDesc())
                .build();
        nodes.add(upstreamNode);
        visitedNodes.add(nodeKey);
      }
      String edgeKey =
          upstreamVertex.getDatabaseName()
              + upstreamVertex.getTableName()
              + upstreamVertex.getColumnName()
              + currentVertex.getDatabaseName()
              + currentVertex.getTableName()
              + currentVertex.getColumnName();
      if (!visitedEdges.contains(edgeKey)) {
        ColumnLineageEdge edge = createEdge(upstreamVertex, currentVertex);
        edges.add(edge);
        visitedEdges.add(edgeKey);
      }
      traverseUpstreamColumnVertex(upstreamVertex, nodes, edges, visitedNodes, visitedEdges, cache);
    }
  }
  
private void traverseDownstreamColumnVertex(
      ColumnVertex currentVertex,
      List<ColumnLineageNode> nodes,
      List<ColumnLineageEdge> edges,
      Set<String> visitedNodes,
      Set<String> visitedEdges,
      Map<String, List<ColumnVertex>> cache) {
    List<ColumnVertex> downstreamVertices;
    if (cache.containsKey(currentVertex.getName())) {
      downstreamVertices = cache.get(currentVertex.getName());
    } else {
      downstreamVertices = embeddedGraphService.findDownstreamColumnVertex(currentVertex);
      cache.put(currentVertex.getName(), downstreamVertices);
    }
    for (ColumnVertex downstreamVertex : downstreamVertices) {
      String nodeKey = downstreamVertex.getDatabaseName() + "." + downstreamVertex.getTableName();
      if (!visitedNodes.contains(nodeKey)) {
        ColumnLineageNode downstreamNode =
            ColumnLineageNode.builder()
                .databaseName(downstreamVertex.getDatabaseName())
                .tableName(downstreamVertex.getTableName())
                .type(TableType.EXTERNAL_TABLE.getDesc())
                .build();
        nodes.add(downstreamNode);
        visitedNodes.add(nodeKey);
      }
      String edgeKey =
          currentVertex.getDatabaseName()
              + currentVertex.getTableName()
              + currentVertex.getColumnName()
              + downstreamVertex.getDatabaseName()
              + downstreamVertex.getTableName()
              + downstreamVertex.getColumnName();
      if (!visitedEdges.contains(edgeKey)) {
        ColumnLineageEdge edge = createEdge(currentVertex, downstreamVertex);
        edges.add(edge);
        visitedEdges.add(edgeKey);
      }
      traverseDownstreamColumnVertex(
          downstreamVertex, nodes, edges, visitedNodes, visitedEdges, cache);
    }
  }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/89744.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

十五、systemctl命令如何使用?

在Linux系统中&#xff0c;一些内置服务可以通过systemctl控制&#xff0c;部分第三方软件也可以通过systemctl控制。 1、基础语法 start&#xff1a;开启服务&#xff1b; stop&#xff1a;关闭服务&#xff1b; status&#xff1a;查看服务当前状态&#xff1b; enable&a…

Centos 7.6 安装mongodb

以下是在CentOS 7.6上安装MongoDB的步骤&#xff1a; 打开终端并以root用户身份登录系统。 创建一个新的MongoDB存储库文件 /etc/yum.repos.d/mongodb-org-4.4.repo 并编辑它。 sudo vi /etc/yum.repos.d/mongodb-org-4.4.repo在编辑器中&#xff0c;添加下面的内容到文件中并…

Vue中使用element-plus中的el-dialog定义弹窗-内部样式修改-v-model实现-demo

效果图 实现代码 <template><el-dialog class"no-code-dialog" v-model"isShow" title"没有收到验证码&#xff1f;"><div class"nocode-body"><div class"tips">请尝试一下操作</div><d…

智慧化工地SaaS平台源码,PC端+APP端+智慧数据可视化大屏端,源码完全开源不封装,自主研发,支持二开,项目使用,微服务+Java++vue+mysql

智慧工地管理平台充分运用数字化技术&#xff0c;聚焦施工现场岗位一线&#xff0c;依托物联网、互联网、AI等技术&#xff0c;围绕施工现场管理的人、机、料、法、环五大维度&#xff0c;以及施工过程管理的进度、质量、安全三大体系为基础应用&#xff0c;实现全面高效的工程…

CAD哪个版本最好用?学习一下CAD版本转换方法

CAD即计算机辅助设计&#xff0c;是一个制图软件&#xff0c;用于绘制建筑、机械、电子等领域的图纸。CAD文件通常被称为“图纸”或“工程图”。 CAD文件通常在以下方面使用&#xff1a; 1. 建筑&#xff1a;建筑师使用CAD文件来创建建筑物的平面图、立体图和剖面图。 2. 机…

Hugo托管到Github Pages

Github通过其Github Pages服务可以user、project或organization提供免费快速的静态托管&#xff0c;同时使用Github Actions自动化开发工作流和构建。 1.创建Github仓库 可见性为public。 命名为username.github.io&#xff0c;username为你的Github用户名。 2.添加远程仓库…

Hystrix: 服务降级

cloud是基础&#xff0c;eureka是服务注册和发现&#xff0c;consumer是消费者去消费provider里的东西&#xff0c;消费方式就是Feign和Ribbon&#xff0c;feign 接口消费&#xff0c;ribbon Rest消费 服务降级发生在客户端&#xff0c;客户端因为请求关闭的服务器&#xff0…

备份集中的数据库备份与现有的数据库不同?

数据已经成为公司的主要资产,特别是对于企业来说&#xff0c;数据库中存储的信息通常是其业务运营的核心。 因此&#xff0c;确保数据库的安全性和完整性至关重要。这导致数据库备份成为企业信息管理的重要组成部分。本文将详细介绍备份密集数据库备份的必要性&#xff0c;以及…

Mybatis的动态SQL及关键属性和标识的区别(对SQL更灵活的使用)

&#xff08; 虽然文章中有大多文本内容&#xff0c;想了解更深需要耐心看完&#xff0c;必定大有受益 &#xff09; 目录 一、动态SQL ( 1 ) 是什么 ( 2 ) 作用 ( 3 ) 优点 ( 4 ) 特殊标签 ( 5 ) 演示 二、#和$的区别 2.1 #使用 ( 1 ) #占位符语法 ( 2 ) #优点 2.…

如何快速在vscode中实现不同python文件的对比查看

总体而言&#xff1a;两种方式。一种是直接点击vscode右上角的图标&#xff08;见下图&#xff09;。 另一种方式就是使用快捷键啦“**Ctrl\**”&#xff0c;用的时候选中想要对比的python文件&#xff0c;然后快捷键就可以达到下图效果了&#xff1a; 建议大家直接使用第二…

用pytorch实现AlexNet

AlexNet经典网络由Alex Krizhevsky、Hinton等人在2012年提出&#xff0c;发表在NIPS&#xff0c;论文名为《ImageNet Classification with Deep Convolutional Neural Networks》&#xff0c;论文见&#xff1a;http://www.cs.toronto.edu/~hinton/absps/imagenet.pdf &#xf…

开学需要买哪些电容笔?ipad可以用的手写笔

因为iPad的功能亮眼&#xff0c;让iPad的用户越来越多&#xff0c;并且越来越受欢迎。用来画画、做笔记都很有用&#xff0c;但要是用来看电视、打游戏的话&#xff0c;使用价值就显得低了。如果你不想买一支价格不菲的苹果电容笔&#xff0c;或是只是想要日常用于书写记录&…

Linux socket网络编程概述 和 相关API讲解

socket网络编程的步骤 大体上&#xff0c;连接的建立过程就是&#xff1a;服务器在确定协议类型后&#xff0c;向外广播IP地址和端口号&#xff0c;并监听等待&#xff0c;直到客户端获取了IP地址和端口号并成功连接&#xff1a; 使用socket来进行tcp协议的网络编程的大体步骤…

Java虚拟机(JVM):引用计数算法

一、引言 我们学习了Java内存运行时区域的各个部分&#xff0c;其中程序计数器、虚拟机栈、本地方法栈3个区域随线程而生&#xff0c;随线程而灭。栈中的栈帧随着方法的进入和退出而有条不紊地执行着出栈和入栈操作。每一个栈帧中分配多少内存基本上是在类结构确定下来就已知的…

Spring Boot 集成 WebSocket 实现服务端推送消息到客户端

WebSocket 简介 WebSocket 协议是基于 TCP 的一种新的网络协议&#xff0c;它实现了浏览器与服务器全双工&#xff08;full-duplex&#xff09;通信—允许服务器主动发送信息给客户端&#xff0c;这样就可以实现从客户端发送消息到服务器&#xff0c;而服务器又可以转发消息到客…

C语言易错点整理

前言&#xff1a; 本文涵盖了博主在平常写C语言题目时经常犯的一些错误&#xff0c;在这里帮大家整理出来&#xff0c;一些易错点会帮大家标识出来&#xff0c;希望大家看完这篇文章后有所得&#xff0c;引以为戒~ 一、 题目&#xff1a; 解答&#xff1a; 首先在这个程序中…

【LeetCode-中等题】73. 矩阵置零

题目 题解一&#xff1a;使用标记数组 public void setZeroes(int[][] matrix) {int m matrix.length;int n matrix[0].length;boolean[] row new boolean[m];boolean[] col new boolean[n];for(int i0; i< m;i){for(int j 0;j<n;j){if (matrix[i][j] 0) row[i]col…

GOLANG面向对象:封装 继承 多态

面向过程转换到面向对象&#xff0c;那么必然会涉及到几个特性&#xff1a;封装&#xff0c;继承&#xff0c;多态&#xff0c;那么Golang中的面向过程会有什么特性&#xff1f;那我们来仔细说一说&#xff1a; 封装&#xff1a; 首先要一个类的概念&#xff0c;类就像一下工厂…

springboot 基于JAVA的动漫周边商城的设计与实现64n21

动漫周边商城分为二个模块&#xff0c;分别是管理员功能模块和用户功能模块。管理员功能模块包括&#xff1a;文章资讯、文章类型、动漫活动、动漫商品功能&#xff0c;用户功能模块包括&#xff1a;文章资讯、动漫活动、动漫商品、购物车&#xff0c;传统的管理方式对时间、地…

《Java极简设计模式》第04章:建造者模式(Builder)

作者&#xff1a;冰河 星球&#xff1a;http://m6z.cn/6aeFbs 博客&#xff1a;https://binghe.gitcode.host 文章汇总&#xff1a;https://binghe.gitcode.host/md/all/all.html 源码地址&#xff1a;https://github.com/binghe001/java-simple-design-patterns/tree/master/j…