Spark—GraphX实战 OneID

OneID

前面我们学习了ID Mapping,包括ID Mapping 的背景介绍和业务场景,以及如何使用Spark 实现ID Mapping,这个过程中涉及到了很多东西,当然我们都通过文章的形式介绍给大家了,所以你再学习今天这一节之前,可以先看一下前面的文章

  1. Spark实战—GraphX编程指南
  2. 数仓建模—ID Mapping(上)
  3. 数仓建模—ID Mapping(下)

在上一节我们介绍ID Mapping 的时候我们就说过ID Mapping 是为了打通用户各个维度的数据,从而消除数据孤岛、避免数据歧义,从而更好的刻画用户,所以说ID Mapping是手段不是目的,目的是为了打通数据体系,ID Mapping最终的产出就是我们今天的主角OneID,也就是说数据收集过来之后通过ID Mapping 打通,从而产生OneID,这一步之后我们的整个数据体系就将使用OneID作为用户的ID,这样我们整个数据体系就得以打通

OneData

开始之前我们先看一下阿里的OneData 数据体系,从而更好认识一下OneID,前面我们说过ID Mapping 只是手段不是目的,目的是为了打通数据体系,ID Mapping最终的产出就是OneID

其实OneID在我们整个数据服务体系中,也只是起点不是终点或者说是手段,我们最终的目的是为了建设统一的数据资产体系。

u句

没有建设统一的数据资产体系之前,我们的数据体系建设存在下面诸多问题

  1. 数据孤岛:各产品、业务的数据相互隔离,难以通过共性ID打通
  2. 重复建设:重复的开发、计算、存储,带来高昂的数据成本
  3. 数据歧义:指标定义口径不一致,造成计算偏差,应用困难

在阿里巴巴 OneData 体系中,OneID 指统一数据萃取,是一套解决数据孤岛问题的思想和方法。数据孤岛是企业发展到一定阶段后普遍遇到的问题。各个部门、业务、产品,各自定义和存储其数据,使得这些数据间难以关联,变成孤岛一般的存在。

OneID的做法是通过统一的实体识别和连接,打破数据孤岛,实现数据通融。简单来说,用户、设备等业务实体,在对应的业务数据中,会被映射为唯一识别(UID)上,其各个维度的数据通过这个UID进行关联。

各个部门、业务、产品对业务实体的UID的定义和实现不一样,使得数据间无法直接关联,成为了数据孤岛。基于手机号、身份证、邮箱、设备ID等信息,结合业务规则、机器学习、图算法等算法,进行 ID-Mapping,将各种 UID 都映射到统一ID上。通过这个统一ID,便可关联起各个数据孤岛的数据,实现数据通融,以确保业务分析、用户画像等数据应用的准确和全面。

OneModel 统一数据构建和管理

将指标定位细化为:

1. 原子指标
2. 时间周期
3. 修饰词(统计粒度、业务限定, etc)

通过这些定义,设计出各类派生指标 基于数据分层,设计出维度表、明细事实表、汇总事实表,其实我们看到OneModel 其实没有什么新的内容,其实就是我们数仓建模的那一套东西

OneService 统一数据服务

OneService 基于复用而不是复制数据的思想,指得是我们的统一的数据服务,因为我们一直再提倡复用,包括我们数仓的建设,但是我们的数据服务这一块却是空白,所以OneService核心是服务的复用,能力包括:

  • 利用主题逻辑表屏蔽复杂物理表的主题式数据服务
  • 一般查询+ OLAP 分析+在线服务的统一且多样化数据服务
  • 屏蔽多种异构数据源的跨源数据服务
OneID 统一数据萃取

基于统一的实体识别、连接和标签生产,实现数据通融,包括:

  • ID自动化识别与连接
  • 行为元素和行为规则
  • 标签生产

OneID基于超强ID识别技术链接数据,高效生产标签;业务驱动技术价值化,消除数据孤岛,提升数据质量,提升数据价值。

而ID的打通,必须有ID-ID之间的两两映射打通关系通过ID映射关系表,才能将多种ID之间的关联打通,完全孤立的两种ID是无法打通的

