图解Spark Graphx实现顶点关联邻接顶点的collectNeighbors函数原理

image

一、场景案例

在一张社区网络里,可能需要查询出各个顶点邻接关联的顶点集合,类似查询某个人关系比较近的都有哪些人的场景。

在用Spark graphx中,通过函数collectNeighbors便可以获取到源顶点邻接顶点的数据。

下面以一个例子来说明,首先,先基于顶点集和边来创建一个Graph图。

image

该图的顶点集合为——

(1L, "Alice"),
(2L, "Bob"),
(3L, "Charlie"),
(4L, "David"),
(5L, "Eve"),
(6L, "Frank"),
(7L, "Grace"),
(8L, "Henry"),
(9L, "Ivy")

边的集合为——

Edge(1L, 2L, "friend"),
Edge(1L, 5L, "friend"),
Edge(2L, 3L, "friend"),
Edge(2L, 4L, "friend"),
Edge(3L, 4L, "friend"),
Edge(4L, 6L, "friend"),
Edge(5L, 7L, "friend"),
Edge(5L, 8L, "friend"),
Edge(6L, 9L, "friend"),
Edge(7L, 8L, "friend"),
Edge(8L, 9L, "friend")

基于以上顶点和边,分别建立一个顶点RDD 和边RDD,然后通过Graph(vertices, edges, defaultVertex)创建一个Graph图,代码如下——

val conf = new SparkConf().setMaster("local[*]").setAppName("graphx")
val ss = SparkSession.builder().config(conf).getOrCreate()

// 创建顶点RDD
val vertices = ss.sparkContext.parallelize(Seq(
  (1L, "Alice"),
  (2L, "Bob"),
  (3L, "Charlie"),
  (4L, "David"),
  (5L, "Eve"),
  (6L, "Frank"),
  (7L, "Grace"),
  (8L, "Henry"),
  (9L, "Ivy")
))


// 创建边RDD
val edges = ss.sparkContext.parallelize(Seq(
  Edge(1L, 2L, "friend"),
  Edge(1L, 5L, "friend"),
  Edge(2L, 3L, "friend"),
  Edge(2L, 4L, "friend"),
  Edge(3L, 4L, "friend"),
  Edge(4L, 6L, "friend"),
  Edge(5L, 7L, "friend"),
  Edge(5L, 8L, "friend"),
  Edge(6L, 9L, "friend"),
  Edge(7L, 8L, "friend"),
  Edge(8L, 9L, "friend")
))

val graph = Graph(vertices, edges, null)

在成功创建图之后,就可以基于已有的图,通过collectNeighbors方法,分别得到每个顶点关联邻接顶点的数据——

val neighborVertexs = graph.mapVertices{
  case (id,(label)) => (label)
}.collectNeighbors(EdgeDirection.Either)

最终得到的neighborVertexs是一个VertexRDD[Array[(VertexId, VD)]]类型的RDD,可以通过neighborVertexs.foreach(println)打印观察一下,发现数据里,是每一个【顶点,元组】的结构,注意看,大概就能猜出来,通过neighborVertexs得到的RDD其实就是每个顶点关联了邻接顶点集合元组的数据——

