Hadoop3:MapReduce中Reduce阶段自定义OutputFormat逻辑

一、情景描述

我们知道,在MapTask阶段开始时,需要InputFormat来读取数据
而在ReduceTask阶段结束时,将处理完成的数据,输出到磁盘,此时就要用到OutputFormat

在之前的程序中,我们都没有设置过这部分配置
所以,采用的是默认输出格式:TextOutputFormat

在实际工作中,我们的输出不一定是到磁盘,可能是输出到MySQL、HBase

那么,如何实现自定义的OutputFormat
在这里插入图片描述

二、案例

1、源数据

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.atguigu.com
http://www.sohu.com
http://www.baidu.com
http://www.sina.com
http://www.sin2a.com
http://www.baidu.com
http://www.sin2desa.com
http://www.sindsafa.com

2、需求分析

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log

3、代码实现

LogMapper.java

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // http://www.baidu.com
        //http://www.google.com
        // (http://www.google.com, NullWritable)
        // 不做任何处理
        context.write(value, NullWritable.get());
    }
}

LogReducer.java

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

        // http://www.baidu.com
        // http://www.baidu.com
        // 防止有相同数据,丢数据
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}

LogRecordWriter.java

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class LogRecordWriter extends RecordWriter<Text, NullWritable> {

    private  FSDataOutputStream atguiguOut;
    private  FSDataOutputStream otherOut;

    public LogRecordWriter(TaskAttemptContext job) {
        // 创建两条流
        try {
            FileSystem fs = FileSystem.get(job.getConfiguration());

            atguiguOut = fs.create(new Path("D:\\hadoop\\atguigu.log"));

            otherOut = fs.create(new Path("D:\\hadoop\\other.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String log = key.toString();

        // 具体写
        if (log.contains("atguigu")){
            atguiguOut.writeBytes(log+"\n");
        }else {
            otherOut.writeBytes(log+"\n");
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // 关流
        IOUtils.closeStream(atguiguOut);
        IOUtils.closeStream(otherOut);
    }
}

LogOutputFormat.java

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {

        LogRecordWriter lrw = new LogRecordWriter(job);

        return lrw;
    }
}

LogDriver.java

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //设置自定义的outputformat
        job.setOutputFormatClass(LogOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputoutputformat"));
        //虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
        //而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output1111"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }
}

3、测试

在这里插入图片描述
在这里插入图片描述

三、总结

关键文件:
LogRecordWriter.java
LogOutputFormat.java
LogDriver.java

        //设置自定义的outputformat
        job.setOutputFormatClass(LogOutputFormat.class);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/729874.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux 运维 | 4.从零开始,文件目录特殊权限管理实践

[ 知识是人生的灯塔&#xff0c;只有不断学习&#xff0c;才能照亮前行的道路 ] 0x00 前言简述 描述&#xff1a;前一章&#xff0c;学习了Linux系统中的用户与用户组的管理&#xff0c;此章节我们将继续学习Linux系统中比较基础且重要的文件权限设置与属性管理&#xff0c;在L…

大数据学习-Hive

介绍 分布式 SQL 计算 做数据的统计分析&#xff0c;SQL 是最方便的工具 在大数据中&#xff0c;有很多的统计分析场景&#xff0c;那么 SQL 来处理大数据是非常合适且频繁的 以后可能就是 SQL Boy 了&#xff0c;所以学习前需要有 MySQL 的基础 Hive 的功能 是一个分布式…

【MySQL】索引的原理及其使用

文章目录 什么叫索引减少磁盘IO次数缓存池(Buffer Pool&#xff09;MySQL的页页内目录页目录 正确理解索引结构为什么Innodb的索引是B树结构各种存储引擎支持的索引聚簇索引和非聚簇索引索引类型 关于索引的操作创建主键索引唯一索引的创建普通索引的创建查看索引删除索引 什么…

在React中,如何利用React.memo函数对函数组件进行优化?

React.memo 是 React 的一个高阶组件&#xff0c;用于对函数组件进行性能优化。它通过记忆化&#xff08;memoization&#xff09;来避免不必要的重新渲染。当组件的 props 没有变化时&#xff0c;React.memo 可以防止组件重新渲染&#xff0c;从而提高应用的性能。 使用 Reac…

【Redis】分布式锁基本理论与简单实现

目录 分布式锁解释作用特性实现方式MySQL、Redis、Zookeeper三种方式对比 原理 reids分布式锁原理目的容错redis简单分布式锁实现锁接口实现类下单场景的实现容错场景1解决思路优化代码 容错场景2Lua脚本Redis利用Lua脚本解决多条命令原子性问题 释放锁的业务流程Lua脚本来表示…

开放式耳机怎么选?五款劲爆机型强势PK!2024推荐版!

身为健身达人&#xff0c;我对耳机的要求可不低。开放式耳机让我在健身时既能享受音乐&#xff0c;又能清晰听到教练的指导。它佩戴舒适&#xff0c;不易掉落&#xff0c;而且音质出色&#xff0c;让我沉浸于运动的节奏中。市面上开放式耳机种类繁多&#xff0c;我为大家挑选了…

SD-WAN为什么适合小企业

SD-WAN&#xff08;软件定义广域网&#xff09;是一种革新性的网络技术&#xff0c;通过软件智能管理&#xff0c;实现灵活和高效的网络连接。在数字化转型浪潮中&#xff0c;企业对网络稳定性和性能的要求不断提升&#xff0c;SD-WAN因此受到了广泛关注。对于资源有限的小型企…

qml/c++:基础界面的串口设置逻辑

文章目录 文章介绍效果图本机串口打开从虚拟端串口传数据到本机串口 代码添加serialporthandler类serialporthandler.hserialporthandler.cpp获取串口列表打开串口关闭串口清空按钮接收数据按钮逻辑&#xff1a;打开和关闭串口、弹出信息框、按钮文字改变 main.cpp 文章介绍 上…

怎么采集阿里巴巴1688的商品或商家数据?

怎么使用简数采集器批量采集阿里巴巴1688的商品或商家相关信息呢&#xff1f; 简数采集器暂时不支持采集阿里巴巴1688的相关数据&#xff0c;谢谢。 简数采集器采集网络网页数据非常简单高效&#xff1a;输入要采集的网址&#xff0c;简数智能算法会自动提取出网页上的关键信…

【自动驾驶】ROS小车系统

文章目录 小车组成轮式运动底盘的组成轮式运动底盘的分类轮式机器人的控制方式感知传感器ROS决策主控ROS介绍ROS的坐标系ROS的单位机器人电气连接变压模块运动底盘的电气连接ROS主控与传感器的电气连接ROS主控和STM32控制器两种控制器的功能运动底盘基本组成电池电机控制器与驱…

90V降5V1.5A恒压WT6039

90V降5V1.5A恒压WT6039 WT6039是一款专为宽电压输入范围设计的降压DC-DC转换器芯片&#xff0c;覆盖12V至90V电压。该芯片集成了包括使能控制开关、参考电源、误差放大器、过热保护、限流保护及短路保护等关键功能&#xff0c;确保在各种操作条件下的系统安全与稳定性。WT6039…

Kotlin 中的可见修饰符

Java 和 Kotlin 中的可见修饰符&#xff1a; Java&#xff1a;public、private、protected 和 default(什么都不写)&#xff1b;Kotlin&#xff1a;public、private、protected 和 internal&#xff1b; 比较&#xff1a; 对于 public 修饰符&#xff1a;在 Java 和 Kotlin 中…

NSSCTF-Web题目13

目录 [SWPUCTF 2022 新生赛]js_sign 1、题目 2、知识点 3、思路 [MoeCTF 2021]Do you know HTTP 1、题目 2、知识点 3、思路 [SWPUCTF 2022 新生赛]js_sign 1、题目 2、知识点 base64编码、敲击码&#xff08;tap code&#xff09; 3、思路 页面没有什么&#xff0c;…

CPRI协议理解——控制字内容

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 CPRI协议理解——控制字内容 前言同步标识L1 Inband ProtocolZ130.0Z.194 C&M 通道慢速C&M 通道快速C&M 通道Vendor Specific DataControl AxC Data 后记 前言 …

IIS代理配置-反向代理

前后端分离项目&#xff0c;前端在开发中使用proxy代理解决跨域问题&#xff0c;打包之后无效。 未配置前无法访问 部署环境为windows IIS&#xff0c;要在iis设置反向代理 安装代理模块 需要在iis中实现代理&#xff0c;需要安装Application Request Routing Cache和URL重…

【论文精读】ViM: Out-Of-Distribution with Virtual-logit Matching 使用虚拟分对数匹配的分布外检测

文章目录 一、文章概览&#xff08;一&#xff09;问题来源&#xff08;二&#xff09;文章的主要工作&#xff08;三&#xff09;相关研究 二、动机&#xff1a;Logits 中缺失的信息&#xff08;一&#xff09;logits&#xff08;三&#xff09;基于零空间的 OOD 评分&#xf…

水光互补+短期调度!梯级水光互补系统最大化可消纳电量期望短期优化调度模型程序代码!

前言 构建含风电、光伏的多能互补系统是解决新能源并网灵活性的重要途径。国家发展和改革委员会、能源局《关于推进电力源网荷储一体化和多能互补发展的指导意见》&#xff08;发改能源规〔2021〕280号&#xff09;明确提出了多能互补的实施路径&#xff0c;要充分发挥流域梯级…

Python图像处理库之pyvips使用详解

概要 在图像处理领域,高效和快速的图像处理工具对于开发者来说至关重要。pyvips 是一个强大的 Python 库,基于 libvips 图像处理库,提供高效、快速且节省内存的图像处理能力。pyvips 支持多种图像格式,并且能够执行各种复杂的图像处理任务,如裁剪、缩放、旋转、滤波等。本…

哪里还能申请免费一年期SSL证书?

SSL证书是网络安全的基石之一&#xff0c;它确保了数据传输的安全性和网站身份的真实性。而申请免费一年期SSL证书&#xff0c;则为广大用户提供了一个经济高效的方式来提升网站的安全性。具体介绍如下&#xff1a; 基于不同服务平台的免费SSL证书申请 FreeSSL&#xff1a;此平…

SAFEnet加密机的加密算法和技术

SAFEnet加密机是一款功能强大、安全可靠的加密设备&#xff0c;它在网络安全领域发挥着不可替代的作用。下面将从特点、功能、应用及优势等方面对SAFEnet加密机进行详细介绍。 一、特点 先进的加密算法和技术&#xff1a;SAFEnet加密机采用了最先进的加密算法和技术&#xff0c…