Hadoop中MapReduce过程中Shuffle过程实现自定义排序

文章目录

  • Hadoop中MapReduce过程中Shuffle过程实现自定义排序
    • 一、引言
    • 二、实现WritableComparable接口
      • 1、自定义Key类
    • 三、使用Job.setSortComparatorClass方法
      • 2、设置自定义排序器
      • 3、自定义排序器类
    • 四、使用示例
    • 五、总结

Hadoop中MapReduce过程中Shuffle过程实现自定义排序

在这里插入图片描述

一、引言

MapReduce框架中的Shuffle过程是连接Map阶段和Reduce阶段的桥梁,负责将Map任务的输出结果按照key进行分组和排序,并将相同key的数据传递给对应的Reduce任务进行处理。Shuffle过程的性能直接影响到整个MapReduce作业的执行效率。在默认情况下,Hadoop使用TotalOrderPartitioner进行排序,但有时我们需要根据特定的业务逻辑进行自定义排序。本文将介绍两种方法来实现自定义排序:实现WritableComparable接口和使用Job.setSortComparatorClass方法。下面是详细的步骤和代码示例。

二、实现WritableComparable接口

1、自定义Key类

首先,我们需要定义一个类并实现WritableComparable接口,该接口要求实现compareTo方法,用于定义排序逻辑。

package mr;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Employee implements WritableComparable<Employee> {
    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;

    @Override
    public String toString(){
        return "Employee[empno="+empno+",ename="+ename+",sal="+sal+",deptno="+deptno+"]";
    }

    @Override
    public int compareTo(Employee o) {
        // 多个列的排序:select * from emp order by deptno,sal;
        // 首先按照deptno排序
        if(this.deptno > o.getDeptno()){
            return 1;
        }else if(this.deptno < o.getDeptno()){
            return -1;
        }
        // 如果deptno相等,按照sal排序
        if(this.sal >= o.getSal()){
            return 1;
        }else{
            return -1;
        }
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // 序列化
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // 反序列化
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }
}

三、使用Job.setSortComparatorClass方法

2、设置自定义排序器

除了实现WritableComparable接口外,我们还可以使用Job.setSortComparatorClass方法来设置自定义排序器。这种方法允许我们在不修改Key类的情况下实现自定义排序。

package mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CustomSort {
    public static class Map extends Mapper<Object, Text, Employee, IntWritable> {
        private static Employee emp = new Employee();
        private static IntWritable one = new IntWritable(1);

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            emp.setEmpno(Integer.parseInt(line[0]));
            emp.setEname(line[1]);
            emp.setJob(line[2]);
            emp.setMgr(Integer.parseInt(line[3]));
            emp.setHiredate(line[4]);
            emp.setSal(Integer.parseInt(line[5]));
            emp.setComm(Integer.parseInt(line[6]));
            emp.setDeptno(Integer.parseInt(line[7]));
            context.write(emp, one);
        }
    }

    public static class Reduce extends Reducer<Employee, IntWritable, Employee, IntWritable> {
        @Override
        protected void reduce(Employee key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "CustomSort");
        job.setJarByClass(CustomSort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Employee.class);
        job.setOutputValueClass(IntWritable.class);
        // 设置自定义排序器
        job.setSortComparatorClass(EmployeeComparator.class);
        
        Path in = new Path("hdfs://localhost:9000/mr/in/customsort");
        Path out = new Path("hdfs://localhost:9000/mr/out/customsort");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3、自定义排序器类

package mr;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class EmployeeComparator extends WritableComparator {
    protected EmployeeComparator() {
        super(Employee.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        Employee e1 = (Employee) w1;
        Employee e2 = (Employee) w2;
        // 首先按照deptno排序
        int deptCompare = Integer.compare(e1.getDeptno(), e2.getDeptno());
        if (deptCompare != 0) {
            return deptCompare;
        }
        // 如果deptno相等,按照sal排序
        return Integer.compare(e1.getSal(), e2.getSal());
    }
}

四、使用示例

下面是一个简单的MapReduce示例,展示了Shuffle过程在实际应用中的使用。这个示例中,我们使用了自定义的Employee类作为Key,并设置了自定义的排序器EmployeeComparator

五、总结

通过实现WritableComparable接口和使用Job.setSortComparatorClass方法,我们可以在Hadoop MapReduce过程中实现自定义排序。这两种方法提供了灵活的排序机制,允许我们根据不同的业务需求对数据进行排序处理,从而提高数据处理的效率和准确性。


版权声明:本博客内容为原创,转载请保留原文链接及作者信息。

参考文章

  • Hadoop之mapreduce数据排序案例(详细代码)
  • Java Job.setSortComparatorClass方法代码示例

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/942142.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

论文《Vertical Federated Learning: Concepts, Advances, and Challenges》阅读

论文《Vertical Federated Learning: Concepts, Advances, and Challenges》阅读 论文概况纵向联邦VFL框架介绍问题定义VFL 训练协议 对通信效率的优化对性能的优化自监督方案&#xff08;Self-Supervised Approaches&#xff09;半监督方案&#xff08;Semi-Supervised Approa…

【Rust自学】4.5. 切片(Slice)

4.5.0. 写在正文之前 这是第四章的最后一篇文章了&#xff0c;在这里也顺便对这章做一个总结&#xff1a; 所有权、借用和切片的概念确保 Rust 程序在编译时的内存安全。 Rust语言让程序员能够以与其他系统编程语言相同的方式控制内存使用情况&#xff0c;但是当数据所有者超…

WEB入门——文件上传漏洞

文件上传漏洞 一、文件上传漏洞 1.1常见的WebShell有哪些&#xff1f;1.2 一句话木马演示1.2 文件上传漏洞可以利用需满足三个条件1.3 文件上传导致的危害 二、常用工具 2.1 搭建upload-labs环境2.2 工具准备 三、文件上传绕过 3.1 客户端绕过 3.1.1 实战练习 &#xff1a;upl…

【NLP高频面题 - Transformer篇】Transformer的位置编码是如何计算的?

【NLP高频面题 - Transformer篇】Transformer的位置编码是如何计算的&#xff1f; 重要性&#xff1a;★★★ NLP Github 项目&#xff1a; NLP 项目实践&#xff1a;fasterai/nlp-project-practice 介绍&#xff1a;该仓库围绕着 NLP 任务模型的设计、训练、优化、部署和应用…

[react 3种方法] 获取ant组件ref用ts如何定义?

获取ant的轮播图组件, 我用ts如何定义? Strongly Type useRef with ElementRef | Total TypeScript import React, { ElementRef } from react; const lunboRef useRef<ElementRef<typeof Carousel>>(null); <Carousel autoplay ref{lunboRef}> 这样就…

模型优化之知识蒸馏

文章目录 知识蒸馏优点工作原理示例代码 知识蒸馏优点 把老师模型中的规律迁移到学生模型中&#xff0c;相比从头训练&#xff0c;加快了训练速度。另一方面&#xff0c;如果学生模型的训练精度和老师模型差不多&#xff0c;相当于得到了规模更小的学生模型&#xff0c;起到模…

职业技能赛赛后心得

这是一位粉丝所要求的&#xff0c;也感谢这位粉丝对我的支持。 那么本篇文章我也是分成四个部分&#xff0c;来总结一下这次赛后心得。 赛中问题 那么这里的赛中问题不会只包含我所遇到的问题&#xff0c;也会包含赛中其他选手出现的问题。 那么首先我先说一下我在赛中遇到的…

基于springboot+vue实现的博物馆游客预约系统 (源码+L文+ppt)4-127

摘 要 旅游行业的快速发展使得博物馆游客预约系统成为了一个必不可少的工具。基于Java的博物馆游客预约系统旨在提供高效、准确和便捷的适用博物馆游客预约服务。本文讲述了基于java语言开发&#xff0c;后台数据库选择MySQL进行数据的存储。该软件的主要功能是进行博物馆游客…

前沿重器[57] | sigir24:大模型推荐系统的文本ID对齐学习

前沿重器 栏目主要给大家分享各种大厂、顶会的论文和分享&#xff0c;从中抽取关键精华的部分和大家分享&#xff0c;和大家一起把握前沿技术。具体介绍&#xff1a;仓颉专项&#xff1a;飞机大炮我都会&#xff0c;利器心法我还有。&#xff08;算起来&#xff0c;专项启动已经…

Dubbo 3.x源码(28)—Dubbo服务发布导出源码(7)应用级服务接口元数据发布

基于Dubbo 3.1&#xff0c;详细介绍了Dubbo服务的发布与引用的源码。 此前我们在Dubbo启动过程的DefaultModuleDeployer#startSync方法中&#xff0c;学习了Dubbo服务的导出exportServices方法和服务的引入referServices方法。 在这两个操作执行完毕之后&#xff0c;将会继续调…

电脑使用CDR时弹出错误“计算机丢失mfc140u.dll”是什么原因?“计算机丢失mfc140u.dll”要怎么解决?

电脑使用CDR时弹出“计算机丢失mfc140u.dll”错误&#xff1a;原因与解决方案 在日常电脑使用中&#xff0c;我们时常会遇到各种系统报错和文件丢失问题。特别是当我们使用某些特定软件&#xff0c;如CorelDRAW&#xff08;简称CDR&#xff09;时&#xff0c;可能会遇到“计算…

深入解读数据资产化实践指南(2024年)

本指南主要介绍了数据资产化的概念、目标和意义&#xff0c;以及实施数据资产化的过程。指南详细阐述了数据资产化的内涵&#xff0c;包括数据资产的定义、数据资产化的目标与意义&#xff0c;并介绍了数据资产化的过程包括业务数据化、数据资源化、数据产品化和数据资本化。 …

【算法篇】——数据结构中常见八大排序算法的过程原理详解

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、插入排序1.直接插入法2.希尔排序法 二、交换排序1. 冒泡排序2. 快速排序 三、选择排序1. 简单选择排序2. 堆排序 四、归并排序五、基数排序 前言 C数据结构…

仿闲鱼的二手交易小程序软件开发闲置物品回收平台系统源码

市场前景 闲置物品交易软件的市场前景广阔&#xff0c;主要基于以下几个方面的因素&#xff1a; 环保意识提升&#xff1a;随着人们环保意识的增强&#xff0c;越来越多的人开始关注资源的循环利用&#xff0c;闲置物品交易因此受到了广泛的关注。消费升级与时尚节奏加快&…

情报信息收集能力

红队专题-Web渗透之资产思路框架知识整理 钓鱼社工 钓鱼自动化zip域名ARP欺骗快捷方式ToolsburpsuiteApp 抓包ffuf模糊测试QingScanWiresharkCloudCFEn-Decodeffffffff0xInfodirbdirmapdirsearchdnsenum使用测试常规使用使用字典文件进行dns查询子域名暴力查询部分C类IP地址IP块…

ensp 关于acl的运用和讲解

ACL&#xff08;Access Control List&#xff0c;访问控制列表&#xff09;是一种常用于网络设备&#xff08;如路由器、交换机&#xff09;上的安全机制&#xff0c;用于控制数据包的流动与访问权限。ACL 可以指定哪些数据包允许进入或离开某个网络接口&#xff0c;基于不同的…

5、mysql的读写分离

主从复制 主从复制的含义 主从复制&#xff1a;在一个mysql的集群当中&#xff0c;至少3台&#xff0c;即主1台&#xff0c;从2台。 当有数据写入时&#xff0c;主负责写入本库&#xff0c;然后把数据同步到从服务器。 一定是在主服务器写入数据&#xff0c;从服务器的写入…

高质量配音如何影响游戏的受欢迎度

在游戏行业中&#xff0c;创造沉浸式、引人入胜且令人难忘的体验往往决定了游戏的成功或失败。在影响游戏流行度的众多因素中&#xff0c;配音脱颖而出&#xff0c;成为将叙事与玩家互动连接起来的重要元素。高质量的配音将游戏中的对白转化为游戏的活跃部分&#xff0c;让玩家…

鸿蒙-expandSafeArea使用

应用未使用setWindowLayoutFullScreen()接口设置窗口全屏布局时&#xff0c;默认使能组件安全区布局。可以使用expandSafeArea属性扩展安全区域属性进行调整 扩展安全区域属性原理 布局阶段按照安全区范围大小进行UI元素布局。布局完成后查看设置了expandSafeArea的组件边界&…

Java测试开发平台搭建(四)拦截器

1. 拦截器的作用及使用场景 能够在请求的生命周期的不同阶段进行拦截和处理。常见的使用场景包括&#xff1a;1. 日志记录&#xff1a;记录请求和响应的日志。 2. 权限验证&#xff1a;检查用户的登录状态、权限。 3. 性能监控&#xff1a;记录请求的处理时间&#xff0c;监控…