(5,[Lscala.Tuple2;@bb793d7)
(8,[Lscala.Tuple2;@6d5786e6)
(1,[Lscala.Tuple2;@398cb9ea)
(9,[Lscala.Tuple2;@61c4eeb2)
(2,[Lscala.Tuple2;@d7d0256)
(6,[Lscala.Tuple2;@538f0156)
(7,[Lscala.Tuple2;@77a17e3d)
(3,[Lscala.Tuple2;@1be2a4fb)
(4,[Lscala.Tuple2;@1e0153f9)

可以进一步验证,将元组里的数据进行展开打印,通过以下代码进行验证——先通过coalesce(1)将分区设置为一个分区,多个分区打印难以确定打印顺序。然后再通过foreach遍历RDD里每一个元素,这里的元素结构如(5,[Lscala.Tuple2;@bb793d7),x._1表示是顶点5,x._2表示[Lscala.Tuple2;@bb793d7,既然是元组,那就可以进一步进行遍历打印,即 x._2.foreach(y => {...})——

neighborVertexs.coalesce(1).foreach(x => {
    print("顶点:" + x._1 + "关联的邻居顶点集合->{" )
    var str = "";
    x._2.foreach(y => {
      str += y + ","})
    print(str.substring(0, str.length - 1 ) +"}")
    println()
})

可以观察一下最后打印结果——

顶点:8关联的邻居顶点集合->{(5,Eve),(7,Grace),(9,Ivy)}
顶点:1关联的邻居顶点集合->{(2,Bob),(5,Eve)}
顶点:9关联的邻居顶点集合->{(6,Frank),(8,Henry)}
顶点:2关联的邻居顶点集合->{(1,Alice),(3,Charlie),(4,David)}
顶点:3关联的邻居顶点集合->{(2,Bob),(4,David)}
顶点:4关联的邻居顶点集合->{(2,Bob),(3,Charlie),(6,Frank)}
顶点:5关联的邻居顶点集合->{(1,Alice),(7,Grace),(8,Henry)}
顶点:6关联的邻居顶点集合->{(4,David),(9,Ivy)}
顶点:7关联的邻居顶点集合->{(5,Eve),(8,Henry)}

结合文章开始的那一个图验证一下,顶点1关联的邻接顶点是(2,Bob),(5,Eve),正确;顶点8关联的邻接顶点是(5,Eve),(7,Grace),(9,Ivy),正确。其他验证都与下图情况符合。可见,通过collectNeighbors(EdgeDirection.Either)确实可以获取网络里每个顶点关联邻接顶点的数据。

image

二、函数代码原理解析

以上就是顶点关联邻接顶点的用法案例,接下来,让我们分析一下collectNeighbors(EdgeDirection.Either)源码,该函数实现了收集顶点邻居顶点的信息——

def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
  val nbrs = edgeDirection match {
    //聚合本顶点出度指向的邻居顶点和入度指向本顶点的邻居顶点
    case EdgeDirection.Either =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => {
          ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
          ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
        },
        (a, b) => a ++ b, TripletFields.All)
    //聚合本顶点出度指向的邻居顶点
    case EdgeDirection.In =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),
        (a, b) => a ++ b, TripletFields.Src)
    //聚合入度指向本顶点的邻居顶点
    case EdgeDirection.Out =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))),
        (a, b) => a ++ b, TripletFields.Dst)
    case EdgeDirection.Both =>
      throw new SparkException("collectEdges does not support EdgeDirection.Both. Use" +
        "EdgeDirection.Either instead.")
  }
  graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) =>
    nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
  }
} // end of collectNeighbor

该函数用match做了一个类似Java的switch匹配,匹配有四种结果,其中,最后一种EdgeDirection.Both已经不支持,故而这里就不解读了,只讲仍然有用的三种。

用一个图来说明吧,假如有以下边指向的图——

Edge(2L, 1L),
Edge(2L, 4L),
Edge(3L, 2L),
Edge(2L, 5L),

image

  • EdgeDirection.Either表示本顶点的出度邻居和入度邻居。若本顶点为2,那么它得到邻居顶点包括(1,4,3,5),该参数表示只要与顶点2一度边关联的,都会聚集成邻居顶点。

  • EdgeDirection.In表示指向本顶点的邻居,即本顶点的入度邻居。若本顶点为2,图里邻居顶点只有3是指向2的,那么顶点2得到邻居顶点包括(3)。

  • EdgeDirection.Out表示本顶点的出度指向的邻居顶点。若本顶点为2,图里从顶点2指向邻居顶点的,将得到(1,4,5)。

由此可知,顶点关联邻居顶点的函数collectNeighbors(EdgeDirection.Either)里面的参数,就是可以基于该参数得到不同情况的邻居顶点。

这里以collectNeighbors(EdgeDirection.Either)说明函数核心逻辑——

 graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => {
          ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
          ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
        },
        (a, b) => a ++ b, TripletFields.All)

该代码做了聚合,表示会对图里的所有边做处理。

图里有一种边结构,叫三元组(Triplet),这种结构由以下三个部分组成——

  1. 源顶点(Source Vertex):图中的一条边的起始点或源节点。
  2. 目标顶点(Destination Vertex):图中的一条边的结束点或目标节点。
  3. 边属性(Edge Attribute):连接源顶点和目标顶点之间的边上的属性值。

