(17)Hive ——MR任务的map与reduce个数由什么决定?

一、MapTask的数量由什么决定?

    MapTask的数量由以下参数决定

  • 文件个数
  • 文件大小
  • blocksize

 一般而言,对于每一个输入的文件会有一个map split,每一个分片会开启一个map任务,很容易导致小文件问题(如果不进行小文件合并,极可能导致Hadoop集群资源雪崩)

hive中小文件产生的原因及解决方案见文章:

(14)Hive调优——合并小文件-CSDN博客文章浏览阅读779次,点赞10次,收藏17次。Hive的小文件问题https://blog.csdn.net/SHWAITME/article/details/136108785

maxSize的默认值为256M,minSize的默认值是1byte切片大小splitSize的计算公式:

splitSize=Min(maxSize,Max(minSize,blockSize)) = Min(256M,Max(1 byte ,128M)) = 128M =blockSize

所以默认splitSize就等于blockSize块大小

# minSize的默认值是1byte
set mapred.min.split.size=1

#maxSize的默认值为256M
set mapred.max.split.size=256000000

#hive.input.format是用来指定输入格式的参数。决定了Hive读取数据时使用的输入格式,
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

二、如何调整MapTask的数量

  假设blockSize一直是128M,且splitSize = blockSize = 128M。在不改变blockSize块大小的情况下,如何增加/减少mapTask数量

2.1 增加map的数量

      增加map:需要调小maxSize,且要小于blockSize才有效,例如maxSize调成100byte

      splitSize=Min(maxSize,Max(minSize,blockSize)) = Min(100,Max(1,128*1000*1000)) =100 byte = maxSize

   调整前的map数 = 输入文件的大小/ splitSize = 输入文件的大小/ 128M

   调整后的map数 = 输入文件的大小/ splitSize = 输入文件的大小/ 100byte

2.2 减少map的数量 

     减少map:需要调大minSize ,且要大于blockSize才有效,例如minSize 调成200M

      splitSize=Min(maxSize,Max(minSize,blockSize)) = Min(256m, Max(200M,128M)) = 200M=minSize

   调整前的map数 = 输入文件的大小/ splitSize = 输入文件的大小/ 128M

   调整后的map数 = 输入文件的大小/ splitSize = 输入文件的大小/ 200M

三、ReduceTask的数量决定

    reduce的个数决定hdfs上落地文件的个数(即: reduce个数决定文件的输出个数)。 ReduceTask的数量,由参数mapreduce.job.reduces 控制,默认值为 -1 时,代表ReduceTask的数量是根据hive的数据量动态计算的。

总体而言,ReduceTask的数量决定方式有以下两种:

3.1 方式一:hive动态计算

3.1.1 动态计算公式

 ReduceTask数量 = min (参数2,输入的总数据量/ 参数1)

参数1:hive.exec.reducers.bytes.per.reducer 

      含义:每个reduce任务处理的数据量(默认值:256M)

参数2:hive.exec.reducers.max 

     含义:每个MR任务能开启的reduce任务数的上限值(默认值:1009个)

ps: 一般参数2的值不会轻易变动,因此在普通集群规模下,hive根据数据量动态计算reduce的个数,计算公式为:输入总数据量/hive.exec.reducers.bytes.per.reducer

3.1.2 源码分析

(1)通过源码分析 hive是如何动态计算reduceTask的个数的?

	在org.apache.hadoop.hive.ql.exec.mr包下的 MapRedTask类中
	//方法类调用逻辑
	MapRedTask 
	     | ----setNumberOfReducers 
	                  | ---- estimateNumberOfReducers
	                    		 |---- estimateReducers         

