Hadoop3:MapReduce中的Reduce Join和Map Join

一、概念说明

学过MySQL的都知道,join和left join
这里的join含义和MySQL的join含义一样
就是对两张表的数据,进行关联查询

Hadoop的MapReduce阶段,分为2个阶段
一个Map,一个Reduce
那么,join逻辑,就可以在这两个阶段实现。

两者有什么区别了?
我们都知道,一般情况下,MapTaskReduceTask线程数更多。
所以,当两张表,有一个表数据量非常大,一个表非常小的时候
我们建议放在Map阶段进行join,这样可以提高性能。

二、需求说明

有两张表数据
在这里插入图片描述
将商品信息表中数据根据商品pid合并到订单数据表中
在这里插入图片描述

三、代码实现

1、Reduce Join

TableBean

package com.atguigu.mapreduce.reduceJoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable {

    private String id; // 订单id
    private String pid; // 商品id
    private int amount; // 商品数量
    private String pname;// 商品名称
    private String flag; // 标记是什么表 order pd

    // 空参构造
    public TableBean() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {

        this.id = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }

    @Override
    public String toString() {
        // id	pname	amount
        return  id + "\t" +  pname + "\t" + amount ;
    }
}

TableMapper
源数据,是多个文件的时候,我们要在setup方法里,获取文件信息
这样才能在map方法里知道,当前读取的是哪个文件,从而实现区别处理。

package com.atguigu.mapreduce.reduceJoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    private String fileName;
    private Text outK  = new Text();
    private TableBean outV = new TableBean();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 初始化  order  pd
        FileSplit split = (FileSplit) context.getInputSplit();

        fileName = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 获取一行
        String line = value.toString();

        // 2 判断是哪个文件的
        if (fileName.contains("order")){// 处理的是订单表

            String[] split = line.split("\t");

            // 封装k  v
            outK.set(split[1]);
            outV.setId(split[0]);
            outV.setPid(split[1]);
            outV.setAmount(Integer.parseInt(split[2]));
            outV.setPname("");
            outV.setFlag("order");

        }else {// 处理的是商品表
            String[] split = line.split("\t");

            outK.set(split[0]);
            outV.setId("");
            outV.setPid(split[0]);
            outV.setAmount(0);
            outV.setPname(split[1]);
            outV.setFlag("pd");
        }

        // 写出
        context.write(outK, outV);
    }
}

TableReducer

这里要注意
for循环处理bean list的时候,我们要在循环里面,new一个bean,存入list中
因为,Hadoop中,Iterable里存放的是地址,所以,不在循环内new一个bean来存放
会导致数据覆盖,最终只是存了一个bean

package com.atguigu.mapreduce.reduceJoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class TableReducer extends Reducer<Text, TableBean,TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
//        01 	1001	1   order
//        01 	1004	4   order
//        01	小米   	     pd
        // 准备初始化集合
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();

        // 循环遍历
        for (TableBean value : values) {

            if ("order".equals(value.getFlag())){// 订单表

                TableBean tmptableBean = new TableBean();

                try {
                    BeanUtils.copyProperties(tmptableBean,value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }

                orderBeans.add(tmptableBean);
            }else {// 商品表

                try {
                    BeanUtils.copyProperties(pdBean,value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // 循环遍历orderBeans,赋值 pdname
        for (TableBean orderBean : orderBeans) {

            orderBean.setPname(pdBean.getPname());

            context.write(orderBean,NullWritable.get());
        }
    }
}

TableDriver

package com.atguigu.mapreduce.reduceJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TableDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("E:\\workspace\\data\\inputtable"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\workspace\\data\\join1"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }

}

测试

在这里插入图片描述在这里插入图片描述

数据变化

1、源数据

在这里插入图片描述

2、Map方法中,按行读取数据

在这里插入图片描述

3、Shuffle阶段排序

因为,map方法中,用pid作为key,所以,这里对pid进行排序
在这里插入图片描述

4、Reduce方法,按key读取数据

这里的key只有3个,所以,reduce被调用了3次
每封装好一条数据,就write一次
reduce方法执行完毕后,进行归并排序,得到最终数据文件,输出到磁盘
在这里插入图片描述

2、Map Join

关键技术:
采用DistributedCache,在map阶段缓存小表数据
并且,取消reduce阶段

MapJoinDriver
关键代码:

        // 加载缓存数据
        job.addCacheFile(new URI("file:///D:/input/tablecache/pd.txt"));
        
        //缓存普通文件到Task运行节点。
		//job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
		//如果是集群运行,需要设置HDFS路径
		//job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
		
        // Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
        job.setNumReduceTasks(0);
package com.atguigu.mapreduce.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {

        // 1 获取job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 设置加载jar包路径
        job.setJarByClass(MapJoinDriver.class);
        // 3 关联mapper
        job.setMapperClass(MapJoinMapper.class);
        // 4 设置Map输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5 设置最终输出KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 加载缓存数据
        job.addCacheFile(new URI("file:///D:/input/tablecache/pd.txt"));
        // Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
        job.setNumReduceTasks(0);

        // 6 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputtable2"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output8888"));
        // 7 提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }

}

MapJoinMapper
setup方法中,使用driver中配置的小表文件路径,创建流,并将数据缓存起来,供map方法使用。

package com.atguigu.mapreduce.mapjoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private HashMap<String, String> pdMap = new HashMap<>();
    private Text outK = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 获取缓存的文件,并把文件内容封装到集合 pd.txt
        URI[] cacheFiles = context.getCacheFiles();

        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));

        // 从流中读取数据
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));

        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            // 切割
            String[] fields = line.split("\t");

            // 赋值
            pdMap.put(fields[0], fields[1]);
        }

        // 关流
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 处理 order.txt
        String line = value.toString();

        String[] fields = line.split("\t");

        // 获取pid
        String pname = pdMap.get(fields[1]);

        // 获取订单id 和订单数量
        // 封装
        outK.set(fields[0] + "\t" + pname + "\t" + fields[2]);

        context.write(outK, NullWritable.get());
    }
}