在graph.aggregateMessages[Array[(VertexId, VD)]]( ctx => {......})聚合函数里,就是基于三元组去做聚合统计的。

该聚合函数有两个参数,第一个参数是一个函数(ctx) => { ... },里面定义了每个顶点如何发送消息给邻居顶点。

注意看,这里的ctx正是一个三元组对象,基于该对象,可以获取一下信息——

  • ctx.srcId:获取源顶点的ID。

  • ctx.srcAttr:获取源顶点的属性。

  • ctx.dstId:获取目标顶点的ID。

  • ctx.dstAttr:获取目标顶点的属性。

ctx作为一个知道源顶点、目标顶点的三元组对象,就像一个邮差一样,负责给两边顶点发送消息。

1、ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))函数,这里顶点A是作为目标顶点,邻居节点B是源顶点,ctx对象就会将目标顶点B的顶点ID和属性组成的元组(ctx.dstId, ctx.dstAttr)当作消息传给源顶点A,A会将收到的消息保存下来,这样就知道EdgeDirection.Either无向边情况下,它有一个邻居B了。

image

2、 ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))函数,这时A成为了源顶点,C成为了目标顶点,ctx对象就会将源顶点A的顶点ID和属性组成的元组(ctx.dstId, ctx.dstAttr)当作消息传给源顶点B。B会将收到的消息以数组格式Array((ctx.dstId, ctx.dstAttr))保存下来,这样B以后就知道EdgeDirection.Either无向边情况下,它有一个邻居A了。

image

这里ctx.sendToDst()用Array((ctx.dstId, ctx.dstAttr))数组形式发送,是方便后面的(a, b) => a ++ b 合并函数操作,最后每个顶点可以将它收到的邻居顶点数组合并到一个大的数组,即所有邻居顶点聚集到一个数组里返回。

还有一个TripletFields枚举需要了解下——

TripletFields.All表示本顶点将聚合包括源顶点以及目标顶点发送顶点消息。

TripletFields.Src表示本顶点只聚合源顶点发送过来的顶点消息。

TripletFields.Dst表示本顶点只聚合目标顶点发送过来的顶点消息。

EdgeDirection.Either参数对应的是TripletFields.All,表示需要将本顶点接收到的所有源顶点以及目标顶点发送的顶点消息进行聚合。

接下来,就是做聚合了——

整个图里会有许多类似邮差角色的ctx对象,只需要处理完这些对象,那么,每个顶点就会收到通过ctx对象传送过来的邻居顶点信息。

例如,A收到的ctx对象发过来的邻居消息如下——

Array((B,属性))

Array((C,属性))

Array((D,属性))

......

这时,就可以基于顶点A作为分组key,将组内的Array((B,属性))、Array((C,属性))、Array((D,属性))都合并到一个组里,即通过(a, b) => a ++ b将分组各个数据合并成一个大数组{(B,属性),(C,属性),(D,属性)},这个分组group的key是收到各个ctx对象发送邻居消息过来的顶点A。

各个顶点聚合完后,返回一个nbrs,该RDD的每一个元素,即(顶点,顶点属性,Array(邻居顶点))——

val nbrs = edgeDirection match {
  case EdgeDirection.Either =>
    graph.aggregateMessages[Array[(VertexId, VD)]](
      ctx => {
        ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
        ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
      },
      (a, b) => a ++ b, TripletFields.All)
		......  
}

接着将原图graph的顶点vertices的rdd与聚合结果nbrs做左连接,返回一个新的 VertexRDD 对象,其中每个顶点都附带了它的邻居信息。如果某个顶点没有邻居信息(在 nbrs 中不存在对应的条目),则使用空数组来表示它的邻居。

graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) =>
  nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
}

最后,得到的顶点关联邻居顶点的RDD情况,就如前文打印的那样——

