导入JDBC元数据到Apache Atlas

前言

前期实现了导入MySQL元数据到Apache Atlas, 由于是初步版本,且功能参照Atlas Hive Hook,实现的不够完美

本期对功能进行改进,实现了导入多种关系型数据库元数据到Apache Atlas

数据库schema与catalog

按照SQL标准的解释,在SQL环境下CatalogSchema都属于抽象概念,可以把它们理解为一个容器或者数据库对象命名空间中的一个层次,主要用来解决命名冲突问题。从概念上说,一个数据库系统包含多个Catalog,每个Catalog又包含多个Schema,而每个Schema又包含多个数据库对象(表、视图、字段等),反过来讲一个数据库对象必然属于一个Schema,而该Schema又必然属于一个Catalog,这样我们就可以得到该数据库对象的完全限定名称,从而解决命名冲突的问题了;例如数据库对象表的完全限定名称就可以表示为:Catalog名称.Schema名称.表名称。这里还有一点需要注意的是,SQL标准并不要求每个数据库对象的完全限定名称是唯一的。

从实现的角度来看,各种数据库系统对CatalogSchema的支持和实现方式千差万别,针对具体问题需要参考具体的产品说明书,比较简单而常用的实现方式是使用数据库名作为Catalog名,使用用户名作为Schema名,具体可参见下表:

表1 常用数据库

供应商Catalog支持Schema支持
Oracle不支持Oracle User ID
MySQL不支持数据库名
MS SQL Server数据库名对象属主名,2005版开始有变
DB2指定数据库对象时,Catalog部分省略Catalog属主名
Sybase数据库名数据库属主名
Informix不支持不需要
PointBase不支持数据库名

原文:https://www.cnblogs.com/ECNB/p/4611309.html

元数据模型层级抽象

不同的关系型数据库,其数据库模式有所区别,对应与下面的层级关系

在这里插入图片描述

  • Datasource -> Catalog -> Schema -> Table -> Column
  • Datasource -> Catalog -> Table -> Column
  • Datasource -> Schema -> Table -> Column

元数据转换设计

在这里插入图片描述

提供元数据

借鉴Apache DolphinScheduler中获取Connection的方式,不多赘述。

public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {
        BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
        String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
        logger.info("Get connection from datasource {}", datasourceUniqueId);

        DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {
            Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();
            DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp());
            if (null == dataSourceChannel) {
                throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getDescp()));
            }
            return dataSourceChannel.createDataSourceClient(baseConnectionParam, dbType);
        });
        return dataSourceClient.getConnection();
    }

转换元数据

  1. 元数据模型

创建数据库的元数据模型

private AtlasEntityDef createJdbcDatabaseDef() {
   AtlasEntityDef typeDef = createClassTypeDef(DatabaseProperties.JDBC_TYPE_DATABASE,
           Collections.singleton(DatabaseProperties.ENTITY_TYPE_DATASET),
           createOptionalAttrDef(DatabaseProperties.ATTR_URL, "string"),
           createOptionalAttrDef(DatabaseProperties.ATTR_DRIVER_NAME, "string"),
           createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_NAME, "string"),
           createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_VERSION, "string")
   );

   typeDef.setServiceType(DatabaseProperties.ENTITY_SERVICE_TYPE);

   return typeDef;
}

创建数据库模式的元数据模型

private AtlasEntityDef createJdbcSchemaDef() {
    AtlasEntityDef typeDef = AtlasTypeUtil.createClassTypeDef(
            SchemaProperties.JDBC_TYPE_SCHEMA,
            Collections.singleton(SchemaProperties.ENTITY_TYPE_DATASET)
    );

    typeDef.setServiceType(SchemaProperties.ENTITY_SERVICE_TYPE);
    typeDef.setOptions(new HashMap<>() {{
        put("schemaElementsAttribute", "tables");
    }});

    return typeDef;
}

创建数据库表的元数据模型

