【大数据学习 | Spark-Core】Spark的分区器(HashPartitioner和RangePartitioner)

之前学过的kv类型上面的算子

groupby groupByKey reduceBykey sortBy sortByKey join[cogroup left inner right] shuffle的

mapValues keys values flatMapValues 普通算子,管道形式的算子

shuffle的过程是因为数据产生了打乱重分,分组、排序、join等算子需要将数据重新排版。

shuffle的过程是上游的数据处理完毕写出到自己的磁盘上,然后下游的数据从磁盘上面拉取。

重新排版打乱重分是需要存在规则的。

中间数据的流向规则叫做分区器 partitioner,这个分区器一般是存在于shuffle类算子中的,我们可以这么说,shuffle类算子一定会带有分区器,分区器也可以单独存在,人为定义分发规则。

groupBy groupBykey reduceBykey 自带的分区器HashPartitioner。

sortby sortBykey rangePartitioner

hashPartitioner

规则 按照key的hashCode %下游分区 = 分区编号

处理key-value类型数据,如果key为0,就分配去0号分区。否则调用nonNegativeMod函数。

保证取余的结果为正向结果。

hash取余的方式,不管数据分发到下游的什么分区中,最终结果都是相同的数据放入到一起。

演示结果:

scala> val arr = Array(1,2,3,4,5,6,7,8,9)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> sc.makeRDD(arr,3)
res78: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[73] at makeRDD at <console>:27

scala> res78.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res79: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[74] at mapPartitionsWithIndex at <console>:26

scala> res79.collect
res80: Array[(Int, Int)] = Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6), (2,7), (2,8), (2,9))

scala> res78.map(t=>(t,t))
res81: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[75] at map at <console>:26

scala> res78.partitioner
res82: Option[org.apache.spark.Partitioner] = None

scala> res81.reduceByKey(_+_)
res84: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[76] at reduceByKey at <console>:26

scala> res84.partitioner
res85: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@3)

scala> res84.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res88: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[77] at mapPartitionsWithIndex at <console>:26

scala> res88.collect
res89: Array[(Int, (Int, Int))] = Array((0,(6,6)), (0,(3,3)), (0,(9,9)), (1,(4,4)), (1,(1,1)), (1,(7,7)), (2,(8,8)), (2,(5,5)), (2,(2,2)))

演示的逻辑,就是按照key.hashcode进行分区,int类型的hashcode值是自己的本身。

并且hash分区器的规则致使我们可以任意的修改下游的分区数量。

scala> res81.reduceByKey(_+_,100)
res91: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[78] at reduceByKey at <console>:26

scala> res91.partitions.size
res92: Int = 100

scala> res81.reduceByKey(_+_,2)
res93: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[79] at reduceByKey at <console>:26

scala> res93.partitions.size
res94: Int = 2

rangePartitioner

hashPartitioner规则非常简单,直接规定来一个数据按照hashcode规则的分配,规则比较简答,但是会出现数据倾斜。

range分区规则中存在两个方法。

rangeBounds界限,在使用这个分区器之前先做一个界限划定。

首先使用水塘抽样算法,在未知的数据集中抽取能够代表整个数据集的样本,根据样本进行规则设定。

然后在使用getPartitions。

首先存在水塘抽样,规定数据的流向以后再执行整体逻辑,会先触发计算。

sortBykey是转换类的算子,不会触发计算。

但是我们发现它触发计算了,因为首先在计算之前先进行水塘抽样,能够规定下游的数据规则,然后再进行数据的计算。

scala> arr
res101: Array[Int] = Array(1, 9, 2, 8, 3, 7, 4, 6, 5)

scala> arr.map(t=> (t,t))
res102: Array[(Int, Int)] = Array((1,1), (9,9), (2,2), (8,8), (3,3), (7,7), (4,4), (6,6), (5,5))

scala> sc.makeRDD(res102)
res104: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[94] at makeRDD at <console>:27

scala> res104.sortByKey()
res105: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[97] at sortByKey at <console>:26

scala> res105.partitioner
res106: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@fe1f9dea)

scala> res104.sortByKey(true,3)
res107: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[100] at sortByKey at <console>:26

scala> res107.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res109: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[101] at mapPartitionsWithIndex at <console>:26

scala> res109.collect
res110: Array[(Int, (Int, Int))] = Array((0,(1,1)), (0,(2,2)), (0,(3,3)), (1,(4,4)), (1,(5,5)), (1,(6,6)), (2,(7,7)), (2,(8,8)), (2,(9,9)))

range分区器,它是先做抽样然后指定下游分区的数据界限。

它可以修改分区数量,但是分区数量不能大于元素个数,必须要保证每个分区中都有元素。

scala> res104.sortByKey(true,3)
res111: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[104] at sortByKey at <console>:26