(5,[Lscala.Tuple2;@bb793d7) 顶点5展开邻居顶点=> 顶点:5关联的邻居顶点集合->{(1,Alice),(7,Grace),(8,Henry)}
(8,[Lscala.Tuple2;@6d5786e6) 顶点8展开邻居顶点=> 顶点:8关联的邻居顶点集合->{(5,Eve),(7,Grace),(9,Ivy)}
(1,[Lscala.Tuple2;@398cb9ea) 顶点1展开邻居顶点=> 顶点:1关联的邻居顶点集合->{(2,Bob),(5,Eve)}
(9,[Lscala.Tuple2;@61c4eeb2) 顶点9展开邻居顶点=> 顶点:9关联的邻居顶点集合->{(6,Frank),(8,Henry)}
(2,[Lscala.Tuple2;@d7d0256) 顶点2展开邻居顶点=> 顶点:2关联的邻居顶点集合->{(1,Alice),(3,Charlie),(4,David)}
(6,[Lscala.Tuple2;@538f0156) 顶点6展开邻居顶点=> 顶点:6关联的邻居顶点集合->{(4,David),(9,Ivy)}
(7,[Lscala.Tuple2;@77a17e3d) 顶点7展开邻居顶点=> 顶点:7关联的邻居顶点集合->{(5,Eve),(8,Henry)}
(3,[Lscala.Tuple2;@1be2a4fb) 顶点3展开邻居顶点=> 顶点:3关联的邻居顶点集合->{(2,Bob),(4,David)}
(4,[Lscala.Tuple2;@1e0153f9) 顶点4展开邻居顶点=> 顶点:4关联的邻居顶点集合->{(2,Bob),(3,Charlie),(6,Frank)}

image

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/221703.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C语言之程序的组成和元素格式

目录 关键字 运算符 标识符 姓名和标识符 分隔符 常量和字符串常量 自由的书写格式 书写限制 连接相邻的字符串常量 缩进 本节我们来学习程序的各组成元素(关键字、运算符等)和格式相关的内容。 关键字 在C语言中,相if和else这样的标识…

Arduino学习笔记2023年11月30日

目录 1 编程软件下载2 代码结构3 IO引脚控制3.1 引脚初始化3.2 引脚使用数字量输出数字量输入模拟量输出模拟量输入 4 串口串口初始化串口输出串口输入 5 外部中断6 函数6.1 映射区间函数6.2 延时函数 总结 1 编程软件下载 官网链接:https://www.arduino.cc/ 下载链…

python学习:opencv+用鼠标画矩形和圆形

目录 步骤 定义数据 新建一个窗口黑色画布 显示黑色画布 添加鼠标回调函数 循环 一直显示图片 一直判断有没有按下字母 m 关闭所有窗口 鼠标回调函数 步骤 当鼠标按下记录坐标并记录鼠标标记位为true,移动的时候就会不断的画矩形或者圆,松下的时候就再…

Apache Doris 在某工商信息商业查询平台的湖仓一体建设实践

本文导读: 信息服务行业可以提供多样化、便捷、高效、安全的信息化服务,为个人及商业决策提供了重要支撑与参考。本文以某工商信息商业查询平台为例,介绍其从传统 Lambda 架构到基于 Doris Multi-Catalog 的湖仓一体架构演进历程。同时通过一…

CC++内存管理方式

文章目录 1. C/C内存分布总结 C语言中动态内存管理C内存管理方式new/delete操作内置类型new和delete操作自定义类型c推荐是用new和deleteoperator new与operator delete函数 定位new 1. C/C内存分布 我们先来看下面的一段代码和相关问题 int globalVar 1; static int static…

用Python手把手教你WordCloud可视化

目录 WordCloud是什么? 具体使用 总结 WordCloud是什么? WordCloud是一种数据可视化技术,通过根据文本中单词的频率或权重来生成一个视觉上吸引人的词云图。在词云图中,单词的大小和颜色通常与其在文本中的出现频率相关&#…

STM32F1外部中断EXTI

目录 1. EXTI简介 2. EXTI基本结构 3. AFIO复用IO口 4. EXTI框图 5. EXTI程序配置 5.1 首先先配置要使用的GPIO口的引脚 5.2 配置AFIO数据选择器,选择想要中断的引脚 5.3 EXTI配置 1. EXTI简介 EXTI(Extern Interrupt)外部中…

Qt创建和使用动态库链接

首先建立库文件 顺序确认完成后,构建完成 注意:上图中mydll_global.h中的内容可以复制到mydll.h中去,在以后调用时只调用mydll.h即可,否则调用时需要两个头文件同事使用。 在mydll.h和mydll.cpp中可以正常编写代码&#xff…

这个sql有点东西,记录一下

我有一个需求:在订单表里面查询指定时间的订单数据,如果要是没有订单的话,需要展示当天日期和数据,数据为0 先看一下效果: 话不多说,直接上SQL SELECTdate_range.date AS 日期,COUNT( oco.id ) AS 总订单…

计算机网络——数据链路层-差错检测(奇偶校验、循环冗余校验CRC)

目录 奇偶校验 循环冗余校验CRC 发送方操作 接收方操作 生成多项式 举例-1 举例-2 我们知道, 实际的通信链路都不是理想的,比特在传输过程中可能会产生差错;1可能变成0,而0也可能变成1,这称为比特差错。 如下…

浪潮信息 KeyarchOS 安全可信攻防体验

1. KeyarchOS——云峦操作系统简介 KeyarchOS 即云峦服务器操作系统(简称 KOS)是浪潮信息基于 Linux 内核、龙蜥等开源技术自主研发的一款服务器操作系统,支持 x86、ARM 等主流架构处理器,广泛兼容传统 CentOS 生态产品和创新技术产品,可为用…

c++--面向对象特性

1.面向对象指的是继承,封装,多态。 继承主要关注类的构造,赋值,析构。 以下对多态,封装进行补充说明。 2、多态 2.1.定义 a.赋值 派生类的指针,可以赋值给基类的指针。 派送类的对象,可以赋值给…

Avalonia中使用Prism实现区域导航功能

前言 上一篇文章我们讲了在Avalonia开发中,引入Prism框架来完成项目的MVVM迁移。本章内容将带领大家学习如何在Avalonia中使用Prism框架实现区域导航功能。如果你还不知道Avalonia中如何引入Prism框架,请看我上一篇文章:Avalonia框架下面使用…

【WPF.NET开发】构造动态布局

本文内容 系统必备创建项目配置默认的 Grid Panel 控件向面板中添加控件测试布局汇总所有内容后续步骤 在动态定位中,您通过指定子元素相对于父元素应该如何排列以及应该如何包装来排列子元素。 您还可以将窗口和控件设置为在其内容扩展时自动扩展。 适用于 Vis…

Oracle merge into语句(merge into Statement)

在Oracle中,常规的DML语句只能完成单一功能,,例如insert/delete/update只能三选一,而merge into语句可以同时对一张表进行更新/插入/删除。 目录 一、基本语法 二、用法示例 2.1 同时更新和插入 2.2 where子句 2.3 delete子句 2.4…

数据库Delete的多种用法

数据库的Delete操作是用来删除数据库中的数据记录的,它是数据库操作中的一种重要操作,能够帮助用户删除不需要的数据,以便保持数据库的整洁和高效。在使用Delete操作时,需要注意确保操作的准确性和安全性,以免误删重要…

[JavaScript前端开发及实例教程]计算器井字棋游戏的实现

计算器&#xff08;网页内实现效果&#xff09; HTML部分 <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>My Calculator&l…

Ruff智能物联网网关助力工厂数智化运营,实现产量提升5%

数字化转型是大势所趋&#xff0c;以工业互联网为代表的数实融合是发展数字经济的重要引擎&#xff0c;也是新质生产力的一大助力。工业互联网是新工业革命的重要基石&#xff0c;加快工业互联网规模化应用&#xff0c;是数字技术和实体经济深度融合的关键支撑&#xff0c;是新…

回归预测 | MATLAB实现CNN-BiLSTM(卷积双向长短期记忆神经网络

效果一览 基本介绍 提出一种同时考虑时间与空间因素的卷积&#xff0d;双向长短期记忆&#xff08; CNN-BiLSTM&#xff09;模型&#xff0c;将具有空间局部特征提取能力的卷积神经网络&#xff08;CNN&#xff09;和具有能同时考虑前后方向长时间信息的双向长短期记忆&#xf…

JavaScript基础知识21——for循环

哈喽&#xff0c;大家好&#xff0c;我是雷工&#xff01; 今天学习for循环&#xff0c;以下为学习笔记。 1、while循环和for循环有啥不同&#xff1f; 1.1、在实际开发中&#xff0c;while循环用来解决循环次数不确定时使用&#xff0c;当一个循环不确定会循环多少次时&#…