private AtlasEntityDef createJdbcTableDef() {
    AtlasEntityDef typeDef = createClassTypeDef(
            TableProperties.JDBC_TYPE_TABLE,
            Collections.singleton(TableProperties.ENTITY_TYPE_DATASET),
            createOptionalAttrDef(TableProperties.ATTR_TABLE_TYPE, "string")
    );

    typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);
    typeDef.setOptions(new HashMap<>() {{
        put("schemaElementsAttribute", "columns");
    }});

    return typeDef;
}

创建数据库列的元数据模型

private AtlasEntityDef createJdbcColumnDef() {
    AtlasEntityDef typeDef = createClassTypeDef(
            ColumnProperties.JDBC_TYPE_COLUMN,
            Collections.singleton(ColumnProperties.ENTITY_TYPE_DATASET),
            createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_TYPE, "string"),
            createOptionalAttrDef(ColumnProperties.ATTR_IS_PRIMARY_KEY, "string"),
            createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_IS_NULLABLE, "string"),
            createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_DEFAULT_VALUE, "string"),
            createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_AUTO_INCREMENT, "string")
    );

    typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);
    HashMap<String, String> options = new HashMap<>() {{
        put("schemaAttributes", "[\"name\", \"isPrimaryKey\", \"columnType\", \"isNullable\" , \"isAutoIncrement\", \"description\"]");
    }};
    typeDef.setOptions(options);

    return typeDef;
}

创建实体之间的关系模型

private List<AtlasRelationshipDef> createAtlasRelationshipDef() {
    String version = "1.0";
    // 数据库和模式的关系
    AtlasRelationshipDef databaseSchemasDef = createRelationshipTypeDef(
            BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,
            BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,
            version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "schemas", SET, true),
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "database", SINGLE, false)
    );
    databaseSchemasDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);

    AtlasRelationshipDef databaseTablesDef = createRelationshipTypeDef(
            BaseProperties.RELATIONSHIP_DATABASE_TABLES,
            BaseProperties.RELATIONSHIP_DATABASE_TABLES,
            version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "tables", SET, true),
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "database", SINGLE, false)
    );
    databaseTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);

    // 模式和数据表的关系
    // 注意 schema 已经被使用, 需要更换否则会冲突, 例如改为 Jschema(jdbc_schema)
    AtlasRelationshipDef schemaTablesDef = createRelationshipTypeDef(
            BaseProperties.RELATIONSHIP_SCHEMA_TABLES,
            BaseProperties.RELATIONSHIP_SCHEMA_TABLES,
            version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "tables", SET, true),
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "Jschema", SINGLE, false)
    );
    schemaTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);

    // 表和数据列的关系
    AtlasRelationshipDef tableColumnsDef = createRelationshipTypeDef(
            BaseProperties.RELATIONSHIP_TABLE_COLUMNS,
            BaseProperties.RELATIONSHIP_TABLE_COLUMNS,
            version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "columns", SET, true),
            createRelationshipEndDef(BaseProperties.JDBC_TYPE_COLUMN, "table", SINGLE, false)
    );
    tableColumnsDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);

    return Arrays.asList(databaseSchemasDef, databaseTablesDef, schemaTablesDef, tableColumnsDef);
}
  1. 提取元数据

    不再赘述

  2. 转换元数据

使用工厂模式,提供不同类型的元数据转换方式

public interface JdbcTransferFactory {

    JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client);

    boolean supportType(String type);

    String getName();
}

List ignorePatterns 用来过滤不想导入的数据库元数据,例如mysqlinformation_schema

public interface JdbcTransfer {

    void transfer();

    JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns);
}

举例:JdbcMysqlTransfer 和 MysqlTransferFactory

@AutoService(JdbcTransferFactory.class)
public class MysqlTransferFactory implements JdbcTransferFactory {

    public static final String MYSQL = "mysql";

    @Override
    public JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {
        return new JdbcMysqlTransfer(metaData, client);
    }

    @Override
    public boolean supportType(String type) {
        return MYSQL.equalsIgnoreCase(type);
    }

    @Override
    public String getName() {

        return MYSQL;
    }
}
public class JdbcMysqlTransfer implements JdbcTransfer {

    private final Jdbc jdbc;
    private final AtlasService atlasService;
    private List<Pattern> ignorePatterns;