打通整个ID体系,看似简单,实则计算复杂,计算量非常大。假如某种对象有数亿个个体,每个个体又有数十种不同的ID标识,任意两种ID之间都有可能打通关系,想要完成这类对象的所有个体ID打通需要数亿次计算,一般的机器甚至大数据集群都无法完成。

大数据领域中的ID-Mapping技术就是用机器学习算法类来取代野蛮计算,解决对象数据打通的问题。基于输入的ID关系对,利用机器学习算法做稳定性和收敛性计算,输出关系稳定的ID关系对,并生成一个UID作为唯一识别该对象的标识码。

OneID实现过程中存在的问题

前面我们知道我们的ID Mapping 是通过图计算实现,核心就是连通图,其实实现OneID我们在打通ID 之后,我们就可以为一个个连通图生成一个ID, 因为一个连通图 就代表一个用户,这样我们生成的ID就是用户的OneID,这里的用户指的是自然人,而不是某一个平台上的用户。

OneID 的生成问题

首先我们需要一个ID 生成算法,因为我们需要为大量用户生成ID,我们的ID 要求是唯一的,所以在算法设计的时候就需要考虑到这一点,我们并不推荐使用UUID,原因是UUID了可能会出现重复,而且UUID 没有含义,所以我们不推荐使用UUID,我们这里使用的是MD5 算法,所以我们的MD5 算法的参数是我们的图的标示ID。

OneID 的更新问题

这里的更新问题主要就是我们的数据每天都在更新,也就是说我们的图关系在更新,也就是说我们要不要给这个自然人重新生成OneID ,因为他的图关系可能发生了变化。

其实这里我们不能为该自然人生成新的OneID ,否则我们数仓里的历史数据可能无法关联使用,所以我们的策略就是如果该自然人已经有OneID了,则不需要重新生成,其实这里我们就是判断该图中的所有的顶点是否存在OneID,我们后面在代码中体现着一点。

OneID 的选择问题

这个和上面的更新问题有点像,上面更新问题我们可以保证一个自然人的OneID不发生变化,但是选择问题会导致发生变化,但是这个问题是图计算中无法避免的,我们举个例子,假设我们有用户的两个ID(A_ID,C_ID),但是这两个ID 在当前是没有办法打通的,所以我们就会为这个两个ID 生成两个OneID,也就是(A_OneID,B_OneID),所以这个时候我们知道因为ID Mapping 不上,所以我们认为这两个ID 是两个人。

后面我们有了另外一个ID(B_ID),这个ID可以分别和其他的两个ID 打通,也就是B_ID<——>A_ID , B_ID<——>C_ID 这样我们就打通这个三个ID,这个时候我们知道

这个用户存在三个ID,并且这个时候已经存在了两个OneID,所以这个时候我们需要在这两个OneID中选择一个作为用户的OneID,简单粗暴点就可以选择最小的或者是最大的。

我们选择了之后,要将另外一个OneID对应的数据,对应到选择的OneID 下,否则没有被选择的OneID的历史数据就无法追溯了

OneID 代码实现

这个代码相比ID Mapping主要是多了OneID 的生成逻辑和更新逻辑 ,需要注意的是关于顶点集合的构造我们不是直接使用字符串的hashcode ,这是因为hashcode 很容易重复

object OneID  {
    val spark = SparkSession
      .builder()
      .appName("OneID")
      .getOrCreate()

  val sc = spark.sparkContext

