hadoop学习:mapreduce入门案例四:partitioner 和 combiner

先简单介绍一下partitioner 和 combiner 

Partitioner类

  • 用于在Map端对key进行分区
    • 默认使用的是HashPartitioner
      • 获取key的哈希值
      • 使用key的哈希值对Reduce任务数求模
    • 决定每条记录应该送到哪个Reducer处理
  • 自定义Partitioner
    • 继承抽象类Partitioner,重写getPartition方法
    • job.setPartitionerClass(MyPartitioner.class)

Combiner类

  • Combiner相当于本地化的Reduce操作
    • 在shuffle之前进行本地聚合
    • 用于性能优化,可选项
    • 输入和输出类型一致
  • Reducer可以被用作Combiner的条件
    • 符合交换律和结合律
  • 实现Combiner
    • job.setCombinerClass(WCReducer.class)

我们进入案例来看这两个知识点

一 案例需求

一个存放电话号码的文本,我们需要136 137,138 139和其它开头的号码分开存放统计其每个数字开头的号码个数

效果

 二 PhoneMapper 类

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PhoneMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String phone = value.toString();
        Text text = new Text(phone);
        IntWritable intWritable = new IntWritable(1);
        context.write(text,intWritable);
    }
}

三 PhoneReducer 类

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PhoneReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable intWritable : values){
            count += intWritable.get();
        }
        context.write(key, new IntWritable(count));
    }
}

四 PhonePartitioner 类

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PhonePartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int i) {
        //136,137   138,139     其它号码放一起
        if("136".equals(text.toString().substring(0,3)) || "137".equals(text.toString().substring(0,3))){
            return 0;
        }else if ("138".equals(text.toString().substring(0,3)) || "139".equals(text.toString().substring(0,3))){
            return 1;
        }else {
            return 2;
        }

    }
}

五 PhoneCombiner 类

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PhoneCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for(IntWritable intWritable : values){
            count += intWritable.get();
        }
        context.write(new Text(key.toString().substring(0,3)), new IntWritable(count));
    }
}