    public JdbcMysqlTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {
        this.jdbc = new Jdbc(new JdbcMetadata(metaData));
        this.atlasService = new AtlasService(client);
        this.ignorePatterns = Collections.emptyList();
    }


    @Override
    public JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns) {
        this.ignorePatterns = ignorePatterns;
        return this;
    }

    private boolean tableIsNotIgnored(String tableName) {
        return ignorePatterns.stream().noneMatch(regex -> regex.matcher(tableName).matches());
    }

    @Override
    public void transfer() {
        // 1.数据库实体转换
        DatabaseTransfer databaseTransfer = new DatabaseTransfer(atlasService);
        AtlasEntity databaseEntity = databaseTransfer.apply(jdbc);

        // 2.表实体转换
        String catalog = (String) databaseEntity.getAttribute(BaseProperties.ATTR_NAME);
        List<AtlasEntity> tableEntities = jdbc.getTables(catalog, catalog).parallelStream()
                .filter(jdbcTable -> tableIsNotIgnored(jdbcTable.getTableName()))
                .map(new TableTransfer(atlasService, databaseEntity))
                .toList();

        // 3.列转换
        for (AtlasEntity tableEntity : tableEntities) {
            String tableName = (String) tableEntity.getAttribute(BaseProperties.ATTR_NAME);
            List<JdbcPrimaryKey> primaryKeys = jdbc.getPrimaryKeys(catalog, tableName);
            jdbc.getColumns(catalog, catalog, tableName).parallelStream()
                    .forEach(new ColumnTransfer(atlasService, tableEntity, primaryKeys));
        }
    }

}
  1. 元数据存入Atlas
public class DatabaseTransfer implements Function<Jdbc, AtlasEntity> {

    private final AtlasService atlasService;

    public DatabaseTransfer(AtlasService atlasService) {
        this.atlasService = atlasService;
    }

    @Override
    public AtlasEntity apply(Jdbc jdbc) {
        String userName = jdbc.getUserName();
        String driverName = jdbc.getDriverName();
        String productName = jdbc.getDatabaseProductName();
        String productVersion = jdbc.getDatabaseProductVersion();

        String url = jdbc.getUrl();
        String urlWithNoParams = url.contains("?") ? url.substring(0, url.indexOf("?")) : url;
        String catalogName = urlWithNoParams.substring(urlWithNoParams.lastIndexOf("/") + 1);
        // 特殊处理 Oracle
        if (productName.equalsIgnoreCase("oracle")){
            catalogName = userName.toUpperCase();
            urlWithNoParams = urlWithNoParams + "/" + catalogName;
        }

        DatabaseProperties properties = new DatabaseProperties();
        properties.setQualifiedName(urlWithNoParams);
        properties.setDisplayName(catalogName);
        properties.setOwner(userName);
        properties.setUrl(url);
        properties.setDriverName(driverName);
        properties.setProductName(productName);
        properties.setProductVersion(productVersion);

        // 1.创建Atlas Entity
        AtlasEntity atlasEntity = new AtlasEntity(DatabaseProperties.JDBC_TYPE_DATABASE, properties.getAttributes());

        // 2.判断是否存在实体, 存在则填充GUID
        Map<String, String> searchParam = Collections.singletonMap(DatabaseProperties.ATTR_QUALIFIED_NAME, urlWithNoParams);
        Optional<AtlasEntityHeader> entityHeader = atlasService.checkAtlasEntityExists(DatabaseProperties.JDBC_TYPE_DATABASE, searchParam);
        entityHeader.ifPresent(header -> atlasEntity.setGuid(header.getGuid()));

        // 3,存储或者更新到Atlas中
        if (entityHeader.isPresent()){
            atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));
        }else {
            AtlasEntityHeader header = atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));
            atlasEntity.setGuid(header.getGuid());
        }
        return atlasEntity;
    }
}

效果展示

  1. 元数据类型定义

在这里插入图片描述

在这里插入图片描述

  1. 测试导入元数据

由于mysql没有采用schema,因此jdbc_schema为空

在这里插入图片描述

如图所示,可以清晰的了解mysql数据库中demo数据库的数据表内容