(2)核心方法setNumberOfReducers(读取 用户手动设置的reduce个数)

  /**
   * Set the number of reducers for the mapred work.
   */
  private void setNumberOfReducers() throws IOException {
    ReduceWork rWork = work.getReduceWork();
    // this is a temporary hack to fix things that are not fixed in the compiler
    // 获取通过外部传参设置reduce数量的值 rWork.getNumReduceTasks() 
    Integer numReducersFromWork = rWork == null ? 0 : rWork.getNumReduceTasks();

    if (rWork == null) {
      console
          .printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
    } else {
       
      if (numReducersFromWork >= 0) {
      //如果手动设置了reduce的数量 大于等于0 ,则进来,控制台打印日志
        console.printInfo("Number of reduce tasks determined at compile time: "
            + rWork.getNumReduceTasks());
      } else if (job.getNumReduceTasks() > 0) {
      //如果手动设置了reduce的数量,获取配置中的值,并传入到work中
        int reducers = job.getNumReduceTasks();
        rWork.setNumReduceTasks(reducers);
        console
            .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
            + reducers);
      } else {
       //如果没有手动设置reduce的数量,进入方法
        if (inputSummary == null) {
          inputSummary =  Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
        }
   // #==========【重中之中】estimateNumberOfReducers 
        int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
                                                          work.isFinalMapRed());
        rWork.setNumReduceTasks(reducers);
        console
            .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
            + reducers);

      }
      
      //hive shell中所看到的控制台打印日志就在这里
      console
          .printInfo("In order to change the average load for a reducer (in bytes):");
      console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname
          + "=<number>");
      console.printInfo("In order to limit the maximum number of reducers:");
      console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname
          + "=<number>");
      console.printInfo("In order to set a constant number of reducers:");
      console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS
          + "=<number>");
    }
  }

 (3)如果没有手动设置reduce的个数,hive是如何动态计算reduce个数的?

    int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
                                                          work.isFinalMapRed());
                                                          
  /**
   * Estimate the number of reducers needed for this job, based on job input,
   * and configuration parameters.
   *
   * The output of this method should only be used if the output of this
   * MapRedTask is not being used to populate a bucketed table and the user
   * has not specified the number of reducers to use.
   *
   * @return the number of reducers.
   */
  public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSummary,
                                             MapWork work, boolean finalMapRed) throws IOException {
    // bytesPerReducer 每个reduce处理的数据量,默认值为256M  BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", 256000000L)
    long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
    
    //整个mr任务,可以开启的reduce个数的上限值:maxReducers的默认值1009个MAXREDUCERS("hive.exec.reducers.max", 1009)
    int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
   
   //#===========对totalInputFileSize的计算
    double samplePercentage = getHighestSamplePercentage(work);
    long totalInputFileSize = getTotalInputFileSize(inputSummary, work, samplePercentage);

    // if all inputs are sampled, we should shrink the size of reducers accordingly.
    if (totalInputFileSize != inputSummary.getLength()) {
      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
          + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
    } else {
      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
        + maxReducers + " totalInputFileSize=" + totalInputFileSize);
    }

    // If this map reduce job writes final data to a table and bucketing is being inferred,
    // and the user has configured Hive to do this, make sure the number of reducers is a
    // power of two
    boolean powersOfTwo = conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
        finalMapRed && !work.getBucketedColsByDirectory().isEmpty();
    
    //#==============【真正计算reduce个数的方法】看源码的技巧return的方法是重要核心方法
    return estimateReducers(totalInputFileSize, bytesPerReducer, maxReducers, powersOfTwo);
  }                                                

(4) 动态计算reduce个数的方法 estimateReducers

	public static int estimateReducers(long totalInputFileSize, long bytesPerReducer,
      int maxReducers, boolean powersOfTwo) {
    
  
    double bytes = Math.max(totalInputFileSize, bytesPerReducer);
  // 假设totalInputFileSize 1000M
    // bytes=Math.max(1000M,256M)=1000M


    int reducers = (int) Math.ceil(bytes / bytesPerReducer);
    
    //reducers=(int)Math.ceil(1000M/256M)=4  此公式说明如果totalInputFileSize 小于256M ,则reducers=1 ;也就是当输入reduce端的数据量特别小,即使手动设置reduce Task数量为5,最终也只会开启1个reduceTask
    
  
    reducers = Math.max(1, reducers);
  //Math.max(1, 4)=4 ,reducers的结果还是4

    reducers = Math.min(maxReducers, reducers);
    //Math.min(1009,4)=4; reducers的结果还是4

      
    int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
    int reducersPowerTwo = (int)Math.pow(2, reducersLog);

    if (powersOfTwo) {
      // If the original number of reducers was a power of two, use that
      if (reducersPowerTwo / 2 == reducers) {
        // nothing to do
      } else if (reducersPowerTwo > maxReducers) {
        // If the next power of two greater than the original number of reducers is greater
        // than the max number of reducers, use the preceding power of two, which is strictly
        // less than the original number of reducers and hence the max
        reducers = reducersPowerTwo / 2;
      } else {
        // Otherwise use the smallest power of two greater than the original number of reducers
        reducers = reducersPowerTwo;
      }
    }
    return reducers;
  }

3.2 方式二:用户手动指定

 手动调整reduce个数: set mapreduce.job.reduces = 10

 需要注意:出现以下几种情况时,手动调整reduce个数不生效。

3.2.1 order by 全局排序

   sql中使用了order by全局排序,那只能在一个reduce中完成,无论怎么调整reduce的数量都是无效的。

  
  hive (default)>set mapreduce.job.reduces=5;
  hive (default)> select * from empt order  by length(ename);
  Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1