六 PhoneDriver 类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PhoneDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(PhoneDriver.class);

        job.setMapperClass(PhoneMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setCombinerClass(PhoneCombiner.class);

        job.setPartitionerClass(PhonePartitioner.class);
        job.setNumReduceTasks(3);

        job.setReducerClass(PhoneReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        Path inPath = new Path("in/demo4/phone.csv");
        FileInputFormat.setInputPaths(job, inPath);

        Path outPath = new Path("out/out6");
        FileSystem fs = FileSystem.get(outPath.toUri(),conf);
        if (fs.exists(outPath)){
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        job.waitForCompletion(true);

    }
}

七 小结

该案例新知识点在于分区(partition)和结合(combine)

这次代码的流程是 

driver——》mapper——》partitioner——》combiner——》reducer

map 每处理一条数据都经过一次 partitioner 分区然后存到环形缓存区中去,然后map再去处理下一条数据以此反复直至所有数据处理完成

combine 则是将环形缓存区溢出的缓存文件合并,并提前进行一次排序和计算(对每个溢出文件计算后再合并)最后将一个大的文件给到 reducer,这样大大减少了 reducer 的计算负担

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/102442.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++算法 —— 动态规划(1)斐波那契数列模型

文章目录 1、动规思路简介2、第N个泰波那契数列3、三步问题4、使用最小花费爬楼梯5、解码方法6、动规分析总结 1、动规思路简介 动规的思路有五个步骤&#xff0c;且最好画图来理解细节&#xff0c;不要怕麻烦。当你开始画图&#xff0c;仔细阅读题时&#xff0c;学习中的沉浸…

简明易懂:Python中的分支与循环

文章目录 前言分支结构if 语句&#xff1a;单一条件判断else语句&#xff1a;提供备选方案elif 语句&#xff1a;多条件判断嵌套的分支结构&#xff1a;复杂条件逻辑 循环结构for循环&#xff1a;遍历序列range()函数与for循环while循环&#xff1a;条件重复循环控制&#xff1…

day-01 Docker

一、docker简介 Docker 是一种开源的容器化平台&#xff0c;它可以帮助开发人员将应用程序及其依赖项打包成一个独立的、可移植的容器&#xff0c;而无需担心环境差异和依赖问题。通过使用 Docker&#xff0c;您可以更轻松地创建、分发和运行应用程序&#xff0c;无论是在开发、…

Java后端开发面试题——多线程

创建线程的方式有哪些&#xff1f; 继承Thread类 public class MyThread extends Thread {Overridepublic void run() {System.out.println("MyThread...run...");}public static void main(String[] args) {// 创建MyThread对象MyThread t1 new MyThread() ;MyTh…

纽扣电池/锂电池UN38.3安全检测报告

根据规章要求&#xff0c;航空公司和机场货物收运部门应对锂电池进行运输文件审查&#xff0c;重要的是每种型号的锂电池UN38.3安全检测报告。该报告可由的三方检测机构。如不能提供此项检测报告&#xff0c;将禁止锂电池进行航空运输. UN38.3包含产品&#xff1a;1、 锂电池2…

JVM 访问对象的两种方式

Java 程序会通过栈上的 reference 数据来操作堆上的具体对象。由于 reference 类型在《Java 虚拟机规范》里面只规定了它是一个指向对象的引用&#xff0c;并没有定义这个引用应该通过什么方式去定位、访问到堆中对象的具体位置&#xff0c;所以对象访问方式也是由虚拟机实现而…

【SpringSecurity】十二、集成JWT搭配Redis实现退出登录

文章目录 1、登出的实现思路2、集成Redis3、认证成功处理器4、退出成功处理器5、修改token校验过滤器6、调试 1、登出的实现思路 这是目前的token实现图&#xff1a; 因为JWT的无状态&#xff0c;服务端无法在使用过程中主动废止某个 token&#xff0c;或者更改 token 的权限…

【python爬虫】批量识别pdf中的英文,自动翻译成中文上

不管是上学还是上班,有时不可避免需要看英文文章,特别是在写毕业论文的时候。比较头疼的是把专业性很强的英文pdf文章翻译成中文。我记得我上学的时候,是一段一段复制,或者碰到不认识的单词就百度翻译一下,非常耗费时间。本文提供批量识别pdf中英文的方法,后续文章实现自…

Python3 条件控制

Python3 条件控制 Python 条件语句是通过一条或多条语句的执行结果&#xff08;True 或者 False&#xff09;来决定执行的代码块。 可以通过下图来简单了解条件语句的执行过程: 代码执行过程&#xff1a; if 语句 Python中if语句的一般形式如下所示&#xff1a; if conditi…

(超简单)将图片转换为ASCII字符图像

将一张图片转换为ASCII字符图像 原图&#xff1a; 效果图&#xff1a; import javax.imageio.ImageIO; import java.awt.image.BufferedImage; import java.io.File; import java.io.FileWriter; import java.io.IOException;public class ImageToASCII {/*** 将图片转换为A…

Java“牵手”1688商品列表数据,关键词搜索1688商品数据接口,1688API申请指南

1688商城是一个网上购物平台&#xff0c;售卖各类商品&#xff0c;包括服装、鞋类、家居用品、美妆产品、电子产品等。要获取1688商品列表和商品详情页面数据&#xff0c;您可以通过开放平台的接口或者直接访问1688商城的网页来获取商品详情信息。以下是两种常用方法的介绍&…

Python教程(12)——Python数据结构集合set介绍

集合 创建集合访问集合删除集合修改集合元素添加集合元素删除集合元素 集合运算&#xff1a;并集&#xff08;Union&#xff09;交集&#xff08;Intersection&#xff09;差集&#xff08;Difference&#xff09;对称差集&#xff08;Symmetric Difference&#xff09; 集合的…

嵌入式学习之进程

1.进程间通信概述 UNIX系统IPC是各种进程通信方式的统称。 2.管道通信原理 特点&#xff1a; 1.它是半双工的&#xff08;即数据只能在一个方向上流动&#xff09;&#xff0c;具有固定的读端和写端。 2.它只能用于具有亲缘关系的进程之间通信&#xff08;也是父子进程或者…

基于springboot跟redis实现的排行榜功能(实战)

概述 前段时间&#xff0c;做了一个世界杯竞猜积分排行榜。对世界杯64场球赛胜负平进行猜测&#xff0c;猜对1分&#xff0c;错误0分&#xff0c;一人一场只能猜一次。 1.展示前一百名列表。 2.展示个人排名(如&#xff1a;张三&#xff0c;您当前的排名106579)。 一.redis so…

K8S访问控制------认证(authentication )、授权(authorization )、准入控制(admission control )体系

一、账号分类 在K8S体系中有两种账号类型:User accounts(用户账号),即针对human user的;Service accounts(服务账号),即针对pod的。这两种账号都可以访问 API server,都需要经历认证、授权、准入控制等步骤,相关逻辑图如下所示: 二、authentication (认证) 在…

23062day6

作业&#xff1a;将dict.txt导入到数据库中。 方法1&#xff1a;创建shell脚本&#xff0c; 调用指令创建数据库和表格&#xff0c;使用循环在循环中用数组存储dict.txt的内容并插入表格中。 方法2&#xff1a;在终端创建数据库和表格&#xff0c;将dict.txt中的内容手动输入…

带纽扣电池产品出口澳洲安全标准,纽扣电池IEC 60086认证

澳大利亚政府公布了《消费品&#xff08;纽扣/硬币电池&#xff09;安全标准》和《消费品&#xff08;纽扣/硬币电池&#xff09;信息标准》。届时出口纽扣/硬币电池以及含有纽扣/硬币电池产品到澳大利亚的供应商&#xff0c;必须遵守这些标准中的要求。 一、 安全标准及信息标…

db2迁移至oracle

1.思路 &#xff08;1&#xff09;用java连接数据库&#xff08;2&#xff09;把DB2数据导出为通用的格式如csv&#xff0c;json等&#xff08;3&#xff09;导入其他数据库&#xff0c;比如oracle&#xff0c;mongodb。这个方法自由发挥的空间比较大。朋友说他会用springboot…

yolov5的pytorch配置

1. conda create -n rdd38 python3.82、pip install torch1.8.0 torchvision0.9.0 torchaudio0.8.0 -f https://download.pytorch.org/whl/cu113/torch_stable.html -i https://pypi.tuna.tsinghua.edu.cn/simple 3、conda install cudatoolkit10.2

在抖音中使用语聚AI,实现自动回复用户视频评论、私信问答

您可以通过集简云数据流程&#xff0c;将语聚AI助手集成到抖音视频评论、抖音私信&#xff0c;实现自动回复用户视频评论、私信问答&#xff0c;大大提升账号互动与运营效率。 效果如下&#xff1a; 自动化流程&#xff1a; ● 抖音普通号评论对接语聚AI&#xff08;点击可一…