  def main(args: Array[String]): Unit = {
    val bizdate=args(0)
    val c = Calendar.getInstance
    val format = new SimpleDateFormat("yyyyMMdd")
    c.setTime(format.parse(bizdate))

    c.add(Calendar.DATE, -1)
    val bizlastdate = format.format(c.getTime)

    println(s" 时间参数  ${bizdate}    ${bizlastdate}")
    // dwd_patient_identity_info_df 就是我们用户的各个ID ,也就是我们的数据源
    // 获取字段,这样我们就可以扩展新的ID 字段,但是不用更新代码
    val columns = spark.sql(
      s"""
         |select
         |   *
         |from
         |   lezk_dw.dwd_patient_identity_info_df
         |where
         |   ds='${bizdate}'
         |limit
         |   1
         |""".stripMargin)
      .schema.fields.map(f => f.name).filterNot(e=>e.equals("ds")).toList

    // 获取数据
    val dataFrame = spark.sql(
      s"""
        |select
        |   ${columns.mkString(",")}
        |from
        |   lezk_dw.dwd_patient_identity_info_df
        |where
        |   ds='${bizdate}'
        |""".stripMargin
    )

    // 数据准备
    val data = dataFrame.rdd.map(row => {
      val list = new ListBuffer[String]()
      for (column <- columns) {
        val value = row.getAs[String](column)
        list.append(value)
      }
      list.toList
    })
    import spark.implicits._
    // 顶点集合
    val veritx= data.flatMap(list => {
      for (i <- 0 until columns.length if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i))))
        yield (new BigInteger(DigestUtils.md5Hex(list(i)),16).longValue, list(i))

    }).distinct

    val veritxDF=veritx.toDF("id_hashcode","id")
    veritxDF.createOrReplaceTempView("veritx")

    // 生成边的集合
    val edges = data.flatMap(list => {
      for (i <- 0 to list.length - 2 if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i)))
           ; j <- i + 1 to list.length - 1 if StringUtil.isNotBlank(list(j)) && (!"null".equals(list(j))))
      yield Edge(new BigInteger(DigestUtils.md5Hex(list(i)),16).longValue,new BigInteger(DigestUtils.md5Hex(list(j)),16).longValue, "")
    }).distinct


    // 开始使用点集合与边集合进行图计算训练
    val graph = Graph(veritx, edges)
    val connectedGraph=graph.connectedComponents()

    // 连通节点
    val  vertices = connectedGraph.vertices.toDF("id_hashcode","guid_hashcode")
    vertices.createOrReplaceTempView("to_graph")

    // 加载昨日的oneid 数据 (oneid,id,id_hashcode) 
    val ye_oneid = spark.sql(
      s"""
        |select
        |   oneid,id,id_hashcode
        |from
        |   lezk_dw.dwd_patient_oneid_info_df
        |where
        |   ds='${bizlastdate}'
        |""".stripMargin
    )
    ye_oneid.createOrReplaceTempView("ye_oneid")

    // 关联获取 已经存在的 oneid,这里的min 函数就是我们说的oneid 的选择问题
    val exists_oneid=spark.sql(
      """
        |select
        |   a.guid_hashcode,min(b.oneid) as oneid
        |from
        |   to_graph a
        |inner join
        |   ye_oneid b
        |on
        |   a.id_hashcode=b.id_hashcode
        |group by
        |   a.guid_hashcode
        |""".stripMargin
    )
    exists_oneid.createOrReplaceTempView("exists_oneid")
    // 不存在则生成 存在则取已有的 这里nvl 就是oneid  的更新逻辑,存在则获取 不存在则生成
    val today_oneid=spark.sql(
      s"""
        |insert overwrite table dwd_patient_oneid_info_df partition(ds='${bizdate}')
        |select
        |   nvl(b.oneid,md5(cast(a.guid_hashcode as string))) as oneid,c.id,a.id_hashcode,d.id as guid,a.guid_hashcode
        |from
        |   to_graph a
        |left join
        |   exists_oneid b
        |on
        |   a.guid_hashcode=b.guid_hashcode
        |left join
        |   veritx c
        |on
        |   a.id_hashcode=c.id_hashcode
        |left join
        |   veritx d
        |on
        |   a.guid_hashcode=d.id_hashcode
        |""".stripMargin
    )
    sc.stop
  }

}

这个代码中我们使用了SparkSQL,其实你如果更加擅长RDD的API,也可以使用RDD 优化,需要注意的是网上的很多代码中使用了广播变量,将vertices 变量广播了出去,其实这个时候存在一个风险那就是如果你的vertices 变量非常大,你广播的时候存在OOM 的风险,但是如果你使用了SparkSQL的话,Spark 就会根据实际的情况,帮你自动优化。

优化点 增量优化

我们看到我们每次都是全量的图,其实我们可以将我们的OneID 表加载进来,然后将我们的增量数据和已有的图数据进行合并,然后再去生成图