3.2.2 map端输出的数据量很小

在【3.1.2 源码分析——(4) 】动态计算reduce个数的核心方法 estimateReducers中,有下面这三行代码:

int reducers = (int) Math.ceil(bytes / bytesPerReducer);
reducers = Math.max(1, reducers);
reducers = Math.min(maxReducers, reducers);

   如果map端输出的数据量bytes (假如只有1M) 远小于hive.exec.reducers.bytes.per.reducer (每个reduce处理的数据量默认值为256M) 参数值,maxReducers默认为1009个,计算下列值:

  int reducers  = (int) Math.ceil(1 / 256M)=1;
  reducers = Math.max(1, 1)=1;
  reducers = Math.min(1009, 1)=1;

  此时即使用户手动 set mapreduce.job.reduces=10,也不生效,reduce个数最后还是只有1个。

参考文章:

Hive mapreduce的map与reduce个数由什么决定?_hive中map任务和reduce任务数量计算原理-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/389428.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

软件实例分享,药店进销存软件医药系统进销存教程

软件实例分享&#xff0c;药店进销存软件医药系统进销存教程 一、前言 以下软件程序教程以 佳易王药店进销存管理系统V16.0为例说明 软件文件下载可以点击最下方官网卡片——软件下载——试用版软件下载 软件可以对药品的有效期进行管理&#xff0c;可以查询还有多少天到期的…

如何查找Windows的桌面文件夹?这里提供详细步骤

当你的电脑上有不同的用户时&#xff0c;Windows 11、10、…上的桌面文件夹或桌面目录特别有用&#xff0c;那么哪里才是真正的桌面文件夹目录。 自己的Windows桌面目录 1、启动Windows资源管理器 2、按F4键并输入%UserProfile% 3、点击桌面 这是你个人桌面的正确文件夹路径…

【数据分享】1980s~2020年青藏高原中分辨率土地覆被数据

各位同学们好&#xff0c;今天和大伙儿分享的是1980s~2020年青藏高原中分辨率土地覆被数据。如果大家有下载处理数据等方面的问题&#xff0c;您可以私信或评论。 吴炳方. (2023). 青藏高原中分辨率土地覆被数据&#xff08;1980s-2020&#xff09;. 国家青藏高原数据中心. 1 …

【STM32 CubeMX】I2C查询方式

文章目录 前言一、CubeMX配置IIC二、查询方式的使用2.1 分析一种情况2.2 Master模式2.3 Mem模式 总结 前言 在STM32 CubeMX环境中&#xff0c;I2C&#xff08;Inter-Integrated Circuit&#xff09;通信协议的查询方式是一种简单而常见的通信方式。通过查询方式&#xff0c;微…

【并发编程】AQS原理

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;并发编程 ⛺️稳中求进&#xff0c;晒太阳 1. 概述 全称是 AbstractQueuedSynchronizer&#xff0c;是阻塞式锁和相关的同步器工具的框架 特点&#xff1a; 用 state 属性来表示资源的状…

【漏洞复现】某情侣飞行棋源码未授权访问漏洞

Nx01 产品简介 情侣飞行棋源码是一款专为情侣设计的互动游戏。通过游戏的方式&#xff0c;它可以帮助情侣们建立共同兴趣&#xff0c;增加互动&#xff0c;增进了解&#xff0c;培养默契&#xff0c;并创造美好的回忆。 Nx02 漏洞描述 某情侣飞行棋源码存在未授权访问漏洞&…

讲解用Python处理Excel表格

我们今天来一起探索一下用Python怎么操作Excel文件。与word文件的操作库python-docx类似&#xff0c;Python也有专门的库为Excel文件的操作提供支持&#xff0c;这些库包括xlrd、xlwt、xlutils、openpyxl、xlsxwriter几种&#xff0c;其中我最喜欢用的是openpyxl&#xff0c;这…

[嵌入式系统-16]:RT-Thread -2- 主要功能功能组件详解与API函数说明

目录 一、RT-Thread主要功能组件 二、内核组件 2.1 概述 2.2 API 三、设备驱动 3.1 概述 3.2 API 四、通信组件 4.1 概述 4.4 API 五、网络组件 5.1 概述 5.2 API 5.3 补充&#xff1a;MQTT协议 六、文件系统 6.1 概述 6.2 API 七、GUI 组件 7.1 概述 7.2 …

小米米家智能摄像头mp4多碎片手工恢复案例

小米米家智能摄像头mp4多碎片手工恢复案例 智能摄像头品牌中小米算是绝对的大厂&#xff0c;其采用的方案也是比较成熟比较典型的&#xff1a;日志截图1分钟1个文件。小米米家的智能摄像头之前处理过很多&#xff0c;这次来讲一个比较特殊的案例。 故障存储: 32G TF卡 fat…