在这里插入图片描述

数据表元数据,qualifiedName使用数据库连接url.表名
在这里插入图片描述

如同所示,数据表内各个列的元数据;可以清晰的了解该数据表的各个字段信息

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/218090.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

直观清晰的带你了解KMP算法(超详细)

KMP算法用来找某个字符串是否存在某个连续的真子串的 下面举一个例子让抽象的KMP算法更加直观&#xff0c;有助于理解 首先我们要了解KMP算法首先要找到一个next数组来表示主串中每一个字符的回退的下标&#xff08;这个下标是对于真子串而言的&#xff0c;主串不需要回退&…

编写并调试运行一个简单的 Java 应用程序,显示自己的学号、姓名、兴趣爱好等。

源代码&#xff1a; public class Main { public static void main(String[] args) { System.out.println("学号是:""0233217821"); System.out.println("姓名是:""赵港"); System.out.println("兴趣爱好是:""运动&qu…

【若依框架实现上传文件组件】

若依框架中只有个人中心有上传图片组件&#xff0c;但是这个组件不适用于el-dialog中的el-form表单页面 于是通过elementui重新写了一个上传组件&#xff0c;如图是实现效果 vue代码 <el-dialog :title"title" v-model"find" width"600px"…

基于Eclipse+Mysql+Tomcat开发的 教学评价管理系统

基于EclipseMysqlTomcat开发的 教学评价管理系统 项目介绍&#x1f481;&#x1f3fb; 随着教育信息化的发展&#xff0c;教学评价管理系统已经成为了学校、教育机构等场所必不可少的一部分。本项目是基于EclipseMysqlTomcat开发的一套教学评价管理系统&#xff0c;旨在帮助教育…

Linux系统上RabbitMQ安装教程

一、安装前环境准备 Linux&#xff1a;CentOS 7.9 RabbitMQ Erlang 1、系统内须有C等基本工具 yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c kernel-devel m4 ncurses-devel tk tc xz socat2、下载安装包 1&#xff09;首先&a…

C++12.4

沙发床的多继承 多继承代码实现沙发床沙发床继承于沙发和床 代码&#xff1a; #include <iostream>using namespace std;//封装 沙发 类 class Sofa { private:string sitting;double *size; public://无参构造函数Sofa() {cout << "Sofa::无参构造函数&quo…

JAVAEE初阶相关内容第十六弹--网络原理之TCP_IP

目录 1. TCP-IP五层模型 2. UDP协议 2.1 特点 2.2 UDP协议端格式 2.3 校验和 3. TCP协议 3.1 特点 3.2 TCP协议段格式 3.2.1 首部长度 3.2.2 选项 3.2.3 保留6位 3.3 TCP内部的工作机制 3.3.1 确认应答 &#xff08;1&#xff09;应答报文ack &#xff08;2&…

【Filament】Filament环境搭建

1 前言 Filament 是一个实时物理渲染引擎&#xff0c;用于 Android、iOS、Linux、macOS、Windows 和 WebGL 平台。该引擎旨在提供高效、实时的图形渲染&#xff0c;并被设计为在 Android 平台上尽可能小而尽可能高效。Filament 支持基于物理的渲染&#xff08;PBR&#xff09;&…

【Vue】使用 Vue CLI 脚手架创建 Vue 项目(使用GUI创建)

前言 在开始使用Vue进行开发之前&#xff0c;我们需要先创建一个Vue项目。Vue CLI&#xff08;Command Line Interface&#xff09;是一个官方提供的脚手架工具&#xff0c;可以帮助我们快速创建Vue项目。Vue CLI也提供了一个可视化的GUI界面来创建和管理Vue项目。 步骤 打开终…

接口获取数据控制台打印有值但是展开又没有了

谷歌浏览器只会展现响应式数据最后的结果&#xff0c;证明原来接口是有值的&#xff0c;后面对这个数据进行操作后&#xff0c;最终没有值了。所以对数据进行操作时最好对数据进行一次深拷贝 JSON.parse(JSON.stringify(data))

任意文件上传漏洞实战和防范