scala> res104.sortByKey(true,300)
res112: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[107] at sortByKey at <console>:26

scala> res111.partitions.size
res114: Int = 3

scala> res112.part

自定义分区器

工作的过程中我们会遇见数据分类的情况,想要根据自己的需求定义分区的规则,让符合规则的数据发送到不同的分区中,这个时候我们就需要自定义分区器了。

定义分区器,让数据发送到不同的分区,从而不同的task任务输出的文件结果内容也不同

# 自己创建数据data/a.txt
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
# 需求就是将数据按照规则进行分发到不同的分区中
# 存储的时候一个文件存储hello其他的文件存储tom jack

分区器的定义需要实现分区器的接口

class MyPartitioner extends Partitioner{
  override def numPartitions: Int = ???
// 设定下游存在几个分区
  override def getPartition(key: Any): Int = ???
// 按照key设定分区的位置
}

整体代码:

package com.hainiu.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object Test1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("parse")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("data/a.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    val rdd1 = rdd.partitionBy(new MyPartitioner)
    val fs = FileSystem.get(new Configuration())
    val out = "data/res"
    if(fs.exists(new Path(out)))
      fs.delete(new Path(out),true)
    rdd1.saveAsTextFile(out)
  }
}
class MyPartitioner extends Partitioner{
  override def numPartitions: Int = 2