val veritx = ye_veritx.union(to_veritx)
val edges = ye_edges.union(to_edges)

val graph = Graph(veritx, edges)

总结

  1. ID Mapping 是OneID 的提前,OneID 是ID Mapping 的结果,所以要想做OneID必须先做ID Mapping
  2. OneID 是为了打通整个数据体系的数据,所以OneID 需要以服务的方式对外提供服务,在数仓里面就是作为基础表使用,对外的话我们就需要提供接口对外提供服务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/491860.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python项目子模块配置

创建模块子应用 1.在项目中新建一个apps的目录&#xff0c;用于存放所有子模块应用 2.在apps包下创建所需应用 注册模块子应用 1.在主模块里面寻找到配置文件 2.在配置文件中找到 INSTALLED_APPS&#xff0c;添加相应路径apps.users Tips: 由于每次添加都要输入前缀apps.会…

解决:WARN:Tue Mar 26 23:36:57 CST 2024 WARN: Establishing SSL connection

小码在学习Java的JDBC编程中&#xff0c;碰到了一条非常长的异常警告&#xff0c;idea的框都装不下了&#xff01;&#xff01; 一、异常信息 Tue Mar 26 23:36:57 CST 2024 WARN: Establishing SSL connection without servers identity verification is not recommended. Ac…

C++类和对象、面向对象编程 (OOP)

文章目录 一、封装1.抽象、封装2.类和对象(0)学习视频(1)类的构成(2)三种访问权限(3)struct和class的区别(4)私有的成员变量、共有的成员函数(5)类内可以直接访问私有成员&#xff0c;不需要经过对象 二、继承三、多态1.概念2.多态的满足条件3.多态的使用条件4.多态原理剖析5.纯…

设计模式-装饰者模式在Java中使用实例-打印发票装饰抬头和脚注

场景 设计模式-装饰者模式在Java中的使用示例&#xff1a; 设计模式-装饰者模式在Java中的使用示例_java装饰者模式例子-CSDN博客 上面装饰器的调用示例如下 AbstarctComputer computer;//要买1台电脑computer new BaseComputer();//加一个内存条computer new MemoryDecor…

备考ICA----Istio实验9---熔断Circuit Breaking 实验

备考ICA----Istio实验9—熔断Circuit Breaking 实验 1. 环境准备 创建httpbin环境 kubectl apply -f istio/samples/httpbin/httpbin.yaml kubectl get svc httpbin2. 创建测试用客户端 kubectl apply -f istio/samples/httpbin/sample-client/fortio-deploy.yaml3. 创建Ht…

YOLOv8融入低照度图像增强算法---传统算法篇

YOLOv8n原图检测YOLOv8n增强后检测召回率和置信度都有提升 前言 这篇博客讲讲低照度,大家都催我出一些内容,没想到这么多同学搞这个,恰好我也做过这方面的一些工作,那今天就来讲解一些方法,低照度的图像增强大体分“传统算法”和“深度学习算法”; 目前低照度的图像增…

mysql安装及操作

一、Mysql 1.1 MySQL数据库介绍 1.1.1 什么是数据库DB&#xff1f; DB的全称是database&#xff0c;即数据库的意思。数据库实际上就是一个文件集合&#xff0c;是一个存储数据的仓库&#xff0c;数据库是按照特定的格式把数据存储起来&#xff0c;用户可以对存储的数据进行…

管理能力学习笔记三:管理者的时间管理法

时间管理三步法 1、对任务进行分类 2、估算任务时间 3、持续反思评估 对任务进行分类 分类方法&#xff1a;时间管理四象限 A类 B类 C类 D类 估算时间 需要预留休息时间和机动时间 持续反思评估 核对检查任务 自我提问 处理日常干扰的办法 对事情发出提问 对话内容进行…

使用llamafile 构建本地大模型运用

安装 https://github.com/Mozilla-Ocho/llamafile 下载 大模型文件&#xff0c;选择列表中任意一个 wget https://huggingface.co/jartine/llava-v1.5-7B-GGUF/resolve/main/llava-v1.5-7b-q4.llamafile?downloadtrue https://github.com/Mozilla-Ocho/llamafile?tabre…