测试

在这里插入图片描述在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/753456.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

卸载 ubuntu-wsl2-systemd-script,使用 WSLg 图形用户界面

目录 全新安装 - 以前没有安装 WSL现有 WSL 安装卸载 ubuntu-wsl2-systemd-script使用 Linux GUI参考链接在 Windows 上使用 Linux 开发环境,最好的做法是使用 WSL2。在 WSL 和早期的 WSL2 版本中,并不支持图形用户界面。因此如果想要使用 GUI 程序,需要自行解决。具体方法可…

游戏AI的创造思路-技术基础-深度学习(3)

继续填坑&#xff0c;本篇介绍深度学习中的长短期记忆网络~~~~ 目录 3.3. 长短期记忆网络&#xff08;LSTM&#xff09; 3.3.1. 什么是长短期记忆网络 3.3.2. 形成过程与运行原理 3.3.2.1. 细胞状态与门结构 3.3.2.2. 遗忘门 3.3.2.3. 输入门 3.3.2.4. 细胞状态更新 3.…

一个分析电路图的好助手

GPT。 最进分析电路图的时候发现GPT支持读取图片功能&#xff1a; 还别说&#xff0c;分析的很有道理。 此外&#xff0c;它还可以分析芯片的引脚功能&#xff0c;辅助电路分析&#xff1a; AB胶&#xff1a;粘的非常牢固&#xff0c;需要A和B两种胶混合使用。

有兄弟对这类区域比较感兴趣,也引起我的好奇,我提取出来给大家看看

要说这类地区&#xff0c;亚洲泰国排第二估计没人敢说第一吧&#xff0c;所以我就提取泰国的数据给大家看看&#xff01; 如图&#xff1a;这些特殊服务地区主要集中在曼谷和芭提雅地区&#xff0c;芭提雅最多&#xff01;看来管理还是不错的&#xff0c;限制在一定范围&#x…

php composer 报错

引用文章&#xff1a; Composer设置国内镜像_composer 国内源-CSDN博客 php composer.phar require --prefer-dist yiidoc/yii2-redactor "*" A connection timeout was encountered. If you intend to run Composer without connecting to the internet, run the …

汉江师范学院2024年成人高等继续教育招生简章

汉江师范学院&#xff0c;这所承载着深厚文化底蕴和学术积淀的高等学府&#xff0c;即将在2024年迎来新一季的成人高等继续教育招生。这不仅是一次知识的盛宴&#xff0c;更是对每一位怀揣梦想、追求进步的成年人的诚挚邀请。 汉江师范学院&#xff0c;以其严谨的教学态度、卓…

老师如何发布学校分班情况?

随着新学期的临近&#xff0c;许多老师可能都会回想起过去那些忙碌的日子&#xff0c;他们不得不面对一堆学生名单&#xff0c;手动进行班级分配&#xff0c;然后逐一通知家长和学生&#xff0c;这种工作不仅繁琐而且容易出错&#xff0c;让人倍感压力。 然而&#xff0c;今天我…

真正的IDEA在线版有多好用

前言 在上一篇文章使用过TitanIDE的VS Code在线版以后&#xff0c;尝到了不少甜头&#xff0c;紧接着又去使用了他的在线版IntelliJ IDEA&#xff0c;同样非常惊艳&#xff0c;不需要任何时间去适应这款云原生开发工具,事不宜迟&#xff0c;马上开整 这才是真正的VS Code在线版…

9种慢慢被淘汰的编程语言...【送源码】

