Atlas 是开源的元数据和数据资产管理平台,平台基于图数据库、NoSQL 和搜索引擎 3 个组件构建,这些组件都是开源的。
目前市场上开源的元数据管理工具有Atlas, Datahub, Openmetadata等,你要说二次开发,谁最好,如果是java 开发,还是 Atlas ,灵活,简单。其他两个都要会python,多种语言。
atlas 虽然支持,hbase,hive,impala,sqoop等这些组件的实时元数据采集。但是其他的可以采用自定义hook来实现钩子函数。下图是一个钩子函数的流程:
了解钩子函数之前,先要了解数据源。所谓钩子函数,其实需要源系统配合,本质上就是源系统的一个监听机制:在客户端(写sql)——执行端之间有一个监听程序,可以获取sql解析过程。如果源系统没有这种机制,那就不能实现监听数据获取。
其实即使不会写监听程序,atlas 也好处理,中间的kafka 就是一个实时监听通道,只要你按照atlas 的格式要求提交消息,就可以实现元数据管理。kafka 有两个topic:ATLAS_HOOK_TOPIC
和 ATLAS_ENTITIES_TOPIC 。只要满足这两个topic 的数据格式,可以实时写入元数据。
Atlas 的元数据管理主要分为两部分:API 和 kafka。在讲 kafka 之前,我们先说一下什么是 model。
其实 models 类似我们的 jdbc 连接或者 presto 的 catalog 信息,是元数据的注册信息。就是你连接的是什么数据库、什么程序,字段、表、视图等这些信息需要进行注册,毕竟不同的库这些信息不一样,比如 hive 和 hbase 的属性肯定不一样,那就需要建 model。建 model 有两种方式,一种是 java API
另外一个是通过model json 进行提交
源码里面有很多的json model文件
curl -i -X POST -H "Content-Type: application/json" -d '{
  "enumDefs": [],
  "structDefs": [],
  "classificationDefs": [],
  "entityDefs": [
    {
      "category": "ENTITY",
      "version": 1,
      "name": "clickhouse_db",
      "description": "clickhouse_db",
      "typeVersion": "1.0",
      "serviceType": "clickhouse",
      "attributeDefs": [
        {
          "name": "location",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": 5
        },
        {
          "name": "clusterName",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": 8
        },
        {
          "name": "parameters",
          "typeName": "map<string,string>",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1
        },
        {
          "name": "ownerType",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1
        }
      ],
      "superTypes": [
        "DataSet"
      ],
      "subTypes": [],
      "relationshipAttributeDefs": [
        {
          "name": "inputToProcesses",
          "typeName": "array<Process>",
          "isOptional": true,
          "cardinality": "SET",
          "valuesMinCount": -1,
          "valuesMaxCount": -1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1,
          "relationshipTypeName": "dataset_process_inputs",
          "isLegacyAttribute": false
        },
        {
          "name": "schema",
          "typeName": "array<avro_schema>",
          "isOptional": true,
          "cardinality": "SET",
          "valuesMinCount": -1,
          "valuesMaxCount": -1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1,
          "relationshipTypeName": "avro_schema_associatedEntities",
          "isLegacyAttribute": false
        },
        {
          "name": "tables",
          "typeName": "array<clickhouse_table>",
          "isOptional": true,
          "cardinality": "SET",
          "valuesMinCount": -1,
          "valuesMaxCount": -1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1,
          "relationshipTypeName": "clickhouse_table_db",
          "isLegacyAttribute": false
        },
        {
          "name": "meanings",
          "typeName": "array<AtlasGlossaryTerm>",
          "isOptional": true,
          "cardinality": "SET",
          "valuesMinCount": -1,
          "valuesMaxCount": -1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1,
          "relationshipTypeName": "AtlasGlossarySemanticAssignment",
          "isLegacyAttribute": false
        },
        {
          "name": "outputFromProcesses",
          "typeName": "array<Process>",
          "isOptional": true,
          "cardinality": "SET",
          "valuesMinCount": -1,
          "valuesMaxCount": -1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1,
          "relationshipTypeName": "process_dataset_outputs",
          "isLegacyAttribute": false
        }
      ],
      "businessAttributeDefs": {}
    }
  ],
  "relationshipDefs": []
}' --user admin:admin "http://localhost:21000/api/atlas/v2/types/typedefs"
这一步是要注册数据库类型:注册数据库,注册数据表,注册字段等
下一步要对库-表、字段进行关系映射
#/v2/types/typedefs
{
  "entityDefs": [],
  "classificationDefs": [],
  "structDefs": [],
  "enumDefs": [],
  "relationshipDefs": [
    {
      "category": "RELATIONSHIP",
      "version": 1,
      "name": "clickhouse_table_db",
      "description": "clickhouse_table_db",
      "typeVersion": "1.0",
      "serviceType": "clickhouse",
      "attributeDefs": [],
      "relationshipCategory": "AGGREGATION",
      "propagateTags": "NONE",
      "endDef1": {
        "type": "clickhouse_table",
        "name": "db",
        "isContainer": false,
        "cardinality": "SINGLE",
        "isLegacyAttribute": false
      },
      "endDef2": {
        "type": "clickhouse_db",
        "name": "tables",
        "isContainer": true,
        "cardinality": "SET",
        "isLegacyAttribute": false
      }
    },
    {
      "category": "RELATIONSHIP",
      "version": 1,
      "name": "clickhouse_table_columns",
      "description": "clickhouse_table_columns",
      "typeVersion": "1.0",
      "serviceType": "clickhouse",
      "attributeDefs": [],
      "relationshipCategory": "COMPOSITION",
      "propagateTags": "NONE",
      "endDef1": {
        "type": "clickhouse_table",
        "name": "columns",
        "isContainer": true,
        "cardinality": "SET",
        "isLegacyAttribute": false
      },
      "endDef2": {
        "type": "clickhouse_column",
        "name": "table",
        "isContainer": false,
        "cardinality": "SINGLE",
        "isLegacyAttribute": false
      }
    },
    {
      "category": "RELATIONSHIP",
      "version": 1,
      "name": "clickhouse_table_storagedesc",
      "description": "clickhouse_table_storagedesc",
      "typeVersion": "1.0",
      "serviceType": "clickhouse",
      "attributeDefs": [],
      "relationshipCategory": "ASSOCIATION",
      "propagateTags": "NONE",
      "endDef1": {
        "type": "clickhouse_table",
        "name": "sd",
        "isContainer": false,
        "cardinality": "SINGLE",
        "isLegacyAttribute": false
      },
      "endDef2": {
        "type": "clickhouse_storagedesc",
        "name": "table",
        "isContainer": false,
        "cardinality": "SINGLE",
        "isLegacyAttribute": false
      }
    }
  ]
}
关系是 数据库-表-字段-属性等关系映射,这个是为了映射跳转。
第二步:kafka写数据
写入数据,可以通过api调用,也可以通过kafka 提交:
{
  "version": {
    "version": "1.0.0",
    "versionParts": [1]
  },
  "msgCompressionKind": "NONE",
  "msgSplitIdx": 1,
  "msgSplitCount": 1,
  "msgSourceIP": "10.45.1.116",
  "msgCreatedBy": "bi",
  "msgCreationTime": 1710575827820,
  "message": {
    "type": "ENTITY_CREATE_V2",
    "user": "bi",
    "entities": {
      "entities": [
        {
          "typeName": "clickhouse_table",
          "attributes": {
            "owner": "bi",
            "ownerType": "USER",
            "sd": {
              "typeName": "clickhouse_storagedesc",
              "attributes": {
                "qualifiedName": "test.wuxl_0316_ss@primary_storage",
                "name": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                "location": "hdfs://HDFS80727/bi/test.db/wuxl_0316_ss",
                "compressed": false,
                "inputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                "outputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
                "parameters": {
                  "serialization.format": "1"
                }
              },
              "guid": "-861237351166887",
              "version": 0,
              "proxy": false
            },
            "tableType": "MANAGED",
            "createTime": 1710575827000,
            "qualifiedName": "test.wuxl_0316_ss@primary",
            "columns": [
              {
                "typeName": "clickhouse_column",
                "attributes": {
                  "qualifiedName": "test.wuxl_0316_ss.column_tt_1@primary",
                  "name": "column_tt_1",
                  "comment": "测试字段1",
                  "type": "string",
                  "table": {
                    "typeName": "clickhouse_table",
                    "attributes": {
                      "qualifiedName": "test.wuxl_0316_ss@primary"
                    },
                    "guid": "-861237351166888",
                    "version": 0,
                    "proxy": false
                  }
                },
                "guid": "-861237351166890",
                "version": 0,
                "proxy": false
              },
              {
                "typeName": "clickhouse_column",
                "attributes": {
                  "qualifiedName": "test.wuxl_0316_ss.column_tt_2@primary",
                  "name": "column_tt_2",
                  "comment": "测试字段2",
                  "type": "string",
                  "table": {
                    "typeName": "clickhouse_table",
                    "attributes": {
                      "qualifiedName": "test.wuxl_0316_ss@primary"
                    },
                    "guid": "-861237351166888",
                    "version": 0,
                    "proxy": false
                  }
                },
                "guid": "-861237351166891",
                "version": 0,
                "proxy": false
              }
            ],
            "name": "wuxl_0316_ss",
            "comment": "测试表",
            "parameters": {
              "transient_lastDdlTime": "1710575827"
            },
            "db": {
              "typeName": "clickhouse_db",
              "attributes": {
                "owner": "bi",
                "ownerType": "USER",
                "qualifiedName": "test@primary",
                "clusterName": "primary",
                "name": "test",
                "description": "",
                "location": "hdfs://HDFS80727/bi/test.db",
                "parameters": {}
              },
              "guid": "-861237351166886",
              "version": 0,
              "proxy": false
            }
          },
          "guid": "-861237351166888",
          "version": 0,
          "proxy": false
        },
        {
          "typeName": "clickhouse_db",
          "attributes": {
            "owner": "bi",
            "ownerType": "USER",
            "qualifiedName": "test@primary",
            "clusterName": "primary",
            "name": "test",
            "description": "",
            "location": "hdfs://HDFS80727/bi/test.db",
            "parameters": {}
          },
          "guid": "-861237351166886",
          "version": 0,
          "proxy": false
        },
        {
          "typeName": "clickhouse_storagedesc",
          "attributes": {
            "qualifiedName": "test.wuxl_0316_ss@primary_storage",
            "name": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
            "location": "hdfs://HDFS80727/bi/test.db/wuxl_0316_ss",
            "compressed": false,
            "inputFormat": "org.apache.hadoop.mapred.TextInputFormat",
            "outputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
            "parameters": {
              "serialization.format": "1"
            }
          },
          "guid": "-861237351166887",
          "version": 0,
          "proxy": false
        },
        {
          "typeName": "clickhouse_column",
          "attributes": {
            "qualifiedName": "test.wuxl_0316_ss.column_tt_1@primary",
            "name": "column_tt_1",
            "comment": "测试字段1",
            "type": "string",
            "table": {
              "typeName": "clickhouse_table",
              "attributes": {
                "qualifiedName": "test.wuxl_0316_ss@primary"
              },
              "guid": "-861237351166888",
              "version": 0,
              "proxy": false
            }
          },
          "guid": "-861237351166890",
          "version": 0,
          "proxy": false
        },
        {
          "typeName": "clickhouse_column",
          "attributes": {
            "qualifiedName": "test.wuxl_0316_ss.column_tt_2@primary",
            "name": "column_tt_2",
            "comment": "测试字段2",
            "type": "string",
            "table": {
              "typeName": "clickhouse_table",
              "attributes": {
                "qualifiedName": "test.wuxl_0316_ss@primary"
              },
              "guid": "-861237351166888",
              "version": 0,
              "proxy": false
            }
          },
          "guid": "-861237351166891",
          "version": 0,
          "proxy": false
        }
      ]
    }
  }
}
可以通过flink 提交
-- Use Flink SQL to write messages into Atlas's built-in Kafka topic (ATLAS_HOOK).
-- The sink table has a single raw string column; each row written to it becomes
-- one Kafka record that Atlas's notification consumer will pick up.
CREATE TABLE ads_zdm_offsite_platform_daren_rank_df_to_kafka (
data string
) WITH (
'connector' = 'kafka',
'topic' = 'ATLAS_HOOK',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'raw'
);
-- Submit one ENTITY_CREATE_V2 notification (table + db + storage descriptor +
-- two columns for test.wuxl_0316_ss) serialized as a single JSON string.
insert into ads_zdm_offsite_platform_daren_rank_df_to_kafka
select '{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"10.45.1.116","msgCreatedBy":"bi","msgCreationTime":1710575827820,"message":{"type":"ENTITY_CREATE_V2","user":"bi","entities":{"entities":[{"typeName":"clickhouse_table","attributes":{"owner":"bi","ownerType":"USER","sd":{"typeName":"clickhouse_storagedesc","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary_storage","name":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe","location":"hdfs://HDFS80727/bi/test.db/wuxl_0316_ss","compressed":false,"inputFormat":"org.apache.hadoop.mapred.TextInputFormat","outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","parameters":{"serialization.format":"1"}},"guid":"-861237351166887","version":0,"proxy":false},"tableType":"MANAGED","createTime":1710575827000,"qualifiedName":"test.wuxl_0316_ss@primary","columns":[{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_1@primary","name":"column_tt_1","comment":"测试字段1","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166890","version":0,"proxy":false},{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_2@primary","name":"column_tt_2","comment":"测试字段2","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166891","version":0,"proxy":false}],"name":"wuxl_0316_ss","comment":"测试表","parameters":{"transient_lastDdlTime":"1710575827"},"db":{"typeName":"clickhouse_db","attributes":{"owner":"bi","ownerType":"USER","qualifiedName":"test@primary","clusterName":"primary","name":"test","description":"","location":"hdfs://HDFS80727/bi/test.db","parameters":{}},"guid":"-861237351166886","version":0,
"proxy":false}},"guid":"-861237351166888","version":0,"proxy":false},{"typeName":"clickhouse_db","attributes":{"owner":"bi","ownerType":"USER","qualifiedName":"test@primary","clusterName":"primary","name":"test","description":"","location":"hdfs://HDFS80727/bi/test.db","parameters":{}},"guid":"-861237351166886","version":0,"proxy":false},{"typeName":"clickhouse_storagedesc","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary_storage","name":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe","location":"hdfs://HDFS80727/bi/test.db/wuxl_0316_ss","compressed":false,"inputFormat":"org.apache.hadoop.mapred.TextInputFormat","outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","parameters":{"serialization.format":"1"}},"guid":"-861237351166887","version":0,"proxy":false},{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_1@primary","name":"column_tt_1","comment":"测试字段1","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166890","version":0,"proxy":false},{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_2@primary","name":"column_tt_2","comment":"测试字段2","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166891","version":0,"proxy":false}]}}}' as data
;
atlas 在自定义表,应用程序,报表等都有很方便的接口,可以通过接口或者kafka提交实时的变更信息,方便实时监控。