  override def getPartition(key: Any): Int = {
    if(key.toString.equals("hello"))
      0
    else
      1
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/923916.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java代码实现数字信封

1. 前言 本篇博客是工作经验总结&#xff0c;如果您发现此篇博客有疏漏或有待改进之处&#xff0c;欢迎评论区交流。 2. 数字信封 数字信封使用的是接收者的非对称密钥对。即&#xff1a;用接收者的公钥加密&#xff0c;且只能由接收者的私钥解密。其实现过程如下&#xff1a;…

第 4 章 Java 并发包中原子操作类原理剖析

原子变量操作类 AtomicLong 是原子性递增或者递减类&#xff0c;其内部使用 Unsafe 来实现&#xff0c;AtomicLong类也是在 rt.jar 包下面的&#xff0c;AtomicLong 类就是通过 BootStarp 类加载器进行加载的。这里的原子操作类都使用 CAS 非阻塞算法 private static final lon…

Android调起系统分享图片到其他应用

Android调起系统分享图片到其他应用 有时候分享不想接第三方的&#xff0c;其实如果你的分享要求不是很高&#xff0c;调系统的分享也是可以的。 一、思路&#xff1a; 用intent.action Intent.ACTION_SEND 二、效果图&#xff1a; 三、关键代码&#xff1a; //这个是分享…

C++中虚继承为什么可以解决菱形继承的数据冗余问题

在C中菱形继承会有数据冗余的问题发生&#xff0c;我们可以使用虚继承来解决&#xff0c;那虚继承的原理是什么&#xff0c;为什么它可以解决这个问题。 菱形继承的数据冗余问题 class A { public:int data; };class B : public A {};class C : public A {};class D : public…

LSA详情与特殊区域

LSA是构成LSDB的重要原材料&#xff0c;在OSPF中发挥很大作用。 报文 通用头部 LS age&#xff1a;LSA寿命&#xff0c;0-3600s Options&#xff1a;可选项 LS type&#xff1a;LSA类型&#xff0c;三要素之一 Link State ID&#xff1a;LSAID 三要素之一 Advertising Ro…

Kubeadm 安装 Kubernetes 高可用集群 v1.30.0

1、修改主机名&#xff08;各个节点&#xff09; hostnamectl set-hostname xxx2、hosts 文件加入主机名&#xff08;全部节点&#xff09; cat /etc/hosts 192.168.88.5 master1 192.168.88.6 master2 192.168.88.7 master3 192.168.88.8 node13、关闭防火墙&#xff08;全部…

泥石流灾害风险评估与模拟丨AI与R语言、ArcGIS、HECRAS融合,提升泥石流灾害风险预测的精度和准确性

目录 第一章 理论基础 第二章 泥石流风险评估工具 第三章 数据准备与因子提取 第四章 泥石流灾害评价 第五章 HECRAS软件的应用 第六章 操作注意事项与模型优化 泥石流灾害的频发与严重后果&#xff0c;已成为全球范围内防灾减灾工作的重大挑战。随着科技的不断进步&…

自由学习记录(25)

只要有修改&#xff0c;子表就不用元表的参数了&#xff0c;用自己的参数&#xff08;只不过和元表里的那个同名&#xff09; 子表用__index“继承”了父表的值&#xff0c;此时子表仍然是空表 一定是创建这样一个同名的变量在原本空空的子表里&#xff0c; 传参要传具体的变…

leetcode 3206. 交替组 I 简单

给你一个整数数组 colors &#xff0c;它表示一个由红色和蓝色瓷砖组成的环&#xff0c;第 i 块瓷砖的颜色为 colors[i] &#xff1a; colors[i] 0 表示第 i 块瓷砖的颜色是 红色 。colors[i] 1 表示第 i 块瓷砖的颜色是 蓝色 。 环中连续 3 块瓷砖的颜色如果是 交替 颜色&…

彻底解决 macOS 下Matplotlib 中文显示乱码问题

彻底解决 macOS 下Matplotlib 中文显示乱码问题 在使用 Python 的 Matplotlib 库进行数据可视化时&#xff0c;中文字符的显示常常会出现乱码问题&#xff0c;尤其在 macOS 系统上。在网上找了一大堆方法&#xff0c;花了很久&#xff0c;发现不是要安装各种字体就是要改配置&…

深度学习笔记24_天气预测

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制 一、我的环境 1.语言环境&#xff1a;Python 3.9 2.编译器&#xff1a;Pycharm 3.深度学习环境&#xff1a;TensorFlow 2.10.0 二、GPU设置…

podman 源码 5.3.1编译

1. 构建环境 在麒麟V10服务器操作系统上构建&#xff1a;Kylin-Server-V10-GFB-Release-2204-Build03-ARM64.iso。由于只是编译 podman 源码&#xff0c;没必要特地在物理机或服务上安装一个这样的操作系统&#xff0c;故采用在虚拟机里验证。 2. 安装依赖 参考资料&#xf…

【K8S系列】深入解析 Kubernetes 中的 Deployment

Kubernetes&#xff08;K8s&#xff09;是一个开源的容器编排平台&#xff0c;旨在自动化应用程序的部署、扩展和管理。在 Kubernetes 中&#xff0c;Deployment 是一种用于管理无状态应用的工作负载资源&#xff0c;提供了丰富的功能&#xff0c;包括版本控制、滚动更新和回滚…

玩转 Burp Suite (1)

内容预览 ≧∀≦ゞ 玩转 Burp Suite (1)声明Burp Suite 简介Dashboard&#xff08;仪表盘&#xff09;1. 默认任务管理2. 暂停任务3. 新建扫描任务4. 使用总结 Target&#xff08;目标&#xff09;1. SIte Map &#xff08;站点地图&#xff09;2. Scope&#xff08;范围&#…

【ArcGISPro】Sentinel-2数据处理

错误 默认拉进去只组织了4个波段,但是实际有12个波段 解决方案 数据下载 Sentinel-2 数据下载-CSDN博客 数据处理 数据查看 创建镶嵌数据集 在数据管理工具箱中找到创建镶嵌数据集

智慧环保大数据解决方案

1. 智慧环保概述 智慧环保是“数字环保”的延伸&#xff0c;借助物联网技术整合环境监控对象&#xff0c;通过云计算实现环境管理与决策的智能化。其核心在于快速感知城市环境指标&#xff0c;保障人体健康与生命安全。 2. 智慧环保总体目标 智慧环保的总体目标是建立全面感…

【H2O2|全栈】JS进阶知识(八)ES6(4)

目录 前言 开篇语 准备工作 浅拷贝和深拷贝 浅拷贝 概念 常见方法 弊端 案例 深拷贝 概念 常见方法 弊端 逐层拷贝 原型 构造函数 概念 形式 成员 弊端 显式原型和隐式原型 概念 形式 constructor 概念 形式 原型链 概念 形式 结束语 前言 开篇语…

03-微服务搭建

1、搭建分布式基本环境 分布式组件 功能 SpringCloud Alibaba - Nacos 注册中心&#xff08;服务发现/注册&#xff09;、配置中心&#xff08;动态配置管理&#xff09; SpringCloud Alibaba - Sentinel 服务容错&#xff08;限流、降级、熔断&#xff09; SpringCloud …

Vue前端开发2.3.2-4 绑定指令

本文介绍了Vue中的绑定指令&#xff0c;包括属性绑定指令v-bind、事件绑定指令v-on以及双向数据绑定指令v-model。通过创建单文件组件&#xff0c;演示了如何使用这些指令来控制DOM属性、监听事件和实现表单输入与数据的双向同步。同时&#xff0c;探讨了v-model的修饰符如.num…

uniapp开发支付宝小程序自定义tabbar样式异常

解决方案&#xff1a; 这个问题应该是支付宝基础库的问题&#xff0c;除了依赖于官方更新之外&#xff0c;开发者可以利用《自定义 tabBar》曲线救国 也就是创建一个空内容的自定义tabBar&#xff0c;这样即使 tabBar 被渲染出来&#xff0c;但从视觉上也不会有问题 1.官方文…