文件上传漏洞广泛存在于Web1.0时代&#xff0c;恶意攻击者的主要攻击手法是将可执行脚本&#xff08;WebShell&#xff09;上传至目标服务器&#xff0c;以达到控制目标服务器的目的。 此漏洞成立的前提条件至少有下面两个&#xff1a; 1.可以上传对应的脚本文件&#xff0c;…

图解系列--HTTPS,认证

确保 Web 安全的HTTPS 1.HTTP 的缺点 1.1.通信使用明文可能会被窃听 加密处理防止被窃听 加密的对象可以有这么几个。 (1).通信的加密 HTTP 协议中没有加密机制&#xff0c;但可以通过和 SSL&#xff08;Secure Socket Layer&#xff0c;安全套接层&#xff09;或TLS&#xff…

Redis持久化及常见问题解决

持久化缓存雪崩缓存穿透缓存击穿缓存预热 持久化 Redis的储存形式&#xff1a;一份在内存、一份在磁盘。内存的是最新的&#xff1b;磁盘里的会隔一段时间更新。 Redis持久化方式&#xff1a; RDB:快照方式&#xff1b;将某⼀个时刻的内存数据&#xff0c;以⼆进制的⽅式写⼊…

Vue框架学习笔记——列表渲染:v-for

文章目录 前文提要代码正文 前文提要 本人仅做个人学习记录&#xff0c;如有错误&#xff0c;请多包涵 主要学习链接&#xff1a;尚硅谷Vue2.0Vue3.0全套教程丨vuejs从入门到精通 代码正文 <body><div id"box"><ul><li v-for"(p,index)…

基于Python自动化测试框架之接口测试

前段时间由于公司测试方向的转型&#xff0c;由原来的web页面功能测试转变成接口测试&#xff0c;之前大多都是手工进行&#xff0c;利用postman和jmeter进行的接口测试&#xff0c;后来&#xff0c;组内有人讲原先web自动化的测试框架移驾成接口的自动化框架&#xff0c;使用的…

leetcode算法之栈

目录 1.删除字符串中的所有相邻重复项2.比较含退格的字符串3.基本计算器II4.字符串解码5.验证栈序列 1.删除字符串中的所有相邻重复项 删除字符串中的所有相邻重复项 class Solution { public:string removeDuplicates(string s) {string ret;//使用数组模拟栈操作for(auto …

龙迅#LT6911GX是一款高性能HDMI2.1至MIPI或LVDS芯片,支持图像处理,DSC压缩和嵌入式LPDDR4 旋转功能!

1.描述 应用功能&#xff1a;LT6711GX适用于HDMI2.1转MIPICSI/DSI&#xff1b;HDMI2.1转LVDS&#xff0c;支持高刷模式&#xff0c;带HDCP 方案&#xff01; 分辨率&#xff1a;最高支持8K30HZ 工作温度范围&#xff1a;−40C to 85C 产品封装&#xff1a;BGA169&#xff08;9*…

clip-path,css裁剪函数

https://www.cnblogs.com/dzyany/p/13985939.html clip-path - CSS&#xff1a;层叠样式表 | MDN 我们看下这个例子 polygon里有四个值分别代表这四个点相对于原图左上方的偏移量。 裁剪个五角星

【跨境营商】创新科技助力数码转型 增强大湾区企业核心竞争力

粤港澳大湾区作为国家的重点发展区域&#xff0c;坐拥丰富的资源及商机&#xff0c;企业积极推行数码化&#xff0c;务求在大湾区抢占先机。香港电讯商业客户业务董事总经理吴家隆表示&#xff0c;近年企业锐意加快数码化步伐&#xff0c;香港电讯以创新科技融入的数码方案&…

解密Prompt系列20. LLM Agent之再谈RAG的召回多样性优化

几个月前我们就聊过RAG的经典方案解密Prompt系列14. LLM Agent之搜索应用设计。前几天刚看完openAI在DevDay闭门会议上介绍的RAG相关的经验&#xff0c;有些新的感悟&#xff0c;借此机会再梳理下RAG相关的优化方案。推荐直接看原视频&#xff08;外网&#xff09;A Survey of …