2024我国内燃机深度研究报告

环洋市场咨询Global Info Research的内燃机市场调研报告提供内燃机市场的基本概况&#xff0c;包括定义&#xff0c;分类&#xff0c;应用和产业链结构&#xff0c;同时还讨论发展政策和计划以及制造流程和成本结构&#xff0c;分析内燃机市场的发展现状与未来市场趋势&#xf…

langchain调用语言模型chatglm4从智谱AI

目录 ​0.langchain agent 原理 ReAct 1.langchain agent使用chatgpt调用tools的源代码 2.自定义本地语言模型的代码 3.其他加速方法 背景&#xff1a;如果使用openai的chatgpt4进行语言问答&#xff0c;是需要从国内到国外的一个客户请求-->openai服务器response的一个…

RuoYi-Vue若依框架-如何更换标题、图标以及登录背景

以下操作建立在RuoYi-Vue已成功启动可进行二次开发 更换标题 打开vue前端&#xff0c;我用的是vscode&#xff0c;进行全局搜索&#xff0c;把若依管理系统换成自己想要显示的项目名称即可&#xff0c;我这里已经换过了&#xff0c;所以没有显示&#xff0c;更换完了重启或者…

IDEA 2023右下角无git分支显示解决

当你排除项目问题之后&#xff0c;可能就是idea配置问题了&#xff0c;需要在View -> Appearance -> Status Bar Widgets 中 把 git Branch 勾上。

百度蜘蛛池平台在线发外链-原理以及搭建教程

蜘蛛池平台是一款非常实用的SEO优化工具&#xff0c;它可以帮助网站管理员提高网站的排名和流量。百度蜘蛛池原理是基于百度搜索引擎的搜索算法&#xff0c;通过对网页的内容、结构、链接等方面进行分析和评估&#xff0c;从而判断网页的质量和重要性&#xff0c;从而对网页进行…

010、如何阅读Revit的API文档

Revit的API很大&#xff0c;其文档也很大&#xff0c;本次文章简单教你如何阅读它。 虽然Autodesk的官方文档Revit API可以在其SDK中找到一个.chm文件&#xff0c;但我建议大家访问APIDocs.co来了解Revit API。 这个由软件开发人员Gui Talarico创建的站点记录了Revit、Rhino、…

基于javaweb(springboot+mybatis)生活美食分享平台管理系统设计和实现以及文档报告

基于javaweb(springbootmybatis)生活美食分享平台管理系统设计和实现以及文档报告 博主介绍&#xff1a;多年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 央顺技术团队 Java毕设项目精品实战案例《1000套》 …

什么是V R美术馆|V R互动体验店加盟|虚拟现实元宇宙

VR美术馆是利用虚拟现实&#xff08;VR&#xff09;技术构建的数字化美术馆&#xff0c;通过虚拟展厅和虚拟展览等形式展示艺术作品、举办艺术展览&#xff0c;为用户提供一种沉浸式的艺术体验。用户可以通过穿戴VR头显等设备&#xff0c;在虚拟环境中自由浏览各种艺术作品&…

利用AI技术预测未被充分监测的流域中的极端洪水事件笔记

利用人工智能&#xff08;AI&#xff09;技术预测未被充分监测的流域&#xff08;ungauged watersheds&#xff09;中的极端洪水事件 文章目录 利用人工智能&#xff08;AI&#xff09;技术预测未被充分监测的流域&#xff08;ungauged watersheds&#xff09;中的极端洪水事件…

[HackMyVM]靶场Crossbow

kali:192.168.56.104 靶机:192.168.56.136 端口扫描 # nmap 192.168.56.136 Starting Nmap 7.94SVN ( https://nmap.org ) at 2024-03-26 22:17 CST Nmap scan report for crossbow.hmv (192.168.56.136) Host is up (0.0057s latency). Not shown: 997 closed tcp…

实现ls -l 功能,index,rindex函数的使用

index();----------------------------------------------------------------- index第一次遇到字符c&#xff0c;rindex最后一次遇到字符c&#xff0c;返回值都是从那个位置开始往后的字符串地址 #include <stdio.h> #include <sys/types.h> #include <pwd.h&g…