机器学习和统计学的区别?

1、本质区别&#xff1a; 目标&#xff1a;机器学习的核心目标是建立一个可以自动学习和改进的模型&#xff0c;以预测未知数据。它更关注结果的准确性和模型的泛化能力&#xff0c;通常不关心模型是否可以解释。而统计学的目标是探究变量之间的关系&#xff0c;理解数据的内在…

HCIA-HarmonyOS设备开发认证V2.0-轻量系统内核基础-信号量semaphore

目录 一、信号量基本概念二、信号量运行机制三、信号量开发流程四、信号量接口五、代码分析&#xff08;待续...&#xff09;坚持就有收获 一、信号量基本概念 信号量&#xff08;Semaphore&#xff09;是一种实现任务间通信的机制&#xff0c;可以实现任务间同步或共享资源的…

RT-Thread(RTT)如何打印输出浮点数

问题&#xff1a; 一、基于RTT的工程下&#xff0c;打印输出浮点数 二、输出的都是这些&#xff0c;因为RTT默认下不支持输出浮点数 解决&#xff1a; 一、点击RT-Thread Settings 二、点击添加软件包 三、输入print &#xff0c;搜索后添加rt_vsnprintf_full这个 四、添加后…

【惠友小课堂】滑雪的尽头是骨科?这份滑雪指南快收好,安全快乐两不误

今年滑雪运动异常火爆&#xff0c;寒假一开启&#xff0c;不少家长趁着放假打算带孩子出门玩一趟&#xff0c;各地的滑雪场也成了最热门的旅游项目之一。 但说到滑雪 不少网友调侃“听说雪道的尽头是骨科”还有人说“今年滑雪一共花了2万”“滑雪2000&#xff0c;骨折进医院180…

相机图像质量研究(13)常见问题总结:光学结构对成像的影响--鬼影

系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结&#xff1a;光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结&#xff1a;光学结构对成…

RK3568笔记十六:Framebuffer实验

若该文为原创文章&#xff0c;转载请注明原文出处。 本意是移植LVGL&#xff0c;但在编译DRM过程中一直编译失败&#xff0c;然后就想Framebuffer是否可以用&#xff0c;所以测试一下。 一、framebuffer介绍 FrameBuffer中文译名为帧缓冲驱动&#xff0c;它是出现在2.2.xx内…

【刷题记录】合并两个有序数组、移除元素

本系列博客为个人刷题思路分享&#xff0c;有需要借鉴即可。 1.题目链接&#xff1a; T1&#xff1a;LINK T2&#xff1a;LINK 2.详解思路&#xff1a; T1: 思路1&#xff1a;弄个新数组&#xff0c;比较两个数组中的值&#xff0c;哪个小就把哪个值放到新数组中。 分析1&a…

精品jsp+ssm汽车店维保信息系统

《[含文档PPT源码等]精品jspssm汽车店维保信息系统[包运行成功]》该项目含有源码、文档、PPT、配套开发软件、软件安装教程、项目发布教程、包运行成功&#xff01; 使用技术&#xff1a; 开发语言&#xff1a;Java 框架&#xff1a;ssm 技术&#xff1a;JSP JDK版本&…

智慧供应链控制塔大数据解决方案

一、供应链控制塔的概念定义 (1) Gartner 的定义: “控制塔是一个物理或虚拟仪表板,提供准确的、及时的、完整的物流事件和数据,从组织和服务的内部和跨组织运作供应链,以协调所有相关活动。”、“供应链控制塔…提供供应链端到端整体可见性和近实时信息和决策的概念……

0901多元函数的基本概念-多元函数微分法及其应用

文章目录 1 平面点集1.1 坐标平面1.2 平面点集1.3 邻域1.4 电与点集的关系1.5 聚点1.6 点集所属点的特征定义的平面点集 2 多元函数的概念2.1 定义2.2 值域2.3推广2.4 自然定义域2.5 二元函数的图形 3 多元函数的极限4 多元函数的连续性4.1 连续函数定义4.2 间断点定义4.3 多元…

『 C++ - STL 』位图(BitMap)与布隆过滤器(Bloom Filter)

文章目录 &#x1f9f8; 位图(BitMap)概念&#x1f9f8; 位图的实现&#x1fa85; 总体框架&#x1fa85; 位图的数据插入&#x1f9e9; 左移操作与右移操作的区别 &#x1fa85; 位图的数据删除&#x1fa85; 位图的数据查找&#x1fa85; 位图整体代码(供参考) &#x1f9f8;…