技术不断进步&#xff0c;我们使用的编程语言也不例外。 随着人工智能的兴起以及对编程语言使用的影响&#xff0c;我们更加关注哪些语言将在未来继续流行&#xff0c;哪些会被淘汰。 Python、Java 和 JavaScript 等多功能编程语言正在主导市场&#xff0c;而其他一些语言则逐…

第 1 章SwiftUI 简介

在 2019 年的 WWDC 上,Apple 宣布推出一款名为 SwiftUI 的全新框架,令开发者们大吃一惊。该框架不仅改变了开发 iOS 应用的方式,还代表了自 Swift 首次亮相以来 Apple 开发者生态系统最重大的转变。SwiftUI 适用于所有 Apple 平台,包括 iPadOS、macOS、tvOS 和 watchOS,这…

REST API 中的 HTTP 请求参数

当我们在谈论现代 Web 开发时&#xff0c;REST API (Representational State Transfer Application Programming Interface) 扮演着至关重要的角色。它允许不同的系统以一种简洁且高效的方式进行通信。HTTP 请求参数是控制此通信流程中数据如何被发送和接收的重要组成部分。 H…

加密教程:pdf怎么加密?7个pdf加密技巧任你选(图文详解)

pdf作为一种便携式文档&#xff0c;是展示内容的首选格式&#xff0c;目前也已广泛应用于交换和分享重要等温&#xff0c;例如内部报告、人力资源文件&#xff0c;以及商业提案等包含敏感信息的文档。然而&#xff0c;在如今的数字化时代&#xff0c;随着越来越多的企业将其文档…

mfc140.dll怎么安装?mfc140.dll丢失安装详细解决方法

当电脑出现找不到mfc140.dll丢失问题&#xff0c;我们需要怎么办&#xff1f;怎么解决mfc140.dll丢失问题&#xff1f;mfc140.dll到底是什么&#xff1f;下面我给大家详细介绍与分析&#xff0c;最重要的是mfc140.dll的解决方法&#xff01; 一、文件丢失原因分析 在分析mfc14…

golang 获取系统的主机 CPU 内存 磁盘等信息

golang 获取系统的主机 CPU 内存 磁盘等信息 要求 需要go1.18或更高版本 官方地址&#xff1a;https://github.com/shirou/gopsutil 使用 #下载包 go get github.com/shirou/gopsutil/v3/cpu go get github.com/shirou/gopsutil/v3/disk go get github.com/shirou/gopsuti…

PIP安装Python扩展包超时解决办法-国内镜像

问题描述 使用pip安装Python扩展包经常超时&#xff0c;无法安装 解决方法 使用清华大学镜像&#xff1a; https://pypi.tuna.tsinghua.edu.cn/simple/ 使用方法&#xff1a;以openpyxl为例 原来&#xff1a;pip install openpyxl 现在&#xff1a;pip install -i https…

Git与GitLab的企业实战--尚硅谷git课程

Git与GitLab的企业实战 第1章 Git概述 Git是一个免费的、开源的分布式版本控制系统&#xff0c;可以快速高效地处理从小型到大型的各种项目。 Git易于学习&#xff0c;占地面积小&#xff0c;性能极快。 它具有廉价的本地库&#xff0c;方便的暂存区域和多个工作流分支等特性…

IEEE JSTSP综述:从信号处理领域分析视触觉传感器的研究

触觉传感器是机器人系统的重要组成部分&#xff0c;虽然与视觉相比触觉具有较小的感知面积&#xff0c;但却可以提供机器人与物体交互过程中更加真实的物理信息。 视觉触觉传感是一种分辨率高、成本低的触觉感知技术&#xff0c;被广泛应用于分类、抓取、操作等领域中。近期&a…

什么是指令微调(LLM)

经过大规模数据预训练后的语言模型已经具备较强的模型能力&#xff0c;能够编码丰富的世界知识&#xff0c;但是由于预训练任务形式所限&#xff0c;这些模型更擅长于文本补全&#xff0c;并不适合直接解决具体的任务。 指令微调是相对“预训练”来讲的&#xff0c;预训练的时…

UG_NX11.0之Windows11中安装出错及解决方法

UG_NX11.0之Windows11中安装出错及解决方法 文章目录 UG_NX11.0之Windows11中安装出错及解决方法1. 安装出错2. 解决方法1. 设置以兼容性模式运行2. 配置环境变量 3. 再次安装问题解决4. 安装后可删除配置的环境变量(可选) 1. 安装出错 以管理员身份运行Launch.exe,如下 点击D…

浅谈逻辑控制器之while控制器

浅谈逻辑控制器之while控制器 “While控制器”是一种高级控制结构&#xff0c;它允许用户基于特定条件来循环执行其下的子采样器或控制器&#xff0c;直至该条件不再满足。本文旨在详细介绍While控制器的功能、配置方法、使用场景以及实践示例&#xff0c;帮助测试工程师高效利…